feat: registry service

This commit is contained in:
Benexl
2025-07-21 17:26:25 +03:00
parent 725fe4875d
commit c0d87c4351
23 changed files with 1406 additions and 2756 deletions

View File

@@ -1,419 +0,0 @@
"""
Pydantic models for download tracking system.
This module defines the data models used throughout the download tracking system,
providing type safety and validation using Pydantic v2.
"""
from __future__ import annotations
import hashlib
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field, computed_field, field_validator
from ....core.constants import APP_DATA_DIR
from ....libs.api.types import MediaItem
logger = logging.getLogger(__name__)
# Type aliases for better readability
DownloadStatus = Literal["completed", "failed", "downloading", "queued", "paused"]
QualityOption = Literal["360", "480", "720", "1080", "best"]
MediaStatus = Literal["active", "completed", "paused", "failed"]
class EpisodeDownload(BaseModel):
"""
Pydantic model for individual episode download tracking.
Tracks all information related to a single episode download including
file location, download progress, quality, and integrity information.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
frozen=True, # Immutable after creation for data integrity
)
episode_number: int = Field(gt=0, description="Episode number")
file_path: Path = Field(description="Path to downloaded file")
file_size: int = Field(ge=0, description="File size in bytes")
download_date: datetime = Field(default_factory=datetime.now)
quality: QualityOption = Field(default="1080")
source_provider: str = Field(description="Provider used for download")
status: DownloadStatus = Field(default="queued")
checksum: Optional[str] = Field(None, description="SHA256 checksum for integrity")
subtitle_files: List[Path] = Field(default_factory=list)
download_progress: float = Field(default=0.0, ge=0.0, le=1.0)
error_message: Optional[str] = Field(None, description="Error message if failed")
download_speed: Optional[float] = Field(None, description="Download speed in bytes/sec")
@field_validator("file_path")
@classmethod
def validate_file_path(cls, v: Path) -> Path:
"""Ensure file path is absolute and within allowed directories."""
if not v.is_absolute():
raise ValueError("File path must be absolute")
return v
@computed_field
@property
def is_completed(self) -> bool:
"""Check if download is completed and file exists."""
return self.status == "completed" and self.file_path.exists()
@computed_field
@property
def file_size_mb(self) -> float:
"""Get file size in megabytes."""
return self.file_size / (1024 * 1024)
@computed_field
@property
def display_status(self) -> str:
"""Get human-readable status."""
status_map = {
"completed": "✓ Completed",
"failed": "✗ Failed",
"downloading": "⬇ Downloading",
"queued": "⏳ Queued",
"paused": "⏸ Paused"
}
return status_map.get(self.status, self.status)
def generate_checksum(self) -> Optional[str]:
"""Generate SHA256 checksum for the downloaded file."""
if not self.file_path.exists():
return None
try:
sha256_hash = hashlib.sha256()
with open(self.file_path, "rb") as f:
for chunk in iter(lambda: f.read(4096), b""):
sha256_hash.update(chunk)
return sha256_hash.hexdigest()
except Exception as e:
logger.error(f"Failed to generate checksum for {self.file_path}: {e}")
return None
def verify_integrity(self) -> bool:
"""Verify file integrity using stored checksum."""
if not self.checksum or not self.file_path.exists():
return False
current_checksum = self.generate_checksum()
return current_checksum == self.checksum
class MediaDownloadRecord(BaseModel):
"""
Pydantic model for anime series download tracking.
Manages download information for an entire anime series including
individual episodes, metadata, and organization preferences.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
media_item: MediaItem = Field(description="The anime media item")
episodes: Dict[int, EpisodeDownload] = Field(default_factory=dict)
download_path: Path = Field(description="Base download directory for this anime")
created_date: datetime = Field(default_factory=datetime.now)
last_updated: datetime = Field(default_factory=datetime.now)
preferred_quality: QualityOption = Field(default="1080")
auto_download_new: bool = Field(default=False, description="Auto-download new episodes")
tags: List[str] = Field(default_factory=list, description="User-defined tags")
notes: Optional[str] = Field(None, description="User notes")
# Organization preferences
naming_template: str = Field(
default="{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}",
description="File naming template"
)
@field_validator("download_path")
@classmethod
def validate_download_path(cls, v: Path) -> Path:
"""Ensure download path is absolute."""
if not v.is_absolute():
raise ValueError("Download path must be absolute")
return v
@computed_field
@property
def total_episodes_downloaded(self) -> int:
"""Get count of successfully downloaded episodes."""
return len([ep for ep in self.episodes.values() if ep.is_completed])
@computed_field
@property
def total_size_bytes(self) -> int:
"""Get total size of all downloaded episodes in bytes."""
return sum(ep.file_size for ep in self.episodes.values() if ep.is_completed)
@computed_field
@property
def total_size_gb(self) -> float:
"""Get total size in gigabytes."""
return self.total_size_bytes / (1024 * 1024 * 1024)
@computed_field
@property
def completion_percentage(self) -> float:
"""Get completion percentage based on total episodes."""
if not self.media_item.episodes or self.media_item.episodes == 0:
return 0.0
return (self.total_episodes_downloaded / self.media_item.episodes) * 100
@computed_field
@property
def display_title(self) -> str:
"""Get display title for the anime."""
return (
self.media_item.title.english
or self.media_item.title.romaji
or f"Anime {self.media_item.id}"
)
@computed_field
@property
def status(self) -> MediaStatus:
"""Determine overall download status for this anime."""
if not self.episodes:
return "active"
statuses = [ep.status for ep in self.episodes.values()]
if all(s == "completed" for s in statuses):
if self.media_item.episodes and len(self.episodes) >= self.media_item.episodes:
return "completed"
if any(s == "failed" for s in statuses):
return "failed"
if any(s in ["downloading", "queued"] for s in statuses):
return "active"
return "paused"
def get_next_episode_to_download(self) -> Optional[int]:
"""Get the next episode number that should be downloaded."""
if not self.media_item.episodes:
return None
downloaded_episodes = set(ep.episode_number for ep in self.episodes.values() if ep.is_completed)
for episode_num in range(1, self.media_item.episodes + 1):
if episode_num not in downloaded_episodes:
return episode_num
return None
def get_failed_episodes(self) -> List[int]:
"""Get list of episode numbers that failed to download."""
return [
ep.episode_number for ep in self.episodes.values()
if ep.status == "failed"
]
def update_last_modified(self) -> None:
"""Update the last_updated timestamp."""
# Create a new instance with updated timestamp since the model might be frozen
object.__setattr__(self, "last_updated", datetime.now())
class MediaIndexEntry(BaseModel):
"""
Lightweight entry in the download index for fast operations.
Provides quick access to basic information about a download record
without loading the full MediaDownloadRecord.
"""
model_config = ConfigDict(validate_assignment=True)
media_id: int = Field(description="AniList media ID")
title: str = Field(description="Display title")
episode_count: int = Field(default=0, ge=0)
completed_episodes: int = Field(default=0, ge=0)
last_download: Optional[datetime] = None
status: MediaStatus = Field(default="active")
total_size: int = Field(default=0, ge=0)
file_path: Path = Field(description="Path to the media record file")
@computed_field
@property
def completion_percentage(self) -> float:
"""Get completion percentage."""
if self.episode_count == 0:
return 0.0
return (self.completed_episodes / self.episode_count) * 100
@computed_field
@property
def total_size_mb(self) -> float:
"""Get total size in megabytes."""
return self.total_size / (1024 * 1024)
class DownloadIndex(BaseModel):
"""
Lightweight index for fast download operations.
Maintains an overview of all download records without loading
the full data, enabling fast searches and filtering.
"""
model_config = ConfigDict(validate_assignment=True)
version: str = Field(default="1.0")
last_updated: datetime = Field(default_factory=datetime.now)
media_count: int = Field(default=0, ge=0)
total_episodes: int = Field(default=0, ge=0)
total_size_bytes: int = Field(default=0, ge=0)
media_index: Dict[int, MediaIndexEntry] = Field(default_factory=dict)
@computed_field
@property
def total_size_gb(self) -> float:
"""Get total size across all downloads in gigabytes."""
return self.total_size_bytes / (1024 * 1024 * 1024)
@computed_field
@property
def completion_stats(self) -> Dict[str, int]:
"""Get completion statistics."""
stats = {"completed": 0, "active": 0, "failed": 0, "paused": 0}
for entry in self.media_index.values():
stats[entry.status] = stats.get(entry.status, 0) + 1
return stats
def add_media_entry(self, media_record: MediaDownloadRecord) -> None:
"""Add or update a media entry in the index."""
entry = MediaIndexEntry(
media_id=media_record.media_item.id,
title=media_record.display_title,
episode_count=media_record.media_item.episodes or 0,
completed_episodes=media_record.total_episodes_downloaded,
last_download=media_record.last_updated,
status=media_record.status,
total_size=media_record.total_size_bytes,
file_path=APP_DATA_DIR / "downloads" / "media" / f"{media_record.media_item.id}.json"
)
self.media_index[media_record.media_item.id] = entry
self.media_count = len(self.media_index)
self.total_episodes = sum(entry.completed_episodes for entry in self.media_index.values())
self.total_size_bytes = sum(entry.total_size for entry in self.media_index.values())
self.last_updated = datetime.now()
def remove_media_entry(self, media_id: int) -> bool:
"""Remove a media entry from the index."""
if media_id in self.media_index:
del self.media_index[media_id]
self.media_count = len(self.media_index)
self.total_episodes = sum(entry.completed_episodes for entry in self.media_index.values())
self.total_size_bytes = sum(entry.total_size for entry in self.media_index.values())
self.last_updated = datetime.now()
return True
return False
class DownloadQueueItem(BaseModel):
"""
Item in the download queue.
Represents a single episode queued for download with priority
and scheduling information.
"""
model_config = ConfigDict(frozen=True)
media_id: int
episode_number: int
priority: int = Field(default=0, description="Higher number = higher priority")
added_date: datetime = Field(default_factory=datetime.now)
estimated_size: Optional[int] = Field(None, description="Estimated file size")
quality_preference: QualityOption = Field(default="1080")
retry_count: int = Field(default=0, ge=0)
max_retries: int = Field(default=3, gt=0)
@computed_field
@property
def can_retry(self) -> bool:
"""Check if this item can be retried."""
return self.retry_count < self.max_retries
@computed_field
@property
def estimated_size_mb(self) -> Optional[float]:
"""Get estimated size in megabytes."""
if self.estimated_size is None:
return None
return self.estimated_size / (1024 * 1024)
class DownloadQueue(BaseModel):
"""
Download queue management.
Manages the queue of episodes waiting to be downloaded with
priority handling and scheduling.
"""
model_config = ConfigDict(validate_assignment=True)
items: List[DownloadQueueItem] = Field(default_factory=list)
max_size: int = Field(default=100, gt=0)
last_updated: datetime = Field(default_factory=datetime.now)
def add_item(self, item: DownloadQueueItem) -> bool:
"""Add an item to the queue."""
if len(self.items) >= self.max_size:
return False
# Check for duplicates
for existing_item in self.items:
if (existing_item.media_id == item.media_id and
existing_item.episode_number == item.episode_number):
return False
self.items.append(item)
# Sort by priority (highest first), then by added date
self.items.sort(key=lambda x: (-x.priority, x.added_date))
self.last_updated = datetime.now()
return True
def get_next_item(self) -> Optional[DownloadQueueItem]:
"""Get the next item to download."""
if not self.items:
return None
return self.items[0]
def remove_item(self, media_id: int, episode_number: int) -> bool:
"""Remove an item from the queue."""
for i, item in enumerate(self.items):
if item.media_id == media_id and item.episode_number == episode_number:
del self.items[i]
self.last_updated = datetime.now()
return True
return False
def clear(self) -> None:
"""Clear all items from the queue."""
self.items.clear()
self.last_updated = datetime.now()
@computed_field
@property
def total_estimated_size(self) -> int:
"""Get total estimated size of all queued items."""
return sum(item.estimated_size or 0 for item in self.items)

View File

@@ -1,302 +0,0 @@
"""
Download progress tracking and integration with the download system.
This module provides real-time tracking of download progress and integrates
with the existing download infrastructure to provide progress updates.
"""
from __future__ import annotations
import logging
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Callable, Dict, Optional
from ....core.config.model import DownloadsConfig
from .manager import DownloadManager, get_download_manager
from .models import DownloadQueueItem
logger = logging.getLogger(__name__)
# Type alias for progress callback
ProgressCallback = Callable[[int, int, float, float], None] # media_id, episode, progress, speed
class DownloadTracker:
"""
Tracks download progress and integrates with the download manager.
Provides real-time progress updates and handles integration between
the actual download process and the tracking system.
"""
def __init__(self, config: DownloadsConfig):
self.config = config
self.download_manager = get_download_manager(config)
# Track active downloads
self._active_downloads: Dict[str, DownloadSession] = {}
self._lock = threading.RLock()
# Progress callbacks
self._progress_callbacks: list[ProgressCallback] = []
def add_progress_callback(self, callback: ProgressCallback) -> None:
"""Add a callback function to receive progress updates."""
with self._lock:
self._progress_callbacks.append(callback)
def remove_progress_callback(self, callback: ProgressCallback) -> None:
"""Remove a progress callback."""
with self._lock:
if callback in self._progress_callbacks:
self._progress_callbacks.remove(callback)
def start_download(self, queue_item: DownloadQueueItem) -> str:
"""Start tracking a download and return session ID."""
with self._lock:
session_id = f"{queue_item.media_id}_{queue_item.episode_number}_{int(time.time())}"
session = DownloadSession(
session_id=session_id,
queue_item=queue_item,
tracker=self
)
self._active_downloads[session_id] = session
# Mark download as started in manager
self.download_manager.mark_download_started(
queue_item.media_id,
queue_item.episode_number
)
logger.info(f"Started download tracking for session {session_id}")
return session_id
def update_progress(self, session_id: str, progress: float,
speed: Optional[float] = None) -> None:
"""Update download progress for a session."""
with self._lock:
if session_id not in self._active_downloads:
logger.warning(f"Unknown download session: {session_id}")
return
session = self._active_downloads[session_id]
session.update_progress(progress, speed)
# Notify callbacks
for callback in self._progress_callbacks:
try:
callback(
session.queue_item.media_id,
session.queue_item.episode_number,
progress,
speed or 0.0
)
except Exception as e:
logger.error(f"Error in progress callback: {e}")
def complete_download(self, session_id: str, file_path: Path,
file_size: int, checksum: Optional[str] = None) -> bool:
"""Mark a download as completed."""
with self._lock:
if session_id not in self._active_downloads:
logger.warning(f"Unknown download session: {session_id}")
return False
session = self._active_downloads[session_id]
session.mark_completed(file_path, file_size, checksum)
# Update download manager
success = self.download_manager.mark_download_completed(
session.queue_item.media_id,
session.queue_item.episode_number,
file_path,
file_size,
checksum
)
# Remove from active downloads
del self._active_downloads[session_id]
logger.info(f"Completed download session {session_id}")
return success
def fail_download(self, session_id: str, error_message: str) -> bool:
"""Mark a download as failed."""
with self._lock:
if session_id not in self._active_downloads:
logger.warning(f"Unknown download session: {session_id}")
return False
session = self._active_downloads[session_id]
session.mark_failed(error_message)
# Update download manager
success = self.download_manager.mark_download_failed(
session.queue_item.media_id,
session.queue_item.episode_number,
error_message
)
# Remove from active downloads
del self._active_downloads[session_id]
logger.warning(f"Failed download session {session_id}: {error_message}")
return success
def get_active_downloads(self) -> Dict[str, 'DownloadSession']:
"""Get all currently active download sessions."""
with self._lock:
return self._active_downloads.copy()
def cancel_download(self, session_id: str) -> bool:
"""Cancel an active download."""
with self._lock:
if session_id not in self._active_downloads:
return False
session = self._active_downloads[session_id]
session.cancel()
# Mark as failed with cancellation message
self.download_manager.mark_download_failed(
session.queue_item.media_id,
session.queue_item.episode_number,
"Download cancelled by user"
)
del self._active_downloads[session_id]
logger.info(f"Cancelled download session {session_id}")
return True
def cleanup_stale_sessions(self, max_age_hours: int = 24) -> int:
"""Clean up stale download sessions that may have been orphaned."""
with self._lock:
current_time = datetime.now()
stale_sessions = []
for session_id, session in self._active_downloads.items():
age_hours = (current_time - session.start_time).total_seconds() / 3600
if age_hours > max_age_hours:
stale_sessions.append(session_id)
for session_id in stale_sessions:
self.fail_download(session_id, "Session timed out")
return len(stale_sessions)
class DownloadSession:
"""
Represents an active download session with progress tracking.
"""
def __init__(self, session_id: str, queue_item: DownloadQueueItem, tracker: DownloadTracker):
self.session_id = session_id
self.queue_item = queue_item
self.tracker = tracker
self.start_time = datetime.now()
# Progress tracking
self.progress = 0.0
self.download_speed = 0.0
self.bytes_downloaded = 0
self.total_bytes = queue_item.estimated_size or 0
# Status
self.is_cancelled = False
self.is_completed = False
self.error_message: Optional[str] = None
# Thread safety
self._lock = threading.Lock()
def update_progress(self, progress: float, speed: Optional[float] = None) -> None:
"""Update the progress of this download session."""
with self._lock:
if self.is_cancelled or self.is_completed:
return
self.progress = max(0.0, min(1.0, progress))
if speed is not None:
self.download_speed = speed
if self.total_bytes > 0:
self.bytes_downloaded = int(self.total_bytes * self.progress)
logger.debug(f"Session {self.session_id} progress: {self.progress:.2%}")
def mark_completed(self, file_path: Path, file_size: int, checksum: Optional[str] = None) -> None:
"""Mark this session as completed."""
with self._lock:
if self.is_cancelled:
return
self.is_completed = True
self.progress = 1.0
self.bytes_downloaded = file_size
self.total_bytes = file_size
def mark_failed(self, error_message: str) -> None:
"""Mark this session as failed."""
with self._lock:
if self.is_cancelled or self.is_completed:
return
self.error_message = error_message
def cancel(self) -> None:
"""Cancel this download session."""
with self._lock:
if self.is_completed:
return
self.is_cancelled = True
@property
def elapsed_time(self) -> float:
"""Get elapsed time in seconds."""
return (datetime.now() - self.start_time).total_seconds()
@property
def estimated_time_remaining(self) -> Optional[float]:
"""Get estimated time remaining in seconds."""
if self.progress <= 0 or self.download_speed <= 0:
return None
remaining_bytes = self.total_bytes - self.bytes_downloaded
if remaining_bytes <= 0:
return 0.0
return remaining_bytes / self.download_speed
@property
def status_text(self) -> str:
"""Get human-readable status."""
if self.is_cancelled:
return "Cancelled"
elif self.is_completed:
return "Completed"
elif self.error_message:
return f"Failed: {self.error_message}"
else:
return f"Downloading ({self.progress:.1%})"
# Global tracker instance
_download_tracker: Optional[DownloadTracker] = None
def get_download_tracker(config: DownloadsConfig) -> DownloadTracker:
"""Get or create the global download tracker instance."""
global _download_tracker
if _download_tracker is None:
_download_tracker = DownloadTracker(config)
return _download_tracker

