Compare commits

...

34 Commits

Author SHA1 Message Date
Benexl
2217f011af fix(core-constants): use project name over cli name 2025-11-01 20:06:53 +03:00
Benexl
5960a7c502 feat(notifications): use seconds instead of minutes 2025-11-01 19:50:46 +03:00
Benexl
bd0309ee85 feat(dev): add .venv/bin to path using direnv 2025-11-01 19:15:45 +03:00
Benexl
3724f06e33 fix(allanime-anime-provider): not giving different qualities 2025-11-01 17:26:45 +03:00
Benexl
d20af89fc8 feat(debug-anime-provider-utils): allow for quality selection 2025-11-01 16:48:51 +03:00
Benexl
3872b4c8a8 feat(search-command): allow quality selection 2025-11-01 16:48:07 +03:00
Benexl
9545b893e1 feat(search-command): if no title is provided as an option prompt it 2025-11-01 16:47:28 +03:00
Benexl
5634214fb8 chore(ci): update stale.yml to emphasize devs limited time 2025-10-27 00:33:36 +03:00
Benexl
66c0ada29d chore(ci): update days to closure of pr or issue 2025-10-27 00:24:07 +03:00
Benexl
02465b4ddb chore(ci): add stale.yml 2025-10-27 00:19:07 +03:00
Benexl
5ffd94ac24 chore(pre-commit): update pre-commit config to use only Ruff 2025-10-26 23:47:28 +03:00
Benexl
d2864df6d0 style(dev): add extra space inorder to pass ruff fmt 2025-10-26 23:37:19 +03:00
Benexl
2a28e3b9a3 chore: temporarily disable tests in workflow 2025-10-26 23:32:05 +03:00
Benexl
7b8027a8b3 fix(viu): correct import path 2025-10-26 23:28:23 +03:00
Benexl
2a36152c38 fix(provider-scraping-html-parser): pyright errors 2025-10-26 23:26:36 +03:00
Benexl
2048c7b743 fix(inquirer-selector): pyright errors 2025-10-26 23:25:55 +03:00
Benexl
133fd4c1c8 chore: run ruff check --fix 2025-10-26 23:20:30 +03:00
Benexl
e22120fe99 fix(allanime-anime-provider-utils): pyright errors 2025-10-26 23:19:36 +03:00
Benexl
44e6220662 chore: cleanup; directly implement syncplay logic in the actual players 2025-10-26 23:16:23 +03:00
Benexl
1fea1335c6 chore: move to feature branch 2025-10-26 23:10:05 +03:00
Benexl
8b664fae36 chore: move to feature branch 2025-10-26 23:09:53 +03:00
Benexl
19a85511b4 chore: move to feature branch 2025-10-26 23:09:42 +03:00
Benexl
205299108b fix(media-api-debug-utils): pyright errors 2025-10-26 23:05:31 +03:00
Benexl
7670bdd2f3 fix(jikan-media-api-mapper): pyright errors 2025-10-26 23:03:05 +03:00
Benexl
cd3f7f7fb8 fix(anilist-media-api-mapper): pyright errors 2025-10-26 22:58:12 +03:00
Benexl
5be03ed5b8 fix(core-concurrency-utils): pyright errors 2025-10-26 22:56:17 +03:00
Benexl
6581179336 fix(yt-dlp-downloader): pyright errors 2025-10-26 22:53:56 +03:00
Benexl
2bb674f4a0 fix(cli-image-utils): pyright errors 2025-10-26 22:49:32 +03:00
Benexl
642e77f601 fix(config-editor): pyright errors 2025-10-26 22:37:57 +03:00
Benexl
a5e99122f5 fix(registry-cmds): pyright errors 2025-10-26 21:30:10 +03:00
Benexl
39bd7bed61 chore: update deps 2025-10-26 20:18:08 +03:00
Benexl
869072633b chore: create .python-version 2025-10-26 20:17:47 +03:00
Benexl
cbd788a573 chore: bump python version for pyright 2025-10-26 20:13:49 +03:00
Benexl
11fe54b146 chore: update lock file 2025-10-26 19:17:48 +03:00
45 changed files with 310 additions and 511 deletions

3
.envrc
View File

@@ -1,5 +1,6 @@
VIU_APP_NAME="viu-dev"
export VIU_APP_NAME
PATH="./.venv/bin/:$PATH"
export PATH VIU_APP_NAME
if command -v nix >/dev/null;then
use flake
fi

57
.github/workflows/stale.yml vendored Normal file
View File

