Merge branch 'master' of https://github.com/Benexl/FastAnime into minor-fixes

This commit is contained in:
Type-Delta
2025-08-12 14:35:52 +07:00
29 changed files with 1409 additions and 200 deletions

View File

@@ -20,10 +20,13 @@ from ...core.config import AppConfig
fastanime config --path
\b
# print desktop entry info
fastanime config --desktop-entry
fastanime config --generate-desktop-entry
\b
# update your config without opening an editor
fastanime --icons --fzf --preview config --update
fastanime --icons --selector fzf --preview full config --update
\b
# interactively define your config
fastanime config --interactive
\b
# view the current contents of your config
fastanime config --view
@@ -40,9 +43,9 @@ from ...core.config import AppConfig
is_flag=True,
)
@click.option(
"--desktop-entry",
"--generate-desktop-entry",
"-d",
help="Configure the desktop entry of fastanime",
help="Generate the desktop entry of fastanime",
is_flag=True,
)
@click.option(
@@ -59,7 +62,13 @@ from ...core.config import AppConfig
)
@click.pass_obj
def config(
user_config: AppConfig, path, view, view_json, desktop_entry, update, interactive
user_config: AppConfig,
path,
view,
view_json,
generate_desktop_entry,
update,
interactive,
):
from ...core.constants import USER_CONFIG
from ..config.editor import InteractiveConfigEditor
@@ -85,7 +94,7 @@ def config(
import json
print(json.dumps(user_config.model_dump(mode="json")))
elif desktop_entry:
elif generate_desktop_entry:
_generate_desktop_entry()
elif interactive:
editor = InteractiveConfigEditor(current_config=user_config)
@@ -123,9 +132,9 @@ def _generate_desktop_entry():
EXECUTABLE = shutil.which("fastanime")
if EXECUTABLE:
cmds = f"{EXECUTABLE} --rofi anilist"
cmds = f"{EXECUTABLE} --selector rofi anilist"
else:
cmds = f"{sys.executable} -m fastanime --rofi anilist"
cmds = f"{sys.executable} -m fastanime --selector rofi anilist"
# TODO: Get funs of the other platforms to complete this lol
if PLATFORM == "win32":
@@ -140,7 +149,7 @@ def _generate_desktop_entry():
desktop_entry = dedent(
f"""
[Desktop Entry]
Name={PROJECT_NAME}
Name={PROJECT_NAME.title()}
Type=Application
version={__version__}
Path={Path().home()}

View File

@@ -1,62 +1,218 @@
import click
from fastanime.core.config import AppConfig
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaItem,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
@click.command(help="Queue episodes for the background worker to download.")
# Search/Filter options (mirrors 'fastanime anilist download')
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@click.option("--season", type=click.Choice([s.value for s in MediaSeason]))
@click.option(
"--title", "-t", required=True, multiple=True, help="Anime title to queue."
"--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option(
"--episode-range", "-r", required=True, help="Range of episodes (e.g., '1-10')."
"--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort]))
@click.option(
"--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option("--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option("--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag]))
@click.option(
"--media-format",
"-f",
multiple=True,
type=click.Choice([f.value for f in MediaFormat]),
)
@click.option("--media-type", type=click.Choice([t.value for t in MediaType]))
@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear]))
@click.option("--popularity-greater", type=click.IntRange(min=0))
@click.option("--popularity-lesser", type=click.IntRange(min=0))
@click.option("--score-greater", type=click.IntRange(min=0, max=100))
@click.option("--score-lesser", type=click.IntRange(min=0, max=100))
@click.option("--start-date-greater", type=int)
@click.option("--start-date-lesser", type=int)
@click.option("--end-date-greater", type=int)
@click.option("--end-date-lesser", type=int)
@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None)
# Queue-specific options
@click.option(
"--episode-range",
"-r",
required=True,
help="Range of episodes to queue (e.g., '1-10', '5', '8:12').",
)
@click.option(
"--yes",
"-Y",
is_flag=True,
help="Automatically queue from all found anime without prompting for selection.",
)
@click.pass_obj
def queue(config: AppConfig, title: tuple, episode_range: str):
def queue(config: AppConfig, **options):
"""
Searches for an anime and adds the specified episodes to the download queue.
The background worker must be running for the downloads to start.
Search AniList with filters, select one or more anime (or use --yes),
and queue the specified episode range for background download.
The background worker should be running to process the queue.
"""
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
feedback = FeedbackService(config)
selector = create_selector(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
for anime_title in title:
try:
feedback.info(f"Searching for '{anime_title}'...")
search_result = media_api.search_media(
MediaSearchParams(query=anime_title, per_page=1)
)
try:
# Build search params mirroring anilist download
sort_val = options.get("sort")
status_val = options.get("status")
status_not_val = options.get("status_not")
genres_val = options.get("genres")
genres_not_val = options.get("genres_not")
tags_val = options.get("tags")
tags_not_val = options.get("tags_not")
media_format_val = options.get("media_format")
media_type_val = options.get("media_type")
season_val = options.get("season")
year_val = options.get("year")
if not search_result or not search_result.media:
feedback.warning(f"Could not find '{anime_title}' on AniList.")
search_params = MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page"),
sort=MediaSort(sort_val) if sort_val else None,
status_in=[MediaStatus(s) for s in status_val] if status_val else None,
status_not_in=[MediaStatus(s) for s in status_not_val]
if status_not_val
else None,
genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None,
genre_not_in=[MediaGenre(g) for g in genres_not_val]
if genres_not_val
else None,
tag_in=[MediaTag(t) for t in tags_val] if tags_val else None,
tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None,
format_in=[MediaFormat(f) for f in media_format_val]
if media_format_val
else None,
type=MediaType(media_type_val) if media_type_val else None,
season=MediaSeason(season_val) if season_val else None,
seasonYear=int(year_val) if year_val else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
with Progress() as progress:
progress.add_task("Searching AniList...", total=None)
search_result = media_api.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria.")
if options.get("yes"):
anime_to_queue = search_result.media
else:
choice_map: dict[str, MediaItem] = {
(item.title.english or item.title.romaji or f"ID: {item.id}"): item
for item in search_result.media
}
preview_command = None
if config.general.preview != "none":
from ..utils.preview import create_preview_context # type: ignore
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(choice_map.values()),
list(choice_map.keys()),
config,
)
selected_titles = selector.choose_multiple(
"Select anime to queue",
list(choice_map.keys()),
preview=preview_command,
)
else:
selected_titles = selector.choose_multiple(
"Select anime to queue", list(choice_map.keys())
)
if not selected_titles:
feedback.warning("No anime selected. Nothing queued.")
return
anime_to_queue = [choice_map[title] for title in selected_titles]
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(
f"No episode information for '{media_item.title.english}', skipping."
)
continue
media_item = search_result.media[0]
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
episodes_to_queue = list(
parse_episode_range(episode_range, available_episodes)
)
try:
episodes_to_queue = list(
parse_episode_range(episode_range_str, available_episodes)
)
if not episodes_to_queue:
feedback.warning(
f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'."
)
continue
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
feedback.success(
f"Successfully queued {queued_count} episodes for '{media_item.title.english}'."
)
total_queued += queued_count
feedback.success(
f"Queued {queued_count} episodes for '{media_item.title.english}'."
)
except (ValueError, IndexError) as e:
feedback.error(
f"Invalid episode range for '{media_item.title.english}': {e}"
)
except FastAnimeError as e:
feedback.error(f"Failed to queue '{anime_title}'", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))
feedback.success(
f"Done. Total of {total_queued} episode(s) queued across all selections."
)
except FastAnimeError as e:
feedback.error("Queue command failed", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,3 @@
from .cmd import queue
__all__ = ["queue"]

View File

@@ -0,0 +1,26 @@
import click
from ...utils.lazyloader import LazyGroup
commands = {
"add": "add.add",
"list": "list.list_cmd",
"resume": "resume.resume",
"clear": "clear.clear_cmd",
}
@click.group(
cls=LazyGroup,
name="queue",
root="fastanime.cli.commands.queue.commands",
invoke_without_command=False,
help="Manage the download queue (add, list, resume, clear).",
short_help="Manage the download queue.",
lazy_subcommands=commands,
)
@click.pass_context
def queue(ctx: click.Context):
"""Queue management root command."""
# No-op root; subcommands are lazy-loaded
pass

View File

@@ -0,0 +1,217 @@
import click
from fastanime.core.config import AppConfig
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.media_api.types import (
MediaFormat,
MediaGenre,
MediaItem,
MediaSeason,
MediaSort,
MediaStatus,
MediaTag,
MediaType,
MediaYear,
)
@click.command(name="add", help="Add episodes to the background download queue.")
@click.option("--title", "-t")
@click.option("--page", "-p", type=click.IntRange(min=1), default=1)
@click.option("--per-page", type=click.IntRange(min=1, max=50))
@click.option("--season", type=click.Choice([s.value for s in MediaSeason]))
@click.option(
"--status", "-S", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option(
"--status-not", multiple=True, type=click.Choice([s.value for s in MediaStatus])
)
@click.option("--sort", "-s", type=click.Choice([s.value for s in MediaSort]))
@click.option(
"--genres", "-g", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--genres-not", multiple=True, type=click.Choice([g.value for g in MediaGenre])
)
@click.option(
"--tags", "-T", multiple=True, type=click.Choice([t.value for t in MediaTag])
)
@click.option(
"--tags-not", multiple=True, type=click.Choice([t.value for t in MediaTag])
)
@click.option(
"--media-format",
"-f",
multiple=True,
type=click.Choice([f.value for f in MediaFormat]),
)
@click.option("--media-type", type=click.Choice([t.value for t in MediaType]))
@click.option("--year", "-y", type=click.Choice([y.value for y in MediaYear]))
@click.option("--popularity-greater", type=click.IntRange(min=0))
@click.option("--popularity-lesser", type=click.IntRange(min=0))
@click.option("--score-greater", type=click.IntRange(min=0, max=100))
@click.option("--score-lesser", type=click.IntRange(min=0, max=100))
@click.option("--start-date-greater", type=int)
@click.option("--start-date-lesser", type=int)
@click.option("--end-date-greater", type=int)
@click.option("--end-date-lesser", type=int)
@click.option("--on-list/--not-on-list", "-L/-no-L", type=bool, default=None)
# Queue-specific options
@click.option(
"--episode-range",
"-r",
required=True,
help="Range of episodes to queue (e.g., '1-10', '5', '8:12').",
)
@click.option(
"--yes",
"-Y",
is_flag=True,
help="Queue for all found anime without prompting for selection.",
)
@click.pass_obj
def add(config: AppConfig, **options):
from fastanime.cli.service.download import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.utils.parser import parse_episode_range
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.media_api.params import MediaSearchParams
from fastanime.libs.provider.anime.provider import create_provider
from fastanime.libs.selectors import create_selector
from rich.progress import Progress
feedback = FeedbackService(config)
selector = create_selector(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
try:
# Build search params mirroring anilist download
sort_val = options.get("sort")
status_val = options.get("status")
status_not_val = options.get("status_not")
genres_val = options.get("genres")
genres_not_val = options.get("genres_not")
tags_val = options.get("tags")
tags_not_val = options.get("tags_not")
media_format_val = options.get("media_format")
media_type_val = options.get("media_type")
season_val = options.get("season")
year_val = options.get("year")
search_params = MediaSearchParams(
query=options.get("title"),
page=options.get("page", 1),
per_page=options.get("per_page"),
sort=MediaSort(sort_val) if sort_val else None,
status_in=[MediaStatus(s) for s in status_val] if status_val else None,
status_not_in=[MediaStatus(s) for s in status_not_val]
if status_not_val
else None,
genre_in=[MediaGenre(g) for g in genres_val] if genres_val else None,
genre_not_in=[MediaGenre(g) for g in genres_not_val]
if genres_not_val
else None,
tag_in=[MediaTag(t) for t in tags_val] if tags_val else None,
tag_not_in=[MediaTag(t) for t in tags_not_val] if tags_not_val else None,
format_in=[MediaFormat(f) for f in media_format_val]
if media_format_val
else None,
type=MediaType(media_type_val) if media_type_val else None,
season=MediaSeason(season_val) if season_val else None,
seasonYear=int(year_val) if year_val else None,
popularity_greater=options.get("popularity_greater"),
popularity_lesser=options.get("popularity_lesser"),
averageScore_greater=options.get("score_greater"),
averageScore_lesser=options.get("score_lesser"),
startDate_greater=options.get("start_date_greater"),
startDate_lesser=options.get("start_date_lesser"),
endDate_greater=options.get("end_date_greater"),
endDate_lesser=options.get("end_date_lesser"),
on_list=options.get("on_list"),
)
with Progress() as progress:
progress.add_task("Searching AniList...", total=None)
search_result = media_api.search_media(search_params)
if not search_result or not search_result.media:
raise FastAnimeError("No anime found matching your search criteria.")
if options.get("yes"):
anime_to_queue = search_result.media
else:
choice_map: dict[str, MediaItem] = {
(item.title.english or item.title.romaji or f"ID: {item.id}"): item
for item in search_result.media
}
preview_command = None
if config.general.preview != "none":
from fastanime.cli.utils.preview import create_preview_context
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_anime_preview(
list(choice_map.values()),
list(choice_map.keys()),
config,
)
selected_titles = selector.choose_multiple(
"Select anime to queue",
list(choice_map.keys()),
preview=preview_command,
)
else:
selected_titles = selector.choose_multiple(
"Select anime to queue", list(choice_map.keys())
)
if not selected_titles:
feedback.warning("No anime selected. Nothing queued.")
return
anime_to_queue = [choice_map[title] for title in selected_titles]
episode_range_str = options.get("episode_range")
total_queued = 0
for media_item in anime_to_queue:
# TODO: do a provider search here to determine episodes available maybe, or allow pasing of an episode list probably just change the format for parsing episodes
available_episodes = [str(i + 1) for i in range(media_item.episodes or 0)]
if not available_episodes:
feedback.warning(
f"No episode information for '{media_item.title.english}', skipping."
)
continue
try:
episodes_to_queue = list(
parse_episode_range(episode_range_str, available_episodes)
)
if not episodes_to_queue:
feedback.warning(
f"Episode range '{episode_range_str}' resulted in no episodes for '{media_item.title.english}'."
)
continue
queued_count = 0
for ep in episodes_to_queue:
if download_service.add_to_queue(media_item, ep):
queued_count += 1
total_queued += queued_count
feedback.success(
f"Queued {queued_count} episodes for '{media_item.title.english}'."
)
except (ValueError, IndexError) as e:
feedback.error(
f"Invalid episode range for '{media_item.title.english}': {e}"
)
feedback.success(
f"Done. Total of {total_queued} episode(s) queued across all selections."
)
except FastAnimeError as e:
feedback.error("Queue add failed", str(e))
except Exception as e:
feedback.error("An unexpected error occurred", str(e))

View File

@@ -0,0 +1,30 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="clear", help="Clear queued items from the registry (QUEUED -> NOT_DOWNLOADED).")
@click.option("--force", is_flag=True, help="Do not prompt for confirmation.")
@click.pass_obj
def clear_cmd(config: AppConfig, force: bool):
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
if not force and not click.confirm("This will clear all queued items. Continue?"):
feedback.info("Aborted.")
return
cleared = 0
queued = registry.get_episodes_by_download_status(DownloadStatus.QUEUED)
for media_id, ep in queued:
ok = registry.update_episode_download_status(
media_id=media_id,
episode_number=ep,
status=DownloadStatus.NOT_DOWNLOADED,
)
if ok:
cleared += 1
feedback.success(f"Cleared {cleared} queued episode(s).")

View File

@@ -0,0 +1,60 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="list", help="List items in the download queue and their statuses.")
@click.option(
"--status",
type=click.Choice(["queued", "downloading", "completed", "failed", "paused"]),
)
@click.option("--detailed", is_flag=True)
@click.pass_obj
def list_cmd(config: AppConfig, status: str | None, detailed: bool | None):
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
feedback = FeedbackService(config)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
status_map = {
"queued": DownloadStatus.QUEUED,
"downloading": DownloadStatus.DOWNLOADING,
"completed": DownloadStatus.COMPLETED,
"failed": DownloadStatus.FAILED,
"paused": DownloadStatus.PAUSED,
}
# TODO: improve this by modifying the download_status function or create new function
if detailed and status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
feedback.info(f"{len(episodes)} episode(s) with status {status}.")
for media_id, ep in episodes:
record = registry.get_media_record(media_id)
if record:
feedback.info(f"{record.media_item.title.english} episode {ep}")
return
if status:
target = status_map[status]
episodes = registry.get_episodes_by_download_status(target)
feedback.info(f"{len(episodes)} episode(s) with status {status}.")
for media_id, ep in episodes:
feedback.info(f"- media:{media_id} episode:{ep}")
else:
from rich.console import Console
from rich.table import Table
stats = registry.get_download_statistics()
table = Table(title="Queue Status")
table.add_column("Metric")
table.add_column("Value")
table.add_row("Queued", str(stats.get("queued", 0)))
table.add_row("Downloading", str(stats.get("downloading", 0)))
table.add_row("Completed", str(stats.get("downloaded", 0)))
table.add_row("Failed", str(stats.get("failed", 0)))
table.add_row("Paused", str(stats.get("paused", 0)))
console = Console()
console.print(table)

View File

@@ -0,0 +1,22 @@
import click
from fastanime.core.config import AppConfig
@click.command(name="resume", help="Submit any queued or in-progress downloads to the worker.")
@click.pass_obj
def resume(config: AppConfig):
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.libs.media_api.api import create_api_client
from fastanime.libs.provider.anime.provider import create_provider
feedback = FeedbackService(config)
media_api = create_api_client(config.general.media_api, config)
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
download_service = DownloadService(config, registry, media_api, provider)
download_service.start()
download_service.resume_unfinished_downloads()
feedback.success("Submitted queued downloads to background worker.")

View File

@@ -11,6 +11,7 @@ def worker(config: AppConfig):
process any queued downloads. It's recommended to run this in the
background (e.g., 'fastanime worker &') or as a system service.
"""
from fastanime.cli.service.auth import AuthService
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.feedback import FeedbackService
from fastanime.cli.service.notification.service import NotificationService
@@ -26,10 +27,17 @@ def worker(config: AppConfig):
# Instantiate services
media_api = create_api_client(config.general.media_api, config)
# Authenticate if credentials exist (enables notifications)
auth = AuthService(config.general.media_api)
if profile := auth.get_auth():
try:
media_api.authenticate(profile.token)
except Exception:
pass
provider = create_provider(config.general.provider)
registry = MediaRegistryService(config.general.media_api, config.media_registry)
notification_service = NotificationService(media_api)
notification_service = NotificationService(config, media_api, registry)
download_service = DownloadService(config, registry, media_api, provider)
worker_service = BackgroundWorkerService(
config.worker, notification_service, download_service

View File

@@ -1,6 +1,9 @@
import textwrap
from enum import Enum
from pathlib import Path
from typing import Any, Literal, get_args, get_origin
from pydantic.fields import FieldInfo
from ...core.config import AppConfig
from ...core.constants import APP_ASCII_ART, DISCORD_INVITE, PROJECT_NAME, REPO_HOME
@@ -11,7 +14,7 @@ config_asci = "\n".join(
)
CONFIG_HEADER = f"""
# ==============================================================================
#
#
{config_asci}
#
# ==============================================================================
@@ -37,26 +40,16 @@ CONFIG_FOOTER = f"""
def generate_config_ini_from_app_model(app_model: AppConfig) -> str:
"""Generate a configuration file content from a Pydantic model."""
model_schema = AppConfig.model_json_schema(mode="serialization")
app_model_dict = app_model.model_dump()
config_ini_content = [CONFIG_HEADER]
for section_name, section_dict in app_model_dict.items():
section_ref = model_schema["properties"][section_name].get("$ref")
if not section_ref:
continue
section_class_name = section_ref.split("/")[-1]
section_schema = model_schema["$defs"][section_class_name]
section_comment = section_schema.get("description", "")
for section_name, section_model in app_model:
section_comment = section_model.model_config.get("title", "")
config_ini_content.append(f"\n#\n# {section_comment}\n#")
config_ini_content.append(f"[{section_name}]")
for field_name, field_value in section_dict.items():
field_properties = section_schema.get("properties", {}).get(field_name, {})
description = field_properties.get("description", "")
for field_name, field_info in section_model.model_fields.items():
description = field_info.description or ""
if description:
wrapped_comment = textwrap.fill(
description,
@@ -66,6 +59,17 @@ def generate_config_ini_from_app_model(app_model: AppConfig) -> str:
)
config_ini_content.append(f"\n{wrapped_comment}")
field_type_comment = _get_field_type_comment(field_info)
if field_type_comment:
wrapped_comment = textwrap.fill(
field_type_comment,
width=78,
initial_indent="# ",
subsequent_indent="# ",
)
config_ini_content.append(wrapped_comment)
field_value = getattr(section_model, field_name)
if isinstance(field_value, bool):
value_str = str(field_value).lower()
elif isinstance(field_value, Path):
@@ -81,3 +85,70 @@ def generate_config_ini_from_app_model(app_model: AppConfig) -> str:
config_ini_content.extend(["\n", CONFIG_FOOTER])
return "\n".join(config_ini_content)
def _get_field_type_comment(field_info: FieldInfo) -> str:
"""Generate a comment with type information for a field."""
field_type = field_info.annotation
# Handle Literal and Enum types
possible_values = []
if field_type is not None:
if isinstance(field_type, type) and issubclass(field_type, Enum):
possible_values = [member.value for member in field_type]
elif hasattr(field_type, "__origin__") and get_origin(field_type) is Literal:
args = get_args(field_type)
if args:
possible_values = list(args)
if possible_values:
return f"Possible values: [ {', '.join(map(str, possible_values))} ]"
# Handle basic types and numeric ranges
type_name = _get_type_name(field_type)
range_info = _get_range_info(field_info)
if range_info:
return f"Type: {type_name} ({range_info})"
elif type_name:
return f"Type: {type_name}"
return ""
def _get_type_name(field_type: Any) -> str:
"""Get a user-friendly name for a field's type."""
if field_type is str:
return "string"
if field_type is int:
return "integer"
if field_type is float:
return "float"
if field_type is bool:
return "boolean"
if field_type is Path:
return "path"
return ""
def _get_range_info(field_info: FieldInfo) -> str:
"""Get a string describing the numeric range of a field."""
constraints = {}
if hasattr(field_info, "metadata") and field_info.metadata:
for constraint in field_info.metadata:
constraint_type = type(constraint).__name__
if constraint_type == "Ge" and hasattr(constraint, "ge"):
constraints["min"] = constraint.ge
elif constraint_type == "Le" and hasattr(constraint, "le"):
constraints["max"] = constraint.le
elif constraint_type == "Gt" and hasattr(constraint, "gt"):
constraints["min"] = constraint.gt + 1
elif constraint_type == "Lt" and hasattr(constraint, "lt"):
constraints["max"] = constraint.lt - 1
if constraints:
min_val = constraints.get("min", "N/A")
max_val = constraints.get("max", "N/A")
return f"Range: {min_val}-{max_val}"
return ""

View File

@@ -7,7 +7,7 @@ from ...state import InternalDirective, MenuName, ProviderState, State
@session.menu
def provider_search(ctx: Context, state: State) -> State | InternalDirective:
from .....core.utils.fuzzy import fuzz
from .....core.utils.normalizer import normalize_title
from .....core.utils.normalizer import normalize_title, update_user_normalizer_json
feedback = ctx.feedback
media_item = state.media_api.media_item
@@ -71,6 +71,12 @@ def provider_search(ctx: Context, state: State) -> State | InternalDirective:
if not chosen_title or chosen_title == "Back":
return InternalDirective.BACK
if selector.confirm(
f"Would you like to update your local normalizer json with: {chosen_title} for {media_title}"
):
update_user_normalizer_json(
chosen_title, media_title, config.general.provider.value
)
selected_provider_anime = provider_results_map[chosen_title]
with feedback.progress(

View File

@@ -0,0 +1,3 @@
from .service import DownloadService
__all__ = ["DownloadService"]

View File

@@ -1,7 +1,9 @@
import logging
from pathlib import Path
from typing import TYPE_CHECKING, List
from ....core.config.model import AppConfig
from ....core.constants import APP_CACHE_DIR
from ....core.downloader import DownloadParams, create_downloader
from ....core.utils.concurrency import ManagedBackgroundWorker, thread_manager
from ....core.utils.fuzzy import fuzz
@@ -21,6 +23,7 @@ if TYPE_CHECKING:
logger = logging.getLogger(__name__)
NOTIFICATION_ICONS_CACHE_DIR = APP_CACHE_DIR / "notification_icons"
class DownloadService:
@@ -31,13 +34,14 @@ class DownloadService:
media_api_service: "BaseApiClient",
provider_service: "BaseAnimeProvider",
):
self.config = config
self.app_config = config
self.registry = registry_service
self.media_api = media_api_service
self.provider = provider_service
self.downloader = create_downloader(config.downloads)
# Track in-flight downloads to avoid duplicate queueing
self._inflight: set[tuple[int, str]] = set()
# Worker is kept for potential future background commands
self._worker = ManagedBackgroundWorker(
max_workers=config.downloads.max_concurrent_downloads,
name="DownloadWorker",
@@ -56,18 +60,25 @@ class DownloadService:
self._worker.shutdown(wait=False)
def add_to_queue(self, media_item: MediaItem, episode_number: str) -> bool:
"""Adds a download job to the ASYNCHRONOUS queue."""
"""Mark an episode as queued in the registry (no immediate download)."""
logger.info(
f"Queueing background download for '{media_item.title.english}' Episode {episode_number}"
f"Queueing episode '{episode_number}' for '{media_item.title.english}' (registry only)"
)
self.registry.get_or_create_record(media_item)
updated = self.registry.update_episode_download_status(
return self.registry.update_episode_download_status(
media_id=media_item.id,
episode_number=episode_number,
status=DownloadStatus.QUEUED,
)
if not updated:
def _submit_download(self, media_item: MediaItem, episode_number: str) -> bool:
"""Submit a download task to the worker if not already in-flight."""
key = (media_item.id, str(episode_number))
if key in self._inflight:
return False
if not self._worker.is_running():
self._worker.start()
self._inflight.add(key)
self._worker.submit_function(
self._execute_download_job, media_item, episode_number
)
@@ -92,6 +103,7 @@ class DownloadService:
def resume_unfinished_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
# TODO: make the checking of unfinished downloads more efficient probably by modifying the registry to be aware of what actually changed and load that instead
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.QUEUED
)
@@ -108,9 +120,54 @@ class DownloadService:
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
if (media_id, str(episode_number)) in self._inflight:
continue
record = self.registry.get_media_record(media_id)
if record and record.media_item:
self.add_to_queue(record.media_item, episode_number)
self._submit_download(record.media_item, episode_number)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
)
def retry_failed_downloads(self):
"""Finds and re-queues any downloads that were left in an unfinished state."""
logger.info("Checking for unfinished downloads to resume...")
# TODO: may need to improve this
queued_jobs = self.registry.get_episodes_by_download_status(
DownloadStatus.FAILED
)
unfinished_jobs = queued_jobs
if not unfinished_jobs:
logger.info("No unfinished downloads found.")
return
logger.info(
f"Found {len(unfinished_jobs)} unfinished downloads. Re-queueing..."
)
for media_id, episode_number in unfinished_jobs:
if (media_id, str(episode_number)) in self._inflight:
continue
record = self.registry.get_media_record(media_id)
if record and record.media_item:
for episode in record.media_episodes:
if episode_number != episode.episode_number:
continue
if (
episode.download_attempts
<= self.app_config.downloads.max_retry_attempts
):
logger.info(
f"Retrying {episode_number} of {record.media_item.title.english}"
)
self._submit_download(record.media_item, episode_number)
else:
logger.info(
f"Max attempts reached for {episode_number} of {record.media_item.title.english}"
)
else:
logger.error(
f"Could not find metadata for media ID {media_id}. Cannot resume. Please run 'fastanime registry sync'."
@@ -130,12 +187,17 @@ class DownloadService:
# 1. Search the provider to get the provider-specific ID
provider_search_results = self.provider.search(
SearchParams(query=media_title)
SearchParams(
query=normalize_title(
media_title, self.app_config.general.provider.value, True
),
translation_type=self.app_config.stream.translation_type,
)
)
if not provider_search_results or not provider_search_results.results:
raise ValueError(
f"Could not find '{media_title}' on provider '{self.config.general.provider.value}'"
f"Could not find '{media_title}' on provider '{self.app_config.general.provider.value}'"
)
# 2. Find the best match using fuzzy logic (like auto-select)
@@ -146,7 +208,7 @@ class DownloadService:
provider_results_map.keys(),
key=lambda p_title: fuzz.ratio(
normalize_title(
p_title, self.config.general.provider.value
p_title, self.app_config.general.provider.value
).lower(),
media_title.lower(),
),
@@ -168,7 +230,7 @@ class DownloadService:
anime_id=provider_anime.id,
query=media_title,
episode=episode_number,
translation_type=self.config.stream.translation_type,
translation_type=self.app_config.stream.translation_type,
)
)
if not streams_iterator:
@@ -178,11 +240,11 @@ class DownloadService:
if not server or not server.links:
raise ValueError(f"No stream links found for Episode {episode_number}")
if server.name != self.config.downloads.server.value:
if server.name != self.app_config.downloads.server.value:
while True:
try:
_server = next(streams_iterator)
if _server.name == self.config.downloads.server.value:
if _server.name == self.app_config.downloads.server.value:
server = _server
break
except StopIteration:
@@ -202,9 +264,9 @@ class DownloadService:
silent=False,
headers=server.headers,
subtitles=[sub.url for sub in server.subtitles],
merge=self.config.downloads.merge_subtitles,
clean=self.config.downloads.cleanup_after_merge,
no_check_certificate=self.config.downloads.no_check_certificate,
merge=self.app_config.downloads.merge_subtitles,
clean=self.app_config.downloads.cleanup_after_merge,
no_check_certificate=self.app_config.downloads.no_check_certificate,
)
result = self.downloader.download(download_params)
@@ -223,19 +285,49 @@ class DownloadService:
file_path=result.merged_path or result.video_path,
file_size=file_size,
quality=stream_link.quality,
provider_name=self.config.general.provider.value,
provider_name=self.app_config.general.provider.value,
server_name=server.name,
subtitle_paths=result.subtitle_paths,
)
logger.info(
f"Successfully downloaded Episode {episode_number} of '{media_title}'"
)
message = f"Successfully downloaded Episode {episode_number} of '{media_title}'"
try:
from plyer import notification
icon_path = self._get_or_fetch_icon(media_item)
app_icon = str(icon_path) if icon_path else None
notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon,
timeout=20,
)
except:
pass
logger.info(message)
else:
raise ValueError(result.error_message or "Unknown download error")
except Exception as e:
message = f"Download failed for '{media_item.title.english}' Ep {episode_number}: {e}"
try:
from plyer import notification
icon_path = self._get_or_fetch_icon(media_item)
app_icon = str(icon_path) if icon_path else None
notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon,
timeout=20,
)
except:
pass
logger.error(
f"Download failed for '{media_item.title.english}' Ep {episode_number}: {e}",
message,
exc_info=True,
)
self.registry.update_episode_download_status(
@@ -244,3 +336,39 @@ class DownloadService:
status=DownloadStatus.FAILED,
error_message=str(e),
)
finally:
# Remove from in-flight tracking regardless of outcome
try:
self._inflight.discard((media_item.id, str(episode_number)))
except Exception:
pass
def _get_or_fetch_icon(self, media_item: MediaItem) -> Path | None:
"""Fetch and cache a small cover image for system notifications."""
import httpx
try:
cover = media_item.cover_image
url = None
if cover:
url = cover.extra_large or cover.large or cover.medium
if not url:
return None
cache_dir = NOTIFICATION_ICONS_CACHE_DIR
cache_dir.mkdir(parents=True, exist_ok=True)
icon_path = cache_dir / f"{media_item.id}.png"
if icon_path.exists() and icon_path.stat().st_size > 0:
return icon_path
# Directly download the image bytes without resizing
with httpx.Client(follow_redirects=True, timeout=20) as client:
resp = client.get(url)
resp.raise_for_status()
data = resp.content
if data:
icon_path.write_bytes(data)
return icon_path
except Exception as e:
logger.debug(f"Could not fetch icon for media {media_item.id}: {e}")
return None