View File

@@ -1,340 +0,0 @@
"""
Download validation and integrity checking utilities.
This module provides functionality to validate downloaded files, verify
integrity, and repair corrupted download records.
"""
from __future__ import annotations
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional, Tuple
from pydantic import ValidationError
from ....core.constants import APP_DATA_DIR
from .manager import DownloadManager
from .models import DownloadIndex, MediaDownloadRecord
logger = logging.getLogger(__name__)
class DownloadValidator:
"""
Validator for download records and file integrity using Pydantic models.
Provides functionality to validate, repair, and maintain the integrity
of download tracking data and associated files.
"""
def __init__(self, download_manager: DownloadManager):
self.download_manager = download_manager
self.tracking_dir = APP_DATA_DIR / "downloads"
self.media_dir = self.tracking_dir / "media"
def validate_download_record(self, file_path: Path) -> Optional[MediaDownloadRecord]:
"""Load and validate a download record with Pydantic."""
try:
with open(file_path, 'r', encoding='utf-8') as f:
data = json.load(f)
record = MediaDownloadRecord.model_validate(data)
logger.debug(f"Successfully validated download record: {file_path}")
return record
except ValidationError as e:
logger.error(f"Invalid download record {file_path}: {e}")
return None
except Exception as e:
logger.error(f"Failed to load download record {file_path}: {e}")
return None
def validate_all_records(self) -> Tuple[List[MediaDownloadRecord], List[Path]]:
"""Validate all download records and return valid records and invalid file paths."""
valid_records = []
invalid_files = []
if not self.media_dir.exists():
logger.warning("Media directory does not exist")
return valid_records, invalid_files
for record_file in self.media_dir.glob("*.json"):
record = self.validate_download_record(record_file)
if record:
valid_records.append(record)
else:
invalid_files.append(record_file)
logger.info(f"Validated {len(valid_records)} records, found {len(invalid_files)} invalid files")
return valid_records, invalid_files
def verify_file_integrity(self, record: MediaDownloadRecord) -> Dict[int, bool]:
"""Verify file integrity for all episodes in a download record."""
integrity_results = {}
for episode_num, episode_download in record.episodes.items():
if episode_download.status != "completed":
integrity_results[episode_num] = True # Skip non-completed downloads
continue
# Check if file exists
if not episode_download.file_path.exists():
logger.warning(f"Missing file for episode {episode_num}: {episode_download.file_path}")
integrity_results[episode_num] = False
continue
# Verify file size
actual_size = episode_download.file_path.stat().st_size
if actual_size != episode_download.file_size:
logger.warning(f"Size mismatch for episode {episode_num}: expected {episode_download.file_size}, got {actual_size}")
integrity_results[episode_num] = False
continue
# Verify checksum if available
if episode_download.checksum:
if not episode_download.verify_integrity():
logger.warning(f"Checksum mismatch for episode {episode_num}")
integrity_results[episode_num] = False
continue
integrity_results[episode_num] = True
return integrity_results
def repair_download_record(self, record_file: Path) -> bool:
"""Attempt to repair a corrupted download record."""
try:
# Try to load raw data
with open(record_file, 'r', encoding='utf-8') as f:
data = json.load(f)
# Attempt basic repairs
repaired_data = self._attempt_basic_repairs(data)
# Try to validate repaired data
try:
repaired_record = MediaDownloadRecord.model_validate(repaired_data)
# Save repaired record
self.download_manager.save_download_record(repaired_record)
logger.info(f"Successfully repaired download record: {record_file}")
return True
except ValidationError as e:
logger.error(f"Could not repair download record {record_file}: {e}")
return False
except Exception as e:
logger.error(f"Failed to repair download record {record_file}: {e}")
return False
def _attempt_basic_repairs(self, data: Dict) -> Dict:
"""Attempt basic repairs on download record data."""
repaired = data.copy()
# Ensure required fields exist with defaults
if "episodes" not in repaired:
repaired["episodes"] = {}
if "created_date" not in repaired:
repaired["created_date"] = "2024-01-01T00:00:00"
if "last_updated" not in repaired:
repaired["last_updated"] = "2024-01-01T00:00:00"
if "tags" not in repaired:
repaired["tags"] = []
if "preferred_quality" not in repaired:
repaired["preferred_quality"] = "1080"
if "auto_download_new" not in repaired:
repaired["auto_download_new"] = False
# Fix episodes data
if isinstance(repaired["episodes"], dict):
fixed_episodes = {}
for ep_num, ep_data in repaired["episodes"].items():
if isinstance(ep_data, dict):
# Ensure required episode fields
if "episode_number" not in ep_data:
ep_data["episode_number"] = int(ep_num) if ep_num.isdigit() else 1
if "status" not in ep_data:
ep_data["status"] = "queued"
if "download_progress" not in ep_data:
ep_data["download_progress"] = 0.0
if "file_size" not in ep_data:
ep_data["file_size"] = 0
if "subtitle_files" not in ep_data:
ep_data["subtitle_files"] = []
fixed_episodes[ep_num] = ep_data
repaired["episodes"] = fixed_episodes
return repaired
def rebuild_index_from_records(self) -> bool:
"""Rebuild the download index from individual record files."""
try:
valid_records, _ = self.validate_all_records()
# Create new index
new_index = DownloadIndex()
# Add all valid records to index
for record in valid_records:
new_index.add_media_entry(record)
# Save rebuilt index
self.download_manager._save_index(new_index)
logger.info(f"Rebuilt download index with {len(valid_records)} records")
return True
except Exception as e:
logger.error(f"Failed to rebuild index: {e}")
return False
def cleanup_orphaned_files(self) -> int:
"""Clean up orphaned files and inconsistent records."""
cleanup_count = 0
try:
# Load current index
index = self.download_manager._load_index()
# Check for orphaned record files
if self.media_dir.exists():
for record_file in self.media_dir.glob("*.json"):
media_id = int(record_file.stem)
if media_id not in index.media_index:
# Check if record is valid
record = self.validate_download_record(record_file)
if record:
# Add to index
index.add_media_entry(record)
logger.info(f"Re-added orphaned record to index: {media_id}")
else:
# Remove invalid file
record_file.unlink()
cleanup_count += 1
logger.info(f"Removed invalid record file: {record_file}")
# Check for missing record files
missing_records = []
for media_id, index_entry in index.media_index.items():
if not index_entry.file_path.exists():
missing_records.append(media_id)
# Remove missing records from index
for media_id in missing_records:
index.remove_media_entry(media_id)
cleanup_count += 1
logger.info(f"Removed missing record from index: {media_id}")
# Save updated index
if cleanup_count > 0:
self.download_manager._save_index(index)
return cleanup_count
except Exception as e:
logger.error(f"Failed to cleanup orphaned files: {e}")
return 0
def validate_file_paths(self, record: MediaDownloadRecord) -> List[str]:
"""Validate file paths in a download record and return issues."""
issues = []
# Check download path
if not record.download_path.is_absolute():
issues.append(f"Download path is not absolute: {record.download_path}")
# Check episode file paths
for episode_num, episode_download in record.episodes.items():
if not episode_download.file_path.is_absolute():
issues.append(f"Episode {episode_num} file path is not absolute: {episode_download.file_path}")
# Check if file exists for completed downloads
if episode_download.status == "completed" and not episode_download.file_path.exists():
issues.append(f"Episode {episode_num} file does not exist: {episode_download.file_path}")
# Check subtitle files
for subtitle_file in episode_download.subtitle_files:
if not subtitle_file.exists():
issues.append(f"Episode {episode_num} subtitle file does not exist: {subtitle_file}")
return issues
def generate_validation_report(self) -> Dict:
"""Generate a comprehensive validation report."""
report = {
"timestamp": str(datetime.now()),
"total_records": 0,
"valid_records": 0,
"invalid_records": 0,
"integrity_issues": 0,
"orphaned_files": 0,
"path_issues": 0,
"details": {
"invalid_files": [],
"integrity_failures": [],
"path_issues": []
}
}
try:
# Validate all records
valid_records, invalid_files = self.validate_all_records()
report["total_records"] = len(valid_records) + len(invalid_files)
report["valid_records"] = len(valid_records)
report["invalid_records"] = len(invalid_files)
report["details"]["invalid_files"] = [str(f) for f in invalid_files]
# Check integrity and paths for valid records
for record in valid_records:
# Check file integrity
integrity_results = self.verify_file_integrity(record)
failed_episodes = [ep for ep, result in integrity_results.items() if not result]
if failed_episodes:
report["integrity_issues"] += len(failed_episodes)
report["details"]["integrity_failures"].append({
"media_id": record.media_item.id,
"title": record.display_title,
"failed_episodes": failed_episodes
})
# Check file paths
path_issues = self.validate_file_paths(record)
if path_issues:
report["path_issues"] += len(path_issues)
report["details"]["path_issues"].append({
"media_id": record.media_item.id,
"title": record.display_title,
"issues": path_issues
})
# Check for orphaned files
orphaned_count = self.cleanup_orphaned_files()
report["orphaned_files"] = orphaned_count
except Exception as e:
logger.error(f"Failed to generate validation report: {e}")
report["error"] = str(e)
return report
def validate_downloads(download_manager: DownloadManager) -> Dict:
"""Convenience function to validate all downloads and return a report."""
validator = DownloadValidator(download_manager)
return validator.generate_validation_report()

View File

