diff --git a/fastanime/libs/anime_provider/aniwatch/__init__.py b/fastanime/libs/anime_provider/aniwatch/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fastanime/libs/anime_provider/aniwatch/api.py b/fastanime/libs/anime_provider/aniwatch/api.py new file mode 100644 index 0000000..5d2f970 --- /dev/null +++ b/fastanime/libs/anime_provider/aniwatch/api.py @@ -0,0 +1,167 @@ +import logging +import re +from itertools import cycle + +from yt_dlp.utils import ( + extract_attributes, + get_element_html_by_class, + get_elements_html_by_class, +) + +from ..base_provider import AnimeProvider +from ..common import fetch_anime_info_from_bal +from ..mini_anilist import search_for_anime_with_anilist +from ..utils import give_random_quality +from .constants import SERVERS_AVAILABLE +from .types import AniWatchStream + +logger = logging.getLogger(__name__) + +LINK_TO_STREAMS_REGEX = re.compile(r".*://(.*)/embed-(2|4|6)/e-([0-9])/(.*)\?.*") + + +class AniWatchApi(AnimeProvider): + def search_for_anime(self, anime_title: str, *args): + try: + return search_for_anime_with_anilist(anime_title) + except Exception as e: + logger.error(e) + + def get_anime(self, anilist_id, *args): + try: + bal_results = fetch_anime_info_from_bal(anilist_id) + if not bal_results: + return + ZORO = bal_results["Sites"]["Zoro"] + aniwatch_id = list(ZORO.keys())[0] + anime_url = f"https://hianime.to/ajax/v2/episode/list/{aniwatch_id}" + response = self.session.get(anime_url, timeout=10) + if response.status_code == 200: + response_json = response.json() + aniwatch_anime_page = response_json["html"] + episodes_info_container_html = get_element_html_by_class( + "ss-list", aniwatch_anime_page + ) + episodes_info_html_list = get_elements_html_by_class( + "ep-item", episodes_info_container_html + ) + # keys: [ data-number: episode_number, data-id: episode_id, title: episode_title , href:episode_page_url] + episodes_info_dicts = [ + extract_attributes(episode_dict) + for episode_dict in episodes_info_html_list + ] + episodes = [episode["data-number"] for episode in episodes_info_dicts] + self.episodes_info = [ + { + "id": episode["data-id"], + "title": f"{episode['title'] or ZORO['title']}; Episode {episode['data-number']}", + "episode": episode["data-number"], + } + for episode in episodes_info_dicts + ] + return { + "id": aniwatch_id, + "availableEpisodesDetail": { + "dub": episodes, + "sub": episodes, + "raw": episodes, + }, + "poster": ZORO[aniwatch_id]["image"], + "title": ZORO[aniwatch_id]["title"], + "episodes_info": self.episodes_info, + } + except Exception as e: + logger.error(e) + + def get_episode_streams(self, anime, episode, translation_type, *args): + try: + episode_details = [ + episode_details + for episode_details in self.episodes_info + if episode_details["episode"] == episode + ] + if not episode_details: + return + episode_details = episode_details[0] + episode_url = f"https://hianime.to/ajax/v2/episode/servers?episodeId={episode_details['id']}" + response = self.session.get(episode_url) + if response.status_code == 200: + response_json = response.json() + episode_page_html = response_json["html"] + servers_containers_html = get_elements_html_by_class( + "ps__-list", episode_page_html + ) + if not servers_containers_html: + return + # sub servers + try: + servers_html_sub = get_elements_html_by_class( + "server-item", servers_containers_html[0] + ) + except Exception: + logger.warn("AniWatch: sub not found") + servers_html_sub = None + + # dub servers + try: + servers_html_dub = get_elements_html_by_class( + "server-item", servers_containers_html[1] + ) + except Exception: + logger.warn("AniWatch: dub not found") + servers_html_dub = None + + if translation_type == "dub": + servers_html = servers_html_dub + else: + servers_html = servers_html_sub + if not servers_html: + return + for server_name, server_html in zip( + cycle(SERVERS_AVAILABLE), servers_html + ): + try: + # keys: [ data-type: translation_type, data-id: embed_id, data-server-id: server_id ] + servers_info = extract_attributes(server_html) + embed_url = f"https://hianime.to/ajax/v2/episode/sources?id={servers_info['data-id']}" + embed_response = self.session.get(embed_url) + if embed_response.status_code == 200: + embed_json = embed_response.json() + raw_link_to_streams = embed_json["link"] + match = LINK_TO_STREAMS_REGEX.match(raw_link_to_streams) + if not match: + continue + provider_domain = match.group(1) + embed_type = match.group(2) + episode_number = match.group(3) + source_id = match.group(4) + + link_to_streams = f"https://{provider_domain}/embed-{embed_type}/ajax/e-{episode_number}/getSources?id={source_id}" + link_to_streams_response = self.session.get(link_to_streams) + if link_to_streams_response.status_code == 200: + juicy_streams_json: "AniWatchStream" = ( + link_to_streams_response.json() + ) + yield { + "headers": {}, + "subtitles": [ + { + "url": track["file"], + "language": track["label"], + } + for track in juicy_streams_json["tracks"] + if track["kind"] == "captions" + ], + "server": server_name, + "episode_title": episode_details["title"], + "links": give_random_quality( + [ + {"link": link["file"], "type": link["type"]} + for link in juicy_streams_json["sources"] + ] + ), + } + except Exception as e: + logger.error(e) + except Exception as e: + logger.error(e) diff --git a/fastanime/libs/anime_provider/aniwatch/constants.py b/fastanime/libs/anime_provider/aniwatch/constants.py new file mode 100644 index 0000000..28ad1dc --- /dev/null +++ b/fastanime/libs/anime_provider/aniwatch/constants.py @@ -0,0 +1 @@ +SERVERS_AVAILABLE = ["HD1", "HD2", "StreamSB", "StreamTape"] diff --git a/fastanime/libs/anime_provider/aniwatch/types.py b/fastanime/libs/anime_provider/aniwatch/types.py new file mode 100644 index 0000000..4e1f3e5 --- /dev/null +++ b/fastanime/libs/anime_provider/aniwatch/types.py @@ -0,0 +1,26 @@ +from typing import Literal, TypedDict + + +class AniWatchSkipTime(TypedDict): + start: int + end: int + + +class AniWatchSource(TypedDict): + file: str + type: str + + +class AniWatchTrack(TypedDict): + file: str + label: str + kind: Literal["captions", "thumbnails", "audio"] + + +class AniWatchStream(TypedDict): + sources: list[AniWatchSource] + tracks: list[AniWatchTrack] + encrypted: bool + intro: AniWatchSkipTime + outro: AniWatchSkipTime + server: int diff --git a/fastanime/libs/anime_provider/common.py b/fastanime/libs/anime_provider/common.py new file mode 100644 index 0000000..8ff6b57 --- /dev/null +++ b/fastanime/libs/anime_provider/common.py @@ -0,0 +1,15 @@ +import logging + +from requests import get + +logger = logging.getLogger(__name__) + + +def fetch_anime_info_from_bal(anilist_id): + try: + url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json" + response = get(url, timeout=11) + if response.status_code == 200: + return response.json() + except Exception as e: + logger.error(e) diff --git a/fastanime/libs/anime_provider/mini_anilist.py b/fastanime/libs/anime_provider/mini_anilist.py new file mode 100644 index 0000000..e30f9f9 --- /dev/null +++ b/fastanime/libs/anime_provider/mini_anilist.py @@ -0,0 +1,153 @@ +import logging +from typing import TYPE_CHECKING + +from requests import post +from thefuzz import fuzz + +if TYPE_CHECKING: + from ..anilist.types import AnilistDataSchema +logger = logging.getLogger(__name__) + +ANILIST_ENDPOINT = "https://graphql.anilist.co" +""" +query($query:String){ + Page(perPage:50){ + pageInfo{ + total + currentPage + hasNextPage + } + media(search:$query,type:ANIME){ + id + idMal + title{ + romaji + english + } + episodes + status + nextAiringEpisode { + timeUntilAiring + airingAt + episode + } + } + } +} +""" + + +def search_for_anime_with_anilist(anime_title: str): + query = """ + query($query:String){ + Page(perPage:50){ + pageInfo{ + total + currentPage + hasNextPage + } + media(search:$query,type:ANIME){ + id + idMal + title{ + romaji + english + } + episodes + status + nextAiringEpisode { + timeUntilAiring + airingAt + episode + } + } + } + } + """ + response = post( + ANILIST_ENDPOINT, + json={"query": query, "variables": {"query": anime_title}}, + timeout=10, + ) + if response.status_code == 200: + anilist_data: "AnilistDataSchema" = response.json() + return { + "pageInfo": anilist_data["data"]["Page"]["pageInfo"], + "results": [ + { + "id": anime_result["id"], + "title": anime_result["title"]["romaji"] + or anime_result["title"]["english"], + "type": "anime", + "availableEpisodes": list( + range( + 1, + ( + anime_result["episodes"] + if not anime_result["status"] == "RELEASING" + and anime_result["episodes"] + else ( + anime_result["nextAiringEpisode"]["episode"] - 1 + if anime_result["nextAiringEpisode"] + else 0 + ) + ), + ) + ), + } + for anime_result in anilist_data["data"]["Page"]["media"] + ], + } + + +def get_mal_id_and_anilist_id(anime_title: str) -> "dict[str,int] | None": + """the abstraction over all none authenticated requests and that returns data of a similar type + + Args: + query: the anilist query + variables: the anilist api variables + + Returns: + a boolean indicating success and none or an anilist object depending on success + """ + query = """ + query($query:String){ + Page(perPage:50){ + pageInfo{ + total + currentPage + hasNextPage + } + media(search:$query,type:ANIME){ + id + idMal + title{ + romaji + english + } + } + } + } + """ + + try: + variables = {"query": anime_title} + response = post( + ANILIST_ENDPOINT, + json={"query": query, "variables": variables}, + timeout=10, + ) + anilist_data: "AnilistDataSchema" = response.json() + if response.status_code == 200: + anime = max( + anilist_data["data"]["Page"]["media"], + key=lambda anime: max( + ( + fuzz.ratio(anime, str(anime["title"]["romaji"])), + fuzz.ratio(anime_title, str(anime["title"]["english"])), + ) + ), + ) + return {"id_anilist": anime["id"], "id_mal": anime["idMal"]} + except Exception as e: + logger.error(f"Something unexpected occured {e}")