fix: Cannot fetch anime with a certain title

- added a replacing word dictionary
 - added a manual cache dictionary ID -> SearchResult to get more accurate results.
This commit is contained in:
axtrat
2025-08-24 17:01:09 +02:00
parent ff2a5d635a
commit c5c7644d0d
3 changed files with 91 additions and 55 deletions

View File

@@ -5,5 +5,7 @@ ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}"
MAX_TIMEOUT = 10
REPLACEMENT_WORDS = {"Season ": "", "Cour": "Part"}
TOKEN_REGEX = re.compile(r'<meta.*?name="csrf-token".*?content="([^"]*)".*?>')
DOWNLOAD_URL_REGEX = re.compile(r"window.downloadUrl\s*=\s*'([^']*)'")

View File

@@ -1,51 +1,59 @@
from typing import Literal
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
def map_to_search_results(
data: dict, translation_type: Literal["sub", "dub"]
) -> SearchResults:
results = []
for result in data:
mapped_result = map_to_search_result(result, translation_type)
if mapped_result:
results.append(mapped_result)
def map_to_search_results(data: dict) -> SearchResults:
return SearchResults(
page_info=PageInfo(),
results=[
SearchResult(
id=str(result["id"]),
title=get_real_title(result),
episodes=AnimeEpisodes(
sub=(
list(map(str, range(1, get_episodes_count(result) + 1)))
if result["dub"] == 0
else []
),
dub=(
list(map(str, range(1, get_episodes_count(result) + 1)))
if result["dub"] == 1
else []
),
),
# other_titles=[title for title in [result["title_eng"], result["title_it"]] if title],
media_type=result["type"],
score=result["score"],
status=result["status"],
season=result["season"],
poster=result["imageurl"],
year=result["date"],
)
for result in data
],
results=results,
)
def map_to_search_result(
data: dict, translation_type: Literal["sub", "dub"] | None
) -> SearchResult | None:
if translation_type and data["dub"] != 1 if translation_type == "dub" else 0:
return None
return SearchResult(
id=str(data["id"]),
title=get_real_title(data),
episodes=AnimeEpisodes(
sub=(
list(map(str, range(1, get_episodes_count(data) + 1)))
if data["dub"] == 0
else []
),
dub=(
list(map(str, range(1, get_episodes_count(data) + 1)))
if data["dub"] == 1
else []
),
),
# other_titles=[title for title in [result["title_eng"], result["title_it"]] if title],
media_type=data["type"],
score=data["score"],
status=data["status"],
# season=result["season"],
poster=data["imageurl"],
year=data["date"],
)
@@ -69,10 +77,7 @@ def map_to_anime_result(data: list, search_result: SearchResult) -> Anime:
AnimeEpisodeInfo(
id=str(episode["id"]),
episode=episode["number"],
# session_id=episode.get("session_id"),
title=f"{search_result.title} - Ep {episode['number']}",
# poster=episode["tg_post"],
# duration=episode.get("duration"),
)
for episode in data
],
@@ -88,7 +93,6 @@ def map_to_server(episode: AnimeEpisodeInfo, download_url: str) -> Server:
links=[
EpisodeStream(
link=download_url,
# translation_type=translation_type_map[episode.]
)
],
episode_title=episode.title,
@@ -111,7 +115,6 @@ def get_episodes_count(record: dict) -> int:
"""
Return the number of episodes from the record.
"""
if record.get("episodes_count", 0) > 0:
return record["episodes_count"]
else:
return record.get("real_episodes_count", 0)
if (count := record.get("real_episodes_count", 0)) > 0:
return count
return record.get("episodes_count", 0)

View File

@@ -6,8 +6,19 @@ from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults
from ..utils.debug import debug_provider
from .constants import ANIMEUNITY_BASE, DOWNLOAD_URL_REGEX, MAX_TIMEOUT, TOKEN_REGEX
from .mappers import map_to_anime_result, map_to_search_results, map_to_server
from .constants import (
ANIMEUNITY_BASE,
DOWNLOAD_URL_REGEX,
MAX_TIMEOUT,
REPLACEMENT_WORDS,
TOKEN_REGEX,
)
from .mappers import (
map_to_anime_result,
map_to_search_result,
map_to_search_results,
map_to_server,
)
logger = logging.getLogger(__name__)
@@ -16,6 +27,7 @@ class AnimeUnity(BaseAnimeProvider):
HEADERS = {
"User-Agent": UserAgentGenerator().random(),
}
_cache = dict[str, SearchResult]()
@lru_cache
def _get_token(self) -> None:
@@ -36,18 +48,32 @@ class AnimeUnity(BaseAnimeProvider):
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
return self._search(params)
if not (res := self._search(params)):
return None
for result in res.results:
self._cache[result.id] = result
return res
@lru_cache
def _search(self, params: SearchParams) -> SearchResults | None:
self._get_token()
# Replace words in query to
query = params.query
for old, new in REPLACEMENT_WORDS.items():
query = query.replace(old, new)
response = self.client.post(
url=f"{ANIMEUNITY_BASE}/livesearch",
data={"title": params.query},
data={"title": query},
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
return map_to_search_results(response.json().get("records", []))
return map_to_search_results(
response.json().get("records", []), params.translation_type
)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
@@ -55,18 +81,23 @@ class AnimeUnity(BaseAnimeProvider):
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> SearchResult | None:
search_results = self._search(SearchParams(query=params.query))
if not search_results or not search_results.results:
logger.error(f"No search results found for ID {params.id}")
return None
for search_result in search_results.results:
if search_result.id == params.id:
return search_result
if cached := self._cache.get(params.id):
return cached
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/",
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
data = response.json()
if res := map_to_search_result(data, None):
self._cache[params.id] = res
return res
@lru_cache
def _get_anime(self, params: AnimeParams) -> Anime | None:
search_result = self._get_search_result(params)
if not search_result:
if (search_result := self._get_search_result(params)) is None:
logger.error(f"No search result found for ID {params.id}")
return None