diff --git a/examples/plugins/player/config.toml b/examples/plugins/player/config.toml new file mode 100644 index 0000000..e91cc1f --- /dev/null +++ b/examples/plugins/player/config.toml @@ -0,0 +1,15 @@ +[multi-file-provider] +# Base URL for the anime site +base_url = "https://multifile.example.site" + +# Request timeout in seconds +timeout = 30 + +# Preferred video quality +preferred_quality = "720p" + +# Maximum number of search results +max_results = 25 + +# Enable debug logging +debug = false diff --git a/examples/plugins/player/player.py b/examples/plugins/player/player.py new file mode 100644 index 0000000..a10fb80 --- /dev/null +++ b/examples/plugins/player/player.py @@ -0,0 +1,169 @@ +""" +VLC player integration for Viu. + +This module provides the VlcPlayer class, which implements the BasePlayer interface for the VLC media player. +""" + +import logging +import shutil +import subprocess + +from viu_media.core.config import VlcConfig +from viu_media.core.exceptions import ViuError +from viu_media.core.patterns import TORRENT_REGEX, YOUTUBE_REGEX +from viu_media.core.utils import detect +from viu_media.libs.player.base import BasePlayer +from viu_media.libs.player.params import PlayerParams +from viu_media.libs.player.types import PlayerResult + +logger = logging.getLogger(__name__) + + +class VlcPlayer(BasePlayer): + """ + VLC player implementation for Viu. + + Provides playback functionality using the VLC media player, supporting desktop, mobile, and torrent scenarios. + """ + + def __init__(self, config: VlcConfig): + """ + Initialize the VlcPlayer with the given VLC configuration. + + Args: + config: VlcConfig object containing VLC-specific settings. + """ + self.config = config + self.executable = shutil.which("vlc") + + def play(self, params: PlayerParams) -> PlayerResult: + """ + Play the given media using VLC, handling desktop, mobile, and torrent scenarios. + + Args: + params: PlayerParams object containing playback parameters. + + Returns: + PlayerResult: Information about the playback session. + """ + if not self.executable: + raise ViuError("VLC executable not found in PATH.") + + if TORRENT_REGEX.match(params.url) and detect.is_running_in_termux(): + return self._play_on_mobile(params) + else: + return self._play_on_desktop(params) + + def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen: + """ + Not implemented for VLC player. + """ + raise NotImplementedError("play_with_ipc is not implemented for VLC player.") + + def _play_on_mobile(self, params: PlayerParams) -> PlayerResult: + """ + Play media on a mobile device using Android intents. + + Args: + params: PlayerParams object containing playback parameters. + + Returns: + PlayerResult: Information about the playback session. + """ + if YOUTUBE_REGEX.match(params.url): + args = [ + "nohup", + "am", + "start", + "--user", + "0", + "-a", + "android.intent.action.VIEW", + "-d", + params.url, + "-n", + "com.google.android.youtube/.UrlActivity", + ] + else: + args = [ + "nohup", + "am", + "start", + "--user", + "0", + "-a", + "android.intent.action.VIEW", + "-d", + params.url, + "-n", + "org.videolan.vlc/org.videolan.vlc.gui.video.VideoPlayerActivity", + "-e", + "title", + params.title, + ] + + subprocess.run(args) + + return PlayerResult(episode=params.episode) + + def _play_on_desktop(self, params: PlayerParams) -> PlayerResult: + """ + Play media on a desktop environment using VLC. + + Args: + params: PlayerParams object containing playback parameters. + + Returns: + PlayerResult: Information about the playback session. + """ + if TORRENT_REGEX.search(params.url): + return self._stream_on_desktop_with_webtorrent_cli(params) + + args = [self.executable, params.url] + if params.subtitles: + for sub in params.subtitles: + args.extend(["--sub-file", sub]) + break + if params.title: + args.extend(["--video-title", params.title]) + + if self.config.args: + args.extend(self.config.args.split(",")) + + subprocess.run(args, encoding="utf-8") + return PlayerResult(episode=params.episode) + + def _stream_on_desktop_with_webtorrent_cli( + self, params: PlayerParams + ) -> PlayerResult: + """ + Stream torrent media using the webtorrent CLI and VLC. + + Args: + params: PlayerParams object containing playback parameters. + + Returns: + PlayerResult: Information about the playback session. + """ + WEBTORRENT_CLI = shutil.which("webtorrent") + if not WEBTORRENT_CLI: + raise ViuError("Please Install webtorrent cli inorder to stream torrents") + + args = [WEBTORRENT_CLI, params.url, "--vlc"] + + if self.config.args: + args.append("--player-args") + args.extend(self.config.args.split(",")) + + subprocess.run(args) + return PlayerResult(episode=params.episode) + + +if __name__ == "__main__": + from viu_media.core.constants import APP_ASCII_ART + + print(APP_ASCII_ART) + url = input("Enter the url you would like to stream: ") + vlc = VlcPlayer(VlcConfig()) + player_result = vlc.play(PlayerParams(url=url, title="", query="", episode="")) + print(player_result) diff --git a/examples/plugins/player/plugin.info.toml b/examples/plugins/player/plugin.info.toml new file mode 100644 index 0000000..418fe1e --- /dev/null +++ b/examples/plugins/player/plugin.info.toml @@ -0,0 +1,9 @@ +[plugin] +name = "Multi-File Provider Plugin" +version = "1.0.0" +description = "A demo plugin with multiple Python files" +author = "Viu Developer" +requires_python = ">=3.11" + +[components] +player = "player:VlcPlayer" diff --git a/examples/plugins/provider/config.toml b/examples/plugins/provider/config.toml new file mode 100644 index 0000000..52075f1 --- /dev/null +++ b/examples/plugins/provider/config.toml @@ -0,0 +1,18 @@ +# Default configuration for Example Provider Plugin +# This file is automatically copied to ~/.config/viu/plugins.config.toml during installation + +[example-provider] +# Request timeout in seconds +timeout = 30 + +# Preferred video quality +preferred_quality = "720p" + +# Maximum number of search results to return +max_results = 20 + +# Custom headers (optional) +# custom_header = "value" + +# Enable debug logging for this plugin +# debug = false diff --git a/examples/plugins/provider/constants.py b/examples/plugins/provider/constants.py new file mode 100644 index 0000000..af8cf49 --- /dev/null +++ b/examples/plugins/provider/constants.py @@ -0,0 +1,37 @@ +import re + +ANIMEPAHE = "animepahe.ru" +ANIMEPAHE_BASE = f"https://{ANIMEPAHE}" +ANIMEPAHE_ENDPOINT = f"{ANIMEPAHE_BASE}/api" + +SERVERS_AVAILABLE = ["kwik"] +REQUEST_HEADERS = { + "Cookie": "__ddgid_=VvX0ebHrH2DsFZo4; __ddgmark_=3savRpSVFhvZcn5x; __ddg2_=buBJ3c4pNBYKFZNp; __ddg1_=rbVADKr9URtt55zoIGFa; SERVERID=janna; XSRF-TOKEN=eyJpdiI6IjV5bFNtd0phUHgvWGJxc25wL0VJSUE9PSIsInZhbHVlIjoicEJTZktlR2hxR2JZTWhnL0JzazlvZU5TQTR2bjBWZ2dDb0RwUXVUUWNSclhQWUhLRStYSmJmWmUxWkpiYkFRYU12RjFWejlSWHorME1wZG5qQ1U0TnFlNnBFR2laQjN1MjdyNjc5TjVPdXdJb2o5VkU1bEduRW9pRHNDTHh6Sy8iLCJtYWMiOiI0OTc0ZmNjY2UwMGJkOWY2MWNkM2NlMjk2ZGMyZGJmMWE0NTdjZTdkNGI2Y2IwNTIzZmFiZWU5ZTE2OTk0YmU4IiwidGFnIjoiIn0%3D; laravel_session=eyJpdiI6ImxvdlpqREFnTjdaeFJubUlXQWlJVWc9PSIsInZhbHVlIjoiQnE4R3VHdjZ4M1NDdEVWM1ZqMUxtNnVERnJCcmtCUHZKNzRPR2RFbzNFcStTL29xdnVTbWhsNVRBUXEybVZWNU1UYVlTazFqYlN5UjJva1k4czNGaXBTbkJJK01oTUd3VHRYVHBoc3dGUWxHYnFlS2NJVVNFbTFqMVBWdFpuVUgiLCJtYWMiOiI1NDdjZTVkYmNhNjUwZTMxZmRlZmVmMmRlMGNiYjAwYjlmYjFjY2U0MDc1YTQzZThiMTIxMjJlYTg1NTA4YjBmIiwidGFnIjoiIn0%3D; latest=5592", + "Host": ANIMEPAHE, + "Accept": "application, text/javascript, */*; q=0.01", + "Accept-Encoding": "Utf-8", + "Referer": ANIMEPAHE_BASE, + "DNT": "1", + "Connection": "keep-alive", + "Sec-Fetch-Dest": "empty", + "Sec-Fetch-Site": "same-origin", + "Sec-Fetch-Mode": "cors", + "TE": "trailers", +} +SERVER_HEADERS = { + "Host": "kwik.si", + "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8", + "Accept-Language": "en-US,en;q=0.5", + "Accept-Encoding": "Utf-8", + "DNT": "1", + "Connection": "keep-alive", + "Referer": "https://animepahe.ru/", + "Upgrade-Insecure-Requests": "1", + "Sec-Fetch-Dest": "iframe", + "Sec-Fetch-Mode": "navigate", + "Sec-Fetch-Site": "cross-site", + "Priority": "u=4", + "TE": "trailers", +} +JUICY_STREAM_REGEX = re.compile(r"source='(.*)';") +KWIK_RE = re.compile(r"Player\|(.+?)'") diff --git a/examples/plugins/provider/extractor.py b/examples/plugins/provider/extractor.py new file mode 100644 index 0000000..349aa74 --- /dev/null +++ b/examples/plugins/provider/extractor.py @@ -0,0 +1,77 @@ +import re + + +def animepahe_key_creator(c: int, a: int): + from ...scraping.utils import encode_base_n + + if c < a: + val_a = "" + else: + val_a = animepahe_key_creator(int(c / a), a) + c = c % a + if c > 35: + val_b = chr(c + 29) + else: + val_b = encode_base_n(c, 36) + return val_a + val_b + + +def animepahe_embed_decoder( + encoded_js_p: str, + base_a: int, + no_of_keys_c: int, + values_to_replace_with_k: list, +): + decode_mapper_d: dict = {} + for i in range(no_of_keys_c): + key = animepahe_key_creator(i, base_a) + val = values_to_replace_with_k[i] or key + decode_mapper_d[key] = val + return re.sub( + r"\b\w+\b", lambda match: decode_mapper_d[match.group(0)], encoded_js_p + ) + + +PARAMETERS_REGEX = re.compile(r"eval\(function\(p,a,c,k,e,d\)\{.*\}\((.*?)\)\)$") +ENCODE_JS_REGEX = re.compile(r"'(.*?);',(\d+),(\d+),'(.*)'\.split") + + +def process_animepahe_embed_page(embed_page: str): + from ...scraping.html_parser import get_element_text_and_html_by_tag + + encoded_js_string = "" + embed_page_content = embed_page + for _ in range(8): + text, html = get_element_text_and_html_by_tag("script", embed_page_content) + if not text and html: + embed_page_content = re.sub(html, "", embed_page_content) + continue + if text: + encoded_js_string = text.strip() + break + if not encoded_js_string: + return + obsfucated_js_parameter_match = PARAMETERS_REGEX.search(encoded_js_string) + if not obsfucated_js_parameter_match: + return + parameter_string = obsfucated_js_parameter_match.group(1) + encoded_js_parameter_string = ENCODE_JS_REGEX.search(parameter_string) + if not encoded_js_parameter_string: + return + p: str = encoded_js_parameter_string.group(1) + a: int = int(encoded_js_parameter_string.group(2)) + c: int = int(encoded_js_parameter_string.group(3)) + k: list = encoded_js_parameter_string.group(4).split("|") + return animepahe_embed_decoder(p, a, c, k).replace("\\", "") + + +if __name__ == "__main__": + # Testing time + filepath = input("Enter file name: ") + if filepath: + with open(filepath) as file: + data = file.read() + else: + data = """""" + + print(process_animepahe_embed_page(data)) diff --git a/examples/plugins/provider/mappers.py b/examples/plugins/provider/mappers.py new file mode 100644 index 0000000..8415bac --- /dev/null +++ b/examples/plugins/provider/mappers.py @@ -0,0 +1,100 @@ +from typing import Any + +from viu_media.libs.provider.anime.types import ( + Anime, + AnimeEpisodeInfo, + AnimeEpisodes, + EpisodeStream, + MediaTranslationType, + PageInfo, + SearchResult, + SearchResults, + Server, +) + +from .types import ( + AnimePaheAnimePage, + AnimePaheSearchPage, +) + +translation_type_map = { + "sub": MediaTranslationType.SUB, + "dub": MediaTranslationType.DUB, + "raw": MediaTranslationType.RAW, +} + + +def map_to_search_results(data: AnimePaheSearchPage) -> SearchResults: + results = [] + for result in data["data"]: + results.append( + SearchResult( + id=result["session"], + title=result["title"], + episodes=AnimeEpisodes( + sub=list(map(str, range(1, result["episodes"] + 1))), + dub=list(map(str, range(1, result["episodes"] + 1))), + raw=list(map(str, range(1, result["episodes"] + 1))), + ), + media_type=result["type"], + score=result["score"], + status=result["status"], + season=result["season"], + poster=result["poster"], + year=str(result["year"]), + ) + ) + + return SearchResults( + page_info=PageInfo( + total=data["total"], + per_page=data["per_page"], + current_page=data["current_page"], + ), + results=results, + ) + + +def map_to_anime_result( + search_result: SearchResult, anime: AnimePaheAnimePage +) -> Anime: + episodes_info = [] + episodes = [] + anime["data"] = sorted(anime["data"], key=lambda k: float(k["episode"])) + for ep_info in anime["data"]: + episodes.append(str(ep_info["episode"])) + episodes_info.append( + AnimeEpisodeInfo( + id=str(ep_info["id"]), + session_id=ep_info["session"], + episode=str(ep_info["episode"]), + title=ep_info["title"], + poster=ep_info["snapshot"], + duration=str(ep_info["duration"]), + ) + ) + + return Anime( + id=search_result.id, + title=search_result.title, + episodes=AnimeEpisodes( + sub=episodes, + dub=episodes, + ), + year=str(search_result.year), + poster=search_result.poster, + episodes_info=episodes_info, + ) + + +def map_to_server( + episode: AnimeEpisodeInfo, translation_type: Any, quality: Any, stream_link: Any +) -> Server: + links = [ + EpisodeStream( + link=stream_link, + quality=quality, + translation_type=translation_type_map[translation_type], + ) + ] + return Server(name="kwik", links=links, episode_title=episode.title) diff --git a/examples/plugins/provider/plugin.info.toml b/examples/plugins/provider/plugin.info.toml new file mode 100644 index 0000000..bd45993 --- /dev/null +++ b/examples/plugins/provider/plugin.info.toml @@ -0,0 +1,10 @@ +[plugin] +name = "Example Provider Plugin" +version = "1.0.0" +description = "A demo provider plugin for testing the viu plugin system" +author = "Viu Developer" +homepage = "https://github.com/example/viu-example-plugin" +requires_python = ">=3.11" + +[components] +provider = "example_provider:ExampleProvider" diff --git a/examples/plugins/provider/provider.py b/examples/plugins/provider/provider.py new file mode 100644 index 0000000..d56c3bc --- /dev/null +++ b/examples/plugins/provider/provider.py @@ -0,0 +1,207 @@ +import logging +from functools import lru_cache +from typing import Iterator, Optional + +from viu_media.libs.provider.anime.base import BaseAnimeProvider +from viu_media.libs.provider.anime.params import ( + AnimeParams, + EpisodeStreamsParams, + SearchParams, +) +from viu_media.libs.provider.anime.types import ( + Anime, + AnimeEpisodeInfo, + SearchResult, + SearchResults, + Server, +) +from viu_media.libs.provider.anime.utils.debug import debug_provider + +from .constants import ( + ANIMEPAHE_BASE, + ANIMEPAHE_ENDPOINT, + JUICY_STREAM_REGEX, + REQUEST_HEADERS, + SERVER_HEADERS, +) +from .extractor import process_animepahe_embed_page +from .mappers import map_to_anime_result, map_to_search_results, map_to_server +from .types import AnimePaheAnimePage, AnimePaheSearchPage + +logger = logging.getLogger(__name__) + + +class AnimePahe(BaseAnimeProvider): + HEADERS = REQUEST_HEADERS + + @debug_provider + def search(self, params: SearchParams) -> SearchResults | None: + return self._search(params) + + @lru_cache() + def _search(self, params: SearchParams) -> SearchResults | None: + url_params = {"m": "search", "q": params.query} + response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params) + response.raise_for_status() + data: AnimePaheSearchPage = response.json() + if not data.get("data"): + return + return map_to_search_results(data) + + @debug_provider + def get(self, params: AnimeParams) -> Anime | None: + return self._get_anime(params) + + @lru_cache() + def _get_anime(self, params: AnimeParams) -> Anime | None: + page = 1 + standardized_episode_number = 0 + + search_result = self._get_search_result(params) + if not search_result: + logger.error(f"No search result found for ID {params.id}") + return None + + anime: Optional[AnimePaheAnimePage] = None + + has_next_page = True + while has_next_page: + logger.debug(f"Loading page: {page}") + _anime_page = self._anime_page_loader( + m="release", + id=params.id, + sort="episode_asc", + page=page, + ) + + has_next_page = True if _anime_page["next_page_url"] else False + page += 1 + if not anime: + anime = _anime_page + else: + anime["data"].extend(_anime_page["data"]) + + if anime: + for episode in anime.get("data", []): + if episode["episode"] % 1 == 0: + standardized_episode_number += 1 + episode.update({"episode": standardized_episode_number}) + else: + standardized_episode_number += episode["episode"] % 1 + episode.update({"episode": standardized_episode_number}) + standardized_episode_number = int(standardized_episode_number) + + return map_to_anime_result(search_result, anime) + + @lru_cache() + def _get_search_result(self, params: AnimeParams) -> Optional[SearchResult]: + search_results = self._search(SearchParams(query=params.query)) + if not search_results or not search_results.results: + logger.error(f"No search results found for ID {params.id}") + return None + for search_result in search_results.results: + if search_result.id == params.id: + return search_result + + @lru_cache() + def _anime_page_loader(self, m, id, sort, page) -> AnimePaheAnimePage: + url_params = { + "m": m, + "id": id, + "sort": sort, + "page": page, + } + response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params) + response.raise_for_status() + return response.json() + + @debug_provider + def episode_streams(self, params: EpisodeStreamsParams) -> Iterator[Server] | None: + from viu_media.libs.provider.scraping.html_parser import ( + extract_attributes, + get_element_by_id, + get_elements_html_by_class, + ) + + episode = self._get_episode_info(params) + if not episode: + logger.error( + f"Episode {params.episode} doesn't exist for anime {params.anime_id}" + ) + return + + url = f"{ANIMEPAHE_BASE}/play/{params.anime_id}/{episode.session_id}" + response = self.client.get(url, follow_redirects=True) + response.raise_for_status() + + c = get_element_by_id("resolutionMenu", response.text) + if not c: + logger.error("Resolution menu not found in the response") + return + resolutionMenuItems = get_elements_html_by_class("dropdown-item", c) + res_dicts = [extract_attributes(item) for item in resolutionMenuItems] + quality = None + translation_type = None + stream_link = None + + # TODO: better document the scraping process + for res_dict in res_dicts: + # the actual attributes are data attributes in the original html 'prefixed with data-' + embed_url = res_dict["src"] + data_audio = "dub" if res_dict["audio"] == "eng" else "sub" + + if data_audio != params.translation_type: + continue + + if not embed_url: + logger.warning("embed url not found please report to the developers") + continue + + embed_response = self.client.get( + embed_url, + headers={ + "User-Agent": self.client.headers["User-Agent"], + **SERVER_HEADERS, + }, + ) + embed_response.raise_for_status() + embed_page = embed_response.text + + decoded_js = process_animepahe_embed_page(embed_page) + if not decoded_js: + logger.error("failed to decode embed page") + continue + juicy_stream = JUICY_STREAM_REGEX.search(decoded_js) + if not juicy_stream: + logger.error("failed to find juicy stream") + continue + juicy_stream = juicy_stream.group(1) + quality = res_dict["resolution"] + translation_type = data_audio + stream_link = juicy_stream + + if translation_type and quality and stream_link: + yield map_to_server(episode, translation_type, quality, stream_link) + + @lru_cache() + def _get_episode_info( + self, params: EpisodeStreamsParams + ) -> Optional[AnimeEpisodeInfo]: + anime_info = self._get_anime( + AnimeParams(id=params.anime_id, query=params.query) + ) + if not anime_info: + logger.error(f"No anime info for {params.anime_id}") + return + if not anime_info.episodes_info: + logger.error(f"No episodes info for {params.anime_id}") + return + for episode in anime_info.episodes_info: + if episode.episode == params.episode: + return episode + + +if __name__ == "__main__": + from viu_media.libs.provider.anime.utils.debug import test_anime_provider + + test_anime_provider(AnimePahe) diff --git a/examples/plugins/provider/types.py b/examples/plugins/provider/types.py new file mode 100644 index 0000000..365eaac --- /dev/null +++ b/examples/plugins/provider/types.py @@ -0,0 +1,108 @@ +from enum import Enum +from typing import Literal, TypedDict + + +class Server(Enum): + KWIK = "Kwik" + + +class AnimePaheSearchResult(TypedDict): + id: str + title: str + type: str + episodes: int + status: str + season: str + year: int + score: int + poster: str + session: str + + +class AnimePaheSearchPage(TypedDict): + total: int + per_page: int + current_page: int + last_page: int + _from: int + to: int + data: list[AnimePaheSearchResult] + + +class Episode(TypedDict): + id: str + anime_id: int + episode: float + episode2: int + edition: str + title: str + snapshot: str # episode image + disc: str + audio: Literal["eng", "jpn"] + duration: str # time 00:00:00 + session: str + filler: int + created_at: str + + +class AnimePaheAnimePage(TypedDict): + total: int + per_page: int + current_page: int + last_page: int + next_page_url: str | None + prev_page_url: str | None + _from: int + to: int + data: list[Episode] + + +class AnimePaheEpisodeInfo(TypedDict): + title: str + episode: float + id: str + translation_type: Literal["eng", "jpn"] + duration: str + poster: str + + +class AvailableEpisodesDetail(TypedDict): + sub: list[str] + dub: list[str] + raw: list[str] + + +class AnimePaheAnime(TypedDict): + id: str + title: str + year: int + season: str + poster: str + score: int + availableEpisodesDetail: AvailableEpisodesDetail + episodesInfo: list[AnimePaheEpisodeInfo] + + +class PageInfo(TypedDict): + total: int + perPage: int + currentPage: int + + +class AnimePaheSearchResults(TypedDict): + pageInfo: PageInfo + results: list[AnimePaheSearchResult] + + +class AnimePaheStreamLink(TypedDict): + quality: str + translation_type: Literal["sub", "dub"] + link: str + + +class AnimePaheServer(TypedDict): + server: Literal["kwik"] + links: list[AnimePaheStreamLink] + episode_title: str + subtitles: list + headers: dict diff --git a/viu_media/core/plugins/manager.py b/viu_media/core/plugins/manager.py index 04cb097..5116f3d 100644 --- a/viu_media/core/plugins/manager.py +++ b/viu_media/core/plugins/manager.py @@ -15,10 +15,11 @@ from typing import Any, Dict, Literal, Optional, Set, Union import tomli_w from pydantic import ValidationError +from viu_media.core.exceptions import ViuError from ..constants import PLUGINS_CONFIG, PLUGINS_DIR, PLUGINS_MANIFEST from .model import InstalledPlugin, PluginInfo, PluginManifest -from viu_media.core.exceptions import ViuError + logger = logging.getLogger(__name__) ComponentType = Literal["provider", "player", "selector", "command"] @@ -26,152 +27,148 @@ ComponentType = Literal["provider", "player", "selector", "command"] class PluginError(ViuError): """Base exception for plugin-related errors.""" + pass class PluginNotFoundError(ViuError): """Raised when a requested plugin is not found.""" + pass class PluginLoadError(ViuError): """Raised when a plugin fails to load.""" + pass class PluginManager: """Manages the plugin system for viu. - + This is a singleton class that handles: - Loading and caching plugins - Installing and removing plugins from Git repositories - Managing plugin configurations - Discovering available plugins """ - + _instance: Optional["PluginManager"] = None _initialized: bool = False - + def __new__(cls) -> "PluginManager": if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance - + def __init__(self) -> None: if self._initialized: return - + self._loaded_components: Dict[str, Any] = {} self._manifest: PluginManifest = PluginManifest() self._plugin_configs: Dict[str, Dict[str, Any]] = {} - + self._load_manifest() self._load_plugin_configs() - + self._initialized = True - + def load_component(self, component_type: ComponentType, name: str) -> Any: """Lazy-load a plugin component by type and name. - + Args: component_type: Type of component (provider, player, selector, command) name: Name of the component to load - + Returns: The loaded component instance or function - + Raises: PluginNotFoundError: If the plugin is not installed PluginLoadError: If the plugin fails to load """ cache_key = f"{component_type}:{name}" - + # Return cached component if already loaded if cache_key in self._loaded_components: return self._loaded_components[cache_key] - + # Find the plugin in the manifest plugins_of_type = getattr(self._manifest, f"{component_type}s") if name not in plugins_of_type: raise PluginNotFoundError( f"Plugin '{name}' of type '{component_type}' is not installed" ) - + plugin_entry = plugins_of_type[name] plugin_path = plugin_entry.path - + if not plugin_path.exists(): - raise PluginLoadError( - f"Plugin path does not exist: {plugin_path}" - ) - + raise PluginLoadError(f"Plugin path does not exist: {plugin_path}") + # Load plugin info to get component definition try: plugin_info = self._get_plugin_info(plugin_path) except PluginError as e: raise PluginLoadError(f"Failed to load plugin info: {e}") from e - + # Get the component definition component_def = getattr(plugin_info.components, component_type) if not component_def: raise PluginLoadError( f"Plugin '{name}' does not provide a {component_type} component" ) - + # Parse module:class format if ":" not in component_def: - raise PluginLoadError( - f"Invalid component definition: {component_def}" - ) - + raise PluginLoadError(f"Invalid component definition: {component_def}") + module_name, class_name = component_def.split(":", 1) - + # Load the module module_path = plugin_path / f"{module_name}.py" if not module_path.exists(): raise PluginLoadError(f"Plugin module not found: {module_path}") - + try: spec = importlib.util.spec_from_file_location( f"plugin_{name}_{module_name}", module_path ) if spec is None or spec.loader is None: raise PluginLoadError(f"Could not create module spec for {module_path}") - + module = importlib.util.module_from_spec(spec) - + # Add plugin path to sys.path temporarily for relative imports sys.path.insert(0, str(plugin_path)) try: spec.loader.exec_module(module) finally: sys.path.remove(str(plugin_path)) - + except Exception as e: raise PluginLoadError(f"Failed to load module {module_path}: {e}") from e - + # Get the component class/function if not hasattr(module, class_name): - raise PluginLoadError( - f"Module {module_name} does not have {class_name}" - ) - + raise PluginLoadError(f"Module {module_name} does not have {class_name}") + component_cls = getattr(module, class_name) - + # For providers, players, and selectors, instantiate with config if component_type in ("provider", "player", "selector"): plugin_config = self._plugin_configs.get(name, {}) - + # For providers, also inject httpx client like the built-in system if component_type == "provider": - from ...core.utils.networking import random_user_agent from httpx import Client - + + from ...core.utils.networking import random_user_agent + headers = getattr(component_cls, "HEADERS", {}) - client = Client( - headers={"User-Agent": random_user_agent(), **headers} - ) - + client = Client(headers={"User-Agent": random_user_agent(), **headers}) + try: component = component_cls(client, **plugin_config) except TypeError: @@ -186,54 +183,50 @@ class PluginManager: else: # For commands, just return the function component = component_cls - + # Cache and return self._loaded_components[cache_key] = component logger.debug(f"Loaded plugin component: {cache_key}") return component - + def add_plugin( - self, - component_type: ComponentType, - name: str, - source: str, - force: bool = False + self, component_type: ComponentType, name: str, source: str, force: bool = False ) -> None: """Install a plugin from a Git repository. - + Args: component_type: Type of component the plugin provides name: Local name for the plugin source: Git source (e.g., "github:user/repo") force: Whether to overwrite existing plugin - + Raises: PluginError: If installation fails """ plugins_of_type = getattr(self._manifest, f"{component_type}s") - + # Check if plugin already exists if name in plugins_of_type and not force: raise PluginError( f"Plugin '{name}' already exists. Use --force to overwrite." ) - + # Determine installation path plugin_dir = PLUGINS_DIR / f"{component_type}s" / name - + # Remove existing if force is True if plugin_dir.exists(): if force: shutil.rmtree(plugin_dir) else: raise PluginError(f"Plugin directory already exists: {plugin_dir}") - + # Create parent directory plugin_dir.parent.mkdir(parents=True, exist_ok=True) - + # Clone the repository self._clone_plugin(source, plugin_dir) - + # Validate plugin structure try: plugin_info = self._get_plugin_info(plugin_dir) @@ -241,94 +234,90 @@ class PluginManager: # Clean up on validation failure shutil.rmtree(plugin_dir) raise - + # Ensure plugin provides the expected component type expected_component = getattr(plugin_info.components, component_type) if not expected_component: shutil.rmtree(plugin_dir) - raise PluginError( - f"Plugin does not provide a {component_type} component" - ) - + raise PluginError(f"Plugin does not provide a {component_type} component") + # Add to manifest plugins_of_type[name] = InstalledPlugin( - source=source, - path=plugin_dir, - version=plugin_info.plugin.version + source=source, path=plugin_dir, version=plugin_info.plugin.version ) - + # Save manifest self._save_manifest() - + # Copy default config if it exists self._install_default_config(name, plugin_dir) - + logger.info(f"Successfully installed {component_type} plugin '{name}'") - + def remove_plugin(self, component_type: ComponentType, name: str) -> None: """Remove an installed plugin. - + Args: component_type: Type of component name: Name of the plugin to remove - + Raises: PluginNotFoundError: If plugin is not installed PluginError: If removal fails """ plugins_of_type = getattr(self._manifest, f"{component_type}s") - + if name not in plugins_of_type: raise PluginNotFoundError( f"Plugin '{name}' of type '{component_type}' is not installed" ) - + plugin_entry = plugins_of_type[name] plugin_path = plugin_entry.path - + # Remove from filesystem if plugin_path.exists(): try: shutil.rmtree(plugin_path) except OSError as e: raise PluginError(f"Failed to remove plugin directory: {e}") from e - + # Remove from manifest del plugins_of_type[name] - + # Remove from loaded components cache cache_key = f"{component_type}:{name}" self._loaded_components.pop(cache_key, None) - + # Save manifest self._save_manifest() - + logger.info(f"Successfully removed {component_type} plugin '{name}'") - + def update_plugin(self, component_type: ComponentType, name: str) -> None: """Update an installed plugin by pulling from Git. - + Args: component_type: Type of component name: Name of the plugin to update - + Raises: PluginNotFoundError: If plugin is not installed PluginError: If update fails """ plugins_of_type = getattr(self._manifest, f"{component_type}s") - + if name not in plugins_of_type: raise PluginNotFoundError( f"Plugin '{name}' of type '{component_type}' is not installed" ) - + plugin_entry = plugins_of_type[name] plugin_path = plugin_entry.path - + if not plugin_path.exists(): raise PluginError(f"Plugin path does not exist: {plugin_path}") - + # Pull latest changes try: result = subprocess.run( @@ -336,14 +325,14 @@ class PluginManager: cwd=plugin_path, check=True, capture_output=True, - text=True + text=True, ) logger.debug(f"Git pull output: {result.stdout}") except subprocess.CalledProcessError as e: raise PluginError(f"Failed to update plugin: {e.stderr}") from e except FileNotFoundError: raise PluginError("Git is not installed or not in PATH") from None - + # Update version in manifest try: plugin_info = self._get_plugin_info(plugin_path) @@ -351,16 +340,16 @@ class PluginManager: self._save_manifest() except PluginError as e: logger.warning(f"Could not update plugin version: {e}") - + # Clear from cache to force reload cache_key = f"{component_type}:{name}" self._loaded_components.pop(cache_key, None) - + logger.info(f"Successfully updated {component_type} plugin '{name}'") - + def list_plugins(self) -> Dict[ComponentType, Dict[str, InstalledPlugin]]: """List all installed plugins grouped by type. - + Returns: Dictionary mapping component types to their installed plugins """ @@ -370,75 +359,80 @@ class PluginManager: "selector": dict(self._manifest.selectors), "command": dict(self._manifest.commands), } - + def get_available_components(self, component_type: ComponentType) -> Set[str]: """Get names of all available components of a given type. - + This includes both built-in components and installed plugins. - + Args: component_type: Type of component - + Returns: Set of component names """ # Get plugin names plugins_of_type = getattr(self._manifest, f"{component_type}s") plugin_names = set(plugins_of_type.keys()) - + # Add built-in component names if component_type == "provider": from ...libs.provider.anime.provider import PROVIDERS_AVAILABLE + builtin_names = set(PROVIDERS_AVAILABLE.keys()) elif component_type == "player": from ...libs.player.player import PLAYERS + builtin_names = set(PLAYERS) elif component_type == "selector": from ...libs.selectors.selector import SELECTORS + builtin_names = set(SELECTORS) elif component_type == "command": # Commands would need to be handled differently as they're registered in CLI builtin_names = set() else: builtin_names = set() - + return plugin_names | builtin_names - + def is_plugin(self, component_type: ComponentType, name: str) -> bool: """Check if a component is provided by a plugin. - + Args: component_type: Type of component name: Name of the component - + Returns: True if it's a plugin, False if it's built-in """ plugins_of_type = getattr(self._manifest, f"{component_type}s") return name in plugins_of_type - + def _load_manifest(self) -> None: """Load the plugins.toml manifest file.""" if not PLUGINS_MANIFEST.exists(): logger.debug("No plugins manifest found, creating empty one") self._save_manifest() return - + try: with open(PLUGINS_MANIFEST, "rb") as f: data = tomllib.load(f) self._manifest = PluginManifest.model_validate(data) - logger.debug(f"Loaded plugins manifest with {len(self.list_plugins())} plugins") + logger.debug( + f"Loaded plugins manifest with {len(self.list_plugins())} plugins" + ) except (OSError, ValidationError, tomllib.TOMLDecodeError) as e: logger.error(f"Failed to load plugins manifest: {e}") self._manifest = PluginManifest() - + def _save_manifest(self) -> None: """Save the current manifest to plugins.toml.""" try: # Convert Path objects to strings for TOML serialization manifest_dict = self._manifest.model_dump() - + # Convert all Path objects to strings def convert_paths(obj: Any) -> Any: if isinstance(obj, dict): @@ -449,22 +443,22 @@ class PluginManager: return str(obj) else: return obj - + manifest_dict = convert_paths(manifest_dict) - + with open(PLUGINS_MANIFEST, "wb") as f: tomli_w.dump(manifest_dict, f) logger.debug("Saved plugins manifest") except OSError as e: logger.error(f"Failed to save plugins manifest: {e}") raise PluginError(f"Could not save plugins manifest: {e}") from e - + def _load_plugin_configs(self) -> None: """Load plugin configurations from plugins.config.toml.""" if not PLUGINS_CONFIG.exists(): logger.debug("No plugin configs found") return - + try: with open(PLUGINS_CONFIG, "rb") as f: self._plugin_configs = tomllib.load(f) @@ -472,23 +466,23 @@ class PluginManager: except (OSError, tomllib.TOMLDecodeError) as e: logger.error(f"Failed to load plugin configs: {e}") self._plugin_configs = {} - + def _get_plugin_info(self, plugin_path: Path) -> PluginInfo: """Load and validate plugin.info.toml from a plugin directory.""" info_file = plugin_path / "plugin.info.toml" if not info_file.exists(): raise PluginError(f"Plugin info file not found: {info_file}") - + try: with open(info_file, "rb") as f: data = tomllib.load(f) return PluginInfo.model_validate(data) except (OSError, ValidationError, tomllib.TOMLDecodeError) as e: raise PluginError(f"Invalid plugin info file {info_file}: {e}") from e - + def _parse_git_source(self, source: str) -> tuple[str, str]: """Parse a git source string into platform and repo. - + Examples: "github:user/repo" -> ("github.com", "user/repo") "gitlab:user/repo" -> ("gitlab.com", "user/repo") @@ -499,9 +493,13 @@ class PluginManager: # Handle local file paths if source.startswith("file://"): return "local", source[7:] # Remove file:// prefix - elif source.startswith("/") or source.startswith("./") or source.startswith("../"): + elif ( + source.startswith("/") + or source.startswith("./") + or source.startswith("../") + ): return "local", source - + if source.startswith("http"): # Full URL provided if "github.com" in source: @@ -512,90 +510,99 @@ class PluginManager: return "gitlab.com", repo else: raise PluginError(f"Unsupported git host in URL: {source}") - + # Short format like "github:user/repo" if ":" not in source: raise PluginError(f"Invalid source format: {source}") - + platform, repo = source.split(":", 1) platform_map = { "github": "github.com", "gitlab": "gitlab.com", } - + if platform not in platform_map: raise PluginError(f"Unsupported platform: {platform}") - + return platform_map[platform], repo - + def _clone_plugin(self, source: str, dest_path: Path) -> None: """Clone a plugin repository from Git.""" platform, repo = self._parse_git_source(source) - + if platform == "local": # Handle local repository - just copy the directory import shutil + src_path = Path(repo).resolve() - + if not src_path.exists(): raise PluginError(f"Local repository path does not exist: {src_path}") - - if not (src_path / ".git").exists(): - raise PluginError(f"Path is not a Git repository: {src_path}") - - logger.info(f"Copying local Git repository from {src_path}") - - try: - # Use git clone to properly copy the repository - subprocess.run( - ["git", "clone", str(src_path), str(dest_path)], - check=True, - capture_output=True, - text=True - ) - except subprocess.CalledProcessError as e: - raise PluginError(f"Failed to clone local repository: {e.stderr}") from e + + if (src_path / ".git").exists(): + logger.info(f"Copying local Git repository from {src_path}") + + try: + # Use git clone to properly copy the repository + subprocess.run( + ["git", "clone", str(src_path), str(dest_path)], + check=True, + capture_output=True, + text=True, + ) + except subprocess.CalledProcessError as e: + raise PluginError( + f"Failed to clone local repository: {e.stderr}" + ) from e + else: + logger.warning("Copying non git repo, local plugin") + shutil.copytree(src_path, dest_path) + else: # Handle remote repository git_url = f"https://{platform}/{repo}.git" - + logger.info(f"Cloning plugin from {git_url}") - + try: subprocess.run( ["git", "clone", git_url, str(dest_path)], check=True, capture_output=True, - text=True + text=True, ) except subprocess.CalledProcessError as e: raise PluginError(f"Failed to clone plugin: {e.stderr}") from e - + if not dest_path.exists(): - raise PluginError("Plugin cloning failed - destination directory was not created") - + raise PluginError( + "Plugin cloning failed - destination directory was not created" + ) + # Check for git command availability try: subprocess.run(["git", "--version"], check=True, capture_output=True) except FileNotFoundError: raise PluginError("Git is not installed or not in PATH") from None - + def _install_default_config(self, plugin_name: str, plugin_dir: Path) -> None: """Install default configuration from plugin's config.toml if it exists.""" default_config_path = plugin_dir / "config.toml" - + if not default_config_path.exists(): logger.debug(f"No default config found for plugin '{plugin_name}'") return - + # Load the default config try: with open(default_config_path, "rb") as f: default_config = tomllib.load(f) except (OSError, tomllib.TOMLDecodeError) as e: - logger.warning(f"Failed to load default config for plugin '{plugin_name}': {e}") + logger.warning( + f"Failed to load default config for plugin '{plugin_name}': {e}" + ) return - + # Load existing plugins config or create empty dict if PLUGINS_CONFIG.exists(): try: @@ -606,25 +613,33 @@ class PluginManager: existing_config = {} else: existing_config = {} - + # Check if plugin config already exists if plugin_name in existing_config: - logger.debug(f"Plugin '{plugin_name}' config already exists, skipping default config installation") + logger.debug( + f"Plugin '{plugin_name}' config already exists, skipping default config installation" + ) return - + # Merge the default config if plugin_name in default_config: existing_config[plugin_name] = default_config[plugin_name] - + # Write the updated config try: with open(PLUGINS_CONFIG, "wb") as f: tomli_w.dump(existing_config, f) - logger.info(f"Installed default configuration for plugin '{plugin_name}'") + logger.info( + f"Installed default configuration for plugin '{plugin_name}'" + ) except OSError as e: - logger.warning(f"Failed to save default config for plugin '{plugin_name}': {e}") + logger.warning( + f"Failed to save default config for plugin '{plugin_name}': {e}" + ) else: - logger.debug(f"No config section found for plugin '{plugin_name}' in default config") + logger.debug( + f"No config section found for plugin '{plugin_name}' in default config" + ) # Global instance