@@ -0,0 +1,277 @@
from typing import List
from ....libs.api.params import ApiSearchParams
from ....libs.api.types import MediaItem
class MediaFilter:
"""
A class to filter, sort, and paginate a list of MediaItem objects
based on ApiSearchParams.
"""
# Mapping for season to month range (MMDD format)
_SEASON_MONTH_RANGES = {
"WINTER": (101, 331), # Jan 1 - Mar 31
"SPRING": (401, 630), # Apr 1 - Jun 30
"SUMMER": (701, 930), # Jul 1 - Sep 30
"FALL": (1001, 1231), # Oct 1 - Dec 31
}
# Mapping for sort parameters to MediaItem attributes and order
# (attribute_name, is_descending, is_nested_title_field)
_SORT_MAPPING = {
"ID": ("id", False, False),
"ID_DESC": ("id", True, False),
"POPULARITY": ("popularity", False, False),
"POPULARITY_DESC": ("popularity", True, False),
"SCORE": ("average_score", False, False),
"SCORE_DESC": ("average_score", True, False),
"TITLE_ROMAJI": ("romaji", False, True), # Nested under title
"TITLE_ROMAJI_DESC": ("romaji", True, True),
"TITLE_ENGLISH": ("english", False, True),
"TITLE_ENGLISH_DESC": ("english", True, True),
"START_DATE": ("start_date", False, False),
"START_DATE_DESC": ("start_date", True, False),
}
@classmethod
def apply(
cls, media_items: List[MediaItem], filters: ApiSearchParams
) -> List[MediaItem]:
"""
Applies filtering, sorting, and pagination to a list of MediaItem objects.
Args:
media_items: The initial list of MediaItem objects to filter.
params: An ApiSearchParams object containing the filter, sort, and pagination criteria.
Returns:
A new list of MediaItem objects, filtered, sorted, and paginated.
"""
filtered_items = list(media_items) # Create a mutable copy
if filters.query:
query_lower = filters.query.lower()
filtered_items = [
item
for item in filtered_items
if (
item.title
and (
(item.title.romaji and query_lower in item.title.romaji.lower())
or (
item.title.english
and query_lower in item.title.english.lower()
)
or (
item.title.native
and query_lower in item.title.native.lower()
)
)
)
or (item.description and query_lower in item.description.lower())
or any(query_lower in syn.lower() for syn in item.synonyms)
]
# IDs
if filters.id_in:
id_set = set(filters.id_in)
filtered_items = [item for item in filtered_items if item.id in id_set]
# Genres
if filters.genre_in:
genre_in_set = set(g.lower() for g in filters.genre_in)
filtered_items = [
item
for item in filtered_items
if any(g.lower() in genre_in_set for g in item.genres)
]
if filters.genre_not_in:
genre_not_in_set = set(g.lower() for g in filters.genre_not_in)
filtered_items = [
item
for item in filtered_items
if not any(g.lower() in genre_not_in_set for g in item.genres)
]
# Tags
if filters.tag_in:
tag_in_set = set(t.lower() for t in filters.tag_in)
filtered_items = [
item
for item in filtered_items
if any(tag.name and tag.name.lower() in tag_in_set for tag in item.tags)
]
if filters.tag_not_in:
tag_not_in_set = set(t.lower() for t in filters.tag_not_in)
filtered_items = [
item
for item in filtered_items
if not any(
tag.name and tag.name.lower() in tag_not_in_set for tag in item.tags
)
]
# Status
combined_status_in = set()
if filters.status_in:
combined_status_in.update(s.upper() for s in filters.status_in)
if filters.status:
combined_status_in.add(filters.status.upper())
if combined_status_in:
filtered_items = [
item
for item in filtered_items
if item.status and item.status.upper() in combined_status_in
]
if filters.status_not_in:
status_not_in_set = set(s.upper() for s in filters.status_not_in)
filtered_items = [
item
for item in filtered_items
if item.status and item.status.upper() not in status_not_in_set
]
# Popularity
if filters.popularity_greater is not None:
filtered_items = [
item
for item in filtered_items
if item.popularity is not None
and item.popularity > filters.popularity_greater
]
if filters.popularity_lesser is not None:
filtered_items = [
item
for item in filtered_items
if item.popularity is not None
and item.popularity < filters.popularity_lesser
]
# Average Score
if filters.averageScore_greater is not None:
filtered_items = [
item
for item in filtered_items
if item.average_score is not None
and item.average_score > filters.averageScore_greater
]
if filters.averageScore_lesser is not None:
filtered_items = [
item
for item in filtered_items
if item.average_score is not None
and item.average_score < filters.averageScore_lesser
]
# Date Filtering (combining season/year with startDate parameters)
effective_start_date_greater = filters.startDate_greater
effective_start_date_lesser = filters.startDate_lesser
if filters.seasonYear is not None and filters.season is not None:
season_range = cls._SEASON_MONTH_RANGES.get(filters.season.upper())
if season_range:
# Calculate start and end of the season in YYYYMMDD format
season_start_date = filters.seasonYear * 10000 + season_range[0]
season_end_date = filters.seasonYear * 10000 + season_range[1]
# Combine with existing startDate_greater/lesser, taking the stricter boundary
effective_start_date_greater = max(
effective_start_date_greater or 0, season_start_date
)
effective_start_date_lesser = min(
effective_start_date_lesser or 99999999, season_end_date
)
# TODO: re enable date filtering since date is a datetime
# if filters.startDate is not None:
# # If a specific start date is given, it overrides ranges for exact match
# filtered_items = [
# item for item in filtered_items if item.start_date == filters.startDate
# ]
# else:
# if effective_start_date_greater is not None:
# filtered_items = [
# item
# for item in filtered_items
# if item.start_date is not None
# and item.start_date >= datetime(y,m,d)
# ]
# if effective_start_date_lesser is not None:
# filtered_items = [
# item
# for item in filtered_items
# if item.start_date is not None
# and item.start_date <= effective_start_date_lesser
# ]
# if filters.endDate_greater is not None:
# filtered_items = [
# item
# for item in filtered_items
# if item.end_date is not None
# and item.end_date >= filters.endDate_greater
# ]
# if filters.endDate_lesser is not None:
# filtered_items = [
# item
# for item in filtered_items
# if item.end_date is not None and item.end_date <= filters.endDate_lesser
# ]
# Format and Type
if filters.format_in:
format_in_set = set(f.upper() for f in filters.format_in)
filtered_items = [
item
for item in filtered_items
if item.format and item.format.upper() in format_in_set
]
if filters.type:
filtered_items = [
item
for item in filtered_items
if item.type and item.type.upper() == filters.type.upper()
]
# --- 2. Apply Sorting ---
if filters.sort:
sort_criteria = (
[filters.sort] if isinstance(filters.sort, str) else filters.sort
)
# Sort in reverse order of criteria so the first criterion is primary
for sort_param in reversed(sort_criteria):
sort_info = cls._SORT_MAPPING.get(sort_param.upper())
if sort_info:
attr_name, is_descending, is_nested_title = sort_info
def sort_key(item: MediaItem):
if is_nested_title:
# Handle nested title attributes
title_obj = item.title
if title_obj and hasattr(title_obj, attr_name):
val = getattr(title_obj, attr_name)
return val.lower() if isinstance(val, str) else val
return None # Handle missing title or attribute gracefully
else:
# Handle direct attributes
return getattr(item, attr_name)
# Sort, handling None values (None typically sorts first in ascending)
filtered_items.sort(
key=lambda item: (sort_key(item) is None, sort_key(item)),
reverse=is_descending,
)
else:
print(f"Warning: Unknown sort parameter '{sort_param}'. Skipping.")
# --- 3. Apply Pagination ---
start_index = (filters.page - 1) * filters.per_page
end_index = start_index + filters.per_page
paginated_items = filtered_items[start_index:end_index]
return paginated_items

View File

