From 65aa8fcb4eb79d4a205930ea8fc0a6a2960cec81 Mon Sep 17 00:00:00 2001 From: Benexl Date: Mon, 28 Jul 2025 19:48:20 +0300 Subject: [PATCH] chore: cleanup --- fastanime/cli/commands/anilist/cmd.py | 2 +- .../cli/commands/anilist/commands/download.py | 981 ------------------ fastanime/cli/interactive/menu/share/auth.py | 283 ----- .../menu/share/session_management.py | 254 ----- .../interactive/menu/share/user_media_list.py | 827 --------------- .../interactive/menu/share/watch_history.py | 572 ---------- fastanime/cli/service/download/__init__.py | 3 - fastanime/cli/service/download/service.py | 530 ---------- fastanime/core/config/defaults.py | 25 - fastanime/core/config/model.py | 118 --- 10 files changed, 1 insertion(+), 3594 deletions(-) delete mode 100644 fastanime/cli/commands/anilist/commands/download.py delete mode 100644 fastanime/cli/interactive/menu/share/auth.py delete mode 100644 fastanime/cli/interactive/menu/share/session_management.py delete mode 100644 fastanime/cli/interactive/menu/share/user_media_list.py delete mode 100644 fastanime/cli/interactive/menu/share/watch_history.py delete mode 100644 fastanime/cli/service/download/__init__.py delete mode 100644 fastanime/cli/service/download/service.py diff --git a/fastanime/cli/commands/anilist/cmd.py b/fastanime/cli/commands/anilist/cmd.py index 7a380e8..b877c0a 100644 --- a/fastanime/cli/commands/anilist/cmd.py +++ b/fastanime/cli/commands/anilist/cmd.py @@ -7,7 +7,7 @@ commands = { # "trending": "trending.trending", # "recent": "recent.recent", "search": "search.search", - # "downloads": "downloads.downloads", + "download": "download.download", "auth": "auth.auth", "stats": "stats.stats", } diff --git a/fastanime/cli/commands/anilist/commands/download.py b/fastanime/cli/commands/anilist/commands/download.py deleted file mode 100644 index b3e6ae6..0000000 --- a/fastanime/cli/commands/anilist/commands/download.py +++ /dev/null @@ -1,981 +0,0 @@ -"""AniList download command using the modern download service.""" - -from typing import TYPE_CHECKING - -import click - -from .....core.config import AppConfig -from .....core.exceptions import FastAnimeError -from .....libs.media_api.api import create_api_client -from .....libs.media_api.params import MediaSearchParams -from .....libs.media_api.types import ( - MediaFormat, - MediaGenre, - MediaSeason, - MediaSort, - MediaStatus, - MediaTag, - MediaType, - MediaYear, -) -from .....libs.provider.anime.provider import create_provider -from .....libs.provider.anime.params import SearchParams, AnimeParams -from .....libs.selectors import create_selector -from ....service.download import DownloadService -from ....service.feedback import FeedbackService -from ....service.registry import MediaRegistryService -from ....utils.completion import anime_titles_shell_complete -from ....utils import parse_episode_range -from .. import examples - -if TYPE_CHECKING: - from typing import TypedDict - from typing_extensions import Unpack - - class DownloadOptions(TypedDict, total=False): - title: str | None - episode_range: str | None - quality: str | None - force_redownload: bool - page: int - per_page: int | None - season: str | None - status: tuple[str, ...] - status_not: tuple[str, ...] - sort: str | None - genres: tuple[str, ...] - genres_not: tuple[str, ...] - tags: tuple[str, ...] - tags_not: tuple[str, ...] - media_format: tuple[str, ...] - media_type: str | None - year: str | None - popularity_greater: int | None - popularity_lesser: int | None - score_greater: int | None - score_lesser: int | None - start_date_greater: int | None - start_date_lesser: int | None - end_date_greater: int | None - end_date_lesser: int | None - on_list: bool | None - max_concurrent: int | None - - -@click.command( - help="Download anime episodes using AniList API for search and provider integration", - short_help="Download anime episodes", - epilog=examples.download, -) -@click.option( - "--title", - "-t", - shell_complete=anime_titles_shell_complete, - help="Title of the anime to search for", -) -@click.option( - "--episode-range", - "-r", - help="Range of episodes to download (e.g., '1:5', '3:', ':5', '1:10:2')", -) -@click.option( - "--quality", - "-q", - type=click.Choice(["360", "480", "720", "1080", "best"]), - help="Preferred download quality", -) -@click.option( - "--force-redownload", - "-f", - is_flag=True, - help="Force redownload even if episode already exists", -) -@click.option( - "--page", - "-p", - type=click.IntRange(min=1), - default=1, - help="Page number for search pagination", -) -@click.option( - "--per-page", - type=click.IntRange(min=1, max=50), - help="Number of results per page (max 50)", -) -@click.option( - "--season", - help="The season the media was released", - type=click.Choice([season.value for season in MediaSeason]), -) -@click.option( - "--status", - "-S", - help="The media status of the anime", - multiple=True, - type=click.Choice([status.value for status in MediaStatus]), -) -@click.option( - "--status-not", - help="Exclude media with these statuses", - multiple=True, - type=click.Choice([status.value for status in MediaStatus]), -) -@click.option( - "--sort", - "-s", - help="What to sort the search results on", - type=click.Choice([sort.value for sort in MediaSort]), -) -@click.option( - "--genres", - "-g", - multiple=True, - help="the genres to filter by", - type=click.Choice([genre.value for genre in MediaGenre]), -) -@click.option( - "--genres-not", - multiple=True, - help="Exclude these genres", - type=click.Choice([genre.value for genre in MediaGenre]), -) -@click.option( - "--tags", - "-T", - multiple=True, - help="the tags to filter by", - type=click.Choice([tag.value for tag in MediaTag]), -) -@click.option( - "--tags-not", - multiple=True, - help="Exclude these tags", - type=click.Choice([tag.value for tag in MediaTag]), -) -@click.option( - "--media-format", - "-F", - multiple=True, - help="Media format", - type=click.Choice([format.value for format in MediaFormat]), -) -@click.option( - "--media-type", - help="Media type (ANIME or MANGA)", - type=click.Choice([media_type.value for media_type in MediaType]), -) -@click.option( - "--year", - "-y", - type=click.Choice([year.value for year in MediaYear]), - help="the year the media was released", -) -@click.option( - "--popularity-greater", - type=click.IntRange(min=0), - help="Minimum popularity score", -) -@click.option( - "--popularity-lesser", - type=click.IntRange(min=0), - help="Maximum popularity score", -) -@click.option( - "--score-greater", - type=click.IntRange(min=0, max=100), - help="Minimum average score (0-100)", -) -@click.option( - "--score-lesser", - type=click.IntRange(min=0, max=100), - help="Maximum average score (0-100)", -) -@click.option( - "--start-date-greater", - type=click.IntRange(min=10000101, max=99991231), - help="Minimum start date (YYYYMMDD format, e.g., 20240101)", -) -@click.option( - "--start-date-lesser", - type=click.IntRange(min=10000101, max=99991231), - help="Maximum start date (YYYYMMDD format, e.g., 20241231)", -) -@click.option( - "--end-date-greater", - type=click.IntRange(min=10000101, max=99991231), - help="Minimum end date (YYYYMMDD format, e.g., 20240101)", -) -@click.option( - "--end-date-lesser", - type=click.IntRange(min=10000101, max=99991231), - help="Maximum end date (YYYYMMDD format, e.g., 20241231)", -) -@click.option( - "--on-list/--not-on-list", - "-L/-no-L", - help="Whether the anime should be in your list or not", - type=bool, -) -@click.option( - "--max-concurrent", - "-c", - type=click.IntRange(min=1, max=10), - help="Maximum number of concurrent downloads", -) -@click.pass_obj -def download(config: AppConfig, **options: "Unpack[DownloadOptions]"): - """Download anime episodes using AniList search and provider integration.""" - feedback = FeedbackService(config.general.icons) - - try: - # Extract and validate options - title = options.get("title") - episode_range = options.get("episode_range") - quality = options.get("quality") - force_redownload = options.get("force_redownload", False) - max_concurrent = options.get("max_concurrent", config.downloads.max_concurrent) - - _validate_options(options) - - # Initialize services - feedback.info("Initializing services...") - api_client, provider, selector, media_registry, download_service = ( - _initialize_services(config) - ) - feedback.info(f"Using provider: {provider.__class__.__name__}") - feedback.info(f"Using media API: {config.general.media_api}") - feedback.info(f"Translation type: {config.stream.translation_type}") - - # Search for anime - search_params = _build_search_params(options, config) - search_result = _search_anime(api_client, search_params, feedback) - - # Let user select anime (single or multiple) - selected_anime_list = _select_anime(search_result, selector, feedback) - if not selected_anime_list: - feedback.info("No anime selected. Exiting.") - return - - # Process each selected anime - for selected_anime in selected_anime_list: - feedback.info( - f"Processing: {selected_anime.title.english or selected_anime.title.romaji}" - ) - feedback.info(f"AniList ID: {selected_anime.id}") - - # Get available episodes from provider - episodes_result = _get_available_episodes( - provider, selected_anime, config, feedback - ) - if not episodes_result: - feedback.warning( - f"No episodes found for {selected_anime.title.english or selected_anime.title.romaji}" - ) - _suggest_alternatives(selected_anime, provider, config, feedback) - continue - - # Unpack the result - if len(episodes_result) == 2: - available_episodes, provider_anime_data = episodes_result - else: - # Fallback for backwards compatibility - available_episodes = episodes_result - provider_anime_data = None - - # Determine episodes to download - episodes_to_download = _determine_episodes_to_download( - episode_range, available_episodes, selector, feedback - ) - if not episodes_to_download: - feedback.warning("No episodes selected for download") - continue - - feedback.info( - f"About to download {len(episodes_to_download)} episodes: {', '.join(episodes_to_download)}" - ) - - # Test stream availability before attempting download (using provider anime data) - if episodes_to_download and provider_anime_data: - test_episode = episodes_to_download[0] - feedback.info( - f"Testing stream availability for episode {test_episode}..." - ) - success = _test_episode_stream_availability( - provider, provider_anime_data, test_episode, config, feedback - ) - if not success: - feedback.warning(f"Stream test failed for episode {test_episode}.") - feedback.info("Possible solutions:") - feedback.info("1. Try a different provider (check your config)") - feedback.info("2. Check if the episode number is correct") - feedback.info("3. Try a different translation type (sub/dub)") - feedback.info( - "4. The anime might not be available on this provider" - ) - - # Ask user if they want to continue anyway - continue_anyway = ( - input("\nContinue with download anyway? (y/N): ") - .strip() - .lower() - ) - if continue_anyway not in ["y", "yes"]: - feedback.info("Download cancelled by user") - continue - - # Download episodes (using provider anime data if available, otherwise AniList data) - anime_for_download = ( - provider_anime_data if provider_anime_data else selected_anime - ) - _download_episodes( - download_service, - anime_for_download, - episodes_to_download, - quality, - force_redownload, - max_concurrent, - feedback, - ) - - # Show final statistics - _show_final_statistics(download_service, feedback) - - except FastAnimeError as e: - feedback.error("Download failed", str(e)) - raise click.Abort() - except Exception as e: - feedback.error("Unexpected error occurred", str(e)) - raise click.Abort() - - -def _validate_options(options: "DownloadOptions") -> None: - """Validate command line options.""" - score_greater = options.get("score_greater") - score_lesser = options.get("score_lesser") - popularity_greater = options.get("popularity_greater") - popularity_lesser = options.get("popularity_lesser") - start_date_greater = options.get("start_date_greater") - start_date_lesser = options.get("start_date_lesser") - end_date_greater = options.get("end_date_greater") - end_date_lesser = options.get("end_date_lesser") - - # Score validation - if ( - score_greater is not None - and score_lesser is not None - and score_greater > score_lesser - ): - raise FastAnimeError("Minimum score cannot be higher than maximum score") - - # Popularity validation - if ( - popularity_greater is not None - and popularity_lesser is not None - and popularity_greater > popularity_lesser - ): - raise FastAnimeError( - "Minimum popularity cannot be higher than maximum popularity" - ) - - # Date validation - if ( - start_date_greater is not None - and start_date_lesser is not None - and start_date_greater > start_date_lesser - ): - raise FastAnimeError("Minimum start date cannot be after maximum start date") - - if ( - end_date_greater is not None - and end_date_lesser is not None - and end_date_greater > end_date_lesser - ): - raise FastAnimeError("Minimum end date cannot be after maximum end date") - - -def _initialize_services(config: AppConfig) -> tuple: - """Initialize all required services.""" - api_client = create_api_client(config.general.media_api, config) - provider = create_provider(config.general.provider) - selector = create_selector(config) - media_registry = MediaRegistryService( - config.general.media_api, config.media_registry - ) - download_service = DownloadService(config, media_registry, provider) - - return api_client, provider, selector, media_registry, download_service - - -def _build_search_params( - options: "DownloadOptions", config: AppConfig -) -> MediaSearchParams: - """Build MediaSearchParams from command options.""" - return MediaSearchParams( - query=options.get("title"), - page=options.get("page", 1), - per_page=options.get("per_page") or config.anilist.per_page or 50, - sort=MediaSort(options.get("sort")) if options.get("sort") else None, - status_in=[MediaStatus(s) for s in options.get("status", ())] - if options.get("status") - else None, - status_not_in=[MediaStatus(s) for s in options.get("status_not", ())] - if options.get("status_not") - else None, - genre_in=[MediaGenre(g) for g in options.get("genres", ())] - if options.get("genres") - else None, - genre_not_in=[MediaGenre(g) for g in options.get("genres_not", ())] - if options.get("genres_not") - else None, - tag_in=[MediaTag(t) for t in options.get("tags", ())] - if options.get("tags") - else None, - tag_not_in=[MediaTag(t) for t in options.get("tags_not", ())] - if options.get("tags_not") - else None, - format_in=[MediaFormat(f) for f in options.get("media_format", ())] - if options.get("media_format") - else None, - type=MediaType(options.get("media_type")) - if options.get("media_type") - else None, - season=MediaSeason(options.get("season")) if options.get("season") else None, - seasonYear=int(year) if (year := options.get("year")) else None, - popularity_greater=options.get("popularity_greater"), - popularity_lesser=options.get("popularity_lesser"), - averageScore_greater=options.get("score_greater"), - averageScore_lesser=options.get("score_lesser"), - startDate_greater=options.get("start_date_greater"), - startDate_lesser=options.get("start_date_lesser"), - endDate_greater=options.get("end_date_greater"), - endDate_lesser=options.get("end_date_lesser"), - on_list=options.get("on_list"), - ) - - -def _search_anime(api_client, search_params, feedback): - """Search for anime using the API client.""" - from rich.progress import Progress, SpinnerColumn, TextColumn - - # Check if we have any search criteria at all - has_criteria = any( - [ - search_params.query, - search_params.genre_in, - search_params.tag_in, - search_params.status_in, - search_params.season, - search_params.seasonYear, - search_params.format_in, - search_params.popularity_greater, - search_params.averageScore_greater, - ] - ) - - if not has_criteria: - raise FastAnimeError( - "Please provide at least one search criterion (title, genre, tag, status, etc.)" - ) - - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - transient=True, - ) as progress: - progress.add_task("Searching for anime...", total=None) - search_result = api_client.search_media(search_params) - - if not search_result or not search_result.media: - raise FastAnimeError("No anime found matching your search criteria") - - return search_result - - -def _select_anime(search_result, selector, feedback): - """Let user select anime from search results.""" - if len(search_result.media) == 1: - selected_anime = search_result.media[0] - feedback.info( - f"Auto-selected: {selected_anime.title.english or selected_anime.title.romaji}" - ) - return [selected_anime] - - # Create choice strings with additional info - choices = [] - for i, anime in enumerate(search_result.media, 1): - title = anime.title.english or anime.title.romaji or "Unknown" - year = str(anime.start_date.year) if anime.start_date else "N/A" - score = f"{anime.average_score}%" if anime.average_score else "N/A" - status = anime.status.value if anime.status else "N/A" - choices.append(f"{i:2d}. {title} ({year}) [Score: {score}, Status: {status}]") - - # Use multi-selection - selected_choices = selector.choose_multiple( - prompt="Select anime to download", - choices=choices, - header="Use TAB to select multiple anime, ENTER to confirm", - ) - - if not selected_choices: - return [] - - # Extract anime objects from selections - selected_anime_list = [] - for choice in selected_choices: - # Extract index from choice string (format: "XX. Title...") - try: - index = int(choice.split(".")[0].strip()) - 1 - selected_anime_list.append(search_result.media[index]) - except (ValueError, IndexError): - feedback.error(f"Invalid selection: {choice}") - continue - - return selected_anime_list - - -def _get_available_episodes(provider, anime, config, feedback): - """Get available episodes from provider.""" - try: - # Search for anime in provider first - media_title = anime.title.english or anime.title.romaji - feedback.info( - f"Searching provider '{provider.__class__.__name__}' for: '{media_title}'" - ) - feedback.info(f"Using translation type: '{config.stream.translation_type}'") - - provider_search_results = provider.search( - SearchParams( - query=media_title, translation_type=config.stream.translation_type - ) - ) - - if not provider_search_results or not provider_search_results.results: - feedback.warning( - f"Could not find '{media_title}' on provider '{provider.__class__.__name__}'" - ) - return [] - - feedback.info( - f"Found {len(provider_search_results.results)} results on provider" - ) - - # Show the first few results for debugging - for i, result in enumerate(provider_search_results.results[:3]): - feedback.info( - f"Result {i + 1}: ID={result.id}, Title='{getattr(result, 'title', 'Unknown')}'" - ) - - # Get the first result (could be enhanced with fuzzy matching) - first_result = provider_search_results.results[0] - feedback.info(f"Using first result: ID={first_result.id}") - - # Now get the full anime data using the PROVIDER'S ID, not AniList ID - provider_anime_data = provider.get( - AnimeParams(id=first_result.id, query=media_title) - ) - - if not provider_anime_data: - feedback.warning("Failed to get anime details from provider") - return [] - - # Check all available translation types - translation_types = ["sub", "dub"] - for trans_type in translation_types: - episodes = getattr(provider_anime_data.episodes, trans_type, []) - feedback.info( - f"Translation '{trans_type}': {len(episodes)} episodes available" - ) - - available_episodes = getattr( - provider_anime_data.episodes, config.stream.translation_type, [] - ) - - if not available_episodes: - feedback.warning(f"No '{config.stream.translation_type}' episodes found") - # Suggest alternative translation type if available - for trans_type in translation_types: - if trans_type != config.stream.translation_type: - other_episodes = getattr( - provider_anime_data.episodes, trans_type, [] - ) - if other_episodes: - feedback.info( - f"Suggestion: Try using translation type '{trans_type}' (has {len(other_episodes)} episodes)" - ) - return [] - - feedback.info( - f"Found {len(available_episodes)} episodes available for download" - ) - - # Return both episodes and the provider anime data for later use - return available_episodes, provider_anime_data - - except Exception as e: - feedback.error(f"Error getting episodes from provider: {e}") - import traceback - - feedback.error("Full traceback", traceback.format_exc()) - return [] - - -def _determine_episodes_to_download( - episode_range, available_episodes, selector, feedback -): - """Determine which episodes to download based on range or user selection.""" - if not available_episodes: - feedback.warning("No episodes available to download") - return [] - - if episode_range: - try: - episodes_to_download = list( - parse_episode_range(episode_range, available_episodes) - ) - feedback.info( - f"Episodes from range '{episode_range}': {', '.join(episodes_to_download)}" - ) - return episodes_to_download - except (ValueError, IndexError) as e: - feedback.error(f"Invalid episode range '{episode_range}': {e}") - return [] - else: - # Let user select episodes - selected_episodes = selector.choose_multiple( - prompt="Select episodes to download", - choices=available_episodes, - header="Use TAB to select multiple episodes, ENTER to confirm", - ) - - if selected_episodes: - feedback.info(f"Selected episodes: {', '.join(selected_episodes)}") - - return selected_episodes - - -def _suggest_alternatives(anime, provider, config, feedback): - """Suggest alternatives when episodes are not found.""" - feedback.info("Troubleshooting suggestions:") - feedback.info(f"1. Current provider: {provider.__class__.__name__}") - feedback.info(f"2. AniList ID being used: {anime.id}") - feedback.info(f"3. Translation type: {config.stream.translation_type}") - - # Special message for AllAnime provider - if provider.__class__.__name__ == "AllAnimeProvider": - feedback.info( - "4. AllAnime ID mismatch: AllAnime uses different IDs than AniList" - ) - feedback.info(" The provider searches by title, but episodes use AniList ID") - feedback.info( - " This can cause episodes to not be found even if the anime exists" - ) - - # Check if provider has different ID mapping - anime_titles = [] - if anime.title.english: - anime_titles.append(anime.title.english) - if anime.title.romaji: - anime_titles.append(anime.title.romaji) - if anime.title.native: - anime_titles.append(anime.title.native) - - feedback.info(f"5. Available titles: {', '.join(anime_titles)}") - feedback.info("6. Possible solutions:") - feedback.info(" - Try a different provider (GogoAnime, 9anime, etc.)") - feedback.info(" - Check provider configuration") - feedback.info(" - Try different translation type (sub/dub)") - feedback.info(" - Manual search on the provider website") - feedback.info(" - Check if anime is available in your region") - - -def _download_episodes( - download_service, - anime, - episodes, - quality, - force_redownload, - max_concurrent, - feedback, -): - """Download the specified episodes.""" - from concurrent.futures import ThreadPoolExecutor, as_completed - from rich.console import Console - from rich.progress import ( - BarColumn, - MofNCompleteColumn, - Progress, - SpinnerColumn, - TaskProgressColumn, - TextColumn, - TimeElapsedColumn, - ) - import logging - - console = Console() - anime_title = anime.title.english or anime.title.romaji - - console.print(f"\n[bold green]Starting downloads for: {anime_title}[/bold green]") - - # Set up logging capture to get download errors - log_messages = [] - - class ListHandler(logging.Handler): - def emit(self, record): - log_messages.append(self.format(record)) - - handler = ListHandler() - handler.setLevel(logging.ERROR) - logger = logging.getLogger("fastanime") - logger.addHandler(handler) - - try: - with Progress( - SpinnerColumn(), - TextColumn("[progress.description]{task.description}"), - BarColumn(), - MofNCompleteColumn(), - TaskProgressColumn(), - TimeElapsedColumn(), - ) as progress: - task = progress.add_task("Downloading episodes...", total=len(episodes)) - - if max_concurrent == 1: - # Sequential downloads - results = {} - for episode in episodes: - progress.update( - task, description=f"Downloading episode {episode}..." - ) - - # Clear previous log messages for this episode - log_messages.clear() - - try: - success = download_service.download_episode( - media_item=anime, - episode_number=episode, - quality=quality, - force_redownload=force_redownload, - ) - results[episode] = success - - if not success: - # Try to get more detailed error from registry - error_msg = _get_episode_error_details( - download_service, anime, episode - ) - if error_msg: - feedback.error(f"Episode {episode}", error_msg) - elif log_messages: - # Show any log messages that were captured - for msg in log_messages[ - -3: - ]: # Show last 3 error messages - feedback.error(f"Episode {episode}", msg) - else: - feedback.error( - f"Episode {episode}", - "Download failed - check logs for details", - ) - - except Exception as e: - results[episode] = False - feedback.error(f"Episode {episode} failed", str(e)) - progress.advance(task) - else: - # Concurrent downloads - results = {} - with ThreadPoolExecutor(max_workers=max_concurrent) as executor: - # Submit all download tasks - future_to_episode = { - executor.submit( - download_service.download_episode, - media_item=anime, - episode_number=episode, - server=None, - quality=quality, - force_redownload=force_redownload, - ): episode - for episode in episodes - } - - # Process completed downloads - for future in as_completed(future_to_episode): - episode = future_to_episode[future] - try: - success = future.result() - results[episode] = success - if not success: - # Try to get more detailed error from registry - error_msg = _get_episode_error_details( - download_service, anime, episode - ) - if error_msg: - feedback.error(f"Episode {episode}", error_msg) - else: - feedback.error( - f"Episode {episode}", - "Download failed - check logs for details", - ) - except Exception as e: - results[episode] = False - feedback.error( - f"Download failed for episode {episode}", str(e) - ) - - progress.advance(task) - finally: - # Remove the log handler - logger.removeHandler(handler) - - # Display results - _display_download_results(console, results, anime) - - -def _get_episode_error_details(download_service, anime, episode_number): - """Get detailed error information from the registry for a failed episode.""" - try: - # Get the media record from registry - media_record = download_service.media_registry.get_record(anime.id) - if not media_record: - return None - - # Find the episode in the record - for episode_record in media_record.episodes: - if episode_record.episode_number == episode_number: - if episode_record.error_message: - error_msg = episode_record.error_message - - # Provide more helpful error messages for common issues - if "Failed to get server for episode" in error_msg: - return f"Episode {episode_number} not available on current provider. Try a different provider or check episode number." - elif "NoneType" in error_msg or "not subscriptable" in error_msg: - return f"Episode {episode_number} data not found on provider (API returned null). Episode may not exist or be accessible." - else: - return error_msg - elif episode_record.download_status: - return f"Download status: {episode_record.download_status.value}" - break - - return None - except Exception: - return None - - -def _test_episode_stream_availability( - provider, anime, episode_number, config, feedback -): - """Test if streams are available for a specific episode.""" - try: - from .....libs.provider.anime.params import EpisodeStreamsParams - - media_title = anime.title.english or anime.title.romaji - feedback.info( - f"Testing stream availability for '{media_title}' episode {episode_number}" - ) - - # Test episode streams - streams = provider.episode_streams( - EpisodeStreamsParams( - anime_id=str(anime.id), - query=media_title, - episode=episode_number, - translation_type=config.stream.translation_type, - ) - ) - - if not streams: - feedback.warning(f"No streams found for episode {episode_number}") - return False - - # Convert to list to check actual availability - stream_list = list(streams) - if not stream_list: - feedback.warning( - f"No stream servers available for episode {episode_number}" - ) - return False - - feedback.info( - f"Found {len(stream_list)} stream server(s) for episode {episode_number}" - ) - - # Show details about the first server for debugging - first_server = stream_list[0] - feedback.info( - f"First server: name='{first_server.name}', type='{type(first_server).__name__}'" - ) - - return True - - except TypeError as e: - if "'NoneType' object is not subscriptable" in str(e): - feedback.warning( - f"Episode {episode_number} not available on provider (API returned null)" - ) - feedback.info( - "This usually means the episode doesn't exist on this provider or isn't accessible" - ) - return False - else: - feedback.error(f"Type error testing stream availability: {e}") - return False - except Exception as e: - feedback.error(f"Error testing stream availability: {e}") - import traceback - - feedback.error("Stream test traceback", traceback.format_exc()) - return False - - -def _display_download_results(console, results: dict[str, bool], anime): - """Display download results in a formatted table.""" - from rich.table import Table - - table = Table( - title=f"Download Results for {anime.title.english or anime.title.romaji}" - ) - table.add_column("Episode", justify="center", style="cyan") - table.add_column("Status", justify="center") - - for episode, success in sorted(results.items(), key=lambda x: float(x[0])): - status = "[green]โœ“ Success[/green]" if success else "[red]โœ— Failed[/red]" - table.add_row(episode, status) - - console.print(table) - - # Summary - total = len(results) - successful = sum(results.values()) - failed = total - successful - - if failed == 0: - console.print( - f"\n[bold green]All {total} episodes downloaded successfully![/bold green]" - ) - else: - console.print( - f"\n[yellow]Download complete: {successful}/{total} successful, {failed} failed[/yellow]" - ) - - -def _show_final_statistics(download_service, feedback): - """Show final download statistics.""" - from rich.console import Console - - console = Console() - stats = download_service.get_download_statistics() - - if stats: - console.print("\n[bold blue]Overall Download Statistics:[/bold blue]") - console.print(f"Total episodes tracked: {stats.get('total_episodes', 0)}") - console.print(f"Successfully downloaded: {stats.get('downloaded', 0)}") - console.print(f"Failed downloads: {stats.get('failed', 0)}") - console.print(f"Queued downloads: {stats.get('queued', 0)}") - - if stats.get("total_size_bytes", 0) > 0: - size_mb = stats["total_size_bytes"] / (1024 * 1024) - if size_mb > 1024: - console.print(f"Total size: {size_mb / 1024:.2f} GB") - else: - console.print(f"Total size: {size_mb:.2f} MB") diff --git a/fastanime/cli/interactive/menu/share/auth.py b/fastanime/cli/interactive/menu/share/auth.py deleted file mode 100644 index 62033a0..0000000 --- a/fastanime/cli/interactive/menu/share/auth.py +++ /dev/null @@ -1,283 +0,0 @@ -""" -Interactive authentication menu for AniList OAuth login/logout and user profile management. -Implements Step 5: AniList Authentication Flow -""" - -import webbrowser -from typing import Optional - -from rich.console import Console -from rich.panel import Panel -from rich.table import Table - -from ....libs.media_api.types import UserProfile -from ...auth.manager import AuthManager -from ...utils.feedback import create_feedback_manager, execute_with_feedback -from ..session import Context, session -from ..state import InternalDirective, State - - -@session.menu -def auth(ctx: Context, state: State) -> State | InternalDirective: - """ - Interactive authentication menu for managing AniList login/logout and viewing user profile. - """ - icons = ctx.config.general.icons - feedback = create_feedback_manager(icons) - console = Console() - console.clear() - - # Get current authentication status - user_profile = getattr(ctx.media_api, "user_profile", None) - auth_manager = AuthManager() - - # Display current authentication status - _display_auth_status(console, user_profile, icons) - - # Menu options based on authentication status - if user_profile: - options = [ - f"{'๐Ÿ‘ค ' if icons else ''}View Profile Details", - f"{'๐Ÿ”“ ' if icons else ''}Logout", - f"{'โ†ฉ๏ธ ' if icons else ''}Back to Main Menu", - ] - else: - options = [ - f"{'๐Ÿ” ' if icons else ''}Login to AniList", - f"{'โ“ ' if icons else ''}How to Get Token", - f"{'โ†ฉ๏ธ ' if icons else ''}Back to Main Menu", - ] - - choice = ctx.selector.choose( - prompt="Select Authentication Action", - choices=options, - header="AniList Authentication Menu", - ) - - if not choice: - return InternalDirective.BACK - - # Handle menu choices - if "Login to AniList" in choice: - return _handle_login(ctx, auth_manager, feedback, icons) - elif "Logout" in choice: - return _handle_logout(ctx, auth_manager, feedback, icons) - elif "View Profile Details" in choice: - _display_user_profile_details(console, user_profile, icons) - feedback.pause_for_user("Press Enter to continue") - return InternalDirective.RELOAD - elif "How to Get Token" in choice: - _display_token_help(console, icons) - feedback.pause_for_user("Press Enter to continue") - return InternalDirective.RELOAD - else: # Back to Main Menu - return InternalDirective.BACK - - -def _display_auth_status( - console: Console, user_profile: Optional[UserProfile], icons: bool -): - """Display current authentication status in a nice panel.""" - if user_profile: - status_icon = "๐ŸŸข" if icons else "[green]โ—[/green]" - status_text = f"{status_icon} Authenticated" - user_info = f"Logged in as: [bold cyan]{user_profile.name}[/bold cyan]\nUser ID: {user_profile.id}" - else: - status_icon = "๐Ÿ”ด" if icons else "[red]โ—‹[/red]" - status_text = f"{status_icon} Not Authenticated" - user_info = "Log in to access personalized features like:\nโ€ข Your anime lists (Watching, Completed, etc.)\nโ€ข Progress tracking\nโ€ข List management" - - panel = Panel( - user_info, - title=f"Authentication Status: {status_text}", - border_style="green" if user_profile else "red", - ) - console.print(panel) - console.print() - - -def _handle_login( - ctx: Context, auth_manager: AuthManager, feedback, icons: bool -) -> State | InternalDirective: - """Handle the interactive login process.""" - - def perform_login(): - # Open browser to AniList OAuth page - oauth_url = "https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token" - - if feedback.confirm( - "Open AniList authorization page in browser?", default=True - ): - try: - webbrowser.open(oauth_url) - feedback.info( - "Browser opened", - "Complete the authorization process in your browser", - ) - except Exception: - feedback.warning( - "Could not open browser automatically", - f"Please manually visit: {oauth_url}", - ) - else: - feedback.info("Manual authorization", f"Please visit: {oauth_url}") - - # Get token from user - feedback.info( - "Token Input", "Paste the token from the browser URL after '#access_token='" - ) - token = ctx.selector.ask("Enter your AniList Access Token") - - if not token or not token.strip(): - feedback.error("Login cancelled", "No token provided") - return None - - # Authenticate with the API - profile = ctx.media_api.authenticate(token.strip()) - - if not profile: - feedback.error( - "Authentication failed", "The token may be invalid or expired" - ) - return None - - # Save credentials using the auth manager - auth_manager.save_user_profile(profile, token.strip()) - return profile - - success, profile = execute_with_feedback( - perform_login, - feedback, - "authenticate", - loading_msg="Validating token with AniList", - success_msg="Successfully logged in! ๐ŸŽ‰" - if icons - else "Successfully logged in!", - error_msg="Login failed", - show_loading=True, - ) - - if success and profile: - feedback.success( - f"Logged in as {profile.name}" if profile else "Successfully logged in" - ) - feedback.pause_for_user("Press Enter to continue") - - return InternalDirective.RELOAD - - -def _handle_logout( - ctx: Context, auth_manager: AuthManager, feedback, icons: bool -) -> State | InternalDirective: - """Handle the logout process with confirmation.""" - if not feedback.confirm( - "Are you sure you want to logout?", - "This will remove your saved AniList token and log you out", - default=False, - ): - return InternalDirective.RELOAD - - def perform_logout(): - # Clear from auth manager - if hasattr(auth_manager, "logout"): - auth_manager.logout() - else: - auth_manager.clear_user_profile() - - # Clear from API client - ctx.media_api.token = None - ctx.media_api.user_profile = None - if hasattr(ctx.media_api, "http_client"): - ctx.media_api.http_client.headers.pop("Authorization", None) - - return True - - success, _ = execute_with_feedback( - perform_logout, - feedback, - "logout", - loading_msg="Logging out", - success_msg="Successfully logged out ๐Ÿ‘‹" - if icons - else "Successfully logged out", - error_msg="Logout failed", - show_loading=False, - ) - - if success: - feedback.pause_for_user("Press Enter to continue") - - return InternalDirective.CONFIG_EDIT - - -def _display_user_profile_details( - console: Console, user_profile: UserProfile, icons: bool -): - """Display detailed user profile information.""" - if not user_profile: - console.print("[red]No user profile available[/red]") - return - - # Create a detailed profile table - table = Table(title=f"{'๐Ÿ‘ค ' if icons else ''}User Profile: {user_profile.name}") - table.add_column("Property", style="cyan", no_wrap=True) - table.add_column("Value", style="green") - - table.add_row("Name", user_profile.name) - table.add_row("User ID", str(user_profile.id)) - - if user_profile.avatar_url: - table.add_row("Avatar URL", user_profile.avatar_url) - - if user_profile.banner_url: - table.add_row("Banner URL", user_profile.banner_url) - - console.print() - console.print(table) - console.print() - - # Show available features - features_panel = Panel( - "Available Features:\n" - f"{'๐Ÿ“บ ' if icons else 'โ€ข '}Access your anime lists (Watching, Completed, etc.)\n" - f"{'โœ๏ธ ' if icons else 'โ€ข '}Update watch progress and scores\n" - f"{'โž• ' if icons else 'โ€ข '}Add/remove anime from your lists\n" - f"{'๐Ÿ”„ ' if icons else 'โ€ข '}Sync progress with AniList\n" - f"{'๐Ÿ”” ' if icons else 'โ€ข '}Access AniList notifications", - title="Available with Authentication", - border_style="green", - ) - console.print(features_panel) - - -def _display_token_help(console: Console, icons: bool): - """Display help information about getting an AniList token.""" - help_text = """ -[bold cyan]How to get your AniList Access Token:[/bold cyan] - -[bold]Step 1:[/bold] Visit the AniList authorization page -https://anilist.co/api/v2/oauth/authorize?client_id=20148&response_type=token - -[bold]Step 2:[/bold] Log in to your AniList account if prompted - -[bold]Step 3:[/bold] Click "Authorize" to grant FastAnime access - -[bold]Step 4:[/bold] Copy the token from the browser URL -Look for the part after "#access_token=" in the address bar - -[bold]Step 5:[/bold] Paste the token when prompted in FastAnime - -[yellow]Note:[/yellow] The token will be stored securely and used for all AniList features. -You only need to do this once unless you revoke access or the token expires. - -[yellow]Privacy:[/yellow] FastAnime only requests minimal permissions needed for -list management and does not access sensitive account information. -""" - - panel = Panel( - help_text, - title=f"{'โ“ ' if icons else ''}AniList Token Help", - border_style="blue", - ) - console.print() - console.print(panel) diff --git a/fastanime/cli/interactive/menu/share/session_management.py b/fastanime/cli/interactive/menu/share/session_management.py deleted file mode 100644 index a187b84..0000000 --- a/fastanime/cli/interactive/menu/share/session_management.py +++ /dev/null @@ -1,254 +0,0 @@ -""" -Session management menu for the interactive CLI. -Provides options to save, load, and manage session state. -""" - -from datetime import datetime -from pathlib import Path -from typing import Callable, Dict - -from rich.console import Console -from rich.table import Table - -from ....core.constants import APP_DIR -from ...utils.feedback import create_feedback_manager -from ..session import Context, session -from ..state import ControlFlow, State - -MenuAction = Callable[[], str] - - -@session.menu -def session_management(ctx: Context, state: State) -> State | ControlFlow: - """ - Session management menu for saving, loading, and managing session state. - """ - icons = ctx.config.general.icons - feedback = create_feedback_manager(icons) - console = Console() - console.clear() - - # Show current session stats - _display_session_info(console, icons) - - options: Dict[str, MenuAction] = { - f"{'๐Ÿ’พ ' if icons else ''}Save Current Session": lambda: _save_session( - ctx, feedback - ), - f"{'๐Ÿ“‚ ' if icons else ''}Load Session": lambda: _load_session(ctx, feedback), - f"{'๐Ÿ“‹ ' if icons else ''}List Saved Sessions": lambda: _list_sessions( - ctx, feedback - ), - f"{'๐Ÿ—‘๏ธ ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions( - ctx, feedback - ), - f"{'๐Ÿ’พ ' if icons else ''}Create Manual Backup": lambda: _create_backup( - ctx, feedback - ), - f"{'โš™๏ธ ' if icons else ''}Session Settings": lambda: _session_settings( - ctx, feedback - ), - f"{'๐Ÿ”™ ' if icons else ''}Back to Main Menu": lambda: "BACK", - } - - choice_str = ctx.selector.choose( - prompt="Select Session Action", - choices=list(options.keys()), - header="Session Management", - ) - - if not choice_str: - return ControlFlow.BACK - - result = options[choice_str]() - - if result == "BACK": - return ControlFlow.BACK - else: - return ControlFlow.CONTINUE - - -def _display_session_info(console: Console, icons: bool): - """Display current session information.""" - session_stats = session.get_session_stats() - - table = Table(title=f"{'๐Ÿ“Š ' if icons else ''}Current Session Info") - table.add_column("Property", style="cyan") - table.add_column("Value", style="green") - - table.add_row("Current States", str(session_stats["current_states"])) - table.add_row("Current Menu", session_stats["current_menu"] or "None") - table.add_row( - "Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled" - ) - table.add_row("Has Auto-Save", "Yes" if session_stats["has_auto_save"] else "No") - table.add_row( - "Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No" - ) - - console.print(table) - console.print() - - -def _save_session(ctx: Context, feedback) -> str: - """Save the current session.""" - session_name = ctx.selector.ask("Enter session name (optional):") - description = ctx.selector.ask("Enter session description (optional):") - - if not session_name: - session_name = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" - - sessions_dir = APP_DIR / "sessions" - file_path = sessions_dir / f"{session_name}.json" - - if file_path.exists(): - if not feedback.confirm(f"Session '{session_name}' already exists. Overwrite?"): - feedback.info("Save cancelled") - return "CONTINUE" - - success = session.save(file_path, session_name, description or "") - if success: - feedback.success(f"Session saved as '{session_name}'") - - return "CONTINUE" - - -def _load_session(ctx: Context, feedback) -> str: - """Load a saved session.""" - sessions = session.list_saved_sessions() - - if not sessions: - feedback.warning("No saved sessions found") - return "CONTINUE" - - # Create choices with session info - choices = [] - session_map = {} - - for sess in sessions: - choice_text = f"{sess['name']} - {sess['description'][:50]}{'...' if len(sess['description']) > 50 else ''}" - choices.append(choice_text) - session_map[choice_text] = sess - - choices.append("Cancel") - - choice = ctx.selector.choose( - "Select session to load:", choices=choices, header="Available Sessions" - ) - - if not choice or choice == "Cancel": - return "CONTINUE" - - selected_session = session_map[choice] - file_path = Path(selected_session["path"]) - - if feedback.confirm( - f"Load session '{selected_session['name']}'? This will replace your current session." - ): - success = session.resume(file_path, feedback) - if success: - feedback.info("Session loaded successfully. Returning to main menu.") - # Return to main menu after loading - return "MAIN" - - return "CONTINUE" - - -def _list_sessions(ctx: Context, feedback) -> str: - """List all saved sessions.""" - sessions = session.list_saved_sessions() - - if not sessions: - feedback.info("No saved sessions found") - return "CONTINUE" - - console = Console() - table = Table(title="Saved Sessions") - table.add_column("Name", style="cyan") - table.add_column("Description", style="yellow") - table.add_column("States", style="green") - table.add_column("Created", style="blue") - - for sess in sessions: - # Format the created date - created = sess["created"] - if "T" in created: - created = created.split("T")[0] # Just show the date part - - table.add_row( - sess["name"], - sess["description"][:40] + "..." - if len(sess["description"]) > 40 - else sess["description"], - str(sess["state_count"]), - created, - ) - - console.print(table) - feedback.pause_for_user() - - return "CONTINUE" - - -def _cleanup_sessions(ctx: Context, feedback) -> str: - """Clean up old sessions.""" - sessions = session.list_saved_sessions() - - if len(sessions) <= 5: - feedback.info("No cleanup needed. You have 5 or fewer sessions.") - return "CONTINUE" - - max_sessions_str = ctx.selector.ask("How many sessions to keep? (default: 10)") - try: - max_sessions = int(max_sessions_str) if max_sessions_str else 10 - except ValueError: - feedback.error("Invalid number entered") - return "CONTINUE" - - if feedback.confirm(f"Delete sessions older than the {max_sessions} most recent?"): - deleted_count = session.cleanup_old_sessions(max_sessions) - feedback.success(f"Deleted {deleted_count} old sessions") - - return "CONTINUE" - - -def _create_backup(ctx: Context, feedback) -> str: - """Create a manual backup.""" - backup_name = ctx.selector.ask("Enter backup name (optional):") - - success = session.create_manual_backup(backup_name or "") - if success: - feedback.success("Manual backup created successfully") - - return "CONTINUE" - - -def _session_settings(ctx: Context, feedback) -> str: - """Configure session settings.""" - current_auto_save = session._auto_save_enabled - - choices = [ - f"Auto-Save: {'Enabled' if current_auto_save else 'Disabled'}", - "Clear Auto-Save File", - "Clear Crash Backup", - "Back", - ] - - choice = ctx.selector.choose("Session Settings:", choices=choices) - - if choice and choice.startswith("Auto-Save"): - new_setting = not current_auto_save - session.enable_auto_save(new_setting) - feedback.success(f"Auto-save {'enabled' if new_setting else 'disabled'}") - - elif choice == "Clear Auto-Save File": - if feedback.confirm("Clear the auto-save file?"): - session._session_manager.clear_auto_save() - feedback.success("Auto-save file cleared") - - elif choice == "Clear Crash Backup": - if feedback.confirm("Clear the crash backup file?"): - session._session_manager.clear_crash_backup() - feedback.success("Crash backup cleared") - - return "CONTINUE" diff --git a/fastanime/cli/interactive/menu/share/user_media_list.py b/fastanime/cli/interactive/menu/share/user_media_list.py deleted file mode 100644 index 7bdaa43..0000000 --- a/fastanime/cli/interactive/menu/share/user_media_list.py +++ /dev/null @@ -1,827 +0,0 @@ -""" -AniList Watch List Operations Menu -Implements Step 8: Remote Watch List Operations - -Provides comprehensive AniList list management including: -- Viewing user lists (Watching, Completed, Planning, etc.) -- Interactive list selection and navigation -- Adding/removing anime from lists -- List statistics and overview -""" - -import logging - -from rich.console import Console -from rich.panel import Panel -from rich.table import Table - -from ....libs.media_api.params import UpdateUserMediaListEntryParams, UserListParams -from ....libs.media_api.types import MediaItem, MediaSearchResult, UserListItem -from ...utils.feedback import create_feedback_manager, execute_with_feedback -from ..session import Context, session -from ..state import ControlFlow, MediaApiState, State - -logger = logging.getLogger(__name__) - - -@session.menu -def anilist_lists(ctx: Context, state: State) -> State | ControlFlow: - """ - Main AniList lists management menu. - Shows all user lists with statistics and navigation options. - """ - icons = ctx.config.general.icons - feedback = create_feedback_manager(icons) - console = Console() - console.clear() - - # Check authentication - if not ctx.media_api.user_profile: - feedback.error( - "Authentication Required", - "You must be logged in to access your AniList lists. Please authenticate first.", - ) - feedback.pause_for_user("Press Enter to continue") - return State(menu_name="AUTH") - - # Display user profile and lists overview - _display_lists_overview(console, ctx, icons) - - # Menu options - options = [ - f"{'๐Ÿ“บ ' if icons else ''}Currently Watching", - f"{'๐Ÿ“‹ ' if icons else ''}Planning to Watch", - f"{'โœ… ' if icons else ''}Completed", - f"{'โธ๏ธ ' if icons else ''}Paused", - f"{'๐Ÿšฎ ' if icons else ''}Dropped", - f"{'๐Ÿ” ' if icons else ''}Rewatching", - f"{'๐Ÿ“Š ' if icons else ''}View All Lists Statistics", - f"{'๐Ÿ” ' if icons else ''}Search Across All Lists", - f"{'โž• ' if icons else ''}Add Anime to List", - f"{'โ†ฉ๏ธ ' if icons else ''}Back to Main Menu", - ] - - choice = ctx.selector.choose( - prompt="Select List Action", - choices=options, - header=f"AniList Lists - {ctx.media_api.user_profile.name}", - ) - - if not choice: - return ControlFlow.BACK - - # Handle menu choices - if "Currently Watching" in choice: - return _navigate_to_list(ctx, "CURRENT") - elif "Planning to Watch" in choice: - return _navigate_to_list(ctx, "PLANNING") - elif "Completed" in choice: - return _navigate_to_list(ctx, "COMPLETED") - elif "Paused" in choice: - return _navigate_to_list(ctx, "PAUSED") - elif "Dropped" in choice: - return _navigate_to_list(ctx, "DROPPED") - elif "Rewatching" in choice: - return _navigate_to_list(ctx, "REPEATING") - elif "View All Lists Statistics" in choice: - return _show_all_lists_stats(ctx, feedback, icons) - elif "Search Across All Lists" in choice: - return _search_all_lists(ctx, feedback, icons) - elif "Add Anime to List" in choice: - return _add_anime_to_list(ctx, feedback, icons) - else: # Back to Main Menu - return ControlFlow.BACK - - -@session.menu -def anilist_list_view(ctx: Context, state: State) -> State | ControlFlow: - """ - View and manage a specific AniList list (e.g., Watching, Completed). - """ - icons = ctx.config.general.icons - feedback = create_feedback_manager(icons) - console = Console() - console.clear() - - # Get list status from state data - list_status = state.data.get("list_status") if state.data else "CURRENT" - page = state.data.get("page", 1) if state.data else 1 - - # Fetch list data - def fetch_list(): - return ctx.media_api.search_media_list( - UserListParams(status=list_status, page=page, per_page=20) - ) - - success, result = execute_with_feedback( - fetch_list, - feedback, - f"fetch {_status_to_display_name(list_status)} list", - loading_msg=f"Loading {_status_to_display_name(list_status)} list...", - success_msg=f"Loaded {_status_to_display_name(list_status)} list", - error_msg=f"Failed to load {_status_to_display_name(list_status)} list", - ) - - if not success or not result: - feedback.pause_for_user("Press Enter to continue") - return ControlFlow.BACK - - # Display list contents - _display_list_contents(console, result, list_status, page, icons) - - # Menu options - options = [ - f"{'๐Ÿ‘๏ธ ' if icons else ''}View/Edit Anime Details", - f"{'๐Ÿ”„ ' if icons else ''}Refresh List", - f"{'โž• ' if icons else ''}Add New Anime", - f"{'๐Ÿ—‘๏ธ ' if icons else ''}Remove from List", - ] - - # Add pagination options - if result.page_info.has_next_page: - options.append(f"{'โžก๏ธ ' if icons else ''}Next Page") - if page > 1: - options.append(f"{'โฌ…๏ธ ' if icons else ''}Previous Page") - - options.extend( - [ - f"{'๐Ÿ“Š ' if icons else ''}List Statistics", - f"{'โ†ฉ๏ธ ' if icons else ''}Back to Lists Menu", - ] - ) - - choice = ctx.selector.choose( - prompt="Select Action", - choices=options, - header=f"{_status_to_display_name(list_status)} - Page {page}", - ) - - if not choice: - return ControlFlow.BACK - - # Handle menu choices - if "View/Edit Anime Details" in choice: - return _select_anime_for_details(ctx, result, list_status, page) - elif "Refresh List" in choice: - return ControlFlow.CONTINUE - elif "Add New Anime" in choice: - return _add_anime_to_specific_list(ctx, list_status, feedback, icons) - elif "Remove from List" in choice: - return _remove_anime_from_list(ctx, result, list_status, page, feedback, icons) - elif "Next Page" in choice: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": page + 1}, - ) - elif "Previous Page" in choice: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": page - 1}, - ) - elif "List Statistics" in choice: - return _show_list_statistics(ctx, list_status, feedback, icons) - else: # Back to Lists Menu - return State(menu_name="ANILIST_LISTS") - - -@session.menu -def anilist_anime_details(ctx: Context, state: State) -> State | ControlFlow: - """ - View and edit details for a specific anime in a user's list. - """ - icons = ctx.config.general.icons - feedback = create_feedback_manager(icons) - console = Console() - console.clear() - - # Get anime and list info from state - if not state.data: - return ControlFlow.BACK - - anime = state.data.get("anime") - list_status = state.data.get("list_status") - return_page = state.data.get("return_page", 1) - from_media_actions = state.data.get("from_media_actions", False) - - if not anime: - return ControlFlow.BACK - - # Display anime details - _display_anime_list_details(console, anime, icons) - - # Menu options - options = [ - f"{'โœ๏ธ ' if icons else ''}Edit Progress", - f"{'โญ ' if icons else ''}Edit Rating", - f"{'๐Ÿ“ ' if icons else ''}Edit Status", - f"{'๐ŸŽฌ ' if icons else ''}Watch/Stream", - f"{'๐Ÿ—‘๏ธ ' if icons else ''}Remove from List", - f"{'โ†ฉ๏ธ ' if icons else ''}Back to List", - ] - - choice = ctx.selector.choose( - prompt="Select Action", - choices=options, - header=f"{anime.title.english or anime.title.romaji}", - ) - - if not choice: - # Return to appropriate menu based on how we got here - if from_media_actions: - return ControlFlow.BACK - elif list_status: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": return_page}, - ) - else: - return State(menu_name="ANILIST_LISTS") - - # Handle menu choices - if "Edit Progress" in choice: - return _edit_anime_progress( - ctx, anime, list_status, return_page, feedback, from_media_actions - ) - elif "Edit Rating" in choice: - return _edit_anime_rating( - ctx, anime, list_status, return_page, feedback, from_media_actions - ) - elif "Edit Status" in choice: - return _edit_anime_status( - ctx, anime, list_status, return_page, feedback, from_media_actions - ) - elif "Watch/Stream" in choice: - return _stream_anime(ctx, anime) - elif "Remove from List" in choice: - return _confirm_remove_anime( - ctx, anime, list_status, return_page, feedback, icons, from_media_actions - ) - else: # Back to List/Media Actions - # Return to appropriate menu based on how we got here - if from_media_actions: - return ControlFlow.BACK - elif list_status: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": return_page}, - ) - else: - return State(menu_name="ANILIST_LISTS") - - -def _display_lists_overview(console: Console, ctx: Context, icons: bool): - """Display overview of all user lists with counts.""" - user = ctx.media_api.user_profile - - # Create overview panel - overview_text = f"[bold cyan]{user.name}[/bold cyan]'s AniList Management\n" - overview_text += f"User ID: {user.id}\n\n" - overview_text += "Manage your anime lists, track progress, and sync with AniList" - - panel = Panel( - overview_text, - title=f"{'๐Ÿ“š ' if icons else ''}AniList Lists Overview", - border_style="cyan", - ) - console.print(panel) - console.print() - - -def _display_list_contents( - console: Console, - result: MediaSearchResult, - list_status: str, - page: int, - icons: bool, -): - """Display the contents of a specific list in a table.""" - if not result.media: - console.print( - f"[yellow]No anime found in {_status_to_display_name(list_status)} list[/yellow]" - ) - return - - table = Table(title=f"{_status_to_display_name(list_status)} - Page {page}") - table.add_column("Title", style="cyan", no_wrap=False, width=40) - table.add_column("Episodes", justify="center", width=10) - table.add_column("Progress", justify="center", width=10) - table.add_column("Score", justify="center", width=8) - table.add_column("Status", justify="center", width=12) - - for i, anime in enumerate(result.media, 1): - title = anime.title.english or anime.title.romaji or "Unknown Title" - episodes = str(anime.episodes or "?") - - # Get list entry details if available - progress = "?" - score = "?" - status = _status_to_display_name(list_status) - - # Note: In a real implementation, you'd get these from the MediaList entry - # For now, we'll show placeholders - if hasattr(anime, "media_list_entry") and anime.media_list_entry: - progress = str(anime.media_list_entry.progress or 0) - score = str(anime.media_list_entry.score or "-") - - table.add_row(f"{i}. {title}", episodes, progress, score, status) - - console.print(table) - console.print( - f"\nShowing {len(result.media)} anime from {_status_to_display_name(list_status)} list" - ) - - # Show pagination info - if result.page_info.has_next_page: - console.print("[dim]More results available on next page[/dim]") - - -def _display_anime_list_details(console: Console, anime: MediaItem, icons: bool): - """Display detailed information about an anime in the user's list.""" - title = anime.title.english or anime.title.romaji or "Unknown Title" - - details_text = f"[bold]{title}[/bold]\n\n" - details_text += f"Episodes: {anime.episodes or 'Unknown'}\n" - details_text += f"Status: {anime.status or 'Unknown'}\n" - details_text += ( - f"Genres: {', '.join(anime.genres) if anime.genres else 'Unknown'}\n" - ) - - if anime.description: - # Truncate description for display - desc = ( - anime.description[:300] + "..." - if len(anime.description) > 300 - else anime.description - ) - details_text += f"\nDescription:\n{desc}" - - # Add list-specific information if available - if hasattr(anime, "media_list_entry") and anime.media_list_entry: - entry = anime.media_list_entry - details_text += "\n\n[bold cyan]Your List Info:[/bold cyan]\n" - details_text += f"Progress: {entry.progress or 0} episodes\n" - details_text += f"Score: {entry.score or 'Not rated'}\n" - details_text += f"Status: {_status_to_display_name(entry.status) if hasattr(entry, 'status') else 'Unknown'}\n" - - panel = Panel( - details_text, - title=f"{'๐Ÿ“บ ' if icons else ''}Anime Details", - border_style="blue", - ) - console.print(panel) - - -def _navigate_to_list(ctx: Context, list_status: UserListItem) -> State: - """Navigate to a specific list view.""" - return State( - menu_name="ANILIST_LIST_VIEW", data={"list_status": list_status, "page": 1} - ) - - -def _select_anime_for_details( - ctx: Context, result: MediaSearchResult, list_status: str, page: int -) -> State | ControlFlow: - """Let user select an anime from the list to view/edit details.""" - if not result.media: - return ControlFlow.CONTINUE - - # Create choices from anime list - choices = [] - for i, anime in enumerate(result.media, 1): - title = anime.title.english or anime.title.romaji or "Unknown Title" - choices.append(f"{i}. {title}") - - choice = ctx.selector.choose( - prompt="Select anime to view/edit", - choices=choices, - header="Select Anime", - ) - - if not choice: - return ControlFlow.CONTINUE - - # Extract index and get selected anime - try: - index = int(choice.split(".")[0]) - 1 - selected_anime = result.media[index] - - return State( - menu_name="ANILIST_ANIME_DETAILS", - data={ - "anime": selected_anime, - "list_status": list_status, - "return_page": page, - }, - ) - except (ValueError, IndexError): - return ControlFlow.CONTINUE - - -def _edit_anime_progress( - ctx: Context, - anime: MediaItem, - list_status: str, - return_page: int, - feedback, - from_media_actions: bool = False, -) -> State | ControlFlow: - """Edit the progress (episodes watched) for an anime.""" - current_progress = 0 - if hasattr(anime, "media_list_entry") and anime.media_list_entry: - current_progress = anime.media_list_entry.progress or 0 - - max_episodes = anime.episodes or 999 - - try: - new_progress = click.prompt( - f"Enter new progress (0-{max_episodes}, current: {current_progress})", - type=int, - default=current_progress, - ) - - if new_progress < 0 or new_progress > max_episodes: - feedback.error( - "Invalid progress", f"Progress must be between 0 and {max_episodes}" - ) - feedback.pause_for_user("Press Enter to continue") - return ControlFlow.CONTINUE - - # Update via API - def update_progress(): - return ctx.media_api.update_list_entry( - UpdateUserMediaListEntryParams(media_id=anime.id, progress=new_progress) - ) - - success, _ = execute_with_feedback( - update_progress, - feedback, - "update progress", - loading_msg="Updating progress...", - success_msg=f"Progress updated to {new_progress} episodes", - error_msg="Failed to update progress", - ) - - if success: - feedback.pause_for_user("Press Enter to continue") - - except click.Abort: - pass - - # Return to appropriate menu based on how we got here - if from_media_actions: - return ControlFlow.BACK - elif list_status: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": return_page}, - ) - else: - return State(menu_name="ANILIST_LISTS") - - -def _edit_anime_rating( - ctx: Context, - anime: MediaItem, - list_status: str, - return_page: int, - feedback, - from_media_actions: bool = False, -) -> State | ControlFlow: - """Edit the rating/score for an anime.""" - current_score = 0.0 - if hasattr(anime, "media_list_entry") and anime.media_list_entry: - current_score = anime.media_list_entry.score or 0.0 - - try: - new_score = click.prompt( - f"Enter new rating (0.0-10.0, current: {current_score})", - type=float, - default=current_score, - ) - - if new_score < 0.0 or new_score > 10.0: - feedback.error("Invalid rating", "Rating must be between 0.0 and 10.0") - feedback.pause_for_user("Press Enter to continue") - return ControlFlow.CONTINUE - - # Update via API - def update_score(): - return ctx.media_api.update_list_entry( - UpdateUserMediaListEntryParams(media_id=anime.id, score=new_score) - ) - - success, _ = execute_with_feedback( - update_score, - feedback, - "update rating", - loading_msg="Updating rating...", - success_msg=f"Rating updated to {new_score}/10", - error_msg="Failed to update rating", - ) - - if success: - feedback.pause_for_user("Press Enter to continue") - - except click.Abort: - pass - - # Return to appropriate menu based on how we got here - if from_media_actions: - return ControlFlow.BACK - elif list_status: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": return_page}, - ) - else: - return State(menu_name="ANILIST_LISTS") - - -def _edit_anime_status( - ctx: Context, - anime: MediaItem, - list_status: str, - return_page: int, - feedback, - from_media_actions: bool = False, -) -> State | ControlFlow: - """Edit the list status for an anime.""" - status_options = [ - "CURRENT (Currently Watching)", - "PLANNING (Plan to Watch)", - "COMPLETED (Completed)", - "PAUSED (Paused)", - "DROPPED (Dropped)", - "REPEATING (Rewatching)", - ] - - choice = ctx.selector.choose( - prompt="Select new status", - choices=status_options, - header="Change List Status", - ) - - if not choice: - return ControlFlow.CONTINUE - - new_status = choice.split(" ")[0] - - # Update via API - def update_status(): - return ctx.media_api.update_list_entry( - UpdateUserMediaListEntryParams(media_id=anime.id, status=new_status) - ) - - success, _ = execute_with_feedback( - update_status, - feedback, - "update status", - loading_msg="Updating status...", - success_msg=f"Status updated to {_status_to_display_name(new_status)}", - error_msg="Failed to update status", - ) - - if success: - feedback.pause_for_user("Press Enter to continue") - - # If status changed, return to main lists menu since the anime - # is no longer in the current list - if new_status != list_status: - if from_media_actions: - return ControlFlow.BACK - else: - return State(menu_name="ANILIST_LISTS") - - # Return to appropriate menu based on how we got here - if from_media_actions: - return ControlFlow.BACK - elif list_status: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": return_page}, - ) - else: - return State(menu_name="ANILIST_LISTS") - - -def _confirm_remove_anime( - ctx: Context, - anime: MediaItem, - list_status: str, - return_page: int, - feedback, - icons: bool, - from_media_actions: bool = False, -) -> State | ControlFlow: - """Confirm and remove an anime from the user's list.""" - title = anime.title.english or anime.title.romaji or "Unknown Title" - - if not feedback.confirm( - f"Remove '{title}' from your {_status_to_display_name(list_status)} list?", - default=False, - ): - return ControlFlow.CONTINUE - - # Remove via API - def remove_anime(): - return ctx.media_api.delete_list_entry(anime.id) - - success, _ = execute_with_feedback( - remove_anime, - feedback, - "remove anime", - loading_msg="Removing anime from list...", - success_msg=f"'{title}' removed from list", - error_msg="Failed to remove anime from list", - ) - - if success: - feedback.pause_for_user("Press Enter to continue") - - # Return to appropriate menu based on how we got here - if from_media_actions: - return ControlFlow.BACK - elif list_status: - return State( - menu_name="ANILIST_LIST_VIEW", - data={"list_status": list_status, "page": return_page}, - ) - else: - return State(menu_name="ANILIST_LISTS") - - -def _stream_anime(ctx: Context, anime: MediaItem) -> State: - """Navigate to streaming interface for the selected anime.""" - return State( - menu_name="RESULTS", - data=MediaApiState( - results=[anime], # Pass as single-item list - query=anime.title.english or anime.title.romaji or "Unknown", - page=1, - api_params=None, - user_list_params=None, - ), - ) - - -def _show_all_lists_stats(ctx: Context, feedback, icons: bool) -> State | ControlFlow: - """Show comprehensive statistics across all user lists.""" - console = Console() - console.clear() - - # This would require fetching data from all lists - # For now, show a placeholder implementation - stats_text = "[bold cyan]๐Ÿ“Š Your AniList Statistics[/bold cyan]\n\n" - stats_text += "[dim]Loading comprehensive list statistics...[/dim]\n" - stats_text += "[dim]This feature requires fetching data from all lists.[/dim]" - - panel = Panel( - stats_text, - title=f"{'๐Ÿ“Š ' if icons else ''}AniList Statistics", - border_style="green", - ) - console.print(panel) - - feedback.pause_for_user("Press Enter to continue") - return ControlFlow.CONTINUE - - -def _search_all_lists(ctx: Context, feedback, icons: bool) -> State | ControlFlow: - """Search across all user lists.""" - try: - query = click.prompt("Enter search query", type=str) - if not query.strip(): - return ControlFlow.CONTINUE - - # This would require implementing search across all lists - feedback.info( - "Search functionality", - "Cross-list search will be implemented in a future update", - ) - feedback.pause_for_user("Press Enter to continue") - - except click.Abort: - pass - - return ControlFlow.CONTINUE - - -def _add_anime_to_list(ctx: Context, feedback, icons: bool) -> State | ControlFlow: - """Add a new anime to one of the user's lists.""" - try: - query = click.prompt("Enter anime name to search", type=str) - if not query.strip(): - return ControlFlow.CONTINUE - - # Navigate to search with intent to add to list - return State( - menu_name="PROVIDER_SEARCH", data={"query": query, "add_to_list_mode": True} - ) - - except click.Abort: - pass - - return ControlFlow.CONTINUE - - -def _add_anime_to_specific_list( - ctx: Context, list_status: str, feedback, icons: bool -) -> State | ControlFlow: - """Add a new anime to a specific list.""" - try: - query = click.prompt("Enter anime name to search", type=str) - if not query.strip(): - return ControlFlow.CONTINUE - - # Navigate to search with specific list target - return State( - menu_name="PROVIDER_SEARCH", - data={"query": query, "target_list": list_status}, - ) - - except click.Abort: - pass - - return ControlFlow.CONTINUE - - -def _remove_anime_from_list( - ctx: Context, - result: MediaSearchResult, - list_status: str, - page: int, - feedback, - icons: bool, -) -> State | ControlFlow: - """Select and remove an anime from the current list.""" - if not result.media: - feedback.info("Empty list", "No anime to remove from this list") - feedback.pause_for_user("Press Enter to continue") - return ControlFlow.CONTINUE - - # Create choices from anime list - choices = [] - for i, anime in enumerate(result.media, 1): - title = anime.title.english or anime.title.romaji or "Unknown Title" - choices.append(f"{i}. {title}") - - choice = ctx.selector.choose( - prompt="Select anime to remove", - choices=choices, - header="Remove Anime from List", - ) - - if not choice: - return ControlFlow.CONTINUE - - # Extract index and get selected anime - try: - index = int(choice.split(".")[0]) - 1 - selected_anime = result.media[index] - - return _confirm_remove_anime( - ctx, selected_anime, list_status, page, feedback, icons - ) - except (ValueError, IndexError): - return ControlFlow.CONTINUE - - -def _show_list_statistics( - ctx: Context, list_status: str, feedback, icons: bool -) -> State | ControlFlow: - """Show statistics for a specific list.""" - console = Console() - console.clear() - - list_name = _status_to_display_name(list_status) - - stats_text = f"[bold cyan]๐Ÿ“Š {list_name} Statistics[/bold cyan]\n\n" - stats_text += "[dim]Loading list statistics...[/dim]\n" - stats_text += "[dim]This feature requires comprehensive list analysis.[/dim]" - - panel = Panel( - stats_text, - title=f"{'๐Ÿ“Š ' if icons else ''}{list_name} Stats", - border_style="blue", - ) - console.print(panel) - - feedback.pause_for_user("Press Enter to continue") - return ControlFlow.CONTINUE - - -def _status_to_display_name(status: str) -> str: - """Convert API status to human-readable display name.""" - status_map = { - "CURRENT": "Currently Watching", - "PLANNING": "Planning to Watch", - "COMPLETED": "Completed", - "PAUSED": "Paused", - "DROPPED": "Dropped", - "REPEATING": "Rewatching", - } - return status_map.get(status, status) - - -# Import click for user input -import click diff --git a/fastanime/cli/interactive/menu/share/watch_history.py b/fastanime/cli/interactive/menu/share/watch_history.py deleted file mode 100644 index c8d1684..0000000 --- a/fastanime/cli/interactive/menu/share/watch_history.py +++ /dev/null @@ -1,572 +0,0 @@ -""" -Watch History Management Menu for the interactive CLI. -Provides comprehensive watch history viewing, editing, and management capabilities. -""" - -import logging -from typing import Callable, Dict, List - -from rich.console import Console -from rich.table import Table - -from ....core.constants import APP_DATA_DIR -from ...utils.feedback import create_feedback_manager -from ...utils.watch_history_manager import WatchHistoryManager -from ...utils.watch_history_types import WatchHistoryEntry -from ..session import Context, session -from ..state import InternalDirective, State - -logger = logging.getLogger(__name__) - -MenuAction = Callable[[], str] - - -@session.menu -def watch_history(ctx: Context, state: State) -> State | InternalDirective: - """ - Watch history management menu for viewing and managing local watch history. - """ - icons = ctx.config.general.icons - feedback = create_feedback_manager(icons) - console = Console() - console.clear() - - # Initialize watch history manager - history_manager = WatchHistoryManager() - - # Show watch history stats - _display_history_stats(console, history_manager, icons) - - options: Dict[str, MenuAction] = { - f"{'๐Ÿ“บ ' if icons else ''}Currently Watching": lambda: _view_watching( - ctx, history_manager, feedback - ), - f"{'โœ… ' if icons else ''}Completed Anime": lambda: _view_completed( - ctx, history_manager, feedback - ), - f"{'๐Ÿ•’ ' if icons else ''}Recently Watched": lambda: _view_recent( - ctx, history_manager, feedback - ), - f"{'๐Ÿ“‹ ' if icons else ''}View All History": lambda: _view_all_history( - ctx, history_manager, feedback - ), - f"{'๐Ÿ” ' if icons else ''}Search History": lambda: _search_history( - ctx, history_manager, feedback - ), - f"{'โœ๏ธ ' if icons else ''}Edit Entry": lambda: _edit_entry( - ctx, history_manager, feedback - ), - f"{'๐Ÿ—‘๏ธ ' if icons else ''}Remove Entry": lambda: _remove_entry( - ctx, history_manager, feedback - ), - f"{'๐Ÿ“Š ' if icons else ''}View Statistics": lambda: _view_stats( - ctx, history_manager, feedback - ), - f"{'๐Ÿ’พ ' if icons else ''}Export History": lambda: _export_history( - ctx, history_manager, feedback - ), - f"{'๐Ÿ“ฅ ' if icons else ''}Import History": lambda: _import_history( - ctx, history_manager, feedback - ), - f"{'๐Ÿงน ' if icons else ''}Clear All History": lambda: _clear_history( - ctx, history_manager, feedback - ), - f"{'๐Ÿ”™ ' if icons else ''}Back to Main Menu": lambda: "BACK", - } - - choice_str = ctx.selector.choose( - prompt="Select Watch History Action", - choices=list(options.keys()), - header="Watch History Management", - ) - - if not choice_str: - return InternalDirective.BACK - - result = options[choice_str]() - - if result == "BACK": - return InternalDirective.BACK - else: - return InternalDirective.RELOAD - - -def _display_history_stats( - console: Console, history_manager: WatchHistoryManager, icons: bool -): - """Display current watch history statistics.""" - stats = history_manager.get_stats() - - # Create a stats table - table = Table(title=f"{'๐Ÿ“Š ' if icons else ''}Watch History Overview") - table.add_column("Metric", style="cyan") - table.add_column("Count", style="green") - - table.add_row("Total Anime", str(stats["total_entries"])) - table.add_row("Currently Watching", str(stats["watching"])) - table.add_row("Completed", str(stats["completed"])) - table.add_row("Dropped", str(stats["dropped"])) - table.add_row("Paused", str(stats["paused"])) - table.add_row("Total Episodes", str(stats["total_episodes_watched"])) - table.add_row("Last Updated", stats["last_updated"]) - - console.print(table) - console.print() - - -def _view_watching(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: - """View currently watching anime.""" - entries = history_manager.get_watching_entries() - - if not entries: - feedback.info("No anime currently being watched") - return "CONTINUE" - - return _display_entries_list(ctx, entries, "Currently Watching", feedback) - - -def _view_completed( - ctx: Context, history_manager: WatchHistoryManager, feedback -) -> str: - """View completed anime.""" - entries = history_manager.get_completed_entries() - - if not entries: - feedback.info("No completed anime found") - return "CONTINUE" - - return _display_entries_list(ctx, entries, "Completed Anime", feedback) - - -def _view_recent(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: - """View recently watched anime.""" - entries = history_manager.get_recently_watched(20) - - if not entries: - feedback.info("No recent watch history found") - return "CONTINUE" - - return _display_entries_list(ctx, entries, "Recently Watched", feedback) - - -def _view_all_history( - ctx: Context, history_manager: WatchHistoryManager, feedback -) -> str: - """View all watch history entries.""" - entries = history_manager.get_all_entries() - - if not entries: - feedback.info("No watch history found") - return "CONTINUE" - - # Sort by last watched date - entries.sort(key=lambda x: x.last_watched, reverse=True) - - return _display_entries_list(ctx, entries, "All Watch History", feedback) - - -def _search_history( - ctx: Context, history_manager: WatchHistoryManager, feedback -) -> str: - """Search watch history by title.""" - query = ctx.selector.ask("Enter search query:") - - if not query: - return "CONTINUE" - - entries = history_manager.search_entries(query) - - if not entries: - feedback.info(f"No anime found matching '{query}'") - return "CONTINUE" - - return _display_entries_list( - ctx, entries, f"Search Results for '{query}'", feedback - ) - - -def _display_entries_list( - ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback -) -> str: - """Display a list of watch history entries and allow selection.""" - console = Console() - console.clear() - - # Create table for entries - table = Table(title=title) - table.add_column("Status", style="yellow", width=6) - table.add_column("Title", style="cyan") - table.add_column("Progress", style="green", width=12) - table.add_column("Last Watched", style="blue", width=12) - - choices = [] - entry_map = {} - - for i, entry in enumerate(entries): - # Format last watched date - last_watched = entry.last_watched.strftime("%Y-%m-%d") - - # Add to table - table.add_row( - entry.get_status_emoji(), - entry.get_display_title(), - entry.get_progress_display(), - last_watched, - ) - - # Create choice for selector - choice_text = f"{entry.get_status_emoji()} {entry.get_display_title()} - {entry.get_progress_display()}" - choices.append(choice_text) - entry_map[choice_text] = entry - - console.print(table) - console.print() - - if not choices: - feedback.info("No entries to display") - feedback.pause_for_user() - return "CONTINUE" - - choices.append("Back") - - choice = ctx.selector.choose("Select an anime for details:", choices=choices) - - if not choice or choice == "Back": - return "CONTINUE" - - selected_entry = entry_map[choice] - return _show_entry_details(ctx, selected_entry, feedback) - - -def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: - """Show detailed information about a watch history entry.""" - console = Console() - console.clear() - - # Display detailed entry information - console.print(f"[bold cyan]{entry.get_display_title()}[/bold cyan]") - console.print(f"Status: {entry.get_status_emoji()} {entry.status.title()}") - console.print(f"Progress: {entry.get_progress_display()}") - console.print(f"Times Watched: {entry.times_watched}") - console.print(f"First Watched: {entry.first_watched.strftime('%Y-%m-%d %H:%M')}") - console.print(f"Last Watched: {entry.last_watched.strftime('%Y-%m-%d %H:%M')}") - - if entry.notes: - console.print(f"Notes: {entry.notes}") - - # Show media details if available - media = entry.media_item - if media.description: - console.print( - f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}" - ) - - if media.genres: - console.print(f"Genres: {', '.join(media.genres)}") - - if media.average_score: - console.print(f"Score: {media.average_score}/100") - - console.print() - - # Action options - actions = [ - "Mark Episode as Watched", - "Change Status", - "Edit Notes", - "Remove from History", - "Back to List", - ] - - choice = ctx.selector.choose("Select action:", choices=actions) - - if choice == "Mark Episode as Watched": - return _mark_episode_watched(ctx, entry, feedback) - elif choice == "Change Status": - return _change_entry_status(ctx, entry, feedback) - elif choice == "Edit Notes": - return _edit_entry_notes(ctx, entry, feedback) - elif choice == "Remove from History": - return _confirm_remove_entry(ctx, entry, feedback) - else: - return "CONTINUE" - - -def _mark_episode_watched(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: - """Mark a specific episode as watched.""" - current_episode = entry.last_watched_episode - max_episodes = entry.media_item.episodes or 999 - - episode_str = ctx.selector.ask( - f"Enter episode number (current: {current_episode}, max: {max_episodes}):" - ) - - try: - episode = int(episode_str) - if episode < 1 or (max_episodes and episode > max_episodes): - feedback.error( - f"Invalid episode number. Must be between 1 and {max_episodes}" - ) - return "CONTINUE" - - history_manager = WatchHistoryManager() - success = history_manager.mark_episode_watched(entry.media_item.id, episode) - - if success: - feedback.success(f"Marked episode {episode} as watched") - else: - feedback.error("Failed to update watch progress") - - except ValueError: - feedback.error("Invalid episode number entered") - - return "CONTINUE" - - -def _change_entry_status(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: - """Change the status of a watch history entry.""" - statuses = ["watching", "completed", "paused", "dropped", "planning"] - current_status = entry.status - - choices = [ - f"{status.title()} {'(current)' if status == current_status else ''}" - for status in statuses - ] - choices.append("Cancel") - - choice = ctx.selector.choose( - f"Select new status (current: {current_status}):", choices=choices - ) - - if not choice or choice == "Cancel": - return "CONTINUE" - - new_status = choice.split()[0].lower() - - history_manager = WatchHistoryManager() - success = history_manager.change_status(entry.media_item.id, new_status) - - if success: - feedback.success(f"Changed status to {new_status}") - else: - feedback.error("Failed to update status") - - return "CONTINUE" - - -def _edit_entry_notes(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: - """Edit notes for a watch history entry.""" - current_notes = entry.notes or "" - - new_notes = ctx.selector.ask(f"Enter notes (current: '{current_notes}'):") - - if new_notes is None: # User cancelled - return "CONTINUE" - - history_manager = WatchHistoryManager() - success = history_manager.update_notes(entry.media_item.id, new_notes) - - if success: - feedback.success("Notes updated successfully") - else: - feedback.error("Failed to update notes") - - return "CONTINUE" - - -def _confirm_remove_entry(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: - """Confirm and remove a watch history entry.""" - if feedback.confirm(f"Remove '{entry.get_display_title()}' from watch history?"): - history_manager = WatchHistoryManager() - success = history_manager.remove_entry(entry.media_item.id) - - if success: - feedback.success("Entry removed from watch history") - else: - feedback.error("Failed to remove entry") - - return "CONTINUE" - - -def _edit_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: - """Edit a watch history entry (select first).""" - entries = history_manager.get_all_entries() - - if not entries: - feedback.info("No watch history entries to edit") - return "CONTINUE" - - # Sort by title for easier selection - entries.sort(key=lambda x: x.get_display_title()) - - choices = [ - f"{entry.get_display_title()} - {entry.get_progress_display()}" - for entry in entries - ] - choices.append("Cancel") - - choice = ctx.selector.choose("Select anime to edit:", choices=choices) - - if not choice or choice == "Cancel": - return "CONTINUE" - - # Find the selected entry - choice_title = choice.split(" - ")[0] - selected_entry = next( - (entry for entry in entries if entry.get_display_title() == choice_title), None - ) - - if selected_entry: - return _show_entry_details(ctx, selected_entry, feedback) - - return "CONTINUE" - - -def _remove_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: - """Remove a watch history entry (select first).""" - entries = history_manager.get_all_entries() - - if not entries: - feedback.info("No watch history entries to remove") - return "CONTINUE" - - # Sort by title for easier selection - entries.sort(key=lambda x: x.get_display_title()) - - choices = [ - f"{entry.get_display_title()} - {entry.get_progress_display()}" - for entry in entries - ] - choices.append("Cancel") - - choice = ctx.selector.choose("Select anime to remove:", choices=choices) - - if not choice or choice == "Cancel": - return "CONTINUE" - - # Find the selected entry - choice_title = choice.split(" - ")[0] - selected_entry = next( - (entry for entry in entries if entry.get_display_title() == choice_title), None - ) - - if selected_entry: - return _confirm_remove_entry(ctx, selected_entry, feedback) - - return "CONTINUE" - - -def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: - """View detailed watch history statistics.""" - console = Console() - console.clear() - - stats = history_manager.get_stats() - - # Create detailed stats table - table = Table(title="Detailed Watch History Statistics") - table.add_column("Metric", style="cyan") - table.add_column("Value", style="green") - - table.add_row("Total Anime Entries", str(stats["total_entries"])) - table.add_row("Currently Watching", str(stats["watching"])) - table.add_row("Completed", str(stats["completed"])) - table.add_row("Dropped", str(stats["dropped"])) - table.add_row("Paused", str(stats["paused"])) - table.add_row("Total Episodes Watched", str(stats["total_episodes_watched"])) - table.add_row("Last Updated", stats["last_updated"]) - - # Calculate additional stats - if stats["total_entries"] > 0: - completion_rate = (stats["completed"] / stats["total_entries"]) * 100 - table.add_row("Completion Rate", f"{completion_rate:.1f}%") - - avg_episodes = stats["total_episodes_watched"] / stats["total_entries"] - table.add_row("Avg Episodes per Anime", f"{avg_episodes:.1f}") - - console.print(table) - feedback.pause_for_user() - - return "CONTINUE" - - -def _export_history( - ctx: Context, history_manager: WatchHistoryManager, feedback -) -> str: - """Export watch history to a file.""" - export_name = ctx.selector.ask("Enter export filename (without extension):") - - if not export_name: - return "CONTINUE" - - export_path = APP_DATA_DIR / f"{export_name}.json" - - if export_path.exists(): - if not feedback.confirm( - f"File '{export_name}.json' already exists. Overwrite?" - ): - return "CONTINUE" - - success = history_manager.export_history(export_path) - - if success: - feedback.success(f"Watch history exported to {export_path}") - else: - feedback.error("Failed to export watch history") - - return "CONTINUE" - - -def _import_history( - ctx: Context, history_manager: WatchHistoryManager, feedback -) -> str: - """Import watch history from a file.""" - import_name = ctx.selector.ask("Enter import filename (without extension):") - - if not import_name: - return "CONTINUE" - - import_path = APP_DATA_DIR / f"{import_name}.json" - - if not import_path.exists(): - feedback.error(f"File '{import_name}.json' not found in {APP_DATA_DIR}") - return "CONTINUE" - - merge = feedback.confirm( - "Merge with existing history? (No = Replace existing history)" - ) - - success = history_manager.import_history(import_path, merge=merge) - - if success: - action = "merged with" if merge else "replaced" - feedback.success(f"Watch history imported and {action} existing data") - else: - feedback.error("Failed to import watch history") - - return "CONTINUE" - - -def _clear_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: - """Clear all watch history with confirmation.""" - if not feedback.confirm( - "Are you sure you want to clear ALL watch history? This cannot be undone." - ): - return "CONTINUE" - - if not feedback.confirm("Final confirmation: Clear all watch history?"): - return "CONTINUE" - - # Create backup before clearing - backup_success = history_manager.backup_history() - if backup_success: - feedback.info("Backup created before clearing") - - success = history_manager.clear_history() - - if success: - feedback.success("All watch history cleared") - else: - feedback.error("Failed to clear watch history") - - return "CONTINUE" diff --git a/fastanime/cli/service/download/__init__.py b/fastanime/cli/service/download/__init__.py deleted file mode 100644 index 6b98bde..0000000 --- a/fastanime/cli/service/download/__init__.py +++ /dev/null @@ -1,3 +0,0 @@ -from .service import DownloadService - -__all__ = ["DownloadService"] diff --git a/fastanime/cli/service/download/service.py b/fastanime/cli/service/download/service.py deleted file mode 100644 index 4bd9457..0000000 --- a/fastanime/cli/service/download/service.py +++ /dev/null @@ -1,530 +0,0 @@ -"""Download service that integrates with the media registry.""" - -import logging -from pathlib import Path -from typing import Optional - -from ....core.config.model import AppConfig -from ....core.downloader.base import BaseDownloader -from ....core.downloader.downloader import create_downloader -from ....core.downloader.params import DownloadParams -from ....core.exceptions import FastAnimeError -from ....libs.media_api.types import MediaItem -from ....libs.provider.anime.base import BaseAnimeProvider -from ....libs.provider.anime.params import EpisodeStreamsParams -from ....libs.provider.anime.types import Server -from ..registry import MediaRegistryService -from ..registry.models import DownloadStatus, MediaEpisode - -logger = logging.getLogger(__name__) - - -class DownloadService: - """Service for downloading episodes and tracking them in the registry.""" - - def __init__( - self, - config: AppConfig, - media_registry: MediaRegistryService, - provider: BaseAnimeProvider, - ): - self.config = config - self.downloads_config = config.downloads - self.media_registry = media_registry - self.provider = provider - self._downloader: Optional[BaseDownloader] = None - - @property - def downloader(self) -> BaseDownloader: - """Lazy initialization of downloader.""" - if self._downloader is None: - self._downloader = create_downloader(self.downloads_config) - return self._downloader - - def download_episode( - self, - media_item: MediaItem, - episode_number: str, - server: Optional[Server] = None, - quality: Optional[str] = None, - force_redownload: bool = False, - ) -> bool: - """ - Download a specific episode and record it in the registry. - - Args: - media_item: The media item to download - episode_number: The episode number to download - server: Optional specific server to use for download - quality: Optional quality preference - force_redownload: Whether to redownload if already exists - - Returns: - bool: True if download was successful, False otherwise - """ - try: - # Get or create media record - media_record = self.media_registry.get_or_create_record(media_item) - - # Check if episode already exists and is completed - existing_episode = self._find_episode_in_record( - media_record, episode_number - ) - if ( - existing_episode - and existing_episode.download_status == DownloadStatus.COMPLETED - and not force_redownload - and existing_episode.file_path.exists() - ): - logger.info( - f"Episode {episode_number} already downloaded at {existing_episode.file_path}" - ) - return True - - # Generate file path - file_path = self._generate_episode_file_path(media_item, episode_number) - - # Update status to QUEUED - self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.QUEUED, - file_path=file_path, - ) - - # Get episode stream server if not provided - if server is None: - server = self._get_episode_server(media_item, episode_number, quality) - if not server: - self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.FAILED, - error_message="Failed to get server for episode", - ) - return False - - # Update status to DOWNLOADING - self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.DOWNLOADING, - provider_name=self.provider.__class__.__name__, - server_name=server.name, - quality=quality or self.downloads_config.preferred_quality, - ) - - # Perform the download - download_result = self._download_from_server( - media_item, episode_number, server, file_path - ) - - if download_result.success and download_result.video_path: - # Get file size if available - file_size = None - if download_result.video_path.exists(): - file_size = download_result.video_path.stat().st_size - - # Update episode record with success - self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.COMPLETED, - file_path=download_result.video_path, - file_size=file_size, - subtitle_paths=download_result.subtitle_paths, - ) - - logger.info( - f"Successfully downloaded episode {episode_number} to {download_result.video_path}" - ) - else: - # Update episode record with failure - self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.FAILED, - error_message=download_result.error_message, - ) - - logger.error( - f"Failed to download episode {episode_number}: {download_result.error_message}" - ) - - return download_result.success - - except Exception as e: - logger.error(f"Error downloading episode {episode_number}: {e}") - # Update status to FAILED - try: - self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.FAILED, - error_message=str(e), - ) - except Exception as cleanup_error: - logger.error(f"Failed to update failed status: {cleanup_error}") - - return False - - def download_multiple_episodes( - self, - media_item: MediaItem, - episode_numbers: list[str], - quality: Optional[str] = None, - force_redownload: bool = False, - ) -> dict[str, bool]: - """ - Download multiple episodes and return success status for each. - - Args: - media_item: The media item to download - episode_numbers: List of episode numbers to download - quality: Optional quality preference - force_redownload: Whether to redownload if already exists - - Returns: - dict: Mapping of episode_number -> success status - """ - results = {} - - for episode_number in episode_numbers: - success = self.download_episode( - media_item=media_item, - episode_number=episode_number, - quality=quality, - force_redownload=force_redownload, - ) - results[episode_number] = success - - # Log progress - logger.info( - f"Download progress: {episode_number} - {'โœ“' if success else 'โœ—'}" - ) - - return results - - def get_download_status( - self, media_item: MediaItem, episode_number: str - ) -> Optional[DownloadStatus]: - """Get the download status for a specific episode.""" - media_record = self.media_registry.get_media_record(media_item.id) - if not media_record: - return None - - episode_record = self._find_episode_in_record(media_record, episode_number) - return episode_record.download_status if episode_record else None - - def get_downloaded_episodes(self, media_item: MediaItem) -> list[str]: - """Get list of successfully downloaded episode numbers for a media item.""" - media_record = self.media_registry.get_media_record(media_item.id) - if not media_record: - return [] - - return [ - episode.episode_number - for episode in media_record.media_episodes - if episode.download_status == DownloadStatus.COMPLETED - and episode.file_path.exists() - ] - - def remove_downloaded_episode( - self, media_item: MediaItem, episode_number: str - ) -> bool: - """Remove a downloaded episode file and update registry.""" - try: - media_record = self.media_registry.get_media_record(media_item.id) - if not media_record: - return False - - episode_record = self._find_episode_in_record(media_record, episode_number) - if not episode_record: - return False - - # Remove file if it exists - if episode_record.file_path.exists(): - episode_record.file_path.unlink() - - # Remove episode from record - media_record.media_episodes = [ - ep - for ep in media_record.media_episodes - if ep.episode_number != episode_number - ] - - # Save updated record - self.media_registry.save_media_record(media_record) - - logger.info(f"Removed downloaded episode {episode_number}") - return True - - except Exception as e: - logger.error(f"Error removing episode {episode_number}: {e}") - return False - - def _find_episode_in_record( - self, media_record, episode_number: str - ) -> Optional[MediaEpisode]: - """Find an episode record by episode number.""" - for episode in media_record.media_episodes: - if episode.episode_number == episode_number: - return episode - return None - - def _get_episode_server( - self, media_item: MediaItem, episode_number: str, quality: Optional[str] = None - ) -> Optional[Server]: - """Get a server for downloading the episode.""" - try: - # Use media title for provider search - media_title = media_item.title.english or media_item.title.romaji - if not media_title: - logger.error("Media item has no searchable title") - return None - - # Get episode streams from provider - streams = self.provider.episode_streams( - EpisodeStreamsParams( - anime_id=str(media_item.id), - query=media_title, - episode=episode_number, - translation_type=self.config.stream.translation_type, - ) - ) - - if not streams: - logger.error(f"No streams found for episode {episode_number}") - return None - - # Convert iterator to list and get first available server - stream_list = list(streams) - if not stream_list: - logger.error(f"No servers available for episode {episode_number}") - return None - - # Return the first server (could be enhanced with quality/preference logic) - return stream_list[0] - - except Exception as e: - logger.error(f"Error getting episode server: {e}") - return None - - def _download_from_server( - self, - media_item: MediaItem, - episode_number: str, - server: Server, - output_path: Path, - ): - """Download episode from a specific server.""" - anime_title = media_item.title.english or media_item.title.romaji or "Unknown" - episode_title = server.episode_title or f"Episode {episode_number}" - - try: - # Get the best quality link from server - if not server.links: - raise FastAnimeError("Server has no available links") - - # Use the first link (could be enhanced with quality filtering) - stream_link = server.links[0] - - # Prepare download parameters - download_params = DownloadParams( - url=stream_link.link, - anime_title=anime_title, - episode_title=episode_title, - silent=True, # Use True by default since there's no verbose in config - headers=server.headers, - subtitles=[sub.url for sub in server.subtitles] - if server.subtitles - else [], - vid_format=self.downloads_config.preferred_quality, - force_unknown_ext=True, - ) - - # Ensure output directory exists - output_path.parent.mkdir(parents=True, exist_ok=True) - - # Perform download - return self.downloader.download(download_params) - - except Exception as e: - logger.error(f"Error during download: {e}") - from ....core.downloader.model import DownloadResult - - return DownloadResult( - success=False, - error_message=str(e), - anime_title=anime_title, - episode_title=episode_title, - ) - - def get_download_statistics(self) -> dict: - """Get comprehensive download statistics from the registry.""" - return self.media_registry.get_download_statistics() - - def get_failed_downloads(self) -> list[tuple[int, str]]: - """Get all episodes that failed to download.""" - return self.media_registry.get_episodes_by_download_status( - DownloadStatus.FAILED - ) - - def get_queued_downloads(self) -> list[tuple[int, str]]: - """Get all episodes queued for download.""" - return self.media_registry.get_episodes_by_download_status( - DownloadStatus.QUEUED - ) - - def retry_failed_downloads(self, max_retries: int = 3) -> dict[str, bool]: - """Retry all failed downloads up to max_retries.""" - failed_episodes = self.get_failed_downloads() - results = {} - - for media_id, episode_number in failed_episodes: - # Get the media record to check retry attempts - media_record = self.media_registry.get_media_record(media_id) - if not media_record: - continue - - episode_record = self._find_episode_in_record(media_record, episode_number) - if not episode_record or episode_record.download_attempts >= max_retries: - logger.info( - f"Skipping {media_id}:{episode_number} - max retries exceeded" - ) - continue - - logger.info(f"Retrying download for {media_id}:{episode_number}") - success = self.download_episode( - media_item=media_record.media_item, - episode_number=episode_number, - force_redownload=True, - ) - results[f"{media_id}:{episode_number}"] = success - - return results - - def cleanup_failed_downloads(self, older_than_days: int = 7) -> int: - """Clean up failed download records older than specified days.""" - from datetime import datetime, timedelta - - cleanup_count = 0 - cutoff_date = datetime.now() - timedelta(days=older_than_days) - - try: - for record in self.media_registry.get_all_media_records(): - episodes_to_remove = [] - - for episode in record.media_episodes: - if ( - episode.download_status == DownloadStatus.FAILED - and episode.download_date < cutoff_date - ): - episodes_to_remove.append(episode.episode_number) - - for episode_number in episodes_to_remove: - record.media_episodes = [ - ep - for ep in record.media_episodes - if ep.episode_number != episode_number - ] - cleanup_count += 1 - - if episodes_to_remove: - self.media_registry.save_media_record(record) - - logger.info(f"Cleaned up {cleanup_count} failed download records") - return cleanup_count - - except Exception as e: - logger.error(f"Error during cleanup: {e}") - return 0 - - def pause_download(self, media_item: MediaItem, episode_number: str) -> bool: - """Pause a download (change status from DOWNLOADING to PAUSED).""" - try: - return self.media_registry.update_episode_download_status( - media_id=media_item.id, - episode_number=episode_number, - status=DownloadStatus.PAUSED, - ) - except Exception as e: - logger.error(f"Error pausing download: {e}") - return False - - def resume_download(self, media_item: MediaItem, episode_number: str) -> bool: - """Resume a paused download.""" - return self.download_episode( - media_item=media_item, - episode_number=episode_number, - force_redownload=True, - ) - - def get_media_download_progress(self, media_item: MediaItem) -> dict: - """Get download progress for a specific media item.""" - try: - media_record = self.media_registry.get_media_record(media_item.id) - if not media_record: - return { - "total": 0, - "downloaded": 0, - "failed": 0, - "queued": 0, - "downloading": 0, - } - - stats = { - "total": 0, - "downloaded": 0, - "failed": 0, - "queued": 0, - "downloading": 0, - "paused": 0, - } - - for episode in media_record.media_episodes: - stats["total"] += 1 - status = episode.download_status.value.lower() - if status == "completed": - stats["downloaded"] += 1 - elif status == "failed": - stats["failed"] += 1 - elif status == "queued": - stats["queued"] += 1 - elif status == "downloading": - stats["downloading"] += 1 - elif status == "paused": - stats["paused"] += 1 - - return stats - - except Exception as e: - logger.error(f"Error getting download progress: {e}") - return { - "total": 0, - "downloaded": 0, - "failed": 0, - "queued": 0, - "downloading": 0, - } - - def _generate_episode_file_path( - self, media_item: MediaItem, episode_number: str - ) -> Path: - """Generate the file path for a downloaded episode.""" - # Use the download directory from config - base_dir = self.downloads_config.downloads_dir - - # Create anime-specific directory - anime_title = media_item.title.english or media_item.title.romaji or "Unknown" - # Sanitize title for filesystem - safe_title = "".join( - c for c in anime_title if c.isalnum() or c in (" ", "-", "_") - ).rstrip() - - anime_dir = base_dir / safe_title - - # Generate filename (could use template from config in the future) - filename = f"Episode_{episode_number:0>2}.mp4" - - return anime_dir / filename diff --git a/fastanime/core/config/defaults.py b/fastanime/core/config/defaults.py index 5aedca1..abad16e 100644 --- a/fastanime/core/config/defaults.py +++ b/fastanime/core/config/defaults.py @@ -39,15 +39,6 @@ STREAM_USE_IPC = ( lambda: True if PLATFORM != "win32" and not detect.is_running_in_termux() else False ) -# ServiceConfig -SERVICE_ENABLED = False -SERVICE_WATCHLIST_CHECK_INTERVAL = 30 -SERVICE_QUEUE_PROCESS_INTERVAL = 1 -SERVICE_MAX_CONCURRENT_DOWNLOADS = 3 -SERVICE_AUTO_RETRY_COUNT = 3 -SERVICE_CLEANUP_COMPLETED_DAYS = 7 -SERVICE_NOTIFICATION_ENABLED = True - # FzfConfig FZF_OPTS = DEFAULTS_DIR / "fzf-opts" FZF_HEADER_COLOR = "95,135,175" @@ -78,22 +69,6 @@ ANILIST_PREFERRED_LANGUAGE = "english" DOWNLOADS_DOWNLOADER = "auto" DOWNLOADS_DOWNLOADS_DIR = USER_VIDEOS_DIR DOWNLOADS_ENABLE_TRACKING = True -DOWNLOADS_AUTO_ORGANIZE = True -DOWNLOADS_MAX_CONCURRENT = 3 -DOWNLOADS_AUTO_CLEANUP_FAILED = True -DOWNLOADS_RETENTION_DAYS = 30 -DOWNLOADS_SYNC_WITH_WATCH_HISTORY = True -DOWNLOADS_AUTO_MARK_OFFLINE = True -DOWNLOADS_NAMING_TEMPLATE = ( - "{title}/Season {season:02d}/{episode:02d} - {episode_title}.{ext}" -) -DOWNLOADS_PREFERRED_QUALITY = "1080" -DOWNLOADS_DOWNLOAD_SUBTITLES = True -DOWNLOADS_SUBTITLE_LANGUAGES = ["en"] -DOWNLOADS_QUEUE_MAX_SIZE = 100 -DOWNLOADS_AUTO_START_DOWNLOADS = True -DOWNLOADS_RETRY_ATTEMPTS = 3 -DOWNLOADS_RETRY_DELAY = 300 # RegistryConfig MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / ".registry" diff --git a/fastanime/core/config/model.py b/fastanime/core/config/model.py index 7abae27..81c44dd 100644 --- a/fastanime/core/config/model.py +++ b/fastanime/core/config/model.py @@ -140,49 +140,6 @@ class StreamConfig(BaseModel): ) -class ServiceConfig(BaseModel): - """Configuration for the background download service.""" - - enabled: bool = Field( - default=defaults.SERVICE_ENABLED, - description=desc.SERVICE_ENABLED, - ) - watchlist_check_interval: int = Field( - default=defaults.SERVICE_WATCHLIST_CHECK_INTERVAL, - ge=5, - le=180, - description=desc.SERVICE_WATCHLIST_CHECK_INTERVAL, - ) - queue_process_interval: int = Field( - default=defaults.SERVICE_QUEUE_PROCESS_INTERVAL, - ge=1, - le=60, - description=desc.SERVICE_QUEUE_PROCESS_INTERVAL, - ) - max_concurrent_downloads: int = Field( - default=defaults.SERVICE_MAX_CONCURRENT_DOWNLOADS, - ge=1, - le=10, - description=desc.SERVICE_MAX_CONCURRENT_DOWNLOADS, - ) - auto_retry_count: int = Field( - default=defaults.SERVICE_AUTO_RETRY_COUNT, - ge=0, - le=10, - description=desc.SERVICE_AUTO_RETRY_COUNT, - ) - cleanup_completed_days: int = Field( - default=defaults.SERVICE_CLEANUP_COMPLETED_DAYS, - ge=1, - le=30, - description=desc.SERVICE_CLEANUP_COMPLETED_DAYS, - ) - notification_enabled: bool = Field( - default=defaults.SERVICE_NOTIFICATION_ENABLED, - description=desc.SERVICE_NOTIFICATION_ENABLED, - ) - - class OtherConfig(BaseModel): pass @@ -338,72 +295,6 @@ class DownloadsConfig(OtherConfig): default=defaults.DOWNLOADS_ENABLE_TRACKING, description=desc.DOWNLOADS_ENABLE_TRACKING, ) - auto_organize: bool = Field( - default=defaults.DOWNLOADS_AUTO_ORGANIZE, - description=desc.DOWNLOADS_AUTO_ORGANIZE, - ) - max_concurrent: int = Field( - default=defaults.DOWNLOADS_MAX_CONCURRENT, - gt=0, - le=10, - description=desc.DOWNLOADS_MAX_CONCURRENT, - ) - auto_cleanup_failed: bool = Field( - default=defaults.DOWNLOADS_AUTO_CLEANUP_FAILED, - description=desc.DOWNLOADS_AUTO_CLEANUP_FAILED, - ) - retention_days: int = Field( - default=defaults.DOWNLOADS_RETENTION_DAYS, - gt=0, - description=desc.DOWNLOADS_RETENTION_DAYS, - ) - - # Integration with watch history - sync_with_watch_history: bool = Field( - default=defaults.DOWNLOADS_SYNC_WITH_WATCH_HISTORY, - description=desc.DOWNLOADS_SYNC_WITH_WATCH_HISTORY, - ) - auto_mark_offline: bool = Field( - default=defaults.DOWNLOADS_AUTO_MARK_OFFLINE, - description=desc.DOWNLOADS_AUTO_MARK_OFFLINE, - ) - - # File organization - naming_template: str = Field( - default=defaults.DOWNLOADS_NAMING_TEMPLATE, - description=desc.DOWNLOADS_NAMING_TEMPLATE, - ) - - # Quality and subtitles - preferred_quality: Literal["360", "480", "720", "1080", "best"] = Field( - default=defaults.DOWNLOADS_PREFERRED_QUALITY, - description=desc.DOWNLOADS_PREFERRED_QUALITY, - ) - download_subtitles: bool = Field( - default=defaults.DOWNLOADS_DOWNLOAD_SUBTITLES, - description=desc.DOWNLOADS_DOWNLOAD_SUBTITLES, - ) - - # Queue management - queue_max_size: int = Field( - default=defaults.DOWNLOADS_QUEUE_MAX_SIZE, - gt=0, - description=desc.DOWNLOADS_QUEUE_MAX_SIZE, - ) - auto_start_downloads: bool = Field( - default=defaults.DOWNLOADS_AUTO_START_DOWNLOADS, - description=desc.DOWNLOADS_AUTO_START_DOWNLOADS, - ) - retry_attempts: int = Field( - default=defaults.DOWNLOADS_RETRY_ATTEMPTS, - ge=0, - description=desc.DOWNLOADS_RETRY_ATTEMPTS, - ) - retry_delay: int = Field( - default=defaults.DOWNLOADS_RETRY_DELAY, - ge=0, - description=desc.DOWNLOADS_RETRY_DELAY, - ) class MediaRegistryConfig(OtherConfig): @@ -442,11 +333,6 @@ class AppConfig(BaseModel): default_factory=JikanConfig, description=desc.APP_JIKAN, ) - service: ServiceConfig = Field( - default_factory=ServiceConfig, - description=desc.APP_SERVICE, - ) - fzf: FzfConfig = Field( default_factory=FzfConfig, description=desc.APP_FZF, @@ -456,10 +342,6 @@ class AppConfig(BaseModel): description=desc.APP_ROFI, ) mpv: MpvConfig = Field(default_factory=MpvConfig, description=desc.APP_MPV) - service: ServiceConfig = Field( - default_factory=ServiceConfig, - description=desc.APP_SERVICE, - ) media_registry: MediaRegistryConfig = Field( default_factory=MediaRegistryConfig, description=desc.APP_MEDIA_REGISTRY )