diff --git a/viu_media/libs/provider/__init__.py b/viu_media/libs/provider/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/viu_media/libs/provider/anime/__init__.py b/viu_media/libs/provider/anime/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/viu_media/libs/provider/anime/allanime/__init__.py b/viu_media/libs/provider/anime/allanime/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/viu_media/libs/provider/anime/allanime/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/viu_media/libs/provider/anime/allanime/constants.py b/viu_media/libs/provider/anime/allanime/constants.py deleted file mode 100644 index cbe3e4e..0000000 --- a/viu_media/libs/provider/anime/allanime/constants.py +++ /dev/null @@ -1,39 +0,0 @@ -import re - -from .....core.constants import GRAPHQL_DIR - -SERVERS_AVAILABLE = [ - "sharepoint", - "dropbox", - "gogoanime", - "weTransfer", - "wixmp", - "Yt", - "mp4-upload", -] -API_BASE_URL = "allanime.day" -API_GRAPHQL_REFERER = "https://allanime.to/" -API_GRAPHQL_ENDPOINT = f"https://api.{API_BASE_URL}/api/" -API_GRAPHQL_HEADERS= { - "User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", - "Content-Type": "application/json", - "Origin": f"{API_GRAPHQL_REFERER}", -} - -# search constants -DEFAULT_COUNTRY_OF_ORIGIN = "all" -DEFAULT_NSFW = True -DEFAULT_UNKNOWN = True -DEFAULT_PER_PAGE = 40 -DEFAULT_PAGE = 1 - -# regex stuff -MP4_SERVER_JUICY_STREAM_REGEX = re.compile( - r"video/mp4\",src:\"(https?://.*/video\.mp4)\"" -) - -# graphql files -_GQL_QUERIES = GRAPHQL_DIR / "allanime" / "queries" -SEARCH_GQL = _GQL_QUERIES / "search.gql" -ANIME_GQL = _GQL_QUERIES / "anime.gql" -EPISODE_GQL = _GQL_QUERIES / "episodes.gql" diff --git a/viu_media/libs/provider/anime/allanime/extractors/__init__.py b/viu_media/libs/provider/anime/allanime/extractors/__init__.py deleted file mode 100644 index c857165..0000000 --- 
a/viu_media/libs/provider/anime/allanime/extractors/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .extractor import extract_server - -__all__ = ["extract_server"] diff --git a/viu_media/libs/provider/anime/allanime/extractors/ak.py b/viu_media/libs/provider/anime/allanime/extractors/ak.py deleted file mode 100644 index deb2dec..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/ak.py +++ /dev/null @@ -1,31 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -class AkExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams = response.json() - - return Server( - name="Ak", - links=[ - EpisodeStream(link=link, quality="1080") for link in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/base.py b/viu_media/libs/provider/anime/allanime/extractors/base.py deleted file mode 100644 index 0ad66a9..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/base.py +++ /dev/null @@ -1,20 +0,0 @@ -from abc import ABC, abstractmethod - -from httpx import Client - -from ...types import Server -from ..types import AllAnimeEpisode, AllAnimeSource - - -class BaseExtractor(ABC): - @classmethod - @abstractmethod - def extract( - cls, - url: str, - client: Client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server | None: - pass diff --git a/viu_media/libs/provider/anime/allanime/extractors/dropbox.py b/viu_media/libs/provider/anime/allanime/extractors/dropbox.py deleted file mode 100644 index 6b779c6..0000000 --- 
a/viu_media/libs/provider/anime/allanime/extractors/dropbox.py +++ /dev/null @@ -1,31 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -class SakExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams = response.json() - - return Server( - name="dropbox", - links=[ - EpisodeStream(link=link, quality="1080") for link in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/extractor.py b/viu_media/libs/provider/anime/allanime/extractors/extractor.py deleted file mode 100644 index 92deccd..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/extractor.py +++ /dev/null @@ -1,65 +0,0 @@ -from httpx import Client - -from ...types import Server -from ..types import AllAnimeEpisode, AllAnimeSource -from ..utils import debug_extractor, logger, one_digit_symmetric_xor -from .ak import AkExtractor -from .dropbox import SakExtractor -from .filemoon import FmHlsExtractor, OkExtractor -from .gogoanime import Lufmp4Extractor -from .mp4_upload import Mp4Extractor -from .sharepoint import Smp4Extractor -from .streamsb import SsHlsExtractor -from .vid_mp4 import VidMp4Extractor -from .we_transfer import KirExtractor -from .wixmp import DefaultExtractor -from .yt_mp4 import YtExtractor - -AVAILABLE_SOURCES = { - "Sak": SakExtractor, - "S-mp4": Smp4Extractor, - "Luf-Mp4": Lufmp4Extractor, - "Default": DefaultExtractor, - "Yt-mp4": YtExtractor, - "Kir": KirExtractor, - "Mp4": Mp4Extractor, -} -OTHER_SOURCES = { - "Ak": AkExtractor, - "Vid-mp4": VidMp4Extractor, - "Ok": 
OkExtractor, - "Ss-Hls": SsHlsExtractor, - "Fm-Hls": FmHlsExtractor, -} - - -@debug_extractor -def extract_server( - client: Client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, -) -> Server | None: - url = source.get("sourceUrl") - if not url: - logger.debug(f"Url not found in source: {source}") - return - - if url.startswith("--"): - url = one_digit_symmetric_xor(56, url[2:]) - - logger.debug(f"Decrypting url for source: {source['sourceName']}") - if source["sourceName"] in OTHER_SOURCES: - logger.debug(f"Found {source['sourceName']} but ignoring") - return - - if source["sourceName"] not in AVAILABLE_SOURCES: - logger.debug( - f"Found {source['sourceName']} but did not expect it, its time to scrape lol" - ) - return - logger.debug(f"Found {source['sourceName']}") - - return AVAILABLE_SOURCES[source["sourceName"]].extract( - url, client, episode_number, episode, source - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/filemoon.py b/viu_media/libs/provider/anime/allanime/extractors/filemoon.py deleted file mode 100644 index 11eb735..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/filemoon.py +++ /dev/null @@ -1,62 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -# TODO: requires decoding obsfucated js (filemoon) -class FmHlsExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - - embed_html = response.text.replace(" ", "").replace("\n", "") - vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html) - if not vid: - raise Exception("") - return Server( - name="dropbox", - 
links=[EpisodeStream(link=vid.group(1), quality="1080")], - episode_title=episode["notes"], - headers={"Referer": "https://www.mp4upload.com/"}, - ) - - -# TODO: requires decoding obsfucated js (filemoon) -class OkExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - - embed_html = response.text.replace(" ", "").replace("\n", "") - vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html) - if not vid: - raise Exception("") - return Server( - name="dropbox", - links=[EpisodeStream(link=vid.group(1), quality="1080")], - episode_title=episode["notes"], - headers={"Referer": "https://www.mp4upload.com/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/gogoanime.py b/viu_media/libs/provider/anime/allanime/extractors/gogoanime.py deleted file mode 100644 index 1fc4f03..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/gogoanime.py +++ /dev/null @@ -1,34 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeEpisodeStreams, AllAnimeSource -from .base import BaseExtractor - - -class Lufmp4Extractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams: AllAnimeEpisodeStreams = response.json() - - return Server( - name="gogoanime", - links=[ - EpisodeStream( - link=stream["link"], quality="1080", format=stream["resolutionStr"] - ) - for stream in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) 
diff --git a/viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py b/viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py deleted file mode 100644 index 0dd60e0..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/mp4_upload.py +++ /dev/null @@ -1,32 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import MP4_SERVER_JUICY_STREAM_REGEX -from ..utils import logger -from .base import BaseExtractor - - -class Mp4Extractor(BaseExtractor): - @classmethod - def extract(cls, url, client, episode_number, episode, source): - response = client.get(url, timeout=10, follow_redirects=True) - response.raise_for_status() - - embed_html = response.text.replace(" ", "").replace("\n", "") - - # NOTE: some of the video were deleted so the embed html will just be "Filewasdeleted" - vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html) - if not vid: - if embed_html == "Filewasdeleted": - logger.debug( - "Failed to extract stream url from mp4-uploads. Reason: Filewasdeleted" - ) - return - logger.debug( - f"Failed to extract stream url from mp4-uploads. Reason: unknown. 
Embed html: {embed_html}" - ) - return - return Server( - name="mp4-upload", - links=[EpisodeStream(link=vid.group(1), quality="1080")], - episode_title=episode["notes"], - headers={"Referer": "https://www.mp4upload.com/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/sharepoint.py b/viu_media/libs/provider/anime/allanime/extractors/sharepoint.py deleted file mode 100644 index a105d6c..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/sharepoint.py +++ /dev/null @@ -1,28 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisodeStreams -from .base import BaseExtractor - - -class Smp4Extractor(BaseExtractor): - @classmethod - def extract(cls, url, client, episode_number, episode, source): - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams: AllAnimeEpisodeStreams = response.json() - return Server( - name="sharepoint", - links=[ - EpisodeStream( - link=stream["link"], - quality="1080", - format=stream["resolutionStr"], - ) - for stream in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/streamsb.py b/viu_media/libs/provider/anime/allanime/extractors/streamsb.py deleted file mode 100644 index 56670c6..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/streamsb.py +++ /dev/null @@ -1,32 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -class SsHlsExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - # TODO: requires some serious work i think : ) - response = client.get( - url, - timeout=10, 
- ) - response.raise_for_status() - streams = response.json()["links"] - - return Server( - name="StreamSb", - links=[ - EpisodeStream(link=link, quality="1080") for link in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py b/viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py deleted file mode 100644 index 6f9b51c..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/vid_mp4.py +++ /dev/null @@ -1,32 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -# TODO: requires some serious work i think : ) -class VidMp4Extractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams = response.json() - - return Server( - name="Vid-mp4", - links=[ - EpisodeStream(link=link, quality="1080") for link in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/we_transfer.py b/viu_media/libs/provider/anime/allanime/extractors/we_transfer.py deleted file mode 100644 index b723a5d..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/we_transfer.py +++ /dev/null @@ -1,31 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -class KirExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - source: AllAnimeSource, - ) -> 
Server: - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams = response.json() - - return Server( - name="weTransfer", - links=[ - EpisodeStream(link=link, quality="1080") for link in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/wixmp.py b/viu_media/libs/provider/anime/allanime/extractors/wixmp.py deleted file mode 100644 index 59e8b13..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/wixmp.py +++ /dev/null @@ -1,26 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisodeStreams -from .base import BaseExtractor - - -class DefaultExtractor(BaseExtractor): - @classmethod - def extract(cls, url, client, episode_number, episode, source): - response = client.get( - f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}", - timeout=10, - ) - response.raise_for_status() - streams: AllAnimeEpisodeStreams = response.json() - return Server( - name="wixmp", - links=[ - EpisodeStream( - link=stream["link"], quality="1080", format=stream["resolutionStr"] - ) - for stream in streams["links"] - ], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py b/viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py deleted file mode 100644 index 4e8fad3..0000000 --- a/viu_media/libs/provider/anime/allanime/extractors/yt_mp4.py +++ /dev/null @@ -1,22 +0,0 @@ -from ...types import EpisodeStream, Server -from ..constants import API_BASE_URL -from ..types import AllAnimeEpisode, AllAnimeSource -from .base import BaseExtractor - - -class YtExtractor(BaseExtractor): - @classmethod - def extract( - cls, - url, - client, - episode_number: str, - episode: AllAnimeEpisode, - 
source: AllAnimeSource, - ) -> Server: - return Server( - name="Yt", - links=[EpisodeStream(link=url, quality="1080")], - episode_title=episode["notes"], - headers={"Referer": f"https://{API_BASE_URL}/"}, - ) diff --git a/viu_media/libs/provider/anime/allanime/mappers.py b/viu_media/libs/provider/anime/allanime/mappers.py deleted file mode 100644 index 51cc591..0000000 --- a/viu_media/libs/provider/anime/allanime/mappers.py +++ /dev/null @@ -1,58 +0,0 @@ -from typing import Union - -from httpx import Response - -from ..types import ( - Anime, - AnimeEpisodes, - MediaTranslationType, - PageInfo, - SearchResult, - SearchResults, -) -from .types import AllAnimeSearchResults, AllAnimeShow - - -def generate_list(count: Union[int, str]) -> list[str]: - return list(map(str, range(int(count)))) - - -translation_type_map = { - "sub": MediaTranslationType.SUB, - "dub": MediaTranslationType.DUB, - "raw": MediaTranslationType.RAW, -} - - -def map_to_search_results(response: Response) -> SearchResults: - search_results: AllAnimeSearchResults = response.json()["data"] - return SearchResults( - page_info=PageInfo(total=search_results["shows"]["pageInfo"]["total"]), - results=[ - SearchResult( - id=result["_id"], - title=result["name"], - media_type=result["__typename"], - episodes=AnimeEpisodes( - sub=generate_list(result["availableEpisodes"]["sub"]), - dub=generate_list(result["availableEpisodes"]["dub"]), - raw=generate_list(result["availableEpisodes"]["raw"]), - ), - ) - for result in search_results["shows"]["edges"] - ], - ) - - -def map_to_anime_result(response: Response) -> Anime: - anime: AllAnimeShow = response.json()["data"]["show"] - return Anime( - id=anime["_id"], - title=anime["name"], - episodes=AnimeEpisodes( - sub=sorted(anime["availableEpisodesDetail"]["sub"], key=float), - dub=sorted(anime["availableEpisodesDetail"]["dub"], key=float), - raw=sorted(anime["availableEpisodesDetail"]["raw"], key=float), - ), - type=anime.get("__typename"), - ) diff --git 
a/viu_media/libs/provider/anime/allanime/provider.py b/viu_media/libs/provider/anime/allanime/provider.py deleted file mode 100644 index d4bf579..0000000 --- a/viu_media/libs/provider/anime/allanime/provider.py +++ /dev/null @@ -1,84 +0,0 @@ -import logging -from typing import TYPE_CHECKING - -from .....core.utils.graphql import execute_graphql -from ..base import BaseAnimeProvider -from ..utils.debug import debug_provider -from .constants import ( - ANIME_GQL, - API_GRAPHQL_ENDPOINT, - API_GRAPHQL_HEADERS, - API_GRAPHQL_REFERER, - EPISODE_GQL, - SEARCH_GQL, -) -from .mappers import ( - map_to_anime_result, - map_to_search_results, -) - -if TYPE_CHECKING: - from .types import AllAnimeEpisode -logger = logging.getLogger(__name__) - - -class AllAnime(BaseAnimeProvider): - HEADERS = {"Referer": API_GRAPHQL_REFERER} - - @debug_provider - def search(self, params): - response = execute_graphql( - API_GRAPHQL_ENDPOINT, - self.client, - SEARCH_GQL, - variables={ - "search": { - "allowAdult": params.allow_nsfw, - "allowUnknown": params.allow_unknown, - "query": params.query, - }, - "limit": params.page_limit, - "page": params.current_page, - "translationtype": params.translation_type, - "countryorigin": params.country_of_origin, - }, - headers=API_GRAPHQL_HEADERS - ) - return map_to_search_results(response) - - @debug_provider - def get(self, params): - response = execute_graphql( - API_GRAPHQL_ENDPOINT, - self.client, - ANIME_GQL, - variables={"showId": params.id}, - headers=API_GRAPHQL_HEADERS - ) - return map_to_anime_result(response) - - @debug_provider - def episode_streams(self, params): - from .extractors import extract_server - - episode_response = execute_graphql( - API_GRAPHQL_ENDPOINT, - self.client, - EPISODE_GQL, - variables={ - "showId": params.anime_id, - "translationType": params.translation_type, - "episodeString": params.episode, - }, - headers=API_GRAPHQL_HEADERS - ) - episode: AllAnimeEpisode = episode_response.json()["data"]["episode"] - for source in 
episode["sourceUrls"]: - if server := extract_server(self.client, params.episode, episode, source): - yield server - - -if __name__ == "__main__": - from ..utils.debug import test_anime_provider - - test_anime_provider(AllAnime) diff --git a/viu_media/libs/provider/anime/allanime/types.py b/viu_media/libs/provider/anime/allanime/types.py deleted file mode 100644 index 2c6f7d3..0000000 --- a/viu_media/libs/provider/anime/allanime/types.py +++ /dev/null @@ -1,111 +0,0 @@ -from enum import Enum -from typing import Literal, TypedDict - - -class Server(Enum): - SHAREPOINT = "sharepoint" - DROPBOX = "dropbox" - GOGOANIME = "gogoanime" - WETRANSFER = "weTransfer" - WIXMP = "wixmp" - YT = "Yt" - MP4_UPLOAD = "mp4-upload" - - -class AllAnimeEpisodesDetail(TypedDict): - dub: list[str] - sub: list[str] - raw: list[str] - - -class AllAnimeEpisodes(TypedDict): - dub: int - sub: int - raw: int - - -class AllAnimePageInfo(TypedDict): - total: int - - -class AllAnimeShow(TypedDict): - _id: str - name: str - availableEpisodesDetail: AllAnimeEpisodesDetail - __typename: str - - -class AllAnimeSearchResult(TypedDict): - _id: str - name: str - availableEpisodes: AllAnimeEpisodes - __typename: str | None - - -class AllAnimeShows(TypedDict): - pageInfo: AllAnimePageInfo - edges: list[AllAnimeSearchResult] - - -class AllAnimeSearchResults(TypedDict): - shows: AllAnimeShows - - -class AllAnimeSourceDownload(TypedDict): - sourceName: str - dowloadUrl: str - - -class AllAnimeSource(TypedDict): - sourceName: Literal[ - "Sak", - "S-mp4", - "Luf-mp4", - "Default", - "Yt-mp4", - "Kir", - "Mp4", - "Ak", - "Vid-mp4", - "Ok", - "Ss-Hls", - "Fm-Hls", - ] - sourceUrl: str - priority: float - sandbox: str - type: str - className: str - streamerId: str - downloads: AllAnimeSourceDownload - - -class AllAnimeEpisodeStream(TypedDict): - link: str - hls: bool - resolutionStr: str - fromCache: str - - -class AllAnimeEpisodeStreams(TypedDict): - links: list[AllAnimeEpisodeStream] - - -class 
AllAnimeEpisode(TypedDict): - episodeString: str - sourceUrls: list[AllAnimeSource] - notes: str | None - - -class AllAnimeStream: - link: str - mp4: bool - hls: bool | None - resolutionStr: str - fromCache: str - priority: int - headers: dict | None - - -class AllAnimeStreams: - links: list[AllAnimeStream] diff --git a/viu_media/libs/provider/anime/allanime/utils.py b/viu_media/libs/provider/anime/allanime/utils.py deleted file mode 100644 index f1e60b0..0000000 --- a/viu_media/libs/provider/anime/allanime/utils.py +++ /dev/null @@ -1,92 +0,0 @@ -import functools -import logging -import os -import re -from itertools import cycle - -logger = logging.getLogger(__name__) - -# Dictionary to map hex values to characters -hex_to_char = { - "01": "9", - "08": "0", - "05": "=", - "0a": "2", - "0b": "3", - "0c": "4", - "07": "?", - "00": "8", - "5c": "d", - "0f": "7", - "5e": "f", - "17": "/", - "54": "l", - "09": "1", - "48": "p", - "4f": "w", - "0e": "6", - "5b": "c", - "5d": "e", - "0d": "5", - "53": "k", - "1e": "&", - "5a": "b", - "59": "a", - "4a": "r", - "4c": "t", - "4e": "v", - "57": "o", - "51": "i", -} - - -def debug_extractor(extractor_function): - @functools.wraps(extractor_function) - def _provider_function_wrapper(*args): - if not os.environ.get("VIU_DEBUG"): - try: - return extractor_function(*args) - except Exception as e: - logger.error( - f"[AllAnime@Server={args[3].get('sourceName', 'UNKNOWN')}]: {e}" - ) - else: - return extractor_function(*args) - - return _provider_function_wrapper - - -def give_random_quality(links): - qualities = cycle(["1080", "720", "480", "360"]) - - return [ - {**episode_stream, "quality": quality} - for episode_stream, quality in zip(links, qualities, strict=False) - ] - - -def one_digit_symmetric_xor(password: int, target: str): - def genexp(): - for segment in bytearray.fromhex(target): - yield segment ^ password - - return bytes(genexp()).decode("utf-8") - - -def decode_hex_string(hex_string): - """some of the sources 
encrypt the urls into hex codes this function decrypts the urls - - Args: - hex_string ([TODO:parameter]): [TODO:description] - - Returns: - [TODO:return] - """ - # Split the hex string into pairs of characters - hex_pairs = re.findall("..", hex_string) - - # Decode each hex pair - decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs] - - # TODO: Better type handling - return "".join(decoded_chars) # type: ignore diff --git a/viu_media/libs/provider/anime/animepahe/__init__.py b/viu_media/libs/provider/anime/animepahe/__init__.py deleted file mode 100644 index 8b13789..0000000 --- a/viu_media/libs/provider/anime/animepahe/__init__.py +++ /dev/null @@ -1 +0,0 @@ - diff --git a/viu_media/libs/provider/anime/animepahe/constants.py b/viu_media/libs/provider/anime/animepahe/constants.py deleted file mode 100644 index 735fc09..0000000 --- a/viu_media/libs/provider/anime/animepahe/constants.py +++ /dev/null @@ -1,56 +0,0 @@ -import re - -ANIMEPAHE = "animepahe.pw" -ANIMEPAHE_BASE = f"https://{ANIMEPAHE}" -ANIMEPAHE_ENDPOINT = f"{ANIMEPAHE_BASE}/api" -CDN_PROVIDER = "kwik.cx" -CDN_PROVIDER_BASE = f"https://{CDN_PROVIDER}" - -SERVERS_AVAILABLE = ["kwik"] -REQUEST_HEADERS = { - "Cookie": "__ddgid_=VvX0ebHrH2DsFZo4; __ddgmark_=3savRpSVFhvZcn5x; __ddg2_=buBJ3c4pNBYKFZNp; __ddg1_=rbVADKr9URtt55zoIGFa; SERVERID=janna; XSRF-TOKEN=eyJpdiI6IjV5bFNtd0phUHgvWGJxc25wL0VJSUE9PSIsInZhbHVlIjoicEJTZktlR2hxR2JZTWhnL0JzazlvZU5TQTR2bjBWZ2dDb0RwUXVUUWNSclhQWUhLRStYSmJmWmUxWkpiYkFRYU12RjFWejlSWHorME1wZG5qQ1U0TnFlNnBFR2laQjN1MjdyNjc5TjVPdXdJb2o5VkU1bEduRW9pRHNDTHh6Sy8iLCJtYWMiOiI0OTc0ZmNjY2UwMGJkOWY2MWNkM2NlMjk2ZGMyZGJmMWE0NTdjZTdkNGI2Y2IwNTIzZmFiZWU5ZTE2OTk0YmU4IiwidGFnIjoiIn0%3D; 
laravel_session=eyJpdiI6ImxvdlpqREFnTjdaeFJubUlXQWlJVWc9PSIsInZhbHVlIjoiQnE4R3VHdjZ4M1NDdEVWM1ZqMUxtNnVERnJCcmtCUHZKNzRPR2RFbzNFcStTL29xdnVTbWhsNVRBUXEybVZWNU1UYVlTazFqYlN5UjJva1k4czNGaXBTbkJJK01oTUd3VHRYVHBoc3dGUWxHYnFlS2NJVVNFbTFqMVBWdFpuVUgiLCJtYWMiOiI1NDdjZTVkYmNhNjUwZTMxZmRlZmVmMmRlMGNiYjAwYjlmYjFjY2U0MDc1YTQzZThiMTIxMjJlYTg1NTA4YjBmIiwidGFnIjoiIn0%3D; latest=5592", - "Host": ANIMEPAHE, - "Accept": "application, text/javascript, */*; q=0.01", - "Accept-Encoding": "Utf-8", - "Referer": ANIMEPAHE_BASE, - "DNT": "1", - "Connection": "keep-alive", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Site": "same-origin", - "Sec-Fetch-Mode": "cors", - "TE": "trailers", -} -SERVER_HEADERS = { - "Host": "kwik.cx", - "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", - "Accept-Language": "en-US,en;q=0.5", - "Accept-Encoding": "Utf-8", - "DNT": "1", - "Connection": "keep-alive", - "Referer": ANIMEPAHE_BASE + "/", - "Upgrade-Insecure-Requests": "1", - "Sec-Fetch-Dest": "iframe", - "Sec-Fetch-Mode": "navigate", - "Sec-Fetch-Site": "cross-site", - "Priority": "u=4", - "TE": "trailers", -} - -STREAM_HEADERS = { - # "Host": "vault-16.owocdn.top", # This will have to be the actual host of the stream (behind Kwik) - "Accept": "*/*", - "Accept-Language": "en-US,en;q=0.5", - "Accept-Encoding": "gzip, deflate, br, zstd", - "Origin": CDN_PROVIDER_BASE, - "Sec-GPC": "1", - "Connection": "keep-alive", - "Referer": CDN_PROVIDER_BASE + "/", - "Sec-Fetch-Dest": "empty", - "Sec-Fetch-Mode": "cors", - "Sec-Fetch-Site": "cross-site", - "TE": "trailers", -} - - -JUICY_STREAM_REGEX = re.compile(r"source='(.*)';") -KWIK_RE = re.compile(r"Player\|(.+?)'") diff --git a/viu_media/libs/provider/anime/animepahe/extractor.py b/viu_media/libs/provider/anime/animepahe/extractor.py deleted file mode 100644 index 349aa74..0000000 --- a/viu_media/libs/provider/anime/animepahe/extractor.py +++ /dev/null @@ -1,77 +0,0 @@ -import re - - -def 
animepahe_key_creator(c: int, a: int): - from ...scraping.utils import encode_base_n - - if c < a: - val_a = "" - else: - val_a = animepahe_key_creator(int(c / a), a) - c = c % a - if c > 35: - val_b = chr(c + 29) - else: - val_b = encode_base_n(c, 36) - return val_a + val_b - - -def animepahe_embed_decoder( - encoded_js_p: str, - base_a: int, - no_of_keys_c: int, - values_to_replace_with_k: list, -): - decode_mapper_d: dict = {} - for i in range(no_of_keys_c): - key = animepahe_key_creator(i, base_a) - val = values_to_replace_with_k[i] or key - decode_mapper_d[key] = val - return re.sub( - r"\b\w+\b", lambda match: decode_mapper_d[match.group(0)], encoded_js_p - ) - - -PARAMETERS_REGEX = re.compile(r"eval\(function\(p,a,c,k,e,d\)\{.*\}\((.*?)\)\)$") -ENCODE_JS_REGEX = re.compile(r"'(.*?);',(\d+),(\d+),'(.*)'\.split") - - -def process_animepahe_embed_page(embed_page: str): - from ...scraping.html_parser import get_element_text_and_html_by_tag - - encoded_js_string = "" - embed_page_content = embed_page - for _ in range(8): - text, html = get_element_text_and_html_by_tag("script", embed_page_content) - if not text and html: - embed_page_content = re.sub(html, "", embed_page_content) - continue - if text: - encoded_js_string = text.strip() - break - if not encoded_js_string: - return - obsfucated_js_parameter_match = PARAMETERS_REGEX.search(encoded_js_string) - if not obsfucated_js_parameter_match: - return - parameter_string = obsfucated_js_parameter_match.group(1) - encoded_js_parameter_string = ENCODE_JS_REGEX.search(parameter_string) - if not encoded_js_parameter_string: - return - p: str = encoded_js_parameter_string.group(1) - a: int = int(encoded_js_parameter_string.group(2)) - c: int = int(encoded_js_parameter_string.group(3)) - k: list = encoded_js_parameter_string.group(4).split("|") - return animepahe_embed_decoder(p, a, c, k).replace("\\", "") - - -if __name__ == "__main__": - # Testing time - filepath = input("Enter file name: ") - if filepath: - with 
open(filepath) as file: - data = file.read() - else: - data = """""" - - print(process_animepahe_embed_page(data)) diff --git a/viu_media/libs/provider/anime/animepahe/mappers.py b/viu_media/libs/provider/anime/animepahe/mappers.py deleted file mode 100644 index 4fa1c69..0000000 --- a/viu_media/libs/provider/anime/animepahe/mappers.py +++ /dev/null @@ -1,113 +0,0 @@ -import logging - -from ..types import ( - Anime, - AnimeEpisodeInfo, - AnimeEpisodes, - EpisodeStream, - MediaTranslationType, - PageInfo, - SearchResult, - SearchResults, - Server, -) -from .types import ( - AnimePaheAnimePage, - AnimePaheSearchPage, -) - -translation_type_map = { - "sub": MediaTranslationType.SUB, - "dub": MediaTranslationType.DUB, - "raw": MediaTranslationType.RAW, -} - -logger = logging.getLogger(__name__) - - -def map_to_search_results(data: AnimePaheSearchPage) -> SearchResults: - results = [] - for result in data["data"]: - results.append( - SearchResult( - id=result["session"], - title=result["title"], - episodes=AnimeEpisodes( - sub=list(map(str, range(1, result["episodes"] + 1))), - dub=list(map(str, range(1, result["episodes"] + 1))), - raw=list(map(str, range(1, result["episodes"] + 1))), - ), - media_type=result["type"], - score=result["score"], - status=result["status"], - season=result["season"], - poster=result["poster"], - year=str(result["year"]), - ) - ) - - return SearchResults( - page_info=PageInfo( - total=data["total"], - per_page=data["per_page"], - current_page=data["current_page"], - ), - results=results, - ) - - -def map_to_anime_result( - search_result: SearchResult, anime: AnimePaheAnimePage -) -> Anime: - episodes_info = [] - episodes = [] - anime["data"] = sorted(anime["data"], key=lambda k: float(k["episode"])) - for ep_info in anime["data"]: - episodes.append(str(ep_info["episode"])) - episodes_info.append( - AnimeEpisodeInfo( - id=str(ep_info["id"]), - session_id=ep_info["session"], - episode=str(ep_info["episode"]), - title=ep_info["title"], - 
def map_to_server(
    episode: AnimeEpisodeInfo,
    translation_type: str,
    stream_links: list[tuple[str, str]],
    headers: dict[str, str],
) -> Server:
    """Bundle scraped kwik stream links into a Server model, best quality first.

    ``stream_links`` is a list of (quality, url) pairs; any quality outside the
    recognised set is treated as the top ("1080") tier.
    """
    recognised_qualities = ("360", "480", "720", "1080")
    streams = []
    for quality, url in stream_links:
        streams.append(
            EpisodeStream(
                link=url,
                quality=quality if quality in recognised_qualities else "1080",  # type:ignore
                translation_type=translation_type_map[translation_type],
            )
        )

    # Present links from best to worst quality.
    streams = sorted(streams, key=lambda stream: int(stream.quality), reverse=True)
    logger.debug(f"Aggregated links: {streams}")

    return Server(
        name="kwik", links=streams, episode_title=episode.title, headers=headers
    )
@lru_cache()
def _search(self, params: SearchParams) -> SearchResults | None:
    """Query the AnimePahe search API and map the response.

    Cached per search params (NOTE(review): lru_cache on a method also keys
    on and retains ``self`` — confirm this matches the project's intent).
    Returns None when the API reports no matches.
    """
    response = self.client.get(
        ANIMEPAHE_ENDPOINT, params={"m": "search", "q": params.query}
    )
    response.raise_for_status()
    payload: AnimePaheSearchPage = response.json()
    if payload.get("data"):
        return map_to_search_results(payload)
    return None
@lru_cache()
def _anime_page_loader(self, m, id, sort, page) -> AnimePaheAnimePage:
    """Fetch one page of the AnimePahe release listing, cached per arguments.

    Parameter names (including ``id``, which shadows the builtin) mirror the
    API's own query-string keys.
    """
    response = self.client.get(
        ANIMEPAHE_ENDPOINT,
        params={"m": m, "id": id, "sort": sort, "page": page},
    )
    response.raise_for_status()
    return response.json()
@lru_cache()
def _get_episode_info(
    self, params: EpisodeStreamsParams
) -> Optional[AnimeEpisodeInfo]:
    """Look up metadata for a single episode; None when it cannot be found."""
    anime_info = self._get_anime(AnimeParams(id=params.anime_id, query=params.query))
    if not anime_info:
        logger.error(f"No anime info for {params.anime_id}")
        return None
    if not anime_info.episodes_info:
        logger.error(f"No episodes info for {params.anime_id}")
        return None
    # First (and only) episode entry whose number matches the request.
    return next(
        (
            episode
            for episode in anime_info.episodes_info
            if episode.episode == params.episode
        ),
        None,
    )
AnimePaheSearchResult(TypedDict): - id: str - title: str - type: str - episodes: int - status: str - season: str - year: int - score: int - poster: str - session: str - - -class AnimePaheSearchPage(TypedDict): - total: int - per_page: int - current_page: int - last_page: int - _from: int - to: int - data: list[AnimePaheSearchResult] - - -class Episode(TypedDict): - id: str - anime_id: int - episode: float - episode2: int - edition: str - title: str - snapshot: str # episode image - disc: str - audio: Literal["eng", "jpn"] - duration: str # time 00:00:00 - session: str - filler: int - created_at: str - - -class AnimePaheAnimePage(TypedDict): - total: int - per_page: int - current_page: int - last_page: int - next_page_url: str | None - prev_page_url: str | None - _from: int - to: int - data: list[Episode] - - -class AnimePaheEpisodeInfo(TypedDict): - title: str - episode: float - id: str - translation_type: Literal["eng", "jpn"] - duration: str - poster: str - - -class AvailableEpisodesDetail(TypedDict): - sub: list[str] - dub: list[str] - raw: list[str] - - -class AnimePaheAnime(TypedDict): - id: str - title: str - year: int - season: str - poster: str - score: int - availableEpisodesDetail: AvailableEpisodesDetail - episodesInfo: list[AnimePaheEpisodeInfo] - - -class PageInfo(TypedDict): - total: int - perPage: int - currentPage: int - - -class AnimePaheSearchResults(TypedDict): - pageInfo: PageInfo - results: list[AnimePaheSearchResult] - - -class AnimePaheStreamLink(TypedDict): - quality: str - translation_type: Literal["sub", "dub"] - link: str - - -class AnimePaheServer(TypedDict): - server: Literal["kwik"] - links: list[AnimePaheStreamLink] - episode_title: str - subtitles: list - headers: dict diff --git a/viu_media/libs/provider/anime/animeunity/__init__.py b/viu_media/libs/provider/anime/animeunity/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/viu_media/libs/provider/anime/animeunity/constants.py 
b/viu_media/libs/provider/anime/animeunity/constants.py deleted file mode 100644 index 7552685..0000000 --- a/viu_media/libs/provider/anime/animeunity/constants.py +++ /dev/null @@ -1,17 +0,0 @@ -import re - -ANIMEUNITY = "animeunity.so" -ANIMEUNITY_BASE = f"https://www.{ANIMEUNITY}" - -MAX_TIMEOUT = 10 -TOKEN_REGEX = re.compile(r'') - -REPLACEMENT_WORDS = {"Season ": "", "Cour": "Part"} - -# Server Specific -AVAILABLE_VIDEO_QUALITY = ["1080", "720", "480"] -VIDEO_INFO_REGEX = re.compile(r"window.video\s*=\s*(\{[^\}]*\})") -VIDEO_INFO_CLEAN_REGEX = re.compile(r'(? dict | None: - """ - Extracts server information from the VixCloud/AnimeUnity embed page. - Handles extraction from both window.video object and download URL. - """ - video_info = VIDEO_INFO_REGEX.search(html_content) - download_url_match = DOWNLOAD_URL_REGEX.search(html_content) - - if not (download_url_match and video_info): - return None - - info_str = VIDEO_INFO_CLEAN_REGEX.sub(r'"\1"', video_info.group(1)) - - # Use eval context for JS constants - ctx = {"null": None, "true": True, "false": False} - try: - info = eval(info_str, ctx) - except Exception as e: - logger.error(f"Failed to parse JS object: {e}") - return None - - download_url = download_url_match.group(1) - info["link"] = download_url - - # Extract metadata from download URL if missing in window.video - if filename_match := DOWNLOAD_FILENAME_REGEX.search(download_url): - info["name"] = filename_match.group(1) - else: - info["name"] = f"{episode_title or 'Unknown'}" - - if quality_match := QUALITY_REGEX.search(download_url): - # "720p" -> 720 - info["quality"] = int(quality_match.group(1)[:-1]) - else: - info["quality"] = 0 # Fallback - - return info diff --git a/viu_media/libs/provider/anime/animeunity/mappers.py b/viu_media/libs/provider/anime/animeunity/mappers.py deleted file mode 100644 index cd52b86..0000000 --- a/viu_media/libs/provider/anime/animeunity/mappers.py +++ /dev/null @@ -1,133 +0,0 @@ -from typing import Literal - -from 
def map_to_search_result(
    data: dict, translation_type: Literal["sub", "dub"] | None
) -> SearchResult | None:
    """Map one raw AnimeUnity record to a SearchResult.

    Args:
        data: A record from the AnimeUnity API (``dub`` is 1 for dubbed
            entries, 0 otherwise — per the episodes mapping below).
        translation_type: When given, records whose audio does not match are
            rejected.

    Returns:
        The mapped SearchResult, or None when the record is filtered out.
    """
    if translation_type is not None:
        expected_dub_flag = 1 if translation_type == "dub" else 0
        # BUG FIX: the original condition
        #   `translation_type and data["dub"] != 1 if translation_type == "dub" else 0`
        # parsed as `(... != 1) if (translation_type == "dub") else 0` due to
        # conditional-expression precedence, so "sub" searches were never
        # filtered. Compare against the expected flag explicitly.
        if data["dub"] != expected_dub_flag:
            return None

    titles = get_titles(data)
    episode_numbers = list(map(str, range(1, get_episodes_count(data) + 1)))
    return SearchResult(
        id=str(data["id"]),
        title=titles[0] if titles else "Unknown",
        episodes=AnimeEpisodes(
            sub=episode_numbers if data["dub"] == 0 else [],
            dub=episode_numbers if data["dub"] == 1 else [],
        ),
        other_titles=titles,
        score=data["score"],
        poster=data["imageurl"],
        year=data["date"],
    )
def get_titles(data: dict) -> list[str]:
    """Collect every available title variant, English first."""
    candidate_keys = ("title_eng", "title", "title_it")
    return [data[key] for key in candidate_keys if data.get(key)]


def get_episodes_count(record: dict) -> int:
    """Return the episode count, preferring the "real" count when positive."""
    real_count = record.get("real_episodes_count", 0)
    if real_count > 0:
        return real_count
    return record.get("episodes_count", 0)
@lru_cache
def _get_token(self) -> None:
    """Prime the session against the AnimeUnity site.

    Fetches the landing page once (lru_cache makes this a one-shot per
    instance; NOTE(review): it also retains ``self`` for the cache lifetime),
    harvests the CSRF token and session cookie, and installs them on the
    HTTP client for all subsequent requests.
    """
    response = self.client.get(
        ANIMEUNITY_BASE,
        headers=self.HEADERS,
        timeout=MAX_TIMEOUT,
        follow_redirects=True,
    )
    response.raise_for_status()

    token_match = TOKEN_REGEX.search(response.text)
    if token_match:
        # BUG FIX: the original wrote into self.HEADERS in place, mutating the
        # shared class-level dict and leaking the CSRF token across instances.
        # Shadow it with an instance-level copy instead.
        self.HEADERS = {**self.HEADERS, "x-csrf-token": token_match.group(1)}

    self.client.cookies = {
        "animeunity_session": response.cookies.get("animeunity_session") or ""
    }
    self.client.headers = self.HEADERS
@debug_provider
def episode_streams(self, params: EpisodeStreamsParams):
    """Yield the vixcloud server for the requested episode.

    Two round trips: the embed-url endpoint returns the player URL, and the
    player page is handed to the extractor for stream info.
    """
    episode = self._get_episode_info(params)
    if episode is None:
        logger.error(
            f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
        )
        return

    embed_response = self.client.get(
        url=f"{ANIMEUNITY_BASE}/embed-url/{episode.id}", timeout=MAX_TIMEOUT
    )
    embed_response.raise_for_status()

    player_response = self.client.get(
        url=embed_response.text.strip(), timeout=MAX_TIMEOUT
    )
    player_response.raise_for_status()

    info = extract_server_info(player_response.text, episode.title)
    if not info:
        logger.error(f"Failed to extract video info for episode {episode.id}")
        return None

    yield map_to_server(episode, info, params.translation_type)
class BaseAnimeProvider(ABC):
    """Abstract base class all anime providers implement.

    Subclasses must define a ``HEADERS`` class attribute (default HTTP headers
    for the provider's site) and implement ``search``, ``get`` and
    ``episode_streams``.
    """

    HEADERS: ClassVar[Dict[str, str]]

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Enforce the HEADERS contract at class-definition time instead of
        # failing later when the factory builds the HTTP client.
        if not hasattr(cls, "HEADERS"):
            raise TypeError(
                "Subclasses of BaseAnimeProvider must define a 'HEADERS' class attribute."
            )

    def __init__(self, client: "Client") -> None:
        # Pre-configured httpx client injected by the provider factory.
        self.client = client

    @abstractmethod
    def search(self, params: SearchParams) -> "SearchResults | None":
        """Search the provider; returns None when nothing matched."""

    @abstractmethod
    def get(self, params: AnimeParams) -> "Anime | None":
        """Fetch full details for one anime; returns None when unavailable."""

    @abstractmethod
    def episode_streams(
        self, params: EpisodeStreamsParams
    ) -> "Iterator[Server] | None":
        """Yield stream servers for an episode; returns None on failure."""
class AnimeProviderFactory:
    """Factory for creating anime provider instances."""

    @staticmethod
    def create(provider_name: ProviderName) -> BaseAnimeProvider:
        """
        Dynamically creates an instance of the specified anime provider.

        This method imports the necessary provider module, instantiates its
        main class, and injects a pre-configured HTTP client.

        Args:
            provider_name: The name of the provider to create (e.g., 'allanime').

        Returns:
            An instance of a class that inherits from BaseAnimeProvider.

        Raises:
            ValueError: If the provider_name is not supported.
            ImportError: If the provider module or class cannot be found.
        """
        from ....core.utils.networking import random_user_agent

        provider_key = provider_name.value.lower()

        # BUG FIX: a direct PROVIDERS_AVAILABLE[...] lookup raised KeyError for
        # unknown providers, contradicting the documented ValueError contract.
        import_path = PROVIDERS_AVAILABLE.get(provider_key)
        if import_path is None:
            raise ValueError(
                f"Unsupported provider '{provider_key}'. "
                f"Available providers: {', '.join(sorted(PROVIDERS_AVAILABLE))}"
            )

        # e.g. "provider.AllAnime" -> module "provider", class "AllAnime".
        module_name, class_name = import_path.split(".", 1)

        # Construct the full package path for dynamic import.
        package_path = f"viu_media.libs.provider.anime.{provider_key}"

        try:
            provider_module = importlib.import_module(f".{module_name}", package_path)
            provider_class = getattr(provider_module, class_name)
        except (ImportError, AttributeError) as e:
            logger.error(f"Failed to load provider '{provider_key}': {e}")
            raise ImportError(
                f"Could not load provider '{provider_key}'. "
                "Check the module path and class name in PROVIDERS_AVAILABLE."
            ) from e

        # Each provider class requires an httpx.Client, which we set up here.
        client = Client(
            headers={"User-Agent": random_user_agent(), **provider_class.HEADERS}
        )
        return provider_class(client)
class ProviderName(Enum):
    """Anime providers that the provider factory can instantiate."""

    ALLANIME = "allanime"
    ANIMEPAHE = "animepahe"
    ANIMEUNITY = "animeunity"


class ProviderServer(Enum):
    """Stream-server identifiers recognised across providers.

    TOP is a provider-independent pseudo-value (presumably "best available" —
    TODO confirm against the selection logic that consumes it).
    """

    TOP = "TOP"

    # AllAnime servers
    SHAREPOINT = "sharepoint"
    DROPBOX = "dropbox"
    GOGOANIME = "gogoanime"
    WETRANSFER = "weTransfer"
    WIXMP = "wixmp"
    YT = "Yt"
    MP4_UPLOAD = "mp4-upload"

    # AnimePahe servers
    KWIK = "kwik"

    # AnimeUnity servers
    VIXCLOUD = "vixcloud"


class MediaTranslationType(Enum):
    """Audio/subtitle variant of a stream."""

    SUB = "sub"
    DUB = "dub"
    RAW = "raw"
def debug_provider(provider_function):
    """Wrap a provider method so unexpected errors are logged, not raised.

    When the VIU_DEBUG environment variable is set, exceptions propagate
    normally to aid debugging; otherwise they are caught, logged with the
    provider and method name, and the call returns None.
    """

    @functools.wraps(provider_function)
    def _provider_function_wrapper(self, *args, **kwargs):
        provider_name = self.__class__.__name__.upper()
        if os.environ.get("VIU_DEBUG"):
            return provider_function(self, *args, **kwargs)
        try:
            return provider_function(self, *args, **kwargs)
        except Exception as e:
            logger.error(f"[{provider_name}@{provider_function.__name__}]: {e}")

    return _provider_function_wrapper
random_user_agent - from ..params import AnimeParams, EpisodeStreamsParams, SearchParams - - anime_provider = AnimeProvider( - Client(headers={"User-Agent": random_user_agent(), **AnimeProvider.HEADERS}) - ) - print(APP_ASCII_ART.read_text(encoding="utf-8")) - query = input("What anime would you like to stream: ") - search_results = anime_provider.search(SearchParams(query=query)) - if not search_results: - return - for i, search_result in enumerate(search_results.results): - print(f"{i + 1}: {search_result.title}") - result = search_results.results[ - int(input(f"Select result (1-{len(search_results.results)}): ")) - 1 - ] - anime = anime_provider.get(AnimeParams(id=result.id, query=query)) - - if not anime: - return - translation_type = input("Preferred Translation Type: [dub,sub,raw]: ") - for episode in getattr(anime.episodes, translation_type): - print(episode) - episode_number = input("What episode do you wish to watch: ") - episode_streams = anime_provider.episode_streams( - EpisodeStreamsParams( - query=query, - anime_id=anime.id, - episode=episode_number, - translation_type=translation_type, # type:ignore - ) - ) - - if not episode_streams: - return - episode_streams = list(episode_streams) - for i, stream in enumerate(episode_streams): - print(f"{i + 1}: {stream.name}") - stream = episode_streams[int(input("Select your preferred server: ")) - 1] - for i, link in enumerate(stream.links): - print(f"{i + 1}: {link.quality}") - link = stream.links[int(input("Select your preferred quality: ")) - 1] - if executable := shutil.which("mpv"): - cmd = executable - elif executable := shutil.which("xdg-open"): - cmd = executable - elif executable := shutil.which("open"): - cmd = executable - else: - return - - print( - "Now streaming: ", - anime.title, - "Episode: ", - stream.episode_title if stream.episode_title else episode_number, - ) - subprocess.run([cmd, link.link]) diff --git a/viu_media/libs/provider/scraping/__init__.py 
logger = logging.getLogger(__name__)

# Optional dependency probe: prefer lxml when it is importable.
HAS_LXML = False
try:
    from lxml import etree, html as lxml_html

    HAS_LXML = True
    logger.debug("lxml is available and will be used for HTML parsing")
except ImportError:
    logger.debug("lxml not available, falling back to html.parser")


class HTMLParserConfig:
    """Configuration for HTML parser selection."""

    def __init__(self, use_lxml: Optional[bool] = None):
        """
        Initialize parser configuration.

        Args:
            use_lxml: Force use of lxml (True), html.parser (False), or
                auto-detect (None). lxml is only used when actually installed.
        """
        self.use_lxml = HAS_LXML if use_lxml is None else (use_lxml and HAS_LXML)
        if use_lxml and not HAS_LXML:
            logger.warning(
                "lxml requested but not available, falling back to html.parser"
            )
- - Provides a unified interface for HTML parsing operations regardless - of the underlying parser implementation. - """ - - def __init__(self, config: Optional[HTMLParserConfig] = None): - """Initialize the HTML parser with configuration.""" - self.config = config or HTMLParserConfig() - - def parse(self, html_content: str) -> Union[Any, "ParsedHTML"]: - """ - Parse HTML content and return a parsed tree. - - Args: - html_content: Raw HTML string to parse - - Returns: - Parsed HTML tree (lxml Element or custom ParsedHTML object) - """ - if self.config.use_lxml: - return self._parse_with_lxml(html_content) - else: - return self._parse_with_builtin(html_content) - - def _parse_with_lxml(self, html_content: str) -> Any: - """Parse HTML using lxml.""" - try: - # Use lxml's HTML parser which is more lenient - return lxml_html.fromstring(html_content) - except Exception as e: - logger.warning(f"lxml parsing failed: {e}, falling back to html.parser") - return self._parse_with_builtin(html_content) - - def _parse_with_builtin(self, html_content: str) -> "ParsedHTML": - """Parse HTML using Python's built-in parser.""" - parser = BuiltinHTMLParser() - parser.feed(html_content) - return ParsedHTML(parser.elements, html_content) - - -class BuiltinHTMLParser(BaseHTMLParser): - """Enhanced HTML parser using Python's built-in capabilities.""" - - def __init__(self): - super().__init__() - self.elements = [] - self.current_element = None - self.element_stack = [] - - def handle_starttag(self, tag: str, attrs: List[Tuple[str, Optional[str]]]): - """Handle opening tags.""" - element = { - "tag": tag, - "attrs": dict(attrs), - "text": "", - "children": [], - "start_pos": self.getpos(), - } - - if self.element_stack: - self.element_stack[-1]["children"].append(element) - else: - self.elements.append(element) - - self.element_stack.append(element) - - def handle_endtag(self, tag: str): - """Handle closing tags.""" - if self.element_stack and self.element_stack[-1]["tag"] == tag: - 
element = self.element_stack.pop() - element["end_pos"] = self.getpos() - - def handle_data(self, data: str): - """Handle text content.""" - if self.element_stack: - self.element_stack[-1]["text"] += data - - -class ParsedHTML: - """Wrapper for parsed HTML using built-in parser.""" - - def __init__(self, elements: List[Dict], raw_html: str): - self.elements = elements - self.raw_html = raw_html - - def find_by_id(self, element_id: str) -> Optional[Dict]: - """Find element by ID.""" - return self._find_recursive( - self.elements, lambda el: el["attrs"].get("id") == element_id - ) - - def find_by_class(self, class_name: str) -> List[Dict]: - """Find elements by class name.""" - results = [] - self._find_all_recursive( - self.elements, - lambda el: class_name in el["attrs"].get("class", "").split(), - results, - ) - return results - - def find_by_tag(self, tag_name: str) -> List[Dict]: - """Find elements by tag name.""" - results = [] - self._find_all_recursive( - self.elements, lambda el: el["tag"].lower() == tag_name.lower(), results - ) - return results - - def _find_recursive(self, elements: List[Dict], condition) -> Optional[Dict]: - """Recursively find first element matching condition.""" - for element in elements: - if condition(element): - return element - result = self._find_recursive(element["children"], condition) - if result: - return result - return None - - def _find_all_recursive(self, elements: List[Dict], condition, results: List[Dict]): - """Recursively find all elements matching condition.""" - for element in elements: - if condition(element): - results.append(element) - self._find_all_recursive(element["children"], condition, results) - - -# Global parser instance -_default_parser = HTMLParser() - - -def extract_attributes(html_element: str) -> Dict[str, str]: - """ - Extract attributes from an HTML element string. - - Args: - html_element: HTML element as string (e.g., '
') - - Returns: - Dictionary of attribute name-value pairs - - Examples: - >>> extract_attributes('
') - {'class': 'test', 'id': 'main'} - """ - if not html_element: - return {} - - # Use regex to extract attributes from HTML string - attr_pattern = r'(\w+)=(["\'])([^"\']*?)\2' - matches = re.findall(attr_pattern, html_element) - - attributes = {} - for match in matches: - attr_name, _, attr_value = match - attributes[attr_name] = attr_value - - # Handle attributes without quotes - unquoted_pattern = r"(\w+)=([^\s>]+)" - unquoted_matches = re.findall(unquoted_pattern, html_element) - for attr_name, attr_value in unquoted_matches: - if attr_name not in attributes: - attributes[attr_name] = attr_value - - return attributes - - -def get_element_by_id(element_id: str, html_content: str) -> Optional[str]: - """ - Get HTML element by ID. - - Args: - element_id: The ID attribute value to search for - html_content: HTML content to search in - - Returns: - HTML string of the element or None if not found - - Examples: - >>> html = '
Content
' - >>> get_element_by_id("test", html) - '
Content
' - """ - parsed = _default_parser.parse(html_content) - - if _default_parser.config.use_lxml and HAS_LXML: - try: - element = parsed.xpath(f'//*[@id="{element_id}"]') - if element: - return etree.tostring(element[0], encoding="unicode", method="html") - except Exception as e: - logger.warning(f"lxml XPath search failed: {e}") - return None - else: - element = parsed.find_by_id(element_id) - if element: - return _element_to_html(element, html_content) - - return None - - -def get_element_by_tag(tag_name: str, html_content: str) -> Optional[str]: - """ - Get first HTML element by tag name. - - Args: - tag_name: The tag name to search for - html_content: HTML content to search in - - Returns: - HTML string of the element or None if not found - """ - parsed = _default_parser.parse(html_content) - - if _default_parser.config.use_lxml and HAS_LXML: - try: - elements = parsed.xpath(f"//{tag_name}") - if elements: - return etree.tostring(elements[0], encoding="unicode", method="html") - except Exception as e: - logger.warning(f"lxml XPath search failed: {e}") - return None - else: - elements = parsed.find_by_tag(tag_name) - if elements: - return _element_to_html(elements[0], html_content) - - return None - - -def get_element_by_class(class_name: str, html_content: str) -> Optional[str]: - """ - Get first HTML element by class name. 
- - Args: - class_name: The class name to search for - html_content: HTML content to search in - - Returns: - HTML string of the element or None if not found - """ - parsed = _default_parser.parse(html_content) - - if _default_parser.config.use_lxml and HAS_LXML: - try: - elements = parsed.xpath(f'//*[contains(@class, "{class_name}")]') - if elements: - return etree.tostring(elements[0], encoding="unicode", method="html") - except Exception as e: - logger.warning(f"lxml XPath search failed: {e}") - return None - else: - elements = parsed.find_by_class(class_name) - if elements: - return _element_to_html(elements[0], html_content) - - return None - - -def get_elements_by_tag(tag_name: str, html_content: str) -> List[str]: - """ - Get all HTML elements by tag name. - - Args: - tag_name: The tag name to search for - html_content: HTML content to search in - - Returns: - List of HTML strings for matching elements - """ - parsed = _default_parser.parse(html_content) - results = [] - - if _default_parser.config.use_lxml and HAS_LXML: - try: - elements = parsed.xpath(f"//{tag_name}") - for element in elements: - results.append( - etree.tostring(element, encoding="unicode", method="html") - ) - except Exception as e: - logger.warning(f"lxml XPath search failed: {e}") - else: - elements = parsed.find_by_tag(tag_name) - for element in elements: - results.append(_element_to_html(element, html_content)) - - return results - - -def get_elements_by_class(class_name: str, html_content: str) -> List[str]: - """ - Get all HTML elements by class name. 
- - Args: - class_name: The class name to search for - html_content: HTML content to search in - - Returns: - List of HTML strings for matching elements - """ - parsed = _default_parser.parse(html_content) - results = [] - - if _default_parser.config.use_lxml and HAS_LXML: - try: - elements = parsed.xpath(f'//*[contains(@class, "{class_name}")]') - for element in elements: - results.append( - etree.tostring(element, encoding="unicode", method="html") - ) - except Exception as e: - logger.warning(f"lxml XPath search failed: {e}") - else: - elements = parsed.find_by_class(class_name) - for element in elements: - results.append(_element_to_html(element, html_content)) - - return results - - -def get_elements_html_by_class(class_name: str, html_content: str) -> List[str]: - """ - Get HTML strings of elements by class name. - - This is an alias for get_elements_by_class for yt-dlp compatibility. - - Args: - class_name: The class name to search for - html_content: HTML content to search in - - Returns: - List of HTML strings for matching elements - """ - return get_elements_by_class(class_name, html_content) - - -def get_element_text_and_html_by_tag( - tag_name: str, html_content: str -) -> Tuple[Optional[str], Optional[str]]: - """ - Get both text content and HTML of first element by tag name. 
- - Args: - tag_name: The tag name to search for - html_content: HTML content to search in - - Returns: - Tuple of (text_content, html_string) or (None, None) if not found - - Examples: - >>> html = '' - >>> get_element_text_and_html_by_tag("script", html) - ('alert("test");', '') - """ - parsed = _default_parser.parse(html_content) - - if _default_parser.config.use_lxml and HAS_LXML: - try: - elements = parsed.xpath(f"//{tag_name}") - if elements: - element = elements[0] - text = ( - element.text_content() - if hasattr(element, "text_content") - else (element.text or "") - ) - html_str = etree.tostring(element, encoding="unicode", method="html") - return text, html_str - except Exception as e: - logger.warning(f"lxml XPath search failed: {e}") - return None, None - else: - elements = parsed.find_by_tag(tag_name) - if elements: - element = elements[0] - text = _extract_text_content(element) - html_str = _element_to_html(element, html_content) - return text, html_str - - return None, None - - -def _element_to_html(element: Dict, original_html: str) -> str: - """ - Convert parsed element back to HTML string. - - This is a simplified implementation that reconstructs HTML from parsed data. - For production use, consider using lxml for better accuracy. 
- """ - if not element: - return "" - - # Build opening tag - tag = element["tag"] - attrs = element.get("attrs", {}) - attr_str = " ".join(f'{k}="{v}"' for k, v in attrs.items() if v is not None) - - if attr_str: - opening_tag = f"<{tag} {attr_str}>" - else: - opening_tag = f"<{tag}>" - - # Add text content - text = element.get("text", "") - - # Add children - children_html = "" - for child in element.get("children", []): - children_html += _element_to_html(child, original_html) - - # Build closing tag - closing_tag = f"" - - return f"{opening_tag}{text}{children_html}{closing_tag}" - - -def _extract_text_content(element: Dict) -> str: - """Extract all text content from element and its children.""" - text = element.get("text", "") - - for child in element.get("children", []): - text += _extract_text_content(child) - - return text - - -def configure_parser(use_lxml: Optional[bool] = None) -> None: - """ - Configure the global HTML parser. - - Args: - use_lxml: Force use of lxml (True), html.parser (False), or auto-detect (None) - """ - global _default_parser - _default_parser = HTMLParser(HTMLParserConfig(use_lxml)) - logger.info( - f"HTML parser configured: {'lxml' if _default_parser.config.use_lxml else 'html.parser'}" - ) diff --git a/viu_media/libs/provider/scraping/user_agents.py b/viu_media/libs/provider/scraping/user_agents.py deleted file mode 100644 index 4637aae..0000000 --- a/viu_media/libs/provider/scraping/user_agents.py +++ /dev/null @@ -1,235 +0,0 @@ -""" -User agent utilities for web scraping. - -Provides functionality to generate random user agent strings -to avoid detection and blocking by websites. -""" - -import random -from typing import List, Optional - - -class UserAgentGenerator: - """ - Generator for realistic user agent strings. - - Provides a variety of common user agents from different browsers - and operating systems to help avoid detection. 
- """ - - # Common user agents for different browsers and OS combinations - USER_AGENTS = [ - # Chrome on Windows - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36", - # Chrome on macOS - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", - # Chrome on Linux - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36", - "Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36", - # Firefox on Windows - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:123.0) Gecko/20100101 Firefox/123.0", - "Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:122.0) Gecko/20100101 Firefox/122.0", - # Firefox on macOS - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:123.0) Gecko/20100101 Firefox/123.0", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10.15; rv:122.0) Gecko/20100101 Firefox/122.0", - # Firefox on Linux - "Mozilla/5.0 (X11; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0", - "Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:123.0) Gecko/20100101 Firefox/123.0", - # Safari on macOS - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Safari/605.1.15", - "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_15_7) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.2 Safari/605.1.15", - # Edge on Windows - "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Safari/537.36 Edg/122.0.0.0", - "Mozilla/5.0 (Windows NT 10.0; Win64; 
x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/121.0.0.0 Safari/537.36 Edg/121.0.0.0", - # Mobile Chrome (Android) - "Mozilla/5.0 (Linux; Android 14; SM-G998B) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", - "Mozilla/5.0 (Linux; Android 13; Pixel 7) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/122.0.0.0 Mobile Safari/537.36", - # Mobile Safari (iOS) - "Mozilla/5.0 (iPhone; CPU iPhone OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1", - "Mozilla/5.0 (iPad; CPU OS 17_3 like Mac OS X) AppleWebKit/605.1.15 (KHTML, like Gecko) Version/17.3 Mobile/15E148 Safari/604.1", - ] - - # Browser-specific user agents for when you need a specific browser - CHROME_USER_AGENTS = [ - ua for ua in USER_AGENTS if "Chrome" in ua and "Edg" not in ua - ] - FIREFOX_USER_AGENTS = [ua for ua in USER_AGENTS if "Firefox" in ua] - SAFARI_USER_AGENTS = [ - ua for ua in USER_AGENTS if "Safari" in ua and "Chrome" not in ua - ] - EDGE_USER_AGENTS = [ua for ua in USER_AGENTS if "Edg" in ua] - - # Platform-specific user agents - WINDOWS_USER_AGENTS = [ua for ua in USER_AGENTS if "Windows NT" in ua] - MACOS_USER_AGENTS = [ua for ua in USER_AGENTS if "Macintosh" in ua] - LINUX_USER_AGENTS = [ - ua for ua in USER_AGENTS if "Linux" in ua and "Android" not in ua - ] - MOBILE_USER_AGENTS = [ua for ua in USER_AGENTS if "Mobile" in ua or "Android" in ua] - - def __init__(self, seed: Optional[int] = None): - """ - Initialize the user agent generator. - - Args: - seed: Random seed for reproducible results (optional) - """ - if seed is not None: - random.seed(seed) - - def random(self) -> str: - """ - Get a random user agent string. - - Returns: - Random user agent string - """ - return random.choice(self.USER_AGENTS) - - def random_browser(self, browser: str) -> str: - """ - Get a random user agent for a specific browser. 
- - Args: - browser: Browser name ('chrome', 'firefox', 'safari', 'edge') - - Returns: - Random user agent string for the specified browser - - Raises: - ValueError: If browser is not supported - """ - browser = browser.lower() - if browser == "chrome": - return random.choice(self.CHROME_USER_AGENTS) - elif browser == "firefox": - return random.choice(self.FIREFOX_USER_AGENTS) - elif browser == "safari": - return random.choice(self.SAFARI_USER_AGENTS) - elif browser == "edge": - return random.choice(self.EDGE_USER_AGENTS) - else: - raise ValueError(f"Unsupported browser: {browser}") - - def random_platform(self, platform: str) -> str: - """ - Get a random user agent for a specific platform. - - Args: - platform: Platform name ('windows', 'macos', 'linux', 'mobile') - - Returns: - Random user agent string for the specified platform - - Raises: - ValueError: If platform is not supported - """ - platform = platform.lower() - if platform == "windows": - return random.choice(self.WINDOWS_USER_AGENTS) - elif platform in ("macos", "mac"): - return random.choice(self.MACOS_USER_AGENTS) - elif platform == "linux": - return random.choice(self.LINUX_USER_AGENTS) - elif platform == "mobile": - return random.choice(self.MOBILE_USER_AGENTS) - else: - raise ValueError(f"Unsupported platform: {platform}") - - def add_user_agent(self, user_agent: str) -> None: - """ - Add a custom user agent to the list. - - Args: - user_agent: Custom user agent string to add - """ - if user_agent not in self.USER_AGENTS: - self.USER_AGENTS.append(user_agent) - - def get_all(self) -> List[str]: - """ - Get all available user agent strings. - - Returns: - List of all user agent strings - """ - return self.USER_AGENTS.copy() - - -# Global instance for convenience -_default_generator = UserAgentGenerator() - - -def random_user_agent() -> str: - """ - Get a random user agent string using the default generator. 
- - Returns: - Random user agent string - - Examples: - >>> ua = random_user_agent() - >>> "Mozilla" in ua - True - """ - return _default_generator.random() - - -def random_user_agent_browser(browser: str) -> str: - """ - Get a random user agent for a specific browser. - - Args: - browser: Browser name ('chrome', 'firefox', 'safari', 'edge') - - Returns: - Random user agent string for the specified browser - """ - return _default_generator.random_browser(browser) - - -def random_user_agent_platform(platform: str) -> str: - """ - Get a random user agent for a specific platform. - - Args: - platform: Platform name ('windows', 'macos', 'linux', 'mobile') - - Returns: - Random user agent string for the specified platform - """ - return _default_generator.random_platform(platform) - - -def set_user_agent_seed(seed: int) -> None: - """ - Set the random seed for user agent generation. - - Args: - seed: Random seed value - """ - global _default_generator - _default_generator = UserAgentGenerator(seed) - - -def add_custom_user_agent(user_agent: str) -> None: - """ - Add a custom user agent to the default generator. - - Args: - user_agent: Custom user agent string to add - """ - _default_generator.add_user_agent(user_agent) - - -def get_all_user_agents() -> List[str]: - """ - Get all available user agent strings from the default generator. - - Returns: - List of all user agent strings - """ - return _default_generator.get_all() diff --git a/viu_media/libs/provider/scraping/utils.py b/viu_media/libs/provider/scraping/utils.py deleted file mode 100644 index b241bcd..0000000 --- a/viu_media/libs/provider/scraping/utils.py +++ /dev/null @@ -1,272 +0,0 @@ -""" -Encoding and utility functions for web scraping. - -Provides various encoding utilities including base-N encoding -that was previously sourced from yt-dlp. 
-""" - -import string -from typing import Optional - - -def encode_base_n(num: int, n: int, table: Optional[str] = None) -> str: - """ - Encode a number in base-n representation. - - Args: - num: The number to encode - n: The base to use for encoding - table: Custom character table (optional) - - Returns: - String representation of the number in base-n - - Examples: - >>> encode_base_n(255, 16) - 'ff' - >>> encode_base_n(42, 36) - '16' - """ - if table is None: - # Default table: 0-9, a-z - table = string.digits + string.ascii_lowercase - - if not 2 <= n <= len(table): - raise ValueError(f"Base must be between 2 and {len(table)}") - - if num == 0: - return table[0] - - result = [] - is_negative = num < 0 - num = abs(num) - - while num > 0: - result.append(table[num % n]) - num //= n - - if is_negative: - result.append("-") - - return "".join(reversed(result)) - - -def decode_base_n(encoded: str, n: int, table: Optional[str] = None) -> int: - """ - Decode a base-n encoded string back to an integer. - - Args: - encoded: The base-n encoded string - n: The base used for encoding - table: Custom character table (optional) - - Returns: - The decoded integer - - Examples: - >>> decode_base_n('ff', 16) - 255 - >>> decode_base_n('16', 36) - 42 - """ - if table is None: - table = string.digits + string.ascii_lowercase - - if not 2 <= n <= len(table): - raise ValueError(f"Base must be between 2 and {len(table)}") - - if not encoded: - return 0 - - is_negative = encoded.startswith("-") - if is_negative: - encoded = encoded[1:] - - result = 0 - for i, char in enumerate(reversed(encoded.lower())): - if char not in table: - raise ValueError(f"Invalid character '{char}' for base {n}") - - digit_value = table.index(char) - if digit_value >= n: - raise ValueError(f"Invalid digit '{char}' for base {n}") - - result += digit_value * (n**i) - - return -result if is_negative else result - - -def url_encode(text: str, safe: str = "") -> str: - """ - URL encode a string. 
- - Args: - text: Text to encode - safe: Characters that should not be encoded - - Returns: - URL encoded string - """ - import urllib.parse - - return urllib.parse.quote(text, safe=safe) - - -def url_decode(text: str) -> str: - """ - URL decode a string. - - Args: - text: URL encoded text to decode - - Returns: - Decoded string - """ - import urllib.parse - - return urllib.parse.unquote(text) - - -def html_unescape(text: str) -> str: - """ - Unescape HTML entities in text. - - Args: - text: Text containing HTML entities - - Returns: - Text with HTML entities unescaped - - Examples: - >>> html_unescape('"Hello" & <World>') - '"Hello" & ' - """ - import html - - return html.unescape(text) - - -def strip_tags(html_content: str) -> str: - """ - Remove all HTML tags from content, leaving only text. - - Args: - html_content: HTML content with tags - - Returns: - Plain text with tags removed - - Examples: - >>> strip_tags('

Hello world!

') - 'Hello world!' - """ - import re - - return re.sub(r"<[^>]+>", "", html_content) - - -def normalize_whitespace(text: str) -> str: - """ - Normalize whitespace in text by collapsing multiple spaces and removing leading/trailing whitespace. - - Args: - text: Text to normalize - - Returns: - Text with normalized whitespace - - Examples: - >>> normalize_whitespace(' Hello world \\n\\t ') - 'Hello world' - """ - import re - - return re.sub(r"\s+", " ", text.strip()) - - -def extract_domain(url: str) -> str: - """ - Extract domain from a URL. - - Args: - url: Full URL - - Returns: - Domain portion of the URL - - Examples: - >>> extract_domain('https://example.com/path?query=1') - 'example.com' - """ - import urllib.parse - - parsed = urllib.parse.urlparse(url) - return parsed.netloc - - -def join_url(base: str, path: str) -> str: - """ - Join a base URL with a path. - - Args: - base: Base URL - path: Path to join - - Returns: - Combined URL - - Examples: - >>> join_url('https://example.com', '/api/data') - 'https://example.com/api/data' - """ - import urllib.parse - - return urllib.parse.urljoin(base, path) - - -def parse_query_string(query: str) -> dict: - """ - Parse a query string into a dictionary. - - Args: - query: Query string (with or without leading '?') - - Returns: - Dictionary of query parameters - - Examples: - >>> parse_query_string('?name=John&age=30') - {'name': ['John'], 'age': ['30']} - """ - import urllib.parse - - if query.startswith("?"): - query = query[1:] - return urllib.parse.parse_qs(query) - - -def build_query_string(params: dict) -> str: - """ - Build a query string from a dictionary of parameters. 
- - Args: - params: Dictionary of parameters - - Returns: - URL-encoded query string - - Examples: - >>> build_query_string({'name': 'John', 'age': 30}) - 'name=John&age=30' - """ - import urllib.parse - - # Handle both single values and lists - normalized_params = {} - for key, value in params.items(): - if isinstance(value, (list, tuple)): - normalized_params[key] = value - else: - normalized_params[key] = [str(value)] - - return urllib.parse.urlencode(normalized_params, doseq=True)