From 7088b8ce18ea8c74c86a44f7bedb3a55a74296bb Mon Sep 17 00:00:00 2001 From: Benexl Date: Sat, 26 Jul 2025 11:54:01 +0300 Subject: [PATCH] Refactor preview management and caching system - Introduced a new PreviewWorkerManager to handle both anime and episode preview caching. - Implemented PreviewCacheWorker and EpisodeCacheWorker for specialized background tasks. - Added context management for preview operations to ensure proper cleanup. - Enhanced error handling and logging during image downloads and info text generation. - Removed redundant caching logic and consolidated functionality into worker classes. - Updated session management to clean up preview workers on session end. - Removed unused utility functions and streamlined the codebase. --- .../cli/interactive/menu/media/episodes.py | 35 +- .../interactive/menu/media/media_actions.py | 1 - .../cli/interactive/menu/media/results.py | 41 +- fastanime/cli/interactive/session.py | 13 + fastanime/cli/service/registry/service.py | 2 - fastanime/cli/utils/__init__.py | 5 - fastanime/cli/utils/lazyloader.py | 1 + fastanime/cli/utils/preview.py | 347 +++++-------- fastanime/cli/utils/preview_workers.py | 475 ++++++++++++++++++ fastanime/core/utils/__init__.py | 7 + fastanime/core/utils/concurrency.py | 389 ++++++++++++++ 11 files changed, 1073 insertions(+), 243 deletions(-) create mode 100644 fastanime/cli/utils/preview_workers.py create mode 100644 fastanime/core/utils/__init__.py create mode 100644 fastanime/core/utils/concurrency.py diff --git a/fastanime/cli/interactive/menu/media/episodes.py b/fastanime/cli/interactive/menu/media/episodes.py index 688b66c..d9d88ff 100644 --- a/fastanime/cli/interactive/menu/media/episodes.py +++ b/fastanime/cli/interactive/menu/media/episodes.py @@ -39,21 +39,34 @@ def episodes(ctx: Context, state: State) -> State | InternalDirective: preview_command = None if ctx.config.general.preview != "none": - from ....utils.preview import get_episode_preview + from ....utils.preview import create_preview_context - preview_command = get_episode_preview( - available_episodes, media_item, ctx.config + with create_preview_context() as preview_ctx: + preview_command = preview_ctx.get_episode_preview( + available_episodes, media_item, ctx.config + ) + + chosen_episode_str = ctx.selector.choose( + prompt="Select Episode", choices=choices, preview=preview_command + ) + + if not chosen_episode_str or chosen_episode_str == "Back": + # TODO: should improve the back logic for menus that can be pass through + return InternalDirective.BACKX2 + + chosen_episode = chosen_episode_str + # Workers are automatically cleaned up when exiting the context + else: + # No preview mode + chosen_episode_str = ctx.selector.choose( + prompt="Select Episode", choices=choices, preview=None ) - chosen_episode_str = ctx.selector.choose( - prompt="Select Episode", choices=choices, preview=preview_command - ) + if not chosen_episode_str or chosen_episode_str == "Back": + # TODO: should improve the back logic for menus that can be pass through + return InternalDirective.BACKX2 - if not chosen_episode_str or chosen_episode_str == "Back": - # TODO: should improve the back logic for menus that can be pass through - return InternalDirective.BACKX2 - - chosen_episode = chosen_episode_str + chosen_episode = chosen_episode_str # Track episode selection in watch history (if enabled in config) if ( diff --git a/fastanime/cli/interactive/menu/media/media_actions.py b/fastanime/cli/interactive/menu/media/media_actions.py index 5c3446d..b0c4ebd 100644 --- 
a/fastanime/cli/interactive/menu/media/media_actions.py +++ b/fastanime/cli/interactive/menu/media/media_actions.py @@ -343,7 +343,6 @@ def _view_info(ctx: Context, state: State) -> MenuAction: # Create next airing panel if available if media_item.next_airing: - airing_info_table = Table(show_header=False, box=box.SIMPLE) airing_info_table.add_column("Field", style="bold red") airing_info_table.add_column("Value", style="white") diff --git a/fastanime/cli/interactive/menu/media/results.py b/fastanime/cli/interactive/menu/media/results.py index 4748f5c..30426fa 100644 --- a/fastanime/cli/interactive/menu/media/results.py +++ b/fastanime/cli/interactive/menu/media/results.py @@ -23,17 +23,6 @@ def results(ctx: Context, state: State) -> State | InternalDirective: _format_title(ctx, media_item): media_item for media_item in search_result.values() } - - preview_command = None - if ctx.config.general.preview != "none": - from ....utils.preview import get_anime_preview - - preview_command = get_anime_preview( - list(search_result_dict.values()), - list(search_result_dict.keys()), - ctx.config, - ) - choices: Dict[str, Callable[[], Union[int, State, InternalDirective]]] = { title: lambda media_id=item.id: media_id for title, item in search_result_dict.items() @@ -64,11 +53,31 @@ def results(ctx: Context, state: State) -> State | InternalDirective: } ) - choice = ctx.selector.choose( - prompt="Select Anime", - choices=list(choices), - preview=preview_command, - ) + preview_command = None + if ctx.config.general.preview != "none": + from ....utils.preview import create_preview_context + + with create_preview_context() as preview_ctx: + preview_command = preview_ctx.get_anime_preview( + list(search_result_dict.values()), + list(search_result_dict.keys()), + ctx.config, + ) + + choice = ctx.selector.choose( + prompt="Select Anime", + choices=list(choices), + preview=preview_command, + ) + + else: + # No preview mode + + choice = ctx.selector.choose( + prompt="Select Anime", + choices=list(choices), + preview=None, + ) if not choice: return InternalDirective.RELOAD diff --git a/fastanime/cli/interactive/session.py b/fastanime/cli/interactive/session.py index a04ad0d..19af20a 100644 --- a/fastanime/cli/interactive/session.py +++ b/fastanime/cli/interactive/session.py @@ -134,8 +134,21 @@ class Session: except Exception: self._context.service.session.create_crash_backup(self._history) raise + finally: + # Clean up preview workers when session ends + self._cleanup_preview_workers() self._context.service.session.save_session(self._history) + def _cleanup_preview_workers(self): + """Clean up preview workers when session ends.""" + try: + from ..utils.preview import shutdown_preview_workers + + shutdown_preview_workers(wait=False, timeout=5.0) + logger.debug("Preview workers cleaned up successfully") + except Exception as e: + logger.warning(f"Failed to cleanup preview workers: {e}") + def _run_main_loop(self): """Run the main session loop.""" while self._history: diff --git a/fastanime/cli/service/registry/service.py b/fastanime/cli/service/registry/service.py index 1bba450..a78a0c2 100644 --- a/fastanime/cli/service/registry/service.py +++ b/fastanime/cli/service/registry/service.py @@ -585,7 +585,6 @@ class MediaRegistryService: ) -> list[tuple[int, str]]: """Get all episodes with a specific download status.""" try: - episodes = [] for record in self.get_all_media_records(): for episode in record.media_episodes: @@ -600,7 +599,6 @@ class MediaRegistryService: def get_download_statistics(self) -> dict: 
"""Get comprehensive download statistics.""" try: - stats = { "total_episodes": 0, "downloaded": 0, diff --git a/fastanime/cli/utils/__init__.py b/fastanime/cli/utils/__init__.py index 535f469..e69de29 100644 --- a/fastanime/cli/utils/__init__.py +++ b/fastanime/cli/utils/__init__.py @@ -1,5 +0,0 @@ -"""CLI utilities for FastAnime.""" - -from .parser import parse_episode_range - -__all__ = ["parse_episode_range"] diff --git a/fastanime/cli/utils/lazyloader.py b/fastanime/cli/utils/lazyloader.py index 3cd96dc..dec2b41 100644 --- a/fastanime/cli/utils/lazyloader.py +++ b/fastanime/cli/utils/lazyloader.py @@ -2,6 +2,7 @@ import importlib import click + # TODO: since command structure is pretty obvious default to only requiring mapping of command names to their function name(cause some have special names like import) class LazyGroup(click.Group): def __init__(self, root: str, *args, lazy_subcommands=None, **kwargs): diff --git a/fastanime/cli/utils/preview.py b/fastanime/cli/utils/preview.py index c96192b..6909844 100644 --- a/fastanime/cli/utils/preview.py +++ b/fastanime/cli/utils/preview.py @@ -1,21 +1,15 @@ -import concurrent.futures import logging import os import re -from hashlib import sha256 -from pathlib import Path -from threading import Thread -from typing import List +from typing import List, Optional -import httpx -from ...core.utils import formatter from ...core.config import AppConfig from ...core.constants import APP_CACHE_DIR, PLATFORM, SCRIPTS_DIR -from ...core.utils.file import AtomicWriter from ...libs.media_api.types import MediaItem from . import ansi +from .preview_workers import PreviewWorkerManager logger = logging.getLogger(__name__) @@ -26,23 +20,100 @@ IMAGES_CACHE_DIR = PREVIEWS_CACHE_DIR / "images" INFO_CACHE_DIR = PREVIEWS_CACHE_DIR / "info" FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf" -TEMPLATE_PREVIEW_SCRIPT = Path(str(FZF_SCRIPTS_DIR / "preview.template.sh")).read_text( +TEMPLATE_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "preview.template.sh").read_text( encoding="utf-8" ) -TEMPLATE_INFO_SCRIPT = Path(str(FZF_SCRIPTS_DIR / "info.template.sh")).read_text( - encoding="utf-8" -) -TEMPLATE_EPISODE_INFO_SCRIPT = Path( - str(FZF_SCRIPTS_DIR / "episode-info.template.sh") -).read_text(encoding="utf-8") - EPISODE_PATTERN = re.compile(r"^Episode\s+(\d+)\s-\s.*") +# Global preview worker manager instance +_preview_manager: Optional[PreviewWorkerManager] = None + + +def create_preview_context(): + """ + Create a context manager for preview operations. + + This can be used in menu functions to ensure proper cleanup: + + ```python + with create_preview_context() as preview_ctx: + preview_script = preview_ctx.get_anime_preview(items, titles, config) + # ... 
use preview_script + # Workers are automatically cleaned up here + ``` + + Returns: + PreviewContext: A context manager for preview operations + """ + return PreviewContext() + + +class PreviewContext: + """Context manager for preview operations with automatic cleanup.""" + + def __init__(self): + self._manager = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if self._manager: + try: + self._manager.shutdown_all(wait=False, timeout=3.0) + except Exception as e: + logger.warning(f"Failed to cleanup preview context: {e}") + + def get_anime_preview( + self, items: List[MediaItem], titles: List[str], config: AppConfig + ) -> str: + """Get anime preview script with managed workers.""" + if not self._manager: + self._manager = _get_preview_manager() + return get_anime_preview(items, titles, config) + + def get_episode_preview( + self, episodes: List[str], media_item: MediaItem, config: AppConfig + ) -> str: + """Get episode preview script with managed workers.""" + if not self._manager: + self._manager = _get_preview_manager() + return get_episode_preview(episodes, media_item, config) + + def cancel_all_tasks(self) -> int: + """Cancel all running preview tasks.""" + if not self._manager: + return 0 + + cancelled = 0 + if self._manager._preview_worker: + cancelled += self._manager._preview_worker.cancel_all_tasks() + if self._manager._episode_worker: + cancelled += self._manager._episode_worker.cancel_all_tasks() + return cancelled + + def get_status(self) -> dict: + """Get status of workers in this context.""" + if self._manager: + return self._manager.get_status() + return {"preview_worker": None, "episode_worker": None} + def get_anime_preview( items: List[MediaItem], titles: List[str], config: AppConfig ) -> str: + """ + Generate anime preview script and start background caching. 
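+
+    This is typically invoked through create_preview_context() so that the
+    background workers are cleaned up automatically when the menu exits.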
+ + Args: + items: List of media items to preview + titles: Corresponding titles for each media item + config: Application configuration + + Returns: + Preview script content for fzf + """ # Ensure cache directories exist on startup IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True) INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True) @@ -52,8 +123,15 @@ def get_anime_preview( preview_script = TEMPLATE_PREVIEW_SCRIPT - # Start the non-blocking background Caching - Thread(target=_cache_worker, args=(items, titles, config), daemon=True).start() + # Start the managed background caching + try: + preview_manager = _get_preview_manager() + worker = preview_manager.get_preview_worker() + worker.cache_anime_previews(items, titles, config) + logger.debug("Started background caching for anime previews") + except Exception as e: + logger.error(f"Failed to start background caching: {e}") + # Continue with script generation even if caching fails # Prepare values to inject into the template path_sep = "\\" if PLATFORM == "win32" else "/" @@ -80,97 +158,20 @@ def get_anime_preview( return preview_script -def _cache_worker(media_items: List[MediaItem], titles: List[str], config: AppConfig): - """The background task that fetches and saves all necessary preview data.""" - with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor: - for media_item, title_str in zip(media_items, titles): - hash_id = _get_cache_hash(title_str) - if config.general.preview in ("full", "image") and media_item.cover_image: - if not (IMAGES_CACHE_DIR / f"{hash_id}.png").exists(): - executor.submit( - _save_image_from_url, media_item.cover_image.large, hash_id - ) - if config.general.preview in ("full", "text"): - # TODO: Come up with a better caching pattern for now just let it be remade - if not (INFO_CACHE_DIR / hash_id).exists() or True: - info_text = _populate_info_template(media_item, config) - executor.submit(_save_info_text, info_text, hash_id) - - -def _populate_info_template(media_item: MediaItem, config: AppConfig) -> str: - """ - Takes the info.sh template and injects formatted, shell-safe data. - """ - info_script = TEMPLATE_INFO_SCRIPT - description = formatter.clean_html( - media_item.description or "No description available." 
- ) - - # Escape all variables before injecting them into the script - replacements = { - "TITLE": formatter.shell_safe( - media_item.title.english or media_item.title.romaji - ), - "STATUS": formatter.shell_safe(media_item.status.value), - "FORMAT": formatter.shell_safe(media_item.format.value), - "NEXT_EPISODE": formatter.shell_safe( - f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X)')}" - if media_item.next_airing - else "N/A" - ), - "EPISODES": formatter.shell_safe(str(media_item.episodes)), - "DURATION": formatter.shell_safe( - formatter.format_media_duration(media_item.duration) - ), - "SCORE": formatter.shell_safe( - formatter.format_score_stars_full(media_item.average_score) - ), - "FAVOURITES": formatter.shell_safe( - formatter.format_number_with_commas(media_item.favourites) - ), - "POPULARITY": formatter.shell_safe( - formatter.format_number_with_commas(media_item.popularity) - ), - "GENRES": formatter.shell_safe( - formatter.format_list_with_commas([v.value for v in media_item.genres]) - ), - "TAGS": formatter.shell_safe( - formatter.format_list_with_commas([t.name.value for t in media_item.tags]) - ), - "STUDIOS": formatter.shell_safe( - formatter.format_list_with_commas( - [t.name for t in media_item.studios if t.name] - ) - ), - "SYNONYMNS": formatter.shell_safe( - formatter.format_list_with_commas(media_item.synonymns) - ), - "USER_STATUS": formatter.shell_safe( - media_item.user_status.status.value - if media_item.user_status and media_item.user_status.status - else "NOT_ON_LIST" - ), - "USER_PROGRESS": formatter.shell_safe( - f"Episode {media_item.user_status.progress}" - if media_item.user_status - else "0" - ), - "START_DATE": formatter.shell_safe( - formatter.format_date(media_item.start_date) - ), - "END_DATE": formatter.shell_safe(formatter.format_date(media_item.end_date)), - "SYNOPSIS": formatter.shell_safe(description), - } - - for key, value in replacements.items(): - info_script = info_script.replace(f"{{{key}}}", value) - - return info_script - - def get_episode_preview( episodes: List[str], media_item: MediaItem, config: AppConfig ) -> str: + """ + Generate episode preview script and start background caching. 
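+
+    When a streaming episode has no thumbnail of its own, the episode's
+    preview image falls back to the media item's cover image.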
+ + Args: + episodes: List of episode identifiers + media_item: Media item containing episode data + config: Application configuration + + Returns: + Preview script content for fzf + """ IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True) INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True) @@ -178,10 +179,16 @@ def get_episode_preview( SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",") preview_script = TEMPLATE_PREVIEW_SCRIPT - # Start background caching for episodes - Thread( - target=_episode_cache_worker, args=(episodes, media_item, config), daemon=True - ).start() + + # Start managed background caching for episodes + try: + preview_manager = _get_preview_manager() + worker = preview_manager.get_episode_worker() + worker.cache_episode_previews(episodes, media_item, config) + logger.debug("Started background caching for episode previews") + except Exception as e: + logger.error(f"Failed to start episode background caching: {e}") + # Continue with script generation even if caching fails # Prepare values to inject into the template path_sep = "\\" if PLATFORM == "win32" else "/" @@ -208,107 +215,31 @@ def get_episode_preview( return preview_script -def _episode_cache_worker( - episodes: List[str], media_item: MediaItem, config: AppConfig -): - """Background task that fetches and saves episode preview data.""" - streaming_episodes = media_item.streaming_episodes - - with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor: - for episode_str in episodes: - hash_id = _get_cache_hash( - f"{media_item.title.english}_Episode_{episode_str}" - ) - - # Find matching streaming episode - title = None - thumbnail = None - if ep := streaming_episodes.get(episode_str): - title = ep.title - thumbnail = ep.thumbnail - - # Fallback if no streaming episode found - if not title: - title = f"Episode {episode_str}" - - # fallback - if not thumbnail and media_item.cover_image: - thumbnail = media_item.cover_image.large - - # Download thumbnail if available - if thumbnail: - executor.submit(_save_image_from_url, thumbnail, hash_id) - - # Generate and save episode info - episode_info = _populate_episode_info_template(config, title, media_item) - executor.submit(_save_info_text, episode_info, hash_id) +def _get_preview_manager() -> PreviewWorkerManager: + """Get or create the global preview worker manager.""" + global _preview_manager + if _preview_manager is None: + _preview_manager = PreviewWorkerManager(IMAGES_CACHE_DIR, INFO_CACHE_DIR) + return _preview_manager -def _populate_episode_info_template( - config: AppConfig, title: str, media_item: MediaItem -) -> str: +def shutdown_preview_workers(wait: bool = True, timeout: Optional[float] = 5.0) -> None: """ - Takes the episode_info.sh template and injects episode-specific formatted data. + Shutdown all preview workers. 
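+
+    Example (as called during session cleanup):
+
+        shutdown_preview_workers(wait=False, timeout=5.0)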
+ + Args: + wait: Whether to wait for tasks to complete + timeout: Maximum time to wait for shutdown """ - episode_info_script = TEMPLATE_EPISODE_INFO_SCRIPT - - replacements = { - "TITLE": formatter.shell_safe(title), - "NEXT_EPISODE": formatter.shell_safe( - f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X)')}" - if media_item.next_airing - else "N/A" - ), - "DURATION": formatter.format_media_duration(media_item.duration), - "STATUS": formatter.shell_safe(media_item.status.value), - "EPISODES": formatter.shell_safe(str(media_item.episodes)), - "USER_STATUS": formatter.shell_safe( - media_item.user_status.status.value - if media_item.user_status and media_item.user_status.status - else "NOT_ON_LIST" - ), - "USER_PROGRESS": formatter.shell_safe( - f"Episode {media_item.user_status.progress}" - if media_item.user_status - else "0" - ), - "START_DATE": formatter.shell_safe( - formatter.format_date(media_item.start_date) - ), - "END_DATE": formatter.shell_safe(formatter.format_date(media_item.end_date)), - } - - for key, value in replacements.items(): - episode_info_script = episode_info_script.replace(f"{{{key}}}", value) - - return episode_info_script + global _preview_manager + if _preview_manager: + _preview_manager.shutdown_all(wait=wait, timeout=timeout) + _preview_manager = None -def _get_cache_hash(text: str) -> str: - """Generates a consistent SHA256 hash for a given string to use as a filename.""" - return sha256(text.encode("utf-8")).hexdigest() - - -def _save_image_from_url(url: str, hash_id: str): - """Downloads an image using httpx and saves it to the cache.""" - image_path = IMAGES_CACHE_DIR / f"{hash_id}.png" - try: - with httpx.stream("GET", url, follow_redirects=True, timeout=20) as response: - response.raise_for_status() - with AtomicWriter(image_path, "wb", encoding=None) as f: - chunks = b"" - for chunk in response.iter_bytes(): - chunks += chunk - f.write(chunks) - except Exception as e: - logger.error(f"Failed to download image {url}: {e}") - - -def _save_info_text(info_text: str, hash_id: str): - """Saves pre-formatted text to the info cache.""" - try: - info_path = INFO_CACHE_DIR / hash_id - with AtomicWriter(info_path) as f: - f.write(info_text) - except IOError as e: - logger.error(f"Failed to write info cache for {hash_id}: {e}") +def get_preview_worker_status() -> dict: + """Get status of all preview workers.""" + global _preview_manager + if _preview_manager: + return _preview_manager.get_status() + return {"preview_worker": None, "episode_worker": None} diff --git a/fastanime/cli/utils/preview_workers.py b/fastanime/cli/utils/preview_workers.py new file mode 100644 index 0000000..184058a --- /dev/null +++ b/fastanime/cli/utils/preview_workers.py @@ -0,0 +1,475 @@ +""" +Preview-specific background workers for caching anime and episode data. + +This module provides specialized workers for handling anime preview caching, +including image downloads and info text generation with proper lifecycle management. 
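+
+A typical flow, shown as an illustrative sketch (the cache directories, media
+items, titles and config are supplied by the caller, e.g. fastanime.cli.utils.preview):
+
+    manager = PreviewWorkerManager(images_cache_dir, info_cache_dir)
+    worker = manager.get_preview_worker()
+    worker.cache_anime_previews(media_items, titles, config)
+    manager.shutdown_all(wait=False, timeout=5.0)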
+""" + +import logging +from typing import List, Optional + +import httpx +from ...core.constants import SCRIPTS_DIR + +from ...core.config import AppConfig +from ...core.utils import formatter +from ...core.utils.concurrency import ( + ManagedBackgroundWorker, + WorkerTask, + thread_manager, +) +from ...core.utils.file import AtomicWriter +from ...libs.media_api.types import MediaItem + +logger = logging.getLogger(__name__) +FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf" + +TEMPLATE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "info.template.sh").read_text( + encoding="utf-8" +) +TEMPLATE_EPISODE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "episode-info.template.sh").read_text( + encoding="utf-8" +) + + +class PreviewCacheWorker(ManagedBackgroundWorker): + """ + Specialized background worker for caching anime preview data. + + Handles downloading images and generating info text for anime previews + with proper error handling and resource management. + """ + + def __init__(self, images_cache_dir, info_cache_dir, max_workers: int = 10): + """ + Initialize the preview cache worker. + + Args: + images_cache_dir: Directory to cache images + info_cache_dir: Directory to cache info text + max_workers: Maximum number of concurrent workers + """ + super().__init__(max_workers=max_workers, name="PreviewCacheWorker") + self.images_cache_dir = images_cache_dir + self.info_cache_dir = info_cache_dir + self._http_client: Optional[httpx.Client] = None + + def start(self) -> None: + """Start the worker and initialize HTTP client.""" + super().start() + self._http_client = httpx.Client( + timeout=20.0, + follow_redirects=True, + limits=httpx.Limits(max_connections=self.max_workers), + ) + logger.debug("PreviewCacheWorker HTTP client initialized") + + def shutdown(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None: + """Shutdown the worker and cleanup HTTP client.""" + super().shutdown(wait=wait, timeout=timeout) + if self._http_client: + self._http_client.close() + self._http_client = None + logger.debug("PreviewCacheWorker HTTP client closed") + + def cache_anime_previews( + self, media_items: List[MediaItem], titles: List[str], config: AppConfig + ) -> None: + """ + Cache preview data for multiple anime items. 
+
+        Args:
+            media_items: List of media items to cache
+            titles: Corresponding titles for each media item
+            config: Application configuration
+        """
+        if not self.is_running():
+            raise RuntimeError("PreviewCacheWorker is not running")
+
+        for media_item, title_str in zip(media_items, titles):
+            hash_id = self._get_cache_hash(title_str)
+
+            # Submit image download task if needed
+            if config.general.preview in ("full", "image") and media_item.cover_image:
+                image_path = self.images_cache_dir / f"{hash_id}.png"
+                if not image_path.exists():
+                    self.submit_function(
+                        self._download_and_save_image,
+                        media_item.cover_image.large,
+                        hash_id,
+                    )
+
+            # Submit info generation task if needed
+            if config.general.preview in ("full", "text"):
+                info_path = self.info_cache_dir / hash_id
+                if not info_path.exists():
+                    info_text = self._generate_info_text(media_item, config)
+                    self.submit_function(self._save_info_text, info_text, hash_id)
+
+    def _download_and_save_image(self, url: str, hash_id: str) -> None:
+        """Download an image and save it to cache."""
+        if not self._http_client:
+            raise RuntimeError("HTTP client not initialized")
+
+        image_path = self.images_cache_dir / f"{hash_id}.png"
+
+        try:
+            with self._http_client.stream("GET", url) as response:
+                response.raise_for_status()
+
+                with AtomicWriter(image_path, "wb", encoding=None) as f:
+                    for chunk in response.iter_bytes():
+                        f.write(chunk)
+
+            logger.debug(f"Successfully cached image: {hash_id}")
+
+        except Exception as e:
+            logger.error(f"Failed to download image {url}: {e}")
+            raise
+
+    def _generate_info_text(self, media_item: MediaItem, config: AppConfig) -> str:
+        """Generate formatted info text for a media item."""
+        info_script = TEMPLATE_INFO_SCRIPT
+        description = formatter.clean_html(
+            media_item.description or "No description available."
+        )
+
+        # Escape all variables before injecting them into the script
+        replacements = {
+            "TITLE": formatter.shell_safe(
+                media_item.title.english or media_item.title.romaji
+            ),
+            "STATUS": formatter.shell_safe(media_item.status.value),
+            "FORMAT": formatter.shell_safe(media_item.format.value),
+            "NEXT_EPISODE": formatter.shell_safe(
+                f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X')}"
+                if media_item.next_airing
+                else "N/A"
+            ),
+            "EPISODES": formatter.shell_safe(str(media_item.episodes)),
+            "DURATION": formatter.shell_safe(
+                formatter.format_media_duration(media_item.duration)
+            ),
+            "SCORE": formatter.shell_safe(
+                formatter.format_score_stars_full(media_item.average_score)
+            ),
+            "FAVOURITES": formatter.shell_safe(
+                formatter.format_number_with_commas(media_item.favourites)
+            ),
+            "POPULARITY": formatter.shell_safe(
+                formatter.format_number_with_commas(media_item.popularity)
+            ),
+            "GENRES": formatter.shell_safe(
+                formatter.format_list_with_commas([v.value for v in media_item.genres])
+            ),
+            "TAGS": formatter.shell_safe(
+                formatter.format_list_with_commas(
+                    [t.name.value for t in media_item.tags]
+                )
+            ),
+            "STUDIOS": formatter.shell_safe(
+                formatter.format_list_with_commas(
+                    [t.name for t in media_item.studios if t.name]
+                )
+            ),
+            "SYNONYMNS": formatter.shell_safe(
+                formatter.format_list_with_commas(media_item.synonymns)
+            ),
+            "USER_STATUS": formatter.shell_safe(
+                media_item.user_status.status.value
+                if media_item.user_status and media_item.user_status.status
+                else "NOT_ON_LIST"
+            ),
+            "USER_PROGRESS": formatter.shell_safe(
+                f"Episode {media_item.user_status.progress}"
+                if media_item.user_status
+                else "0"
+            ),
+            "START_DATE": formatter.shell_safe(
+                formatter.format_date(media_item.start_date)
+            ),
+            "END_DATE": formatter.shell_safe(
+                formatter.format_date(media_item.end_date)
+            ),
+            "SYNOPSIS": formatter.shell_safe(description),
+        }
+
+        for key, value in replacements.items():
+            info_script = info_script.replace(f"{{{key}}}", value)
+
+        return info_script
+
+    def _save_info_text(self, info_text: str, hash_id: str) -> None:
+        """Save info text to cache."""
+        try:
+            info_path = self.info_cache_dir / hash_id
+            with AtomicWriter(info_path) as f:
+                f.write(info_text)
+            logger.debug(f"Successfully cached info: {hash_id}")
+        except IOError as e:
+            logger.error(f"Failed to write info cache for {hash_id}: {e}")
+            raise
+
+    def _get_cache_hash(self, text: str) -> str:
+        """Generate a cache hash for the given text."""
+        from hashlib import sha256
+
+        return sha256(text.encode("utf-8")).hexdigest()
+
+    def _on_task_completed(self, task: WorkerTask, future) -> None:
+        """Handle task completion with enhanced logging."""
+        super()._on_task_completed(task, future)
+
+        if future.exception():
+            logger.warning(f"Preview cache task failed: {future.exception()}")
+        else:
+            logger.debug("Preview cache task completed successfully")
+
+
+class EpisodeCacheWorker(ManagedBackgroundWorker):
+    """
+    Specialized background worker for caching episode preview data.
+
+    Handles episode-specific caching including thumbnails and episode info
+    with proper error handling and resource management.
+    """
+
+    def __init__(self, images_cache_dir, info_cache_dir, max_workers: int = 5):
+        """
+        Initialize the episode cache worker.
+
+        Args:
+            images_cache_dir: Directory to cache images
+            info_cache_dir: Directory to cache info text
+            max_workers: Maximum number of concurrent workers
+        """
+        super().__init__(max_workers=max_workers, name="EpisodeCacheWorker")
+        self.images_cache_dir = images_cache_dir
+        self.info_cache_dir = info_cache_dir
+        self._http_client: Optional[httpx.Client] = None
+
+    def start(self) -> None:
+        """Start the worker and initialize HTTP client."""
+        super().start()
+        self._http_client = httpx.Client(
+            timeout=20.0,
+            follow_redirects=True,
+            limits=httpx.Limits(max_connections=self.max_workers),
+        )
+        logger.debug("EpisodeCacheWorker HTTP client initialized")
+
+    def shutdown(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
+        """Shutdown the worker and cleanup HTTP client."""
+        super().shutdown(wait=wait, timeout=timeout)
+        if self._http_client:
+            self._http_client.close()
+            self._http_client = None
+            logger.debug("EpisodeCacheWorker HTTP client closed")
+
+    def cache_episode_previews(
+        self, episodes: List[str], media_item: MediaItem, config: AppConfig
+    ) -> None:
+        """
+        Cache preview data for multiple episodes.
+
+        Args:
+            episodes: List of episode identifiers
+            media_item: Media item containing episode data
+            config: Application configuration
+        """
+        if not self.is_running():
+            raise RuntimeError("EpisodeCacheWorker is not running")
+
+        streaming_episodes = media_item.streaming_episodes
+
+        for episode_str in episodes:
+            hash_id = self._get_cache_hash(
+                f"{media_item.title.english}_Episode_{episode_str}"
+            )
+
+            # Find episode data; fall back to a generic title when the
+            # streaming episode is missing or has no title of its own
+            episode_data = streaming_episodes.get(episode_str)
+            title = (
+                episode_data.title
+                if episode_data and episode_data.title
+                else f"Episode {episode_str}"
+            )
+            thumbnail = None
+
+            if episode_data and episode_data.thumbnail:
+                thumbnail = episode_data.thumbnail
+            elif media_item.cover_image:
+                thumbnail = media_item.cover_image.large
+
+            # Submit thumbnail download task
+            if thumbnail:
+                self.submit_function(self._download_and_save_image, thumbnail, hash_id)
+
+            # Submit episode info generation task
+            episode_info = self._generate_episode_info(config, title, media_item)
+            self.submit_function(self._save_info_text, episode_info, hash_id)
+
+    def _download_and_save_image(self, url: str, hash_id: str) -> None:
+        """Download an image and save it to cache."""
+        if not self._http_client:
+            raise RuntimeError("HTTP client not initialized")
+
+        image_path = self.images_cache_dir / f"{hash_id}.png"
+
+        try:
+            with self._http_client.stream("GET", url) as response:
+                response.raise_for_status()
+
+                with AtomicWriter(image_path, "wb", encoding=None) as f:
+                    for chunk in response.iter_bytes():
+                        f.write(chunk)
+
+            logger.debug(f"Successfully cached episode image: {hash_id}")
+
+        except Exception as e:
+            logger.error(f"Failed to download episode image {url}: {e}")
+            raise
+
+    def _generate_episode_info(
+        self, config: AppConfig, title: str, media_item: MediaItem
+    ) -> str:
+        """Generate formatted episode info text."""
+        episode_info_script = TEMPLATE_EPISODE_INFO_SCRIPT
+
+        replacements = {
+            "TITLE": formatter.shell_safe(title),
+            "NEXT_EPISODE": formatter.shell_safe(
+                f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X')}"
+                if media_item.next_airing
+                else "N/A"
+            ),
+            "DURATION": formatter.shell_safe(
+                formatter.format_media_duration(media_item.duration)
+            ),
+            "STATUS": formatter.shell_safe(media_item.status.value),
+            "EPISODES": formatter.shell_safe(str(media_item.episodes)),
+            "USER_STATUS": formatter.shell_safe(
media_item.user_status.status.value + if media_item.user_status and media_item.user_status.status + else "NOT_ON_LIST" + ), + "USER_PROGRESS": formatter.shell_safe( + f"Episode {media_item.user_status.progress}" + if media_item.user_status + else "0" + ), + "START_DATE": formatter.shell_safe( + formatter.format_date(media_item.start_date) + ), + "END_DATE": formatter.shell_safe( + formatter.format_date(media_item.end_date) + ), + } + + for key, value in replacements.items(): + episode_info_script = episode_info_script.replace(f"{{{key}}}", value) + + return episode_info_script + + def _save_info_text(self, info_text: str, hash_id: str) -> None: + """Save episode info text to cache.""" + try: + info_path = self.info_cache_dir / hash_id + with AtomicWriter(info_path) as f: + f.write(info_text) + logger.debug(f"Successfully cached episode info: {hash_id}") + except IOError as e: + logger.error(f"Failed to write episode info cache for {hash_id}: {e}") + raise + + def _get_cache_hash(self, text: str) -> str: + """Generate a cache hash for the given text.""" + from hashlib import sha256 + + return sha256(text.encode("utf-8")).hexdigest() + + def _on_task_completed(self, task: WorkerTask, future) -> None: + """Handle task completion with enhanced logging.""" + super()._on_task_completed(task, future) + + if future.exception(): + logger.warning(f"Episode cache task failed: {future.exception()}") + else: + logger.debug("Episode cache task completed successfully") + + +class PreviewWorkerManager: + """ + High-level manager for preview caching workers. + + Provides a simple interface for managing both anime and episode preview + caching workers with automatic lifecycle management. + """ + + def __init__(self, images_cache_dir, info_cache_dir): + """ + Initialize the preview worker manager. 
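+
+        Workers are not created here; they are created lazily on first use
+        via get_preview_worker() and get_episode_worker().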
+ + Args: + images_cache_dir: Directory to cache images + info_cache_dir: Directory to cache info text + """ + self.images_cache_dir = images_cache_dir + self.info_cache_dir = info_cache_dir + self._preview_worker: Optional[PreviewCacheWorker] = None + self._episode_worker: Optional[EpisodeCacheWorker] = None + + def get_preview_worker(self) -> PreviewCacheWorker: + """Get or create the preview cache worker.""" + if self._preview_worker is None or not self._preview_worker.is_running(): + if self._preview_worker: + # Clean up old worker + thread_manager.shutdown_worker("preview_cache_worker") + + self._preview_worker = PreviewCacheWorker( + self.images_cache_dir, self.info_cache_dir + ) + self._preview_worker.start() + thread_manager.register_worker("preview_cache_worker", self._preview_worker) + + return self._preview_worker + + def get_episode_worker(self) -> EpisodeCacheWorker: + """Get or create the episode cache worker.""" + if self._episode_worker is None or not self._episode_worker.is_running(): + if self._episode_worker: + # Clean up old worker + thread_manager.shutdown_worker("episode_cache_worker") + + self._episode_worker = EpisodeCacheWorker( + self.images_cache_dir, self.info_cache_dir + ) + self._episode_worker.start() + thread_manager.register_worker("episode_cache_worker", self._episode_worker) + + return self._episode_worker + + def shutdown_all(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None: + """Shutdown all managed workers.""" + thread_manager.shutdown_worker( + "preview_cache_worker", wait=wait, timeout=timeout + ) + thread_manager.shutdown_worker( + "episode_cache_worker", wait=wait, timeout=timeout + ) + self._preview_worker = None + self._episode_worker = None + + def get_status(self) -> dict: + """Get status of all managed workers.""" + return { + "preview_worker": self._preview_worker.get_completion_stats() + if self._preview_worker + else None, + "episode_worker": self._episode_worker.get_completion_stats() + if self._episode_worker + else None, + } + + def __enter__(self): + """Context manager entry - workers are created on demand.""" + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit with automatic cleanup.""" + self.shutdown_all(wait=False, timeout=5.0) diff --git a/fastanime/core/utils/__init__.py b/fastanime/core/utils/__init__.py new file mode 100644 index 0000000..487ede2 --- /dev/null +++ b/fastanime/core/utils/__init__.py @@ -0,0 +1,7 @@ +""" +Core utilities for FastAnime application. + +This module provides various utility classes and functions used throughout +the FastAnime application, including concurrency management, file operations, +and other common functionality. +""" diff --git a/fastanime/core/utils/concurrency.py b/fastanime/core/utils/concurrency.py new file mode 100644 index 0000000..40edb79 --- /dev/null +++ b/fastanime/core/utils/concurrency.py @@ -0,0 +1,389 @@ +""" +Concurrency utilities for managing background tasks and thread lifecycle. + +This module provides abstract base classes and concrete implementations for managing +background workers with proper lifecycle control, cancellation support, and resource cleanup. 
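+
+A minimal usage sketch (illustrative only):
+
+    with ManagedBackgroundWorker(max_workers=2, name="demo") as worker:
+        future = worker.submit_function(sum, [1, 2, 3])
+        print(future.result())  # prints 6
+
+The context manager starts the worker on entry and shuts it down on exit.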
+""" + +import logging +import threading +from abc import ABC, abstractmethod +from concurrent.futures import Future, ThreadPoolExecutor +from typing import Any, Callable, Dict, List, Optional, Protocol, TypeVar +from weakref import WeakSet + +logger = logging.getLogger(__name__) + +T = TypeVar("T") + + +class Cancellable(Protocol): + """Protocol for objects that can be cancelled.""" + + def cancel(self) -> bool: + """Cancel the operation. Returns True if cancellation was successful.""" + ... + + def cancelled(self) -> bool: + """Return True if the operation was cancelled.""" + ... + + +class WorkerTask: + """Represents a single task that can be executed by a worker.""" + + def __init__(self, func: Callable[..., Any], *args, **kwargs): + """ + Initialize a worker task. + + Args: + func: The function to execute + *args: Positional arguments for the function + **kwargs: Keyword arguments for the function + """ + self.func = func + self.args = args + self.kwargs = kwargs + self._cancelled = threading.Event() + self._completed = threading.Event() + self._exception: Optional[Exception] = None + self._result: Any = None + + def execute(self) -> Any: + """Execute the task if not cancelled.""" + if self._cancelled.is_set(): + return None + + try: + self._result = self.func(*self.args, **self.kwargs) + return self._result + except Exception as e: + self._exception = e + logger.error(f"Task execution failed: {e}") + raise + finally: + self._completed.set() + + def cancel(self) -> bool: + """Cancel the task.""" + if self._completed.is_set(): + return False + self._cancelled.set() + return True + + def cancelled(self) -> bool: + """Check if the task was cancelled.""" + return self._cancelled.is_set() + + def completed(self) -> bool: + """Check if the task completed.""" + return self._completed.is_set() + + @property + def exception(self) -> Optional[Exception]: + """Get the exception if one occurred during execution.""" + return self._exception + + @property + def result(self) -> Any: + """Get the result of the task execution.""" + return self._result + + +class BackgroundWorker(ABC): + """ + Abstract base class for background workers that manage concurrent tasks. + + Provides lifecycle management, cancellation support, and proper resource cleanup. + """ + + def __init__(self, max_workers: int = 5, name: Optional[str] = None): + """ + Initialize the background worker. + + Args: + max_workers: Maximum number of concurrent worker threads + name: Optional name for the worker (used in logging) + """ + self.max_workers = max_workers + self.name = name or self.__class__.__name__ + self._executor: Optional[ThreadPoolExecutor] = None + self._futures: WeakSet[Future] = WeakSet() + self._tasks: List[WorkerTask] = [] + self._shutdown_event = threading.Event() + self._lock = threading.RLock() + self._started = False + + def start(self) -> None: + """Start the background worker.""" + with self._lock: + if self._started: + logger.warning(f"Worker {self.name} is already started") + return + + self._executor = ThreadPoolExecutor( + max_workers=self.max_workers, thread_name_prefix=f"{self.name}-worker" + ) + self._started = True + logger.debug(f"Started background worker: {self.name}") + + def submit_task(self, task: WorkerTask) -> Future: + """ + Submit a task for background execution. 
+
+        Args:
+            task: The task to execute
+
+        Returns:
+            Future representing the task execution
+
+        Raises:
+            RuntimeError: If the worker is not started or is shutting down
+        """
+        with self._lock:
+            if not self._started or self._shutdown_event.is_set():
+                raise RuntimeError(f"Worker {self.name} is not available")
+
+            if self._executor is None:
+                raise RuntimeError(f"Worker {self.name} executor is not initialized")
+
+            self._tasks.append(task)
+            future = self._executor.submit(task.execute)
+            self._futures.add(future)
+            # Fire the completion hook when the future resolves; without this
+            # callback _on_task_completed would never be invoked.
+            future.add_done_callback(
+                lambda f, t=task: self._dispatch_task_completed(t, f)
+            )
+
+            logger.debug(f"Submitted task to worker {self.name}")
+            return future
+
+    def _dispatch_task_completed(self, task: WorkerTask, future: Future) -> None:
+        """Safely invoke the completion hook for a finished future."""
+        if future.cancelled():
+            # Future.exception() raises CancelledError for cancelled futures,
+            # so do not forward them to the hook.
+            logger.debug(f"Task cancelled in worker {self.name}")
+            return
+        try:
+            self._on_task_completed(task, future)
+        except Exception as e:
+            logger.error(f"Error in completion hook of worker {self.name}: {e}")
+
+    def submit_function(self, func: Callable[..., Any], *args, **kwargs) -> Future:
+        """
+        Submit a function for background execution.
+
+        Args:
+            func: The function to execute
+            *args: Positional arguments for the function
+            **kwargs: Keyword arguments for the function
+
+        Returns:
+            Future representing the task execution
+        """
+        task = WorkerTask(func, *args, **kwargs)
+        return self.submit_task(task)
+
+    def cancel_all_tasks(self) -> int:
+        """
+        Cancel all pending and running tasks.
+
+        Returns:
+            Number of tasks that were successfully cancelled
+        """
+        cancelled_count = 0
+
+        with self._lock:
+            # Cancel all tasks
+            for task in self._tasks:
+                if task.cancel():
+                    cancelled_count += 1
+
+            # Cancel all futures
+            for future in list(self._futures):
+                if future.cancel():
+                    cancelled_count += 1
+
+        logger.debug(f"Cancelled {cancelled_count} tasks in worker {self.name}")
+        return cancelled_count
+
+    def shutdown(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
+        """
+        Shutdown the background worker.
+
+        Args:
+            wait: Whether to wait for running tasks to complete
+            timeout: Maximum time to wait for shutdown (ignored if wait=False)
+        """
+        with self._lock:
+            if not self._started:
+                return
+
+            self._shutdown_event.set()
+            self._started = False
+
+            if self._executor is None:
+                return
+
+            logger.debug(f"Shutting down worker {self.name}")
+
+            if not wait:
+                # Cancel all tasks and shutdown immediately
+                self.cancel_all_tasks()
+                self._executor.shutdown(wait=False, cancel_futures=True)
+            else:
+                # ThreadPoolExecutor.shutdown() accepts no timeout argument, so
+                # wait on the outstanding futures first and force-cancel any
+                # leftovers once the timeout expires.
+                from concurrent.futures import wait as futures_wait
+
+                _, not_done = futures_wait(list(self._futures), timeout=timeout)
+                if not_done:
+                    logger.warning(
+                        f"Worker {self.name} shutdown timed out, forcing cancellation"
+                    )
+                    self.cancel_all_tasks()
+                    self._executor.shutdown(wait=False, cancel_futures=True)
+                else:
+                    self._executor.shutdown(wait=True)
+
+            self._executor = None
+            logger.debug(f"Worker {self.name} shutdown complete")
+
+    def is_running(self) -> bool:
+        """Check if the worker is currently running."""
+        return self._started and not self._shutdown_event.is_set()
+
+    def get_active_task_count(self) -> int:
+        """Get the number of active (non-completed) tasks."""
+        with self._lock:
+            return sum(1 for task in self._tasks if not task.completed())
+
+    @abstractmethod
+    def _on_task_completed(self, task: WorkerTask, future: Future) -> None:
+        """
+        Hook called when a task completes (successfully or with error).
+
+        Args:
+            task: The completed task
+            future: The future representing the task execution
+        """
+        pass
+
+    def __enter__(self):
+        """Context manager entry."""
+        self.start()
+        return self
+
+    def __exit__(self, exc_type, exc_val, exc_tb):
+        """Context manager exit with automatic cleanup."""
+        self.shutdown(wait=True)
+
+
+class ManagedBackgroundWorker(BackgroundWorker):
+    """
+    Concrete implementation of BackgroundWorker with task completion tracking.
+
+    This worker provides additional monitoring and logging of task completion.
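+
+    Example (illustrative):
+
+        worker = ManagedBackgroundWorker(name="demo")
+        worker.start()
+        worker.submit_function(print, "hello")
+        worker.shutdown(wait=True)
+        print(worker.get_completion_stats())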
+ """ + + def __init__(self, max_workers: int = 5, name: Optional[str] = None): + super().__init__(max_workers, name) + self._completed_tasks: List[WorkerTask] = [] + self._failed_tasks: List[WorkerTask] = [] + + def _on_task_completed(self, task: WorkerTask, future: Future) -> None: + """Track completed tasks and log results.""" + try: + if future.exception(): + self._failed_tasks.append(task) + logger.error(f"Task failed in worker {self.name}: {future.exception()}") + else: + self._completed_tasks.append(task) + logger.debug(f"Task completed successfully in worker {self.name}") + except Exception as e: + logger.error(f"Error in task completion handler: {e}") + + def get_completion_stats(self) -> Dict[str, int]: + """Get statistics about task completion.""" + with self._lock: + return { + "total_tasks": len(self._tasks), + "completed_tasks": len(self._completed_tasks), + "failed_tasks": len(self._failed_tasks), + "active_tasks": self.get_active_task_count(), + } + + +class ThreadManager: + """ + Manages multiple background workers and provides centralized control. + + This class acts as a registry for all background workers in the application, + allowing for coordinated shutdown and monitoring. + """ + + def __init__(self): + self._workers: Dict[str, BackgroundWorker] = {} + self._lock = threading.RLock() + + def register_worker(self, name: str, worker: BackgroundWorker) -> None: + """ + Register a background worker. + + Args: + name: Unique name for the worker + worker: The worker instance to register + """ + with self._lock: + if name in self._workers: + raise ValueError(f"Worker with name '{name}' already registered") + self._workers[name] = worker + logger.debug(f"Registered worker: {name}") + + def get_worker(self, name: str) -> Optional[BackgroundWorker]: + """Get a registered worker by name.""" + with self._lock: + return self._workers.get(name) + + def shutdown_worker( + self, name: str, wait: bool = True, timeout: Optional[float] = 30.0 + ) -> bool: + """ + Shutdown a specific worker. + + Args: + name: Name of the worker to shutdown + wait: Whether to wait for completion + timeout: Shutdown timeout + + Returns: + True if worker was found and shutdown, False otherwise + """ + with self._lock: + worker = self._workers.get(name) + if worker: + worker.shutdown(wait=wait, timeout=timeout) + del self._workers[name] + logger.debug(f"Shutdown worker: {name}") + return True + return False + + def shutdown_all(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None: + """Shutdown all registered workers.""" + with self._lock: + workers_to_shutdown = list(self._workers.items()) + + for name, worker in workers_to_shutdown: + try: + worker.shutdown(wait=wait, timeout=timeout) + logger.debug(f"Shutdown worker: {name}") + except Exception as e: + logger.error(f"Error shutting down worker {name}: {e}") + + with self._lock: + self._workers.clear() + + def get_all_workers(self) -> Dict[str, BackgroundWorker]: + """Get a copy of all registered workers.""" + with self._lock: + return self._workers.copy() + + def get_status(self) -> Dict[str, Dict[str, Any]]: + """Get status information for all workers.""" + status = {} + with self._lock: + for name, worker in self._workers.items(): + status[name] = { + "running": worker.is_running(), + "active_tasks": worker.get_active_task_count(), + } + + if isinstance(worker, ManagedBackgroundWorker): + status[name].update(worker.get_completion_stats()) + + return status + + +# Global thread manager instance +thread_manager = ThreadManager()