feat(allanime): improve error handling for stream source retrieval

This commit is contained in:
benex
2024-12-02 23:19:18 +03:00
parent 383cb62ede
commit 5f721847d7

View File

@@ -178,6 +178,210 @@ class AllAnime(AnimeProvider):
},
)["episode"]
@debug_provider
def _get_server(self, embed, anime_title: str, allanime_episode: "AllAnimeEpisode"):
"""
Retrieves the streaming server information for a given anime episode based on the provided embed data.
Args:
embed (dict): A dictionary containing the embed data, including the source URL and source name.
anime_title (str): The title of the anime.
allanime_episode (AllAnimeEpisode): An object representing the episode details.
Returns:
dict: A dictionary containing server information, headers, subtitles, episode title, and links to the stream.
Returns None if no valid URL or stream is found.
Raises:
requests.exceptions.RequestException: If there is an issue with the HTTP request.
"""
url = embed.get("sourceUrl")
#
if not url:
return
if url.startswith("--"):
url = one_digit_symmetric_xor(56, url[2:])
# FIRST CASE
match embed["sourceName"]:
case "Yt-mp4":
logger.debug("Found streams from Yt")
return {
"server": "Yt",
"episode_title": f"{anime_title}; Episode {episode_number}",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"links": [
{
"link": url,
"quality": "1080",
}
],
}
case "Mp4":
logger.debug("Found streams from Mp4")
response = self.session.get(
url,
fresh=1, # pyright: ignore
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
return
return {
"server": "mp4-upload",
"headers": {"Referer": "https://www.mp4upload.com/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": [{"link": vid.group(1), "quality": "1080"}],
}
case "Fm-Hls":
# TODO: requires decoding obsfucated js (filemoon)
logger.debug("Found streams from Fm-Hls")
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
return
return {
"server": "filemoon",
"headers": {"Referer": "https://www.mp4upload.com/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": [{"link": vid.group(1), "quality": "1080"}],
}
case "Ok":
# TODO: requires decoding the obsfucated js (filemoon)
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
logger.debug("Found streams from Ok")
return {
"server": "filemoon",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Vid-mp4":
# TODO: requires some serious work i think : )
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
logger.debug("Found streams from vid-mp4")
return {
"server": "Vid-mp4",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Ss-Hls":
# TODO: requires some serious work i think : )
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
logger.debug("Found streams from Ss-Hls")
return {
"server": "StreamSb",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
# get the stream url for an episode of the defined source names
response = self.session.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
# SECOND CASE
match embed["sourceName"]:
case "Luf-mp4":
logger.debug("Found streams from gogoanime")
return {
"server": "gogoanime",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Kir":
logger.debug("Found streams from wetransfer")
return {
"server": "weTransfer",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "S-mp4":
logger.debug("Found streams from sharepoint")
return {
"server": "sharepoint",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Sak":
logger.debug("Found streams from dropbox")
return {
"server": "dropbox",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Default":
logger.debug("Found streams from wixmp")
return {
"server": "wixmp",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Ak":
# TODO: works but needs further probing
logger.debug("Found streams from Ak")
return {
"server": "Ak",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
@debug_provider
def get_episode_streams(
self, anime_id, episode_number: str, translation_type="sub"
@@ -203,213 +407,7 @@ class AllAnime(AnimeProvider):
anime_id, episode_number, translation_type
)
@debug_provider
def _get_server(embed):
"""
Retrieves the streaming server information based on the provided embed data.
Args:
embed (dict): A dictionary containing the embed data with keys such as "sourceUrl" and "sourceName".
Returns:
dict or None: A dictionary containing server information, headers, subtitles, episode title, and links if a valid URL is found and processed.
Returns None if the URL is not valid or not found.
The function performs the following steps:
1. Extracts the URL from the embed data.
2. Decodes the URL if it starts with "--".
3. Checks if the URL contains specific server identifiers and returns the corresponding server information.
4. Fetches the stream URL for an episode from the defined source names and returns the server information based on the source name.
"""
url = embed.get("sourceUrl")
#
if not url:
return
if url.startswith("--"):
url = one_digit_symmetric_xor(56, url[2:])
match embed["sourceName"]:
case "Yt-mp4":
logger.debug("Found streams from Yt")
return {
"server": "Yt",
"episode_title": f"{anime_title}; Episode {episode_number}",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"links": [
{
"link": url,
"quality": "1080",
}
],
}
case "Mp4":
logger.debug("Found streams from Mp4")
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
return
return {
"server": "mp4-upload",
"headers": {"Referer": "https://www.mp4upload.com/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": [{"link": vid.group(1), "quality": "1080"}],
}
case "Fm-Hls":
# TODO: requires decoding obsfucated js (filemoon)
logger.debug("Found streams from Fm-Hls")
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
return
return {
"server": "filemoon",
"headers": {"Referer": "https://www.mp4upload.com/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": [{"link": vid.group(1), "quality": "1080"}],
}
case "Ok":
# TODO: requires decoding the obsfucated js (filemoon)
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
logger.debug("Found streams from Ok")
return {
"server": "filemoon",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Vid-mp4":
# TODO: requires some serious work i think : )
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
logger.debug("Found streams from vid-mp4")
return {
"server": "Vid-mp4",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Ss-Hls":
# TODO: requires some serious work i think : )
response = self.session.get(
url,
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
logger.debug("Found streams from Ss-Hls")
return {
"server": "StreamSb",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
# get the stream url for an episode of the defined source names
response = self.session.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
match embed["sourceName"]:
case "Luf-mp4":
logger.debug("Found streams from gogoanime")
return {
"server": "gogoanime",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Kir":
logger.debug("Found streams from wetransfer")
return {
"server": "weTransfer",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "S-mp4":
logger.debug("Found streams from sharepoint")
return {
"server": "sharepoint",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Sak":
logger.debug("Found streams from dropbox")
return {
"server": "dropbox",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Default":
logger.debug("Found streams from wixmp")
return {
"server": "wixmp",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
case "Ak":
# TODO: works but needs further probing
logger.debug("Found streams from Ak")
return {
"server": "Ak",
"headers": {"Referer": f"https://{API_BASE_URL}/"},
"subtitles": [],
"episode_title": (allanime_episode["notes"] or f"{anime_title}")
+ f"; Episode {episode_number}",
"links": give_random_quality(response.json()["links"]),
}
for embed in allanime_episode["sourceUrls"]:
# filter the working streams no need to get all since the others are mostly hsl
# TODO: should i just get all the servers and handle the hsl??
if embed.get("sourceName", "") not in (
# priorities based on death note
"Sak", # 7
@@ -427,7 +425,7 @@ class AllAnime(AnimeProvider):
):
logger.debug(f"Found {embed['sourceName']} but ignoring")
continue
if server := _get_server(embed):
if server := self._get_server(embed, anime_title, allanime_episode):
yield server