Files
FastAnime/fastanime/cli/service/registry/models.py
Benexl f4e73c3335 Add AniList download command and download service integration
- Implemented a new command for downloading anime episodes using the AniList API.
- Created a DownloadService to manage episode downloads and track their status in the media registry.
- Added comprehensive command-line options for filtering and selecting anime based on various criteria.
- Integrated feedback mechanisms to inform users about download progress and issues.
- Established validation for command options to ensure correct input.
- Enhanced error handling and logging for better debugging and user experience.
- Included functionality for managing multiple downloads concurrently.
2025-07-25 00:38:07 +03:00

110 lines
3.3 KiB
Python

import logging
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, Literal, Optional
from pydantic import BaseModel, Field, computed_field
from ....libs.media_api.types import MediaItem, UserMediaListStatus
from ....core.utils import converter
logger = logging.getLogger(__name__)
class DownloadStatus(Enum):
NOT_DOWNLOADED = "not_downloaded"
QUEUED = "queued"
DOWNLOADING = "downloading"
COMPLETED = "completed"
FAILED = "failed"
PAUSED = "paused"
REGISTRY_VERSION = "1.0"
class MediaEpisode(BaseModel):
episode_number: str
download_status: DownloadStatus = DownloadStatus.NOT_DOWNLOADED
file_path: Path
download_date: datetime = Field(default_factory=datetime.now)
# Additional download metadata
file_size: Optional[int] = None # File size in bytes
quality: Optional[str] = None # Download quality (e.g., "1080p", "720p")
provider_name: Optional[str] = None # Name of the provider used
server_name: Optional[str] = None # Name of the server used
subtitle_paths: list[Path] = Field(default_factory=list) # Paths to subtitle files
download_attempts: int = 0 # Number of download attempts
last_error: Optional[str] = None # Last error message if failed
class MediaRecord(BaseModel):
media_item: MediaItem
media_episodes: list[MediaEpisode] = Field(default_factory=list)
class MediaRegistryIndexEntry(BaseModel):
media_id: int
media_api: Literal["anilist", "NONE", "jikan"] = "NONE"
status: UserMediaListStatus = UserMediaListStatus.WATCHING
progress: str = "0"
last_watch_position: Optional[str] = None
last_watched: datetime = Field(default_factory=datetime.now)
total_duration: Optional[str] = None
total_episodes: int = 0
score: float = 0
repeat: int = 0
notes: str = ""
last_notified_episode: Optional[str] = None
# for first watch only
start_date: datetime = Field(default_factory=datetime.now)
completed_at: datetime = Field(default_factory=datetime.now)
@computed_field
@property
def watch_completion_percentage(self) -> float:
"""Watch completion percentage."""
if self.total_duration and self.last_watch_position:
return (
converter.time_to_seconds(self.last_watch_position)
/ converter.time_to_seconds(self.total_duration)
) * 100
return 0.0
class MediaRegistryIndex(BaseModel):
version: str = Field(default=REGISTRY_VERSION)
last_updated: datetime = Field(default_factory=datetime.now)
media_index: Dict[str, MediaRegistryIndexEntry] = Field(default_factory=dict)
@computed_field
@property
def status_breakdown(self) -> Dict[str, int]:
"""Get breakdown by user status."""
breakdown = {}
for entry in self.media_index.values():
breakdown[entry.status] = breakdown.get(entry.status, 0) + 1
return breakdown
@computed_field
@property
def media_count_breakdown(self) -> Dict[str, int]:
breakdown = {}
for entry in self.media_index.values():
breakdown[entry.media_api] = breakdown.get(entry.media_api, 0) + 1
return breakdown
@computed_field
@property
def media_count(self) -> int:
"""Get the number of media."""
return len(self.media_index)