feat: menus

This commit is contained in:
Benexl
2025-07-22 00:47:42 +03:00
parent 0e6aeeea18
commit 0ce27f8e50
22 changed files with 235 additions and 539 deletions

View File

@@ -1,8 +1,5 @@
from .anilist import anilist
from .config import config
from .download import download
from .queue import queue
from .search import search
from .service import service
__all__ = ["config", "search", "download", "anilist", "queue", "service"]
__all__ = ["config", "search", "anilist"]

View File

@@ -26,4 +26,4 @@ def anilist(ctx: click.Context, resume: bool):
if ctx.invoked_subcommand is None:
session.load_menus_from_folder()
session.run(config)
session.run(config, resume=resume)

View File

@@ -16,79 +16,41 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
provider_anime = state.provider.anime
anilist_anime = state.media_api.anime
config = ctx.config
console = Console()
console.clear()
feedback = ctx.services.feedback
feedback.clear_console()
if not provider_anime or not anilist_anime:
console.print("[bold red]Error: Anime details are missing.[/bold red]")
feedback.error("Error: Anime details are missing.")
return ControlFlow.BACK
# Get the list of episode strings based on the configured translation type
available_episodes = getattr(
provider_anime.episodes, config.stream.translation_type, []
)
if not available_episodes:
console.print(
f"[bold yellow]No '{config.stream.translation_type}' episodes found for this anime.[/bold yellow]"
feedback.warning(
f"No '{config.stream.translation_type}' episodes found for this anime."
)
return ControlFlow.BACK
chosen_episode: str | None = None
if config.stream.continue_from_watch_history:
# Use our new watch history system
from ...utils.watch_history_tracker import get_continue_episode, track_episode_viewing
# Try to get continue episode from watch history
if config.stream.preferred_watch_history == "local":
chosen_episode = get_continue_episode(anilist_anime, available_episodes, prefer_history=True)
if chosen_episode:
click.echo(
f"[cyan]Continuing from local watch history. Auto-selecting episode {chosen_episode}.[/cyan]"
)
# Fallback to AniList progress if local history doesn't have info or preference is remote
if not chosen_episode and config.stream.preferred_watch_history == "remote":
progress = (
anilist_anime.user_status.progress
if anilist_anime.user_status and anilist_anime.user_status.progress
else 0
)
# Calculate the next episode based on progress
next_episode_num = str(progress + 1)
if next_episode_num in available_episodes:
click.echo(
f"[cyan]Continuing from AniList history. Auto-selecting episode {next_episode_num}.[/cyan]"
)
chosen_episode = next_episode_num
else:
# If the next episode isn't available, fall back to the last watched one
last_watched_num = str(progress)
if last_watched_num in available_episodes:
click.echo(
f"[cyan]Next episode ({next_episode_num}) not found. Falling back to last watched episode {last_watched_num}.[/cyan]"
)
chosen_episode = last_watched_num
else:
click.echo(
f"[yellow]Could not find episode based on your watch history. Please select manually.[/yellow]"
)
# TODO: implement watch history logic
pass
if not chosen_episode:
choices = [*sorted(available_episodes, key=float), "Back"]
# Get episode preview command if preview is enabled
preview_command = None
if ctx.config.general.preview != "none":
from ...utils.previews import get_episode_preview
preview_command = get_episode_preview(available_episodes, anilist_anime, ctx.config)
preview_command = get_episode_preview(
available_episodes, anilist_anime, ctx.config
)
chosen_episode_str = ctx.selector.choose(
prompt="Select Episode",
choices=choices,
preview=preview_command
prompt="Select Episode", choices=choices, preview=preview_command
)
if not chosen_episode_str or chosen_episode_str == "Back":
@@ -97,13 +59,12 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
chosen_episode = chosen_episode_str
# Track episode selection in watch history (if enabled in config)
if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local":
from ...utils.watch_history_tracker import track_episode_viewing
try:
episode_num = int(chosen_episode)
track_episode_viewing(anilist_anime, episode_num, start_tracking=True)
except (ValueError, AttributeError):
pass # Skip tracking if episode number is invalid
if (
config.stream.continue_from_watch_history
and config.stream.preferred_watch_history == "local"
):
# TODO: implement watch history logic
pass
return State(
menu_name="SERVERS",

View File

@@ -1,15 +1,13 @@
import logging
import random
from typing import Callable, Dict, Tuple
from rich.console import Console
from ....libs.api.params import ApiSearchParams, UserListParams
from ....libs.api.types import MediaSearchResult, MediaStatus, UserListStatusType
from ...utils.auth.utils import check_authentication_required, format_auth_menu_header
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
logger = logging.getLogger(__name__)
MenuAction = Callable[
[],
Tuple[str, MediaSearchResult | None, ApiSearchParams | None, UserListParams | None],
@@ -23,10 +21,10 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
Displays top-level categories for the user to browse and select.
"""
icons = ctx.config.general.icons
feedback = create_feedback_manager(icons)
console = Console()
console.clear()
feedback = ctx.services.feedback
feedback.clear_console()
# TODO: Make them just return the modified state or control flow
options: Dict[str, MenuAction] = {
# --- Search-based Actions ---
f"{'🔥 ' if icons else ''}Trending": _create_media_list_action(
@@ -51,41 +49,21 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
f"{'🎲 ' if icons else ''}Random": _create_random_media_list(ctx),
f"{'🔎 ' if icons else ''}Search": _create_search_media_list(ctx),
# --- Authenticated User List Actions ---
f"{'📺 ' if icons else ''}Watching": _create_user_list_action(ctx, "CURRENT"),
f"{'📑 ' if icons else ''}Planned": _create_user_list_action(ctx, "PLANNING"),
f"{'📺 ' if icons else ''}Watching": _create_user_list_action(ctx, "watching"),
f"{'📑 ' if icons else ''}Planned": _create_user_list_action(ctx, "planning"),
f"{'' if icons else ''}Completed": _create_user_list_action(
ctx, "COMPLETED"
ctx, "completed"
),
f"{'⏸️ ' if icons else ''}Paused": _create_user_list_action(ctx, "PAUSED"),
f"{'🚮 ' if icons else ''}Dropped": _create_user_list_action(ctx, "DROPPED"),
f"{'⏸️ ' if icons else ''}Paused": _create_user_list_action(ctx, "paused"),
f"{'🚮 ' if icons else ''}Dropped": _create_user_list_action(ctx, "dropped"),
f"{'🔁 ' if icons else ''}Rewatching": _create_user_list_action(
ctx, "REPEATING"
ctx, "repeating"
),
# --- List Management ---
f"{'📚 ' if icons else ''}AniList Lists Manager": lambda: (
"ANILIST_LISTS",
None,
None,
None,
),
f"{'📖 ' if icons else ''}Local Watch History": lambda: (
"WATCH_HISTORY",
None,
None,
None,
),
# --- Authentication and Account Management ---
f"{'🔐 ' if icons else ''}Authentication": lambda: ("AUTH", None, None, None),
# --- Control Flow and Utility Options ---
f"{'🔧 ' if icons else ''}Session Management": lambda: (
"SESSION_MANAGEMENT",
None,
None,
None,
),
f"{'📝 ' if icons else ''}Edit Config": lambda: (
"RELOAD_CONFIG",
None,
f"{'🔁 ' if icons else ''}Recent": lambda: (
"RESULTS",
ctx.services.media_registry.get_recently_watched(
ctx.config.anilist.per_page
),
None,
None,
),
@@ -95,7 +73,6 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
choice_str = ctx.selector.choose(
prompt="Select Category",
choices=list(options.keys()),
header=format_auth_menu_header(ctx.media_api, "FastAnime Main Menu", icons),
)
if not choice_str:
@@ -145,93 +122,42 @@ def _create_media_list_action(
"""A factory to create menu actions for fetching media lists"""
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
# Create the search parameters
search_params = ApiSearchParams(
sort=sort, per_page=ctx.config.anilist.per_page, status=status
)
def fetch_data():
return ctx.media_api.search_media(search_params)
result = ctx.media_api.search_media(search_params)
success, result = execute_with_feedback(
fetch_data,
feedback,
"fetch anime list",
loading_msg="Fetching anime",
success_msg="Anime list loaded successfully",
)
# Return the search parameters along with the result for pagination
return (
("RESULTS", result, search_params, None)
if success
else ("CONTINUE", None, None, None)
)
return ("RESULTS", result, search_params, None)
return action
def _create_random_media_list(ctx: Context) -> MenuAction:
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
# Create the search parameters
search_params = ApiSearchParams(
id_in=random.sample(range(1, 160000), k=50),
per_page=ctx.config.anilist.per_page,
)
def fetch_data():
return ctx.media_api.search_media(search_params)
result = ctx.media_api.search_media(search_params)
success, result = execute_with_feedback(
fetch_data,
feedback,
"fetch random anime",
loading_msg="Fetching random anime",
success_msg="Random anime loaded successfully",
)
# Return the search parameters along with the result for pagination
return (
("RESULTS", result, search_params, None)
if success
else ("CONTINUE", None, None, None)
)
return ("RESULTS", result, search_params, None)
return action
def _create_search_media_list(ctx: Context) -> MenuAction:
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
query = ctx.selector.ask("Search for Anime")
if not query:
return "CONTINUE", None, None, None
# Create the search parameters
search_params = ApiSearchParams(query=query)
result = ctx.media_api.search_media(search_params)
def fetch_data():
return ctx.media_api.search_media(search_params)
success, result = execute_with_feedback(
fetch_data,
feedback,
"search anime",
loading_msg=f"Searching for '{query}'",
success_msg=f"Search results for '{query}' loaded successfully",
)
# Return the search parameters along with the result for pagination
return (
("RESULTS", result, search_params, None)
if success
else ("CONTINUE", None, None, None)
)
return ("RESULTS", result, search_params, None)
return action
@@ -240,35 +166,17 @@ def _create_user_list_action(ctx: Context, status: UserListStatusType) -> MenuAc
"""A factory to create menu actions for fetching user lists, handling authentication."""
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
# Check authentication
if not check_authentication_required(
ctx.media_api, feedback, f"view your {status.lower()} list"
):
if not ctx.media_api.is_authenticated():
logger.warning("Not authenticated")
return "CONTINUE", None, None, None
# Create the user list parameters
user_list_params = UserListParams(
status=status, per_page=ctx.config.anilist.per_page
)
def fetch_data():
return ctx.media_api.fetch_user_list(user_list_params)
result = ctx.media_api.fetch_user_list(user_list_params)
success, result = execute_with_feedback(
fetch_data,
feedback,
f"fetch {status.lower()} list",
loading_msg=f"Fetching your {status.lower()} list",
success_msg=f"Your {status.lower()} list loaded successfully",
)
# Return the user list parameters along with the result for pagination
return (
("RESULTS", result, None, user_list_params)
if success
else ("CONTINUE", None, None, None)
)
return ("RESULTS", result, None, user_list_params)
return action

View File

@@ -6,8 +6,6 @@ from rich.console import Console
from ....libs.api.params import UpdateListEntryParams
from ....libs.api.types import MediaItem
from ....libs.players.params import PlayerParams
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ...utils.auth.utils import check_authentication_required, get_auth_status_indicator
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
@@ -16,35 +14,25 @@ MenuAction = Callable[[], State | ControlFlow]
@session.menu
def media_actions(ctx: Context, state: State) -> State | ControlFlow:
"""
Displays actions for a single, selected anime, such as streaming,
viewing details, or managing its status on the user's list.
"""
icons = ctx.config.general.icons
# Get authentication status for display
auth_status, user_profile = get_auth_status_indicator(ctx.media_api, icons)
# Create header with auth status
anime = state.media_api.anime
anime_title = anime.title.english or anime.title.romaji if anime else "Unknown"
header = f"Actions for: {anime_title}\n{auth_status}"
# TODO: Add 'Recommendations' and 'Relations' here later.
# TODO: Add media list management
# TODO: cross reference for none implemented features
options: Dict[str, MenuAction] = {
f"{'▶️ ' if icons else ''}Stream": _stream(ctx, state),
f"{'📼 ' if icons else ''}Watch Trailer": _watch_trailer(ctx, state),
f"{' ' if icons else ''}Add/Update List": _add_to_list(ctx, state),
f"{'' if icons else ''}Score Anime": _score_anime(ctx, state),
f"{'<EFBFBD> ' if icons else ''}Manage in Lists": _manage_in_lists(ctx, state),
f"{'<EFBFBD>📚 ' if icons else ''}Add to Local History": _add_to_local_history(ctx, state),
f"{' ' if icons else ''}View Info": _view_info(ctx, state),
f"{'🔙 ' if icons else ''}Back to Results": lambda: ControlFlow.BACK,
}
# --- Prompt and Execute ---
choice_str = ctx.selector.choose(
prompt="Select Action", choices=list(options.keys()), header=header
prompt="Select Action",
choices=list(options.keys()),
)
if choice_str and choice_str in options:
@@ -67,7 +55,7 @@ def _stream(ctx: Context, state: State) -> MenuAction:
def _watch_trailer(ctx: Context, state: State) -> MenuAction:
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
@@ -79,17 +67,8 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
else:
trailer_url = f"https://www.youtube.com/watch?v={anime.trailer.id}"
def play_trailer():
ctx.player.play(PlayerParams(url=trailer_url, title=""))
ctx.player.play(PlayerParams(url=trailer_url, title=""))
execute_with_feedback(
play_trailer,
feedback,
"play trailer",
loading_msg=f"Playing trailer for '{anime.title.english or anime.title.romaji}'",
success_msg="Trailer started successfully",
show_loading=False,
)
return ControlFlow.CONTINUE
return action
@@ -97,25 +76,28 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
def _add_to_list(ctx: Context, state: State) -> MenuAction:
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
# Check authentication before proceeding
if not check_authentication_required(
ctx.media_api, feedback, "add anime to your list"
):
if not ctx.media_api.is_authenticated():
return ControlFlow.CONTINUE
choices = ["CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"]
choices = [
"watching",
"planning",
"completed",
"dropped",
"paused",
"repeating",
]
status = ctx.selector.choose("Select list status:", choices=choices)
if status:
# status is now guaranteed to be one of the valid choices
_update_user_list_with_feedback(
_update_user_list(
ctx,
anime,
UpdateListEntryParams(media_id=anime.id, status=status), # type: ignore
UpdateListEntryParams(media_id=anime.id, status=status), # pyright:ignore
feedback,
)
return ControlFlow.CONTINUE
@@ -125,13 +107,13 @@ def _add_to_list(ctx: Context, state: State) -> MenuAction:
def _score_anime(ctx: Context, state: State) -> MenuAction:
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
# Check authentication before proceeding
if not check_authentication_required(ctx.media_api, feedback, "score anime"):
if not ctx.media_api.is_authenticated():
return ControlFlow.CONTINUE
score_str = ctx.selector.ask("Enter score (0.0 - 10.0):")
@@ -139,7 +121,7 @@ def _score_anime(ctx: Context, state: State) -> MenuAction:
score = float(score_str) if score_str else 0.0
if not 0.0 <= score <= 10.0:
raise ValueError("Score out of range.")
_update_user_list_with_feedback(
_update_user_list(
ctx,
anime,
UpdateListEntryParams(media_id=anime.id, score=score),
@@ -159,8 +141,8 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
# Placeholder for a more detailed info screen if needed.
# For now, we'll just print key details.
# TODO: Make this nice and include all other media item fields
from rich import box
from rich.panel import Panel
from rich.text import Text
@@ -185,133 +167,10 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
return action
def _update_user_list(ctx: Context, anime: MediaItem, params: UpdateListEntryParams):
"""Helper to call the API to update a user's list and show feedback."""
# if not ctx.media_api.user_profile:
# click.echo("[bold yellow]You must be logged in to modify your list.[/]")
# return
success = ctx.media_api.update_list_entry(params)
if success:
click.echo(
f"[bold green]Successfully updated '{anime.title.english or anime.title.romaji}' on your list![/]"
)
else:
click.echo("[bold red]Failed to update list entry.[/bold red]")
def _update_user_list_with_feedback(
def _update_user_list(
ctx: Context, anime: MediaItem, params: UpdateListEntryParams, feedback
):
"""Helper to call the API to update a user's list with comprehensive feedback."""
# Authentication check is handled by the calling functions now
# This function assumes authentication has already been verified
def update_operation():
return ctx.media_api.update_list_entry(params)
anime_title = anime.title.english or anime.title.romaji
success, result = execute_with_feedback(
update_operation,
feedback,
"update anime list",
loading_msg=f"Updating '{anime_title}' on your list",
success_msg=f"Successfully updated '{anime_title}' on your list!",
error_msg="Failed to update list entry",
show_loading=False,
)
def _add_to_local_history(ctx: Context, state: State) -> MenuAction:
"""Add anime to local watch history with status selection."""
def action() -> State | ControlFlow:
anime = state.media_api.anime
if not anime:
click.echo("[bold red]No anime data available.[/bold red]")
return ControlFlow.CONTINUE
feedback = create_feedback_manager(ctx.config.general.icons)
# Check if already in watch history
from ...utils.watch_history_manager import WatchHistoryManager
history_manager = WatchHistoryManager()
existing_entry = history_manager.get_entry(anime.id)
if existing_entry:
# Ask if user wants to update existing entry
if not feedback.confirm(f"'{existing_entry.get_display_title()}' is already in your local watch history. Update it?"):
return ControlFlow.CONTINUE
# Status selection
statuses = ["watching", "completed", "planning", "paused", "dropped"]
status_choices = [status.title() for status in statuses]
chosen_status = ctx.selector.choose(
"Select status for local watch history:",
choices=status_choices + ["Cancel"]
)
if not chosen_status or chosen_status == "Cancel":
return ControlFlow.CONTINUE
status = chosen_status.lower()
# Episode number if applicable
episode = 0
if status in ["watching", "completed"]:
if anime.episodes and anime.episodes > 1:
episode_str = ctx.selector.ask(f"Enter current episode (1-{anime.episodes}, default: 0):")
try:
episode = int(episode_str) if episode_str else 0
episode = max(0, min(episode, anime.episodes))
except ValueError:
episode = 0
# Mark as completed if status is completed
if status == "completed" and anime.episodes:
episode = anime.episodes
# Add to watch history
from ...utils.watch_history_tracker import watch_tracker
success = watch_tracker.add_anime_to_history(anime, status)
if success and episode > 0:
# Update episode progress
history_manager.mark_episode_watched(anime.id, episode, 1.0 if status == "completed" else 0.0)
if success:
feedback.success(f"Added '{anime.title.english or anime.title.romaji}' to local watch history with status: {status}")
else:
feedback.error("Failed to add anime to local watch history")
if ctx.media_api.is_authenticated():
return ControlFlow.CONTINUE
return action
def _manage_in_lists(ctx: Context, state: State) -> MenuAction:
def action():
feedback = create_feedback_manager(ctx.config.general.icons)
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
# Check authentication before proceeding
if not check_authentication_required(
ctx.media_api, feedback, "manage anime in your lists"
):
return ControlFlow.CONTINUE
# Navigate to AniList anime details with this specific anime
return State(
menu_name="ANILIST_ANIME_DETAILS",
data={
"anime": anime,
"list_status": "CURRENT", # Default status, will be updated when loaded
"return_page": 1,
"from_media_actions": True # Flag to return here instead of lists
}
)
return action
ctx.media_api.update_list_entry(params)

View File

@@ -1,27 +1,18 @@
from typing import TYPE_CHECKING
import click
from rich.console import Console
from rich.progress import Progress
from thefuzz import fuzz
from ....libs.providers.anime.params import SearchParams
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ....libs.providers.anime.types import SearchResult
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
if TYPE_CHECKING:
from ....libs.providers.anime.types import SearchResult
@session.menu
def provider_search(ctx: Context, state: State) -> State | ControlFlow:
"""
Searches for the selected AniList anime on the configured provider.
This state allows the user to confirm the correct provider entry before
proceeding to list episodes.
"""
feedback = create_feedback_manager(ctx.config.general.icons)
feedback = ctx.services.feedback
anilist_anime = state.media_api.anime
if not anilist_anime:
feedback.error("No AniList anime to search for", "Please select an anime first")
@@ -30,8 +21,7 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
provider = ctx.provider
selector = ctx.selector
config = ctx.config
console = Console()
console.clear()
feedback.clear_console()
anilist_title = anilist_anime.title.english or anilist_anime.title.romaji
if not anilist_title:
@@ -41,34 +31,19 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
)
return ControlFlow.BACK
# --- Perform Search on Provider ---
def search_provider():
return provider.search(
SearchParams(
query=anilist_title, translation_type=config.stream.translation_type
)
provider_search_results = provider.search(
SearchParams(
query=anilist_title, translation_type=config.stream.translation_type
)
success, provider_search_results = execute_with_feedback(
search_provider,
feedback,
"search provider",
loading_msg=f"Searching for '{anilist_title}' on {provider.__class__.__name__}",
success_msg=f"Found results on {provider.__class__.__name__}",
)
if (
not success
or not provider_search_results
or not provider_search_results.results
):
if not provider_search_results or not provider_search_results.results:
feedback.warning(
f"Could not find '{anilist_title}' on {provider.__class__.__name__}",
"Try another provider from the config or go back to search again",
)
return ControlFlow.BACK
# --- Map results for selection ---
provider_results_map: dict[str, SearchResult] = {
result.title: result for result in provider_search_results.results
}
@@ -82,7 +57,7 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
provider_results_map.keys(),
key=lambda p_title: fuzz.ratio(p_title.lower(), anilist_title.lower()),
)
console.print(f"[cyan]Auto-selecting best match:[/] {best_match_title}")
feedback.info("Auto-selecting best match: {best_match_title}")
selected_provider_anime = provider_results_map[best_match_title]
else:
choices = list(provider_results_map.keys())
@@ -108,8 +83,8 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
full_provider_anime = provider.get(AnimeParams(id=selected_provider_anime.id))
if not full_provider_anime:
console.print(
f"[bold red]Failed to fetch details for '{selected_provider_anime.title}'.[/bold red]"
feedback.warning(
f"Failed to fetch details for '{selected_provider_anime.title}'."
)
return ControlFlow.BACK

View File

@@ -1,88 +1,74 @@
from rich.console import Console
from ....libs.api.types import MediaItem
from ....libs.api.params import ApiSearchParams, UserListParams
from ...utils.auth.utils import get_auth_status_indicator
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ....libs.api.types import MediaItem
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
@session.menu
def results(ctx: Context, state: State) -> State | ControlFlow:
"""
Displays a paginated list of anime from a search or category query.
Allows the user to select an anime to view its actions or navigate pages.
"""
search_results = state.media_api.search_results
console = Console()
console.clear()
feedback = ctx.services.feedback
feedback.clear_console()
if not search_results or not search_results.media:
console.print(
"[bold yellow]No anime found for the given criteria.[/bold yellow]"
)
feedback.info("No anime found for the given criteria")
return ControlFlow.BACK
# --- Prepare choices and previews ---
anime_items = search_results.media
formatted_titles = [
_format_anime_choice(anime, ctx.config) for anime in anime_items
]
# Map formatted titles back to the original MediaItem objects
anime_map = dict(zip(formatted_titles, anime_items))
preview_command = None
if ctx.config.general.preview != "none":
# This function will start background jobs to cache preview data
from ...utils.previews import get_anime_preview
preview_command = get_anime_preview(anime_items, formatted_titles, ctx.config)
# --- Build Navigation and Final Choice List ---
choices = formatted_titles
page_info = search_results.page_info
# Add pagination controls if available with more descriptive text
if page_info.has_next_page:
choices.append(f"{'➡️ ' if ctx.config.general.icons else ''}Next Page (Page {page_info.current_page + 1})")
choices.append(
f"{'➡️ ' if ctx.config.general.icons else ''}Next Page (Page {page_info.current_page + 1})"
)
if page_info.current_page > 1:
choices.append(f"{'⬅️ ' if ctx.config.general.icons else ''}Previous Page (Page {page_info.current_page - 1})")
choices.append(
f"{'⬅️ ' if ctx.config.general.icons else ''}Previous Page (Page {page_info.current_page - 1})"
)
choices.append("Back")
# Create header with auth status and pagination info
auth_status, _ = get_auth_status_indicator(ctx.media_api, ctx.config.general.icons)
pagination_info = f"Page {page_info.current_page}"
if page_info.total > 0 and page_info.per_page > 0:
total_pages = (page_info.total + page_info.per_page - 1) // page_info.per_page
pagination_info += f" of ~{total_pages}"
header = f"Search Results ({len(anime_items)} anime) - {pagination_info}\n{auth_status}"
# --- Prompt User ---
choice_str = ctx.selector.choose(
prompt="Select Anime",
choices=choices,
preview=preview_command,
header=header,
)
if not choice_str:
return ControlFlow.EXIT
# --- Handle User Selection ---
if choice_str == "Back":
return ControlFlow.BACK
# Handle pagination - check for both old and new formats
if (choice_str == "Next Page" or choice_str == "Previous Page" or
choice_str.startswith("Next Page (") or choice_str.startswith("Previous Page (")):
if (
choice_str == "Next Page"
or choice_str == "Previous Page"
or choice_str.startswith("Next Page (")
or choice_str.startswith("Previous Page (")
):
page_delta = 1 if choice_str.startswith("Next Page") else -1
# Implement pagination logic
return _handle_pagination(ctx, state, page_delta)
# If an anime was selected, transition to the MEDIA_ACTIONS state
selected_anime = anime_map.get(choice_str)
if selected_anime:
return State(
@@ -91,7 +77,6 @@ def results(ctx: Context, state: State) -> State | ControlFlow:
search_results=state.media_api.search_results, # Carry over the list
anime=selected_anime, # Set the newly selected item
),
# Persist provider state if it exists
provider=state.provider,
)
@@ -125,36 +110,38 @@ def _format_anime_choice(anime: MediaItem, config) -> str:
return display_title
def _handle_pagination(ctx: Context, state: State, page_delta: int) -> State | ControlFlow:
def _handle_pagination(
ctx: Context, state: State, page_delta: int
) -> State | ControlFlow:
"""
Handle pagination by fetching the next or previous page of results.
Args:
ctx: The application context
state: Current state containing search results and original parameters
page_delta: +1 for next page, -1 for previous page
Returns:
New State with updated search results or ControlFlow.CONTINUE on error
"""
feedback = create_feedback_manager(ctx.config.general.icons)
feedback = ctx.services.feedback
if not state.media_api.search_results:
feedback.error("No search results available for pagination")
return ControlFlow.CONTINUE
current_page = state.media_api.search_results.page_info.current_page
new_page = current_page + page_delta
# Validate page bounds
if new_page < 1:
feedback.warning("Already at the first page")
return ControlFlow.CONTINUE
if page_delta > 0 and not state.media_api.search_results.page_info.has_next_page:
feedback.warning("No more pages available")
return ControlFlow.CONTINUE
# Determine which type of search to perform based on stored parameters
if state.media_api.original_api_params:
# Media search (trending, popular, search, etc.)
@@ -167,13 +154,15 @@ def _handle_pagination(ctx: Context, state: State, page_delta: int) -> State | C
return ControlFlow.CONTINUE
def _fetch_media_page(ctx: Context, state: State, page: int, feedback) -> State | ControlFlow:
def _fetch_media_page(
ctx: Context, state: State, page: int, feedback
) -> State | ControlFlow:
"""Fetch a specific page for media search results."""
original_params = state.media_api.original_api_params
if not original_params:
feedback.error("No original API parameters found")
return ControlFlow.CONTINUE
# Create new parameters with updated page number
new_params = ApiSearchParams(
query=original_params.query,
@@ -203,23 +192,9 @@ def _fetch_media_page(ctx: Context, state: State, page: int, feedback) -> State
type=original_params.type,
on_list=original_params.on_list,
)
def fetch_data():
return ctx.media_api.search_media(new_params)
success, result = execute_with_feedback(
fetch_data,
feedback,
f"fetch page {page}",
loading_msg=f"Loading page {page}",
success_msg=f"Page {page} loaded successfully",
show_loading=False,
)
if not success or not result:
return ControlFlow.CONTINUE
# Return new state with updated results
result = ctx.media_api.search_media(new_params)
return State(
menu_name="RESULTS",
media_api=MediaApiState(
@@ -231,36 +206,24 @@ def _fetch_media_page(ctx: Context, state: State, page: int, feedback) -> State
)
def _fetch_user_list_page(ctx: Context, state: State, page: int, feedback) -> State | ControlFlow:
def _fetch_user_list_page(
ctx: Context, state: State, page: int, feedback
) -> State | ControlFlow:
"""Fetch a specific page for user list results."""
original_params = state.media_api.original_user_list_params
if not original_params:
feedback.error("No original user list parameters found")
return ControlFlow.CONTINUE
# Create new parameters with updated page number
new_params = UserListParams(
status=original_params.status,
page=page,
per_page=original_params.per_page,
)
def fetch_data():
return ctx.media_api.fetch_user_list(new_params)
success, result = execute_with_feedback(
fetch_data,
feedback,
f"fetch page {page} of {original_params.status.lower()} list",
loading_msg=f"Loading page {page}",
success_msg=f"Page {page} loaded successfully",
show_loading=False,
)
if not success or not result:
return ControlFlow.CONTINUE
# Return new state with updated results
result = ctx.media_api.fetch_user_list(new_params)
return State(
menu_name="RESULTS",
media_api=MediaApiState(

View File

@@ -1,6 +1,5 @@
from typing import Dict, List
import click
from rich.console import Console
from rich.progress import Progress
@@ -100,6 +99,10 @@ def servers(ctx: Context, state: State) -> State | ControlFlow:
headers=selected_server.headers,
)
)
if state.media_api.anime and state.provider.episode_number:
ctx.services.watch_history.track(
state.media_api.anime, state.provider.episode_number, player_result
)
return State(
menu_name="PLAYER_CONTROLS",

View File

@@ -121,7 +121,7 @@ class Session:
try:
self._run_main_loop()
except Exception as e:
self._context.services.session.save_session(self._history)
self._context.services.session.create_crash_backup(self._history)
raise
self._context.services.session.save_session(self._history)

View File

@@ -106,3 +106,6 @@ class FeedbackService:
"""Show detailed information in a styled panel."""
console.print(Panel(content, title=title, border_style=style, expand=True))
self.pause_for_user()
def clear_console(self):
console.clear()

View File

@@ -8,7 +8,12 @@ from ....core.config.model import MediaRegistryConfig
from ....core.exceptions import FastAnimeError
from ....core.utils.file import AtomicWriter, FileLock, check_file_modified
from ....libs.api.params import ApiSearchParams
from ....libs.api.types import MediaItem, UserListStatusType
from ....libs.api.types import (
MediaItem,
MediaSearchResult,
PageInfo,
UserListStatusType,
)
from .filters import MediaFilter
from .models import (
REGISTRY_VERSION,
@@ -26,10 +31,12 @@ class MediaRegistryService:
self.media_registry_dir = self.config.media_dir / media_api
self._media_api = media_api
self._ensure_directories()
self._index = None
self._index_file = self.config.index_dir / "registry.json"
self._index_file_modified_time = 0
_lock_file = self.config.media_dir / "registry.lock"
self._lock = FileLock(_lock_file)
self._load_index()
def _ensure_directories(self) -> None:
"""Ensure registry directories exist."""
@@ -68,7 +75,7 @@ class MediaRegistryService:
with self._lock:
index.last_updated = datetime.now()
with AtomicWriter(self._index_file) as f:
json.dump(index.model_dump(), f, indent=2)
json.dump(index.model_dump(mode="json"), f, indent=2)
logger.debug("saved registry index")
@@ -106,23 +113,22 @@ class MediaRegistryService:
return index_entry
def save_media_index_entry(self, index_entry: MediaRegistryIndexEntry) -> bool:
with self._lock:
index = self._load_index()
index.media_index[f"{self._media_api}_{index_entry.media_id}"] = index_entry
self._save_index(index)
index = self._load_index()
index.media_index[f"{self._media_api}_{index_entry.media_id}"] = index_entry
self._save_index(index)
logger.debug(f"Saved media record for {index_entry.media_id}")
return True
logger.debug(f"Saved media record for {index_entry.media_id}")
return True
def save_media_record(self, record: MediaRecord) -> bool:
self.get_or_create_index_entry(record.media_item.id)
with self._lock:
self.get_or_create_index_entry(record.media_item.id)
media_id = record.media_item.id
record_file = self._get_media_file_path(media_id)
with AtomicWriter(record_file) as f:
json.dump(record.model_dump(), f, indent=2, default=str)
json.dump(record.model_dump(mode="json"), f, indent=2, default=str)
logger.debug(f"Saved media record for {media_id}")
return True
@@ -186,7 +192,7 @@ class MediaRegistryService:
index.media_index[f"{self._media_api}_{media_id}"] = index_entry
self._save_index(index)
def get_recently_watched(self, limit: int) -> List[MediaRecord]:
def get_recently_watched(self, limit: int) -> MediaSearchResult:
"""Get recently watched anime."""
index = self._load_index()
@@ -194,7 +200,7 @@ class MediaRegistryService:
index.media_index.values(), key=lambda x: x.last_watched, reverse=True
)
recent_media = []
recent_media: List[MediaItem] = []
for entry in sorted_entries:
record = self.get_media_record(entry.media_id)
if record:
@@ -202,7 +208,10 @@ class MediaRegistryService:
if len(recent_media) == limit:
break
return recent_media
page_info = PageInfo(
total=len(sorted_entries),
)
return MediaSearchResult(page_info=page_info, media=recent_media)
def get_registry_stats(self) -> Dict:
"""Get comprehensive registry statistics."""
@@ -258,10 +267,10 @@ class MediaRegistryService:
except OSError:
pass
index = self._load_index()
id = f"{self._media_api}_{media_id}"
if id in index.media_index:
del index.media_index[id]
self._save_index(index)
index = self._load_index()
id = f"{self._media_api}_{media_id}"
if id in index.media_index:
del index.media_index[id]
self._save_index(index)
logger.debug(f"Removed media record {media_id}")

View File

@@ -50,7 +50,7 @@ class SessionsService:
def _save_session(self, session: Session):
path = self.dir / f"{session.name}.json"
with AtomicWriter(path) as f:
json.dump(session.model_dump(), f)
json.dump(session.model_dump(mode="json"), f)
def _load_session(self, session_name: str) -> Optional[Session]:
path = self.dir / f"{session_name}.json"

View File

@@ -35,7 +35,7 @@ class WatchHistoryService:
status=status,
)
if self.media_api:
if self.media_api and self.media_api.is_authenticated():
self.media_api.update_list_entry(
UpdateListEntryParams(
media_id=media_item.id,
@@ -61,7 +61,7 @@ class WatchHistoryService:
notes=notes,
)
if self.media_api:
if self.media_api and self.media_api.is_authenticated():
self.media_api.update_list_entry(
UpdateListEntryParams(
media_id=media_item.id,

View File

@@ -1,15 +0,0 @@
"""
Utility modules for the FastAnime CLI.
"""
from ..services.watch_history.manager import WatchHistoryManager
from ..services.watch_history.tracker import WatchHistoryTracker, watch_tracker
from ..services.watch_history.types import WatchHistoryEntry, WatchHistoryData
__all__ = [
"WatchHistoryManager",
"WatchHistoryTracker",
"watch_tracker",
"WatchHistoryEntry",
"WatchHistoryData",
]

View File

@@ -155,8 +155,10 @@ def get_anime_preview(
os.environ["SHELL"] = "bash"
return final_script
# --- Episode Preview Functionality ---
def _populate_episode_info_template(episode_data: dict, config: AppConfig) -> str:
"""
Takes the episode_info.sh template and injects episode-specific formatted data.
@@ -172,8 +174,12 @@ def _populate_episode_info_template(episode_data: dict, config: AppConfig) -> st
"SCORE": formatters.shell_safe("N/A"), # Episodes don't have scores
"STATUS": formatters.shell_safe(episode_data.get("status", "Available")),
"FAVOURITES": formatters.shell_safe("N/A"), # Episodes don't have favorites
"GENRES": formatters.shell_safe(episode_data.get("duration", "Unknown duration")),
"SYNOPSIS": formatters.shell_safe(episode_data.get("description", "No episode description available.")),
"GENRES": formatters.shell_safe(
episode_data.get("duration", "Unknown duration")
),
"SYNOPSIS": formatters.shell_safe(
episode_data.get("description", "No episode description available.")
),
# Color codes
"C_TITLE": ansi.get_true_fg(HEADER_COLOR, bold=True),
"C_KEY": ansi.get_true_fg(HEADER_COLOR, bold=True),
@@ -191,72 +197,87 @@ def _populate_episode_info_template(episode_data: dict, config: AppConfig) -> st
def _episode_cache_worker(episodes: List[str], anime: MediaItem, config: AppConfig):
"""Background task that fetches and saves episode preview data."""
streaming_episodes = {ep.title: ep for ep in anime.streaming_episodes}
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
for episode_str in episodes:
hash_id = _get_cache_hash(episode_str)
# Find matching streaming episode
episode_data = None
for title, ep in streaming_episodes.items():
if f"Episode {episode_str}" in title or title.endswith(f" {episode_str}"):
if f"Episode {episode_str}" in title or title.endswith(
f" {episode_str}"
):
episode_data = {
"title": title,
"thumbnail": ep.thumbnail,
"description": f"Episode {episode_str} of {anime.title.english or anime.title.romaji}",
"duration": f"{anime.duration} min" if anime.duration else "Unknown duration",
"status": "Available"
"duration": f"{anime.duration} min"
if anime.duration
else "Unknown duration",
"status": "Available",
}
break
# Fallback if no streaming episode found
if not episode_data:
episode_data = {
"title": f"Episode {episode_str}",
"thumbnail": None,
"description": f"Episode {episode_str} of {anime.title.english or anime.title.romaji}",
"duration": f"{anime.duration} min" if anime.duration else "Unknown duration",
"status": "Available"
"duration": f"{anime.duration} min"
if anime.duration
else "Unknown duration",
"status": "Available",
}
# Download thumbnail if available
if episode_data["thumbnail"]:
executor.submit(_save_image_from_url, episode_data["thumbnail"], hash_id)
executor.submit(
_save_image_from_url, episode_data["thumbnail"], hash_id
)
# Generate and save episode info
episode_info = _populate_episode_info_template(episode_data, config)
executor.submit(_save_info_text, episode_info, hash_id)
def get_episode_preview(episodes: List[str], anime: MediaItem, config: AppConfig) -> str:
def get_episode_preview(
episodes: List[str], anime: MediaItem, config: AppConfig
) -> str:
"""
Starts a background task to cache episode preview data and returns the fzf preview command.
Args:
episodes: List of episode numbers as strings
anime: MediaItem containing the anime data with streaming episodes
config: Application configuration
Returns:
FZF preview command string
"""
# TODO: finish implementation of episode preview
# Ensure cache directories exist
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Start background caching for episodes
Thread(target=_episode_cache_worker, args=(episodes, anime, config), daemon=True).start()
Thread(
target=_episode_cache_worker, args=(episodes, anime, config), daemon=True
).start()
# Read the shell script template
try:
template = PREVIEW_SCRIPT_TEMPLATE_PATH.read_text(encoding="utf-8")
except FileNotFoundError:
logger.error(f"Preview script template not found at {PREVIEW_SCRIPT_TEMPLATE_PATH}")
logger.error(
f"Preview script template not found at {PREVIEW_SCRIPT_TEMPLATE_PATH}"
)
return "echo 'Error: Preview script template not found.'"
# Prepare values to inject into the template
path_sep = "\\" if PLATFORM == "win32" else "/"
# Format the template with the dynamic values
final_script = (
template.replace("{preview_mode}", config.general.preview)
@@ -265,6 +286,6 @@ def get_episode_preview(episodes: List[str], anime: MediaItem, config: AppConfig
.replace("{path_sep}", path_sep)
.replace("{image_renderer}", config.general.image_renderer)
)
os.environ["SHELL"] = "bash"
return final_script

View File

@@ -82,8 +82,8 @@ DOWNLOADS_RETRY_ATTEMPTS = 3
DOWNLOADS_RETRY_DELAY = 300
# RegistryConfig
MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / APP_NAME / "registry"
MEDIA_REGISTRY_DIR = USER_VIDEOS_DIR / ".registry"
MEDIA_REGISTRY_INDEX_DIR = APP_DATA_DIR
# session config
SESSIONS_DIR = APP_DATA_DIR / "sessions"
SESSIONS_DIR = APP_DATA_DIR / ".sessions"

View File

@@ -21,6 +21,12 @@ from . import descriptions as desc
class GeneralConfig(BaseModel):
"""Configuration for general application behavior and integrations."""
per_page: int = Field(
default=defaults.ANILIST_PER_PAGE,
gt=0,
le=50,
description=desc.ANILIST_PER_PAGE,
)
pygment_style: str = Field(
default=defaults.GENERAL_PYGMENT_STYLE, description=desc.GENERAL_PYGMENT_STYLE
)

View File

@@ -42,6 +42,9 @@ class AniListApi(BaseApiClient):
self.http_client.headers.pop("Authorization", None)
return self.user_profile
def is_authenticated(self) -> bool:
return True if self.user_profile else False
def get_viewer_profile(self) -> Optional[UserProfile]:
if not self.token:
return None

View File

@@ -83,15 +83,16 @@ def _to_generic_media_trailer(
def _to_generic_airing_schedule(
anilist_schedule: AnilistMediaNextAiringEpisode,
anilist_schedule: Optional[AnilistMediaNextAiringEpisode],
) -> Optional[AiringSchedule]:
"""Maps an AniList nextAiringEpisode object to a generic AiringSchedule."""
return AiringSchedule(
airing_at=datetime.fromtimestamp(anilist_schedule["airingAt"])
if anilist_schedule.get("airingAt")
else None,
episode=anilist_schedule.get("episode", 0),
)
if anilist_schedule:
return AiringSchedule(
airing_at=datetime.fromtimestamp(anilist_schedule["airingAt"])
if anilist_schedule.get("airingAt")
else None,
episode=anilist_schedule.get("episode", 0),
)
def _to_generic_studios(anilist_studios: AnilistStudioNodes) -> List[Studio]:

View File

@@ -21,6 +21,10 @@ class BaseApiClient(abc.ABC):
def authenticate(self, token: str) -> Optional[UserProfile]:
pass
@abc.abstractmethod
def is_authenticated(self) -> bool:
pass
@abc.abstractmethod
def get_viewer_profile(self) -> Optional[UserProfile]:
pass

View File

@@ -58,9 +58,7 @@ class ApiSearchParams:
@dataclass(frozen=True)
class UserListParams:
status: Literal[
"CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"
]
status: UserListStatusType
page: int = 1
per_page: int = 20

View File

@@ -130,10 +130,10 @@ class MediaItem(BaseApiModel):
class PageInfo(BaseApiModel):
"""Generic pagination information."""
total: int
current_page: int
has_next_page: bool
per_page: int
total: int = 1
current_page: int = 1
has_next_page: bool = False
per_page: int = 15
class MediaSearchResult(BaseApiModel):