Delete viu_media/libs/provider directory

Benedict Xavier
2026-04-18 11:26:36 +03:00
committed by GitHub
parent f63c7661c6
commit 7e93e4eef9
42 changed files with 0 additions and 3140 deletions

View File

@@ -1,39 +0,0 @@
import re
from .....core.constants import GRAPHQL_DIR
SERVERS_AVAILABLE = [
"sharepoint",
"dropbox",
"gogoanime",
"weTransfer",
"wixmp",
"Yt",
"mp4-upload",
]
API_BASE_URL = "allanime.day"
API_GRAPHQL_REFERER = "https://allanime.to/"
API_GRAPHQL_ENDPOINT = f"https://api.{API_BASE_URL}/api/"
API_GRAPHQL_HEADERS = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Content-Type": "application/json",
"Origin": f"{API_GRAPHQL_REFERER}",
}
# search constants
DEFAULT_COUNTRY_OF_ORIGIN = "all"
DEFAULT_NSFW = True
DEFAULT_UNKNOWN = True
DEFAULT_PER_PAGE = 40
DEFAULT_PAGE = 1
# regex stuff
MP4_SERVER_JUICY_STREAM_REGEX = re.compile(
r"video/mp4\",src:\"(https?://.*/video\.mp4)\""
)
# graphql files
_GQL_QUERIES = GRAPHQL_DIR / "allanime" / "queries"
SEARCH_GQL = _GQL_QUERIES / "search.gql"
ANIME_GQL = _GQL_QUERIES / "anime.gql"
EPISODE_GQL = _GQL_QUERIES / "episodes.gql"

View File

@@ -1,3 +0,0 @@
from .extractor import extract_server
__all__ = ["extract_server"]

View File

@@ -1,31 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class AkExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="Ak",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,20 +0,0 @@
from abc import ABC, abstractmethod
from httpx import Client
from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
class BaseExtractor(ABC):
@classmethod
@abstractmethod
def extract(
cls,
url: str,
client: Client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server | None:
pass

View File

@@ -1,31 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class SakExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="dropbox",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,65 +0,0 @@
from httpx import Client
from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
from ..utils import debug_extractor, logger, one_digit_symmetric_xor
from .ak import AkExtractor
from .dropbox import SakExtractor
from .filemoon import FmHlsExtractor, OkExtractor
from .gogoanime import Lufmp4Extractor
from .mp4_upload import Mp4Extractor
from .sharepoint import Smp4Extractor
from .streamsb import SsHlsExtractor
from .vid_mp4 import VidMp4Extractor
from .we_transfer import KirExtractor
from .wixmp import DefaultExtractor
from .yt_mp4 import YtExtractor
AVAILABLE_SOURCES = {
"Sak": SakExtractor,
"S-mp4": Smp4Extractor,
"Luf-Mp4": Lufmp4Extractor,
"Default": DefaultExtractor,
"Yt-mp4": YtExtractor,
"Kir": KirExtractor,
"Mp4": Mp4Extractor,
}
OTHER_SOURCES = {
"Ak": AkExtractor,
"Vid-mp4": VidMp4Extractor,
"Ok": OkExtractor,
"Ss-Hls": SsHlsExtractor,
"Fm-Hls": FmHlsExtractor,
}
@debug_extractor
def extract_server(
client: Client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server | None:
url = source.get("sourceUrl")
if not url:
logger.debug(f"Url not found in source: {source}")
return
if url.startswith("--"):
url = one_digit_symmetric_xor(56, url[2:])
logger.debug(f"Decrypting url for source: {source['sourceName']}")
if source["sourceName"] in OTHER_SOURCES:
logger.debug(f"Found {source['sourceName']} but ignoring")
return
if source["sourceName"] not in AVAILABLE_SOURCES:
logger.debug(
f"Found {source['sourceName']} but did not expect it, its time to scrape lol"
)
return
logger.debug(f"Found {source['sourceName']}")
return AVAILABLE_SOURCES[source["sourceName"]].extract(
url, client, episode_number, episode, source
)

View File

@@ -1,62 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
# TODO: requires decoding obfuscated js (filemoon)
class FmHlsExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
raise Exception("Failed to extract stream url from Fm-Hls embed")
return Server(
name="dropbox",
links=[EpisodeStream(link=vid.group(1), quality="1080")],
episode_title=episode["notes"],
headers={"Referer": "https://www.mp4upload.com/"},
)
# TODO: requires decoding obfuscated js (filemoon)
class OkExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
raise Exception("Failed to extract stream url from Ok embed")
return Server(
name="dropbox",
links=[EpisodeStream(link=vid.group(1), quality="1080")],
episode_title=episode["notes"],
headers={"Referer": "https://www.mp4upload.com/"},
)

View File

@@ -1,34 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeEpisodeStreams, AllAnimeSource
from .base import BaseExtractor
class Lufmp4Extractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="gogoanime",
links=[
EpisodeStream(
link=stream["link"], quality="1080", format=stream["resolutionStr"]
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,32 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import MP4_SERVER_JUICY_STREAM_REGEX
from ..utils import logger
from .base import BaseExtractor
class Mp4Extractor(BaseExtractor):
@classmethod
def extract(cls, url, client, episode_number, episode, source):
response = client.get(url, timeout=10, follow_redirects=True)
response.raise_for_status()
embed_html = response.text.replace(" ", "").replace("\n", "")
# NOTE: some of the videos were deleted, so the embed html will just be "Filewasdeleted"
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
if embed_html == "Filewasdeleted":
logger.debug(
"Failed to extract stream url from mp4-uploads. Reason: Filewasdeleted"
)
return
logger.debug(
f"Failed to extract stream url from mp4-uploads. Reason: unknown. Embed html: {embed_html}"
)
return
return Server(
name="mp4-upload",
links=[EpisodeStream(link=vid.group(1), quality="1080")],
episode_title=episode["notes"],
headers={"Referer": "https://www.mp4upload.com/"},
)

View File

@@ -1,28 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisodeStreams
from .base import BaseExtractor
class Smp4Extractor(BaseExtractor):
@classmethod
def extract(cls, url, client, episode_number, episode, source):
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="sharepoint",
links=[
EpisodeStream(
link=stream["link"],
quality="1080",
format=stream["resolutionStr"],
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,32 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class SsHlsExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
# TODO: requires some serious work i think : )
response = client.get(
url,
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="StreamSb",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,32 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
# TODO: requires some serious work i think : )
class VidMp4Extractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="Vid-mp4",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,31 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class KirExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
return Server(
name="weTransfer",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,26 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisodeStreams
from .base import BaseExtractor
class DefaultExtractor(BaseExtractor):
@classmethod
def extract(cls, url, client, episode_number, episode, source):
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="wixmp",
links=[
EpisodeStream(
link=stream["link"], quality="1080", format=stream["resolutionStr"]
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,22 +0,0 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .base import BaseExtractor
class YtExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
return Server(
name="Yt",
links=[EpisodeStream(link=url, quality="1080")],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},
)

View File

@@ -1,58 +0,0 @@
from typing import Union
from httpx import Response
from ..types import (
Anime,
AnimeEpisodes,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
)
from .types import AllAnimeSearchResults, AllAnimeShow
def generate_list(count: Union[int, str]) -> list[str]:
return list(map(str, range(int(count))))
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
def map_to_search_results(response: Response) -> SearchResults:
search_results: AllAnimeSearchResults = response.json()["data"]
return SearchResults(
page_info=PageInfo(total=search_results["shows"]["pageInfo"]["total"]),
results=[
SearchResult(
id=result["_id"],
title=result["name"],
media_type=result["__typename"],
episodes=AnimeEpisodes(
sub=generate_list(result["availableEpisodes"]["sub"]),
dub=generate_list(result["availableEpisodes"]["dub"]),
raw=generate_list(result["availableEpisodes"]["raw"]),
),
)
for result in search_results["shows"]["edges"]
],
)
def map_to_anime_result(response: Response) -> Anime:
anime: AllAnimeShow = response.json()["data"]["show"]
return Anime(
id=anime["_id"],
title=anime["name"],
episodes=AnimeEpisodes(
sub=sorted(anime["availableEpisodesDetail"]["sub"], key=float),
dub=sorted(anime["availableEpisodesDetail"]["dub"], key=float),
raw=sorted(anime["availableEpisodesDetail"]["raw"], key=float),
),
type=anime.get("__typename"),
)
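As a quick, hedged illustration of generate_list above (assuming the function is in scope): an availableEpisodes count becomes zero-based episode identifiers, in contrast to the AnimePahe mapper later in this diff, which numbers episodes from 1.
assert generate_list(3) == ["0", "1", "2"]
assert generate_list("3") == ["0", "1", "2"]  # string counts from the API behave the same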

View File

@@ -1,84 +0,0 @@
import logging
from typing import TYPE_CHECKING
from .....core.utils.graphql import execute_graphql
from ..base import BaseAnimeProvider
from ..utils.debug import debug_provider
from .constants import (
ANIME_GQL,
API_GRAPHQL_ENDPOINT,
API_GRAPHQL_HEADERS,
API_GRAPHQL_REFERER,
EPISODE_GQL,
SEARCH_GQL,
)
from .mappers import (
map_to_anime_result,
map_to_search_results,
)
if TYPE_CHECKING:
from .types import AllAnimeEpisode
logger = logging.getLogger(__name__)
class AllAnime(BaseAnimeProvider):
HEADERS = {"Referer": API_GRAPHQL_REFERER}
@debug_provider
def search(self, params):
response = execute_graphql(
API_GRAPHQL_ENDPOINT,
self.client,
SEARCH_GQL,
variables={
"search": {
"allowAdult": params.allow_nsfw,
"allowUnknown": params.allow_unknown,
"query": params.query,
},
"limit": params.page_limit,
"page": params.current_page,
"translationtype": params.translation_type,
"countryorigin": params.country_of_origin,
},
headers=API_GRAPHQL_HEADERS
)
return map_to_search_results(response)
@debug_provider
def get(self, params):
response = execute_graphql(
API_GRAPHQL_ENDPOINT,
self.client,
ANIME_GQL,
variables={"showId": params.id},
headers=API_GRAPHQL_HEADERS
)
return map_to_anime_result(response)
@debug_provider
def episode_streams(self, params):
from .extractors import extract_server
episode_response = execute_graphql(
API_GRAPHQL_ENDPOINT,
self.client,
EPISODE_GQL,
variables={
"showId": params.anime_id,
"translationType": params.translation_type,
"episodeString": params.episode,
},
headers=API_GRAPHQL_HEADERS
)
episode: AllAnimeEpisode = episode_response.json()["data"]["episode"]
for source in episode["sourceUrls"]:
if server := extract_server(self.client, params.episode, episode, source):
yield server
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AllAnime)

View File

@@ -1,111 +0,0 @@
from enum import Enum
from typing import Literal, TypedDict
class Server(Enum):
SHAREPOINT = "sharepoint"
DROPBOX = "dropbox"
GOGOANIME = "gogoanime"
WETRANSFER = "weTransfer"
WIXMP = "wixmp"
YT = "Yt"
MP4_UPLOAD = "mp4-upload"
class AllAnimeEpisodesDetail(TypedDict):
dub: list[str]
sub: list[str]
raw: list[str]
class AllAnimeEpisodes(TypedDict):
dub: int
sub: int
raw: int
class AllAnimePageInfo(TypedDict):
total: int
class AllAnimeShow(TypedDict):
_id: str
name: str
availableEpisodesDetail: AllAnimeEpisodesDetail
__typename: str
class AllAnimeSearchResult(TypedDict):
_id: str
name: str
availableEpisodes: AllAnimeEpisodes
__typename: str | None
class AllAnimeShows(TypedDict):
pageInfo: AllAnimePageInfo
edges: list[AllAnimeSearchResult]
class AllAnimeSearchResults(TypedDict):
shows: AllAnimeShows
class AllAnimeSourceDownload(TypedDict):
sourceName: str
dowloadUrl: str
class AllAnimeSource(TypedDict):
sourceName: Literal[
"Sak",
"S-mp4",
"Luf-mp4",
"Default",
"Yt-mp4",
"Kir",
"Mp4",
"Ak",
"Vid-mp4",
"Ok",
"Ss-Hls",
"Fm-Hls",
]
sourceUrl: str
priority: float
sandbox: str
type: str
className: str
streamerId: str
downloads: AllAnimeSourceDownload
class AllAnimeEpisodeStream(TypedDict):
link: str
hls: bool
resolutionStr: str
fromCache: str
class AllAnimeEpisodeStreams(TypedDict):
links: list[AllAnimeEpisodeStream]
class AllAnimeEpisode(TypedDict):
episodeString: str
sourceUrls: list[AllAnimeSource]
notes: str | None
class AllAnimeStream(TypedDict):
link: str
mp4: bool
hls: bool | None
resolutionStr: str
fromCache: str
priority: int
headers: dict | None
class AllAnimeStreams(TypedDict):
links: list[AllAnimeStream]

View File

@@ -1,92 +0,0 @@
import functools
import logging
import os
import re
from itertools import cycle
logger = logging.getLogger(__name__)
# Dictionary to map hex values to characters
hex_to_char = {
"01": "9",
"08": "0",
"05": "=",
"0a": "2",
"0b": "3",
"0c": "4",
"07": "?",
"00": "8",
"5c": "d",
"0f": "7",
"5e": "f",
"17": "/",
"54": "l",
"09": "1",
"48": "p",
"4f": "w",
"0e": "6",
"5b": "c",
"5d": "e",
"0d": "5",
"53": "k",
"1e": "&",
"5a": "b",
"59": "a",
"4a": "r",
"4c": "t",
"4e": "v",
"57": "o",
"51": "i",
}
def debug_extractor(extractor_function):
@functools.wraps(extractor_function)
def _provider_function_wrapper(*args):
if not os.environ.get("VIU_DEBUG"):
try:
return extractor_function(*args)
except Exception as e:
logger.error(
f"[AllAnime@Server={args[3].get('sourceName', 'UNKNOWN')}]: {e}"
)
else:
return extractor_function(*args)
return _provider_function_wrapper
def give_random_quality(links):
qualities = cycle(["1080", "720", "480", "360"])
return [
{**episode_stream, "quality": quality}
for episode_stream, quality in zip(links, qualities, strict=False)
]
def one_digit_symmetric_xor(password: int, target: str):
def genexp():
for segment in bytearray.fromhex(target):
yield segment ^ password
return bytes(genexp()).decode("utf-8")
def decode_hex_string(hex_string):
"""some of the sources encrypt the urls into hex codes this function decrypts the urls
Args:
hex_string ([TODO:parameter]): [TODO:description]
Returns:
[TODO:return]
"""
# Split the hex string into pairs of characters
hex_pairs = re.findall("..", hex_string)
# Decode each hex pair
decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs]
# TODO: Better type handling
return "".join(decoded_chars) # type: ignore

View File

@@ -1,56 +0,0 @@
import re
ANIMEPAHE = "animepahe.pw"
ANIMEPAHE_BASE = f"https://{ANIMEPAHE}"
ANIMEPAHE_ENDPOINT = f"{ANIMEPAHE_BASE}/api"
CDN_PROVIDER = "kwik.cx"
CDN_PROVIDER_BASE = f"https://{CDN_PROVIDER}"
SERVERS_AVAILABLE = ["kwik"]
REQUEST_HEADERS = {
"Cookie": "__ddgid_=VvX0ebHrH2DsFZo4; __ddgmark_=3savRpSVFhvZcn5x; __ddg2_=buBJ3c4pNBYKFZNp; __ddg1_=rbVADKr9URtt55zoIGFa; SERVERID=janna; XSRF-TOKEN=eyJpdiI6IjV5bFNtd0phUHgvWGJxc25wL0VJSUE9PSIsInZhbHVlIjoicEJTZktlR2hxR2JZTWhnL0JzazlvZU5TQTR2bjBWZ2dDb0RwUXVUUWNSclhQWUhLRStYSmJmWmUxWkpiYkFRYU12RjFWejlSWHorME1wZG5qQ1U0TnFlNnBFR2laQjN1MjdyNjc5TjVPdXdJb2o5VkU1bEduRW9pRHNDTHh6Sy8iLCJtYWMiOiI0OTc0ZmNjY2UwMGJkOWY2MWNkM2NlMjk2ZGMyZGJmMWE0NTdjZTdkNGI2Y2IwNTIzZmFiZWU5ZTE2OTk0YmU4IiwidGFnIjoiIn0%3D; laravel_session=eyJpdiI6ImxvdlpqREFnTjdaeFJubUlXQWlJVWc9PSIsInZhbHVlIjoiQnE4R3VHdjZ4M1NDdEVWM1ZqMUxtNnVERnJCcmtCUHZKNzRPR2RFbzNFcStTL29xdnVTbWhsNVRBUXEybVZWNU1UYVlTazFqYlN5UjJva1k4czNGaXBTbkJJK01oTUd3VHRYVHBoc3dGUWxHYnFlS2NJVVNFbTFqMVBWdFpuVUgiLCJtYWMiOiI1NDdjZTVkYmNhNjUwZTMxZmRlZmVmMmRlMGNiYjAwYjlmYjFjY2U0MDc1YTQzZThiMTIxMjJlYTg1NTA4YjBmIiwidGFnIjoiIn0%3D; latest=5592",
"Host": ANIMEPAHE,
"Accept": "application, text/javascript, */*; q=0.01",
"Accept-Encoding": "Utf-8",
"Referer": ANIMEPAHE_BASE,
"DNT": "1",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"TE": "trailers",
}
SERVER_HEADERS = {
"Host": "kwik.cx",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "Utf-8",
"DNT": "1",
"Connection": "keep-alive",
"Referer": ANIMEPAHE_BASE + "/",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "iframe",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
"Priority": "u=4",
"TE": "trailers",
}
STREAM_HEADERS = {
# "Host": "vault-16.owocdn.top", # This will have to be the actual host of the stream (behind Kwik)
"Accept": "*/*",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "gzip, deflate, br, zstd",
"Origin": CDN_PROVIDER_BASE,
"Sec-GPC": "1",
"Connection": "keep-alive",
"Referer": CDN_PROVIDER_BASE + "/",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",
"TE": "trailers",
}
JUICY_STREAM_REGEX = re.compile(r"source='(.*)';")
KWIK_RE = re.compile(r"Player\|(.+?)'")
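For reference, JUICY_STREAM_REGEX is what later pulls the final stream url out of the decoded Kwik player JS; a small sketch with an illustrative input, assuming the constant above is in scope:
decoded_js = "const source='https://vault-16.example.top/stream/uwu.m3u8';player.load(source);"
match = JUICY_STREAM_REGEX.search(decoded_js)
assert match and match.group(1) == "https://vault-16.example.top/stream/uwu.m3u8"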

View File

@@ -1,77 +0,0 @@
import re
def animepahe_key_creator(c: int, a: int):
from ...scraping.utils import encode_base_n
if c < a:
val_a = ""
else:
val_a = animepahe_key_creator(int(c / a), a)
c = c % a
if c > 35:
val_b = chr(c + 29)
else:
val_b = encode_base_n(c, 36)
return val_a + val_b
def animepahe_embed_decoder(
encoded_js_p: str,
base_a: int,
no_of_keys_c: int,
values_to_replace_with_k: list,
):
decode_mapper_d: dict = {}
for i in range(no_of_keys_c):
key = animepahe_key_creator(i, base_a)
val = values_to_replace_with_k[i] or key
decode_mapper_d[key] = val
return re.sub(
r"\b\w+\b", lambda match: decode_mapper_d[match.group(0)], encoded_js_p
)
PARAMETERS_REGEX = re.compile(r"eval\(function\(p,a,c,k,e,d\)\{.*\}\((.*?)\)\)$")
ENCODE_JS_REGEX = re.compile(r"'(.*?);',(\d+),(\d+),'(.*)'\.split")
def process_animepahe_embed_page(embed_page: str):
from ...scraping.html_parser import get_element_text_and_html_by_tag
encoded_js_string = ""
embed_page_content = embed_page
for _ in range(8):
text, html = get_element_text_and_html_by_tag("script", embed_page_content)
if not text and html:
embed_page_content = re.sub(html, "", embed_page_content)
continue
if text:
encoded_js_string = text.strip()
break
if not encoded_js_string:
return
obfuscated_js_parameter_match = PARAMETERS_REGEX.search(encoded_js_string)
if not obfuscated_js_parameter_match:
return
parameter_string = obfuscated_js_parameter_match.group(1)
encoded_js_parameter_string = ENCODE_JS_REGEX.search(parameter_string)
if not encoded_js_parameter_string:
return
p: str = encoded_js_parameter_string.group(1)
a: int = int(encoded_js_parameter_string.group(2))
c: int = int(encoded_js_parameter_string.group(3))
k: list = encoded_js_parameter_string.group(4).split("|")
return animepahe_embed_decoder(p, a, c, k).replace("\\", "")
if __name__ == "__main__":
# Testing time
filepath = input("Enter file name: ")
if filepath:
with open(filepath) as file:
data = file.read()
else:
data = """<script>eval(function(p,a,c,k,e,d){e=function(c){return(c<a?'':e(parseInt(c/a)))+((c=c%a)>35?String.fromCharCode(c+29):c.toString(36))};if(!''.replace(/^/,String)){while(c--){d[e(c)]=k[c]||e(c)}k=[function(e){return d[e]}];e=function(){return'\\w+'};c=1};while(c--){if(k[c]){p=p.replace(new RegExp('\\b'+e(c)+'\\b','g'),k[c])}}return p}('f $7={H:a(2){4 B(9.7.h(y z("(?:(?:^|.*;)\\\\s*"+d(2).h(/[\\-\\.\\+\\*]/g,"\\\\$&")+"\\\\s*\\\\=\\\\s*([^;]*).*$)|^.*$"),"$1"))||G},E:a(2,q,3,6,5,t){k(!2||/^(?:8|r\\-v|o|m|p)$/i.D(2)){4 w}f b="";k(3){F(3.J){j K:b=3===P?"; 8=O, I N Q M:u:u A":"; r-v="+3;n;j L:b="; 8="+3;n;j S:b="; 8="+3.Z();n}}9.7=d(2)+"="+d(q)+b+(5?"; m="+5:"")+(6?"; o="+6:"")+(t?"; p":"");4 x},Y:a(2,6,5){k(!2||!11.C(2)){4 w}9.7=d(2)+"=; 8=12, R 10 W l:l:l A"+(5?"; m="+5:"")+(6?"; o="+6:"");4 x},C:a(2){4(y z("(?:^|;\\\\s*)"+d(2).h(/[\\-\\.\\+\\*]/g,"\\\\$&")+"\\\\s*\\\\=")).D(9.7)},X:a(){f c=9.7.h(/((?:^|\\s*;)[^\\=]+)(?=;|$)|^\\s*|\\s*(?:\\=[^;]*)?(?:\\1|$)/g,"").T(/\\s*(?:\\=[^;]*)?;\\s*/);U(f e=0;e<c.V;e++){c[e]=B(c[e])}4 c}};',62,65,'||sKey|vEnd|return|sDomain|sPath|cookie|expires|document|function|sExpires|aKeys|encodeURIComponent|nIdx|var||replace||case|if|00|domain|break|path|secure|sValue|max||bSecure|59|age|false|true|new|RegExp|GMT|decodeURIComponent|hasItem|test|setItem|switch|null|getItem|31|constructor|Number|String|23|Dec|Fri|Infinity|9999|01|Date|split|for|length|1970|keys|removeItem|toUTCString|Jan|this|Thu'.split('|'),0,{}));eval(function(p,a,c,k,e,d){e=function(c){return(c<a?'':e(parseInt(c/a)))+((c=c%a)>35?String.fromCharCode(c+29):c.toString(36))};if(!''.replace(/^/,String)){while(c--){d[e(c)]=k[c]||e(c)}k=[function(e){return d[e]}];e=function(){return'\\w+'};c=1};while(c--){if(k[c]){p=p.replace(new RegExp('\\b'+e(c)+'\\b','g'),k[c])}}return p}('h o=\'1D://1C-E.1B.1A.1z/1y/E/1x/1w/1v.1u\';h d=s.r(\'d\');h 0=B 1t(d,{\'1s\':{\'1r\':i},\'1q\':\'16:9\',\'D\':1,\'1p\':5,\'1o\':{\'1n\':\'1m\'},1l:[\'7-1k\',\'7\',\'1j\',\'1i-1h\',\'1g\',\'1f-1e\',\'1d\',\'D\',\'1c\',\'1b\',\'1a\',\'19\',\'C\',\'18\'],\'C\':{\'17\':i}});8(!A.15()){d.14=o}x{j z={13:12,11:10,Z:Y,X:i,W:i};h c=B A(z);c.V(o);c.U(d);g.c=c}0.3("T",6=>{g.S.R.Q("P")});0.O=1;k v(b,n,m){8(b.y){b.y(n,m,N)}x 8(b.w){b.w(\'3\'+n,m)}}j 4=k(l){g.M.L(l,\'*\')};v(g,\'l\',k(e){j a=e.a;8(a===\'7\')0.7();8(a===\'f\')0.f();8(a===\'u\')0.u()});0.3(\'t\',6=>{4(\'t\')});0.3(\'7\',6=>{4(\'7\')});0.3(\'f\',6=>{4(\'f\')});0.3(\'K\',6=>{4(0.q);s.r(\'.J-I\').H=G(0.q.F(2))});0.3(\'p\',6=>{4(\'p\')});',62,102,'player|||on|sendMessage||event|play|if||data|element|hls|video||pause|window|const|true|var|function|message|eventHandler|eventName|source|ended|currentTime|querySelector|document|ready|stop|bindEvent|attachEvent|else|addEventListener|config|Hls|new|fullscreen|volume|01|toFixed|String|innerHTML|timestamp|ss|timeupdate|postMessage|parent|false|speed|landscape|lock|orientation|screen|enterfullscreen|attachMedia|loadSource|lowLatencyMode|enableWorker|Infinity|backBufferLength|600|maxMaxBufferLength|180|maxBufferLength|src|isSupported||iosNative|capture|airplay|pip|settings|captions|mute|time|current|progress|forward|fast|rewind|large|controls|kwik|key|storage|seekTime|ratio|global|keyboard|Plyr|m3u8|uwu|b92a392054c041a3f9c6eecabeb0e127183f44e547828447b10bca8d77523e6f|03|stream|org|nextcdn|files|eu|https'.split('|'),0,{}))</script>"""
print(process_animepahe_embed_page(data))

View File

@@ -1,113 +0,0 @@
import logging
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
from .types import (
AnimePaheAnimePage,
AnimePaheSearchPage,
)
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
logger = logging.getLogger(__name__)
def map_to_search_results(data: AnimePaheSearchPage) -> SearchResults:
results = []
for result in data["data"]:
results.append(
SearchResult(
id=result["session"],
title=result["title"],
episodes=AnimeEpisodes(
sub=list(map(str, range(1, result["episodes"] + 1))),
dub=list(map(str, range(1, result["episodes"] + 1))),
raw=list(map(str, range(1, result["episodes"] + 1))),
),
media_type=result["type"],
score=result["score"],
status=result["status"],
season=result["season"],
poster=result["poster"],
year=str(result["year"]),
)
)
return SearchResults(
page_info=PageInfo(
total=data["total"],
per_page=data["per_page"],
current_page=data["current_page"],
),
results=results,
)
def map_to_anime_result(
search_result: SearchResult, anime: AnimePaheAnimePage
) -> Anime:
episodes_info = []
episodes = []
anime["data"] = sorted(anime["data"], key=lambda k: float(k["episode"]))
for ep_info in anime["data"]:
episodes.append(str(ep_info["episode"]))
episodes_info.append(
AnimeEpisodeInfo(
id=str(ep_info["id"]),
session_id=ep_info["session"],
episode=str(ep_info["episode"]),
title=ep_info["title"],
poster=ep_info["snapshot"],
duration=str(ep_info["duration"]),
)
)
return Anime(
id=search_result.id,
title=search_result.title,
episodes=AnimeEpisodes(
sub=episodes,
dub=episodes,
),
year=str(search_result.year),
poster=search_result.poster,
episodes_info=episodes_info,
)
def map_to_server(
episode: AnimeEpisodeInfo,
translation_type: str,
stream_links: list[tuple[str, str]],
headers: dict[str, str],
) -> Server:
links = [
EpisodeStream(
link=link[1],
quality=link[0] if link[0] in ["360", "480", "720", "1080"] else "1080", # type:ignore
translation_type=translation_type_map[translation_type],
)
for link in stream_links
]
# sort links by quality, best to worst
links.sort(key=lambda x: int(x.quality), reverse=True)
logger.debug(f"Aggregated links: {links}")
return Server(
name="kwik", links=links, episode_title=episode.title, headers=headers
)

View File

@@ -1,214 +0,0 @@
import logging
from functools import lru_cache
from typing import Iterator, Optional
from urllib.parse import urlparse
from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults, Server
from ..utils.debug import debug_provider
from .constants import (
ANIMEPAHE_BASE,
ANIMEPAHE_ENDPOINT,
CDN_PROVIDER,
JUICY_STREAM_REGEX,
REQUEST_HEADERS,
SERVER_HEADERS,
STREAM_HEADERS,
)
from .extractor import process_animepahe_embed_page
from .mappers import map_to_anime_result, map_to_search_results, map_to_server
from .types import AnimePaheAnimePage, AnimePaheSearchPage
logger = logging.getLogger(__name__)
class AnimePahe(BaseAnimeProvider):
HEADERS = REQUEST_HEADERS
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
return self._search(params)
@lru_cache()
def _search(self, params: SearchParams) -> SearchResults | None:
url_params = {"m": "search", "q": params.query}
response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
response.raise_for_status()
data: AnimePaheSearchPage = response.json()
if not data.get("data"):
return
return map_to_search_results(data)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
return self._get_anime(params)
@lru_cache()
def _get_anime(self, params: AnimeParams) -> Anime | None:
page = 1
standardized_episode_number = 0
search_result = self._get_search_result(params)
if not search_result:
logger.error(f"No search result found for ID {params.id}")
return None
anime: Optional[AnimePaheAnimePage] = None
has_next_page = True
while has_next_page:
logger.debug(f"Loading page: {page}")
_anime_page = self._anime_page_loader(
m="release",
id=params.id,
sort="episode_asc",
page=page,
)
has_next_page = True if _anime_page["next_page_url"] else False
page += 1
if not anime:
anime = _anime_page
else:
anime["data"].extend(_anime_page["data"])
if anime:
for episode in anime.get("data", []):
if episode["episode"] % 1 == 0:
standardized_episode_number += 1
episode.update({"episode": standardized_episode_number})
else:
standardized_episode_number += episode["episode"] % 1
episode.update({"episode": standardized_episode_number})
standardized_episode_number = int(standardized_episode_number)
return map_to_anime_result(search_result, anime)
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> Optional[SearchResult]:
search_results = self._search(SearchParams(query=params.query))
if not search_results or not search_results.results:
logger.error(f"No search results found for ID {params.id}")
return None
for search_result in search_results.results:
if search_result.id == params.id:
return search_result
@lru_cache()
def _anime_page_loader(self, m, id, sort, page) -> AnimePaheAnimePage:
url_params = {
"m": m,
"id": id,
"sort": sort,
"page": page,
}
response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
response.raise_for_status()
return response.json()
@debug_provider
def episode_streams(self, params: EpisodeStreamsParams) -> Iterator[Server] | None:
from ...scraping.html_parser import (
extract_attributes,
get_element_by_id,
get_elements_html_by_class,
)
episode = self._get_episode_info(params)
if not episode:
logger.error(
f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
)
return
url = f"{ANIMEPAHE_BASE}/play/{params.anime_id}/{episode.session_id}"
response = self.client.get(url, follow_redirects=True)
response.raise_for_status()
c = get_element_by_id("resolutionMenu", response.text)
if not c:
logger.error("Resolution menu not found in the response")
return
resolutionMenuItems = get_elements_html_by_class("dropdown-item", c)
res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
quality = None
translation_type = None
stream_links = []
stream_host = None
# TODO: better document the scraping process
for res_dict in res_dicts:
# the actual attributes are data attributes in the original html (prefixed with "data-")
embed_url = res_dict["src"]
logger.debug(f"Found embed url: {embed_url}")
data_audio = "dub" if res_dict["audio"] == "eng" else "sub"
if data_audio != params.translation_type:
logger.debug(f"Found {data_audio} but wanted {params.translation_type}")
continue
if not embed_url:
logger.warning("embed url not found please report to the developers")
continue
embed_response = self.client.get(
embed_url,
headers={
"User-Agent": self.client.headers["User-Agent"],
**SERVER_HEADERS,
},
)
embed_response.raise_for_status()
embed_page = embed_response.text
logger.debug("Processing embed page for JS decoding")
decoded_js = process_animepahe_embed_page(embed_page)
if not decoded_js:
logger.error("failed to decode embed page")
continue
logger.debug(f"Decoded JS: {decoded_js[:100]}...")
juicy_stream = JUICY_STREAM_REGEX.search(decoded_js)
if not juicy_stream:
logger.error("failed to find juicy stream")
continue
logger.debug(f"Found juicy stream: {juicy_stream.group(1)}")
juicy_stream = juicy_stream.group(1)
stream_host = urlparse(juicy_stream).hostname
quality = res_dict["resolution"]
logger.debug(f"Found quality: {quality}")
translation_type = data_audio
stream_links.append((quality, juicy_stream))
if translation_type and stream_links:
headers = {
"User-Agent": self.client.headers["User-Agent"],
"Host": stream_host or CDN_PROVIDER,
**STREAM_HEADERS,
}
yield map_to_server(
episode, translation_type, stream_links, headers=headers
)
@lru_cache()
def _get_episode_info(
self, params: EpisodeStreamsParams
) -> Optional[AnimeEpisodeInfo]:
anime_info = self._get_anime(
AnimeParams(id=params.anime_id, query=params.query)
)
if not anime_info:
logger.error(f"No anime info for {params.anime_id}")
return
if not anime_info.episodes_info:
logger.error(f"No episodes info for {params.anime_id}")
return
for episode in anime_info.episodes_info:
if episode.episode == params.episode:
return episode
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AnimePahe)

View File

@@ -1,108 +0,0 @@
from enum import Enum
from typing import Literal, TypedDict
class Server(Enum):
KWIK = "Kwik"
class AnimePaheSearchResult(TypedDict):
id: str
title: str
type: str
episodes: int
status: str
season: str
year: int
score: int
poster: str
session: str
class AnimePaheSearchPage(TypedDict):
total: int
per_page: int
current_page: int
last_page: int
_from: int
to: int
data: list[AnimePaheSearchResult]
class Episode(TypedDict):
id: str
anime_id: int
episode: float
episode2: int
edition: str
title: str
snapshot: str # episode image
disc: str
audio: Literal["eng", "jpn"]
duration: str # time 00:00:00
session: str
filler: int
created_at: str
class AnimePaheAnimePage(TypedDict):
total: int
per_page: int
current_page: int
last_page: int
next_page_url: str | None
prev_page_url: str | None
_from: int
to: int
data: list[Episode]
class AnimePaheEpisodeInfo(TypedDict):
title: str
episode: float
id: str
translation_type: Literal["eng", "jpn"]
duration: str
poster: str
class AvailableEpisodesDetail(TypedDict):
sub: list[str]
dub: list[str]
raw: list[str]
class AnimePaheAnime(TypedDict):
id: str
title: str
year: int
season: str
poster: str
score: int
availableEpisodesDetail: AvailableEpisodesDetail
episodesInfo: list[AnimePaheEpisodeInfo]
class PageInfo(TypedDict):
total: int
perPage: int
currentPage: int
class AnimePaheSearchResults(TypedDict):
pageInfo: PageInfo
results: list[AnimePaheSearchResult]
class AnimePaheStreamLink(TypedDict):
quality: str
translation_type: Literal["sub", "dub"]
link: str
class AnimePaheServer(TypedDict):
server: Literal["kwik"]
links: list[AnimePaheStreamLink]
episode_title: str
subtitles: list
headers: dict

View File

@@ -1,17 +0,0 @@
import re
ANIMEUNITY = "animeunity.so"
ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}"
MAX_TIMEOUT = 10
TOKEN_REGEX = re.compile(r'<meta.*?name="csrf-token".*?content="([^"]*)".*?>')
REPLACEMENT_WORDS = {"Season ": "", "Cour": "Part"}
# Server Specific
AVAILABLE_VIDEO_QUALITY = ["1080", "720", "480"]
VIDEO_INFO_REGEX = re.compile(r"window.video\s*=\s*(\{[^\}]*\})")
VIDEO_INFO_CLEAN_REGEX = re.compile(r'(?<!["\'])(\b\w+\b)(?=\s*:)')
DOWNLOAD_FILENAME_REGEX = re.compile(r"[?&]filename=([^&]+)")
QUALITY_REGEX = re.compile(r"/(\d{3,4}p)")
DOWNLOAD_URL_REGEX = re.compile(r"window.downloadUrl\s*=\s*'([^']*)'")

View File

@@ -1,50 +0,0 @@
import logging
from .constants import (
DOWNLOAD_FILENAME_REGEX,
DOWNLOAD_URL_REGEX,
QUALITY_REGEX,
VIDEO_INFO_CLEAN_REGEX,
VIDEO_INFO_REGEX,
)
logger = logging.getLogger(__name__)
def extract_server_info(html_content: str, episode_title: str | None) -> dict | None:
"""
Extracts server information from the VixCloud/AnimeUnity embed page.
Handles extraction from both window.video object and download URL.
"""
video_info = VIDEO_INFO_REGEX.search(html_content)
download_url_match = DOWNLOAD_URL_REGEX.search(html_content)
if not (download_url_match and video_info):
return None
info_str = VIDEO_INFO_CLEAN_REGEX.sub(r'"\1"', video_info.group(1))
# Use eval context for JS constants
ctx = {"null": None, "true": True, "false": False}
try:
info = eval(info_str, ctx)
except Exception as e:
logger.error(f"Failed to parse JS object: {e}")
return None
download_url = download_url_match.group(1)
info["link"] = download_url
# Extract metadata from download URL if missing in window.video
if filename_match := DOWNLOAD_FILENAME_REGEX.search(download_url):
info["name"] = filename_match.group(1)
else:
info["name"] = f"{episode_title or 'Unknown'}"
if quality_match := QUALITY_REGEX.search(download_url):
# "720p" -> 720
info["quality"] = int(quality_match.group(1)[:-1])
else:
info["quality"] = 0 # Fallback
return info
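A worked (illustrative) example of the extraction above, assuming extract_server_info and the regex constants are in scope; the embed HTML and url are made up:
sample_html = (
    "<script>"
    "window.video = {id: 42, name: null, quality: 1080};"
    "window.downloadUrl = 'https://cdn.example.org/1080p/ep?filename=Ep_1.mp4';"
    "</script>"
)
info = extract_server_info(sample_html, episode_title="Ep 1")
# window.video keys get quoted and evaluated, then the download url supplies
# link, name (the filename= query param) and quality (the "/1080p" path segment)
assert info and info["name"] == "Ep_1.mp4" and info["quality"] == 1080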

View File

@@ -1,133 +0,0 @@
from typing import Literal
from ..types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
from .constants import AVAILABLE_VIDEO_QUALITY
def map_to_search_results(
data: dict, translation_type: Literal["sub", "dub"]
) -> SearchResults:
results = []
for result in data:
mapped_result = map_to_search_result(result, translation_type)
if mapped_result:
results.append(mapped_result)
return SearchResults(
page_info=PageInfo(),
results=results,
)
def map_to_search_result(
data: dict, translation_type: Literal["sub", "dub"] | None
) -> SearchResult | None:
if translation_type and data["dub"] != (1 if translation_type == "dub" else 0):
return None
return SearchResult(
id=str(data["id"]),
title=get_titles(data)[0] if get_titles(data) else "Unknown",
episodes=AnimeEpisodes(
sub=(
list(map(str, range(1, get_episodes_count(data) + 1)))
if data["dub"] == 0
else []
),
dub=(
list(map(str, range(1, get_episodes_count(data) + 1)))
if data["dub"] == 1
else []
),
),
other_titles=get_titles(data),
score=data["score"],
poster=data["imageurl"],
year=data["date"],
)
def map_to_anime_result(data: list, search_result: SearchResult) -> Anime:
return Anime(
id=search_result.id,
title=search_result.title,
episodes=AnimeEpisodes(
sub=[
episode["number"]
for episode in data
if len(search_result.episodes.sub) > 0
],
dub=[
episode["number"]
for episode in data
if len(search_result.episodes.dub) > 0
],
),
episodes_info=[
AnimeEpisodeInfo(
id=str(episode["id"]),
episode=episode["number"],
title=f"{search_result.title} - Ep {episode['number']}",
)
for episode in data
],
type=search_result.media_type,
poster=search_result.poster,
year=search_result.year,
)
def map_to_server(
episode: AnimeEpisodeInfo, info: dict, translation_type: Literal["sub", "dub"]
) -> Server:
return Server(
name="vixcloud",
links=[
EpisodeStream(
link=info["link"].replace(str(info["quality"]), quality),
title=info["name"],
quality=quality, # type: ignore
translation_type=MediaTranslationType(translation_type),
mp4=True,
)
for quality in sorted(
list(set(AVAILABLE_VIDEO_QUALITY + [str(info["quality"])])),
key=lambda x: int(x),
reverse=True,
)
if int(quality) <= info["quality"]
],
episode_title=episode.title,
)
def get_titles(data: dict) -> list[str]:
"""
Return the available titles from the record, English title first.
"""
titles = []
if data.get("title_eng"):
titles.append(data["title_eng"])
if data.get("title"):
titles.append(data["title"])
if data.get("title_it"):
titles.append(data["title_it"])
return titles
def get_episodes_count(record: dict) -> int:
"""
Return the number of episodes from the record.
"""
if (count := record.get("real_episodes_count", 0)) > 0:
return count
return record.get("episodes_count", 0)

View File

@@ -1,170 +0,0 @@
import logging
from functools import lru_cache
from ...scraping.user_agents import UserAgentGenerator
from ..base import BaseAnimeProvider
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
from ..types import Anime, AnimeEpisodeInfo, SearchResult, SearchResults
from ..utils.debug import debug_provider
from .constants import (
ANIMEUNITY_BASE,
MAX_TIMEOUT,
REPLACEMENT_WORDS,
TOKEN_REGEX,
)
from .extractor import extract_server_info
from .mappers import (
map_to_anime_result,
map_to_search_result,
map_to_search_results,
map_to_server,
)
logger = logging.getLogger(__name__)
class AnimeUnity(BaseAnimeProvider):
HEADERS = {
"User-Agent": UserAgentGenerator().random(),
}
_cache = dict[str, SearchResult]()
@lru_cache
def _get_token(self) -> None:
response = self.client.get(
ANIMEUNITY_BASE,
headers=self.HEADERS,
timeout=MAX_TIMEOUT,
follow_redirects=True,
)
response.raise_for_status()
token_match = TOKEN_REGEX.search(response.text)
if token_match:
self.HEADERS["x-csrf-token"] = token_match.group(1)
self.client.cookies = {
"animeunity_session": response.cookies.get("animeunity_session") or ""
}
self.client.headers = self.HEADERS
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
if not (res := self._search(params)):
return None
for result in res.results:
self._cache[result.id] = result
return res
@lru_cache
def _search(self, params: SearchParams) -> SearchResults | None:
self._get_token()
# Normalize the query with REPLACEMENT_WORDS (e.g. drop "Season ", map "Cour" to "Part")
query = params.query
for old, new in REPLACEMENT_WORDS.items():
query = query.replace(old, new)
response = self.client.post(
url=f"{ANIMEUNITY_BASE}/livesearch",
data={"title": query},
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
return map_to_search_results(
response.json().get("records", []), params.translation_type
)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
return self._get_anime(params)
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> SearchResult | None:
if cached := self._cache.get(params.id):
return cached
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/",
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
data = response.json()
if res := map_to_search_result(data, None):
self._cache[params.id] = res
return res
@lru_cache
def _get_anime(self, params: AnimeParams) -> Anime | None:
if (search_result := self._get_search_result(params)) is None:
logger.error(f"No search result found for ID {params.id}")
return None
# Fetch episodes in chunks
data = []
start_range = 1
episode_count = max(
len(search_result.episodes.sub), len(search_result.episodes.dub)
)
while start_range <= episode_count:
end_range = min(start_range + 119, episode_count)
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/1",
params={
"start_range": start_range,
"end_range": end_range,
},
timeout=MAX_TIMEOUT,
)
response.raise_for_status()
data.extend(response.json().get("episodes", []))
start_range = end_range + 1
return map_to_anime_result(data, search_result)
@lru_cache()
def _get_episode_info(
self, params: EpisodeStreamsParams
) -> AnimeEpisodeInfo | None:
anime_info = self._get_anime(
AnimeParams(id=params.anime_id, query=params.query)
)
if not anime_info:
logger.error(f"No anime info for {params.anime_id}")
return
if not anime_info.episodes_info:
logger.error(f"No episodes info for {params.anime_id}")
return
for episode in anime_info.episodes_info:
if episode.episode == params.episode:
return episode
@debug_provider
def episode_streams(self, params: EpisodeStreamsParams):
if not (episode := self._get_episode_info(params)):
logger.error(
f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
)
return
# Get the Server url
response = self.client.get(
url=f"{ANIMEUNITY_BASE}/embed-url/{episode.id}", timeout=MAX_TIMEOUT
)
response.raise_for_status()
# Fetch the Server page
video_response = self.client.get(url=response.text.strip(), timeout=MAX_TIMEOUT)
video_response.raise_for_status()
if not (info := extract_server_info(video_response.text, episode.title)):
logger.error(f"Failed to extract video info for episode {episode.id}")
return None
yield map_to_server(episode, info, params.translation_type)
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AnimeUnity)

View File

@@ -1,39 +0,0 @@
from abc import ABC, abstractmethod
from typing import TYPE_CHECKING, ClassVar, Dict
from .params import AnimeParams, EpisodeStreamsParams, SearchParams
if TYPE_CHECKING:
from collections.abc import Iterator
from httpx import Client
from .types import Anime, SearchResults, Server
class BaseAnimeProvider(ABC):
HEADERS: ClassVar[Dict[str, str]]
def __init_subclass__(cls, **kwargs):
super().__init_subclass__(**kwargs)
if not hasattr(cls, "HEADERS"):
raise TypeError(
"Subclasses of BaseAnimeProvider must define a 'HEADERS' class attribute."
)
def __init__(self, client: "Client") -> None:
self.client = client
@abstractmethod
def search(self, params: SearchParams) -> "SearchResults | None":
pass
@abstractmethod
def get(self, params: AnimeParams) -> "Anime | None":
pass
@abstractmethod
def episode_streams(
self, params: EpisodeStreamsParams
) -> "Iterator[Server] | None":
pass
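A minimal, hypothetical subclass sketch satisfying this contract (illustrative only, not a provider shipped with the package):
class ExampleProvider(BaseAnimeProvider):
    HEADERS = {"Referer": "https://example.org/"}  # required by __init_subclass__
    def search(self, params: SearchParams) -> "SearchResults | None":
        return None  # a real provider would query its backend via self.client
    def get(self, params: AnimeParams) -> "Anime | None":
        return None
    def episode_streams(self, params: EpisodeStreamsParams) -> "Iterator[Server] | None":
        return None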

View File

@@ -1,46 +0,0 @@
from dataclasses import dataclass
from typing import Literal, Optional
@dataclass(frozen=True)
class SearchParams:
"""Parameters for searching anime."""
query: str
# pagination and sorting
current_page: int = 1
page_limit: int = 20
sort_by: str = "relevance"
order: Literal["asc", "desc"] = "desc"
# filters
translation_type: Literal["sub", "dub"] = "sub"
genre: Optional[str] = None
year: Optional[int] = None
status: Optional[str] = None
allow_nsfw: bool = True
allow_unknown: bool = True
country_of_origin: Optional[str] = None
@dataclass(frozen=True)
class EpisodeStreamsParams:
"""Parameters for fetching episode streams."""
query: str
anime_id: str
episode: str
translation_type: Literal["sub", "dub"] = "sub"
server: Optional[str] = None
quality: Literal["1080", "720", "480", "360"] = "720"
subtitles: bool = True
@dataclass(frozen=True)
class AnimeParams:
"""Parameters for fetching anime details."""
id: str
# HACK: for the sake of providers which require previous data
query: str
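A short usage sketch of these parameter objects (values are illustrative). Because the dataclasses are frozen, instances are hashable, which is what lets providers wrap methods taking them in lru_cache:
search = SearchParams(query="one piece", translation_type="dub", page_limit=10)
anime = AnimeParams(id="12345", query=search.query)
streams = EpisodeStreamsParams(
    query=search.query,
    anime_id=anime.id,
    episode="1",
    translation_type="dub",
)
assert hash(streams) == hash(streams)  # frozen=True makes the params hashable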

View File

@@ -1,72 +0,0 @@
import importlib
import logging
from httpx import Client
from .base import BaseAnimeProvider
from .types import ProviderName
logger = logging.getLogger(__name__)
PROVIDERS_AVAILABLE = {
"allanime": "provider.AllAnime",
"animepahe": "provider.AnimePahe",
"hianime": "provider.HiAnime",
"nyaa": "provider.Nyaa",
"yugen": "provider.Yugen",
"animeunity": "provider.AnimeUnity",
}
class AnimeProviderFactory:
"""Factory for creating anime provider instances."""
@staticmethod
def create(provider_name: ProviderName) -> BaseAnimeProvider:
"""
Dynamically creates an instance of the specified anime provider.
This method imports the necessary provider module, instantiates its main class,
and injects a pre-configured HTTP client.
Args:
provider_name: The name of the provider to create (e.g., 'allanime').
Returns:
An instance of a class that inherits from BaseProvider.
Raises:
ValueError: If the provider_name is not supported.
ImportError: If the provider module or class cannot be found.
"""
from ....core.utils.networking import random_user_agent
# Determine module and class name from the map
import_path = PROVIDERS_AVAILABLE.get(provider_name.value.lower())
if import_path is None:
raise ValueError(f"Unsupported provider: {provider_name.value}")
module_name, class_name = import_path.split(".", 1)
# Construct the full package path for dynamic import
package_path = f"viu_media.libs.provider.anime.{provider_name.value.lower()}"
try:
provider_module = importlib.import_module(f".{module_name}", package_path)
provider_class = getattr(provider_module, class_name)
except (ImportError, AttributeError) as e:
logger.error(
f"Failed to load provider '{provider_name.value.lower()}': {e}"
)
raise ImportError(
f"Could not load provider '{provider_name.value.lower()}'. "
"Check the module path and class name in PROVIDERS_AVAILABLE."
) from e
# Each provider class requires an httpx.Client, which we set up here.
client = Client(
headers={"User-Agent": random_user_agent(), **provider_class.HEADERS}
)
return provider_class(client)
# Simple alias for ease of use, consistent with other factories in the codebase.
create_provider = AnimeProviderFactory.create
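Typical use of the factory, per the docstring above (a sketch; assumes create_provider, ProviderName and SearchParams are importable in the calling scope):
provider = create_provider(ProviderName.ALLANIME)
results = provider.search(SearchParams(query="frieren"))
if results:
    for result in results.results:
        print(result.id, result.title)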

View File

@@ -1,119 +0,0 @@
from enum import Enum
from typing import List, Literal, Optional
from pydantic import BaseModel, ConfigDict
# from .allanime.types import Server as AllAnimeServer
# from .animepahe.types import Server as AnimePaheServer
# ENUMS
class ProviderName(Enum):
ALLANIME = "allanime"
ANIMEPAHE = "animepahe"
ANIMEUNITY = "animeunity"
class ProviderServer(Enum):
TOP = "TOP"
# AllAnimeServer values
SHAREPOINT = "sharepoint"
DROPBOX = "dropbox"
GOGOANIME = "gogoanime"
WETRANSFER = "weTransfer"
WIXMP = "wixmp"
YT = "Yt"
MP4_UPLOAD = "mp4-upload"
# AnimePaheServer values
KWIK = "kwik"
# AnimeUnityServer values
VIXCLOUD = "vixcloud"
class MediaTranslationType(Enum):
SUB = "sub"
DUB = "dub"
RAW = "raw"
# MODELS
class BaseAnimeProviderModel(BaseModel):
model_config = ConfigDict(frozen=True)
class PageInfo(BaseAnimeProviderModel):
total: Optional[int] = None
per_page: Optional[int] = None
current_page: Optional[int] = None
class AnimeEpisodes(BaseAnimeProviderModel):
sub: List[str]
dub: List[str] = []
raw: List[str] = []
class SearchResult(BaseAnimeProviderModel):
id: str
title: str
episodes: AnimeEpisodes
other_titles: List[str] = []
media_type: Optional[str] = None
score: Optional[float] = None
status: Optional[str] = None
season: Optional[str] = None
poster: Optional[str] = None
year: Optional[str] = None
class SearchResults(BaseAnimeProviderModel):
page_info: PageInfo
results: List[SearchResult]
class AnimeEpisodeInfo(BaseAnimeProviderModel):
id: str
episode: str
session_id: Optional[str] = None
title: Optional[str] = None
poster: Optional[str] = None
duration: Optional[str] = None
class Anime(BaseAnimeProviderModel):
id: str
title: str
episodes: AnimeEpisodes
type: Optional[str] = None
episodes_info: List[AnimeEpisodeInfo] | None = None
poster: Optional[str] = None
year: Optional[str] = None
class EpisodeStream(BaseAnimeProviderModel):
# episode: str
link: str
title: Optional[str] = None
quality: Literal["360", "480", "720", "1080"] = "720"
translation_type: MediaTranslationType = MediaTranslationType.SUB
format: Optional[str] = None
hls: Optional[bool] = None
mp4: Optional[bool] = None
priority: Optional[int] = None
class Subtitle(BaseAnimeProviderModel):
url: str
language: Optional[str] = None
class Server(BaseAnimeProviderModel):
name: str
links: List[EpisodeStream]
episode_title: Optional[str] = None
headers: dict[str, str] = dict()
subtitles: List[Subtitle] = []
audio: List[str] = []
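A small construction sketch showing how these frozen models compose (values illustrative; this mirrors what the provider mappers and extractors return):
stream = EpisodeStream(
    link="https://cdn.example.org/ep1.m3u8",
    quality="1080",
    translation_type=MediaTranslationType.SUB,
    hls=True,
)
server = Server(
    name="example",
    links=[stream],
    episode_title="Episode 1",
    headers={"Referer": "https://example.org/"},
)
# ConfigDict(frozen=True) makes these instances immutable (and hashable)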

View File

@@ -1,90 +0,0 @@
import functools
import logging
import os
from typing import Type
from ..base import BaseAnimeProvider
logger = logging.getLogger(__name__)
def debug_provider(provider_function):
@functools.wraps(provider_function)
def _provider_function_wrapper(self, *args, **kwargs):
provider_name = self.__class__.__name__.upper()
if not os.environ.get("VIU_DEBUG"):
try:
return provider_function(self, *args, **kwargs)
except Exception as e:
logger.error(f"[{provider_name}@{provider_function.__name__}]: {e}")
else:
return provider_function(self, *args, **kwargs)
return _provider_function_wrapper
def test_anime_provider(AnimeProvider: Type[BaseAnimeProvider]):
import shutil
import subprocess
from httpx import Client
from .....core.constants import APP_ASCII_ART
from .....core.utils.networking import random_user_agent
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
anime_provider = AnimeProvider(
Client(headers={"User-Agent": random_user_agent(), **AnimeProvider.HEADERS})
)
print(APP_ASCII_ART.read_text(encoding="utf-8"))
query = input("What anime would you like to stream: ")
search_results = anime_provider.search(SearchParams(query=query))
if not search_results:
return
for i, search_result in enumerate(search_results.results):
print(f"{i + 1}: {search_result.title}")
result = search_results.results[
int(input(f"Select result (1-{len(search_results.results)}): ")) - 1
]
anime = anime_provider.get(AnimeParams(id=result.id, query=query))
if not anime:
return
translation_type = input("Preferred Translation Type: [dub,sub,raw]: ")
for episode in getattr(anime.episodes, translation_type):
print(episode)
episode_number = input("What episode do you wish to watch: ")
episode_streams = anime_provider.episode_streams(
EpisodeStreamsParams(
query=query,
anime_id=anime.id,
episode=episode_number,
translation_type=translation_type, # type:ignore
)
)
if not episode_streams:
return
episode_streams = list(episode_streams)
for i, stream in enumerate(episode_streams):
print(f"{i + 1}: {stream.name}")
stream = episode_streams[int(input("Select your preferred server: ")) - 1]
for i, link in enumerate(stream.links):
print(f"{i + 1}: {link.quality}")
link = stream.links[int(input("Select your preferred quality: ")) - 1]
if executable := shutil.which("mpv"):
cmd = executable
elif executable := shutil.which("xdg-open"):
cmd = executable
elif executable := shutil.which("open"):
cmd = executable
else:
return
print(
"Now streaming: ",
anime.title,
"Episode: ",
stream.episode_title if stream.episode_title else episode_number,
)
subprocess.run([cmd, link.link])

View File

@@ -1,494 +0,0 @@
# pyright: reportAttributeAccessIssue=false, reportPossiblyUnboundVariable=false
"""
HTML parsing utilities with optional lxml support.
This module provides comprehensive HTML parsing capabilities using either
Python's built-in html.parser or lxml for better performance when available.
"""
# TODO: Review and optimize the HTML parsing logic for better performance and flexibility.
# Consider adding more utility functions for common HTML manipulation tasks.
import logging
import re
from html.parser import HTMLParser as BaseHTMLParser
from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union
if TYPE_CHECKING:
from lxml import etree
logger = logging.getLogger(__name__)
# Try to import lxml
HAS_LXML = False
try:
from lxml import etree, html as lxml_html
HAS_LXML = True
logger.debug("lxml is available and will be used for HTML parsing")
except ImportError:
logger.debug("lxml not available, falling back to html.parser")
class HTMLParserConfig:
"""Configuration for HTML parser selection."""
def __init__(self, use_lxml: Optional[bool] = None):
"""
Initialize parser configuration.
Args:
use_lxml: Force use of lxml (True), html.parser (False), or auto-detect (None)
"""
if use_lxml is None:
self.use_lxml = HAS_LXML
else:
self.use_lxml = use_lxml and HAS_LXML
if use_lxml and not HAS_LXML:
logger.warning(
"lxml requested but not available, falling back to html.parser"
)
class HTMLParser:
"""
Comprehensive HTML parser with optional lxml support.
Provides a unified interface for HTML parsing operations regardless
of the underlying parser implementation.
"""
def __init__(self, config: Optional[HTMLParserConfig] = None):
"""Initialize the HTML parser with configuration."""
self.config = config or HTMLParserConfig()
def parse(self, html_content: str) -> Union[Any, "ParsedHTML"]:
"""
Parse HTML content and return a parsed tree.
Args:
html_content: Raw HTML string to parse
Returns:
Parsed HTML tree (lxml Element or custom ParsedHTML object)
"""
if self.config.use_lxml:
return self._parse_with_lxml(html_content)
else:
return self._parse_with_builtin(html_content)
def _parse_with_lxml(self, html_content: str) -> Any:
"""Parse HTML using lxml."""
try:
# Use lxml's HTML parser which is more lenient
return lxml_html.fromstring(html_content)
except Exception as e:
logger.warning(f"lxml parsing failed: {e}, falling back to html.parser")
return self._parse_with_builtin(html_content)
def _parse_with_builtin(self, html_content: str) -> "ParsedHTML":
"""Parse HTML using Python's built-in parser."""
parser = BuiltinHTMLParser()
parser.feed(html_content)
return ParsedHTML(parser.elements, html_content)
class BuiltinHTMLParser(BaseHTMLParser):
"""Enhanced HTML parser using Python's built-in capabilities."""
def __init__(self):
super().__init__()
self.elements = []
self.current_element = None
self.element_stack = []
def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]):
"""Handle opening tags."""
element = {
"tag": tag,
"attrs": dict(attrs),
"text": "",
"children": [],
"start_pos": self.getpos(),
}
        if self.element_stack:
            self.element_stack[-1]["children"].append(element)
        else:
            self.elements.append(element)
        # Void elements (<br>, <img>, <meta>, ...) have no closing tag; keeping them
        # on the stack would turn every later element into their child.
        void_tags = {"area", "base", "br", "col", "embed", "hr", "img", "input", "link", "meta", "param", "source", "track", "wbr"}
        if tag.lower() not in void_tags:
            self.element_stack.append(element)
def handle_endtag(self, tag: str):
"""Handle closing tags."""
if self.element_stack and self.element_stack[-1]["tag"] == tag:
element = self.element_stack.pop()
element["end_pos"] = self.getpos()
def handle_data(self, data: str):
"""Handle text content."""
if self.element_stack:
self.element_stack[-1]["text"] += data
class ParsedHTML:
"""Wrapper for parsed HTML using built-in parser."""
def __init__(self, elements: List[Dict], raw_html: str):
self.elements = elements
self.raw_html = raw_html
def find_by_id(self, element_id: str) -> Optional[Dict]:
"""Find element by ID."""
return self._find_recursive(
self.elements, lambda el: el["attrs"].get("id") == element_id
)
def find_by_class(self, class_name: str) -> List[Dict]:
"""Find elements by class name."""
results = []
self._find_all_recursive(
self.elements,
lambda el: class_name in el["attrs"].get("class", "").split(),
results,
)
return results
def find_by_tag(self, tag_name: str) -> List[Dict]:
"""Find elements by tag name."""
results = []
self._find_all_recursive(
self.elements, lambda el: el["tag"].lower() == tag_name.lower(), results
)
return results
def _find_recursive(self, elements: List[Dict], condition) -> Optional[Dict]:
"""Recursively find first element matching condition."""
for element in elements:
if condition(element):
return element
result = self._find_recursive(element["children"], condition)
if result:
return result
return None
def _find_all_recursive(self, elements: List[Dict], condition, results: List[Dict]):
"""Recursively find all elements matching condition."""
for element in elements:
if condition(element):
results.append(element)
self._find_all_recursive(element["children"], condition, results)
# Global parser instance
_default_parser = HTMLParser()
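# The module-level helpers below all route through this instance; call
# configure_parser() to rebuild it with an explicit lxml/html.parser choice.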
def extract_attributes(html_element: str) -> Dict[str, str]:
"""
Extract attributes from an HTML element string.
Args:
html_element: HTML element as string (e.g., '<div class="test" id="main">')
Returns:
Dictionary of attribute name-value pairs
Examples:
>>> extract_attributes('<div class="test" id="main">')
{'class': 'test', 'id': 'main'}
"""
if not html_element:
return {}
# Use regex to extract attributes from HTML string
    attr_pattern = r'([\w-]+)=(["\'])([^"\']*?)\2'
matches = re.findall(attr_pattern, html_element)
attributes = {}
for match in matches:
attr_name, _, attr_value = match
attributes[attr_name] = attr_value
# Handle attributes without quotes
unquoted_pattern = r"(\w+)=([^\s>]+)"
unquoted_matches = re.findall(unquoted_pattern, html_element)
for attr_name, attr_value in unquoted_matches:
if attr_name not in attributes:
attributes[attr_name] = attr_value
return attributes
def get_element_by_id(element_id: str, html_content: str) -> Optional[str]:
"""
Get HTML element by ID.
Args:
element_id: The ID attribute value to search for
html_content: HTML content to search in
Returns:
HTML string of the element or None if not found
Examples:
>>> html = '<div id="test">Content</div>'
>>> get_element_by_id("test", html)
'<div id="test">Content</div>'
"""
parsed = _default_parser.parse(html_content)
if _default_parser.config.use_lxml and HAS_LXML:
try:
element = parsed.xpath(f'//*[@id="{element_id}"]')
if element:
return etree.tostring(element[0], encoding="unicode", method="html")
except Exception as e:
logger.warning(f"lxml XPath search failed: {e}")
return None
else:
element = parsed.find_by_id(element_id)
if element:
return _element_to_html(element, html_content)
return None
def get_element_by_tag(tag_name: str, html_content: str) -> Optional[str]:
"""
Get first HTML element by tag name.
Args:
tag_name: The tag name to search for
html_content: HTML content to search in
Returns:
HTML string of the element or None if not found
"""
parsed = _default_parser.parse(html_content)
if _default_parser.config.use_lxml and HAS_LXML:
try:
elements = parsed.xpath(f"//{tag_name}")
if elements:
return etree.tostring(elements[0], encoding="unicode", method="html")
except Exception as e:
logger.warning(f"lxml XPath search failed: {e}")
return None
else:
elements = parsed.find_by_tag(tag_name)
if elements:
return _element_to_html(elements[0], html_content)
return None
def get_element_by_class(class_name: str, html_content: str) -> Optional[str]:
"""
Get first HTML element by class name.
Args:
class_name: The class name to search for
html_content: HTML content to search in
Returns:
HTML string of the element or None if not found
"""
parsed = _default_parser.parse(html_content)
if _default_parser.config.use_lxml and HAS_LXML:
try:
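            # Note: contains() also matches substrings (e.g. "box" matches "boxy"),
            # unlike the exact class-token match used by the html.parser fallback.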
elements = parsed.xpath(f'//*[contains(@class, "{class_name}")]')
if elements:
return etree.tostring(elements[0], encoding="unicode", method="html")
except Exception as e:
logger.warning(f"lxml XPath search failed: {e}")
return None
else:
elements = parsed.find_by_class(class_name)
if elements:
return _element_to_html(elements[0], html_content)
return None
def get_elements_by_tag(tag_name: str, html_content: str) -> List[str]:
"""
Get all HTML elements by tag name.
Args:
tag_name: The tag name to search for
html_content: HTML content to search in
Returns:
List of HTML strings for matching elements
"""
parsed = _default_parser.parse(html_content)
results = []
if _default_parser.config.use_lxml and HAS_LXML:
try:
elements = parsed.xpath(f"//{tag_name}")
for element in elements:
results.append(
etree.tostring(element, encoding="unicode", method="html")
)
except Exception as e:
logger.warning(f"lxml XPath search failed: {e}")
else:
elements = parsed.find_by_tag(tag_name)
for element in elements:
results.append(_element_to_html(element, html_content))
return results
def get_elements_by_class(class_name: str, html_content: str) -> List[str]:
"""
Get all HTML elements by class name.
Args:
class_name: The class name to search for
html_content: HTML content to search in
Returns:
List of HTML strings for matching elements
"""
parsed = _default_parser.parse(html_content)
results = []
if _default_parser.config.use_lxml and HAS_LXML:
try:
elements = parsed.xpath(f'//*[contains(@class, "{class_name}")]')
for element in elements:
results.append(
etree.tostring(element, encoding="unicode", method="html")
)
except Exception as e:
logger.warning(f"lxml XPath search failed: {e}")
else:
elements = parsed.find_by_class(class_name)
for element in elements:
results.append(_element_to_html(element, html_content))
return results
def get_elements_html_by_class(class_name: str, html_content: str) -> List[str]:
"""
Get HTML strings of elements by class name.
This is an alias for get_elements_by_class for yt-dlp compatibility.
Args:
class_name: The class name to search for
html_content: HTML content to search in
Returns:
List of HTML strings for matching elements
"""
return get_elements_by_class(class_name, html_content)
def get_element_text_and_html_by_tag(
tag_name: str, html_content: str
) -> Tuple[Optional[str], Optional[str]]:
"""
Get both text content and HTML of first element by tag name.
Args:
tag_name: The tag name to search for
html_content: HTML content to search in
Returns:
Tuple of (text_content, html_string) or (None, None) if not found
Examples:
>>> html = '<script>alert("test");</script>'
>>> get_element_text_and_html_by_tag("script", html)
('alert("test");', '<script>alert("test");</script>')
"""
parsed = _default_parser.parse(html_content)
if _default_parser.config.use_lxml and HAS_LXML:
try:
elements = parsed.xpath(f"//{tag_name}")
if elements:
element = elements[0]
text = (
element.text_content()
if hasattr(element, "text_content")
else (element.text or "")
)
html_str = etree.tostring(element, encoding="unicode", method="html")
return text, html_str
except Exception as e:
logger.warning(f"lxml XPath search failed: {e}")
return None, None
else:
elements = parsed.find_by_tag(tag_name)
if elements:
element = elements[0]
text = _extract_text_content(element)
html_str = _element_to_html(element, html_content)
return text, html_str
return None, None
def _element_to_html(element: Dict, original_html: str) -> str:
"""
Convert parsed element back to HTML string.
This is a simplified implementation that reconstructs HTML from parsed data.
For production use, consider using lxml for better accuracy.
"""
if not element:
return ""
# Build opening tag
tag = element["tag"]
attrs = element.get("attrs", {})
attr_str = " ".join(f'{k}="{v}"' for k, v in attrs.items() if v is not None)
if attr_str:
opening_tag = f"<{tag} {attr_str}>"
else:
opening_tag = f"<{tag}>"
# Add text content
text = element.get("text", "")
# Add children
children_html = ""
for child in element.get("children", []):
children_html += _element_to_html(child, original_html)
# Build closing tag
closing_tag = f"</{tag}>"
return f"{opening_tag}{text}{children_html}{closing_tag}"
def _extract_text_content(element: Dict) -> str:
"""Extract all text content from element and its children."""
text = element.get("text", "")
for child in element.get("children", []):
text += _extract_text_content(child)
return text
def configure_parser(use_lxml: Optional[bool] = None) -> None:
"""
Configure the global HTML parser.
Args:
use_lxml: Force use of lxml (True), html.parser (False), or auto-detect (None)
"""
global _default_parser
_default_parser = HTMLParser(HTMLParserConfig(use_lxml))
logger.info(
f"HTML parser configured: {'lxml' if _default_parser.config.use_lxml else 'html.parser'}"
)
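
# Example usage of the module-level helpers (a minimal sketch; the markup below
# is made up for illustration and is not part of the original module):
#
#     configure_parser(use_lxml=False)   # force the pure-Python fallback
#     doc = '<div id="main" class="box">Hello <b>world</b></div>'
#     get_element_by_id("main", doc)              # -> '<div id="main" class="box">Hello <b>world</b></div>'
#     get_elements_by_class("box", doc)           # -> [that same element string]
#     get_element_text_and_html_by_tag("b", doc)  # -> ('world', '<b>world</b>')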

View File

@@ -1,235 +0,0 @@
"""
User agent utilities for web scraping.
Provides functionality to generate random user agent strings
to avoid detection and blocking by websites.
"""
import random
from typing import List, Optional
class UserAgentGenerator:
"""
Generator for realistic user agent strings.
Provides a variety of common user agents from different browsers
and operating systems to help avoid detection.
"""
# Common user agents for different browsers and OS combinations
USER_AGENTS = [
# Chrome on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
# Chrome on macOS
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
# Chrome on Linux
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36",
"Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36",
# Firefox on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0",
# Firefox on macOS
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:122.0) Gecko/20100101 Firefox/122.0",
# Firefox on Linux
"Mozilla/5.0 (X11; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0",
"Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0",
# Safari on macOS
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15",
"Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15",
# Edge on Windows
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0",
"Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0",
# Mobile Chrome (Android)
"Mozilla/5.0 (Linux; Android 14; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
"Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36",
# Mobile Safari (iOS)
"Mozilla/5.0 (iPhone; CPU iPhone OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1",
"Mozilla/5.0 (iPad; CPU OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1",
]
# Browser-specific user agents for when you need a specific browser
CHROME_USER_AGENTS = [
ua for ua in USER_AGENTS if "Chrome" in ua and "Edg" not in ua
]
FIREFOX_USER_AGENTS = [ua for ua in USER_AGENTS if "Firefox" in ua]
SAFARI_USER_AGENTS = [
ua for ua in USER_AGENTS if "Safari" in ua and "Chrome" not in ua
]
EDGE_USER_AGENTS = [ua for ua in USER_AGENTS if "Edg" in ua]
# Platform-specific user agents
WINDOWS_USER_AGENTS = [ua for ua in USER_AGENTS if "Windows NT" in ua]
MACOS_USER_AGENTS = [ua for ua in USER_AGENTS if "Macintosh" in ua]
LINUX_USER_AGENTS = [
ua for ua in USER_AGENTS if "Linux" in ua and "Android" not in ua
]
MOBILE_USER_AGENTS = [ua for ua in USER_AGENTS if "Mobile" in ua or "Android" in ua]
def __init__(self, seed: Optional[int] = None):
"""
Initialize the user agent generator.
Args:
seed: Random seed for reproducible results (optional)
"""
if seed is not None:
random.seed(seed)
def random(self) -> str:
"""
Get a random user agent string.
Returns:
Random user agent string
"""
return random.choice(self.USER_AGENTS)
def random_browser(self, browser: str) -> str:
"""
Get a random user agent for a specific browser.
Args:
browser: Browser name ('chrome', 'firefox', 'safari', 'edge')
Returns:
Random user agent string for the specified browser
Raises:
ValueError: If browser is not supported
"""
browser = browser.lower()
if browser == "chrome":
return random.choice(self.CHROME_USER_AGENTS)
elif browser == "firefox":
return random.choice(self.FIREFOX_USER_AGENTS)
elif browser == "safari":
return random.choice(self.SAFARI_USER_AGENTS)
elif browser == "edge":
return random.choice(self.EDGE_USER_AGENTS)
else:
raise ValueError(f"Unsupported browser: {browser}")
def random_platform(self, platform: str) -> str:
"""
Get a random user agent for a specific platform.
Args:
platform: Platform name ('windows', 'macos', 'linux', 'mobile')
Returns:
Random user agent string for the specified platform
Raises:
ValueError: If platform is not supported
"""
platform = platform.lower()
if platform == "windows":
return random.choice(self.WINDOWS_USER_AGENTS)
elif platform in ("macos", "mac"):
return random.choice(self.MACOS_USER_AGENTS)
elif platform == "linux":
return random.choice(self.LINUX_USER_AGENTS)
elif platform == "mobile":
return random.choice(self.MOBILE_USER_AGENTS)
else:
raise ValueError(f"Unsupported platform: {platform}")
def add_user_agent(self, user_agent: str) -> None:
"""
Add a custom user agent to the list.
Args:
user_agent: Custom user agent string to add
"""
if user_agent not in self.USER_AGENTS:
self.USER_AGENTS.append(user_agent)
def get_all(self) -> List[str]:
"""
Get all available user agent strings.
Returns:
List of all user agent strings
"""
return self.USER_AGENTS.copy()
# Global instance for convenience
_default_generator = UserAgentGenerator()
def random_user_agent() -> str:
"""
Get a random user agent string using the default generator.
Returns:
Random user agent string
Examples:
>>> ua = random_user_agent()
>>> "Mozilla" in ua
True
"""
return _default_generator.random()
def random_user_agent_browser(browser: str) -> str:
"""
Get a random user agent for a specific browser.
Args:
browser: Browser name ('chrome', 'firefox', 'safari', 'edge')
Returns:
Random user agent string for the specified browser
"""
return _default_generator.random_browser(browser)
def random_user_agent_platform(platform: str) -> str:
"""
Get a random user agent for a specific platform.
Args:
platform: Platform name ('windows', 'macos', 'linux', 'mobile')
Returns:
Random user agent string for the specified platform
"""
return _default_generator.random_platform(platform)
def set_user_agent_seed(seed: int) -> None:
"""
Set the random seed for user agent generation.
Args:
seed: Random seed value
"""
global _default_generator
_default_generator = UserAgentGenerator(seed)
def add_custom_user_agent(user_agent: str) -> None:
"""
Add a custom user agent to the default generator.
Args:
user_agent: Custom user agent string to add
"""
_default_generator.add_user_agent(user_agent)
def get_all_user_agents() -> List[str]:
"""
Get all available user agent strings from the default generator.
Returns:
List of all user agent strings
"""
return _default_generator.get_all()
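
# Example usage (a minimal sketch of the helpers above):
#
#     random_user_agent()                    # any agent from the pool
#     random_user_agent_browser("firefox")   # Firefox-only agent
#     random_user_agent_platform("mobile")   # Android/iOS agent
#     headers = {"User-Agent": random_user_agent()}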

View File

@@ -1,272 +0,0 @@
"""
Encoding and utility functions for web scraping.
Provides various encoding utilities including base-N encoding
that was previously sourced from yt-dlp.
"""
import string
from typing import Optional
def encode_base_n(num: int, n: int, table: Optional[str] = None) -> str:
"""
Encode a number in base-n representation.
Args:
num: The number to encode
n: The base to use for encoding
table: Custom character table (optional)
Returns:
String representation of the number in base-n
Examples:
>>> encode_base_n(255, 16)
'ff'
>>> encode_base_n(42, 36)
'16'
"""
if table is None:
# Default table: 0-9, a-z
table = string.digits + string.ascii_lowercase
if not 2 <= n <= len(table):
raise ValueError(f"Base must be between 2 and {len(table)}")
if num == 0:
return table[0]
result = []
is_negative = num < 0
num = abs(num)
while num > 0:
result.append(table[num % n])
num //= n
if is_negative:
result.append("-")
return "".join(reversed(result))
def decode_base_n(encoded: str, n: int, table: Optional[str] = None) -> int:
"""
Decode a base-n encoded string back to an integer.
Args:
encoded: The base-n encoded string
n: The base used for encoding
table: Custom character table (optional)
Returns:
The decoded integer
Examples:
>>> decode_base_n('ff', 16)
255
>>> decode_base_n('16', 36)
42
"""
if table is None:
table = string.digits + string.ascii_lowercase
if not 2 <= n <= len(table):
raise ValueError(f"Base must be between 2 and {len(table)}")
if not encoded:
return 0
is_negative = encoded.startswith("-")
if is_negative:
encoded = encoded[1:]
result = 0
    for i, char in enumerate(reversed(encoded)):
        if char not in table and char.lower() in table:
            # Keep hex-style input case-insensitive without breaking case-sensitive custom tables.
            char = char.lower()
        if char not in table:
            raise ValueError(f"Invalid character '{char}' for base {n}")
digit_value = table.index(char)
if digit_value >= n:
raise ValueError(f"Invalid digit '{char}' for base {n}")
result += digit_value * (n**i)
return -result if is_negative else result
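# Round-trip sketch with a case-sensitive base-62 table (illustrative only):
#
#     table62 = string.digits + string.ascii_lowercase + string.ascii_uppercase
#     encode_base_n(3843, 62, table62)   # -> 'ZZ'  (61*62 + 61)
#     decode_base_n('ZZ', 62, table62)   # -> 3843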
def url_encode(text: str, safe: str = "") -> str:
"""
URL encode a string.
Args:
text: Text to encode
safe: Characters that should not be encoded
Returns:
URL encoded string
"""
import urllib.parse
return urllib.parse.quote(text, safe=safe)
def url_decode(text: str) -> str:
"""
URL decode a string.
Args:
text: URL encoded text to decode
Returns:
Decoded string
"""
import urllib.parse
return urllib.parse.unquote(text)
def html_unescape(text: str) -> str:
"""
Unescape HTML entities in text.
Args:
text: Text containing HTML entities
Returns:
Text with HTML entities unescaped
Examples:
>>> html_unescape('&quot;Hello&quot; &amp; &lt;World&gt;')
'"Hello" & <World>'
"""
import html
return html.unescape(text)
def strip_tags(html_content: str) -> str:
"""
Remove all HTML tags from content, leaving only text.
Args:
html_content: HTML content with tags
Returns:
Plain text with tags removed
Examples:
>>> strip_tags('<p>Hello <b>world</b>!</p>')
'Hello world!'
"""
import re
return re.sub(r"<[^>]+>", "", html_content)
def normalize_whitespace(text: str) -> str:
"""
Normalize whitespace in text by collapsing multiple spaces and removing leading/trailing whitespace.
Args:
text: Text to normalize
Returns:
Text with normalized whitespace
Examples:
>>> normalize_whitespace(' Hello world \\n\\t ')
'Hello world'
"""
import re
return re.sub(r"\s+", " ", text.strip())
def extract_domain(url: str) -> str:
"""
Extract domain from a URL.
Args:
url: Full URL
Returns:
Domain portion of the URL
Examples:
>>> extract_domain('https://example.com/path?query=1')
'example.com'
"""
import urllib.parse
parsed = urllib.parse.urlparse(url)
return parsed.netloc
def join_url(base: str, path: str) -> str:
"""
Join a base URL with a path.
Args:
base: Base URL
path: Path to join
Returns:
Combined URL
Examples:
>>> join_url('https://example.com', '/api/data')
'https://example.com/api/data'
"""
import urllib.parse
return urllib.parse.urljoin(base, path)
def parse_query_string(query: str) -> dict:
"""
Parse a query string into a dictionary.
Args:
query: Query string (with or without leading '?')
Returns:
Dictionary of query parameters
Examples:
>>> parse_query_string('?name=John&age=30')
{'name': ['John'], 'age': ['30']}
"""
import urllib.parse
if query.startswith("?"):
query = query[1:]
return urllib.parse.parse_qs(query)
def build_query_string(params: dict) -> str:
"""
Build a query string from a dictionary of parameters.
Args:
params: Dictionary of parameters
Returns:
URL-encoded query string
Examples:
>>> build_query_string({'name': 'John', 'age': 30})
'name=John&age=30'
"""
import urllib.parse
# Handle both single values and lists
normalized_params = {}
for key, value in params.items():
if isinstance(value, (list, tuple)):
normalized_params[key] = value
else:
normalized_params[key] = [str(value)]
return urllib.parse.urlencode(normalized_params, doseq=True)
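
# Example usage of the URL helpers (a minimal sketch):
#
#     extract_domain("https://example.com/watch?id=42")   # -> 'example.com'
#     join_url("https://example.com", "/api/data")        # -> 'https://example.com/api/data'
#     parse_query_string("?id=42&dub=true")                # -> {'id': ['42'], 'dub': ['true']}
#     build_query_string({"id": 42, "dub": "true"})        # -> 'id=42&dub=true'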