Refactor preview management and caching system

- Introduced a new PreviewWorkerManager to handle both anime and episode preview caching.
- Implemented PreviewCacheWorker and EpisodeCacheWorker for specialized background tasks.
- Added context management for preview operations to ensure proper cleanup.
- Enhanced error handling and logging during image downloads and info text generation.
- Removed redundant caching logic and consolidated functionality into worker classes.
- Updated session management to clean up preview workers on session end.
- Removed unused utility functions and streamlined the codebase.
This commit is contained in:
Benexl
2025-07-26 11:54:01 +03:00
parent 1c95d45be4
commit 7088b8ce18
11 changed files with 1073 additions and 243 deletions

View File

@@ -39,21 +39,34 @@ def episodes(ctx: Context, state: State) -> State | InternalDirective:
preview_command = None
if ctx.config.general.preview != "none":
from ....utils.preview import get_episode_preview
from ....utils.preview import create_preview_context
preview_command = get_episode_preview(
available_episodes, media_item, ctx.config
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_episode_preview(
available_episodes, media_item, ctx.config
)
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode", choices=choices, preview=preview_command
)
if not chosen_episode_str or chosen_episode_str == "Back":
# TODO: should improve the back logic for menus that can be pass through
return InternalDirective.BACKX2
chosen_episode = chosen_episode_str
# Workers are automatically cleaned up when exiting the context
else:
# No preview mode
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode", choices=choices, preview=None
)
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode", choices=choices, preview=preview_command
)
if not chosen_episode_str or chosen_episode_str == "Back":
# TODO: should improve the back logic for menus that can be pass through
return InternalDirective.BACKX2
if not chosen_episode_str or chosen_episode_str == "Back":
# TODO: should improve the back logic for menus that can be pass through
return InternalDirective.BACKX2
chosen_episode = chosen_episode_str
chosen_episode = chosen_episode_str
# Track episode selection in watch history (if enabled in config)
if (

View File

@@ -343,7 +343,6 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
# Create next airing panel if available
if media_item.next_airing:
airing_info_table = Table(show_header=False, box=box.SIMPLE)
airing_info_table.add_column("Field", style="bold red")
airing_info_table.add_column("Value", style="white")

View File

@@ -23,17 +23,6 @@ def results(ctx: Context, state: State) -> State | InternalDirective:
_format_title(ctx, media_item): media_item
for media_item in search_result.values()
}
preview_command = None
if ctx.config.general.preview != "none":
from ....utils.preview import get_anime_preview
preview_command = get_anime_preview(
list(search_result_dict.values()),
list(search_result_dict.keys()),
ctx.config,
)
choices: Dict[str, Callable[[], Union[int, State, InternalDirective]]] = {
title: lambda media_id=item.id: media_id
for title, item in search_result_dict.items()
@@ -64,11 +53,31 @@ def results(ctx: Context, state: State) -> State | InternalDirective:
}
)
choice = ctx.selector.choose(
prompt="Select Anime",
choices=list(choices),
preview=preview_command,
)
preview_command = None
if ctx.config.general.preview != "none":
from ....utils.preview import create_preview_context
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(search_result_dict.values()),
list(search_result_dict.keys()),
ctx.config,
)
choice = ctx.selector.choose(
prompt="Select Anime",
choices=list(choices),
preview=preview_command,
)
else:
# No preview mode
choice = ctx.selector.choose(
prompt="Select Anime",
choices=list(choices),
preview=None,
)
if not choice:
return InternalDirective.RELOAD

View File

@@ -134,8 +134,21 @@ class Session:
except Exception:
self._context.service.session.create_crash_backup(self._history)
raise
finally:
# Clean up preview workers when session ends
self._cleanup_preview_workers()
self._context.service.session.save_session(self._history)
def _cleanup_preview_workers(self):
"""Clean up preview workers when session ends."""
try:
from ..utils.preview import shutdown_preview_workers
shutdown_preview_workers(wait=False, timeout=5.0)
logger.debug("Preview workers cleaned up successfully")
except Exception as e:
logger.warning(f"Failed to cleanup preview workers: {e}")
def _run_main_loop(self):
"""Run the main session loop."""
while self._history:

View File

