Compare commits

...

2 Commits

Author SHA1 Message Date
Benexl
f99ff546d5 feat(plugins): init examples 2025-08-18 16:20:48 +03:00
Benexl
88b707e060 feat: plugins system 2025-08-18 16:02:37 +03:00
28 changed files with 2121 additions and 6 deletions

319
PLUGINS.md Normal file
View File

@@ -0,0 +1,319 @@
# Viu Plugin Development Guide
This guide explains how to create plugins for viu, the terminal-based anime streaming tool.
## Overview
Viu supports four types of plugins:
- **Providers**: Add support for new anime streaming websites
- **Players**: Add support for new media players
- **Selectors**: Add support for new interactive selection tools
- **Commands**: Add new CLI commands to viu
## Plugin Structure
Every plugin must be a Git repository with the following structure:
```
your-plugin-repo/
├── plugin.info.toml # Plugin metadata (required)
├── your_module.py # Your plugin implementation
├── config.toml # Default configuration (optional)
├── requirements.txt # Dependencies (optional)
├── utils.py # Additional modules (optional)
├── helpers/ # Subdirectories supported (optional)
│ ├── __init__.py
│ └── parser.py
└── README.md # Documentation (recommended)
```
### Multi-File Plugins
Viu supports plugins with multiple Python files. You can organize your plugin code across multiple modules and import between them normally:
```python
# In your main plugin file
from utils import helper_function
from helpers.parser import ResponseParser
class MyProvider(BaseAnimeProvider):
def __init__(self, client, **config):
self.parser = ResponseParser()
# ... rest of implementation
```
The plugin system automatically adds your plugin directory to Python's import path during loading, so relative imports work as expected.
### Plugin Manifest (`plugin.info.toml`)
Every plugin repository must contain a `plugin.info.toml` file at its root:
```toml
[plugin]
name = "My Awesome Plugin"
version = "1.0.0"
description = "Adds support for Example Anime Site"
author = "Your Name"
homepage = "https://github.com/yourname/viu-example-plugin"
requires_python = ">=3.11"
[components]
# Specify which components your plugin provides
provider = "example_provider:ExampleProvider" # format: module:class
# player = "my_player:MyPlayer" # (if providing a player)
# selector = "my_selector:MySelector" # (if providing a selector)
# command = "my_command:my_command_func" # (if providing a command)
```
## Provider Plugins
Provider plugins add support for new anime streaming websites.
### Requirements
Your provider class must inherit from `BaseAnimeProvider` and implement:
- `search(query: str) -> SearchResults`
- `get(anime_id: str) -> Anime`
- `episode_streams(anime_id: str, episode: str) -> List[Server]`
### Example Provider Plugin
**plugin.info.toml:**
```toml
[plugin]
name = "Example Anime Provider"
version = "1.0.0"
description = "Adds support for example.anime.site"
[components]
provider = "example_provider:ExampleProvider"
```
**example_provider.py:**
```python
from typing import List
from httpx import Client
# These imports work because viu adds the plugin path to sys.path
from viu_media.libs.provider.anime.base import BaseAnimeProvider
from viu_media.libs.provider.anime.types import SearchResults, Anime, Server
class ExampleProvider(BaseAnimeProvider):
HEADERS = {
"Referer": "https://example.anime.site/",
}
def __init__(self, client: Client, **config):
self.client = client
# Access plugin configuration
self.timeout = config.get("timeout", 30)
self.preferred_quality = config.get("preferred_quality", "720p")
def search(self, query: str) -> SearchResults:
# Implement search logic
response = self.client.get(f"https://example.anime.site/search?q={query}")
# Parse response and return SearchResults
return SearchResults(...)
def get(self, anime_id: str) -> Anime:
# Implement anime details fetching
response = self.client.get(f"https://example.anime.site/anime/{anime_id}")
# Parse response and return Anime
return Anime(...)
def episode_streams(self, anime_id: str, episode: str) -> List[Server]:
# Implement stream URL extraction
response = self.client.get(f"https://example.anime.site/watch/{anime_id}/{episode}")
# Parse response and return list of Server objects
return [Server(...)]
```
## Player Plugins
Player plugins add support for new media players.
### Requirements
Your player class must inherit from `BasePlayer` and implement:
- `play(media_url: str, **kwargs) -> None`
### Example Player Plugin
**plugin.info.toml:**
```toml
[plugin]
name = "Custom Player"
version = "1.0.0"
description = "Adds support for my custom media player"
[components]
player = "custom_player:CustomPlayer"
```
**custom_player.py:**
```python
import subprocess
from viu_media.libs.player.base import BasePlayer
class CustomPlayer(BasePlayer):
def __init__(self, **config):
self.executable = config.get("executable", "my-player")
self.extra_args = config.get("extra_args", [])
def play(self, media_url: str, **kwargs) -> None:
cmd = [self.executable] + self.extra_args + [media_url]
subprocess.run(cmd)
```
## Selector Plugins
Selector plugins add support for new interactive selection tools.
### Requirements
Your selector class must inherit from `BaseSelector` and implement:
- `choose(choices: List[str], **kwargs) -> str`
- `confirm(message: str, **kwargs) -> bool`
- `ask(message: str, **kwargs) -> str`
## Command Plugins
Command plugins add new CLI commands to viu.
### Example Command Plugin
**plugin.info.toml:**
```toml
[plugin]
name = "My Command"
version = "1.0.0"
description = "Adds a custom command to viu"
[components]
command = "my_command:my_command"
```
**my_command.py:**
```python
import click
@click.command()
@click.argument("arg1")
def my_command(arg1: str):
"""My custom command description."""
click.echo(f"Hello from plugin command with arg: {arg1}")
```
## Plugin Configuration
Plugins can include a default configuration file (`config.toml`) in their repository root. When a plugin is installed, this default configuration is automatically copied to the user's `~/.config/viu/plugins.config.toml` file.
**Example `config.toml` in plugin repository:**
```toml
# Default configuration for My Plugin
[my-plugin-name]
timeout = 30
preferred_quality = "720p"
custom_option = "default_value"
```
**After installation, users can customize by editing `~/.config/viu/plugins.config.toml`:**
```toml
[my-plugin-name]
timeout = 60 # Customized value
preferred_quality = "1080p" # Customized value
custom_option = "my_value" # Customized value
```
Access this configuration in your plugin constructor via the `**config` parameter.
## Installation and Usage
### For Plugin Developers
1. Create your plugin repository following the structure above
2. Test your plugin locally
3. Publish your repository on GitHub/GitLab
4. Share the installation command with users
### For Users
Install a plugin:
```bash
viu plugin add --type provider myplugin github:user/viu-myplugin
```
Configure the plugin by editing `~/.config/viu/plugins.config.toml`:
```toml
[myplugin]
option1 = "value1"
option2 = "value2"
```
Use the plugin:
```bash
viu --provider myplugin search "anime name"
```
## Dependencies
If your plugin requires additional Python packages, include a `requirements.txt` file in your repository root. Users will need to install these manually:
```bash
pip install -r requirements.txt
```
## Best Practices
1. **Error Handling**: Implement proper error handling and logging
2. **Configuration**: Make your plugin configurable through the config system
3. **Documentation**: Include a README.md with usage instructions
4. **Testing**: Test your plugin thoroughly before publishing
5. **Versioning**: Use semantic versioning for your plugin releases
6. **Compatibility**: Specify minimum Python version requirements
## Plugin Management Commands
```bash
# Install a plugin
viu plugin add --type provider myplugin github:user/viu-myplugin
# List installed plugins
viu plugin list
viu plugin list --type provider
# Update a plugin
viu plugin update --type provider myplugin
# Remove a plugin
viu plugin remove --type provider myplugin
```
## Example Plugins
Check out these example plugin repositories:
- [Example Provider Plugin](https://github.com/example/viu-example-provider)
- [Example Player Plugin](https://github.com/example/viu-example-player)
## Support
For plugin development support:
- Open an issue in the main viu repository
- Join the Discord server: https://discord.gg/C4rhMA4mmK

View File

@@ -0,0 +1,15 @@
[multi-file-provider]
# Base URL for the anime site
base_url = "https://multifile.example.site"
# Request timeout in seconds
timeout = 30
# Preferred video quality
preferred_quality = "720p"
# Maximum number of search results
max_results = 25
# Enable debug logging
debug = false

View File

@@ -0,0 +1,169 @@
"""
VLC player integration for Viu.
This module provides the VlcPlayer class, which implements the BasePlayer interface for the VLC media player.
"""
import logging
import shutil
import subprocess
from viu_media.core.config import VlcConfig
from viu_media.core.exceptions import ViuError
from viu_media.core.patterns import TORRENT_REGEX, YOUTUBE_REGEX
from viu_media.core.utils import detect
from viu_media.libs.player.base import BasePlayer
from viu_media.libs.player.params import PlayerParams
from viu_media.libs.player.types import PlayerResult
logger = logging.getLogger(__name__)
class VlcPlayer(BasePlayer):
"""
VLC player implementation for Viu.
Provides playback functionality using the VLC media player, supporting desktop, mobile, and torrent scenarios.
"""
def __init__(self, config: VlcConfig):
"""
Initialize the VlcPlayer with the given VLC configuration.
Args:
config: VlcConfig object containing VLC-specific settings.
"""
self.config = config
self.executable = shutil.which("vlc")
def play(self, params: PlayerParams) -> PlayerResult:
"""
Play the given media using VLC, handling desktop, mobile, and torrent scenarios.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if not self.executable:
raise ViuError("VLC executable not found in PATH.")
if TORRENT_REGEX.match(params.url) and detect.is_running_in_termux():
return self._play_on_mobile(params)
else:
return self._play_on_desktop(params)
def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen:
"""
Not implemented for VLC player.
"""
raise NotImplementedError("play_with_ipc is not implemented for VLC player.")
def _play_on_mobile(self, params: PlayerParams) -> PlayerResult:
"""
Play media on a mobile device using Android intents.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if YOUTUBE_REGEX.match(params.url):
args = [
"nohup",
"am",
"start",
"--user",
"0",
"-a",
"android.intent.action.VIEW",
"-d",
params.url,
"-n",
"com.google.android.youtube/.UrlActivity",
]
else:
args = [
"nohup",
"am",
"start",
"--user",
"0",
"-a",
"android.intent.action.VIEW",
"-d",
params.url,
"-n",
"org.videolan.vlc/org.videolan.vlc.gui.video.VideoPlayerActivity",
"-e",
"title",
params.title,
]
subprocess.run(args)
return PlayerResult(episode=params.episode)
def _play_on_desktop(self, params: PlayerParams) -> PlayerResult:
"""
Play media on a desktop environment using VLC.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
if TORRENT_REGEX.search(params.url):
return self._stream_on_desktop_with_webtorrent_cli(params)
args = [self.executable, params.url]
if params.subtitles:
for sub in params.subtitles:
args.extend(["--sub-file", sub])
break
if params.title:
args.extend(["--video-title", params.title])
if self.config.args:
args.extend(self.config.args.split(","))
subprocess.run(args, encoding="utf-8")
return PlayerResult(episode=params.episode)
def _stream_on_desktop_with_webtorrent_cli(
self, params: PlayerParams
) -> PlayerResult:
"""
Stream torrent media using the webtorrent CLI and VLC.
Args:
params: PlayerParams object containing playback parameters.
Returns:
PlayerResult: Information about the playback session.
"""
WEBTORRENT_CLI = shutil.which("webtorrent")
if not WEBTORRENT_CLI:
raise ViuError("Please Install webtorrent cli inorder to stream torrents")
args = [WEBTORRENT_CLI, params.url, "--vlc"]
if self.config.args:
args.append("--player-args")
args.extend(self.config.args.split(","))
subprocess.run(args)
return PlayerResult(episode=params.episode)
if __name__ == "__main__":
from viu_media.core.constants import APP_ASCII_ART
print(APP_ASCII_ART)
url = input("Enter the url you would like to stream: ")
vlc = VlcPlayer(VlcConfig())
player_result = vlc.play(PlayerParams(url=url, title="", query="", episode=""))
print(player_result)

View File

@@ -0,0 +1,9 @@
[plugin]
name = "Multi-File Provider Plugin"
version = "1.0.0"
description = "A demo plugin with multiple Python files"
author = "Viu Developer"
requires_python = ">=3.11"
[components]
player = "player:VlcPlayer"

View File

@@ -0,0 +1,18 @@
# Default configuration for Example Provider Plugin
# This file is automatically copied to ~/.config/viu/plugins.config.toml during installation
[example-provider]
# Request timeout in seconds
timeout = 30
# Preferred video quality
preferred_quality = "720p"
# Maximum number of search results to return
max_results = 20
# Custom headers (optional)
# custom_header = "value"
# Enable debug logging for this plugin
# debug = false

View File

@@ -0,0 +1,37 @@
import re
ANIMEPAHE = "animepahe.ru"
ANIMEPAHE_BASE = f"https://{ANIMEPAHE}"
ANIMEPAHE_ENDPOINT = f"{ANIMEPAHE_BASE}/api"
SERVERS_AVAILABLE = ["kwik"]
REQUEST_HEADERS = {
"Cookie": "__ddgid_=VvX0ebHrH2DsFZo4; __ddgmark_=3savRpSVFhvZcn5x; __ddg2_=buBJ3c4pNBYKFZNp; __ddg1_=rbVADKr9URtt55zoIGFa; SERVERID=janna; XSRF-TOKEN=eyJpdiI6IjV5bFNtd0phUHgvWGJxc25wL0VJSUE9PSIsInZhbHVlIjoicEJTZktlR2hxR2JZTWhnL0JzazlvZU5TQTR2bjBWZ2dDb0RwUXVUUWNSclhQWUhLRStYSmJmWmUxWkpiYkFRYU12RjFWejlSWHorME1wZG5qQ1U0TnFlNnBFR2laQjN1MjdyNjc5TjVPdXdJb2o5VkU1bEduRW9pRHNDTHh6Sy8iLCJtYWMiOiI0OTc0ZmNjY2UwMGJkOWY2MWNkM2NlMjk2ZGMyZGJmMWE0NTdjZTdkNGI2Y2IwNTIzZmFiZWU5ZTE2OTk0YmU4IiwidGFnIjoiIn0%3D; laravel_session=eyJpdiI6ImxvdlpqREFnTjdaeFJubUlXQWlJVWc9PSIsInZhbHVlIjoiQnE4R3VHdjZ4M1NDdEVWM1ZqMUxtNnVERnJCcmtCUHZKNzRPR2RFbzNFcStTL29xdnVTbWhsNVRBUXEybVZWNU1UYVlTazFqYlN5UjJva1k4czNGaXBTbkJJK01oTUd3VHRYVHBoc3dGUWxHYnFlS2NJVVNFbTFqMVBWdFpuVUgiLCJtYWMiOiI1NDdjZTVkYmNhNjUwZTMxZmRlZmVmMmRlMGNiYjAwYjlmYjFjY2U0MDc1YTQzZThiMTIxMjJlYTg1NTA4YjBmIiwidGFnIjoiIn0%3D; latest=5592",
"Host": ANIMEPAHE,
"Accept": "application, text/javascript, */*; q=0.01",
"Accept-Encoding": "Utf-8",
"Referer": ANIMEPAHE_BASE,
"DNT": "1",
"Connection": "keep-alive",
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Site": "same-origin",
"Sec-Fetch-Mode": "cors",
"TE": "trailers",
}
SERVER_HEADERS = {
"Host": "kwik.si",
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,image/avif,image/webp,image/png,image/svg+xml,*/*;q=0.8",
"Accept-Language": "en-US,en;q=0.5",
"Accept-Encoding": "Utf-8",
"DNT": "1",
"Connection": "keep-alive",
"Referer": "https://animepahe.ru/",
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "iframe",
"Sec-Fetch-Mode": "navigate",
"Sec-Fetch-Site": "cross-site",
"Priority": "u=4",
"TE": "trailers",
}
JUICY_STREAM_REGEX = re.compile(r"source='(.*)';")
KWIK_RE = re.compile(r"Player\|(.+?)'")

View File

@@ -0,0 +1,77 @@
import re
def animepahe_key_creator(c: int, a: int):
from ...scraping.utils import encode_base_n
if c < a:
val_a = ""
else:
val_a = animepahe_key_creator(int(c / a), a)
c = c % a
if c > 35:
val_b = chr(c + 29)
else:
val_b = encode_base_n(c, 36)
return val_a + val_b
def animepahe_embed_decoder(
encoded_js_p: str,
base_a: int,
no_of_keys_c: int,
values_to_replace_with_k: list,
):
decode_mapper_d: dict = {}
for i in range(no_of_keys_c):
key = animepahe_key_creator(i, base_a)
val = values_to_replace_with_k[i] or key
decode_mapper_d[key] = val
return re.sub(
r"\b\w+\b", lambda match: decode_mapper_d[match.group(0)], encoded_js_p
)
PARAMETERS_REGEX = re.compile(r"eval\(function\(p,a,c,k,e,d\)\{.*\}\((.*?)\)\)$")
ENCODE_JS_REGEX = re.compile(r"'(.*?);',(\d+),(\d+),'(.*)'\.split")
def process_animepahe_embed_page(embed_page: str):
from ...scraping.html_parser import get_element_text_and_html_by_tag
encoded_js_string = ""
embed_page_content = embed_page
for _ in range(8):
text, html = get_element_text_and_html_by_tag("script", embed_page_content)
if not text and html:
embed_page_content = re.sub(html, "", embed_page_content)
continue
if text:
encoded_js_string = text.strip()
break
if not encoded_js_string:
return
obsfucated_js_parameter_match = PARAMETERS_REGEX.search(encoded_js_string)
if not obsfucated_js_parameter_match:
return
parameter_string = obsfucated_js_parameter_match.group(1)
encoded_js_parameter_string = ENCODE_JS_REGEX.search(parameter_string)
if not encoded_js_parameter_string:
return
p: str = encoded_js_parameter_string.group(1)
a: int = int(encoded_js_parameter_string.group(2))
c: int = int(encoded_js_parameter_string.group(3))
k: list = encoded_js_parameter_string.group(4).split("|")
return animepahe_embed_decoder(p, a, c, k).replace("\\", "")
if __name__ == "__main__":
# Testing time
filepath = input("Enter file name: ")
if filepath:
with open(filepath) as file:
data = file.read()
else:
data = """<script>eval(function(p,a,c,k,e,d){e=function(c){return(c<a?'':e(parseInt(c/a)))+((c=c%a)>35?String.fromCharCode(c+29):c.toString(36))};if(!''.replace(/^/,String)){while(c--){d[e(c)]=k[c]||e(c)}k=[function(e){return d[e]}];e=function(){return'\\w+'};c=1};while(c--){if(k[c]){p=p.replace(new RegExp('\\b'+e(c)+'\\b','g'),k[c])}}return p}('f $7={H:a(2){4 B(9.7.h(y z("(?:(?:^|.*;)\\\\s*"+d(2).h(/[\\-\\.\\+\\*]/g,"\\\\$&")+"\\\\s*\\\\=\\\\s*([^;]*).*$)|^.*$"),"$1"))||G},E:a(2,q,3,6,5,t){k(!2||/^(?:8|r\\-v|o|m|p)$/i.D(2)){4 w}f b="";k(3){F(3.J){j K:b=3===P?"; 8=O, I N Q M:u:u A":"; r-v="+3;n;j L:b="; 8="+3;n;j S:b="; 8="+3.Z();n}}9.7=d(2)+"="+d(q)+b+(5?"; m="+5:"")+(6?"; o="+6:"")+(t?"; p":"");4 x},Y:a(2,6,5){k(!2||!11.C(2)){4 w}9.7=d(2)+"=; 8=12, R 10 W l:l:l A"+(5?"; m="+5:"")+(6?"; o="+6:"");4 x},C:a(2){4(y z("(?:^|;\\\\s*)"+d(2).h(/[\\-\\.\\+\\*]/g,"\\\\$&")+"\\\\s*\\\\=")).D(9.7)},X:a(){f c=9.7.h(/((?:^|\\s*;)[^\\=]+)(?=;|$)|^\\s*|\\s*(?:\\=[^;]*)?(?:\\1|$)/g,"").T(/\\s*(?:\\=[^;]*)?;\\s*/);U(f e=0;e<c.V;e++){c[e]=B(c[e])}4 c}};',62,65,'||sKey|vEnd|return|sDomain|sPath|cookie|expires|document|function|sExpires|aKeys|encodeURIComponent|nIdx|var||replace||case|if|00|domain|break|path|secure|sValue|max||bSecure|59|age|false|true|new|RegExp|GMT|decodeURIComponent|hasItem|test|setItem|switch|null|getItem|31|constructor|Number|String|23|Dec|Fri|Infinity|9999|01|Date|split|for|length|1970|keys|removeItem|toUTCString|Jan|this|Thu'.split('|'),0,{}));eval(function(p,a,c,k,e,d){e=function(c){return(c<a?'':e(parseInt(c/a)))+((c=c%a)>35?String.fromCharCode(c+29):c.toString(36))};if(!''.replace(/^/,String)){while(c--){d[e(c)]=k[c]||e(c)}k=[function(e){return d[e]}];e=function(){return'\\w+'};c=1};while(c--){if(k[c]){p=p.replace(new RegExp('\\b'+e(c)+'\\b','g'),k[c])}}return p}('h o=\'1D://1C-E.1B.1A.1z/1y/E/1x/1w/1v.1u\';h d=s.r(\'d\');h 0=B 1t(d,{\'1s\':{\'1r\':i},\'1q\':\'16:9\',\'D\':1,\'1p\':5,\'1o\':{\'1n\':\'1m\'},1l:[\'7-1k\',\'7\',\'1j\',\'1i-1h\',\'1g\',\'1f-1e\',\'1d\',\'D\',\'1c\',\'1b\',\'1a\',\'19\',\'C\',\'18\'],\'C\':{\'17\':i}});8(!A.15()){d.14=o}x{j z={13:12,11:10,Z:Y,X:i,W:i};h c=B A(z);c.V(o);c.U(d);g.c=c}0.3("T",6=>{g.S.R.Q("P")});0.O=1;k v(b,n,m){8(b.y){b.y(n,m,N)}x 8(b.w){b.w(\'3\'+n,m)}}j 4=k(l){g.M.L(l,\'*\')};v(g,\'l\',k(e){j a=e.a;8(a===\'7\')0.7();8(a===\'f\')0.f();8(a===\'u\')0.u()});0.3(\'t\',6=>{4(\'t\')});0.3(\'7\',6=>{4(\'7\')});0.3(\'f\',6=>{4(\'f\')});0.3(\'K\',6=>{4(0.q);s.r(\'.J-I\').H=G(0.q.F(2))});0.3(\'p\',6=>{4(\'p\')});',62,102,'player|||on|sendMessage||event|play|if||data|element|hls|video||pause|window|const|true|var|function|message|eventHandler|eventName|source|ended|currentTime|querySelector|document|ready|stop|bindEvent|attachEvent|else|addEventListener|config|Hls|new|fullscreen|volume|01|toFixed|String|innerHTML|timestamp|ss|timeupdate|postMessage|parent|false|speed|landscape|lock|orientation|screen|enterfullscreen|attachMedia|loadSource|lowLatencyMode|enableWorker|Infinity|backBufferLength|600|maxMaxBufferLength|180|maxBufferLength|src|isSupported||iosNative|capture|airplay|pip|settings|captions|mute|time|current|progress|forward|fast|rewind|large|controls|kwik|key|storage|seekTime|ratio|global|keyboard|Plyr|m3u8|uwu|b92a392054c041a3f9c6eecabeb0e127183f44e547828447b10bca8d77523e6f|03|stream|org|nextcdn|files|eu|https'.split('|'),0,{}))</script>"""
print(process_animepahe_embed_page(data))

View File

@@ -0,0 +1,100 @@
from typing import Any
from viu_media.libs.provider.anime.types import (
Anime,
AnimeEpisodeInfo,
AnimeEpisodes,
EpisodeStream,
MediaTranslationType,
PageInfo,
SearchResult,
SearchResults,
Server,
)
from .types import (
AnimePaheAnimePage,
AnimePaheSearchPage,
)
translation_type_map = {
"sub": MediaTranslationType.SUB,
"dub": MediaTranslationType.DUB,
"raw": MediaTranslationType.RAW,
}
def map_to_search_results(data: AnimePaheSearchPage) -> SearchResults:
results = []
for result in data["data"]:
results.append(
SearchResult(
id=result["session"],
title=result["title"],
episodes=AnimeEpisodes(
sub=list(map(str, range(1, result["episodes"] + 1))),
dub=list(map(str, range(1, result["episodes"] + 1))),
raw=list(map(str, range(1, result["episodes"] + 1))),
),
media_type=result["type"],
score=result["score"],
status=result["status"],
season=result["season"],
poster=result["poster"],
year=str(result["year"]),
)
)
return SearchResults(
page_info=PageInfo(
total=data["total"],
per_page=data["per_page"],
current_page=data["current_page"],
),
results=results,
)
def map_to_anime_result(
search_result: SearchResult, anime: AnimePaheAnimePage
) -> Anime:
episodes_info = []
episodes = []
anime["data"] = sorted(anime["data"], key=lambda k: float(k["episode"]))
for ep_info in anime["data"]:
episodes.append(str(ep_info["episode"]))
episodes_info.append(
AnimeEpisodeInfo(
id=str(ep_info["id"]),
session_id=ep_info["session"],
episode=str(ep_info["episode"]),
title=ep_info["title"],
poster=ep_info["snapshot"],
duration=str(ep_info["duration"]),
)
)
return Anime(
id=search_result.id,
title=search_result.title,
episodes=AnimeEpisodes(
sub=episodes,
dub=episodes,
),
year=str(search_result.year),
poster=search_result.poster,
episodes_info=episodes_info,
)
def map_to_server(
episode: AnimeEpisodeInfo, translation_type: Any, quality: Any, stream_link: Any
) -> Server:
links = [
EpisodeStream(
link=stream_link,
quality=quality,
translation_type=translation_type_map[translation_type],
)
]
return Server(name="kwik", links=links, episode_title=episode.title)

View File

@@ -0,0 +1,10 @@
[plugin]
name = "Example Provider Plugin"
version = "1.0.0"
description = "A demo provider plugin for testing the viu plugin system"
author = "Viu Developer"
homepage = "https://github.com/example/viu-example-plugin"
requires_python = ">=3.11"
[components]
provider = "example_provider:ExampleProvider"

View File

@@ -0,0 +1,207 @@
import logging
from functools import lru_cache
from typing import Iterator, Optional
from viu_media.libs.provider.anime.base import BaseAnimeProvider
from viu_media.libs.provider.anime.params import (
AnimeParams,
EpisodeStreamsParams,
SearchParams,
)
from viu_media.libs.provider.anime.types import (
Anime,
AnimeEpisodeInfo,
SearchResult,
SearchResults,
Server,
)
from viu_media.libs.provider.anime.utils.debug import debug_provider
from .constants import (
ANIMEPAHE_BASE,
ANIMEPAHE_ENDPOINT,
JUICY_STREAM_REGEX,
REQUEST_HEADERS,
SERVER_HEADERS,
)
from .extractor import process_animepahe_embed_page
from .mappers import map_to_anime_result, map_to_search_results, map_to_server
from .types import AnimePaheAnimePage, AnimePaheSearchPage
logger = logging.getLogger(__name__)
class AnimePahe(BaseAnimeProvider):
HEADERS = REQUEST_HEADERS
@debug_provider
def search(self, params: SearchParams) -> SearchResults | None:
return self._search(params)
@lru_cache()
def _search(self, params: SearchParams) -> SearchResults | None:
url_params = {"m": "search", "q": params.query}
response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
response.raise_for_status()
data: AnimePaheSearchPage = response.json()
if not data.get("data"):
return
return map_to_search_results(data)
@debug_provider
def get(self, params: AnimeParams) -> Anime | None:
return self._get_anime(params)
@lru_cache()
def _get_anime(self, params: AnimeParams) -> Anime | None:
page = 1
standardized_episode_number = 0
search_result = self._get_search_result(params)
if not search_result:
logger.error(f"No search result found for ID {params.id}")
return None
anime: Optional[AnimePaheAnimePage] = None
has_next_page = True
while has_next_page:
logger.debug(f"Loading page: {page}")
_anime_page = self._anime_page_loader(
m="release",
id=params.id,
sort="episode_asc",
page=page,
)
has_next_page = True if _anime_page["next_page_url"] else False
page += 1
if not anime:
anime = _anime_page
else:
anime["data"].extend(_anime_page["data"])
if anime:
for episode in anime.get("data", []):
if episode["episode"] % 1 == 0:
standardized_episode_number += 1
episode.update({"episode": standardized_episode_number})
else:
standardized_episode_number += episode["episode"] % 1
episode.update({"episode": standardized_episode_number})
standardized_episode_number = int(standardized_episode_number)
return map_to_anime_result(search_result, anime)
@lru_cache()
def _get_search_result(self, params: AnimeParams) -> Optional[SearchResult]:
search_results = self._search(SearchParams(query=params.query))
if not search_results or not search_results.results:
logger.error(f"No search results found for ID {params.id}")
return None
for search_result in search_results.results:
if search_result.id == params.id:
return search_result
@lru_cache()
def _anime_page_loader(self, m, id, sort, page) -> AnimePaheAnimePage:
url_params = {
"m": m,
"id": id,
"sort": sort,
"page": page,
}
response = self.client.get(ANIMEPAHE_ENDPOINT, params=url_params)
response.raise_for_status()
return response.json()
@debug_provider
def episode_streams(self, params: EpisodeStreamsParams) -> Iterator[Server] | None:
from viu_media.libs.provider.scraping.html_parser import (
extract_attributes,
get_element_by_id,
get_elements_html_by_class,
)
episode = self._get_episode_info(params)
if not episode:
logger.error(
f"Episode {params.episode} doesn't exist for anime {params.anime_id}"
)
return
url = f"{ANIMEPAHE_BASE}/play/{params.anime_id}/{episode.session_id}"
response = self.client.get(url, follow_redirects=True)
response.raise_for_status()
c = get_element_by_id("resolutionMenu", response.text)
if not c:
logger.error("Resolution menu not found in the response")
return
resolutionMenuItems = get_elements_html_by_class("dropdown-item", c)
res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
quality = None
translation_type = None
stream_link = None
# TODO: better document the scraping process
for res_dict in res_dicts:
# the actual attributes are data attributes in the original html 'prefixed with data-'
embed_url = res_dict["src"]
data_audio = "dub" if res_dict["audio"] == "eng" else "sub"
if data_audio != params.translation_type:
continue
if not embed_url:
logger.warning("embed url not found please report to the developers")
continue
embed_response = self.client.get(
embed_url,
headers={
"User-Agent": self.client.headers["User-Agent"],
**SERVER_HEADERS,
},
)
embed_response.raise_for_status()
embed_page = embed_response.text
decoded_js = process_animepahe_embed_page(embed_page)
if not decoded_js:
logger.error("failed to decode embed page")
continue
juicy_stream = JUICY_STREAM_REGEX.search(decoded_js)
if not juicy_stream:
logger.error("failed to find juicy stream")
continue
juicy_stream = juicy_stream.group(1)
quality = res_dict["resolution"]
translation_type = data_audio
stream_link = juicy_stream
if translation_type and quality and stream_link:
yield map_to_server(episode, translation_type, quality, stream_link)
@lru_cache()
def _get_episode_info(
self, params: EpisodeStreamsParams
) -> Optional[AnimeEpisodeInfo]:
anime_info = self._get_anime(
AnimeParams(id=params.anime_id, query=params.query)
)
if not anime_info:
logger.error(f"No anime info for {params.anime_id}")
return
if not anime_info.episodes_info:
logger.error(f"No episodes info for {params.anime_id}")
return
for episode in anime_info.episodes_info:
if episode.episode == params.episode:
return episode
if __name__ == "__main__":
from viu_media.libs.provider.anime.utils.debug import test_anime_provider
test_anime_provider(AnimePahe)

View File

@@ -0,0 +1,108 @@
from enum import Enum
from typing import Literal, TypedDict
class Server(Enum):
KWIK = "Kwik"
class AnimePaheSearchResult(TypedDict):
id: str
title: str
type: str
episodes: int
status: str
season: str
year: int
score: int
poster: str
session: str
class AnimePaheSearchPage(TypedDict):
total: int
per_page: int
current_page: int
last_page: int
_from: int
to: int
data: list[AnimePaheSearchResult]
class Episode(TypedDict):
id: str
anime_id: int
episode: float
episode2: int
edition: str
title: str
snapshot: str # episode image
disc: str
audio: Literal["eng", "jpn"]
duration: str # time 00:00:00
session: str
filler: int
created_at: str
class AnimePaheAnimePage(TypedDict):
total: int
per_page: int
current_page: int
last_page: int
next_page_url: str | None
prev_page_url: str | None
_from: int
to: int
data: list[Episode]
class AnimePaheEpisodeInfo(TypedDict):
title: str
episode: float
id: str
translation_type: Literal["eng", "jpn"]
duration: str
poster: str
class AvailableEpisodesDetail(TypedDict):
sub: list[str]
dub: list[str]
raw: list[str]
class AnimePaheAnime(TypedDict):
id: str
title: str
year: int
season: str
poster: str
score: int
availableEpisodesDetail: AvailableEpisodesDetail
episodesInfo: list[AnimePaheEpisodeInfo]
class PageInfo(TypedDict):
total: int
perPage: int
currentPage: int
class AnimePaheSearchResults(TypedDict):
pageInfo: PageInfo
results: list[AnimePaheSearchResult]
class AnimePaheStreamLink(TypedDict):
quality: str
translation_type: Literal["sub", "dub"]
link: str
class AnimePaheServer(TypedDict):
server: Literal["kwik"]
links: list[AnimePaheStreamLink]
episode_title: str
subtitles: list
headers: dict

View File

@@ -11,6 +11,7 @@ dependencies = [
"inquirerpy>=0.3.4",
"pydantic>=2.11.7",
"rich>=13.9.2",
"tomli-w>=1.0.0",
]
[project.scripts]

11
uv.lock generated
View File

@@ -3553,6 +3553,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/82/4f/1695e70ceb3604f19eda9908e289c687ea81c4fecef4d90a9d1d0f2f7ae9/thefuzz-0.22.1-py3-none-any.whl", hash = "sha256:59729b33556850b90e1093c4cf9e618af6f2e4c985df193fdf3c5b5cf02ca481", size = 8245, upload-time = "2024-01-19T19:18:20.362Z" },
]
[[package]]
name = "tomli-w"
version = "1.2.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/19/75/241269d1da26b624c0d5e110e8149093c759b7a286138f4efd61a60e75fe/tomli_w-1.2.0.tar.gz", hash = "sha256:2dd14fac5a47c27be9cd4c976af5a12d87fb1f0b4512f81d69cce3b35ae25021", size = 7184, upload-time = "2025-01-15T12:07:24.262Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/c7/18/c86eb8e0202e32dd3df50d43d7ff9854f8e0603945ff398974c1d91ac1ef/tomli_w-1.2.0-py3-none-any.whl", hash = "sha256:188306098d013b691fcadc011abd66727d3c414c571bb01b1a174ba8c983cf90", size = 6675, upload-time = "2025-01-15T12:07:22.074Z" },
]
[[package]]
name = "typing-extensions"
version = "4.14.1"
@@ -3598,6 +3607,7 @@ dependencies = [
{ name = "inquirerpy" },
{ name = "pydantic" },
{ name = "rich" },
{ name = "tomli-w" },
]
[package.optional-dependencies]
@@ -3667,6 +3677,7 @@ requires-dist = [
{ name = "pypresence", marker = "extra == 'discord'", specifier = ">=4.3.0" },
{ name = "rich", specifier = ">=13.9.2" },
{ name = "thefuzz", marker = "extra == 'standard'", specifier = ">=0.22.1" },
{ name = "tomli-w", specifier = ">=1.0.0" },
{ name = "yt-dlp", marker = "extra == 'download'", specifier = ">=2025.7.21" },
{ name = "yt-dlp", marker = "extra == 'standard'", specifier = ">=2025.7.21" },
]

View File

@@ -39,6 +39,7 @@ commands = {
"worker": "worker.worker",
"queue": "queue.queue",
"completions": "completions.completions",
"plugin": "plugin.plugin",
}

View File

@@ -0,0 +1,5 @@
"""Plugin management commands for viu."""
from .cmd import plugin
__all__ = ["plugin"]

View File

@@ -0,0 +1,24 @@
"""Main plugin command group."""
import click
from ...utils.lazyloader import LazyGroup
lazy_subcommands = {
"add": "add.add",
"remove": "remove.remove",
"list": "list_plugins.list_plugins",
"update": "update.update",
}
@click.group(
name="plugin",
cls=LazyGroup,
root="viu_media.cli.commands.plugin.commands",
lazy_subcommands=lazy_subcommands,
help="Manage viu plugins (providers, players, selectors, commands)"
)
def plugin() -> None:
"""Manage viu plugins."""
pass

View File

@@ -0,0 +1 @@
"""Plugin command implementations."""

View File

@@ -0,0 +1,54 @@
"""Add plugin command."""
import click
from rich.console import Console
from typing import cast
from viu_media.core.plugins.manager import PluginError, plugin_manager, ComponentType
console = Console()
@click.command()
@click.option(
"--type",
"plugin_type",
type=click.Choice(["provider", "player", "selector", "command"]),
required=True,
help="Type of plugin to install"
)
@click.option(
"--force",
is_flag=True,
help="Force installation, overwriting existing plugin"
)
@click.argument("name")
@click.argument("source")
def add(plugin_type: str, name: str, source: str, force: bool) -> None:
"""Install a plugin from a Git repository.
NAME: Local name for the plugin
SOURCE: Git source (e.g., 'github:user/repo' or full URL)
Examples:
viu plugin add --type provider gogoanime github:user/viu-gogoanime
viu plugin add --type player custom-mpv https://github.com/user/viu-mpv-plugin
"""
try:
console.print(f"Installing {plugin_type} plugin '{name}' from {source}...")
plugin_manager.add_plugin(cast(ComponentType, plugin_type), name, source, force=force)
console.print(f"✅ Successfully installed plugin '{name}'", style="green")
# Show configuration hint
from viu_media.core.constants import PLUGINS_CONFIG
console.print(
f"\n💡 Configure the plugin by editing: {PLUGINS_CONFIG}",
style="blue"
)
except PluginError as e:
console.print(f"❌ Failed to install plugin: {e}", style="red")
raise click.ClickException(str(e))
except Exception as e:
console.print(f"❌ Unexpected error: {e}", style="red")
raise click.ClickException(f"Unexpected error: {e}")

View File

@@ -0,0 +1,74 @@
"""List plugins command."""
import click
from rich.console import Console
from rich.table import Table
from typing import cast
from viu_media.core.plugins.manager import plugin_manager, ComponentType
console = Console()
@click.command(name="list")
@click.option(
"--type",
"plugin_type",
type=click.Choice(["provider", "player", "selector", "command"]),
help="Filter by plugin type"
)
def list_plugins(plugin_type: str) -> None:
"""List installed plugins.
Examples:
viu plugin list
viu plugin list --type provider
"""
all_plugins = plugin_manager.list_plugins()
# Filter by type if specified
if plugin_type:
plugins_to_show = {cast(ComponentType, plugin_type): all_plugins[cast(ComponentType, plugin_type)]}
else:
plugins_to_show = all_plugins
# Count total plugins
total_count = sum(len(plugins) for plugins in plugins_to_show.values())
if total_count == 0:
if plugin_type:
console.print(f"No {plugin_type} plugins installed.", style="yellow")
else:
console.print("No plugins installed.", style="yellow")
console.print("Install plugins with: viu plugin add --type <type> <name> <source>")
return
# Create table
table = Table(title="Installed Plugins")
table.add_column("Type", style="cyan")
table.add_column("Name", style="green")
table.add_column("Version", style="yellow")
table.add_column("Source", style="blue")
table.add_column("Path", style="magenta")
# Add rows
for component_type, plugins in plugins_to_show.items():
for name, plugin_info in plugins.items():
table.add_row(
component_type,
name,
plugin_info.version or "unknown",
plugin_info.source,
str(plugin_info.path)
)
console.print(table)
console.print(f"\nTotal: {total_count} plugin(s)")
# Show configuration hint if plugins exist
if total_count > 0:
from viu_media.core.constants import PLUGINS_CONFIG
console.print(
f"\n💡 Configure plugins by editing: {PLUGINS_CONFIG}",
style="blue"
)

View File

@@ -0,0 +1,43 @@
"""Remove plugin command."""
import click
from rich.console import Console
from typing import cast
from viu_media.core.plugins.manager import PluginError, PluginNotFoundError, plugin_manager, ComponentType
console = Console()
@click.command()
@click.option(
"--type",
"plugin_type",
type=click.Choice(["provider", "player", "selector", "command"]),
required=True,
help="Type of plugin to remove"
)
@click.argument("name")
def remove(plugin_type: str, name: str) -> None:
"""Remove an installed plugin.
NAME: Name of the plugin to remove
Examples:
viu plugin remove --type provider gogoanime
viu plugin remove --type player custom-mpv
"""
try:
console.print(f"Removing {plugin_type} plugin '{name}'...")
plugin_manager.remove_plugin(cast(ComponentType, plugin_type), name)
console.print(f"✅ Successfully removed plugin '{name}'", style="green")
except PluginNotFoundError as e:
console.print(f"❌ Plugin not found: {e}", style="red")
raise click.ClickException(str(e))
except PluginError as e:
console.print(f"❌ Failed to remove plugin: {e}", style="red")
raise click.ClickException(str(e))
except Exception as e:
console.print(f"❌ Unexpected error: {e}", style="red")
raise click.ClickException(f"Unexpected error: {e}")

View File

@@ -0,0 +1,43 @@
"""Update plugin command."""
import click
from rich.console import Console
from typing import cast
from viu_media.core.plugins.manager import PluginError, PluginNotFoundError, plugin_manager, ComponentType
console = Console()
@click.command()
@click.option(
"--type",
"plugin_type",
type=click.Choice(["provider", "player", "selector", "command"]),
required=True,
help="Type of plugin to update"
)
@click.argument("name")
def update(plugin_type: str, name: str) -> None:
"""Update an installed plugin by pulling from Git.
NAME: Name of the plugin to update
Examples:
viu plugin update --type provider gogoanime
viu plugin update --type player custom-mpv
"""
try:
console.print(f"Updating {plugin_type} plugin '{name}'...")
plugin_manager.update_plugin(cast(ComponentType, plugin_type), name)
console.print(f"✅ Successfully updated plugin '{name}'", style="green")
except PluginNotFoundError as e:
console.print(f"❌ Plugin not found: {e}", style="red")
raise click.ClickException(str(e))
except PluginError as e:
console.print(f"❌ Failed to update plugin: {e}", style="red")
raise click.ClickException(str(e))
except Exception as e:
console.print(f"❌ Unexpected error: {e}", style="red")
raise click.ClickException(f"Unexpected error: {e}")

View File

@@ -76,11 +76,17 @@ else:
USER_APPLICATIONS = Path.home() / ".local" / "share" / "applications"
LOG_FOLDER = APP_CACHE_DIR / "logs"
# Plugin system paths
PLUGINS_DIR = APP_DATA_DIR / "plugins"
PLUGINS_MANIFEST = APP_DATA_DIR / "plugins.toml"
PLUGINS_CONFIG = APP_DATA_DIR / "plugins.config.toml"
# USER_APPLICATIONS.mkdir(parents=True,exist_ok=True)
APP_DATA_DIR.mkdir(parents=True, exist_ok=True)
APP_CACHE_DIR.mkdir(parents=True, exist_ok=True)
LOG_FOLDER.mkdir(parents=True, exist_ok=True)
USER_VIDEOS_DIR.mkdir(parents=True, exist_ok=True)
PLUGINS_DIR.mkdir(parents=True, exist_ok=True)
USER_CONFIG = APP_DATA_DIR / "config.toml"

View File

@@ -0,0 +1,6 @@
"""Plugin system for viu."""
from .model import PluginComponents, PluginInfo
from .manager import PluginManager
__all__ = ["PluginInfo", "PluginComponents", "PluginManager"]

View File

@@ -0,0 +1,646 @@
"""Plugin manager for viu.
This module contains the PluginManager singleton that handles all plugin operations
including loading, discovery, installation, and removal.
"""
import importlib.util
import logging
import shutil
import subprocess
import sys
import tomllib
from pathlib import Path
from typing import Any, Dict, Literal, Optional, Set, Union
import tomli_w
from pydantic import ValidationError
from viu_media.core.exceptions import ViuError
from ..constants import PLUGINS_CONFIG, PLUGINS_DIR, PLUGINS_MANIFEST
from .model import InstalledPlugin, PluginInfo, PluginManifest
logger = logging.getLogger(__name__)
ComponentType = Literal["provider", "player", "selector", "command"]
class PluginError(ViuError):
"""Base exception for plugin-related errors."""
pass
class PluginNotFoundError(ViuError):
"""Raised when a requested plugin is not found."""
pass
class PluginLoadError(ViuError):
"""Raised when a plugin fails to load."""
pass
class PluginManager:
"""Manages the plugin system for viu.
This is a singleton class that handles:
- Loading and caching plugins
- Installing and removing plugins from Git repositories
- Managing plugin configurations
- Discovering available plugins
"""
_instance: Optional["PluginManager"] = None
_initialized: bool = False
def __new__(cls) -> "PluginManager":
if cls._instance is None:
cls._instance = super().__new__(cls)
return cls._instance
def __init__(self) -> None:
if self._initialized:
return
self._loaded_components: Dict[str, Any] = {}
self._manifest: PluginManifest = PluginManifest()
self._plugin_configs: Dict[str, Dict[str, Any]] = {}
self._load_manifest()
self._load_plugin_configs()
self._initialized = True
def load_component(self, component_type: ComponentType, name: str) -> Any:
"""Lazy-load a plugin component by type and name.
Args:
component_type: Type of component (provider, player, selector, command)
name: Name of the component to load
Returns:
The loaded component instance or function
Raises:
PluginNotFoundError: If the plugin is not installed
PluginLoadError: If the plugin fails to load
"""
cache_key = f"{component_type}:{name}"
# Return cached component if already loaded
if cache_key in self._loaded_components:
return self._loaded_components[cache_key]
# Find the plugin in the manifest
plugins_of_type = getattr(self._manifest, f"{component_type}s")
if name not in plugins_of_type:
raise PluginNotFoundError(
f"Plugin '{name}' of type '{component_type}' is not installed"
)
plugin_entry = plugins_of_type[name]
plugin_path = plugin_entry.path
if not plugin_path.exists():
raise PluginLoadError(f"Plugin path does not exist: {plugin_path}")
# Load plugin info to get component definition
try:
plugin_info = self._get_plugin_info(plugin_path)
except PluginError as e:
raise PluginLoadError(f"Failed to load plugin info: {e}") from e
# Get the component definition
component_def = getattr(plugin_info.components, component_type)
if not component_def:
raise PluginLoadError(
f"Plugin '{name}' does not provide a {component_type} component"
)
# Parse module:class format
if ":" not in component_def:
raise PluginLoadError(f"Invalid component definition: {component_def}")
module_name, class_name = component_def.split(":", 1)
# Load the module
module_path = plugin_path / f"{module_name}.py"
if not module_path.exists():
raise PluginLoadError(f"Plugin module not found: {module_path}")
try:
spec = importlib.util.spec_from_file_location(
f"plugin_{name}_{module_name}", module_path
)
if spec is None or spec.loader is None:
raise PluginLoadError(f"Could not create module spec for {module_path}")
module = importlib.util.module_from_spec(spec)
# Add plugin path to sys.path temporarily for relative imports
sys.path.insert(0, str(plugin_path))
try:
spec.loader.exec_module(module)
finally:
sys.path.remove(str(plugin_path))
except Exception as e:
raise PluginLoadError(f"Failed to load module {module_path}: {e}") from e
# Get the component class/function
if not hasattr(module, class_name):
raise PluginLoadError(f"Module {module_name} does not have {class_name}")
component_cls = getattr(module, class_name)
# For providers, players, and selectors, instantiate with config
if component_type in ("provider", "player", "selector"):
plugin_config = self._plugin_configs.get(name, {})
# For providers, also inject httpx client like the built-in system
if component_type == "provider":
from httpx import Client
from ...core.utils.networking import random_user_agent
headers = getattr(component_cls, "HEADERS", {})
client = Client(headers={"User-Agent": random_user_agent(), **headers})
try:
component = component_cls(client, **plugin_config)
except TypeError:
# Fallback if constructor doesn't accept config
component = component_cls(client)
else:
try:
component = component_cls(**plugin_config)
except TypeError:
# Fallback if constructor doesn't accept config
component = component_cls()
else:
# For commands, just return the function
component = component_cls
# Cache and return
self._loaded_components[cache_key] = component
logger.debug(f"Loaded plugin component: {cache_key}")
return component
def add_plugin(
self, component_type: ComponentType, name: str, source: str, force: bool = False
) -> None:
"""Install a plugin from a Git repository.
Args:
component_type: Type of component the plugin provides
name: Local name for the plugin
source: Git source (e.g., "github:user/repo")
force: Whether to overwrite existing plugin
Raises:
PluginError: If installation fails
"""
plugins_of_type = getattr(self._manifest, f"{component_type}s")
# Check if plugin already exists
if name in plugins_of_type and not force:
raise PluginError(
f"Plugin '{name}' already exists. Use --force to overwrite."
)
# Determine installation path
plugin_dir = PLUGINS_DIR / f"{component_type}s" / name
# Remove existing if force is True
if plugin_dir.exists():
if force:
shutil.rmtree(plugin_dir)
else:
raise PluginError(f"Plugin directory already exists: {plugin_dir}")
# Create parent directory
plugin_dir.parent.mkdir(parents=True, exist_ok=True)
# Clone the repository
self._clone_plugin(source, plugin_dir)
# Validate plugin structure
try:
plugin_info = self._get_plugin_info(plugin_dir)
except PluginError:
# Clean up on validation failure
shutil.rmtree(plugin_dir)
raise
# Ensure plugin provides the expected component type
expected_component = getattr(plugin_info.components, component_type)
if not expected_component:
shutil.rmtree(plugin_dir)
raise PluginError(f"Plugin does not provide a {component_type} component")
# Add to manifest
plugins_of_type[name] = InstalledPlugin(
source=source, path=plugin_dir, version=plugin_info.plugin.version
)
# Save manifest
self._save_manifest()
# Copy default config if it exists
self._install_default_config(name, plugin_dir)
logger.info(f"Successfully installed {component_type} plugin '{name}'")
def remove_plugin(self, component_type: ComponentType, name: str) -> None:
"""Remove an installed plugin.
Args:
component_type: Type of component
name: Name of the plugin to remove
Raises:
PluginNotFoundError: If plugin is not installed
PluginError: If removal fails
"""
plugins_of_type = getattr(self._manifest, f"{component_type}s")
if name not in plugins_of_type:
raise PluginNotFoundError(
f"Plugin '{name}' of type '{component_type}' is not installed"
)
plugin_entry = plugins_of_type[name]
plugin_path = plugin_entry.path
# Remove from filesystem
if plugin_path.exists():
try:
shutil.rmtree(plugin_path)
except OSError as e:
raise PluginError(f"Failed to remove plugin directory: {e}") from e
# Remove from manifest
del plugins_of_type[name]
# Remove from loaded components cache
cache_key = f"{component_type}:{name}"
self._loaded_components.pop(cache_key, None)
# Save manifest
self._save_manifest()
logger.info(f"Successfully removed {component_type} plugin '{name}'")
def update_plugin(self, component_type: ComponentType, name: str) -> None:
"""Update an installed plugin by pulling from Git.
Args:
component_type: Type of component
name: Name of the plugin to update
Raises:
PluginNotFoundError: If plugin is not installed
PluginError: If update fails
"""
plugins_of_type = getattr(self._manifest, f"{component_type}s")
if name not in plugins_of_type:
raise PluginNotFoundError(
f"Plugin '{name}' of type '{component_type}' is not installed"
)
plugin_entry = plugins_of_type[name]
plugin_path = plugin_entry.path
if not plugin_path.exists():
raise PluginError(f"Plugin path does not exist: {plugin_path}")
# Pull latest changes
try:
result = subprocess.run(
["git", "pull"],
cwd=plugin_path,
check=True,
capture_output=True,
text=True,
)
logger.debug(f"Git pull output: {result.stdout}")
except subprocess.CalledProcessError as e:
raise PluginError(f"Failed to update plugin: {e.stderr}") from e
except FileNotFoundError:
raise PluginError("Git is not installed or not in PATH") from None
# Update version in manifest
try:
plugin_info = self._get_plugin_info(plugin_path)
plugin_entry.version = plugin_info.plugin.version
self._save_manifest()
except PluginError as e:
logger.warning(f"Could not update plugin version: {e}")
# Clear from cache to force reload
cache_key = f"{component_type}:{name}"
self._loaded_components.pop(cache_key, None)
logger.info(f"Successfully updated {component_type} plugin '{name}'")
def list_plugins(self) -> Dict[ComponentType, Dict[str, InstalledPlugin]]:
"""List all installed plugins grouped by type.
Returns:
Dictionary mapping component types to their installed plugins
"""
return {
"provider": dict(self._manifest.providers),
"player": dict(self._manifest.players),
"selector": dict(self._manifest.selectors),
"command": dict(self._manifest.commands),
}
def get_available_components(self, component_type: ComponentType) -> Set[str]:
"""Get names of all available components of a given type.
This includes both built-in components and installed plugins.
Args:
component_type: Type of component
Returns:
Set of component names
"""
# Get plugin names
plugins_of_type = getattr(self._manifest, f"{component_type}s")
plugin_names = set(plugins_of_type.keys())
# Add built-in component names
if component_type == "provider":
from ...libs.provider.anime.provider import PROVIDERS_AVAILABLE
builtin_names = set(PROVIDERS_AVAILABLE.keys())
elif component_type == "player":
from ...libs.player.player import PLAYERS
builtin_names = set(PLAYERS)
elif component_type == "selector":
from ...libs.selectors.selector import SELECTORS
builtin_names = set(SELECTORS)
elif component_type == "command":
# Commands would need to be handled differently as they're registered in CLI
builtin_names = set()
else:
builtin_names = set()
return plugin_names | builtin_names
def is_plugin(self, component_type: ComponentType, name: str) -> bool:
"""Check if a component is provided by a plugin.
Args:
component_type: Type of component
name: Name of the component
Returns:
True if it's a plugin, False if it's built-in
"""
plugins_of_type = getattr(self._manifest, f"{component_type}s")
return name in plugins_of_type
def _load_manifest(self) -> None:
"""Load the plugins.toml manifest file."""
if not PLUGINS_MANIFEST.exists():
logger.debug("No plugins manifest found, creating empty one")
self._save_manifest()
return
try:
with open(PLUGINS_MANIFEST, "rb") as f:
data = tomllib.load(f)
self._manifest = PluginManifest.model_validate(data)
logger.debug(
f"Loaded plugins manifest with {len(self.list_plugins())} plugins"
)
except (OSError, ValidationError, tomllib.TOMLDecodeError) as e:
logger.error(f"Failed to load plugins manifest: {e}")
self._manifest = PluginManifest()
def _save_manifest(self) -> None:
"""Save the current manifest to plugins.toml."""
try:
# Convert Path objects to strings for TOML serialization
manifest_dict = self._manifest.model_dump()
# Convert all Path objects to strings
def convert_paths(obj: Any) -> Any:
if isinstance(obj, dict):
return {k: convert_paths(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [convert_paths(item) for item in obj]
elif isinstance(obj, Path):
return str(obj)
else:
return obj
manifest_dict = convert_paths(manifest_dict)
with open(PLUGINS_MANIFEST, "wb") as f:
tomli_w.dump(manifest_dict, f)
logger.debug("Saved plugins manifest")
except OSError as e:
logger.error(f"Failed to save plugins manifest: {e}")
raise PluginError(f"Could not save plugins manifest: {e}") from e
def _load_plugin_configs(self) -> None:
"""Load plugin configurations from plugins.config.toml."""
if not PLUGINS_CONFIG.exists():
logger.debug("No plugin configs found")
return
try:
with open(PLUGINS_CONFIG, "rb") as f:
self._plugin_configs = tomllib.load(f)
logger.debug(f"Loaded configs for {len(self._plugin_configs)} plugins")
except (OSError, tomllib.TOMLDecodeError) as e:
logger.error(f"Failed to load plugin configs: {e}")
self._plugin_configs = {}
def _get_plugin_info(self, plugin_path: Path) -> PluginInfo:
"""Load and validate plugin.info.toml from a plugin directory."""
info_file = plugin_path / "plugin.info.toml"
if not info_file.exists():
raise PluginError(f"Plugin info file not found: {info_file}")
try:
with open(info_file, "rb") as f:
data = tomllib.load(f)
return PluginInfo.model_validate(data)
except (OSError, ValidationError, tomllib.TOMLDecodeError) as e:
raise PluginError(f"Invalid plugin info file {info_file}: {e}") from e
def _parse_git_source(self, source: str) -> tuple[str, str]:
"""Parse a git source string into platform and repo.
Examples:
"github:user/repo" -> ("github.com", "user/repo")
"gitlab:user/repo" -> ("gitlab.com", "user/repo")
"https://github.com/user/repo" -> ("github.com", "user/repo")
"/path/to/local/repo" -> ("local", "/path/to/local/repo")
"file:///path/to/repo" -> ("local", "/path/to/repo")
"""
# Handle local file paths
if source.startswith("file://"):
return "local", source[7:] # Remove file:// prefix
elif (
source.startswith("/")
or source.startswith("./")
or source.startswith("../")
):
return "local", source
if source.startswith("http"):
# Full URL provided
if "github.com" in source:
repo = source.split("github.com/")[-1].rstrip(".git")
return "github.com", repo
elif "gitlab.com" in source:
repo = source.split("gitlab.com/")[-1].rstrip(".git")
return "gitlab.com", repo
else:
raise PluginError(f"Unsupported git host in URL: {source}")
# Short format like "github:user/repo"
if ":" not in source:
raise PluginError(f"Invalid source format: {source}")
platform, repo = source.split(":", 1)
platform_map = {
"github": "github.com",
"gitlab": "gitlab.com",
}
if platform not in platform_map:
raise PluginError(f"Unsupported platform: {platform}")
return platform_map[platform], repo
def _clone_plugin(self, source: str, dest_path: Path) -> None:
"""Clone a plugin repository from Git."""
platform, repo = self._parse_git_source(source)
if platform == "local":
# Handle local repository - just copy the directory
import shutil
src_path = Path(repo).resolve()
if not src_path.exists():
raise PluginError(f"Local repository path does not exist: {src_path}")
if (src_path / ".git").exists():
logger.info(f"Copying local Git repository from {src_path}")
try:
# Use git clone to properly copy the repository
subprocess.run(
["git", "clone", str(src_path), str(dest_path)],
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError as e:
raise PluginError(
f"Failed to clone local repository: {e.stderr}"
) from e
else:
logger.warning("Copying non git repo, local plugin")
shutil.copytree(src_path, dest_path)
else:
# Handle remote repository
git_url = f"https://{platform}/{repo}.git"
logger.info(f"Cloning plugin from {git_url}")
try:
subprocess.run(
["git", "clone", git_url, str(dest_path)],
check=True,
capture_output=True,
text=True,
)
except subprocess.CalledProcessError as e:
raise PluginError(f"Failed to clone plugin: {e.stderr}") from e
if not dest_path.exists():
raise PluginError(
"Plugin cloning failed - destination directory was not created"
)
# Check for git command availability
try:
subprocess.run(["git", "--version"], check=True, capture_output=True)
except FileNotFoundError:
raise PluginError("Git is not installed or not in PATH") from None
def _install_default_config(self, plugin_name: str, plugin_dir: Path) -> None:
"""Install default configuration from plugin's config.toml if it exists."""
default_config_path = plugin_dir / "config.toml"
if not default_config_path.exists():
logger.debug(f"No default config found for plugin '{plugin_name}'")
return
# Load the default config
try:
with open(default_config_path, "rb") as f:
default_config = tomllib.load(f)
except (OSError, tomllib.TOMLDecodeError) as e:
logger.warning(
f"Failed to load default config for plugin '{plugin_name}': {e}"
)
return
# Load existing plugins config or create empty dict
if PLUGINS_CONFIG.exists():
try:
with open(PLUGINS_CONFIG, "rb") as f:
existing_config = tomllib.load(f)
except (OSError, tomllib.TOMLDecodeError) as e:
logger.warning(f"Failed to load existing plugins config: {e}")
existing_config = {}
else:
existing_config = {}
# Check if plugin config already exists
if plugin_name in existing_config:
logger.debug(
f"Plugin '{plugin_name}' config already exists, skipping default config installation"
)
return
# Merge the default config
if plugin_name in default_config:
existing_config[plugin_name] = default_config[plugin_name]
# Write the updated config
try:
with open(PLUGINS_CONFIG, "wb") as f:
tomli_w.dump(existing_config, f)
logger.info(
f"Installed default configuration for plugin '{plugin_name}'"
)
except OSError as e:
logger.warning(
f"Failed to save default config for plugin '{plugin_name}': {e}"
)
else:
logger.debug(
f"No config section found for plugin '{plugin_name}' in default config"
)
# Global instance
plugin_manager = PluginManager()

View File

@@ -0,0 +1,91 @@
"""Plugin interface definitions for viu.
This module defines the Pydantic models that represent the structure
of plugin.info.toml files and plugin configurations.
"""
from pathlib import Path
from typing import Dict, Optional
from pydantic import BaseModel, Field
class PluginComponents(BaseModel):
"""Defines the components that a plugin provides.
Each component is defined as a string in the format:
{module_name_in_repo}:{ClassName_or_factory_function}
For example:
provider = "gogo_provider:GogoProvider"
player = "my_player:MyPlayer"
selector = "my_selector:MySelector"
command = "my_command:my_command_func"
"""
provider: Optional[str] = Field(
None,
description="Provider component in format 'module:class'"
)
player: Optional[str] = Field(
None,
description="Player component in format 'module:class'"
)
selector: Optional[str] = Field(
None,
description="Selector component in format 'module:class'"
)
command: Optional[str] = Field(
None,
description="Command component in format 'module:function'"
)
class PluginMetadata(BaseModel):
"""Plugin metadata from the [plugin] section."""
name: str = Field(description="Human-readable plugin name")
version: str = Field(description="Plugin version")
description: str = Field(description="Plugin description")
author: Optional[str] = Field(None, description="Plugin author")
homepage: Optional[str] = Field(None, description="Plugin homepage URL")
requires_python: Optional[str] = Field(
None,
description="Minimum Python version required"
)
class PluginInfo(BaseModel):
"""Complete plugin information from plugin.info.toml."""
plugin: PluginMetadata = Field(description="Plugin metadata")
components: PluginComponents = Field(description="Plugin components")
class InstalledPlugin(BaseModel):
"""Represents a plugin entry in plugins.toml."""
source: str = Field(description="Git source (e.g., 'github:user/repo')")
path: Path = Field(description="Local filesystem path to the plugin")
version: Optional[str] = Field(None, description="Installed version")
class PluginManifest(BaseModel):
"""Complete plugins.toml manifest structure."""
providers: Dict[str, InstalledPlugin] = Field(
default_factory=dict,
description="Installed provider plugins"
)
players: Dict[str, InstalledPlugin] = Field(
default_factory=dict,
description="Installed player plugins"
)
selectors: Dict[str, InstalledPlugin] = Field(
default_factory=dict,
description="Installed selector plugins"
)
commands: Dict[str, InstalledPlugin] = Field(
default_factory=dict,
description="Installed command plugins"
)

View File

@@ -30,8 +30,18 @@ class PlayerFactory:
ValueError: If the player_name is not supported.
NotImplementedError: If the player is recognized but not yet implemented.
"""
from ...core.plugins.manager import plugin_manager
player_name = config.stream.player
# Check if it's a plugin first
if plugin_manager.is_plugin("player", player_name):
try:
return plugin_manager.load_component("player", player_name)
except Exception as e:
raise ValueError(f"Could not load plugin player '{player_name}': {e}") from e
# Handle built-in players
if player_name not in PLAYERS:
raise ValueError(
f"Unsupported player: '{player_name}'. Supported players are: {PLAYERS}"

View File

@@ -1,5 +1,6 @@
import importlib
import logging
from typing import Union
from httpx import Client
@@ -21,12 +22,12 @@ class AnimeProviderFactory:
"""Factory for creating anime provider instances."""
@staticmethod
def create(provider_name: ProviderName) -> BaseAnimeProvider:
def create(provider_name: Union[ProviderName, str]) -> BaseAnimeProvider:
"""
Dynamically creates an instance of the specified anime provider.
This method imports the necessary provider module, instantiates its main class,
and injects a pre-configured HTTP client.
and injects a pre-configured HTTP client. It now also supports plugin providers.
Args:
provider_name: The name of the provider to create (e.g., 'allanime').
@@ -38,24 +39,43 @@ class AnimeProviderFactory:
ValueError: If the provider_name is not supported.
ImportError: If the provider module or class cannot be found.
"""
from ....core.plugins.manager import plugin_manager
from ....core.utils.networking import random_user_agent
# Convert to string if it's an enum
if isinstance(provider_name, ProviderName):
provider_str = provider_name.value
else:
provider_str = str(provider_name)
# Check if it's a plugin first
if plugin_manager.is_plugin("provider", provider_str):
try:
return plugin_manager.load_component("provider", provider_str)
except Exception as e:
logger.error(f"Failed to load plugin provider '{provider_str}': {e}")
raise ImportError(f"Could not load plugin provider '{provider_str}': {e}") from e
# Handle built-in providers
if provider_str.lower() not in PROVIDERS_AVAILABLE:
raise ValueError(f"Provider '{provider_str}' is not available")
# Correctly determine module and class name from the map
import_path = PROVIDERS_AVAILABLE[provider_name.value.lower()]
import_path = PROVIDERS_AVAILABLE[provider_str.lower()]
module_name, class_name = import_path.split(".", 1)
# Construct the full package path for dynamic import
package_path = f"viu_media.libs.provider.anime.{provider_name.value.lower()}"
package_path = f"viu_media.libs.provider.anime.{provider_str.lower()}"
try:
provider_module = importlib.import_module(f".{module_name}", package_path)
provider_class = getattr(provider_module, class_name)
except (ImportError, AttributeError) as e:
logger.error(
f"Failed to load provider '{provider_name.value.lower()}': {e}"
f"Failed to load provider '{provider_str}': {e}"
)
raise ImportError(
f"Could not load provider '{provider_name.value.lower()}'. "
f"Could not load provider '{provider_str}'. "
"Check the module path and class name in PROVIDERS_AVAILABLE."
) from e

View File

@@ -14,8 +14,18 @@ class SelectorFactory:
"""
Factory to create a selector instance based on the configuration.
"""
from ...core.plugins.manager import plugin_manager
selector_name = config.general.selector
# Check if it's a plugin first
if plugin_manager.is_plugin("selector", selector_name):
try:
return plugin_manager.load_component("selector", selector_name)
except Exception as e:
raise ValueError(f"Could not load plugin selector '{selector_name}': {e}") from e
# Handle built-in selectors
if selector_name not in SELECTORS:
raise ValueError(
f"Unsupported selector: '{selector_name}'.Available selectors are: {SELECTORS}"