Compare commits


3 Commits

Author           SHA1        Message                                        Date
Benedict Xavier  ab93e088da  Merge pull request #176 from xodexa/self-fix   2025-12-30 09:28:59 +03:00
Zen              bedd345eb8  Merge branch 'master' into self-fix            2025-12-30 12:43:28 +08:00
Zen              4dc15eec50  fix: IPC socket for windows                    2025-12-30 12:39:45 +08:00
41 changed files with 244 additions and 2207 deletions

View File

@@ -1,152 +0,0 @@
name: Build Release Binaries
on:
release:
types: [published]
workflow_dispatch:
inputs:
tag:
description: "Tag/version to build (leave empty for latest)"
required: false
type: string
permissions:
contents: write
jobs:
build:
strategy:
fail-fast: false
matrix:
include:
- os: ubuntu-latest
target: linux
asset_name: viu-linux-x86_64
executable: viu
- os: windows-latest
target: windows
asset_name: viu-windows-x86_64.exe
executable: viu.exe
- os: macos-latest
target: macos
asset_name: viu-macos-x86_64
executable: viu
runs-on: ${{ matrix.os }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.tag || github.ref }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Install system dependencies (Linux)
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install -y libdbus-1-dev libglib2.0-dev
- name: Install dependencies
run: uv sync --all-extras --all-groups
- name: Build executable with PyInstaller
run: uv run pyinstaller bundle/pyinstaller.spec --distpath dist --workpath build/pyinstaller --clean
- name: Rename executable
shell: bash
run: mv dist/${{ matrix.executable }} dist/${{ matrix.asset_name }}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.asset_name }}
path: dist/${{ matrix.asset_name }}
if-no-files-found: error
- name: Upload to Release
if: github.event_name == 'release'
uses: softprops/action-gh-release@v2
with:
files: dist/${{ matrix.asset_name }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# Build for macOS ARM (Apple Silicon)
build-macos-arm:
runs-on: macos-14
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.tag || github.ref }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Install dependencies
run: uv sync --all-extras --all-groups
- name: Build executable with PyInstaller
run: uv run pyinstaller bundle/pyinstaller.spec --distpath dist --workpath build/pyinstaller --clean
- name: Rename executable
run: mv dist/viu dist/viu-macos-arm64
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: viu-macos-arm64
path: dist/viu-macos-arm64
if-no-files-found: error
- name: Upload to Release
if: github.event_name == 'release'
uses: softprops/action-gh-release@v2
with:
files: dist/viu-macos-arm64
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# Create checksums after all builds complete
checksums:
needs: [build, build-macos-arm]
runs-on: ubuntu-latest
if: github.event_name == 'release'
steps:
- name: Download all artifacts
uses: actions/download-artifact@v4
with:
path: artifacts
merge-multiple: true
- name: Generate checksums
run: |
cd artifacts
sha256sum * > SHA256SUMS.txt
cat SHA256SUMS.txt
- name: Upload checksums to Release
uses: softprops/action-gh-release@v2
with:
files: artifacts/SHA256SUMS.txt
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
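
For downstream users, the SHA256SUMS.txt produced by the checksums job can be verified without GNU coreutils; a minimal Python sketch, assuming the release binaries sit in the same directory as the checksum file:

```python
import hashlib
from pathlib import Path

def verify_sums(sums_file: str = "SHA256SUMS.txt") -> bool:
    """Check each '<hash>  <name>' line as written by sha256sum."""
    ok = True
    for line in Path(sums_file).read_text().splitlines():
        expected, name = line.split(maxsplit=1)
        name = name.lstrip("*")  # sha256sum prefixes '*' in binary mode
        actual = hashlib.sha256(Path(name).read_bytes()).hexdigest()
        if actual != expected:
            print(f"MISMATCH: {name}")
            ok = False
    return ok

if __name__ == "__main__":
    raise SystemExit(0 if verify_sums() else 1)
```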

View File

@@ -1,9 +1,9 @@
name: Mark Stale Issues and Pull Requests
on:
# schedule:
# Runs every day at 6:30 UTC
# - cron: "30 6 * * *"
schedule:
# Runs every day at 6:30 UTC
- cron: "30 6 * * *"
# Allows you to run this workflow manually from the Actions tab for testing
workflow_dispatch:

View File

@@ -114,78 +114,38 @@ uv tool install "viu-media[notifications]" # For desktop notifications
```
#### Termux
You may need to have Rust installed; see this issue: https://github.com/pydantic/pydantic-core/issues/1012#issuecomment-2511269688.
```bash
pkg install python # though uv will probably install Python for you, it doesn't hurt to have it :)
pkg install rust # may be required because of pydantic
# Recommended (with pip, for more control)
pkg install python
pkg install rust # required because of pydantic
# NOTE: order matters
# get pydantic from the termux user repository
pip install pydantic --extra-index-url https://termux-user-repository.github.io/pypi/
# the above will take a while; pass -v if you want to see more output and feel like something is happening lol
pip install pydantic --extra-index-url https://termux-user-repository.github.io/pypi/ -v
# now you can install viu
pip install viu-media
# === optional deps ===
# if you have reached here, awesome lol :)
# you may need to install pydantic manually
python -m pip install pydantic --extra-index-url https://termux-user-repository.github.io/pypi/ # may also be necessary in case the above fails
# yt-dlp for downloading m3u8 and hls streams
# add yt-dlp by
pip install yt-dlp[default,curl-cffi]
# you may also need ffmpeg for processing the videos
pkg install ffmpeg
# prefer without standard and manually install the things you need (lxml, yt-dlp, ...)
pip install viu-media[standard]
# tip if you also want yt functionality
pip install yt-dlp-ejs
# you may need to install lxml and plyer manually, e.g.
python -m pip install lxml --extra-index-url https://termux-user-repository.github.io/pypi/ # may also be necessary in case the above fails
# you require a js runtime
# e.g. the recommended one
pkg install deno
# Alternative: with uv; may work, no promises
pkg install uv
# for faster fuzzy search
pip install thefuzz
uv tool install viu-media
# if you want faster scraping, though barely noticeable lol
pip install lxml --extra-index-url https://termux-user-repository.github.io/pypi/
# and to add yt-dlp only you can do
uv tool install viu-media --with yt-dlp[default,curl-cffi]
# if compilation fails you need to have these installed:
pkg install libxml2 libxslt
# == ui setup ==
pkg install fzf
# then enable fzf in the config
viu --selector fzf config --update
# if you want previews as well, specify the preview option
# though images aren't that pretty lol, so you can stick to text over full
viu --preview text config --update
# if you set preview to full you need a terminal image renderer
pkg install chafa
# == player setup ==
# for this you need to strictly install from the play store
# search for mpv or vlc (recommended, since it has a nicer ui)
# the only limitation is that currently it's not possible to pass headers to the android players
# through android intents
# so use servers like sharepoint and wixmp
# though this is not an issue when it comes to downloading ;)
# if you have installed it using 'pkg', uninstall it
# okay, now you are all set, i promise the hustle is worth it lol :)
# posted a video of it working to motivate you
# note: i recorded it from waydroid, which is android for linux, sort of like an emulator (bluestacks for example)
```
https://github.com/user-attachments/assets/0c628421-a439-4dea-91bb-7153e8f20ccf
# or, though it may fail because of lxml and plyer; in that case try to install them manually
uv tool install viu-media[standard]
```
#### Using pipx (for isolated environments)
```bash
@@ -221,7 +181,7 @@ Get up and running in three simple steps:
```bash
viu anilist auth
```
This will open your browser. Authorize the app and paste the obtained token back into the terminal. Alternatively, you can pass the token directly as an argument, or provide a path to a text file containing the token.
This will open your browser. Authorize the app and paste the obtained token back into the terminal.
2. **Launch the Interactive TUI:**
```bash

View File

@@ -1,46 +1,26 @@
# -*- mode: python ; coding: utf-8 -*-
import sys
from PyInstaller.utils.hooks import collect_data_files, collect_submodules
block_cipher = None
# Platform-specific settings
is_windows = sys.platform == 'win32'
is_macos = sys.platform == 'darwin'
# Collect all required data files
datas = [
('../viu_media/assets', 'viu_media/assets'),
('viu/assets/*', 'viu/assets'),
]
# Collect all required hidden imports
# Include viu_media and all its submodules to ensure menu modules are bundled
hiddenimports = [
'click',
'rich',
'requests',
'yt_dlp',
'viu_media',
'viu_media.cli.interactive.menu',
'viu_media.cli.interactive.menu.media',
# Explicit menu modules (PyInstaller doesn't always pick these up)
'viu_media.cli.interactive.menu.media.downloads',
'viu_media.cli.interactive.menu.media.download_episodes',
'viu_media.cli.interactive.menu.media.dynamic_search',
'viu_media.cli.interactive.menu.media.episodes',
'viu_media.cli.interactive.menu.media.main',
'viu_media.cli.interactive.menu.media.media_actions',
'viu_media.cli.interactive.menu.media.media_airing_schedule',
'viu_media.cli.interactive.menu.media.media_characters',
'viu_media.cli.interactive.menu.media.media_review',
'viu_media.cli.interactive.menu.media.player_controls',
'viu_media.cli.interactive.menu.media.play_downloads',
'viu_media.cli.interactive.menu.media.provider_search',
'viu_media.cli.interactive.menu.media.results',
'viu_media.cli.interactive.menu.media.servers',
] + collect_submodules('viu_media')
'python_mpv',
'fuzzywuzzy',
'viu',
] + collect_submodules('viu')
a = Analysis(
['../viu_media/viu.py'],
['./viu/viu.py'], # Changed entry point
pathex=[],
binaries=[],
datas=datas,
@@ -52,18 +32,16 @@ a = Analysis(
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
noarchive=False,
strip=True, # Strip debug information
optimize=2 # Optimize bytecode
noarchive=False
)
pyz = PYZ(
a.pure,
a.zipped_data,
cipher=block_cipher,
optimize=2 # Optimize bytecode
cipher=block_cipher
)
# Icon path - only use .ico on Windows
icon_path = '../viu_media/assets/icons/logo.ico' if is_windows else None
exe = EXE(
pyz,
a.scripts,
@@ -74,7 +52,7 @@ exe = EXE(
name='viu',
debug=False,
bootloader_ignore_signals=False,
strip=not is_windows, # strip doesn't work well on Windows without proper tools
strip=True,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
@@ -83,5 +61,5 @@ exe = EXE(
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon=icon_path,
icon='viu/assets/logo.ico'
)

View File

@@ -1,16 +1,16 @@
[project]
name = "viu-media"
version = "3.3.6"
version = "3.3.5"
description = "A browser anime site experience from the terminal"
license = "UNLICENSE"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"click>=8.1.7",
"httpx>=0.28.1",
"inquirerpy>=0.3.4",
"pydantic>=2.11.7",
"rich>=13.9.2",
"click>=8.1.7",
"httpx>=0.28.1",
"inquirerpy>=0.3.4",
"pydantic>=2.11.7",
"rich>=13.9.2",
]
[project.scripts]
@@ -18,27 +18,32 @@ viu = 'viu_media:Cli'
[project.optional-dependencies]
standard = [
"thefuzz>=0.22.1",
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
"pypiwin32; sys_platform == 'win32'", # For Windows-specific functionality
"pyobjc; sys_platform == 'darwin'", # For macOS-specific functionality
"dbus-python; sys_platform == 'linux'", # For Linux-specific functionality (e.g., notifications),
"plyer>=2.1.0",
"lxml>=6.0.0",
"thefuzz>=0.22.1",
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
"pypiwin32; sys_platform == 'win32'", # For Windows-specific functionality
"pyobjc; sys_platform == 'darwin'", # For macOS-specific functionality
"dbus-python; sys_platform == 'linux'", # For Linux-specific functionality (e.g., notifications),
"plyer>=2.1.0",
"lxml>=6.0.0"
]
notifications = [
"pypiwin32; sys_platform == 'win32'", # For Windows-specific functionality
"pyobjc; sys_platform == 'darwin'", # For macOS-specific functionality
"dbus-python>=1.4.0; sys_platform == 'linux'",
"dbus-python>=1.4.0",
"plyer>=2.1.0",
]
mpv = ["mpv>=1.0.7"]
mpv = [
"mpv>=1.0.7",
]
torrent = ["libtorrent>=2.0.11"]
lxml = ["lxml>=6.0.0"]
discord = ["pypresence>=4.3.0"]
download = ["pycryptodomex>=3.23.0", "yt-dlp>=2025.7.21"]
torrents = ["libtorrent>=2.0.11"]
download = [
"pycryptodomex>=3.23.0",
"yt-dlp>=2025.7.21",
]
torrents = [
"libtorrent>=2.0.11",
]
[build-system]
requires = ["hatchling"]
@@ -46,12 +51,12 @@ build-backend = "hatchling.build"
[dependency-groups]
dev = [
"pre-commit>=4.0.1",
"pyinstaller>=6.11.1",
"pyright>=1.1.384",
"pytest>=8.3.3",
"pytest-httpx>=0.35.0",
"ruff>=0.6.9",
"pre-commit>=4.0.1",
"pyinstaller>=6.11.1",
"pyright>=1.1.384",
"pytest>=8.3.3",
"pytest-httpx>=0.35.0",
"ruff>=0.6.9",
]
[tool.pytest.ini_options]
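
For context on the sys_platform markers shuffled above: an environment marker makes a dependency install only where the marker evaluates true. A quick way to check one locally, sketched with the packaging library (which pip vendors):

```python
from packaging.markers import Marker

# dbus-python should only be pulled in on Linux installs.
marker = Marker("sys_platform == 'linux'")
print(marker.evaluate())  # True on Linux, False on Windows/macOS
```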

View File

@@ -1,284 +0,0 @@
from unittest.mock import MagicMock, patch
import pytest
from click.testing import CliRunner
from viu_media.cli.commands.anilist.commands.auth import auth
@pytest.fixture
def runner():
return CliRunner()
@pytest.fixture
def mock_config():
config = MagicMock()
config.user.interactive = True
return config
@pytest.fixture
def mock_auth_service():
with patch("viu_media.cli.service.auth.AuthService") as mock:
yield mock
@pytest.fixture
def mock_feedback_service():
with patch("viu_media.cli.service.feedback.FeedbackService") as mock:
yield mock
@pytest.fixture
def mock_selector():
with patch("viu_media.libs.selectors.selector.create_selector") as mock:
yield mock
@pytest.fixture
def mock_api_client():
with patch("viu_media.libs.media_api.api.create_api_client") as mock:
yield mock
@pytest.fixture
def mock_webbrowser():
with patch("viu_media.cli.commands.anilist.commands.auth.webbrowser") as mock:
yield mock
def test_auth_with_token_argument(
runner,
mock_config,
mock_auth_service,
mock_feedback_service,
mock_selector,
mock_api_client,
):
"""Test 'viu anilist auth <token>'."""
api_client_instance = mock_api_client.return_value
profile_mock = MagicMock()
profile_mock.name = "testuser"
api_client_instance.authenticate.return_value = profile_mock
auth_service_instance = mock_auth_service.return_value
auth_service_instance.get_auth.return_value = None
result = runner.invoke(auth, ["test_token"], obj=mock_config)
assert result.exit_code == 0
mock_api_client.assert_called_with("anilist", mock_config)
api_client_instance.authenticate.assert_called_with("test_token")
auth_service_instance.save_user_profile.assert_called_with(
profile_mock, "test_token"
)
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_called_with("Successfully logged in as testuser! ✨")
def test_auth_with_token_file(
runner,
mock_config,
mock_auth_service,
mock_feedback_service,
mock_selector,
mock_api_client,
tmp_path,
):
"""Test 'viu anilist auth <path/to/token.txt>'."""
token_file = tmp_path / "token.txt"
token_file.write_text("file_token")
api_client_instance = mock_api_client.return_value
profile_mock = MagicMock()
profile_mock.name = "testuser"
api_client_instance.authenticate.return_value = profile_mock
auth_service_instance = mock_auth_service.return_value
auth_service_instance.get_auth.return_value = None
result = runner.invoke(auth, [str(token_file)], obj=mock_config)
assert result.exit_code == 0
mock_api_client.assert_called_with("anilist", mock_config)
api_client_instance.authenticate.assert_called_with("file_token")
auth_service_instance.save_user_profile.assert_called_with(
profile_mock, "file_token"
)
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_called_with("Successfully logged in as testuser! ✨")
def test_auth_with_empty_token_file(
runner,
mock_config,
mock_auth_service,
mock_feedback_service,
mock_selector,
mock_api_client,
tmp_path,
):
"""Test 'viu anilist auth' with an empty token file."""
token_file = tmp_path / "token.txt"
token_file.write_text("")
auth_service_instance = mock_auth_service.return_value
auth_service_instance.get_auth.return_value = None
result = runner.invoke(auth, [str(token_file)], obj=mock_config)
assert result.exit_code == 0
feedback_instance = mock_feedback_service.return_value
feedback_instance.error.assert_called_with(f"Token file is empty: {token_file}")
def test_auth_interactive(
runner,
mock_config,
mock_auth_service,
mock_feedback_service,
mock_selector,
mock_api_client,
mock_webbrowser,
):
"""Test 'viu anilist auth' interactive mode."""
mock_webbrowser.open.return_value = True
selector_instance = mock_selector.return_value
selector_instance.ask.return_value = "interactive_token"
api_client_instance = mock_api_client.return_value
profile_mock = MagicMock()
profile_mock.name = "testuser"
api_client_instance.authenticate.return_value = profile_mock
auth_service_instance = mock_auth_service.return_value
auth_service_instance.get_auth.return_value = None
result = runner.invoke(auth, [], obj=mock_config)
assert result.exit_code == 0
selector_instance.ask.assert_called_with("Enter your AniList Access Token")
api_client_instance.authenticate.assert_called_with("interactive_token")
auth_service_instance.save_user_profile.assert_called_with(
profile_mock, "interactive_token"
)
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_called_with("Successfully logged in as testuser! ✨")
def test_auth_status_logged_in(
runner, mock_config, mock_auth_service, mock_feedback_service
):
"""Test 'viu anilist auth --status' when logged in."""
auth_service_instance = mock_auth_service.return_value
user_data_mock = MagicMock()
user_data_mock.user_profile = "testuser"
auth_service_instance.get_auth.return_value = user_data_mock
result = runner.invoke(auth, ["--status"], obj=mock_config)
assert result.exit_code == 0
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_called_with("Logged in as: testuser")
def test_auth_status_logged_out(
runner, mock_config, mock_auth_service, mock_feedback_service
):
"""Test 'viu anilist auth --status' when logged out."""
auth_service_instance = mock_auth_service.return_value
auth_service_instance.get_auth.return_value = None
result = runner.invoke(auth, ["--status"], obj=mock_config)
assert result.exit_code == 0
feedback_instance = mock_feedback_service.return_value
feedback_instance.error.assert_called_with("Not logged in.")
def test_auth_logout(
runner, mock_config, mock_auth_service, mock_feedback_service, mock_selector
):
"""Test 'viu anilist auth --logout'."""
selector_instance = mock_selector.return_value
selector_instance.confirm.return_value = True
result = runner.invoke(auth, ["--logout"], obj=mock_config)
assert result.exit_code == 0
auth_service_instance = mock_auth_service.return_value
auth_service_instance.clear_user_profile.assert_called_once()
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_called_with("You have been logged out.")
def test_auth_logout_cancel(
runner, mock_config, mock_auth_service, mock_feedback_service, mock_selector
):
"""Test 'viu anilist auth --logout' when user cancels."""
selector_instance = mock_selector.return_value
selector_instance.confirm.return_value = False
result = runner.invoke(auth, ["--logout"], obj=mock_config)
assert result.exit_code == 0
auth_service_instance = mock_auth_service.return_value
auth_service_instance.clear_user_profile.assert_not_called()
def test_auth_already_logged_in_relogin_yes(
runner,
mock_config,
mock_auth_service,
mock_feedback_service,
mock_selector,
mock_api_client,
):
"""Test 'viu anilist auth' when already logged in and user chooses to relogin."""
auth_service_instance = mock_auth_service.return_value
auth_profile_mock = MagicMock()
auth_profile_mock.user_profile.name = "testuser"
auth_service_instance.get_auth.return_value = auth_profile_mock
selector_instance = mock_selector.return_value
selector_instance.confirm.return_value = True
selector_instance.ask.return_value = "new_token"
api_client_instance = mock_api_client.return_value
new_profile_mock = MagicMock()
new_profile_mock.name = "newuser"
api_client_instance.authenticate.return_value = new_profile_mock
result = runner.invoke(auth, [], obj=mock_config)
assert result.exit_code == 0
selector_instance.confirm.assert_called_with(
"You are already logged in as testuser. Would you like to relogin"
)
auth_service_instance.save_user_profile.assert_called_with(
new_profile_mock, "new_token"
)
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_called_with("Successfully logged in as newuser! ✨")
def test_auth_already_logged_in_relogin_no(
runner, mock_config, mock_auth_service, mock_feedback_service, mock_selector
):
"""Test 'viu anilist auth' when already logged in and user chooses not to relogin."""
auth_service_instance = mock_auth_service.return_value
auth_profile_mock = MagicMock()
auth_profile_mock.user_profile.name = "testuser"
auth_service_instance.get_auth.return_value = auth_profile_mock
selector_instance = mock_selector.return_value
selector_instance.confirm.return_value = False
result = runner.invoke(auth, [], obj=mock_config)
assert result.exit_code == 0
auth_service_instance.save_user_profile.assert_not_called()
feedback_instance = mock_feedback_service.return_value
feedback_instance.info.assert_not_called()

View File

@@ -1,54 +0,0 @@
from typing import Any
from viu_media.libs.media_api.anilist.mapper import to_generic_user_profile
from viu_media.libs.media_api.anilist.types import AnilistViewerData
from viu_media.libs.media_api.types import UserProfile
def test_to_generic_user_profile_success():
data: AnilistViewerData = {
"data": {
"Viewer": {
"id": 123,
"name": "testuser",
"avatar": {
"large": "https://example.com/avatar.png",
"medium": "https://example.com/avatar_medium.png",
"extraLarge": "https://example.com/avatar_extraLarge.png",
"small": "https://example.com/avatar_small.png",
},
"bannerImage": "https://example.com/banner.png",
"token": "test_token",
}
}
}
profile = to_generic_user_profile(data)
assert isinstance(profile, UserProfile)
assert profile.id == 123
assert profile.name == "testuser"
assert profile.avatar_url == "https://example.com/avatar.png"
assert profile.banner_url == "https://example.com/banner.png"
def test_to_generic_user_profile_data_none():
data: Any = {"data": None}
profile = to_generic_user_profile(data)
assert profile is None
def test_to_generic_user_profile_no_data_key():
data: Any = {"errors": [{"message": "Invalid token"}]}
profile = to_generic_user_profile(data)
assert profile is None
def test_to_generic_user_profile_no_viewer_key():
data: Any = {"data": {"Page": {}}}
profile = to_generic_user_profile(data)
assert profile is None
def test_to_generic_user_profile_viewer_none():
data: Any = {"data": {"Viewer": None}}
profile = to_generic_user_profile(data)
assert profile is None

uv.lock generated
View File

@@ -3743,7 +3743,7 @@ wheels = [
[[package]]
name = "viu-media"
version = "3.3.6"
version = "3.3.5"
source = { editable = "." }
dependencies = [
{ name = "click" },
@@ -3768,10 +3768,8 @@ mpv = [
{ name = "mpv" },
]
notifications = [
{ name = "dbus-python", marker = "sys_platform == 'linux'" },
{ name = "dbus-python" },
{ name = "plyer" },
{ name = "pyobjc", marker = "sys_platform == 'darwin'" },
{ name = "pypiwin32", marker = "sys_platform == 'win32'" },
]
standard = [
{ name = "dbus-python", marker = "sys_platform == 'linux'" },
@@ -3803,8 +3801,8 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.7" },
{ name = "dbus-python", marker = "sys_platform == 'linux' and extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "dbus-python", marker = "sys_platform == 'linux' and extra == 'standard'" },
{ name = "dbus-python", marker = "extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },
@@ -3817,9 +3815,7 @@ requires-dist = [
{ name = "pycryptodomex", marker = "extra == 'download'", specifier = ">=3.23.0" },
{ name = "pycryptodomex", marker = "extra == 'standard'", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.11.7" },
{ name = "pyobjc", marker = "sys_platform == 'darwin' and extra == 'notifications'" },
{ name = "pyobjc", marker = "sys_platform == 'darwin' and extra == 'standard'" },
{ name = "pypiwin32", marker = "sys_platform == 'win32' and extra == 'notifications'" },
{ name = "pypiwin32", marker = "sys_platform == 'win32' and extra == 'standard'" },
{ name = "pypresence", marker = "extra == 'discord'", specifier = ">=4.3.0" },
{ name = "rich", specifier = ">=13.9.2" },

View File

@@ -5,8 +5,7 @@
"Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka",
"Hazurewaku no \"Joutai Ijou Skill\" de Saikyou ni Natta Ore ga Subete wo Juurin suru made": "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made",
"Re:Zero kara Hajimeru Isekai Seikatsu Season 3": "Re:Zero kara Hajimeru Isekai Seikatsu 3rd Season",
"Hanka×Hanka (2011)": "Hunter × Hunter (2011)",
"Burichi -": "bleach"
"Hanka×Hanka (2011)": "Hunter × Hunter (2011)"
},
"hianime": {
"My Star": "Oshi no Ko"

View File

@@ -1,323 +0,0 @@
#!/usr/bin/env python3
"""
Filter Parser for Dynamic Search
This module provides a parser for the special filter syntax used in dynamic search.
Filter syntax allows users to add filters inline with their search query.
SYNTAX:
@filter:value - Apply a filter with the given value
@filter:value1,value2 - Apply multiple values (for array filters)
@filter:!value - Exclude/negate a filter value
SUPPORTED FILTERS:
@genre:action,comedy - Filter by genres
@genre:!hentai - Exclude genre
@status:airing - Filter by status (airing, finished, upcoming, cancelled, hiatus)
@year:2024 - Filter by season year
@season:winter - Filter by season (winter, spring, summer, fall)
@format:tv,movie - Filter by format (tv, movie, ova, ona, special, music)
@sort:score - Sort by (score, popularity, trending, title, date)
@score:>80 - Minimum score
@score:<50 - Maximum score
@popularity:>10000 - Minimum popularity
@onlist - Only show anime on user's list
@onlist:false - Only show anime NOT on user's list
EXAMPLES:
"naruto @genre:action @status:finished"
"isekai @year:2024 @season:winter @sort:score"
"@genre:action,adventure @status:airing"
"romance @genre:!hentai @format:tv,movie"
"""
import re
from typing import Any, Dict, List, Optional, Tuple
# Mapping of user-friendly filter names to GraphQL variable names
FILTER_ALIASES = {
# Status aliases
"airing": "RELEASING",
"releasing": "RELEASING",
"finished": "FINISHED",
"completed": "FINISHED",
"upcoming": "NOT_YET_RELEASED",
"not_yet_released": "NOT_YET_RELEASED",
"unreleased": "NOT_YET_RELEASED",
"cancelled": "CANCELLED",
"canceled": "CANCELLED",
"hiatus": "HIATUS",
"paused": "HIATUS",
# Format aliases
"tv": "TV",
"tv_short": "TV_SHORT",
"tvshort": "TV_SHORT",
"movie": "MOVIE",
"film": "MOVIE",
"ova": "OVA",
"ona": "ONA",
"special": "SPECIAL",
"music": "MUSIC",
# Season aliases
"winter": "WINTER",
"spring": "SPRING",
"summer": "SUMMER",
"fall": "FALL",
"autumn": "FALL",
# Sort aliases
"score": "SCORE_DESC",
"score_desc": "SCORE_DESC",
"score_asc": "SCORE",
"popularity": "POPULARITY_DESC",
"popularity_desc": "POPULARITY_DESC",
"popularity_asc": "POPULARITY",
"trending": "TRENDING_DESC",
"trending_desc": "TRENDING_DESC",
"trending_asc": "TRENDING",
"title": "TITLE_ROMAJI",
"title_desc": "TITLE_ROMAJI_DESC",
"date": "START_DATE_DESC",
"date_desc": "START_DATE_DESC",
"date_asc": "START_DATE",
"newest": "START_DATE_DESC",
"oldest": "START_DATE",
"favourites": "FAVOURITES_DESC",
"favorites": "FAVOURITES_DESC",
"episodes": "EPISODES_DESC",
}
# Genre name normalization (lowercase -> proper case)
GENRE_NAMES = {
"action": "Action",
"adventure": "Adventure",
"comedy": "Comedy",
"drama": "Drama",
"ecchi": "Ecchi",
"fantasy": "Fantasy",
"horror": "Horror",
"mahou_shoujo": "Mahou Shoujo",
"mahou": "Mahou Shoujo",
"magical_girl": "Mahou Shoujo",
"mecha": "Mecha",
"music": "Music",
"mystery": "Mystery",
"psychological": "Psychological",
"romance": "Romance",
"sci-fi": "Sci-Fi",
"scifi": "Sci-Fi",
"sci_fi": "Sci-Fi",
"slice_of_life": "Slice of Life",
"sol": "Slice of Life",
"sports": "Sports",
"supernatural": "Supernatural",
"thriller": "Thriller",
"hentai": "Hentai",
}
# Filter pattern: @key:value or @key (boolean flags)
FILTER_PATTERN = re.compile(r"@(\w+)(?::([^\s]+))?", re.IGNORECASE)
# Comparison operators for numeric filters
COMPARISON_PATTERN = re.compile(r"^([<>]=?)?(\d+)$")
def normalize_value(value: str, value_type: str) -> str:
"""Normalize a filter value based on its type."""
value_lower = value.lower().strip()
if value_type == "genre":
return GENRE_NAMES.get(value_lower, value.title())
elif value_type in ("status", "format", "season", "sort"):
return FILTER_ALIASES.get(value_lower, value.upper())
return value
def parse_value_list(value_str: str) -> Tuple[List[str], List[str]]:
"""
Parse a comma-separated value string, separating includes from excludes.
Returns:
Tuple of (include_values, exclude_values)
"""
includes = []
excludes = []
for val in value_str.split(","):
val = val.strip()
if not val:
continue
if val.startswith("!"):
excludes.append(val[1:])
else:
includes.append(val)
return includes, excludes
def parse_comparison(value: str) -> Tuple[Optional[str], Optional[int]]:
"""
Parse a comparison value like ">80" or "<50".
Returns:
Tuple of (operator, number) or (None, None) if invalid
"""
match = COMPARISON_PATTERN.match(value)
if match:
operator = match.group(1) or ">" # Default to greater than
number = int(match.group(2))
return operator, number
return None, None
def parse_filters(query: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse a search query and extract filter directives.
Args:
query: The full search query including filter syntax
Returns:
Tuple of (clean_query, filters_dict)
- clean_query: The query with filter syntax removed
- filters_dict: Dictionary of GraphQL variables to apply
"""
filters: Dict[str, Any] = {}
# Find all filter matches
matches = list(FILTER_PATTERN.finditer(query))
for match in matches:
filter_name = match.group(1).lower()
filter_value = match.group(2) # May be None for boolean flags
# Handle different filter types
if filter_name == "genre":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "genre") for v in includes]
filters.setdefault("genre_in", []).extend(normalized)
if excludes:
normalized = [normalize_value(v, "genre") for v in excludes]
filters.setdefault("genre_not_in", []).extend(normalized)
elif filter_name == "status":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "status") for v in includes]
filters.setdefault("status_in", []).extend(normalized)
if excludes:
normalized = [normalize_value(v, "status") for v in excludes]
filters.setdefault("status_not_in", []).extend(normalized)
elif filter_name == "format":
if filter_value:
includes, _ = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "format") for v in includes]
filters.setdefault("format_in", []).extend(normalized)
elif filter_name == "year":
if filter_value:
try:
filters["seasonYear"] = int(filter_value)
except ValueError:
pass # Invalid year, skip
elif filter_name == "season":
if filter_value:
filters["season"] = normalize_value(filter_value, "season")
elif filter_name == "sort":
if filter_value:
sort_val = normalize_value(filter_value, "sort")
filters["sort"] = [sort_val]
elif filter_name == "score":
if filter_value:
op, num = parse_comparison(filter_value)
if num is not None:
if op in (">", ">="):
filters["averageScore_greater"] = num
elif op in ("<", "<="):
filters["averageScore_lesser"] = num
elif filter_name == "popularity":
if filter_value:
op, num = parse_comparison(filter_value)
if num is not None:
if op in (">", ">="):
filters["popularity_greater"] = num
elif op in ("<", "<="):
filters["popularity_lesser"] = num
elif filter_name == "onlist":
if filter_value is None or filter_value.lower() in ("true", "yes", "1"):
filters["on_list"] = True
elif filter_value.lower() in ("false", "no", "0"):
filters["on_list"] = False
elif filter_name == "tag":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
# Tags use title case typically
normalized = [v.replace("_", " ").title() for v in includes]
filters.setdefault("tag_in", []).extend(normalized)
if excludes:
normalized = [v.replace("_", " ").title() for v in excludes]
filters.setdefault("tag_not_in", []).extend(normalized)
# Remove filter syntax from query to get clean search text
clean_query = FILTER_PATTERN.sub("", query).strip()
# Clean up multiple spaces
clean_query = re.sub(r"\s+", " ", clean_query).strip()
return clean_query, filters
def get_help_text() -> str:
"""Return a help string describing the filter syntax."""
return """
╭─────────────────── Filter Syntax Help ───────────────────╮
│ │
│ @genre:action,comedy Filter by genres │
│ @genre:!hentai Exclude genre │
│ @status:airing Status: airing, finished, │
│ upcoming, cancelled, hiatus │
│ @year:2024 Filter by year │
│ @season:winter winter, spring, summer, fall │
│ @format:tv,movie tv, movie, ova, ona, special │
│ @sort:score score, popularity, trending, │
│ date, title, newest, oldest │
│ @score:>80 Minimum score │
│ @score:<50 Maximum score │
│ @popularity:>10000 Minimum popularity │
│ @onlist Only on your list │
│ @onlist:false Not on your list │
│ @tag:isekai,reincarnation Filter by tags │
│ │
│ Examples: │
│ naruto @genre:action @status:finished │
│ @genre:action,adventure @year:2024 @sort:score │
│ isekai @season:winter @year:2024 │
│ │
╰──────────────────────────────────────────────────────────╯
""".strip()
if __name__ == "__main__":
# Test the parser
import json
import sys
if len(sys.argv) > 1:
test_query = " ".join(sys.argv[1:])
clean, filters = parse_filters(test_query)
print(f"Original: {test_query}")
print(f"Clean query: {clean}")
print(f"Filters: {json.dumps(filters, indent=2)}")
else:
print(get_help_text())
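
For reference, a minimal usage sketch of the deleted parser's entry point; the expected output follows from the alias tables above:

```python
from _filter_parser import parse_filters  # the module deleted above

clean, filters = parse_filters("isekai @year:2024 @season:winter @sort:score")
print(clean)    # "isekai"
print(filters)  # {"seasonYear": 2024, "season": "WINTER", "sort": ["SCORE_DESC"]}
```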

View File

@@ -45,15 +45,6 @@ def format_number(num):
return f"{num:,}"
def format_score_stars(score):
"""Format score as stars out of 6."""
if score is None:
return "N/A"
# Convert 0-100 score to 0-6 stars, capped at 6 for consistency
stars = min(round(score * 6 / 100), 6)
return "" * stars + f" ({score}/100)"
def format_date(date_obj):
"""Format date object to string."""
if not date_obj or date_obj == "null":
@@ -351,68 +342,31 @@ def main():
# Extract data
status = media.get("status", "Unknown")
format_type = media.get("format", "Unknown")
episodes = media.get("episodes", "??")
episodes = media.get("episodes", "?")
duration = media.get("duration")
duration_str = f"{duration} min/ep" if duration else "Unknown"
duration_str = f"{duration} min" if duration else "Unknown"
score = media.get("averageScore")
score_str = format_score_stars(score)
score_str = f"{score}/100" if score else "N/A"
favourites = format_number(media.get("favourites", 0))
popularity = format_number(media.get("popularity", 0))
genres = ", ".join(media.get("genres", [])) or "Unknown"
genres = ", ".join(media.get("genres", [])[:5]) or "Unknown"
start_date = format_date(media.get("startDate"))
end_date = format_date(media.get("endDate"))
studios_list = media.get("studios", {}).get("nodes", [])
# Studios are those with isAnimationStudio=true
studios = ", ".join([s["name"] for s in studios_list if s.get("name") and s.get("isAnimationStudio")]) or "N/A"
# Producers are those with isAnimationStudio=false
producers = ", ".join([s["name"] for s in studios_list if s.get("name") and not s.get("isAnimationStudio")]) or "N/A"
studios = ", ".join([s.get("name", "") for s in studios_list[:3]]) or "Unknown"
synonyms_list = media.get("synonyms", [])
# Include romaji in synonyms if different from title
romaji = title_obj.get("romaji")
if romaji and romaji != title and romaji not in synonyms_list:
synonyms_list = [romaji] + synonyms_list
synonyms = ", ".join(synonyms_list) or "N/A"
# Tags
tags_list = media.get("tags", [])
tags = ", ".join([t.get("name", "") for t in tags_list if t.get("name")]) or "N/A"
# Next airing episode
next_airing = media.get("nextAiringEpisode")
if next_airing:
next_ep = next_airing.get("episode", "?")
airing_at = next_airing.get("airingAt")
if airing_at:
from datetime import datetime
try:
dt = datetime.fromtimestamp(airing_at)
next_episode_str = f"Episode {next_ep} on {dt.strftime('%A, %d %B %Y at %H:%M')}"
except (ValueError, OSError):
next_episode_str = f"Episode {next_ep}"
else:
next_episode_str = f"Episode {next_ep}"
else:
next_episode_str = "N/A"
# User list status
media_list_entry = media.get("mediaListEntry")
if media_list_entry:
user_status = media_list_entry.get("status", "NOT_ON_LIST")
user_progress = f"Episode {media_list_entry.get('progress', 0)}"
else:
user_status = "NOT_ON_LIST"
user_progress = "0"
synonyms = ", ".join(synonyms_list[:3]) or "N/A"
description = media.get("description", "No description available.")
description = strip_markdown(description)
# Print sections matching media_info.py structure exactly
# Print sections matching media_info.py structure
rows = [
("Score", score_str),
("Favorites", favourites),
@@ -422,17 +376,16 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_table_row(key, value, HEADER_COLOR, 0, 0)
rows = [
("Episodes", str(episodes)),
("Duration", duration_str),
("Next Episode", next_episode_str),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_table_row(key, value, HEADER_COLOR, 0, 0)
rows = [
("Genres", genres),
@@ -441,16 +394,7 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("List Status", user_status),
("Progress", user_progress),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_table_row(key, value, HEADER_COLOR, 0, 0)
rows = [
("Start Date", start_date),
@@ -459,16 +403,15 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_table_row(key, value, HEADER_COLOR, 0, 0)
rows = [
("Studios", studios),
("Producers", producers),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_table_row(key, value, HEADER_COLOR, 0, 0)
rows = [
("Synonyms", synonyms),
@@ -476,15 +419,7 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Tags", tags),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_rule(SEPARATOR_COLOR)
print(wrap_text(description, term_width))

View File

@@ -67,7 +67,6 @@ for key, value in rows:
rows = [
("Studios", "{STUDIOS}"),
("Producers", "{PRODUCERS}"),
]
print_rule(SEPARATOR_COLOR)

View File

@@ -5,18 +5,6 @@
# This script is a template for dynamic search functionality in fzf.
# The placeholders in curly braces, like {GRAPHQL_ENDPOINT}, are dynamically
# filled in by Python using .replace() at runtime.
#
# FILTER SYNTAX:
# @genre:action,comedy Filter by genres
# @genre:!hentai Exclude genre
# @status:airing Status: airing, finished, upcoming, cancelled, hiatus
# @year:2024 Filter by year
# @season:winter winter, spring, summer, fall
# @format:tv,movie tv, movie, ova, ona, special
# @sort:score score, popularity, trending, date, title
# @score:>80 / @score:<50 Min/max score
# @onlist / @onlist:false Filter by list status
# @tag:isekai Filter by tags
import json
import sys
@@ -24,13 +12,9 @@ from pathlib import Path
from urllib import request
from urllib.error import URLError
# Import the filter parser
from _filter_parser import parse_filters
# --- Template Variables (Injected by Python) ---
GRAPHQL_ENDPOINT = "{GRAPHQL_ENDPOINT}"
SEARCH_RESULTS_FILE = Path("{SEARCH_RESULTS_FILE}")
LAST_QUERY_FILE = Path("{LAST_QUERY_FILE}")
AUTH_HEADER = "{AUTH_HEADER}"
# The GraphQL query is injected as a properly escaped JSON string
@@ -38,29 +22,17 @@ GRAPHQL_QUERY = "{GRAPHQL_QUERY}"
# --- Get Query from fzf ---
# fzf passes the current query as the first argument when using --bind change:reload
RAW_QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
# Parse the query to extract filters and clean search text
QUERY, PARSED_FILTERS = parse_filters(RAW_QUERY)
# If query is empty and no filters, show help hint
if not RAW_QUERY.strip():
print("💡 Tip: Use @genre:action @status:airing for filters (type @help for syntax)")
# If query is empty, exit with empty results
if not QUERY.strip():
print("")
sys.exit(0)
# Show filter help if requested
if RAW_QUERY.strip().lower() in ("@help", "@?", "@h"):
from _filter_parser import get_help_text
print(get_help_text())
sys.exit(0)
# If we only have filters (no search text), that's valid - we'll search with filters only
# But if we have neither query nor filters, we already showed the help hint above
def make_graphql_request(
endpoint: str, query: str, variables: dict, auth_token: str = ""
) -> tuple[dict | None, str | None]:
) -> dict | None:
"""
Make a GraphQL request to the specified endpoint.
@@ -71,7 +43,7 @@ def make_graphql_request(
auth_token: Optional authorization token (Bearer token)
Returns:
Tuple of (Response JSON, error message) - one will be None
Response JSON as a dictionary, or None if request fails
"""
payload = {"query": query, "variables": variables}
@@ -89,13 +61,10 @@ def make_graphql_request(
)
with request.urlopen(req, timeout=10) as response:
return json.loads(response.read().decode("utf-8")), None
except URLError as e:
return None, f"Network error: {e.reason}"
except json.JSONDecodeError as e:
return None, f"Invalid response: {e}"
except Exception as e:
return None, f"Request error: {e}"
return json.loads(response.read().decode("utf-8"))
except (URLError, json.JSONDecodeError, Exception) as e:
print(f"❌ Request failed: {e}", file=sys.stderr)
return None
def extract_title(media_item: dict) -> str:
@@ -121,67 +90,34 @@ def main():
# Ensure parent directory exists
SEARCH_RESULTS_FILE.parent.mkdir(parents=True, exist_ok=True)
# Base GraphQL variables
# Create GraphQL variables
variables = {
"query": QUERY,
"type": "ANIME",
"per_page": 50,
"genre_not_in": ["Hentai"], # Default exclusion
"genre_not_in": ["Hentai"],
}
# Add search query if provided
if QUERY:
variables["query"] = QUERY
# Apply parsed filters from the filter syntax
for key, value in PARSED_FILTERS.items():
# Handle array merging for _in and _not_in fields
if key.endswith("_in") or key.endswith("_not_in"):
if key in variables:
# Merge arrays, avoiding duplicates
existing = set(variables[key])
existing.update(value)
variables[key] = list(existing)
else:
variables[key] = value
else:
variables[key] = value
# Make the GraphQL request
response, error = make_graphql_request(
response = make_graphql_request(
GRAPHQL_ENDPOINT, GRAPHQL_QUERY, variables, AUTH_HEADER
)
if error:
print(f"{error}")
# Also show what we tried to search for debugging
print(f" Query: {QUERY or '(none)'}")
print(f" Filters: {json.dumps(PARSED_FILTERS) if PARSED_FILTERS else '(none)'}")
sys.exit(1)
if response is None:
print("❌ Search failed: No response received")
print("❌ Search failed")
sys.exit(1)
# Check for GraphQL errors first (these come in the response body)
if "errors" in response:
errors = response["errors"]
if errors:
# Extract error messages
error_msgs = [e.get("message", str(e)) for e in errors]
print(f"❌ API Error: {'; '.join(error_msgs)}")
# Show variables for debugging
print(f" Filters used: {json.dumps(PARSED_FILTERS, indent=2) if PARSED_FILTERS else '(none)'}")
sys.exit(1)
# Save the raw response for later processing by dynamic_search.py
try:
with open(SEARCH_RESULTS_FILE, "w", encoding="utf-8") as f:
json.dump(response, f, ensure_ascii=False, indent=2)
# Also save the raw query so it can be restored when going back
with open(LAST_QUERY_FILE, "w", encoding="utf-8") as f:
f.write(RAW_QUERY)
except IOError as e:
print(f"❌ Failed to save results: {e}")
print(f"❌ Failed to save results: {e}", file=sys.stderr)
sys.exit(1)
# Parse and display results
if "errors" in response:
print(f"❌ Search error: {response['errors']}")
sys.exit(1)
# Navigate the response structure
@@ -190,9 +126,7 @@ def main():
media_list = page.get("media", [])
if not media_list:
print("🔍 No results found")
if PARSED_FILTERS:
print(" Try adjusting your filters")
print(" No results found")
sys.exit(0)
# Output titles for fzf (one per line)
@@ -207,5 +141,5 @@ if __name__ == "__main__":
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
print(f"❌ Unexpected error: {type(e).__name__}: {e}")
print(f"❌ Unexpected error: {e}", file=sys.stderr)
sys.exit(1)
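
As this script's header comment notes, the {PLACEHOLDER} variables are filled in by Python before the script is handed to fzf; a minimal sketch of that injection step (keys as in the replacements dict in dynamic_search.py below; paths are illustrative):

```python
from pathlib import Path

template = Path("scripts/fzf/search.py").read_text(encoding="utf-8")
replacements = {
    "GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
    "SEARCH_RESULTS_FILE": "/tmp/current_search_results.json",
    "AUTH_HEADER": "",
}
rendered = template
for key, value in replacements.items():
    # Each {KEY} placeholder is literally substituted with its runtime value.
    rendered = rendered.replace("{" + key + "}", value)
Path("/tmp/search.py").write_text(rendered, encoding="utf-8")
```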

View File

@@ -189,7 +189,7 @@ You can disable this message by turning off the welcome_screen option in the con
):
import subprocess
_cli_cmd_name = "viu" if not shutil.which("viu-media") else "viu-media"
_cli_cmd_name="viu" if not shutil.which("viu-media") else "viu-media"
cmd = [_cli_cmd_name, "config", "--update"]
print(f"running '{' '.join(cmd)}'...")
subprocess.run(cmd)

View File

@@ -1,72 +1,25 @@
import webbrowser
from pathlib import Path
import click
import webbrowser
from .....core.config.model import AppConfig
def _get_token(feedback, selector, token_input: str | None) -> str | None:
"""
Retrieves the authentication token from a file path, a direct string, or an interactive prompt.
"""
if token_input:
path = Path(token_input)
if path.is_file():
try:
token = path.read_text().strip()
if not token:
feedback.error(f"Token file is empty: {path}")
return None
return token
except Exception as e:
feedback.error(f"Error reading token from file: {e}")
return None
return token_input
from .....core.constants import ANILIST_AUTH
open_success = webbrowser.open(ANILIST_AUTH, new=2)
if open_success:
feedback.info("Your browser has been opened to obtain an AniList token.")
feedback.info(
f"Or you can visit the site manually [magenta][link={ANILIST_AUTH}]here[/link][/magenta]."
)
else:
feedback.warning(
f"Failed to open the browser. Please visit the site manually [magenta][link={ANILIST_AUTH}]here[/link][/magenta]."
)
feedback.info(
"After authorizing, copy the token from the address bar and paste it below."
)
return selector.ask("Enter your AniList Access Token")
@click.command(help="Login to your AniList account to enable progress tracking.")
@click.option("--status", "-s", is_flag=True, help="Check current login status.")
@click.option("--logout", "-l", is_flag=True, help="Log out and erase credentials.")
@click.argument("token_input", required=False, type=str)
@click.pass_obj
def auth(config: AppConfig, status: bool, logout: bool, token_input: str | None):
"""
Handles user authentication and credential management.
This command allows you to log in to your AniList account to enable
progress tracking and other features.
You can provide your authentication token in three ways:
1. Interactively: Run the command without arguments to open a browser
and be prompted to paste the token.
2. As an argument: Pass the token string directly to the command.
$ viu anilist auth "your_token_here"
3. As a file: Pass the path to a text file containing the token.
$ viu anilist auth /path/to/token.txt
"""
def auth(config: AppConfig, status: bool, logout: bool):
"""Handles user authentication and credential management."""
from .....core.constants import ANILIST_AUTH
from .....libs.media_api.api import create_api_client
from .....libs.selectors.selector import create_selector
from ....service.auth import AuthService
from ....service.feedback import FeedbackService
auth_service = AuthService("anilist")
feedback = FeedbackService(config)
selector = create_selector(config)
feedback.clear_console()
if status:
user_data = auth_service.get_auth()
@@ -76,11 +29,6 @@ def auth(config: AppConfig, status: bool, logout: bool, token_input: str | None)
feedback.error("Not logged in.")
return
from .....libs.selectors.selector import create_selector
selector = create_selector(config)
feedback.clear_console()
if logout:
if selector.confirm("Are you sure you want to log out and erase your token?"):
auth_service.clear_user_profile()
@@ -92,14 +40,27 @@ def auth(config: AppConfig, status: bool, logout: bool, token_input: str | None)
f"You are already logged in as {auth_profile.user_profile.name}.Would you like to relogin"
):
return
token = _get_token(feedback, selector, token_input)
api_client = create_api_client("anilist", config)
open_success = webbrowser.open(ANILIST_AUTH, new=2)
if open_success:
feedback.info("Your browser has been opened to obtain an AniList token.")
feedback.info(
f"or you can visit the site manually [magenta][link={ANILIST_AUTH}]here[/link][/magenta]."
)
else:
feedback.warning(
f"Failed to open the browser. Please visit the site manually [magenta][link={ANILIST_AUTH}]here[/link][/magenta]."
)
feedback.info(
"After authorizing, copy the token from the address bar and paste it below."
)
token = selector.ask("Enter your AniList Access Token")
if not token:
if not token_input:
feedback.error("Login cancelled.")
feedback.error("Login cancelled.")
return
api_client = create_api_client("anilist", config)
# Use the API client to validate the token and get profile info
profile = api_client.authenticate(token.strip())

View File

@@ -30,9 +30,6 @@ from ...core.config import AppConfig
\b
# view the current contents of your config
viu config --view
\b
# clear cached GitHub authentication token
viu config --clear-github-auth
""",
)
@click.option("--path", "-p", help="Print the config location and exit", is_flag=True)
@@ -63,11 +60,6 @@ from ...core.config import AppConfig
is_flag=True,
help="Start the interactive configuration wizard.",
)
@click.option(
"--clear-github-auth",
is_flag=True,
help="Clear cached GitHub authentication token.",
)
@click.pass_obj
def config(
user_config: AppConfig,
@@ -77,18 +69,12 @@ def config(
generate_desktop_entry,
update,
interactive,
clear_github_auth,
):
from ...core.constants import USER_CONFIG
from ..config.editor import InteractiveConfigEditor
from ..config.generate import generate_config_toml_from_app_model
if clear_github_auth:
from ..service.github import GitHubContributionService
GitHubContributionService.clear_cached_auth_static()
click.echo("GitHub authentication cache cleared.")
elif path:
if path:
print(USER_CONFIG)
elif view:
from rich.console import Console

View File

@@ -1 +0,0 @@
# Menu package for interactive session

View File

@@ -1,18 +0,0 @@
# Media menu modules
# Explicit module list for PyInstaller compatibility
__all__ = [
"downloads",
"download_episodes",
"dynamic_search",
"episodes",
"main",
"media_actions",
"media_airing_schedule",
"media_characters",
"media_review",
"player_controls",
"play_downloads",
"provider_search",
"results",
"servers",
]

View File

@@ -1,10 +1,9 @@
import json
import logging
import shutil
import sys
from pathlib import Path
from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from .....core.utils.detect import get_python_executable
from .....libs.media_api.params import MediaSearchParams
from ...session import Context, session
from ...state import InternalDirective, MediaApiState, MenuName, State
@@ -13,36 +12,8 @@ logger = logging.getLogger(__name__)
SEARCH_CACHE_DIR = APP_CACHE_DIR / "previews" / "dynamic-search"
SEARCH_RESULTS_FILE = SEARCH_CACHE_DIR / "current_search_results.json"
LAST_QUERY_FILE = SEARCH_CACHE_DIR / "last_query.txt"
RESTORE_MODE_FILE = SEARCH_CACHE_DIR / ".restore_mode"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.py").read_text(encoding="utf-8")
FILTER_PARSER_SCRIPT = FZF_SCRIPTS_DIR / "_filter_parser.py"
def _load_cached_titles() -> list[str]:
"""Load titles from cached search results for display in fzf."""
if not SEARCH_RESULTS_FILE.exists():
return []
try:
with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
media_list = data.get("data", {}).get("Page", {}).get("media", [])
titles = []
for media in media_list:
title_obj = media.get("title", {})
title = (
title_obj.get("english")
or title_obj.get("romaji")
or title_obj.get("native")
or "Unknown"
)
titles.append(title)
return titles
except (IOError, json.JSONDecodeError):
return []
@session.menu
@@ -54,12 +25,6 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
# Ensure cache directory exists
SEARCH_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Check if we're in restore mode (coming back from media_actions)
restore_mode = RESTORE_MODE_FILE.exists()
if restore_mode:
# Clear the restore flag
RESTORE_MODE_FILE.unlink(missing_ok=True)
# Read the GraphQL search query
from .....libs.media_api.anilist import gql
@@ -79,7 +44,6 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
"GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
"GRAPHQL_QUERY": search_query_json,
"SEARCH_RESULTS_FILE": SEARCH_RESULTS_FILE.as_posix(),
"LAST_QUERY_FILE": LAST_QUERY_FILE.as_posix(),
"AUTH_HEADER": auth_header,
}
@@ -90,34 +54,12 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
search_script_file = SEARCH_CACHE_DIR / "search.py"
search_script_file.write_text(search_command, encoding="utf-8")
# Copy the filter parser module to the cache directory
# This is required for the search script to import it
filter_parser_dest = SEARCH_CACHE_DIR / "_filter_parser.py"
if FILTER_PARSER_SCRIPT.exists():
shutil.copy2(FILTER_PARSER_SCRIPT, filter_parser_dest)
# Make the search script executable by calling it with python3
# fzf will pass the query as {q} which becomes the first argument
search_command_final = (
f"{Path(get_python_executable()).as_posix()} {search_script_file.as_posix()} {{q}}"
f"{Path(sys.executable).as_posix()} {search_script_file.as_posix()} {{q}}"
)
# Header hint for filter syntax
filter_hint = "💡 Filters: @genre:action @status:airing @year:2024 @sort:score (type @help for more)"
# Only load previous query if we're in restore mode (coming back from media_actions)
initial_query = None
cached_results = None
if restore_mode:
# Load previous query
if LAST_QUERY_FILE.exists():
try:
initial_query = LAST_QUERY_FILE.read_text(encoding="utf-8").strip()
except IOError:
pass
# Load cached results to display immediately without network request
cached_results = _load_cached_titles()
try:
# Prepare preview functionality
preview_command = None
@@ -131,17 +73,11 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
prompt="Search Anime",
search_command=search_command_final,
preview=preview_command,
header=filter_hint,
initial_query=initial_query,
initial_results=cached_results,
)
else:
choice = ctx.selector.search(
prompt="Search Anime",
search_command=search_command_final,
header=filter_hint,
initial_query=initial_query,
initial_results=cached_results,
)
except NotImplementedError:
feedback.error("Dynamic search is not supported by your current selector")
@@ -180,9 +116,6 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
logger.error(f"Could not find selected media for choice: {choice}")
return InternalDirective.MAIN
# Set restore mode flag so we can restore state when user goes back
RESTORE_MODE_FILE.touch()
# Navigate to media actions with the selected item
return State(
menu_name=MenuName.MEDIA_ACTIONS,
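
The search_command_final built above plugs into fzf's change:reload binding, so each keystroke re-runs the script with the current query as its first argument; a rough sketch of the resulting invocation (paths illustrative, flag syntax per fzf's documentation):

```python
import subprocess

reload_cmd = "python /path/to/cache/search.py {q}"  # fzf expands {q} to the query
subprocess.run(
    ["fzf", "--prompt", "Search Anime: ", "--bind", f"change:reload({reload_cmd})"],
    check=False,
)
```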

View File

@@ -73,21 +73,6 @@ def provider_search(ctx: Context, state: State) -> State | InternalDirective:
update_user_normalizer_json(
chosen_title, media_title, config.general.provider.value
)
# Offer to submit the mapping to GitHub
if selector.confirm(
"Would you like to contribute this mapping to the project on GitHub?"
):
from ....service.github import GitHubContribution
contribution = GitHubContribution(
provider_name=config.general.provider.value,
provider_title=chosen_title,
media_api_title=media_title,
anilist_id=media_item.id if hasattr(media_item, "id") else None,
)
ctx.github.submit_contribution(contribution)
selected_provider_anime = provider_results_map[chosen_title]
with feedback.progress(

View File

@@ -1,7 +1,6 @@
import importlib
import importlib.util
import logging
import pkgutil
import os
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Callable, List, Optional, Union
@@ -17,7 +16,6 @@ if TYPE_CHECKING:
from ...libs.selectors.base import BaseSelector
from ..service.auth import AuthService
from ..service.feedback import FeedbackService
from ..service.github import GitHubContributionService
from ..service.player import PlayerService
from ..service.registry import MediaRegistryService
from ..service.session import SessionsService
@@ -93,7 +91,6 @@ class Context:
_session: Optional["SessionsService"] = None
_auth: Optional["AuthService"] = None
_player: Optional["PlayerService"] = None
_github: Optional["GitHubContributionService"] = None
@property
def provider(self) -> "BaseAnimeProvider":
@@ -193,17 +190,6 @@ class Context:
self._auth = AuthService(self.config.general.media_api)
return self._auth
@property
def github(self) -> "GitHubContributionService":
if not self._github:
from ..service.github.service import GitHubContributionService
self._github = GitHubContributionService(
selector=self.selector,
feedback=self.feedback,
)
return self._github
MenuFunction = Callable[[Context, State], Union[State, InternalDirective]]
@@ -323,46 +309,30 @@ class Session:
return decorator
def load_menus_from_folder(self, package: str):
"""Load menu modules from a subfolder.
Uses pkgutil to discover modules for regular Python, and falls back
to the package's __all__ list for PyInstaller frozen executables.
"""
full_package_name = f"viu_media.cli.interactive.menu.{package}"
logger.debug(f"Loading menus from package '{full_package_name}'...")
package_path = MENUS_DIR / package
package_name = package_path.name
logger.debug(f"Loading menus from '{package_path}'...")
try:
# Import the parent package first
parent_package = importlib.import_module(full_package_name)
except ImportError as e:
logger.error(f"Failed to import menu package '{full_package_name}': {e}")
return
# Try pkgutil first (works in regular Python)
package_path = getattr(parent_package, "__path__", None)
module_names = []
if package_path:
module_names = [
name for _, name, ispkg in pkgutil.iter_modules(package_path)
if not ispkg and not name.startswith("_")
]
# Fallback to __all__ for PyInstaller frozen executables
if not module_names:
module_names = getattr(parent_package, "__all__", [])
logger.debug(f"Using __all__ fallback with {len(module_names)} modules")
for module_name in module_names:
full_module_name = f"{full_package_name}.{module_name}"
try:
# Simply importing the module will execute it,
# which runs the @session.menu decorators
importlib.import_module(full_module_name)
except Exception as e:
logger.error(
f"Failed to load menu module '{full_module_name}': {e}"
for filename in os.listdir(package_path):
if filename.endswith(".py") and not filename.startswith("__"):
module_name = filename[:-3]
full_module_name = (
f"viu_media.cli.interactive.menu.{package_name}.{module_name}"
)
file_path = package_path / filename
try:
spec = importlib.util.spec_from_file_location(
full_module_name, file_path
)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
# The act of executing the module runs the @session.menu decorators
spec.loader.exec_module(module)
except Exception as e:
logger.error(
f"Failed to load menu module '{full_module_name}': {e}"
)
# Create a single, global instance of the Session to be imported by menu modules.
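Of the two discovery strategies this hunk swaps between, the pkgutil/`__all__` variant reduces to the sketch below (the package name is illustrative); the directory-scan variant instead lists `*.py` files and imports each by file location. Frozen PyInstaller builds have no real directory to scan, hence the `__all__` fallback.

```python
# Minimal sketch of menu-module discovery with a frozen-build fallback
# ("myapp.menus" would be a hypothetical package name).
import importlib
import pkgutil


def discover_menu_modules(package_name: str) -> list[str]:
    pkg = importlib.import_module(package_name)
    names: list[str] = []
    path = getattr(pkg, "__path__", None)
    if path:  # regular install: scan the package directory
        names = [
            name
            for _, name, ispkg in pkgutil.iter_modules(path)
            if not ispkg and not name.startswith("_")
        ]
    if not names:  # frozen executable: trust the package's declared __all__
        names = list(getattr(pkg, "__all__", []))
    return names
```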

View File

@@ -1,4 +0,0 @@
from .model import AuthMethod, GitHubContribution, GitHubPRResponse
from .service import GitHubContributionService
__all__ = ["GitHubContributionService", "GitHubContribution", "AuthMethod", "GitHubPRResponse"]

View File

@@ -1,66 +0,0 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class AuthMethod(str, Enum):
"""Authentication method for GitHub API."""
BROWSER = "browser"
GH_CLI = "gh"
class GitHubAuth(BaseModel):
"""Stored GitHub authentication credentials."""
access_token: str
token_type: str = "bearer"
scope: str = ""
class GitHubContribution(BaseModel):
"""Represents a normalizer mapping contribution."""
provider_name: str = Field(..., description="The provider name (e.g., 'allanime')")
provider_title: str = Field(
..., description="The title as it appears on the provider"
)
media_api_title: str = Field(..., description="The normalized media API title")
anilist_id: Optional[int] = Field(
default=None, description="Optional AniList ID for reference"
)
class GitHubPRResponse(BaseModel):
"""Response from GitHub API when creating a pull request."""
id: int
number: int
html_url: str
title: str
state: str
class GitHubUser(BaseModel):
"""GitHub user information."""
login: str
id: int
class GitHubRepo(BaseModel):
"""GitHub repository information."""
full_name: str
default_branch: str
fork: bool = False
class GitHubFileContent(BaseModel):
"""GitHub file content response."""
sha: str
content: str
encoding: str = "base64"

View File

@@ -1,674 +0,0 @@
"""
GitHub Contribution Service
Provides functionality to submit normalizer mappings to the viu repository
via Pull Request, using either browser-based OAuth or the GitHub CLI (gh).
"""
import base64
import json
import logging
import shutil
import subprocess
import time
import webbrowser
from typing import TYPE_CHECKING, Optional
import httpx
from ....core.constants import APP_DATA_DIR, AUTHOR, CLI_NAME
from ....core.utils.file import AtomicWriter, FileLock
from ....core.utils.normalizer import USER_NORMALIZER_JSON
from .model import (
AuthMethod,
GitHubAuth,
GitHubContribution,
GitHubFileContent,
GitHubPRResponse,
GitHubRepo,
GitHubUser,
)
if TYPE_CHECKING:
from ....libs.selectors.base import BaseSelector
from ...service.feedback import FeedbackService
logger = logging.getLogger(__name__)
# GitHub OAuth configuration
GITHUB_CLIENT_ID = "Iv23liXUYWot4d4Zvjxa" # Register your OAuth app on GitHub
GITHUB_OAUTH_SCOPES = "public_repo"
GITHUB_API_BASE = "https://api.github.com"
# Repository information
REPO_OWNER = AUTHOR
REPO_NAME = "viu" # Must match GitHub repo name exactly (case-sensitive)
NORMALIZER_FILE_PATH = "viu_media/assets/normalizer.json"
AUTH_FILE = APP_DATA_DIR / "github_auth.json"
class GitHubContributionService:
"""Service for submitting normalizer mappings to GitHub."""
def __init__(
self,
selector: "BaseSelector",
feedback: Optional["FeedbackService"] = None,
):
self.selector = selector
self.feedback = feedback
self._lock = FileLock(APP_DATA_DIR / "github_auth.lock")
self._http_client = httpx.Client(
headers={
"Accept": "application/json",
"User-Agent": f"{CLI_NAME}/1.0",
},
timeout=30.0,
follow_redirects=True, # Follow redirects for all request types
)
def __del__(self):
"""Cleanup HTTP client."""
if hasattr(self, "_http_client"):
self._http_client.close()
def is_gh_cli_available(self) -> bool:
"""Check if GitHub CLI (gh) is installed and available."""
return shutil.which("gh") is not None
def is_gh_cli_authenticated(self) -> bool:
"""Check if GitHub CLI is authenticated."""
if not self.is_gh_cli_available():
return False
try:
result = subprocess.run(
["gh", "auth", "status"],
capture_output=True,
text=True,
timeout=10,
)
return result.returncode == 0
except (subprocess.SubprocessError, OSError):
return False
def get_available_auth_methods(self) -> list[AuthMethod]:
"""Get list of available authentication methods."""
methods = [AuthMethod.BROWSER]
if self.is_gh_cli_available():
methods.insert(0, AuthMethod.GH_CLI) # Prefer gh CLI if available
return methods
def prompt_auth_method(self) -> Optional[AuthMethod]:
"""
Prompt user to select their preferred authentication method.
Returns:
Selected AuthMethod or None if cancelled.
"""
methods = self.get_available_auth_methods()
choices = []
for method in methods:
if method == AuthMethod.GH_CLI:
status = "✓ authenticated" if self.is_gh_cli_authenticated() else ""
choices.append(f"gh CLI {status}".strip())
else:
choices.append("Browser (OAuth)")
choices.append("Cancel")
choice = self.selector.choose(
prompt="Select GitHub authentication method",
choices=choices,
)
if not choice or choice == "Cancel":
return None
if choice.startswith("gh CLI"):
return AuthMethod.GH_CLI
return AuthMethod.BROWSER
def submit_contribution(
self,
contribution: GitHubContribution,
auth_method: Optional[AuthMethod] = None,
) -> Optional[str]:
"""
Submit a normalizer mapping contribution to GitHub as a Pull Request.
This will:
1. Fork the repository (if not already forked)
2. Create a new branch with the updated normalizer.json
3. Open a Pull Request to the upstream repository
Args:
contribution: The mapping contribution to submit.
auth_method: The authentication method to use. If None, will prompt.
Returns:
URL of the created PR, or None if failed.
"""
if auth_method is None:
auth_method = self.prompt_auth_method()
if auth_method is None:
return None
if auth_method == AuthMethod.GH_CLI:
return self._submit_pr_via_gh_cli(contribution)
else:
return self._submit_pr_via_api(contribution)
def _get_user_normalizer_content(self) -> Optional[dict]:
"""Read the user's local normalizer.json file."""
if not USER_NORMALIZER_JSON.exists():
self._log_error(
f"Local normalizer.json not found at {USER_NORMALIZER_JSON}"
)
return None
try:
with USER_NORMALIZER_JSON.open("r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError) as e:
self._log_error(f"Failed to read normalizer.json: {e}")
return None
def _submit_pr_via_gh_cli(
self, contribution: GitHubContribution
) -> Optional[str]:
"""Submit PR using GitHub CLI."""
if not self.is_gh_cli_available():
self._log_error("GitHub CLI (gh) is not installed")
return None
if not self.is_gh_cli_authenticated():
self._log_info("GitHub CLI not authenticated. Running 'gh auth login'...")
try:
subprocess.run(["gh", "auth", "login"], check=True)
except subprocess.SubprocessError:
self._log_error("Failed to authenticate with GitHub CLI")
return None
# Read local normalizer content
normalizer_content = self._get_user_normalizer_content()
if not normalizer_content:
return None
# Get current username
try:
result = subprocess.run(
["gh", "api", "user", "--jq", ".login"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
self._log_error("Failed to get GitHub username")
return None
username = result.stdout.strip()
except subprocess.SubprocessError as e:
self._log_error(f"Failed to get username: {e}")
return None
# Fork the repository if not already forked
self._log_info("Ensuring fork exists...")
try:
subprocess.run(
["gh", "repo", "fork", f"{REPO_OWNER}/{REPO_NAME}", "--clone=false"],
capture_output=True,
text=True,
timeout=60,
)
except subprocess.SubprocessError:
pass # Fork may already exist, continue
# Create branch name
branch_name = f"normalizer/{contribution.provider_name}-{int(time.time())}"
# Create the PR using gh pr create with the file content
title = self._format_pr_title(contribution)
body = self._format_pr_body(contribution)
# We need to create the branch and commit via API since gh doesn't support this directly
# Fall back to API method for the actual PR creation
self._log_info("Creating pull request...")
# Get token from gh CLI
try:
result = subprocess.run(
["gh", "auth", "token"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
self._log_error("Failed to get auth token from gh CLI")
return None
token = result.stdout.strip()
except subprocess.SubprocessError as e:
self._log_error(f"Failed to get token: {e}")
return None
return self._create_pr_via_api(contribution, token, normalizer_content)
def _submit_pr_via_api(self, contribution: GitHubContribution) -> Optional[str]:
"""Submit PR using browser-based OAuth and GitHub API."""
# Authenticate
auth = self._load_cached_auth()
if not auth or not self._validate_token(auth.access_token):
auth = self._perform_device_flow_auth()
if not auth:
self._log_error("Failed to authenticate with GitHub")
return None
self._save_auth(auth)
# Read local normalizer content
normalizer_content = self._get_user_normalizer_content()
if not normalizer_content:
return None
return self._create_pr_via_api(contribution, auth.access_token, normalizer_content)
def _create_pr_via_api(
self,
contribution: GitHubContribution,
token: str,
normalizer_content: dict,
) -> Optional[str]:
"""Create a Pull Request via GitHub API."""
headers = {"Authorization": f"Bearer {token}"}
# Step 1: Get current user
self._log_info("Getting user info...")
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/user", headers=headers
)
response.raise_for_status()
user = GitHubUser.model_validate(response.json())
except httpx.HTTPError as e:
self._log_error(f"Failed to get user info: {e}")
return None
# Step 2: Fork the repository (if not already forked)
self._log_info("Ensuring fork exists...")
fork_exists = False
fork_full_name = ""
try:
# Check if fork exists by listing user's forks of the repo
response = self._http_client.get(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/forks",
headers=headers,
)
response.raise_for_status()
forks = response.json()
# Find user's fork
user_fork = next(
(f for f in forks if f["owner"]["login"].lower() == user.login.lower()),
None
)
if user_fork:
fork_full_name = user_fork["full_name"]
fork_exists = True
else:
# Create fork
self._log_info("Creating fork...")
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/forks",
headers=headers,
)
response.raise_for_status()
fork_data = response.json()
fork_full_name = fork_data["full_name"]
# Wait for fork to be ready
time.sleep(5)
except httpx.HTTPError as e:
self._log_error(f"Failed to create/check fork: {e}")
return None
self._log_info(f"Using fork: {fork_full_name}")
# Step 3: Get the default branch SHA from upstream
self._log_info("Getting upstream branch info...")
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/git/ref/heads/master",
headers=headers,
)
response.raise_for_status()
base_sha = response.json()["object"]["sha"]
except httpx.HTTPError as e:
self._log_error(f"Failed to get base branch: {e}")
return None
# Step 3.5: Sync fork with upstream if it already existed
if fork_exists:
self._log_info("Syncing fork with upstream...")
try:
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/merge-upstream",
headers=headers,
json={"branch": "master"},
)
# 409 means already up to date, which is fine
if response.status_code not in (200, 409):
response.raise_for_status()
except httpx.HTTPError as e:
self._log_info(f"Could not sync fork (continuing anyway): {e}")
# Step 4: Create a new branch in the fork
branch_name = f"normalizer/{contribution.provider_name}-{int(time.time())}"
self._log_info(f"Creating branch: {branch_name}")
try:
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/git/refs",
headers=headers,
json={"ref": f"refs/heads/{branch_name}", "sha": base_sha},
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
error_detail = ""
try:
error_detail = str(e.response.json())
except Exception:
pass
self._log_error(f"Failed to create branch: {e} {error_detail}")
return None
except httpx.HTTPError as e:
self._log_error(f"Failed to create branch: {e}")
return None
# Step 5: Get current normalizer.json from the fork's new branch to get SHA
self._log_info("Fetching current normalizer.json...")
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/contents/{NORMALIZER_FILE_PATH}",
headers=headers,
params={"ref": branch_name},
)
response.raise_for_status()
file_info = GitHubFileContent.model_validate(response.json())
file_sha = file_info.sha
# Decode existing content and merge with user's mappings
existing_content = json.loads(
base64.b64decode(file_info.content).decode("utf-8")
)
# Merge: user's normalizer takes precedence
merged_content = existing_content.copy()
for provider, mappings in normalizer_content.items():
if provider not in merged_content:
merged_content[provider] = {}
merged_content[provider].update(mappings)
except httpx.HTTPError as e:
self._log_error(f"Failed to get normalizer.json: {e}")
return None
# Step 6: Update the file in the fork
self._log_info("Committing changes...")
new_content = json.dumps(merged_content, indent=2, ensure_ascii=False)
encoded_content = base64.b64encode(new_content.encode("utf-8")).decode("utf-8")
commit_message = (
f"feat(normalizer): add mapping for '{contribution.provider_title}'\n\n"
f"Provider: {contribution.provider_name}\n"
f"Maps: {contribution.provider_title} -> {contribution.media_api_title}"
)
try:
response = self._http_client.put(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/contents/{NORMALIZER_FILE_PATH}",
headers=headers,
json={
"message": commit_message,
"content": encoded_content,
"sha": file_sha,
"branch": branch_name,
},
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
error_detail = ""
try:
error_detail = str(e.response.json())
except Exception:
pass
self._log_error(f"Failed to commit changes: {e} {error_detail}")
return None
except httpx.HTTPError as e:
self._log_error(f"Failed to commit changes: {e}")
return None
# Step 7: Create the Pull Request
self._log_info("Creating pull request...")
title = self._format_pr_title(contribution)
body = self._format_pr_body(contribution)
try:
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/pulls",
headers=headers,
json={
"title": title,
"body": body,
"head": f"{user.login}:{branch_name}",
"base": "master",
},
)
response.raise_for_status()
pr = GitHubPRResponse.model_validate(response.json())
self._log_success(f"Created PR #{pr.number}: {pr.html_url}")
return pr.html_url
except httpx.HTTPStatusError as e:
error_detail = ""
try:
error_json = e.response.json()
error_detail = error_json.get("message", "")
# GitHub includes detailed errors in 'errors' array
if "errors" in error_json:
errors = error_json["errors"]
error_detail += " | " + str(errors)
except Exception:
pass
self._log_error(f"Failed to create PR: {e} {error_detail}")
return None
except httpx.HTTPError as e:
self._log_error(f"Failed to create PR: {e}")
return None
def _format_pr_title(self, contribution: GitHubContribution) -> str:
"""Format the PR title."""
return (
f"feat(normalizer): add mapping for '{contribution.provider_title}' "
f"({contribution.provider_name})"
)
def _format_pr_body(self, contribution: GitHubContribution) -> str:
"""Format the PR body."""
return f"""## Normalizer Mapping Contribution
This PR adds a new title mapping to the normalizer.
### Mapping Details
| Field | Value |
|-------|-------|
| **Provider** | `{contribution.provider_name}` |
| **Provider Title** | `{contribution.provider_title}` |
| **Media API Title** | `{contribution.media_api_title}` |
| **AniList ID** | {contribution.anilist_id or 'N/A'} |
### Changes
This PR updates `{NORMALIZER_FILE_PATH}` with the following mapping:
```json
"{contribution.provider_title}": "{contribution.media_api_title.lower()}"
```
---
*Submitted automatically via {CLI_NAME} CLI*
"""
def _perform_device_flow_auth(self) -> Optional[GitHubAuth]:
"""
Perform GitHub Device Flow authentication.
This is more reliable for CLI apps than the web redirect flow.
"""
self._log_info("Starting GitHub authentication...")
# Request device code
try:
response = self._http_client.post(
"https://github.com/login/device/code",
data={
"client_id": GITHUB_CLIENT_ID,
"scope": GITHUB_OAUTH_SCOPES,
},
headers={"Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
except httpx.HTTPError as e:
self._log_error(f"Failed to start authentication: {e}")
return None
device_code = data.get("device_code")
user_code = data.get("user_code")
verification_uri = data.get("verification_uri")
expires_in = data.get("expires_in", 900)
interval = data.get("interval", 5)
if not all([device_code, user_code, verification_uri]):
self._log_error("Invalid response from GitHub")
return None
# Show user the code and open browser
self._log_info(f"\n🔑 Your code: {user_code}")
self._log_info(f"Opening {verification_uri} in your browser...")
self._log_info("Enter the code above to authenticate.\n")
webbrowser.open(verification_uri)
# Poll for token
start_time = time.time()
while time.time() - start_time < expires_in:
time.sleep(interval)
try:
token_response = self._http_client.post(
"https://github.com/login/oauth/access_token",
data={
"client_id": GITHUB_CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
},
headers={"Accept": "application/json"},
)
token_data = token_response.json()
if "access_token" in token_data:
self._log_success("Authentication successful!")
return GitHubAuth(
access_token=token_data["access_token"],
token_type=token_data.get("token_type", "bearer"),
scope=token_data.get("scope", ""),
)
error = token_data.get("error")
if error == "authorization_pending":
continue
elif error == "slow_down":
interval += 5
elif error == "expired_token":
self._log_error("Authentication expired. Please try again.")
return None
elif error == "access_denied":
self._log_error("Authentication denied by user.")
return None
else:
self._log_error(f"Authentication error: {error}")
return None
except httpx.HTTPError:
continue
self._log_error("Authentication timed out. Please try again.")
return None
def _validate_token(self, token: str) -> bool:
"""Check if a GitHub token is still valid."""
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/user",
headers={"Authorization": f"Bearer {token}"},
)
return response.status_code == 200
except httpx.HTTPError:
return False
def _load_cached_auth(self) -> Optional[GitHubAuth]:
"""Load cached GitHub authentication."""
if not AUTH_FILE.exists():
return None
try:
with AUTH_FILE.open("r", encoding="utf-8") as f:
data = json.load(f)
return GitHubAuth.model_validate(data)
except (json.JSONDecodeError, ValueError):
return None
def _save_auth(self, auth: GitHubAuth) -> None:
"""Save GitHub authentication to cache."""
APP_DATA_DIR.mkdir(parents=True, exist_ok=True)
with self._lock:
with AtomicWriter(AUTH_FILE) as f:
json.dump(auth.model_dump(), f, indent=2)
def clear_cached_auth(self) -> None:
"""Clear cached GitHub authentication."""
if AUTH_FILE.exists():
AUTH_FILE.unlink()
logger.info("Cleared GitHub authentication cache")
@staticmethod
def clear_cached_auth_static() -> None:
"""Clear cached GitHub authentication (static method for CLI use)."""
if AUTH_FILE.exists():
AUTH_FILE.unlink()
logger.info("Cleared GitHub authentication cache")
def _log_info(self, message: str) -> None:
"""Log info message."""
if self.feedback:
self.feedback.info(message)
else:
logger.info(message)
def _log_success(self, message: str) -> None:
"""Log success message."""
if self.feedback:
self.feedback.success(message)
else:
logger.info(message)
def _log_error(self, message: str) -> None:
"""Log error message."""
if self.feedback:
self.feedback.error(message)
else:
logger.error(message)
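For readers skimming the deleted service: the browser path is GitHub's standard OAuth device flow, which condenses to one POST for a user code plus a polling loop against the token endpoint. A minimal sketch, assuming `httpx` is available and using a placeholder client ID:

```python
# Condensed sketch of GitHub's OAuth device flow (CLIENT_ID is a placeholder).
import time
from typing import Optional

import httpx

CLIENT_ID = "your-oauth-app-client-id"  # hypothetical


def device_flow_token(scope: str = "public_repo") -> Optional[str]:
    start = httpx.post(
        "https://github.com/login/device/code",
        data={"client_id": CLIENT_ID, "scope": scope},
        headers={"Accept": "application/json"},
    ).json()
    print(f"Enter code {start['user_code']} at {start['verification_uri']}")
    deadline = time.time() + start.get("expires_in", 900)
    interval = start.get("interval", 5)
    while time.time() < deadline:
        time.sleep(interval)
        token = httpx.post(
            "https://github.com/login/oauth/access_token",
            data={
                "client_id": CLIENT_ID,
                "device_code": start["device_code"],
                "grant_type": "urn:ietf:params:oauth:grant-type:device_code",
            },
            headers={"Accept": "application/json"},
        ).json()
        if "access_token" in token:
            return token["access_token"]
        if token.get("error") == "slow_down":
            interval += 5  # GitHub asks clients to back off
        elif token.get("error") != "authorization_pending":
            return None  # expired_token, access_denied, etc.
    return None
```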

View File

@@ -5,6 +5,7 @@ This provides advanced features like episode navigation, quality switching, and
import json
import logging
import os
import socket
import subprocess
import tempfile
@@ -43,7 +44,7 @@ class MPVIPCClient:
def __init__(self, socket_path: str):
self.socket_path = socket_path
self.socket: Optional[socket.socket] = None
self.socket: Optional[Any] = None
self._request_id_counter = 0
self._lock = threading.Lock()
@@ -55,16 +56,54 @@ class MPVIPCClient:
self._response_dict: Dict[int, Any] = {}
self._response_events: Dict[int, threading.Event] = {}
@staticmethod
def _is_windows_named_pipe(path: str) -> bool:
return path.startswith("\\\\.\\pipe\\")
@staticmethod
def _supports_unix_sockets() -> bool:
return hasattr(socket, "AF_UNIX")
@staticmethod
def _open_windows_named_pipe(path: str):
# MPV's JSON IPC on Windows uses named pipes like: \\.\pipe\mpvpipe
# Opening the pipe as a binary file supports read/write.
f = open(path, "r+b", buffering=0)
class _PipeConn:
def __init__(self, fileobj):
self._f = fileobj
def recv(self, n: int) -> bytes:
return self._f.read(n)
def sendall(self, data: bytes) -> None:
self._f.write(data)
self._f.flush()
def close(self) -> None:
self._f.close()
return _PipeConn(f)
def connect(self, timeout: float = 5.0) -> None:
"""Connect to MPV IPC socket and start the reader thread."""
if not hasattr(socket, "AF_UNIX"):
raise MPVIPCError("Unix domain sockets are unavailable on this platform")
start_time = time.time()
while time.time() - start_time < timeout:
try:
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(self.socket_path)
if self._supports_unix_sockets() and not self._is_windows_named_pipe(
self.socket_path
):
self.socket = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
self.socket.connect(self.socket_path)
else:
if os.name != "nt" or not self._is_windows_named_pipe(self.socket_path):
raise MPVIPCError(
"MPV IPC requires Unix domain sockets (AF_UNIX) or a Windows named pipe path "
"like \\\\.\\pipe\\mpvpipe. Got: "
f"{self.socket_path}"
)
self.socket = self._open_windows_named_pipe(self.socket_path)
logger.info(f"Connected to MPV IPC socket at {self.socket_path}")
self._start_reader_thread()
return
@@ -302,10 +341,6 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _play_with_ipc(self, player: BasePlayer, params: PlayerParams) -> PlayerResult:
"""Play media using MPV IPC."""
try:
if not hasattr(socket, "AF_UNIX"):
raise MPVIPCError(
"MPV IPC requires Unix domain sockets, which are unavailable on this platform."
)
self._start_mpv_process(player, params)
self._connect_ipc()
self._setup_event_handling()
@@ -336,8 +371,12 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _start_mpv_process(self, player: BasePlayer, params: PlayerParams) -> None:
"""Start MPV process with IPC enabled."""
temp_dir = Path(tempfile.gettempdir())
self.socket_path = str(temp_dir / f"mpv_ipc_{time.time()}.sock")
if hasattr(socket, "AF_UNIX"):
temp_dir = Path(tempfile.gettempdir())
self.socket_path = str(temp_dir / f"mpv_ipc_{time.time()}.sock")
else:
# Windows MPV IPC uses named pipes.
self.socket_path = f"\\\\.\\pipe\\mpv_ipc_{int(time.time() * 1000)}"
self.mpv_process = player.play_with_ipc(params, self.socket_path)
time.sleep(1.0)
@@ -487,7 +526,11 @@ class MpvIPCPlayer(BaseIPCPlayer):
self.mpv_process.wait(timeout=3)
except subprocess.TimeoutExpired:
self.mpv_process.kill()
if self.socket_path and Path(self.socket_path).exists():
if (
self.socket_path
and not self.socket_path.startswith("\\\\.\\pipe\\")
and Path(self.socket_path).exists()
):
Path(self.socket_path).unlink(missing_ok=True)
def _get_episode(

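The connection logic this file gains condenses to the following: a minimal sketch, assuming MPV was started with its IPC server pointed at the same path (paths are illustrative).

```python
# Minimal cross-platform sketch of the IPC connect logic above: AF_UNIX where
# the OS supports it, an unbuffered named-pipe file handle on Windows.
import socket


def open_mpv_ipc(path: str):
    if hasattr(socket, "AF_UNIX") and not path.startswith("\\\\.\\pipe\\"):
        sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
        sock.connect(path)  # e.g. /tmp/mpv_ipc_1700000000.sock
        return sock
    # Windows: MPV exposes JSON IPC on a named pipe such as \\.\pipe\mpvpipe;
    # a binary file handle gives the same read/write semantics as a socket.
    return open(path, "r+b", buffering=0)
```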
View File

@@ -2,6 +2,7 @@ import logging
from pathlib import Path
import re
from hashlib import sha256
import sys
from typing import Dict, List, Optional
import httpx
@@ -10,7 +11,6 @@ from viu_media.core.utils import formatter
from ...core.config import AppConfig
from ...core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from ...core.utils.detect import get_python_executable
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import (
AiringScheduleResult,
@@ -327,7 +327,7 @@ def get_anime_preview(
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -387,7 +387,7 @@ def get_episode_preview(
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -435,7 +435,7 @@ def get_character_preview(choice_map: Dict[str, Character], config: AppConfig) -
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -483,7 +483,7 @@ def get_review_preview(choice_map: Dict[str, MediaReview], config: AppConfig) ->
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -599,7 +599,7 @@ def get_dynamic_anime_preview(config: AppConfig) -> str:
# Return the command to execute the preview script
preview_script_final = (
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
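The counterpart to these commands is the preview script itself. A hypothetical skeleton of what the generated file receives and emits (the real script renders cached metadata and images):

```python
# fzf replaces {} with the highlighted row, so the selection arrives as
# argv[1]; whatever is printed to stdout fills the preview pane.
import sys


def main() -> None:
    selection = sys.argv[1] if len(sys.argv) > 1 else ""
    # ... look up cached metadata for `selection` here ...
    print(f"Title: {selection}")
    print("Score: N/A | Status: N/A")


if __name__ == "__main__":
    main()
```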

View File

@@ -189,12 +189,7 @@ class PreviewCacheWorker(ManagedBackgroundWorker):
),
"STUDIOS": formatter.shell_safe(
formatter.format_list_with_commas(
[t.name for t in media_item.studios if t.name and t.is_animation_studio]
)
),
"PRODUCERS": formatter.shell_safe(
formatter.format_list_with_commas(
[t.name for t in media_item.studios if t.name and not t.is_animation_studio]
[t.name for t in media_item.studios if t.name]
)
),
"SYNONYMNS": formatter.shell_safe(

View File

@@ -56,30 +56,3 @@ def is_running_kitty_terminal() -> bool:
def has_fzf() -> bool:
return shutil.which("fzf") is not None
def is_frozen() -> bool:
"""Check if running as a PyInstaller frozen executable."""
return getattr(sys, "frozen", False)
def get_python_executable() -> str:
"""
Get the Python executable path.
In frozen (PyInstaller) apps, sys.executable points to the .exe,
so we need to find the system Python instead.
Returns:
Path to a Python executable.
"""
if is_frozen():
# We're in a frozen app - find system Python
for python_name in ["python3", "python", "py"]:
python_path = shutil.which(python_name)
if python_path:
return python_path
# Fallback - this likely won't work but is the best we can do
return "python"
else:
return sys.executable

View File

@@ -184,22 +184,13 @@ def format_score(score: Optional[float]) -> str:
def shell_safe(text: Optional[str]) -> str:
"""
Escapes a string for safe inclusion in a Python script string literal.
This is used when generating Python cache scripts with embedded text content.
For Python triple-quoted strings, we need to:
- Escape backslashes first (so existing backslashes don't interfere)
- Escape triple quotes (to not break the string literal)
- Remove or replace problematic characters
Escapes a string for safe inclusion in a shell script,
specifically for use within double quotes. It escapes backticks,
double quotes, and dollar signs.
"""
if not text:
return ""
# Escape backslashes first
result = text.replace("\\", "\\\\")
# Escape triple quotes (both types) for Python triple-quoted string literals
result = result.replace('"""', r'\"\"\"')
result = result.replace("'''", r"\'\'\'")
return result
return text.replace("`", "\\`").replace('"', '\\"').replace("$", "\\$")
def extract_episode_number(title: str) -> Optional[float]:

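A quick sanity check of the double-quote escaping described in the new docstring (a standalone copy of the rule, for illustration only):

```python
def shell_safe(text):
    if not text:
        return ""
    return text.replace("`", "\\`").replace('"', '\\"').replace("$", "\\$")


assert shell_safe('He said "hi" for $5') == 'He said \\"hi\\" for \\$5'
```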
View File

@@ -50,10 +50,15 @@ def _load_normalizer_data() -> Dict[str, Dict[str, str]]:
def update_user_normalizer_json(
provider_title: str, media_api_title: str, provider_name: str
):
import time
from .file import AtomicWriter
logger.info(f"Updating user normalizer JSON at: {USER_NORMALIZER_JSON}")
print(
"UPDATING USER NORMALIZER JSON. PLEASE CONTRIBUTE TO THE PROJECT BY OPENING A PR ON GITHUB TO MERGE YOUR NORMALIZER JSON TO MAIN. MAEMOTTE KANSHA SHIMASU :)"
)
print(f"NORMALIZER JSON PATH IS: {USER_NORMALIZER_JSON}")
time.sleep(5)
if not _normalizer_cache:
raise RuntimeError(
"Fatal _normalizer_cache missing this should not be the case : (. Please report"

View File

@@ -323,14 +323,7 @@ def to_generic_user_list_result(data: AnilistMediaLists) -> Optional[MediaSearch
def to_generic_user_profile(data: AnilistViewerData) -> Optional[UserProfile]:
"""Maps a raw AniList viewer response to a generic UserProfile."""
data_node = data.get("data")
if not data_node:
return None
viewer_data: Optional[AnilistCurrentlyLoggedInUser] = data_node.get("Viewer")
if not viewer_data:
return None
viewer_data: Optional[AnilistCurrentlyLoggedInUser] = data["data"]["Viewer"]
return UserProfile(
id=viewer_data["id"],

View File

@@ -52,7 +52,7 @@ class MpvPlayer(BasePlayer):
if TORRENT_REGEX.match(params.url) and detect.is_running_in_termux():
raise ViuError("Unable to play torrents on termux")
elif params.syncplay and detect.is_running_in_termux():
raise ViuError("Unable to play with syncplay on termux")
raise ViuError("Unable to play torrents on termux")
elif detect.is_running_in_termux():
return self._play_on_mobile(params)
else:

View File

@@ -46,11 +46,10 @@ class VlcPlayer(BasePlayer):
Returns:
PlayerResult: Information about the playback session.
"""
if not self.executable:
raise ViuError("VLC executable not found in PATH.")
if TORRENT_REGEX.match(params.url) and detect.is_running_in_termux():
raise ViuError("Unable to play torrents on termux")
elif params.syncplay and detect.is_running_in_termux():
raise ViuError("Unable to play with syncplay on termux")
elif detect.is_running_in_termux():
return self._play_on_mobile(params)
else:
return self._play_on_desktop(params)
@@ -117,9 +116,6 @@ class VlcPlayer(BasePlayer):
Returns:
PlayerResult: Information about the playback session.
"""
if not self.executable:
raise ViuError("VLC executable not found in PATH.")
if TORRENT_REGEX.search(params.url):
return self._stream_on_desktop_with_webtorrent_cli(params)

View File

@@ -27,7 +27,7 @@ SERVER_HEADERS = {
"Accept-Encoding": "Utf-8",
"DNT": "1",
"Connection": "keep-alive",
"Referer": ANIMEPAHE_BASE + "/",
"Referer": ANIMEPAHE_BASE + '/',
"Upgrade-Insecure-Requests": "1",
"Sec-Fetch-Dest": "iframe",
"Sec-Fetch-Mode": "navigate",
@@ -44,7 +44,7 @@ STREAM_HEADERS = {
"Origin": CDN_PROVIDER_BASE,
"Sec-GPC": "1",
"Connection": "keep-alive",
"Referer": CDN_PROVIDER_BASE + "/",
"Referer": CDN_PROVIDER_BASE + '/',
"Sec-Fetch-Dest": "empty",
"Sec-Fetch-Mode": "cors",
"Sec-Fetch-Site": "cross-site",

View File

@@ -98,6 +98,4 @@ def map_to_server(
)
for link in stream_links
]
return Server(
name="kwik", links=links, episode_title=episode.title, headers=headers
)
return Server(name="kwik", links=links, episode_title=episode.title, headers=headers)

View File

@@ -184,11 +184,9 @@ class AnimePahe(BaseAnimeProvider):
headers = {
"User-Agent": self.client.headers["User-Agent"],
"Host": stream_host or CDN_PROVIDER,
**STREAM_HEADERS,
**STREAM_HEADERS
}
yield map_to_server(
episode, translation_type, stream_links, headers=headers
)
yield map_to_server(episode, translation_type, stream_links, headers=headers)
@lru_cache()
def _get_episode_info(

View File

@@ -88,8 +88,6 @@ class BaseSelector(ABC):
*,
preview: Optional[str] = None,
header: Optional[str] = None,
initial_query: Optional[str] = None,
initial_results: Optional[List[str]] = None,
) -> str | None:
"""
Provides dynamic search functionality that reloads results based on user input.
@@ -99,8 +97,6 @@ class BaseSelector(ABC):
search_command: The command to execute for searching/reloading results.
preview: An optional command or string for a preview window.
header: An optional header to display above the choices.
initial_query: An optional initial query to pre-populate the search.
initial_results: Optional list of results to display initially (avoids network request).
Returns:
The chosen item as a string, or None if the selection is cancelled.

View File

@@ -117,42 +117,26 @@ class FzfSelector(BaseSelector):
lines = result.stdout.strip().splitlines()
return lines[-1] if lines else (default or "")
def search(self, prompt, search_command, *, preview=None, header=None, initial_query=None, initial_results=None):
def search(self, prompt, search_command, *, preview=None, header=None):
"""Enhanced search using fzf's --reload flag for dynamic search."""
# Build the header with optional custom header line
display_header = self.header
if header:
display_header = f"{self.header}\n{header}"
commands = [
self.executable,
"--prompt",
f"{prompt.title()}: ",
"--header",
display_header,
self.header,
"--header-first",
"--disabled", # Disable local filtering - rely on external search command
"--bind",
f"change:reload({search_command})",
"--ansi",
]
# If there's an initial query, set it
if initial_query:
commands.extend(["--query", initial_query])
# Only trigger reload on start if we don't have cached results
if not initial_results:
commands.extend(["--bind", f"start:reload({search_command})"])
if preview:
commands.extend(["--preview", preview])
# Use cached results as initial input if provided (avoids network request)
fzf_input = "\n".join(initial_results) if initial_results else ""
result = subprocess.run(
commands,
input=fzf_input,
input="",
stdout=subprocess.PIPE,
text=True,
encoding="utf-8",