View File

@@ -1,41 +1,47 @@
import json
import logging
from typing import Set
from pathlib import Path
from typing import Optional
import httpx
from fastanime.cli.service.registry import MediaRegistryService
from fastanime.cli.service.registry.models import DownloadStatus
from fastanime.core.config.model import AppConfig
from fastanime.core.constants import APP_CACHE_DIR
from fastanime.libs.media_api.base import BaseApiClient
from fastanime.libs.media_api.types import MediaItem, Notification
try:
import plyer
from plyer import notification as plyer_notification
PLYER_AVAILABLE = True
except ImportError:
except ImportError: # pragma: no cover - optional dependency
plyer_notification = None # type: ignore[assignment]
PLYER_AVAILABLE = False
logger = logging.getLogger(__name__)
SEEN_NOTIFICATIONS_CACHE = APP_CACHE_DIR / "seen_notifications.json"
NOTIFICATION_ICONS_CACHE_DIR = APP_CACHE_DIR / "notification_icons"
class NotificationService:
def __init__(self, media_api: BaseApiClient):
def __init__(
self,
app_config: AppConfig,
media_api: BaseApiClient,
registry_service: MediaRegistryService,
):
self.media_api = media_api
self._seen_ids: Set[int] = self._load_seen_ids()
self.app_config = app_config
self.registry = registry_service
def _load_seen_ids(self) -> Set[int]:
if not SEEN_NOTIFICATIONS_CACHE.exists():
return set()
try:
with open(SEEN_NOTIFICATIONS_CACHE, "r") as f:
return set(json.load(f))
except (json.JSONDecodeError, IOError):
return set()
def _save_seen_ids(self):
try:
with open(SEEN_NOTIFICATIONS_CACHE, "w") as f:
json.dump(list(self._seen_ids), f)
except IOError:
logger.error("Failed to save seen notifications cache.")
def _mark_seen(self, notification_id: int, media_id: int, episode: str | None):
if self.registry and episode:
try:
self.registry.update_media_index_entry(
media_id, last_notified_episode=str(episode)
)
except Exception:
logger.debug("Failed to update last_notified_episode in registry")
def check_and_display_notifications(self):
if not PLYER_AVAILABLE:
@@ -53,26 +59,99 @@ class NotificationService:
logger.info("No new notifications found.")
return
new_notifications = [n for n in notifications if n.id not in self._seen_ids]
# Filter out notifications already seen in this session or older than registry marker
filtered: list[Notification] = []
for n in notifications:
if self._is_seen_in_registry(n.media.id, n.episode):
continue
filtered.append(n)
if not new_notifications:
if not filtered:
logger.info("No unseen notifications found.")
return
for notif in new_notifications:
for notif in filtered:
if self.app_config.worker.auto_download_new_episode:
if not self.registry.get_media_record(notif.media.id):
self.registry.get_or_create_record(notif.media)
self.registry.update_episode_download_status(
media_id=notif.media.id,
episode_number=str(notif.episode),
status=DownloadStatus.QUEUED,
)
title = notif.media.title.english or notif.media.title.romaji
message = f"Episode {notif.episode} of {title} has aired!"
# Try to include an image (cover large/extra_large) if available
app_icon: Optional[str] = None
try:
plyer.notification.notify(
icon_path = self._get_or_fetch_icon(notif.media)
app_icon = str(icon_path) if icon_path else None
except Exception:
app_icon = None
try:
# Guard: only call if available
if not PLYER_AVAILABLE or plyer_notification is None:
raise RuntimeError("Notification backend unavailable")
# Assert for type checkers and runtime safety
assert plyer_notification is not None
plyer_notification.notify( # type: ignore
title="FastAnime: New Episode",
message=message,
app_name="FastAnime",
app_icon=app_icon, # plyer supports file paths or URLs depending on platform
timeout=20,
)
logger.info(f"Displayed notification: {message}")
self._seen_ids.add(notif.id)
self._mark_seen(
notif.id,
notif.media.id,
str(notif.episode) if notif.episode is not None else None,
)
except Exception as e:
logger.error(f"Failed to display notification: {e}")
self._save_seen_ids()
def _is_seen_in_registry(self, media_id: int, episode: Optional[int]) -> bool:
if episode is None:
return False
try:
entry = self.registry.get_media_index_entry(media_id)
if not entry or not entry.last_notified_episode:
return False
# Compare numerically
try:
last_ep = float(entry.last_notified_episode)
return float(episode) <= last_ep
except Exception:
return False
except Exception:
return False
def _get_or_fetch_icon(self, media_item: MediaItem) -> Optional[Path]:
"""Fetch and cache a small cover image for system notifications."""
try:
cover = media_item.cover_image
url = None
if cover:
url = cover.extra_large or cover.large or cover.medium
if not url:
return None
cache_dir = NOTIFICATION_ICONS_CACHE_DIR
cache_dir.mkdir(parents=True, exist_ok=True)
icon_path = cache_dir / f"{media_item.id}.png"
if icon_path.exists() and icon_path.stat().st_size > 0:
return icon_path
# Directly download the image bytes without resizing
with httpx.Client(follow_redirects=True, timeout=20) as client:
resp = client.get(url)
resp.raise_for_status()
data = resp.content
if data:
icon_path.write_bytes(data)
return icon_path
except Exception as e:
logger.debug(f"Could not fetch icon for media {media_item.id}: {e}")
return None

View File

@@ -40,8 +40,6 @@ class MediaEpisode(BaseModel):
download_attempts: int = 0 # Number of download attempts
last_error: Optional[str] = None # Last error message if failed
model_config = {"arbitrary_types_allowed": True}
class MediaRecord(BaseModel):
media_item: MediaItem

View File

@@ -85,6 +85,13 @@ class MediaRegistryService:
logger.debug("saved registry index")
def get_seen_notifications(self) -> dict[int, str]:
seen = {}
for id, index_entry in self._load_index().media_index.items():
if episode := index_entry.last_notified_episode:
seen[index_entry.media_id] = episode
return seen
def get_media_index_entry(self, media_id: int) -> Optional[MediaRegistryIndexEntry]:
index = self._load_index()
return index.media_index.get(f"{self._media_api}_{media_id}")
@@ -102,7 +109,7 @@ class MediaRegistryService:
record = MediaRecord.model_validate(data)
logger.debug(f"Loaded media record for {media_id}")
# logger.debug(f"Loaded media record for {media_id}")
return record
def get_or_create_index_entry(self, media_id: int) -> MediaRegistryIndexEntry:
@@ -184,6 +191,8 @@ class MediaRegistryService:
else:
if not index_entry.status:
index_entry.status = UserMediaListStatus.WATCHING
elif index_entry.status == UserMediaListStatus.COMPLETED:
index_entry.status = UserMediaListStatus.REPEATING
if last_watch_position:
index_entry.last_watch_position = last_watch_position
@@ -550,13 +559,12 @@ class MediaRegistryService:
break
if not episode_record:
if not file_path:
logger.error(f"File path required for new episode {episode_number}")
return False
# Allow creation without file_path for queued/in-progress states.
# Only require file_path once the episode is marked COMPLETED.
episode_record = MediaEpisode(
episode_number=episode_number,
file_path=file_path,
download_status=status,
file_path=file_path,
)
record.media_episodes.append(episode_record)
@@ -564,6 +572,12 @@ class MediaRegistryService:
episode_record.download_status = status
if file_path:
episode_record.file_path = file_path
elif status.name == "COMPLETED" and not episode_record.file_path:
logger.warning(
"Completed status set without file_path for media %s episode %s",
media_id,
episode_number,
)
if file_size is not None:
episode_record.file_size = file_size
if quality:

View File

@@ -28,6 +28,11 @@ class WatchHistoryService:
)
status = None
if (
media_item.user_status
and media_item.user_status.status == UserMediaListStatus.COMPLETED
):
status = UserMediaListStatus.REPEATING
self.media_registry.update_media_index_entry(
media_id=media_item.id,
watched=True,

View File

@@ -1,5 +1,8 @@
import logging
import signal
import threading
import time
from typing import Optional
from fastanime.cli.service.download.service import DownloadService
from fastanime.cli.service.notification.service import NotificationService
@@ -18,44 +21,124 @@ class BackgroundWorkerService:
self.config = config
self.notification_service = notification_service
self.download_service = download_service
self.running = True
self._stop_event = threading.Event()
self._signals_installed = False
def run(self):
logger.info("Background worker started.")
last_notification_check = 0
last_download_check = 0
def _install_signal_handlers(self):
"""Install SIGINT/SIGTERM handlers to allow graceful shutdown when run in foreground."""
if self._signals_installed:
return
notification_interval_sec = self.config.notification_check_interval * 60
download_interval_sec = self.config.download_check_interval * 60
self.download_service.start()
try:
while self.running:
current_time = time.time()
# Check for notifications
if current_time - last_notification_check > notification_interval_sec:
try:
self.notification_service.check_and_display_notifications()
except Exception as e:
logger.error(f"Error during notification check: {e}")
last_notification_check = current_time
# Process download queue
if current_time - last_download_check > download_interval_sec:
try:
self.download_service.resume_unfinished_downloads()
except Exception as e:
logger.error(f"Error during download queue processing: {e}")
last_download_check = current_time
# Sleep for a short interval to prevent high CPU usage
time.sleep(30) # Sleep for 30 seconds before next check cycle
except KeyboardInterrupt:
logger.info("Background worker stopped by user.")
def _handler(signum, frame): # noqa: ARG001 (signature fixed by signal)
logger.info(
"Received signal %s, shutting down background worker...", signum
)
self.stop()
try:
signal.signal(signal.SIGINT, _handler)
signal.signal(signal.SIGTERM, _handler)
self._signals_installed = True
except Exception:
# Signal handling may fail in non-main threads or certain environments
logger.debug(
"Signal handlers not installed (non-main thread or unsupported environment)."
)
def run(self):
"""Run the background loop until stopped.
Responsibilities:
- Periodically check AniList notifications (if authenticated & plyer available)
- Periodically resume/process unfinished downloads
- Keep CPU usage low using an event-based wait
- Gracefully terminate on KeyboardInterrupt/SIGTERM
"""
logger.info("Background worker starting...")
# Convert configured minutes to seconds
notification_interval_sec = max(
60, self.config.notification_check_interval * 60
)
download_interval_sec = max(60, self.config.download_check_interval * 60)
download_retry_interval_sec = max(
60, self.config.download_check_failed_interval * 60
)
# Start download worker and attempt resuming pending jobs once at startup
self.download_service.start()
# Schedule the very first execution immediately
next_notification_ts: Optional[float] = 0.0
next_download_ts: Optional[float] = 0.0
next_retry_download_ts: Optional[float] = 0.0
# Install signal handlers if possible
self._install_signal_handlers()
try:
while not self._stop_event.is_set():
now = time.time()
# Check for notifications
if next_notification_ts is not None and now >= next_notification_ts:
try:
logger.info("Checking for notifications...")
self.notification_service.check_and_display_notifications()
except Exception:
logger.exception("Error during notification check")
finally:
next_notification_ts = now + notification_interval_sec
# Process download queue
if next_download_ts is not None and now >= next_download_ts:
try:
self.download_service.resume_unfinished_downloads()
except Exception:
logger.exception("Error during download queue processing")
finally:
next_download_ts = now + download_interval_sec
if next_retry_download_ts is not None and now >= next_retry_download_ts:
try:
self.download_service.retry_failed_downloads()
except Exception:
logger.exception(
"Error during failed download queue processing"
)
finally:
next_retry_download_ts = now + download_retry_interval_sec
# Determine how long to wait until the next scheduled task
next_events = [
t
for t in (
next_notification_ts,
next_download_ts,
next_retry_download_ts,
)
if t is not None
]
if next_events:
time_until_next = max(0.0, min(next_events) - time.time())
else:
time_until_next = 30.0
# Cap wait to react reasonably fast to stop requests
wait_time = min(time_until_next, 30.0)
self._stop_event.wait(timeout=wait_time)
except KeyboardInterrupt:
logger.info("Background worker interrupted by user. Stopping...")
self.stop()
finally:
# Ensure we always stop the download worker
try:
self.download_service.stop()
except Exception:
logger.exception("Failed to stop download service cleanly")
logger.info("Background worker stopped.")
def stop(self):
self.running = False
logger.info("Background worker shutting down.")
if not self._stop_event.is_set():
logger.info("Background worker shutting down...")
self._stop_event.set()

View File

@@ -1,10 +1,14 @@
import logging
import os
import re
from hashlib import sha256
from typing import Dict, List, Optional
import httpx
from ...core.config import AppConfig
from ...core.constants import APP_CACHE_DIR, PLATFORM, SCRIPTS_DIR
from ...core.utils.file import AtomicWriter
from ...libs.media_api.types import (
AiringScheduleResult,
Character,
@@ -14,6 +18,103 @@ from ...libs.media_api.types import (
from . import ansi
from .preview_workers import PreviewWorkerManager
def get_rofi_preview(
media_items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return (
"".join(
[
f"{title}\0icon\x1f{_get_image(item)}\n"
for item, title in zip(media_items, titles)
]
)
+ "Back\nExit"
)
def _get_image(item: MediaItem) -> str:
if not item.cover_image:
return ""
hash_id = sha256(item.title.english.encode("utf-8")).hexdigest()
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
if image_path.exists():
return str(image_path)
if not item.cover_image.large:
return ""
try:
with httpx.stream(
"GET", item.cover_image.large, follow_redirects=True
) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
return str(image_path)
except Exception as e:
logger.error(f"Failed to download image {item.cover_image.large}: {e}")
return ""
def get_rofi_episode_preview(
episodes: List[str], media_item: MediaItem, config: AppConfig
) -> str:
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
return (
"".join(
[
f"{episode}\0icon\x1f{_get_episode_image(episode, media_item)}\n"
for episode in episodes
]
)
+ "Back\nExit"
)
def _get_episode_image(episode: str, media_item: MediaItem) -> str:
if media_item.streaming_episodes and media_item.streaming_episodes.get(episode):
stream = media_item.streaming_episodes[episode]
image_url = stream.thumbnail
else:
if not media_item.cover_image:
return ""
image_url = media_item.cover_image.large
if not image_url:
return ""
hash_id = sha256(
f"{media_item.title.english}_Episode_{episode}".encode("utf-8")
).hexdigest()
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
if image_path.exists():
return str(image_path)
try:
with httpx.stream("GET", image_url, follow_redirects=True) as response:
response.raise_for_status()
with AtomicWriter(image_path, "wb", encoding=None) as f:
for chunk in response.iter_bytes():
f.write(chunk)
return str(image_path)
except Exception as e:
logger.error(
f"Failed to download image {image_url} for {media_item.title.english}: {e}"
)
return ""
logger = logging.getLogger(__name__)
os.environ["SHELL"] = "bash"
@@ -166,6 +267,9 @@ class PreviewContext:
def get_anime_preview(
items: List[MediaItem], titles: List[str], config: AppConfig
) -> str:
if config.general.selector == "rofi":
return get_rofi_preview(items, titles, config)
"""
Generate anime preview script and start background caching.
@@ -236,6 +340,8 @@ def get_episode_preview(
Returns:
Preview script content for fzf
"""
if config.general.selector == "rofi":
return get_rofi_episode_preview(episodes, media_item, config)
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)

View File

@@ -17,7 +17,57 @@ class GeneralConfig(BaseModel):
default=defaults.GENERAL_PREFERRED_TRACKER,
description=desc.GENERAL_PREFERRED_TRACKER,
)
pygment_style: str = Field(
pygment_style: Literal[
"abap",
"algol",
"algol_nu",
"arduino",
"autumn",
"bw",
"borland",
"coffee",
"colorful",
"default",
"dracula",
"emacs",
"friendly_grayscale",
"friendly",
"fruity",
"github-dark",
"gruvbox-dark",
"gruvbox-light",
"igor",
"inkpot",
"lightbulb",
"lilypond",
"lovelace",
"manni",
"material",
"monokai",
"murphy",
"native",
"nord-darker",
"nord",
"one-dark",
"paraiso-dark",
"paraiso-light",
"pastie",
"perldoc",
"rainbow_dash",
"rrt",
"sas",
"solarized-dark",
"solarized-light",
"staroffice",
"stata-dark",
"stata-light",
"tango",
"trac",
"vim",
"vs",
"xcode",
"zenburn",
] = Field(
default=defaults.GENERAL_PYGMENT_STYLE, description=desc.GENERAL_PYGMENT_STYLE
)
preferred_spinner: Literal[
@@ -244,6 +294,15 @@ class WorkerConfig(OtherConfig):
ge=1,
description="How often to process the download queue (in minutes).",
)
download_check_failed_interval: int = Field(
default=60, # in minutes
ge=1,
description="How often to process the failed download queue (in minutes).",
)
auto_download_new_episode: bool = Field(
default=True,
description="Whether to automatically download a new episode that has been notified",
)
class SessionsConfig(OtherConfig):
@@ -399,7 +458,7 @@ class DownloadsConfig(OtherConfig):
ge=1,
description=desc.DOWNLOADS_MAX_CONCURRENT,
)
retry_attempts: int = Field(
max_retry_attempts: int = Field(
default=defaults.DOWNLOADS_RETRY_ATTEMPTS,
ge=0,
description=desc.DOWNLOADS_RETRY_ATTEMPTS,

View File

@@ -1,4 +1,5 @@
import os
import re
import shutil
import sys
@@ -19,6 +20,36 @@ def is_running_in_termux():
return False
def is_bash_script(text: str) -> bool:
# Normalize line endings
text = text.strip()
# Check for shebang at the top
if text.startswith("#!/bin/bash") or text.startswith("#!/usr/bin/env bash"):
return True
# Look for common bash syntax/keywords
bash_keywords = [
r"\becho\b",
r"\bfi\b",
r"\bthen\b",
r"\bfunction\b",
r"\bfor\b",
r"\bwhile\b",
r"\bdone\b",
r"\bcase\b",
r"\besac\b",
r"\$\(",
r"\[\[",
r"\]\]",
r";;",
]
# Score based on matches
matches = sum(bool(re.search(pattern, text)) for pattern in bash_keywords)
return matches >= 2
def is_running_kitty_terminal() -> bool:
return True if os.environ.get("KITTY_WINDOW_ID") else False

View File

@@ -46,14 +46,19 @@ import json
import logging
from typing import Dict, Optional
from ..constants import ASSETS_DIR
from ..constants import APP_DATA_DIR, ASSETS_DIR
logger = logging.getLogger(__name__)
# Cache for the normalizer data to avoid repeated file reads
_normalizer_cache: Optional[Dict[str, Dict[str, str]]] = None
USER_NORMALIZER_JSON = APP_DATA_DIR / "normalizer.json"
DEFAULT_NORMALIZER_JSON = ASSETS_DIR / "normalizer.json"
# will load one in the config dir if available and merge them
def _load_normalizer_data() -> Dict[str, Dict[str, str]]:
"""
Load the normalizer.json file and cache it.
@@ -70,21 +75,41 @@ def _load_normalizer_data() -> Dict[str, Dict[str, str]]:
if _normalizer_cache is not None:
return _normalizer_cache
normalizer_path = ASSETS_DIR / "normalizer.json"
default_normalizer = {}
user_normalizer = {}
with open(DEFAULT_NORMALIZER_JSON, "r", encoding="utf-8") as f:
default_normalizer: dict = json.load(f)
if USER_NORMALIZER_JSON.exists():
with open(USER_NORMALIZER_JSON, "r", encoding="utf-8") as f:
user_normalizer: dict = json.load(f)
try:
with open(normalizer_path, "r", encoding="utf-8") as f:
_normalizer_cache = json.load(f)
logger.debug("Loaded normalizer data from %s", normalizer_path)
# Type checker now knows _normalizer_cache is not None
assert _normalizer_cache is not None
return _normalizer_cache
except FileNotFoundError:
logger.error("Normalizer file not found at %s", normalizer_path)
raise
except json.JSONDecodeError as e:
logger.error("Invalid JSON in normalizer file: %s", e)
raise
_normalizer_cache = default_normalizer
for key in default_normalizer:
if key in user_normalizer:
_normalizer_cache[key].update(user_normalizer[key])
return _normalizer_cache
def update_user_normalizer_json(
provider_title: str, media_api_title: str, provider_name: str
):
import time
from .file import AtomicWriter
print(
"UPDATING USER NORMALIZER JSON. PLEASE CONTRIBUTE TO THE PROJECT BY OPENING A PR ON GITHUB TO MERGE YOUR NORMALIZER JSON TO MAIN. MAEMOTTE KANSHA SHIMASU :)"
)
print(f"NORMALIZER JSON PATH IS: {USER_NORMALIZER_JSON}")
time.sleep(5)
if not _normalizer_cache:
raise RuntimeError(
"Fatal _normalizer_cache missing this should not be the case : (. Please report"
)
_normalizer_cache[provider_name][provider_title] = media_api_title
with AtomicWriter(USER_NORMALIZER_JSON) as f:
json.dump(_normalizer_cache, f, indent=2)
def provider_title_to_media_api_title(provider_title: str, provider_name: str) -> str:

View File

@@ -31,6 +31,7 @@ class BaseSelector(ABC):
"""
pass
@abstractmethod
def choose_multiple(
self,
prompt: str,
@@ -50,29 +51,7 @@ class BaseSelector(ABC):
Returns:
A list of the chosen items.
"""
# Default implementation: single selection in a loop
selected = []
remaining_choices = choices.copy()
while remaining_choices:
choice = self.choose(
f"{prompt} (Select multiple, empty to finish)",
remaining_choices + ["[DONE] Finish selection"],
preview=preview,
)
if not choice or choice == "[DONE] Finish selection":
break
selected.append(choice)
remaining_choices.remove(choice)
if not self.confirm(
f"Selected: {', '.join(selected)}. Continue selecting?", default=True
):
break
return selected
pass
@abstractmethod
def confirm(self, prompt: str, *, default: bool = False) -> bool:

View File

@@ -141,3 +141,16 @@ class FzfSelector(BaseSelector):
if result.returncode != 0:
return None
return result.stdout.strip()
if __name__ == "__main__":
config = FzfConfig()
selector = FzfSelector(config)
choice = selector.ask("Hello dev :)")
print(choice)
choice = selector.confirm("Hello dev :)")
print(choice)
choice = selector.choose_multiple("What comes first", ["a", "b"])
print(choice)
choice = selector.choose("What comes first", ["a", "b"])
print(choice)

View File

@@ -20,4 +20,27 @@ class InquirerSelector(BaseSelector):
return Confirm.ask(prompt, default=default)
def ask(self, prompt, *, default=None):
return Prompt.ask(prompt=prompt, default=default or "")
return Prompt.ask(prompt=prompt, default=default or None)
def choose_multiple(
self, prompt: str, choices: list[str], preview: str | None = None
) -> list[str]:
return FuzzyPrompt(
message=prompt,
choices=choices,
height="100%",
multiselect=True,
border=True,
).execute()
if __name__ == "__main__":
selector = InquirerSelector()
choice = selector.ask("Hello dev :)")
print(choice)
choice = selector.confirm("Hello dev :)")
print(choice)
choice = selector.choose_multiple("What comes first", ["a", "b"])
print(choice)
choice = selector.choose("What comes first", ["a", "b"])
print(choice)

View File

@@ -2,6 +2,7 @@ import shutil
import subprocess
from ....core.config import RofiConfig
from ....core.utils import detect
from ..base import BaseSelector
@@ -13,13 +14,52 @@ class RofiSelector(BaseSelector):
raise FileNotFoundError("rofi executable not found in PATH.")
def choose(self, prompt, choices, *, preview=None, header=None):
rofi_input = "\n".join(choices)
if preview and detect.is_bash_script(preview):
preview = None
rofi_input = preview if preview else "\n".join(choices)
args = [
self.executable,
"-no-config",
"-theme",
self.config.theme_preview if preview else self.config.theme_main,
"-p",
prompt,
"-i",
"-dmenu",
]
if preview:
args.append("-show-icons")
result = subprocess.run(
args,
input=rofi_input,
stdout=subprocess.PIPE,
text=True,
)
if result:
choice = result.stdout.strip()
return choice
def confirm(self, prompt, *, default=False):
choices = ["Yes", "No"]
default_choice = "Yes" if default else "No"
result = self.choose(prompt, choices, header=f"Default: {default_choice}")
return result == "Yes"
def ask(self, prompt, *, default=None):
return self.choose(prompt, [])
def choose_multiple(
self, prompt: str, choices: list[str], preview: str | None = None
) -> list[str]:
rofi_input = "\n".join(choices)
args = [
self.executable,
"-no-config",
"-theme",
self.config.theme_main,
"-multi-select",
"-p",
prompt,
"-i",
@@ -34,14 +74,18 @@ class RofiSelector(BaseSelector):
if result:
choice = result.stdout.strip()
return choice
return choice.split()
return []
def confirm(self, prompt, *, default=False):
# Maps directly to your existing `confirm` method
# ... (logic from your `Rofi.confirm` method) ...
pass
def ask(self, prompt, *, default=None):
# Maps directly to your existing `ask` method
# ... (logic from your `Rofi.ask` method) ...
pass
if __name__ == "__main__":
config = RofiConfig()
selector = RofiSelector(config)
choice = selector.ask("Hello dev :)")
print(choice)
choice = selector.confirm("Hello dev :)")
print(choice)
choice = selector.choose_multiple("What comes first", ["a", "b"])
print(choice)
choice = selector.choose("What comes first", ["a", "b"])
print(choice)

View File

@@ -23,7 +23,10 @@ standard = [
"yt-dlp>=2025.7.21",
"pycryptodomex>=3.23.0",
]
notifications = ["plyer>=2.1.0"]
notifications = [
"dbus-python>=1.4.0",
"plyer>=2.1.0",
]
mpv = [
"mpv>=1.0.7",
]

8
uv.lock generated
View File

@@ -74,6 +74,12 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/d1/d6/3965ed04c63042e047cb6a3e6ed1a63a35087b6a609aa3a15ed8ac56c221/colorama-0.4.6-py2.py3-none-any.whl", hash = "sha256:4f1d9991f5acc0ca119f9d443620b77f9d6b33703e51011c16baf57afb285fc6", size = 25335, upload-time = "2022-10-25T02:36:20.889Z" },
]
[[package]]
name = "dbus-python"
version = "1.4.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ff/24/63118050c7dd7be04b1ccd60eab53fef00abe844442e1b6dec92dae505d6/dbus-python-1.4.0.tar.gz", hash = "sha256:991666e498f60dbf3e49b8b7678f5559b8a65034fdf61aae62cdecdb7d89c770", size = 232490, upload-time = "2025-03-13T19:57:54.212Z" }
[[package]]
name = "distlib"
version = "0.4.0"
@@ -122,6 +128,7 @@ mpv = [
{ name = "mpv" },
]
notifications = [
{ name = "dbus-python" },
{ name = "plyer" },
]
standard = [
@@ -150,6 +157,7 @@ dev = [
[package.metadata]
requires-dist = [
{ name = "click", specifier = ">=8.1.7" },
{ name = "dbus-python", marker = "extra == 'notifications'", specifier = ">=1.4.0" },
{ name = "httpx", specifier = ">=0.28.1" },
{ name = "inquirerpy", specifier = ">=0.3.4" },
{ name = "libtorrent", marker = "extra == 'torrent'", specifier = ">=2.0.11" },