feat: results menu

This commit is contained in:
Benexl
2025-07-24 00:07:26 +03:00
parent a6ddb10734
commit afe1cb68f6
13 changed files with 398 additions and 393 deletions

View File

@@ -66,11 +66,11 @@ def auth(ctx: Context, state: State) -> State | InternalDirective:
elif "View Profile Details" in choice:
_display_user_profile_details(console, user_profile, icons)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
elif "How to Get Token" in choice:
_display_token_help(console, icons)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
else: # Back to Main Menu
return InternalDirective.BACK
@@ -164,7 +164,7 @@ def _handle_login(
)
feedback.pause_for_user("Press Enter to continue")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
def _handle_logout(
@@ -176,7 +176,7 @@ def _handle_logout(
"This will remove your saved AniList token and log you out",
default=False,
):
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
def perform_logout():
# Clear from auth manager

View File

@@ -57,7 +57,7 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
if not anime.trailer or not anime.trailer.id:
feedback.warning(
"No trailer available for this anime",
@@ -68,7 +68,7 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
ctx.player.play(PlayerParams(url=trailer_url, title=""))
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
return action
@@ -78,10 +78,10 @@ def _add_to_list(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
if not ctx.media_api.is_authenticated():
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
choices = [
"watching",
@@ -99,7 +99,7 @@ def _add_to_list(ctx: Context, state: State) -> MenuAction:
UpdateListEntryParams(media_id=anime.id, status=status), # pyright:ignore
feedback,
)
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
return action
@@ -109,11 +109,11 @@ def _score_anime(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
# Check authentication before proceeding
if not ctx.media_api.is_authenticated():
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
score_str = ctx.selector.ask("Enter score (0.0 - 10.0):")
try:
@@ -130,7 +130,7 @@ def _score_anime(ctx: Context, state: State) -> MenuAction:
feedback.error(
"Invalid score entered", "Please enter a number between 0.0 and 10.0"
)
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
return action
@@ -139,7 +139,7 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
def action():
anime = state.media_api.anime
if not anime:
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
# TODO: Make this nice and include all other media item fields
from rich import box
@@ -161,7 +161,7 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
console.print(Panel(panel_content, title=title, box=box.ROUNDED, expand=True))
ctx.selector.ask("Press Enter to continue...")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
return action
@@ -170,6 +170,6 @@ def _update_user_list(
ctx: Context, anime: MediaItem, params: UpdateListEntryParams, feedback
):
if ctx.media_api.is_authenticated():
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
ctx.media_api.update_list_entry(params)

View File

@@ -79,7 +79,7 @@ def player_controls(ctx: Context, state: State) -> State | InternalDirective:
),
)
console.print("[bold yellow]This is the last available episode.[/bold yellow]")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
def replay() -> State | InternalDirective:
# We don't need to change state, just re-trigger the SERVERS menu's logic.
@@ -101,7 +101,7 @@ def player_controls(ctx: Context, state: State) -> State | InternalDirective:
update={"selected_server": server_map[new_server_name]}
),
)
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
# --- Menu Options ---
icons = config.general.icons

View File