@@ -1,380 +0,0 @@
"""
Unified Media Registry Manager.
Provides centralized management of anime metadata, downloads, and watch history
through a single interface, eliminating data duplication.
"""
from __future__ import annotations
import json
import logging
import threading
from pathlib import Path
from typing import Dict, List, Optional
from ....core.constants import APP_DATA_DIR
from ....libs.api.types import MediaItem
from .models import MediaRecord, MediaRegistryIndex, EpisodeStatus, UserMediaData
logger = logging.getLogger(__name__)
class MediaRegistryManager:
"""
Unified manager for anime data, downloads, and watch history.
Provides a single interface for all media-related operations,
eliminating duplication between download and watch systems.
"""
def __init__(self, registry_path: Path = None):
self.registry_path = registry_path or APP_DATA_DIR / "media_registry"
self.media_dir = self.registry_path / "media"
self.cache_dir = self.registry_path / "cache"
self.index_file = self.registry_path / "index.json"
# Thread safety
self._lock = threading.RLock()
# Cached data
self._index: Optional[MediaRegistryIndex] = None
self._loaded_records: Dict[int, MediaRecord] = {}
self._ensure_directories()
def _ensure_directories(self) -> None:
"""Ensure registry directories exist."""
try:
self.registry_path.mkdir(parents=True, exist_ok=True)
self.media_dir.mkdir(exist_ok=True)
self.cache_dir.mkdir(exist_ok=True)
except Exception as e:
logger.error(f"Failed to create registry directories: {e}")
def _load_index(self) -> MediaRegistryIndex:
"""Load or create the registry index."""
if self._index is not None:
return self._index
try:
if self.index_file.exists():
with open(self.index_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self._index = MediaRegistryIndex.model_validate(data)
else:
self._index = MediaRegistryIndex()
self._save_index()
logger.debug(f"Loaded registry index with {self._index.media_count} entries")
return self._index
except Exception as e:
logger.error(f"Failed to load registry index: {e}")
self._index = MediaRegistryIndex()
return self._index
def _save_index(self) -> bool:
"""Save the registry index."""
try:
# Atomic write
temp_file = self.index_file.with_suffix('.tmp')
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(self._index.model_dump(), f, indent=2, ensure_ascii=False, default=str)
temp_file.replace(self.index_file)
logger.debug("Saved registry index")
return True
except Exception as e:
logger.error(f"Failed to save registry index: {e}")
return False
def _get_media_file_path(self, media_id: int) -> Path:
"""Get file path for media record."""
return self.media_dir / str(media_id) / "record.json"
def get_media_record(self, media_id: int) -> Optional[MediaRecord]:
"""Get media record by ID."""
with self._lock:
# Check cache first
if media_id in self._loaded_records:
return self._loaded_records[media_id]
try:
record_file = self._get_media_file_path(media_id)
if not record_file.exists():
return None
with open(record_file, 'r', encoding='utf-8') as f:
data = json.load(f)
record = MediaRecord.model_validate(data)
self._loaded_records[media_id] = record
logger.debug(f"Loaded media record for {media_id}")
return record
except Exception as e:
logger.error(f"Failed to load media record {media_id}: {e}")
return None
def save_media_record(self, record: MediaRecord) -> bool:
"""Save media record to storage."""
with self._lock:
try:
media_id = record.media_item.id
record_file = self._get_media_file_path(media_id)
# Ensure directory exists
record_file.parent.mkdir(parents=True, exist_ok=True)
# Atomic write
temp_file = record_file.with_suffix('.tmp')
with open(temp_file, 'w', encoding='utf-8') as f:
json.dump(record.model_dump(), f, indent=2, ensure_ascii=False, default=str)
temp_file.replace(record_file)
# Update cache and index
self._loaded_records[media_id] = record
index = self._load_index()
index.add_media_entry(record)
self._save_index()
logger.debug(f"Saved media record for {media_id}")
return True
except Exception as e:
logger.error(f"Failed to save media record: {e}")
return False
def get_or_create_record(self, media_item: MediaItem) -> MediaRecord:
"""Get existing record or create new one."""
record = self.get_media_record(media_item.id)
if record is None:
record = MediaRecord(media_item=media_item)
self.save_media_record(record)
else:
# Update media_item in case metadata changed
record.media_item = media_item
record.user_data.update_timestamp()
self.save_media_record(record)
return record
def update_download_completion(self, media_item: MediaItem, episode_number: int,
file_path: Path, file_size: int, quality: str,
checksum: Optional[str] = None) -> bool:
"""Update record when download completes."""
try:
record = self.get_or_create_record(media_item)
record.update_from_download_completion(
episode_number, file_path, file_size, quality, checksum
)
return self.save_media_record(record)
except Exception as e:
logger.error(f"Failed to update download completion: {e}")
return False
def update_from_player_result(self, media_item: MediaItem, episode_number: int,
stop_time: str, total_time: str) -> bool:
"""Update record from player feedback."""
try:
record = self.get_or_create_record(media_item)
record.update_from_player_result(episode_number, stop_time, total_time)
return self.save_media_record(record)
except Exception as e:
logger.error(f"Failed to update from player result: {e}")
return False
def mark_episode_watched(self, media_id: int, episode_number: int,
progress: float = 1.0) -> bool:
"""Mark episode as watched."""
try:
record = self.get_media_record(media_id)
if not record:
return False
episode = record.get_episode_status(episode_number)
episode.watch_status = "completed" if progress >= 0.8 else "watching"
episode.watch_progress = progress
episode.watch_date = datetime.now()
episode.watch_count += 1
record.user_data.update_timestamp()
return self.save_media_record(record)
except Exception as e:
logger.error(f"Failed to mark episode watched: {e}")
return False
def get_currently_watching(self) -> List[MediaRecord]:
"""Get anime currently being watched."""
try:
index = self._load_index()
watching_records = []
for entry in index.media_index.values():
if entry.user_status == "watching":
record = self.get_media_record(entry.media_id)
if record:
watching_records.append(record)
return watching_records
except Exception as e:
logger.error(f"Failed to get currently watching: {e}")
return []
def get_recently_watched(self, limit: int = 10) -> List[MediaRecord]:
"""Get recently watched anime."""
try:
index = self._load_index()
# Sort by last updated
sorted_entries = sorted(
index.media_index.values(),
key=lambda x: x.last_updated,
reverse=True
)
recent_records = []
for entry in sorted_entries[:limit]:
if entry.episodes_watched > 0: # Only include if actually watched
record = self.get_media_record(entry.media_id)
if record:
recent_records.append(record)
return recent_records
except Exception as e:
logger.error(f"Failed to get recently watched: {e}")
return []
def get_download_queue_candidates(self) -> List[MediaRecord]:
"""Get anime that have downloads queued or in progress."""
try:
index = self._load_index()
download_records = []
for entry in index.media_index.values():
if entry.episodes_downloaded < entry.total_episodes:
record = self.get_media_record(entry.media_id)
if record:
# Check if any episodes are queued/downloading
has_active_downloads = any(
ep.download_status in ["queued", "downloading"]
for ep in record.episodes.values()
)
if has_active_downloads:
download_records.append(record)
return download_records
except Exception as e:
logger.error(f"Failed to get download queue candidates: {e}")
return []
def get_continue_episode(self, media_id: int, available_episodes: List[str]) -> Optional[str]:
"""Get episode to continue from based on watch history."""
try:
record = self.get_media_record(media_id)
if not record:
return None
next_episode = record.next_episode_to_watch
if next_episode and str(next_episode) in available_episodes:
return str(next_episode)
return None
except Exception as e:
logger.error(f"Failed to get continue episode: {e}")
return None
def get_registry_stats(self) -> Dict:
"""Get comprehensive registry statistics."""
try:
index = self._load_index()
total_downloaded = sum(entry.episodes_downloaded for entry in index.media_index.values())
total_watched = sum(entry.episodes_watched for entry in index.media_index.values())
return {
"total_anime": index.media_count,
"status_breakdown": index.status_breakdown,
"total_episodes_downloaded": total_downloaded,
"total_episodes_watched": total_watched,
"last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
}
except Exception as e:
logger.error(f"Failed to get registry stats: {e}")
return {}
def search_media(self, query: str) -> List[MediaRecord]:
"""Search media by title."""
try:
index = self._load_index()
query_lower = query.lower()
results = []
for entry in index.media_index.values():
if query_lower in entry.title.lower():
record = self.get_media_record(entry.media_id)
if record:
results.append(record)
return results
except Exception as e:
logger.error(f"Failed to search media: {e}")
return []
def remove_media_record(self, media_id: int) -> bool:
"""Remove media record completely."""
with self._lock:
try:
# Remove from cache
if media_id in self._loaded_records:
del self._loaded_records[media_id]
# Remove file
record_file = self._get_media_file_path(media_id)
if record_file.exists():
record_file.unlink()
# Remove directory if empty
try:
record_file.parent.rmdir()
except OSError:
pass # Directory not empty
# Update index
index = self._load_index()
if media_id in index.media_index:
del index.media_index[media_id]
index.media_count = len(index.media_index)
self._save_index()
logger.debug(f"Removed media record {media_id}")
return True
except Exception as e:
logger.error(f"Failed to remove media record {media_id}: {e}")
return False
# Global instance
_media_registry: Optional[MediaRegistryManager] = None
def get_media_registry() -> MediaRegistryManager:
"""Get or create the global media registry instance."""
global _media_registry
if _media_registry is None:
_media_registry = MediaRegistryManager()
return _media_registry

View File

@@ -1,346 +1,95 @@
"""
Unified data models for Media Registry.
Provides single source of truth for anime metadata, episode tracking,
and user data, eliminating duplication between download and watch systems.
"""
from __future__ import annotations
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Literal, Optional
from typing import Dict, Literal, Optional
from pydantic import BaseModel, ConfigDict, Field, computed_field
from pydantic import BaseModel, Field, computed_field
from ....libs.api.types import MediaItem
from ...utils import converters
logger = logging.getLogger(__name__)
# Type aliases
DownloadStatus = Literal["not_downloaded", "queued", "downloading", "completed", "failed", "paused"]
WatchStatus = Literal["not_watched", "watching", "completed", "dropped", "paused"]
DownloadStatus = Literal[
"not_downloaded", "queued", "downloading", "completed", "failed", "paused"
]
MediaUserStatus = Literal["planning", "watching", "completed", "dropped", "paused"]
REGISTRY_VERSION = "1.0"
class EpisodeStatus(BaseModel):
"""
Unified episode status tracking both download and watch state.
Single source of truth for episode-level data.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
episode_number: int = Field(gt=0)
class MediaEpisode(BaseModel):
episode_number: str
# Download tracking
download_status: DownloadStatus = "not_downloaded"
file_path: Optional[Path] = None
file_size: Optional[int] = None
download_date: Optional[datetime] = None
download_quality: Optional[str] = None
checksum: Optional[str] = None
# Watch tracking (from player feedback)
watch_status: WatchStatus = "not_watched"
watch_progress: float = Field(default=0.0, ge=0.0, le=1.0)
last_watch_position: Optional[str] = None # "HH:MM:SS" from PlayerResult
total_duration: Optional[str] = None # "HH:MM:SS" from PlayerResult
watch_date: Optional[datetime] = None
watch_count: int = Field(default=0, ge=0)
# Integration fields
auto_marked_watched: bool = Field(default=False, description="Auto-marked watched from download")
@computed_field
@property
def is_available_locally(self) -> bool:
"""Check if episode is downloaded and file exists."""
return (
self.download_status == "completed"
and self.file_path is not None
and self.file_path.exists()
)
@computed_field
@property
def completion_percentage(self) -> float:
"""Calculate actual watch completion from player data."""
if self.last_watch_position and self.total_duration:
try:
last_seconds = self._time_to_seconds(self.last_watch_position)
total_seconds = self._time_to_seconds(self.total_duration)
if total_seconds > 0:
return min(100.0, (last_seconds / total_seconds) * 100)
except (ValueError, AttributeError):
pass
return self.watch_progress * 100
@computed_field
@property
def should_auto_mark_watched(self) -> bool:
"""Check if episode should be auto-marked as watched."""
return self.completion_percentage >= 80.0 and self.watch_status != "completed"
def _time_to_seconds(self, time_str: str) -> int:
"""Convert HH:MM:SS to seconds."""
try:
parts = time_str.split(':')
if len(parts) == 3:
h, m, s = map(int, parts)
return h * 3600 + m * 60 + s
except (ValueError, AttributeError):
pass
return 0
def update_from_player_result(self, stop_time: str, total_time: str) -> None:
"""Update watch status from PlayerResult."""
self.last_watch_position = stop_time
self.total_duration = total_time
self.watch_date = datetime.now()
self.watch_count += 1
# Auto-mark as completed if 80%+ watched
if self.should_auto_mark_watched:
self.watch_status = "completed"
self.watch_progress = 1.0
class UserMediaData(BaseModel):
"""
User-specific data for a media item.
Consolidates user preferences from both download and watch systems.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
# User status and preferences
status: MediaUserStatus = "planning"
notes: str = ""
tags: List[str] = Field(default_factory=list)
rating: Optional[int] = Field(None, ge=1, le=10)
favorite: bool = False
priority: int = Field(default=0, ge=0)
# Download preferences
preferred_quality: str = "1080"
auto_download_new: bool = False
download_path: Optional[Path] = None
# Watch preferences
continue_from_history: bool = True
auto_mark_watched_on_download: bool = False
# Timestamps
created_date: datetime = Field(default_factory=datetime.now)
last_updated: datetime = Field(default_factory=datetime.now)
def update_timestamp(self) -> None:
"""Update last_updated timestamp."""
self.last_updated = datetime.now()
file_path: Path
download_date: datetime = Field(default_factory=datetime.now)
class MediaRecord(BaseModel):
"""
Unified media record - single source of truth for anime data.
Replaces both MediaDownloadRecord and WatchHistoryEntry.
"""
model_config = ConfigDict(
str_strip_whitespace=True,
validate_assignment=True,
)
media_item: MediaItem
episodes: Dict[int, EpisodeStatus] = Field(default_factory=dict)
user_data: UserMediaData = Field(default_factory=UserMediaData)
@computed_field
@property
def display_title(self) -> str:
"""Get display title for the anime."""
return (
self.media_item.title.english
or self.media_item.title.romaji
or self.media_item.title.native
or f"Anime #{self.media_item.id}"
)
@computed_field
@property
def total_episodes_downloaded(self) -> int:
"""Count of successfully downloaded episodes."""
return len([ep for ep in self.episodes.values() if ep.is_available_locally])
@computed_field
@property
def total_episodes_watched(self) -> int:
"""Count of completed episodes."""
return len([ep for ep in self.episodes.values() if ep.watch_status == "completed"])
@computed_field
@property
def last_watched_episode(self) -> int:
"""Get highest watched episode number."""
watched_episodes = [
ep.episode_number for ep in self.episodes.values()
if ep.watch_status == "completed"
]
return max(watched_episodes) if watched_episodes else 0
@computed_field
@property
def next_episode_to_watch(self) -> Optional[int]:
"""Get next episode to watch based on progress."""
if not self.episodes:
return 1
# Find highest completed episode
last_watched = self.last_watched_episode
if last_watched == 0:
return 1
next_ep = last_watched + 1
total_eps = self.media_item.episodes or float('inf')
return next_ep if next_ep <= total_eps else None
@computed_field
@property
def download_completion_percentage(self) -> float:
"""Download completion percentage."""
if not self.media_item.episodes or self.media_item.episodes == 0:
return 0.0
return (self.total_episodes_downloaded / self.media_item.episodes) * 100
media_episodes: list[MediaEpisode] = Field(default_factory=list)
class MediaRegistryIndexEntry(BaseModel):
media_id: int
media_api: Literal["anilist", "NONE", "jikan"] = "NONE"
status: MediaUserStatus = "watching"
progress: str = "0"
last_watch_position: Optional[str] = None
last_watched: datetime = Field(default_factory=datetime.now)
total_duration: Optional[str] = None
total_episodes: int = 0
score: float = 0
repeat: int = 0
notes: str = ""
last_notified_episode: Optional[str] = None
# for first watch only
start_date: datetime = Field(default_factory=datetime.now)
completed_at: datetime = Field(default_factory=datetime.now)
@computed_field
@property
def watch_completion_percentage(self) -> float:
"""Watch completion percentage."""
if not self.media_item.episodes or self.media_item.episodes == 0:
return 0.0
return (self.total_episodes_watched / self.media_item.episodes) * 100
def get_episode_status(self, episode_number: int) -> EpisodeStatus:
"""Get or create episode status."""
if episode_number not in self.episodes:
self.episodes[episode_number] = EpisodeStatus(episode_number=episode_number)
return self.episodes[episode_number]
def update_from_download_completion(self, episode_number: int, file_path: Path,
file_size: int, quality: str, checksum: Optional[str] = None) -> None:
"""Update episode status when download completes."""
episode = self.get_episode_status(episode_number)
episode.download_status = "completed"
episode.file_path = file_path
episode.file_size = file_size
episode.download_quality = quality
episode.checksum = checksum
episode.download_date = datetime.now()
# Auto-mark as watched if enabled
if self.user_data.auto_mark_watched_on_download and episode.watch_status == "not_watched":
episode.watch_status = "completed"
episode.watch_progress = 1.0
episode.auto_marked_watched = True
episode.watch_date = datetime.now()
self.user_data.update_timestamp()
def update_from_player_result(self, episode_number: int, stop_time: str, total_time: str) -> None:
"""Update episode status from player feedback."""
episode = self.get_episode_status(episode_number)
episode.update_from_player_result(stop_time, total_time)
self.user_data.update_timestamp()
# Update overall status based on progress
if episode.watch_status == "completed":
if self.user_data.status == "planning":
self.user_data.status = "watching"
# Check if anime is completed
if self.media_item.episodes and self.total_episodes_watched >= self.media_item.episodes:
self.user_data.status = "completed"
"""Watch completion percentage."""
if self.total_duration and self.last_watch_position:
return (
converters.time_to_seconds(self.last_watch_position)
/ converters.time_to_seconds(self.total_duration)
) * 100
return 0.0
class MediaRegistryIndex(BaseModel):
"""
Lightweight index for fast media registry operations.
Provides quick access without loading full MediaRecord files.
"""
model_config = ConfigDict(validate_assignment=True)
version: str = Field(default="1.0")
version: str = Field(default=REGISTRY_VERSION)
last_updated: datetime = Field(default_factory=datetime.now)
media_count: int = Field(default=0, ge=0)
# Quick access index
media_index: Dict[int, "MediaIndexEntry"] = Field(default_factory=dict)
media_index: Dict[str, MediaRegistryIndexEntry] = Field(default_factory=dict)
@computed_field
@property
def status_breakdown(self) -> Dict[str, int]:
"""Get breakdown by user status."""
breakdown = {"planning": 0, "watching": 0, "completed": 0, "dropped": 0, "paused": 0}
breakdown = {}
for entry in self.media_index.values():
breakdown[entry.user_status] = breakdown.get(entry.user_status, 0) + 1
breakdown[entry.status] = breakdown.get(entry.status, 0) + 1
return breakdown
def add_media_entry(self, media_record: MediaRecord) -> None:
"""Add or update media entry in index."""
entry = MediaIndexEntry(
media_id=media_record.media_item.id,
title=media_record.display_title,
user_status=media_record.user_data.status,
episodes_downloaded=media_record.total_episodes_downloaded,
episodes_watched=media_record.total_episodes_watched,
total_episodes=media_record.media_item.episodes or 0,
last_updated=media_record.user_data.last_updated,
last_watched_episode=media_record.last_watched_episode,
next_episode=media_record.next_episode_to_watch
)
self.media_index[media_record.media_item.id] = entry
self.media_count = len(self.media_index)
self.last_updated = datetime.now()
class MediaIndexEntry(BaseModel):
"""Lightweight index entry for a media item."""
model_config = ConfigDict(validate_assignment=True)
media_id: int
title: str
user_status: MediaUserStatus
episodes_downloaded: int = 0
episodes_watched: int = 0
total_episodes: int = 0
last_updated: datetime
last_watched_episode: int = 0
next_episode: Optional[int] = None
@computed_field
@property
def download_progress(self) -> float:
"""Download progress percentage."""
if self.total_episodes == 0:
return 0.0
return (self.episodes_downloaded / self.total_episodes) * 100
def media_count_breakdown(self) -> Dict[str, int]:
breakdown = {}
for entry in self.media_index.values():
breakdown[entry.media_api] = breakdown.get(entry.media_api, 0) + 1
return breakdown
@computed_field
@property
def watch_progress(self) -> float:
"""Watch progress percentage."""
if self.total_episodes == 0:
return 0.0
return (self.episodes_watched / self.total_episodes) * 100
def media_count(self) -> int:
"""Get the number of media."""
return len(self.media_index)

View File

@@ -0,0 +1,229 @@
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Generator, List, Optional
from ....core.config.model import MediaRegistryConfig
from ....core.utils.file import AtomicWriter, FileLock, check_file_modified
from ....libs.api.params import ApiSearchParams
from ....libs.api.types import MediaItem
from ....libs.players.types import PlayerResult
from .filters import MediaFilter
from .models import (
MediaRecord,
MediaRegistryIndex,
MediaRegistryIndexEntry,
)
logger = logging.getLogger(__name__)
class MediaRegistryService:
def __init__(self, media_api: str, config: MediaRegistryConfig):
self.config = config
self.media_registry_dir = self.config.media_dir / media_api
self._media_api = media_api
self._ensure_directories()
self._index_file = self.config.index_dir / "registry.json"
self._index_file_modified_time = 0
_lock_file = self.config.media_dir / "registry.lock"
self._lock = FileLock(_lock_file)
def _ensure_directories(self) -> None:
"""Ensure registry directories exist."""
try:
self.media_registry_dir.mkdir(parents=True, exist_ok=True)
self.config.index_dir.mkdir(parents=True, exist_ok=True)
except Exception as e:
logger.error(f"Failed to create registry directories: {e}")
def _load_index(self) -> MediaRegistryIndex:
"""Load or create the registry index."""
self._index_file_modified_time, is_modified = check_file_modified(
self._index_file, self._index_file_modified_time
)
if not is_modified and self._index is not None:
return self._index
if self._index_file.exists():
with self._index_file.open("r", encoding="utf-8") as f:
data = json.load(f)
self._index = MediaRegistryIndex.model_validate(data)
else:
self._index = MediaRegistryIndex()
self._save_index(self._index)
logger.debug(f"Loaded registry index with {self._index.media_count} entries")
return self._index
def _save_index(self, index: MediaRegistryIndex):
"""Save the registry index."""
with self._lock:
index.last_updated = datetime.now()
with AtomicWriter(self._index_file) as f:
json.dump(index.model_dump(), f, indent=2)
logger.debug("saved registry index")
def get_media_index_entry(self, media_id: int) -> Optional[MediaRegistryIndexEntry]:
index = self._load_index()
return index.media_index.get(f"{self._media_api}_{media_id}")
def _get_media_file_path(self, media_id: int) -> Path:
"""Get file path for media record."""
return self.media_registry_dir / f"{media_id}.json"
def get_media_record(self, media_id: int) -> Optional[MediaRecord]:
record_file = self._get_media_file_path(media_id)
if not record_file.exists():
return None
data = json.load(record_file.open(mode="r", encoding="utf-8"))
record = MediaRecord.model_validate(data)
logger.debug(f"Loaded media record for {media_id}")
return record
def get_or_create_index_entry(self, media_id: int) -> MediaRegistryIndexEntry:
index_entry = self.get_media_index_entry(media_id)
if not index_entry:
index = self._load_index()
index_entry = MediaRegistryIndexEntry(
media_id=media_id,
media_api=self._media_api, # pyright:ignore
)
index.media_index[f"{self._media_api}_{media_id}"] = index_entry
self._save_index(index)
return index_entry
return index_entry
def save_media_index_entry(self, index_entry: MediaRegistryIndexEntry) -> bool:
with self._lock:
index = self._load_index()
index.media_index[f"{self._media_api}_{index_entry.media_id}"] = index_entry
self._save_index(index)
logger.debug(f"Saved media record for {index_entry.media_id}")
return True
def save_media_record(self, record: MediaRecord) -> bool:
with self._lock:
self.get_or_create_index_entry(record.media_item.id)
media_id = record.media_item.id
record_file = self._get_media_file_path(media_id)
with AtomicWriter(record_file) as f:
json.dump(record.model_dump(), f, indent=2, default=str)
logger.debug(f"Saved media record for {media_id}")
return True
def get_or_create_record(self, media_item: MediaItem) -> MediaRecord:
record = self.get_media_record(media_item.id)
if record is None:
record = MediaRecord(media_item=media_item)
self.save_media_record(record)
else:
record.media_item = media_item
self.save_media_record(record)
return record
def update_from_player_result(
self, media_item: MediaItem, episode_number: str, player_result: PlayerResult
):
"""Update record from player feedback."""
self.get_or_create_record(media_item)
index = self._load_index()
index_entry = index.media_index[f"{self._media_api}_{media_item.id}"]
index_entry.last_watch_position = player_result.stop_time
index_entry.total_duration = player_result.total_time
index_entry.progress = episode_number
index_entry.last_watched = datetime.now()
index.media_index[f"{self._media_api}_{media_item.id}"] = index_entry
self._save_index(index)
def get_recently_watched(self, limit: int) -> List[MediaRecord]:
"""Get recently watched anime."""
index = self._load_index()
sorted_entries = sorted(
index.media_index.values(), key=lambda x: x.last_watched, reverse=True
)
recent_media = []
for entry in sorted_entries:
record = self.get_media_record(entry.media_id)
if record:
recent_media.append(record.media_item)
if len(recent_media) == limit:
break
return recent_media
def get_registry_stats(self) -> Dict:
"""Get comprehensive registry statistics."""
try:
index = self._load_index()
return {
"total_media_breakdown": index.media_count_breakdown,
"status_breakdown": index.status_breakdown,
"last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
}
except Exception as e:
logger.error(f"Failed to get registry stats: {e}")
return {}
def get_all_media_records(self) -> Generator[MediaRecord, None, List[MediaRecord]]:
records = []
for record_file in self.media_registry_dir.iterdir():
try:
if record_file.is_file():
id = record_file.stem
if record := self.get_media_record(int(id)):
records.append(record)
yield record
else:
logger.warning(
f"{self.media_registry_dir} is impure; ignoring folder: {record_file}"
)
except Exception as e:
logger.warning(f"{self.media_registry_dir} is impure which caused: {e}")
return records
def search_for_media(self, params: ApiSearchParams) -> List[MediaItem]:
"""Search media by title."""
try:
# TODO: enhance performance
media_items = [record.media_item for record in self.get_all_media_records()]
return MediaFilter.apply(media_items, params)
except Exception as e:
logger.error(f"Failed to search media: {e}")
return []
def remove_media_record(self, media_id: int):
with self._lock:
record_file = self._get_media_file_path(media_id)
if record_file.exists():
record_file.unlink()
try:
record_file.parent.rmdir()
except OSError:
pass
index = self._load_index()
id = f"{self._media_api}_{media_id}"
if id in index.media_index:
del index.media_index[id]
self._save_index(index)
logger.debug(f"Removed media record {media_id}")

View File

@@ -1,289 +0,0 @@
"""
Unified Media Tracker for player integration and real-time updates.
Provides automatic tracking of watch progress and download completion
through a single interface.
"""
from __future__ import annotations
import logging
from typing import Optional
from ....libs.api.types import MediaItem
from ....libs.players.types import PlayerResult
from .manager import MediaRegistryManager, get_media_registry
logger = logging.getLogger(__name__)
class MediaTracker:
"""
Unified tracker for media interactions.
Handles automatic updates from player results and download completion,
providing seamless integration between watching and downloading.
"""
def __init__(self, registry_manager: MediaRegistryManager = None):
self.registry = registry_manager or get_media_registry()
def track_episode_start(self, media_item: MediaItem, episode: int) -> bool:
"""
Track when episode playback starts.
Args:
media_item: The anime being watched
episode: Episode number being started
Returns:
True if tracking was successful
"""
try:
record = self.registry.get_or_create_record(media_item)
episode_status = record.get_episode_status(episode)
# Only update to "watching" if not already completed
if episode_status.watch_status not in ["completed"]:
episode_status.watch_status = "watching"
# Update overall user status if still planning
if record.user_data.status == "planning":
record.user_data.status = "watching"
return self.registry.save_media_record(record)
except Exception as e:
logger.error(f"Failed to track episode start: {e}")
return False
def track_from_player_result(self, media_item: MediaItem, episode: int,
player_result: PlayerResult) -> bool:
"""
Update watch status based on actual player feedback.
Args:
media_item: The anime that was watched
episode: Episode number that was watched
player_result: Result from the player session
Returns:
True if tracking was successful
"""
try:
if not player_result.stop_time or not player_result.total_time:
logger.warning("PlayerResult missing timing data - cannot track accurately")
return False
return self.registry.update_from_player_result(
media_item, episode, player_result.stop_time, player_result.total_time
)
except Exception as e:
logger.error(f"Failed to track from player result: {e}")
return False
def track_download_completion(self, media_item: MediaItem, episode: int,
file_path, file_size: int, quality: str,
checksum: Optional[str] = None) -> bool:
"""
Update status when download completes.
Args:
media_item: The anime that was downloaded
episode: Episode number that was downloaded
file_path: Path to downloaded file
file_size: File size in bytes
quality: Download quality
checksum: Optional file checksum
Returns:
True if tracking was successful
"""
try:
from pathlib import Path
file_path = Path(file_path) if not isinstance(file_path, Path) else file_path
return self.registry.update_download_completion(
media_item, episode, file_path, file_size, quality, checksum
)
except Exception as e:
logger.error(f"Failed to track download completion: {e}")
return False
def get_continue_episode(self, media_item: MediaItem,
available_episodes: list) -> Optional[str]:
"""
Get episode to continue watching based on history.
Args:
media_item: The anime
available_episodes: List of available episode numbers
Returns:
Episode number to continue from or None
"""
try:
return self.registry.get_continue_episode(
media_item.id, [str(ep) for ep in available_episodes]
)
except Exception as e:
logger.error(f"Failed to get continue episode: {e}")
return None
def get_watch_progress(self, media_id: int) -> Optional[dict]:
"""
Get current watch progress for an anime.
Args:
media_id: ID of the anime
Returns:
Dictionary with progress info or None if not found
"""
try:
record = self.registry.get_media_record(media_id)
if not record:
return None
return {
"last_episode": record.last_watched_episode,
"next_episode": record.next_episode_to_watch,
"status": record.user_data.status,
"title": record.display_title,
"watch_percentage": record.watch_completion_percentage,
"download_percentage": record.download_completion_percentage,
"episodes_watched": record.total_episodes_watched,
"episodes_downloaded": record.total_episodes_downloaded,
}
except Exception as e:
logger.error(f"Failed to get watch progress: {e}")
return None
def update_anime_status(self, media_id: int, status: str) -> bool:
"""
Update overall anime status.
Args:
media_id: ID of the anime
status: New status (planning, watching, completed, dropped, paused)
Returns:
True if update was successful
"""
try:
record = self.registry.get_media_record(media_id)
if not record:
return False
if status in ["planning", "watching", "completed", "dropped", "paused"]:
record.user_data.status = status
record.user_data.update_timestamp()
return self.registry.save_media_record(record)
return False
except Exception as e:
logger.error(f"Failed to update anime status: {e}")
return False
def add_anime_to_registry(self, media_item: MediaItem, status: str = "planning") -> bool:
"""
Add anime to registry with initial status.
Args:
media_item: The anime to add
status: Initial status
Returns:
True if added successfully
"""
try:
record = self.registry.get_or_create_record(media_item)
if status in ["planning", "watching", "completed", "dropped", "paused"]:
record.user_data.status = status
record.user_data.update_timestamp()
return self.registry.save_media_record(record)
return False
except Exception as e:
logger.error(f"Failed to add anime to registry: {e}")
return False
def should_auto_download_next(self, media_id: int) -> Optional[int]:
"""
Check if next episode should be auto-downloaded based on watch progress.
Args:
media_id: ID of the anime
Returns:
Episode number to download or None
"""
try:
record = self.registry.get_media_record(media_id)
if not record or not record.user_data.auto_download_new:
return None
# Only if currently watching
if record.user_data.status != "watching":
return None
next_episode = record.next_episode_to_watch
if not next_episode:
return None
# Check if already downloaded
episode_status = record.episodes.get(next_episode)
if episode_status and episode_status.is_available_locally:
return None
return next_episode
except Exception as e:
logger.error(f"Failed to check auto download: {e}")
return None
# Global tracker instance
_media_tracker: Optional[MediaTracker] = None
def get_media_tracker() -> MediaTracker:
"""Get or create the global media tracker instance."""
global _media_tracker
if _media_tracker is None:
_media_tracker = MediaTracker()
return _media_tracker
# Convenience functions for backward compatibility
def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool:
"""Track episode viewing (backward compatibility)."""
tracker = get_media_tracker()
return tracker.track_episode_start(media_item, episode)
def get_continue_episode(media_item: MediaItem, available_episodes: list,
prefer_history: bool = True) -> Optional[str]:
"""Get continue episode (backward compatibility)."""
if not prefer_history:
return None
tracker = get_media_tracker()
return tracker.get_continue_episode(media_item, available_episodes)
def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool:
"""Update episode progress (backward compatibility)."""
# This would need more context to implement properly with PlayerResult
# For now, just mark as watched if 80%+
if completion_percentage >= 80:
tracker = get_media_tracker()
registry = get_media_registry()
return registry.mark_episode_watched(media_id, episode, completion_percentage / 100)
return True

View File

@@ -1,273 +0,0 @@
"""
Watch history tracking utilities for integration with episode viewing and player controls.
Provides automatic watch history updates during episode viewing.
"""
import logging
from typing import Optional
from ....libs.api.types import MediaItem
from .manager import WatchHistoryManager
logger = logging.getLogger(__name__)
class WatchHistoryTracker:
"""
Tracks watch history automatically during episode viewing.
Integrates with the episode selection and player control systems.
"""
def __init__(self):
self.history_manager = WatchHistoryManager()
def track_episode_start(self, media_item: MediaItem, episode: int) -> bool:
"""
Track when an episode starts being watched.
Args:
media_item: The anime being watched
episode: Episode number being started
Returns:
True if tracking was successful
"""
try:
# Update or create watch history entry
success = self.history_manager.add_or_update_entry(
media_item=media_item,
episode=episode,
progress=0.0,
status="watching"
)
if success:
logger.info(f"Started tracking episode {episode} of {media_item.title.english or media_item.title.romaji}")
return success
except Exception as e:
logger.error(f"Failed to track episode start: {e}")
return False
def track_episode_progress(self, media_id: int, episode: int, progress: float) -> bool:
"""
Track progress within an episode.
Args:
media_id: ID of the anime
episode: Episode number
progress: Progress within the episode (0.0-1.0)
Returns:
True if tracking was successful
"""
try:
success = self.history_manager.mark_episode_watched(media_id, episode, progress)
if success and progress >= 0.8: # Consider episode "watched" at 80%
logger.info(f"Episode {episode} marked as watched (progress: {progress:.1%})")
return success
except Exception as e:
logger.error(f"Failed to track episode progress: {e}")
return False
def track_episode_completion(self, media_id: int, episode: int) -> bool:
"""
Track when an episode is completed.
Args:
media_id: ID of the anime
episode: Episode number completed
Returns:
True if tracking was successful
"""
try:
# Mark episode as fully watched
success = self.history_manager.mark_episode_watched(media_id, episode, 1.0)
if success:
# Check if this was the final episode and mark as completed
entry = self.history_manager.get_entry(media_id)
if entry and entry.media_item.episodes and episode >= entry.media_item.episodes:
self.history_manager.mark_completed(media_id)
logger.info(f"Anime completed: {entry.get_display_title()}")
else:
logger.info(f"Episode {episode} completed")
return success
except Exception as e:
logger.error(f"Failed to track episode completion: {e}")
return False
def get_watch_progress(self, media_id: int) -> Optional[dict]:
"""
Get current watch progress for an anime.
Args:
media_id: ID of the anime
Returns:
Dictionary with progress info or None if not found
"""
try:
entry = self.history_manager.get_entry(media_id)
if entry:
return {
"last_episode": entry.last_watched_episode,
"progress": entry.watch_progress,
"status": entry.status,
"next_episode": entry.last_watched_episode + 1,
"title": entry.get_display_title(),
}
return None
except Exception as e:
logger.error(f"Failed to get watch progress: {e}")
return None
def should_continue_from_history(self, media_id: int, available_episodes: list) -> Optional[str]:
"""
Determine if we should continue from watch history and which episode.
Args:
media_id: ID of the anime
available_episodes: List of available episode numbers
Returns:
Episode number to continue from, or None if no history
"""
try:
progress = self.get_watch_progress(media_id)
if not progress:
return None
last_episode = progress["last_episode"]
next_episode = last_episode + 1
# Check if next episode is available
if str(next_episode) in available_episodes:
logger.info(f"Continuing from episode {next_episode} based on watch history")
return str(next_episode)
# Fall back to last watched episode if next isn't available
elif str(last_episode) in available_episodes and last_episode > 0:
logger.info(f"Next episode not available, falling back to episode {last_episode}")
return str(last_episode)
return None
except Exception as e:
logger.error(f"Failed to determine continue episode: {e}")
return None
def update_anime_status(self, media_id: int, status: str) -> bool:
"""
Update the status of an anime in watch history.
Args:
media_id: ID of the anime
status: New status (watching, completed, dropped, paused)
Returns:
True if update was successful
"""
try:
success = self.history_manager.change_status(media_id, status)
if success:
logger.info(f"Updated anime status to {status}")
return success
except Exception as e:
logger.error(f"Failed to update anime status: {e}")
return False
def add_anime_to_history(self, media_item: MediaItem, status: str = "planning") -> bool:
"""
Add an anime to watch history without watching any episodes.
Args:
media_item: The anime to add
status: Initial status
Returns:
True if successful
"""
try:
success = self.history_manager.add_or_update_entry(
media_item=media_item,
episode=0,
progress=0.0,
status=status
)
if success:
logger.info(f"Added {media_item.title.english or media_item.title.romaji} to watch history")
return success
except Exception as e:
logger.error(f"Failed to add anime to history: {e}")
return False
# Global tracker instance for use throughout the application
watch_tracker = WatchHistoryTracker()
def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool:
"""
Convenience function to track episode viewing.
Args:
media_item: The anime being watched
episode: Episode number
start_tracking: Whether to start tracking (True) or just update progress
Returns:
True if tracking was successful
"""
if start_tracking:
return watch_tracker.track_episode_start(media_item, episode)
else:
return watch_tracker.track_episode_completion(media_item.id, episode)
def get_continue_episode(media_item: MediaItem, available_episodes: list, prefer_history: bool = True) -> Optional[str]:
"""
Get the episode to continue from based on watch history.
Args:
media_item: The anime
available_episodes: List of available episodes
prefer_history: Whether to prefer local history over remote
Returns:
Episode number to continue from
"""
if prefer_history:
return watch_tracker.should_continue_from_history(media_item.id, available_episodes)
return None
def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool:
"""
Update progress for an episode based on completion percentage.
Args:
media_id: ID of the anime
episode: Episode number
completion_percentage: Completion percentage (0-100)
Returns:
True if update was successful
"""
progress = completion_percentage / 100.0
if completion_percentage >= 80: # Consider episode completed at 80%
return watch_tracker.track_episode_completion(media_id, episode)
else:
return watch_tracker.track_episode_progress(media_id, episode, progress)

View File

@@ -1,187 +0,0 @@
"""
Watch history data models and types for the interactive CLI.
Provides comprehensive data structures for tracking and managing local watch history.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from ....libs.api.types import MediaItem
logger = logging.getLogger(__name__)
class WatchHistoryEntry(BaseModel):
"""
Represents a single entry in the watch history.
Contains media information and viewing progress.
"""
media_item: MediaItem
last_watched_episode: int = 0
watch_progress: float = 0.0 # Progress within the episode (0.0-1.0)
times_watched: int = 1
first_watched: datetime = Field(default_factory=datetime.now)
last_watched: datetime = Field(default_factory=datetime.now)
status: str = "watching" # watching, completed, dropped, paused
notes: str = ""
# Download integration fields
has_downloads: bool = Field(default=False, description="Whether episodes are downloaded")
offline_available: bool = Field(default=False, description="Can watch offline")
# With Pydantic, serialization is automatic!
# No need for manual to_dict() and from_dict() methods
# Use: entry.model_dump() and WatchHistoryEntry.model_validate(data)
def update_progress(self, episode: int, progress: float = 0.0, status: Optional[str] = None):
"""Update watch progress for this entry."""
self.last_watched_episode = max(self.last_watched_episode, episode)
self.watch_progress = progress
self.last_watched = datetime.now()
if status:
self.status = status
def mark_completed(self):
"""Mark this entry as completed."""
self.status = "completed"
self.last_watched = datetime.now()
if self.media_item.episodes:
self.last_watched_episode = self.media_item.episodes
self.watch_progress = 1.0
def get_display_title(self) -> str:
"""Get the best available title for display."""
if self.media_item.title.english:
return self.media_item.title.english
elif self.media_item.title.romaji:
return self.media_item.title.romaji
elif self.media_item.title.native:
return self.media_item.title.native
else:
return f"Anime #{self.media_item.id}"
def get_progress_display(self) -> str:
"""Get a human-readable progress display."""
if self.media_item.episodes:
return f"{self.last_watched_episode}/{self.media_item.episodes}"
else:
return f"Ep {self.last_watched_episode}"
def get_status_emoji(self) -> str:
"""Get emoji representation of status."""
status_emojis = {
"watching": "📺",
"completed": "",
"dropped": "🚮",
"paused": "⏸️",
"planning": "📑"
}
return status_emojis.get(self.status, "")
class WatchHistoryData(BaseModel):
"""Complete watch history data container."""
entries: Dict[int, WatchHistoryEntry] = Field(default_factory=dict)
last_updated: datetime = Field(default_factory=datetime.now)
format_version: str = "1.0"
# With Pydantic, serialization is automatic!
# No need for manual to_dict() and from_dict() methods
# Use: data.model_dump() and WatchHistoryData.model_validate(data)
def add_or_update_entry(self, media_item: MediaItem, episode: int = 0, progress: float = 0.0, status: str = "watching") -> WatchHistoryEntry:
"""Add or update a watch history entry."""
media_id = media_item.id
if media_id in self.entries:
# Update existing entry
entry = self.entries[media_id]
entry.update_progress(episode, progress, status)
entry.times_watched += 1
else:
# Create new entry
entry = WatchHistoryEntry(
media_item=media_item,
last_watched_episode=episode,
watch_progress=progress,
status=status,
)
self.entries[media_id] = entry
self.last_updated = datetime.now()
return entry
def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]:
"""Get a specific watch history entry."""
return self.entries.get(media_id)
def remove_entry(self, media_id: int) -> bool:
"""Remove an entry from watch history."""
if media_id in self.entries:
del self.entries[media_id]
self.last_updated = datetime.now()
return True
return False
def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]:
"""Get all entries with a specific status."""
return [entry for entry in self.entries.values() if entry.status == status]
def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]:
"""Get recently watched entries."""
sorted_entries = sorted(
self.entries.values(),
key=lambda x: x.last_watched,
reverse=True
)
return sorted_entries[:limit]
def get_watching_entries(self) -> List[WatchHistoryEntry]:
"""Get entries that are currently being watched."""
return self.get_entries_by_status("watching")
def get_completed_entries(self) -> List[WatchHistoryEntry]:
"""Get completed entries."""
return self.get_entries_by_status("completed")
def search_entries(self, query: str) -> List[WatchHistoryEntry]:
"""Search entries by title."""
query_lower = query.lower()
results = []
for entry in self.entries.values():
title = entry.get_display_title().lower()
if query_lower in title:
results.append(entry)
return results
def get_stats(self) -> dict:
"""Get watch history statistics."""
total_entries = len(self.entries)
watching = len(self.get_entries_by_status("watching"))
completed = len(self.get_entries_by_status("completed"))
dropped = len(self.get_entries_by_status("dropped"))
paused = len(self.get_entries_by_status("paused"))
total_episodes = sum(
entry.last_watched_episode
for entry in self.entries.values()
)
return {
"total_entries": total_entries,
"watching": watching,
"completed": completed,
"dropped": dropped,
"paused": paused,
"total_episodes_watched": total_episodes,
"last_updated": self.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
}

View File

@@ -0,0 +1,10 @@
def time_to_seconds(time_str: str) -> int:
"""Convert HH:MM:SS to seconds."""
try:
parts = time_str.split(":")
if len(parts) == 3:
h, m, s = map(int, parts)
return h * 3600 + m * 60 + s
except (ValueError, AttributeError):
pass
return 0

View File

@@ -0,0 +1,86 @@
# fastanime/core/config/defaults.py
from ..constants import APP_DATA_DIR, APP_NAME, USER_VIDEOS_DIR
# GeneralConfig
GENERAL_PYGMENT_STYLE = "github-dark"
GENERAL_API_CLIENT = "anilist"
GENERAL_PROVIDER = "allanime"
GENERAL_SELECTOR = "default"
GENERAL_AUTO_SELECT_ANIME_RESULT = True
GENERAL_ICONS = False
GENERAL_PREVIEW = "none"
GENERAL_IMAGE_RENDERER = "chafa"
GENERAL_MANGA_VIEWER = "feh"
GENERAL_CHECK_FOR_UPDATES = True
GENERAL_CACHE_REQUESTS = True
GENERAL_MAX_CACHE_LIFETIME = "03:00:00"
GENERAL_NORMALIZE_TITLES = True
GENERAL_DISCORD = False
GENERAL_RECENT = 50
# StreamConfig
STREAM_PLAYER = "mpv"
STREAM_QUALITY = "1080"
STREAM_TRANSLATION_TYPE = "sub"
STREAM_SERVER = "TOP"
STREAM_AUTO_NEXT = False
STREAM_CONTINUE_FROM_WATCH_HISTORY = True
STREAM_PREFERRED_WATCH_HISTORY = "local"
STREAM_AUTO_SKIP = False
STREAM_EPISODE_COMPLETE_AT = 80
STREAM_YTDLP_FORMAT = "best[height<=1080]/bestvideo[height<=1080]+bestaudio/best"
STREAM_FORCE_FORWARD_TRACKING = True
STREAM_DEFAULT_MEDIA_LIST_TRACKING = "prompt"
STREAM_SUB_LANG = "eng"
# ServiceConfig
SERVICE_ENABLED = False
SERVICE_WATCHLIST_CHECK_INTERVAL = 30
SERVICE_QUEUE_PROCESS_INTERVAL = 1
SERVICE_MAX_CONCURRENT_DOWNLOADS = 3
SERVICE_AUTO_RETRY_COUNT = 3
SERVICE_CLEANUP_COMPLETED_DAYS = 7
SERVICE_NOTIFICATION_ENABLED = True
# FzfConfig
FZF_HEADER_COLOR = "95,135,175"
FZF_PREVIEW_HEADER_COLOR = "215,0,95"
FZF_PREVIEW_SEPARATOR_COLOR = "208,208,208"
# MpvConfig
MPV_ARGS = ""
MPV_PRE_ARGS = ""
MPV_DISABLE_POPEN = True
MPV_USE_PYTHON_MPV = False
# VlcConfig
VLC_ARGS = ""
# AnilistConfig
ANILIST_PER_PAGE = 15
ANILIST_SORT_BY = "SEARCH_MATCH"
ANILIST_PREFERRED_LANGUAGE = "english"
# DownloadsConfig
DOWNLOADS_DOWNLOADER = "auto"
DOWNLOADS_DOWNLOADS_DIR = USER_VIDEOS_DIR
DOWNLOADS_ENABLE_TRACKING = True
DOWNLOADS_AUTO_ORGANIZE = True
DOWNLOADS_MAX_CONCURRENT = 3
DOWNLOADS_AUTO_CLEANUP_FAILED = True
DOWNLOADS_RETENTION_DAYS = 30
DOWNLOADS_SYNC_WITH_WATCH_HISTORY = True
DOWNLOADS_AUTO_MARK_OFFLINE = True
DOWNLOADS_NAMING_TEMPLATE = "{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}"
DOWNLOADS_PREFERRED_QUALITY = "1080"
DOWNLOADS_DOWNLOAD_SUBTITLES = True
DOWNLOADS_SUBTITLE_LANGUAGES = ["en"]
DOWNLOADS_QUEUE_MAX_SIZE = 100
DOWNLOADS_AUTO_START_DOWNLOADS = True
DOWNLOADS_RETRY_ATTEMPTS = 3
DOWNLOADS_RETRY_DELAY = 300
# RegistryConfig
MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / APP_NAME / "registry"
MEDIA_REGISTRY_INDEX_DIR = APP_DATA_DIR

View File

@@ -0,0 +1,132 @@
# GeneralConfig
GENERAL_PYGMENT_STYLE = "The pygment style to use"
GENERAL_API_CLIENT = "The media database API to use (e.g., 'anilist', 'jikan')."
GENERAL_PROVIDER = "The default anime provider to use for scraping."
GENERAL_SELECTOR = "The interactive selector tool to use for menus."
GENERAL_AUTO_SELECT_ANIME_RESULT = (
"Automatically select the best-matching search result from a provider."
)
GENERAL_ICONS = "Display emoji icons in the user interface."
GENERAL_PREVIEW = "Type of preview to display in selectors."
GENERAL_IMAGE_RENDERER = (
"The command-line tool to use for rendering images in the terminal."
)
GENERAL_MANGA_VIEWER = "The external application to use for viewing manga pages."
GENERAL_CHECK_FOR_UPDATES = (
"Automatically check for new versions of FastAnime on startup."
)
GENERAL_CACHE_REQUESTS = (
"Enable caching of network requests to speed up subsequent operations."
)
GENERAL_MAX_CACHE_LIFETIME = "Maximum lifetime for a cached request in DD:HH:MM format."
GENERAL_NORMALIZE_TITLES = (
"Attempt to normalize provider titles to match AniList titles."
)
GENERAL_DISCORD = "Enable Discord Rich Presence to show your current activity."
GENERAL_RECENT = "Number of recently watched anime to keep in history."
# StreamConfig
STREAM_PLAYER = "The media player to use for streaming."
STREAM_QUALITY = "Preferred stream quality."
STREAM_TRANSLATION_TYPE = "Preferred audio/subtitle language type."
STREAM_SERVER = (
"The default server to use from a provider. 'top' uses the first available."
)
STREAM_AUTO_NEXT = "Automatically play the next episode when the current one finishes."
STREAM_CONTINUE_FROM_WATCH_HISTORY = (
"Automatically resume playback from the last known episode and position."
)
STREAM_PREFERRED_WATCH_HISTORY = (
"Which watch history to prioritize: local file or remote AniList progress."
)
STREAM_AUTO_SKIP = "Automatically skip openings/endings if skip data is available."
STREAM_EPISODE_COMPLETE_AT = (
"Percentage of an episode to watch before it's marked as complete."
)
STREAM_YTDLP_FORMAT = "The format selection string for yt-dlp."
STREAM_FORCE_FORWARD_TRACKING = (
"Prevent updating AniList progress to a lower episode number."
)
STREAM_DEFAULT_MEDIA_LIST_TRACKING = (
"Default behavior for tracking progress on AniList."
)
STREAM_SUB_LANG = "Preferred language code for subtitles (e.g., 'en', 'es')."
# ServiceConfig
SERVICE_ENABLED = "Whether the background service should be enabled by default."
SERVICE_WATCHLIST_CHECK_INTERVAL = (
"Minutes between checking AniList watchlist for new episodes."
)
SERVICE_QUEUE_PROCESS_INTERVAL = "Minutes between processing the download queue."
SERVICE_MAX_CONCURRENT_DOWNLOADS = "Maximum number of concurrent downloads."
SERVICE_AUTO_RETRY_COUNT = "Number of times to retry failed downloads."
SERVICE_CLEANUP_COMPLETED_DAYS = (
"Days to keep completed/failed jobs in queue before cleanup."
)
SERVICE_NOTIFICATION_ENABLED = "Whether to show notifications for new episodes."
# FzfConfig
FZF_HEADER_COLOR = "RGB color for the main TUI header."
FZF_PREVIEW_HEADER_COLOR = "RGB color for preview pane headers."
FZF_PREVIEW_SEPARATOR_COLOR = "RGB color for preview pane separators."
FZF_OPTS = "The FZF options, formatted with leading tabs for the config file."
FZF_HEADER_ASCII_ART = "The ASCII art to display as a header in the FZF interface."
# RofiConfig
ROFI_THEME_MAIN = "Path to the main Rofi theme file."
ROFI_THEME_PREVIEW = "Path to the Rofi theme file for previews."
ROFI_THEME_CONFIRM = "Path to the Rofi theme file for confirmation prompts."
ROFI_THEME_INPUT = "Path to the Rofi theme file for user input prompts."
# MpvConfig
MPV_ARGS = "Comma-separated arguments to pass to the MPV player."
MPV_PRE_ARGS = "Comma-separated arguments to prepend before the MPV command."
MPV_DISABLE_POPEN = (
"Disable using subprocess.Popen for MPV, which can be unstable on some systems."
)
MPV_USE_PYTHON_MPV = "Use the python-mpv library for enhanced player control."
# VlcConfig
VLC_ARGS = "Comma-separated arguments to pass to the Vlc player."
# AnilistConfig
ANILIST_PER_PAGE = "Number of items to fetch per page from AniList."
ANILIST_SORT_BY = "Default sort order for AniList search results."
ANILIST_PREFERRED_LANGUAGE = "Preferred language for anime titles from AniList."
# DownloadsConfig
DOWNLOADS_DOWNLOADER = "The downloader to use"
DOWNLOADS_DOWNLOADS_DIR = "The default directory to save downloaded anime."
DOWNLOADS_ENABLE_TRACKING = "Enable download tracking and management"
DOWNLOADS_AUTO_ORGANIZE = "Automatically organize downloads by anime title"
DOWNLOADS_MAX_CONCURRENT = "Maximum concurrent downloads"
DOWNLOADS_AUTO_CLEANUP_FAILED = "Automatically cleanup failed downloads"
DOWNLOADS_RETENTION_DAYS = "Days to keep failed downloads before cleanup"
DOWNLOADS_SYNC_WITH_WATCH_HISTORY = "Sync download status with watch history"
DOWNLOADS_AUTO_MARK_OFFLINE = (
"Automatically mark downloaded episodes as available offline"
)
DOWNLOADS_NAMING_TEMPLATE = "File naming template for downloaded episodes"
DOWNLOADS_PREFERRED_QUALITY = "Preferred download quality"
DOWNLOADS_DOWNLOAD_SUBTITLES = "Download subtitles when available"
DOWNLOADS_SUBTITLE_LANGUAGES = "Preferred subtitle languages"
DOWNLOADS_QUEUE_MAX_SIZE = "Maximum number of items in download queue"
DOWNLOADS_AUTO_START_DOWNLOADS = "Automatically start downloads when items are queued"
DOWNLOADS_RETRY_ATTEMPTS = "Number of retry attempts for failed downloads"
DOWNLOADS_RETRY_DELAY = "Delay between retry attempts in seconds"
# RegistryConfig
MEDIA_REGISTRY_DIR = "The default directory to save media registry"
MEDIA_REGISTRY_INDEX_DIR = "The default directory to save media registry index"
# AppConfig
APP_GENERAL = "General configuration settings for application behavior."
APP_STREAM = "Settings related to video streaming and playback."
APP_DOWNLOADS = "Settings related to downloading"
APP_ANILIST = "Configuration for AniList API integration."
APP_SERVICE = "Configuration for the background download service."
APP_FZF = "Settings for the FZF selector interface."
APP_ROFI = "Settings for the Rofi selector interface."
APP_MPV = "Configuration for the MPV media player."
APP_MEDIA_REGISTRY = "Configuration for the media registry."

View File

@@ -13,7 +13,191 @@ from ...core.constants import (
)
from ...libs.api.anilist.constants import SORTS_AVAILABLE
from ...libs.providers.anime import PROVIDERS_AVAILABLE, SERVERS_AVAILABLE
from ..constants import APP_ASCII_ART, USER_VIDEOS_DIR
from ..constants import APP_ASCII_ART, APP_DATA_DIR, USER_VIDEOS_DIR
from . import defaults
from . import descriptions as desc
class GeneralConfig(BaseModel):
"""Configuration for general application behavior and integrations."""
pygment_style: str = Field(
default=defaults.GENERAL_PYGMENT_STYLE, description=desc.GENERAL_PYGMENT_STYLE
)
api_client: Literal["anilist", "jikan"] = Field(
default=defaults.GENERAL_API_CLIENT,
description=desc.GENERAL_API_CLIENT,
)
provider: str = Field(
default=defaults.GENERAL_PROVIDER,
description=desc.GENERAL_PROVIDER,
examples=list(PROVIDERS_AVAILABLE.keys()),
)
selector: Literal["default", "fzf", "rofi"] = Field(
default=defaults.GENERAL_SELECTOR, description=desc.GENERAL_SELECTOR
)
auto_select_anime_result: bool = Field(
default=defaults.GENERAL_AUTO_SELECT_ANIME_RESULT,
description=desc.GENERAL_AUTO_SELECT_ANIME_RESULT,
)
icons: bool = Field(default=defaults.GENERAL_ICONS, description=desc.GENERAL_ICONS)
preview: Literal["full", "text", "image", "none"] = Field(
default=defaults.GENERAL_PREVIEW, description=desc.GENERAL_PREVIEW
)
image_renderer: Literal["icat", "chafa", "imgcat"] = Field(
default="icat"
if os.environ.get("KITTY_WINDOW_ID")
else defaults.GENERAL_IMAGE_RENDERER,
description=desc.GENERAL_IMAGE_RENDERER,
)
manga_viewer: Literal["feh", "icat"] = Field(
default=defaults.GENERAL_MANGA_VIEWER,
description=desc.GENERAL_MANGA_VIEWER,
)
check_for_updates: bool = Field(
default=defaults.GENERAL_CHECK_FOR_UPDATES,
description=desc.GENERAL_CHECK_FOR_UPDATES,
)
cache_requests: bool = Field(
default=defaults.GENERAL_CACHE_REQUESTS,
description=desc.GENERAL_CACHE_REQUESTS,
)
max_cache_lifetime: str = Field(
default=defaults.GENERAL_MAX_CACHE_LIFETIME,
description=desc.GENERAL_MAX_CACHE_LIFETIME,
)
normalize_titles: bool = Field(
default=defaults.GENERAL_NORMALIZE_TITLES,
description=desc.GENERAL_NORMALIZE_TITLES,
)
discord: bool = Field(
default=defaults.GENERAL_DISCORD,
description=desc.GENERAL_DISCORD,
)
recent: int = Field(
default=defaults.GENERAL_RECENT,
ge=0,
description=desc.GENERAL_RECENT,
)
@field_validator("provider")
@classmethod
def validate_provider(cls, v: str) -> str:
if v not in PROVIDERS_AVAILABLE:
raise ValueError(
f"'{v}' is not a valid provider. Must be one of: {PROVIDERS_AVAILABLE}"
)
return v
class StreamConfig(BaseModel):
"""Configuration specific to video streaming and playback."""
player: Literal["mpv", "vlc"] = Field(
default=defaults.STREAM_PLAYER, description=desc.STREAM_PLAYER
)
quality: Literal["360", "480", "720", "1080"] = Field(
default=defaults.STREAM_QUALITY, description=desc.STREAM_QUALITY
)
translation_type: Literal["sub", "dub"] = Field(
default=defaults.STREAM_TRANSLATION_TYPE,
description=desc.STREAM_TRANSLATION_TYPE,
)
server: str = Field(
default=defaults.STREAM_SERVER,
description=desc.STREAM_SERVER,
examples=SERVERS_AVAILABLE,
)
auto_next: bool = Field(
default=defaults.STREAM_AUTO_NEXT,
description=desc.STREAM_AUTO_NEXT,
)
continue_from_watch_history: bool = Field(
default=defaults.STREAM_CONTINUE_FROM_WATCH_HISTORY,
description=desc.STREAM_CONTINUE_FROM_WATCH_HISTORY,
)
preferred_watch_history: Literal["local", "remote"] = Field(
default=defaults.STREAM_PREFERRED_WATCH_HISTORY,
description=desc.STREAM_PREFERRED_WATCH_HISTORY,
)
auto_skip: bool = Field(
default=defaults.STREAM_AUTO_SKIP,
description=desc.STREAM_AUTO_SKIP,
)
episode_complete_at: int = Field(
default=defaults.STREAM_EPISODE_COMPLETE_AT,
ge=0,
le=100,
description=desc.STREAM_EPISODE_COMPLETE_AT,
)
ytdlp_format: str = Field(
default=defaults.STREAM_YTDLP_FORMAT,
description=desc.STREAM_YTDLP_FORMAT,
)
force_forward_tracking: bool = Field(
default=defaults.STREAM_FORCE_FORWARD_TRACKING,
description=desc.STREAM_FORCE_FORWARD_TRACKING,
)
default_media_list_tracking: Literal["track", "disabled", "prompt"] = Field(
default=defaults.STREAM_DEFAULT_MEDIA_LIST_TRACKING,
description=desc.STREAM_DEFAULT_MEDIA_LIST_TRACKING,
)
sub_lang: str = Field(
default=defaults.STREAM_SUB_LANG,
description=desc.STREAM_SUB_LANG,
)
@field_validator("server")
@classmethod
def validate_server(cls, v: str) -> str:
if v.lower() != "top" and v not in SERVERS_AVAILABLE:
raise ValueError(
f"'{v}' is not a valid server. Must be 'top' or one of: {SERVERS_AVAILABLE}"
)
return v
class ServiceConfig(BaseModel):
"""Configuration for the background download service."""
enabled: bool = Field(
default=defaults.SERVICE_ENABLED,
description=desc.SERVICE_ENABLED,
)
watchlist_check_interval: int = Field(
default=defaults.SERVICE_WATCHLIST_CHECK_INTERVAL,
ge=5,
le=180,
description=desc.SERVICE_WATCHLIST_CHECK_INTERVAL,
)
queue_process_interval: int = Field(
default=defaults.SERVICE_QUEUE_PROCESS_INTERVAL,
ge=1,
le=60,
description=desc.SERVICE_QUEUE_PROCESS_INTERVAL,
)
max_concurrent_downloads: int = Field(
default=defaults.SERVICE_MAX_CONCURRENT_DOWNLOADS,
ge=1,
le=10,
description=desc.SERVICE_MAX_CONCURRENT_DOWNLOADS,
)
auto_retry_count: int = Field(
default=defaults.SERVICE_AUTO_RETRY_COUNT,
ge=0,
le=10,
description=desc.SERVICE_AUTO_RETRY_COUNT,
)
cleanup_completed_days: int = Field(
default=defaults.SERVICE_CLEANUP_COMPLETED_DAYS,
ge=1,
le=30,
description=desc.SERVICE_CLEANUP_COMPLETED_DAYS,
)
notification_enabled: bool = Field(
default=defaults.SERVICE_NOTIFICATION_ENABLED,
description=desc.SERVICE_NOTIFICATION_ENABLED,
)
class OtherConfig(BaseModel):
@@ -25,14 +209,16 @@ class FzfConfig(OtherConfig):
_opts: str = PrivateAttr(default=FZF_DEFAULT_OPTS.read_text(encoding="utf-8"))
header_color: str = Field(
default="95,135,175", description="RGB color for the main TUI header."
default=defaults.FZF_HEADER_COLOR, description=desc.FZF_HEADER_COLOR
)
_header_ascii_art: str = PrivateAttr(default=APP_ASCII_ART)
preview_header_color: str = Field(
default="215,0,95", description="RGB color for preview pane headers."
default=defaults.FZF_PREVIEW_HEADER_COLOR,
description=desc.FZF_PREVIEW_HEADER_COLOR,
)
preview_separator_color: str = Field(
default="208,208,208", description="RGB color for preview pane separators."
default=defaults.FZF_PREVIEW_SEPARATOR_COLOR,
description=desc.FZF_PREVIEW_SEPARATOR_COLOR,
)
def __init__(self, **kwargs):
@@ -45,16 +231,12 @@ class FzfConfig(OtherConfig):
if header_ascii_art:
self._header_ascii_art = header_ascii_art
@computed_field(
description="The FZF options, formatted with leading tabs for the config file."
)
@computed_field(description=desc.FZF_OPTS)
@property
def opts(self) -> str:
return "\n" + "\n".join([f"\t{line}" for line in self._opts.split()])
@computed_field(
description="The ASCII art to display as a header in the FZF interface."
)
@computed_field(description=desc.FZF_HEADER_ASCII_ART)
@property
def header_ascii_art(self) -> str:
return "\n" + "\n".join(
@@ -67,67 +249,63 @@ class RofiConfig(OtherConfig):
theme_main: Path = Field(
default=Path(str(ROFI_THEME_MAIN)),
description="Path to the main Rofi theme file.",
description=desc.ROFI_THEME_MAIN,
)
theme_preview: Path = Field(
default=Path(str(ROFI_THEME_PREVIEW)),
description="Path to the Rofi theme file for previews.",
description=desc.ROFI_THEME_PREVIEW,
)
theme_confirm: Path = Field(
default=Path(str(ROFI_THEME_CONFIRM)),
description="Path to the Rofi theme file for confirmation prompts.",
description=desc.ROFI_THEME_CONFIRM,
)
theme_input: Path = Field(
default=Path(str(ROFI_THEME_INPUT)),
description="Path to the Rofi theme file for user input prompts.",
description=desc.ROFI_THEME_INPUT,
)
class MpvConfig(OtherConfig):
"""Configuration specific to the MPV player integration."""
args: str = Field(
default="", description="Comma-separated arguments to pass to the MPV player."
)
args: str = Field(default=defaults.MPV_ARGS, description=desc.MPV_ARGS)
pre_args: str = Field(
default="",
description="Comma-separated arguments to prepend before the MPV command.",
default=defaults.MPV_PRE_ARGS,
description=desc.MPV_PRE_ARGS,
)
disable_popen: bool = Field(
default=True,
description="Disable using subprocess.Popen for MPV, which can be unstable on some systems.",
default=defaults.MPV_DISABLE_POPEN,
description=desc.MPV_DISABLE_POPEN,
)
use_python_mpv: bool = Field(
default=False,
description="Use the python-mpv library for enhanced player control.",
default=defaults.MPV_USE_PYTHON_MPV,
description=desc.MPV_USE_PYTHON_MPV,
)
class VlcConfig(OtherConfig):
"""Configuration specific to the vlc player integration."""
args: str = Field(
default="", description="Comma-separated arguments to pass to the Vlc player."
)
args: str = Field(default=defaults.VLC_ARGS, description=desc.VLC_ARGS)
class AnilistConfig(OtherConfig):
"""Configuration for interacting with the AniList API."""
per_page: int = Field(
default=15,
default=defaults.ANILIST_PER_PAGE,
gt=0,
le=50,
description="Number of items to fetch per page from AniList.",
description=desc.ANILIST_PER_PAGE,
)
sort_by: str = Field(
default="SEARCH_MATCH",
description="Default sort order for AniList search results.",
default=defaults.ANILIST_SORT_BY,
description=desc.ANILIST_SORT_BY,
examples=SORTS_AVAILABLE,
)
preferred_language: Literal["english", "romaji"] = Field(
default="english",
description="Preferred language for anime titles from AniList.",
default=defaults.ANILIST_PREFERRED_LANGUAGE,
description=desc.ANILIST_PREFERRED_LANGUAGE,
)
@field_validator("sort_by")
@@ -150,249 +328,102 @@ class DownloadsConfig(OtherConfig):
"""Configuration for download related options"""
downloader: Literal["auto", "default", "yt-dlp"] = Field(
default="auto", description="The downloader to use"
default=defaults.DOWNLOADS_DOWNLOADER, description=desc.DOWNLOADS_DOWNLOADER
)
downloads_dir: Path = Field(
default_factory=lambda: USER_VIDEOS_DIR,
description="The default directory to save downloaded anime.",
default_factory=lambda: defaults.DOWNLOADS_DOWNLOADS_DIR,
description=desc.DOWNLOADS_DOWNLOADS_DIR,
)
# Download tracking configuration
enable_tracking: bool = Field(
default=True, description="Enable download tracking and management"
default=defaults.DOWNLOADS_ENABLE_TRACKING,
description=desc.DOWNLOADS_ENABLE_TRACKING,
)
auto_organize: bool = Field(
default=True, description="Automatically organize downloads by anime title"
default=defaults.DOWNLOADS_AUTO_ORGANIZE,
description=desc.DOWNLOADS_AUTO_ORGANIZE,
)
max_concurrent: int = Field(
default=3, gt=0, le=10, description="Maximum concurrent downloads"
default=defaults.DOWNLOADS_MAX_CONCURRENT,
gt=0,
le=10,
description=desc.DOWNLOADS_MAX_CONCURRENT,
)
auto_cleanup_failed: bool = Field(
default=True, description="Automatically cleanup failed downloads"
default=defaults.DOWNLOADS_AUTO_CLEANUP_FAILED,
description=desc.DOWNLOADS_AUTO_CLEANUP_FAILED,
)
retention_days: int = Field(
default=30, gt=0, description="Days to keep failed downloads before cleanup"
default=defaults.DOWNLOADS_RETENTION_DAYS,
gt=0,
description=desc.DOWNLOADS_RETENTION_DAYS,
)
# Integration with watch history
sync_with_watch_history: bool = Field(
default=True, description="Sync download status with watch history"
default=defaults.DOWNLOADS_SYNC_WITH_WATCH_HISTORY,
description=desc.DOWNLOADS_SYNC_WITH_WATCH_HISTORY,
)
auto_mark_offline: bool = Field(
default=True, description="Automatically mark downloaded episodes as available offline"
default=defaults.DOWNLOADS_AUTO_MARK_OFFLINE,
description=desc.DOWNLOADS_AUTO_MARK_OFFLINE,
)
# File organization
naming_template: str = Field(
default="{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}",
description="File naming template for downloaded episodes"
default=defaults.DOWNLOADS_NAMING_TEMPLATE,
description=desc.DOWNLOADS_NAMING_TEMPLATE,
)
# Quality and subtitles
preferred_quality: Literal["360", "480", "720", "1080", "best"] = Field(
default="1080", description="Preferred download quality"
default=defaults.DOWNLOADS_PREFERRED_QUALITY,
description=desc.DOWNLOADS_PREFERRED_QUALITY,
)
download_subtitles: bool = Field(
default=True, description="Download subtitles when available"
default=defaults.DOWNLOADS_DOWNLOAD_SUBTITLES,
description=desc.DOWNLOADS_DOWNLOAD_SUBTITLES,
)
subtitle_languages: List[str] = Field(
default=["en"], description="Preferred subtitle languages"
default=defaults.DOWNLOADS_SUBTITLE_LANGUAGES,
description=desc.DOWNLOADS_SUBTITLE_LANGUAGES,
)
# Queue management
queue_max_size: int = Field(
default=100, gt=0, description="Maximum number of items in download queue"
default=defaults.DOWNLOADS_QUEUE_MAX_SIZE,
gt=0,
description=desc.DOWNLOADS_QUEUE_MAX_SIZE,
)
auto_start_downloads: bool = Field(
default=True, description="Automatically start downloads when items are queued"
default=defaults.DOWNLOADS_AUTO_START_DOWNLOADS,
description=desc.DOWNLOADS_AUTO_START_DOWNLOADS,
)
retry_attempts: int = Field(
default=3, ge=0, description="Number of retry attempts for failed downloads"
default=defaults.DOWNLOADS_RETRY_ATTEMPTS,
ge=0,
description=desc.DOWNLOADS_RETRY_ATTEMPTS,
)
retry_delay: int = Field(
default=300, ge=0, description="Delay between retry attempts in seconds"
)
class GeneralConfig(BaseModel):
"""Configuration for general application behavior and integrations."""
pygment_style: str = Field(
default="github-dark", description="The pygment style to use"
)
api_client: Literal["anilist", "jikan"] = Field(
default="anilist",
description="The media database API to use (e.g., 'anilist', 'jikan').",
)
provider: str = Field(
default="allanime",
description="The default anime provider to use for scraping.",
examples=list(PROVIDERS_AVAILABLE.keys()),
)
selector: Literal["default", "fzf", "rofi"] = Field(
default="default", description="The interactive selector tool to use for menus."
)
auto_select_anime_result: bool = Field(
default=True,
description="Automatically select the best-matching search result from a provider.",
)
icons: bool = Field(
default=False, description="Display emoji icons in the user interface."
)
preview: Literal["full", "text", "image", "none"] = Field(
default="none", description="Type of preview to display in selectors."
)
image_renderer: Literal["icat", "chafa", "imgcat"] = Field(
default="icat" if os.environ.get("KITTY_WINDOW_ID") else "chafa",
description="The command-line tool to use for rendering images in the terminal.",
)
manga_viewer: Literal["feh", "icat"] = Field(
default="feh",
description="The external application to use for viewing manga pages.",
)
check_for_updates: bool = Field(
default=True,
description="Automatically check for new versions of FastAnime on startup.",
)
cache_requests: bool = Field(
default=True,
description="Enable caching of network requests to speed up subsequent operations.",
)
max_cache_lifetime: str = Field(
default="03:00:00",
description="Maximum lifetime for a cached request in DD:HH:MM format.",
)
normalize_titles: bool = Field(
default=True,
description="Attempt to normalize provider titles to match AniList titles.",
)
discord: bool = Field(
default=False,
description="Enable Discord Rich Presence to show your current activity.",
)
recent: int = Field(
default=50,
default=defaults.DOWNLOADS_RETRY_DELAY,
ge=0,
description="Number of recently watched anime to keep in history.",
description=desc.DOWNLOADS_RETRY_DELAY,
)
@field_validator("provider")
@classmethod
def validate_provider(cls, v: str) -> str:
if v not in PROVIDERS_AVAILABLE:
raise ValueError(
f"'{v}' is not a valid provider. Must be one of: {PROVIDERS_AVAILABLE}"
)
return v
class MediaRegistryConfig(OtherConfig):
"""Configuration for registry related options"""
class StreamConfig(BaseModel):
"""Configuration specific to video streaming and playback."""
player: Literal["mpv", "vlc"] = Field(
default="mpv", description="The media player to use for streaming."
)
quality: Literal["360", "480", "720", "1080"] = Field(
default="1080", description="Preferred stream quality."
)
translation_type: Literal["sub", "dub"] = Field(
default="sub", description="Preferred audio/subtitle language type."
)
server: str = Field(
default="TOP",
description="The default server to use from a provider. 'top' uses the first available.",
examples=SERVERS_AVAILABLE,
)
auto_next: bool = Field(
default=False,
description="Automatically play the next episode when the current one finishes.",
)
continue_from_watch_history: bool = Field(
default=True,
description="Automatically resume playback from the last known episode and position.",
)
preferred_watch_history: Literal["local", "remote"] = Field(
default="local",
description="Which watch history to prioritize: local file or remote AniList progress.",
)
auto_skip: bool = Field(
default=False,
description="Automatically skip openings/endings if skip data is available.",
)
episode_complete_at: int = Field(
default=80,
ge=0,
le=100,
description="Percentage of an episode to watch before it's marked as complete.",
)
ytdlp_format: str = Field(
default="best[height<=1080]/bestvideo[height<=1080]+bestaudio/best",
description="The format selection string for yt-dlp.",
)
force_forward_tracking: bool = Field(
default=True,
description="Prevent updating AniList progress to a lower episode number.",
)
default_media_list_tracking: Literal["track", "disabled", "prompt"] = Field(
default="prompt",
description="Default behavior for tracking progress on AniList.",
)
sub_lang: str = Field(
default="eng",
description="Preferred language code for subtitles (e.g., 'en', 'es').",
media_dir: Path = Field(
default=defaults.MEDIA_REGISTRY_DIR,
description=desc.MEDIA_REGISTRY_DIR,
)
@field_validator("server")
@classmethod
def validate_server(cls, v: str) -> str:
if v.lower() != "top" and v not in SERVERS_AVAILABLE:
raise ValueError(
f"'{v}' is not a valid server. Must be 'top' or one of: {SERVERS_AVAILABLE}"
)
return v
class ServiceConfig(BaseModel):
"""Configuration for the background download service."""
enabled: bool = Field(
default=False,
description="Whether the background service should be enabled by default.",
)
watchlist_check_interval: int = Field(
default=30,
ge=5,
le=180,
description="Minutes between checking AniList watchlist for new episodes.",
)
queue_process_interval: int = Field(
default=1,
ge=1,
le=60,
description="Minutes between processing the download queue.",
)
max_concurrent_downloads: int = Field(
default=3,
ge=1,
le=10,
description="Maximum number of concurrent downloads.",
)
auto_retry_count: int = Field(
default=3,
ge=0,
le=10,
description="Number of times to retry failed downloads.",
)
cleanup_completed_days: int = Field(
default=7,
ge=1,
le=30,
description="Days to keep completed/failed jobs in queue before cleanup.",
)
notification_enabled: bool = Field(
default=True,
description="Whether to show notifications for new episodes.",
index_dir: Path = Field(
default=defaults.MEDIA_REGISTRY_INDEX_DIR,
description=desc.MEDIA_REGISTRY_INDEX_DIR,
)
@@ -401,36 +432,37 @@ class AppConfig(BaseModel):
general: GeneralConfig = Field(
default_factory=GeneralConfig,
description="General configuration settings for application behavior.",
description=desc.APP_GENERAL,
)
stream: StreamConfig = Field(
default_factory=StreamConfig,
description="Settings related to video streaming and playback.",
description=desc.APP_STREAM,
)
downloads: DownloadsConfig = Field(
default_factory=DownloadsConfig, description="Settings related to downloading"
default_factory=DownloadsConfig, description=desc.APP_DOWNLOADS
)
anilist: AnilistConfig = Field(
default_factory=AnilistConfig,
description="Configuration for AniList API integration.",
description=desc.APP_ANILIST,
)
service: ServiceConfig = Field(
default_factory=ServiceConfig,
description="Configuration for the background download service.",
description=desc.APP_SERVICE,
)
fzf: FzfConfig = Field(
default_factory=FzfConfig,
description="Settings for the FZF selector interface.",
description=desc.APP_FZF,
)
rofi: RofiConfig = Field(
default_factory=RofiConfig,
description="Settings for the Rofi selector interface.",
)
mpv: MpvConfig = Field(
default_factory=MpvConfig, description="Configuration for the MPV media player."
description=desc.APP_ROFI,
)
mpv: MpvConfig = Field(default_factory=MpvConfig, description=desc.APP_MPV)
service: ServiceConfig = Field(
default_factory=ServiceConfig,
description="Configuration for the background download service.",
description=desc.APP_SERVICE,
)
media_registry: MediaRegistryConfig = Field(
default_factory=MediaRegistryConfig, description=desc.APP_MEDIA_REGISTRY
)

View File

@@ -0,0 +1,310 @@
import logging
import os
import time
import uuid
from pathlib import Path
from typing import IO, Any, BinaryIO, TextIO, Union
logger = logging.getLogger(__name__)
def get_file_modification_time(filepath: Path) -> float:
"""
Returns the modification time of a file as a Unix timestamp.
Returns 0 if the file does not exist.
"""
if filepath.exists():
return filepath.stat().st_mtime
return 0
def check_file_modified(filepath: Path, previous_mtime: float) -> tuple[float, bool]:
"""
Checks if a file has been modified since a given previous modification time.
"""
current_mtime = get_file_modification_time(filepath)
return current_mtime, current_mtime > previous_mtime
class AtomicWriter:
"""
A context manager for performing atomic file writes.
Writes are first directed to a temporary file. If the 'with' block
completes successfully, the temporary file is atomically renamed
to the target path, ensuring that the target file is never in
a partially written or corrupted state. If an error occurs, the
temporary file is cleaned up, and the original target file remains
untouched.
Usage:
# For text files
with AtomicWriter(Path("my_file.txt"), mode="w", encoding="utf-8") as f:
f.write("Hello, world!\n")
f.write("This is an atomic write.")
# For binary files
with AtomicWriter(Path("binary_data.bin"), mode="wb") as f:
f.write(b"\x01\x02\x03\x04")
"""
def __init__(
self, target_path: Path, mode: str = "w", encoding: Union[str, None] = "utf-8"
):
"""
Initializes the AtomicWriter.
Args:
target_path: The Path object for the final destination file.
mode: The file opening mode (e.g., 'w', 'wb'). Only write modes are supported.
'a' (append) and 'x' (exclusive creation) modes are not supported
as this class is designed for full file replacement.
encoding: The encoding to use for text modes ('w', 'wt').
Should be None for binary modes ('wb').
Raises:
ValueError: If an unsupported file mode is provided.
"""
if "a" in mode:
raise ValueError(
"AtomicWriter does not support 'append' mode ('a'). "
"It's designed for full file replacement."
)
if "x" in mode:
raise ValueError(
"AtomicWriter does not support 'exclusive creation' mode ('x'). "
"It handles creation/replacement atomically."
)
if "r" in mode:
raise ValueError("AtomicWriter is for writing, not reading.")
if "b" in mode and encoding is not None:
raise ValueError("Encoding must be None for binary write modes ('wb').")
if "b" not in mode and encoding is None:
raise ValueError(
"Encoding must be specified for text write modes ('w', 'wt')."
)
self.target_path = target_path
self.mode = mode
self.encoding = encoding
temp_filename = f"{target_path.name}.{os.getpid()}.{uuid.uuid4()}.tmp"
self.temp_path = target_path.parent / temp_filename
self._file_handle: Union[IO[Any], None] = None
def __enter__(self) -> IO[Any]:
"""
Enters the context, opens the temporary file for writing,
and returns the file handle.
Ensures the parent directory of the target file exists.
Returns:
A file-like object (TextIO or BinaryIO) for writing.
Raises:
Exception: If there's an error opening the temporary file.
"""
try:
self.target_path.parent.mkdir(parents=True, exist_ok=True)
self._file_handle = self.temp_path.open(
mode=self.mode, encoding=self.encoding
)
return self._file_handle
except Exception as e:
logger.error(f"Error opening temporary file {self.temp_path}: {e}")
raise
def __exit__(self, exc_type, exc_val, exc_tb) -> bool:
"""
Exits the context. Closes the temporary file.
If no exception occurred within the 'with' block, atomically renames
the temporary file to the target path. Otherwise, cleans up the
temporary file, ensuring the original target file remains untouched.
Args:
exc_type: The type of exception raised in the 'with' block (or None).
exc_val: The exception instance (or None).
exc_tb: The traceback object (or None).
Returns:
False: To propagate any exception that occurred within the 'with' block.
(Returning True would suppress the exception).
"""
if self._file_handle:
self._file_handle.close()
self._file_handle = None
if exc_type is None:
try:
os.replace(self.temp_path, self.target_path)
logger.debug(f"Successfully wrote atomically to {self.target_path}")
except OSError as e:
logger.error(
f"Error renaming temporary file {self.temp_path} to {self.target_path}: {e}"
)
try:
self.temp_path.unlink(missing_ok=True)
except OSError as cleanup_e:
logger.error(
f"Failed to clean up temporary file {self.temp_path} after rename error: {cleanup_e}"
)
raise
else:
logger.debug(
f"An error occurred during write. Cleaning up temporary file {self.temp_path}."
)
try:
self.temp_path.unlink(missing_ok=True)
except OSError as e:
logger.error(f"Error cleaning up temporary file {self.temp_path}: {e}")
return False
class FileLock:
def __init__(
self, lock_file_path: Path, timeout: float = 300, stale_timeout: float = 300
):
"""
Initializes a file-based lock.
Args:
lock_file_path: The Path object for the lock file.
timeout: How long (in seconds) to wait to acquire the lock.
Set to 0 for non-blocking attempt. Set to -1 for indefinite wait.
stale_timeout: If the lock file is older than this (in seconds),
it's considered stale and will be broken.
"""
self.lock_file_path = lock_file_path
self.timeout = timeout
self.stale_timeout = stale_timeout
self._acquired = False
self._pid = os.getpid() # Get current process ID
def _acquire_atomic(self) -> bool:
"""
Attempts to atomically create the lock file.
Returns True on success, False on failure (file already exists).
Writes the PID and timestamp to the lock file.
"""
try:
# Use 'x' mode for atomic creation: create only if it doesn't exist.
# If it exists, FileExistsError is raised.
with self.lock_file_path.open("x") as f:
f.write(f"{self._pid}\n{time.time()}")
return True
except FileExistsError:
return False
except Exception as e:
# Handle other potential errors during file creation/write
logger.error(f"Error creating lock file {self.lock_file_path}: {e}")
return False
def _is_stale(self) -> bool:
"""
Checks if the existing lock file is stale based on its modification time
or the PID inside it.
"""
if not self.lock_file_path.exists():
return False # Not stale if it doesn't exist
try:
# Read PID and timestamp from the lock file
with self.lock_file_path.open("r") as f:
lines = f.readlines()
if len(lines) >= 2:
locked_pid = int(lines[0].strip())
locked_timestamp = float(lines[1].strip())
current_time = time.time()
if current_time - locked_timestamp > self.stale_timeout:
logger.warning(
f"Lock file {self.lock_file_path} is older than {self.stale_timeout} seconds. Considering it stale."
)
return True
return False
except (ValueError, IndexError, FileNotFoundError, OSError) as e:
logger.warning(
f"Could not read or parse lock file {self.lock_file_path}. Assuming it's stale due to potential corruption: {e}"
)
return True
def acquire(self):
"""
Attempts to acquire the lock. Blocks until acquired or timeout occurs.
"""
start_time = time.time()
while True:
if self._acquire_atomic():
self._acquired = True
logger.debug(f"Lock acquired by PID {self._pid}.")
return
if self._is_stale():
logger.debug(
f"Existing lock file {self.lock_file_path} is stale. Attempting to break it."
)
try:
self.lock_file_path.unlink()
if self._acquire_atomic():
self._acquired = True
logger.debug(
f"Stale lock broken and new lock acquired by PID {self._pid}."
)
return
except OSError as e:
logger.error(
f"Could not remove stale lock file {self.lock_file_path}: {e}"
)
if self.timeout >= 0 and time.time() - start_time > self.timeout:
raise TimeoutError(
f"Failed to acquire lock {self.lock_file_path} within {self.timeout} seconds."
)
sleep_time = 0.1
if self.timeout == -1:
logger.debug(f"Waiting for lock {self.lock_file_path} indefinitely...")
time.sleep(sleep_time)
elif self.timeout > 0:
logger.debug(
f"Waiting for lock {self.lock_file_path} ({round(self.timeout - (time.time() - start_time), 1)}s remaining)..."
)
time.sleep(
min(
sleep_time,
self.timeout - (time.time() - start_time)
if self.timeout - (time.time() - start_time) > 0
else sleep_time,
)
)
else:
raise BlockingIOError(
f"Lock {self.lock_file_path} is currently held by another process (non-blocking)."
)
def release(self):
"""
Releases the lock by deleting the lock file.
"""
if self._acquired:
try:
self.lock_file_path.unlink(missing_ok=True)
self._acquired = False
logger.debug(f"Lock released by PID {self._pid}.")
except OSError as e:
logger.error(f"Error releasing lock file {self.lock_file_path}: {e}")
else:
logger.warning(
"Attempted to release a lock that was not acquired by PID {self._pid}."
)
def __enter__(self):
self.acquire()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.release()

View File

@@ -30,6 +30,8 @@ from .types import (
AnilistPageInfo,
AnilistStudioNodes,
AnilistViewerData,
)
from .types import (
StreamingEpisode as AnilistStreamingEpisode,
)
@@ -38,10 +40,13 @@ logger = logging.getLogger(__name__)
def _to_generic_media_title(anilist_title: AnilistMediaTitle) -> MediaTitle:
"""Maps an AniList title object to a generic MediaTitle."""
romaji = anilist_title.get("romaji")
english = anilist_title.get("english")
native = anilist_title.get("native")
return MediaTitle(
romaji=anilist_title.get("romaji"),
english=anilist_title.get("english"),
native=anilist_title.get("native"),
romaji=romaji,
english=(english or romaji or native or "NO_TITLE"),
native=native,
)
@@ -103,13 +108,12 @@ def _to_generic_tags(anilist_tags: list[AnilistMediaTag]) -> List[MediaTag]:
]
def _to_generic_streaming_episodes(anilist_episodes: list[AnilistStreamingEpisode]) -> List[StreamingEpisode]:
def _to_generic_streaming_episodes(
anilist_episodes: list[AnilistStreamingEpisode],
) -> List[StreamingEpisode]:
"""Maps a list of AniList streaming episodes to generic StreamingEpisode objects."""
return [
StreamingEpisode(
title=episode["title"],
thumbnail=episode.get("thumbnail")
)
StreamingEpisode(title=episode["title"], thumbnail=episode.get("thumbnail"))
for episode in anilist_episodes
if episode.get("title")
]
@@ -174,7 +178,19 @@ def _to_generic_media_item(
popularity=data.get("popularity"),
favourites=data.get("favourites"),
next_airing=_to_generic_airing_schedule(data.get("nextAiringEpisode")),
streaming_episodes=_to_generic_streaming_episodes(data.get("streamingEpisodes", [])),
start_date=datetime(
data["startDate"]["year"],
data["startDate"]["month"],
data["startDate"]["day"],
),
end_date=datetime(
data["startDate"]["year"],
data["startDate"]["month"],
data["startDate"]["day"],
),
streaming_episodes=_to_generic_streaming_episodes(
data.get("streamingEpisodes", [])
),
user_status=_to_generic_user_status(data, media_list),
)

View File

@@ -20,6 +20,7 @@ UserListStatusType = Literal[
class BaseApiModel(BaseModel):
"""Base model for all API types."""
pass
@@ -34,8 +35,8 @@ class MediaImage(BaseApiModel):
class MediaTitle(BaseApiModel):
"""A generic representation of media titles."""
english: str
romaji: Optional[str] = None
english: Optional[str] = None
native: Optional[str] = None
@@ -93,15 +94,10 @@ class UserListStatus(BaseApiModel):
class MediaItem(BaseApiModel):
"""
The definitive, backend-agnostic representation of a single media item.
This is the primary data model the application will interact with.
"""
id: int
title: MediaTitle
id_mal: Optional[int] = None
type: MediaType = "ANIME"
title: MediaTitle = Field(default_factory=MediaTitle)
status: Optional[str] = None
format: Optional[str] = None # e.g., TV, MOVIE, OVA
@@ -121,6 +117,9 @@ class MediaItem(BaseApiModel):
popularity: Optional[int] = None
favourites: Optional[int] = None
start_date: Optional[datetime] = None
end_date: Optional[datetime] = None
next_airing: Optional[AiringSchedule] = None
# streaming episodes