@@ -0,0 +1,57 @@
name: Mark Stale Issues and Pull Requests
on:
schedule:
# Runs every day at 6:30 UTC
- cron: "30 6 * * *"
# Allows you to run this workflow manually from the Actions tab for testing
workflow_dispatch:
jobs:
stale:
runs-on: ubuntu-latest
permissions:
issues: write
pull-requests: write
steps:
- uses: actions/stale@v5
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
stale-issue-message: |
Greetings @{{author}},
This bug report is like an ancient scroll detailing a legendary beast. Our small guild of developers is often on many quests at once, so our response times can be slower than a tortoise in a time-stop spell. We deeply appreciate your patience!
**Seeking Immediate Help or Discussion?**
Our **[Discord Tavern](https://discord.gg/HBEmAwvbHV)** is the best place to get a quick response from the community for general questions or setup help!
**Want to Be the Hero?**
You could try to tame this beast yourself! With modern grimoires (like AI coding assistants) and our **[Contribution Guide](https://github.com/viu-media/Viu/blob/master/CONTRIBUTIONS.md)**, you might just be the hero we're waiting for. We would be thrilled to review your solution!
---
To keep our quest board tidy, we need to know if this creature is still roaming the lands in the latest version of `viu`. If we don't get an update within **7 days**, we'll assume it has vanished and archive the scroll.
Thanks for being our trusted scout!
stale-pr-message: |
Hello @{{author}}, it looks like this powerful contribution has been left in the middle of its training arc! 💪
Our review dojo is managed by just a few senseis who are sometimes away on long missions, so thank you for your patience as we work through the queue.
We were excited to see this new technique being developed. Are you still planning to complete its training, or have you embarked on a different quest? If you need a sparring partner (reviewer) or some guidance from a senpai, just let us know!
To keep our dojo tidy, we'll be archiving unfinished techniques. If we don't hear back within **7 days**, we'll assume it's time to close this PR for now. You can always resume your training and reopen it when you're ready.
Thank you for your incredible effort!
# --- Labels and Timing ---
stale-issue-label: "stale"
stale-pr-label: "stale"
# How many days of inactivity before an issue/PR is marked as stale.
days-before-stale: 14
# How many days of inactivity to wait before closing a stale issue/PR.
days-before-close: 7

View File

@@ -13,7 +13,7 @@ jobs:
strategy:
matrix:
python-version: ["3.11", "3.12"]
python-version: ["3.11", "3.12"]
steps:
- uses: actions/checkout@v4
@@ -41,5 +41,7 @@ jobs:
- name: Run type checking
run: uv run pyright
- name: Run tests
run: uv run pytest tests
# TODO: write tests
# - name: Run tests
# run: uv run pytest tests

View File

@@ -1,33 +1,10 @@
default_language_version:
python: python3.12
repos:
- repo: https://github.com/pycqa/isort
rev: 5.12.0
- repo: https://github.com/astral-sh/ruff-pre-commit
# Ruff version.
rev: v0.14.2
hooks:
- id: isort
name: isort (python)
args: ["--profile", "black"]
- repo: https://github.com/PyCQA/autoflake
rev: v2.2.1
hooks:
- id: autoflake
args:
[
"--in-place",
"--remove-unused-variables",
"--remove-all-unused-imports",
]
# - repo: https://github.com/astral-sh/ruff-pre-commit
# rev: v0.4.10
# hooks:
# - id: ruff
# args: [--fix]
- repo: https://github.com/psf/black-pre-commit-mirror
rev: 24.4.2
hooks:
- id: black
name: black
#language_version: python3.10
# Run the linter.
- id: ruff-check
args: [--fix]
# Run the formatter.
- id: ruff-format

1
.python-version Normal file
View File

@@ -0,0 +1 @@
3.11

View File

@@ -1,9 +1,10 @@
#!/usr/bin/env -S uv run --script
import httpx
import json
from viu_media.core.utils.graphql import execute_graphql
from pathlib import Path
from collections import defaultdict
from pathlib import Path
import httpx
from viu_media.core.utils.graphql import execute_graphql
DEV_DIR = Path(__file__).resolve().parent
media_tags_type_py = (
@@ -26,6 +27,7 @@ template = """\
from enum import Enum
class MediaTag(Enum):\
"""

View File

@@ -1,5 +1,5 @@
{
"venvPath": ".",
"venv": ".venv",
"pythonVersion": "3.11"
"pythonVersion": "3.12"
}

8
uv.lock generated
View File

@@ -3441,15 +3441,15 @@ wheels = [
[[package]]
name = "pyright"
version = "1.1.406"
version = "1.1.407"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "nodeenv" },
{ name = "typing-extensions" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f7/16/6b4fbdd1fef59a0292cbb99f790b44983e390321eccbc5921b4d161da5d1/pyright-1.1.406.tar.gz", hash = "sha256:c4872bc58c9643dac09e8a2e74d472c62036910b3bd37a32813989ef7576ea2c", size = 4113151, upload-time = "2025-10-02T01:04:45.488Z" }
sdist = { url = "https://files.pythonhosted.org/packages/a6/1b/0aa08ee42948b61745ac5b5b5ccaec4669e8884b53d31c8ec20b2fcd6b6f/pyright-1.1.407.tar.gz", hash = "sha256:099674dba5c10489832d4a4b2d302636152a9a42d317986c38474c76fe562262", size = 4122872, upload-time = "2025-10-24T23:17:15.145Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/f6/a2/e309afbb459f50507103793aaef85ca4348b66814c86bc73908bdeb66d12/pyright-1.1.406-py3-none-any.whl", hash = "sha256:1d81fb43c2407bf566e97e57abb01c811973fdb21b2df8df59f870f688bdca71", size = 5980982, upload-time = "2025-10-02T01:04:43.137Z" },
{ url = "https://files.pythonhosted.org/packages/dc/93/b69052907d032b00c40cb656d21438ec00b3a471733de137a3f65a49a0a0/pyright-1.1.407-py3-none-any.whl", hash = "sha256:6dd419f54fcc13f03b52285796d65e639786373f433e243f8b94cf93a7444d21", size = 5997008, upload-time = "2025-10-24T23:17:13.159Z" },
]
[[package]]
@@ -3751,7 +3751,7 @@ wheels = [
[[package]]
name = "viu-media"
version = "3.2.7"
version = "3.2.8"
source = { editable = "." }
dependencies = [
{ name = "click" },

View File

@@ -113,6 +113,7 @@ def _create_tar_backup(
api: str,
):
"""Create a tar-based backup."""
# TODO: Add support for bz2/xz compression if needed
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:

View File

@@ -5,6 +5,7 @@ Registry restore command - restore registry from backup files
import json
import shutil
import tarfile
import zipfile
from datetime import datetime
from pathlib import Path
@@ -25,6 +26,11 @@ from ....service.registry.service import MediaRegistryService
is_flag=True,
help="Create backup of current registry before restoring",
)
@click.option(
"--backup-current-tar-compression-fmt",
type=click.Choice(["gz", "bz2", "xz"], case_sensitive=False),
help="The compression format to use for the current registry backup (if enabled)",
)
@click.option("--verify", is_flag=True, help="Verify backup integrity before restoring")
@click.option(
"--api",
@@ -38,6 +44,7 @@ def restore(
backup_file: Path,
force: bool,
backup_current: bool,
backup_current_compression_fmt: str,
verify: bool,
api: str,
):
@@ -61,7 +68,7 @@ def restore(
"Verification Failed",
"Backup file appears to be corrupted or invalid",
)
raise click.Abort()
return
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
@@ -77,7 +84,13 @@ def restore(
# Create backup of current registry if requested
if backup_current and registry_exists:
_backup_current_registry(registry_service, api, feedback)
_backup_current_registry(
registry_service,
api,
feedback,
backup_format=backup_format,
compression_fmt=backup_current_compression_fmt,
)
# Show restore summary
_show_restore_summary(backup_file, backup_format, feedback)
@@ -110,7 +123,13 @@ def restore(
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
suffixes = "".join(backup_file.suffixes).lower()
if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes:
if (
".tar" in suffixes
or ".gz" in suffixes
or ".tgz" in suffixes
or ".bz2" in suffixes
or ".xz" in suffixes
):
return "tar"
elif ".zip" in suffixes:
return "zip"
@@ -122,25 +141,38 @@ def _verify_backup(
) -> bool:
"""Verify backup file integrity."""
try:
metadata = {}
has_registry = has_index = has_metadata = False
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
names = tar.getnames()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
for name in names:
if name == "registry/":
has_registry = True
continue
if name == "index/":
has_index = True
continue
if name == "backup_metadata.json":
has_metadata = True
continue
if has_metadata:
metadata_member = tar.getmember("backup_metadata.json")
if metadata_file := tar.extractfile(metadata_member):
metadata = json.load(metadata_file)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, "r") as zip_file:
names = zip_file.namelist()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
for name in names:
if name == "registry/":
has_registry = True
continue
if name == "index/":
has_index = True
continue
if name == "backup_metadata.json":
has_metadata = True
continue
if has_metadata:
with zip_file.open("backup_metadata.json") as metadata_file:
metadata = json.load(metadata_file)
@@ -163,27 +195,42 @@ def _verify_backup(
def _check_registry_exists(registry_service: MediaRegistryService) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get("total_media", 0) > 0
except Exception:
return False
# TODO: Improve this check to be more robust
return registry_service.media_registry_dir.exists() and any(
registry_service.media_registry_dir.iterdir()
)
def _backup_current_registry(
registry_service: MediaRegistryService, api: str, feedback: FeedbackService
registry_service: MediaRegistryService,
api: str,
feedback: FeedbackService,
backup_format: str,
compression_fmt: str,
):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
if backup_format == "tar":
from .backup import _create_tar_backup
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
try:
_create_tar_backup(
registry_service, backup_path, True, False, feedback, api
)
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning(
"Backup Warning", f"Failed to backup current registry: {e}"
)
else:
from .backup import _create_zip_backup
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.zip")
_create_zip_backup(registry_service, backup_path, True, feedback, api)
def _show_restore_summary(

View File

@@ -30,7 +30,6 @@ if TYPE_CHECKING:
@click.option(
"--anime-title",
"-t",
required=True,
shell_complete=anime_titles_shell_complete,
multiple=True,
help="Specify which anime to download",
@@ -52,6 +51,10 @@ def search(config: AppConfig, **options: "Unpack[Options]"):
from ...libs.provider.anime.provider import create_provider
from ...libs.selectors.selector import create_selector
if not options["anime_title"]:
raw = click.prompt("What are you in the mood for? (comma-separated)")
options["anime_title"] = [a.strip() for a in raw.split(",") if a.strip()]
feedback = FeedbackService(config)
provider = create_provider(config.general.provider)
selector = create_selector(config)
@@ -173,6 +176,22 @@ def stream_anime(
if not server_name:
raise ViuError("Server not selected")
server = servers[server_name]
quality = [
ep_stream.link
for ep_stream in server.links
if ep_stream.quality == config.stream.quality
]
if not quality:
feedback.warning("Preferred quality not found, selecting quality...")
stream_link = selector.choose(
"Select Quality", [link.quality for link in server.links]
)
if not stream_link:
raise ViuError("Quality not selected")
stream_link = next(
(link.link for link in server.links if link.quality == stream_link), None
)
stream_link = server.links[0].link
if not stream_link:
raise ViuError(

View File

@@ -1,6 +1,5 @@
"""Update command for Viu CLI."""
import sys
from typing import TYPE_CHECKING
import click

View File

@@ -2,6 +2,7 @@ import textwrap
from pathlib import Path
from typing import Any, Literal, get_args, get_origin
# TODO: should we maintain a separate dependency for InquirerPy or write our own simple prompt system?
from InquirerPy import inquirer
from InquirerPy.validator import NumberValidator
from pydantic import BaseModel
@@ -28,7 +29,7 @@ class InteractiveConfigEditor:
if not isinstance(section_model, BaseModel):
continue
if not inquirer.confirm(
if not inquirer.confirm( # pyright: ignore[reportPrivateImportUsage]
message=f"Configure '{section_name.title()}' settings?",
default=True,
).execute():
@@ -83,14 +84,14 @@ class InteractiveConfigEditor:
# Boolean fields
if field_type is bool:
return inquirer.confirm(
return inquirer.confirm( # pyright: ignore[reportPrivateImportUsage]
message=message, default=current_value, long_instruction=help_text
)
# Literal (Choice) fields
if hasattr(field_type, "__origin__") and get_origin(field_type) is Literal:
choices = list(get_args(field_type))
return inquirer.select(
return inquirer.select( # pyright: ignore[reportPrivateImportUsage]
message=message,
choices=choices,
default=current_value,
@@ -99,7 +100,7 @@ class InteractiveConfigEditor:
# Numeric fields
if field_type is int:
return inquirer.number(
return inquirer.number( # pyright: ignore[reportPrivateImportUsage]
message=message,
default=int(current_value),
long_instruction=help_text,
@@ -110,7 +111,7 @@ class InteractiveConfigEditor:
validate=NumberValidator(),
)
if field_type is float:
return inquirer.number(
return inquirer.number( # pyright: ignore[reportPrivateImportUsage]
message=message,
default=float(current_value),
float_allowed=True,
@@ -120,7 +121,7 @@ class InteractiveConfigEditor:
# Path fields
if field_type is Path:
# Use text prompt for paths to allow '~' expansion, as FilePathPrompt can be tricky
return inquirer.text(
return inquirer.text( # pyright: ignore[reportPrivateImportUsage]
message=message, default=str(current_value), long_instruction=help_text
)
@@ -128,13 +129,13 @@ class InteractiveConfigEditor:
if field_type is str:
# Check for 'examples' to provide choices
if hasattr(field_info, "examples") and field_info.examples:
return inquirer.fuzzy(
return inquirer.fuzzy( # pyright: ignore[reportPrivateImportUsage]
message=message,
choices=field_info.examples,
default=str(current_value),
long_instruction=help_text,
)
return inquirer.text(
return inquirer.text( # pyright: ignore[reportPrivateImportUsage]
message=message, default=str(current_value), long_instruction=help_text
)

View File

@@ -296,8 +296,7 @@ class DownloadService:
message=message,
app_name="Viu",
app_icon=app_icon,
timeout=self.app_config.general.desktop_notification_duration
* 60,
timeout=self.app_config.general.desktop_notification_duration,
)
except: # noqa: E722
pass
@@ -318,7 +317,7 @@ class DownloadService:
message=message,
app_name="Viu",
app_icon=app_icon,
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
except: # noqa: E722
pass

View File

@@ -41,7 +41,7 @@ class FeedbackService:
message=message,
app_name=CLI_NAME,
app_icon=str(ICON_PATH),
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
return
except: # noqa: E722
@@ -67,7 +67,7 @@ class FeedbackService:
message=message,
app_name=CLI_NAME,
app_icon=str(ICON_PATH),
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
return
except: # noqa: E722
@@ -94,7 +94,7 @@ class FeedbackService:
message=message,
app_name=CLI_NAME,
app_icon=str(ICON_PATH),
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
return
except: # noqa: E722
@@ -120,7 +120,7 @@ class FeedbackService:
message=message,
app_name=CLI_NAME,
app_icon=str(ICON_PATH),
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
return
except: # noqa: E722
@@ -176,7 +176,7 @@ class FeedbackService:
message="No current way to display info in rofi, use fzf and the terminal instead",
app_name=CLI_NAME,
app_icon=str(ICON_PATH),
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
return
except: # noqa: E722

View File

@@ -101,7 +101,7 @@ class NotificationService:
message=message,
app_name="Viu",
app_icon=app_icon, # plyer supports file paths or URLs depending on platform
timeout=self.app_config.general.desktop_notification_duration * 60,
timeout=self.app_config.general.desktop_notification_duration,
)
logger.info(f"Displayed notification: {message}")
self._mark_seen(

View File

@@ -3,6 +3,8 @@ import shutil
import subprocess
from pathlib import Path
from typing import Optional
from viu_media.core.exceptions import DependencyNotFoundError
import importlib.util
import click
import httpx
@@ -43,67 +45,74 @@ def resize_image_from_url(
"""
from io import BytesIO
from PIL import Image
if importlib.util.find_spec("PIL"):
from PIL import Image # pyright: ignore[reportMissingImports]
if not return_bytes and output_path is None:
raise ValueError("output_path must be provided if return_bytes is False.")
if not return_bytes and output_path is None:
raise ValueError("output_path must be provided if return_bytes is False.")
try:
# Use the provided synchronous client
response = client.get(url)
response.raise_for_status() # Raise an exception for bad status codes
try:
# Use the provided synchronous client
response = client.get(url)
response.raise_for_status() # Raise an exception for bad status codes
image_bytes = response.content
image_stream = BytesIO(image_bytes)
img = Image.open(image_stream)
image_bytes = response.content
image_stream = BytesIO(image_bytes)
img = Image.open(image_stream)
if maintain_aspect_ratio:
img_copy = img.copy()
img_copy.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
resized_img = img_copy
else:
resized_img = img.resize((new_width, new_height), Image.Resampling.LANCZOS)
if return_bytes:
# Determine the output format. Default to JPEG if original is unknown or problematic.
# Handle RGBA to RGB conversion for JPEG output.
output_format = (
img.format if img.format in ["JPEG", "PNG", "WEBP"] else "JPEG"
)
if output_format == "JPEG":
if resized_img.mode in ("RGBA", "P"):
resized_img = resized_img.convert("RGB")
byte_arr = BytesIO()
resized_img.save(byte_arr, format=output_format)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and returned as bytes ({output_format} format)."
)
return byte_arr.getvalue()
else:
# Ensure the directory exists before saving
if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
resized_img.save(output_path)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and saved as '{output_path}'"
if maintain_aspect_ratio:
img_copy = img.copy()
img_copy.thumbnail((new_width, new_height), Image.Resampling.LANCZOS)
resized_img = img_copy
else:
resized_img = img.resize(
(new_width, new_height), Image.Resampling.LANCZOS
)
return None
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting {url}: {e}")
return None
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
if return_bytes:
# Determine the output format. Default to JPEG if original is unknown or problematic.
# Handle RGBA to RGB conversion for JPEG output.
output_format = (
img.format if img.format in ["JPEG", "PNG", "WEBP"] else "JPEG"
)
if output_format == "JPEG":
if resized_img.mode in ("RGBA", "P"):
resized_img = resized_img.convert("RGB")
byte_arr = BytesIO()
resized_img.save(byte_arr, format=output_format)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and returned as bytes ({output_format} format)."
)
return byte_arr.getvalue()
else:
# Ensure the directory exists before saving
if output_path:
output_path.parent.mkdir(parents=True, exist_ok=True)
resized_img.save(output_path)
logger.info(
f"Image from {url} resized to {resized_img.width}x{resized_img.height} and saved as '{output_path}'"
)
return None
except httpx.RequestError as e:
logger.error(f"An error occurred while requesting {url}: {e}")
return None
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error occurred: {e.response.status_code} - {e.response.text}"
)
return None
except ValueError as e:
logger.error(f"Configuration error: {e}")
return None
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return None
else:
raise DependencyNotFoundError(
"Pillow library is required for image processing. Please install it via 'uv pip install Pillow'."
)
return None
except ValueError as e:
logger.error(f"Configuration error: {e}")
return None
except Exception as e:
logger.error(f"An unexpected error occurred: {e}")
return None
def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str]:
@@ -123,17 +132,12 @@ def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str
If capture is False, prints directly to the terminal and returns None.
Returns None on any failure.
"""
# --- Common subprocess arguments ---
subprocess_kwargs = {
"check": False, # We will handle errors manually
"capture_output": capture,
"text": capture, # Decode stdout/stderr as text if capturing
}
# --- Try icat (Kitty terminal) first ---
if icat_executable := shutil.which("icat"):
process = subprocess.run(
[icat_executable, "--align", "left", url], **subprocess_kwargs
[icat_executable, "--align", "left", url],
capture_output=capture,
text=capture,
)
if process.returncode == 0:
return process.stdout if capture else None
@@ -148,11 +152,11 @@ def render(url: str, capture: bool = False, size: str = "30x30") -> Optional[str
response.raise_for_status()
img_bytes = response.content
# Add stdin input to the subprocess arguments
subprocess_kwargs["input"] = img_bytes
process = subprocess.run(
[chafa_executable, f"--size={size}", "-"], **subprocess_kwargs
[chafa_executable, f"--size={size}", "-"],
capture_output=capture,
text=capture,
input=img_bytes,
)
if process.returncode == 0:
return process.stdout if capture else None

View File

@@ -6,7 +6,7 @@ GENERAL_PYGMENT_STYLE = "github-dark"
GENERAL_PREFERRED_SPINNER = "smiley"
GENERAL_API_CLIENT = "anilist"
GENERAL_PREFERRED_TRACKER = "local"
GENERAL_DESKTOP_NOTIFICATION_DURATION = 5
GENERAL_DESKTOP_NOTIFICATION_DURATION = 5 * 60
GENERAL_PROVIDER = "allanime"

View File

@@ -25,7 +25,7 @@ ANILIST_AUTH = (
)
try:
APP_DIR = Path(str(resources.files(CLI_NAME.lower())))
APP_DIR = Path(str(resources.files(PROJECT_NAME.lower())))
except ModuleNotFoundError:
from pathlib import Path

View File

@@ -130,10 +130,11 @@ class YtDLPDownloader(BaseDownloader):
}
)
with yt_dlp.YoutubeDL(opts) as ydl:
# TODO: Confirm this type issues
with yt_dlp.YoutubeDL(opts) as ydl: # type: ignore
info = ydl.extract_info(params.url, download=True)
if info:
_video_path = info["requested_downloads"][0]["filepath"]
_video_path = info["requested_downloads"][0]["filepath"] # type: ignore
if _video_path.endswith(".unknown_video"):
print("Normalizing path...")
_vid_path = _video_path.replace(".unknown_video", ".mp4")

View File

@@ -219,7 +219,7 @@ class BackgroundWorker(ABC):
else:
# Wait for tasks to complete with timeout
try:
self._executor.shutdown(wait=True, timeout=timeout)
self._executor.shutdown(wait=True)
except TimeoutError:
logger.warning(
f"Worker {self.name} shutdown timed out, forcing cancellation"

View File

@@ -1,22 +0,0 @@
from httpx import get
ANISKIP_ENDPOINT = "https://api.aniskip.com/v1/skip-times"
# TODO: Finish own implementation of aniskip script
class AniSkip:
@classmethod
def get_skip_times(
cls, mal_id: int, episode_number: float | int, types=["op", "ed"]
):
url = f"{ANISKIP_ENDPOINT}/{mal_id}/{episode_number}?types=op&types=ed"
response = get(url)
print(response.text)
return response.json()
if __name__ == "__main__":
mal_id = input("Mal id: ")
episode_number = input("episode_number: ")
skip_times = AniSkip.get_skip_times(int(mal_id), float(episode_number))
print(skip_times)

View File

@@ -1,3 +0,0 @@
from .api import connect
__all__ = ["connect"]

View File

@@ -1,13 +0,0 @@
import time
from pypresence import Presence
def connect(show, episode, switch):
presence = Presence(client_id="1292070065583165512")
presence.connect()
if not switch.is_set():
presence.update(details=show, state="Watching episode " + episode)
time.sleep(10)
else:
presence.close()

View File

@@ -6,6 +6,7 @@
from enum import Enum
class MediaTag(Enum):
#
# TECHNICAL

View File

@@ -33,6 +33,7 @@ from ..types import (
Studio,
UserListItem,
UserMediaListStatus,
MediaType,
UserProfile,
)
from .types import (
@@ -539,7 +540,7 @@ def _to_generic_media_item_from_notification_partial(
title=_to_generic_media_title(data["title"]),
cover_image=_to_generic_media_image(data["coverImage"]),
# Provide default/empty values for fields not in notification payload
type="ANIME",
type=MediaType.ANIME,
status=MediaStatus.RELEASING, # Assume releasing for airing notifications
format=None,
description=None,

View File

@@ -6,6 +6,7 @@ from ..types import (
MediaImage,
MediaItem,
MediaSearchResult,
MediaStatus,
MediaTitle,
PageInfo,
Studio,
@@ -17,9 +18,9 @@ if TYPE_CHECKING:
# Jikan uses specific strings for status, we can map them to our generic enum.
JIKAN_STATUS_MAP = {
"Finished Airing": "FINISHED",
"Currently Airing": "RELEASING",
"Not yet aired": "NOT_YET_RELEASED",
"Finished Airing": MediaStatus.FINISHED,
"Currently Airing": MediaStatus.RELEASING,
"Not yet aired": MediaStatus.NOT_YET_RELEASED,
}
@@ -42,7 +43,11 @@ def _to_generic_title(jikan_titles: list[dict]) -> MediaTitle:
elif type_ == "Japanese":
native = title_
return MediaTitle(romaji=romaji, english=english, native=native)
return MediaTitle(
romaji=romaji,
english=english or romaji or native or "NOT AVAILABLE",
native=native,
)
def _to_generic_image(jikan_images: dict) -> MediaImage:
@@ -69,7 +74,7 @@ def _to_generic_media_item(data: dict) -> MediaItem:
id_mal=data["mal_id"],
title=_to_generic_title(data.get("titles", [])),
cover_image=_to_generic_image(data.get("images", {})),
status=JIKAN_STATUS_MAP.get(data.get("status", ""), None),
status=JIKAN_STATUS_MAP.get(data.get("status", ""), MediaStatus.UNKNOWN),
episodes=data.get("episodes"),
duration=data.get("duration"),
average_score=score,
@@ -81,7 +86,7 @@ def _to_generic_media_item(data: dict) -> MediaItem:
Studio(id=s["mal_id"], name=s["name"]) for s in data.get("studios", [])
],
# Jikan doesn't provide streaming episodes
streaming_episodes=[],
streaming_episodes={},
# Jikan doesn't provide user list status in its search results.
user_status=None,
)

View File

@@ -15,6 +15,7 @@ class MediaStatus(Enum):
NOT_YET_RELEASED = "NOT_YET_RELEASED"
CANCELLED = "CANCELLED"
HIATUS = "HIATUS"
UNKNOWN = "UNKNOWN"
class MediaType(Enum):

View File

@@ -30,8 +30,6 @@ def test_media_api(api_client: BaseApiClient):
"""
from ....core.constants import APP_ASCII_ART
from ..params import (
MediaAiringScheduleParams,
MediaCharactersParams,
MediaRecommendationParams,
MediaRelationsParams,
MediaSearchParams,
@@ -125,47 +123,10 @@ def test_media_api(api_client: BaseApiClient):
print()
# Test 5: Get Characters
print("5. Testing Character Information...")
try:
characters = api_client.get_characters_of(
MediaCharactersParams(id=selected_anime.id)
)
if characters and characters.get("data"):
char_data = characters["data"]["Page"]["media"][0]["characters"]["nodes"]
if char_data:
print(f" Found {len(char_data)} characters:")
for char in char_data[:3]: # Show first 3
name = char["name"]["full"] or char["name"]["first"]
print(f" - {name}")
else:
print(" No character data found")
else:
print(" No characters found")
except Exception as e:
print(f" Error: {e}")
print()
# TODO: Recreate this test
# Test 6: Get Airing Schedule
print("6. Testing Airing Schedule...")
try:
schedule = api_client.get_airing_schedule_for(
MediaAiringScheduleParams(id=selected_anime.id)
)
if schedule and schedule.get("data"):
schedule_data = schedule["data"]["Page"]["media"][0]["airingSchedule"][
"nodes"
]
if schedule_data:
print(f" Found {len(schedule_data)} upcoming episodes:")
for ep in schedule_data[:3]: # Show first 3
print(f" - Episode {ep['episode']}")
else:
print(" No upcoming episodes")
else:
print(" No airing schedule found")
except Exception as e:
print(f" Error: {e}")
print()
# TODO: Recreate this test
# Test 7: User Media List (if authenticated)
if api_client.is_authenticated():

View File

@@ -1,65 +0,0 @@
"""
Syncplay integration for Viu.
This module provides a procedural function to launch Syncplay with the given media and options.
"""
import shutil
import subprocess
from .tools import exit_app
def SyncPlayer(url: str, anime_title=None, headers=None, subtitles=None, *args):
    """
    Launch Syncplay for synchronized playback with friends.

    Args:
        url: The media URL to play.
        anime_title: Optional title to display in the player.
        headers: Optional mapping of HTTP header names to values, forwarded
            to mpv via --http-header-fields. Defaults to no extra headers.
        subtitles: Optional list of subtitle dicts with 'url' keys.
        *args: Additional arguments (unused; kept for call-site compatibility).

    Returns:
        Tuple of ("0", "0") for compatibility with other player launchers.
    """
    # Mutable containers must not be default argument values (they would be
    # shared across calls); normalize the None sentinels here instead.
    headers = headers if headers is not None else {}
    subtitles = subtitles if subtitles is not None else []
    # TODO: handle m3u8 multi quality streams
    #
    # check for SyncPlay
    SYNCPLAY_EXECUTABLE = shutil.which("syncplay")
    if not SYNCPLAY_EXECUTABLE:
        print("Syncplay not found")
        exit_app(1)
        return "0", "0"
    # Build mpv pass-through arguments (HTTP headers and subtitle files).
    mpv_args = []
    if headers:
        mpv_headers = "--http-header-fields="
        for header_name, header_value in headers.items():
            mpv_headers += f"{header_name}:{header_value},"
        mpv_args.append(mpv_headers)
    for subtitle in subtitles:
        mpv_args.append(f"--sub-file={subtitle['url']}")
    if not anime_title:
        subprocess.run(
            [
                SYNCPLAY_EXECUTABLE,
                url,
            ],
            check=False,
        )
    else:
        # Everything after "--" is forwarded by syncplay to the player (mpv).
        subprocess.run(
            [
                SYNCPLAY_EXECUTABLE,
                url,
                "--",
                f"--force-media-title={anime_title}",
                *mpv_args,
            ],
            check=False,
        )
    # for compatibility with the (status, code) return convention of players
    return "0", "0"

View File

@@ -88,4 +88,5 @@ def decode_hex_string(hex_string):
# Decode each hex pair
decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs]
return "".join(decoded_chars)
# TODO: Better type handling
return "".join(decoded_chars) # type: ignore

View File

@@ -1,5 +1,3 @@
from typing import Any
from ..types import (
Anime,
AnimeEpisodeInfo,
@@ -87,13 +85,16 @@ def map_to_anime_result(
def map_to_server(
    episode: AnimeEpisodeInfo,
    translation_type: str,
    stream_links: list[tuple[str, str]],
) -> Server:
    """Build a kwik Server from (quality, url) stream link pairs.

    Args:
        episode: Episode metadata; only the title is used here.
        translation_type: Provider translation type key (e.g. "sub"/"dub"),
            mapped through ``translation_type_map``.
        stream_links: List of ``(quality, url)`` tuples scraped for the episode.

    Returns:
        A ``Server`` named "kwik" with one ``EpisodeStream`` per link.
    """
    links = [
        EpisodeStream(
            link=link[1],
            # Only known qualities are accepted; anything else falls back to 1080.
            quality=link[0] if link[0] in ["360", "480", "720", "1080"] else "1080",  # type:ignore
            translation_type=translation_type_map[translation_type],
        )
        for link in stream_links
    ]
    return Server(name="kwik", links=links, episode_title=episode.title)

View File

@@ -131,15 +131,17 @@ class AnimePahe(BaseAnimeProvider):
res_dicts = [extract_attributes(item) for item in resolutionMenuItems]
quality = None
translation_type = None
stream_link = None
stream_links = []
# TODO: better document the scraping process
for res_dict in res_dicts:
# the actual attributes are data attributes in the original html 'prefixed with data-'
embed_url = res_dict["src"]
logger.debug(f"Found embed url: {embed_url}")
data_audio = "dub" if res_dict["audio"] == "eng" else "sub"
if data_audio != params.translation_type:
logger.debug(f"Found {data_audio} but wanted {params.translation_type}")
continue
if not embed_url:
@@ -155,22 +157,26 @@ class AnimePahe(BaseAnimeProvider):
)
embed_response.raise_for_status()
embed_page = embed_response.text
logger.debug("Processing embed page for JS decoding")
decoded_js = process_animepahe_embed_page(embed_page)
if not decoded_js:
logger.error("failed to decode embed page")
continue
logger.debug(f"Decoded JS: {decoded_js[:100]}...")
juicy_stream = JUICY_STREAM_REGEX.search(decoded_js)
if not juicy_stream:
logger.error("failed to find juicy stream")
continue
logger.debug(f"Found juicy stream: {juicy_stream.group(1)}")
juicy_stream = juicy_stream.group(1)
quality = res_dict["resolution"]
logger.debug(f"Found quality: {quality}")
translation_type = data_audio
stream_link = juicy_stream
stream_links.append((quality, juicy_stream))
if translation_type and quality and stream_link:
yield map_to_server(episode, translation_type, quality, stream_link)
if translation_type and stream_links:
yield map_to_server(episode, translation_type, stream_links)
@lru_cache()
def _get_episode_info(

View File

@@ -69,6 +69,9 @@ def test_anime_provider(AnimeProvider: Type[BaseAnimeProvider]):
for i, stream in enumerate(episode_streams):
print(f"{i + 1}: {stream.name}")
stream = episode_streams[int(input("Select your preferred server: ")) - 1]
for i, link in enumerate(stream.links):
print(f"{i + 1}: {link.quality}")
link = stream.links[int(input("Select your preferred quality: ")) - 1]
if executable := shutil.which("mpv"):
cmd = executable
elif executable := shutil.which("xdg-open"):
@@ -84,4 +87,4 @@ def test_anime_provider(AnimeProvider: Type[BaseAnimeProvider]):
"Episode: ",
stream.episode_title if stream.episode_title else episode_number,
)
subprocess.run([cmd, stream.links[0].link])
subprocess.run([cmd, link.link])

View File

@@ -1,105 +0,0 @@
"""An abstraction over all providers offering added features with a simple and well typed api
[TODO:description]
"""
import importlib
import logging
from typing import TYPE_CHECKING
from .libs.manga_provider import manga_sources
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class MangaProvider:
    """Facade over all manga provider backends.
    Lazily imports the selected provider package, instantiates its API class,
    and wraps every call with uniform error handling: provider exceptions are
    logged and ``None`` is returned instead of propagating.
    Attributes:
        PROVIDERS: Names of all registered manga providers (keys of ``manga_sources``).
        provider: Name of the provider currently in use.
        dynamic: Flag stored at construction — not read by this class.
        retries: Retry count stored at construction — not read by this class.
        manga_provider: The instantiated provider backend object.
    """
    PROVIDERS = list(manga_sources.keys())
    provider = PROVIDERS[0]
    def __init__(self, provider="mangadex", dynamic=False, retries=0) -> None:
        self.provider = provider
        self.dynamic = dynamic
        self.retries = retries
        # Import and instantiate the backend immediately.
        self.lazyload_provider(self.provider)
    def lazyload_provider(self, provider):
        """Import the provider package and instantiate its API class.
        Args:
            provider: Key into ``manga_sources`` (e.g. ``"mangadex"``).
        """
        # manga_sources values look like "api.ClassName"; split off the class name.
        _, anime_provider_cls_name = manga_sources[provider].split(".", 1)
        package = f"viu_media.libs.manga_provider.{provider}"
        provider_api = importlib.import_module(".api", package)
        manga_provider = getattr(provider_api, anime_provider_cls_name)
        self.manga_provider = manga_provider()
    def search_for_manga(
        self,
        user_query,
        nsfw=True,
        unknown=True,
    ):
        """Search the current provider for manga matching ``user_query``.
        Args:
            user_query: Free-text title to search for.
            nsfw: Whether to include NSFW results (interpretation is provider-specific).
            unknown: Whether to include entries with unknown rating (provider-specific).
        Returns:
            The provider's search results, or ``None`` if the provider raised.
        """
        manga_provider = self.manga_provider
        try:
            results = manga_provider.search_for_manga(user_query, nsfw, unknown)
        except Exception as e:
            # Best-effort: log and return None so callers can handle the failure.
            logger.error(e)
            results = None
        return results
    def get_manga(
        self,
        anime_id: str,
    ):
        """Fetch details for a single manga from the current provider.
        Args:
            anime_id: Provider-specific manga identifier.
        Returns:
            The provider's manga payload, or ``None`` if the provider raised.
        """
        manga_provider = self.manga_provider
        try:
            results = manga_provider.get_manga(anime_id)
        except Exception as e:
            logger.error(e)
            results = None
        return results
    def get_chapter_thumbnails(
        self,
        manga_id: str,
        chapter: str,
    ):
        """Fetch page/thumbnail URLs for one chapter of a manga.
        Args:
            manga_id: Provider-specific manga identifier.
            chapter: Chapter number or identifier.
        Returns:
            The provider's thumbnail payload, or ``None`` if the provider raised.
        """
        manga_provider = self.manga_provider
        try:
            results = manga_provider.get_chapter_thumbnails(manga_id, chapter)
        except Exception as e:
            logger.error(e)
            results = None
        return results  # pyright:ignore

View File

@@ -1 +0,0 @@
manga_sources = {"mangadex": "api.MangaDexApi"}

View File

@@ -1,18 +0,0 @@
from httpx import Client
from ....core.utils.networking import random_user_agent
class MangaProvider:
    """Base class for manga provider backends.
    Owns a shared httpx ``Client`` configured with a randomized User-Agent
    plus any extra default headers supplied by a subclass via ``HEADERS``.
    """
    # Shared HTTP session used for all of this provider's requests.
    session: Client
    # Chosen once at class-definition time and reused by all instances.
    USER_AGENT = random_user_agent()
    # Subclasses may override with extra default headers; merged into the session.
    HEADERS = {}
    def __init__(self) -> None:
        # 10-second timeout applies to every request made through this session.
        self.session = Client(
            headers={
                "User-Agent": self.USER_AGENT,
                **self.HEADERS,
            },
            timeout=10,
        )

View File

@@ -1,15 +0,0 @@
import logging
from httpx import get
logger = logging.getLogger(__name__)
def fetch_manga_info_from_bal(anilist_id):
    """Look up manga site mappings for an AniList id from the bal-mackup mirror.

    Args:
        anilist_id: AniList manga id used to build the raw-GitHub JSON URL.

    Returns:
        The parsed JSON payload on success, otherwise ``None``.
    """
    url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/manga/{anilist_id}.json"
    try:
        response = get(url, timeout=11)
        if response.ok:
            return response.json()
    except Exception as err:
        # Best-effort lookup: log the failure and fall through to None.
        logger.error(err)
    return None

View File

@@ -1,51 +0,0 @@
import logging
from ...common.mini_anilist import search_for_manga_with_anilist
from ..base_provider import MangaProvider
from ..common import fetch_manga_info_from_bal
logger = logging.getLogger(__name__)
class MangaDexApi(MangaProvider):
    """MangaDex-backed provider: AniList for search, bal-mackup for id mapping."""

    def search_for_manga(self, title: str, *args):
        """Search AniList for manga matching ``title``; ``None`` on failure."""
        try:
            return search_for_manga_with_anilist(title)
        except Exception as err:
            logger.error(f"[MANGADEX-ERROR]: {err}")
            return None

    def get_manga(self, anilist_manga_id: str):
        """Resolve an AniList manga id to its MangaDex entry via bal-mackup."""
        bal_data = fetch_manga_info_from_bal(anilist_manga_id)
        if not bal_data:
            return None
        # The mapping holds a single MangaDex entry keyed by its MangaDex id.
        mangadex_id, mangadex_entry = next(iter(bal_data["Sites"]["Mangadex"].items()))
        return {
            "id": mangadex_id,
            "title": mangadex_entry["title"],
            "poster": mangadex_entry["image"],
            "availableChapters": [],
        }

    def get_chapter_thumbnails(self, manga_id, chapter):
        """Fetch page image URLs and the title for one translated chapter."""
        chapter_info_url = (
            f"https://api.mangadex.org/chapter?manga={manga_id}"
            f"&translatedLanguage[]=en&chapter={chapter}&includeEmptyPages=0"
        )
        chapter_info_response = self.session.get(chapter_info_url)
        if not chapter_info_response.ok:
            return None
        chapter_info = next(iter(chapter_info_response.json()["data"]))
        # The at-home endpoint returns the CDN base URL and per-page file names.
        at_home_url = f"https://api.mangadex.org/at-home/server/{chapter_info['id']}"
        thumbnails_response = self.session.get(at_home_url)
        if not thumbnails_response.ok:
            return None
        thumbnails_info = thumbnails_response.json()
        base_url = thumbnails_info["baseUrl"]
        chapter_hash = thumbnails_info["chapter"]["hash"]
        return {
            "thumbnails": [
                f"{base_url}/data/{chapter_hash}/{page_file}"
                for page_file in thumbnails_info["chapter"]["data"]
            ],
            "title": chapter_info["attributes"]["title"],
        }

View File

@@ -1,3 +1,4 @@
# pyright: reportAttributeAccessIssue=false, reportPossiblyUnboundVariable=false
"""
HTML parsing utilities with optional lxml support.

View File

@@ -1,4 +1,4 @@
from InquirerPy.prompts import FuzzyPrompt
from InquirerPy.prompts import FuzzyPrompt # pyright: ignore[reportPrivateImportUsage]
from rich.prompt import Confirm, Prompt
from ..base import BaseSelector

View File

@@ -8,7 +8,7 @@ if getattr(sys, "frozen", False):
sys.path.insert(0, application_path)
# Import and run the main application
from viu import Cli
from viu_media import Cli
if __name__ == "__main__":
Cli()