feat: the worker command lol

This commit is contained in:
Benexl
2025-08-11 22:41:59 +03:00
parent ae62adf233
commit 66eb854da5
10 changed files with 156 additions and 54 deletions

View File

@@ -15,7 +15,6 @@ from fastanime.libs.media_api.types import (
@click.command(name="add", help="Add episodes to the background download queue.")
# Search/Filter options (mirrors 'fastanime anilist download')
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@@ -33,8 +32,12 @@ from fastanime.libs.media_api.types import (
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option(
"--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag])
)
@click.option(
"--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag])
)
@click.option(
"--media-format",
"-f",
@@ -67,12 +70,12 @@ from fastanime.libs.media_api.types import (
)
@click.pass_obj
def add(config: AppConfig, **options):
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.download import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
@@ -146,7 +149,7 @@ def add(config: AppConfig, **options):
}
preview_command = None
if config.general.preview != "none":
from ...utils.preview import create_preview_context # type: ignore
from fastanime.cli.utils.preview import create_preview_context
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
@@ -172,6 +175,7 @@ def add(config: AppConfig, **options):
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
# TODO: do a provider search here to determine episodes available maybe, or allow pasing of an episode list probably just change the format for parsing episodes
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(

View File

@@ -3,12 +3,16 @@ from fastanime.core.config import AppConfig
@click.command(name="list", help="List items in the download queue and their statuses.")
@click.option("--status", type=click.Choice(["queued", "downloading", "completed", "failed", "paused"]))
@click.option(
"--status",
type=click.Choice(["queued", "downloading", "completed", "failed", "paused"]),
)
@click.option("--detailed", is_flag=True)
@click.pass_obj
def list_cmd(config: AppConfig, status: str | None):
def list_cmd(config: AppConfig, status: str | None, detailed: bool | None):
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
from fastanime.cli.service.feedback import FeedbackService
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
@@ -21,6 +25,17 @@ def list_cmd(config: AppConfig, status: str | None):
"paused": DownloadStatus.PAUSED,
}
# TODO: improve this by modifying the download_status function or create new function
if detailed and status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
feedback.info(f"{len(episodes)} episode(s) with status {status}.")
for media_id, ep in episodes:
record = registry.get_media_record(media_id)
if record:
feedback.info(f"{record.media_item.title.english} episode {ep}")
return
if status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
@@ -28,8 +43,8 @@ def list_cmd(config: AppConfig, status: str | None):
for media_id, ep in episodes:
feedback.info(f"- media:{media_id} episode:{ep}")
else:
from rich.table import Table
from rich.console import Console
from rich.table import Table
stats = registry.get_download_statistics()
table = Table(title="Queue Status")

View File

@@ -11,12 +11,12 @@ def worker(config: AppConfig):
process any queued downloads. It's recommended to run this in the
background (e.g., 'fastanime worker &') or as a system service.
"""
from fastanime.cli.service.auth import AuthService
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.notification.service import NotificationService
from fastanime.cli.service.registry.service import MediaRegistryService
from fastanime.cli.service.worker.service import BackgroundWorkerService
from fastanime.cli.service.auth import AuthService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
@@ -37,7 +37,7 @@ def worker(config: AppConfig):
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
notification_service = NotificationService(media_api)
notification_service = NotificationService(config, media_api, registry)
download_service = DownloadService(config, registry, media_api, provider)
worker_service = BackgroundWorkerService(
config.worker, notification_service, download_service

View File

@@ -0,0 +1,3 @@
from .service import DownloadService
__all__ = ["DownloadService"]

View File

@@ -39,7 +39,6 @@ class DownloadService:
# Track in-flight downloads to avoid duplicate queueing
self._inflight: set[tuple[int, str]] = set()
# Worker is kept for potential future background commands
self._worker = ManagedBackgroundWorker(
max_workers=config.downloads.max_concurrent_downloads,
name="DownloadWorker",
@@ -101,6 +100,7 @@ class DownloadService:
def resume_unfinished_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
# TODO: make the checking of unfinished downloads more efficient probably by modifying the registry to be aware of what actually changed and load that instead
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.QUEUED
)
@@ -127,6 +127,49 @@ class DownloadService:
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
)
def retry_failed_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
# TODO: may need to improve this
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.FAILED
)
unfinished_jobs = queued_jobs
if not unfinished_jobs:
logger.info("No unfinished downloads found.")
return
logger.info(
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
if (media_id, str(episode_number)) in self._inflight:
continue
record = self.registry.get_media_record(media_id)
if record and record.media_item:
for episode in record.media_episodes:
if episode_number != episode.episode_number:
continue
if (
episode.download_attempts
<= self.config.downloads.max_retry_attempts
):
logger.info(
f"Retrying {episode_number} of {record.media_item.title.english}"
)
self._submit_download(record.media_item, episode_number)
else:
logger.info(
f"Max attempts reached for {episode_number} of {record.media_item.title.english}"
)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
)
def _execute_download_job(self, media_item: MediaItem, episode_number: str):
"""The core download logic, can be called by worker or synchronously."""
self.registry.get_or_create_record(media_item)

View File

@@ -1,13 +1,14 @@
import logging
from pathlib import Path
from typing import Optional, Set
from typing import Optional
import httpx
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
from fastanime.core.config.model import AppConfig
from fastanime.core.constants import APP_CACHE_DIR
from fastanime.libs.media_api.base import BaseApiClient
from fastanime.libs.media_api.types import MediaItem
# Note: Previously used image resizing; now we download icons directly without resizing.
from fastanime.libs.media_api.types import MediaItem, Notification
try:
from plyer import notification as plyer_notification
@@ -19,33 +20,21 @@ except ImportError: # pragma: no cover - optional dependency
logger = logging.getLogger(__name__)
NOTIFICATION_ICONS_CACHE_DIR = APP_CACHE_DIR / "notification_icons"
class NotificationService:
def __init__(self, media_api: BaseApiClient, registry_service=None):
def __init__(
self,
app_config: AppConfig,
media_api: BaseApiClient,
registry_service: MediaRegistryService,
):
self.media_api = media_api
self.registry = registry_service # optional; used for seen tracking
self._seen_ids: Set[int] = self._load_seen_ids()
def _load_seen_ids(self) -> Set[int]:
# Prefer MediaRegistry storage via index.last_notified_episode markers
try:
if not self.registry:
return set()
seen: Set[int] = set()
for record in self.registry.get_all_media_records():
index_entry = self.registry.get_media_index_entry(
record.media_item.id
)
# last_notified_episode stored per media; we cant reconstruct notif IDs,
# so keep an in-memory set per session (fresh on start). Return empty.
# Future: persist a mapping media_id->max_created_at for durability.
return seen
except Exception:
return set()
self.app_config = app_config
self.registry = registry_service
def _mark_seen(self, notification_id: int, media_id: int, episode: str | None):
self._seen_ids.add(notification_id)
# Also update registrys last_notified_episode for the media
if self.registry and episode:
try:
self.registry.update_media_index_entry(
@@ -71,10 +60,8 @@ class NotificationService:
return
# Filter out notifications already seen in this session or older than registry marker
filtered = []
filtered: list[Notification] = []
for n in notifications:
if n.id in self._seen_ids:
continue
if self._is_seen_in_registry(n.media.id, n.episode):
continue
filtered.append(n)
@@ -84,6 +71,14 @@ class NotificationService:
return
for notif in filtered:
if self.app_config.worker.auto_download_new_episode:
if not self.registry.get_media_record(notif.media.id):
self.registry.get_or_create_record(notif.media)
self.registry.update_episode_download_status(
media_id=notif.media.id,
episode_number=str(notif.episode),
status=DownloadStatus.QUEUED,
)
title = notif.media.title.english or notif.media.title.romaji
message = f"Episode {notif.episode} of {title} has aired!"
@@ -101,7 +96,7 @@ class NotificationService:
raise RuntimeError("Notification backend unavailable")
# Assert for type checkers and runtime safety
assert plyer_notification is not None
plyer_notification.notify(
plyer_notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
@@ -118,7 +113,7 @@ class NotificationService:
logger.error(f"Failed to display notification: {e}")
def _is_seen_in_registry(self, media_id: int, episode: Optional[int]) -> bool:
if not self.registry or episode is None:
if episode is None:
return False
try:
entry = self.registry.get_media_index_entry(media_id)
@@ -129,7 +124,7 @@ class NotificationService:
last_ep = float(entry.last_notified_episode)
return float(episode) <= last_ep
except Exception:
return str(episode) <= entry.last_notified_episode
return False
except Exception:
return False
@@ -143,7 +138,7 @@ class NotificationService:
if not url:
return None
cache_dir = APP_CACHE_DIR / "notification_icons"
cache_dir = NOTIFICATION_ICONS_CACHE_DIR
cache_dir.mkdir(parents=True, exist_ok=True)
icon_path = cache_dir / f"{media_item.id}.png"
if icon_path.exists() and icon_path.stat().st_size > 0:

View File

@@ -40,8 +40,6 @@ class MediaEpisode(BaseModel):
download_attempts: int = 0 # Number of download attempts
last_error: Optional[str] = None # Last error message if failed
model_config = {"arbitrary_types_allowed": True}
class MediaRecord(BaseModel):
media_item: MediaItem

View File

@@ -85,6 +85,13 @@ class MediaRegistryService:
logger.debug("saved registry index")
def get_seen_notifications(self) -> dict[int, str]:
seen = {}
for id, index_entry in self._load_index().media_index.items():
if episode := index_entry.last_notified_episode:
seen[index_entry.media_id] = episode
return seen
def get_media_index_entry(self, media_id: int) -> Optional[MediaRegistryIndexEntry]:
index = self._load_index()
return index.media_index.get(f"{self._media_api}_{media_id}")
@@ -102,7 +109,7 @@ class MediaRegistryService:
record = MediaRecord.model_validate(data)
logger.debug(f"Loaded media record for {media_id}")
# logger.debug(f"Loaded media record for {media_id}")
return record
def get_or_create_index_entry(self, media_id: int) -> MediaRegistryIndexEntry:

View File

@@ -30,7 +30,9 @@ class BackgroundWorkerService:
return
def _handler(signum, frame): # noqa: ARG001 (signature fixed by signal)
logger.info("Received signal %s, shutting down background worker...", signum)
logger.info(
"Received signal %s, shutting down background worker...", signum
)
self.stop()
try:
@@ -39,7 +41,9 @@ class BackgroundWorkerService:
self._signals_installed = True
except Exception:
# Signal handling may fail in non-main threads or certain environments
logger.debug("Signal handlers not installed (non-main thread or unsupported environment).")
logger.debug(
"Signal handlers not installed (non-main thread or unsupported environment)."
)
def run(self):
"""Run the background loop until stopped.
@@ -53,8 +57,13 @@ class BackgroundWorkerService:
logger.info("Background worker starting...")
# Convert configured minutes to seconds
notification_interval_sec = max(60, self.config.notification_check_interval * 60)
notification_interval_sec = max(
60, self.config.notification_check_interval * 60
)
download_interval_sec = max(60, self.config.download_check_interval * 60)
download_retry_interval_sec = max(
60, self.config.download_check_failed_interval * 60
)
# Start download worker and attempt resuming pending jobs once at startup
self.download_service.start()
@@ -62,6 +71,7 @@ class BackgroundWorkerService:
# Schedule the very first execution immediately
next_notification_ts: Optional[float] = 0.0
next_download_ts: Optional[float] = 0.0
next_retry_download_ts: Optional[float] = 0.0
# Install signal handlers if possible
self._install_signal_handlers()
@@ -73,6 +83,7 @@ class BackgroundWorkerService:
# Check for notifications
if next_notification_ts is not None and now >= next_notification_ts:
try:
logger.info("Checking for notifications...")
self.notification_service.check_and_display_notifications()
except Exception:
logger.exception("Error during notification check")
@@ -88,8 +99,25 @@ class BackgroundWorkerService:
finally:
next_download_ts = now + download_interval_sec
if next_retry_download_ts is not None and now >= next_retry_download_ts:
try:
self.download_service.retry_failed_downloads()
except Exception:
logger.exception(
"Error during failed download queue processing"
)
finally:
next_retry_download_ts = now + download_retry_interval_sec
# Determine how long to wait until the next scheduled task
next_events = [t for t in (next_notification_ts, next_download_ts) if t is not None]
next_events = [
t
for t in (
next_notification_ts,
next_download_ts,
next_retry_download_ts,
)
if t is not None
]
if next_events:
time_until_next = max(0.0, min(next_events) - time.time())
else:

View File

@@ -244,6 +244,15 @@ class WorkerConfig(OtherConfig):
ge=1,
description="How often to process the download queue (in minutes).",
)
download_check_failed_interval: int = Field(
default=60, # in minutes
ge=1,
description="How often to process the failed download queue (in minutes).",
)
auto_download_new_episode: bool = Field(
default=True,
description="Whether to automatically download a new episode that has been notified",
)
class SessionsConfig(OtherConfig):
@@ -399,7 +408,7 @@ class DownloadsConfig(OtherConfig):
ge=1,
description=desc.DOWNLOADS_MAX_CONCURRENT,
)
retry_attempts: int = Field(
max_retry_attempts: int = Field(
default=defaults.DOWNLOADS_RETRY_ATTEMPTS,
ge=0,
description=desc.DOWNLOADS_RETRY_ATTEMPTS,