@@ -1,108 +1,103 @@
from dataclasses import asdict
from typing import Callable, Dict, Union
from ....libs.api.params import MediaSearchParams, UserMediaListSearchParams
from ....libs.api.types import MediaItem, MediaStatus, UserMediaListStatus
from ..session import Context, session
from ..state import InternalDirective, MediaApiState, State
from ..state import InternalDirective, MediaApiState, MenuName, State
@session.menu
def results(ctx: Context, state: State) -> State | InternalDirective:
search_results = state.media_api.search_results
feedback = ctx.services.feedback
feedback.clear_console()
if not search_results or not search_results.media:
search_result = state.media_api.search_result
page_info = state.media_api.page_info
if not search_result:
feedback.info("No anime found for the given criteria")
return InternalDirective.BACK
anime_items = search_results.media
formatted_titles = [
_format_anime_choice(anime, ctx.config) for anime in anime_items
]
anime_map = dict(zip(formatted_titles, anime_items))
_formatted_titles = [_format_title(ctx, anime) for anime in search_result.values()]
preview_command = None
if ctx.config.general.preview != "none":
from ...utils.previews import get_anime_preview
preview_command = get_anime_preview(anime_items, formatted_titles, ctx.config)
choices = formatted_titles
page_info = search_results.page_info
# Add pagination controls if available with more descriptive text
if page_info.has_next_page:
choices.append(
f"{'➡️ ' if ctx.config.general.icons else ''}Next Page (Page {page_info.current_page + 1})"
preview_command = get_anime_preview(
list(search_result.values()), _formatted_titles, ctx.config
)
if page_info.current_page > 1:
choices.append(
f"{'⬅️ ' if ctx.config.general.icons else ''}Previous Page (Page {page_info.current_page - 1})"
)
choices.append("Back")
# Create header with auth status and pagination info
pagination_info = f"Page {page_info.current_page}"
if page_info.total > 0 and page_info.per_page > 0:
total_pages = (page_info.total + page_info.per_page - 1) // page_info.per_page
pagination_info += f" of ~{total_pages}"
choices: Dict[str, Callable[[], Union[int, State, InternalDirective]]] = dict(
zip(_formatted_titles, [lambda: item for item in search_result.keys()])
)
choice_str = ctx.selector.choose(
if page_info:
if page_info.has_next_page:
choices.update(
{
f"{'➡️ ' if ctx.config.general.icons else ''}Next Page (Page {page_info.current_page + 1})": lambda: _handle_pagination(
ctx, state, 1
)
}
)
if page_info.current_page > 1:
choices.update(
{
f"{'⬅️ ' if ctx.config.general.icons else ''}Previous Page (Page {page_info.current_page - 1})": lambda: _handle_pagination(
ctx, state, -1
)
}
)
choices.update(
{"Back": lambda: InternalDirective.MAIN, "Exit": lambda: InternalDirective.EXIT}
)
choice = ctx.selector.choose(
prompt="Select Anime",
choices=choices,
choices=list(choices),
preview=preview_command,
)
if not choice_str:
return InternalDirective.EXIT
if not choice:
return InternalDirective.RELOAD
if choice_str == "Back":
return InternalDirective.BACK
if (
choice_str == "Next Page"
or choice_str == "Previous Page"
or choice_str.startswith("Next Page (")
or choice_str.startswith("Previous Page (")
):
page_delta = 1 if choice_str.startswith("Next Page") else -1
return _handle_pagination(ctx, state, page_delta)
selected_anime = anime_map.get(choice_str)
if selected_anime:
next_step = choices[choice]()
if isinstance(next_step, State) or isinstance(next_step, InternalDirective):
return next_step
else:
return State(
menu_name="MEDIA_ACTIONS",
menu_name=MenuName.MEDIA_ACTIONS,
media_api=MediaApiState(
search_results=state.media_api.search_results, # Carry over the list
anime=selected_anime, # Set the newly selected item
media_id=next_step,
search_result=state.media_api.search_result,
page_info=state.media_api.page_info,
),
provider=state.provider,
)
# Fallback
return InternalDirective.CONTINUE
def _format_title(ctx: Context, media_item: MediaItem) -> str:
config = ctx.config
def _format_anime_choice(anime: MediaItem, config) -> str:
"""Creates a display string for a single anime item for the selector."""
title = anime.title.english or anime.title.romaji
title = media_item.title.english or media_item.title.romaji
progress = "0"
if anime.user_status:
progress = str(anime.user_status.progress or 0)
episodes_total = str(anime.episodes or "??")
if media_item.user_status:
progress = str(media_item.user_status.progress or 0)
episodes_total = str(media_item.episodes or "??")
display_title = f"{title} ({progress} of {episodes_total})"
# Add a visual indicator for new episodes if applicable
if (
anime.status == MediaStatus.RELEASING
and anime.next_airing
and anime.user_status
and anime.user_status.status == UserMediaListStatus.WATCHING
media_item.status == MediaStatus.RELEASING
and media_item.next_airing
and media_item.user_status
and media_item.user_status.status == UserMediaListStatus.WATCHING
):
last_aired = anime.next_airing.episode - 1
unwatched = last_aired - (anime.user_status.progress or 0)
last_aired = media_item.next_airing.episode - 1
unwatched = last_aired - (media_item.user_status.progress or 0)
if unwatched > 0:
icon = "🔹" if config.general.icons else "!"
display_title += f" {icon}{unwatched} new{icon}"
@@ -113,123 +108,83 @@ def _format_anime_choice(anime: MediaItem, config) -> str:
def _handle_pagination(
ctx: Context, state: State, page_delta: int
) -> State | InternalDirective:
"""
Handle pagination by fetching the next or previous page of results.
Args:
ctx: The application context
state: Current state containing search results and original parameters
page_delta: +1 for next page, -1 for previous page
Returns:
New State with updated search results or ControlFlow.CONTINUE on error
"""
feedback = ctx.services.feedback
if not state.media_api.search_results:
feedback.error("No search results available for pagination")
return InternalDirective.CONTINUE
search_params = state.media_api.search_params
current_page = state.media_api.search_results.page_info.current_page
if (
not state.media_api.search_result
or not state.media_api.page_info
or not search_params
):
feedback.error("No search results available for pagination")
return InternalDirective.RELOAD
current_page = state.media_api.page_info.current_page
new_page = current_page + page_delta
# Validate page bounds
if new_page < 1:
feedback.warning("Already at the first page")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
if page_delta > 0 and not state.media_api.search_results.page_info.has_next_page:
if page_delta == -1:
return InternalDirective.BACK
if page_delta > 0 and not state.media_api.page_info.has_next_page:
feedback.warning("No more pages available")
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
# Determine which type of search to perform based on stored parameters
if state.media_api.original_api_params:
# Media search (trending, popular, search, etc.)
return _fetch_media_page(ctx, state, new_page, feedback)
elif state.media_api.original_user_list_params:
# User list search (watching, completed, etc.)
return _fetch_user_list_page(ctx, state, new_page, feedback)
if isinstance(search_params, UserMediaListSearchParams):
if not ctx.media_api.is_authenticated():
feedback.error("You haven't logged in")
return InternalDirective.RELOAD
search_params_dict = asdict(search_params)
search_params_dict.pop("page")
loading_message = f"Fetching media list"
result = None
new_search_params = UserMediaListSearchParams(
**search_params_dict, page=new_page
)
with feedback.progress(loading_message):
result = ctx.media_api.search_media_list(new_search_params)
if result:
return State(
menu_name=MenuName.RESULTS,
media_api=MediaApiState(
search_result={
media_item.id: media_item for media_item in result.media
},
search_params=new_search_params,
page_info=result.page_info,
),
)
else:
feedback.error("No original search parameters found for pagination")
return InternalDirective.CONTINUE
search_params_dict = asdict(search_params)
search_params_dict.pop("page")
loading_message = f"Fetching media list"
result = None
new_search_params = MediaSearchParams(**search_params_dict, page=new_page)
with feedback.progress(loading_message):
result = ctx.media_api.search_media(new_search_params)
def _fetch_media_page(
ctx: Context, state: State, page: int, feedback
) -> State | InternalDirective:
"""Fetch a specific page for media search results."""
original_params = state.media_api.original_api_params
if not original_params:
feedback.error("No original API parameters found")
return InternalDirective.CONTINUE
if result:
return State(
menu_name=MenuName.RESULTS,
media_api=MediaApiState(
search_result={
media_item.id: media_item for media_item in result.media
},
search_params=new_search_params,
page_info=result.page_info,
),
)
# Create new parameters with updated page number
new_params = MediaSearchParams(
query=original_params.query,
page=page,
per_page=original_params.per_page,
sort=original_params.sort,
id_in=original_params.id_in,
genre_in=original_params.genre_in,
genre_not_in=original_params.genre_not_in,
tag_in=original_params.tag_in,
tag_not_in=original_params.tag_not_in,
status_in=original_params.status_in,
status=original_params.status,
status_not_in=original_params.status_not_in,
popularity_greater=original_params.popularity_greater,
popularity_lesser=original_params.popularity_lesser,
averageScore_greater=original_params.averageScore_greater,
averageScore_lesser=original_params.averageScore_lesser,
seasonYear=original_params.seasonYear,
season=original_params.season,
startDate_greater=original_params.startDate_greater,
startDate_lesser=original_params.startDate_lesser,
startDate=original_params.startDate,
endDate_greater=original_params.endDate_greater,
endDate_lesser=original_params.endDate_lesser,
format_in=original_params.format_in,
type=original_params.type,
on_list=original_params.on_list,
)
result = ctx.media_api.search_media(new_params)
return State(
menu_name="RESULTS",
media_api=MediaApiState(
search_results=result,
original_api_params=original_params, # Keep original params for further pagination
original_user_list_params=state.media_api.original_user_list_params,
),
provider=state.provider, # Preserve provider state if it exists
)
def _fetch_user_list_page(
ctx: Context, state: State, page: int, feedback
) -> State | InternalDirective:
"""Fetch a specific page for user list results."""
original_params = state.media_api.original_user_list_params
if not original_params:
feedback.error("No original user list parameters found")
return InternalDirective.CONTINUE
# Create new parameters with updated page number
new_params = UserMediaListSearchParams(
status=original_params.status,
page=page,
per_page=original_params.per_page,
)
result = ctx.media_api.search_media_list(new_params)
return State(
menu_name="RESULTS",
media_api=MediaApiState(
search_results=result,
original_api_params=state.media_api.original_api_params,
original_user_list_params=original_params, # Keep original params for further pagination
),
provider=state.provider, # Preserve provider state if it exists
)
# print(new_search_params)
# print(result)
feedback.warning("Failed to load page")
return InternalDirective.RELOAD

View File

@@ -91,7 +91,7 @@ def servers(ctx: Context, state: State) -> State | InternalDirective:
console.print(
f"[bold red]No stream of quality '{config.stream.quality}' found on server '{selected_server.name}'.[/bold red]"
)
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
# --- Launch Player ---
final_title = f"{provider_anime.title} - Ep {episode_number}"

View File

@@ -90,7 +90,7 @@ def watch_history(ctx: Context, state: State) -> State | InternalDirective:
if result == "BACK":
return InternalDirective.BACK
else:
return InternalDirective.CONTINUE
return InternalDirective.RELOAD
def _display_history_stats(

View File

@@ -2,8 +2,7 @@ import importlib.util
import logging
import os
from dataclasses import dataclass
from pathlib import Path
from typing import Callable, List, Optional
from typing import Callable, List, Optional, Union
import click
@@ -23,12 +22,11 @@ from ..services.feedback import FeedbackService
from ..services.registry import MediaRegistryService
from ..services.session import SessionsService
from ..services.watch_history import WatchHistoryService
from .state import InternalDirective, State
from .state import InternalDirective, MenuName, State
logger = logging.getLogger(__name__)
# A type alias for the signature all menu functions must follow.
MenuFunction = Callable[["Context", State], "State | ControlFlow"]
MENUS_DIR = APP_DIR / "cli" / "interactive" / "menus"
@@ -52,16 +50,19 @@ class Context:
services: Services
MenuFunction = Callable[[Context, State], Union[State, InternalDirective]]
@dataclass(frozen=True)
class Menu:
name: str
name: MenuName
execute: MenuFunction
class Session:
_context: Context
_history: List[State] = []
_menus: dict[str, Menu] = {}
_menus: dict[MenuName, Menu] = {}
def _load_context(self, config: AppConfig):
"""Initializes all shared services based on the provided configuration."""
@@ -122,7 +123,7 @@ class Session:
logger.warning("Failed to continue from history. No sessions found")
if not self._history:
self._history.append(State(menu_name="MAIN"))
self._history.append(State(menu_name=MenuName.MAIN))
try:
self._run_main_loop()
@@ -141,8 +142,12 @@ class Session:
)
if isinstance(next_step, InternalDirective):
if next_step == InternalDirective.EXIT:
break
if next_step == InternalDirective.MAIN:
self._history = [self._history[0]]
if next_step == InternalDirective.RELOAD:
continue
elif next_step == InternalDirective.CONFIG_EDIT:
self._edit_config()
elif next_step == InternalDirective.BACK:
if len(self._history) > 1:
self._history.pop()
@@ -155,21 +160,17 @@ class Session:
self._history.pop()
self._history.pop()
self._history.pop()
elif next_step == InternalDirective.CONFIG_EDIT:
self._edit_config()
elif next_step == InternalDirective.EXIT:
break
else:
# if the state is main menu we should reset the history
if next_step.menu_name == "MAIN":
self._history = [next_step]
else:
self._history.append(next_step)
self._history.append(next_step)
@property
def menu(self) -> Callable[[MenuFunction], MenuFunction]:
"""A decorator to register a function as a menu."""
def decorator(func: MenuFunction) -> MenuFunction:
menu_name = func.__name__.upper()
menu_name = MenuName(func.__name__.upper())
if menu_name in self._menus:
logger.warning(f"Menu '{menu_name}' is being redefined.")
self._menus[menu_name] = Menu(name=menu_name, execute=func)

View File

@@ -10,6 +10,8 @@ from ...libs.providers.anime.types import Anime, SearchResults, Server
# TODO: is internal directive a good name
class InternalDirective(Enum):
MAIN = "MAIN"
BACK = auto()
BACKX2 = auto()
@@ -20,7 +22,7 @@ class InternalDirective(Enum):
CONFIG_EDIT = auto()
CONTINUE = auto()
RELOAD = auto()
class MenuName(Enum):
@@ -34,6 +36,7 @@ class MenuName(Enum):
PLAYER_CONTROLS = "PLAYER_CONTROLS"
USER_MEDIA_LIST = "USER_MEDIA_LIST"
SESSION_MANAGEMENT = "SESSION_MANAGEMENT"
MEDIA_ACTIONS = "MEDIA_ACTIONS"
class StateModel(BaseModel):