Compare commits

...

26 Commits

Author SHA1 Message Date
benexl
beacf4fb2d feat: add command to clear cached GitHub authentication token 2025-12-31 19:54:45 +03:00
benexl
92eb1f12b2 feat: implement GitHub contribution service for normalizer mappings 2025-12-31 19:44:29 +03:00
Benedict Xavier
efa1340e41 Merge pull request #177 from viu-media/dynamic-search-filters
Implement dynamic search enhancements (e.g. filters) and media info differentiation
2025-12-31 18:57:04 +03:00
Benedict Xavier
ac7e90acdf Update viu_media/assets/scripts/fzf/dynamic_preview.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-31 18:54:02 +03:00
Benedict Xavier
8c5b066019 Update viu_media/assets/scripts/fzf/dynamic_preview.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-31 18:52:57 +03:00
Benedict Xavier
a826f391c1 Update viu_media/core/utils/formatter.py
Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com>
2025-12-31 18:51:17 +03:00
benexl
6a31f4191f fix: remove f-string for filter adjustment message in search results 2025-12-31 18:47:40 +03:00
benexl
b8f77d80e9 feat: implement restore mode for dynamic search with last query and cached results 2025-12-31 18:43:59 +03:00
benexl
6192252d10 feat: enhance shell_safe function to support Python string literals and escape triple quotes 2025-12-31 18:31:40 +03:00
benexl
efed80f4dc feat: update score formatting in format_score_stars function to match media_info.py style 2025-12-31 18:25:05 +03:00
benexl
e49baed46f feat: differentiate between studios and producers in media info and dynamic preview 2025-12-31 18:11:10 +03:00
benexl
6e26ac500d feat: enhance consistency with normal media-info menu 2025-12-31 18:04:58 +03:00
benexl
5db33d2fa0 feat: implement dynamic search filter parser and enhance search script with filter syntax 2025-12-31 17:59:04 +03:00
benexl
0524af6e26 fix(ipc): add checks for Unix domain socket availability in MPVIPCClient and MpvIPCPlayer 2025-12-31 15:47:43 +03:00
benexl
a2fc9e442d fix: add libglib2.0-dev installation for Linux system dependencies in GitHub Actions workflow 2025-12-31 15:22:37 +03:00
benexl
f9ca8bbd79 fix: add installation of system dependencies for Linux in GitHub Actions workflow 2025-12-31 15:18:24 +03:00
benexl
dd9d9695e7 fix: remove unused imports for cleaner code 2025-12-31 15:14:14 +03:00
benexl
c9d948ae4b feat: add GitHub Actions workflow for building release binaries across platforms 2025-12-31 15:09:06 +03:00
benexl
b9766af11a fix(pyinstaller): update platform-specific settings and optimize EXE configuration 2025-12-31 15:02:29 +03:00
benexl
9d72a50916 fix: replace sys.executable with get_python_executable for better compatibility 2025-12-31 14:51:50 +03:00
benexl
acb14d025c fix: enhance menu loading to support PyInstaller compatibility with explicit module listing 2025-12-31 14:42:44 +03:00
benexl
ba9b170ba8 fix: update menu loading mechanism to support pkgutil for dynamic imports 2025-12-31 14:31:35 +03:00
benexl
ecc4de6ae6 ci: update paths 2025-12-31 14:21:50 +03:00
benexl
e065c8e8fc fix(normalizer): add anime title mapping for "Burichi -" 2025-12-31 13:10:47 +03:00
benexl
32df0503d0 fix(dependencies): update optional dependencies for platform-specific functionality 2025-12-31 13:05:56 +03:00
Benedict Xavier
11449378e9 docs: Revise Termux installation instructions in README
Updated installation instructions for Termux, including required packages and optional dependencies.
2025-12-30 14:51:36 +03:00
27 changed files with 1769 additions and 148 deletions

152
.github/workflows/release-binaries.yml vendored Normal file

@@ -0,0 +1,152 @@
name: Build Release Binaries
on:
release:
types: [published]
workflow_dispatch:
inputs:
tag:
description: "Tag/version to build (leave empty for latest)"
required: false
type: string
permissions:
contents: write
jobs:
build:
strategy:
fail-fast: false
matrix:
include:
- os: ubuntu-latest
target: linux
asset_name: viu-linux-x86_64
executable: viu
- os: windows-latest
target: windows
asset_name: viu-windows-x86_64.exe
executable: viu.exe
- os: macos-latest
target: macos
asset_name: viu-macos-x86_64
executable: viu
runs-on: ${{ matrix.os }}
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.tag || github.ref }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Install system dependencies (Linux)
if: runner.os == 'Linux'
run: |
sudo apt-get update
sudo apt-get install -y libdbus-1-dev libglib2.0-dev
- name: Install dependencies
run: uv sync --all-extras --all-groups
- name: Build executable with PyInstaller
run: uv run pyinstaller bundle/pyinstaller.spec --distpath dist --workpath build/pyinstaller --clean
- name: Rename executable
shell: bash
run: mv dist/${{ matrix.executable }} dist/${{ matrix.asset_name }}
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: ${{ matrix.asset_name }}
path: dist/${{ matrix.asset_name }}
if-no-files-found: error
- name: Upload to Release
if: github.event_name == 'release'
uses: softprops/action-gh-release@v2
with:
files: dist/${{ matrix.asset_name }}
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# Build for macOS ARM (Apple Silicon)
build-macos-arm:
runs-on: macos-14
steps:
- name: Checkout repository
uses: actions/checkout@v4
with:
ref: ${{ github.event.inputs.tag || github.ref }}
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: "3.11"
- name: Install uv
uses: astral-sh/setup-uv@v3
with:
enable-cache: true
- name: Install dependencies
run: uv sync --all-extras --all-groups
- name: Build executable with PyInstaller
run: uv run pyinstaller bundle/pyinstaller.spec --distpath dist --workpath build/pyinstaller --clean
- name: Rename executable
run: mv dist/viu dist/viu-macos-arm64
- name: Upload artifact
uses: actions/upload-artifact@v4
with:
name: viu-macos-arm64
path: dist/viu-macos-arm64
if-no-files-found: error
- name: Upload to Release
if: github.event_name == 'release'
uses: softprops/action-gh-release@v2
with:
files: dist/viu-macos-arm64
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
# Create checksums after all builds complete
checksums:
needs: [build, build-macos-arm]
runs-on: ubuntu-latest
if: github.event_name == 'release'
steps:
- name: Download all artifacts
uses: actions/download-artifact@v4
with:
path: artifacts
merge-multiple: true
- name: Generate checksums
run: |
cd artifacts
sha256sum * > SHA256SUMS.txt
cat SHA256SUMS.txt
- name: Upload checksums to Release
uses: softprops/action-gh-release@v2
with:
files: artifacts/SHA256SUMS.txt
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
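Once the release workflow has run, a downloaded binary can be verified against the generated SHA256SUMS.txt. A minimal sketch (file names assume the asset names from the matrix above):

```python
import hashlib
from pathlib import Path

def verify(binary: Path, sums_file: Path) -> bool:
    """Check a downloaded release binary against SHA256SUMS.txt."""
    digest = hashlib.sha256(binary.read_bytes()).hexdigest()
    for line in sums_file.read_text().splitlines():
        expected, _, name = line.partition("  ")  # sha256sum separates with two spaces
        if name.strip() == binary.name:
            return expected == digest
    return False

print(verify(Path("viu-linux-x86_64"), Path("SHA256SUMS.txt")))
```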


@@ -114,38 +114,78 @@ uv tool install "viu-media[notifications]" # For desktop notifications
```
#### Termux
You may need to have Rust installed; see this issue: https://github.com/pydantic/pydantic-core/issues/1012#issuecomment-2511269688.
```bash
pkg install python # uv will probably install Python for you, but it doesn't hurt to have it :)
pkg install rust # may be required because of pydantic
# Recommended: with pip (due to more control)
pkg install python
pkg install rust # required because of pydantic
# NOTE: order matters
# get pydantic from the termux user repository
pip install pydantic --extra-index-url https://termux-user-repository.github.io/pypi/
# the above will take a while; pass -v if you want to see more output and feel like something is happening
pip install pydantic --extra-index-url https://termux-user-repository.github.io/pypi/ -v
# now you can install viu
pip install viu-media
# you may need to install pydantic manually
python -m pip install pydantic --extra-index-url https://termux-user-repository.github.io/pypi/ # may also be necessary in case the above fails
# === optional deps ===
# if you have reached this point, awesome :)
# add yt-dlp with:
# yt-dlp for downloading m3u8 and hls streams
pip install yt-dlp[default,curl-cffi]
# prefer installing without [standard] and manually add the things you need (lxml, yt-dlp, etc.)
pip install viu-media[standard]
# you may also need ffmpeg for processing the videos
pkg install ffmpeg
# you may need to install lxml and plyer manually, e.g.
python -m pip install lxml --extra-index-url https://termux-user-repository.github.io/pypi/ # may also be necessary in case the above fails
# tip: if you also want yt functionality
pip install yt-dlp-ejs
# Alternative: with uv (may work, no promises)
pkg install uv
# you require a JS runtime,
# e.g. the recommended one:
pkg install deno
uv tool install viu-media
# for faster fuzzy search
pip install thefuzz
# and to add only yt-dlp you can do
uv tool install viu-media --with yt-dlp[default,curl-cffi]
# if you want faster scraping, though the difference is barely noticeable
pip install lxml --extra-index-url https://termux-user-repository.github.io/pypi/
# or (may fail because of lxml and plyer; in that case install them manually)
uv tool install viu-media[standard]
# if compilation fails you need to have:
pkg install libxml2 libxslt
# == ui setup ==
pkg install fzf
# then enable fzf in the config
viu --selector fzf config --update
# if you want previews as well, specify the preview option
# though images aren't that pretty, so you can stick with text over full
viu --preview text config --update
# if you set preview to full you need a terminal image renderer
pkg install chafa
# == player setup ==
# for this you strictly need to install from the Play Store
# search for mpv or vlc (recommended, since it has a nicer UI)
# the only limitation is that it's currently not possible to pass headers to the Android players
# through Android intents,
# so use servers like sharepoint and wixmp
# though this is not an issue when it comes to downloading ;)
# if you have installed mpv using 'pkg', uninstall it
# okay, now you are all set; I promise the hassle is worth it :)
# posted a video of it working below to motivate you
# note: I recorded it from Waydroid, which is Android for Linux, sort of like an emulator (BlueStacks, for example)
```
https://github.com/user-attachments/assets/0c628421-a439-4dea-91bb-7153e8f20ccf
#### Using pipx (for isolated environments)
```bash


@@ -1,26 +1,46 @@
# -*- mode: python ; coding: utf-8 -*-
import sys
from PyInstaller.utils.hooks import collect_data_files, collect_submodules
block_cipher = None
# Platform-specific settings
is_windows = sys.platform == 'win32'
is_macos = sys.platform == 'darwin'
# Collect all required data files
datas = [
('viu/assets/*', 'viu/assets'),
('../viu_media/assets', 'viu_media/assets'),
]
# Collect all required hidden imports
# Include viu_media and all its submodules to ensure menu modules are bundled
hiddenimports = [
'click',
'rich',
'requests',
'yt_dlp',
'python_mpv',
'fuzzywuzzy',
'viu',
] + collect_submodules('viu')
'viu_media',
'viu_media.cli.interactive.menu',
'viu_media.cli.interactive.menu.media',
# Explicit menu modules (PyInstaller doesn't always pick these up)
'viu_media.cli.interactive.menu.media.downloads',
'viu_media.cli.interactive.menu.media.download_episodes',
'viu_media.cli.interactive.menu.media.dynamic_search',
'viu_media.cli.interactive.menu.media.episodes',
'viu_media.cli.interactive.menu.media.main',
'viu_media.cli.interactive.menu.media.media_actions',
'viu_media.cli.interactive.menu.media.media_airing_schedule',
'viu_media.cli.interactive.menu.media.media_characters',
'viu_media.cli.interactive.menu.media.media_review',
'viu_media.cli.interactive.menu.media.player_controls',
'viu_media.cli.interactive.menu.media.play_downloads',
'viu_media.cli.interactive.menu.media.provider_search',
'viu_media.cli.interactive.menu.media.results',
'viu_media.cli.interactive.menu.media.servers',
] + collect_submodules('viu_media')
a = Analysis(
['./viu/viu.py'], # Changed entry point
['../viu_media/viu.py'],
pathex=[],
binaries=[],
datas=datas,
@@ -32,16 +52,18 @@ a = Analysis(
win_no_prefer_redirects=False,
win_private_assemblies=False,
cipher=block_cipher,
strip=True, # Strip debug information
optimize=2 # Optimize bytecode
noarchive=False,
)
pyz = PYZ(
a.pure,
a.zipped_data,
optimize=2 # Optimize bytecode
cipher=block_cipher,
)
# Icon path - only use .ico on Windows
icon_path = '../viu_media/assets/icons/logo.ico' if is_windows else None
exe = EXE(
pyz,
a.scripts,
@@ -52,7 +74,7 @@ exe = EXE(
name='viu',
debug=False,
bootloader_ignore_signals=False,
strip=True,
strip=not is_windows, # strip doesn't work well on Windows without proper tools
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
@@ -61,5 +83,5 @@ exe = EXE(
target_arch=None,
codesign_identity=None,
entitlements_file=None,
icon='viu/assets/logo.ico'
icon=icon_path,
)
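For context on why the spec now lists every menu module by name: PyInstaller's static analysis only follows literal import statements, so modules loaded through importlib or pkgutil at runtime are invisible to it. A minimal illustration of the pattern it cannot trace (module path taken from the list above):

```python
import importlib

# The module name is assembled at runtime, so PyInstaller's bytecode scan
# never sees a literal import statement to follow.
package = "viu_media.cli.interactive.menu.media"
module = importlib.import_module(f"{package}.downloads")
```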


@@ -6,11 +6,11 @@ license = "UNLICENSE"
readme = "README.md"
requires-python = ">=3.11"
dependencies = [
"click>=8.1.7",
"httpx>=0.28.1",
"inquirerpy>=0.3.4",
"pydantic>=2.11.7",
"rich>=13.9.2",
"click>=8.1.7",
"httpx>=0.28.1",
"inquirerpy>=0.3.4",
"pydantic>=2.11.7",
"rich>=13.9.2",
]
[project.scripts]
@@ -18,32 +18,27 @@ viu = 'viu_media:Cli'
[project.optional-dependencies]
standard = [
"thefuzz>=0.22.1",
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
"pypiwin32; sys_platform == 'win32'", # For Windows-specific functionality
"pyobjc; sys_platform == 'darwin'", # For macOS-specific functionality
"dbus-python; sys_platform == 'linux'", # For Linux-specific functionality (e.g., notifications),
"plyer>=2.1.0",
"lxml>=6.0.0"
"thefuzz>=0.22.1",
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
"pypiwin32; sys_platform == 'win32'", # For Windows-specific functionality
"pyobjc; sys_platform == 'darwin'", # For macOS-specific functionality
"dbus-python; sys_platform == 'linux'", # For Linux-specific functionality (e.g., notifications),
"plyer>=2.1.0",
"lxml>=6.0.0",
]
notifications = [
"dbus-python>=1.4.0",
"pypiwin32; sys_platform == 'win32'", # For Windows-specific functionality
"pyobjc; sys_platform == 'darwin'", # For macOS-specific functionality
"dbus-python>=1.4.0; sys_platform == 'linux'",
"plyer>=2.1.0",
]
mpv = [
"mpv>=1.0.7",
]
mpv = ["mpv>=1.0.7"]
torrent = ["libtorrent>=2.0.11"]
lxml = ["lxml>=6.0.0"]
discord = ["pypresence>=4.3.0"]
download = [
"pycryptodomex>=3.23.0",
"yt-dlp>=2025.7.21",
]
torrents = [
"libtorrent>=2.0.11",
]
download = ["pycryptodomex>=3.23.0", "yt-dlp>=2025.7.21"]
torrents = ["libtorrent>=2.0.11"]
[build-system]
requires = ["hatchling"]
@@ -51,12 +46,12 @@ build-backend = "hatchling.build"
[dependency-groups]
dev = [
"pre-commit>=4.0.1",
"pyinstaller>=6.11.1",
"pyright>=1.1.384",
"pytest>=8.3.3",
"pytest-httpx>=0.35.0",
"ruff>=0.6.9",
"pre-commit>=4.0.1",
"pyinstaller>=6.11.1",
"pyright>=1.1.384",
"pytest>=8.3.3",
"pytest-httpx>=0.35.0",
"ruff>=0.6.9",
]
[tool.pytest.ini_options]

8
uv.lock generated

@@ -3768,8 +3768,10 @@ mpv = [
{ name = "mpv" },
]
notifications = [
{ name = "dbus-python" },
{ name = "dbus-python", marker = "sys_platform == 'linux'" },
{ name = "plyer" },
{ name = "pyobjc", marker = "sys_platform == 'darwin'" },
{ name = "pypiwin32", marker = "sys_platform == 'win32'" },
]
standard = [
{ name = "dbus-python", marker = "sys_platform == 'linux'" },
@@ -3801,8 +3803,8 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.7" },
{ name = "dbus-python", marker = "sys_platform == 'linux' and extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "dbus-python", marker = "sys_platform == 'linux' and extra == 'standard'" },
{ name = "dbus-python", marker = "extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },
@@ -3815,7 +3817,9 @@ requires-dist = [
{ name = "pycryptodomex", marker = "extra == 'download'", specifier = ">=3.23.0" },
{ name = "pycryptodomex", marker = "extra == 'standard'", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.11.7" },
{ name = "pyobjc", marker = "sys_platform == 'darwin' and extra == 'notifications'" },
{ name = "pyobjc", marker = "sys_platform == 'darwin' and extra == 'standard'" },
{ name = "pypiwin32", marker = "sys_platform == 'win32' and extra == 'notifications'" },
{ name = "pypiwin32", marker = "sys_platform == 'win32' and extra == 'standard'" },
{ name = "pypresence", marker = "extra == 'discord'", specifier = ">=4.3.0" },
{ name = "rich", specifier = ">=13.9.2" },


@@ -5,7 +5,8 @@
"Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka",
"Hazurewaku no \"Joutai Ijou Skill\" de Saikyou ni Natta Ore ga Subete wo Juurin suru made": "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made",
"Re:Zero kara Hajimeru Isekai Seikatsu Season 3": "Re:Zero kara Hajimeru Isekai Seikatsu 3rd Season",
"Hanka×Hanka (2011)": "Hunter × Hunter (2011)"
"Hanka×Hanka (2011)": "Hunter × Hunter (2011)",
"Burichi -": "bleach"
},
"hianime": {
"My Star": "Oshi no Ko"


@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Filter Parser for Dynamic Search
This module provides a parser for the special filter syntax used in dynamic search.
Filter syntax allows users to add filters inline with their search query.
SYNTAX:
@filter:value - Apply a filter with the given value
@filter:value1,value2 - Apply multiple values (for array filters)
@filter:!value - Exclude/negate a filter value
SUPPORTED FILTERS:
@genre:action,comedy - Filter by genres
@genre:!hentai - Exclude genre
@status:airing - Filter by status (airing, finished, upcoming, cancelled, hiatus)
@year:2024 - Filter by season year
@season:winter - Filter by season (winter, spring, summer, fall)
@format:tv,movie - Filter by format (tv, movie, ova, ona, special, music)
@sort:score - Sort by (score, popularity, trending, title, date)
@score:>80 - Minimum score
@score:<50 - Maximum score
@popularity:>10000 - Minimum popularity
@onlist - Only show anime on user's list
@onlist:false - Only show anime NOT on user's list
EXAMPLES:
"naruto @genre:action @status:finished"
"isekai @year:2024 @season:winter @sort:score"
"@genre:action,adventure @status:airing"
"romance @genre:!hentai @format:tv,movie"
"""
import re
from typing import Any, Dict, List, Optional, Tuple
# Mapping of user-friendly filter names to GraphQL variable names
FILTER_ALIASES = {
# Status aliases
"airing": "RELEASING",
"releasing": "RELEASING",
"finished": "FINISHED",
"completed": "FINISHED",
"upcoming": "NOT_YET_RELEASED",
"not_yet_released": "NOT_YET_RELEASED",
"unreleased": "NOT_YET_RELEASED",
"cancelled": "CANCELLED",
"canceled": "CANCELLED",
"hiatus": "HIATUS",
"paused": "HIATUS",
# Format aliases
"tv": "TV",
"tv_short": "TV_SHORT",
"tvshort": "TV_SHORT",
"movie": "MOVIE",
"film": "MOVIE",
"ova": "OVA",
"ona": "ONA",
"special": "SPECIAL",
"music": "MUSIC",
# Season aliases
"winter": "WINTER",
"spring": "SPRING",
"summer": "SUMMER",
"fall": "FALL",
"autumn": "FALL",
# Sort aliases
"score": "SCORE_DESC",
"score_desc": "SCORE_DESC",
"score_asc": "SCORE",
"popularity": "POPULARITY_DESC",
"popularity_desc": "POPULARITY_DESC",
"popularity_asc": "POPULARITY",
"trending": "TRENDING_DESC",
"trending_desc": "TRENDING_DESC",
"trending_asc": "TRENDING",
"title": "TITLE_ROMAJI",
"title_desc": "TITLE_ROMAJI_DESC",
"date": "START_DATE_DESC",
"date_desc": "START_DATE_DESC",
"date_asc": "START_DATE",
"newest": "START_DATE_DESC",
"oldest": "START_DATE",
"favourites": "FAVOURITES_DESC",
"favorites": "FAVOURITES_DESC",
"episodes": "EPISODES_DESC",
}
# Genre name normalization (lowercase -> proper case)
GENRE_NAMES = {
"action": "Action",
"adventure": "Adventure",
"comedy": "Comedy",
"drama": "Drama",
"ecchi": "Ecchi",
"fantasy": "Fantasy",
"horror": "Horror",
"mahou_shoujo": "Mahou Shoujo",
"mahou": "Mahou Shoujo",
"magical_girl": "Mahou Shoujo",
"mecha": "Mecha",
"music": "Music",
"mystery": "Mystery",
"psychological": "Psychological",
"romance": "Romance",
"sci-fi": "Sci-Fi",
"scifi": "Sci-Fi",
"sci_fi": "Sci-Fi",
"slice_of_life": "Slice of Life",
"sol": "Slice of Life",
"sports": "Sports",
"supernatural": "Supernatural",
"thriller": "Thriller",
"hentai": "Hentai",
}
# Filter pattern: @key:value or @key (boolean flags)
FILTER_PATTERN = re.compile(r"@(\w+)(?::([^\s]+))?", re.IGNORECASE)
# Comparison operators for numeric filters
COMPARISON_PATTERN = re.compile(r"^([<>]=?)?(\d+)$")
def normalize_value(value: str, value_type: str) -> str:
"""Normalize a filter value based on its type."""
value_lower = value.lower().strip()
if value_type == "genre":
return GENRE_NAMES.get(value_lower, value.title())
elif value_type in ("status", "format", "season", "sort"):
return FILTER_ALIASES.get(value_lower, value.upper())
return value
def parse_value_list(value_str: str) -> Tuple[List[str], List[str]]:
"""
Parse a comma-separated value string, separating includes from excludes.
Returns:
Tuple of (include_values, exclude_values)
"""
includes = []
excludes = []
for val in value_str.split(","):
val = val.strip()
if not val:
continue
if val.startswith("!"):
excludes.append(val[1:])
else:
includes.append(val)
return includes, excludes
def parse_comparison(value: str) -> Tuple[Optional[str], Optional[int]]:
"""
Parse a comparison value like ">80" or "<50".
Returns:
Tuple of (operator, number) or (None, None) if invalid
"""
match = COMPARISON_PATTERN.match(value)
if match:
operator = match.group(1) or ">" # Default to greater than
number = int(match.group(2))
return operator, number
return None, None
def parse_filters(query: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse a search query and extract filter directives.
Args:
query: The full search query including filter syntax
Returns:
Tuple of (clean_query, filters_dict)
- clean_query: The query with filter syntax removed
- filters_dict: Dictionary of GraphQL variables to apply
"""
filters: Dict[str, Any] = {}
# Find all filter matches
matches = list(FILTER_PATTERN.finditer(query))
for match in matches:
filter_name = match.group(1).lower()
filter_value = match.group(2) # May be None for boolean flags
# Handle different filter types
if filter_name == "genre":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "genre") for v in includes]
filters.setdefault("genre_in", []).extend(normalized)
if excludes:
normalized = [normalize_value(v, "genre") for v in excludes]
filters.setdefault("genre_not_in", []).extend(normalized)
elif filter_name == "status":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "status") for v in includes]
filters.setdefault("status_in", []).extend(normalized)
if excludes:
normalized = [normalize_value(v, "status") for v in excludes]
filters.setdefault("status_not_in", []).extend(normalized)
elif filter_name == "format":
if filter_value:
includes, _ = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "format") for v in includes]
filters.setdefault("format_in", []).extend(normalized)
elif filter_name == "year":
if filter_value:
try:
filters["seasonYear"] = int(filter_value)
except ValueError:
pass # Invalid year, skip
elif filter_name == "season":
if filter_value:
filters["season"] = normalize_value(filter_value, "season")
elif filter_name == "sort":
if filter_value:
sort_val = normalize_value(filter_value, "sort")
filters["sort"] = [sort_val]
elif filter_name == "score":
if filter_value:
op, num = parse_comparison(filter_value)
if num is not None:
if op in (">", ">="):
filters["averageScore_greater"] = num
elif op in ("<", "<="):
filters["averageScore_lesser"] = num
elif filter_name == "popularity":
if filter_value:
op, num = parse_comparison(filter_value)
if num is not None:
if op in (">", ">="):
filters["popularity_greater"] = num
elif op in ("<", "<="):
filters["popularity_lesser"] = num
elif filter_name == "onlist":
if filter_value is None or filter_value.lower() in ("true", "yes", "1"):
filters["on_list"] = True
elif filter_value.lower() in ("false", "no", "0"):
filters["on_list"] = False
elif filter_name == "tag":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
# Tags use title case typically
normalized = [v.replace("_", " ").title() for v in includes]
filters.setdefault("tag_in", []).extend(normalized)
if excludes:
normalized = [v.replace("_", " ").title() for v in excludes]
filters.setdefault("tag_not_in", []).extend(normalized)
# Remove filter syntax from query to get clean search text
clean_query = FILTER_PATTERN.sub("", query).strip()
# Clean up multiple spaces
clean_query = re.sub(r"\s+", " ", clean_query).strip()
return clean_query, filters
def get_help_text() -> str:
"""Return a help string describing the filter syntax."""
return """
╭─────────────────── Filter Syntax Help ───────────────────╮
│ │
│ @genre:action,comedy Filter by genres │
│ @genre:!hentai Exclude genre │
│ @status:airing Status: airing, finished, │
│ upcoming, cancelled, hiatus │
│ @year:2024 Filter by year │
│ @season:winter winter, spring, summer, fall │
│ @format:tv,movie tv, movie, ova, ona, special │
│ @sort:score score, popularity, trending, │
│ date, title, newest, oldest │
│ @score:>80 Minimum score │
│ @score:<50 Maximum score │
│ @popularity:>10000 Minimum popularity │
│ @onlist Only on your list │
│ @onlist:false Not on your list │
│ @tag:isekai,reincarnation Filter by tags │
│ │
│ Examples: │
│ naruto @genre:action @status:finished │
│ @genre:action,adventure @year:2024 @sort:score │
│ isekai @season:winter @year:2024 │
│ │
╰──────────────────────────────────────────────────────────╯
""".strip()
if __name__ == "__main__":
# Test the parser
import json
import sys
if len(sys.argv) > 1:
test_query = " ".join(sys.argv[1:])
clean, filters = parse_filters(test_query)
print(f"Original: {test_query}")
print(f"Clean query: {clean}")
print(f"Filters: {json.dumps(filters, indent=2)}")
else:
print(get_help_text())
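As a quick sanity check, this is what the parser yields for one of the documented examples; the output shape follows directly from the code above:

```python
from _filter_parser import parse_filters

clean, filters = parse_filters("naruto @genre:action @status:finished @score:>80")
print(clean)    # naruto
print(filters)  # {'genre_in': ['Action'], 'status_in': ['FINISHED'], 'averageScore_greater': 80}
```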


@@ -45,6 +45,15 @@ def format_number(num):
return f"{num:,}"
def format_score_stars(score):
"""Format score as stars out of 6."""
if score is None:
return "N/A"
# Convert 0-100 score to 0-6 stars, capped at 6 for consistency
stars = min(round(score * 6 / 100), 6)
return "" * stars + f" ({score}/100)"
def format_date(date_obj):
"""Format date object to string."""
if not date_obj or date_obj == "null":
@@ -342,31 +351,68 @@ def main():
# Extract data
status = media.get("status", "Unknown")
format_type = media.get("format", "Unknown")
episodes = media.get("episodes", "?")
episodes = media.get("episodes", "??")
duration = media.get("duration")
duration_str = f"{duration} min" if duration else "Unknown"
duration_str = f"{duration} min/ep" if duration else "Unknown"
score = media.get("averageScore")
score_str = f"{score}/100" if score else "N/A"
score_str = format_score_stars(score)
favourites = format_number(media.get("favourites", 0))
popularity = format_number(media.get("popularity", 0))
genres = ", ".join(media.get("genres", [])[:5]) or "Unknown"
genres = ", ".join(media.get("genres", [])) or "Unknown"
start_date = format_date(media.get("startDate"))
end_date = format_date(media.get("endDate"))
studios_list = media.get("studios", {}).get("nodes", [])
studios = ", ".join([s.get("name", "") for s in studios_list[:3]]) or "Unknown"
# Studios are those with isAnimationStudio=true
studios = ", ".join([s["name"] for s in studios_list if s.get("name") and s.get("isAnimationStudio")]) or "N/A"
# Producers are those with isAnimationStudio=false
producers = ", ".join([s["name"] for s in studios_list if s.get("name") and not s.get("isAnimationStudio")]) or "N/A"
synonyms_list = media.get("synonyms", [])
synonyms = ", ".join(synonyms_list[:3]) or "N/A"
# Include romaji in synonyms if different from title
romaji = title_obj.get("romaji")
if romaji and romaji != title and romaji not in synonyms_list:
synonyms_list = [romaji] + synonyms_list
synonyms = ", ".join(synonyms_list) or "N/A"
# Tags
tags_list = media.get("tags", [])
tags = ", ".join([t.get("name", "") for t in tags_list if t.get("name")]) or "N/A"
# Next airing episode
next_airing = media.get("nextAiringEpisode")
if next_airing:
next_ep = next_airing.get("episode", "?")
airing_at = next_airing.get("airingAt")
if airing_at:
from datetime import datetime
try:
dt = datetime.fromtimestamp(airing_at)
next_episode_str = f"Episode {next_ep} on {dt.strftime('%A, %d %B %Y at %H:%M')}"
except (ValueError, OSError):
next_episode_str = f"Episode {next_ep}"
else:
next_episode_str = f"Episode {next_ep}"
else:
next_episode_str = "N/A"
# User list status
media_list_entry = media.get("mediaListEntry")
if media_list_entry:
user_status = media_list_entry.get("status", "NOT_ON_LIST")
user_progress = f"Episode {media_list_entry.get('progress', 0)}"
else:
user_status = "NOT_ON_LIST"
user_progress = "0"
description = media.get("description", "No description available.")
description = strip_markdown(description)
# Print sections matching media_info.py structure
# Print sections matching media_info.py structure exactly
rows = [
("Score", score_str),
("Favorites", favourites),
@@ -376,16 +422,17 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Episodes", str(episodes)),
("Duration", duration_str),
("Next Episode", next_episode_str),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Genres", genres),
@@ -394,7 +441,16 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("List Status", user_status),
("Progress", user_progress),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Start Date", start_date),
@@ -403,15 +459,16 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Studios", studios),
("Producers", producers),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Synonyms", synonyms),
@@ -419,7 +476,15 @@ def main():
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 0, 0)
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
rows = [
("Tags", tags),
]
print_rule(SEPARATOR_COLOR)
for key, value in rows:
print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)
print_rule(SEPARATOR_COLOR)
print(wrap_text(description, term_width))
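As a worked example of the star conversion introduced above (assuming the glyph in the listing is '★'):

```python
def format_score_stars(score):
    """Format a 0-100 score as stars out of 6 (copy of the helper above)."""
    if score is None:
        return "N/A"
    stars = min(round(score * 6 / 100), 6)
    return "★" * stars + f" ({score}/100)"

print(format_score_stars(83))    # ★★★★★ (83/100), since round(83 * 6 / 100) == 5
print(format_score_stars(None))  # N/A
```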


@@ -67,6 +67,7 @@ for key, value in rows:
rows = [
("Studios", "{STUDIOS}"),
("Producers", "{PRODUCERS}"),
]
print_rule(SEPARATOR_COLOR)


@@ -5,6 +5,18 @@
# This script is a template for dynamic search functionality in fzf.
# The placeholders in curly braces, like {GRAPHQL_ENDPOINT} are dynamically
# filled by Python using .replace() during runtime.
#
# FILTER SYNTAX:
# @genre:action,comedy Filter by genres
# @genre:!hentai Exclude genre
# @status:airing Status: airing, finished, upcoming, cancelled, hiatus
# @year:2024 Filter by year
# @season:winter winter, spring, summer, fall
# @format:tv,movie tv, movie, ova, ona, special
# @sort:score score, popularity, trending, date, title
# @score:>80 / @score:<50 Min/max score
# @onlist / @onlist:false Filter by list status
# @tag:isekai Filter by tags
import json
import sys
@@ -12,9 +24,13 @@ from pathlib import Path
from urllib import request
from urllib.error import URLError
# Import the filter parser
from _filter_parser import parse_filters
# --- Template Variables (Injected by Python) ---
GRAPHQL_ENDPOINT = "{GRAPHQL_ENDPOINT}"
SEARCH_RESULTS_FILE = Path("{SEARCH_RESULTS_FILE}")
LAST_QUERY_FILE = Path("{LAST_QUERY_FILE}")
AUTH_HEADER = "{AUTH_HEADER}"
# The GraphQL query is injected as a properly escaped JSON string
@@ -22,17 +38,29 @@ GRAPHQL_QUERY = "{GRAPHQL_QUERY}"
# --- Get Query from fzf ---
# fzf passes the current query as the first argument when using --bind change:reload
QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
RAW_QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
# If query is empty, exit with empty results
if not QUERY.strip():
print("")
# Parse the query to extract filters and clean search text
QUERY, PARSED_FILTERS = parse_filters(RAW_QUERY)
# If query is empty and no filters, show help hint
if not RAW_QUERY.strip():
print("💡 Tip: Use @genre:action @status:airing for filters (type @help for syntax)")
sys.exit(0)
# Show filter help if requested
if RAW_QUERY.strip().lower() in ("@help", "@?", "@h"):
from _filter_parser import get_help_text
print(get_help_text())
sys.exit(0)
# If we only have filters (no search text), that's valid - we'll search with filters only
# But if we have neither query nor filters, we already showed the help hint above
def make_graphql_request(
endpoint: str, query: str, variables: dict, auth_token: str = ""
) -> dict | None:
) -> tuple[dict | None, str | None]:
"""
Make a GraphQL request to the specified endpoint.
@@ -43,7 +71,7 @@ def make_graphql_request(
auth_token: Optional authorization token (Bearer token)
Returns:
Response JSON as a dictionary, or None if request fails
Tuple of (Response JSON, error message) - one will be None
"""
payload = {"query": query, "variables": variables}
@@ -61,10 +89,13 @@ def make_graphql_request(
)
with request.urlopen(req, timeout=10) as response:
return json.loads(response.read().decode("utf-8"))
except (URLError, json.JSONDecodeError, Exception) as e:
print(f"❌ Request failed: {e}", file=sys.stderr)
return None
return json.loads(response.read().decode("utf-8")), None
except URLError as e:
return None, f"Network error: {e.reason}"
except json.JSONDecodeError as e:
return None, f"Invalid response: {e}"
except Exception as e:
return None, f"Request error: {e}"
def extract_title(media_item: dict) -> str:
@@ -90,34 +121,67 @@ def main():
# Ensure parent directory exists
SEARCH_RESULTS_FILE.parent.mkdir(parents=True, exist_ok=True)
# Create GraphQL variables
# Base GraphQL variables
variables = {
"query": QUERY,
"type": "ANIME",
"per_page": 50,
"genre_not_in": ["Hentai"],
"genre_not_in": ["Hentai"], # Default exclusion
}
# Add search query if provided
if QUERY:
variables["query"] = QUERY
# Apply parsed filters from the filter syntax
for key, value in PARSED_FILTERS.items():
# Handle array merging for _in and _not_in fields
if key.endswith("_in") or key.endswith("_not_in"):
if key in variables:
# Merge arrays, avoiding duplicates
existing = set(variables[key])
existing.update(value)
variables[key] = list(existing)
else:
variables[key] = value
else:
variables[key] = value
# Make the GraphQL request
response = make_graphql_request(
response, error = make_graphql_request(
GRAPHQL_ENDPOINT, GRAPHQL_QUERY, variables, AUTH_HEADER
)
if response is None:
print("Search failed")
if error:
print(f"{error}")
# Also show what we tried to search for debugging
print(f" Query: {QUERY or '(none)'}")
print(f" Filters: {json.dumps(PARSED_FILTERS) if PARSED_FILTERS else '(none)'}")
sys.exit(1)
if response is None:
print("❌ Search failed: No response received")
sys.exit(1)
# Check for GraphQL errors first (these come in the response body)
if "errors" in response:
errors = response["errors"]
if errors:
# Extract error messages
error_msgs = [e.get("message", str(e)) for e in errors]
print(f"❌ API Error: {'; '.join(error_msgs)}")
# Show variables for debugging
print(f" Filters used: {json.dumps(PARSED_FILTERS, indent=2) if PARSED_FILTERS else '(none)'}")
sys.exit(1)
# Save the raw response for later processing by dynamic_search.py
try:
with open(SEARCH_RESULTS_FILE, "w", encoding="utf-8") as f:
json.dump(response, f, ensure_ascii=False, indent=2)
# Also save the raw query so it can be restored when going back
with open(LAST_QUERY_FILE, "w", encoding="utf-8") as f:
f.write(RAW_QUERY)
except IOError as e:
print(f"❌ Failed to save results: {e}", file=sys.stderr)
sys.exit(1)
# Parse and display results
if "errors" in response:
print(f"❌ Search error: {response['errors']}")
print(f"❌ Failed to save results: {e}")
sys.exit(1)
# Navigate the response structure
@@ -126,7 +190,9 @@ def main():
media_list = page.get("media", [])
if not media_list:
print(" No results found")
print("🔍 No results found")
if PARSED_FILTERS:
print(" Try adjusting your filters")
sys.exit(0)
# Output titles for fzf (one per line)
@@ -141,5 +207,5 @@ if __name__ == "__main__":
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
print(f"❌ Unexpected error: {e}", file=sys.stderr)
print(f"❌ Unexpected error: {type(e).__name__}: {e}")
sys.exit(1)
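The filter-merge step earlier in this script deduplicates list-valued filters against the defaults while leaving scalar filters as plain overwrites. The same logic in miniature:

```python
variables = {"genre_not_in": ["Hentai"]}  # default exclusion
parsed = {"genre_not_in": ["Ecchi"], "seasonYear": 2024}

for key, value in parsed.items():
    if key.endswith("_in") or key.endswith("_not_in"):
        merged = set(variables.get(key, []))
        merged.update(value)
        variables[key] = list(merged)  # note: the set round-trip drops ordering
    else:
        variables[key] = value

print(variables)  # {'genre_not_in': ['Hentai', 'Ecchi'] in some order, 'seasonYear': 2024}
```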


@@ -30,6 +30,9 @@ from ...core.config import AppConfig
\b
# view the current contents of your config
viu config --view
\b
# clear cached GitHub authentication token
viu config --clear-github-auth
""",
)
@click.option("--path", "-p", help="Print the config location and exit", is_flag=True)
@@ -60,6 +63,11 @@ from ...core.config import AppConfig
is_flag=True,
help="Start the interactive configuration wizard.",
)
@click.option(
"--clear-github-auth",
is_flag=True,
help="Clear cached GitHub authentication token.",
)
@click.pass_obj
def config(
user_config: AppConfig,
@@ -69,12 +77,18 @@ def config(
generate_desktop_entry,
update,
interactive,
clear_github_auth,
):
from ...core.constants import USER_CONFIG
from ..config.editor import InteractiveConfigEditor
from ..config.generate import generate_config_toml_from_app_model
if path:
if clear_github_auth:
from ..service.github import GitHubContributionService
GitHubContributionService.clear_cached_auth_static()
click.echo("GitHub authentication cache cleared.")
elif path:
print(USER_CONFIG)
elif view:
from rich.console import Console


@@ -0,0 +1 @@
# Menu package for interactive session


@@ -0,0 +1,18 @@
# Media menu modules
# Explicit module list for PyInstaller compatibility
__all__ = [
"downloads",
"download_episodes",
"dynamic_search",
"episodes",
"main",
"media_actions",
"media_airing_schedule",
"media_characters",
"media_review",
"player_controls",
"play_downloads",
"provider_search",
"results",
"servers",
]


@@ -1,9 +1,10 @@
import json
import logging
import sys
import shutil
from pathlib import Path
from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from .....core.utils.detect import get_python_executable
from .....libs.media_api.params import MediaSearchParams
from ...session import Context, session
from ...state import InternalDirective, MediaApiState, MenuName, State
@@ -12,8 +13,36 @@ logger = logging.getLogger(__name__)
SEARCH_CACHE_DIR = APP_CACHE_DIR / "previews" / "dynamic-search"
SEARCH_RESULTS_FILE = SEARCH_CACHE_DIR / "current_search_results.json"
LAST_QUERY_FILE = SEARCH_CACHE_DIR / "last_query.txt"
RESTORE_MODE_FILE = SEARCH_CACHE_DIR / ".restore_mode"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.py").read_text(encoding="utf-8")
FILTER_PARSER_SCRIPT = FZF_SCRIPTS_DIR / "_filter_parser.py"
def _load_cached_titles() -> list[str]:
"""Load titles from cached search results for display in fzf."""
if not SEARCH_RESULTS_FILE.exists():
return []
try:
with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f:
data = json.load(f)
media_list = data.get("data", {}).get("Page", {}).get("media", [])
titles = []
for media in media_list:
title_obj = media.get("title", {})
title = (
title_obj.get("english")
or title_obj.get("romaji")
or title_obj.get("native")
or "Unknown"
)
titles.append(title)
return titles
except (IOError, json.JSONDecodeError):
return []
@session.menu
@@ -25,6 +54,12 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
# Ensure cache directory exists
SEARCH_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Check if we're in restore mode (coming back from media_actions)
restore_mode = RESTORE_MODE_FILE.exists()
if restore_mode:
# Clear the restore flag
RESTORE_MODE_FILE.unlink(missing_ok=True)
# Read the GraphQL search query
from .....libs.media_api.anilist import gql
@@ -44,6 +79,7 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
"GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
"GRAPHQL_QUERY": search_query_json,
"SEARCH_RESULTS_FILE": SEARCH_RESULTS_FILE.as_posix(),
"LAST_QUERY_FILE": LAST_QUERY_FILE.as_posix(),
"AUTH_HEADER": auth_header,
}
@@ -54,12 +90,34 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
search_script_file = SEARCH_CACHE_DIR / "search.py"
search_script_file.write_text(search_command, encoding="utf-8")
# Copy the filter parser module to the cache directory
# This is required for the search script to import it
filter_parser_dest = SEARCH_CACHE_DIR / "_filter_parser.py"
if FILTER_PARSER_SCRIPT.exists():
shutil.copy2(FILTER_PARSER_SCRIPT, filter_parser_dest)
# Make the search script executable by calling it with python3
# fzf will pass the query as {q} which becomes the first argument
search_command_final = (
f"{Path(sys.executable).as_posix()} {search_script_file.as_posix()} {{q}}"
f"{Path(get_python_executable()).as_posix()} {search_script_file.as_posix()} {{q}}"
)
# Header hint for filter syntax
filter_hint = "💡 Filters: @genre:action @status:airing @year:2024 @sort:score (type @help for more)"
# Only load previous query if we're in restore mode (coming back from media_actions)
initial_query = None
cached_results = None
if restore_mode:
# Load previous query
if LAST_QUERY_FILE.exists():
try:
initial_query = LAST_QUERY_FILE.read_text(encoding="utf-8").strip()
except IOError:
pass
# Load cached results to display immediately without network request
cached_results = _load_cached_titles()
try:
# Prepare preview functionality
preview_command = None
@@ -73,11 +131,17 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
prompt="Search Anime",
search_command=search_command_final,
preview=preview_command,
header=filter_hint,
initial_query=initial_query,
initial_results=cached_results,
)
else:
choice = ctx.selector.search(
prompt="Search Anime",
search_command=search_command_final,
header=filter_hint,
initial_query=initial_query,
initial_results=cached_results,
)
except NotImplementedError:
feedback.error("Dynamic search is not supported by your current selector")
@@ -116,6 +180,9 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
logger.error(f"Could not find selected media for choice: {choice}")
return InternalDirective.MAIN
# Set restore mode flag so we can restore state when user goes back
RESTORE_MODE_FILE.touch()
# Navigate to media actions with the selected item
return State(
menu_name=MenuName.MEDIA_ACTIONS,
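The restore handshake reduces to a flag file that is set on the way out and consumed exactly once on the way back in. A sketch of the pattern (the path here is hypothetical):

```python
from pathlib import Path

RESTORE_MODE_FILE = Path("/tmp/dynamic-search/.restore_mode")  # hypothetical path

# Leaving for media_actions: mark that the next entry should restore state.
RESTORE_MODE_FILE.parent.mkdir(parents=True, exist_ok=True)
RESTORE_MODE_FILE.touch()

# Re-entering dynamic_search: consume the flag so a later fresh search starts clean.
restore_mode = RESTORE_MODE_FILE.exists()
RESTORE_MODE_FILE.unlink(missing_ok=True)
```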


@@ -73,6 +73,21 @@ def provider_search(ctx: Context, state: State) -> State | InternalDirective:
update_user_normalizer_json(
chosen_title, media_title, config.general.provider.value
)
# Offer to submit the mapping to GitHub
if selector.confirm(
"Would you like to contribute this mapping to the project on GitHub?"
):
from ....service.github import GitHubContribution
contribution = GitHubContribution(
provider_name=config.general.provider.value,
provider_title=chosen_title,
media_api_title=media_title,
anilist_id=media_item.id if hasattr(media_item, "id") else None,
)
ctx.github.submit_contribution(contribution)
selected_provider_anime = provider_results_map[chosen_title]
with feedback.progress(


@@ -1,6 +1,7 @@
import importlib
import importlib.util
import logging
import os
import pkgutil
from dataclasses import dataclass, field
from typing import TYPE_CHECKING, Callable, List, Optional, Union
@@ -16,6 +17,7 @@ if TYPE_CHECKING:
from ...libs.selectors.base import BaseSelector
from ..service.auth import AuthService
from ..service.feedback import FeedbackService
from ..service.github import GitHubContributionService
from ..service.player import PlayerService
from ..service.registry import MediaRegistryService
from ..service.session import SessionsService
@@ -91,6 +93,7 @@ class Context:
_session: Optional["SessionsService"] = None
_auth: Optional["AuthService"] = None
_player: Optional["PlayerService"] = None
_github: Optional["GitHubContributionService"] = None
@property
def provider(self) -> "BaseAnimeProvider":
@@ -190,6 +193,17 @@ class Context:
self._auth = AuthService(self.config.general.media_api)
return self._auth
@property
def github(self) -> "GitHubContributionService":
if not self._github:
from ..service.github.service import GitHubContributionService
self._github = GitHubContributionService(
selector=self.selector,
feedback=self.feedback,
)
return self._github
MenuFunction = Callable[[Context, State], Union[State, InternalDirective]]
@@ -309,30 +323,46 @@ class Session:
return decorator
def load_menus_from_folder(self, package: str):
package_path = MENUS_DIR / package
package_name = package_path.name
logger.debug(f"Loading menus from '{package_path}'...")
"""Load menu modules from a subfolder.
Uses pkgutil to discover modules for regular Python, and falls back
to the package's __all__ list for PyInstaller frozen executables.
"""
full_package_name = f"viu_media.cli.interactive.menu.{package}"
logger.debug(f"Loading menus from package '{full_package_name}'...")
for filename in os.listdir(package_path):
if filename.endswith(".py") and not filename.startswith("__"):
module_name = filename[:-3]
full_module_name = (
f"viu_media.cli.interactive.menu.{package_name}.{module_name}"
try:
# Import the parent package first
parent_package = importlib.import_module(full_package_name)
except ImportError as e:
logger.error(f"Failed to import menu package '{full_package_name}': {e}")
return
# Try pkgutil first (works in regular Python)
package_path = getattr(parent_package, "__path__", None)
module_names = []
if package_path:
module_names = [
name for _, name, ispkg in pkgutil.iter_modules(package_path)
if not ispkg and not name.startswith("_")
]
# Fallback to __all__ for PyInstaller frozen executables
if not module_names:
module_names = getattr(parent_package, "__all__", [])
logger.debug(f"Using __all__ fallback with {len(module_names)} modules")
for module_name in module_names:
full_module_name = f"{full_package_name}.{module_name}"
try:
# Simply importing the module will execute it,
# which runs the @session.menu decorators
importlib.import_module(full_module_name)
except Exception as e:
logger.error(
f"Failed to load menu module '{full_module_name}': {e}"
)
file_path = package_path / filename
try:
spec = importlib.util.spec_from_file_location(
full_module_name, file_path
)
if spec and spec.loader:
module = importlib.util.module_from_spec(spec)
# The act of executing the module runs the @session.menu decorators
spec.loader.exec_module(module)
except Exception as e:
logger.error(
f"Failed to load menu module '{full_module_name}': {e}"
)
# Create a single, global instance of the Session to be imported by menu modules.
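For comparison with the __all__ fallback, this is what the pkgutil path yields in a regular (non-frozen) install. A sketch, assuming the package is importable:

```python
import pkgutil

import viu_media.cli.interactive.menu.media as media_pkg

names = [
    name
    for _, name, ispkg in pkgutil.iter_modules(media_pkg.__path__)
    if not ispkg and not name.startswith("_")
]
print(names)  # e.g. ['download_episodes', 'downloads', 'dynamic_search', ...]
```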


@@ -0,0 +1,4 @@
from .model import AuthMethod, GitHubContribution, GitHubPRResponse
from .service import GitHubContributionService
__all__ = ["GitHubContributionService", "GitHubContribution", "AuthMethod", "GitHubPRResponse"]


@@ -0,0 +1,66 @@
from enum import Enum
from typing import Optional
from pydantic import BaseModel, Field
class AuthMethod(str, Enum):
"""Authentication method for GitHub API."""
BROWSER = "browser"
GH_CLI = "gh"
class GitHubAuth(BaseModel):
"""Stored GitHub authentication credentials."""
access_token: str
token_type: str = "bearer"
scope: str = ""
class GitHubContribution(BaseModel):
"""Represents a normalizer mapping contribution."""
provider_name: str = Field(..., description="The provider name (e.g., 'allanime')")
provider_title: str = Field(
..., description="The title as it appears on the provider"
)
media_api_title: str = Field(..., description="The normalized media API title")
anilist_id: Optional[int] = Field(
default=None, description="Optional AniList ID for reference"
)
class GitHubPRResponse(BaseModel):
"""Response from GitHub API when creating a pull request."""
id: int
number: int
html_url: str
title: str
state: str
class GitHubUser(BaseModel):
"""GitHub user information."""
login: str
id: int
class GitHubRepo(BaseModel):
"""GitHub repository information."""
full_name: str
default_branch: str
fork: bool = False
class GitHubFileContent(BaseModel):
"""GitHub file content response."""
sha: str
content: str
encoding: str = "base64"


@@ -0,0 +1,674 @@
"""
GitHub Contribution Service
Provides functionality to submit normalizer mappings to the viu repository
via Pull Request, using either browser-based OAuth or the GitHub CLI (gh).
"""
import base64
import json
import logging
import shutil
import subprocess
import time
import webbrowser
from typing import TYPE_CHECKING, Optional
import httpx
from ....core.constants import APP_DATA_DIR, AUTHOR, CLI_NAME
from ....core.utils.file import AtomicWriter, FileLock
from ....core.utils.normalizer import USER_NORMALIZER_JSON
from .model import (
AuthMethod,
GitHubAuth,
GitHubContribution,
GitHubFileContent,
GitHubPRResponse,
GitHubRepo,
GitHubUser,
)
if TYPE_CHECKING:
from ....libs.selectors.base import BaseSelector
from ...service.feedback import FeedbackService
logger = logging.getLogger(__name__)
# GitHub OAuth configuration
GITHUB_CLIENT_ID = "Iv23liXUYWot4d4Zvjxa" # Register your OAuth app on GitHub
GITHUB_OAUTH_SCOPES = "public_repo"
GITHUB_API_BASE = "https://api.github.com"
# Repository information
REPO_OWNER = AUTHOR
REPO_NAME = "viu" # Must match GitHub repo name exactly (case-sensitive)
NORMALIZER_FILE_PATH = "viu_media/assets/normalizer.json"
AUTH_FILE = APP_DATA_DIR / "github_auth.json"
class GitHubContributionService:
"""Service for submitting normalizer mappings to GitHub."""
def __init__(
self,
selector: "BaseSelector",
feedback: Optional["FeedbackService"] = None,
):
self.selector = selector
self.feedback = feedback
self._lock = FileLock(APP_DATA_DIR / "github_auth.lock")
self._http_client = httpx.Client(
headers={
"Accept": "application/json",
"User-Agent": f"{CLI_NAME}/1.0",
},
timeout=30.0,
follow_redirects=True, # Follow redirects for all request types
)
def __del__(self):
"""Cleanup HTTP client."""
if hasattr(self, "_http_client"):
self._http_client.close()
def is_gh_cli_available(self) -> bool:
"""Check if GitHub CLI (gh) is installed and available."""
return shutil.which("gh") is not None
def is_gh_cli_authenticated(self) -> bool:
"""Check if GitHub CLI is authenticated."""
if not self.is_gh_cli_available():
return False
try:
result = subprocess.run(
["gh", "auth", "status"],
capture_output=True,
text=True,
timeout=10,
)
return result.returncode == 0
except (subprocess.SubprocessError, OSError):
return False
def get_available_auth_methods(self) -> list[AuthMethod]:
"""Get list of available authentication methods."""
methods = [AuthMethod.BROWSER]
if self.is_gh_cli_available():
methods.insert(0, AuthMethod.GH_CLI) # Prefer gh CLI if available
return methods
def prompt_auth_method(self) -> Optional[AuthMethod]:
"""
Prompt user to select their preferred authentication method.
Returns:
Selected AuthMethod or None if cancelled.
"""
methods = self.get_available_auth_methods()
choices = []
for method in methods:
if method == AuthMethod.GH_CLI:
status = "✓ authenticated" if self.is_gh_cli_authenticated() else ""
choices.append(f"gh CLI {status}".strip())
else:
choices.append("Browser (OAuth)")
choices.append("Cancel")
choice = self.selector.choose(
prompt="Select GitHub authentication method",
choices=choices,
)
if not choice or choice == "Cancel":
return None
if choice.startswith("gh CLI"):
return AuthMethod.GH_CLI
return AuthMethod.BROWSER
def submit_contribution(
self,
contribution: GitHubContribution,
auth_method: Optional[AuthMethod] = None,
) -> Optional[str]:
"""
Submit a normalizer mapping contribution to GitHub as a Pull Request.
This will:
1. Fork the repository (if not already forked)
2. Create a new branch with the updated normalizer.json
3. Open a Pull Request to the upstream repository
Args:
contribution: The mapping contribution to submit.
auth_method: The authentication method to use. If None, will prompt.
Returns:
URL of the created PR, or None if failed.
"""
if auth_method is None:
auth_method = self.prompt_auth_method()
if auth_method is None:
return None
if auth_method == AuthMethod.GH_CLI:
return self._submit_pr_via_gh_cli(contribution)
else:
return self._submit_pr_via_api(contribution)
def _get_user_normalizer_content(self) -> Optional[dict]:
"""Read the user's local normalizer.json file."""
if not USER_NORMALIZER_JSON.exists():
self._log_error(
f"Local normalizer.json not found at {USER_NORMALIZER_JSON}"
)
return None
try:
with USER_NORMALIZER_JSON.open("r", encoding="utf-8") as f:
return json.load(f)
except (json.JSONDecodeError, OSError) as e:
self._log_error(f"Failed to read normalizer.json: {e}")
return None
def _submit_pr_via_gh_cli(
self, contribution: GitHubContribution
) -> Optional[str]:
"""Submit PR using GitHub CLI."""
if not self.is_gh_cli_available():
self._log_error("GitHub CLI (gh) is not installed")
return None
if not self.is_gh_cli_authenticated():
self._log_info("GitHub CLI not authenticated. Running 'gh auth login'...")
try:
subprocess.run(["gh", "auth", "login"], check=True)
except subprocess.SubprocessError:
self._log_error("Failed to authenticate with GitHub CLI")
return None
# Read local normalizer content
normalizer_content = self._get_user_normalizer_content()
if not normalizer_content:
return None
# Get current username
try:
result = subprocess.run(
["gh", "api", "user", "--jq", ".login"],
capture_output=True,
text=True,
timeout=30,
)
if result.returncode != 0:
self._log_error("Failed to get GitHub username")
return None
username = result.stdout.strip()
except subprocess.SubprocessError as e:
self._log_error(f"Failed to get username: {e}")
return None
# Fork the repository if not already forked
self._log_info("Ensuring fork exists...")
try:
subprocess.run(
["gh", "repo", "fork", f"{REPO_OWNER}/{REPO_NAME}", "--clone=false"],
capture_output=True,
text=True,
timeout=60,
)
except subprocess.SubprocessError:
pass # Fork may already exist, continue
# Create branch name
branch_name = f"normalizer/{contribution.provider_name}-{int(time.time())}"
# Create the PR using gh pr create with the file content
title = self._format_pr_title(contribution)
body = self._format_pr_body(contribution)
# We need to create the branch and commit via API since gh doesn't support this directly
# Fall back to API method for the actual PR creation
self._log_info("Creating pull request...")
# Get token from gh CLI
try:
result = subprocess.run(
["gh", "auth", "token"],
capture_output=True,
text=True,
timeout=10,
)
if result.returncode != 0:
self._log_error("Failed to get auth token from gh CLI")
return None
token = result.stdout.strip()
except subprocess.SubprocessError as e:
self._log_error(f"Failed to get token: {e}")
return None
return self._create_pr_via_api(contribution, token, normalizer_content)
def _submit_pr_via_api(self, contribution: GitHubContribution) -> Optional[str]:
"""Submit PR using browser-based OAuth and GitHub API."""
# Authenticate
auth = self._load_cached_auth()
if not auth or not self._validate_token(auth.access_token):
auth = self._perform_device_flow_auth()
if not auth:
self._log_error("Failed to authenticate with GitHub")
return None
self._save_auth(auth)
# Read local normalizer content
normalizer_content = self._get_user_normalizer_content()
if not normalizer_content:
return None
return self._create_pr_via_api(contribution, auth.access_token, normalizer_content)
def _create_pr_via_api(
self,
contribution: GitHubContribution,
token: str,
normalizer_content: dict,
) -> Optional[str]:
"""Create a Pull Request via GitHub API."""
headers = {"Authorization": f"Bearer {token}"}
# Step 1: Get current user
self._log_info("Getting user info...")
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/user", headers=headers
)
response.raise_for_status()
user = GitHubUser.model_validate(response.json())
except httpx.HTTPError as e:
self._log_error(f"Failed to get user info: {e}")
return None
# Step 2: Fork the repository (if not already forked)
self._log_info("Ensuring fork exists...")
fork_exists = False
fork_full_name = ""
try:
# Check if fork exists by listing user's forks of the repo
response = self._http_client.get(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/forks",
headers=headers,
)
response.raise_for_status()
forks = response.json()
# Find user's fork
user_fork = next(
(f for f in forks if f["owner"]["login"].lower() == user.login.lower()),
None
)
if user_fork:
fork_full_name = user_fork["full_name"]
fork_exists = True
else:
# Create fork
self._log_info("Creating fork...")
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/forks",
headers=headers,
)
response.raise_for_status()
fork_data = response.json()
fork_full_name = fork_data["full_name"]
# Wait for fork to be ready
time.sleep(5)
except httpx.HTTPError as e:
self._log_error(f"Failed to create/check fork: {e}")
return None
self._log_info(f"Using fork: {fork_full_name}")
# Step 3: Get the default branch SHA from upstream
self._log_info("Getting upstream branch info...")
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/git/ref/heads/master",
headers=headers,
)
response.raise_for_status()
base_sha = response.json()["object"]["sha"]
except httpx.HTTPError as e:
self._log_error(f"Failed to get base branch: {e}")
return None
# Step 3.5: Sync fork with upstream if it already existed
if fork_exists:
self._log_info("Syncing fork with upstream...")
try:
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/merge-upstream",
headers=headers,
json={"branch": "master"},
)
# 409 means the fork could not be merged cleanly; continue anyway and
# let branch creation surface any real error
if response.status_code not in (200, 409):
response.raise_for_status()
except httpx.HTTPError as e:
self._log_info(f"Could not sync fork (continuing anyway): {e}")
# Step 4: Create a new branch in the fork
branch_name = f"normalizer/{contribution.provider_name}-{int(time.time())}"
self._log_info(f"Creating branch: {branch_name}")
try:
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/git/refs",
headers=headers,
json={"ref": f"refs/heads/{branch_name}", "sha": base_sha},
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
error_detail = ""
try:
error_detail = str(e.response.json())
except Exception:
pass
self._log_error(f"Failed to create branch: {e} {error_detail}")
return None
except httpx.HTTPError as e:
self._log_error(f"Failed to create branch: {e}")
return None
# Step 5: Get current normalizer.json from the fork's new branch to get SHA
self._log_info("Fetching current normalizer.json...")
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/contents/{NORMALIZER_FILE_PATH}",
headers=headers,
params={"ref": branch_name},
)
response.raise_for_status()
file_info = GitHubFileContent.model_validate(response.json())
file_sha = file_info.sha
# Decode existing content and merge with user's mappings
existing_content = json.loads(
base64.b64decode(file_info.content).decode("utf-8")
)
# Merge: user's normalizer takes precedence
merged_content = existing_content.copy()
for provider, mappings in normalizer_content.items():
if provider not in merged_content:
merged_content[provider] = {}
merged_content[provider].update(mappings)
except httpx.HTTPError as e:
self._log_error(f"Failed to get normalizer.json: {e}")
return None
# Step 6: Update the file in the fork
self._log_info("Committing changes...")
new_content = json.dumps(merged_content, indent=2, ensure_ascii=False)
encoded_content = base64.b64encode(new_content.encode("utf-8")).decode("utf-8")
commit_message = (
f"feat(normalizer): add mapping for '{contribution.provider_title}'\n\n"
f"Provider: {contribution.provider_name}\n"
f"Maps: {contribution.provider_title} -> {contribution.media_api_title}"
)
try:
response = self._http_client.put(
f"{GITHUB_API_BASE}/repos/{fork_full_name}/contents/{NORMALIZER_FILE_PATH}",
headers=headers,
json={
"message": commit_message,
"content": encoded_content,
"sha": file_sha,
"branch": branch_name,
},
)
response.raise_for_status()
except httpx.HTTPStatusError as e:
error_detail = ""
try:
error_detail = str(e.response.json())
except Exception:
pass
self._log_error(f"Failed to commit changes: {e} {error_detail}")
return None
except httpx.HTTPError as e:
self._log_error(f"Failed to commit changes: {e}")
return None
# Step 7: Create the Pull Request
self._log_info("Creating pull request...")
title = self._format_pr_title(contribution)
body = self._format_pr_body(contribution)
try:
response = self._http_client.post(
f"{GITHUB_API_BASE}/repos/{REPO_OWNER}/{REPO_NAME}/pulls",
headers=headers,
json={
"title": title,
"body": body,
"head": f"{user.login}:{branch_name}",
"base": "master",
},
)
response.raise_for_status()
pr = GitHubPRResponse.model_validate(response.json())
self._log_success(f"Created PR #{pr.number}: {pr.html_url}")
return pr.html_url
except httpx.HTTPStatusError as e:
error_detail = ""
try:
error_json = e.response.json()
error_detail = error_json.get("message", "")
# GitHub includes detailed errors in 'errors' array
if "errors" in error_json:
errors = error_json["errors"]
error_detail += " | " + str(errors)
except Exception:
pass
self._log_error(f"Failed to create PR: {e} {error_detail}")
return None
except httpx.HTTPError as e:
self._log_error(f"Failed to create PR: {e}")
return None
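# For reference, the full REST sequence used by this method (a sketch of the
# endpoints as called above, not an exhaustive API contract):
#   GET  /user                                       -> authenticated login
#   GET  /repos/{owner}/{repo}/forks                 -> look for an existing fork
#   POST /repos/{owner}/{repo}/forks                 -> create one if missing
#   GET  /repos/{owner}/{repo}/git/ref/heads/master  -> base branch SHA
#   POST /repos/{fork}/merge-upstream                -> sync a pre-existing fork
#   POST /repos/{fork}/git/refs                      -> new branch at the base SHA
#   GET  /repos/{fork}/contents/{path}?ref={branch}  -> current file content + SHA
#   PUT  /repos/{fork}/contents/{path}               -> commit the merged mappings
#   POST /repos/{owner}/{repo}/pulls                 -> open the pull request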
def _format_pr_title(self, contribution: GitHubContribution) -> str:
"""Format the PR title."""
return (
f"feat(normalizer): add mapping for '{contribution.provider_title}' "
f"({contribution.provider_name})"
)
def _format_pr_body(self, contribution: GitHubContribution) -> str:
"""Format the PR body."""
return f"""## Normalizer Mapping Contribution
This PR adds a new title mapping to the normalizer.
### Mapping Details
| Field | Value |
|-------|-------|
| **Provider** | `{contribution.provider_name}` |
| **Provider Title** | `{contribution.provider_title}` |
| **Media API Title** | `{contribution.media_api_title}` |
| **AniList ID** | {contribution.anilist_id or 'N/A'} |
### Changes
This PR updates `{NORMALIZER_FILE_PATH}` with the following mapping:
```json
"{contribution.provider_title}": "{contribution.media_api_title.lower()}"
```
---
*Submitted automatically via {CLI_NAME} CLI*
"""
def _perform_device_flow_auth(self) -> Optional[GitHubAuth]:
"""
Perform GitHub Device Flow authentication.
This is more reliable for CLI apps than the web redirect flow.
"""
self._log_info("Starting GitHub authentication...")
# Request device code
try:
response = self._http_client.post(
"https://github.com/login/device/code",
data={
"client_id": GITHUB_CLIENT_ID,
"scope": GITHUB_OAUTH_SCOPES,
},
headers={"Accept": "application/json"},
)
response.raise_for_status()
data = response.json()
except httpx.HTTPError as e:
self._log_error(f"Failed to start authentication: {e}")
return None
device_code = data.get("device_code")
user_code = data.get("user_code")
verification_uri = data.get("verification_uri")
expires_in = data.get("expires_in", 900)
interval = data.get("interval", 5)
if not all([device_code, user_code, verification_uri]):
self._log_error("Invalid response from GitHub")
return None
# Show user the code and open browser
self._log_info(f"\n🔑 Your code: {user_code}")
self._log_info(f"Opening {verification_uri} in your browser...")
self._log_info("Enter the code above to authenticate.\n")
webbrowser.open(verification_uri)
# Poll for the access token (time is already imported at module level)
start_time = time.time()
while time.time() - start_time < expires_in:
time.sleep(interval)
try:
token_response = self._http_client.post(
"https://github.com/login/oauth/access_token",
data={
"client_id": GITHUB_CLIENT_ID,
"device_code": device_code,
"grant_type": "urn:ietf:params:oauth:grant-type:device_code",
},
headers={"Accept": "application/json"},
)
token_data = token_response.json()
if "access_token" in token_data:
self._log_success("Authentication successful!")
return GitHubAuth(
access_token=token_data["access_token"],
token_type=token_data.get("token_type", "bearer"),
scope=token_data.get("scope", ""),
)
error = token_data.get("error")
if error == "authorization_pending":
continue
elif error == "slow_down":
interval += 5
elif error == "expired_token":
self._log_error("Authentication expired. Please try again.")
return None
elif error == "access_denied":
self._log_error("Authentication denied by user.")
return None
else:
self._log_error(f"Authentication error: {error}")
return None
except httpx.HTTPError:
continue
self._log_error("Authentication timed out. Please try again.")
return None
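# Illustrative device-code response consumed by the loop above (field names
# as read by the code; values are made up):
#   {
#     "device_code": "3584d83530557fdd1f46af8289938c8ef79f9dc5",
#     "user_code": "WDJB-MJHT",
#     "verification_uri": "https://github.com/login/device",
#     "expires_in": 900,
#     "interval": 5
#   }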
def _validate_token(self, token: str) -> bool:
"""Check if a GitHub token is still valid."""
try:
response = self._http_client.get(
f"{GITHUB_API_BASE}/user",
headers={"Authorization": f"Bearer {token}"},
)
return response.status_code == 200
except httpx.HTTPError:
return False
def _load_cached_auth(self) -> Optional[GitHubAuth]:
"""Load cached GitHub authentication."""
if not AUTH_FILE.exists():
return None
try:
with AUTH_FILE.open("r", encoding="utf-8") as f:
data = json.load(f)
return GitHubAuth.model_validate(data)
except (json.JSONDecodeError, ValueError):
return None
def _save_auth(self, auth: GitHubAuth) -> None:
"""Save GitHub authentication to cache."""
APP_DATA_DIR.mkdir(parents=True, exist_ok=True)
with self._lock:
with AtomicWriter(AUTH_FILE) as f:
json.dump(auth.model_dump(), f, indent=2)
def clear_cached_auth(self) -> None:
"""Clear cached GitHub authentication."""
if AUTH_FILE.exists():
AUTH_FILE.unlink()
logger.info("Cleared GitHub authentication cache")
@staticmethod
def clear_cached_auth_static() -> None:
"""Clear cached GitHub authentication (static method for CLI use)."""
if AUTH_FILE.exists():
AUTH_FILE.unlink()
logger.info("Cleared GitHub authentication cache")
def _log_info(self, message: str) -> None:
"""Log info message."""
if self.feedback:
self.feedback.info(message)
else:
logger.info(message)
def _log_success(self, message: str) -> None:
"""Log success message."""
if self.feedback:
self.feedback.success(message)
else:
logger.info(message)
def _log_error(self, message: str) -> None:
"""Log error message."""
if self.feedback:
self.feedback.error(message)
else:
logger.error(message)
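# A minimal sketch of exposing the cache-clearing helper as a CLI command.
# The command name, the click framework, and the service class name are
# assumptions for illustration, not part of this module:
#
#   import click
#
#   @click.command(name="github-logout")
#   def github_logout() -> None:
#       """Clear the cached GitHub authentication token."""
#       GitHubContributionService.clear_cached_auth_static()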

View File

@@ -57,6 +57,9 @@ class MPVIPCClient:
def connect(self, timeout: float = 5.0) -> None:
"""Connect to MPV IPC socket and start the reader thread."""
+ if not hasattr(socket, "AF_UNIX"):
+ raise MPVIPCError("Unix domain sockets are unavailable on this platform")
start_time = time.time()
while time.time() - start_time < timeout:
try:
@@ -299,6 +302,10 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _play_with_ipc(self, player: BasePlayer, params: PlayerParams) -> PlayerResult:
"""Play media using MPV IPC."""
try:
+ if not hasattr(socket, "AF_UNIX"):
+ raise MPVIPCError(
+ "MPV IPC requires Unix domain sockets, which are unavailable on this platform."
+ )
self._start_mpv_process(player, params)
self._connect_ipc()
self._setup_event_handling()
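# A standalone sketch of the availability guard added above (helper name is
# an assumption):
#
#   import socket
#
#   def unix_sockets_available() -> bool:
#       # CPython does not expose socket.AF_UNIX on Windows, so MPV IPC
#       # over a Unix domain socket cannot work there.
#       return hasattr(socket, "AF_UNIX")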

View File

@@ -2,7 +2,6 @@ import logging
from pathlib import Path
import re
from hashlib import sha256
- import sys
from typing import Dict, List, Optional
import httpx
@@ -11,6 +10,7 @@ from viu_media.core.utils import formatter
from ...core.config import AppConfig
from ...core.constants import APP_CACHE_DIR, SCRIPTS_DIR
+ from ...core.utils.detect import get_python_executable
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import (
AiringScheduleResult,
@@ -327,7 +327,7 @@ def get_anime_preview(
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -387,7 +387,7 @@ def get_episode_preview(
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -435,7 +435,7 @@ def get_character_preview(choice_map: Dict[str, Character], config: AppConfig) -
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -483,7 +483,7 @@ def get_review_preview(choice_map: Dict[str, MediaReview], config: AppConfig) ->
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = (
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
@@ -599,7 +599,7 @@ def get_dynamic_anime_preview(config: AppConfig) -> str:
# Return the command to execute the preview script
preview_script_final = (
f"{Path(sys.executable).as_posix()} {preview_file.as_posix()} {{}}"
f"{Path(get_python_executable()).as_posix()} {preview_file.as_posix()} {{}}"
)
return preview_script_final
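# Shape of the command string returned by these helpers (paths illustrative):
#   /usr/bin/python3 /path/to/cache/preview.py {}
# where {} is fzf's placeholder for the currently highlighted entry.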

View File

@@ -189,7 +189,12 @@ class PreviewCacheWorker(ManagedBackgroundWorker):
),
"STUDIOS": formatter.shell_safe(
formatter.format_list_with_commas(
- [t.name for t in media_item.studios if t.name]
+ [t.name for t in media_item.studios if t.name and t.is_animation_studio]
)
),
"PRODUCERS": formatter.shell_safe(
formatter.format_list_with_commas(
[t.name for t in media_item.studios if t.name and not t.is_animation_studio]
)
),
"SYNONYMNS": formatter.shell_safe(

View File

@@ -56,3 +56,30 @@ def is_running_kitty_terminal() -> bool:
def has_fzf() -> bool:
return True if shutil.which("fzf") else False
+ def is_frozen() -> bool:
+ """Check if running as a PyInstaller frozen executable."""
+ return getattr(sys, "frozen", False)
+ def get_python_executable() -> str:
+ """
+ Get the Python executable path.
+ In frozen (PyInstaller) apps, sys.executable points to the .exe,
+ so we need to find the system Python instead.
+ Returns:
+ Path to a Python executable.
+ """
+ if is_frozen():
+ # We're in a frozen app - find system Python
+ for python_name in ["python3", "python", "py"]:
+ python_path = shutil.which(python_name)
+ if python_path:
+ return python_path
+ # Fallback - this likely won't work but is the best we can do
+ return "python"
+ else:
+ return sys.executable
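# Behaviour sketch of get_python_executable (illustrative):
#   - regular interpreter: returns sys.executable unchanged
#   - PyInstaller build:   sys.frozen is set, so the first of "python3",
#     "python", or "py" found on PATH is returned instead of the bundled
#     executable, which cannot run the generated preview scripts.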

View File

@@ -184,13 +184,22 @@ def format_score(score: Optional[float]) -> str:
def shell_safe(text: Optional[str]) -> str:
"""
- Escapes a string for safe inclusion in a shell script,
- specifically for use within double quotes. It escapes backticks,
- double quotes, and dollar signs.
+ Escapes a string for safe inclusion in a Python script string literal.
+ This is used when generating Python cache scripts with embedded text content.
+ For Python triple-quoted strings, we need to:
+ - Escape backslashes first (so existing backslashes don't interfere)
+ - Escape triple quotes (to not break the string literal)
+ - Remove or replace problematic characters
"""
if not text:
return ""
- return text.replace("`", "\\`").replace('"', '\\"').replace("$", "\\$")
+ # Escape backslashes first
+ result = text.replace("\\", "\\\\")
+ # Escape triple quotes (both types) for Python triple-quoted string literals
+ result = result.replace('"""', r'\"\"\"')
+ result = result.replace("'''", r"\'\'\'")
+ return result
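# Worked examples of the new escaping (illustrative):
#   shell_safe('a"""b')  ->  'a\\"\\"\\"b'   (each quote in a triple gains a backslash)
#   shell_safe('a\\b')   ->  'a\\\\b'        (backslashes are doubled first)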
def extract_episode_number(title: str) -> Optional[float]:

View File

@@ -50,15 +50,10 @@ def _load_normalizer_data() -> Dict[str, Dict[str, str]]:
def update_user_normalizer_json(
provider_title: str, media_api_title: str, provider_name: str
):
- import time
from .file import AtomicWriter
- print(
- "UPDATING USER NORMALIZER JSON. PLEASE CONTRIBUTE TO THE PROJECT BY OPENING A PR ON GITHUB TO MERGE YOUR NORMALIZER JSON TO MAIN. MAEMOTTE KANSHA SHIMASU :)"
- )
- print(f"NORMALIZER JSON PATH IS: {USER_NORMALIZER_JSON}")
- time.sleep(5)
+ logger.info(f"Updating user normalizer JSON at: {USER_NORMALIZER_JSON}")
if not _normalizer_cache:
raise RuntimeError(
"Fatal _normalizer_cache missing this should not be the case : (. Please report"

View File

@@ -88,6 +88,8 @@ class BaseSelector(ABC):
*,
preview: Optional[str] = None,
header: Optional[str] = None,
+ initial_query: Optional[str] = None,
+ initial_results: Optional[List[str]] = None,
) -> str | None:
"""
Provides dynamic search functionality that reloads results based on user input.
@@ -97,6 +99,8 @@ class BaseSelector(ABC):
search_command: The command to execute for searching/reloading results.
preview: An optional command or string for a preview window.
header: An optional header to display above the choices.
+ initial_query: An optional initial query to pre-populate the search.
+ initial_results: Optional list of results to display initially (avoids network request).
Returns:
The string of the chosen item.
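# A minimal sketch of a caller using the new restore parameters (the selector
# instance and variable names are assumptions):
#
#   choice = selector.search(
#       "anime",
#       search_command,
#       preview=preview_cmd,
#       header="type to search; filters use key:value syntax",
#       initial_query=last_query,      # restore what the user last typed
#       initial_results=cached_lines,  # skip the initial network request
#   )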

View File

@@ -117,26 +117,42 @@ class FzfSelector(BaseSelector):
lines = result.stdout.strip().splitlines()
return lines[-1] if lines else (default or "")
- def search(self, prompt, search_command, *, preview=None, header=None):
+ def search(self, prompt, search_command, *, preview=None, header=None, initial_query=None, initial_results=None):
"""Enhanced search using fzf's --reload flag for dynamic search."""
+ # Build the header with optional custom header line
+ display_header = self.header
+ if header:
+ display_header = f"{self.header}\n{header}"
commands = [
self.executable,
"--prompt",
f"{prompt.title()}: ",
"--header",
- self.header,
+ display_header,
"--header-first",
"--disabled", # Disable local filtering - rely on external search command
"--bind",
f"change:reload({search_command})",
"--ansi",
]
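# Base argv assembled above, before the optional extensions below (the
# search command is illustrative):
#   fzf --prompt "Anime: " --header "<header>" --header-first --disabled \
#       --bind "change:reload(<search_command>)" --ansi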
+ # If there's an initial query, set it
+ if initial_query:
+ commands.extend(["--query", initial_query])
+ # Only trigger reload on start if we don't have cached results
+ if not initial_results:
+ commands.extend(["--bind", f"start:reload({search_command})"])
if preview:
commands.extend(["--preview", preview])
+ # Use cached results as initial input if provided (avoids network request)
+ fzf_input = "\n".join(initial_results) if initial_results else ""
result = subprocess.run(
commands,
input="",
input=fzf_input,
stdout=subprocess.PIPE,
text=True,
encoding="utf-8",