chore: cleanup

This commit is contained in:
Benexl
2026-05-02 21:41:19 +03:00
parent 74487a2d17
commit 9243222fee
31 changed files with 0 additions and 1773 deletions
@@ -1 +0,0 @@
@@ -1,39 +0,0 @@
import re
from .....core.constants import GRAPHQL_DIR
SERVERS_AVAILABLE = [
"sharepoint",
"dropbox",
"gogoanime",
"weTransfer",
"wixmp",
"Yt",
"mp4-upload",
]
API_BASE_URL = "allanime.day"
API_GRAPHQL_REFERER = "https://allanime.to/"
API_GRAPHQL_ENDPOINT = f"https://api.{API_BASE_URL}/api/"
API_GRAPHQL_HEADERS= {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Content-Type": "application/json",
"Origin": f"{API_GRAPHQL_REFERER}",
}
# search constants
DEFAULT_COUNTRY_OF_ORIGIN = "all"
DEFAULT_NSFW = True
DEFAULT_UNKNOWN = True
DEFAULT_PER_PAGE = 40
DEFAULT_PAGE = 1
# regex stuff
MP4_SERVER_JUICY_STREAM_REGEX = re.compile(
r"video/mp4\",src:\"(https?://.*/video\.mp4)\""
)
# graphql files
_GQL_QUERIES = GRAPHQL_DIR / "allanime" / "queries"
SEARCH_GQL = _GQL_QUERIES / "search.gql"
ANIME_GQL = _GQL_QUERIES / "anime.gql"
EPISODE_GQL = _GQL_QUERIES / "episodes.gql"
@@ -1,3 +0,0 @@
from .extractor import extract_server
__all__ = ["extract_server"]
@@ -1,31 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class AkExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="Ak",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,20 +0,0 @@
from abc import ABC, abstractmethod
from httpx import Client
from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
class BaseExtractor(ABC):
@classmethod
@abstractmethod
def extract(
cls,
url: str,
client: Client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server | None:
pass
@@ -1,31 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class SakExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="dropbox",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,65 +0,0 @@
from httpx import Client
from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
from ..utils import debug_extractor, logger, one_digit_symmetric_xor
from .ak import AkExtractor
from .dropbox import SakExtractor
from .filemoon import FmHlsExtractor, OkExtractor
from .gogoanime import Lufmp4Extractor
from .mp4_upload import Mp4Extractor
from .sharepoint import Smp4Extractor
from .streamsb import SsHlsExtractor
from .vid_mp4 import VidMp4Extractor
from .we_transfer import KirExtractor
from .wixmp import DefaultExtractor
from .yt_mp4 import YtExtractor
AVAILABLE_SOURCES = {
"Sak": SakExtractor,
"S-mp4": Smp4Extractor,
"Luf-Mp4": Lufmp4Extractor,
"Default": DefaultExtractor,
"Yt-mp4": YtExtractor,
"Kir": KirExtractor,
"Mp4": Mp4Extractor,
}
OTHER_SOURCES = {
"Ak": AkExtractor,
"Vid-mp4": VidMp4Extractor,
"Ok": OkExtractor,
"Ss-Hls": SsHlsExtractor,
"Fm-Hls": FmHlsExtractor,
}
@debug_extractor
def extract_server(
client: Client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server | None:
url = source.get("sourceUrl")
if not url:
logger.debug(f"Url not found in source: {source}")
return
if url.startswith("--"):
url = one_digit_symmetric_xor(56, url[2:])
logger.debug(f"Decrypting url for source: {source['sourceName']}")
if source["sourceName"] in OTHER_SOURCES:
logger.debug(f"Found {source['sourceName']} but ignoring")
return
if source["sourceName"] not in AVAILABLE_SOURCES:
logger.debug(
f"Found {source['sourceName']} but did not expect it, its time to scrape lol"
)
return
logger.debug(f"Found {source['sourceName']}")
return AVAILABLE_SOURCES[source["sourceName"]].extract(
url, client, episode_number, episode, source
)
@@ -1,62 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
# TODO: requires decoding obsfucated js (filemoon)
class FmHlsExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
raise Exception("")
return Server(
name="dropbox",
links=[EpisodeStream(link=vid.group(1), quality="1080")],
episode_title=episode["notes"],
headers={"Referer": "https://www.mp4upload.com/"},
)
# TODO: requires decoding obsfucated js (filemoon)
class OkExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
raise Exception("")
return Server(
name="dropbox",
links=[EpisodeStream(link=vid.group(1), quality="1080")],
episode_title=episode["notes"],
headers={"Referer": "https://www.mp4upload.com/"},
)
@@ -1,34 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeEpisodeStreams, AllAnimeSource
from .base import BaseExtractor
class Lufmp4Extractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="gogoanime",
links=[
EpisodeStream(
link=stream["link"], quality="1080", format=stream["resolutionStr"]
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,32 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import MP4_SERVER_JUICY_STREAM_REGEX
from ..utils import logger
from .base import BaseExtractor
class Mp4Extractor(BaseExtractor):
@classmethod
def extract(cls, url, client, episode_number, episode, source):
response = client.get(url, timeout=10, follow_redirects=True)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
# NOTE: some of the video were deleted so the embed html will just be "Filewasdeleted"
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
if embed_html == "Filewasdeleted":
logger.debug(
"Failed to extract stream url from mp4-uploads. Reason: Filewasdeleted"
)
return
logger.debug(
f"Failed to extract stream url from mp4-uploads. Reason: unknown. Embed html: {embed_html}"
)
return
return Server(
name="mp4-upload",
links=[EpisodeStream(link=vid.group(1), quality="1080")],
episode_title=episode["notes"],
headers={"Referer": "https://www.mp4upload.com/"},
)
@@ -1,28 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisodeStreams
from .base import BaseExtractor
class Smp4Extractor(BaseExtractor):
@classmethod
def extract(cls, url, client, episode_number, episode, source):
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="sharepoint",
links=[
EpisodeStream(
link=stream["link"],
quality="1080",
format=stream["resolutionStr"],
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,32 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class SsHlsExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
# TODO: requires some serious work i think : )
response = client.get(
url,
timeout=10,
)
response.raise_for_status()
streams = response.json()["links"]
return Server(
name="StreamSb",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,32 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
# TODO: requires some serious work i think : )
class VidMp4Extractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="Vid-mp4",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,31 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class KirExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="weTransfer",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,26 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisodeStreams
from .base import BaseExtractor
class DefaultExtractor(BaseExtractor):
@classmethod
def extract(cls, url, client, episode_number, episode, source):
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="wixmp",
links=[
EpisodeStream(
link=stream["link"], quality="1080", format=stream["resolutionStr"]
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,22 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class YtExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
return Server(
name="Yt",
links=[EpisodeStream(link=url, quality="1080")],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)
@@ -1,58 +0,0 @@
from typing import Union
from httpx import Response
from ..types import (
Anime,
AnimeEpisodes,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
)
from .types import AllAnimeSearchResults, AllAnimeShow
def generate_list(count: Union[int, str]) -> list[str]:
return list(map(str, range(int(count))))
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
def map_to_search_results(response: Response) -> SearchResults:
search_results: AllAnimeSearchResults = response.json()["data"]
return SearchResults(
page_info=PageInfo(total=search_results["shows"]["pageInfo"]["total"]),
results=[
SearchResult(
id=result["_id"],
title=result["name"],
media_type=result["__typename"],
episodes=AnimeEpisodes(
sub=generate_list(result["availableEpisodes"]["sub"]),
dub=generate_list(result["availableEpisodes"]["dub"]),
raw=generate_list(result["availableEpisodes"]["raw"]),
),
)
for result in search_results["shows"]["edges"]
],
)
def map_to_anime_result(response: Response) -> Anime:
anime: AllAnimeShow = response.json()["data"]["show"]
return Anime(
id=anime["_id"],
title=anime["name"],
episodes=AnimeEpisodes(
sub=sorted(anime["availableEpisodesDetail"]["sub"], key=float),
dub=sorted(anime["availableEpisodesDetail"]["dub"], key=float),
raw=sorted(anime["availableEpisodesDetail"]["raw"], key=float),
),
type=anime.get("__typename"),
)
@@ -1,84 +0,0 @@
import logging
from typing import TYPE_CHECKING
from .....core.utils.graphql import execute_graphql
from ..base import BaseAnimeProvider
from ..utils.debug import debug_provider
from .constants import (
ANIME_GQL,
API_GRAPHQL_ENDPOINT,
API_GRAPHQL_HEADERS,
API_GRAPHQL_REFERER,
EPISODE_GQL,
SEARCH_GQL,
)
from .mappers import (
map_to_anime_result,
map_to_search_results,
)
if TYPE_CHECKING:
from .types import AllAnimeEpisode
logger = logging.getLogger(__name__)
class AllAnime(BaseAnimeProvider):
HEADERS = {"Referer": API_GRAPHQL_REFERER}
@debug_provider
def search(self, params):
response = execute_graphql(
API_GRAPHQL_ENDPOINT,
self.client,
SEARCH_GQL,
variables={
"search": {
"allowAdult": params.allow_nsfw,
"allowUnknown": params.allow_unknown,
"query": params.query,
},
"limit": params.page_limit,
"page": params.current_page,
"translationtype": params.translation_type,
"countryorigin": params.country_of_origin,
},
headers=API_GRAPHQL_HEADERS
)
return map_to_search_results(response)
@debug_provider
def get(self, params):
response = execute_graphql(
API_GRAPHQL_ENDPOINT,
self.client,
ANIME_GQL,
variables={"showId": params.id},
headers=API_GRAPHQL_HEADERS
)
return map_to_anime_result(response)
@debug_provider
def episode_streams(self, params):
from .extractors import extract_server
episode_response = execute_graphql(
API_GRAPHQL_ENDPOINT,
self.client,
EPISODE_GQL,
variables={
"showId": params.anime_id,
"translationType": params.translation_type,
"episodeString": params.episode,
},
headers=API_GRAPHQL_HEADERS
)
episode: AllAnimeEpisode = episode_response.json()["data"]["episode"]
for source in episode["sourceUrls"]:
if server := extract_server(self.client, params.episode, episode, source):
yield server
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AllAnime)
@@ -1,111 +0,0 @@
from enum import Enum
from typing import Literal, TypedDict
class Server(Enum):
SHAREPOINT = "sharepoint"
DROPBOX = "dropbox"
GOGOANIME = "gogoanime"
WETRANSFER = "weTransfer"
WIXMP = "wixmp"
YT = "Yt"
MP4_UPLOAD = "mp4-upload"
class AllAnimeEpisodesDetail(TypedDict):
dub: list[str]
sub: list[str]
raw: list[str]
class AllAnimeEpisodes(TypedDict):
dub: int
sub: int
raw: int
class AllAnimePageInfo(TypedDict):
total: int
class AllAnimeShow(TypedDict):
_id: str
name: str
availableEpisodesDetail: AllAnimeEpisodesDetail
__typename: str
class AllAnimeSearchResult(TypedDict):
_id: str
name: str
availableEpisodes: AllAnimeEpisodes
__typename: str | None
class AllAnimeShows(TypedDict):
pageInfo: AllAnimePageInfo
edges: list[AllAnimeSearchResult]
class AllAnimeSearchResults(TypedDict):
shows: AllAnimeShows
class AllAnimeSourceDownload(TypedDict):
sourceName: str
dowloadUrl: str
class AllAnimeSource(TypedDict):
sourceName: Literal[
"Sak",
"S-mp4",
"Luf-mp4",
"Default",
"Yt-mp4",
"Kir",
"Mp4",
"Ak",
"Vid-mp4",
"Ok",
"Ss-Hls",
"Fm-Hls",
]
sourceUrl: str
priority: float
sandbox: str
type: str
className: str
streamerId: str
downloads: AllAnimeSourceDownload
class AllAnimeEpisodeStream(TypedDict):
link: str
hls: bool
resolutionStr: str
fromCache: str
class AllAnimeEpisodeStreams(TypedDict):
links: list[AllAnimeEpisodeStream]
class AllAnimeEpisode(TypedDict):
episodeString: str
sourceUrls: list[AllAnimeSource]
notes: str | None
class AllAnimeStream:
link: str
mp4: bool
hls: bool | None
resolutionStr: str
fromCache: str
priority: int
headers: dict | None
class AllAnimeStreams:
links: list[AllAnimeStream]
@@ -1,92 +0,0 @@
import functools
import logging
import os
import re
from itertools import cycle
logger = logging.getLogger(__name__)
# Dictionary to map hex values to characters
hex_to_char = {
"01": "9",
"08": "0",
"05": "=",
"0a": "2",
"0b": "3",
"0c": "4",
"07": "?",
"00": "8",
"5c": "d",
"0f": "7",
"5e": "f",
"17": "/",
"54": "l",
"09": "1",
"48": "p",
"4f": "w",
"0e": "6",
"5b": "c",
"5d": "e",
"0d": "5",
"53": "k",
"1e": "&",
"5a": "b",
"59": "a",
"4a": "r",
"4c": "t",
"4e": "v",
"57": "o",
"51": "i",
}
def debug_extractor(extractor_function):
@functools.wraps(extractor_function)
def _provider_function_wrapper(*args):
if not os.environ.get("VIU_DEBUG"):
try:
return extractor_function(*args)
except Exception as e:
logger.error(
f"[AllAnime@Server={args[3].get('sourceName', 'UNKNOWN')}]: {e}"
)
else:
return extractor_function(*args)
return _provider_function_wrapper
def give_random_quality(links):
qualities = cycle(["1080", "720", "480", "360"])
return [
{**episode_stream, "quality": quality}
for episode_stream, quality in zip(links, qualities, strict=False)
]
def one_digit_symmetric_xor(password: int, target: str):
def genexp():
for segment in bytearray.fromhex(target):
yield segment ^ password
return bytes(genexp()).decode("utf-8")
def decode_hex_string(hex_string):
"""some of the sources encrypt the urls into hex codes this function decrypts the urls
Args:
hex_string ([TODO:parameter]): [TODO:description]
Returns:
[TODO:return]
"""
# Split the hex string into pairs of characters
hex_pairs = re.findall("..", hex_string)
# Decode each hex pair
decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs]
# TODO: Better type handling
return "".join(decoded_chars) # type: ignore
@@ -1 +0,0 @@
@@ -1,56 +0,0 @@
import re
ANIMEPAHE = "animepahe.pw"
ANIMEPAHE_BASE = f"https://{ANIMEPAHE}"
ANIMEPAHE_ENDPOINT = f"{ANIMEPAHE_BASE}/api"
CDN_PROVIDER = "kwik.cx"
CDN_PROVIDER_BASE = f"https://{CDN_PROVIDER}"
SERVERS_AVAILABLE = ["kwik"]
REQUEST_HEADERS = {
"Cookie": "__ddgid_=VvX0ebHrH2DsFZo4; __ddgmark_=3savRpSVFhvZcn5x; __ddg2_=buBJ3c4pNBYKFZNp; __ddg1_=rbVADKr9URtt55zoIGFa; SERVERID=janna; XSRF-TOKEN=eyJpdiI6IjV5bFNtd0phUHgvWGJxc25wL0VJSUE9PSIsInZhbHVlIjoicEJTZktlR2hxR2JZTWhnL0JzazlvZU5TQTR2bjBWZ2dDb0RwUXVUUWNSclhQWUhLRStYSmJmWmUxWkpiYkFRYU12RjFWejlSWHorME1wZG5qQ1U0TnFlNnBFR2laQjN1MjdyNjc5TjVPdXdJb2o5VkU1bEduRW9pRHNDTHh6Sy8iLCJtYWMiOiI0OTc0ZmNjY2UwMGJkOWY2MWNkM2NlMjk2ZGMyZGJmMWE0NTdjZTdkNGI2Y2IwNTIzZmFiZWU5ZTE2OTk0YmU4IiwidGFnIjoiIn0%3D; laravel_session=eyJpdiI6ImxvdlpqREFnTjdaeFJubUlXQWlJVWc9PSIsInZhbHVlIjoiQnE4R3VHdjZ4M1NDdEVWM1ZqMUxtNnVERnJCcmtCUHZKNzRPR2RFbzNFcStTL29xdnVTbWhsNVRBUXEybVZWNU1UYVlTazFqYlN5UjJva1k4czNGaXBTbkJJK01oTUd3VHRYVHBoc3dGUWxHYnFlS2NJVVNFbTFqMVBWdFpuVUgiLCJtYWMiOiI1NDdjZTVkYmNhNjUwZTMxZmRlZmVmMmRlMGNiYjAwYjlmYjFjY2U0MDc1YTQzZThiMTIxMjJlYTg1NTA4YjBmIiwidGFnIjoiIn0%3D; latest=5592",
"Host": ANIMEPAHE,
"Accept": "application, text/javascript, */*; q=0.01",
"Accept-Encoding": "Utf-8",
"Referer": ANIMEPAHE_BASE,
"DNT": "1",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"TE": "trailers",
}
SERVER_HEADERS = {
"Host": "kwik.cx",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "Utf-8",
"DNT": "1",
"Connection": "keep-alive",
"Referer": ANIMEPAHE_BASE + "/",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "iframe",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
"Priority": "u=4",
"TE": "trailers",
}
STREAM_HEADERS = {
# "Host": "vault-16.owocdn.top", # This will have to be the actual host of the stream (behind Kwik)
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Origin": CDN_PROVIDER_BASE,
"Sec-GPC": "1",
"Connection": "keep-alive",
"Referer": CDN_PROVIDER_BASE + "/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"TE": "trailers",
}
JUICY_STREAM_REGEX = re.compile(r"source='(.*)';")
KWIK_RE = re.compile(r"Player\|(.+?)'")
@@ -1,77 +0,0 @@
import re
def animepahe_key_creator(c: int, a: int):
from ...scraping.utils import encode_base_n
if c < a:
val_a = ""
else:
val_a = animepahe_key_creator(int(c / a), a)
c = c % a
if c > 35:
val_b = chr(c + 29)
else:
val_b = encode_base_n(c, 36)
return val_a + val_b
def animepahe_embed_decoder(
encoded_js_p: str,
base_a: int,
no_of_keys_c: int,
values_to_replace_with_k: list,
):
decode_mapper_d: dict = {}
for i in range(no_of_keys_c):
key = animepahe_key_creator(i, base_a)
val = values_to_replace_with_k[i] or key
decode_mapper_d[key] = val
return re.sub(
r"\b\w+\b", lambda match: decode_mapper_d[match.group(0)], encoded_js_p
)
PARAMETERS_REGEX = re.compile(r"eval\(function\(p,a,c,k,e,d\)\{.*\}\((.*?)\)\)$")
ENCODE_JS_REGEX = re.compile(r"'(.*?);',(\d+),(\d+),'(.*)'\.split")
def process_animepahe_embed_page(embed_page: str):
from ...scraping.html_parser import get_element_text_and_html_by_tag
encoded_js_string = ""
embed_page_content = embed_page
for _ in range(8):
text, html = get_element_text_and_html_by_tag("script", embed_page_content)
if not text and html:
embed_page_content = re.sub(html, "", embed_page_content)
continue
if text:
encoded_js_string = text.strip()
break
if not encoded_js_string:
return
obsfucated_js_parameter_match = PARAMETERS_REGEX.search(encoded_js_string)
if not obsfucated_js_parameter_match:
return
parameter_string = obsfucated_js_parameter_match.group(1)
encoded_js_parameter_string = ENCODE_JS_REGEX.search(parameter_string)
if not encoded_js_parameter_string:
return
p: str = encoded_js_parameter_string.group(1)
a: int = int(encoded_js_parameter_string.group(2))
c: int = int(encoded_js_parameter_string.group(3))
k: list = encoded_js_parameter_string.group(4).split("|")
return animepahe_embed_decoder(p, a, c, k).replace("\\", "")
if __name__ == "__main__":
# Testing time
filepath = input("Enter file name: ")
if filepath:
with open(filepath) as file:
data = file.read()
else:
data = """<script>eval(function(p,a,c,k,e,d){e=function(c){return(c<a?'':e(parseInt(c/a)))+((c=c%a)>35?String.fromCharCode(c+29):c.toString(36))};if(!''.replace(/^/,String)){while(c--){d[e(c)]=k[c]||e(c)}k=[function(e){return d[e]}];e=function(){return'\\w+'};c=1};while(c--){if(k[c]){p=p.replace(new RegExp('\\b'+e(c)+'\\b','g'),k[c])}}return p}('f $7={H:a(2){4 B(9.7.h(y z("(?:(?:^|.*;)\\\\s*"+d(2).h(/[\\-\\.\\+\\*]/g,"\\\\$&")+"\\\\s*\\\\=\\\\s*([^;]*).*$)|^.*$"),"$1"))||G},E:a(2,q,3,6,5,t){k(!2||/^(?:8|r\\-v|o|m|p)$/i.D(2)){4 w}f b="";k(3){F(3.J){j K:b=3===P?"; 8=O, I N Q M:u:u A":"; r-v="+3;n;j L:b="; 8="+3;n;j S:b="; 8="+3.Z();n}}9.7=d(2)+"="+d(q)+b+(5?"; m="+5:"")+(6?"; o="+6:"")+(t?"; p":"");4 x},Y:a(2,6,5){k(!2||!11.C(2)){4 w}9.7=d(2)+"=; 8=12, R 10 W l:l:l A"+(5?"; m="+5:"")+(6?"; o="+6:"");4 x},C:a(2){4(y z("(?:^|;\\\\s*)"+d(2).h(/[\\-\\.\\+\\*]/g,"\\\\$&")+"\\\\s*\\\\=")).D(9.7)},X:a(){f c=9.7.h(/((?:^|\\s*;)[^\\=]+)(?=;|$)|^\\s*|\\s*(?:\\=[^;]*)?(?:\\1|$)/g,"").T(/\\s*(?:\\=[^;]*)?;\\s*/);U(f e=0;e<c.V;e++){c[e]=B(c[e])}4 c}};',62,65,'||sKey|vEnd|return|sDomain|sPath|cookie|expires|document|function|sExpires|aKeys|encodeURIComponent|nIdx|var||replace||case|if|00|domain|break|path|secure|sValue|max||bSecure|59|age|false|true|new|RegExp|GMT|decodeURIComponent|hasItem|test|setItem|switch|null|getItem|31|constructor|Number|String|23|Dec|Fri|Infinity|9999|01|Date|split|for|length|1970|keys|removeItem|toUTCString|Jan|this|Thu'.split('|'),0,{}));eval(function(p,a,c,k,e,d){e=function(c){return(c<a?'':e(parseInt(c/a)))+((c=c%a)>35?String.fromCharCode(c+29):c.toString(36))};if(!''.replace(/^/,String)){while(c--){d[e(c)]=k[c]||e(c)}k=[function(e){return d[e]}];e=function(){return'\\w+'};c=1};while(c--){if(k[c]){p=p.replace(new RegExp('\\b'+e(c)+'\\b','g'),k[c])}}return p}('h o=\'1D://1C-E.1B.1A.1z/1y/E/1x/1w/1v.1u\';h d=s.r(\'d\');h 0=B 1t(d,{\'1s\':{\'1r\':i},\'1q\':\'16:9\',\'D\':1,\'1p\':5,\'1o\':{\'1n\':\'1m\'},1l:[\'7-1k\',\'7\',\'1j\',\'1i-1h\',\'1g\',\'1f-1e\',\'1d\',\'D\',\'1c\',\'1b\',\'1a\',\'19\',\'C\',\'18\'],\'C\':{\'17\':i}});8(!A.15()){d.14=o}x{j z={13:12,11:10,Z:Y,X:i,W:i};h c=B A(z);c.V(o);c.U(d);g.c=c}0.3("T",6=>{g.S.R.Q("P")});0.O=1;k v(b,n,m){8(b.y){b.y(n,m,N)}x 8(b.w){b.w(\'3\'+n,m)}}j 4=k(l){g.M.L(l,\'*\')};v(g,\'l\',k(e){j a=e.a;8(a===\'7\')0.7();8(a===\'f\')0.f();8(a===\'u\')0.u()});0.3(\'t\',6=>{4(\'t\')});0.3(\'7\',6=>{4(\'7\')});0.3(\'f\',6=>{4(\'f\')});0.3(\'K\',6=>{4(0.q);s.r(\'.J-I\').H=G(0.q.F(2))});0.3(\'p\',6=>{4(\'p\')});',62,102,'player|||on|sendMessage||event|play|if||data|element|hls|video||pause|window|const|true|var|function|message|eventHandler|eventName|source|ended|currentTime|querySelector|document|ready|stop|bindEvent|attachEvent|else|addEventListener|config|Hls|new|fullscreen|volume|01|toFixed|String|innerHTML|timestamp|ss|timeupdate|postMessage|parent|false|speed|landscape|lock|orientation|screen|enterfullscreen|attachMedia|loadSource|lowLatencyMode|enableWorker|Infinity|backBufferLength|600|maxMaxBufferLength|180|maxBufferLength|src|isSupported||iosNative|capture|airplay|pip|settings|captions|mute|time|current|progress|forward|fast|rewind|large|controls|kwik|key|storage|seekTime|ratio|global|keyboard|Plyr|m3u8|uwu|b92a392054c041a3f9c6eecabeb0e127183f44e547828447b10bca8d77523e6f|03|stream|org|nextcdn|files|eu|https'.split('|'),0,{}))</script>"""
print(process_animepahe_embed_page(data))
@@ -1,113 +0,0 @@
import logging
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
from .types import (
AnimePaheAnimePage,
AnimePaheSearchPage,
)
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
logger = logging.getLogger(__name__)
def map_to_search_results(data: AnimePaheSearchPage) -> SearchResults:
results = []
for result in data["data"]:
results.append(
SearchResult(
id=result["session"],
title=result["title"],
episodes=AnimeEpisodes(
sub=list(map(str, range(1, result["episodes"] + 1))),
dub=list(map(str, range(1, result["episodes"] + 1))),
raw=list(map(str, range(1, result["episodes"] + 1))),
),
media_type=result["type"],
score=result["score"],
status=result["status"],
season=result["season"],
poster=result["poster"],
year=str(result["year"]),
)
)
return SearchResults(
page_info=PageInfo(
total=data["total"],
per_page=data["per_page"],
current_page=data["current_page"],
),
results=results,
)
def map_to_anime_result(
search_result: SearchResult, anime: AnimePaheAnimePage
) -> Anime:
episodes_info = []
episodes = []
anime["data"] = sorted(anime["data"], key=lambda k: float(k["episode"]))
for ep_info in anime["data"]:
episodes.append(str(ep_info["episode"]))
episodes_info.append(
AnimeEpisodeInfo(
id=str(ep_info["id"]),
session_id=ep_info["session"],
episode=str(ep_info["episode"]),
title=ep_info["title"],
poster=ep_info["snapshot"],
duration=str(ep_info["duration"]),
)
)
return Anime(
id=search_result.id,
title=search_result.title,
episodes=AnimeEpisodes(
sub=episodes,
dub=episodes,
),
year=str(search_result.year),
poster=search_result.poster,
episodes_info=episodes_info,
)
def map_to_server(
episode: AnimeEpisodeInfo,
translation_type: str,
stream_links: list[tuple[str, str]],
headers: dict[str, str],
) -> Server:
links = [
EpisodeStream(
link=link[1],
quality=link[0] if link[0] in ["360", "480", "720", "1080"] else "1080", # type:ignore
translation_type=translation_type_map[translation_type],
)
for link in stream_links
]
# sort links by quality, best to worst
links.sort(key=lambda x: int(x.quality), reverse=True)
logger.debug(f"Aggregated links: {links}")
return Server(
name="kwik", links=links, episode_title=episode.title, headers=headers
)
@@ -1,214 +0,0 @@
import logging
from functools import lru_cache
from typing import Iterator, Optional
from urllib.parse import urlparse
from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults, Server
from ..utils.debug import debug_provider
from .constants import (
ANIMEPAHE_BASE,
ANIMEPAHE_ENDPOINT,
CDN_PROVIDER,
JUICY_STREAM_REGEX,
REQUEST_HEADERS,
SERVER_HEADERS,
STREAM_HEADERS,
)
from .extractor import process_animepahe_embed_page
from .mappers import map_to_anime_result, map_to_search_results, map_to_server
from .types import AnimePaheAnimePage, AnimePaheSearchPage
logger = logging.getLogger(__name__)
class AnimePahe(BaseAnimeProvider):
HEADERS = REQUEST_HEADERS
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
return self._search(params)
@lru_cache()
def _search(self, params: SearchParams) -> SearchResults | None:
url_params = {"m": "search", "q": params.query}
response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
response.raise_for_status()
data: AnimePaheSearchPage = response.json()
if not data.get("data"):
return
return map_to_search_results(data)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
return self._get_anime(params)
@lru_cache()
def _get_anime(self, params: AnimeParams) -> Anime | None:
page = 1
standardized_episode_number = 0
search_result = self._get_search_result(params)
if not search_result:
logger.error(f"No search result found for ID {params.id}")
return None
anime: Optional[AnimePaheAnimePage] = None
has_next_page = True
while has_next_page:
logger.debug(f"Loading page: {page}")
_anime_page = self._anime_page_loader(
m="release",
id=params.id,
sort="episode_asc",
page=page,
)
has_next_page = True if _anime_page["next_page_url"] else False
page += 1
if not anime:
anime = _anime_page
else:
anime["data"].extend(_anime_page["data"])
if anime:
for episode in anime.get("data", []):
if episode["episode"] % 1 == 0:
standardized_episode_number += 1
episode.update({"episode": standardized_episode_number})
else:
standardized_episode_number += episode["episode"] % 1
episode.update({"episode": standardized_episode_number})
standardized_episode_number = int(standardized_episode_number)
return map_to_anime_result(search_result, anime)
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> Optional[SearchResult]:
search_results = self._search(SearchParams(query=params.query))
if not search_results or not search_results.results:
logger.error(f"No search results found for ID {params.id}")
return None
for search_result in search_results.results:
if search_result.id == params.id:
return search_result
@lru_cache()
def _anime_page_loader(self, m, id, sort, page) -> AnimePaheAnimePage:
url_params = {
"m": m,
"id": id,
"sort": sort,
"page": page,
}
response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
response.raise_for_status()
return response.json()
@debug_provider
def episode_streams(self, params: EpisodeStreamsParams) -> Iterator[Server] | None:
from ...scraping.html_parser import (
extract_attributes,
get_element_by_id,
get_elements_html_by_class,
)
episode = self._get_episode_info(params)
if not episode:
logger.error(
f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
)
return
url = f"{ANIMEPAHE_BASE}/play/{params.anime_id}/{episode.session_id}"
response = self.client.get(url, follow_redirects=True)
response.raise_for_status()
c = get_element_by_id("resolutionMenu", response.text)
if not c:
logger.error("Resolution menu not found in the response")
return
resolutionMenuItems = get_elements_html_by_class("dropdown-item", c)
res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
quality = None
translation_type = None
stream_links = []
stream_host = None
# TODO: better document the scraping process
for res_dict in res_dicts:
# the actual attributes are data attributes in the original html 'prefixed with data-'
embed_url = res_dict["src"]
logger.debug(f"Found embed url: {embed_url}")
data_audio = "dub" if res_dict["audio"] == "eng" else "sub"
if data_audio != params.translation_type:
logger.debug(f"Found {data_audio} but wanted {params.translation_type}")
continue
if not embed_url:
logger.warning("embed url not found please report to the developers")
continue
embed_response = self.client.get(
embed_url,
headers={
"User-Agent": self.client.headers["User-Agent"],
**SERVER_HEADERS,
},
)
embed_response.raise_for_status()
embed_page = embed_response.text
logger.debug("Processing embed page for JS decoding")
decoded_js = process_animepahe_embed_page(embed_page)
if not decoded_js:
logger.error("failed to decode embed page")
continue
logger.debug(f"Decoded JS: {decoded_js[:100]}...")
juicy_stream = JUICY_STREAM_REGEX.search(decoded_js)
if not juicy_stream:
logger.error("failed to find juicy stream")
continue
logger.debug(f"Found juicy stream: {juicy_stream.group(1)}")
juicy_stream = juicy_stream.group(1)
stream_host = urlparse(juicy_stream).hostname
quality = res_dict["resolution"]
logger.debug(f"Found quality: {quality}")
translation_type = data_audio
stream_links.append((quality, juicy_stream))
if translation_type and stream_links:
headers = {
"User-Agent": self.client.headers["User-Agent"],
"Host": stream_host or CDN_PROVIDER,
**STREAM_HEADERS,
}
yield map_to_server(
episode, translation_type, stream_links, headers=headers
)
@lru_cache()
def _get_episode_info(
self, params: EpisodeStreamsParams
) -> Optional[AnimeEpisodeInfo]:
anime_info = self._get_anime(
AnimeParams(id=params.anime_id, query=params.query)
)
if not anime_info:
logger.error(f"No anime info for {params.anime_id}")
return
if not anime_info.episodes_info:
logger.error(f"No episodes info for {params.anime_id}")
return
for episode in anime_info.episodes_info:
if episode.episode == params.episode:
return episode
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AnimePahe)
@@ -1,108 +0,0 @@
from enum import Enum
from typing import Literal, TypedDict
class Server(Enum):
KWIK = "Kwik"
class AnimePaheSearchResult(TypedDict):
id: str
title: str
type: str
episodes: int
status: str
season: str
year: int
score: int
poster: str
session: str
class AnimePaheSearchPage(TypedDict):
total: int
per_page: int
current_page: int
last_page: int
_from: int
to: int
data: list[AnimePaheSearchResult]
class Episode(TypedDict):
id: str
anime_id: int
episode: float
episode2: int
edition: str
title: str
snapshot: str # episode image
disc: str
audio: Literal["eng", "jpn"]
duration: str # time 00:00:00
session: str
filler: int
created_at: str
class AnimePaheAnimePage(TypedDict):
total: int
per_page: int
current_page: int
last_page: int
next_page_url: str | None
prev_page_url: str | None
_from: int
to: int
data: list[Episode]
class AnimePaheEpisodeInfo(TypedDict):
title: str
episode: float
id: str
translation_type: Literal["eng", "jpn"]
duration: str
poster: str
class AvailableEpisodesDetail(TypedDict):
sub: list[str]
dub: list[str]
raw: list[str]
class AnimePaheAnime(TypedDict):
id: str
title: str
year: int
season: str
poster: str
score: int
availableEpisodesDetail: AvailableEpisodesDetail
episodesInfo: list[AnimePaheEpisodeInfo]
class PageInfo(TypedDict):
total: int
perPage: int
currentPage: int
class AnimePaheSearchResults(TypedDict):
pageInfo: PageInfo
results: list[AnimePaheSearchResult]
class AnimePaheStreamLink(TypedDict):
quality: str
translation_type: Literal["sub", "dub"]
link: str
class AnimePaheServer(TypedDict):
server: Literal["kwik"]
links: list[AnimePaheStreamLink]
episode_title: str
subtitles: list
headers: dict
@@ -1,17 +0,0 @@
import re
ANIMEUNITY = "animeunity.so"
ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}"
MAX_TIMEOUT = 10
TOKEN_REGEX = re.compile(r'<meta.*?name="csrf-token".*?content="([^"]*)".*?>')
REPLACEMENT_WORDS = {"Season ": "", "Cour": "Part"}
# Server Specific
AVAILABLE_VIDEO_QUALITY = ["1080", "720", "480"]
VIDEO_INFO_REGEX = re.compile(r"window.video\s*=\s*(\{[^\}]*\})")
VIDEO_INFO_CLEAN_REGEX = re.compile(r'(?<!["\'])(\b\w+\b)(?=\s*:)')
DOWNLOAD_FILENAME_REGEX = re.compile(r"[?&]filename=([^&]+)")
QUALITY_REGEX = re.compile(r"/(\d{3,4}p)")
DOWNLOAD_URL_REGEX = re.compile(r"window.downloadUrl\s*=\s*'([^']*)'")
@@ -1,50 +0,0 @@
import logging
from .constants import (
DOWNLOAD_FILENAME_REGEX,
DOWNLOAD_URL_REGEX,
QUALITY_REGEX,
VIDEO_INFO_CLEAN_REGEX,
VIDEO_INFO_REGEX,
)
logger = logging.getLogger(__name__)
def extract_server_info(html_content: str, episode_title: str | None) -> dict | None:
"""
Extracts server information from the VixCloud/AnimeUnity embed page.
Handles extraction from both window.video object and download URL.
"""
video_info = VIDEO_INFO_REGEX.search(html_content)
download_url_match = DOWNLOAD_URL_REGEX.search(html_content)
if not (download_url_match and video_info):
return None
info_str = VIDEO_INFO_CLEAN_REGEX.sub(r'"\1"', video_info.group(1))
# Use eval context for JS constants
ctx = {"null": None, "true": True, "false": False}
try:
info = eval(info_str, ctx)
except Exception as e:
logger.error(f"Failed to parse JS object: {e}")
return None
download_url = download_url_match.group(1)
info["link"] = download_url
# Extract metadata from download URL if missing in window.video
if filename_match := DOWNLOAD_FILENAME_REGEX.search(download_url):
info["name"] = filename_match.group(1)
else:
info["name"] = f"{episode_title or 'Unknown'}"
if quality_match := QUALITY_REGEX.search(download_url):
# "720p" -> 720
info["quality"] = int(quality_match.group(1)[:-1])
else:
info["quality"] = 0 # Fallback
return info
@@ -1,133 +0,0 @@
from typing import Literal
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
from .constants import AVAILABLE_VIDEO_QUALITY
def map_to_search_results(
data: dict, translation_type: Literal["sub", "dub"]
) -> SearchResults:
results = []
for result in data:
mapped_result = map_to_search_result(result, translation_type)
if mapped_result:
results.append(mapped_result)
return SearchResults(
page_info=PageInfo(),
results=results,
)
def map_to_search_result(
data: dict, translation_type: Literal["sub", "dub"] | None
) -> SearchResult | None:
if translation_type and data["dub"] != 1 if translation_type == "dub" else 0:
return None
return SearchResult(
id=str(data["id"]),
title=get_titles(data)[0] if get_titles(data) else "Unknown",
episodes=AnimeEpisodes(
sub=(
list(map(str, range(1, get_episodes_count(data) + 1)))
if data["dub"] == 0
else []
),
dub=(
list(map(str, range(1, get_episodes_count(data) + 1)))
if data["dub"] == 1
else []
),
),
other_titles=get_titles(data),
score=data["score"],
poster=data["imageurl"],
year=data["date"],
)
def map_to_anime_result(data: list, search_result: SearchResult) -> Anime:
return Anime(
id=search_result.id,
title=search_result.title,
episodes=AnimeEpisodes(
sub=[
episode["number"]
for episode in data
if len(search_result.episodes.sub) > 0
],
dub=[
episode["number"]
for episode in data
if len(search_result.episodes.dub) > 0
],
),
episodes_info=[
AnimeEpisodeInfo(
id=str(episode["id"]),
episode=episode["number"],
title=f"{search_result.title} - Ep {episode['number']}",
)
for episode in data
],
type=search_result.media_type,
poster=search_result.poster,
year=search_result.year,
)
def map_to_server(
episode: AnimeEpisodeInfo, info: dict, translation_type: Literal["sub", "dub"]
) -> Server:
return Server(
name="vixcloud",
links=[
EpisodeStream(
link=info["link"].replace(str(info["quality"]), quality),
title=info["name"],
quality=quality, # type: ignore
translation_type=MediaTranslationType(translation_type),
mp4=True,
)
for quality in sorted(
list(set(AVAILABLE_VIDEO_QUALITY + [str(info["quality"])])),
key=lambda x: int(x),
reverse=True,
)
if int(quality) <= info["quality"]
],
episode_title=episode.title,
)
def get_titles(data: dict) -> list[str]:
"""
Return the most appropriate title from the record.
"""
titles = []
if data.get("title_eng"):
titles.append(data["title_eng"])
if data.get("title"):
titles.append(data["title"])
if data.get("title_it"):
titles.append(data["title_it"])
return titles
def get_episodes_count(record: dict) -> int:
"""
Return the number of episodes from the record.
"""
if (count := record.get("real_episodes_count", 0)) > 0:
return count
return record.get("episodes_count", 0)
@@ -1,170 +0,0 @@
import logging
from functools import lru_cache
from ...scraping.user_agents import UserAgentGenerator
from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults
from ..utils.debug import debug_provider
from .constants import (
ANIMEUNITY_BASE,
MAX_TIMEOUT,
REPLACEMENT_WORDS,
TOKEN_REGEX,
)
from .extractor import extract_server_info
from .mappers import (
map_to_anime_result,
map_to_search_result,
map_to_search_results,
map_to_server,
)
logger = logging.getLogger(__name__)
class AnimeUnity(BaseAnimeProvider):
HEADERS = {
"User-Agent": UserAgentGenerator().random(),
}
_cache = dict[str, SearchResult]()
@lru_cache
def _get_token(self) -> None:
response = self.client.get(
ANIMEUNITY_BASE,
headers=self.HEADERS,
timeout=MAX_TIMEOUT,
follow_redirects=True,
)
response.raise_for_status()
token_match = TOKEN_REGEX.search(response.text)
if token_match:
self.HEADERS["x-csrf-token"] = token_match.group(1)
self.client.cookies = {
"animeunity_session": response.cookies.get("animeunity_session") or ""
}
self.client.headers = self.HEADERS
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
if not (res := self._search(params)):
return None
for result in res.results:
self._cache[result.id] = result
return res
@lru_cache
def _search(self, params: SearchParams) -> SearchResults | None:
self._get_token()
# Replace words in query to
query = params.query
for old, new in REPLACEMENT_WORDS.items():
query = query.replace(old, new)
response = self.client.post(
url=f"{ANIMEUNITY_BASE}/livesearch",
data={"title": query},
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
return map_to_search_results(
response.json().get("records", []), params.translation_type
)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
return self._get_anime(params)
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> SearchResult | None:
if cached := self._cache.get(params.id):
return cached
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/",
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
data = response.json()
if res := map_to_search_result(data, None):
self._cache[params.id] = res
return res
@lru_cache
def _get_anime(self, params: AnimeParams) -> Anime | None:
if (search_result := self._get_search_result(params)) is None:
logger.error(f"No search result found for ID {params.id}")
return None
# Fetch episodes in chunks
data = []
start_range = 1
episode_count = max(
len(search_result.episodes.sub), len(search_result.episodes.dub)
)
while start_range <= episode_count:
end_range = min(start_range + 119, episode_count)
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/1",
params={
"start_range": start_range,
"end_range": end_range,
},
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
data.extend(response.json().get("episodes", []))
start_range = end_range + 1
return map_to_anime_result(data, search_result)
@lru_cache()
def _get_episode_info(
self, params: EpisodeStreamsParams
) -> AnimeEpisodeInfo | None:
anime_info = self._get_anime(
AnimeParams(id=params.anime_id, query=params.query)
)
if not anime_info:
logger.error(f"No anime info for {params.anime_id}")
return
if not anime_info.episodes_info:
logger.error(f"No episodes info for {params.anime_id}")
return
for episode in anime_info.episodes_info:
if episode.episode == params.episode:
return episode
@debug_provider
def episode_streams(self, params: EpisodeStreamsParams):
if not (episode := self._get_episode_info(params)):
logger.error(
f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
)
return
# Get the Server url
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/embed-url/{episode.id}", timeout=MAX_TIMEOUT
)
response.raise_for_status()
# Fetch the Server page
video_response = self.client.get(url=response.text.strip(), timeout=MAX_TIMEOUT)
video_response.raise_for_status()
if not (info := extract_server_info(video_response.text, episode.title)):
logger.error(f"Failed to extract video info for episode {episode.id}")
return None
yield map_to_server(episode, info, params.translation_type)
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AnimeUnity)