diff --git a/fastanime/cli/commands/queue.py b/fastanime/cli/commands/queue.py index 847de76..8c617c3 100644 --- a/fastanime/cli/commands/queue.py +++ b/fastanime/cli/commands/queue.py @@ -1,62 +1,218 @@ import click from fastanime.core.config import AppConfig from fastanime.core.exceptions import FastAnimeError -from fastanime.libs.media_api.params import MediaSearchParams +from fastanime.libs.media_api.types import ( + MediaFormat, + MediaGenre, + MediaItem, + MediaSeason, + MediaSort, + MediaStatus, + MediaTag, + MediaType, + MediaYear, +) @click.command(help="Queue episodes for the background worker to download.") +# Search/Filter options (mirrors 'fastanime anilist download') +@click.option("--title", "-t") +@click.option("--page", "-p", type=click.IntRange(min=1), default=1) +@click.option("--per-page", type=click.IntRange(min=1, max=50)) +@click.option("--season", type=click.Choice([s.value for s in MediaSeason])) @click.option( - "--title", "-t", required=True, multiple=True, help="Anime title to queue." + "--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus]) ) @click.option( - "--episode-range", "-r", required=True, help="Range of episodes (e.g., '1-10')." + "--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus]) +) +@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort])) +@click.option( + "--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre]) +) +@click.option( + "--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre]) +) +@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag])) +@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag])) +@click.option( + "--media-format", + "-f", + multiple=True, + type=click.Choice([f.value for f in MediaFormat]), +) +@click.option("--media-type", type=click.Choice([t.value for t in MediaType])) +@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear])) +@click.option("--popularity-greater", type=click.IntRange(min=0)) +@click.option("--popularity-lesser", type=click.IntRange(min=0)) +@click.option("--score-greater", type=click.IntRange(min=0, max=100)) +@click.option("--score-lesser", type=click.IntRange(min=0, max=100)) +@click.option("--start-date-greater", type=int) +@click.option("--start-date-lesser", type=int) +@click.option("--end-date-greater", type=int) +@click.option("--end-date-lesser", type=int) +@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None) +# Queue-specific options +@click.option( + "--episode-range", + "-r", + required=True, + help="Range of episodes to queue (e.g., '1-10', '5', '8:12').", +) +@click.option( + "--yes", + "-Y", + is_flag=True, + help="Automatically queue from all found anime without prompting for selection.", ) @click.pass_obj -def queue(config: AppConfig, title: tuple, episode_range: str): +def queue(config: AppConfig, **options): """ - Searches for an anime and adds the specified episodes to the download queue. - The background worker must be running for the downloads to start. + Search AniList with filters, select one or more anime (or use --yes), + and queue the specified episode range for background download. + The background worker should be running to process the queue. """ from fastanime.cli.service.download.service import DownloadService from fastanime.cli.service.feedback import FeedbackService from fastanime.cli.service.registry import MediaRegistryService from fastanime.cli.utils.parser import parse_episode_range + from fastanime.libs.media_api.params import MediaSearchParams from fastanime.libs.media_api.api import create_api_client from fastanime.libs.provider.anime.provider import create_provider + from fastanime.libs.selectors import create_selector + from rich.progress import Progress feedback = FeedbackService(config) + selector = create_selector(config) media_api = create_api_client(config.general.media_api, config) provider = create_provider(config.general.provider) registry = MediaRegistryService(config.general.media_api, config.media_registry) download_service = DownloadService(config, registry, media_api, provider) - for anime_title in title: - try: - feedback.info(f"Searching for '{anime_title}'...") - search_result = media_api.search_media( - MediaSearchParams(query=anime_title, per_page=1) - ) + try: + # Build search params mirroring anilist download + sort_val = options.get("sort") + status_val = options.get("status") + status_not_val = options.get("status_not") + genres_val = options.get("genres") + genres_not_val = options.get("genres_not") + tags_val = options.get("tags") + tags_not_val = options.get("tags_not") + media_format_val = options.get("media_format") + media_type_val = options.get("media_type") + season_val = options.get("season") + year_val = options.get("year") - if not search_result or not search_result.media: - feedback.warning(f"Could not find '{anime_title}' on AniList.") + search_params = MediaSearchParams( + query=options.get("title"), + page=options.get("page", 1), + per_page=options.get("per_page"), + sort=MediaSort(sort_val) if sort_val else None, + status_in=[MediaStatus(s) for s in status_val] if status_val else None, + status_not_in=[MediaStatus(s) for s in status_not_val] + if status_not_val + else None, + genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None, + genre_not_in=[MediaGenre(g) for g in genres_not_val] + if genres_not_val + else None, + tag_in=[MediaTag(t) for t in tags_val] if tags_val else None, + tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None, + format_in=[MediaFormat(f) for f in media_format_val] + if media_format_val + else None, + type=MediaType(media_type_val) if media_type_val else None, + season=MediaSeason(season_val) if season_val else None, + seasonYear=int(year_val) if year_val else None, + popularity_greater=options.get("popularity_greater"), + popularity_lesser=options.get("popularity_lesser"), + averageScore_greater=options.get("score_greater"), + averageScore_lesser=options.get("score_lesser"), + startDate_greater=options.get("start_date_greater"), + startDate_lesser=options.get("start_date_lesser"), + endDate_greater=options.get("end_date_greater"), + endDate_lesser=options.get("end_date_lesser"), + on_list=options.get("on_list"), + ) + + with Progress() as progress: + progress.add_task("Searching AniList...", total=None) + search_result = media_api.search_media(search_params) + + if not search_result or not search_result.media: + raise FastAnimeError("No anime found matching your search criteria.") + + if options.get("yes"): + anime_to_queue = search_result.media + else: + choice_map: dict[str, MediaItem] = { + (item.title.english or item.title.romaji or f"ID: {item.id}"): item + for item in search_result.media + } + preview_command = None + if config.general.preview != "none": + from ..utils.preview import create_preview_context # type: ignore + + with create_preview_context() as preview_ctx: + preview_command = preview_ctx.get_anime_preview( + list(choice_map.values()), + list(choice_map.keys()), + config, + ) + selected_titles = selector.choose_multiple( + "Select anime to queue", + list(choice_map.keys()), + preview=preview_command, + ) + else: + selected_titles = selector.choose_multiple( + "Select anime to queue", list(choice_map.keys()) + ) + + if not selected_titles: + feedback.warning("No anime selected. Nothing queued.") + return + anime_to_queue = [choice_map[title] for title in selected_titles] + + episode_range_str = options.get("episode_range") + total_queued = 0 + for media_item in anime_to_queue: + available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)] + if not available_episodes: + feedback.warning( + f"No episode information for '{media_item.title.english}', skipping." + ) continue - media_item = search_result.media[0] - available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)] - episodes_to_queue = list( - parse_episode_range(episode_range, available_episodes) - ) + try: + episodes_to_queue = list( + parse_episode_range(episode_range_str, available_episodes) + ) + if not episodes_to_queue: + feedback.warning( + f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'." + ) + continue - queued_count = 0 - for ep in episodes_to_queue: - if download_service.add_to_queue(media_item, ep): - queued_count += 1 + queued_count = 0 + for ep in episodes_to_queue: + if download_service.add_to_queue(media_item, ep): + queued_count += 1 - feedback.success( - f"Successfully queued {queued_count} episodes for '{media_item.title.english}'." - ) + total_queued += queued_count + feedback.success( + f"Queued {queued_count} episodes for '{media_item.title.english}'." + ) + except (ValueError, IndexError) as e: + feedback.error( + f"Invalid episode range for '{media_item.title.english}': {e}" + ) - except FastAnimeError as e: - feedback.error(f"Failed to queue '{anime_title}'", str(e)) - except Exception as e: - feedback.error("An unexpected error occurred", str(e)) + feedback.success( + f"Done. Total of {total_queued} episode(s) queued across all selections." + ) + + except FastAnimeError as e: + feedback.error("Queue command failed", str(e)) + except Exception as e: + feedback.error("An unexpected error occurred", str(e)) diff --git a/fastanime/cli/commands/queue/__init__.py b/fastanime/cli/commands/queue/__init__.py new file mode 100644 index 0000000..26b6954 --- /dev/null +++ b/fastanime/cli/commands/queue/__init__.py @@ -0,0 +1,3 @@ +from .cmd import queue + +__all__ = ["queue"] diff --git a/fastanime/cli/commands/queue/cmd.py b/fastanime/cli/commands/queue/cmd.py new file mode 100644 index 0000000..6ed0060 --- /dev/null +++ b/fastanime/cli/commands/queue/cmd.py @@ -0,0 +1,26 @@ +import click + +from ...utils.lazyloader import LazyGroup + +commands = { + "add": "add.add", + "list": "list.list_cmd", + "resume": "resume.resume", + "clear": "clear.clear_cmd", +} + + +@click.group( + cls=LazyGroup, + name="queue", + root="fastanime.cli.commands.queue.commands", + invoke_without_command=False, + help="Manage the download queue (add, list, resume, clear).", + short_help="Manage the download queue.", + lazy_subcommands=commands, +) +@click.pass_context +def queue(ctx: click.Context): + """Queue management root command.""" + # No-op root; subcommands are lazy-loaded + pass diff --git a/fastanime/cli/commands/queue/commands/__init__.py b/fastanime/cli/commands/queue/commands/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fastanime/cli/commands/queue/commands/add.py b/fastanime/cli/commands/queue/commands/add.py new file mode 100644 index 0000000..95be02e --- /dev/null +++ b/fastanime/cli/commands/queue/commands/add.py @@ -0,0 +1,213 @@ +import click +from fastanime.core.config import AppConfig +from fastanime.core.exceptions import FastAnimeError +from fastanime.libs.media_api.types import ( + MediaFormat, + MediaGenre, + MediaItem, + MediaSeason, + MediaSort, + MediaStatus, + MediaTag, + MediaType, + MediaYear, +) + + +@click.command(name="add", help="Add episodes to the background download queue.") +# Search/Filter options (mirrors 'fastanime anilist download') +@click.option("--title", "-t") +@click.option("--page", "-p", type=click.IntRange(min=1), default=1) +@click.option("--per-page", type=click.IntRange(min=1, max=50)) +@click.option("--season", type=click.Choice([s.value for s in MediaSeason])) +@click.option( + "--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus]) +) +@click.option( + "--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus]) +) +@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort])) +@click.option( + "--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre]) +) +@click.option( + "--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre]) +) +@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag])) +@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag])) +@click.option( + "--media-format", + "-f", + multiple=True, + type=click.Choice([f.value for f in MediaFormat]), +) +@click.option("--media-type", type=click.Choice([t.value for t in MediaType])) +@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear])) +@click.option("--popularity-greater", type=click.IntRange(min=0)) +@click.option("--popularity-lesser", type=click.IntRange(min=0)) +@click.option("--score-greater", type=click.IntRange(min=0, max=100)) +@click.option("--score-lesser", type=click.IntRange(min=0, max=100)) +@click.option("--start-date-greater", type=int) +@click.option("--start-date-lesser", type=int) +@click.option("--end-date-greater", type=int) +@click.option("--end-date-lesser", type=int) +@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None) +# Queue-specific options +@click.option( + "--episode-range", + "-r", + required=True, + help="Range of episodes to queue (e.g., '1-10', '5', '8:12').", +) +@click.option( + "--yes", + "-Y", + is_flag=True, + help="Queue for all found anime without prompting for selection.", +) +@click.pass_obj +def add(config: AppConfig, **options): + from fastanime.cli.service.download.service import DownloadService + from fastanime.cli.service.feedback import FeedbackService + from fastanime.cli.service.registry import MediaRegistryService + from fastanime.cli.utils.parser import parse_episode_range + from fastanime.libs.media_api.params import MediaSearchParams + from fastanime.libs.media_api.api import create_api_client + from fastanime.libs.provider.anime.provider import create_provider + from fastanime.libs.selectors import create_selector + from rich.progress import Progress + + feedback = FeedbackService(config) + selector = create_selector(config) + media_api = create_api_client(config.general.media_api, config) + provider = create_provider(config.general.provider) + registry = MediaRegistryService(config.general.media_api, config.media_registry) + download_service = DownloadService(config, registry, media_api, provider) + + try: + # Build search params mirroring anilist download + sort_val = options.get("sort") + status_val = options.get("status") + status_not_val = options.get("status_not") + genres_val = options.get("genres") + genres_not_val = options.get("genres_not") + tags_val = options.get("tags") + tags_not_val = options.get("tags_not") + media_format_val = options.get("media_format") + media_type_val = options.get("media_type") + season_val = options.get("season") + year_val = options.get("year") + + search_params = MediaSearchParams( + query=options.get("title"), + page=options.get("page", 1), + per_page=options.get("per_page"), + sort=MediaSort(sort_val) if sort_val else None, + status_in=[MediaStatus(s) for s in status_val] if status_val else None, + status_not_in=[MediaStatus(s) for s in status_not_val] + if status_not_val + else None, + genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None, + genre_not_in=[MediaGenre(g) for g in genres_not_val] + if genres_not_val + else None, + tag_in=[MediaTag(t) for t in tags_val] if tags_val else None, + tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None, + format_in=[MediaFormat(f) for f in media_format_val] + if media_format_val + else None, + type=MediaType(media_type_val) if media_type_val else None, + season=MediaSeason(season_val) if season_val else None, + seasonYear=int(year_val) if year_val else None, + popularity_greater=options.get("popularity_greater"), + popularity_lesser=options.get("popularity_lesser"), + averageScore_greater=options.get("score_greater"), + averageScore_lesser=options.get("score_lesser"), + startDate_greater=options.get("start_date_greater"), + startDate_lesser=options.get("start_date_lesser"), + endDate_greater=options.get("end_date_greater"), + endDate_lesser=options.get("end_date_lesser"), + on_list=options.get("on_list"), + ) + + with Progress() as progress: + progress.add_task("Searching AniList...", total=None) + search_result = media_api.search_media(search_params) + + if not search_result or not search_result.media: + raise FastAnimeError("No anime found matching your search criteria.") + + if options.get("yes"): + anime_to_queue = search_result.media + else: + choice_map: dict[str, MediaItem] = { + (item.title.english or item.title.romaji or f"ID: {item.id}"): item + for item in search_result.media + } + preview_command = None + if config.general.preview != "none": + from ...utils.preview import create_preview_context # type: ignore + + with create_preview_context() as preview_ctx: + preview_command = preview_ctx.get_anime_preview( + list(choice_map.values()), + list(choice_map.keys()), + config, + ) + selected_titles = selector.choose_multiple( + "Select anime to queue", + list(choice_map.keys()), + preview=preview_command, + ) + else: + selected_titles = selector.choose_multiple( + "Select anime to queue", list(choice_map.keys()) + ) + + if not selected_titles: + feedback.warning("No anime selected. Nothing queued.") + return + anime_to_queue = [choice_map[title] for title in selected_titles] + + episode_range_str = options.get("episode_range") + total_queued = 0 + for media_item in anime_to_queue: + available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)] + if not available_episodes: + feedback.warning( + f"No episode information for '{media_item.title.english}', skipping." + ) + continue + + try: + episodes_to_queue = list( + parse_episode_range(episode_range_str, available_episodes) + ) + if not episodes_to_queue: + feedback.warning( + f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'." + ) + continue + + queued_count = 0 + for ep in episodes_to_queue: + if download_service.add_to_queue(media_item, ep): + queued_count += 1 + + total_queued += queued_count + feedback.success( + f"Queued {queued_count} episodes for '{media_item.title.english}'." + ) + except (ValueError, IndexError) as e: + feedback.error( + f"Invalid episode range for '{media_item.title.english}': {e}" + ) + + feedback.success( + f"Done. Total of {total_queued} episode(s) queued across all selections." + ) + + except FastAnimeError as e: + feedback.error("Queue add failed", str(e)) + except Exception as e: + feedback.error("An unexpected error occurred", str(e)) diff --git a/fastanime/cli/commands/queue/commands/clear.py b/fastanime/cli/commands/queue/commands/clear.py new file mode 100644 index 0000000..60f9783 --- /dev/null +++ b/fastanime/cli/commands/queue/commands/clear.py @@ -0,0 +1,30 @@ +import click +from fastanime.core.config import AppConfig + + +@click.command(name="clear", help="Clear queued items from the registry (QUEUED -> NOT_DOWNLOADED).") +@click.option("--force", is_flag=True, help="Do not prompt for confirmation.") +@click.pass_obj +def clear_cmd(config: AppConfig, force: bool): + from fastanime.cli.service.feedback import FeedbackService + from fastanime.cli.service.registry import MediaRegistryService + from fastanime.cli.service.registry.models import DownloadStatus + + feedback = FeedbackService(config) + registry = MediaRegistryService(config.general.media_api, config.media_registry) + + if not force and not click.confirm("This will clear all queued items. Continue?"): + feedback.info("Aborted.") + return + + cleared = 0 + queued = registry.get_episodes_by_download_status(DownloadStatus.QUEUED) + for media_id, ep in queued: + ok = registry.update_episode_download_status( + media_id=media_id, + episode_number=ep, + status=DownloadStatus.NOT_DOWNLOADED, + ) + if ok: + cleared += 1 + feedback.success(f"Cleared {cleared} queued episode(s).") diff --git a/fastanime/cli/commands/queue/commands/list.py b/fastanime/cli/commands/queue/commands/list.py new file mode 100644 index 0000000..54bb6b4 --- /dev/null +++ b/fastanime/cli/commands/queue/commands/list.py @@ -0,0 +1,45 @@ +import click +from fastanime.core.config import AppConfig + + +@click.command(name="list", help="List items in the download queue and their statuses.") +@click.option("--status", type=click.Choice(["queued", "downloading", "completed", "failed", "paused"])) +@click.pass_obj +def list_cmd(config: AppConfig, status: str | None): + from fastanime.cli.service.registry import MediaRegistryService + from fastanime.cli.service.registry.models import DownloadStatus + from fastanime.cli.service.feedback import FeedbackService + + feedback = FeedbackService(config) + registry = MediaRegistryService(config.general.media_api, config.media_registry) + + status_map = { + "queued": DownloadStatus.QUEUED, + "downloading": DownloadStatus.DOWNLOADING, + "completed": DownloadStatus.COMPLETED, + "failed": DownloadStatus.FAILED, + "paused": DownloadStatus.PAUSED, + } + + if status: + target = status_map[status] + episodes = registry.get_episodes_by_download_status(target) + feedback.info(f"{len(episodes)} episode(s) with status {status}.") + for media_id, ep in episodes: + feedback.info(f"- media:{media_id} episode:{ep}") + else: + from rich.table import Table + from rich.console import Console + + stats = registry.get_download_statistics() + table = Table(title="Queue Status") + table.add_column("Metric") + table.add_column("Value") + table.add_row("Queued", str(stats.get("queued", 0))) + table.add_row("Downloading", str(stats.get("downloading", 0))) + table.add_row("Completed", str(stats.get("downloaded", 0))) + table.add_row("Failed", str(stats.get("failed", 0))) + table.add_row("Paused", str(stats.get("paused", 0))) + + console = Console() + console.print(table) diff --git a/fastanime/cli/commands/queue/commands/resume.py b/fastanime/cli/commands/queue/commands/resume.py new file mode 100644 index 0000000..c8d267b --- /dev/null +++ b/fastanime/cli/commands/queue/commands/resume.py @@ -0,0 +1,22 @@ +import click +from fastanime.core.config import AppConfig + + +@click.command(name="resume", help="Submit any queued or in-progress downloads to the worker.") +@click.pass_obj +def resume(config: AppConfig): + from fastanime.cli.service.download.service import DownloadService + from fastanime.cli.service.feedback import FeedbackService + from fastanime.cli.service.registry import MediaRegistryService + from fastanime.libs.media_api.api import create_api_client + from fastanime.libs.provider.anime.provider import create_provider + + feedback = FeedbackService(config) + media_api = create_api_client(config.general.media_api, config) + provider = create_provider(config.general.provider) + registry = MediaRegistryService(config.general.media_api, config.media_registry) + download_service = DownloadService(config, registry, media_api, provider) + + download_service.start() + download_service.resume_unfinished_downloads() + feedback.success("Submitted queued downloads to background worker.") diff --git a/fastanime/cli/commands/worker.py b/fastanime/cli/commands/worker.py index 5bbd74a..9f50784 100644 --- a/fastanime/cli/commands/worker.py +++ b/fastanime/cli/commands/worker.py @@ -16,6 +16,7 @@ def worker(config: AppConfig): from fastanime.cli.service.notification.service import NotificationService from fastanime.cli.service.registry.service import MediaRegistryService from fastanime.cli.service.worker.service import BackgroundWorkerService + from fastanime.cli.service.auth import AuthService from fastanime.libs.media_api.api import create_api_client from fastanime.libs.provider.anime.provider import create_provider @@ -26,6 +27,13 @@ def worker(config: AppConfig): # Instantiate services media_api = create_api_client(config.general.media_api, config) + # Authenticate if credentials exist (enables notifications) + auth = AuthService(config.general.media_api) + if profile := auth.get_auth(): + try: + media_api.authenticate(profile.token) + except Exception: + pass provider = create_provider(config.general.provider) registry = MediaRegistryService(config.general.media_api, config.media_registry) diff --git a/fastanime/cli/service/download/service.py b/fastanime/cli/service/download/service.py index 52930de..d620d33 100644 --- a/fastanime/cli/service/download/service.py +++ b/fastanime/cli/service/download/service.py @@ -36,6 +36,8 @@ class DownloadService: self.media_api = media_api_service self.provider = provider_service self.downloader = create_downloader(config.downloads) + # Track in-flight downloads to avoid duplicate queueing + self._inflight: set[tuple[int, str]] = set() # Worker is kept for potential future background commands self._worker = ManagedBackgroundWorker( @@ -56,18 +58,25 @@ class DownloadService: self._worker.shutdown(wait=False) def add_to_queue(self, media_item: MediaItem, episode_number: str) -> bool: - """Adds a download job to the ASYNCHRONOUS queue.""" + """Mark an episode as queued in the registry (no immediate download).""" logger.info( - f"Queueing background download for '{media_item.title.english}' Episode {episode_number}" + f"Queueing episode '{episode_number}' for '{media_item.title.english}' (registry only)" ) self.registry.get_or_create_record(media_item) - updated = self.registry.update_episode_download_status( + return self.registry.update_episode_download_status( media_id=media_item.id, episode_number=episode_number, status=DownloadStatus.QUEUED, ) - if not updated: + + def _submit_download(self, media_item: MediaItem, episode_number: str) -> bool: + """Submit a download task to the worker if not already in-flight.""" + key = (media_item.id, str(episode_number)) + if key in self._inflight: return False + if not self._worker.is_running(): + self._worker.start() + self._inflight.add(key) self._worker.submit_function( self._execute_download_job, media_item, episode_number ) @@ -108,9 +117,11 @@ class DownloadService: f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..." ) for media_id, episode_number in unfinished_jobs: + if (media_id, str(episode_number)) in self._inflight: + continue record = self.registry.get_media_record(media_id) if record and record.media_item: - self.add_to_queue(record.media_item, episode_number) + self._submit_download(record.media_item, episode_number) else: logger.error( f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'." @@ -244,3 +255,9 @@ class DownloadService: status=DownloadStatus.FAILED, error_message=str(e), ) + finally: + # Remove from in-flight tracking regardless of outcome + try: + self._inflight.discard((media_item.id, str(episode_number))) + except Exception: + pass diff --git a/fastanime/cli/service/notification/service.py b/fastanime/cli/service/notification/service.py index ae067c8..6200aed 100644 --- a/fastanime/cli/service/notification/service.py +++ b/fastanime/cli/service/notification/service.py @@ -1,41 +1,58 @@ -import json import logging -from typing import Set +from pathlib import Path +from typing import Optional, Set + +import httpx from fastanime.core.constants import APP_CACHE_DIR from fastanime.libs.media_api.base import BaseApiClient +from fastanime.libs.media_api.types import MediaItem +# Note: Previously used image resizing; now we download icons directly without resizing. try: - import plyer + from plyer import notification as plyer_notification PLYER_AVAILABLE = True -except ImportError: +except ImportError: # pragma: no cover - optional dependency + plyer_notification = None # type: ignore[assignment] PLYER_AVAILABLE = False logger = logging.getLogger(__name__) -SEEN_NOTIFICATIONS_CACHE = APP_CACHE_DIR / "seen_notifications.json" class NotificationService: - def __init__(self, media_api: BaseApiClient): + def __init__(self, media_api: BaseApiClient, registry_service=None): self.media_api = media_api + self.registry = registry_service # optional; used for seen tracking self._seen_ids: Set[int] = self._load_seen_ids() def _load_seen_ids(self) -> Set[int]: - if not SEEN_NOTIFICATIONS_CACHE.exists(): - return set() + # Prefer MediaRegistry storage via index.last_notified_episode markers try: - with open(SEEN_NOTIFICATIONS_CACHE, "r") as f: - return set(json.load(f)) - except (json.JSONDecodeError, IOError): + if not self.registry: + return set() + seen: Set[int] = set() + for record in self.registry.get_all_media_records(): + index_entry = self.registry.get_media_index_entry( + record.media_item.id + ) + # last_notified_episode stored per media; we can’t reconstruct notif IDs, + # so keep an in-memory set per session (fresh on start). Return empty. + # Future: persist a mapping media_id->max_created_at for durability. + return seen + except Exception: return set() - def _save_seen_ids(self): - try: - with open(SEEN_NOTIFICATIONS_CACHE, "w") as f: - json.dump(list(self._seen_ids), f) - except IOError: - logger.error("Failed to save seen notifications cache.") + def _mark_seen(self, notification_id: int, media_id: int, episode: str | None): + self._seen_ids.add(notification_id) + # Also update registry’s last_notified_episode for the media + if self.registry and episode: + try: + self.registry.update_media_index_entry( + media_id, last_notified_episode=str(episode) + ) + except Exception: + logger.debug("Failed to update last_notified_episode in registry") def check_and_display_notifications(self): if not PLYER_AVAILABLE: @@ -53,26 +70,93 @@ class NotificationService: logger.info("No new notifications found.") return - new_notifications = [n for n in notifications if n.id not in self._seen_ids] + # Filter out notifications already seen in this session or older than registry marker + filtered = [] + for n in notifications: + if n.id in self._seen_ids: + continue + if self._is_seen_in_registry(n.media.id, n.episode): + continue + filtered.append(n) - if not new_notifications: + if not filtered: logger.info("No unseen notifications found.") return - for notif in new_notifications: + for notif in filtered: title = notif.media.title.english or notif.media.title.romaji message = f"Episode {notif.episode} of {title} has aired!" + # Try to include an image (cover large/extra_large) if available + app_icon: Optional[str] = None try: - plyer.notification.notify( + icon_path = self._get_or_fetch_icon(notif.media) + app_icon = str(icon_path) if icon_path else None + except Exception: + app_icon = None + + try: + # Guard: only call if available + if not PLYER_AVAILABLE or plyer_notification is None: + raise RuntimeError("Notification backend unavailable") + # Assert for type checkers and runtime safety + assert plyer_notification is not None + plyer_notification.notify( title="FastAnime: New Episode", message=message, app_name="FastAnime", + app_icon=app_icon, # plyer supports file paths or URLs depending on platform timeout=20, ) logger.info(f"Displayed notification: {message}") - self._seen_ids.add(notif.id) + self._mark_seen( + notif.id, + notif.media.id, + str(notif.episode) if notif.episode is not None else None, + ) except Exception as e: logger.error(f"Failed to display notification: {e}") - self._save_seen_ids() + def _is_seen_in_registry(self, media_id: int, episode: Optional[int]) -> bool: + if not self.registry or episode is None: + return False + try: + entry = self.registry.get_media_index_entry(media_id) + if not entry or not entry.last_notified_episode: + return False + # Compare numerically + try: + last_ep = float(entry.last_notified_episode) + return float(episode) <= last_ep + except Exception: + return str(episode) <= entry.last_notified_episode + except Exception: + return False + + def _get_or_fetch_icon(self, media_item: MediaItem) -> Optional[Path]: + """Fetch and cache a small cover image for system notifications.""" + try: + cover = media_item.cover_image + url = None + if cover: + url = cover.extra_large or cover.large or cover.medium + if not url: + return None + + cache_dir = APP_CACHE_DIR / "notification_icons" + cache_dir.mkdir(parents=True, exist_ok=True) + icon_path = cache_dir / f"{media_item.id}.png" + if icon_path.exists() and icon_path.stat().st_size > 0: + return icon_path + + # Directly download the image bytes without resizing + with httpx.Client(follow_redirects=True, timeout=20) as client: + resp = client.get(url) + resp.raise_for_status() + data = resp.content + if data: + icon_path.write_bytes(data) + return icon_path + except Exception as e: + logger.debug(f"Could not fetch icon for media {media_item.id}: {e}") + return None diff --git a/fastanime/cli/service/registry/service.py b/fastanime/cli/service/registry/service.py index 8e6e0b3..f1cc708 100644 --- a/fastanime/cli/service/registry/service.py +++ b/fastanime/cli/service/registry/service.py @@ -550,13 +550,12 @@ class MediaRegistryService: break if not episode_record: - if not file_path: - logger.error(f"File path required for new episode {episode_number}") - return False + # Allow creation without file_path for queued/in-progress states. + # Only require file_path once the episode is marked COMPLETED. episode_record = MediaEpisode( episode_number=episode_number, - file_path=file_path, download_status=status, + file_path=file_path, ) record.media_episodes.append(episode_record) @@ -564,6 +563,12 @@ class MediaRegistryService: episode_record.download_status = status if file_path: episode_record.file_path = file_path + elif status.name == "COMPLETED" and not episode_record.file_path: + logger.warning( + "Completed status set without file_path for media %s episode %s", + media_id, + episode_number, + ) if file_size is not None: episode_record.file_size = file_size if quality: diff --git a/fastanime/cli/service/worker/service.py b/fastanime/cli/service/worker/service.py index fba45da..b76e63a 100644 --- a/fastanime/cli/service/worker/service.py +++ b/fastanime/cli/service/worker/service.py @@ -1,5 +1,8 @@ import logging +import signal +import threading import time +from typing import Optional from fastanime.cli.service.download.service import DownloadService from fastanime.cli.service.notification.service import NotificationService @@ -18,44 +21,96 @@ class BackgroundWorkerService: self.config = config self.notification_service = notification_service self.download_service = download_service - self.running = True + self._stop_event = threading.Event() + self._signals_installed = False - def run(self): - logger.info("Background worker started.") - last_notification_check = 0 - last_download_check = 0 + def _install_signal_handlers(self): + """Install SIGINT/SIGTERM handlers to allow graceful shutdown when run in foreground.""" + if self._signals_installed: + return - notification_interval_sec = self.config.notification_check_interval * 60 - download_interval_sec = self.config.download_check_interval * 60 - self.download_service.start() - - try: - while self.running: - current_time = time.time() - - # Check for notifications - if current_time - last_notification_check > notification_interval_sec: - try: - self.notification_service.check_and_display_notifications() - except Exception as e: - logger.error(f"Error during notification check: {e}") - last_notification_check = current_time - - # Process download queue - if current_time - last_download_check > download_interval_sec: - try: - self.download_service.resume_unfinished_downloads() - except Exception as e: - logger.error(f"Error during download queue processing: {e}") - last_download_check = current_time - - # Sleep for a short interval to prevent high CPU usage - time.sleep(30) # Sleep for 30 seconds before next check cycle - - except KeyboardInterrupt: - logger.info("Background worker stopped by user.") + def _handler(signum, frame): # noqa: ARG001 (signature fixed by signal) + logger.info("Received signal %s, shutting down background worker...", signum) self.stop() + try: + signal.signal(signal.SIGINT, _handler) + signal.signal(signal.SIGTERM, _handler) + self._signals_installed = True + except Exception: + # Signal handling may fail in non-main threads or certain environments + logger.debug("Signal handlers not installed (non-main thread or unsupported environment).") + + def run(self): + """Run the background loop until stopped. + + Responsibilities: + - Periodically check AniList notifications (if authenticated & plyer available) + - Periodically resume/process unfinished downloads + - Keep CPU usage low using an event-based wait + - Gracefully terminate on KeyboardInterrupt/SIGTERM + """ + logger.info("Background worker starting...") + + # Convert configured minutes to seconds + notification_interval_sec = max(60, self.config.notification_check_interval * 60) + download_interval_sec = max(60, self.config.download_check_interval * 60) + + # Start download worker and attempt resuming pending jobs once at startup + self.download_service.start() + + # Schedule the very first execution immediately + next_notification_ts: Optional[float] = 0.0 + next_download_ts: Optional[float] = 0.0 + + # Install signal handlers if possible + self._install_signal_handlers() + + try: + while not self._stop_event.is_set(): + now = time.time() + + # Check for notifications + if next_notification_ts is not None and now >= next_notification_ts: + try: + self.notification_service.check_and_display_notifications() + except Exception: + logger.exception("Error during notification check") + finally: + next_notification_ts = now + notification_interval_sec + + # Process download queue + if next_download_ts is not None and now >= next_download_ts: + try: + self.download_service.resume_unfinished_downloads() + except Exception: + logger.exception("Error during download queue processing") + finally: + next_download_ts = now + download_interval_sec + + # Determine how long to wait until the next scheduled task + next_events = [t for t in (next_notification_ts, next_download_ts) if t is not None] + if next_events: + time_until_next = max(0.0, min(next_events) - time.time()) + else: + time_until_next = 30.0 + + # Cap wait to react reasonably fast to stop requests + wait_time = min(time_until_next, 30.0) + self._stop_event.wait(timeout=wait_time) + + except KeyboardInterrupt: + logger.info("Background worker interrupted by user. Stopping...") + self.stop() + finally: + # Ensure we always stop the download worker + try: + self.download_service.stop() + except Exception: + logger.exception("Failed to stop download service cleanly") + logger.info("Background worker stopped.") + def stop(self): - self.running = False - logger.info("Background worker shutting down.") + if not self._stop_event.is_set(): + logger.info("Background worker shutting down...") + self._stop_event.set() diff --git a/pyproject.toml b/pyproject.toml index 1285ce5..038c279 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -23,7 +23,10 @@ standard = [ "yt-dlp>=2025.7.21", "pycryptodomex>=3.23.0", ] -notifications = ["plyer>=2.1.0"] +notifications = [ + "dbus-python>=1.4.0", + "plyer>=2.1.0", +] mpv = [ "mpv>=1.0.7", ] diff --git a/uv.lock b/uv.lock index 4a3a876..d895625 100644 --- a/uv.lock +++ b/uv.lock @@ -74,6 +74,12 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" }, ] +[[package]] +name = "dbus-python" +version = "1.4.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ff/24/63118050c7dd7be04b1ccd60eab53fef00abe844442e1b6dec92dae505d6/dbus-python-1.4.0.tar.gz", hash = "sha256:991666e498f60dbf3e49b8b7678f5559b8a65034fdf61aae62cdecdb7d89c770", size = 232490, upload-time = "2025-03-13T19:57:54.212Z" } + [[package]] name = "distlib" version = "0.4.0" @@ -122,6 +128,7 @@ mpv = [ { name = "mpv" }, ] notifications = [ + { name = "dbus-python" }, { name = "plyer" }, ] standard = [ @@ -150,6 +157,7 @@ dev = [ [package.metadata] requires-dist = [ { name = "click", specifier = ">=8.1.7" }, + { name = "dbus-python", marker = "extra == 'notifications'", specifier = ">=1.4.0" }, { name = "httpx", specifier = ">=0.28.1" }, { name = "inquirerpy", specifier = ">=0.3.4" }, { name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },