From 74487a2d1734ccf09e853d8c254e2c4c8b86df4e Mon Sep 17 00:00:00 2001 From: Benexl Date: Sat, 2 May 2026 21:36:19 +0300 Subject: [PATCH] Revert "Delete viu_media/libs/provider directory" This reverts commit 7e93e4eef96865340a64e1f6d7568469e0b24ca7. --- viu_media/libs/provider/__init__.py | 0 viu_media/libs/provider/anime/__init__.py | 0 .../libs/provider/anime/allanime/__init__.py | 1 + .../libs/provider/anime/allanime/constants.py | 39 ++ .../anime/allanime/extractors/__init__.py | 3 + .../provider/anime/allanime/extractors/ak.py | 31 ++ .../anime/allanime/extractors/base.py | 20 + .../anime/allanime/extractors/dropbox.py | 31 ++ .../anime/allanime/extractors/extractor.py | 65 +++ .../anime/allanime/extractors/filemoon.py | 62 +++ .../anime/allanime/extractors/gogoanime.py | 34 ++ .../anime/allanime/extractors/mp4_upload.py | 32 ++ .../anime/allanime/extractors/sharepoint.py | 28 + .../anime/allanime/extractors/streamsb.py | 32 ++ .../anime/allanime/extractors/vid_mp4.py | 32 ++ .../anime/allanime/extractors/we_transfer.py | 31 ++ .../anime/allanime/extractors/wixmp.py | 26 + .../anime/allanime/extractors/yt_mp4.py | 22 + .../libs/provider/anime/allanime/mappers.py | 58 ++ .../libs/provider/anime/allanime/provider.py | 84 +++ .../libs/provider/anime/allanime/types.py | 111 ++++ .../libs/provider/anime/allanime/utils.py | 92 ++++ .../libs/provider/anime/animepahe/__init__.py | 1 + .../provider/anime/animepahe/constants.py | 56 ++ .../provider/anime/animepahe/extractor.py | 77 +++ .../libs/provider/anime/animepahe/mappers.py | 113 ++++ .../libs/provider/anime/animepahe/provider.py | 214 ++++++++ .../libs/provider/anime/animepahe/types.py | 108 ++++ .../provider/anime/animeunity/__init__.py | 0 .../provider/anime/animeunity/constants.py | 17 + .../provider/anime/animeunity/extractor.py | 50 ++ .../libs/provider/anime/animeunity/mappers.py | 133 +++++ .../provider/anime/animeunity/provider.py | 170 ++++++ viu_media/libs/provider/anime/base.py | 39 ++ 
viu_media/libs/provider/anime/params.py | 46 ++ viu_media/libs/provider/anime/provider.py | 72 +++ viu_media/libs/provider/anime/types.py | 119 +++++ viu_media/libs/provider/anime/utils/debug.py | 90 ++++ viu_media/libs/provider/scraping/__init__.py | 0 .../libs/provider/scraping/html_parser.py | 494 ++++++++++++++++++ .../libs/provider/scraping/user_agents.py | 235 +++++++++ viu_media/libs/provider/scraping/utils.py | 272 ++++++++++ 42 files changed, 3140 insertions(+) create mode 100644 viu_media/libs/provider/__init__.py create mode 100644 viu_media/libs/provider/anime/__init__.py create mode 100644 viu_media/libs/provider/anime/allanime/__init__.py create mode 100644 viu_media/libs/provider/anime/allanime/constants.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/__init__.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/ak.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/base.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/dropbox.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/extractor.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/filemoon.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/gogoanime.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/sharepoint.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/streamsb.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/we_transfer.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/wixmp.py create mode 100644 viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py create mode 100644 viu_media/libs/provider/anime/allanime/mappers.py create mode 100644 viu_media/libs/provider/anime/allanime/provider.py 
create mode 100644 viu_media/libs/provider/anime/allanime/types.py create mode 100644 viu_media/libs/provider/anime/allanime/utils.py create mode 100644 viu_media/libs/provider/anime/animepahe/__init__.py create mode 100644 viu_media/libs/provider/anime/animepahe/constants.py create mode 100644 viu_media/libs/provider/anime/animepahe/extractor.py create mode 100644 viu_media/libs/provider/anime/animepahe/mappers.py create mode 100644 viu_media/libs/provider/anime/animepahe/provider.py create mode 100644 viu_media/libs/provider/anime/animepahe/types.py create mode 100644 viu_media/libs/provider/anime/animeunity/__init__.py create mode 100644 viu_media/libs/provider/anime/animeunity/constants.py create mode 100644 viu_media/libs/provider/anime/animeunity/extractor.py create mode 100644 viu_media/libs/provider/anime/animeunity/mappers.py create mode 100644 viu_media/libs/provider/anime/animeunity/provider.py create mode 100644 viu_media/libs/provider/anime/base.py create mode 100644 viu_media/libs/provider/anime/params.py create mode 100644 viu_media/libs/provider/anime/provider.py create mode 100644 viu_media/libs/provider/anime/types.py create mode 100644 viu_media/libs/provider/anime/utils/debug.py create mode 100644 viu_media/libs/provider/scraping/__init__.py create mode 100644 viu_media/libs/provider/scraping/html_parser.py create mode 100644 viu_media/libs/provider/scraping/user_agents.py create mode 100644 viu_media/libs/provider/scraping/utils.py diff --git a/viu_media/libs/provider/__init__.py b/viu_media/libs/provider/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/viu_media/libs/provider/anime/__init__.py b/viu_media/libs/provider/anime/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/viu_media/libs/provider/anime/allanime/__init__.py b/viu_media/libs/provider/anime/allanime/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/__init__.py @@ -0,0 +1 @@ + 
diff --git a/viu_media/libs/provider/anime/allanime/constants.py b/viu_media/libs/provider/anime/allanime/constants.py new file mode 100644 index 0000000..cbe3e4e --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/constants.py @@ -0,0 +1,39 @@ +import re + +from .....core.constants import GRAPHQL_DIR + +SERVERS_AVAILABLE = [ + "sharepoint", + "dropbox", + "gogoanime", + "weTransfer", + "wixmp", + "Yt", + "mp4-upload", +] +API_BASE_URL = "allanime.day" +API_GRAPHQL_REFERER = "https://allanime.to/" +API_GRAPHQL_ENDPOINT = f"https://api.{API_BASE_URL}/api/" +API_GRAPHQL_HEADERS= { + "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + "Content-Type": "application/json", + "Origin": f"{API_GRAPHQL_REFERER}", +} + +# search constants +DEFAULT_COUNTRY_OF_ORIGIN = "all" +DEFAULT_NSFW = True +DEFAULT_UNKNOWN = True +DEFAULT_PER_PAGE = 40 +DEFAULT_PAGE = 1 + +# regex stuff +MP4_SERVER_JUICY_STREAM_REGEX = re.compile( + r"video/mp4\",src:\"(https?://.*/video\.mp4)\"" +) + +# graphql files +_GQL_QUERIES = GRAPHQL_DIR / "allanime" / "queries" +SEARCH_GQL = _GQL_QUERIES / "search.gql" +ANIME_GQL = _GQL_QUERIES / "anime.gql" +EPISODE_GQL = _GQL_QUERIES / "episodes.gql" diff --git a/viu_media/libs/provider/anime/allanime/extractors/__init__.py b/viu_media/libs/provider/anime/allanime/extractors/__init__.py new file mode 100644 index 0000000..c857165 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/__init__.py @@ -0,0 +1,3 @@ +from .extractor import extract_server + +__all__ = ["extract_server"] diff --git a/viu_media/libs/provider/anime/allanime/extractors/ak.py b/viu_media/libs/provider/anime/allanime/extractors/ak.py new file mode 100644 index 0000000..deb2dec --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/ak.py @@ -0,0 +1,31 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, 
AllAnimeSource +from .base import BaseExtractor + + +class AkExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + streams = response.json() + + return Server( + name="Ak", + links=[ + EpisodeStream(link=link, quality="1080") for link in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/base.py b/viu_media/libs/provider/anime/allanime/extractors/base.py new file mode 100644 index 0000000..0ad66a9 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/base.py @@ -0,0 +1,20 @@ +from abc import ABC, abstractmethod + +from httpx import Client + +from ...types import Server +from ..types import AllAnimeEpisode, AllAnimeSource + + +class BaseExtractor(ABC): + @classmethod + @abstractmethod + def extract( + cls, + url: str, + client: Client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server | None: + pass diff --git a/viu_media/libs/provider/anime/allanime/extractors/dropbox.py b/viu_media/libs/provider/anime/allanime/extractors/dropbox.py new file mode 100644 index 0000000..6b779c6 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/dropbox.py @@ -0,0 +1,31 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, AllAnimeSource +from .base import BaseExtractor + + +class SakExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + 
response.raise_for_status() + streams = response.json() + + return Server( + name="dropbox", + links=[ + EpisodeStream(link=link, quality="1080") for link in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/extractor.py b/viu_media/libs/provider/anime/allanime/extractors/extractor.py new file mode 100644 index 0000000..92deccd --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/extractor.py @@ -0,0 +1,65 @@ +from httpx import Client + +from ...types import Server +from ..types import AllAnimeEpisode, AllAnimeSource +from ..utils import debug_extractor, logger, one_digit_symmetric_xor +from .ak import AkExtractor +from .dropbox import SakExtractor +from .filemoon import FmHlsExtractor, OkExtractor +from .gogoanime import Lufmp4Extractor +from .mp4_upload import Mp4Extractor +from .sharepoint import Smp4Extractor +from .streamsb import SsHlsExtractor +from .vid_mp4 import VidMp4Extractor +from .we_transfer import KirExtractor +from .wixmp import DefaultExtractor +from .yt_mp4 import YtExtractor + +AVAILABLE_SOURCES = { + "Sak": SakExtractor, + "S-mp4": Smp4Extractor, + "Luf-Mp4": Lufmp4Extractor, + "Default": DefaultExtractor, + "Yt-mp4": YtExtractor, + "Kir": KirExtractor, + "Mp4": Mp4Extractor, +} +OTHER_SOURCES = { + "Ak": AkExtractor, + "Vid-mp4": VidMp4Extractor, + "Ok": OkExtractor, + "Ss-Hls": SsHlsExtractor, + "Fm-Hls": FmHlsExtractor, +} + + +@debug_extractor +def extract_server( + client: Client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, +) -> Server | None: + url = source.get("sourceUrl") + if not url: + logger.debug(f"Url not found in source: {source}") + return + + if url.startswith("--"): + url = one_digit_symmetric_xor(56, url[2:]) + + logger.debug(f"Decrypting url for source: {source['sourceName']}") + if source["sourceName"] in OTHER_SOURCES: + logger.debug(f"Found 
{source['sourceName']} but ignoring") + return + + if source["sourceName"] not in AVAILABLE_SOURCES: + logger.debug( + f"Found {source['sourceName']} but did not expect it, its time to scrape lol" + ) + return + logger.debug(f"Found {source['sourceName']}") + + return AVAILABLE_SOURCES[source["sourceName"]].extract( + url, client, episode_number, episode, source + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/filemoon.py b/viu_media/libs/provider/anime/allanime/extractors/filemoon.py new file mode 100644 index 0000000..11eb735 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/filemoon.py @@ -0,0 +1,62 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX +from ..types import AllAnimeEpisode, AllAnimeSource +from .base import BaseExtractor + + +# TODO: requires decoding obsfucated js (filemoon) +class FmHlsExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + + embed_html = response.text.replace(" ", "").replace("\n", "") + vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html) + if not vid: + raise Exception("") + return Server( + name="dropbox", + links=[EpisodeStream(link=vid.group(1), quality="1080")], + episode_title=episode["notes"], + headers={"Referer": "https://www.mp4upload.com/"}, + ) + + +# TODO: requires decoding obsfucated js (filemoon) +class OkExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + + embed_html = response.text.replace(" ", 
"").replace("\n", "") + vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html) + if not vid: + raise Exception("") + return Server( + name="dropbox", + links=[EpisodeStream(link=vid.group(1), quality="1080")], + episode_title=episode["notes"], + headers={"Referer": "https://www.mp4upload.com/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/gogoanime.py b/viu_media/libs/provider/anime/allanime/extractors/gogoanime.py new file mode 100644 index 0000000..1fc4f03 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/gogoanime.py @@ -0,0 +1,34 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, AllAnimeEpisodeStreams, AllAnimeSource +from .base import BaseExtractor + + +class Lufmp4Extractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + streams: AllAnimeEpisodeStreams = response.json() + + return Server( + name="gogoanime", + links=[ + EpisodeStream( + link=stream["link"], quality="1080", format=stream["resolutionStr"] + ) + for stream in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py b/viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py new file mode 100644 index 0000000..0dd60e0 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py @@ -0,0 +1,32 @@ +from ...types import EpisodeStream, Server +from ..constants import MP4_SERVER_JUICY_STREAM_REGEX +from ..utils import logger +from .base import BaseExtractor + + +class Mp4Extractor(BaseExtractor): + @classmethod + def extract(cls, url, client, episode_number, episode, source): + 
response = client.get(url, timeout=10, follow_redirects=True) + response.raise_for_status() + + embed_html = response.text.replace(" ", "").replace("\n", "") + + # NOTE: some of the video were deleted so the embed html will just be "Filewasdeleted" + vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html) + if not vid: + if embed_html == "Filewasdeleted": + logger.debug( + "Failed to extract stream url from mp4-uploads. Reason: Filewasdeleted" + ) + return + logger.debug( + f"Failed to extract stream url from mp4-uploads. Reason: unknown. Embed html: {embed_html}" + ) + return + return Server( + name="mp4-upload", + links=[EpisodeStream(link=vid.group(1), quality="1080")], + episode_title=episode["notes"], + headers={"Referer": "https://www.mp4upload.com/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/sharepoint.py b/viu_media/libs/provider/anime/allanime/extractors/sharepoint.py new file mode 100644 index 0000000..a105d6c --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/sharepoint.py @@ -0,0 +1,28 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisodeStreams +from .base import BaseExtractor + + +class Smp4Extractor(BaseExtractor): + @classmethod + def extract(cls, url, client, episode_number, episode, source): + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + streams: AllAnimeEpisodeStreams = response.json() + return Server( + name="sharepoint", + links=[ + EpisodeStream( + link=stream["link"], + quality="1080", + format=stream["resolutionStr"], + ) + for stream in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/streamsb.py b/viu_media/libs/provider/anime/allanime/extractors/streamsb.py new file mode 100644 index 0000000..56670c6 --- /dev/null 
+++ b/viu_media/libs/provider/anime/allanime/extractors/streamsb.py @@ -0,0 +1,32 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, AllAnimeSource +from .base import BaseExtractor + + +class SsHlsExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + # TODO: requires some serious work i think : ) + response = client.get( + url, + timeout=10, + ) + response.raise_for_status() + streams = response.json()["links"] + + return Server( + name="StreamSb", + links=[ + EpisodeStream(link=link, quality="1080") for link in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py b/viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py new file mode 100644 index 0000000..6f9b51c --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py @@ -0,0 +1,32 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, AllAnimeSource +from .base import BaseExtractor + + +# TODO: requires some serious work i think : ) +class VidMp4Extractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + streams = response.json() + + return Server( + name="Vid-mp4", + links=[ + EpisodeStream(link=link, quality="1080") for link in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/we_transfer.py 
b/viu_media/libs/provider/anime/allanime/extractors/we_transfer.py new file mode 100644 index 0000000..b723a5d --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/we_transfer.py @@ -0,0 +1,31 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, AllAnimeSource +from .base import BaseExtractor + + +class KirExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + streams = response.json() + + return Server( + name="weTransfer", + links=[ + EpisodeStream(link=link, quality="1080") for link in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/wixmp.py b/viu_media/libs/provider/anime/allanime/extractors/wixmp.py new file mode 100644 index 0000000..59e8b13 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/wixmp.py @@ -0,0 +1,26 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisodeStreams +from .base import BaseExtractor + + +class DefaultExtractor(BaseExtractor): + @classmethod + def extract(cls, url, client, episode_number, episode, source): + response = client.get( + f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", + timeout=10, + ) + response.raise_for_status() + streams: AllAnimeEpisodeStreams = response.json() + return Server( + name="wixmp", + links=[ + EpisodeStream( + link=stream["link"], quality="1080", format=stream["resolutionStr"] + ) + for stream in streams["links"] + ], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git 
a/viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py b/viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py new file mode 100644 index 0000000..4e8fad3 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py @@ -0,0 +1,22 @@ +from ...types import EpisodeStream, Server +from ..constants import API_BASE_URL +from ..types import AllAnimeEpisode, AllAnimeSource +from .base import BaseExtractor + + +class YtExtractor(BaseExtractor): + @classmethod + def extract( + cls, + url, + client, + episode_number: str, + episode: AllAnimeEpisode, + source: AllAnimeSource, + ) -> Server: + return Server( + name="Yt", + links=[EpisodeStream(link=url, quality="1080")], + episode_title=episode["notes"], + headers={"Referer": f"https://{API_BASE_URL}/"}, + ) diff --git a/viu_media/libs/provider/anime/allanime/mappers.py b/viu_media/libs/provider/anime/allanime/mappers.py new file mode 100644 index 0000000..51cc591 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/mappers.py @@ -0,0 +1,58 @@ +from typing import Union + +from httpx import Response + +from ..types import ( + Anime, + AnimeEpisodes, + MediaTranslationType, + PageInfo, + SearchResult, + SearchResults, +) +from .types import AllAnimeSearchResults, AllAnimeShow + + +def generate_list(count: Union[int, str]) -> list[str]: + return list(map(str, range(int(count)))) + + +translation_type_map = { + "sub": MediaTranslationType.SUB, + "dub": MediaTranslationType.DUB, + "raw": MediaTranslationType.RAW, +} + + +def map_to_search_results(response: Response) -> SearchResults: + search_results: AllAnimeSearchResults = response.json()["data"] + return SearchResults( + page_info=PageInfo(total=search_results["shows"]["pageInfo"]["total"]), + results=[ + SearchResult( + id=result["_id"], + title=result["name"], + media_type=result["__typename"], + episodes=AnimeEpisodes( + sub=generate_list(result["availableEpisodes"]["sub"]), + dub=generate_list(result["availableEpisodes"]["dub"]), + 
raw=generate_list(result["availableEpisodes"]["raw"]), + ), + ) + for result in search_results["shows"]["edges"] + ], + ) + + +def map_to_anime_result(response: Response) -> Anime: + anime: AllAnimeShow = response.json()["data"]["show"] + return Anime( + id=anime["_id"], + title=anime["name"], + episodes=AnimeEpisodes( + sub=sorted(anime["availableEpisodesDetail"]["sub"], key=float), + dub=sorted(anime["availableEpisodesDetail"]["dub"], key=float), + raw=sorted(anime["availableEpisodesDetail"]["raw"], key=float), + ), + type=anime.get("__typename"), + ) diff --git a/viu_media/libs/provider/anime/allanime/provider.py b/viu_media/libs/provider/anime/allanime/provider.py new file mode 100644 index 0000000..d4bf579 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/provider.py @@ -0,0 +1,84 @@ +import logging +from typing import TYPE_CHECKING + +from .....core.utils.graphql import execute_graphql +from ..base import BaseAnimeProvider +from ..utils.debug import debug_provider +from .constants import ( + ANIME_GQL, + API_GRAPHQL_ENDPOINT, + API_GRAPHQL_HEADERS, + API_GRAPHQL_REFERER, + EPISODE_GQL, + SEARCH_GQL, +) +from .mappers import ( + map_to_anime_result, + map_to_search_results, +) + +if TYPE_CHECKING: + from .types import AllAnimeEpisode +logger = logging.getLogger(__name__) + + +class AllAnime(BaseAnimeProvider): + HEADERS = {"Referer": API_GRAPHQL_REFERER} + + @debug_provider + def search(self, params): + response = execute_graphql( + API_GRAPHQL_ENDPOINT, + self.client, + SEARCH_GQL, + variables={ + "search": { + "allowAdult": params.allow_nsfw, + "allowUnknown": params.allow_unknown, + "query": params.query, + }, + "limit": params.page_limit, + "page": params.current_page, + "translationtype": params.translation_type, + "countryorigin": params.country_of_origin, + }, + headers=API_GRAPHQL_HEADERS + ) + return map_to_search_results(response) + + @debug_provider + def get(self, params): + response = execute_graphql( + API_GRAPHQL_ENDPOINT, + 
self.client, + ANIME_GQL, + variables={"showId": params.id}, + headers=API_GRAPHQL_HEADERS + ) + return map_to_anime_result(response) + + @debug_provider + def episode_streams(self, params): + from .extractors import extract_server + + episode_response = execute_graphql( + API_GRAPHQL_ENDPOINT, + self.client, + EPISODE_GQL, + variables={ + "showId": params.anime_id, + "translationType": params.translation_type, + "episodeString": params.episode, + }, + headers=API_GRAPHQL_HEADERS + ) + episode: AllAnimeEpisode = episode_response.json()["data"]["episode"] + for source in episode["sourceUrls"]: + if server := extract_server(self.client, params.episode, episode, source): + yield server + + +if __name__ == "__main__": + from ..utils.debug import test_anime_provider + + test_anime_provider(AllAnime) diff --git a/viu_media/libs/provider/anime/allanime/types.py b/viu_media/libs/provider/anime/allanime/types.py new file mode 100644 index 0000000..2c6f7d3 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/types.py @@ -0,0 +1,111 @@ +from enum import Enum +from typing import Literal, TypedDict + + +class Server(Enum): + SHAREPOINT = "sharepoint" + DROPBOX = "dropbox" + GOGOANIME = "gogoanime" + WETRANSFER = "weTransfer" + WIXMP = "wixmp" + YT = "Yt" + MP4_UPLOAD = "mp4-upload" + + +class AllAnimeEpisodesDetail(TypedDict): + dub: list[str] + sub: list[str] + raw: list[str] + + +class AllAnimeEpisodes(TypedDict): + dub: int + sub: int + raw: int + + +class AllAnimePageInfo(TypedDict): + total: int + + +class AllAnimeShow(TypedDict): + _id: str + name: str + availableEpisodesDetail: AllAnimeEpisodesDetail + __typename: str + + +class AllAnimeSearchResult(TypedDict): + _id: str + name: str + availableEpisodes: AllAnimeEpisodes + __typename: str | None + + +class AllAnimeShows(TypedDict): + pageInfo: AllAnimePageInfo + edges: list[AllAnimeSearchResult] + + +class AllAnimeSearchResults(TypedDict): + shows: AllAnimeShows + + +class AllAnimeSourceDownload(TypedDict): + 
sourceName: str + dowloadUrl: str + + +class AllAnimeSource(TypedDict): + sourceName: Literal[ + "Sak", + "S-mp4", + "Luf-mp4", + "Default", + "Yt-mp4", + "Kir", + "Mp4", + "Ak", + "Vid-mp4", + "Ok", + "Ss-Hls", + "Fm-Hls", + ] + sourceUrl: str + priority: float + sandbox: str + type: str + className: str + streamerId: str + downloads: AllAnimeSourceDownload + + +class AllAnimeEpisodeStream(TypedDict): + link: str + hls: bool + resolutionStr: str + fromCache: str + + +class AllAnimeEpisodeStreams(TypedDict): + links: list[AllAnimeEpisodeStream] + + +class AllAnimeEpisode(TypedDict): + episodeString: str + sourceUrls: list[AllAnimeSource] + notes: str | None + + +class AllAnimeStream: + link: str + mp4: bool + hls: bool | None + resolutionStr: str + fromCache: str + priority: int + headers: dict | None + + +class AllAnimeStreams: + links: list[AllAnimeStream] diff --git a/viu_media/libs/provider/anime/allanime/utils.py b/viu_media/libs/provider/anime/allanime/utils.py new file mode 100644 index 0000000..f1e60b0 --- /dev/null +++ b/viu_media/libs/provider/anime/allanime/utils.py @@ -0,0 +1,92 @@ +import functools +import logging +import os +import re +from itertools import cycle + +logger = logging.getLogger(__name__) + +# Dictionary to map hex values to characters +hex_to_char = { + "01": "9", + "08": "0", + "05": "=", + "0a": "2", + "0b": "3", + "0c": "4", + "07": "?", + "00": "8", + "5c": "d", + "0f": "7", + "5e": "f", + "17": "/", + "54": "l", + "09": "1", + "48": "p", + "4f": "w", + "0e": "6", + "5b": "c", + "5d": "e", + "0d": "5", + "53": "k", + "1e": "&", + "5a": "b", + "59": "a", + "4a": "r", + "4c": "t", + "4e": "v", + "57": "o", + "51": "i", +} + + +def debug_extractor(extractor_function): + @functools.wraps(extractor_function) + def _provider_function_wrapper(*args): + if not os.environ.get("VIU_DEBUG"): + try: + return extractor_function(*args) + except Exception as e: + logger.error( + f"[AllAnime@Server={args[3].get('sourceName', 'UNKNOWN')}]: {e}" + ) 
+ else: + return extractor_function(*args) + + return _provider_function_wrapper + + +def give_random_quality(links): + qualities = cycle(["1080", "720", "480", "360"]) + + return [ + {**episode_stream, "quality": quality} + for episode_stream, quality in zip(links, qualities, strict=False) + ] + + +def one_digit_symmetric_xor(password: int, target: str): + def genexp(): + for segment in bytearray.fromhex(target): + yield segment ^ password + + return bytes(genexp()).decode("utf-8") + + +def decode_hex_string(hex_string): + """some of the sources encrypt the urls into hex codes this function decrypts the urls + + Args: + hex_string ([TODO:parameter]): [TODO:description] + + Returns: + [TODO:return] + """ + # Split the hex string into pairs of characters + hex_pairs = re.findall("..", hex_string) + + # Decode each hex pair + decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs] + + # TODO: Better type handling + return "".join(decoded_chars) # type: ignore diff --git a/viu_media/libs/provider/anime/animepahe/__init__.py b/viu_media/libs/provider/anime/animepahe/__init__.py new file mode 100644 index 0000000..8b13789 --- /dev/null +++ b/viu_media/libs/provider/anime/animepahe/__init__.py @@ -0,0 +1 @@ + diff --git a/viu_media/libs/provider/anime/animepahe/constants.py b/viu_media/libs/provider/anime/animepahe/constants.py new file mode 100644 index 0000000..735fc09 --- /dev/null +++ b/viu_media/libs/provider/anime/animepahe/constants.py @@ -0,0 +1,56 @@ +import re + +ANIMEPAHE = "animepahe.pw" +ANIMEPAHE_BASE = f"https://{ANIMEPAHE}" +ANIMEPAHE_ENDPOINT = f"{ANIMEPAHE_BASE}/api" +CDN_PROVIDER = "kwik.cx" +CDN_PROVIDER_BASE = f"https://{CDN_PROVIDER}" + +SERVERS_AVAILABLE = ["kwik"] +REQUEST_HEADERS = { + "Cookie": "__ddgid_=VvX0ebHrH2DsFZo4; __ddgmark_=3savRpSVFhvZcn5x; __ddg2_=buBJ3c4pNBYKFZNp; __ddg1_=rbVADKr9URtt55zoIGFa; SERVERID=janna; 
XSRF-TOKEN=eyJpdiI6IjV5bFNtd0phUHgvWGJxc25wL0VJSUE9PSIsInZhbHVlIjoicEJTZktlR2hxR2JZTWhnL0JzazlvZU5TQTR2bjBWZ2dDb0RwUXVUUWNSclhQWUhLRStYSmJmWmUxWkpiYkFRYU12RjFWejlSWHorME1wZG5qQ1U0TnFlNnBFR2laQjN1MjdyNjc5TjVPdXdJb2o5VkU1bEduRW9pRHNDTHh6Sy8iLCJtYWMiOiI0OTc0ZmNjY2UwMGJkOWY2MWNkM2NlMjk2ZGMyZGJmMWE0NTdjZTdkNGI2Y2IwNTIzZmFiZWU5ZTE2OTk0YmU4IiwidGFnIjoiIn0%3D; laravel_session=eyJpdiI6ImxvdlpqREFnTjdaeFJubUlXQWlJVWc9PSIsInZhbHVlIjoiQnE4R3VHdjZ4M1NDdEVWM1ZqMUxtNnVERnJCcmtCUHZKNzRPR2RFbzNFcStTL29xdnVTbWhsNVRBUXEybVZWNU1UYVlTazFqYlN5UjJva1k4czNGaXBTbkJJK01oTUd3VHRYVHBoc3dGUWxHYnFlS2NJVVNFbTFqMVBWdFpuVUgiLCJtYWMiOiI1NDdjZTVkYmNhNjUwZTMxZmRlZmVmMmRlMGNiYjAwYjlmYjFjY2U0MDc1YTQzZThiMTIxMjJlYTg1NTA4YjBmIiwidGFnIjoiIn0%3D; latest=5592", + "Host": ANIMEPAHE, + "Accept": "application, text/javascript, */*; q=0.01", + "Accept-Encoding": "Utf-8", + "Referer": ANIMEPAHE_BASE, + "DNT": "1", + "Connection": "keep-alive", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-Mode": "cors", + "TE": "trailers", +} +SERVER_HEADERS = { + "Host": "kwik.cx", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "Utf-8", + "DNT": "1", + "Connection": "keep-alive", + "Referer": ANIMEPAHE_BASE + "/", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "iframe", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "cross-site", + "Priority": "u=4", + "TE": "trailers", +} + +STREAM_HEADERS = { + # "Host": "vault-16.owocdn.top", # This will have to be the actual host of the stream (behind Kwik) + "Accept": "*/*", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "gzip, deflate, br, zstd", + "Origin": CDN_PROVIDER_BASE, + "Sec-GPC": "1", + "Connection": "keep-alive", + "Referer": CDN_PROVIDER_BASE + "/", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Mode": "cors", + "Sec-Fetch-Site": "cross-site", + "TE": "trailers", +} + + +JUICY_STREAM_REGEX 
def animepahe_embed_decoder(
    encoded_js_p: str,
    base_a: int,
    no_of_keys_c: int,
    values_to_replace_with_k: list,
):
    """Expand a ``p,a,c,k,e,d``-packed JavaScript payload.

    Args:
        encoded_js_p: The packed source (the ``p`` argument of the packer).
        base_a: Radix used to generate the placeholder keys (``a``).
        no_of_keys_c: Number of placeholder keys to generate (``c``).
        values_to_replace_with_k: Replacement words, indexed by key number
            (``k``). An empty entry means "the key stands for itself".

    Returns:
        ``encoded_js_p`` with every word that has a generated key replaced by
        its value. Words without a key are left untouched.
    """
    decode_mapper_d: dict = {}
    available_values = len(values_to_replace_with_k)
    for i in range(no_of_keys_c):
        key = animepahe_key_creator(i, base_a)
        # A missing or empty replacement means the key maps to itself; do not
        # fail when the value list is shorter than the announced key count.
        val = values_to_replace_with_k[i] if i < available_values else ""
        decode_mapper_d[key] = val or key

    # Words that were never packed (no key generated for them) must pass
    # through unchanged instead of raising KeyError.
    return re.sub(
        r"\b\w+\b",
        lambda match: decode_mapper_d.get(match.group(0), match.group(0)),
        encoded_js_p,
    )
def map_to_anime_result(
    search_result: SearchResult, anime: AnimePaheAnimePage
) -> Anime:
    """Build the provider-agnostic ``Anime`` model from an AnimePahe release page.

    Args:
        search_result: The search hit the anime was opened from; supplies the
            id, title, year and poster.
        anime: Release page whose ``data`` list holds the episode entries.

    Returns:
        An ``Anime`` whose ``sub`` and ``dub`` episode lists both contain every
        episode number, ordered numerically.
    """
    # Sort into a new list: the caller's page dict is left exactly as passed in.
    ordered_episodes = sorted(anime["data"], key=lambda k: float(k["episode"]))

    episodes = [str(ep_info["episode"]) for ep_info in ordered_episodes]
    episodes_info = [
        AnimeEpisodeInfo(
            id=str(ep_info["id"]),
            session_id=ep_info["session"],
            episode=str(ep_info["episode"]),
            title=ep_info["title"],
            poster=ep_info["snapshot"],
            duration=str(ep_info["duration"]),
        )
        for ep_info in ordered_episodes
    ]

    return Anime(
        id=search_result.id,
        title=search_result.title,
        episodes=AnimeEpisodes(
            sub=episodes,
            dub=episodes,
        ),
        # Keep a missing year as None rather than the literal string "None".
        year=str(search_result.year) if search_result.year is not None else None,
        poster=search_result.poster,
        episodes_info=episodes_info,
    )
class AnimePahe(BaseAnimeProvider):
    """Anime provider backed by the AnimePahe JSON endpoint and Kwik embeds.

    NOTE(review): the private helpers are wrapped in ``functools.lru_cache``.
    Because they are instance methods, the cache key includes ``self`` and the
    cache keeps every instance alive for its lifetime. Left as-is because
    callers may rely on the memoisation.
    """

    HEADERS = REQUEST_HEADERS

    @debug_provider
    def search(self, params: SearchParams) -> SearchResults | None:
        """Search AnimePahe for ``params.query``."""
        return self._search(params)

    @lru_cache()
    def _search(self, params: SearchParams) -> SearchResults | None:
        """Query the search endpoint; return None when it yields no entries."""
        url_params = {"m": "search", "q": params.query}
        response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
        response.raise_for_status()
        data: AnimePaheSearchPage = response.json()
        if not data.get("data"):
            return
        return map_to_search_results(data)

    @debug_provider
    def get(self, params: AnimeParams) -> Anime | None:
        """Return the anime identified by ``params.id`` with its episode list."""
        return self._get_anime(params)

    @lru_cache()
    def _get_anime(self, params: AnimeParams) -> Anime | None:
        """Load every release page of an anime and renumber its episodes.

        Whole-numbered episodes are renumbered sequentially from 1. An episode
        with a fractional number keeps its fraction on top of the previous
        sequential number.
        """
        search_result = self._get_search_result(params)
        if not search_result:
            logger.error(f"No search result found for ID {params.id}")
            return None

        first_page: Optional[AnimePaheAnimePage] = None
        all_episodes: list = []

        page = 1
        has_next_page = True
        while has_next_page:
            logger.debug(f"Loading page: {page}")
            _anime_page = self._anime_page_loader(
                m="release",
                id=params.id,
                sort="episode_asc",
                page=page,
            )

            has_next_page = bool(_anime_page["next_page_url"])
            page += 1
            if first_page is None:
                first_page = _anime_page
            # _anime_page comes out of an lru_cache: copy each episode so that
            # aggregating and renumbering never mutates the cached page.
            all_episodes.extend(dict(episode) for episode in _anime_page["data"])

        anime: AnimePaheAnimePage = {**first_page, "data": all_episodes}  # type: ignore

        standardized_episode_number = 0
        for episode in anime["data"]:
            if episode["episode"] % 1 == 0:
                standardized_episode_number += 1
                episode.update({"episode": standardized_episode_number})
            else:
                standardized_episode_number += episode["episode"] % 1
                episode.update({"episode": standardized_episode_number})
                standardized_episode_number = int(standardized_episode_number)

        return map_to_anime_result(search_result, anime)

    @lru_cache()
    def _get_search_result(self, params: AnimeParams) -> Optional[SearchResult]:
        """Re-run the search for ``params.query`` and pick the hit with ``params.id``."""
        search_results = self._search(SearchParams(query=params.query))
        if not search_results or not search_results.results:
            logger.error(f"No search results found for ID {params.id}")
            return None
        for search_result in search_results.results:
            if search_result.id == params.id:
                return search_result

    @lru_cache()
    def _anime_page_loader(self, m, id, sort, page) -> AnimePaheAnimePage:
        """Fetch one page of the release listing and return the decoded JSON."""
        url_params = {
            "m": m,
            "id": id,
            "sort": sort,
            "page": page,
        }
        response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
        response.raise_for_status()
        return response.json()

    @debug_provider
    def episode_streams(self, params: EpisodeStreamsParams) -> Iterator[Server] | None:
        """Yield a single ``Server`` holding every stream link of the episode.

        Only the resolution-menu entries whose audio matches
        ``params.translation_type`` are followed. Nothing is yielded when the
        episode, the resolution menu or every stream link is missing.
        """
        from ...scraping.html_parser import (
            extract_attributes,
            get_element_by_id,
            get_elements_html_by_class,
        )

        episode = self._get_episode_info(params)
        if not episode:
            logger.error(
                f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
            )
            return

        url = f"{ANIMEPAHE_BASE}/play/{params.anime_id}/{episode.session_id}"
        response = self.client.get(url, follow_redirects=True)
        response.raise_for_status()

        c = get_element_by_id("resolutionMenu", response.text)
        if not c:
            logger.error("Resolution menu not found in the response")
            return
        resolutionMenuItems = get_elements_html_by_class("dropdown-item", c)
        res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
        quality = None
        translation_type = None
        stream_links = []
        stream_host = None

        # TODO: better document the scraping process
        for res_dict in res_dicts:
            # the actual attributes are data attributes in the original html 'prefixed with data-'
            # .get(): a menu item lacking an attribute must reach the guards
            # below instead of aborting the whole generator with KeyError.
            embed_url = res_dict.get("src")
            logger.debug(f"Found embed url: {embed_url}")
            data_audio = "dub" if res_dict.get("audio") == "eng" else "sub"

            if data_audio != params.translation_type:
                logger.debug(f"Found {data_audio} but wanted {params.translation_type}")
                continue

            if not embed_url:
                logger.warning("embed url not found please report to the developers")
                continue

            embed_response = self.client.get(
                embed_url,
                headers={
                    "User-Agent": self.client.headers["User-Agent"],
                    **SERVER_HEADERS,
                },
            )
            embed_response.raise_for_status()
            embed_page = embed_response.text
            logger.debug("Processing embed page for JS decoding")

            decoded_js = process_animepahe_embed_page(embed_page)
            if not decoded_js:
                logger.error("failed to decode embed page")
                continue
            logger.debug(f"Decoded JS: {decoded_js[:100]}...")
            juicy_stream = JUICY_STREAM_REGEX.search(decoded_js)
            if not juicy_stream:
                logger.error("failed to find juicy stream")
                continue
            logger.debug(f"Found juicy stream: {juicy_stream.group(1)}")
            juicy_stream = juicy_stream.group(1)
            stream_host = urlparse(juicy_stream).hostname
            quality = res_dict.get("resolution")
            logger.debug(f"Found quality: {quality}")
            translation_type = data_audio
            stream_links.append((quality, juicy_stream))

        if translation_type and stream_links:
            headers = {
                "User-Agent": self.client.headers["User-Agent"],
                "Host": stream_host or CDN_PROVIDER,
                **STREAM_HEADERS,
            }
            yield map_to_server(
                episode, translation_type, stream_links, headers=headers
            )

    @lru_cache()
    def _get_episode_info(
        self, params: EpisodeStreamsParams
    ) -> Optional[AnimeEpisodeInfo]:
        """Return the episode entry whose number equals ``params.episode``, if any."""
        anime_info = self._get_anime(
            AnimeParams(id=params.anime_id, query=params.query)
        )
        if not anime_info:
            logger.error(f"No anime info for {params.anime_id}")
            return
        if not anime_info.episodes_info:
            logger.error(f"No episodes info for {params.anime_id}")
            return
        for episode in anime_info.episodes_info:
            if episode.episode == params.episode:
                return episode
class Episode(TypedDict):
    """Shape of one entry in the ``data`` list of ``AnimePaheAnimePage``."""

    id: str
    anime_id: int
    # Declared as float; presumably so fractional episode numbers fit -- TODO confirm.
    episode: float
    episode2: int
    edition: str
    title: str
    snapshot: str  # episode image
    disc: str
    audio: Literal["eng", "jpn"]
    duration: str  # time 00:00:00
    session: str
    filler: int
    created_at: str
links: list[AnimePaheStreamLink] + episode_title: str + subtitles: list + headers: dict diff --git a/viu_media/libs/provider/anime/animeunity/__init__.py b/viu_media/libs/provider/anime/animeunity/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/viu_media/libs/provider/anime/animeunity/constants.py b/viu_media/libs/provider/anime/animeunity/constants.py new file mode 100644 index 0000000..7552685 --- /dev/null +++ b/viu_media/libs/provider/anime/animeunity/constants.py @@ -0,0 +1,17 @@ +import re + +ANIMEUNITY = "animeunity.so" +ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}" + +MAX_TIMEOUT = 10 +TOKEN_REGEX = re.compile(r'') + +REPLACEMENT_WORDS = {"Season ": "", "Cour": "Part"} + +# Server Specific +AVAILABLE_VIDEO_QUALITY = ["1080", "720", "480"] +VIDEO_INFO_REGEX = re.compile(r"window.video\s*=\s*(\{[^\}]*\})") +VIDEO_INFO_CLEAN_REGEX = re.compile(r'(? dict | None: + """ + Extracts server information from the VixCloud/AnimeUnity embed page. + Handles extraction from both window.video object and download URL. 
def map_to_search_result(
    data: dict, translation_type: Literal["sub", "dub"] | None
) -> SearchResult | None:
    """Convert one AnimeUnity record into a ``SearchResult``.

    Args:
        data: A single record as returned by the AnimeUnity API.
        translation_type: When given, only records of that type are kept:
            ``"dub"`` keeps records whose ``dub`` flag is 1, ``"sub"`` keeps
            records whose flag is 0. ``None`` disables the filter.

    Returns:
        The mapped result, or ``None`` when the record is filtered out.
    """
    if translation_type:
        # Compare against the flag expected for the requested type. The two
        # steps are kept separate on purpose: written inline as
        # `x != 1 if cond else 0`, the conditional wraps the whole comparison
        # and the "sub" case never filters anything.
        wanted_dub_flag = 1 if translation_type == "dub" else 0
        if data["dub"] != wanted_dub_flag:
            return None

    titles = get_titles(data)
    episode_numbers = list(map(str, range(1, get_episodes_count(data) + 1)))
    return SearchResult(
        id=str(data["id"]),
        title=titles[0] if titles else "Unknown",
        episodes=AnimeEpisodes(
            sub=episode_numbers if data["dub"] == 0 else [],
            dub=episode_numbers if data["dub"] == 1 else [],
        ),
        other_titles=titles,
        score=data["score"],
        poster=data["imageurl"],
        year=data["date"],
    )
def get_episodes_count(record: dict) -> int:
    """
    Return the number of episodes from the record.

    ``real_episodes_count`` wins when it is positive; otherwise
    ``episodes_count`` is used. A missing or null count is treated as 0, so
    the result is always an int.
    """
    # `or 0` also covers an explicit null in the payload, which would
    # otherwise make the comparison below raise TypeError.
    real_count = record.get("real_episodes_count") or 0
    if real_count > 0:
        return real_count
    return record.get("episodes_count") or 0
SearchResults | None: + if not (res := self._search(params)): + return None + + for result in res.results: + self._cache[result.id] = result + + return res + + @lru_cache + def _search(self, params: SearchParams) -> SearchResults | None: + self._get_token() + # Replace words in query to + query = params.query + for old, new in REPLACEMENT_WORDS.items(): + query = query.replace(old, new) + + response = self.client.post( + url=f"{ANIMEUNITY_BASE}/livesearch", + data={"title": query}, + timeout=MAX_TIMEOUT, + ) + + response.raise_for_status() + return map_to_search_results( + response.json().get("records", []), params.translation_type + ) + + @debug_provider + def get(self, params: AnimeParams) -> Anime | None: + return self._get_anime(params) + + @lru_cache() + def _get_search_result(self, params: AnimeParams) -> SearchResult | None: + if cached := self._cache.get(params.id): + return cached + + response = self.client.get( + url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/", + timeout=MAX_TIMEOUT, + ) + response.raise_for_status() + data = response.json() + + if res := map_to_search_result(data, None): + self._cache[params.id] = res + return res + + @lru_cache + def _get_anime(self, params: AnimeParams) -> Anime | None: + if (search_result := self._get_search_result(params)) is None: + logger.error(f"No search result found for ID {params.id}") + return None + + # Fetch episodes in chunks + data = [] + start_range = 1 + episode_count = max( + len(search_result.episodes.sub), len(search_result.episodes.dub) + ) + while start_range <= episode_count: + end_range = min(start_range + 119, episode_count) + response = self.client.get( + url=f"{ANIMEUNITY_BASE}/info_api/{params.id}/1", + params={ + "start_range": start_range, + "end_range": end_range, + }, + timeout=MAX_TIMEOUT, + ) + response.raise_for_status() + data.extend(response.json().get("episodes", [])) + start_range = end_range + 1 + + return map_to_anime_result(data, search_result) + + @lru_cache() + def 
class BaseAnimeProvider(ABC):
    """Abstract base every anime provider derives from.

    A concrete provider must expose a ``HEADERS`` class attribute and
    implement ``search``, ``get`` and ``episode_streams``.
    """

    HEADERS: ClassVar[Dict[str, str]]

    def __init_subclass__(cls, **kwargs):
        """Reject, at class-creation time, subclasses without ``HEADERS``."""
        super().__init_subclass__(**kwargs)
        if hasattr(cls, "HEADERS"):
            return
        raise TypeError(
            "Subclasses of BaseAnimeProvider must define a 'HEADERS' class attribute."
        )

    def __init__(self, client: "Client") -> None:
        """Store the HTTP client the provider issues its requests with."""
        self.client = client

    @abstractmethod
    def search(self, params: SearchParams) -> "SearchResults | None":
        """Search the provider for anime matching ``params``."""

    @abstractmethod
    def get(self, params: AnimeParams) -> "Anime | None":
        """Return the anime described by ``params``."""

    @abstractmethod
    def episode_streams(
        self, params: EpisodeStreamsParams
    ) -> "Iterator[Server] | None":
        """Return the servers offering the episode described by ``params``."""
class AnimeProviderFactory:
    """Factory for creating anime provider instances."""

    @staticmethod
    def create(provider_name: ProviderName) -> BaseAnimeProvider:
        """
        Dynamically creates an instance of the specified anime provider.

        This method imports the necessary provider module, instantiates its main class,
        and injects a pre-configured HTTP client.

        Args:
            provider_name: The name of the provider to create (e.g., 'allanime').

        Returns:
            An instance of a class that inherits from BaseAnimeProvider.

        Raises:
            ValueError: If the provider_name is not supported.
            ImportError: If the provider module or class cannot be found.
        """
        from ....core.utils.networking import random_user_agent

        provider_key = provider_name.value.lower()

        # Turn an unknown key into the documented ValueError instead of
        # leaking the dictionary's KeyError.
        try:
            import_path = PROVIDERS_AVAILABLE[provider_key]
        except KeyError:
            raise ValueError(f"Unsupported provider '{provider_key}'.") from None
        module_name, class_name = import_path.split(".", 1)

        # Construct the full package path for dynamic import
        package_path = f"viu_media.libs.provider.anime.{provider_key}"

        try:
            provider_module = importlib.import_module(f".{module_name}", package_path)
            provider_class = getattr(provider_module, class_name)
        except (ImportError, AttributeError) as e:
            logger.error(f"Failed to load provider '{provider_key}': {e}")
            raise ImportError(
                f"Could not load provider '{provider_key}'. "
                "Check the module path and class name in PROVIDERS_AVAILABLE."
            ) from e

        # Each provider class requires an httpx.Client, which we set up here.
        # The provider's own HEADERS come last, so they override the random
        # User-Agent when both define it.
        client = Client(
            headers={"User-Agent": random_user_agent(), **provider_class.HEADERS}
        )

        return provider_class(client)
def debug_provider(provider_function):
    """Decorate a provider method so failures are logged rather than raised.

    When the ``VIU_DEBUG`` environment variable is set to a non-empty value,
    the wrapped method is called directly and exceptions propagate. Otherwise
    any ``Exception`` it raises is logged as an error, tagged with the
    upper-cased class name and the method name, and the call returns ``None``.
    """

    @functools.wraps(provider_function)
    def _provider_function_wrapper(self, *args, **kwargs):
        provider_name = self.__class__.__name__.upper()

        # Debug mode: let every exception surface untouched.
        if os.environ.get("VIU_DEBUG"):
            return provider_function(self, *args, **kwargs)

        try:
            return provider_function(self, *args, **kwargs)
        except Exception as e:
            logger.error(f"[{provider_name}@{provider_function.__name__}]: {e}")
        return None

    return _provider_function_wrapper
else: + return + + print( + "Now streaming: ", + anime.title, + "Episode: ", + stream.episode_title if stream.episode_title else episode_number, + ) + subprocess.run([cmd, link.link]) diff --git a/viu_media/libs/provider/scraping/__init__.py b/viu_media/libs/provider/scraping/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/viu_media/libs/provider/scraping/html_parser.py b/viu_media/libs/provider/scraping/html_parser.py new file mode 100644 index 0000000..908e2d6 --- /dev/null +++ b/viu_media/libs/provider/scraping/html_parser.py @@ -0,0 +1,494 @@ +# pyright: reportAttributeAccessIssue=false, reportPossiblyUnboundVariable=false +""" +HTML parsing utilities with optional lxml support. + +This module provides comprehensive HTML parsing capabilities using either +Python's built-in html.parser or lxml for better performance when available. +""" + +# TODO: Review and optimize the HTML parsing logic for better performance and flexibility. +# Consider adding more utility functions for common HTML manipulation tasks. +import logging +import re +from html.parser import HTMLParser as BaseHTMLParser +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union + +if TYPE_CHECKING: + from lxml import etree + +logger = logging.getLogger(__name__) + +# Try to import lxml +HAS_LXML = False +try: + from lxml import etree, html as lxml_html + + HAS_LXML = True + logger.debug("lxml is available and will be used for HTML parsing") +except ImportError: + logger.debug("lxml not available, falling back to html.parser") + + +class HTMLParserConfig: + """Configuration for HTML parser selection.""" + + def __init__(self, use_lxml: Optional[bool] = None): + """ + Initialize parser configuration. 
class BuiltinHTMLParser(BaseHTMLParser):
    """
    Build a nested element tree with Python's built-in HTML parser.

    Every element is a dict with the keys ``tag``, ``attrs``, ``text``,
    ``children`` and ``start_pos``; ``end_pos`` is added when the element is
    closed. Top-level elements are collected in ``self.elements`` in document
    order.
    """

    # Void elements have no content and no end tag, so they must never be
    # left open on the stack; otherwise every following sibling would be
    # recorded as their child.
    _VOID_ELEMENTS = frozenset(
        {
            "area",
            "base",
            "br",
            "col",
            "embed",
            "hr",
            "img",
            "input",
            "link",
            "meta",
            "param",
            "source",
            "track",
            "wbr",
        }
    )

    def __init__(self):
        super().__init__()
        self.elements = []
        # Kept for backward compatibility; this class never updates it.
        self.current_element = None
        self.element_stack = []

    def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]):
        """Record a new element under the innermost open element."""
        element = {
            "tag": tag,
            "attrs": dict(attrs),
            "text": "",
            "children": [],
            "start_pos": self.getpos(),
        }

        if self.element_stack:
            self.element_stack[-1]["children"].append(element)
        else:
            self.elements.append(element)

        if tag in self._VOID_ELEMENTS:
            # Closed immediately: nothing can be nested inside it.
            element["end_pos"] = element["start_pos"]
            return

        self.element_stack.append(element)

    def handle_endtag(self, tag: str):
        """Close the nearest open element named ``tag``.

        Elements opened after it that were never explicitly closed (for
        example ``<li>`` or ``<p>`` with omitted end tags) are closed along
        with it. An end tag with no matching open element is ignored.
        """
        if tag in self._VOID_ELEMENTS:
            # Covers "<br/>" (start tag followed by end tag) and stray "</br>".
            return

        # Search from the innermost open element outwards.
        for depth in range(len(self.element_stack) - 1, -1, -1):
            if self.element_stack[depth]["tag"] == tag:
                break
        else:
            return

        end_pos = self.getpos()
        for element in self.element_stack[depth:]:
            element["end_pos"] = end_pos
        del self.element_stack[depth:]

    def handle_data(self, data: str):
        """Append text to the innermost open element.

        Text that appears while no element is open is discarded.
        """
        if self.element_stack:
            self.element_stack[-1]["text"] += data
def extract_attributes(html_element: str) -> Dict[str, str]:
    """
    Extract attributes from an HTML element string.

    The string is scanned once, so text inside a quoted value is never
    mistaken for another attribute. Attribute names keep their full spelling
    (``data-src`` is reported as ``data-src``, not ``src``) and their original
    case. Values are returned verbatim; character references are not decoded.
    The whole string is scanned, so ``name=value`` pairs appearing after the
    opening tag are collected as well.

    Args:
        html_element: HTML element as string (e.g., '<div class="test" id="main">')

    Returns:
        Dictionary of attribute name-value pairs. Attributes written without
        a value (e.g. ``disabled``) are not included. When a name occurs both
        quoted and unquoted, the quoted value is kept.

    Examples:
        >>> extract_attributes('<div class="test" id="main">')
        {'class': 'test', 'id': 'main'}
        >>> extract_attributes('<a data-id="5" href=/next>')
        {'data-id': '5', 'href': '/next'}
    """
    if not html_element:
        return {}

    attr_pattern = re.compile(
        r"""
        ([^\s"'<>/=]+)        # attribute name, may contain '-', ':' or '.'
        =
        (?:
            "([^"]*)"         # double-quoted value (may contain ')
          | '([^']*)'         # single-quoted value (may contain ")
          | ([^\s>]+)         # unquoted value
        )
        """,
        re.VERBOSE,
    )

    attributes: Dict[str, str] = {}
    for match in attr_pattern.finditer(html_element):
        attr_name, double_quoted, single_quoted, bare = match.groups()
        if bare is not None:
            # An unquoted value never overrides a value already recorded.
            attributes.setdefault(attr_name, bare)
        else:
            attributes[attr_name] = (
                double_quoted if double_quoted is not None else single_quoted
            )

    return attributes
Content
' + >>> get_element_by_id("test", html) + '
Content
def get_element_by_class(class_name: str, html_content: str) -> Optional[str]:
    """
    Get first HTML element by class name.

    An element matches when ``class_name`` is one of the whitespace-separated
    tokens of its ``class`` attribute. Both parser backends use this rule, so
    the result does not depend on whether lxml is installed.

    Args:
        class_name: The class name to search for
        html_content: HTML content to search in

    Returns:
        HTML string of the element or None if not found (also None when the
        lxml search raises; the failure is logged as a warning)
    """
    parsed = _default_parser.parse(html_content)

    # Dispatch on the tree that was actually returned: the lxml parse path
    # falls back to the built-in parser on failure, and that tree has no
    # ``xpath`` method.
    if not hasattr(parsed, "xpath"):
        elements = parsed.find_by_class(class_name)
        if elements:
            return _element_to_html(elements[0], html_content)
        return None

    try:
        # Token match instead of substring match, with the class name passed
        # as an XPath variable so quotes in it cannot break the expression.
        elements = parsed.xpath(
            '//*[contains(concat(" ", normalize-space(@class), " "), $token)]',
            token=f" {class_name} ",
        )
        if elements:
            return etree.tostring(elements[0], encoding="unicode", method="html")
    except Exception as e:
        logger.warning(f"lxml XPath search failed: {e}")
        return None

    return None
+ + Args: + class_name: The class name to search for + html_content: HTML content to search in + + Returns: + List of HTML strings for matching elements + """ + parsed = _default_parser.parse(html_content) + results = [] + + if _default_parser.config.use_lxml and HAS_LXML: + try: + elements = parsed.xpath(f'//*[contains(@class, "{class_name}")]') + for element in elements: + results.append( + etree.tostring(element, encoding="unicode", method="html") + ) + except Exception as e: + logger.warning(f"lxml XPath search failed: {e}") + else: + elements = parsed.find_by_class(class_name) + for element in elements: + results.append(_element_to_html(element, html_content)) + + return results + + +def get_elements_html_by_class(class_name: str, html_content: str) -> List[str]: + """ + Get HTML strings of elements by class name. + + This is an alias for get_elements_by_class for yt-dlp compatibility. + + Args: + class_name: The class name to search for + html_content: HTML content to search in + + Returns: + List of HTML strings for matching elements + """ + return get_elements_by_class(class_name, html_content) + + +def get_element_text_and_html_by_tag( + tag_name: str, html_content: str +) -> Tuple[Optional[str], Optional[str]]: + """ + Get both text content and HTML of first element by tag name. 
+ + Args: + tag_name: The tag name to search for + html_content: HTML content to search in + + Returns: + Tuple of (text_content, html_string) or (None, None) if not found + + Examples: + >>> html = '' + >>> get_element_text_and_html_by_tag("script", html) + ('alert("test");', '') + """ + parsed = _default_parser.parse(html_content) + + if _default_parser.config.use_lxml and HAS_LXML: + try: + elements = parsed.xpath(f"//{tag_name}") + if elements: + element = elements[0] + text = ( + element.text_content() + if hasattr(element, "text_content") + else (element.text or "") + ) + html_str = etree.tostring(element, encoding="unicode", method="html") + return text, html_str + except Exception as e: + logger.warning(f"lxml XPath search failed: {e}") + return None, None + else: + elements = parsed.find_by_tag(tag_name) + if elements: + element = elements[0] + text = _extract_text_content(element) + html_str = _element_to_html(element, html_content) + return text, html_str + + return None, None + + +def _element_to_html(element: Dict, original_html: str) -> str: + """ + Convert parsed element back to HTML string. + + This is a simplified implementation that reconstructs HTML from parsed data. + For production use, consider using lxml for better accuracy. 
+ """ + if not element: + return "" + + # Build opening tag + tag = element["tag"] + attrs = element.get("attrs", {}) + attr_str = " ".join(f'{k}="{v}"' for k, v in attrs.items() if v is not None) + + if attr_str: + opening_tag = f"<{tag} {attr_str}>" + else: + opening_tag = f"<{tag}>" + + # Add text content + text = element.get("text", "") + + # Add children + children_html = "" + for child in element.get("children", []): + children_html += _element_to_html(child, original_html) + + # Build closing tag + closing_tag = f"" + + return f"{opening_tag}{text}{children_html}{closing_tag}" + + +def _extract_text_content(element: Dict) -> str: + """Extract all text content from element and its children.""" + text = element.get("text", "") + + for child in element.get("children", []): + text += _extract_text_content(child) + + return text + + +def configure_parser(use_lxml: Optional[bool] = None) -> None: + """ + Configure the global HTML parser. + + Args: + use_lxml: Force use of lxml (True), html.parser (False), or auto-detect (None) + """ + global _default_parser + _default_parser = HTMLParser(HTMLParserConfig(use_lxml)) + logger.info( + f"HTML parser configured: {'lxml' if _default_parser.config.use_lxml else 'html.parser'}" + ) diff --git a/viu_media/libs/provider/scraping/user_agents.py b/viu_media/libs/provider/scraping/user_agents.py new file mode 100644 index 0000000..4637aae --- /dev/null +++ b/viu_media/libs/provider/scraping/user_agents.py @@ -0,0 +1,235 @@ +""" +User agent utilities for web scraping. + +Provides functionality to generate random user agent strings +to avoid detection and blocking by websites. +""" + +import random +from typing import List, Optional + + +class UserAgentGenerator: + """ + Generator for realistic user agent strings. + + Provides a variety of common user agents from different browsers + and operating systems to help avoid detection. 
+ """ + + # Common user agents for different browsers and OS combinations + USER_AGENTS = [ + # Chrome on Windows + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", + # Chrome on macOS + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + # Chrome on Linux + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", + "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", + # Firefox on Windows + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", + # Firefox on macOS + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:122.0) Gecko/20100101 Firefox/122.0", + # Firefox on Linux + "Mozilla/5.0 (X11; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0", + "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0", + # Safari on macOS + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15", + "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15", + # Edge on Windows + "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", + "Mozilla/5.0 (Windows NT 10.0; Win64; 
x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0", + # Mobile Chrome (Android) + "Mozilla/5.0 (Linux; Android 14; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", + "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", + # Mobile Safari (iOS) + "Mozilla/5.0 (iPhone; CPU iPhone OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1", + "Mozilla/5.0 (iPad; CPU OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1", + ] + + # Browser-specific user agents for when you need a specific browser + CHROME_USER_AGENTS = [ + ua for ua in USER_AGENTS if "Chrome" in ua and "Edg" not in ua + ] + FIREFOX_USER_AGENTS = [ua for ua in USER_AGENTS if "Firefox" in ua] + SAFARI_USER_AGENTS = [ + ua for ua in USER_AGENTS if "Safari" in ua and "Chrome" not in ua + ] + EDGE_USER_AGENTS = [ua for ua in USER_AGENTS if "Edg" in ua] + + # Platform-specific user agents + WINDOWS_USER_AGENTS = [ua for ua in USER_AGENTS if "Windows NT" in ua] + MACOS_USER_AGENTS = [ua for ua in USER_AGENTS if "Macintosh" in ua] + LINUX_USER_AGENTS = [ + ua for ua in USER_AGENTS if "Linux" in ua and "Android" not in ua + ] + MOBILE_USER_AGENTS = [ua for ua in USER_AGENTS if "Mobile" in ua or "Android" in ua] + + def __init__(self, seed: Optional[int] = None): + """ + Initialize the user agent generator. + + Args: + seed: Random seed for reproducible results (optional) + """ + if seed is not None: + random.seed(seed) + + def random(self) -> str: + """ + Get a random user agent string. + + Returns: + Random user agent string + """ + return random.choice(self.USER_AGENTS) + + def random_browser(self, browser: str) -> str: + """ + Get a random user agent for a specific browser. 
+ + Args: + browser: Browser name ('chrome', 'firefox', 'safari', 'edge') + + Returns: + Random user agent string for the specified browser + + Raises: + ValueError: If browser is not supported + """ + browser = browser.lower() + if browser == "chrome": + return random.choice(self.CHROME_USER_AGENTS) + elif browser == "firefox": + return random.choice(self.FIREFOX_USER_AGENTS) + elif browser == "safari": + return random.choice(self.SAFARI_USER_AGENTS) + elif browser == "edge": + return random.choice(self.EDGE_USER_AGENTS) + else: + raise ValueError(f"Unsupported browser: {browser}") + + def random_platform(self, platform: str) -> str: + """ + Get a random user agent for a specific platform. + + Args: + platform: Platform name ('windows', 'macos', 'linux', 'mobile') + + Returns: + Random user agent string for the specified platform + + Raises: + ValueError: If platform is not supported + """ + platform = platform.lower() + if platform == "windows": + return random.choice(self.WINDOWS_USER_AGENTS) + elif platform in ("macos", "mac"): + return random.choice(self.MACOS_USER_AGENTS) + elif platform == "linux": + return random.choice(self.LINUX_USER_AGENTS) + elif platform == "mobile": + return random.choice(self.MOBILE_USER_AGENTS) + else: + raise ValueError(f"Unsupported platform: {platform}") + + def add_user_agent(self, user_agent: str) -> None: + """ + Add a custom user agent to the list. + + Args: + user_agent: Custom user agent string to add + """ + if user_agent not in self.USER_AGENTS: + self.USER_AGENTS.append(user_agent) + + def get_all(self) -> List[str]: + """ + Get all available user agent strings. + + Returns: + List of all user agent strings + """ + return self.USER_AGENTS.copy() + + +# Global instance for convenience +_default_generator = UserAgentGenerator() + + +def random_user_agent() -> str: + """ + Get a random user agent string using the default generator. 
+ + Returns: + Random user agent string + + Examples: + >>> ua = random_user_agent() + >>> "Mozilla" in ua + True + """ + return _default_generator.random() + + +def random_user_agent_browser(browser: str) -> str: + """ + Get a random user agent for a specific browser. + + Args: + browser: Browser name ('chrome', 'firefox', 'safari', 'edge') + + Returns: + Random user agent string for the specified browser + """ + return _default_generator.random_browser(browser) + + +def random_user_agent_platform(platform: str) -> str: + """ + Get a random user agent for a specific platform. + + Args: + platform: Platform name ('windows', 'macos', 'linux', 'mobile') + + Returns: + Random user agent string for the specified platform + """ + return _default_generator.random_platform(platform) + + +def set_user_agent_seed(seed: int) -> None: + """ + Set the random seed for user agent generation. + + Args: + seed: Random seed value + """ + global _default_generator + _default_generator = UserAgentGenerator(seed) + + +def add_custom_user_agent(user_agent: str) -> None: + """ + Add a custom user agent to the default generator. + + Args: + user_agent: Custom user agent string to add + """ + _default_generator.add_user_agent(user_agent) + + +def get_all_user_agents() -> List[str]: + """ + Get all available user agent strings from the default generator. + + Returns: + List of all user agent strings + """ + return _default_generator.get_all() diff --git a/viu_media/libs/provider/scraping/utils.py b/viu_media/libs/provider/scraping/utils.py new file mode 100644 index 0000000..b241bcd --- /dev/null +++ b/viu_media/libs/provider/scraping/utils.py @@ -0,0 +1,272 @@ +""" +Encoding and utility functions for web scraping. + +Provides various encoding utilities including base-N encoding +that was previously sourced from yt-dlp. 
def decode_base_n(encoded: str, n: int, table: Optional[str] = None) -> int:
    """
    Decode a base-n encoded string back to an integer.

    Each character is looked up exactly as written first, so tables that
    contain upper-case digits (e.g. base 62) decode correctly. Only when the
    character is not a valid digit for the base is its lower-case form tried,
    which keeps input such as 'FF' accepted with the default table.

    Args:
        encoded: The base-n encoded string, optionally prefixed with '-'
        n: The base used for encoding
        table: Custom character table (optional, defaults to 0-9 then a-z)

    Returns:
        The decoded integer (0 for an empty string)

    Raises:
        ValueError: If the base is outside 2..len(table), or a character is
            not a digit of the given base

    Examples:
        >>> decode_base_n('ff', 16)
        255
        >>> decode_base_n('16', 36)
        42
    """
    if table is None:
        table = string.digits + string.ascii_lowercase

    if not 2 <= n <= len(table):
        raise ValueError(f"Base must be between 2 and {len(table)}")

    if not encoded:
        return 0

    is_negative = encoded.startswith("-")
    if is_negative:
        encoded = encoded[1:]

    # Map each character to its first position in the table (what
    # ``table.index`` returns) without rescanning the table per character.
    digit_values = {}
    for position, symbol in enumerate(table):
        digit_values.setdefault(symbol, position)

    result = 0
    for char in encoded:
        digit_value = digit_values.get(char)
        if digit_value is None or digit_value >= n:
            # Case-insensitive fallback for characters that are not valid
            # digits as written.
            char = char.lower()
            digit_value = digit_values.get(char)
            if digit_value is None:
                raise ValueError(f"Invalid character '{char}' for base {n}")
            if digit_value >= n:
                raise ValueError(f"Invalid digit '{char}' for base {n}")

        # Horner's rule: avoids computing n**i for every position.
        result = result * n + digit_value

    return -result if is_negative else result
def html_unescape(text: str) -> str:
    """
    Replace HTML character references in text with the characters they denote.

    Args:
        text: Text containing named or numeric HTML character references

    Returns:
        Text with the character references decoded

    Examples:
        >>> html_unescape('&quot;Hello&quot; &amp; &lt;World&gt;')
        '"Hello" & <World>'
    """
    from html import unescape as _decode_entities

    decoded = _decode_entities(text)
    return decoded

Hello world!

def build_query_string(params: dict) -> str:
    """
    Serialise a mapping of parameters into a URL query string.

    A list or tuple value produces one ``key=value`` pair per item; any other
    value is converted with ``str()`` and produces a single pair.

    Args:
        params: Dictionary of parameters

    Returns:
        URL-encoded query string

    Examples:
        >>> build_query_string({'name': 'John', 'age': 30})
        'name=John&age=30'
    """
    from urllib.parse import urlencode

    def _as_sequence(value):
        # Lists and tuples are already multi-valued; wrap everything else.
        return value if isinstance(value, (list, tuple)) else [str(value)]

    multi_valued = {key: _as_sequence(value) for key, value in params.items()}
    return urlencode(multi_valued, doseq=True)