Merge branch 'master' into preview-scaling

Authored by iMithrellas on 2025-07-29 21:59:06 +02:00, committed by GitHub.

26 changed files with 1025 additions and 2289 deletions


@@ -82,7 +82,7 @@ Follow these patterns to ensure your contributions fit the existing architecture
4. The config loader and CLI option generation will handle the rest automatically.
## 5. Code Style and Conventions
* **Style:** `black` for formatting, `ruff` for linting. The `pre-commit` hooks handle this.
* **Style:** `ruff` for formatting, `ruff` for linting. The `pre-commit` hooks handle this.
* **Types:** Full type hinting is mandatory. All code must pass `pyright`.
* **Commits:** Adhere to the [Conventional Commits](https://www.conventionalcommits.org/en/v1.0.0/) standard.
* **Logging:** Use Python's `logging` module. Do not use `print()` for debugging or informational messages in library or service code.
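As a small aside on the logging rule above, this is the usual shape of that convention in a minimal sketch (module and function names here are hypothetical, not taken from the codebase):

```python
import logging

# Module-level logger, per the style guide; the name is illustrative.
logger = logging.getLogger(__name__)

def fetch_episodes(anime_id: int) -> None:
    # Informational output goes through logging, never print().
    logger.info("Fetching episodes for anime %s", anime_id)
    try:
        ...  # network call would go here
    except Exception:
        # logger.exception records the traceback at ERROR level.
        logger.exception("Failed to fetch episodes for anime %s", anime_id)
```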
@@ -99,330 +99,3 @@ Follow these patterns to ensure your contributions fit the existing architecture
* **DON'T** make direct `httpx` calls outside of a `provider` or `media_api` library.
* **DON'T** introduce new dependencies without updating `pyproject.toml` and discussing it first.
## 7. Current Codebase file structure
```
.
├── bundle
│   └── Dockerfile
├── completions
│   ├── fastanime.bash
│   ├── fastanime.fish
│   └── fastanime.zsh
├── CONTRIBUTIONS.md
├── dev
│   ├── generate_completions.sh
│   ├── generated
│   │   └── anilist
│   │       └── tags.json
│   └── make_release
├── DISCLAIMER.md
├── fa
├── fastanime
│   ├── assets
│   │   ├── defaults
│   │   │   ├── ascii-art
│   │   │   ├── fzf-opts
│   │   │   └── rofi-themes
│   │   │       ├── confirm.rasi
│   │   │       ├── input.rasi
│   │   │       ├── main.rasi
│   │   │       └── preview.rasi
│   │   ├── graphql
│   │   │   ├── allanime
│   │   │   │   └── queries
│   │   │   │       ├── anime.gql
│   │   │   │       ├── episodes.gql
│   │   │   │       └── search.gql
│   │   │   └── anilist
│   │   │       ├── mutations
│   │   │       │   ├── delete-list-entry.gql
│   │   │       │   ├── mark-read.gql
│   │   │       │   └── media-list.gql
│   │   │       └── queries
│   │   │           ├── logged-in-user.gql
│   │   │           ├── media-airing-schedule.gql
│   │   │           ├── media-characters.gql
│   │   │           ├── media-list.gql
│   │   │           ├── media-list-item.gql
│   │   │           ├── media-recommendations.gql
│   │   │           ├── media-relations.gql
│   │   │           ├── notifications.gql
│   │   │           ├── reviews.gql
│   │   │           ├── search.gql
│   │   │           └── user-info.gql
│   │   ├── icons
│   │   │   ├── logo.ico
│   │   │   └── logo.png
│   │   ├── normalizer.json
│   │   └── scripts
│   │       └── fzf
│   │           ├── airing-schedule-info.template.sh
│   │           ├── airing-schedule-preview.template.sh
│   │           ├── character-info.template.sh
│   │           ├── character-preview.template.sh
│   │           ├── dynamic-preview.template.sh
│   │           ├── episode-info.template.sh
│   │           ├── info.template.sh
│   │           ├── preview.template.sh
│   │           ├── review-info.template.sh
│   │           ├── review-preview.template.sh
│   │           └── search.template.sh
│   ├── cli
│   │   ├── cli.py
│   │   ├── commands
│   │   │   ├── anilist
│   │   │   │   ├── cmd.py
│   │   │   │   ├── commands
│   │   │   │   │   ├── auth.py
│   │   │   │   │   ├── download.py
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── search.py
│   │   │   │   │   └── stats.py
│   │   │   │   ├── examples.py
│   │   │   │   ├── helpers.py
│   │   │   │   └── __init__.py
│   │   │   ├── completions.py
│   │   │   ├── config.py
│   │   │   ├── download.py
│   │   │   ├── examples.py
│   │   │   ├── __init__.py
│   │   │   ├── registry
│   │   │   │   ├── cmd.py
│   │   │   │   ├── commands
│   │   │   │   │   ├── backup.py
│   │   │   │   │   ├── clean.py
│   │   │   │   │   ├── export.py
│   │   │   │   │   ├── import_.py
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── restore.py
│   │   │   │   │   ├── search.py
│   │   │   │   │   ├── stats.py
│   │   │   │   │   └── sync.py
│   │   │   │   ├── examples.py
│   │   │   │   └── __init__.py
│   │   │   ├── search.py
│   │   │   └── update.py
│   │   ├── config
│   │   │   ├── editor.py
│   │   │   ├── generate.py
│   │   │   ├── __init__.py
│   │   │   └── loader.py
│   │   ├── __init__.py
│   │   ├── interactive
│   │   │   ├── menu
│   │   │   │   ├── media
│   │   │   │   │   ├── downloads.py
│   │   │   │   │   ├── dynamic_search.py
│   │   │   │   │   ├── episodes.py
│   │   │   │   │   ├── main.py
│   │   │   │   │   ├── media_actions.py
│   │   │   │   │   ├── media_airing_schedule.py
│   │   │   │   │   ├── media_characters.py
│   │   │   │   │   ├── media_review.py
│   │   │   │   │   ├── player_controls.py
│   │   │   │   │   ├── provider_search.py
│   │   │   │   │   ├── results.py
│   │   │   │   │   └── servers.py
│   │   │   │   └── share
│   │   │   │       ├── auth.py
│   │   │   │       ├── session_management.py
│   │   │   │       ├── user_media_list.py
│   │   │   │       └── watch_history.py
│   │   │   ├── session.py
│   │   │   └── state.py
│   │   ├── options.py
│   │   ├── service
│   │   │   ├── auth
│   │   │   │   ├── __init__.py
│   │   │   │   ├── model.py
│   │   │   │   └── service.py
│   │   │   ├── download
│   │   │   │   ├── __init__.py
│   │   │   │   └── service.py
│   │   │   ├── feedback
│   │   │   │   ├── __init__.py
│   │   │   │   └── service.py
│   │   │   ├── player
│   │   │   │   ├── __init__.py
│   │   │   │   ├── ipc
│   │   │   │   │   ├── base.py
│   │   │   │   │   └── mpv.py
│   │   │   │   └── service.py
│   │   │   ├── registry
│   │   │   │   ├── __init__.py
│   │   │   │   ├── models.py
│   │   │   │   └── service.py
│   │   │   ├── session
│   │   │   │   ├── __init__.py
│   │   │   │   ├── model.py
│   │   │   │   └── service.py
│   │   │   └── watch_history
│   │   │       ├── __init__.py
│   │   │       └── service.py
│   │   └── utils
│   │       ├── ansi.py
│   │       ├── completion.py
│   │       ├── exception.py
│   │       ├── feh.py
│   │       ├── icat.py
│   │       ├── image.py
│   │       ├── __init__.py
│   │       ├── lazyloader.py
│   │       ├── logging.py
│   │       ├── parser.py
│   │       ├── preview.py
│   │       ├── preview_workers.py
│   │       └── update.py
│   ├── core
│   │   ├── caching
│   │   ├── config
│   │   │   ├── defaults.py
│   │   │   ├── descriptions.py
│   │   │   ├── __init__.py
│   │   │   └── model.py
│   │   ├── constants.py
│   │   ├── downloader
│   │   │   ├── base.py
│   │   │   ├── default.py
│   │   │   ├── downloader.py
│   │   │   ├── __init__.py
│   │   │   ├── model.py
│   │   │   ├── params.py
│   │   │   ├── torrents.py
│   │   │   └── yt_dlp.py
│   │   ├── exceptions.py
│   │   ├── __init__.py
│   │   ├── patterns.py
│   │   └── utils
│   │       ├── concurrency.py
│   │       ├── converter.py
│   │       ├── detect.py
│   │       ├── file.py
│   │       ├── formatter.py
│   │       ├── fuzzy.py
│   │       ├── graphql.py
│   │       ├── __init__.py
│   │       ├── networking.py
│   │       └── normalizer.py
│   ├── fastanime.py
│   ├── __init__.py
│   ├── libs
│   │   ├── aniskip
│   │   │   ├── api.py
│   │   │   └── __init__.py
│   │   ├── discord
│   │   │   ├── api.py
│   │   │   └── __init__.py
│   │   ├── __init__.py
│   │   ├── media_api
│   │   │   ├── anilist
│   │   │   │   ├── api.py
│   │   │   │   ├── gql.py
│   │   │   │   ├── __init__.py
│   │   │   │   ├── mapper.py
│   │   │   │   └── types.py
│   │   │   ├── api.py
│   │   │   ├── base.py
│   │   │   ├── __init__.py
│   │   │   ├── jikan
│   │   │   │   ├── api.py
│   │   │   │   ├── __init__.py
│   │   │   │   └── mapper.py
│   │   │   ├── params.py
│   │   │   ├── types.py
│   │   │   └── utils
│   │   │       └── debug.py
│   │   ├── player
│   │   │   ├── base.py
│   │   │   ├── __init__.py
│   │   │   ├── mpv
│   │   │   │   ├── __init__.py
│   │   │   │   └── player.py
│   │   │   ├── params.py
│   │   │   ├── player.py
│   │   │   ├── syncplay
│   │   │   │   ├── __init__.py
│   │   │   │   └── player.py
│   │   │   ├── types.py
│   │   │   └── vlc
│   │   │       ├── __init__.py
│   │   │       └── player.py
│   │   ├── provider
│   │   │   ├── anime
│   │   │   │   ├── allanime
│   │   │   │   │   ├── constants.py
│   │   │   │   │   ├── extractors
│   │   │   │   │   │   ├── ak.py
│   │   │   │   │   │   ├── base.py
│   │   │   │   │   │   ├── dropbox.py
│   │   │   │   │   │   ├── extractor.py
│   │   │   │   │   │   ├── filemoon.py
│   │   │   │   │   │   ├── gogoanime.py
│   │   │   │   │   │   ├── __init__.py
│   │   │   │   │   │   ├── mp4_upload.py
│   │   │   │   │   │   ├── sharepoint.py
│   │   │   │   │   │   ├── streamsb.py
│   │   │   │   │   │   ├── vid_mp4.py
│   │   │   │   │   │   ├── we_transfer.py
│   │   │   │   │   │   ├── wixmp.py
│   │   │   │   │   │   └── yt_mp4.py
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── mappers.py
│   │   │   │   │   ├── provider.py
│   │   │   │   │   ├── types.py
│   │   │   │   │   └── utils.py
│   │   │   │   ├── animepahe
│   │   │   │   │   ├── constants.py
│   │   │   │   │   ├── extractor.py
│   │   │   │   │   ├── __init__.py
│   │   │   │   │   ├── mappers.py
│   │   │   │   │   ├── provider.py
│   │   │   │   │   └── types.py
│   │   │   │   ├── base.py
│   │   │   │   ├── __init__.py
│   │   │   │   ├── params.py
│   │   │   │   ├── provider.py
│   │   │   │   ├── types.py
│   │   │   │   └── utils
│   │   │   │       └── debug.py
│   │   │   ├── __init__.py
│   │   │   ├── manga
│   │   │   │   ├── base.py
│   │   │   │   ├── common.py
│   │   │   │   ├── __init__.py
│   │   │   │   ├── mangadex
│   │   │   │   │   ├── api.py
│   │   │   │   │   └── __init__.py
│   │   │   │   └── MangaProvider.py
│   │   │   └── scraping
│   │   │       ├── html_parser.py
│   │   │       ├── __init__.py
│   │   │       ├── user_agents.py
│   │   │       └── utils.py
│   │   └── selectors
│   │       ├── base.py
│   │       ├── fzf
│   │       │   ├── __init__.py
│   │       │   └── selector.py
│   │       ├── __init__.py
│   │       ├── inquirer
│   │       │   ├── __init__.py
│   │       │   └── selector.py
│   │       ├── rofi
│   │       │   ├── __init__.py
│   │       │   └── selector.py
│   │       └── selector.py
│   └── __main__.py
├── flake.lock
├── flake.nix
├── LICENSE
├── pyproject.toml
├── pyrightconfig.json
├── pytest.ini
├── README.md
├── shell.nix
├── tox.ini
└── uv.lock
69 directories, 253 files
```

README.md (925 lines changed)

File diff suppressed because it is too large.


@@ -21,7 +21,7 @@ C_RULE="{C_RULE}"
RESET="{RESET}"
# Selected item from fzf
SELECTED_ITEM="{}"
SELECTED_ITEM={}
generate_sha256() {
local input="$1"
@@ -156,14 +156,11 @@ if [ -z "$SELECTED_ITEM" ] || [ ! -f "$SEARCH_RESULTS_FILE" ]; then
fi
exit 0
fi
# HACK: the extra dot is because there's a weird character at the start
ANIME_ID=$(echo "$SELECTED_ITEM"|sed -E 's/^[[:space:]]+|[[:space:]]+$//g'|sed -E 's/^.\[([0-9]+)\] .*/\1/g')
# Parse the search results JSON and find the matching item
if command -v jq >/dev/null 2>&1; then
# Use jq for faster and more reliable JSON parsing
MEDIA_DATA=$(cat "$SEARCH_RESULTS_FILE" | jq --arg anime_id "$ANIME_ID" '
MEDIA_DATA=$(cat "$SEARCH_RESULTS_FILE" | jq --arg anime_title "$SELECTED_ITEM" '
.data.Page.media[]? |
select(.id == ($anime_id | tonumber) )
select((.title.english // .title.romaji // .title.native // "Unknown") == $anime_title )
' )
else
# Fallback to Python for JSON parsing
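The Python fallback branch is truncated in this hunk; a sketch of what an equivalent title-based lookup looks like, mirroring the jq filter above (the function name and file handling are assumptions, only the JSON shape is taken from the query):

```python
import json

def find_media_by_title(search_results_file: str, selected_item: str) -> dict | None:
    """Match the fzf selection against the same display title the jq filter builds."""
    with open(search_results_file, "r", encoding="utf-8") as f:
        data = json.load(f)
    for media in data.get("data", {}).get("Page", {}).get("media", []) or []:
        title = media.get("title") or {}
        display = (
            title.get("english")
            or title.get("romaji")
            or title.get("native")
            or "Unknown"
        )
        if display == selected_item:
            return media
    return None
```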


@@ -75,11 +75,7 @@ if command -v jq >/dev/null 2>&1; then
elif (.data.Page.media // []) | length == 0 then
"❌ No results found"
else
.data.Page.media[] |
"[" + (.id|tostring) + "] " +((.title.english // .title.romaji // .title.native // "Unknown") +
" (" + (.startDate.year // "Unknown" | tostring) + ") " +
"[" + (.status // "Unknown") + "] - " +
((.genres[:3] // []) | join(", ") | if . == "" then "Unknown" else . end))
.data.Page.media[] | (.title.english // .title.romaji // .title.native // "Unknown")
end
' 2>/dev/null || echo "❌ Parse error"
else


@@ -54,7 +54,7 @@ def stats(config: "AppConfig"):
)
else:
# Display profile image using kitten icat
if profile.avatar_url:
if profile.user_profile.avatar_url:
console.clear()
image_x = int(console.size.width * 0.1)
image_y = int(console.size.height * 0.1)
@@ -68,7 +68,7 @@ def stats(config: "AppConfig"):
"--clear",
"--place",
f"{img_w}x{img_h}@{image_x}x{image_y}",
profile.avatar_url,
profile.user_profile.avatar_url,
],
check=False,
)
@@ -82,16 +82,13 @@ def stats(config: "AppConfig"):
console.print(
Panel(
Markdown(about_text),
title=f"📊 {profile.name}'s Profile",
title=f"📊 {profile.user_profile.name}'s Profile",
)
)
# You can add more stats here if the API provides them
feedback.success("User profile displayed successfully")
except FastAnimeError as e:
feedback.error("Failed to fetch user stats", str(e))
raise click.Abort()
except Exception as e:
feedback.error("Unexpected error occurred", str(e))
raise click.Abort()
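For reference, the `--place` argument assembled above uses kitty icat's `WIDTHxHEIGHT@LEFTxTOP` geometry, measured in terminal cells. A rough sketch of the placement math (the 10% margins mirror the hunk; the image size and path are illustrative, since they are not visible here):

```python
import subprocess
from rich.console import Console

console = Console()
image_x = int(console.size.width * 0.1)   # left offset: 10% of terminal width, as in the diff
image_y = int(console.size.height * 0.1)  # top offset: 10% of terminal height, as in the diff
img_w = int(console.size.width * 0.3)     # assumed width in cells
img_h = int(console.size.height * 0.5)    # assumed height in cells

subprocess.run(
    [
        "kitten", "icat",  # binary name taken from the hunk's comment
        "--clear",
        "--place", f"{img_w}x{img_h}@{image_x}x{image_y}",
        "avatar.png",  # placeholder path
    ],
    check=False,
)
```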


@@ -1,176 +0,0 @@
"""
Common helper functions for anilist subcommands.
"""
import json
from typing import TYPE_CHECKING
import click
from rich.progress import Progress
if TYPE_CHECKING:
from fastanime.core.config import AppConfig
from fastanime.libs.media_api.base import BaseApiClient
def get_authenticated_api_client(config: "AppConfig") -> "BaseApiClient":
"""
Get an authenticated API client or raise an error if not authenticated.
Args:
config: Application configuration
Returns:
Authenticated API client
Raises:
click.Abort: If user is not authenticated
"""
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.libs.media_api.api import create_api_client
feedback = create_feedback_manager(config.general.icons)
api_client = create_api_client(config.general.media_api, config)
# Check if user is authenticated by trying to get viewer profile
try:
user_profile = api_client.get_viewer_profile()
if not user_profile:
feedback.error("Not authenticated", "Please run: fastanime anilist login")
raise click.Abort()
except Exception:
feedback.error(
"Authentication check failed", "Please run: fastanime anilist login"
)
raise click.Abort()
return api_client
def handle_media_search_command(
config: "AppConfig",
dump_json: bool,
task_name: str,
search_params_factory,
empty_message: str,
):
"""
Generic handler for media search commands (trending, popular, recent, etc).
Args:
config: Application configuration
dump_json: Whether to output JSON instead of launching interactive mode
task_name: Name to display in progress indicator
search_params_factory: Function that returns ApiSearchParams
empty_message: Message to show when no results found
"""
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.api import create_api_client
feedback = create_feedback_manager(config.general.icons)
try:
# Create API client
api_client = create_api_client(config.general.media_api, config)
# Fetch media
with Progress() as progress:
progress.add_task(task_name, total=None)
search_params = search_params_factory(config)
search_result = api_client.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError(empty_message)
if dump_json:
# Use Pydantic's built-in serialization
print(json.dumps(search_result.model_dump(), indent=2))
else:
# Launch interactive session for browsing results
from fastanime.cli.interactive.session import session
feedback.info(
f"Found {len(search_result.media)} anime. Launching interactive mode..."
)
session.load_menus_from_folder()
session.run(config)
except FastAnimeError as e:
feedback.error(f"Failed to fetch {task_name.lower()}", str(e))
raise click.Abort()
except Exception as e:
feedback.error("Unexpected error occurred", str(e))
raise click.Abort()
def handle_user_list_command(
config: "AppConfig", dump_json: bool, status: str, list_name: str
):
"""
Generic handler for user list commands (watching, completed, planning, etc).
Args:
config: Application configuration
dump_json: Whether to output JSON instead of launching interactive mode
status: The list status to fetch (CURRENT, COMPLETED, PLANNING, etc)
list_name: Human-readable name for the list (e.g., "watching", "completed")
"""
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.params import UserMediaListSearchParams
feedback = create_feedback_manager(config.general.icons)
# Validate status parameter
valid_statuses = [
"CURRENT",
"PLANNING",
"COMPLETED",
"DROPPED",
"PAUSED",
"REPEATING",
]
if status not in valid_statuses:
feedback.error(
f"Invalid status: {status}", f"Valid statuses are: {valid_statuses}"
)
raise click.Abort()
try:
# Get authenticated API client
api_client = get_authenticated_api_client(config)
# Fetch user's anime list
with Progress() as progress:
progress.add_task(f"Fetching your {list_name} list...", total=None)
list_params = UserMediaListSearchParams(
status=status, # type: ignore # We validated it above
page=1,
per_page=config.anilist.per_page or 50,
)
user_list = api_client.search_media_list(list_params)
if not user_list or not user_list.media:
feedback.info(f"You have no anime in your {list_name} list")
return
if dump_json:
# Use Pydantic's built-in serialization
print(json.dumps(user_list.model_dump(), indent=2))
else:
# Launch interactive session for browsing results
from fastanime.cli.interactive.session import session
feedback.info(
f"Found {len(user_list.media)} anime in your {list_name} list. Launching interactive mode..."
)
session.load_menus_from_folder()
session.run(config)
except FastAnimeError as e:
feedback.error(f"Failed to fetch {list_name} list", str(e))
raise click.Abort()
except Exception as e:
feedback.error("Unexpected error occurred", str(e))
raise click.Abort()
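The deleted helpers above shared one fetch-and-display pipeline by injecting a params factory per subcommand. Reduced to a sketch, the pattern looks like this (types and names are stand-ins, not the real FastAnime API):

```python
from typing import Any, Callable

def handle_search(task_name: str, params_factory: Callable[[dict], dict]) -> None:
    """Shared pipeline: the injected factory builds command-specific params."""
    config: dict[str, Any] = {"per_page": 50}  # stand-in for AppConfig
    params = params_factory(config)
    print(f"{task_name}: would search with {params}")

# Each subcommand supplies its own factory, e.g. a hypothetical 'trending':
handle_search("Fetching trending anime", lambda cfg: {"sort": "TRENDING_DESC", **cfg})
```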


@@ -204,14 +204,13 @@ def download_anime(
anime_title: str,
episode: str,
):
from rich import print
from ...core.downloader import DownloadParams, create_downloader
from ...libs.provider.anime.params import EpisodeStreamsParams
downloader = create_downloader(config.downloads)
with feedback.progress(f"Fetching episode streams"):
with feedback.progress("Fetching episode streams"):
streams = provider.episode_streams(
EpisodeStreamsParams(
anime_id=anime.id,
@@ -226,14 +225,14 @@ def download_anime(
)
if config.stream.server.value == "TOP":
with feedback.progress(f"Fetching top server"):
with feedback.progress("Fetching top server"):
server = next(streams, None)
if not server:
raise FastAnimeError(
f"Failed to get server for anime: {anime.title}, episode: {episode}"
)
else:
with feedback.progress(f"Fetching servers"):
with feedback.progress("Fetching servers"):
servers = {server.name: server for server in streams}
servers_names = list(servers.keys())
if config.stream.server in servers_names:
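Since `episode_streams` evidently yields servers lazily, the two branches above differ in how much of the generator they consume: the "TOP" path takes only the first yielded server, while the named-server path drains everything into a dict. A sketch of that pattern (the `Server` type here is illustrative):

```python
from collections.abc import Iterator
from typing import NamedTuple

class Server(NamedTuple):
    name: str
    url: str

def pick_server(streams: Iterator[Server], wanted: str | None) -> Server | None:
    if wanted is None:  # "TOP": take the first server without draining the generator
        return next(streams, None)
    servers = {server.name: server for server in streams}  # consumes all servers
    return servers.get(wanted)
```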


@@ -2,15 +2,21 @@
Registry backup command - create full backups of the registry
"""
import json
import tarfile
from pathlib import Path
from datetime import datetime
from io import BytesIO
from pathlib import Path
from typing import TYPE_CHECKING
import click
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
if TYPE_CHECKING:
pass
@click.command(help="Create a full backup of the registry")
@@ -50,10 +56,10 @@ def backup(
Includes all media records, index files, and optionally cache data.
Backups can be compressed and are suitable for restoration.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Generate output filename if not specified
if not output:
@@ -99,15 +105,14 @@ def backup(
def _create_tar_backup(
registry_service,
registry_service: MediaRegistryService,
output_path: Path,
compress: bool,
include_cache: bool,
feedback,
feedback: FeedbackService,
api: str,
):
"""Create a tar-based backup."""
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:
@@ -130,25 +135,27 @@ def _create_tar_backup(
tar.add(cache_dir, arcname="cache")
feedback.info("Added to backup", "Cache data")
# Add metadata file
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_path = output_path.parent / "backup_metadata.json"
# Add metadata file directly into the archive without creating a temp file
try:
import json
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_bytes = json.dumps(metadata, indent=2, default=str).encode("utf-8")
with open(metadata_path, "w", encoding="utf-8") as f:
json.dump(metadata, f, indent=2, default=str)
tar.add(metadata_path, arcname="backup_metadata.json")
metadata_path.unlink() # Clean up temp file
tarinfo = tarfile.TarInfo(name="backup_metadata.json")
tarinfo.size = len(metadata_bytes)
tarinfo.mtime = int(datetime.now().timestamp())
with BytesIO(metadata_bytes) as bio:
tar.addfile(tarinfo, bio)
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_zip_backup(
registry_service, output_path: Path, include_cache: bool, feedback, api: str
registry_service: MediaRegistryService,
output_path: Path,
include_cache: bool,
feedback: FeedbackService,
api: str,
):
"""Create a zip-based backup."""
import zipfile
@@ -183,23 +190,25 @@ def _create_zip_backup(
feedback.info("Added to backup", "Cache data")
# Add metadata
metadata = _create_backup_metadata(registry_service, api, include_cache)
try:
import json
metadata = _create_backup_metadata(registry_service, api, include_cache)
metadata_json = json.dumps(metadata, indent=2, default=str)
zip_file.writestr("backup_metadata.json", metadata_json)
except Exception as e:
feedback.warning("Metadata Error", f"Failed to add metadata: {e}")
def _create_backup_metadata(registry_service, api: str, include_cache: bool) -> dict:
def _create_backup_metadata(
registry_service: MediaRegistryService, api: str, include_cache: bool
) -> dict:
"""Create backup metadata."""
from .....core.constants import __version__
stats = registry_service.get_registry_stats()
return {
"backup_timestamp": datetime.now().isoformat(),
"fastanime_version": "unknown", # You might want to get this from somewhere
"fastanime_version": __version__,
"registry_version": stats.get("version"),
"api": api,
"total_media": stats.get("total_media", 0),
@@ -209,9 +218,10 @@ def _create_backup_metadata(registry_service, api: str, include_cache: bool) ->
}
def _show_backup_summary(backup_path: Path, format_type: str, feedback):
def _show_backup_summary(
backup_path: Path, format_type: str, feedback: FeedbackService
):
"""Show summary of backup contents."""
try:
if format_type.lower() == "tar":
with tarfile.open(backup_path, "r:*") as tar:
@@ -235,11 +245,14 @@ def _show_backup_summary(backup_path: Path, format_type: str, feedback):
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except:
size_bytes: float = float(file_path.stat().st_size)
if size_bytes == 0:
return "0 B"
size_name = ("B", "KB", "MB", "GB", "TB")
i = 0
while size_bytes >= 1024.0 and i < len(size_name) - 1:
size_bytes /= 1024.0
i += 1
return f"{size_bytes:.1f} {size_name[i]}"
except FileNotFoundError:
return "Unknown size"


@@ -2,13 +2,16 @@
Registry clean command - clean up orphaned entries and invalid data
"""
import json
from typing import Dict, List
import click
from rich.console import Console
from rich.table import Table
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(help="Clean up orphaned entries and invalid data from registry")
@@ -51,7 +54,7 @@ def clean(
Can remove orphaned entries, invalid data, duplicates, and entries
from old format versions. Use --dry-run to preview changes.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
console = Console()
# Default to all cleanup types if none specified
@@ -59,9 +62,9 @@ def clean(
orphaned = invalid = duplicates = old_format = True
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
cleanup_results = {
cleanup_results: Dict[str, List] = {
"orphaned": [],
"invalid": [],
"duplicates": [],
@@ -69,33 +72,35 @@ def clean(
}
# Analyze registry for cleanup opportunities
_analyze_registry(
registry_service, cleanup_results, orphaned, invalid, duplicates, old_format
)
with feedback.progress("Analyzing registry..."):
_analyze_registry(
registry_service,
cleanup_results,
orphaned,
invalid,
duplicates,
old_format,
)
# Show cleanup summary
_display_cleanup_summary(console, cleanup_results, config.general.icons)
# Confirm cleanup if not dry run and not forced
total_items = sum(len(items) for items in cleanup_results.values())
if total_items == 0:
feedback.info(
feedback.success(
"Registry Clean", "No cleanup needed - registry is already clean!"
)
return
if not dry_run:
if not force:
if not click.confirm(f"Clean up {total_items} items from registry?"):
feedback.info("Cleanup Cancelled", "No changes were made")
return
if not force and not click.confirm(
f"Clean up {total_items} items from registry?"
):
feedback.info("Cleanup Cancelled", "No changes were made")
return
# Perform cleanup
_perform_cleanup(registry_service, cleanup_results, feedback)
feedback.success(
"Cleanup Complete", f"Cleaned up {total_items} items from registry"
)
else:
feedback.info("Dry Run Complete", f"Would clean up {total_items} items")
@@ -105,172 +110,107 @@ def clean(
def _analyze_registry(
registry_service,
results: dict,
registry_service: MediaRegistryService,
results: Dict[str, List],
check_orphaned: bool,
check_invalid: bool,
check_duplicates: bool,
check_old_format: bool,
):
"""Analyze registry for cleanup opportunities."""
if check_orphaned:
results["orphaned"] = _find_orphaned_entries(registry_service)
if check_invalid:
results["invalid"] = _find_invalid_entries(registry_service)
if check_duplicates:
results["duplicates"] = _find_duplicate_entries(registry_service)
if check_old_format:
results["old_format"] = _find_old_format_entries(registry_service)
def _find_orphaned_entries(registry_service) -> list:
def _find_orphaned_entries(registry_service: MediaRegistryService) -> list:
"""Find index entries that don't have corresponding media files."""
orphaned = []
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
media_file = registry_service._get_media_file_path(entry.media_id)
if not media_file.exists():
orphaned.append(
{
"type": "orphaned_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Media file missing",
}
)
except Exception:
pass
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
media_file = registry_service._get_media_file_path(entry.media_id)
if not media_file.exists():
orphaned.append(
{"id": entry.media_id, "key": entry_key, "reason": "Media file missing"}
)
return orphaned
def _find_invalid_entries(registry_service) -> list:
def _find_invalid_entries(registry_service: MediaRegistryService) -> list:
"""Find invalid or corrupted entries."""
invalid = []
try:
# Check all media files
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith(".json"):
continue
try:
media_id = int(media_file.stem)
record = registry_service.get_media_record(media_id)
# Check for invalid record structure
if not record or not record.media_item:
invalid.append(
{
"type": "invalid_record",
"id": media_id,
"file": media_file,
"reason": "Invalid record structure",
}
)
elif (
not record.media_item.title
or not record.media_item.title.english
and not record.media_item.title.romaji
):
invalid.append(
{
"type": "invalid_title",
"id": media_id,
"file": media_file,
"reason": "Missing or invalid title",
}
)
except (ValueError, Exception) as e:
for media_file in registry_service.media_registry_dir.glob("*.json"):
try:
media_id = int(media_file.stem)
record = registry_service.get_media_record(media_id)
if (
not record
or not record.media_item
or not record.media_item.title.english
and not record.media_item.title.romaji
):
invalid.append(
{
"type": "corrupted_file",
"id": media_file.stem,
"id": media_id,
"file": media_file,
"reason": f"File corruption: {e}",
"reason": "Invalid record structure or missing title",
}
)
except Exception:
pass
except (ValueError, json.JSONDecodeError) as e:
invalid.append(
{
"id": media_file.stem,
"file": media_file,
"reason": f"File corruption: {e}",
}
)
return invalid
def _find_duplicate_entries(registry_service) -> list:
def _find_duplicate_entries(registry_service: MediaRegistryService) -> list:
"""Find duplicate entries (same media ID appearing multiple times)."""
duplicates = []
seen_ids = set()
try:
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
if entry.media_id in seen_ids:
duplicates.append(
{
"type": "duplicate_index",
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index",
}
)
else:
seen_ids.add(entry.media_id)
except Exception:
pass
index = registry_service._load_index()
for entry_key, entry in index.media_index.items():
if entry.media_id in seen_ids:
duplicates.append(
{
"id": entry.media_id,
"key": entry_key,
"reason": "Duplicate media ID in index",
}
)
else:
seen_ids.add(entry.media_id)
return duplicates
def _find_old_format_entries(registry_service) -> list:
def _find_old_format_entries(registry_service: MediaRegistryService) -> list:
"""Find entries from old registry format versions."""
from ....service.registry.service import REGISTRY_VERSION
old_format = []
try:
index = registry_service._load_index()
current_version = registry_service._index.version
# Check for entries that might be from old formats
# This is a placeholder - you'd implement specific checks based on your version history
for media_file in registry_service.media_registry_dir.iterdir():
if not media_file.name.endswith(".json"):
continue
try:
import json
with open(media_file, "r") as f:
data = json.load(f)
# Check for old format indicators
if "version" in data and data["version"] < current_version:
old_format.append(
{
"type": "old_version",
"id": media_file.stem,
"file": media_file,
"reason": f"Old format version {data.get('version')}",
}
)
except Exception:
pass
except Exception:
pass
index = registry_service._load_index()
current_version = index.version
if index.version != REGISTRY_VERSION:
old_format.append(
{
"id": "index",
"file": registry_service._index_file,
"reason": f"Index version mismatch ({index.version})",
}
)
return old_format
def _display_cleanup_summary(console: Console, results: dict, icons: bool):
def _display_cleanup_summary(console: Console, results: Dict[str, List], icons: bool):
"""Display summary of cleanup opportunities."""
table = Table(title=f"{'🧹 ' if icons else ''}Registry Cleanup Summary")
table.add_column("Category", style="cyan", no_wrap=True)
table.add_column("Count", style="magenta", justify="right")
@@ -278,123 +218,69 @@ def _display_cleanup_summary(console: Console, results: dict, icons: bool):
categories = {
"orphaned": "Orphaned Entries",
"invalid": "Invalid Entries",
"invalid": "Invalid/Corrupt Entries",
"duplicates": "Duplicate Entries",
"old_format": "Old Format Entries",
"old_format": "Outdated Format",
}
for category, display_name in categories.items():
count = len(results[category])
description = "None found"
if count > 0:
# Get sample reasons
reasons = set(item["reason"] for item in results[category][:3])
reasons = {item["reason"] for item in results[category][:3]}
description = "; ".join(list(reasons)[:2])
if len(reasons) > 2:
description += "..."
else:
description = "None found"
table.add_row(display_name, str(count), description)
console.print(table)
console.print()
# Show detailed breakdown if there are items to clean
for category, items in results.items():
if items:
_display_category_details(console, category, items, icons)
def _display_category_details(
console: Console, category: str, items: list, icons: bool
def _perform_cleanup(
registry_service: MediaRegistryService,
results: Dict[str, List],
feedback: FeedbackService,
):
"""Display detailed breakdown for a cleanup category."""
category_names = {
"orphaned": "🔗 Orphaned Entries" if icons else "Orphaned Entries",
"invalid": "❌ Invalid Entries" if icons else "Invalid Entries",
"duplicates": "👥 Duplicate Entries" if icons else "Duplicate Entries",
"old_format": "📼 Old Format Entries" if icons else "Old Format Entries",
}
table = Table(title=category_names.get(category, category.title()))
table.add_column("ID", style="cyan", no_wrap=True)
table.add_column("Type", style="magenta")
table.add_column("Reason", style="yellow")
for item in items[:10]: # Show max 10 items
table.add_row(str(item["id"]), item["type"], item["reason"])
if len(items) > 10:
table.add_row("...", "...", f"And {len(items) - 10} more")
console.print(table)
console.print()
def _perform_cleanup(registry_service, results: dict, feedback):
"""Perform the actual cleanup operations."""
cleaned_count = 0
total_to_clean = sum(len(v) for v in results.values())
# Clean orphaned entries
for item in results["orphaned"]:
try:
if item["type"] == "orphaned_index":
index = registry_service._load_index()
if item["key"] in index.media_index:
del index.media_index[item["key"]]
registry_service._save_index(index)
with feedback.progress("Cleaning registry...", total=total_to_clean) as (
task_id,
progress,
):
def _cleanup_item(item_list, cleanup_func):
nonlocal cleaned_count
for item in item_list:
try:
cleanup_func(item)
cleaned_count += 1
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean orphaned entry {item['id']}: {e}"
)
except Exception as e:
feedback.warning(
"Cleanup Error",
f"Failed to clean item {item.get('id', 'N/A')}: {e}",
)
progress.advance(task_id) # type: ignore
# Clean invalid entries
for item in results["invalid"]:
try:
if "file" in item:
item["file"].unlink() # Delete the file
cleaned_count += 1
index = registry_service._load_index()
# Also remove from index if present
index = registry_service._load_index()
entry_key = f"{registry_service._media_api}_{item['id']}"
if entry_key in index.media_index:
del index.media_index[entry_key]
registry_service._save_index(index)
_cleanup_item(
results["orphaned"], lambda item: index.media_index.pop(item["key"], None)
)
_cleanup_item(results["invalid"], lambda item: item["file"].unlink())
_cleanup_item(
results["duplicates"], lambda item: index.media_index.pop(item["key"], None)
)
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean invalid entry {item['id']}: {e}"
)
from ....service.registry.service import REGISTRY_VERSION
# Clean duplicates
for item in results["duplicates"]:
try:
if item["type"] == "duplicate_index":
index = registry_service._load_index()
if item["key"] in index.media_index:
del index.media_index[item["key"]]
registry_service._save_index(index)
cleaned_count += 1
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean duplicate entry {item['id']}: {e}"
)
# For old format, we just re-save the index to update its version
if results["old_format"]:
index.version = REGISTRY_VERSION
progress.advance(task_id, len(results["old_format"])) # type:ignore
# Clean old format entries
for item in results["old_format"]:
try:
if "file" in item:
# You might want to migrate instead of delete
# For now, we'll just remove old format files
item["file"].unlink()
cleaned_count += 1
except Exception as e:
feedback.warning(
"Cleanup Error", f"Failed to clean old format entry {item['id']}: {e}"
)
feedback.info("Cleanup Results", f"Successfully cleaned {cleaned_count} items")
registry_service._save_index(index)
feedback.success(
"Cleanup Complete",
f"Successfully cleaned {cleaned_count} items from the registry.",
)
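The orphan check in this rewrite reduces to "index entry whose media JSON is gone". A standalone sketch of that idea, assuming a directory of `<media_id>.json` files and a dict-shaped index (both are assumptions; the real `MediaRegistryService` internals are not shown here):

```python
from pathlib import Path

def find_orphaned_entries(registry_dir: Path, media_index: dict[str, int]) -> list[dict]:
    """Return index entries whose backing <media_id>.json file no longer exists."""
    orphaned = []
    for entry_key, media_id in media_index.items():
        if not (registry_dir / f"{media_id}.json").exists():
            orphaned.append(
                {"id": media_id, "key": entry_key, "reason": "Media file missing"}
            )
    return orphaned
```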


@@ -2,16 +2,20 @@
Registry export command - export registry data to various formats
"""
import json
import csv
from pathlib import Path
import json
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING
import click
from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
if TYPE_CHECKING:
from ....service.registry.models import MediaRecord
@click.command(help="Export registry data to various formats")
@@ -25,7 +29,7 @@ from ....utils.feedback import create_feedback_manager
@click.option(
"--output",
"-o",
type=click.Path(),
type=click.Path(path_type=Path),
help="Output file path (auto-generated if not specified)",
)
@click.option(
@@ -51,7 +55,7 @@ from ....utils.feedback import create_feedback_manager
def export(
config: AppConfig,
output_format: str,
output: str | None,
output: Path | None,
include_metadata: bool,
status: tuple[str, ...],
compress: bool,
@@ -63,10 +67,10 @@ def export(
Supports JSON, CSV, and XML formats. Can optionally include
detailed metadata and compress the output.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Generate output filename if not specified
if not output:
@@ -74,20 +78,30 @@ def export(
extension = output_format.lower()
if compress:
extension += ".gz"
output = f"fastanime_registry_{api}_{timestamp}.{extension}"
output_path = Path(output)
output_path = Path(f"fastanime_registry_{api}_{timestamp}.{extension}")
else:
output_path = output
# Get export data
export_data = _prepare_export_data(registry_service, include_metadata, status)
if not export_data["media"]:
feedback.warning(
"No Data", "No media entries to export based on your criteria."
)
return
# Export based on format
if output_format.lower() == "json":
_export_json(export_data, output_path, compress, feedback)
_export_json(export_data, output_path)
elif output_format.lower() == "csv":
_export_csv(export_data, output_path, compress, feedback)
_export_csv(export_data, output_path)
elif output_format.lower() == "xml":
_export_xml(export_data, output_path, compress, feedback)
_export_xml(export_data, output_path)
if compress:
_compress_file(output_path, feedback)
output_path = output_path.with_suffix(output_path.suffix + ".gz")
feedback.success(
"Export Complete",
@@ -100,11 +114,11 @@ def export(
def _prepare_export_data(
registry_service, include_metadata: bool, status_filter: tuple[str, ...]
registry_service: MediaRegistryService,
include_metadata: bool,
status_filter: tuple[str, ...],
) -> dict:
"""Prepare data for export based on options."""
# Convert status filter to enums
from .....libs.media_api.types import UserMediaListStatus
status_map = {
@@ -115,194 +129,106 @@ def _prepare_export_data(
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enums = [status_map[s] for s in status_filter] if status_filter else None
status_enums = {status_map[s] for s in status_filter}
export_data = {
"metadata": {
"export_timestamp": datetime.now().isoformat(),
"registry_version": registry_service._load_index().version,
"include_metadata": include_metadata,
"filtered_status": list(status_filter) if status_filter else None,
"filtered_status": list(status_filter) if status_filter else "all",
},
"statistics": registry_service.get_registry_stats(),
"media": [],
}
# Get all records and filter by status if specified
all_records = registry_service.get_all_media_records()
for record in all_records:
index_entry = registry_service.get_media_index_entry(record.media_item.id)
# Skip if status filter is specified and doesn't match
if status_enums and (not index_entry or index_entry.status not in status_enums):
continue
media_data = {
"id": record.media_item.id,
"title": {
"english": record.media_item.title.english,
"romaji": record.media_item.title.romaji,
"native": record.media_item.title.native,
},
"user_status": {
"status": index_entry.status.value
if index_entry and index_entry.status
else None,
"progress": index_entry.progress if index_entry else None,
"score": index_entry.score if index_entry else None,
"last_watched": index_entry.last_watched.isoformat()
if index_entry and index_entry.last_watched
else None,
"notes": index_entry.notes if index_entry else None,
},
}
if include_metadata:
media_data.update(
{
"format": record.media_item.format.value
if record.media_item.format
else None,
"episodes": record.media_item.episodes,
"duration": record.media_item.duration,
"status": record.media_item.status.value
if record.media_item.status
else None,
"start_date": record.media_item.start_date.isoformat()
if record.media_item.start_date
else None,
"end_date": record.media_item.end_date.isoformat()
if record.media_item.end_date
else None,
"average_score": record.media_item.average_score,
"popularity": record.media_item.popularity,
"genres": [genre.value for genre in record.media_item.genres],
"tags": [
{"name": tag.name.value, "rank": tag.rank}
for tag in record.media_item.tags
],
"studios": [
studio.name
for studio in record.media_item.studios
if studio.name
],
"description": record.media_item.description,
"cover_image": {
"large": record.media_item.cover_image.large
if record.media_item.cover_image
else None,
"medium": record.media_item.cover_image.medium
if record.media_item.cover_image
else None,
}
if record.media_item.cover_image
else None,
}
)
media_data = _flatten_record_for_export(record, index_entry, include_metadata)
export_data["media"].append(media_data)
return export_data
def _export_json(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to JSON format."""
if compress:
import gzip
def _flatten_record_for_export(
record: "MediaRecord", index_entry, include_metadata: bool
) -> dict:
"""Helper to convert a MediaRecord into a flat dictionary for exporting."""
media_item = record.media_item
with gzip.open(output_path, "wt", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
else:
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
data = {
"id": media_item.id,
"title_english": media_item.title.english,
"title_romaji": media_item.title.romaji,
"title_native": media_item.title.native,
"user_status": index_entry.status.value
if index_entry and index_entry.status
else None,
"user_progress": index_entry.progress if index_entry else None,
"user_score": index_entry.score if index_entry else None,
"user_last_watched": index_entry.last_watched.isoformat()
if index_entry and index_entry.last_watched
else None,
"user_notes": index_entry.notes if index_entry else None,
}
def _export_csv(data: dict, output_path: Path, compress: bool, feedback):
"""Export data to CSV format."""
# Flatten media data for CSV
fieldnames = [
"id",
"title_english",
"title_romaji",
"title_native",
"status",
"progress",
"score",
"last_watched",
"notes",
]
# Add metadata fields if included
if data["metadata"]["include_metadata"]:
fieldnames.extend(
[
"format",
"episodes",
"duration",
"media_status",
"start_date",
"end_date",
"average_score",
"popularity",
"genres",
"description",
]
)
def write_csv(file_obj):
writer = csv.DictWriter(file_obj, fieldnames=fieldnames)
writer.writeheader()
for media in data["media"]:
row = {
"id": media["id"],
"title_english": media["title"]["english"],
"title_romaji": media["title"]["romaji"],
"title_native": media["title"]["native"],
"status": media["user_status"]["status"],
"progress": media["user_status"]["progress"],
"score": media["user_status"]["score"],
"last_watched": media["user_status"]["last_watched"],
"notes": media["user_status"]["notes"],
if include_metadata:
data.update(
{
"format": media_item.format.value if media_item.format else None,
"episodes": media_item.episodes,
"duration_minutes": media_item.duration,
"media_status": media_item.status.value if media_item.status else None,
"start_date": media_item.start_date.isoformat()
if media_item.start_date
else None,
"end_date": media_item.end_date.isoformat()
if media_item.end_date
else None,
"average_score": media_item.average_score,
"popularity": media_item.popularity,
"genres": ", ".join([genre.value for genre in media_item.genres]),
"tags": ", ".join([tag.name.value for tag in media_item.tags]),
"studios": ", ".join(
[studio.name for studio in media_item.studios if studio.name]
),
"description": media_item.description,
"cover_image_large": media_item.cover_image.large
if media_item.cover_image
else None,
}
if data["metadata"]["include_metadata"]:
row.update(
{
"format": media.get("format"),
"episodes": media.get("episodes"),
"duration": media.get("duration"),
"media_status": media.get("status"),
"start_date": media.get("start_date"),
"end_date": media.get("end_date"),
"average_score": media.get("average_score"),
"popularity": media.get("popularity"),
"genres": ",".join(media.get("genres", [])),
"description": media.get("description"),
}
)
writer.writerow(row)
if compress:
import gzip
with gzip.open(output_path, "wt", encoding="utf-8", newline="") as f:
write_csv(f)
else:
with open(output_path, "w", encoding="utf-8", newline="") as f:
write_csv(f)
)
return data
def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
def _export_json(data: dict, output_path: Path):
"""Export data to JSON format."""
with open(output_path, "w", encoding="utf-8") as f:
json.dump(data, f, indent=2, ensure_ascii=False)
def _export_csv(data: dict, output_path: Path):
"""Export data to CSV format."""
if not data["media"]:
return
fieldnames = list(data["media"][0].keys())
with open(output_path, "w", encoding="utf-8", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(data["media"])
def _export_xml(data: dict, output_path: Path):
"""Export data to XML format."""
try:
import xml.etree.ElementTree as ET
except ImportError:
feedback.error("XML Export Error", "XML export requires Python's xml module")
raise click.Abort()
import xml.etree.ElementTree as ET
root = ET.Element("fastanime_registry")
@@ -313,71 +239,46 @@ def _export_xml(data: dict, output_path: Path, compress: bool, feedback):
elem = ET.SubElement(metadata_elem, key)
elem.text = str(value)
# Add statistics
stats_elem = ET.SubElement(root, "statistics")
for key, value in data["statistics"].items():
if value is not None:
elem = ET.SubElement(stats_elem, key)
elem.text = str(value)
# Add media
media_list_elem = ET.SubElement(root, "media_list")
for media in data["media"]:
media_elem = ET.SubElement(media_list_elem, "media")
media_elem.set("id", str(media["id"]))
# Add titles
titles_elem = ET.SubElement(media_elem, "titles")
for title_type, title_value in media["title"].items():
if title_value:
title_elem = ET.SubElement(titles_elem, title_type)
title_elem.text = title_value
# Add user status
status_elem = ET.SubElement(media_elem, "user_status")
for key, value in media["user_status"].items():
for key, value in media.items():
if value is not None:
elem = ET.SubElement(status_elem, key)
elem.text = str(value)
# Add metadata if included
if data["metadata"]["include_metadata"]:
for key, value in media.items():
if key not in ["id", "title", "user_status"] and value is not None:
if isinstance(value, list):
list_elem = ET.SubElement(media_elem, key)
for item in value:
item_elem = ET.SubElement(list_elem, "item")
item_elem.text = str(item)
elif isinstance(value, dict):
dict_elem = ET.SubElement(media_elem, key)
for sub_key, sub_value in value.items():
if sub_value is not None:
sub_elem = ET.SubElement(dict_elem, sub_key)
sub_elem.text = str(sub_value)
else:
elem = ET.SubElement(media_elem, key)
elem.text = str(value)
field_elem = ET.SubElement(media_elem, key)
field_elem.text = str(value)
# Write XML
tree = ET.ElementTree(root)
if compress:
import gzip
ET.indent(tree, space=" ", level=0) # Pretty print
tree.write(output_path, encoding="utf-8", xml_declaration=True)
with gzip.open(output_path, "wb") as f:
tree.write(f, encoding="utf-8", xml_declaration=True)
else:
tree.write(output_path, encoding="utf-8", xml_declaration=True)
def _compress_file(file_path: Path, feedback: FeedbackService):
"""Compresses a file using gzip and removes the original."""
import gzip
import shutil
compressed_path = file_path.with_suffix(file_path.suffix + ".gz")
try:
with open(file_path, "rb") as f_in:
with gzip.open(compressed_path, "wb") as f_out:
shutil.copyfileobj(f_in, f_out)
file_path.unlink() # Remove original file
except Exception as e:
feedback.warning("Compression Failed", f"Could not compress {file_path}: {e}")
def _format_file_size(file_path: Path) -> str:
"""Format file size in human-readable format."""
try:
size = file_path.stat().st_size
for unit in ["B", "KB", "MB", "GB"]:
if size < 1024.0:
return f"{size:.1f} {unit}"
size /= 1024.0
return f"{size:.1f} TB"
except:
size_bytes: float = float(file_path.stat().st_size)
if size_bytes < 1024.0:
return f"{int(size_bytes)} B"
for unit in ["KB", "MB", "GB"]:
size_bytes /= 1024.0
if size_bytes < 1024.0:
return f"{size_bytes:.1f} {unit}"
return f"{size_bytes / 1024.0:.1f} TB"
except FileNotFoundError:
return "Unknown size"


@@ -2,17 +2,18 @@
Registry import command - import registry data from various formats
"""
import json
import csv
from pathlib import Path
import json
from datetime import datetime
from pathlib import Path
from typing import Any, Dict, Optional
import click
from .....core.config import AppConfig
from .....libs.media_api.types import UserMediaListStatus, MediaItem, MediaTitle
from .....libs.media_api.types import MediaItem, MediaTitle, UserMediaListStatus
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
from ....utils.feedback import create_feedback_manager
@click.command(name="import", help="Import registry data from various formats")
@@ -60,14 +61,14 @@ def import_(
Supports JSON, CSV, and XML formats exported by the export command
or compatible third-party tools.
"""
feedback = create_feedback_manager(config.general.icons)
feedback = FeedbackService(config)
try:
registry_service = MediaRegistryService(api, config.registry)
registry_service = MediaRegistryService(api, config.media_registry)
# Create backup if requested
if backup and not dry_run:
_create_backup(registry_service, feedback)
_create_backup(registry_service, feedback, api)
# Auto-detect format if needed
if input_format == "auto":
@@ -77,7 +78,7 @@ def import_(
)
# Parse input file
import_data = _parse_input_file(input_file, input_format, feedback)
import_data = _parse_input_file(input_file, input_format)
# Validate import data
_validate_import_data(import_data, force, feedback)
@@ -101,15 +102,17 @@ def import_(
raise click.Abort()
def _create_backup(registry_service, feedback):
def _create_backup(
registry_service: MediaRegistryService, feedback: FeedbackService, api: str
):
"""Create a backup before importing."""
from .export import _prepare_export_data, _export_json
from .export import _export_json, _prepare_export_data
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"fastanime_registry_backup_{timestamp}.json")
backup_path = Path(f"fastanime_registry_pre_import_{api}_{timestamp}.json")
export_data = _prepare_export_data(registry_service, True, ())
_export_json(export_data, backup_path, False, feedback)
_export_json(export_data, backup_path)
feedback.info("Backup Created", f"Registry backed up to {backup_path}")
@@ -117,52 +120,70 @@ def _create_backup(registry_service, feedback):
def _detect_format(file_path: Path) -> str:
"""Auto-detect file format based on extension and content."""
extension = file_path.suffix.lower()
if extension in [".json", ".gz"]:
if ".gz" in file_path.suffixes:
return "json" # Assume gzipped jsons for now
if extension == ".json":
return "json"
elif extension == ".csv":
return "csv"
elif extension == ".xml":
return "xml"
# Try to detect by content
# Fallback to content detection
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read(100).strip()
if content.startswith("{") or content.startswith("["):
if content.startswith(("{", "[")):
return "json"
elif content.startswith("<?xml") or content.startswith("<"):
return "xml"
elif "," in content: # Very basic CSV detection
elif "," in content:
return "csv"
except:
except Exception:
pass
raise click.ClickException(f"Could not detect format for {file_path}")
raise click.ClickException(f"Could not auto-detect format for {file_path}")
def _parse_input_file(file_path: Path, format_type: str, feedback) -> dict:
def _parse_input_file(file_path: Path, format_type: str) -> dict:
"""Parse input file based on format."""
if format_type == "json":
return _parse_json(file_path)
elif format_type == "csv":
if format_type == "csv":
return _parse_csv(file_path)
elif format_type == "xml":
if format_type == "xml":
return _parse_xml(file_path)
else:
raise click.ClickException(f"Unsupported format: {format_type}")
raise click.ClickException(f"Unsupported format: {format_type}")
def _safe_int(value: Optional[str]) -> Optional[int]:
if value is None or value == "":
return None
try:
return int(value)
except (ValueError, TypeError):
return None
def _safe_float(value: Optional[str]) -> Optional[float]:
if value is None or value == "":
return None
try:
return float(value)
except (ValueError, TypeError):
return None
def _parse_json(file_path: Path) -> dict:
"""Parse JSON input file."""
try:
if file_path.suffix.lower() == ".gz":
if ".gz" in file_path.suffixes:
import gzip
with gzip.open(file_path, "rt", encoding="utf-8") as f:
return json.load(f)
else:
with open(file_path, "r", encoding="utf-8") as f:
with file_path.open("r", encoding="utf-8") as f:
return json.load(f)
except json.JSONDecodeError as e:
raise click.ClickException(f"Invalid JSON format: {e}")
@@ -170,20 +191,13 @@ def _parse_json(file_path: Path) -> dict:
def _parse_csv(file_path: Path) -> dict:
"""Parse CSV input file."""
import_data = {
"metadata": {
"import_timestamp": datetime.now().isoformat(),
"source_format": "csv",
},
"media": [],
}
import_data = {"metadata": {"source_format": "csv"}, "media": []}
try:
with open(file_path, "r", encoding="utf-8") as f:
with file_path.open("r", encoding="utf-8", newline="") as f:
reader = csv.DictReader(f)
for row in reader:
media_data = {
"id": int(row["id"]) if row.get("id") else None,
media_data: Dict[str, Any] = {
"id": _safe_int(row.get("id")),
"title": {
"english": row.get("title_english"),
"romaji": row.get("title_romaji"),
@@ -191,235 +205,136 @@ def _parse_csv(file_path: Path) -> dict:
},
"user_status": {
"status": row.get("status"),
"progress": int(row["progress"])
if row.get("progress")
else None,
"score": float(row["score"]) if row.get("score") else None,
"progress": _safe_int(row.get("progress")),
"score": _safe_float(row.get("score")),
"last_watched": row.get("last_watched"),
"notes": row.get("notes"),
},
}
# Add metadata fields if present
if "format" in row:
if "format" in row: # Check if detailed metadata is present
media_data.update(
{
"format": row.get("format"),
"episodes": int(row["episodes"])
if row.get("episodes")
else None,
"duration": int(row["duration"])
if row.get("duration")
else None,
"episodes": _safe_int(row.get("episodes")),
"duration": _safe_int(row.get("duration")),
"media_status": row.get("media_status"),
"start_date": row.get("start_date"),
"end_date": row.get("end_date"),
"average_score": float(row["average_score"])
if row.get("average_score")
else None,
"popularity": int(row["popularity"])
if row.get("popularity")
else None,
"average_score": _safe_float(row.get("average_score")),
"popularity": _safe_int(row.get("popularity")),
"genres": row.get("genres", "").split(",")
if row.get("genres")
else [],
"description": row.get("description"),
}
)
import_data["media"].append(media_data)
except (ValueError, KeyError) as e:
except (ValueError, KeyError, csv.Error) as e:
raise click.ClickException(f"Invalid CSV format: {e}")
return import_data
def _parse_xml(file_path: Path) -> dict:
"""Parse XML input file."""
try:
import xml.etree.ElementTree as ET
except ImportError:
raise click.ClickException("XML import requires Python's xml module")
import xml.etree.ElementTree as ET
try:
tree = ET.parse(file_path)
root = tree.getroot()
import_data: Dict[str, Any] = {"metadata": {}, "media": []}
import_data = {"metadata": {}, "media": []}
        for child in root.find("metadata") or []:
            import_data["metadata"][child.tag] = child.text
        for media_elem in root.find("media_list") or []:
            media_data = {child.tag: child.text for child in media_elem}
            # Reconstruct nested structures for consistency with other parsers
            media_data["id"] = _safe_int(media_data.get("id"))
            media_data["title"] = {
                "english": media_data.pop("title_english", None),
                "romaji": media_data.pop("title_romaji", None),
                "native": media_data.pop("title_native", None),
            }
            media_data["user_status"] = {
                "status": media_data.pop("user_status", None),
                "progress": _safe_int(media_data.pop("user_progress", None)),
                "score": _safe_float(media_data.pop("user_score", None)),
                "last_watched": media_data.pop("user_last_watched", None),
                "notes": media_data.pop("user_notes", None),
            }
            import_data["media"].append(media_data)
except ET.ParseError as e:
raise click.ClickException(f"Invalid XML format: {e}")
return import_data
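Note that the rewritten parser flattens each `<media>` element with a dict comprehension, so it assumes one child element per field rather than the nested `<titles>`/`<user_status>` containers the old code walked. A minimal sketch of the layout it expects (hypothetical document, assuming the exporter writes the same flat tags):

```python
import xml.etree.ElementTree as ET

doc = """
<export>
  <metadata><source_format>xml</source_format></metadata>
  <media_list>
    <media>
      <id>21</id>
      <title_english>One Piece</title_english>
      <user_progress>1000</user_progress>
    </media>
  </media_list>
</export>
"""
root = ET.fromstring(doc)
media = root.find("media_list")[0]
flat = {child.tag: child.text for child in media}  # same trick as the parser
assert flat["id"] == "21"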
def _validate_import_data(data: dict, force: bool, feedback: FeedbackService):
"""Validate import data structure and compatibility."""
if "media" not in data:
raise click.ClickException("Import data missing 'media' section")
if not isinstance(data["media"], list):
raise click.ClickException("'media' section must be a list")
# Check if any media entries exist
if "media" not in data or not isinstance(data["media"], list):
raise click.ClickException(
"Import data missing or has invalid 'media' section."
)
if not data["media"]:
feedback.warning("No Media", "Import file contains no media entries")
feedback.warning("No Media", "Import file contains no media entries.")
return
    for i, media in enumerate(data["media"]):
        if "id" not in media or "title" not in media:
            raise click.ClickException(
                f"Media entry {i + 1} missing required 'id' or 'title' field."
            )
if not isinstance(media.get("title"), dict):
raise click.ClickException(f"Media entry {i} has invalid title format")
raise click.ClickException(f"Media entry {i + 1} has invalid title format.")
    feedback.info(
        "Validation",
        f"Import data validated - {len(data['media'])} media entries found.",
    )
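For reference, a minimal payload that passes this validation (illustrative only):

```python
valid = {
    "metadata": {"source_format": "json"},
    "media": [
        {
            # 'id' and 'title' are the only hard requirements checked above;
            # 'title' must additionally be a dict.
            "id": 21,
            "title": {"english": "One Piece", "romaji": "One Piece"},
        }
    ],
}
```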
def _import_data(
    registry_service: MediaRegistryService,
    data: dict,
    merge: bool,
    dry_run: bool,
    feedback: FeedbackService,
):
"""Import data into the registry."""
    from .....libs.media_api.types import MediaType
    imported_count, updated_count, error_count = 0, 0, 0
    status_map = {status.value: status for status in UserMediaListStatus}
for media_data in data["media"]:
try:
media_id = media_data["id"]
media_id = media_data.get("id")
if not media_id:
error_count += 1
continue
title_data = media_data.get("title", {})
title = MediaTitle(
english=title_data.get("english") or "",
romaji=title_data.get("romaji"),
native=title_data.get("native"),
)
# Create minimal MediaItem for registry
media_item = MediaItem(
id=media_id,
title=title,
type=MediaType.ANIME, # Default to anime
)
# Add additional metadata if available
if "format" in media_data and media_data["format"]:
try:
media_item.format = getattr(MediaFormat, media_data["format"])
except (AttributeError, TypeError):
pass
if "episodes" in media_data:
media_item.episodes = media_data["episodes"]
if "average_score" in media_data:
media_item.average_score = media_data["average_score"]
title = MediaTitle(**media_data.get("title", {}))
media_item = MediaItem(id=media_id, title=title, type=MediaType.ANIME)
if dry_run:
title_str = title.english or title.romaji or f"ID:{media_id}"
feedback.info("Would import", title_str)
feedback.info(
"Would import", title.english or title.romaji or f"ID:{media_id}"
)
imported_count += 1
continue
            existing_record = registry_service.get_media_record(media_id)
            if existing_record and not merge:
                continue
            if existing_record:
                updated_count += 1
            else:
                imported_count += 1
            record = registry_service.get_or_create_record(media_item)
            registry_service.save_media_record(record)
# Update user status if provided
user_status = media_data.get("user_status", {})
if user_status.get("status"):
status_enum = status_map.get(user_status["status"].lower())
status_enum = status_map.get(str(user_status["status"]).lower())
if status_enum:
registry_service.update_media_index_entry(
media_id,
@@ -429,14 +344,12 @@ def _import_data(registry_service, data: dict, merge: bool, dry_run: bool, feedb
score=user_status.get("score"),
notes=user_status.get("notes"),
)
except Exception as e:
error_count += 1
feedback.warning(
"Import Error",
f"Failed to import media {media_data.get('id', 'unknown')}: {e}",
)
continue
if not dry_run:
feedback.info(

View File

@@ -2,16 +2,17 @@
Registry restore command - restore registry from backup files
"""
import json
import shutil
import tarfile
import zipfile  # needed at module scope for the exception types caught below
from datetime import datetime
from pathlib import Path

import click

from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService
@click.command(help="Restore registry from a backup file")
@@ -46,7 +47,7 @@ def restore(
Can restore from tar or zip backups created by the backup command.
Optionally creates a backup of the current registry before restoring.
"""
    feedback = FeedbackService(config)
try:
# Detect backup format
@@ -57,17 +58,20 @@ def restore(
if verify:
if not _verify_backup(backup_file, backup_format, feedback):
feedback.error(
"Verification Failed", "Backup file appears to be corrupted"
"Verification Failed",
"Backup file appears to be corrupted or invalid",
)
raise click.Abort()
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
        registry_service = MediaRegistryService(api, config.media_registry)
registry_exists = _check_registry_exists(registry_service)
if registry_exists and not force:
if not click.confirm("Current registry exists. Continue with restore?"):
if not click.confirm(
"Current registry exists. This will overwrite it. Continue with restore?"
):
feedback.info("Restore Cancelled", "No changes were made")
return
@@ -87,7 +91,7 @@ def restore(
# Verify restored registry
try:
        restored_service = MediaRegistryService(api, config.media_registry)
stats = restored_service.get_registry_stats()
feedback.info(
"Restored Registry",
@@ -105,65 +109,30 @@ def restore(
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
if backup_file.suffix.lower() in [".tar", ".gz"]:
suffixes = "".join(backup_file.suffixes).lower()
if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes:
return "tar"
elif backup_file.suffix.lower() == ".zip":
elif ".zip" in suffixes:
return "zip"
elif backup_file.name.endswith(".tar.gz"):
return "tar"
else:
# Try to detect by content
try:
with tarfile.open(backup_file, "r:*"):
return "tar"
        except Exception:
pass
try:
import zipfile
with zipfile.ZipFile(backup_file, "r"):
return "zip"
        except Exception:
pass
raise click.ClickException(f"Could not detect backup format for {backup_file}")
def _verify_backup(
    backup_file: Path, format_type: str, feedback: FeedbackService
) -> bool:
"""Verify backup file integrity."""
try:
        has_registry = has_index = has_metadata = False
        metadata: dict = {}
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
# Check if essential files exist
names = tar.getnames()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
if not (has_registry and has_index):
return False
                if has_metadata:
                    metadata_member = tar.getmember("backup_metadata.json")
                    if metadata_file := tar.extractfile(metadata_member):
                        metadata = json.load(metadata_file)
else: # zip
import zipfile
@@ -172,44 +141,38 @@ def _verify_backup(backup_file: Path, format_type: str, feedback) -> bool:
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
if not (has_registry and has_index):
return False
                if has_metadata:
                    with zip_file.open("backup_metadata.json") as metadata_file:
                        metadata = json.load(metadata_file)
if has_metadata:
feedback.info(
"Backup Info", f"Created: {metadata.get('backup_timestamp', 'Unknown')}"
)
feedback.info(
"Backup Info", f"Total Media: {metadata.get('total_media', 'Unknown')}"
)
return True
    except (tarfile.ReadError, zipfile.BadZipFile, json.JSONDecodeError):
        return False
    except Exception as e:
        feedback.warning("Verification Warning", f"Could not fully verify backup: {e}")
        return False
def _check_registry_exists(registry_service: MediaRegistryService) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get("total_media", 0) > 0
    except Exception:
return False
def _backup_current_registry(
    registry_service: MediaRegistryService, api: str, feedback: FeedbackService
):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
@@ -218,20 +181,21 @@ def _backup_current_registry(registry_service, api: str, feedback):
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.info("Current Registry Backed Up", f"Saved to {backup_path}")
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
def _show_restore_summary(
    backup_file: Path, format_type: str, feedback: FeedbackService
):
"""Show summary of what will be restored."""
try:
file_count = media_files = 0
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
members = tar.getmembers()
file_count = len([m for m in members if m.isfile()])
# Count media files
media_files = len(
[
m
@@ -239,15 +203,12 @@ def _show_restore_summary(backup_file: Path, format_type: str, feedback):
if m.name.startswith("registry/") and m.name.endswith(".json")
]
)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, "r") as zip_file:
info_list = zip_file.infolist()
file_count = len([info for info in info_list if not info.is_dir()])
# Count media files
media_files = len(
[
info
@@ -257,70 +218,69 @@ def _show_restore_summary(backup_file: Path, format_type: str, feedback):
]
)
feedback.info("Restore Preview", f"Will restore {file_count} files")
feedback.info("Media Records", f"Contains {media_files} media entries")
feedback.info(
"Restore Preview",
f"Backup contains {file_count} files, including {media_files} media entries.",
)
except Exception as e:
feedback.warning("Preview Error", f"Could not analyze backup: {e}")
def _perform_restore(
    backup_file: Path,
    format_type: str,
    config: AppConfig,
    api: str,
    feedback: FeedbackService,
):
"""Perform the actual restore operation."""
# Create temporary extraction directory
temp_dir = Path(config.registry.media_dir.parent / "restore_temp")
temp_dir.mkdir(exist_ok=True)
temp_dir = Path(
config.media_registry.media_dir.parent
/ f"restore_temp_{datetime.now().timestamp()}"
)
temp_dir.mkdir(exist_ok=True, parents=True)
try:
        with feedback.progress("Restoring from backup...") as (task_id, progress):
            # 1. Extract backup
            progress.update(task_id, description="Extracting backup...")
            if format_type == "tar":
                with tarfile.open(backup_file, "r:*") as tar:
                    tar.extractall(temp_dir)
            else:  # zip
                with zipfile.ZipFile(backup_file, "r") as zip_file:
                    zip_file.extractall(temp_dir)
            feedback.info("Extraction", "Backup extracted to temporary directory")

            # 2. Prepare paths
            registry_dir = config.media_registry.media_dir / api
            index_dir = config.media_registry.index_dir
            cache_dir = config.media_registry.media_dir.parent / "cache"

            # 3. Clean existing data
            progress.update(task_id, description="Cleaning existing registry...")
            if registry_dir.exists():
                shutil.rmtree(registry_dir)
            if index_dir.exists():
                shutil.rmtree(index_dir)
            if cache_dir.exists():
                shutil.rmtree(cache_dir)
            feedback.info("Cleanup", "Removed existing registry, index, and cache data")

            # 4. Move extracted files
            progress.update(task_id, description="Moving new files into place...")
            if (extracted_registry := temp_dir / "registry" / api).exists():
                shutil.move(str(extracted_registry), str(registry_dir))
            if (extracted_index := temp_dir / "index").exists():
                shutil.move(str(extracted_index), str(index_dir))
            if (extracted_cache := temp_dir / "cache").exists():
                shutil.move(str(extracted_cache), str(cache_dir))
            progress.update(task_id, description="Finalizing...")
finally:
# Clean up temporary directory
if temp_dir.exists():
shutil.rmtree(temp_dir)
feedback.info("Cleanup", "Temporary files removed")

View File

@@ -2,15 +2,26 @@
Registry search command - search through the local media registry
"""
import json
from typing import TYPE_CHECKING

import click
from rich.console import Console
from rich.table import Table

from .....core.config import AppConfig
from .....libs.media_api.params import MediaSearchParams
from .....libs.media_api.types import (
    MediaFormat,
    MediaGenre,
    MediaSort,
    UserMediaListStatus,
)
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService

if TYPE_CHECKING:
    from .....libs.media_api.types import MediaSearchResult
@click.command(help="Search through the local media registry")
@@ -18,7 +29,7 @@ from ....utils.feedback import create_feedback_manager
@click.option(
"--status",
type=click.Choice(
["watching", "completed", "planning", "dropped", "paused", "repeating"],
[s.value for s in UserMediaListStatus],
case_sensitive=False,
),
help="Filter by watch status",
@@ -29,7 +40,11 @@ from ....utils.feedback import create_feedback_manager
@click.option(
"--format",
type=click.Choice(
["TV", "TV_SHORT", "MOVIE", "SPECIAL", "OVA", "ONA", "MUSIC"],
[
f.value
for f in MediaFormat
if f not in [MediaFormat.MANGA, MediaFormat.NOVEL, MediaFormat.ONE_SHOT]
],
case_sensitive=False,
),
help="Filter by format",
@@ -77,28 +92,25 @@ def search(
You can search by title and filter by various criteria like status,
genre, format, year, and score range.
"""
    feedback = FeedbackService(config)
console = Console()
try:
        registry_service = MediaRegistryService(api, config.media_registry)
# Build search parameters
search_params = _build_search_params(
query, status, genre, format, year, min_score, max_score, sort, limit
)
        with feedback.progress("Searching local registry..."):
            result = registry_service.search_for_media(search_params)
if not result or not result.media:
feedback.info("No Results", "No media found matching your criteria")
return
if output_json:
            print(json.dumps(result.model_dump(mode="json"), indent=2))
return
_display_search_results(console, result, config.general.icons)
@@ -109,24 +121,17 @@ def search(
def _build_search_params(
    query: str | None,
    status: str | None,
    genre: tuple[str, ...],
    format_str: str | None,
    year: int | None,
    min_score: float | None,
    max_score: float | None,
    sort: str,
    limit: int,
) -> MediaSearchParams:
"""Build MediaSearchParams from command options."""
# Convert status string to enum
status_enum = None
if status:
status_map = {
"watching": UserMediaListStatus.WATCHING,
"completed": UserMediaListStatus.COMPLETED,
"planning": UserMediaListStatus.PLANNING,
"dropped": UserMediaListStatus.DROPPED,
"paused": UserMediaListStatus.PAUSED,
"repeating": UserMediaListStatus.REPEATING,
}
status_enum = status_map.get(status.lower())
# Convert sort string to enum
"""Build MediaSearchParams from command options for local filtering."""
sort_map = {
"title": MediaSort.TITLE_ROMAJI,
"score": MediaSort.SCORE_DESC,
@@ -135,74 +140,52 @@ def _build_search_params(
"episodes": MediaSort.EPISODES_DESC,
"updated": MediaSort.UPDATED_AT_DESC,
}
    # Safely convert strings to enums
    format_enum = next(
        (f for f in MediaFormat if f.value.lower() == (format_str or "").lower()), None
    )
    genre_enums = [
        g for g_str in genre for g in MediaGenre if g.value.lower() == g_str.lower()
    ]
# Note: Local search handles status separately as it's part of the index, not MediaItem
return MediaSearchParams(
query=query,
per_page=limit,
        sort=[sort_map.get(sort.lower(), MediaSort.TITLE_ROMAJI)],
        averageScore_greater=int(min_score * 10) if min_score is not None else None,
        averageScore_lesser=int(max_score * 10) if max_score is not None else None,
        genre_in=genre_enums or None,
format_in=[format_enum] if format_enum else None,
seasonYear=year,
    )
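The `next((...), None)` idiom used above is a safe, case-insensitive enum lookup that returns `None` instead of raising on unknown input. The same pattern in isolation (stand-in enum, illustrative only):

```python
from enum import Enum

class MediaFormat(Enum):  # stand-in for the real enum
    TV = "TV"
    MOVIE = "MOVIE"

def to_format(value: str | None) -> MediaFormat | None:
    # Returns None instead of raising when the string matches nothing.
    return next(
        (f for f in MediaFormat if f.value.lower() == (value or "").lower()),
        None,
    )

assert to_format("movie") is MediaFormat.MOVIE
assert to_format("vhs") is None
```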
def _display_search_results(console: Console, result: "MediaSearchResult", icons: bool):
"""Display search results in a formatted table."""
table = Table(
title=f"{'🔍 ' if icons else ''}Search Results ({len(result.media)} found)"
)
table.add_column("Title", style="cyan", min_width=30)
table.add_column("Year", style="dim", justify="center", min_width=6)
table.add_column("Format", style="magenta", justify="center", min_width=8)
table.add_column("Episodes", style="green", justify="center", min_width=8)
table.add_column("Score", style="yellow", justify="center", min_width=6)
table.add_column("Status", style="blue", justify="center", min_width=10)
table.add_column("Progress", style="white", justify="center", min_width=8)
table.add_column("Title", style="cyan", min_width=30, overflow="ellipsis")
table.add_column("Year", style="dim", justify="center")
table.add_column("Format", style="magenta", justify="center")
table.add_column("Episodes", style="green", justify="center")
table.add_column("Score", style="yellow", justify="center")
table.add_column("Status", style="blue", justify="center")
table.add_column("Progress", style="white", justify="center")
for media in result.media:
        title = media.title.english or media.title.romaji or "Unknown"
        year = str(media.start_date.year) if media.start_date else "N/A"
        episodes_total = str(media.episodes) if media.episodes else "?"
        score = (
            f"{media.average_score / 10:.1f}"
            if media.average_score is not None
            else "N/A"
        )
# Get user status
status = "Not Listed"
progress = "0"
if media.user_status:
@@ -211,13 +194,13 @@ def _display_search_results(console: Console, result, icons: bool):
if media.user_status.status
else "Unknown"
)
progress = f"{media.user_status.progress or 0}/{episodes}"
progress = f"{media.user_status.progress or 0}/{episodes_total}"
table.add_row(
title,
year,
media.format.value if media.format else "Unknown",
episodes,
media.format.value if media.format else "N/A",
episodes_total,
score,
status,
progress,
@@ -225,8 +208,7 @@ def _display_search_results(console: Console, result, icons: bool):
console.print(table)
# Show pagination info if applicable
    if result.page_info and result.page_info.total > len(result.media):
console.print(
f"\n[dim]Showing {len(result.media)} of {result.page_info.total} total results[/dim]"
)

View File

@@ -2,14 +2,25 @@
Registry stats command - show detailed statistics about the local registry
"""
import json
from datetime import datetime, timedelta
from typing import TYPE_CHECKING, Dict

import click
from rich.columns import Columns
from rich.console import Console
from rich.panel import Panel
from rich.table import Table

from .....core.config import AppConfig
from ....service.feedback import FeedbackService
from ....service.registry.service import MediaRegistryService

if TYPE_CHECKING:
    from ....service.registry.service import StatBreakdown

# --- Constants for better maintainability ---
TOP_N_STATS = 10
@click.command(help="Show detailed statistics about the local media registry")
@@ -36,16 +47,14 @@ def stats(config: AppConfig, detailed: bool, output_json: bool, api: str):
Shows total counts, status breakdown, and optionally detailed
analysis by genre, format, and release year.
"""
    feedback = FeedbackService(config)
console = Console()
try:
        registry_service = MediaRegistryService(api, config.media_registry)
stats_data = registry_service.get_registry_stats()
if output_json:
            print(json.dumps(stats_data, indent=2, default=str))
return
@@ -59,124 +68,185 @@ def stats(config: AppConfig, detailed: bool, output_json: bool, api: str):
raise click.Abort()
def _display_stats_overview(
    console: Console, stats: "StatBreakdown", api: str, icons: bool
):
    """
    Display the main overview and status breakdown tables.
    """
    # --- Main Overview Table ---
    overview_table = Table.grid(expand=True, padding=(0, 1))
    overview_table.add_column("Metric", style="bold cyan", no_wrap=True)
    overview_table.add_column("Value", style="white")

    overview_table.add_row("Media API:", api.title())
    overview_table.add_row("Total Media:", str(stats.get("total_media", 0)))
    overview_table.add_row("Registry Version:", str(stats.get("version", "Unknown")))

    # Format "Last Updated" timestamp to be more human-readable
    last_updated_str = stats.get("last_updated", "Never")
    if last_updated_str != "Never":
        try:
            last_updated_dt = datetime.fromisoformat(last_updated_str)
            last_updated_str = _format_timedelta(datetime.now() - last_updated_dt)
        except (ValueError, TypeError):
            pass  # Keep original string if parsing fails
    overview_table.add_row("Last Updated:", last_updated_str)

    # Format storage size
    storage_size_str = _format_storage_size(float(stats.get("storage_size_bytes", 0)))
    overview_table.add_row("Storage Size:", storage_size_str)

    console.print(
        Panel(
            overview_table,
            title=f"{'📊 ' if icons else ''}Registry Overview",
            border_style="cyan",
        )
    )
    console.print()

    # --- Status Breakdown Table ---
    status_breakdown = stats.get("status_breakdown", {})
    if status_breakdown:
        status_table = _create_breakdown_table(
            title=f"{'📋 ' if icons else ''}Status Breakdown",
            data=status_breakdown,
            key_header="Status",
            value_header="Count",
            show_percentage=True,
        )
        console.print(status_table)
        console.print()

    # --- Download Status Table ---
    download_stats = stats.get("download_stats", {})
    if download_stats:
        download_table = _create_breakdown_table(
            title=f"{'💾 ' if icons else ''}Download Status",
            data=download_stats,
            key_header="Status",
            value_header="Count",
            show_percentage=False,
        )
        console.print(download_table)
        console.print()


def _display_detailed_stats(console: Console, stats: "StatBreakdown", icons: bool):
    """
    Display detailed breakdowns by various categories using a column layout.
    """
    genre_table = _create_breakdown_table(
        title=f"{'🎭 ' if icons else ''}Top {TOP_N_STATS} Genres",
        data=stats.get("genre_breakdown", {}),
        key_header="Genre",
        value_header="Count",
        limit=TOP_N_STATS,
    )
    format_table = _create_breakdown_table(
        title=f"{'📺 ' if icons else ''}Format Breakdown",
        data=stats.get("format_breakdown", {}),
        key_header="Format",
        value_header="Count",
        show_percentage=True,
    )
    year_table = _create_breakdown_table(
        title=f"{'📅 ' if icons else ''}Top {TOP_N_STATS} Release Years",
        data=stats.get("year_breakdown", {}),
        key_header="Year",
        value_header="Count",
        sort_by_key=True,
        limit=TOP_N_STATS,
    )
    rating_table = _create_breakdown_table(
        title=f"{'⭐ ' if icons else ''}Score Distribution",
        data=stats.get("rating_breakdown", {}),
        key_header="Score Range",
        value_header="Count",
        sort_by_key=True,
        reverse_sort=False,
    )

    # Render tables in columns for a compact view
    console.print(Columns([genre_table, format_table], equal=True, expand=True))
    console.print()
    console.print(Columns([year_table, rating_table], equal=True, expand=True))


def _create_breakdown_table(
    title: str,
    data: Dict,
    key_header: str,
    value_header: str,
    show_percentage: bool = False,
    sort_by_key: bool = False,
    reverse_sort: bool = True,
    limit: int = 0,
) -> Table:
    """
    Generic helper to create a rich Table for breakdown statistics.
    """
    table = Table(title=title)
    table.add_column(key_header, style="cyan")
    table.add_column(value_header, style="magenta", justify="right")
    if show_percentage:
        table.add_column("Percentage", style="green", justify="right")

    if not data:
        row = (
            ["No data available", "-", "-"]
            if show_percentage
            else ["No data available", "-"]
        )
        table.add_row(*row)
        return table

    total = sum(data.values())

    # Determine sorting method
    def sort_key(item):
        return item[0] if sort_by_key else item[1]

    sorted_data = sorted(data.items(), key=sort_key, reverse=reverse_sort)

    # Apply limit if specified
    if limit > 0:
        sorted_data = sorted_data[:limit]

    for key, count in sorted_data:
        row = [str(key).title(), str(count)]
        if show_percentage:
            percentage = (count / total * 100) if total > 0 else 0
            row.append(f"{percentage:.1f}%")
        table.add_row(*row)

    return table


def _format_storage_size(size_bytes: float) -> str:
    """Formats bytes into a human-readable string (KB, MB, GB)."""
    if size_bytes == 0:
        return "0 B"
    size_name = ("B", "KB", "MB", "GB", "TB")
    i = 0
    while size_bytes >= 1024.0 and i < len(size_name) - 1:
        size_bytes /= 1024.0
        i += 1
    return f"{size_bytes:.2f} {size_name[i]}"


def _format_timedelta(delta: timedelta) -> str:
    """Formats a timedelta into a human-readable relative time string."""
    seconds = int(delta.total_seconds())
    if seconds < 60:
        return "Just now"
    minutes = seconds // 60
    if minutes < 60:
        return f"{minutes} minute{'s' if minutes > 1 else ''} ago"
    hours = minutes // 60
    if hours < 24:
        return f"{hours} hour{'s' if hours > 1 else ''} ago"
    days = hours // 24
    return f"{days} day{'s' if days > 1 else ''} ago"

View File

@@ -5,7 +5,6 @@ Registry sync command - synchronize local registry with remote media API
import click
from fastanime.cli.service.feedback.service import FeedbackService
from fastanime.cli.service.registry.service import MediaRegistryService
from .....core.config import AppConfig

View File

@@ -141,7 +141,7 @@ def stream_anime(
player_service = PlayerService(config, provider)
with feedback.progress(f"Fetching episode streams"):
with feedback.progress("Fetching episode streams"):
streams = provider.episode_streams(
EpisodeStreamsParams(
anime_id=anime.id,
@@ -156,14 +156,14 @@ def stream_anime(
)
if config.stream.server.value == "TOP":
with feedback.progress(f"Fetching top server"):
with feedback.progress("Fetching top server"):
server = next(streams, None)
if not server:
raise FastAnimeError(
f"Failed to get server for anime: {anime.title}, episode: {episode}"
)
else:
with feedback.progress(f"Fetching servers"):
with feedback.progress("Fetching servers"):
servers = {server.name: server for server in streams}
servers_names = list(servers.keys())
if config.stream.server.value in servers_names:

View File

@@ -1,7 +1,4 @@
from .....libs.provider.anime.params import AnimeParams, SearchParams
from ...session import Context, session
from ...state import InternalDirective, State
@@ -9,6 +6,10 @@ from ...state import InternalDirective, State
@session.menu
def download_episodes(ctx: Context, state: State) -> State | InternalDirective:
"""Menu to select and download episodes synchronously."""
from .....core.utils.fuzzy import fuzz
from .....core.utils.normalizer import normalize_title
from ....service.download.service import DownloadService
feedback = ctx.feedback
selector = ctx.selector
media_item = state.media_api.media_item
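Several menu modules in this commit move service imports from module scope into the function body. This is the usual cure for circular imports between menus and services, and it also defers the import cost until the menu is actually opened. A generic sketch of the pattern, using a stdlib module for illustration:

```python
def build_report(data: dict) -> str:
    # Deferred import: the module is resolved on first call, not at
    # program startup, which breaks import cycles between modules that
    # only need each other at runtime.
    import json

    return json.dumps(data, indent=2)
```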

View File

@@ -96,8 +96,10 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
# Find the selected media item by matching the choice with the displayed format
selected_media = None
for media_item in search_result.media:
        if (
            media_item.title.english == choice.strip()
            or media_item.title.romaji == choice.strip()
        ):
selected_media = media_item
break
@@ -109,7 +111,7 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
return State(
menu_name=MenuName.MEDIA_ACTIONS,
media_api=MediaApiState(
            search_result={media.id: media for media in search_result.media},
media_id=selected_media.id,
search_params=MediaSearchParams(),
page_info=search_result.page_info,

View File

@@ -11,7 +11,6 @@ from .....libs.media_api.types import (
UserMediaListStatus,
)
from .....libs.player.params import PlayerParams
from ...session import Context, session
from ...state import InternalDirective, MediaApiState, MenuName, State
@@ -20,6 +19,8 @@ MenuAction = Callable[[], State | InternalDirective]
@session.menu
def media_actions(ctx: Context, state: State) -> State | InternalDirective:
from ....service.registry.service import DownloadStatus
feedback = ctx.feedback
icons = ctx.config.general.icons
@@ -254,7 +255,7 @@ def _manage_user_media_list_in_bulk(ctx: Context, state: State) -> MenuAction:
ctx.config,
)
selected_titles = ctx.selector.choose_multiple(
"Select anime to download",
"Select anime to manage",
list(choice_map.keys()),
preview=preview_command,
)

View File

@@ -1,7 +1,6 @@
from typing import Callable, Dict, Literal, Union
from .....libs.player.params import PlayerParams
from ...session import Context, session
from ...state import InternalDirective, MenuName, State
@@ -11,6 +10,8 @@ MenuAction = Callable[[], Union[State, InternalDirective]]
@session.menu
def play_downloads(ctx: Context, state: State) -> State | InternalDirective:
"""Menu to select and play locally downloaded episodes."""
from ....service.registry.models import DownloadStatus
feedback = ctx.feedback
media_item = state.media_api.media_item
current_episode_num = state.provider.episode
@@ -93,6 +94,8 @@ def play_downloads(ctx: Context, state: State) -> State | InternalDirective:
def downloads_player_controls(
ctx: Context, state: State
) -> Union[State, InternalDirective]:
from ....service.registry.models import DownloadStatus
feedback = ctx.feedback
feedback.clear_console()
@@ -190,6 +193,8 @@ def downloads_player_controls(
def _next_episode(ctx: Context, state: State) -> MenuAction:
def action():
from ....service.registry.models import DownloadStatus
feedback = ctx.feedback
config = ctx.config
@@ -239,6 +244,8 @@ def _next_episode(ctx: Context, state: State) -> MenuAction:
def _previous_episode(ctx: Context, state: State) -> MenuAction:
def action():
from ....service.registry.models import DownloadStatus
feedback = ctx.feedback
config = ctx.config

View File

@@ -126,18 +126,11 @@ class DownloadService:
status=DownloadStatus.DOWNLOADING,
)
            media_title = media_item.title.romaji or media_item.title.english
# 1. Search the provider to get the provider-specific ID
provider_search_results = self.provider.search(
                SearchParams(query=media_title)
)
if not provider_search_results or not provider_search_results.results:
@@ -172,7 +165,7 @@ class DownloadService:
# 4. Get stream links using the now-validated provider_anime ID
streams_iterator = self.provider.episode_streams(
EpisodeStreamsParams(
                anime_id=provider_anime.id,
query=media_title,
episode=episode_number,
translation_type=self.config.stream.translation_type,
@@ -196,11 +189,16 @@ class DownloadService:
break
stream_link = server.links[0]
episode_title = f"{media_item.title.english}; Episode {episode_number}"
if media_item.streaming_episodes and media_item.streaming_episodes.get(
episode_number
):
episode_title = media_item.streaming_episodes[episode_number].title
# 5. Perform the download
download_params = DownloadParams(
url=stream_link.link,
                anime_title=media_item.title.english,
                episode_title=episode_title,
silent=False,
headers=server.headers,
subtitles=[sub.url for sub in server.subtitles],

View File

@@ -83,10 +83,7 @@ class FeedbackService:
transient=transient,
console=console,
) as progress:
            task_id = progress.add_task("", total=total)
try:
yield task_id, progress
if success_msg:

View File

@@ -27,6 +27,7 @@ class WatchHistoryService:
f"Updating watch history for {media_item.title.english} ({media_item.id}) with Episode={player_result.episode}; Stop Time={player_result.stop_time}; Total Duration={player_result.total_time}"
)
status = None
self.media_registry.update_media_index_entry(
media_id=media_item.id,
watched=True,
@@ -37,6 +38,17 @@ class WatchHistoryService:
status=status,
)
if player_result.stop_time and player_result.total_time:
from ....core.utils.converter import calculate_completion_percentage
completion_percentage = calculate_completion_percentage(
player_result.stop_time, player_result.total_time
)
if completion_percentage < self.config.stream.episode_complete_at:
logger.info(
f"Not updating remote watch history since completion percentage ({completion_percentage} is not greater than episode complete at ({self.config.stream.episode_complete_at}))"
)
return
if self.media_api and self.media_api.is_authenticated():
if not self.media_api.update_list_entry(
UpdateUserMediaListEntryParams(
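The new guard only syncs the remote list once an episode is "complete enough". A hypothetical sketch of that check in isolation, assuming times are plain seconds and an 80-percent threshold (the real `calculate_completion_percentage` converter and config default may differ):

```python
def calculate_completion_percentage(stop_time: float, total_time: float) -> float:
    # Hypothetical stand-in for the real converter utility.
    return (stop_time / total_time) * 100 if total_time else 0.0

EPISODE_COMPLETE_AT = 80  # assumed config default, in percent

watched = calculate_completion_percentage(18 * 60, 24 * 60)  # 75.0
if watched < EPISODE_COMPLETE_AT:
    pass  # skip the remote update, exactly as the service does above
```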

View File

@@ -19,7 +19,7 @@
pkgs = nixpkgs.legacyPackages.${system};
inherit (pkgs) lib python3Packages;
version = "2.9.9";
version = "3.0.0";
in
{
packages.default = python3Packages.buildPythonApplication {

View File

@@ -1,6 +1,6 @@
[project]
name = "fastanime"
version = "2.9.9"
version = "3.0.0"
description = "A browser anime site experience from the terminal"
license = "UNLICENSE"
readme = "README.md"
@@ -18,17 +18,15 @@ fastanime = 'fastanime:Cli'
[project.optional-dependencies]
standard = [
"mpv>=1.0.7",
"plyer>=2.1.0",
"libtorrent>=2.0.11",
"lxml>=6.0.0",
"pypresence>=4.3.0",
"thefuzz>=0.22.1",
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
]
notifications = ["plyer>=2.1.0"]
mpv = ["mpv>=1.0.7"]
mpv = [
"mpv>=1.0.7",
]
torrent = ["libtorrent>=2.0.11"]
lxml = ["lxml>=6.0.0"]
discord = ["pypresence>=4.3.0"]
@@ -36,6 +34,9 @@ download = [
"pycryptodomex>=3.23.0",
"yt-dlp>=2025.7.21",
]
torrents = [
"libtorrent>=2.0.11",
]
[build-system]
requires = ["hatchling"]

uv.lock (generated)
View File

@@ -125,18 +125,17 @@ notifications = [
{ name = "plyer" },
]
standard = [
{ name = "libtorrent" },
{ name = "lxml" },
{ name = "mpv" },
{ name = "plyer" },
{ name = "pycryptodomex" },
{ name = "pypresence" },
{ name = "thefuzz" },
{ name = "yt-dlp" },
]
torrent = [
{ name = "libtorrent" },
]
torrents = [
{ name = "libtorrent" },
]
[package.dev-dependencies]
dev = [
@@ -153,25 +152,22 @@ requires-dist = [
{ name = "click", specifier = ">=8.1.7" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "libtorrent", marker = "extra == 'standard'", specifier = ">=2.0.11" },
{ name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },
{ name = "libtorrent", marker = "extra == 'torrents'", specifier = ">=2.0.11" },
{ name = "lxml", marker = "extra == 'lxml'", specifier = ">=6.0.0" },
{ name = "lxml", marker = "extra == 'standard'", specifier = ">=6.0.0" },
{ name = "mpv", marker = "extra == 'mpv'", specifier = ">=1.0.7" },
{ name = "mpv", marker = "extra == 'standard'", specifier = ">=1.0.7" },
{ name = "plyer", marker = "extra == 'notifications'", specifier = ">=2.1.0" },
{ name = "plyer", marker = "extra == 'standard'", specifier = ">=2.1.0" },
{ name = "pycryptodomex", marker = "extra == 'download'", specifier = ">=3.23.0" },
{ name = "pycryptodomex", marker = "extra == 'standard'", specifier = ">=3.23.0" },
{ name = "pydantic", specifier = ">=2.11.7" },
{ name = "pypresence", marker = "extra == 'discord'", specifier = ">=4.3.0" },
{ name = "pypresence", marker = "extra == 'standard'", specifier = ">=4.3.0" },
{ name = "rich", specifier = ">=13.9.2" },
{ name = "thefuzz", marker = "extra == 'standard'", specifier = ">=0.22.1" },
{ name = "yt-dlp", marker = "extra == 'download'", specifier = ">=2025.7.21" },
{ name = "yt-dlp", marker = "extra == 'standard'", specifier = ">=2025.7.21" },
]
provides-extras = ["standard", "notifications", "mpv", "torrent", "lxml", "discord", "download"]
provides-extras = ["standard", "notifications", "mpv", "torrent", "lxml", "discord", "download", "torrents"]
[package.metadata.requires-dev]
dev = [