From 82ca5f32b13ff98cc62195f1adf4fb3f72fd75d9 Mon Sep 17 00:00:00 2001 From: Benexl Date: Fri, 25 Jul 2025 03:04:16 +0300 Subject: [PATCH] fix: logical issues with registry --- fastanime/cli/service/registry/models.py | 14 +- fastanime/cli/service/registry/service.py | 291 ++++++++++++---------- 2 files changed, 166 insertions(+), 139 deletions(-) diff --git a/fastanime/cli/service/registry/models.py b/fastanime/cli/service/registry/models.py index 93cf476..1bfff0b 100644 --- a/fastanime/cli/service/registry/models.py +++ b/fastanime/cli/service/registry/models.py @@ -6,8 +6,8 @@ from typing import Dict, Literal, Optional from pydantic import BaseModel, Field, computed_field -from ....libs.media_api.types import MediaItem, UserMediaListStatus from ....core.utils import converter +from ....libs.media_api.types import MediaItem, UserMediaListStatus logger = logging.getLogger(__name__) @@ -30,15 +30,15 @@ class MediaEpisode(BaseModel): download_status: DownloadStatus = DownloadStatus.NOT_DOWNLOADED file_path: Path download_date: datetime = Field(default_factory=datetime.now) - + # Additional download metadata file_size: Optional[int] = None # File size in bytes - quality: Optional[str] = None # Download quality (e.g., "1080p", "720p") + quality: Optional[str] = None # Download quality (e.g., "1080p", "720p") provider_name: Optional[str] = None # Name of the provider used - server_name: Optional[str] = None # Name of the server used + server_name: Optional[str] = None # Name of the server used subtitle_paths: list[Path] = Field(default_factory=list) # Paths to subtitle files - download_attempts: int = 0 # Number of download attempts - last_error: Optional[str] = None # Last error message if failed + download_attempts: int = 0 # Number of download attempts + last_error: Optional[str] = None # Last error message if failed class MediaRecord(BaseModel): @@ -91,7 +91,7 @@ class MediaRegistryIndex(BaseModel): """Get breakdown by user status.""" breakdown = {} for entry in self.media_index.values(): - breakdown[entry.status] = breakdown.get(entry.status, 0) + 1 + breakdown[entry.status.value] = breakdown.get(entry.status.value, 0) + 1 return breakdown @computed_field diff --git a/fastanime/cli/service/registry/service.py b/fastanime/cli/service/registry/service.py index 9584e75..7f9b5e2 100644 --- a/fastanime/cli/service/registry/service.py +++ b/fastanime/cli/service/registry/service.py @@ -2,7 +2,7 @@ import json import logging from datetime import datetime from pathlib import Path -from typing import Dict, Generator, List, Optional +from typing import TYPE_CHECKING, Dict, Generator, List, Optional, TypedDict from ....core.config.model import MediaRegistryConfig from ....core.exceptions import FastAnimeError @@ -17,12 +17,19 @@ from ....libs.media_api.types import ( ) from .models import ( REGISTRY_VERSION, + DownloadStatus, MediaRecord, MediaRegistryIndex, MediaRegistryIndexEntry, - DownloadStatus ) + +class StatBreakdown(TypedDict): + total_media_breakdown: Dict[int, int] + status_breakdown: Dict[int, int] + last_updated: str + + logger = logging.getLogger(__name__) @@ -205,26 +212,7 @@ class MediaRegistryService: for entry in sorted_entries: record = self.get_media_record(entry.media_id) if record: - # Create UserListItem from index entry - user_list_item = UserListItem( - status=entry.status, - progress=int(entry.progress) if entry.progress.isdigit() else 0, - score=entry.score, - repeat=entry.repeat, - notes=entry.notes, - start_date=entry.start_date, - completed_at=entry.completed_at - ) - - # Create new MediaItem with user status - media_with_status = MediaItem( - **record.media_item.model_dump(), - user_status=user_list_item - ) - recent_media.append(media_with_status) - # if len(recent_media) == limit: - # break - + recent_media.append(record.media_item) page_info = PageInfo( total=len(sorted_entries), ) @@ -233,143 +221,185 @@ class MediaRegistryService: def search_for_media(self, params: MediaSearchParams) -> MediaSearchResult: """Search for media in the local registry based on search parameters.""" from ....libs.media_api.types import MediaSearchResult, PageInfo - + index = self._load_index() all_media: List[MediaItem] = [] - + # Get all media records and attach user status for entry in index.media_index.values(): record = self.get_media_record(entry.media_id) if record: # Create UserListItem from index entry - user_list_item = UserListItem( - status=entry.status, - progress=int(entry.progress) if entry.progress.isdigit() else 0, - score=entry.score, - repeat=entry.repeat, - notes=entry.notes, - start_date=entry.start_date, - completed_at=entry.completed_at - ) - - # Create new MediaItem with user status - media_with_status = MediaItem( - **record.media_item.model_dump(), - user_status=user_list_item - ) - all_media.append(media_with_status) - + all_media.append(record.media_item) + # Apply filters based on search parameters filtered_media = self._apply_search_filters(all_media, params, index) - + # Apply sorting sorted_media = self._apply_sorting(filtered_media, params, index) - + # Apply pagination page = params.page or 1 per_page = params.per_page or 15 start_idx = (page - 1) * per_page end_idx = start_idx + per_page - + paginated_media = sorted_media[start_idx:end_idx] - + page_info = PageInfo( total=len(sorted_media), current_page=page, has_next_page=end_idx < len(sorted_media), - per_page=per_page + per_page=per_page, ) - + return MediaSearchResult(page_info=page_info, media=paginated_media) - def _apply_search_filters(self, media_list: List[MediaItem], params: MediaSearchParams, index) -> List[MediaItem]: + def _apply_search_filters( + self, media_list: List[MediaItem], params: MediaSearchParams, index + ) -> List[MediaItem]: """Apply search filters to media list.""" filtered = media_list.copy() - + # Query filter (search in title) if params.query: query_lower = params.query.lower() filtered = [ - media for media in filtered - if (query_lower in media.title.english.lower() if media.title.english else False) or - (query_lower in media.title.romaji.lower() if media.title.romaji else False) or - (query_lower in media.title.native.lower() if media.title.native else False) or - any(query_lower in synonym.lower() for synonym in media.synonymns) + media + for media in filtered + if ( + query_lower in media.title.english.lower() + if media.title.english + else False + ) + or ( + query_lower in media.title.romaji.lower() + if media.title.romaji + else False + ) + or ( + query_lower in media.title.native.lower() + if media.title.native + else False + ) + or any(query_lower in synonym.lower() for synonym in media.synonymns) ] - + # Status filters if params.status: filtered = [media for media in filtered if media.status == params.status] if params.status_in: filtered = [media for media in filtered if media.status in params.status_in] if params.status_not_in: - filtered = [media for media in filtered if media.status not in params.status_not_in] - + filtered = [ + media for media in filtered if media.status not in params.status_not_in + ] + # Genre filters if params.genre_in: - filtered = [media for media in filtered if any(genre in media.genres for genre in params.genre_in)] + filtered = [ + media + for media in filtered + if any(genre in media.genres for genre in params.genre_in) + ] if params.genre_not_in: - filtered = [media for media in filtered if not any(genre in media.genres for genre in params.genre_not_in)] - + filtered = [ + media + for media in filtered + if not any(genre in media.genres for genre in params.genre_not_in) + ] + # Tag filters if params.tag_in: media_tags = [tag.name for media in filtered for tag in media.tags] - filtered = [media for media in filtered if any(tag in media_tags for tag in params.tag_in)] + filtered = [ + media + for media in filtered + if any(tag in media_tags for tag in params.tag_in) + ] if params.tag_not_in: media_tags = [tag.name for media in filtered for tag in media.tags] - filtered = [media for media in filtered if not any(tag in media_tags for tag in params.tag_not_in)] - + filtered = [ + media + for media in filtered + if not any(tag in media_tags for tag in params.tag_not_in) + ] + # Format filter if params.format_in: filtered = [media for media in filtered if media.format in params.format_in] - + # Type filter if params.type: filtered = [media for media in filtered if media.type == params.type] - + # Score filters if params.averageScore_greater is not None: - filtered = [media for media in filtered if media.average_score and media.average_score >= params.averageScore_greater] + filtered = [ + media + for media in filtered + if media.average_score + and media.average_score >= params.averageScore_greater + ] if params.averageScore_lesser is not None: - filtered = [media for media in filtered if media.average_score and media.average_score <= params.averageScore_lesser] - + filtered = [ + media + for media in filtered + if media.average_score + and media.average_score <= params.averageScore_lesser + ] + # Popularity filters if params.popularity_greater is not None: - filtered = [media for media in filtered if media.popularity and media.popularity >= params.popularity_greater] + filtered = [ + media + for media in filtered + if media.popularity and media.popularity >= params.popularity_greater + ] if params.popularity_lesser is not None: - filtered = [media for media in filtered if media.popularity and media.popularity <= params.popularity_lesser] - + filtered = [ + media + for media in filtered + if media.popularity and media.popularity <= params.popularity_lesser + ] + # ID filter if params.id_in: filtered = [media for media in filtered if media.id in params.id_in] - + # User list filter if params.on_list is not None: if params.on_list: # Only show media that has user status (is on list) - filtered = [media for media in filtered if media.user_status is not None] + filtered = [ + media for media in filtered if media.user_status is not None + ] else: # Only show media that doesn't have user status (not on list) filtered = [media for media in filtered if media.user_status is None] - + return filtered - def _apply_sorting(self, media_list: List[MediaItem], params: MediaSearchParams, index) -> List[MediaItem]: + def _apply_sorting( + self, media_list: List[MediaItem], params: MediaSearchParams, index + ) -> List[MediaItem]: """Apply sorting to media list.""" if not params.sort: return media_list - + # Get the MediaSort value sort = params.sort if isinstance(sort, list): sort = sort[0] # Use first sort if multiple provided - + # Apply sorting based on MediaSort enum try: if sort.value == "POPULARITY_DESC": return sorted(media_list, key=lambda x: x.popularity or 0, reverse=True) elif sort.value == "SCORE_DESC": - return sorted(media_list, key=lambda x: x.average_score or 0, reverse=True) + return sorted( + media_list, key=lambda x: x.average_score or 0, reverse=True + ) elif sort.value == "FAVOURITES_DESC": return sorted(media_list, key=lambda x: x.favourites or 0, reverse=True) elif sort.value == "TRENDING_DESC": @@ -380,10 +410,13 @@ class MediaRegistryService: def get_last_watched(media): entry = index.media_index.get(f"{self._media_api}_{media.id}") return entry.last_watched if entry else datetime.min + return sorted(media_list, key=get_last_watched, reverse=True) else: # Default to title sorting - return sorted(media_list, key=lambda x: x.title.english or x.title.romaji or "") + return sorted( + media_list, key=lambda x: x.title.english or x.title.romaji or "" + ) except Exception as e: logger.warning(f"Failed to apply sorting {sort}: {e}") return media_list @@ -391,63 +424,58 @@ class MediaRegistryService: def get_media_by_status(self, status: UserMediaListStatus) -> MediaSearchResult: """Get media filtered by user status from registry.""" index = self._load_index() - + # Filter entries by status status_entries = [ - entry for entry in index.media_index.values() - if entry.status == status + entry for entry in index.media_index.values() if entry.status == status ] - + # Get media items for these entries media_list: List[MediaItem] = [] for entry in status_entries: record = self.get_media_record(entry.media_id) if record: # Create UserListItem from index entry - user_status = UserListItem( - status=entry.status, - progress=int(entry.progress) if entry.progress.isdigit() else 0, - score=entry.score, - repeat=entry.repeat, - notes=entry.notes, - start_date=entry.start_date, - completed_at=entry.completed_at - ) - - media_with_status = MediaItem( - **record.media_item.model_dump(), - user_status=user_status - ) - media_list.append(media_with_status) - + media_list.append(record.media_item) + # Sort by last watched sorted_media = sorted( - media_list, - key=lambda media: next( - (entry.last_watched for entry in index.media_index.values() - if entry.media_id == media.id), - datetime.min - ), - reverse=True + media_list, + key=lambda media_item: next( + ( + entry.last_watched + for entry in index.media_index.values() + if entry.media_id == media_item.id + ), + datetime.min, + ), + reverse=True, ) - + page_info = PageInfo(total=len(sorted_media)) return MediaSearchResult(page_info=page_info, media=sorted_media) - def get_registry_stats(self) -> Dict: + def get_registry_stats(self) -> "StatBreakdown": """Get comprehensive registry statistics.""" + stats: "StatBreakdown" = {} # type: ignore try: index = self._load_index() - return { - "total_media_breakdown": index.media_count_breakdown, - "status_breakdown": index.status_breakdown, - "last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"), - } + stats.update( + StatBreakdown( + **{ + "total_media_breakdown": index.media_count_breakdown, + "status_breakdown": index.status_breakdown, + "last_updated": index.last_updated.strftime( + "%Y-%m-%d %H:%M:%S" + ), + } + ) + ) except Exception as e: logger.error(f"Failed to get registry stats: {e}") - return {} + return stats def get_all_media_records(self) -> Generator[MediaRecord, None, List[MediaRecord]]: records = [] @@ -466,7 +494,6 @@ class MediaRegistryService: logger.warning(f"{self.media_registry_dir} is impure which caused: {e}") return records - def remove_media_record(self, media_id: int): with self._lock: record_file = self._get_media_file_path(media_id) @@ -501,19 +528,19 @@ class MediaRegistryService: """Update the download status and metadata for a specific episode.""" try: from .models import DownloadStatus, MediaEpisode - + record = self.get_media_record(media_id) if not record: logger.error(f"No media record found for ID {media_id}") return False - + # Find existing episode or create new one episode_record = None for episode in record.media_episodes: if episode.episode_number == episode_number: episode_record = episode break - + if not episode_record: if not file_path: logger.error(f"File path required for new episode {episode_number}") @@ -524,7 +551,7 @@ class MediaRegistryService: download_status=status, ) record.media_episodes.append(episode_record) - + # Update episode metadata episode_record.download_status = status if file_path: @@ -541,14 +568,14 @@ class MediaRegistryService: episode_record.subtitle_paths = subtitle_paths if error_message: episode_record.last_error = error_message - + # Increment download attempts if this is a failure if status == DownloadStatus.FAILED: episode_record.download_attempts += 1 - + # Save the updated record return self.save_media_record(record) - + except Exception as e: logger.error(f"Failed to update episode download status: {e}") return False @@ -559,14 +586,14 @@ class MediaRegistryService: """Get all episodes with a specific download status.""" try: from .models import DownloadStatus - + episodes = [] for record in self.get_all_media_records(): for episode in record.media_episodes: if episode.download_status == status: episodes.append((record.media_item.id, episode.episode_number)) return episodes - + except Exception as e: logger.error(f"Failed to get episodes by status: {e}") return [] @@ -575,7 +602,7 @@ class MediaRegistryService: """Get comprehensive download statistics.""" try: from .models import DownloadStatus - + stats = { "total_episodes": 0, "downloaded": 0, @@ -587,11 +614,11 @@ class MediaRegistryService: "by_quality": {}, "by_provider": {}, } - + for record in self.get_all_media_records(): for episode in record.media_episodes: stats["total_episodes"] += 1 - + # Count by status status_key = episode.download_status.value.lower() if status_key == "completed": @@ -604,25 +631,25 @@ class MediaRegistryService: stats["downloading"] += 1 elif status_key == "paused": stats["paused"] += 1 - + # Aggregate file sizes if episode.file_size: stats["total_size_bytes"] += episode.file_size - + # Count by quality if episode.quality: stats["by_quality"][episode.quality] = ( stats["by_quality"].get(episode.quality, 0) + 1 ) - + # Count by provider if episode.provider_name: stats["by_provider"][episode.provider_name] = ( stats["by_provider"].get(episode.provider_name, 0) + 1 ) - + return stats - + except Exception as e: logger.error(f"Failed to get download statistics: {e}") return {}