Files
FastAnime/viu_media/cli/utils/preview.py
2025-12-02 17:44:46 +03:00

623 lines
21 KiB
Python

import logging
import re
from hashlib import sha256
import sys
from typing import Dict, List, Optional
import httpx
from viu_media.core.utils import formatter
from ...core.config import AppConfig
from ...core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import (
AiringScheduleResult,
Character,
MediaItem,
MediaReview,
)
from .preview_workers import PreviewWorkerManager
def get_rofi_preview(
media_items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return (
"".join(
[
f"{title}\0icon\x1f{_get_image(item)}\n"
for item, title in zip(media_items, titles)
]
)
+ "Back\nExit"
)
def _get_image(item: MediaItem) -> str:
if not item.cover_image:
return ""
hash_id = sha256(item.title.english.encode("utf-8")).hexdigest()
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
if image_path.exists():
return str(image_path)
if not item.cover_image.large:
return ""
try:
with httpx.stream(
"GET", item.cover_image.large, follow_redirects=True
) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
return str(image_path)
except Exception as e:
logger.error(f"Failed to download image {item.cover_image.large}: {e}")
return ""
def get_rofi_episode_preview(
episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return (
"".join(
[
f"{episode}\0icon\x1f{_get_episode_image(episode, media_item)}\n"
for episode in episodes
]
)
+ "Back\nExit"
)
def _get_episode_image(episode: str, media_item: MediaItem) -> str:
if media_item.streaming_episodes and media_item.streaming_episodes.get(episode):
stream = media_item.streaming_episodes[episode]
image_url = stream.thumbnail
else:
if not media_item.cover_image:
return ""
image_url = media_item.cover_image.large
if not image_url:
return ""
hash_id = sha256(
f"{media_item.title.english}_Episode_{episode}".encode("utf-8")
).hexdigest()
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
if image_path.exists():
return str(image_path)
try:
with httpx.stream("GET", image_url, follow_redirects=True) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
return str(image_path)
except Exception as e:
logger.error(
f"Failed to download image {image_url} for {media_item.title.english}: {e}"
)
return ""
logger = logging.getLogger(__name__)
# os.environ["SHELL"] = sys.executable
PREVIEWS_CACHE_DIR = APP_CACHE_DIR / "previews"
IMAGES_CACHE_DIR = PREVIEWS_CACHE_DIR / "images"
INFO_CACHE_DIR = PREVIEWS_CACHE_DIR / "info"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
TEMPLATE_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "preview.py").read_text(encoding="utf-8")
DYNAMIC_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "dynamic_preview.py").read_text(
encoding="utf-8"
)
EPISODE_PATTERN = re.compile(r"^Episode\s+(\d+)\s-\s.*")
# Global preview worker manager instance
_preview_manager: Optional[PreviewWorkerManager] = None
def _ensure_ansi_utils_in_cache():
"""Copy _ansi_utils.py to the info cache directory so cached scripts can import it."""
source = FZF_SCRIPTS_DIR / "_ansi_utils.py"
dest = INFO_CACHE_DIR / "_ansi_utils.py"
if source.exists() and (
not dest.exists() or source.stat().st_mtime > dest.stat().st_mtime
):
try:
import shutil
shutil.copy2(source, dest)
logger.debug(f"Copied _ansi_utils.py to {INFO_CACHE_DIR}")
except Exception as e:
logger.warning(f"Failed to copy _ansi_utils.py to cache: {e}")
def create_preview_context():
"""
Create a context manager for preview operations.
This can be used in menu functions to ensure proper cleanup:
```python
with create_preview_context() as preview_ctx:
preview_script = preview_ctx.get_anime_preview(items, titles, config)
# ... use preview_script
# Workers are automatically cleaned up here
```
Returns:
PreviewContext: A context manager for preview operations
"""
return PreviewContext()
class PreviewContext:
"""Context manager for preview operations with automatic cleanup."""
def __init__(self):
self._manager = None
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
if self._manager:
try:
self._manager.shutdown_all(wait=False, timeout=3.0)
except Exception as e:
logger.warning(f"Failed to cleanup preview context: {e}")
def get_anime_preview(
self, items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
"""Get anime preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_anime_preview(items, titles, config)
def get_episode_preview(
self, episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
"""Get episode preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_episode_preview(episodes, media_item, config)
def get_dynamic_anime_preview(self, config: AppConfig) -> str:
"""Get dynamic anime preview script for search functionality."""
if not self._manager:
self._manager = _get_preview_manager()
return get_dynamic_anime_preview(config)
def get_review_preview(
self, choice_map: Dict[str, MediaReview], config: AppConfig
) -> str:
"""Get review preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_review_preview(choice_map, config)
def get_character_preview(
self, choice_map: Dict[str, Character], config: AppConfig
) -> str:
"""Get character preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_character_preview(choice_map, config)
def get_airing_schedule_preview(
self,
schedule_result: AiringScheduleResult,
config: AppConfig,
anime_title: str = "Anime",
) -> str:
"""Get airing schedule preview script with managed workers."""
if not self._manager:
self._manager = _get_preview_manager()
return get_airing_schedule_preview(schedule_result, config, anime_title)
def cancel_all_tasks(self) -> int:
"""Cancel all running preview tasks."""
if not self._manager:
return 0
cancelled = 0
if self._manager._preview_worker:
cancelled += self._manager._preview_worker.cancel_all_tasks()
if self._manager._episode_worker:
cancelled += self._manager._episode_worker.cancel_all_tasks()
if self._manager._review_worker:
cancelled += self._manager._review_worker.cancel_all_tasks()
if self._manager._character_worker:
cancelled += self._manager._character_worker.cancel_all_tasks()
if self._manager._airing_schedule_worker:
cancelled += self._manager._airing_schedule_worker.cancel_all_tasks()
return cancelled
def get_status(self) -> dict:
"""Get status of workers in this context."""
if self._manager:
return self._manager.get_status()
return {
"preview_worker": None,
"episode_worker": None,
"review_worker": None,
"character_worker": None,
"airing_schedule_worker": None,
}
def get_anime_preview(
items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
if config.general.selector == "rofi":
return get_rofi_preview(items, titles, config)
"""
Generate anime preview script and start background caching.
Args:
items: List of media items to preview
titles: Corresponding titles for each media item
config: Application configuration
Returns:
Preview script content for fzf
"""
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
_ensure_ansi_utils_in_cache()
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
preview_script = TEMPLATE_PREVIEW_SCRIPT
# Start the managed background caching
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_preview_worker()
worker.cache_anime_previews(items, titles, config)
logger.debug("Started background caching for anime previews")
except Exception as e:
logger.error(f"Failed to start background caching: {e}")
# Continue with script generation even if caching fails
# Format the template with the dynamic values
replacements = {
"PREVIEW_MODE": config.general.preview,
"IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
"INFO_CACHE_DIR": str(INFO_CACHE_DIR),
"IMAGE_RENDERER": config.general.image_renderer,
# Color codes
"HEADER_COLOR": ",".join(HEADER_COLOR),
"SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
"PREFIX": "search-result",
"KEY": "",
"SCALE_UP": str(config.general.preview_scale_up),
}
for key, value in replacements.items():
preview_script = preview_script.replace(f"{{{key}}}", value)
preview_file = PREVIEWS_CACHE_DIR / "search-result-preview-script.py"
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = f"{sys.executable} {preview_file} {{}}"
return preview_script_final
def get_episode_preview(
episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
"""
Generate episode preview script and start background caching.
Args:
episodes: List of episode identifiers
media_item: Media item containing episode data
config: Application configuration
Returns:
Preview script content for fzf
"""
if config.general.selector == "rofi":
return get_rofi_episode_preview(episodes, media_item, config)
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
preview_script = TEMPLATE_PREVIEW_SCRIPT
# Start managed background caching for episodes
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_episode_worker()
worker.cache_episode_previews(episodes, media_item, config)
logger.debug("Started background caching for episode previews")
except Exception as e:
logger.error(f"Failed to start episode background caching: {e}")
# Continue with script generation even if caching fails
# Format the template with the dynamic values
replacements = {
"PREVIEW_MODE": config.general.preview,
"IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
"INFO_CACHE_DIR": str(INFO_CACHE_DIR),
"IMAGE_RENDERER": config.general.image_renderer,
# Color codes
"HEADER_COLOR": ",".join(HEADER_COLOR),
"SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
"PREFIX": "episode",
"KEY": f"{media_item.title.english.replace(formatter.DOUBLE_QUOTE, formatter.SINGLE_QUOTE)}",
"SCALE_UP": str(config.general.preview_scale_up),
}
for key, value in replacements.items():
preview_script = preview_script.replace(f"{{{key}}}", value)
preview_file = PREVIEWS_CACHE_DIR / "episode-preview-script.py"
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = f"{sys.executable} {preview_file} {{}}"
return preview_script_final
def get_character_preview(choice_map: Dict[str, Character], config: AppConfig) -> str:
"""
Generate the generic loader script for character previews and start background caching.
"""
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
# Start managed background caching for episodes
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_character_worker()
worker.cache_character_previews(choice_map, config)
logger.debug("Started background caching for character previews")
except Exception as e:
logger.error(f"Failed to start episode background caching: {e}")
# Use the generic loader script
preview_script = TEMPLATE_PREVIEW_SCRIPT
replacements = {
"PREVIEW_MODE": config.general.preview,
"IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
"INFO_CACHE_DIR": str(INFO_CACHE_DIR),
"IMAGE_RENDERER": config.general.image_renderer,
# Color codes
"HEADER_COLOR": ",".join(HEADER_COLOR),
"SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
"PREFIX": "character",
"KEY": "",
"SCALE_UP": str(config.general.preview_scale_up),
}
for key, value in replacements.items():
preview_script = preview_script.replace(f"{{{key}}}", value)
preview_file = PREVIEWS_CACHE_DIR / "character-preview-script.py"
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = f"{sys.executable} {preview_file} {{}}"
return preview_script_final
def get_review_preview(choice_map: Dict[str, MediaReview], config: AppConfig) -> str:
"""
Generate the generic loader script for review previews and start background caching.
"""
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
# Start managed background caching for episodes
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_review_worker()
worker.cache_review_previews(choice_map, config)
logger.debug("Started background caching for review previews")
except Exception as e:
logger.error(f"Failed to start episode background caching: {e}")
# Use the generic loader script
preview_script = TEMPLATE_PREVIEW_SCRIPT
replacements = {
"PREVIEW_MODE": config.general.preview,
"IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
"INFO_CACHE_DIR": str(INFO_CACHE_DIR),
"IMAGE_RENDERER": config.general.image_renderer,
# Color codes
"HEADER_COLOR": ",".join(HEADER_COLOR),
"SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
"PREFIX": "review",
"KEY": "",
"SCALE_UP": str(config.general.preview_scale_up),
}
for key, value in replacements.items():
preview_script = preview_script.replace(f"{{{key}}}", value)
preview_file = PREVIEWS_CACHE_DIR / "review-preview-script.py"
preview_file.write_text(preview_script, encoding="utf-8")
preview_script_final = f"{sys.executable} {preview_file} {{}}"
return preview_script_final
def get_airing_schedule_preview(
schedule_result: AiringScheduleResult, config: AppConfig, anime_title: str = "Anime"
) -> str:
"""
Generate the generic loader script for airing schedule previews and start background caching.
"""
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
# Start managed background caching for episodes
try:
preview_manager = _get_preview_manager()
worker = preview_manager.get_airing_schedule_worker()
worker.cache_airing_schedule_preview(anime_title, schedule_result, config)
logger.debug("Started background caching for airing schedule previews")
except Exception as e:
logger.error(f"Failed to start episode background caching: {e}")
# Use the generic loader script
preview_script = TEMPLATE_PREVIEW_SCRIPT
replacements = {
"PREVIEW_MODE": config.general.preview,
"IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
"INFO_CACHE_DIR": str(INFO_CACHE_DIR),
"IMAGE_RENDERER": config.general.image_renderer,
# Color codes
"HEADER_COLOR": ",".join(HEADER_COLOR),
"SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
"PREFIX": "airing-schedule",
"KEY": "",
"SCALE_UP": str(config.general.preview_scale_up),
}
for key, value in replacements.items():
preview_script = preview_script.replace(f"{{{key}}}", value)
preview_file = PREVIEWS_CACHE_DIR / "airing-schedule-preview-script.py"
preview_file.write_text(preview_script, encoding="utf-8")
# preview_script_final = f"{sys.executable} {preview_file} {{}}"
# NOTE: disabled cause not very useful
return ""
def get_dynamic_anime_preview(config: AppConfig) -> str:
"""
Generate dynamic anime preview script for search functionality.
This is different from regular anime preview because:
1. We don't have media items upfront
2. The preview needs to work with search results as they come in
3. Preview script dynamically loads data from search results JSON
Args:
config: Application configuration
Returns:
Preview script command for fzf dynamic search
"""
# Ensure cache directories exist
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
source = FZF_SCRIPTS_DIR / "_ansi_utils.py"
dest = PREVIEWS_CACHE_DIR / "_ansi_utils.py"
if source.exists() and (
not dest.exists() or source.stat().st_mtime > dest.stat().st_mtime
):
try:
import shutil
shutil.copy2(source, dest)
logger.debug(f"Copied _ansi_utils.py to {INFO_CACHE_DIR}")
except Exception as e:
logger.warning(f"Failed to copy _ansi_utils.py to cache: {e}")
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
# Use the dynamic preview script template
preview_script = DYNAMIC_PREVIEW_SCRIPT
search_cache_dir = APP_CACHE_DIR / "search"
search_results_file = search_cache_dir / "current_search_results.json"
# Prepare replacements for the template
replacements = {
"SEARCH_RESULTS_FILE": str(search_results_file),
"IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
"PREVIEW_MODE": config.general.preview,
"IMAGE_RENDERER": config.general.image_renderer,
"HEADER_COLOR": ",".join(HEADER_COLOR),
"SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
"SCALE_UP": str(config.general.preview_scale_up),
}
for key, value in replacements.items():
preview_script = preview_script.replace(f"{{{key}}}", value)
# Write the preview script to cache
preview_file = PREVIEWS_CACHE_DIR / "dynamic-search-preview-script.py"
preview_file.write_text(preview_script, encoding="utf-8")
# Return the command to execute the preview script
preview_script_final = f"{sys.executable} {preview_file} {{}}"
return preview_script_final
def _get_preview_manager() -> PreviewWorkerManager:
"""Get or create the global preview worker manager."""
global _preview_manager
if _preview_manager is None:
_preview_manager = PreviewWorkerManager(IMAGES_CACHE_DIR, INFO_CACHE_DIR)
return _preview_manager
def shutdown_preview_workers(wait: bool = True, timeout: Optional[float] = 5.0) -> None:
"""
Shutdown all preview workers.
Args:
wait: Whether to wait for tasks to complete
timeout: Maximum time to wait for shutdown
"""
global _preview_manager
if _preview_manager:
_preview_manager.shutdown_all(wait=wait, timeout=timeout)
_preview_manager = None
def get_preview_worker_status() -> dict:
"""Get status of all preview workers."""
global _preview_manager
if _preview_manager:
return _preview_manager.get_status()
return {"preview_worker": None, "episode_worker": None}