refactor(allanime): enhance error handling in API response processing

This commit is contained in:
benex
2024-12-02 11:41:51 +03:00
parent ddbb4ca451
commit 0b9da27def

View File

@@ -1,7 +1,6 @@
import json
import logging
from typing import TYPE_CHECKING
from ...anime_provider.base_provider import AnimeProvider
from ..decorators import debug_provider
from ..utils import give_random_quality, one_digit_symmetric_xor
@@ -14,11 +13,46 @@ logger = logging.getLogger(__name__)
class AllAnime(AnimeProvider):
"""
AllAnime is a provider class for fetching anime data from the AllAnime API.
Attributes:
HEADERS (dict): Default headers for API requests.
Methods:
_execute_graphql_query(query: str, variables: dict) -> dict:
Executes a GraphQL query and returns the response data.
search_for_anime(
**kwargs
) -> dict:
Searches for anime based on the provided keywords and other parameters.
get_anime(show_id: str) -> dict:
Retrieves detailed information about a specific anime by its ID.
_get_anime_episode(
show_id: str, episode, translation_type: str = "sub"
Retrieves information about a specific episode of an anime.
get_episode_streams(
) -> generator:
Retrieves streaming links for a specific episode of an anime.
"""
HEADERS = {
"Referer": API_REFERER,
}
def _execute_graphql_query(self, query: str, variables: dict):
"""
Executes a GraphQL query using the provided query string and variables.
Args:
query (str): The GraphQL query string to be executed.
variables (dict): A dictionary of variables to be used in the query.
Returns:
dict: The JSON response data from the GraphQL API.
Raises:
requests.exceptions.HTTPError: If the HTTP request returned an unsuccessful status code.
"""
response = self.session.get(
API_ENDPOINT,
params={
@@ -27,11 +61,8 @@ class AllAnime(AnimeProvider):
},
timeout=10,
)
if response.ok:
return response.json()["data"]
else:
logger.error(response.text)
return {}
response.raise_for_status()
return response.json()["data"]
@debug_provider
def search_for_anime(
@@ -46,95 +77,140 @@ class AllAnime(AnimeProvider):
unknown=True,
**kwargs,
):
search = {"allowAdult": nsfw, "allowUnknown": unknown, "query": search_keywords}
variables = {
"search": search,
"limit": limit,
"page": page,
"translationtype": translation_type,
"countryorigin": country_of_origin,
"""
Search for anime based on given keywords and filters.
Args:
search_keywords (str): The keywords to search for.
translation_type (str, optional): The type of translation to search for (e.g., "sub" or "dub"). Defaults to "sub".
limit (int, optional): The maximum number of results to return. Defaults to 40.
page (int, optional): The page number to return. Defaults to 1.
country_of_origin (str, optional): The country of origin filter. Defaults to "all".
nsfw (bool, optional): Whether to include adult content in the search results. Defaults to True.
unknown (bool, optional): Whether to include unknown content in the search results. Defaults to True.
**kwargs: Additional keyword arguments.
Returns:
dict: A dictionary containing the page information and a list of search results. Each result includes:
- id (str): The ID of the anime.
- title (str): The title of the anime.
- type (str): The type of the anime.
- availableEpisodes (int): The number of available episodes.
"""
search_results = self._execute_graphql_query(
SEARCH_GQL,
variables={
"search": {
"allowAdult": nsfw,
"allowUnknown": unknown,
"query": search_keywords,
},
"limit": limit,
"page": page,
"translationtype": translation_type,
"countryorigin": country_of_origin,
},
)
return {
"pageInfo": search_results["shows"]["pageInfo"],
"results": [
{
"id": result["_id"],
"title": result["name"],
"type": result["__typename"],
"availableEpisodes": result["availableEpisodes"],
}
for result in search_results["shows"]["edges"]
],
}
search_results = self._execute_graphql_query(SEARCH_GQL, variables)
page_info = search_results["shows"]["pageInfo"]
results = []
for result in search_results["shows"]["edges"]:
normalized_result = {
"id": result["_id"],
"title": result["name"],
"type": result["__typename"],
"availableEpisodes": result["availableEpisodes"],
}
results.append(normalized_result)
normalized_search_results = {
"pageInfo": page_info,
"results": results,
}
return normalized_search_results
@debug_provider
def get_anime(self, show_id: str):
variables = {"showId": show_id}
anime = self._execute_graphql_query(SHOW_GQL, variables)
id: str = anime["show"]["_id"]
title: str = anime["show"]["name"]
availableEpisodesDetail = anime["show"]["availableEpisodesDetail"]
self.store.set(show_id, "anime_info", {"title": title})
type = anime.get("__typename")
normalized_anime = {
"id": id,
"title": title,
"availableEpisodesDetail": availableEpisodesDetail,
"type": type,
def get_anime(self, id: str):
"""
Fetches anime details using the provided show ID.
Args:
id (str): The ID of the anime show to fetch details for.
Returns:
dict: A dictionary containing the anime details, including:
- id (str): The unique identifier of the anime show.
- title (str): The title of the anime show.
- availableEpisodesDetail (list): A list of available episodes details.
- type (str, optional): The type of the anime show.
"""
anime = self._execute_graphql_query(SHOW_GQL, variables={"showId": id})
self.store.set(id, "anime_info", {"title": anime["show"]["name"]})
return {
"id": anime["show"]["_id"],
"title": anime["show"]["name"],
"availableEpisodesDetail": anime["show"]["availableEpisodesDetail"],
"type": anime.get("__typename"),
}
return normalized_anime
@debug_provider
def _get_anime_episode(
self, show_id: str, episode, translation_type: str = "sub"
) -> "AllAnimeEpisode | dict":
variables = {
"showId": show_id,
"translationType": translation_type,
"episodeString": episode,
}
episode = self._execute_graphql_query(EPISODES_GQL, variables)
return episode["episode"]
self, anime_id: str, episode, translation_type: str = "sub"
) -> "AllAnimeEpisode":
"""
Fetches a specific episode of an anime by its ID and episode number.
Args:
anime_id (str): The unique identifier of the anime.
episode (str): The episode number or string identifier.
translation_type (str, optional): The type of translation for the episode. Defaults to "sub".
Returns:
AllAnimeEpisode: The episode details retrieved from the GraphQL query.
"""
return self._execute_graphql_query(
EPISODES_GQL,
variables={
"showId": anime_id,
"translationType": translation_type,
"episodeString": episode,
},
)["episode"]
@debug_provider
def get_episode_streams(
self, anime_id, episode_number: str, translation_type="sub"
):
"""
Retrieve streaming information for a specific episode of an anime.
Args:
anime_id (str): The unique identifier for the anime.
episode_number (str): The episode number to retrieve streams for.
translation_type (str, optional): The type of translation for the episode (e.g., "sub" for subtitles). Defaults to "sub".
Yields:
dict: A dictionary containing streaming information for the episode, including:
- server (str): The name of the streaming server.
- episode_title (str): The title of the episode.
- headers (dict): HTTP headers required for accessing the stream.
- subtitles (list): A list of subtitles available for the episode.
- links (list): A list of dictionaries containing streaming links and their quality.
"""
anime_title = (self.store.get(anime_id, "anime_info", "") or {"title": ""})[
"title"
]
allanime_episode = self._get_anime_episode(
anime_id, episode_number, translation_type
)
if not allanime_episode:
return []
embeds = allanime_episode["sourceUrls"]
@debug_provider
def _get_server(embed):
# filter the working streams no need to get all since the others are mostly hsl
# TODO: should i just get all the servers and handle the hsl??
if embed.get("sourceName", "") not in (
# priorities based on death note
"Sak", # 7
"S-mp4", # 7.9
"Luf-mp4", # 7.7
"Default", # 8.5
"Yt-mp4", # 7.9
"Kir", # NA
# "Vid-mp4" # 4
# "Ok", # 3.5
# "Ss-Hls", # 5.5
# "Mp4", # 4
):
return
"""
Retrieves the streaming server information based on the provided embed data.
Args:
embed (dict): A dictionary containing the embed data with keys such as "sourceUrl" and "sourceName".
Returns:
dict or None: A dictionary containing server information, headers, subtitles, episode title, and links if a valid URL is found and processed.
Returns None if the URL is not valid or not found.
The function performs the following steps:
1. Extracts the URL from the embed data.
2. Decodes the URL if it starts with "--".
3. Checks if the URL contains specific server identifiers and returns the corresponding server information.
4. Fetches the stream URL for an episode from the defined source names and returns the server information based on the source name.
"""
url = embed.get("sourceUrl")
#
if not url:
@@ -158,75 +234,80 @@ class AllAnime(AnimeProvider):
}
# get the stream url for an episode of the defined source names
embed_url = f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}"
resp = self.session.get(
embed_url,
response = self.session.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
if resp.ok:
match embed["sourceName"]:
case "Luf-mp4":
logger.debug("Found streams from gogoanime")
return {
"server": "gogoanime",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f"{anime_title}"
)
+ f"; Episode {episode_number}",
"links": give_random_quality(resp.json()["links"]),
}
case "Kir":
logger.debug("Found streams from wetransfer")
return {
"server": "wetransfer",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f"{anime_title}"
)
+ f"; Episode {episode_number}",
"links": give_random_quality(resp.json()["links"]),
}
case "S-mp4":
logger.debug("Found streams from sharepoint")
return {
"server": "sharepoint",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f"{anime_title}"
)
+ f"; Episode {episode_number}",
"links": give_random_quality(resp.json()["links"]),
}
case "Sak":
logger.debug("Found streams from dropbox")
return {
"server": "dropbox",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f"{anime_title}"
)
+ f"; Episode {episode_number}",
"links": give_random_quality(resp.json()["links"]),
}
case "Default":
logger.debug("Found streams from wixmp")
return {
"server": "wixmp",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f"{anime_title}"
)
+ f"; Episode {episode_number}",
"links": give_random_quality(resp.json()["links"]),
}
response.raise_for_status()
match embed["sourceName"]:
case "Luf-mp4":
logger.debug("Found streams from gogoanime")
return {
"server": "gogoanime",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Kir":
logger.debug("Found streams from wetransfer")
return {
"server": "wetransfer",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "S-mp4":
logger.debug("Found streams from sharepoint")
return {
"server": "sharepoint",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Sak":
logger.debug("Found streams from dropbox")
return {
"server": "dropbox",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Default":
logger.debug("Found streams from wixmp")
return {
"server": "wixmp",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
for embed in embeds:
for embed in allanime_episode["sourceUrls"]:
# filter the working streams no need to get all since the others are mostly hsl
# TODO: should i just get all the servers and handle the hsl??
if embed.get("sourceName", "") not in (
# priorities based on death note
"Sak", # 7
"S-mp4", # 7.9
"Luf-mp4", # 7.7
"Default", # 8.5
"Yt-mp4", # 7.9
"Kir", # NA
# "Vid-mp4" # 4
# "Ok", # 3.5
# "Ss-Hls", # 5.5
# "Mp4", # 4
):
continue
if server := _get_server(embed):
yield server