feat: improve state models

This commit is contained in:
Benexl
2025-07-23 21:16:50 +03:00
parent 2067467134
commit f678fa13f0
27 changed files with 397 additions and 375 deletions

View File

@@ -18,11 +18,11 @@ if TYPE_CHECKING:
)
@click.pass_obj
def favourites(config: "AppConfig", dump_json: bool):
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from ..helpers import handle_media_search_command
def create_search_params(config):
return ApiSearchParams(
return MediaSearchParams(
per_page=config.anilist.per_page or 15,
sort=["FAVOURITES_DESC"]
)

View File

@@ -18,11 +18,11 @@ if TYPE_CHECKING:
)
@click.pass_obj
def popular(config: "AppConfig", dump_json: bool):
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from ..helpers import handle_media_search_command
def create_search_params(config):
return ApiSearchParams(
return MediaSearchParams(
per_page=config.anilist.per_page or 15,
sort=["POPULARITY_DESC"]
)

View File

@@ -24,7 +24,7 @@ def random_anime(config: "AppConfig", dump_json: bool):
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.api.factory import create_api_client
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from rich.progress import Progress
feedback = create_feedback_manager(config.general.icons)
@@ -39,7 +39,7 @@ def random_anime(config: "AppConfig", dump_json: bool):
# Search for random anime
with Progress() as progress:
progress.add_task("Fetching random anime...", total=None)
search_params = ApiSearchParams(id_in=random_ids, per_page=50)
search_params = MediaSearchParams(id_in=random_ids, per_page=50)
search_result = api_client.search_media(search_params)
if not search_result or not search_result.media:

View File

@@ -18,11 +18,11 @@ if TYPE_CHECKING:
)
@click.pass_obj
def recent(config: "AppConfig", dump_json: bool):
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from ..helpers import handle_media_search_command
def create_search_params(config):
return ApiSearchParams(
return MediaSearchParams(
per_page=config.anilist.per_page or 15,
sort=["UPDATED_AT_DESC"],
status_in=["RELEASING"]

View File

@@ -18,11 +18,11 @@ if TYPE_CHECKING:
)
@click.pass_obj
def scores(config: "AppConfig", dump_json: bool):
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from ..helpers import handle_media_search_command
def create_search_params(config):
return ApiSearchParams(
return MediaSearchParams(
per_page=config.anilist.per_page or 15,
sort=["SCORE_DESC"]
)

View File

@@ -98,7 +98,7 @@ def search(
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.api.factory import create_api_client
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from rich.progress import Progress
feedback = create_feedback_manager(config.general.icons)
@@ -108,7 +108,7 @@ def search(
api_client = create_api_client(config.general.media_api, config)
# Build search parameters
search_params = ApiSearchParams(
search_params = MediaSearchParams(
query=title,
per_page=config.anilist.per_page or 50,
sort=[sort] if sort else None,

View File

@@ -18,13 +18,13 @@ if TYPE_CHECKING:
)
@click.pass_obj
def trending(config: "AppConfig", dump_json: bool):
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from ..helpers import handle_media_search_command
def create_search_params(config):
return ApiSearchParams(
per_page=config.anilist.per_page or 15,
sort=["TRENDING_DESC"]
return MediaSearchParams(
per_page=config.anilist.per_page or 15, sort=["TRENDING_DESC"]
)
handle_media_search_command(
@@ -32,5 +32,5 @@ def trending(config: "AppConfig", dump_json: bool):
dump_json=dump_json,
task_name="Fetching trending anime...",
search_params_factory=create_search_params,
empty_message="No trending anime found"
empty_message="No trending anime found",
)

View File

@@ -18,11 +18,11 @@ if TYPE_CHECKING:
)
@click.pass_obj
def upcoming(config: "AppConfig", dump_json: bool):
from fastanime.libs.api.params import ApiSearchParams
from fastanime.libs.api.params import MediaSearchParams
from ..helpers import handle_media_search_command
def create_search_params(config):
return ApiSearchParams(
return MediaSearchParams(
per_page=config.anilist.per_page or 15,
sort=["POPULARITY_DESC"],
status_in=["NOT_YET_RELEASED"]

View File

@@ -119,7 +119,7 @@ def handle_user_list_command(
"""
from fastanime.cli.utils.feedback import create_feedback_manager
from fastanime.core.exceptions import FastAnimeError
from fastanime.libs.api.params import UserListParams
from fastanime.libs.api.params import UserMediaListSearchParams
feedback = create_feedback_manager(config.general.icons)
@@ -145,7 +145,7 @@ def handle_user_list_command(
# Fetch user's anime list
with Progress() as progress:
progress.add_task(f"Fetching your {list_name} list...", total=None)
list_params = UserListParams(
list_params = UserMediaListSearchParams(
status=status, # type: ignore # We validated it above
page=1,
per_page=config.anilist.per_page or 50,

View File

@@ -15,11 +15,11 @@ from ....libs.api.types import UserProfile
from ...auth.manager import AuthManager
from ...utils.feedback import create_feedback_manager, execute_with_feedback
from ..session import Context, session
from ..state import ControlFlow, State
from ..state import InternalDirective, State
@session.menu
def auth(ctx: Context, state: State) -> State | ControlFlow:
def auth(ctx: Context, state: State) -> State | InternalDirective:
"""
Interactive authentication menu for managing AniList login/logout and viewing user profile.
"""
@@ -56,7 +56,7 @@ def auth(ctx: Context, state: State) -> State | ControlFlow:
)
if not choice:
return ControlFlow.BACK
return InternalDirective.BACK
# Handle menu choices
if "Login to AniList" in choice:
@@ -66,13 +66,13 @@ def auth(ctx: Context, state: State) -> State | ControlFlow:
elif "View Profile Details" in choice:
_display_user_profile_details(console, user_profile, icons)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
elif "How to Get Token" in choice:
_display_token_help(console, icons)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
else: # Back to Main Menu
return ControlFlow.BACK
return InternalDirective.BACK
def _display_auth_status(
@@ -99,7 +99,7 @@ def _display_auth_status(
def _handle_login(
ctx: Context, auth_manager: AuthManager, feedback, icons: bool
) -> State | ControlFlow:
) -> State | InternalDirective:
"""Handle the interactive login process."""
def perform_login():
@@ -164,19 +164,19 @@ def _handle_login(
)
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
def _handle_logout(
ctx: Context, auth_manager: AuthManager, feedback, icons: bool
) -> State | ControlFlow:
) -> State | InternalDirective:
"""Handle the logout process with confirmation."""
if not feedback.confirm(
"Are you sure you want to logout?",
"This will remove your saved AniList token and log you out",
default=False,
):
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
def perform_logout():
# Clear from auth manager
@@ -208,7 +208,7 @@ def _handle_logout(
if success:
feedback.pause_for_user("Press Enter to continue")
return ControlFlow.CONFIG_EDIT
return InternalDirective.CONFIG_EDIT
def _display_user_profile_details(

View File

@@ -4,11 +4,11 @@ import click
from rich.console import Console
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
from ..state import InternalDirective, ProviderState, State
@session.menu
def episodes(ctx: Context, state: State) -> State | ControlFlow:
def episodes(ctx: Context, state: State) -> State | InternalDirective:
"""
Displays available episodes for a selected provider anime and handles
the logic for continuing from watch history or manual selection.
@@ -21,7 +21,7 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
if not provider_anime or not anilist_anime:
feedback.error("Error: Anime details are missing.")
return ControlFlow.BACK
return InternalDirective.BACK
available_episodes = getattr(
provider_anime.episodes, config.stream.translation_type, []
@@ -30,7 +30,7 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
feedback.warning(
f"No '{config.stream.translation_type}' episodes found for this anime."
)
return ControlFlow.BACKX2
return InternalDirective.BACKX2
chosen_episode: str | None = None
@@ -55,7 +55,7 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow:
if not chosen_episode_str or chosen_episode_str == "Back":
# TODO: should improve the back logic for menus that can be pass through
return ControlFlow.BACKX2
return InternalDirective.BACKX2
chosen_episode = chosen_episode_str

View File

@@ -2,7 +2,7 @@ import logging
import random
from typing import Callable, Dict, Tuple
from ....libs.api.params import ApiSearchParams, UserListParams
from ....libs.api.params import MediaSearchParams, UserMediaListSearchParams
from ....libs.api.types import (
MediaSearchResult,
MediaSort,
@@ -10,17 +10,22 @@ from ....libs.api.types import (
UserMediaListStatus,
)
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
from ..state import InternalDirective, MediaApiState, State
logger = logging.getLogger(__name__)
MenuAction = Callable[
[],
Tuple[str, MediaSearchResult | None, ApiSearchParams | None, UserListParams | None],
Tuple[
str,
MediaSearchResult | None,
MediaSearchParams | None,
UserMediaListSearchParams | None,
],
]
@session.menu
def main(ctx: Context, state: State) -> State | ControlFlow:
def main(ctx: Context, state: State) -> State | InternalDirective:
"""
The main entry point menu for the interactive session.
Displays top-level categories for the user to browse and select.
@@ -95,7 +100,7 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
)
if not choice_str:
return ControlFlow.EXIT
return InternalDirective.EXIT
# --- Action Handling ---
selected_action = options[choice_str]
@@ -103,9 +108,9 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
next_menu_name, result_data, api_params, user_list_params = selected_action()
if next_menu_name == "EXIT":
return ControlFlow.EXIT
return InternalDirective.EXIT
if next_menu_name == "CONFIG_EDIT":
return ControlFlow.CONFIG_EDIT
return InternalDirective.CONFIG_EDIT
if next_menu_name == "SESSION_MANAGEMENT":
return State(menu_name="SESSION_MANAGEMENT")
if next_menu_name == "AUTH":
@@ -115,14 +120,14 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
if next_menu_name == "WATCH_HISTORY":
return State(menu_name="WATCH_HISTORY")
if next_menu_name == "CONTINUE":
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
if not result_data:
feedback.error(
f"Failed to fetch data for '{choice_str.strip()}'",
"Please check your internet connection and try again.",
)
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# On success, transition to the RESULTS menu state.
return State(
@@ -142,7 +147,7 @@ def _create_media_list_action(
def action():
# Create the search parameters
search_params = ApiSearchParams(sort=sort, status=status)
search_params = MediaSearchParams(sort=sort, status=status)
result = ctx.media_api.search_media(search_params)
@@ -153,7 +158,7 @@ def _create_media_list_action(
def _create_random_media_list(ctx: Context) -> MenuAction:
def action():
search_params = ApiSearchParams(id_in=random.sample(range(1, 15000), k=50))
search_params = MediaSearchParams(id_in=random.sample(range(1, 15000), k=50))
result = ctx.media_api.search_media(search_params)
@@ -168,7 +173,7 @@ def _create_search_media_list(ctx: Context) -> MenuAction:
if not query:
return "CONTINUE", None, None, None
search_params = ApiSearchParams(query=query)
search_params = MediaSearchParams(query=query)
result = ctx.media_api.search_media(search_params)
return ("RESULTS", result, search_params, None)
@@ -185,7 +190,7 @@ def _create_user_list_action(ctx: Context, status: UserMediaListStatus) -> MenuA
logger.warning("Not authenticated")
return "CONTINUE", None, None, None
user_list_params = UserListParams(status=status)
user_list_params = UserMediaListSearchParams(status=status)
result = ctx.media_api.search_media_list(user_list_params)

View File

@@ -6,13 +6,13 @@ from ....libs.api.params import UpdateListEntryParams
from ....libs.api.types import MediaItem
from ....libs.players.params import PlayerParams
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
from ..state import InternalDirective, ProviderState, State
MenuAction = Callable[[], State | ControlFlow]
MenuAction = Callable[[], State | InternalDirective]
@session.menu
def media_actions(ctx: Context, state: State) -> State | ControlFlow:
def media_actions(ctx: Context, state: State) -> State | InternalDirective:
icons = ctx.config.general.icons
anime = state.media_api.anime
anime_title = anime.title.english or anime.title.romaji if anime else "Unknown"
@@ -26,7 +26,7 @@ def media_actions(ctx: Context, state: State) -> State | ControlFlow:
f"{' ' if icons else ''}Add/Update List": _add_to_list(ctx, state),
f"{'' if icons else ''}Score Anime": _score_anime(ctx, state),
f"{' ' if icons else ''}View Info": _view_info(ctx, state),
f"{'🔙 ' if icons else ''}Back to Results": lambda: ControlFlow.BACK,
f"{'🔙 ' if icons else ''}Back to Results": lambda: InternalDirective.BACK,
}
choice_str = ctx.selector.choose(
@@ -37,7 +37,7 @@ def media_actions(ctx: Context, state: State) -> State | ControlFlow:
if choice_str and choice_str in options:
return options[choice_str]()
return ControlFlow.BACK
return InternalDirective.BACK
# --- Action Implementations ---
@@ -57,7 +57,7 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
if not anime.trailer or not anime.trailer.id:
feedback.warning(
"No trailer available for this anime",
@@ -68,7 +68,7 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
ctx.player.play(PlayerParams(url=trailer_url, title=""))
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
return action
@@ -78,10 +78,10 @@ def _add_to_list(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
if not ctx.media_api.is_authenticated():
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
choices = [
"watching",
@@ -99,7 +99,7 @@ def _add_to_list(ctx: Context, state: State) -> MenuAction:
UpdateListEntryParams(media_id=anime.id, status=status), # pyright:ignore
feedback,
)
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
return action
@@ -109,11 +109,11 @@ def _score_anime(ctx: Context, state: State) -> MenuAction:
feedback = ctx.services.feedback
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# Check authentication before proceeding
if not ctx.media_api.is_authenticated():
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
score_str = ctx.selector.ask("Enter score (0.0 - 10.0):")
try:
@@ -130,7 +130,7 @@ def _score_anime(ctx: Context, state: State) -> MenuAction:
feedback.error(
"Invalid score entered", "Please enter a number between 0.0 and 10.0"
)
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
return action
@@ -139,7 +139,7 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
def action():
anime = state.media_api.anime
if not anime:
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# TODO: Make this nice and include all other media item fields
from rich import box
@@ -161,7 +161,7 @@ def _view_info(ctx: Context, state: State) -> MenuAction:
console.print(Panel(panel_content, title=title, box=box.ROUNDED, expand=True))
ctx.selector.ask("Press Enter to continue...")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
return action
@@ -170,6 +170,6 @@ def _update_user_list(
ctx: Context, anime: MediaItem, params: UpdateListEntryParams, feedback
):
if ctx.media_api.is_authenticated():
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
ctx.media_api.update_list_entry(params)

View File

@@ -5,14 +5,14 @@ import click
from rich.console import Console
from ..session import Context, session
from ..state import ControlFlow, State
from ..state import InternalDirective, State
if TYPE_CHECKING:
from ....libs.providers.anime.types import Server
@session.menu
def player_controls(ctx: Context, state: State) -> State | ControlFlow:
def player_controls(ctx: Context, state: State) -> State | InternalDirective:
"""
Handles post-playback options like playing the next episode,
replaying, or changing streaming options.
@@ -43,7 +43,7 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
console.print(
"[bold red]Error: Player state is incomplete. Returning.[/bold red]"
)
return ControlFlow.BACK
return InternalDirective.BACK
# --- Auto-Next Logic ---
available_episodes = getattr(
@@ -66,7 +66,7 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
)
# --- Action Definitions ---
def next_episode() -> State | ControlFlow:
def next_episode() -> State | InternalDirective:
if current_index < len(available_episodes) - 1:
next_episode_num = available_episodes[current_index + 1]
@@ -79,15 +79,15 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
),
)
console.print("[bold yellow]This is the last available episode.[/bold yellow]")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
def replay() -> State | ControlFlow:
def replay() -> State | InternalDirective:
# We don't need to change state, just re-trigger the SERVERS menu's logic.
return State(
menu_name="SERVERS", media_api=state.media_api, provider=state.provider
)
def change_server() -> State | ControlFlow:
def change_server() -> State | InternalDirective:
server_map: Dict[str, Server] = {s.name: s for s in all_servers}
new_server_name = selector.choose(
"Select a different server:", list(server_map.keys())
@@ -101,11 +101,11 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
update={"selected_server": server_map[new_server_name]}
),
)
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# --- Menu Options ---
icons = config.general.icons
options: Dict[str, Callable[[], State | ControlFlow]] = {}
options: Dict[str, Callable[[], State | InternalDirective]] = {}
if current_index < len(available_episodes) - 1:
options[f"{'⏭️ ' if icons else ''}Next Episode"] = next_episode
@@ -118,7 +118,7 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
menu_name="EPISODES", media_api=state.media_api, provider=state.provider
),
f"{'🏠 ' if icons else ''}Main Menu": lambda: State(menu_name="MAIN"),
f"{'' if icons else ''}Exit": lambda: ControlFlow.EXIT,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
)
@@ -131,4 +131,4 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow:
if choice_str and choice_str in options:
return options[choice_str]()
return ControlFlow.BACK
return InternalDirective.BACK

View File

@@ -7,16 +7,16 @@ from thefuzz import fuzz
from ....libs.providers.anime.params import SearchParams
from ....libs.providers.anime.types import SearchResult
from ..session import Context, session
from ..state import ControlFlow, ProviderState, State
from ..state import InternalDirective, ProviderState, State
@session.menu
def provider_search(ctx: Context, state: State) -> State | ControlFlow:
def provider_search(ctx: Context, state: State) -> State | InternalDirective:
feedback = ctx.services.feedback
anilist_anime = state.media_api.anime
if not anilist_anime:
feedback.error("No AniList anime to search for", "Please select an anime first")
return ControlFlow.BACK
return InternalDirective.BACK
provider = ctx.provider
selector = ctx.selector
@@ -29,7 +29,7 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
"Selected anime has no searchable title",
"This anime entry is missing required title information",
)
return ControlFlow.BACK
return InternalDirective.BACK
provider_search_results = provider.search(
SearchParams(
@@ -42,7 +42,7 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
f"Could not find '{anilist_title}' on {provider.__class__.__name__}",
"Try another provider from the config or go back to search again",
)
return ControlFlow.BACK
return InternalDirective.BACK
provider_results_map: dict[str, SearchResult] = {
result.title: result for result in provider_search_results.results
@@ -68,7 +68,7 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
)
if not chosen_title or chosen_title == "Back":
return ControlFlow.BACK
return InternalDirective.BACK
selected_provider_anime = provider_results_map[chosen_title]
@@ -88,7 +88,7 @@ def provider_search(ctx: Context, state: State) -> State | ControlFlow:
feedback.warning(
f"Failed to fetch details for '{selected_provider_anime.title}'."
)
return ControlFlow.BACK
return InternalDirective.BACK
return State(
menu_name="EPISODES",

View File

@@ -1,18 +1,18 @@
from ....libs.api.params import ApiSearchParams, UserListParams
from ....libs.api.params import MediaSearchParams, UserMediaListSearchParams
from ....libs.api.types import MediaItem, MediaStatus, UserMediaListStatus
from ..session import Context, session
from ..state import ControlFlow, MediaApiState, State
from ..state import InternalDirective, MediaApiState, State
@session.menu
def results(ctx: Context, state: State) -> State | ControlFlow:
def results(ctx: Context, state: State) -> State | InternalDirective:
search_results = state.media_api.search_results
feedback = ctx.services.feedback
feedback.clear_console()
if not search_results or not search_results.media:
feedback.info("No anime found for the given criteria")
return ControlFlow.BACK
return InternalDirective.BACK
anime_items = search_results.media
formatted_titles = [
@@ -54,10 +54,10 @@ def results(ctx: Context, state: State) -> State | ControlFlow:
)
if not choice_str:
return ControlFlow.EXIT
return InternalDirective.EXIT
if choice_str == "Back":
return ControlFlow.BACK
return InternalDirective.BACK
if (
choice_str == "Next Page"
@@ -81,7 +81,7 @@ def results(ctx: Context, state: State) -> State | ControlFlow:
)
# Fallback
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
def _format_anime_choice(anime: MediaItem, config) -> str:
@@ -112,7 +112,7 @@ def _format_anime_choice(anime: MediaItem, config) -> str:
def _handle_pagination(
ctx: Context, state: State, page_delta: int
) -> State | ControlFlow:
) -> State | InternalDirective:
"""
Handle pagination by fetching the next or previous page of results.
@@ -128,7 +128,7 @@ def _handle_pagination(
if not state.media_api.search_results:
feedback.error("No search results available for pagination")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
current_page = state.media_api.search_results.page_info.current_page
new_page = current_page + page_delta
@@ -136,11 +136,11 @@ def _handle_pagination(
# Validate page bounds
if new_page < 1:
feedback.warning("Already at the first page")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
if page_delta > 0 and not state.media_api.search_results.page_info.has_next_page:
feedback.warning("No more pages available")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# Determine which type of search to perform based on stored parameters
if state.media_api.original_api_params:
@@ -151,20 +151,20 @@ def _handle_pagination(
return _fetch_user_list_page(ctx, state, new_page, feedback)
else:
feedback.error("No original search parameters found for pagination")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
def _fetch_media_page(
ctx: Context, state: State, page: int, feedback
) -> State | ControlFlow:
) -> State | InternalDirective:
"""Fetch a specific page for media search results."""
original_params = state.media_api.original_api_params
if not original_params:
feedback.error("No original API parameters found")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# Create new parameters with updated page number
new_params = ApiSearchParams(
new_params = MediaSearchParams(
query=original_params.query,
page=page,
per_page=original_params.per_page,
@@ -208,15 +208,15 @@ def _fetch_media_page(
def _fetch_user_list_page(
ctx: Context, state: State, page: int, feedback
) -> State | ControlFlow:
) -> State | InternalDirective:
"""Fetch a specific page for user list results."""
original_params = state.media_api.original_user_list_params
if not original_params:
feedback.error("No original user list parameters found")
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# Create new parameters with updated page number
new_params = UserListParams(
new_params = UserMediaListSearchParams(
status=original_params.status,
page=page,
per_page=original_params.per_page,

View File

@@ -7,7 +7,7 @@ from ....libs.players.params import PlayerParams
from ....libs.providers.anime.params import EpisodeStreamsParams
from ....libs.providers.anime.types import Server
from ..session import Context, session
from ..state import ControlFlow, State
from ..state import InternalDirective, State
def _filter_by_quality(links, quality):
@@ -19,14 +19,14 @@ def _filter_by_quality(links, quality):
@session.menu
def servers(ctx: Context, state: State) -> State | ControlFlow:
def servers(ctx: Context, state: State) -> State | InternalDirective:
"""
Fetches and displays available streaming servers for a chosen episode,
then launches the media player and transitions to post-playback controls.
"""
provider_anime = state.provider.anime
if not state.media_api.anime:
return ControlFlow.BACK
return InternalDirective.BACK
anime_title = (
state.media_api.anime.title.romaji or state.media_api.anime.title.english
)
@@ -42,7 +42,7 @@ def servers(ctx: Context, state: State) -> State | ControlFlow:
"[bold red]Error: Anime or episode details are missing.[/bold red]"
)
selector.ask("Enter to continue...")
return ControlFlow.BACK
return InternalDirective.BACK
# --- Fetch Server Streams ---
with Progress(transient=True) as progress:
@@ -64,7 +64,7 @@ def servers(ctx: Context, state: State) -> State | ControlFlow:
console.print(
f"[bold yellow]No streaming servers found for this episode.[/bold yellow]"
)
return ControlFlow.BACK
return InternalDirective.BACK
# --- Auto-Select or Prompt for Server ---
server_map: Dict[str, Server] = {s.name: s for s in all_servers}
@@ -83,7 +83,7 @@ def servers(ctx: Context, state: State) -> State | ControlFlow:
choices = [*server_map.keys(), "Back"]
chosen_name = selector.choose("Select Server", choices)
if not chosen_name or chosen_name == "Back":
return ControlFlow.BACK
return InternalDirective.BACK
selected_server = server_map[chosen_name]
stream_link_obj = _filter_by_quality(selected_server.links, config.stream.quality)
@@ -91,7 +91,7 @@ def servers(ctx: Context, state: State) -> State | ControlFlow:
console.print(
f"[bold red]No stream of quality '{config.stream.quality}' found on server '{selected_server.name}'.[/bold red]"
)
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
# --- Launch Player ---
final_title = f"{provider_anime.title} - Ep {episode_number}"

View File

@@ -16,7 +16,7 @@ from ...utils.feedback import create_feedback_manager
from ...utils.watch_history_manager import WatchHistoryManager
from ...utils.watch_history_types import WatchHistoryEntry
from ..session import Context, session
from ..state import ControlFlow, State
from ..state import InternalDirective, State
logger = logging.getLogger(__name__)
@@ -24,7 +24,7 @@ MenuAction = Callable[[], str]
@session.menu
def watch_history(ctx: Context, state: State) -> State | ControlFlow:
def watch_history(ctx: Context, state: State) -> State | InternalDirective:
"""
Watch history management menu for viewing and managing local watch history.
"""
@@ -40,17 +40,39 @@ def watch_history(ctx: Context, state: State) -> State | ControlFlow:
_display_history_stats(console, history_manager, icons)
options: Dict[str, MenuAction] = {
f"{'📺 ' if icons else ''}Currently Watching": lambda: _view_watching(ctx, history_manager, feedback),
f"{'' if icons else ''}Completed Anime": lambda: _view_completed(ctx, history_manager, feedback),
f"{'🕒 ' if icons else ''}Recently Watched": lambda: _view_recent(ctx, history_manager, feedback),
f"{'📋 ' if icons else ''}View All History": lambda: _view_all_history(ctx, history_manager, feedback),
f"{'🔍 ' if icons else ''}Search History": lambda: _search_history(ctx, history_manager, feedback),
f"{'✏️ ' if icons else ''}Edit Entry": lambda: _edit_entry(ctx, history_manager, feedback),
f"{'🗑️ ' if icons else ''}Remove Entry": lambda: _remove_entry(ctx, history_manager, feedback),
f"{'📊 ' if icons else ''}View Statistics": lambda: _view_stats(ctx, history_manager, feedback),
f"{'💾 ' if icons else ''}Export History": lambda: _export_history(ctx, history_manager, feedback),
f"{'📥 ' if icons else ''}Import History": lambda: _import_history(ctx, history_manager, feedback),
f"{'🧹 ' if icons else ''}Clear All History": lambda: _clear_history(ctx, history_manager, feedback),
f"{'📺 ' if icons else ''}Currently Watching": lambda: _view_watching(
ctx, history_manager, feedback
),
f"{' ' if icons else ''}Completed Anime": lambda: _view_completed(
ctx, history_manager, feedback
),
f"{'🕒 ' if icons else ''}Recently Watched": lambda: _view_recent(
ctx, history_manager, feedback
),
f"{'📋 ' if icons else ''}View All History": lambda: _view_all_history(
ctx, history_manager, feedback
),
f"{'🔍 ' if icons else ''}Search History": lambda: _search_history(
ctx, history_manager, feedback
),
f"{'✏️ ' if icons else ''}Edit Entry": lambda: _edit_entry(
ctx, history_manager, feedback
),
f"{'🗑️ ' if icons else ''}Remove Entry": lambda: _remove_entry(
ctx, history_manager, feedback
),
f"{'📊 ' if icons else ''}View Statistics": lambda: _view_stats(
ctx, history_manager, feedback
),
f"{'💾 ' if icons else ''}Export History": lambda: _export_history(
ctx, history_manager, feedback
),
f"{'📥 ' if icons else ''}Import History": lambda: _import_history(
ctx, history_manager, feedback
),
f"{'🧹 ' if icons else ''}Clear All History": lambda: _clear_history(
ctx, history_manager, feedback
),
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
}
@@ -61,25 +83,27 @@ def watch_history(ctx: Context, state: State) -> State | ControlFlow:
)
if not choice_str:
return ControlFlow.BACK
return InternalDirective.BACK
result = options[choice_str]()
if result == "BACK":
return ControlFlow.BACK
return InternalDirective.BACK
else:
return ControlFlow.CONTINUE
return InternalDirective.CONTINUE
def _display_history_stats(console: Console, history_manager: WatchHistoryManager, icons: bool):
def _display_history_stats(
console: Console, history_manager: WatchHistoryManager, icons: bool
):
"""Display current watch history statistics."""
stats = history_manager.get_stats()
# Create a stats table
table = Table(title=f"{'📊 ' if icons else ''}Watch History Overview")
table.add_column("Metric", style="cyan")
table.add_column("Count", style="green")
table.add_row("Total Anime", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
@@ -87,7 +111,7 @@ def _display_history_stats(console: Console, history_manager: WatchHistoryManage
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
console.print(table)
console.print()
@@ -95,116 +119,123 @@ def _display_history_stats(console: Console, history_manager: WatchHistoryManage
def _view_watching(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View currently watching anime."""
entries = history_manager.get_watching_entries()
if not entries:
feedback.info("No anime currently being watched")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Currently Watching", feedback)
def _view_completed(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
def _view_completed(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""View completed anime."""
entries = history_manager.get_completed_entries()
if not entries:
feedback.info("No completed anime found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Completed Anime", feedback)
def _view_recent(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""View recently watched anime."""
entries = history_manager.get_recently_watched(20)
if not entries:
feedback.info("No recent watch history found")
return "CONTINUE"
return _display_entries_list(ctx, entries, "Recently Watched", feedback)
def _view_all_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
def _view_all_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""View all watch history entries."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history found")
return "CONTINUE"
# Sort by last watched date
entries.sort(key=lambda x: x.last_watched, reverse=True)
return _display_entries_list(ctx, entries, "All Watch History", feedback)
def _search_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
def _search_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Search watch history by title."""
query = ctx.selector.ask("Enter search query:")
if not query:
return "CONTINUE"
entries = history_manager.search_entries(query)
if not entries:
feedback.info(f"No anime found matching '{query}'")
return "CONTINUE"
return _display_entries_list(ctx, entries, f"Search Results for '{query}'", feedback)
return _display_entries_list(
ctx, entries, f"Search Results for '{query}'", feedback
)
def _display_entries_list(ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback) -> str:
def _display_entries_list(
ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback
) -> str:
"""Display a list of watch history entries and allow selection."""
console = Console()
console.clear()
# Create table for entries
table = Table(title=title)
table.add_column("Status", style="yellow", width=6)
table.add_column("Title", style="cyan")
table.add_column("Progress", style="green", width=12)
table.add_column("Last Watched", style="blue", width=12)
choices = []
entry_map = {}
for i, entry in enumerate(entries):
# Format last watched date
last_watched = entry.last_watched.strftime("%Y-%m-%d")
# Add to table
table.add_row(
entry.get_status_emoji(),
entry.get_display_title(),
entry.get_progress_display(),
last_watched
last_watched,
)
# Create choice for selector
choice_text = f"{entry.get_status_emoji()} {entry.get_display_title()} - {entry.get_progress_display()}"
choices.append(choice_text)
entry_map[choice_text] = entry
console.print(table)
console.print()
if not choices:
feedback.info("No entries to display")
feedback.pause_for_user()
return "CONTINUE"
choices.append("Back")
choice = ctx.selector.choose(
"Select an anime for details:",
choices=choices
)
choice = ctx.selector.choose("Select an anime for details:", choices=choices)
if not choice or choice == "Back":
return "CONTINUE"
selected_entry = entry_map[choice]
return _show_entry_details(ctx, selected_entry, feedback)
@@ -213,7 +244,7 @@ def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str
"""Show detailed information about a watch history entry."""
console = Console()
console.clear()
# Display detailed entry information
console.print(f"[bold cyan]{entry.get_display_title()}[/bold cyan]")
console.print(f"Status: {entry.get_status_emoji()} {entry.status.title()}")
@@ -221,37 +252,36 @@ def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str
console.print(f"Times Watched: {entry.times_watched}")
console.print(f"First Watched: {entry.first_watched.strftime('%Y-%m-%d %H:%M')}")
console.print(f"Last Watched: {entry.last_watched.strftime('%Y-%m-%d %H:%M')}")
if entry.notes:
console.print(f"Notes: {entry.notes}")
# Show media details if available
media = entry.media_item
if media.description:
console.print(f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}")
console.print(
f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}"
)
if media.genres:
console.print(f"Genres: {', '.join(media.genres)}")
if media.average_score:
console.print(f"Score: {media.average_score}/100")
console.print()
# Action options
actions = [
"Mark Episode as Watched",
"Change Status",
"Edit Notes",
"Remove from History",
"Back to List"
"Back to List",
]
choice = ctx.selector.choose(
"Select action:",
choices=actions
)
choice = ctx.selector.choose("Select action:", choices=actions)
if choice == "Mark Episode as Watched":
return _mark_episode_watched(ctx, entry, feedback)
elif choice == "Change Status":
@@ -268,26 +298,30 @@ def _mark_episode_watched(ctx: Context, entry: WatchHistoryEntry, feedback) -> s
"""Mark a specific episode as watched."""
current_episode = entry.last_watched_episode
max_episodes = entry.media_item.episodes or 999
episode_str = ctx.selector.ask(f"Enter episode number (current: {current_episode}, max: {max_episodes}):")
episode_str = ctx.selector.ask(
f"Enter episode number (current: {current_episode}, max: {max_episodes}):"
)
try:
episode = int(episode_str)
if episode < 1 or (max_episodes and episode > max_episodes):
feedback.error(f"Invalid episode number. Must be between 1 and {max_episodes}")
feedback.error(
f"Invalid episode number. Must be between 1 and {max_episodes}"
)
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.mark_episode_watched(entry.media_item.id, episode)
if success:
feedback.success(f"Marked episode {episode} as watched")
else:
feedback.error("Failed to update watch progress")
except ValueError:
feedback.error("Invalid episode number entered")
return "CONTINUE"
@@ -295,48 +329,50 @@ def _change_entry_status(ctx: Context, entry: WatchHistoryEntry, feedback) -> st
"""Change the status of a watch history entry."""
statuses = ["watching", "completed", "paused", "dropped", "planning"]
current_status = entry.status
choices = [f"{status.title()} {'(current)' if status == current_status else ''}" for status in statuses]
choices = [
f"{status.title()} {'(current)' if status == current_status else ''}"
for status in statuses
]
choices.append("Cancel")
choice = ctx.selector.choose(
f"Select new status (current: {current_status}):",
choices=choices
f"Select new status (current: {current_status}):", choices=choices
)
if not choice or choice == "Cancel":
return "CONTINUE"
new_status = choice.split()[0].lower()
history_manager = WatchHistoryManager()
success = history_manager.change_status(entry.media_item.id, new_status)
if success:
feedback.success(f"Changed status to {new_status}")
else:
feedback.error("Failed to update status")
return "CONTINUE"
def _edit_entry_notes(ctx: Context, entry: WatchHistoryEntry, feedback) -> str:
"""Edit notes for a watch history entry."""
current_notes = entry.notes or ""
new_notes = ctx.selector.ask(f"Enter notes (current: '{current_notes}'):")
if new_notes is None: # User cancelled
return "CONTINUE"
history_manager = WatchHistoryManager()
success = history_manager.update_notes(entry.media_item.id, new_notes)
if success:
feedback.success("Notes updated successfully")
else:
feedback.error("Failed to update notes")
return "CONTINUE"
@@ -345,76 +381,80 @@ def _confirm_remove_entry(ctx: Context, entry: WatchHistoryEntry, feedback) -> s
if feedback.confirm(f"Remove '{entry.get_display_title()}' from watch history?"):
history_manager = WatchHistoryManager()
success = history_manager.remove_entry(entry.media_item.id)
if success:
feedback.success("Entry removed from watch history")
else:
feedback.error("Failed to remove entry")
return "CONTINUE"
def _edit_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Edit a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to edit")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [f"{entry.get_display_title()} - {entry.get_progress_display()}" for entry in entries]
choices = [
f"{entry.get_display_title()} - {entry.get_progress_display()}"
for entry in entries
]
choices.append("Cancel")
choice = ctx.selector.choose(
"Select anime to edit:",
choices=choices
)
choice = ctx.selector.choose("Select anime to edit:", choices=choices)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next((entry for entry in entries if entry.get_display_title() == choice_title), None)
selected_entry = next(
(entry for entry in entries if entry.get_display_title() == choice_title), None
)
if selected_entry:
return _show_entry_details(ctx, selected_entry, feedback)
return "CONTINUE"
def _remove_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Remove a watch history entry (select first)."""
entries = history_manager.get_all_entries()
if not entries:
feedback.info("No watch history entries to remove")
return "CONTINUE"
# Sort by title for easier selection
entries.sort(key=lambda x: x.get_display_title())
choices = [f"{entry.get_display_title()} - {entry.get_progress_display()}" for entry in entries]
choices = [
f"{entry.get_display_title()} - {entry.get_progress_display()}"
for entry in entries
]
choices.append("Cancel")
choice = ctx.selector.choose(
"Select anime to remove:",
choices=choices
)
choice = ctx.selector.choose("Select anime to remove:", choices=choices)
if not choice or choice == "Cancel":
return "CONTINUE"
# Find the selected entry
choice_title = choice.split(" - ")[0]
selected_entry = next((entry for entry in entries if entry.get_display_title() == choice_title), None)
selected_entry = next(
(entry for entry in entries if entry.get_display_title() == choice_title), None
)
if selected_entry:
return _confirm_remove_entry(ctx, selected_entry, feedback)
return "CONTINUE"
@@ -422,14 +462,14 @@ def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) ->
"""View detailed watch history statistics."""
console = Console()
console.clear()
stats = history_manager.get_stats()
# Create detailed stats table
table = Table(title="Detailed Watch History Statistics")
table.add_column("Metric", style="cyan")
table.add_column("Value", style="green")
table.add_row("Total Anime Entries", str(stats["total_entries"]))
table.add_row("Currently Watching", str(stats["watching"]))
table.add_row("Completed", str(stats["completed"]))
@@ -437,88 +477,98 @@ def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) ->
table.add_row("Paused", str(stats["paused"]))
table.add_row("Total Episodes Watched", str(stats["total_episodes_watched"]))
table.add_row("Last Updated", stats["last_updated"])
# Calculate additional stats
if stats["total_entries"] > 0:
completion_rate = (stats["completed"] / stats["total_entries"]) * 100
table.add_row("Completion Rate", f"{completion_rate:.1f}%")
avg_episodes = stats["total_episodes_watched"] / stats["total_entries"]
table.add_row("Avg Episodes per Anime", f"{avg_episodes:.1f}")
console.print(table)
feedback.pause_for_user()
return "CONTINUE"
def _export_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
def _export_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Export watch history to a file."""
export_name = ctx.selector.ask("Enter export filename (without extension):")
if not export_name:
return "CONTINUE"
export_path = APP_DATA_DIR / f"{export_name}.json"
if export_path.exists():
if not feedback.confirm(f"File '{export_name}.json' already exists. Overwrite?"):
if not feedback.confirm(
f"File '{export_name}.json' already exists. Overwrite?"
):
return "CONTINUE"
success = history_manager.export_history(export_path)
if success:
feedback.success(f"Watch history exported to {export_path}")
else:
feedback.error("Failed to export watch history")
return "CONTINUE"
def _import_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
def _import_history(
ctx: Context, history_manager: WatchHistoryManager, feedback
) -> str:
"""Import watch history from a file."""
import_name = ctx.selector.ask("Enter import filename (without extension):")
if not import_name:
return "CONTINUE"
import_path = APP_DATA_DIR / f"{import_name}.json"
if not import_path.exists():
feedback.error(f"File '{import_name}.json' not found in {APP_DATA_DIR}")
return "CONTINUE"
merge = feedback.confirm("Merge with existing history? (No = Replace existing history)")
merge = feedback.confirm(
"Merge with existing history? (No = Replace existing history)"
)
success = history_manager.import_history(import_path, merge=merge)
if success:
action = "merged with" if merge else "replaced"
feedback.success(f"Watch history imported and {action} existing data")
else:
feedback.error("Failed to import watch history")
return "CONTINUE"
def _clear_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str:
"""Clear all watch history with confirmation."""
if not feedback.confirm("Are you sure you want to clear ALL watch history? This cannot be undone."):
if not feedback.confirm(
"Are you sure you want to clear ALL watch history? This cannot be undone."
):
return "CONTINUE"
if not feedback.confirm("Final confirmation: Clear all watch history?"):
return "CONTINUE"
# Create backup before clearing
backup_success = history_manager.backup_history()
if backup_success:
feedback.info("Backup created before clearing")
success = history_manager.clear_history()
if success:
feedback.success("All watch history cleared")
else:
feedback.error("Failed to clear watch history")
return "CONTINUE"

View File

@@ -23,7 +23,7 @@ from ..services.feedback import FeedbackService
from ..services.registry import MediaRegistryService
from ..services.session import SessionsService
from ..services.watch_history import WatchHistoryService
from .state import ControlFlow, State
from .state import InternalDirective, State
logger = logging.getLogger(__name__)
@@ -140,22 +140,22 @@ class Session:
self._context, current_state
)
if isinstance(next_step, ControlFlow):
if next_step == ControlFlow.EXIT:
if isinstance(next_step, InternalDirective):
if next_step == InternalDirective.EXIT:
break
elif next_step == ControlFlow.BACK:
elif next_step == InternalDirective.BACK:
if len(self._history) > 1:
self._history.pop()
elif next_step == ControlFlow.BACKX2:
elif next_step == InternalDirective.BACKX2:
if len(self._history) > 2:
self._history.pop()
self._history.pop()
elif next_step == ControlFlow.BACKX3:
elif next_step == InternalDirective.BACKX3:
if len(self._history) > 3:
self._history.pop()
self._history.pop()
self._history.pop()
elif next_step == ControlFlow.CONFIG_EDIT:
elif next_step == InternalDirective.CONFIG_EDIT:
self._edit_config()
else:
# if the state is main menu we should reset the history

View File

@@ -1,115 +1,71 @@
from enum import Enum, auto
from typing import Iterator, List, Literal, Optional
from typing import Dict, Optional, Union
from pydantic import BaseModel, ConfigDict
from pydantic import BaseModel, ConfigDict, Field
from ...libs.api.params import ApiSearchParams, UserListParams # Add this import
from ...libs.api.types import (
MediaItem,
MediaSearchResult,
MediaStatus,
UserListItem,
)
from ...libs.players.types import PlayerResult
from ...libs.api.params import MediaSearchParams, UserMediaListSearchParams
from ...libs.api.types import MediaItem, PageInfo
from ...libs.providers.anime.types import Anime, SearchResults, Server
class ControlFlow(Enum):
"""
Represents special commands to control the session loop instead of
transitioning to a new state. This provides a clear, type-safe alternative
to using magic strings.
"""
# TODO: is internal directive a good name
class InternalDirective(Enum):
BACK = auto()
"""Pop the current state from history and return to the previous one."""
BACKX2 = auto()
"""Pop x2 the current state from history and return to the previous one."""
BACKX3 = auto()
"""Pop x3 the current state from history and return to the previous one."""
EXIT = auto()
"""Terminate the interactive session gracefully."""
CONFIG_EDIT = auto()
"""Reload the application configuration and re-initialize the context."""
CONTINUE = auto()
"""
Stay in the current menu. This is useful for actions that don't
change the state but should not exit the menu (e.g., displaying an error).
"""
# ==============================================================================
# Nested State Models
# ==============================================================================
class MenuName(Enum):
MAIN = "MAIN"
AUTH = "AUTH"
EPISODES = "EPISODES"
RESULTS = "RESULTS"
SERVERS = "SERVERS"
WATCH_HISTORY = "WATCH_HISTORY"
PROVIDER_SEARCH = "PROVIDER_SEARCH"
PLAYER_CONTROLS = "PLAYER_CONTROLS"
USER_MEDIA_LIST = "USER_MEDIA_LIST"
SESSION_MANAGEMENT = "SESSION_MANAGEMENT"
class ProviderState(BaseModel):
"""
An immutable snapshot of data related to the anime provider.
This includes search results, the selected anime's full details,
and the latest fetched episode streams.
"""
class StateModel(BaseModel):
model_config = ConfigDict(frozen=True)
class MediaApiState(StateModel):
search_result: Optional[Dict[int, MediaItem]] = None
search_params: Optional[Union[MediaSearchParams, UserMediaListSearchParams]] = None
page_info: Optional[PageInfo] = None
media_id: Optional[int] = None
@property
def media_item(self) -> Optional[MediaItem]:
if self.search_result and self.media_id:
return self.search_result[self.media_id]
class ProviderState(StateModel):
search_results: Optional[SearchResults] = None
anime: Optional[Anime] = None
episode_streams: Optional[Iterator[Server]] = None
episode_number: Optional[str] = None
last_player_result: Optional[PlayerResult] = None
servers: Optional[List[Server]] = None
selected_server: Optional[Server] = None
episode: Optional[str] = None
servers: Optional[Dict[str, Server]] = None
server_name: Optional[str] = None
model_config = ConfigDict(
frozen=True,
# Required to allow complex types like iterators in the model.
arbitrary_types_allowed=True,
)
@property
def server(self) -> Optional[Server]:
if self.servers and self.server_name:
return self.servers[self.server_name]
class MediaApiState(BaseModel):
"""
An immutable snapshot of data related to the metadata API (e.g., AniList).
This includes search results and the full details of a selected media item.
"""
search_results: Optional[MediaSearchResult] = None
search_results_type: Optional[Literal["MEDIA_LIST", "USER_MEDIA_LIST"]] = None
sort: Optional[str] = None
query: Optional[str] = None
user_media_status: Optional[UserListItem] = None
media_status: Optional[MediaStatus] = None
anime: Optional[MediaItem] = None
# Add pagination support: store original search parameters to enable page navigation
original_api_params: Optional[ApiSearchParams] = None
original_user_list_params: Optional[UserListParams] = None
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
# ==============================================================================
# Root State Model
# ==============================================================================
class State(BaseModel):
"""
Represents the complete, immutable state of the interactive UI at a single
point in time. A new State object is created for each transition.
Attributes:
menu_name: The name of the menu function (e.g., 'MAIN', 'MEDIA_RESULTS')
that should be rendered for this state.
provider: Nested state for data from the anime provider.
media_api: Nested state for data from the metadata API (AniList).
"""
menu_name: str
provider: ProviderState = ProviderState()
media_api: MediaApiState = MediaApiState()
model_config = ConfigDict(frozen=True, arbitrary_types_allowed=True)
class State(StateModel):
menu_name: MenuName
provider: ProviderState = Field(default_factory=ProviderState)
media_api: MediaApiState = Field(default_factory=MediaApiState)

View File

@@ -1,6 +1,6 @@
from typing import List
from ....libs.api.params import ApiSearchParams
from ....libs.api.params import MediaSearchParams
from ....libs.api.types import MediaItem
@@ -37,7 +37,7 @@ class MediaFilter:
@classmethod
def apply(
cls, media_items: List[MediaItem], filters: ApiSearchParams
cls, media_items: List[MediaItem], filters: MediaSearchParams
) -> List[MediaItem]:
"""
Applies filtering, sorting, and pagination to a list of MediaItem objects.

View File

@@ -7,7 +7,7 @@ from typing import Dict, Generator, List, Optional
from ....core.config.model import MediaRegistryConfig
from ....core.exceptions import FastAnimeError
from ....core.utils.file import AtomicWriter, FileLock, check_file_modified
from ....libs.api.params import ApiSearchParams
from ....libs.api.params import MediaSearchParams
from ....libs.api.types import (
MediaItem,
MediaSearchResult,
@@ -245,7 +245,7 @@ class MediaRegistryService:
logger.warning(f"{self.media_registry_dir} is impure which caused: {e}")
return records
def search_for_media(self, params: ApiSearchParams) -> List[MediaItem]:
def search_for_media(self, params: MediaSearchParams) -> List[MediaItem]:
"""Search media by title."""
try:
# TODO: enhance performance

View File

@@ -8,7 +8,12 @@ from ....core.config import AnilistConfig
from ....core.utils.graphql import (
execute_graphql,
)
from ..base import ApiSearchParams, BaseApiClient, UpdateListEntryParams, UserListParams
from ..base import (
BaseApiClient,
MediaSearchParams,
UpdateListEntryParams,
UserMediaListSearchParams,
)
from ..types import MediaSearchResult, UserMediaListStatus, UserProfile
from . import gql, mapper
@@ -85,7 +90,7 @@ class AniListApi(BaseApiClient):
)
return mapper.to_generic_user_profile(response.json())
def search_media(self, params: ApiSearchParams) -> Optional[MediaSearchResult]:
def search_media(self, params: MediaSearchParams) -> Optional[MediaSearchResult]:
variables = {
search_params_map[k]: v
for k, v in params.__dict__.items()
@@ -126,7 +131,9 @@ class AniListApi(BaseApiClient):
)
return mapper.to_generic_search_result(response.json())
def search_media_list(self, params: UserListParams) -> Optional[MediaSearchResult]:
def search_media_list(
self, params: UserMediaListSearchParams
) -> Optional[MediaSearchResult]:
if not self.user_profile:
logger.error("Cannot fetch user list: user is not authenticated.")
return None
@@ -203,14 +210,14 @@ if __name__ == "__main__":
from ....core.config import AnilistConfig
from ....core.constants import APP_ASCII_ART
from ..params import ApiSearchParams
from ..params import MediaSearchParams
anilist = AniListApi(AnilistConfig(), Client())
print(APP_ASCII_ART)
# search
query = input("What anime would you like to search for: ")
search_results = anilist.search_media(ApiSearchParams(query=query))
search_results = anilist.search_media(MediaSearchParams(query=query))
if not search_results:
print("Nothing was finding matching: ", query)
exit()

View File

@@ -4,7 +4,7 @@ from typing import Any, Optional
from httpx import Client
from ...core.config import AnilistConfig
from .params import ApiSearchParams, UpdateListEntryParams, UserListParams
from .params import MediaSearchParams, UpdateListEntryParams, UserMediaListSearchParams
from .types import MediaSearchResult, UserProfile
@@ -30,12 +30,14 @@ class BaseApiClient(abc.ABC):
pass
@abc.abstractmethod
def search_media(self, params: ApiSearchParams) -> Optional[MediaSearchResult]:
def search_media(self, params: MediaSearchParams) -> Optional[MediaSearchResult]:
"""Searches for media based on a query and other filters."""
pass
@abc.abstractmethod
def search_media_list(self, params: UserListParams) -> Optional[MediaSearchResult]:
def search_media_list(
self, params: UserMediaListSearchParams
) -> Optional[MediaSearchResult]:
pass
@abc.abstractmethod

View File

@@ -4,10 +4,10 @@ import logging
from typing import TYPE_CHECKING, List, Optional
from ..base import (
ApiSearchParams,
BaseApiClient,
MediaSearchParams,
UpdateListEntryParams,
UserListParams,
UserMediaListSearchParams,
)
from ..types import MediaItem, MediaSearchResult, UserProfile
from . import mapper
@@ -45,7 +45,7 @@ class JikanApi(BaseApiClient):
# --- Read-Only Method Implementations ---
def search_media(self, params: ApiSearchParams) -> Optional[MediaSearchResult]:
def search_media(self, params: MediaSearchParams) -> Optional[MediaSearchResult]:
"""Searches for anime on MyAnimeList via Jikan."""
jikan_params = {
"q": params.query,
@@ -87,7 +87,9 @@ class JikanApi(BaseApiClient):
logger.warning("Jikan API does not support user profiles.")
return None
def fetch_user_list(self, params: UserListParams) -> Optional[MediaSearchResult]:
def fetch_user_list(
self, params: UserMediaListSearchParams
) -> Optional[MediaSearchResult]:
logger.warning("Jikan API does not support fetching user lists.")
return None

View File

@@ -15,7 +15,7 @@ from .types import (
@dataclass(frozen=True)
class ApiSearchParams:
class MediaSearchParams:
query: Optional[str] = None
page: int = 1
per_page: Optional[int] = None
@@ -67,7 +67,7 @@ class ApiSearchParams:
@dataclass(frozen=True)
class UserListParams:
class UserMediaListSearchParams:
status: UserMediaListStatus
page: int = 1
type: Optional[MediaType] = None