@@ -585,7 +585,6 @@ class MediaRegistryService:
) -> list[tuple[int, str]]:
"""Get all episodes with a specific download status."""
try:
episodes = []
for record in self.get_all_media_records():
for episode in record.media_episodes:
@@ -600,7 +599,6 @@ class MediaRegistryService:
def get_download_statistics(self) -> dict:
"""Get comprehensive download statistics."""
try:
stats = {
"total_episodes": 0,
"downloaded": 0,

View File

@@ -1,5 +0,0 @@
"""CLI utilities for FastAnime."""
from .parser import parse_episode_range
__all__ = ["parse_episode_range"]

View File

@@ -2,6 +2,7 @@ import importlib
import click
# TODO: since command structure is pretty obvious default to only requiring mapping of command names to their function name(cause some have special names like import)
class LazyGroup(click.Group):
def __init__(self, root: str, *args, lazy_subcommands=None, **kwargs):

View File

@@ -1,21 +1,15 @@
import concurrent.futures
import logging
import os
import re
from hashlib import sha256
from pathlib import Path
from threading import Thread
from typing import List
from typing import List, Optional
import httpx
from ...core.utils import formatter
from ...core.config import AppConfig
from ...core.constants import APP_CACHE_DIR, PLATFORM, SCRIPTS_DIR
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import MediaItem
from . import ansi
from .preview_workers import PreviewWorkerManager
logger = logging.getLogger(__name__)
@@ -26,23 +20,100 @@ IMAGES_CACHE_DIR = PREVIEWS_CACHE_DIR / "images"
INFO_CACHE_DIR = PREVIEWS_CACHE_DIR / "info"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
TEMPLATE_PREVIEW_SCRIPT = Path(str(FZF_SCRIPTS_DIR / "preview.template.sh")).read_text(
TEMPLATE_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "preview.template.sh").read_text(
encoding="utf-8"
)
TEMPLATE_INFO_SCRIPT = Path(str(FZF_SCRIPTS_DIR / "info.template.sh")).read_text(
encoding="utf-8"
)
TEMPLATE_EPISODE_INFO_SCRIPT = Path(
str(FZF_SCRIPTS_DIR / "episode-info.template.sh")
).read_text(encoding="utf-8")
EPISODE_PATTERN = re.compile(r"^Episode\s+(\d+)\s-\s.*")
# Global preview worker manager instance
_preview_manager: Optional[PreviewWorkerManager] = None
def create_preview_context():
"""
Create a context manager for preview operations.
This can be used in menu functions to ensure proper cleanup:
```python
with create_preview_context() as preview_ctx:
preview_script = preview_ctx.get_anime_preview(items, titles, config)
# ... use preview_script
# Workers are automatically cleaned up here
```
Returns:
PreviewContext: A context manager for preview operations
"""
return PreviewContext()
class PreviewContext:
"""Context manager for preview operations with automatic cleanup."""
def __init__(self):
self._manager = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._manager:
try:
self._manager.shutdown_all(wait=False, timeout=3.0)
except Exception as e:
logger.warning(f"Failed to cleanup preview context: {e}")
def get_anime_preview(
self, items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
"""Get anime preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_anime_preview(items, titles, config)
def get_episode_preview(
self, episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
"""Get episode preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_episode_preview(episodes, media_item, config)
def cancel_all_tasks(self) -> int:
"""Cancel all running preview tasks."""
if not self._manager:
return 0
cancelled = 0
if self._manager._preview_worker:
cancelled += self._manager._preview_worker.cancel_all_tasks()
if self._manager._episode_worker:
cancelled += self._manager._episode_worker.cancel_all_tasks()
return cancelled
def get_status(self) -> dict:
"""Get status of workers in this context."""
if self._manager:
return self._manager.get_status()
return {"preview_worker": None, "episode_worker": None}
def get_anime_preview(
items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
"""
Generate anime preview script and start background caching.
Args:
items: List of media items to preview
titles: Corresponding titles for each media item
config: Application configuration
Returns:
Preview script content for fzf
"""
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
@@ -52,8 +123,15 @@ def get_anime_preview(
preview_script = TEMPLATE_PREVIEW_SCRIPT
# Start the non-blocking background Caching
Thread(target=_cache_worker, args=(items, titles, config), daemon=True).start()
# Start the managed background caching
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_preview_worker()
worker.cache_anime_previews(items, titles, config)
logger.debug("Started background caching for anime previews")
except Exception as e:
logger.error(f"Failed to start background caching: {e}")
# Continue with script generation even if caching fails
# Prepare values to inject into the template
path_sep = "\\" if PLATFORM == "win32" else "/"
@@ -80,97 +158,20 @@ def get_anime_preview(
return preview_script
def _cache_worker(media_items: List[MediaItem], titles: List[str], config: AppConfig):
"""The background task that fetches and saves all necessary preview data."""
with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
for media_item, title_str in zip(media_items, titles):
hash_id = _get_cache_hash(title_str)
if config.general.preview in ("full", "image") and media_item.cover_image:
if not (IMAGES_CACHE_DIR / f"{hash_id}.png").exists():
executor.submit(
_save_image_from_url, media_item.cover_image.large, hash_id
)
if config.general.preview in ("full", "text"):
# TODO: Come up with a better caching pattern for now just let it be remade
if not (INFO_CACHE_DIR / hash_id).exists() or True:
info_text = _populate_info_template(media_item, config)
executor.submit(_save_info_text, info_text, hash_id)
def _populate_info_template(media_item: MediaItem, config: AppConfig) -> str:
"""
Takes the info.sh template and injects formatted, shell-safe data.
"""
info_script = TEMPLATE_INFO_SCRIPT
description = formatter.clean_html(
media_item.description or "No description available."
)
# Escape all variables before injecting them into the script
replacements = {
"TITLE": formatter.shell_safe(
media_item.title.english or media_item.title.romaji
),
"STATUS": formatter.shell_safe(media_item.status.value),
"FORMAT": formatter.shell_safe(media_item.format.value),
"NEXT_EPISODE": formatter.shell_safe(
f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X)')}"
if media_item.next_airing
else "N/A"
),
"EPISODES": formatter.shell_safe(str(media_item.episodes)),
"DURATION": formatter.shell_safe(
formatter.format_media_duration(media_item.duration)
),
"SCORE": formatter.shell_safe(
formatter.format_score_stars_full(media_item.average_score)
),
"FAVOURITES": formatter.shell_safe(
formatter.format_number_with_commas(media_item.favourites)
),
"POPULARITY": formatter.shell_safe(
formatter.format_number_with_commas(media_item.popularity)
),
"GENRES": formatter.shell_safe(
formatter.format_list_with_commas([v.value for v in media_item.genres])
),
"TAGS": formatter.shell_safe(
formatter.format_list_with_commas([t.name.value for t in media_item.tags])
),
"STUDIOS": formatter.shell_safe(
formatter.format_list_with_commas(
[t.name for t in media_item.studios if t.name]
)
),
"SYNONYMNS": formatter.shell_safe(
formatter.format_list_with_commas(media_item.synonymns)
),
"USER_STATUS": formatter.shell_safe(
media_item.user_status.status.value
if media_item.user_status and media_item.user_status.status
else "NOT_ON_LIST"
),
"USER_PROGRESS": formatter.shell_safe(
f"Episode {media_item.user_status.progress}"
if media_item.user_status
else "0"
),
"START_DATE": formatter.shell_safe(
formatter.format_date(media_item.start_date)
),
"END_DATE": formatter.shell_safe(formatter.format_date(media_item.end_date)),
"SYNOPSIS": formatter.shell_safe(description),
}
for key, value in replacements.items():
info_script = info_script.replace(f"{{{key}}}", value)
return info_script
def get_episode_preview(
episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
"""
Generate episode preview script and start background caching.
Args:
episodes: List of episode identifiers
media_item: Media item containing episode data
config: Application configuration
Returns:
Preview script content for fzf
"""
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
@@ -178,10 +179,16 @@ def get_episode_preview(
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
preview_script = TEMPLATE_PREVIEW_SCRIPT
# Start background caching for episodes
Thread(
target=_episode_cache_worker, args=(episodes, media_item, config), daemon=True
).start()
# Start managed background caching for episodes
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_episode_worker()
worker.cache_episode_previews(episodes, media_item, config)
logger.debug("Started background caching for episode previews")
except Exception as e:
logger.error(f"Failed to start episode background caching: {e}")
# Continue with script generation even if caching fails
# Prepare values to inject into the template
path_sep = "\\" if PLATFORM == "win32" else "/"
@@ -208,107 +215,31 @@ def get_episode_preview(
return preview_script
def _episode_cache_worker(
episodes: List[str], media_item: MediaItem, config: AppConfig
):
"""Background task that fetches and saves episode preview data."""
streaming_episodes = media_item.streaming_episodes
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for episode_str in episodes:
hash_id = _get_cache_hash(
f"{media_item.title.english}_Episode_{episode_str}"
)
# Find matching streaming episode
title = None
thumbnail = None
if ep := streaming_episodes.get(episode_str):
title = ep.title
thumbnail = ep.thumbnail
# Fallback if no streaming episode found
if not title:
title = f"Episode {episode_str}"
# fallback
if not thumbnail and media_item.cover_image:
thumbnail = media_item.cover_image.large
# Download thumbnail if available
if thumbnail:
executor.submit(_save_image_from_url, thumbnail, hash_id)
# Generate and save episode info
episode_info = _populate_episode_info_template(config, title, media_item)
executor.submit(_save_info_text, episode_info, hash_id)
def _get_preview_manager() -> PreviewWorkerManager:
"""Get or create the global preview worker manager."""
global _preview_manager
if _preview_manager is None:
_preview_manager = PreviewWorkerManager(IMAGES_CACHE_DIR, INFO_CACHE_DIR)
return _preview_manager
def _populate_episode_info_template(
config: AppConfig, title: str, media_item: MediaItem
) -> str:
def shutdown_preview_workers(wait: bool = True, timeout: Optional[float] = 5.0) -> None:
"""
Takes the episode_info.sh template and injects episode-specific formatted data.
Shutdown all preview workers.
Args:
wait: Whether to wait for tasks to complete
timeout: Maximum time to wait for shutdown
"""
episode_info_script = TEMPLATE_EPISODE_INFO_SCRIPT
replacements = {
"TITLE": formatter.shell_safe(title),
"NEXT_EPISODE": formatter.shell_safe(
f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X)')}"
if media_item.next_airing
else "N/A"
),
"DURATION": formatter.format_media_duration(media_item.duration),
"STATUS": formatter.shell_safe(media_item.status.value),
"EPISODES": formatter.shell_safe(str(media_item.episodes)),
"USER_STATUS": formatter.shell_safe(
media_item.user_status.status.value
if media_item.user_status and media_item.user_status.status
else "NOT_ON_LIST"
),
"USER_PROGRESS": formatter.shell_safe(
f"Episode {media_item.user_status.progress}"
if media_item.user_status
else "0"
),
"START_DATE": formatter.shell_safe(
formatter.format_date(media_item.start_date)
),
"END_DATE": formatter.shell_safe(formatter.format_date(media_item.end_date)),
}
for key, value in replacements.items():
episode_info_script = episode_info_script.replace(f"{{{key}}}", value)
return episode_info_script
global _preview_manager
if _preview_manager:
_preview_manager.shutdown_all(wait=wait, timeout=timeout)
_preview_manager = None
def _get_cache_hash(text: str) -> str:
"""Generates a consistent SHA256 hash for a given string to use as a filename."""
return sha256(text.encode("utf-8")).hexdigest()
def _save_image_from_url(url: str, hash_id: str):
"""Downloads an image using httpx and saves it to the cache."""
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
try:
with httpx.stream("GET", url, follow_redirects=True, timeout=20) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
chunks = b""
for chunk in response.iter_bytes():
chunks += chunk
f.write(chunks)
except Exception as e:
logger.error(f"Failed to download image {url}: {e}")
def _save_info_text(info_text: str, hash_id: str):
"""Saves pre-formatted text to the info cache."""
try:
info_path = INFO_CACHE_DIR / hash_id
with AtomicWriter(info_path) as f:
f.write(info_text)
except IOError as e:
logger.error(f"Failed to write info cache for {hash_id}: {e}")
def get_preview_worker_status() -> dict:
"""Get status of all preview workers."""
global _preview_manager
if _preview_manager:
return _preview_manager.get_status()
return {"preview_worker": None, "episode_worker": None}

View File

@@ -0,0 +1,475 @@
"""
Preview-specific background workers for caching anime and episode data.
This module provides specialized workers for handling anime preview caching,
including image downloads and info text generation with proper lifecycle management.
"""
import logging
from typing import List, Optional
import httpx
from ...core.constants import SCRIPTS_DIR
from ...core.config import AppConfig
from ...core.utils import formatter
from ...core.utils.concurrency import (
ManagedBackgroundWorker,
WorkerTask,
thread_manager,
)
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import MediaItem
logger = logging.getLogger(__name__)
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
TEMPLATE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "info.template.sh").read_text(
encoding="utf-8"
)
TEMPLATE_EPISODE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "episode-info.template.sh").read_text(
encoding="utf-8"
)
class PreviewCacheWorker(ManagedBackgroundWorker):
"""
Specialized background worker for caching anime preview data.
Handles downloading images and generating info text for anime previews
with proper error handling and resource management.
"""
def __init__(self, images_cache_dir, info_cache_dir, max_workers: int = 10):
"""
Initialize the preview cache worker.
Args:
images_cache_dir: Directory to cache images
info_cache_dir: Directory to cache info text
max_workers: Maximum number of concurrent workers
"""
super().__init__(max_workers=max_workers, name="PreviewCacheWorker")
self.images_cache_dir = images_cache_dir
self.info_cache_dir = info_cache_dir
self._http_client: Optional[httpx.Client] = None
def start(self) -> None:
"""Start the worker and initialize HTTP client."""
super().start()
self._http_client = httpx.Client(
timeout=20.0,
follow_redirects=True,
limits=httpx.Limits(max_connections=self.max_workers),
)
logger.debug("PreviewCacheWorker HTTP client initialized")
def shutdown(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
"""Shutdown the worker and cleanup HTTP client."""
super().shutdown(wait=wait, timeout=timeout)
if self._http_client:
self._http_client.close()
self._http_client = None
logger.debug("PreviewCacheWorker HTTP client closed")
def cache_anime_previews(
self, media_items: List[MediaItem], titles: List[str], config: AppConfig
) -> None:
"""
Cache preview data for multiple anime items.
Args:
media_items: List of media items to cache
titles: Corresponding titles for each media item
config: Application configuration
"""
if not self.is_running():
raise RuntimeError("PreviewCacheWorker is not running")
for media_item, title_str in zip(media_items, titles):
hash_id = self._get_cache_hash(title_str)
# Submit image download task if needed
if config.general.preview in ("full", "image") and media_item.cover_image:
image_path = self.images_cache_dir / f"{hash_id}.png"
if not image_path.exists():
self.submit_function(
self._download_and_save_image,
media_item.cover_image.large,
hash_id,
)
# Submit info generation task if needed
if config.general.preview in ("full", "text"):
info_path = self.info_cache_dir / hash_id
if not info_path.exists():
info_text = self._generate_info_text(media_item, config)
self.submit_function(self._save_info_text, info_text, hash_id)
def _download_and_save_image(self, url: str, hash_id: str) -> None:
"""Download an image and save it to cache."""
if not self._http_client:
raise RuntimeError("HTTP client not initialized")
image_path = self.images_cache_dir / f"{hash_id}.png"
try:
with self._http_client.stream("GET", url) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
logger.debug(f"Successfully cached image: {hash_id}")
except Exception as e:
logger.error(f"Failed to download image {url}: {e}")
raise
def _generate_info_text(self, media_item: MediaItem, config: AppConfig) -> str:
"""Generate formatted info text for a media item."""
# Import here to avoid circular imports
info_script = TEMPLATE_INFO_SCRIPT
description = formatter.clean_html(
media_item.description or "No description available."
)
# Escape all variables before injecting them into the script
replacements = {
"TITLE": formatter.shell_safe(
media_item.title.english or media_item.title.romaji
),
"STATUS": formatter.shell_safe(media_item.status.value),
"FORMAT": formatter.shell_safe(media_item.format.value),
"NEXT_EPISODE": formatter.shell_safe(
f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X)')}"
if media_item.next_airing
else "N/A"
),
"EPISODES": formatter.shell_safe(str(media_item.episodes)),
"DURATION": formatter.shell_safe(
formatter.format_media_duration(media_item.duration)
),
"SCORE": formatter.shell_safe(
formatter.format_score_stars_full(media_item.average_score)
),
"FAVOURITES": formatter.shell_safe(
formatter.format_number_with_commas(media_item.favourites)
),
"POPULARITY": formatter.shell_safe(
formatter.format_number_with_commas(media_item.popularity)
),
"GENRES": formatter.shell_safe(
formatter.format_list_with_commas([v.value for v in media_item.genres])
),
"TAGS": formatter.shell_safe(
formatter.format_list_with_commas(
[t.name.value for t in media_item.tags]
)
),
"STUDIOS": formatter.shell_safe(
formatter.format_list_with_commas(
[t.name for t in media_item.studios if t.name]
)
),
"SYNONYMNS": formatter.shell_safe(
formatter.format_list_with_commas(media_item.synonymns)
),
"USER_STATUS": formatter.shell_safe(
media_item.user_status.status.value
if media_item.user_status and media_item.user_status.status
else "NOT_ON_LIST"
),
"USER_PROGRESS": formatter.shell_safe(
f"Episode {media_item.user_status.progress}"
if media_item.user_status
else "0"
),
"START_DATE": formatter.shell_safe(
formatter.format_date(media_item.start_date)
),
"END_DATE": formatter.shell_safe(
formatter.format_date(media_item.end_date)
),
"SYNOPSIS": formatter.shell_safe(description),
}
for key, value in replacements.items():
info_script = info_script.replace(f"{{{key}}}", value)
return info_script
def _save_info_text(self, info_text: str, hash_id: str) -> None:
"""Save info text to cache."""
try:
info_path = self.info_cache_dir / hash_id
with AtomicWriter(info_path) as f:
f.write(info_text)
logger.debug(f"Successfully cached info: {hash_id}")
except IOError as e:
logger.error(f"Failed to write info cache for {hash_id}: {e}")
raise
def _get_cache_hash(self, text: str) -> str:
"""Generate a cache hash for the given text."""
from hashlib import sha256
return sha256(text.encode("utf-8")).hexdigest()
def _on_task_completed(self, task: WorkerTask, future) -> None:
"""Handle task completion with enhanced logging."""
super()._on_task_completed(task, future)
if future.exception():
logger.warning(f"Preview cache task failed: {future.exception()}")
else:
logger.debug("Preview cache task completed successfully")
class EpisodeCacheWorker(ManagedBackgroundWorker):
"""
Specialized background worker for caching episode preview data.
Handles episode-specific caching including thumbnails and episode info
with proper error handling and resource management.
"""
def __init__(self, images_cache_dir, info_cache_dir, max_workers: int = 5):
"""
Initialize the episode cache worker.
Args:
images_cache_dir: Directory to cache images
info_cache_dir: Directory to cache info text
max_workers: Maximum number of concurrent workers
"""
super().__init__(max_workers=max_workers, name="EpisodeCacheWorker")
self.images_cache_dir = images_cache_dir
self.info_cache_dir = info_cache_dir
self._http_client: Optional[httpx.Client] = None
def start(self) -> None:
"""Start the worker and initialize HTTP client."""
super().start()
self._http_client = httpx.Client(
timeout=20.0,
follow_redirects=True,
limits=httpx.Limits(max_connections=self.max_workers),
)
logger.debug("EpisodeCacheWorker HTTP client initialized")
def shutdown(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
"""Shutdown the worker and cleanup HTTP client."""
super().shutdown(wait=wait, timeout=timeout)
if self._http_client:
self._http_client.close()
self._http_client = None
logger.debug("EpisodeCacheWorker HTTP client closed")
def cache_episode_previews(
self, episodes: List[str], media_item: MediaItem, config: AppConfig
) -> None:
"""
Cache preview data for multiple episodes.
Args:
episodes: List of episode identifiers
media_item: Media item containing episode data
config: Application configuration
"""
if not self.is_running():
raise RuntimeError("EpisodeCacheWorker is not running")
streaming_episodes = media_item.streaming_episodes
for episode_str in episodes:
hash_id = self._get_cache_hash(
f"{media_item.title.english}_Episode_{episode_str}"
)
# Find episode data
episode_data = streaming_episodes.get(episode_str)
title = episode_data.title if episode_data else f"Episode {episode_str}"
thumbnail = None
if episode_data and episode_data.thumbnail:
thumbnail = episode_data.thumbnail
elif media_item.cover_image:
thumbnail = media_item.cover_image.large
# Submit thumbnail download task
if thumbnail:
self.submit_function(self._download_and_save_image, thumbnail, hash_id)
# Submit episode info generation task
episode_info = self._generate_episode_info(config, title, media_item)
self.submit_function(self._save_info_text, episode_info, hash_id)
def _download_and_save_image(self, url: str, hash_id: str) -> None:
"""Download an image and save it to cache."""
if not self._http_client:
raise RuntimeError("HTTP client not initialized")
image_path = self.images_cache_dir / f"{hash_id}.png"
try:
with self._http_client.stream("GET", url) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
logger.debug(f"Successfully cached episode image: {hash_id}")
except Exception as e:
logger.error(f"Failed to download episode image {url}: {e}")
raise
def _generate_episode_info(
self, config: AppConfig, title: str, media_item: MediaItem
) -> str:
"""Generate formatted episode info text."""
episode_info_script = TEMPLATE_EPISODE_INFO_SCRIPT
replacements = {
"TITLE": formatter.shell_safe(title),
"NEXT_EPISODE": formatter.shell_safe(
f"Episode {media_item.next_airing.episode} on {formatter.format_date(media_item.next_airing.airing_at, '%A, %d %B %Y at %X)')}"
if media_item.next_airing
else "N/A"
),
"DURATION": formatter.format_media_duration(media_item.duration),
"STATUS": formatter.shell_safe(media_item.status.value),
"EPISODES": formatter.shell_safe(str(media_item.episodes)),
"USER_STATUS": formatter.shell_safe(
media_item.user_status.status.value
if media_item.user_status and media_item.user_status.status
else "NOT_ON_LIST"
),
"USER_PROGRESS": formatter.shell_safe(
f"Episode {media_item.user_status.progress}"
if media_item.user_status
else "0"
),
"START_DATE": formatter.shell_safe(
formatter.format_date(media_item.start_date)
),
"END_DATE": formatter.shell_safe(
formatter.format_date(media_item.end_date)
),
}
for key, value in replacements.items():
episode_info_script = episode_info_script.replace(f"{{{key}}}", value)
return episode_info_script
def _save_info_text(self, info_text: str, hash_id: str) -> None:
"""Save episode info text to cache."""
try:
info_path = self.info_cache_dir / hash_id
with AtomicWriter(info_path) as f:
f.write(info_text)
logger.debug(f"Successfully cached episode info: {hash_id}")
except IOError as e:
logger.error(f"Failed to write episode info cache for {hash_id}: {e}")
raise
def _get_cache_hash(self, text: str) -> str:
"""Generate a cache hash for the given text."""
from hashlib import sha256
return sha256(text.encode("utf-8")).hexdigest()
def _on_task_completed(self, task: WorkerTask, future) -> None:
"""Handle task completion with enhanced logging."""
super()._on_task_completed(task, future)
if future.exception():
logger.warning(f"Episode cache task failed: {future.exception()}")
else:
logger.debug("Episode cache task completed successfully")
class PreviewWorkerManager:
"""
High-level manager for preview caching workers.
Provides a simple interface for managing both anime and episode preview
caching workers with automatic lifecycle management.
"""
def __init__(self, images_cache_dir, info_cache_dir):
"""
Initialize the preview worker manager.
Args:
images_cache_dir: Directory to cache images
info_cache_dir: Directory to cache info text
"""
self.images_cache_dir = images_cache_dir
self.info_cache_dir = info_cache_dir
self._preview_worker: Optional[PreviewCacheWorker] = None
self._episode_worker: Optional[EpisodeCacheWorker] = None
def get_preview_worker(self) -> PreviewCacheWorker:
"""Get or create the preview cache worker."""
if self._preview_worker is None or not self._preview_worker.is_running():
if self._preview_worker:
# Clean up old worker
thread_manager.shutdown_worker("preview_cache_worker")
self._preview_worker = PreviewCacheWorker(
self.images_cache_dir, self.info_cache_dir
)
self._preview_worker.start()
thread_manager.register_worker("preview_cache_worker", self._preview_worker)
return self._preview_worker
def get_episode_worker(self) -> EpisodeCacheWorker:
"""Get or create the episode cache worker."""
if self._episode_worker is None or not self._episode_worker.is_running():
if self._episode_worker:
# Clean up old worker
thread_manager.shutdown_worker("episode_cache_worker")
self._episode_worker = EpisodeCacheWorker(
self.images_cache_dir, self.info_cache_dir
)
self._episode_worker.start()
thread_manager.register_worker("episode_cache_worker", self._episode_worker)
return self._episode_worker
def shutdown_all(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
"""Shutdown all managed workers."""
thread_manager.shutdown_worker(
"preview_cache_worker", wait=wait, timeout=timeout
)
thread_manager.shutdown_worker(
"episode_cache_worker", wait=wait, timeout=timeout
)
self._preview_worker = None
self._episode_worker = None
def get_status(self) -> dict:
"""Get status of all managed workers."""
return {
"preview_worker": self._preview_worker.get_completion_stats()
if self._preview_worker
else None,
"episode_worker": self._episode_worker.get_completion_stats()
if self._episode_worker
else None,
}
def __enter__(self):
"""Context manager entry - workers are created on demand."""
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit with automatic cleanup."""
self.shutdown_all(wait=False, timeout=5.0)

View File

@@ -0,0 +1,7 @@
"""
Core utilities for FastAnime application.
This module provides various utility classes and functions used throughout
the FastAnime application, including concurrency management, file operations,
and other common functionality.
"""

View File

@@ -0,0 +1,389 @@
"""
Concurrency utilities for managing background tasks and thread lifecycle.
This module provides abstract base classes and concrete implementations for managing
background workers with proper lifecycle control, cancellation support, and resource cleanup.
"""
import logging
import threading
from abc import ABC, abstractmethod
from concurrent.futures import Future, ThreadPoolExecutor
from typing import Any, Callable, Dict, List, Optional, Protocol, TypeVar
from weakref import WeakSet
logger = logging.getLogger(__name__)
T = TypeVar("T")
class Cancellable(Protocol):
"""Protocol for objects that can be cancelled."""
def cancel(self) -> bool:
"""Cancel the operation. Returns True if cancellation was successful."""
...
def cancelled(self) -> bool:
"""Return True if the operation was cancelled."""
...
class WorkerTask:
"""Represents a single task that can be executed by a worker."""
def __init__(self, func: Callable[..., Any], *args, **kwargs):
"""
Initialize a worker task.
Args:
func: The function to execute
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
"""
self.func = func
self.args = args
self.kwargs = kwargs
self._cancelled = threading.Event()
self._completed = threading.Event()
self._exception: Optional[Exception] = None
self._result: Any = None
def execute(self) -> Any:
"""Execute the task if not cancelled."""
if self._cancelled.is_set():
return None
try:
self._result = self.func(*self.args, **self.kwargs)
return self._result
except Exception as e:
self._exception = e
logger.error(f"Task execution failed: {e}")
raise
finally:
self._completed.set()
def cancel(self) -> bool:
"""Cancel the task."""
if self._completed.is_set():
return False
self._cancelled.set()
return True
def cancelled(self) -> bool:
"""Check if the task was cancelled."""
return self._cancelled.is_set()
def completed(self) -> bool:
"""Check if the task completed."""
return self._completed.is_set()
@property
def exception(self) -> Optional[Exception]:
"""Get the exception if one occurred during execution."""
return self._exception
@property
def result(self) -> Any:
"""Get the result of the task execution."""
return self._result
class BackgroundWorker(ABC):
"""
Abstract base class for background workers that manage concurrent tasks.
Provides lifecycle management, cancellation support, and proper resource cleanup.
"""
def __init__(self, max_workers: int = 5, name: Optional[str] = None):
"""
Initialize the background worker.
Args:
max_workers: Maximum number of concurrent worker threads
name: Optional name for the worker (used in logging)
"""
self.max_workers = max_workers
self.name = name or self.__class__.__name__
self._executor: Optional[ThreadPoolExecutor] = None
self._futures: WeakSet[Future] = WeakSet()
self._tasks: List[WorkerTask] = []
self._shutdown_event = threading.Event()
self._lock = threading.RLock()
self._started = False
def start(self) -> None:
"""Start the background worker."""
with self._lock:
if self._started:
logger.warning(f"Worker {self.name} is already started")
return
self._executor = ThreadPoolExecutor(
max_workers=self.max_workers, thread_name_prefix=f"{self.name}-worker"
)
self._started = True
logger.debug(f"Started background worker: {self.name}")
def submit_task(self, task: WorkerTask) -> Future:
"""
Submit a task for background execution.
Args:
task: The task to execute
Returns:
Future representing the task execution
Raises:
RuntimeError: If the worker is not started or is shutting down
"""
with self._lock:
if not self._started or self._shutdown_event.is_set():
raise RuntimeError(f"Worker {self.name} is not available")
if self._executor is None:
raise RuntimeError(f"Worker {self.name} executor is not initialized")
self._tasks.append(task)
future = self._executor.submit(task.execute)
self._futures.add(future)
logger.debug(f"Submitted task to worker {self.name}")
return future
def submit_function(self, func: Callable[..., Any], *args, **kwargs) -> Future:
"""
Submit a function for background execution.
Args:
func: The function to execute
*args: Positional arguments for the function
**kwargs: Keyword arguments for the function
Returns:
Future representing the task execution
"""
task = WorkerTask(func, *args, **kwargs)
return self.submit_task(task)
def cancel_all_tasks(self) -> int:
"""
Cancel all pending and running tasks.
Returns:
Number of tasks that were successfully cancelled
"""
cancelled_count = 0
with self._lock:
# Cancel all tasks
for task in self._tasks:
if task.cancel():
cancelled_count += 1
# Cancel all futures
for future in list(self._futures):
if future.cancel():
cancelled_count += 1
logger.debug(f"Cancelled {cancelled_count} tasks in worker {self.name}")
return cancelled_count
def shutdown(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
"""
Shutdown the background worker.
Args:
wait: Whether to wait for running tasks to complete
timeout: Maximum time to wait for shutdown (ignored if wait=False)
"""
with self._lock:
if not self._started:
return
self._shutdown_event.set()
self._started = False
if self._executor is None:
return
logger.debug(f"Shutting down worker {self.name}")
if not wait:
# Cancel all tasks and shutdown immediately
self.cancel_all_tasks()
self._executor.shutdown(wait=False, cancel_futures=True)
else:
# Wait for tasks to complete with timeout
try:
self._executor.shutdown(wait=True, timeout=timeout)
except TimeoutError:
logger.warning(
f"Worker {self.name} shutdown timed out, forcing cancellation"
)
self.cancel_all_tasks()
self._executor.shutdown(wait=False, cancel_futures=True)
self._executor = None
logger.debug(f"Worker {self.name} shutdown complete")
def is_running(self) -> bool:
"""Check if the worker is currently running."""
return self._started and not self._shutdown_event.is_set()
def get_active_task_count(self) -> int:
"""Get the number of active (non-completed) tasks."""
with self._lock:
return sum(1 for task in self._tasks if not task.completed())
@abstractmethod
def _on_task_completed(self, task: WorkerTask, future: Future) -> None:
"""
Hook called when a task completes (successfully or with error).
Args:
task: The completed task
future: The future representing the task execution
"""
pass
def __enter__(self):
"""Context manager entry."""
self.start()
return self
def __exit__(self, exc_type, exc_val, exc_tb):
"""Context manager exit with automatic cleanup."""
self.shutdown(wait=True)
class ManagedBackgroundWorker(BackgroundWorker):
"""
Concrete implementation of BackgroundWorker with task completion tracking.
This worker provides additional monitoring and logging of task completion.
"""
def __init__(self, max_workers: int = 5, name: Optional[str] = None):
super().__init__(max_workers, name)
self._completed_tasks: List[WorkerTask] = []
self._failed_tasks: List[WorkerTask] = []
def _on_task_completed(self, task: WorkerTask, future: Future) -> None:
"""Track completed tasks and log results."""
try:
if future.exception():
self._failed_tasks.append(task)
logger.error(f"Task failed in worker {self.name}: {future.exception()}")
else:
self._completed_tasks.append(task)
logger.debug(f"Task completed successfully in worker {self.name}")
except Exception as e:
logger.error(f"Error in task completion handler: {e}")
def get_completion_stats(self) -> Dict[str, int]:
"""Get statistics about task completion."""
with self._lock:
return {
"total_tasks": len(self._tasks),
"completed_tasks": len(self._completed_tasks),
"failed_tasks": len(self._failed_tasks),
"active_tasks": self.get_active_task_count(),
}
class ThreadManager:
"""
Manages multiple background workers and provides centralized control.
This class acts as a registry for all background workers in the application,
allowing for coordinated shutdown and monitoring.
"""
def __init__(self):
self._workers: Dict[str, BackgroundWorker] = {}
self._lock = threading.RLock()
def register_worker(self, name: str, worker: BackgroundWorker) -> None:
"""
Register a background worker.
Args:
name: Unique name for the worker
worker: The worker instance to register
"""
with self._lock:
if name in self._workers:
raise ValueError(f"Worker with name '{name}' already registered")
self._workers[name] = worker
logger.debug(f"Registered worker: {name}")
def get_worker(self, name: str) -> Optional[BackgroundWorker]:
"""Get a registered worker by name."""
with self._lock:
return self._workers.get(name)
def shutdown_worker(
self, name: str, wait: bool = True, timeout: Optional[float] = 30.0
) -> bool:
"""
Shutdown a specific worker.
Args:
name: Name of the worker to shutdown
wait: Whether to wait for completion
timeout: Shutdown timeout
Returns:
True if worker was found and shutdown, False otherwise
"""
with self._lock:
worker = self._workers.get(name)
if worker:
worker.shutdown(wait=wait, timeout=timeout)
del self._workers[name]
logger.debug(f"Shutdown worker: {name}")
return True
return False
def shutdown_all(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
"""Shutdown all registered workers."""
with self._lock:
workers_to_shutdown = list(self._workers.items())
for name, worker in workers_to_shutdown:
try:
worker.shutdown(wait=wait, timeout=timeout)
logger.debug(f"Shutdown worker: {name}")
except Exception as e:
logger.error(f"Error shutting down worker {name}: {e}")
with self._lock:
self._workers.clear()
def get_all_workers(self) -> Dict[str, BackgroundWorker]:
"""Get a copy of all registered workers."""
with self._lock:
return self._workers.copy()
def get_status(self) -> Dict[str, Dict[str, Any]]:
"""Get status information for all workers."""
status = {}
with self._lock:
for name, worker in self._workers.items():
status[name] = {
"running": worker.is_running(),
"active_tasks": worker.get_active_task_count(),
}
if isinstance(worker, ManagedBackgroundWorker):
status[name].update(worker.get_completion_stats())
return status
# Global thread manager instance
thread_manager = ThreadManager()