diff --git a/fastanime/cli/services/auth/manager.py b/fastanime/cli/services/auth/service.py similarity index 100% rename from fastanime/cli/services/auth/manager.py rename to fastanime/cli/services/auth/service.py diff --git a/fastanime/cli/services/downloads/models.py b/fastanime/cli/services/downloads/models.py deleted file mode 100644 index 847d0a3..0000000 --- a/fastanime/cli/services/downloads/models.py +++ /dev/null @@ -1,419 +0,0 @@ -""" -Pydantic models for download tracking system. - -This module defines the data models used throughout the download tracking system, -providing type safety and validation using Pydantic v2. -""" - -from __future__ import annotations - -import hashlib -import logging -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Literal, Optional - -from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator - -from ....core.constants import APP_DATA_DIR -from ....libs.api.types import MediaItem - -logger = logging.getLogger(__name__) - -# Type aliases for better readability -DownloadStatus = Literal["completed", "failed", "downloading", "queued", "paused"] -QualityOption = Literal["360", "480", "720", "1080", "best"] -MediaStatus = Literal["active", "completed", "paused", "failed"] - - -class EpisodeDownload(BaseModel): - """ - Pydantic model for individual episode download tracking. - - Tracks all information related to a single episode download including - file location, download progress, quality, and integrity information. - """ - - model_config = ConfigDict( - str_strip_whitespace=True, - validate_assignment=True, - frozen=True, # Immutable after creation for data integrity - ) - - episode_number: int = Field(gt=0, description="Episode number") - file_path: Path = Field(description="Path to downloaded file") - file_size: int = Field(ge=0, description="File size in bytes") - download_date: datetime = Field(default_factory=datetime.now) - quality: QualityOption = Field(default="1080") - source_provider: str = Field(description="Provider used for download") - status: DownloadStatus = Field(default="queued") - checksum: Optional[str] = Field(None, description="SHA256 checksum for integrity") - subtitle_files: List[Path] = Field(default_factory=list) - download_progress: float = Field(default=0.0, ge=0.0, le=1.0) - error_message: Optional[str] = Field(None, description="Error message if failed") - download_speed: Optional[float] = Field(None, description="Download speed in bytes/sec") - - @field_validator("file_path") - @classmethod - def validate_file_path(cls, v: Path) -> Path: - """Ensure file path is absolute and within allowed directories.""" - if not v.is_absolute(): - raise ValueError("File path must be absolute") - return v - - @computed_field - @property - def is_completed(self) -> bool: - """Check if download is completed and file exists.""" - return self.status == "completed" and self.file_path.exists() - - @computed_field - @property - def file_size_mb(self) -> float: - """Get file size in megabytes.""" - return self.file_size / (1024 * 1024) - - @computed_field - @property - def display_status(self) -> str: - """Get human-readable status.""" - status_map = { - "completed": "✓ Completed", - "failed": "✗ Failed", - "downloading": "⬇ Downloading", - "queued": "⏳ Queued", - "paused": "⏸ Paused" - } - return status_map.get(self.status, self.status) - - def generate_checksum(self) -> Optional[str]: - """Generate SHA256 checksum for the downloaded file.""" - if not self.file_path.exists(): - return None - - try: - sha256_hash = hashlib.sha256() - with open(self.file_path, "rb") as f: - for chunk in iter(lambda: f.read(4096), b""): - sha256_hash.update(chunk) - return sha256_hash.hexdigest() - except Exception as e: - logger.error(f"Failed to generate checksum for {self.file_path}: {e}") - return None - - def verify_integrity(self) -> bool: - """Verify file integrity using stored checksum.""" - if not self.checksum or not self.file_path.exists(): - return False - - current_checksum = self.generate_checksum() - return current_checksum == self.checksum - - -class MediaDownloadRecord(BaseModel): - """ - Pydantic model for anime series download tracking. - - Manages download information for an entire anime series including - individual episodes, metadata, and organization preferences. - """ - - model_config = ConfigDict( - str_strip_whitespace=True, - validate_assignment=True, - ) - - media_item: MediaItem = Field(description="The anime media item") - episodes: Dict[int, EpisodeDownload] = Field(default_factory=dict) - download_path: Path = Field(description="Base download directory for this anime") - created_date: datetime = Field(default_factory=datetime.now) - last_updated: datetime = Field(default_factory=datetime.now) - preferred_quality: QualityOption = Field(default="1080") - auto_download_new: bool = Field(default=False, description="Auto-download new episodes") - tags: List[str] = Field(default_factory=list, description="User-defined tags") - notes: Optional[str] = Field(None, description="User notes") - - # Organization preferences - naming_template: str = Field( - default="{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}", - description="File naming template" - ) - - @field_validator("download_path") - @classmethod - def validate_download_path(cls, v: Path) -> Path: - """Ensure download path is absolute.""" - if not v.is_absolute(): - raise ValueError("Download path must be absolute") - return v - - @computed_field - @property - def total_episodes_downloaded(self) -> int: - """Get count of successfully downloaded episodes.""" - return len([ep for ep in self.episodes.values() if ep.is_completed]) - - @computed_field - @property - def total_size_bytes(self) -> int: - """Get total size of all downloaded episodes in bytes.""" - return sum(ep.file_size for ep in self.episodes.values() if ep.is_completed) - - @computed_field - @property - def total_size_gb(self) -> float: - """Get total size in gigabytes.""" - return self.total_size_bytes / (1024 * 1024 * 1024) - - @computed_field - @property - def completion_percentage(self) -> float: - """Get completion percentage based on total episodes.""" - if not self.media_item.episodes or self.media_item.episodes == 0: - return 0.0 - return (self.total_episodes_downloaded / self.media_item.episodes) * 100 - - @computed_field - @property - def display_title(self) -> str: - """Get display title for the anime.""" - return ( - self.media_item.title.english - or self.media_item.title.romaji - or f"Anime {self.media_item.id}" - ) - - @computed_field - @property - def status(self) -> MediaStatus: - """Determine overall download status for this anime.""" - if not self.episodes: - return "active" - - statuses = [ep.status for ep in self.episodes.values()] - - if all(s == "completed" for s in statuses): - if self.media_item.episodes and len(self.episodes) >= self.media_item.episodes: - return "completed" - - if any(s == "failed" for s in statuses): - return "failed" - - if any(s in ["downloading", "queued"] for s in statuses): - return "active" - - return "paused" - - def get_next_episode_to_download(self) -> Optional[int]: - """Get the next episode number that should be downloaded.""" - if not self.media_item.episodes: - return None - - downloaded_episodes = set(ep.episode_number for ep in self.episodes.values() if ep.is_completed) - - for episode_num in range(1, self.media_item.episodes + 1): - if episode_num not in downloaded_episodes: - return episode_num - - return None - - def get_failed_episodes(self) -> List[int]: - """Get list of episode numbers that failed to download.""" - return [ - ep.episode_number for ep in self.episodes.values() - if ep.status == "failed" - ] - - def update_last_modified(self) -> None: - """Update the last_updated timestamp.""" - # Create a new instance with updated timestamp since the model might be frozen - object.__setattr__(self, "last_updated", datetime.now()) - - -class MediaIndexEntry(BaseModel): - """ - Lightweight entry in the download index for fast operations. - - Provides quick access to basic information about a download record - without loading the full MediaDownloadRecord. - """ - - model_config = ConfigDict(validate_assignment=True) - - media_id: int = Field(description="AniList media ID") - title: str = Field(description="Display title") - episode_count: int = Field(default=0, ge=0) - completed_episodes: int = Field(default=0, ge=0) - last_download: Optional[datetime] = None - status: MediaStatus = Field(default="active") - total_size: int = Field(default=0, ge=0) - file_path: Path = Field(description="Path to the media record file") - - @computed_field - @property - def completion_percentage(self) -> float: - """Get completion percentage.""" - if self.episode_count == 0: - return 0.0 - return (self.completed_episodes / self.episode_count) * 100 - - @computed_field - @property - def total_size_mb(self) -> float: - """Get total size in megabytes.""" - return self.total_size / (1024 * 1024) - - -class DownloadIndex(BaseModel): - """ - Lightweight index for fast download operations. - - Maintains an overview of all download records without loading - the full data, enabling fast searches and filtering. - """ - - model_config = ConfigDict(validate_assignment=True) - - version: str = Field(default="1.0") - last_updated: datetime = Field(default_factory=datetime.now) - media_count: int = Field(default=0, ge=0) - total_episodes: int = Field(default=0, ge=0) - total_size_bytes: int = Field(default=0, ge=0) - media_index: Dict[int, MediaIndexEntry] = Field(default_factory=dict) - - @computed_field - @property - def total_size_gb(self) -> float: - """Get total size across all downloads in gigabytes.""" - return self.total_size_bytes / (1024 * 1024 * 1024) - - @computed_field - @property - def completion_stats(self) -> Dict[str, int]: - """Get completion statistics.""" - stats = {"completed": 0, "active": 0, "failed": 0, "paused": 0} - for entry in self.media_index.values(): - stats[entry.status] = stats.get(entry.status, 0) + 1 - return stats - - def add_media_entry(self, media_record: MediaDownloadRecord) -> None: - """Add or update a media entry in the index.""" - entry = MediaIndexEntry( - media_id=media_record.media_item.id, - title=media_record.display_title, - episode_count=media_record.media_item.episodes or 0, - completed_episodes=media_record.total_episodes_downloaded, - last_download=media_record.last_updated, - status=media_record.status, - total_size=media_record.total_size_bytes, - file_path=APP_DATA_DIR / "downloads" / "media" / f"{media_record.media_item.id}.json" - ) - - self.media_index[media_record.media_item.id] = entry - self.media_count = len(self.media_index) - self.total_episodes = sum(entry.completed_episodes for entry in self.media_index.values()) - self.total_size_bytes = sum(entry.total_size for entry in self.media_index.values()) - self.last_updated = datetime.now() - - def remove_media_entry(self, media_id: int) -> bool: - """Remove a media entry from the index.""" - if media_id in self.media_index: - del self.media_index[media_id] - self.media_count = len(self.media_index) - self.total_episodes = sum(entry.completed_episodes for entry in self.media_index.values()) - self.total_size_bytes = sum(entry.total_size for entry in self.media_index.values()) - self.last_updated = datetime.now() - return True - return False - - -class DownloadQueueItem(BaseModel): - """ - Item in the download queue. - - Represents a single episode queued for download with priority - and scheduling information. - """ - - model_config = ConfigDict(frozen=True) - - media_id: int - episode_number: int - priority: int = Field(default=0, description="Higher number = higher priority") - added_date: datetime = Field(default_factory=datetime.now) - estimated_size: Optional[int] = Field(None, description="Estimated file size") - quality_preference: QualityOption = Field(default="1080") - retry_count: int = Field(default=0, ge=0) - max_retries: int = Field(default=3, gt=0) - - @computed_field - @property - def can_retry(self) -> bool: - """Check if this item can be retried.""" - return self.retry_count < self.max_retries - - @computed_field - @property - def estimated_size_mb(self) -> Optional[float]: - """Get estimated size in megabytes.""" - if self.estimated_size is None: - return None - return self.estimated_size / (1024 * 1024) - - -class DownloadQueue(BaseModel): - """ - Download queue management. - - Manages the queue of episodes waiting to be downloaded with - priority handling and scheduling. - """ - - model_config = ConfigDict(validate_assignment=True) - - items: List[DownloadQueueItem] = Field(default_factory=list) - max_size: int = Field(default=100, gt=0) - last_updated: datetime = Field(default_factory=datetime.now) - - def add_item(self, item: DownloadQueueItem) -> bool: - """Add an item to the queue.""" - if len(self.items) >= self.max_size: - return False - - # Check for duplicates - for existing_item in self.items: - if (existing_item.media_id == item.media_id and - existing_item.episode_number == item.episode_number): - return False - - self.items.append(item) - # Sort by priority (highest first), then by added date - self.items.sort(key=lambda x: (-x.priority, x.added_date)) - self.last_updated = datetime.now() - return True - - def get_next_item(self) -> Optional[DownloadQueueItem]: - """Get the next item to download.""" - if not self.items: - return None - return self.items[0] - - def remove_item(self, media_id: int, episode_number: int) -> bool: - """Remove an item from the queue.""" - for i, item in enumerate(self.items): - if item.media_id == media_id and item.episode_number == episode_number: - del self.items[i] - self.last_updated = datetime.now() - return True - return False - - def clear(self) -> None: - """Clear all items from the queue.""" - self.items.clear() - self.last_updated = datetime.now() - - @computed_field - @property - def total_estimated_size(self) -> int: - """Get total estimated size of all queued items.""" - return sum(item.estimated_size or 0 for item in self.items) diff --git a/fastanime/cli/services/downloads/manager.py b/fastanime/cli/services/downloads/service.py similarity index 100% rename from fastanime/cli/services/downloads/manager.py rename to fastanime/cli/services/downloads/service.py diff --git a/fastanime/cli/services/downloads/tracker.py b/fastanime/cli/services/downloads/tracker.py deleted file mode 100644 index ed0a639..0000000 --- a/fastanime/cli/services/downloads/tracker.py +++ /dev/null @@ -1,302 +0,0 @@ -""" -Download progress tracking and integration with the download system. - -This module provides real-time tracking of download progress and integrates -with the existing download infrastructure to provide progress updates. -""" - -from __future__ import annotations - -import logging -import threading -import time -from datetime import datetime -from pathlib import Path -from typing import Callable, Dict, Optional - -from ....core.config.model import DownloadsConfig -from .manager import DownloadManager, get_download_manager -from .models import DownloadQueueItem - -logger = logging.getLogger(__name__) - -# Type alias for progress callback -ProgressCallback = Callable[[int, int, float, float], None] # media_id, episode, progress, speed - - -class DownloadTracker: - """ - Tracks download progress and integrates with the download manager. - - Provides real-time progress updates and handles integration between - the actual download process and the tracking system. - """ - - def __init__(self, config: DownloadsConfig): - self.config = config - self.download_manager = get_download_manager(config) - - # Track active downloads - self._active_downloads: Dict[str, DownloadSession] = {} - self._lock = threading.RLock() - - # Progress callbacks - self._progress_callbacks: list[ProgressCallback] = [] - - def add_progress_callback(self, callback: ProgressCallback) -> None: - """Add a callback function to receive progress updates.""" - with self._lock: - self._progress_callbacks.append(callback) - - def remove_progress_callback(self, callback: ProgressCallback) -> None: - """Remove a progress callback.""" - with self._lock: - if callback in self._progress_callbacks: - self._progress_callbacks.remove(callback) - - def start_download(self, queue_item: DownloadQueueItem) -> str: - """Start tracking a download and return session ID.""" - with self._lock: - session_id = f"{queue_item.media_id}_{queue_item.episode_number}_{int(time.time())}" - - session = DownloadSession( - session_id=session_id, - queue_item=queue_item, - tracker=self - ) - - self._active_downloads[session_id] = session - - # Mark download as started in manager - self.download_manager.mark_download_started( - queue_item.media_id, - queue_item.episode_number - ) - - logger.info(f"Started download tracking for session {session_id}") - return session_id - - def update_progress(self, session_id: str, progress: float, - speed: Optional[float] = None) -> None: - """Update download progress for a session.""" - with self._lock: - if session_id not in self._active_downloads: - logger.warning(f"Unknown download session: {session_id}") - return - - session = self._active_downloads[session_id] - session.update_progress(progress, speed) - - # Notify callbacks - for callback in self._progress_callbacks: - try: - callback( - session.queue_item.media_id, - session.queue_item.episode_number, - progress, - speed or 0.0 - ) - except Exception as e: - logger.error(f"Error in progress callback: {e}") - - def complete_download(self, session_id: str, file_path: Path, - file_size: int, checksum: Optional[str] = None) -> bool: - """Mark a download as completed.""" - with self._lock: - if session_id not in self._active_downloads: - logger.warning(f"Unknown download session: {session_id}") - return False - - session = self._active_downloads[session_id] - session.mark_completed(file_path, file_size, checksum) - - # Update download manager - success = self.download_manager.mark_download_completed( - session.queue_item.media_id, - session.queue_item.episode_number, - file_path, - file_size, - checksum - ) - - # Remove from active downloads - del self._active_downloads[session_id] - - logger.info(f"Completed download session {session_id}") - return success - - def fail_download(self, session_id: str, error_message: str) -> bool: - """Mark a download as failed.""" - with self._lock: - if session_id not in self._active_downloads: - logger.warning(f"Unknown download session: {session_id}") - return False - - session = self._active_downloads[session_id] - session.mark_failed(error_message) - - # Update download manager - success = self.download_manager.mark_download_failed( - session.queue_item.media_id, - session.queue_item.episode_number, - error_message - ) - - # Remove from active downloads - del self._active_downloads[session_id] - - logger.warning(f"Failed download session {session_id}: {error_message}") - return success - - def get_active_downloads(self) -> Dict[str, 'DownloadSession']: - """Get all currently active download sessions.""" - with self._lock: - return self._active_downloads.copy() - - def cancel_download(self, session_id: str) -> bool: - """Cancel an active download.""" - with self._lock: - if session_id not in self._active_downloads: - return False - - session = self._active_downloads[session_id] - session.cancel() - - # Mark as failed with cancellation message - self.download_manager.mark_download_failed( - session.queue_item.media_id, - session.queue_item.episode_number, - "Download cancelled by user" - ) - - del self._active_downloads[session_id] - logger.info(f"Cancelled download session {session_id}") - return True - - def cleanup_stale_sessions(self, max_age_hours: int = 24) -> int: - """Clean up stale download sessions that may have been orphaned.""" - with self._lock: - current_time = datetime.now() - stale_sessions = [] - - for session_id, session in self._active_downloads.items(): - age_hours = (current_time - session.start_time).total_seconds() / 3600 - if age_hours > max_age_hours: - stale_sessions.append(session_id) - - for session_id in stale_sessions: - self.fail_download(session_id, "Session timed out") - - return len(stale_sessions) - - -class DownloadSession: - """ - Represents an active download session with progress tracking. - """ - - def __init__(self, session_id: str, queue_item: DownloadQueueItem, tracker: DownloadTracker): - self.session_id = session_id - self.queue_item = queue_item - self.tracker = tracker - self.start_time = datetime.now() - - # Progress tracking - self.progress = 0.0 - self.download_speed = 0.0 - self.bytes_downloaded = 0 - self.total_bytes = queue_item.estimated_size or 0 - - # Status - self.is_cancelled = False - self.is_completed = False - self.error_message: Optional[str] = None - - # Thread safety - self._lock = threading.Lock() - - def update_progress(self, progress: float, speed: Optional[float] = None) -> None: - """Update the progress of this download session.""" - with self._lock: - if self.is_cancelled or self.is_completed: - return - - self.progress = max(0.0, min(1.0, progress)) - - if speed is not None: - self.download_speed = speed - - if self.total_bytes > 0: - self.bytes_downloaded = int(self.total_bytes * self.progress) - - logger.debug(f"Session {self.session_id} progress: {self.progress:.2%}") - - def mark_completed(self, file_path: Path, file_size: int, checksum: Optional[str] = None) -> None: - """Mark this session as completed.""" - with self._lock: - if self.is_cancelled: - return - - self.is_completed = True - self.progress = 1.0 - self.bytes_downloaded = file_size - self.total_bytes = file_size - - def mark_failed(self, error_message: str) -> None: - """Mark this session as failed.""" - with self._lock: - if self.is_cancelled or self.is_completed: - return - - self.error_message = error_message - - def cancel(self) -> None: - """Cancel this download session.""" - with self._lock: - if self.is_completed: - return - - self.is_cancelled = True - - @property - def elapsed_time(self) -> float: - """Get elapsed time in seconds.""" - return (datetime.now() - self.start_time).total_seconds() - - @property - def estimated_time_remaining(self) -> Optional[float]: - """Get estimated time remaining in seconds.""" - if self.progress <= 0 or self.download_speed <= 0: - return None - - remaining_bytes = self.total_bytes - self.bytes_downloaded - if remaining_bytes <= 0: - return 0.0 - - return remaining_bytes / self.download_speed - - @property - def status_text(self) -> str: - """Get human-readable status.""" - if self.is_cancelled: - return "Cancelled" - elif self.is_completed: - return "Completed" - elif self.error_message: - return f"Failed: {self.error_message}" - else: - return f"Downloading ({self.progress:.1%})" - - -# Global tracker instance -_download_tracker: Optional[DownloadTracker] = None - - -def get_download_tracker(config: DownloadsConfig) -> DownloadTracker: - """Get or create the global download tracker instance.""" - global _download_tracker - - if _download_tracker is None: - _download_tracker = DownloadTracker(config) - - return _download_tracker diff --git a/fastanime/cli/services/downloads/validator.py b/fastanime/cli/services/downloads/validator.py deleted file mode 100644 index bb224e6..0000000 --- a/fastanime/cli/services/downloads/validator.py +++ /dev/null @@ -1,340 +0,0 @@ -""" -Download validation and integrity checking utilities. - -This module provides functionality to validate downloaded files, verify -integrity, and repair corrupted download records. -""" - -from __future__ import annotations - -import json -import logging -from datetime import datetime -from pathlib import Path -from typing import Dict, List, Optional, Tuple - -from pydantic import ValidationError - -from ....core.constants import APP_DATA_DIR -from .manager import DownloadManager -from .models import DownloadIndex, MediaDownloadRecord - -logger = logging.getLogger(__name__) - - -class DownloadValidator: - """ - Validator for download records and file integrity using Pydantic models. - - Provides functionality to validate, repair, and maintain the integrity - of download tracking data and associated files. - """ - - def __init__(self, download_manager: DownloadManager): - self.download_manager = download_manager - self.tracking_dir = APP_DATA_DIR / "downloads" - self.media_dir = self.tracking_dir / "media" - - def validate_download_record(self, file_path: Path) -> Optional[MediaDownloadRecord]: - """Load and validate a download record with Pydantic.""" - try: - with open(file_path, 'r', encoding='utf-8') as f: - data = json.load(f) - - record = MediaDownloadRecord.model_validate(data) - logger.debug(f"Successfully validated download record: {file_path}") - return record - - except ValidationError as e: - logger.error(f"Invalid download record {file_path}: {e}") - return None - except Exception as e: - logger.error(f"Failed to load download record {file_path}: {e}") - return None - - def validate_all_records(self) -> Tuple[List[MediaDownloadRecord], List[Path]]: - """Validate all download records and return valid records and invalid file paths.""" - valid_records = [] - invalid_files = [] - - if not self.media_dir.exists(): - logger.warning("Media directory does not exist") - return valid_records, invalid_files - - for record_file in self.media_dir.glob("*.json"): - record = self.validate_download_record(record_file) - if record: - valid_records.append(record) - else: - invalid_files.append(record_file) - - logger.info(f"Validated {len(valid_records)} records, found {len(invalid_files)} invalid files") - return valid_records, invalid_files - - def verify_file_integrity(self, record: MediaDownloadRecord) -> Dict[int, bool]: - """Verify file integrity for all episodes in a download record.""" - integrity_results = {} - - for episode_num, episode_download in record.episodes.items(): - if episode_download.status != "completed": - integrity_results[episode_num] = True # Skip non-completed downloads - continue - - # Check if file exists - if not episode_download.file_path.exists(): - logger.warning(f"Missing file for episode {episode_num}: {episode_download.file_path}") - integrity_results[episode_num] = False - continue - - # Verify file size - actual_size = episode_download.file_path.stat().st_size - if actual_size != episode_download.file_size: - logger.warning(f"Size mismatch for episode {episode_num}: expected {episode_download.file_size}, got {actual_size}") - integrity_results[episode_num] = False - continue - - # Verify checksum if available - if episode_download.checksum: - if not episode_download.verify_integrity(): - logger.warning(f"Checksum mismatch for episode {episode_num}") - integrity_results[episode_num] = False - continue - - integrity_results[episode_num] = True - - return integrity_results - - def repair_download_record(self, record_file: Path) -> bool: - """Attempt to repair a corrupted download record.""" - try: - # Try to load raw data - with open(record_file, 'r', encoding='utf-8') as f: - data = json.load(f) - - # Attempt basic repairs - repaired_data = self._attempt_basic_repairs(data) - - # Try to validate repaired data - try: - repaired_record = MediaDownloadRecord.model_validate(repaired_data) - - # Save repaired record - self.download_manager.save_download_record(repaired_record) - logger.info(f"Successfully repaired download record: {record_file}") - return True - - except ValidationError as e: - logger.error(f"Could not repair download record {record_file}: {e}") - return False - - except Exception as e: - logger.error(f"Failed to repair download record {record_file}: {e}") - return False - - def _attempt_basic_repairs(self, data: Dict) -> Dict: - """Attempt basic repairs on download record data.""" - repaired = data.copy() - - # Ensure required fields exist with defaults - if "episodes" not in repaired: - repaired["episodes"] = {} - - if "created_date" not in repaired: - repaired["created_date"] = "2024-01-01T00:00:00" - - if "last_updated" not in repaired: - repaired["last_updated"] = "2024-01-01T00:00:00" - - if "tags" not in repaired: - repaired["tags"] = [] - - if "preferred_quality" not in repaired: - repaired["preferred_quality"] = "1080" - - if "auto_download_new" not in repaired: - repaired["auto_download_new"] = False - - # Fix episodes data - if isinstance(repaired["episodes"], dict): - fixed_episodes = {} - for ep_num, ep_data in repaired["episodes"].items(): - if isinstance(ep_data, dict): - # Ensure required episode fields - if "episode_number" not in ep_data: - ep_data["episode_number"] = int(ep_num) if ep_num.isdigit() else 1 - - if "status" not in ep_data: - ep_data["status"] = "queued" - - if "download_progress" not in ep_data: - ep_data["download_progress"] = 0.0 - - if "file_size" not in ep_data: - ep_data["file_size"] = 0 - - if "subtitle_files" not in ep_data: - ep_data["subtitle_files"] = [] - - fixed_episodes[ep_num] = ep_data - - repaired["episodes"] = fixed_episodes - - return repaired - - def rebuild_index_from_records(self) -> bool: - """Rebuild the download index from individual record files.""" - try: - valid_records, _ = self.validate_all_records() - - # Create new index - new_index = DownloadIndex() - - # Add all valid records to index - for record in valid_records: - new_index.add_media_entry(record) - - # Save rebuilt index - self.download_manager._save_index(new_index) - - logger.info(f"Rebuilt download index with {len(valid_records)} records") - return True - - except Exception as e: - logger.error(f"Failed to rebuild index: {e}") - return False - - def cleanup_orphaned_files(self) -> int: - """Clean up orphaned files and inconsistent records.""" - cleanup_count = 0 - - try: - # Load current index - index = self.download_manager._load_index() - - # Check for orphaned record files - if self.media_dir.exists(): - for record_file in self.media_dir.glob("*.json"): - media_id = int(record_file.stem) - if media_id not in index.media_index: - # Check if record is valid - record = self.validate_download_record(record_file) - if record: - # Add to index - index.add_media_entry(record) - logger.info(f"Re-added orphaned record to index: {media_id}") - else: - # Remove invalid file - record_file.unlink() - cleanup_count += 1 - logger.info(f"Removed invalid record file: {record_file}") - - # Check for missing record files - missing_records = [] - for media_id, index_entry in index.media_index.items(): - if not index_entry.file_path.exists(): - missing_records.append(media_id) - - # Remove missing records from index - for media_id in missing_records: - index.remove_media_entry(media_id) - cleanup_count += 1 - logger.info(f"Removed missing record from index: {media_id}") - - # Save updated index - if cleanup_count > 0: - self.download_manager._save_index(index) - - return cleanup_count - - except Exception as e: - logger.error(f"Failed to cleanup orphaned files: {e}") - return 0 - - def validate_file_paths(self, record: MediaDownloadRecord) -> List[str]: - """Validate file paths in a download record and return issues.""" - issues = [] - - # Check download path - if not record.download_path.is_absolute(): - issues.append(f"Download path is not absolute: {record.download_path}") - - # Check episode file paths - for episode_num, episode_download in record.episodes.items(): - if not episode_download.file_path.is_absolute(): - issues.append(f"Episode {episode_num} file path is not absolute: {episode_download.file_path}") - - # Check if file exists for completed downloads - if episode_download.status == "completed" and not episode_download.file_path.exists(): - issues.append(f"Episode {episode_num} file does not exist: {episode_download.file_path}") - - # Check subtitle files - for subtitle_file in episode_download.subtitle_files: - if not subtitle_file.exists(): - issues.append(f"Episode {episode_num} subtitle file does not exist: {subtitle_file}") - - return issues - - def generate_validation_report(self) -> Dict: - """Generate a comprehensive validation report.""" - report = { - "timestamp": str(datetime.now()), - "total_records": 0, - "valid_records": 0, - "invalid_records": 0, - "integrity_issues": 0, - "orphaned_files": 0, - "path_issues": 0, - "details": { - "invalid_files": [], - "integrity_failures": [], - "path_issues": [] - } - } - - try: - # Validate all records - valid_records, invalid_files = self.validate_all_records() - - report["total_records"] = len(valid_records) + len(invalid_files) - report["valid_records"] = len(valid_records) - report["invalid_records"] = len(invalid_files) - report["details"]["invalid_files"] = [str(f) for f in invalid_files] - - # Check integrity and paths for valid records - for record in valid_records: - # Check file integrity - integrity_results = self.verify_file_integrity(record) - failed_episodes = [ep for ep, result in integrity_results.items() if not result] - if failed_episodes: - report["integrity_issues"] += len(failed_episodes) - report["details"]["integrity_failures"].append({ - "media_id": record.media_item.id, - "title": record.display_title, - "failed_episodes": failed_episodes - }) - - # Check file paths - path_issues = self.validate_file_paths(record) - if path_issues: - report["path_issues"] += len(path_issues) - report["details"]["path_issues"].append({ - "media_id": record.media_item.id, - "title": record.display_title, - "issues": path_issues - }) - - # Check for orphaned files - orphaned_count = self.cleanup_orphaned_files() - report["orphaned_files"] = orphaned_count - - except Exception as e: - logger.error(f"Failed to generate validation report: {e}") - report["error"] = str(e) - - return report - - -def validate_downloads(download_manager: DownloadManager) -> Dict: - """Convenience function to validate all downloads and return a report.""" - validator = DownloadValidator(download_manager) - return validator.generate_validation_report() diff --git a/fastanime/cli/services/feedback/__init__.py b/fastanime/cli/services/feedback/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fastanime/cli/utils/feedback.py b/fastanime/cli/services/feedback/service.py similarity index 100% rename from fastanime/cli/utils/feedback.py rename to fastanime/cli/services/feedback/service.py diff --git a/fastanime/cli/services/registry/filters.py b/fastanime/cli/services/registry/filters.py new file mode 100644 index 0000000..d83c316 --- /dev/null +++ b/fastanime/cli/services/registry/filters.py @@ -0,0 +1,277 @@ +from typing import List + +from ....libs.api.params import ApiSearchParams +from ....libs.api.types import MediaItem + + +class MediaFilter: + """ + A class to filter, sort, and paginate a list of MediaItem objects + based on ApiSearchParams. + """ + + # Mapping for season to month range (MMDD format) + _SEASON_MONTH_RANGES = { + "WINTER": (101, 331), # Jan 1 - Mar 31 + "SPRING": (401, 630), # Apr 1 - Jun 30 + "SUMMER": (701, 930), # Jul 1 - Sep 30 + "FALL": (1001, 1231), # Oct 1 - Dec 31 + } + + # Mapping for sort parameters to MediaItem attributes and order + # (attribute_name, is_descending, is_nested_title_field) + _SORT_MAPPING = { + "ID": ("id", False, False), + "ID_DESC": ("id", True, False), + "POPULARITY": ("popularity", False, False), + "POPULARITY_DESC": ("popularity", True, False), + "SCORE": ("average_score", False, False), + "SCORE_DESC": ("average_score", True, False), + "TITLE_ROMAJI": ("romaji", False, True), # Nested under title + "TITLE_ROMAJI_DESC": ("romaji", True, True), + "TITLE_ENGLISH": ("english", False, True), + "TITLE_ENGLISH_DESC": ("english", True, True), + "START_DATE": ("start_date", False, False), + "START_DATE_DESC": ("start_date", True, False), + } + + @classmethod + def apply( + cls, media_items: List[MediaItem], filters: ApiSearchParams + ) -> List[MediaItem]: + """ + Applies filtering, sorting, and pagination to a list of MediaItem objects. + + Args: + media_items: The initial list of MediaItem objects to filter. + params: An ApiSearchParams object containing the filter, sort, and pagination criteria. + + Returns: + A new list of MediaItem objects, filtered, sorted, and paginated. + """ + filtered_items = list(media_items) # Create a mutable copy + + if filters.query: + query_lower = filters.query.lower() + filtered_items = [ + item + for item in filtered_items + if ( + item.title + and ( + (item.title.romaji and query_lower in item.title.romaji.lower()) + or ( + item.title.english + and query_lower in item.title.english.lower() + ) + or ( + item.title.native + and query_lower in item.title.native.lower() + ) + ) + ) + or (item.description and query_lower in item.description.lower()) + or any(query_lower in syn.lower() for syn in item.synonyms) + ] + + # IDs + if filters.id_in: + id_set = set(filters.id_in) + filtered_items = [item for item in filtered_items if item.id in id_set] + + # Genres + if filters.genre_in: + genre_in_set = set(g.lower() for g in filters.genre_in) + filtered_items = [ + item + for item in filtered_items + if any(g.lower() in genre_in_set for g in item.genres) + ] + if filters.genre_not_in: + genre_not_in_set = set(g.lower() for g in filters.genre_not_in) + filtered_items = [ + item + for item in filtered_items + if not any(g.lower() in genre_not_in_set for g in item.genres) + ] + + # Tags + if filters.tag_in: + tag_in_set = set(t.lower() for t in filters.tag_in) + filtered_items = [ + item + for item in filtered_items + if any(tag.name and tag.name.lower() in tag_in_set for tag in item.tags) + ] + if filters.tag_not_in: + tag_not_in_set = set(t.lower() for t in filters.tag_not_in) + filtered_items = [ + item + for item in filtered_items + if not any( + tag.name and tag.name.lower() in tag_not_in_set for tag in item.tags + ) + ] + + # Status + combined_status_in = set() + if filters.status_in: + combined_status_in.update(s.upper() for s in filters.status_in) + if filters.status: + combined_status_in.add(filters.status.upper()) + + if combined_status_in: + filtered_items = [ + item + for item in filtered_items + if item.status and item.status.upper() in combined_status_in + ] + if filters.status_not_in: + status_not_in_set = set(s.upper() for s in filters.status_not_in) + filtered_items = [ + item + for item in filtered_items + if item.status and item.status.upper() not in status_not_in_set + ] + + # Popularity + if filters.popularity_greater is not None: + filtered_items = [ + item + for item in filtered_items + if item.popularity is not None + and item.popularity > filters.popularity_greater + ] + if filters.popularity_lesser is not None: + filtered_items = [ + item + for item in filtered_items + if item.popularity is not None + and item.popularity < filters.popularity_lesser + ] + + # Average Score + if filters.averageScore_greater is not None: + filtered_items = [ + item + for item in filtered_items + if item.average_score is not None + and item.average_score > filters.averageScore_greater + ] + if filters.averageScore_lesser is not None: + filtered_items = [ + item + for item in filtered_items + if item.average_score is not None + and item.average_score < filters.averageScore_lesser + ] + + # Date Filtering (combining season/year with startDate parameters) + effective_start_date_greater = filters.startDate_greater + effective_start_date_lesser = filters.startDate_lesser + + if filters.seasonYear is not None and filters.season is not None: + season_range = cls._SEASON_MONTH_RANGES.get(filters.season.upper()) + if season_range: + # Calculate start and end of the season in YYYYMMDD format + season_start_date = filters.seasonYear * 10000 + season_range[0] + season_end_date = filters.seasonYear * 10000 + season_range[1] + + # Combine with existing startDate_greater/lesser, taking the stricter boundary + effective_start_date_greater = max( + effective_start_date_greater or 0, season_start_date + ) + effective_start_date_lesser = min( + effective_start_date_lesser or 99999999, season_end_date + ) + + # TODO: re enable date filtering since date is a datetime + + # if filters.startDate is not None: + # # If a specific start date is given, it overrides ranges for exact match + # filtered_items = [ + # item for item in filtered_items if item.start_date == filters.startDate + # ] + # else: + # if effective_start_date_greater is not None: + # filtered_items = [ + # item + # for item in filtered_items + # if item.start_date is not None + # and item.start_date >= datetime(y,m,d) + # ] + # if effective_start_date_lesser is not None: + # filtered_items = [ + # item + # for item in filtered_items + # if item.start_date is not None + # and item.start_date <= effective_start_date_lesser + # ] + + # if filters.endDate_greater is not None: + # filtered_items = [ + # item + # for item in filtered_items + # if item.end_date is not None + # and item.end_date >= filters.endDate_greater + # ] + # if filters.endDate_lesser is not None: + # filtered_items = [ + # item + # for item in filtered_items + # if item.end_date is not None and item.end_date <= filters.endDate_lesser + # ] + + # Format and Type + if filters.format_in: + format_in_set = set(f.upper() for f in filters.format_in) + filtered_items = [ + item + for item in filtered_items + if item.format and item.format.upper() in format_in_set + ] + if filters.type: + filtered_items = [ + item + for item in filtered_items + if item.type and item.type.upper() == filters.type.upper() + ] + + # --- 2. Apply Sorting --- + if filters.sort: + sort_criteria = ( + [filters.sort] if isinstance(filters.sort, str) else filters.sort + ) + + # Sort in reverse order of criteria so the first criterion is primary + for sort_param in reversed(sort_criteria): + sort_info = cls._SORT_MAPPING.get(sort_param.upper()) + if sort_info: + attr_name, is_descending, is_nested_title = sort_info + + def sort_key(item: MediaItem): + if is_nested_title: + # Handle nested title attributes + title_obj = item.title + if title_obj and hasattr(title_obj, attr_name): + val = getattr(title_obj, attr_name) + return val.lower() if isinstance(val, str) else val + return None # Handle missing title or attribute gracefully + else: + # Handle direct attributes + return getattr(item, attr_name) + + # Sort, handling None values (None typically sorts first in ascending) + filtered_items.sort( + key=lambda item: (sort_key(item) is None, sort_key(item)), + reverse=is_descending, + ) + else: + print(f"Warning: Unknown sort parameter '{sort_param}'. Skipping.") + + # --- 3. Apply Pagination --- + start_index = (filters.page - 1) * filters.per_page + end_index = start_index + filters.per_page + paginated_items = filtered_items[start_index:end_index] + + return paginated_items diff --git a/fastanime/cli/services/registry/manager.py b/fastanime/cli/services/registry/manager.py deleted file mode 100644 index c4c9c82..0000000 --- a/fastanime/cli/services/registry/manager.py +++ /dev/null @@ -1,380 +0,0 @@ -""" -Unified Media Registry Manager. - -Provides centralized management of anime metadata, downloads, and watch history -through a single interface, eliminating data duplication. -""" - -from __future__ import annotations - -import json -import logging -import threading -from pathlib import Path -from typing import Dict, List, Optional - -from ....core.constants import APP_DATA_DIR -from ....libs.api.types import MediaItem -from .models import MediaRecord, MediaRegistryIndex, EpisodeStatus, UserMediaData - -logger = logging.getLogger(__name__) - - -class MediaRegistryManager: - """ - Unified manager for anime data, downloads, and watch history. - - Provides a single interface for all media-related operations, - eliminating duplication between download and watch systems. - """ - - def __init__(self, registry_path: Path = None): - self.registry_path = registry_path or APP_DATA_DIR / "media_registry" - self.media_dir = self.registry_path / "media" - self.cache_dir = self.registry_path / "cache" - self.index_file = self.registry_path / "index.json" - - # Thread safety - self._lock = threading.RLock() - - # Cached data - self._index: Optional[MediaRegistryIndex] = None - self._loaded_records: Dict[int, MediaRecord] = {} - - self._ensure_directories() - - def _ensure_directories(self) -> None: - """Ensure registry directories exist.""" - try: - self.registry_path.mkdir(parents=True, exist_ok=True) - self.media_dir.mkdir(exist_ok=True) - self.cache_dir.mkdir(exist_ok=True) - except Exception as e: - logger.error(f"Failed to create registry directories: {e}") - - def _load_index(self) -> MediaRegistryIndex: - """Load or create the registry index.""" - if self._index is not None: - return self._index - - try: - if self.index_file.exists(): - with open(self.index_file, 'r', encoding='utf-8') as f: - data = json.load(f) - self._index = MediaRegistryIndex.model_validate(data) - else: - self._index = MediaRegistryIndex() - self._save_index() - - logger.debug(f"Loaded registry index with {self._index.media_count} entries") - return self._index - - except Exception as e: - logger.error(f"Failed to load registry index: {e}") - self._index = MediaRegistryIndex() - return self._index - - def _save_index(self) -> bool: - """Save the registry index.""" - try: - # Atomic write - temp_file = self.index_file.with_suffix('.tmp') - with open(temp_file, 'w', encoding='utf-8') as f: - json.dump(self._index.model_dump(), f, indent=2, ensure_ascii=False, default=str) - - temp_file.replace(self.index_file) - logger.debug("Saved registry index") - return True - - except Exception as e: - logger.error(f"Failed to save registry index: {e}") - return False - - def _get_media_file_path(self, media_id: int) -> Path: - """Get file path for media record.""" - return self.media_dir / str(media_id) / "record.json" - - def get_media_record(self, media_id: int) -> Optional[MediaRecord]: - """Get media record by ID.""" - with self._lock: - # Check cache first - if media_id in self._loaded_records: - return self._loaded_records[media_id] - - try: - record_file = self._get_media_file_path(media_id) - if not record_file.exists(): - return None - - with open(record_file, 'r', encoding='utf-8') as f: - data = json.load(f) - - record = MediaRecord.model_validate(data) - self._loaded_records[media_id] = record - - logger.debug(f"Loaded media record for {media_id}") - return record - - except Exception as e: - logger.error(f"Failed to load media record {media_id}: {e}") - return None - - def save_media_record(self, record: MediaRecord) -> bool: - """Save media record to storage.""" - with self._lock: - try: - media_id = record.media_item.id - record_file = self._get_media_file_path(media_id) - - # Ensure directory exists - record_file.parent.mkdir(parents=True, exist_ok=True) - - # Atomic write - temp_file = record_file.with_suffix('.tmp') - with open(temp_file, 'w', encoding='utf-8') as f: - json.dump(record.model_dump(), f, indent=2, ensure_ascii=False, default=str) - - temp_file.replace(record_file) - - # Update cache and index - self._loaded_records[media_id] = record - index = self._load_index() - index.add_media_entry(record) - self._save_index() - - logger.debug(f"Saved media record for {media_id}") - return True - - except Exception as e: - logger.error(f"Failed to save media record: {e}") - return False - - def get_or_create_record(self, media_item: MediaItem) -> MediaRecord: - """Get existing record or create new one.""" - record = self.get_media_record(media_item.id) - if record is None: - record = MediaRecord(media_item=media_item) - self.save_media_record(record) - else: - # Update media_item in case metadata changed - record.media_item = media_item - record.user_data.update_timestamp() - self.save_media_record(record) - - return record - - def update_download_completion(self, media_item: MediaItem, episode_number: int, - file_path: Path, file_size: int, quality: str, - checksum: Optional[str] = None) -> bool: - """Update record when download completes.""" - try: - record = self.get_or_create_record(media_item) - record.update_from_download_completion( - episode_number, file_path, file_size, quality, checksum - ) - return self.save_media_record(record) - - except Exception as e: - logger.error(f"Failed to update download completion: {e}") - return False - - def update_from_player_result(self, media_item: MediaItem, episode_number: int, - stop_time: str, total_time: str) -> bool: - """Update record from player feedback.""" - try: - record = self.get_or_create_record(media_item) - record.update_from_player_result(episode_number, stop_time, total_time) - return self.save_media_record(record) - - except Exception as e: - logger.error(f"Failed to update from player result: {e}") - return False - - def mark_episode_watched(self, media_id: int, episode_number: int, - progress: float = 1.0) -> bool: - """Mark episode as watched.""" - try: - record = self.get_media_record(media_id) - if not record: - return False - - episode = record.get_episode_status(episode_number) - episode.watch_status = "completed" if progress >= 0.8 else "watching" - episode.watch_progress = progress - episode.watch_date = datetime.now() - episode.watch_count += 1 - - record.user_data.update_timestamp() - return self.save_media_record(record) - - except Exception as e: - logger.error(f"Failed to mark episode watched: {e}") - return False - - def get_currently_watching(self) -> List[MediaRecord]: - """Get anime currently being watched.""" - try: - index = self._load_index() - watching_records = [] - - for entry in index.media_index.values(): - if entry.user_status == "watching": - record = self.get_media_record(entry.media_id) - if record: - watching_records.append(record) - - return watching_records - - except Exception as e: - logger.error(f"Failed to get currently watching: {e}") - return [] - - def get_recently_watched(self, limit: int = 10) -> List[MediaRecord]: - """Get recently watched anime.""" - try: - index = self._load_index() - - # Sort by last updated - sorted_entries = sorted( - index.media_index.values(), - key=lambda x: x.last_updated, - reverse=True - ) - - recent_records = [] - for entry in sorted_entries[:limit]: - if entry.episodes_watched > 0: # Only include if actually watched - record = self.get_media_record(entry.media_id) - if record: - recent_records.append(record) - - return recent_records - - except Exception as e: - logger.error(f"Failed to get recently watched: {e}") - return [] - - def get_download_queue_candidates(self) -> List[MediaRecord]: - """Get anime that have downloads queued or in progress.""" - try: - index = self._load_index() - download_records = [] - - for entry in index.media_index.values(): - if entry.episodes_downloaded < entry.total_episodes: - record = self.get_media_record(entry.media_id) - if record: - # Check if any episodes are queued/downloading - has_active_downloads = any( - ep.download_status in ["queued", "downloading"] - for ep in record.episodes.values() - ) - if has_active_downloads: - download_records.append(record) - - return download_records - - except Exception as e: - logger.error(f"Failed to get download queue candidates: {e}") - return [] - - def get_continue_episode(self, media_id: int, available_episodes: List[str]) -> Optional[str]: - """Get episode to continue from based on watch history.""" - try: - record = self.get_media_record(media_id) - if not record: - return None - - next_episode = record.next_episode_to_watch - if next_episode and str(next_episode) in available_episodes: - return str(next_episode) - - return None - - except Exception as e: - logger.error(f"Failed to get continue episode: {e}") - return None - - def get_registry_stats(self) -> Dict: - """Get comprehensive registry statistics.""" - try: - index = self._load_index() - - total_downloaded = sum(entry.episodes_downloaded for entry in index.media_index.values()) - total_watched = sum(entry.episodes_watched for entry in index.media_index.values()) - - return { - "total_anime": index.media_count, - "status_breakdown": index.status_breakdown, - "total_episodes_downloaded": total_downloaded, - "total_episodes_watched": total_watched, - "last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"), - } - - except Exception as e: - logger.error(f"Failed to get registry stats: {e}") - return {} - - def search_media(self, query: str) -> List[MediaRecord]: - """Search media by title.""" - try: - index = self._load_index() - query_lower = query.lower() - results = [] - - for entry in index.media_index.values(): - if query_lower in entry.title.lower(): - record = self.get_media_record(entry.media_id) - if record: - results.append(record) - - return results - - except Exception as e: - logger.error(f"Failed to search media: {e}") - return [] - - def remove_media_record(self, media_id: int) -> bool: - """Remove media record completely.""" - with self._lock: - try: - # Remove from cache - if media_id in self._loaded_records: - del self._loaded_records[media_id] - - # Remove file - record_file = self._get_media_file_path(media_id) - if record_file.exists(): - record_file.unlink() - - # Remove directory if empty - try: - record_file.parent.rmdir() - except OSError: - pass # Directory not empty - - # Update index - index = self._load_index() - if media_id in index.media_index: - del index.media_index[media_id] - index.media_count = len(index.media_index) - self._save_index() - - logger.debug(f"Removed media record {media_id}") - return True - - except Exception as e: - logger.error(f"Failed to remove media record {media_id}: {e}") - return False - - -# Global instance -_media_registry: Optional[MediaRegistryManager] = None - - -def get_media_registry() -> MediaRegistryManager: - """Get or create the global media registry instance.""" - global _media_registry - if _media_registry is None: - _media_registry = MediaRegistryManager() - return _media_registry diff --git a/fastanime/cli/services/registry/models.py b/fastanime/cli/services/registry/models.py index a9dbb6b..476723c 100644 --- a/fastanime/cli/services/registry/models.py +++ b/fastanime/cli/services/registry/models.py @@ -1,346 +1,95 @@ -""" -Unified data models for Media Registry. - -Provides single source of truth for anime metadata, episode tracking, -and user data, eliminating duplication between download and watch systems. -""" - -from __future__ import annotations - import logging from datetime import datetime from pathlib import Path -from typing import Dict, List, Literal, Optional +from typing import Dict, Literal, Optional -from pydantic import BaseModel, ConfigDict, Field, computed_field +from pydantic import BaseModel, Field, computed_field from ....libs.api.types import MediaItem +from ...utils import converters logger = logging.getLogger(__name__) # Type aliases -DownloadStatus = Literal["not_downloaded", "queued", "downloading", "completed", "failed", "paused"] -WatchStatus = Literal["not_watched", "watching", "completed", "dropped", "paused"] +DownloadStatus = Literal[ + "not_downloaded", "queued", "downloading", "completed", "failed", "paused" +] MediaUserStatus = Literal["planning", "watching", "completed", "dropped", "paused"] +REGISTRY_VERSION = "1.0" -class EpisodeStatus(BaseModel): - """ - Unified episode status tracking both download and watch state. - Single source of truth for episode-level data. - """ - - model_config = ConfigDict( - str_strip_whitespace=True, - validate_assignment=True, - ) - - episode_number: int = Field(gt=0) - +class MediaEpisode(BaseModel): + episode_number: str + # Download tracking download_status: DownloadStatus = "not_downloaded" - file_path: Optional[Path] = None - file_size: Optional[int] = None - download_date: Optional[datetime] = None - download_quality: Optional[str] = None - checksum: Optional[str] = None - - # Watch tracking (from player feedback) - watch_status: WatchStatus = "not_watched" - watch_progress: float = Field(default=0.0, ge=0.0, le=1.0) - last_watch_position: Optional[str] = None # "HH:MM:SS" from PlayerResult - total_duration: Optional[str] = None # "HH:MM:SS" from PlayerResult - watch_date: Optional[datetime] = None - watch_count: int = Field(default=0, ge=0) - - # Integration fields - auto_marked_watched: bool = Field(default=False, description="Auto-marked watched from download") - - @computed_field - @property - def is_available_locally(self) -> bool: - """Check if episode is downloaded and file exists.""" - return ( - self.download_status == "completed" - and self.file_path is not None - and self.file_path.exists() - ) - - @computed_field - @property - def completion_percentage(self) -> float: - """Calculate actual watch completion from player data.""" - if self.last_watch_position and self.total_duration: - try: - last_seconds = self._time_to_seconds(self.last_watch_position) - total_seconds = self._time_to_seconds(self.total_duration) - if total_seconds > 0: - return min(100.0, (last_seconds / total_seconds) * 100) - except (ValueError, AttributeError): - pass - return self.watch_progress * 100 - - @computed_field - @property - def should_auto_mark_watched(self) -> bool: - """Check if episode should be auto-marked as watched.""" - return self.completion_percentage >= 80.0 and self.watch_status != "completed" - - def _time_to_seconds(self, time_str: str) -> int: - """Convert HH:MM:SS to seconds.""" - try: - parts = time_str.split(':') - if len(parts) == 3: - h, m, s = map(int, parts) - return h * 3600 + m * 60 + s - except (ValueError, AttributeError): - pass - return 0 - - def update_from_player_result(self, stop_time: str, total_time: str) -> None: - """Update watch status from PlayerResult.""" - self.last_watch_position = stop_time - self.total_duration = total_time - self.watch_date = datetime.now() - self.watch_count += 1 - - # Auto-mark as completed if 80%+ watched - if self.should_auto_mark_watched: - self.watch_status = "completed" - self.watch_progress = 1.0 - - -class UserMediaData(BaseModel): - """ - User-specific data for a media item. - Consolidates user preferences from both download and watch systems. - """ - - model_config = ConfigDict( - str_strip_whitespace=True, - validate_assignment=True, - ) - - # User status and preferences - status: MediaUserStatus = "planning" - notes: str = "" - tags: List[str] = Field(default_factory=list) - rating: Optional[int] = Field(None, ge=1, le=10) - favorite: bool = False - priority: int = Field(default=0, ge=0) - - # Download preferences - preferred_quality: str = "1080" - auto_download_new: bool = False - download_path: Optional[Path] = None - - # Watch preferences - continue_from_history: bool = True - auto_mark_watched_on_download: bool = False - - # Timestamps - created_date: datetime = Field(default_factory=datetime.now) - last_updated: datetime = Field(default_factory=datetime.now) - - def update_timestamp(self) -> None: - """Update last_updated timestamp.""" - self.last_updated = datetime.now() + file_path: Path + download_date: datetime = Field(default_factory=datetime.now) class MediaRecord(BaseModel): - """ - Unified media record - single source of truth for anime data. - Replaces both MediaDownloadRecord and WatchHistoryEntry. - """ - - model_config = ConfigDict( - str_strip_whitespace=True, - validate_assignment=True, - ) - media_item: MediaItem - episodes: Dict[int, EpisodeStatus] = Field(default_factory=dict) - user_data: UserMediaData = Field(default_factory=UserMediaData) - - @computed_field - @property - def display_title(self) -> str: - """Get display title for the anime.""" - return ( - self.media_item.title.english - or self.media_item.title.romaji - or self.media_item.title.native - or f"Anime #{self.media_item.id}" - ) - - @computed_field - @property - def total_episodes_downloaded(self) -> int: - """Count of successfully downloaded episodes.""" - return len([ep for ep in self.episodes.values() if ep.is_available_locally]) - - @computed_field - @property - def total_episodes_watched(self) -> int: - """Count of completed episodes.""" - return len([ep for ep in self.episodes.values() if ep.watch_status == "completed"]) - - @computed_field - @property - def last_watched_episode(self) -> int: - """Get highest watched episode number.""" - watched_episodes = [ - ep.episode_number for ep in self.episodes.values() - if ep.watch_status == "completed" - ] - return max(watched_episodes) if watched_episodes else 0 - - @computed_field - @property - def next_episode_to_watch(self) -> Optional[int]: - """Get next episode to watch based on progress.""" - if not self.episodes: - return 1 - - # Find highest completed episode - last_watched = self.last_watched_episode - - if last_watched == 0: - return 1 - - next_ep = last_watched + 1 - total_eps = self.media_item.episodes or float('inf') - - return next_ep if next_ep <= total_eps else None - - @computed_field - @property - def download_completion_percentage(self) -> float: - """Download completion percentage.""" - if not self.media_item.episodes or self.media_item.episodes == 0: - return 0.0 - return (self.total_episodes_downloaded / self.media_item.episodes) * 100 - + media_episodes: list[MediaEpisode] = Field(default_factory=list) + + +class MediaRegistryIndexEntry(BaseModel): + media_id: int + media_api: Literal["anilist", "NONE", "jikan"] = "NONE" + + status: MediaUserStatus = "watching" + progress: str = "0" + last_watch_position: Optional[str] = None + last_watched: datetime = Field(default_factory=datetime.now) + total_duration: Optional[str] = None + total_episodes: int = 0 + + score: float = 0 + repeat: int = 0 + notes: str = "" + + last_notified_episode: Optional[str] = None + + # for first watch only + start_date: datetime = Field(default_factory=datetime.now) + completed_at: datetime = Field(default_factory=datetime.now) + @computed_field @property def watch_completion_percentage(self) -> float: - """Watch completion percentage.""" - if not self.media_item.episodes or self.media_item.episodes == 0: - return 0.0 - return (self.total_episodes_watched / self.media_item.episodes) * 100 - - def get_episode_status(self, episode_number: int) -> EpisodeStatus: - """Get or create episode status.""" - if episode_number not in self.episodes: - self.episodes[episode_number] = EpisodeStatus(episode_number=episode_number) - return self.episodes[episode_number] - - def update_from_download_completion(self, episode_number: int, file_path: Path, - file_size: int, quality: str, checksum: Optional[str] = None) -> None: - """Update episode status when download completes.""" - episode = self.get_episode_status(episode_number) - episode.download_status = "completed" - episode.file_path = file_path - episode.file_size = file_size - episode.download_quality = quality - episode.checksum = checksum - episode.download_date = datetime.now() - - # Auto-mark as watched if enabled - if self.user_data.auto_mark_watched_on_download and episode.watch_status == "not_watched": - episode.watch_status = "completed" - episode.watch_progress = 1.0 - episode.auto_marked_watched = True - episode.watch_date = datetime.now() - - self.user_data.update_timestamp() - - def update_from_player_result(self, episode_number: int, stop_time: str, total_time: str) -> None: - """Update episode status from player feedback.""" - episode = self.get_episode_status(episode_number) - episode.update_from_player_result(stop_time, total_time) - self.user_data.update_timestamp() - - # Update overall status based on progress - if episode.watch_status == "completed": - if self.user_data.status == "planning": - self.user_data.status = "watching" - - # Check if anime is completed - if self.media_item.episodes and self.total_episodes_watched >= self.media_item.episodes: - self.user_data.status = "completed" + """Watch completion percentage.""" + if self.total_duration and self.last_watch_position: + return ( + converters.time_to_seconds(self.last_watch_position) + / converters.time_to_seconds(self.total_duration) + ) * 100 + return 0.0 class MediaRegistryIndex(BaseModel): - """ - Lightweight index for fast media registry operations. - Provides quick access without loading full MediaRecord files. - """ - - model_config = ConfigDict(validate_assignment=True) - - version: str = Field(default="1.0") + version: str = Field(default=REGISTRY_VERSION) last_updated: datetime = Field(default_factory=datetime.now) - media_count: int = Field(default=0, ge=0) - - # Quick access index - media_index: Dict[int, "MediaIndexEntry"] = Field(default_factory=dict) - + + media_index: Dict[str, MediaRegistryIndexEntry] = Field(default_factory=dict) + @computed_field @property def status_breakdown(self) -> Dict[str, int]: """Get breakdown by user status.""" - breakdown = {"planning": 0, "watching": 0, "completed": 0, "dropped": 0, "paused": 0} + breakdown = {} for entry in self.media_index.values(): - breakdown[entry.user_status] = breakdown.get(entry.user_status, 0) + 1 + breakdown[entry.status] = breakdown.get(entry.status, 0) + 1 return breakdown - - def add_media_entry(self, media_record: MediaRecord) -> None: - """Add or update media entry in index.""" - entry = MediaIndexEntry( - media_id=media_record.media_item.id, - title=media_record.display_title, - user_status=media_record.user_data.status, - episodes_downloaded=media_record.total_episodes_downloaded, - episodes_watched=media_record.total_episodes_watched, - total_episodes=media_record.media_item.episodes or 0, - last_updated=media_record.user_data.last_updated, - last_watched_episode=media_record.last_watched_episode, - next_episode=media_record.next_episode_to_watch - ) - - self.media_index[media_record.media_item.id] = entry - self.media_count = len(self.media_index) - self.last_updated = datetime.now() - -class MediaIndexEntry(BaseModel): - """Lightweight index entry for a media item.""" - - model_config = ConfigDict(validate_assignment=True) - - media_id: int - title: str - user_status: MediaUserStatus - episodes_downloaded: int = 0 - episodes_watched: int = 0 - total_episodes: int = 0 - last_updated: datetime - last_watched_episode: int = 0 - next_episode: Optional[int] = None - @computed_field @property - def download_progress(self) -> float: - """Download progress percentage.""" - if self.total_episodes == 0: - return 0.0 - return (self.episodes_downloaded / self.total_episodes) * 100 - + def media_count_breakdown(self) -> Dict[str, int]: + breakdown = {} + for entry in self.media_index.values(): + breakdown[entry.media_api] = breakdown.get(entry.media_api, 0) + 1 + return breakdown + @computed_field @property - def watch_progress(self) -> float: - """Watch progress percentage.""" - if self.total_episodes == 0: - return 0.0 - return (self.episodes_watched / self.total_episodes) * 100 + def media_count(self) -> int: + """Get the number of media.""" + return len(self.media_index) diff --git a/fastanime/cli/services/registry/service.py b/fastanime/cli/services/registry/service.py new file mode 100644 index 0000000..88b9f00 --- /dev/null +++ b/fastanime/cli/services/registry/service.py @@ -0,0 +1,229 @@ +import json +import logging +from datetime import datetime +from pathlib import Path +from typing import Dict, Generator, List, Optional + +from ....core.config.model import MediaRegistryConfig +from ....core.utils.file import AtomicWriter, FileLock, check_file_modified +from ....libs.api.params import ApiSearchParams +from ....libs.api.types import MediaItem +from ....libs.players.types import PlayerResult +from .filters import MediaFilter +from .models import ( + MediaRecord, + MediaRegistryIndex, + MediaRegistryIndexEntry, +) + +logger = logging.getLogger(__name__) + + +class MediaRegistryService: + def __init__(self, media_api: str, config: MediaRegistryConfig): + self.config = config + self.media_registry_dir = self.config.media_dir / media_api + self._media_api = media_api + self._ensure_directories() + self._index_file = self.config.index_dir / "registry.json" + self._index_file_modified_time = 0 + _lock_file = self.config.media_dir / "registry.lock" + self._lock = FileLock(_lock_file) + + def _ensure_directories(self) -> None: + """Ensure registry directories exist.""" + try: + self.media_registry_dir.mkdir(parents=True, exist_ok=True) + self.config.index_dir.mkdir(parents=True, exist_ok=True) + except Exception as e: + logger.error(f"Failed to create registry directories: {e}") + + def _load_index(self) -> MediaRegistryIndex: + """Load or create the registry index.""" + self._index_file_modified_time, is_modified = check_file_modified( + self._index_file, self._index_file_modified_time + ) + if not is_modified and self._index is not None: + return self._index + if self._index_file.exists(): + with self._index_file.open("r", encoding="utf-8") as f: + data = json.load(f) + self._index = MediaRegistryIndex.model_validate(data) + else: + self._index = MediaRegistryIndex() + self._save_index(self._index) + + logger.debug(f"Loaded registry index with {self._index.media_count} entries") + return self._index + + def _save_index(self, index: MediaRegistryIndex): + """Save the registry index.""" + with self._lock: + index.last_updated = datetime.now() + with AtomicWriter(self._index_file) as f: + json.dump(index.model_dump(), f, indent=2) + + logger.debug("saved registry index") + + def get_media_index_entry(self, media_id: int) -> Optional[MediaRegistryIndexEntry]: + index = self._load_index() + return index.media_index.get(f"{self._media_api}_{media_id}") + + def _get_media_file_path(self, media_id: int) -> Path: + """Get file path for media record.""" + return self.media_registry_dir / f"{media_id}.json" + + def get_media_record(self, media_id: int) -> Optional[MediaRecord]: + record_file = self._get_media_file_path(media_id) + if not record_file.exists(): + return None + + data = json.load(record_file.open(mode="r", encoding="utf-8")) + + record = MediaRecord.model_validate(data) + + logger.debug(f"Loaded media record for {media_id}") + return record + + def get_or_create_index_entry(self, media_id: int) -> MediaRegistryIndexEntry: + index_entry = self.get_media_index_entry(media_id) + if not index_entry: + index = self._load_index() + index_entry = MediaRegistryIndexEntry( + media_id=media_id, + media_api=self._media_api, # pyright:ignore + ) + index.media_index[f"{self._media_api}_{media_id}"] = index_entry + self._save_index(index) + return index_entry + return index_entry + + def save_media_index_entry(self, index_entry: MediaRegistryIndexEntry) -> bool: + with self._lock: + index = self._load_index() + index.media_index[f"{self._media_api}_{index_entry.media_id}"] = index_entry + self._save_index(index) + + logger.debug(f"Saved media record for {index_entry.media_id}") + return True + + def save_media_record(self, record: MediaRecord) -> bool: + with self._lock: + self.get_or_create_index_entry(record.media_item.id) + media_id = record.media_item.id + + record_file = self._get_media_file_path(media_id) + + with AtomicWriter(record_file) as f: + json.dump(record.model_dump(), f, indent=2, default=str) + + logger.debug(f"Saved media record for {media_id}") + return True + + def get_or_create_record(self, media_item: MediaItem) -> MediaRecord: + record = self.get_media_record(media_item.id) + if record is None: + record = MediaRecord(media_item=media_item) + self.save_media_record(record) + else: + record.media_item = media_item + self.save_media_record(record) + + return record + + def update_from_player_result( + self, media_item: MediaItem, episode_number: str, player_result: PlayerResult + ): + """Update record from player feedback.""" + self.get_or_create_record(media_item) + + index = self._load_index() + index_entry = index.media_index[f"{self._media_api}_{media_item.id}"] + + index_entry.last_watch_position = player_result.stop_time + index_entry.total_duration = player_result.total_time + index_entry.progress = episode_number + index_entry.last_watched = datetime.now() + + index.media_index[f"{self._media_api}_{media_item.id}"] = index_entry + self._save_index(index) + + def get_recently_watched(self, limit: int) -> List[MediaRecord]: + """Get recently watched anime.""" + index = self._load_index() + + sorted_entries = sorted( + index.media_index.values(), key=lambda x: x.last_watched, reverse=True + ) + + recent_media = [] + for entry in sorted_entries: + record = self.get_media_record(entry.media_id) + if record: + recent_media.append(record.media_item) + if len(recent_media) == limit: + break + + return recent_media + + def get_registry_stats(self) -> Dict: + """Get comprehensive registry statistics.""" + try: + index = self._load_index() + + return { + "total_media_breakdown": index.media_count_breakdown, + "status_breakdown": index.status_breakdown, + "last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"), + } + + except Exception as e: + logger.error(f"Failed to get registry stats: {e}") + return {} + + def get_all_media_records(self) -> Generator[MediaRecord, None, List[MediaRecord]]: + records = [] + for record_file in self.media_registry_dir.iterdir(): + try: + if record_file.is_file(): + id = record_file.stem + if record := self.get_media_record(int(id)): + records.append(record) + yield record + else: + logger.warning( + f"{self.media_registry_dir} is impure; ignoring folder: {record_file}" + ) + except Exception as e: + logger.warning(f"{self.media_registry_dir} is impure which caused: {e}") + return records + + def search_for_media(self, params: ApiSearchParams) -> List[MediaItem]: + """Search media by title.""" + try: + # TODO: enhance performance + media_items = [record.media_item for record in self.get_all_media_records()] + + return MediaFilter.apply(media_items, params) + + except Exception as e: + logger.error(f"Failed to search media: {e}") + return [] + + def remove_media_record(self, media_id: int): + with self._lock: + record_file = self._get_media_file_path(media_id) + if record_file.exists(): + record_file.unlink() + try: + record_file.parent.rmdir() + except OSError: + pass + + index = self._load_index() + id = f"{self._media_api}_{media_id}" + if id in index.media_index: + del index.media_index[id] + self._save_index(index) + + logger.debug(f"Removed media record {media_id}") diff --git a/fastanime/cli/services/registry/tracker.py b/fastanime/cli/services/registry/tracker.py deleted file mode 100644 index c29f146..0000000 --- a/fastanime/cli/services/registry/tracker.py +++ /dev/null @@ -1,289 +0,0 @@ -""" -Unified Media Tracker for player integration and real-time updates. - -Provides automatic tracking of watch progress and download completion -through a single interface. -""" - -from __future__ import annotations - -import logging -from typing import Optional - -from ....libs.api.types import MediaItem -from ....libs.players.types import PlayerResult -from .manager import MediaRegistryManager, get_media_registry - -logger = logging.getLogger(__name__) - - -class MediaTracker: - """ - Unified tracker for media interactions. - - Handles automatic updates from player results and download completion, - providing seamless integration between watching and downloading. - """ - - def __init__(self, registry_manager: MediaRegistryManager = None): - self.registry = registry_manager or get_media_registry() - - def track_episode_start(self, media_item: MediaItem, episode: int) -> bool: - """ - Track when episode playback starts. - - Args: - media_item: The anime being watched - episode: Episode number being started - - Returns: - True if tracking was successful - """ - try: - record = self.registry.get_or_create_record(media_item) - episode_status = record.get_episode_status(episode) - - # Only update to "watching" if not already completed - if episode_status.watch_status not in ["completed"]: - episode_status.watch_status = "watching" - - # Update overall user status if still planning - if record.user_data.status == "planning": - record.user_data.status = "watching" - - return self.registry.save_media_record(record) - - except Exception as e: - logger.error(f"Failed to track episode start: {e}") - return False - - def track_from_player_result(self, media_item: MediaItem, episode: int, - player_result: PlayerResult) -> bool: - """ - Update watch status based on actual player feedback. - - Args: - media_item: The anime that was watched - episode: Episode number that was watched - player_result: Result from the player session - - Returns: - True if tracking was successful - """ - try: - if not player_result.stop_time or not player_result.total_time: - logger.warning("PlayerResult missing timing data - cannot track accurately") - return False - - return self.registry.update_from_player_result( - media_item, episode, player_result.stop_time, player_result.total_time - ) - - except Exception as e: - logger.error(f"Failed to track from player result: {e}") - return False - - def track_download_completion(self, media_item: MediaItem, episode: int, - file_path, file_size: int, quality: str, - checksum: Optional[str] = None) -> bool: - """ - Update status when download completes. - - Args: - media_item: The anime that was downloaded - episode: Episode number that was downloaded - file_path: Path to downloaded file - file_size: File size in bytes - quality: Download quality - checksum: Optional file checksum - - Returns: - True if tracking was successful - """ - try: - from pathlib import Path - file_path = Path(file_path) if not isinstance(file_path, Path) else file_path - - return self.registry.update_download_completion( - media_item, episode, file_path, file_size, quality, checksum - ) - - except Exception as e: - logger.error(f"Failed to track download completion: {e}") - return False - - def get_continue_episode(self, media_item: MediaItem, - available_episodes: list) -> Optional[str]: - """ - Get episode to continue watching based on history. - - Args: - media_item: The anime - available_episodes: List of available episode numbers - - Returns: - Episode number to continue from or None - """ - try: - return self.registry.get_continue_episode( - media_item.id, [str(ep) for ep in available_episodes] - ) - - except Exception as e: - logger.error(f"Failed to get continue episode: {e}") - return None - - def get_watch_progress(self, media_id: int) -> Optional[dict]: - """ - Get current watch progress for an anime. - - Args: - media_id: ID of the anime - - Returns: - Dictionary with progress info or None if not found - """ - try: - record = self.registry.get_media_record(media_id) - if not record: - return None - - return { - "last_episode": record.last_watched_episode, - "next_episode": record.next_episode_to_watch, - "status": record.user_data.status, - "title": record.display_title, - "watch_percentage": record.watch_completion_percentage, - "download_percentage": record.download_completion_percentage, - "episodes_watched": record.total_episodes_watched, - "episodes_downloaded": record.total_episodes_downloaded, - } - - except Exception as e: - logger.error(f"Failed to get watch progress: {e}") - return None - - def update_anime_status(self, media_id: int, status: str) -> bool: - """ - Update overall anime status. - - Args: - media_id: ID of the anime - status: New status (planning, watching, completed, dropped, paused) - - Returns: - True if update was successful - """ - try: - record = self.registry.get_media_record(media_id) - if not record: - return False - - if status in ["planning", "watching", "completed", "dropped", "paused"]: - record.user_data.status = status - record.user_data.update_timestamp() - return self.registry.save_media_record(record) - - return False - - except Exception as e: - logger.error(f"Failed to update anime status: {e}") - return False - - def add_anime_to_registry(self, media_item: MediaItem, status: str = "planning") -> bool: - """ - Add anime to registry with initial status. - - Args: - media_item: The anime to add - status: Initial status - - Returns: - True if added successfully - """ - try: - record = self.registry.get_or_create_record(media_item) - if status in ["planning", "watching", "completed", "dropped", "paused"]: - record.user_data.status = status - record.user_data.update_timestamp() - return self.registry.save_media_record(record) - - return False - - except Exception as e: - logger.error(f"Failed to add anime to registry: {e}") - return False - - def should_auto_download_next(self, media_id: int) -> Optional[int]: - """ - Check if next episode should be auto-downloaded based on watch progress. - - Args: - media_id: ID of the anime - - Returns: - Episode number to download or None - """ - try: - record = self.registry.get_media_record(media_id) - if not record or not record.user_data.auto_download_new: - return None - - # Only if currently watching - if record.user_data.status != "watching": - return None - - next_episode = record.next_episode_to_watch - if not next_episode: - return None - - # Check if already downloaded - episode_status = record.episodes.get(next_episode) - if episode_status and episode_status.is_available_locally: - return None - - return next_episode - - except Exception as e: - logger.error(f"Failed to check auto download: {e}") - return None - - -# Global tracker instance -_media_tracker: Optional[MediaTracker] = None - - -def get_media_tracker() -> MediaTracker: - """Get or create the global media tracker instance.""" - global _media_tracker - if _media_tracker is None: - _media_tracker = MediaTracker() - return _media_tracker - - -# Convenience functions for backward compatibility -def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool: - """Track episode viewing (backward compatibility).""" - tracker = get_media_tracker() - return tracker.track_episode_start(media_item, episode) - - -def get_continue_episode(media_item: MediaItem, available_episodes: list, - prefer_history: bool = True) -> Optional[str]: - """Get continue episode (backward compatibility).""" - if not prefer_history: - return None - - tracker = get_media_tracker() - return tracker.get_continue_episode(media_item, available_episodes) - - -def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool: - """Update episode progress (backward compatibility).""" - # This would need more context to implement properly with PlayerResult - # For now, just mark as watched if 80%+ - if completion_percentage >= 80: - tracker = get_media_tracker() - registry = get_media_registry() - return registry.mark_episode_watched(media_id, episode, completion_percentage / 100) - return True diff --git a/fastanime/cli/services/session/manager.py b/fastanime/cli/services/session/service.py similarity index 100% rename from fastanime/cli/services/session/manager.py rename to fastanime/cli/services/session/service.py diff --git a/fastanime/cli/services/watch_history/manager.py b/fastanime/cli/services/watch_history/service.py similarity index 100% rename from fastanime/cli/services/watch_history/manager.py rename to fastanime/cli/services/watch_history/service.py diff --git a/fastanime/cli/services/watch_history/tracker.py b/fastanime/cli/services/watch_history/tracker.py deleted file mode 100644 index c454cdd..0000000 --- a/fastanime/cli/services/watch_history/tracker.py +++ /dev/null @@ -1,273 +0,0 @@ -""" -Watch history tracking utilities for integration with episode viewing and player controls. -Provides automatic watch history updates during episode viewing. -""" - -import logging -from typing import Optional - -from ....libs.api.types import MediaItem -from .manager import WatchHistoryManager - -logger = logging.getLogger(__name__) - - -class WatchHistoryTracker: - """ - Tracks watch history automatically during episode viewing. - Integrates with the episode selection and player control systems. - """ - - def __init__(self): - self.history_manager = WatchHistoryManager() - - def track_episode_start(self, media_item: MediaItem, episode: int) -> bool: - """ - Track when an episode starts being watched. - - Args: - media_item: The anime being watched - episode: Episode number being started - - Returns: - True if tracking was successful - """ - try: - # Update or create watch history entry - success = self.history_manager.add_or_update_entry( - media_item=media_item, - episode=episode, - progress=0.0, - status="watching" - ) - - if success: - logger.info(f"Started tracking episode {episode} of {media_item.title.english or media_item.title.romaji}") - - return success - - except Exception as e: - logger.error(f"Failed to track episode start: {e}") - return False - - def track_episode_progress(self, media_id: int, episode: int, progress: float) -> bool: - """ - Track progress within an episode. - - Args: - media_id: ID of the anime - episode: Episode number - progress: Progress within the episode (0.0-1.0) - - Returns: - True if tracking was successful - """ - try: - success = self.history_manager.mark_episode_watched(media_id, episode, progress) - - if success and progress >= 0.8: # Consider episode "watched" at 80% - logger.info(f"Episode {episode} marked as watched (progress: {progress:.1%})") - - return success - - except Exception as e: - logger.error(f"Failed to track episode progress: {e}") - return False - - def track_episode_completion(self, media_id: int, episode: int) -> bool: - """ - Track when an episode is completed. - - Args: - media_id: ID of the anime - episode: Episode number completed - - Returns: - True if tracking was successful - """ - try: - # Mark episode as fully watched - success = self.history_manager.mark_episode_watched(media_id, episode, 1.0) - - if success: - # Check if this was the final episode and mark as completed - entry = self.history_manager.get_entry(media_id) - if entry and entry.media_item.episodes and episode >= entry.media_item.episodes: - self.history_manager.mark_completed(media_id) - logger.info(f"Anime completed: {entry.get_display_title()}") - else: - logger.info(f"Episode {episode} completed") - - return success - - except Exception as e: - logger.error(f"Failed to track episode completion: {e}") - return False - - def get_watch_progress(self, media_id: int) -> Optional[dict]: - """ - Get current watch progress for an anime. - - Args: - media_id: ID of the anime - - Returns: - Dictionary with progress info or None if not found - """ - try: - entry = self.history_manager.get_entry(media_id) - if entry: - return { - "last_episode": entry.last_watched_episode, - "progress": entry.watch_progress, - "status": entry.status, - "next_episode": entry.last_watched_episode + 1, - "title": entry.get_display_title(), - } - return None - - except Exception as e: - logger.error(f"Failed to get watch progress: {e}") - return None - - def should_continue_from_history(self, media_id: int, available_episodes: list) -> Optional[str]: - """ - Determine if we should continue from watch history and which episode. - - Args: - media_id: ID of the anime - available_episodes: List of available episode numbers - - Returns: - Episode number to continue from, or None if no history - """ - try: - progress = self.get_watch_progress(media_id) - if not progress: - return None - - last_episode = progress["last_episode"] - next_episode = last_episode + 1 - - # Check if next episode is available - if str(next_episode) in available_episodes: - logger.info(f"Continuing from episode {next_episode} based on watch history") - return str(next_episode) - # Fall back to last watched episode if next isn't available - elif str(last_episode) in available_episodes and last_episode > 0: - logger.info(f"Next episode not available, falling back to episode {last_episode}") - return str(last_episode) - - return None - - except Exception as e: - logger.error(f"Failed to determine continue episode: {e}") - return None - - def update_anime_status(self, media_id: int, status: str) -> bool: - """ - Update the status of an anime in watch history. - - Args: - media_id: ID of the anime - status: New status (watching, completed, dropped, paused) - - Returns: - True if update was successful - """ - try: - success = self.history_manager.change_status(media_id, status) - if success: - logger.info(f"Updated anime status to {status}") - return success - - except Exception as e: - logger.error(f"Failed to update anime status: {e}") - return False - - def add_anime_to_history(self, media_item: MediaItem, status: str = "planning") -> bool: - """ - Add an anime to watch history without watching any episodes. - - Args: - media_item: The anime to add - status: Initial status - - Returns: - True if successful - """ - try: - success = self.history_manager.add_or_update_entry( - media_item=media_item, - episode=0, - progress=0.0, - status=status - ) - - if success: - logger.info(f"Added {media_item.title.english or media_item.title.romaji} to watch history") - - return success - - except Exception as e: - logger.error(f"Failed to add anime to history: {e}") - return False - - -# Global tracker instance for use throughout the application -watch_tracker = WatchHistoryTracker() - - -def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool: - """ - Convenience function to track episode viewing. - - Args: - media_item: The anime being watched - episode: Episode number - start_tracking: Whether to start tracking (True) or just update progress - - Returns: - True if tracking was successful - """ - if start_tracking: - return watch_tracker.track_episode_start(media_item, episode) - else: - return watch_tracker.track_episode_completion(media_item.id, episode) - - -def get_continue_episode(media_item: MediaItem, available_episodes: list, prefer_history: bool = True) -> Optional[str]: - """ - Get the episode to continue from based on watch history. - - Args: - media_item: The anime - available_episodes: List of available episodes - prefer_history: Whether to prefer local history over remote - - Returns: - Episode number to continue from - """ - if prefer_history: - return watch_tracker.should_continue_from_history(media_item.id, available_episodes) - return None - - -def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool: - """ - Update progress for an episode based on completion percentage. - - Args: - media_id: ID of the anime - episode: Episode number - completion_percentage: Completion percentage (0-100) - - Returns: - True if update was successful - """ - progress = completion_percentage / 100.0 - - if completion_percentage >= 80: # Consider episode completed at 80% - return watch_tracker.track_episode_completion(media_id, episode) - else: - return watch_tracker.track_episode_progress(media_id, episode, progress) diff --git a/fastanime/cli/services/watch_history/types.py b/fastanime/cli/services/watch_history/types.py deleted file mode 100644 index 9bfd538..0000000 --- a/fastanime/cli/services/watch_history/types.py +++ /dev/null @@ -1,187 +0,0 @@ -""" -Watch history data models and types for the interactive CLI. -Provides comprehensive data structures for tracking and managing local watch history. -""" - -from __future__ import annotations - -import logging -from datetime import datetime -from typing import Dict, List, Optional - -from pydantic import BaseModel, Field - -from ....libs.api.types import MediaItem - -logger = logging.getLogger(__name__) - - -class WatchHistoryEntry(BaseModel): - """ - Represents a single entry in the watch history. - Contains media information and viewing progress. - """ - - media_item: MediaItem - last_watched_episode: int = 0 - watch_progress: float = 0.0 # Progress within the episode (0.0-1.0) - times_watched: int = 1 - first_watched: datetime = Field(default_factory=datetime.now) - last_watched: datetime = Field(default_factory=datetime.now) - status: str = "watching" # watching, completed, dropped, paused - notes: str = "" - - # Download integration fields - has_downloads: bool = Field(default=False, description="Whether episodes are downloaded") - offline_available: bool = Field(default=False, description="Can watch offline") - - # With Pydantic, serialization is automatic! - # No need for manual to_dict() and from_dict() methods - # Use: entry.model_dump() and WatchHistoryEntry.model_validate(data) - - def update_progress(self, episode: int, progress: float = 0.0, status: Optional[str] = None): - """Update watch progress for this entry.""" - self.last_watched_episode = max(self.last_watched_episode, episode) - self.watch_progress = progress - self.last_watched = datetime.now() - if status: - self.status = status - - def mark_completed(self): - """Mark this entry as completed.""" - self.status = "completed" - self.last_watched = datetime.now() - if self.media_item.episodes: - self.last_watched_episode = self.media_item.episodes - self.watch_progress = 1.0 - - def get_display_title(self) -> str: - """Get the best available title for display.""" - if self.media_item.title.english: - return self.media_item.title.english - elif self.media_item.title.romaji: - return self.media_item.title.romaji - elif self.media_item.title.native: - return self.media_item.title.native - else: - return f"Anime #{self.media_item.id}" - - def get_progress_display(self) -> str: - """Get a human-readable progress display.""" - if self.media_item.episodes: - return f"{self.last_watched_episode}/{self.media_item.episodes}" - else: - return f"Ep {self.last_watched_episode}" - - def get_status_emoji(self) -> str: - """Get emoji representation of status.""" - status_emojis = { - "watching": "📺", - "completed": "✅", - "dropped": "🚮", - "paused": "⏸️", - "planning": "📑" - } - return status_emojis.get(self.status, "❓") - - -class WatchHistoryData(BaseModel): - """Complete watch history data container.""" - - entries: Dict[int, WatchHistoryEntry] = Field(default_factory=dict) - last_updated: datetime = Field(default_factory=datetime.now) - format_version: str = "1.0" - - # With Pydantic, serialization is automatic! - # No need for manual to_dict() and from_dict() methods - # Use: data.model_dump() and WatchHistoryData.model_validate(data) - - def add_or_update_entry(self, media_item: MediaItem, episode: int = 0, progress: float = 0.0, status: str = "watching") -> WatchHistoryEntry: - """Add or update a watch history entry.""" - media_id = media_item.id - - if media_id in self.entries: - # Update existing entry - entry = self.entries[media_id] - entry.update_progress(episode, progress, status) - entry.times_watched += 1 - else: - # Create new entry - entry = WatchHistoryEntry( - media_item=media_item, - last_watched_episode=episode, - watch_progress=progress, - status=status, - ) - self.entries[media_id] = entry - - self.last_updated = datetime.now() - return entry - - def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]: - """Get a specific watch history entry.""" - return self.entries.get(media_id) - - def remove_entry(self, media_id: int) -> bool: - """Remove an entry from watch history.""" - if media_id in self.entries: - del self.entries[media_id] - self.last_updated = datetime.now() - return True - return False - - def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]: - """Get all entries with a specific status.""" - return [entry for entry in self.entries.values() if entry.status == status] - - def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]: - """Get recently watched entries.""" - sorted_entries = sorted( - self.entries.values(), - key=lambda x: x.last_watched, - reverse=True - ) - return sorted_entries[:limit] - - def get_watching_entries(self) -> List[WatchHistoryEntry]: - """Get entries that are currently being watched.""" - return self.get_entries_by_status("watching") - - def get_completed_entries(self) -> List[WatchHistoryEntry]: - """Get completed entries.""" - return self.get_entries_by_status("completed") - - def search_entries(self, query: str) -> List[WatchHistoryEntry]: - """Search entries by title.""" - query_lower = query.lower() - results = [] - - for entry in self.entries.values(): - title = entry.get_display_title().lower() - if query_lower in title: - results.append(entry) - - return results - - def get_stats(self) -> dict: - """Get watch history statistics.""" - total_entries = len(self.entries) - watching = len(self.get_entries_by_status("watching")) - completed = len(self.get_entries_by_status("completed")) - dropped = len(self.get_entries_by_status("dropped")) - paused = len(self.get_entries_by_status("paused")) - - total_episodes = sum( - entry.last_watched_episode - for entry in self.entries.values() - ) - - return { - "total_entries": total_entries, - "watching": watching, - "completed": completed, - "dropped": dropped, - "paused": paused, - "total_episodes_watched": total_episodes, - "last_updated": self.last_updated.strftime("%Y-%m-%d %H:%M:%S"), - } diff --git a/fastanime/cli/utils/converters.py b/fastanime/cli/utils/converters.py new file mode 100644 index 0000000..bc57c4b --- /dev/null +++ b/fastanime/cli/utils/converters.py @@ -0,0 +1,10 @@ +def time_to_seconds(time_str: str) -> int: + """Convert HH:MM:SS to seconds.""" + try: + parts = time_str.split(":") + if len(parts) == 3: + h, m, s = map(int, parts) + return h * 3600 + m * 60 + s + except (ValueError, AttributeError): + pass + return 0 diff --git a/fastanime/core/config/defaults.py b/fastanime/core/config/defaults.py new file mode 100644 index 0000000..4d7602e --- /dev/null +++ b/fastanime/core/config/defaults.py @@ -0,0 +1,86 @@ +# fastanime/core/config/defaults.py + +from ..constants import APP_DATA_DIR, APP_NAME, USER_VIDEOS_DIR + +# GeneralConfig +GENERAL_PYGMENT_STYLE = "github-dark" +GENERAL_API_CLIENT = "anilist" +GENERAL_PROVIDER = "allanime" +GENERAL_SELECTOR = "default" +GENERAL_AUTO_SELECT_ANIME_RESULT = True +GENERAL_ICONS = False +GENERAL_PREVIEW = "none" +GENERAL_IMAGE_RENDERER = "chafa" +GENERAL_MANGA_VIEWER = "feh" +GENERAL_CHECK_FOR_UPDATES = True +GENERAL_CACHE_REQUESTS = True +GENERAL_MAX_CACHE_LIFETIME = "03:00:00" +GENERAL_NORMALIZE_TITLES = True +GENERAL_DISCORD = False +GENERAL_RECENT = 50 + +# StreamConfig +STREAM_PLAYER = "mpv" +STREAM_QUALITY = "1080" +STREAM_TRANSLATION_TYPE = "sub" +STREAM_SERVER = "TOP" +STREAM_AUTO_NEXT = False +STREAM_CONTINUE_FROM_WATCH_HISTORY = True +STREAM_PREFERRED_WATCH_HISTORY = "local" +STREAM_AUTO_SKIP = False +STREAM_EPISODE_COMPLETE_AT = 80 +STREAM_YTDLP_FORMAT = "best[height<=1080]/bestvideo[height<=1080]+bestaudio/best" +STREAM_FORCE_FORWARD_TRACKING = True +STREAM_DEFAULT_MEDIA_LIST_TRACKING = "prompt" +STREAM_SUB_LANG = "eng" + +# ServiceConfig +SERVICE_ENABLED = False +SERVICE_WATCHLIST_CHECK_INTERVAL = 30 +SERVICE_QUEUE_PROCESS_INTERVAL = 1 +SERVICE_MAX_CONCURRENT_DOWNLOADS = 3 +SERVICE_AUTO_RETRY_COUNT = 3 +SERVICE_CLEANUP_COMPLETED_DAYS = 7 +SERVICE_NOTIFICATION_ENABLED = True + +# FzfConfig +FZF_HEADER_COLOR = "95,135,175" +FZF_PREVIEW_HEADER_COLOR = "215,0,95" +FZF_PREVIEW_SEPARATOR_COLOR = "208,208,208" + +# MpvConfig +MPV_ARGS = "" +MPV_PRE_ARGS = "" +MPV_DISABLE_POPEN = True +MPV_USE_PYTHON_MPV = False + +# VlcConfig +VLC_ARGS = "" + +# AnilistConfig +ANILIST_PER_PAGE = 15 +ANILIST_SORT_BY = "SEARCH_MATCH" +ANILIST_PREFERRED_LANGUAGE = "english" + +# DownloadsConfig +DOWNLOADS_DOWNLOADER = "auto" +DOWNLOADS_DOWNLOADS_DIR = USER_VIDEOS_DIR +DOWNLOADS_ENABLE_TRACKING = True +DOWNLOADS_AUTO_ORGANIZE = True +DOWNLOADS_MAX_CONCURRENT = 3 +DOWNLOADS_AUTO_CLEANUP_FAILED = True +DOWNLOADS_RETENTION_DAYS = 30 +DOWNLOADS_SYNC_WITH_WATCH_HISTORY = True +DOWNLOADS_AUTO_MARK_OFFLINE = True +DOWNLOADS_NAMING_TEMPLATE = "{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}" +DOWNLOADS_PREFERRED_QUALITY = "1080" +DOWNLOADS_DOWNLOAD_SUBTITLES = True +DOWNLOADS_SUBTITLE_LANGUAGES = ["en"] +DOWNLOADS_QUEUE_MAX_SIZE = 100 +DOWNLOADS_AUTO_START_DOWNLOADS = True +DOWNLOADS_RETRY_ATTEMPTS = 3 +DOWNLOADS_RETRY_DELAY = 300 + +# RegistryConfig +MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / APP_NAME / "registry" +MEDIA_REGISTRY_INDEX_DIR = APP_DATA_DIR \ No newline at end of file diff --git a/fastanime/core/config/descriptions.py b/fastanime/core/config/descriptions.py new file mode 100644 index 0000000..dc2881b --- /dev/null +++ b/fastanime/core/config/descriptions.py @@ -0,0 +1,132 @@ +# GeneralConfig +GENERAL_PYGMENT_STYLE = "The pygment style to use" +GENERAL_API_CLIENT = "The media database API to use (e.g., 'anilist', 'jikan')." +GENERAL_PROVIDER = "The default anime provider to use for scraping." +GENERAL_SELECTOR = "The interactive selector tool to use for menus." +GENERAL_AUTO_SELECT_ANIME_RESULT = ( + "Automatically select the best-matching search result from a provider." +) +GENERAL_ICONS = "Display emoji icons in the user interface." +GENERAL_PREVIEW = "Type of preview to display in selectors." +GENERAL_IMAGE_RENDERER = ( + "The command-line tool to use for rendering images in the terminal." +) +GENERAL_MANGA_VIEWER = "The external application to use for viewing manga pages." +GENERAL_CHECK_FOR_UPDATES = ( + "Automatically check for new versions of FastAnime on startup." +) +GENERAL_CACHE_REQUESTS = ( + "Enable caching of network requests to speed up subsequent operations." +) +GENERAL_MAX_CACHE_LIFETIME = "Maximum lifetime for a cached request in DD:HH:MM format." +GENERAL_NORMALIZE_TITLES = ( + "Attempt to normalize provider titles to match AniList titles." +) +GENERAL_DISCORD = "Enable Discord Rich Presence to show your current activity." +GENERAL_RECENT = "Number of recently watched anime to keep in history." + +# StreamConfig +STREAM_PLAYER = "The media player to use for streaming." +STREAM_QUALITY = "Preferred stream quality." +STREAM_TRANSLATION_TYPE = "Preferred audio/subtitle language type." +STREAM_SERVER = ( + "The default server to use from a provider. 'top' uses the first available." +) +STREAM_AUTO_NEXT = "Automatically play the next episode when the current one finishes." +STREAM_CONTINUE_FROM_WATCH_HISTORY = ( + "Automatically resume playback from the last known episode and position." +) +STREAM_PREFERRED_WATCH_HISTORY = ( + "Which watch history to prioritize: local file or remote AniList progress." +) +STREAM_AUTO_SKIP = "Automatically skip openings/endings if skip data is available." +STREAM_EPISODE_COMPLETE_AT = ( + "Percentage of an episode to watch before it's marked as complete." +) +STREAM_YTDLP_FORMAT = "The format selection string for yt-dlp." +STREAM_FORCE_FORWARD_TRACKING = ( + "Prevent updating AniList progress to a lower episode number." +) +STREAM_DEFAULT_MEDIA_LIST_TRACKING = ( + "Default behavior for tracking progress on AniList." +) +STREAM_SUB_LANG = "Preferred language code for subtitles (e.g., 'en', 'es')." + +# ServiceConfig +SERVICE_ENABLED = "Whether the background service should be enabled by default." +SERVICE_WATCHLIST_CHECK_INTERVAL = ( + "Minutes between checking AniList watchlist for new episodes." +) +SERVICE_QUEUE_PROCESS_INTERVAL = "Minutes between processing the download queue." +SERVICE_MAX_CONCURRENT_DOWNLOADS = "Maximum number of concurrent downloads." +SERVICE_AUTO_RETRY_COUNT = "Number of times to retry failed downloads." +SERVICE_CLEANUP_COMPLETED_DAYS = ( + "Days to keep completed/failed jobs in queue before cleanup." +) +SERVICE_NOTIFICATION_ENABLED = "Whether to show notifications for new episodes." + +# FzfConfig +FZF_HEADER_COLOR = "RGB color for the main TUI header." +FZF_PREVIEW_HEADER_COLOR = "RGB color for preview pane headers." +FZF_PREVIEW_SEPARATOR_COLOR = "RGB color for preview pane separators." +FZF_OPTS = "The FZF options, formatted with leading tabs for the config file." +FZF_HEADER_ASCII_ART = "The ASCII art to display as a header in the FZF interface." + + +# RofiConfig +ROFI_THEME_MAIN = "Path to the main Rofi theme file." +ROFI_THEME_PREVIEW = "Path to the Rofi theme file for previews." +ROFI_THEME_CONFIRM = "Path to the Rofi theme file for confirmation prompts." +ROFI_THEME_INPUT = "Path to the Rofi theme file for user input prompts." + +# MpvConfig +MPV_ARGS = "Comma-separated arguments to pass to the MPV player." +MPV_PRE_ARGS = "Comma-separated arguments to prepend before the MPV command." +MPV_DISABLE_POPEN = ( + "Disable using subprocess.Popen for MPV, which can be unstable on some systems." +) +MPV_USE_PYTHON_MPV = "Use the python-mpv library for enhanced player control." + +# VlcConfig +VLC_ARGS = "Comma-separated arguments to pass to the Vlc player." + +# AnilistConfig +ANILIST_PER_PAGE = "Number of items to fetch per page from AniList." +ANILIST_SORT_BY = "Default sort order for AniList search results." +ANILIST_PREFERRED_LANGUAGE = "Preferred language for anime titles from AniList." + +# DownloadsConfig +DOWNLOADS_DOWNLOADER = "The downloader to use" +DOWNLOADS_DOWNLOADS_DIR = "The default directory to save downloaded anime." +DOWNLOADS_ENABLE_TRACKING = "Enable download tracking and management" +DOWNLOADS_AUTO_ORGANIZE = "Automatically organize downloads by anime title" +DOWNLOADS_MAX_CONCURRENT = "Maximum concurrent downloads" +DOWNLOADS_AUTO_CLEANUP_FAILED = "Automatically cleanup failed downloads" +DOWNLOADS_RETENTION_DAYS = "Days to keep failed downloads before cleanup" +DOWNLOADS_SYNC_WITH_WATCH_HISTORY = "Sync download status with watch history" +DOWNLOADS_AUTO_MARK_OFFLINE = ( + "Automatically mark downloaded episodes as available offline" +) +DOWNLOADS_NAMING_TEMPLATE = "File naming template for downloaded episodes" +DOWNLOADS_PREFERRED_QUALITY = "Preferred download quality" +DOWNLOADS_DOWNLOAD_SUBTITLES = "Download subtitles when available" +DOWNLOADS_SUBTITLE_LANGUAGES = "Preferred subtitle languages" +DOWNLOADS_QUEUE_MAX_SIZE = "Maximum number of items in download queue" +DOWNLOADS_AUTO_START_DOWNLOADS = "Automatically start downloads when items are queued" +DOWNLOADS_RETRY_ATTEMPTS = "Number of retry attempts for failed downloads" +DOWNLOADS_RETRY_DELAY = "Delay between retry attempts in seconds" + +# RegistryConfig +MEDIA_REGISTRY_DIR = "The default directory to save media registry" +MEDIA_REGISTRY_INDEX_DIR = "The default directory to save media registry index" + +# AppConfig +APP_GENERAL = "General configuration settings for application behavior." +APP_STREAM = "Settings related to video streaming and playback." +APP_DOWNLOADS = "Settings related to downloading" +APP_ANILIST = "Configuration for AniList API integration." +APP_SERVICE = "Configuration for the background download service." +APP_FZF = "Settings for the FZF selector interface." +APP_ROFI = "Settings for the Rofi selector interface." +APP_MPV = "Configuration for the MPV media player." +APP_MEDIA_REGISTRY = "Configuration for the media registry." diff --git a/fastanime/core/config/model.py b/fastanime/core/config/model.py index a9cf2a2..ef31f6f 100644 --- a/fastanime/core/config/model.py +++ b/fastanime/core/config/model.py @@ -13,7 +13,191 @@ from ...core.constants import ( ) from ...libs.api.anilist.constants import SORTS_AVAILABLE from ...libs.providers.anime import PROVIDERS_AVAILABLE, SERVERS_AVAILABLE -from ..constants import APP_ASCII_ART, USER_VIDEOS_DIR +from ..constants import APP_ASCII_ART, APP_DATA_DIR, USER_VIDEOS_DIR +from . import defaults +from . import descriptions as desc + + +class GeneralConfig(BaseModel): + """Configuration for general application behavior and integrations.""" + + pygment_style: str = Field( + default=defaults.GENERAL_PYGMENT_STYLE, description=desc.GENERAL_PYGMENT_STYLE + ) + api_client: Literal["anilist", "jikan"] = Field( + default=defaults.GENERAL_API_CLIENT, + description=desc.GENERAL_API_CLIENT, + ) + provider: str = Field( + default=defaults.GENERAL_PROVIDER, + description=desc.GENERAL_PROVIDER, + examples=list(PROVIDERS_AVAILABLE.keys()), + ) + selector: Literal["default", "fzf", "rofi"] = Field( + default=defaults.GENERAL_SELECTOR, description=desc.GENERAL_SELECTOR + ) + auto_select_anime_result: bool = Field( + default=defaults.GENERAL_AUTO_SELECT_ANIME_RESULT, + description=desc.GENERAL_AUTO_SELECT_ANIME_RESULT, + ) + icons: bool = Field(default=defaults.GENERAL_ICONS, description=desc.GENERAL_ICONS) + preview: Literal["full", "text", "image", "none"] = Field( + default=defaults.GENERAL_PREVIEW, description=desc.GENERAL_PREVIEW + ) + image_renderer: Literal["icat", "chafa", "imgcat"] = Field( + default="icat" + if os.environ.get("KITTY_WINDOW_ID") + else defaults.GENERAL_IMAGE_RENDERER, + description=desc.GENERAL_IMAGE_RENDERER, + ) + manga_viewer: Literal["feh", "icat"] = Field( + default=defaults.GENERAL_MANGA_VIEWER, + description=desc.GENERAL_MANGA_VIEWER, + ) + check_for_updates: bool = Field( + default=defaults.GENERAL_CHECK_FOR_UPDATES, + description=desc.GENERAL_CHECK_FOR_UPDATES, + ) + cache_requests: bool = Field( + default=defaults.GENERAL_CACHE_REQUESTS, + description=desc.GENERAL_CACHE_REQUESTS, + ) + max_cache_lifetime: str = Field( + default=defaults.GENERAL_MAX_CACHE_LIFETIME, + description=desc.GENERAL_MAX_CACHE_LIFETIME, + ) + normalize_titles: bool = Field( + default=defaults.GENERAL_NORMALIZE_TITLES, + description=desc.GENERAL_NORMALIZE_TITLES, + ) + discord: bool = Field( + default=defaults.GENERAL_DISCORD, + description=desc.GENERAL_DISCORD, + ) + recent: int = Field( + default=defaults.GENERAL_RECENT, + ge=0, + description=desc.GENERAL_RECENT, + ) + + @field_validator("provider") + @classmethod + def validate_provider(cls, v: str) -> str: + if v not in PROVIDERS_AVAILABLE: + raise ValueError( + f"'{v}' is not a valid provider. Must be one of: {PROVIDERS_AVAILABLE}" + ) + return v + + +class StreamConfig(BaseModel): + """Configuration specific to video streaming and playback.""" + + player: Literal["mpv", "vlc"] = Field( + default=defaults.STREAM_PLAYER, description=desc.STREAM_PLAYER + ) + quality: Literal["360", "480", "720", "1080"] = Field( + default=defaults.STREAM_QUALITY, description=desc.STREAM_QUALITY + ) + translation_type: Literal["sub", "dub"] = Field( + default=defaults.STREAM_TRANSLATION_TYPE, + description=desc.STREAM_TRANSLATION_TYPE, + ) + server: str = Field( + default=defaults.STREAM_SERVER, + description=desc.STREAM_SERVER, + examples=SERVERS_AVAILABLE, + ) + auto_next: bool = Field( + default=defaults.STREAM_AUTO_NEXT, + description=desc.STREAM_AUTO_NEXT, + ) + continue_from_watch_history: bool = Field( + default=defaults.STREAM_CONTINUE_FROM_WATCH_HISTORY, + description=desc.STREAM_CONTINUE_FROM_WATCH_HISTORY, + ) + preferred_watch_history: Literal["local", "remote"] = Field( + default=defaults.STREAM_PREFERRED_WATCH_HISTORY, + description=desc.STREAM_PREFERRED_WATCH_HISTORY, + ) + auto_skip: bool = Field( + default=defaults.STREAM_AUTO_SKIP, + description=desc.STREAM_AUTO_SKIP, + ) + episode_complete_at: int = Field( + default=defaults.STREAM_EPISODE_COMPLETE_AT, + ge=0, + le=100, + description=desc.STREAM_EPISODE_COMPLETE_AT, + ) + ytdlp_format: str = Field( + default=defaults.STREAM_YTDLP_FORMAT, + description=desc.STREAM_YTDLP_FORMAT, + ) + force_forward_tracking: bool = Field( + default=defaults.STREAM_FORCE_FORWARD_TRACKING, + description=desc.STREAM_FORCE_FORWARD_TRACKING, + ) + default_media_list_tracking: Literal["track", "disabled", "prompt"] = Field( + default=defaults.STREAM_DEFAULT_MEDIA_LIST_TRACKING, + description=desc.STREAM_DEFAULT_MEDIA_LIST_TRACKING, + ) + sub_lang: str = Field( + default=defaults.STREAM_SUB_LANG, + description=desc.STREAM_SUB_LANG, + ) + + @field_validator("server") + @classmethod + def validate_server(cls, v: str) -> str: + if v.lower() != "top" and v not in SERVERS_AVAILABLE: + raise ValueError( + f"'{v}' is not a valid server. Must be 'top' or one of: {SERVERS_AVAILABLE}" + ) + return v + + +class ServiceConfig(BaseModel): + """Configuration for the background download service.""" + + enabled: bool = Field( + default=defaults.SERVICE_ENABLED, + description=desc.SERVICE_ENABLED, + ) + watchlist_check_interval: int = Field( + default=defaults.SERVICE_WATCHLIST_CHECK_INTERVAL, + ge=5, + le=180, + description=desc.SERVICE_WATCHLIST_CHECK_INTERVAL, + ) + queue_process_interval: int = Field( + default=defaults.SERVICE_QUEUE_PROCESS_INTERVAL, + ge=1, + le=60, + description=desc.SERVICE_QUEUE_PROCESS_INTERVAL, + ) + max_concurrent_downloads: int = Field( + default=defaults.SERVICE_MAX_CONCURRENT_DOWNLOADS, + ge=1, + le=10, + description=desc.SERVICE_MAX_CONCURRENT_DOWNLOADS, + ) + auto_retry_count: int = Field( + default=defaults.SERVICE_AUTO_RETRY_COUNT, + ge=0, + le=10, + description=desc.SERVICE_AUTO_RETRY_COUNT, + ) + cleanup_completed_days: int = Field( + default=defaults.SERVICE_CLEANUP_COMPLETED_DAYS, + ge=1, + le=30, + description=desc.SERVICE_CLEANUP_COMPLETED_DAYS, + ) + notification_enabled: bool = Field( + default=defaults.SERVICE_NOTIFICATION_ENABLED, + description=desc.SERVICE_NOTIFICATION_ENABLED, + ) class OtherConfig(BaseModel): @@ -25,14 +209,16 @@ class FzfConfig(OtherConfig): _opts: str = PrivateAttr(default=FZF_DEFAULT_OPTS.read_text(encoding="utf-8")) header_color: str = Field( - default="95,135,175", description="RGB color for the main TUI header." + default=defaults.FZF_HEADER_COLOR, description=desc.FZF_HEADER_COLOR ) _header_ascii_art: str = PrivateAttr(default=APP_ASCII_ART) preview_header_color: str = Field( - default="215,0,95", description="RGB color for preview pane headers." + default=defaults.FZF_PREVIEW_HEADER_COLOR, + description=desc.FZF_PREVIEW_HEADER_COLOR, ) preview_separator_color: str = Field( - default="208,208,208", description="RGB color for preview pane separators." + default=defaults.FZF_PREVIEW_SEPARATOR_COLOR, + description=desc.FZF_PREVIEW_SEPARATOR_COLOR, ) def __init__(self, **kwargs): @@ -45,16 +231,12 @@ class FzfConfig(OtherConfig): if header_ascii_art: self._header_ascii_art = header_ascii_art - @computed_field( - description="The FZF options, formatted with leading tabs for the config file." - ) + @computed_field(description=desc.FZF_OPTS) @property def opts(self) -> str: return "\n" + "\n".join([f"\t{line}" for line in self._opts.split()]) - @computed_field( - description="The ASCII art to display as a header in the FZF interface." - ) + @computed_field(description=desc.FZF_HEADER_ASCII_ART) @property def header_ascii_art(self) -> str: return "\n" + "\n".join( @@ -67,67 +249,63 @@ class RofiConfig(OtherConfig): theme_main: Path = Field( default=Path(str(ROFI_THEME_MAIN)), - description="Path to the main Rofi theme file.", + description=desc.ROFI_THEME_MAIN, ) theme_preview: Path = Field( default=Path(str(ROFI_THEME_PREVIEW)), - description="Path to the Rofi theme file for previews.", + description=desc.ROFI_THEME_PREVIEW, ) theme_confirm: Path = Field( default=Path(str(ROFI_THEME_CONFIRM)), - description="Path to the Rofi theme file for confirmation prompts.", + description=desc.ROFI_THEME_CONFIRM, ) theme_input: Path = Field( default=Path(str(ROFI_THEME_INPUT)), - description="Path to the Rofi theme file for user input prompts.", + description=desc.ROFI_THEME_INPUT, ) class MpvConfig(OtherConfig): """Configuration specific to the MPV player integration.""" - args: str = Field( - default="", description="Comma-separated arguments to pass to the MPV player." - ) + args: str = Field(default=defaults.MPV_ARGS, description=desc.MPV_ARGS) pre_args: str = Field( - default="", - description="Comma-separated arguments to prepend before the MPV command.", + default=defaults.MPV_PRE_ARGS, + description=desc.MPV_PRE_ARGS, ) disable_popen: bool = Field( - default=True, - description="Disable using subprocess.Popen for MPV, which can be unstable on some systems.", + default=defaults.MPV_DISABLE_POPEN, + description=desc.MPV_DISABLE_POPEN, ) use_python_mpv: bool = Field( - default=False, - description="Use the python-mpv library for enhanced player control.", + default=defaults.MPV_USE_PYTHON_MPV, + description=desc.MPV_USE_PYTHON_MPV, ) class VlcConfig(OtherConfig): """Configuration specific to the vlc player integration.""" - args: str = Field( - default="", description="Comma-separated arguments to pass to the Vlc player." - ) + args: str = Field(default=defaults.VLC_ARGS, description=desc.VLC_ARGS) class AnilistConfig(OtherConfig): """Configuration for interacting with the AniList API.""" per_page: int = Field( - default=15, + default=defaults.ANILIST_PER_PAGE, gt=0, le=50, - description="Number of items to fetch per page from AniList.", + description=desc.ANILIST_PER_PAGE, ) sort_by: str = Field( - default="SEARCH_MATCH", - description="Default sort order for AniList search results.", + default=defaults.ANILIST_SORT_BY, + description=desc.ANILIST_SORT_BY, examples=SORTS_AVAILABLE, ) preferred_language: Literal["english", "romaji"] = Field( - default="english", - description="Preferred language for anime titles from AniList.", + default=defaults.ANILIST_PREFERRED_LANGUAGE, + description=desc.ANILIST_PREFERRED_LANGUAGE, ) @field_validator("sort_by") @@ -150,249 +328,102 @@ class DownloadsConfig(OtherConfig): """Configuration for download related options""" downloader: Literal["auto", "default", "yt-dlp"] = Field( - default="auto", description="The downloader to use" + default=defaults.DOWNLOADS_DOWNLOADER, description=desc.DOWNLOADS_DOWNLOADER ) downloads_dir: Path = Field( - default_factory=lambda: USER_VIDEOS_DIR, - description="The default directory to save downloaded anime.", + default_factory=lambda: defaults.DOWNLOADS_DOWNLOADS_DIR, + description=desc.DOWNLOADS_DOWNLOADS_DIR, ) - + # Download tracking configuration enable_tracking: bool = Field( - default=True, description="Enable download tracking and management" + default=defaults.DOWNLOADS_ENABLE_TRACKING, + description=desc.DOWNLOADS_ENABLE_TRACKING, ) auto_organize: bool = Field( - default=True, description="Automatically organize downloads by anime title" + default=defaults.DOWNLOADS_AUTO_ORGANIZE, + description=desc.DOWNLOADS_AUTO_ORGANIZE, ) max_concurrent: int = Field( - default=3, gt=0, le=10, description="Maximum concurrent downloads" + default=defaults.DOWNLOADS_MAX_CONCURRENT, + gt=0, + le=10, + description=desc.DOWNLOADS_MAX_CONCURRENT, ) auto_cleanup_failed: bool = Field( - default=True, description="Automatically cleanup failed downloads" + default=defaults.DOWNLOADS_AUTO_CLEANUP_FAILED, + description=desc.DOWNLOADS_AUTO_CLEANUP_FAILED, ) retention_days: int = Field( - default=30, gt=0, description="Days to keep failed downloads before cleanup" + default=defaults.DOWNLOADS_RETENTION_DAYS, + gt=0, + description=desc.DOWNLOADS_RETENTION_DAYS, ) - + # Integration with watch history sync_with_watch_history: bool = Field( - default=True, description="Sync download status with watch history" + default=defaults.DOWNLOADS_SYNC_WITH_WATCH_HISTORY, + description=desc.DOWNLOADS_SYNC_WITH_WATCH_HISTORY, ) auto_mark_offline: bool = Field( - default=True, description="Automatically mark downloaded episodes as available offline" + default=defaults.DOWNLOADS_AUTO_MARK_OFFLINE, + description=desc.DOWNLOADS_AUTO_MARK_OFFLINE, ) - + # File organization naming_template: str = Field( - default="{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}", - description="File naming template for downloaded episodes" + default=defaults.DOWNLOADS_NAMING_TEMPLATE, + description=desc.DOWNLOADS_NAMING_TEMPLATE, ) - + # Quality and subtitles preferred_quality: Literal["360", "480", "720", "1080", "best"] = Field( - default="1080", description="Preferred download quality" + default=defaults.DOWNLOADS_PREFERRED_QUALITY, + description=desc.DOWNLOADS_PREFERRED_QUALITY, ) download_subtitles: bool = Field( - default=True, description="Download subtitles when available" + default=defaults.DOWNLOADS_DOWNLOAD_SUBTITLES, + description=desc.DOWNLOADS_DOWNLOAD_SUBTITLES, ) subtitle_languages: List[str] = Field( - default=["en"], description="Preferred subtitle languages" + default=defaults.DOWNLOADS_SUBTITLE_LANGUAGES, + description=desc.DOWNLOADS_SUBTITLE_LANGUAGES, ) - + # Queue management queue_max_size: int = Field( - default=100, gt=0, description="Maximum number of items in download queue" + default=defaults.DOWNLOADS_QUEUE_MAX_SIZE, + gt=0, + description=desc.DOWNLOADS_QUEUE_MAX_SIZE, ) auto_start_downloads: bool = Field( - default=True, description="Automatically start downloads when items are queued" + default=defaults.DOWNLOADS_AUTO_START_DOWNLOADS, + description=desc.DOWNLOADS_AUTO_START_DOWNLOADS, ) retry_attempts: int = Field( - default=3, ge=0, description="Number of retry attempts for failed downloads" + default=defaults.DOWNLOADS_RETRY_ATTEMPTS, + ge=0, + description=desc.DOWNLOADS_RETRY_ATTEMPTS, ) retry_delay: int = Field( - default=300, ge=0, description="Delay between retry attempts in seconds" - ) - - -class GeneralConfig(BaseModel): - """Configuration for general application behavior and integrations.""" - - pygment_style: str = Field( - default="github-dark", description="The pygment style to use" - ) - api_client: Literal["anilist", "jikan"] = Field( - default="anilist", - description="The media database API to use (e.g., 'anilist', 'jikan').", - ) - provider: str = Field( - default="allanime", - description="The default anime provider to use for scraping.", - examples=list(PROVIDERS_AVAILABLE.keys()), - ) - selector: Literal["default", "fzf", "rofi"] = Field( - default="default", description="The interactive selector tool to use for menus." - ) - auto_select_anime_result: bool = Field( - default=True, - description="Automatically select the best-matching search result from a provider.", - ) - icons: bool = Field( - default=False, description="Display emoji icons in the user interface." - ) - preview: Literal["full", "text", "image", "none"] = Field( - default="none", description="Type of preview to display in selectors." - ) - image_renderer: Literal["icat", "chafa", "imgcat"] = Field( - default="icat" if os.environ.get("KITTY_WINDOW_ID") else "chafa", - description="The command-line tool to use for rendering images in the terminal.", - ) - manga_viewer: Literal["feh", "icat"] = Field( - default="feh", - description="The external application to use for viewing manga pages.", - ) - check_for_updates: bool = Field( - default=True, - description="Automatically check for new versions of FastAnime on startup.", - ) - cache_requests: bool = Field( - default=True, - description="Enable caching of network requests to speed up subsequent operations.", - ) - max_cache_lifetime: str = Field( - default="03:00:00", - description="Maximum lifetime for a cached request in DD:HH:MM format.", - ) - normalize_titles: bool = Field( - default=True, - description="Attempt to normalize provider titles to match AniList titles.", - ) - discord: bool = Field( - default=False, - description="Enable Discord Rich Presence to show your current activity.", - ) - recent: int = Field( - default=50, + default=defaults.DOWNLOADS_RETRY_DELAY, ge=0, - description="Number of recently watched anime to keep in history.", + description=desc.DOWNLOADS_RETRY_DELAY, ) - @field_validator("provider") - @classmethod - def validate_provider(cls, v: str) -> str: - if v not in PROVIDERS_AVAILABLE: - raise ValueError( - f"'{v}' is not a valid provider. Must be one of: {PROVIDERS_AVAILABLE}" - ) - return v +class MediaRegistryConfig(OtherConfig): + """Configuration for registry related options""" -class StreamConfig(BaseModel): - """Configuration specific to video streaming and playback.""" - - player: Literal["mpv", "vlc"] = Field( - default="mpv", description="The media player to use for streaming." - ) - quality: Literal["360", "480", "720", "1080"] = Field( - default="1080", description="Preferred stream quality." - ) - translation_type: Literal["sub", "dub"] = Field( - default="sub", description="Preferred audio/subtitle language type." - ) - server: str = Field( - default="TOP", - description="The default server to use from a provider. 'top' uses the first available.", - examples=SERVERS_AVAILABLE, - ) - auto_next: bool = Field( - default=False, - description="Automatically play the next episode when the current one finishes.", - ) - continue_from_watch_history: bool = Field( - default=True, - description="Automatically resume playback from the last known episode and position.", - ) - preferred_watch_history: Literal["local", "remote"] = Field( - default="local", - description="Which watch history to prioritize: local file or remote AniList progress.", - ) - auto_skip: bool = Field( - default=False, - description="Automatically skip openings/endings if skip data is available.", - ) - episode_complete_at: int = Field( - default=80, - ge=0, - le=100, - description="Percentage of an episode to watch before it's marked as complete.", - ) - ytdlp_format: str = Field( - default="best[height<=1080]/bestvideo[height<=1080]+bestaudio/best", - description="The format selection string for yt-dlp.", - ) - force_forward_tracking: bool = Field( - default=True, - description="Prevent updating AniList progress to a lower episode number.", - ) - default_media_list_tracking: Literal["track", "disabled", "prompt"] = Field( - default="prompt", - description="Default behavior for tracking progress on AniList.", - ) - sub_lang: str = Field( - default="eng", - description="Preferred language code for subtitles (e.g., 'en', 'es').", + media_dir: Path = Field( + default=defaults.MEDIA_REGISTRY_DIR, + description=desc.MEDIA_REGISTRY_DIR, ) - @field_validator("server") - @classmethod - def validate_server(cls, v: str) -> str: - if v.lower() != "top" and v not in SERVERS_AVAILABLE: - raise ValueError( - f"'{v}' is not a valid server. Must be 'top' or one of: {SERVERS_AVAILABLE}" - ) - return v - - -class ServiceConfig(BaseModel): - """Configuration for the background download service.""" - - enabled: bool = Field( - default=False, - description="Whether the background service should be enabled by default.", - ) - watchlist_check_interval: int = Field( - default=30, - ge=5, - le=180, - description="Minutes between checking AniList watchlist for new episodes.", - ) - queue_process_interval: int = Field( - default=1, - ge=1, - le=60, - description="Minutes between processing the download queue.", - ) - max_concurrent_downloads: int = Field( - default=3, - ge=1, - le=10, - description="Maximum number of concurrent downloads.", - ) - auto_retry_count: int = Field( - default=3, - ge=0, - le=10, - description="Number of times to retry failed downloads.", - ) - cleanup_completed_days: int = Field( - default=7, - ge=1, - le=30, - description="Days to keep completed/failed jobs in queue before cleanup.", - ) - notification_enabled: bool = Field( - default=True, - description="Whether to show notifications for new episodes.", + index_dir: Path = Field( + default=defaults.MEDIA_REGISTRY_INDEX_DIR, + description=desc.MEDIA_REGISTRY_INDEX_DIR, ) @@ -401,36 +432,37 @@ class AppConfig(BaseModel): general: GeneralConfig = Field( default_factory=GeneralConfig, - description="General configuration settings for application behavior.", + description=desc.APP_GENERAL, ) stream: StreamConfig = Field( default_factory=StreamConfig, - description="Settings related to video streaming and playback.", + description=desc.APP_STREAM, ) downloads: DownloadsConfig = Field( - default_factory=DownloadsConfig, description="Settings related to downloading" + default_factory=DownloadsConfig, description=desc.APP_DOWNLOADS ) anilist: AnilistConfig = Field( default_factory=AnilistConfig, - description="Configuration for AniList API integration.", + description=desc.APP_ANILIST, ) service: ServiceConfig = Field( default_factory=ServiceConfig, - description="Configuration for the background download service.", + description=desc.APP_SERVICE, ) fzf: FzfConfig = Field( default_factory=FzfConfig, - description="Settings for the FZF selector interface.", + description=desc.APP_FZF, ) rofi: RofiConfig = Field( default_factory=RofiConfig, - description="Settings for the Rofi selector interface.", - ) - mpv: MpvConfig = Field( - default_factory=MpvConfig, description="Configuration for the MPV media player." + description=desc.APP_ROFI, ) + mpv: MpvConfig = Field(default_factory=MpvConfig, description=desc.APP_MPV) service: ServiceConfig = Field( default_factory=ServiceConfig, - description="Configuration for the background download service.", + description=desc.APP_SERVICE, + ) + media_registry: MediaRegistryConfig = Field( + default_factory=MediaRegistryConfig, description=desc.APP_MEDIA_REGISTRY ) diff --git a/fastanime/core/utils/file.py b/fastanime/core/utils/file.py new file mode 100644 index 0000000..3c7823f --- /dev/null +++ b/fastanime/core/utils/file.py @@ -0,0 +1,310 @@ +import logging +import os +import time +import uuid +from pathlib import Path +from typing import IO, Any, BinaryIO, TextIO, Union + +logger = logging.getLogger(__name__) + + +def get_file_modification_time(filepath: Path) -> float: + """ + Returns the modification time of a file as a Unix timestamp. + Returns 0 if the file does not exist. + """ + if filepath.exists(): + return filepath.stat().st_mtime + return 0 + + +def check_file_modified(filepath: Path, previous_mtime: float) -> tuple[float, bool]: + """ + Checks if a file has been modified since a given previous modification time. + """ + current_mtime = get_file_modification_time(filepath) + return current_mtime, current_mtime > previous_mtime + + +class AtomicWriter: + """ + A context manager for performing atomic file writes. + + Writes are first directed to a temporary file. If the 'with' block + completes successfully, the temporary file is atomically renamed + to the target path, ensuring that the target file is never in + a partially written or corrupted state. If an error occurs, the + temporary file is cleaned up, and the original target file remains + untouched. + + Usage: + # For text files + with AtomicWriter(Path("my_file.txt"), mode="w", encoding="utf-8") as f: + f.write("Hello, world!\n") + f.write("This is an atomic write.") + + # For binary files + with AtomicWriter(Path("binary_data.bin"), mode="wb") as f: + f.write(b"\x01\x02\x03\x04") + """ + + def __init__( + self, target_path: Path, mode: str = "w", encoding: Union[str, None] = "utf-8" + ): + """ + Initializes the AtomicWriter. + + Args: + target_path: The Path object for the final destination file. + mode: The file opening mode (e.g., 'w', 'wb'). Only write modes are supported. + 'a' (append) and 'x' (exclusive creation) modes are not supported + as this class is designed for full file replacement. + encoding: The encoding to use for text modes ('w', 'wt'). + Should be None for binary modes ('wb'). + + Raises: + ValueError: If an unsupported file mode is provided. + """ + if "a" in mode: + raise ValueError( + "AtomicWriter does not support 'append' mode ('a'). " + "It's designed for full file replacement." + ) + if "x" in mode: + raise ValueError( + "AtomicWriter does not support 'exclusive creation' mode ('x'). " + "It handles creation/replacement atomically." + ) + if "r" in mode: + raise ValueError("AtomicWriter is for writing, not reading.") + if "b" in mode and encoding is not None: + raise ValueError("Encoding must be None for binary write modes ('wb').") + if "b" not in mode and encoding is None: + raise ValueError( + "Encoding must be specified for text write modes ('w', 'wt')." + ) + + self.target_path = target_path + self.mode = mode + self.encoding = encoding + + temp_filename = f"{target_path.name}.{os.getpid()}.{uuid.uuid4()}.tmp" + self.temp_path = target_path.parent / temp_filename + + self._file_handle: Union[IO[Any], None] = None + + def __enter__(self) -> IO[Any]: + """ + Enters the context, opens the temporary file for writing, + and returns the file handle. + + Ensures the parent directory of the target file exists. + + Returns: + A file-like object (TextIO or BinaryIO) for writing. + + Raises: + Exception: If there's an error opening the temporary file. + """ + try: + self.target_path.parent.mkdir(parents=True, exist_ok=True) + + self._file_handle = self.temp_path.open( + mode=self.mode, encoding=self.encoding + ) + return self._file_handle + except Exception as e: + logger.error(f"Error opening temporary file {self.temp_path}: {e}") + raise + + def __exit__(self, exc_type, exc_val, exc_tb) -> bool: + """ + Exits the context. Closes the temporary file. + + If no exception occurred within the 'with' block, atomically renames + the temporary file to the target path. Otherwise, cleans up the + temporary file, ensuring the original target file remains untouched. + + Args: + exc_type: The type of exception raised in the 'with' block (or None). + exc_val: The exception instance (or None). + exc_tb: The traceback object (or None). + + Returns: + False: To propagate any exception that occurred within the 'with' block. + (Returning True would suppress the exception). + """ + if self._file_handle: + self._file_handle.close() + self._file_handle = None + + if exc_type is None: + try: + os.replace(self.temp_path, self.target_path) + logger.debug(f"Successfully wrote atomically to {self.target_path}") + except OSError as e: + logger.error( + f"Error renaming temporary file {self.temp_path} to {self.target_path}: {e}" + ) + try: + self.temp_path.unlink(missing_ok=True) + except OSError as cleanup_e: + logger.error( + f"Failed to clean up temporary file {self.temp_path} after rename error: {cleanup_e}" + ) + raise + else: + logger.debug( + f"An error occurred during write. Cleaning up temporary file {self.temp_path}." + ) + try: + self.temp_path.unlink(missing_ok=True) + except OSError as e: + logger.error(f"Error cleaning up temporary file {self.temp_path}: {e}") + return False + + +class FileLock: + def __init__( + self, lock_file_path: Path, timeout: float = 300, stale_timeout: float = 300 + ): + """ + Initializes a file-based lock. + + Args: + lock_file_path: The Path object for the lock file. + timeout: How long (in seconds) to wait to acquire the lock. + Set to 0 for non-blocking attempt. Set to -1 for indefinite wait. + stale_timeout: If the lock file is older than this (in seconds), + it's considered stale and will be broken. + """ + self.lock_file_path = lock_file_path + self.timeout = timeout + self.stale_timeout = stale_timeout + self._acquired = False + self._pid = os.getpid() # Get current process ID + + def _acquire_atomic(self) -> bool: + """ + Attempts to atomically create the lock file. + Returns True on success, False on failure (file already exists). + Writes the PID and timestamp to the lock file. + """ + try: + # Use 'x' mode for atomic creation: create only if it doesn't exist. + # If it exists, FileExistsError is raised. + with self.lock_file_path.open("x") as f: + f.write(f"{self._pid}\n{time.time()}") + return True + except FileExistsError: + return False + except Exception as e: + # Handle other potential errors during file creation/write + logger.error(f"Error creating lock file {self.lock_file_path}: {e}") + return False + + def _is_stale(self) -> bool: + """ + Checks if the existing lock file is stale based on its modification time + or the PID inside it. + """ + if not self.lock_file_path.exists(): + return False # Not stale if it doesn't exist + + try: + # Read PID and timestamp from the lock file + with self.lock_file_path.open("r") as f: + lines = f.readlines() + if len(lines) >= 2: + locked_pid = int(lines[0].strip()) + locked_timestamp = float(lines[1].strip()) + current_time = time.time() + if current_time - locked_timestamp > self.stale_timeout: + logger.warning( + f"Lock file {self.lock_file_path} is older than {self.stale_timeout} seconds. Considering it stale." + ) + return True + return False + + except (ValueError, IndexError, FileNotFoundError, OSError) as e: + logger.warning( + f"Could not read or parse lock file {self.lock_file_path}. Assuming it's stale due to potential corruption: {e}" + ) + return True + + def acquire(self): + """ + Attempts to acquire the lock. Blocks until acquired or timeout occurs. + """ + start_time = time.time() + while True: + if self._acquire_atomic(): + self._acquired = True + logger.debug(f"Lock acquired by PID {self._pid}.") + return + + if self._is_stale(): + logger.debug( + f"Existing lock file {self.lock_file_path} is stale. Attempting to break it." + ) + try: + self.lock_file_path.unlink() + if self._acquire_atomic(): + self._acquired = True + logger.debug( + f"Stale lock broken and new lock acquired by PID {self._pid}." + ) + return + except OSError as e: + logger.error( + f"Could not remove stale lock file {self.lock_file_path}: {e}" + ) + + if self.timeout >= 0 and time.time() - start_time > self.timeout: + raise TimeoutError( + f"Failed to acquire lock {self.lock_file_path} within {self.timeout} seconds." + ) + + sleep_time = 0.1 + if self.timeout == -1: + logger.debug(f"Waiting for lock {self.lock_file_path} indefinitely...") + time.sleep(sleep_time) + elif self.timeout > 0: + logger.debug( + f"Waiting for lock {self.lock_file_path} ({round(self.timeout - (time.time() - start_time), 1)}s remaining)..." + ) + time.sleep( + min( + sleep_time, + self.timeout - (time.time() - start_time) + if self.timeout - (time.time() - start_time) > 0 + else sleep_time, + ) + ) + else: + raise BlockingIOError( + f"Lock {self.lock_file_path} is currently held by another process (non-blocking)." + ) + + def release(self): + """ + Releases the lock by deleting the lock file. + """ + if self._acquired: + try: + self.lock_file_path.unlink(missing_ok=True) + self._acquired = False + logger.debug(f"Lock released by PID {self._pid}.") + except OSError as e: + logger.error(f"Error releasing lock file {self.lock_file_path}: {e}") + else: + logger.warning( + "Attempted to release a lock that was not acquired by PID {self._pid}." + ) + + def __enter__(self): + self.acquire() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + self.release() diff --git a/fastanime/libs/api/anilist/mapper.py b/fastanime/libs/api/anilist/mapper.py index 2285384..4f04693 100644 --- a/fastanime/libs/api/anilist/mapper.py +++ b/fastanime/libs/api/anilist/mapper.py @@ -30,6 +30,8 @@ from .types import ( AnilistPageInfo, AnilistStudioNodes, AnilistViewerData, +) +from .types import ( StreamingEpisode as AnilistStreamingEpisode, ) @@ -38,10 +40,13 @@ logger = logging.getLogger(__name__) def _to_generic_media_title(anilist_title: AnilistMediaTitle) -> MediaTitle: """Maps an AniList title object to a generic MediaTitle.""" + romaji = anilist_title.get("romaji") + english = anilist_title.get("english") + native = anilist_title.get("native") return MediaTitle( - romaji=anilist_title.get("romaji"), - english=anilist_title.get("english"), - native=anilist_title.get("native"), + romaji=romaji, + english=(english or romaji or native or "NO_TITLE"), + native=native, ) @@ -103,13 +108,12 @@ def _to_generic_tags(anilist_tags: list[AnilistMediaTag]) -> List[MediaTag]: ] -def _to_generic_streaming_episodes(anilist_episodes: list[AnilistStreamingEpisode]) -> List[StreamingEpisode]: +def _to_generic_streaming_episodes( + anilist_episodes: list[AnilistStreamingEpisode], +) -> List[StreamingEpisode]: """Maps a list of AniList streaming episodes to generic StreamingEpisode objects.""" return [ - StreamingEpisode( - title=episode["title"], - thumbnail=episode.get("thumbnail") - ) + StreamingEpisode(title=episode["title"], thumbnail=episode.get("thumbnail")) for episode in anilist_episodes if episode.get("title") ] @@ -174,7 +178,19 @@ def _to_generic_media_item( popularity=data.get("popularity"), favourites=data.get("favourites"), next_airing=_to_generic_airing_schedule(data.get("nextAiringEpisode")), - streaming_episodes=_to_generic_streaming_episodes(data.get("streamingEpisodes", [])), + start_date=datetime( + data["startDate"]["year"], + data["startDate"]["month"], + data["startDate"]["day"], + ), + end_date=datetime( + data["startDate"]["year"], + data["startDate"]["month"], + data["startDate"]["day"], + ), + streaming_episodes=_to_generic_streaming_episodes( + data.get("streamingEpisodes", []) + ), user_status=_to_generic_user_status(data, media_list), ) diff --git a/fastanime/libs/api/types.py b/fastanime/libs/api/types.py index a9bf246..aef18fe 100644 --- a/fastanime/libs/api/types.py +++ b/fastanime/libs/api/types.py @@ -20,6 +20,7 @@ UserListStatusType = Literal[ class BaseApiModel(BaseModel): """Base model for all API types.""" + pass @@ -34,8 +35,8 @@ class MediaImage(BaseApiModel): class MediaTitle(BaseApiModel): """A generic representation of media titles.""" + english: str romaji: Optional[str] = None - english: Optional[str] = None native: Optional[str] = None @@ -93,15 +94,10 @@ class UserListStatus(BaseApiModel): class MediaItem(BaseApiModel): - """ - The definitive, backend-agnostic representation of a single media item. - This is the primary data model the application will interact with. - """ - id: int + title: MediaTitle id_mal: Optional[int] = None type: MediaType = "ANIME" - title: MediaTitle = Field(default_factory=MediaTitle) status: Optional[str] = None format: Optional[str] = None # e.g., TV, MOVIE, OVA @@ -121,6 +117,9 @@ class MediaItem(BaseApiModel): popularity: Optional[int] = None favourites: Optional[int] = None + start_date: Optional[datetime] = None + end_date: Optional[datetime] = None + next_airing: Optional[AiringSchedule] = None # streaming episodes