diff --git a/fastanime/cli/interactive/menus/episodes.py b/fastanime/cli/interactive/menus/episodes.py index acacab4..764ce3e 100644 --- a/fastanime/cli/interactive/menus/episodes.py +++ b/fastanime/cli/interactive/menus/episodes.py @@ -35,33 +35,46 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow: chosen_episode: str | None = None - if config.stream.continue_from_watch_history and False: - progress = ( - anilist_anime.user_status.progress - if anilist_anime.user_status and anilist_anime.user_status.progress - else 0 - ) - - # Calculate the next episode based on progress - next_episode_num = str(progress + 1) - - if next_episode_num in available_episodes: - click.echo( - f"[cyan]Continuing from history. Auto-selecting episode {next_episode_num}.[/cyan]" + if config.stream.continue_from_watch_history: + # Use our new watch history system + from ...utils.watch_history_tracker import get_continue_episode, track_episode_viewing + + # Try to get continue episode from watch history + if config.stream.preferred_watch_history == "local": + chosen_episode = get_continue_episode(anilist_anime, available_episodes, prefer_history=True) + if chosen_episode: + click.echo( + f"[cyan]Continuing from local watch history. Auto-selecting episode {chosen_episode}.[/cyan]" + ) + + # Fallback to AniList progress if local history doesn't have info or preference is remote + if not chosen_episode and config.stream.preferred_watch_history == "remote": + progress = ( + anilist_anime.user_status.progress + if anilist_anime.user_status and anilist_anime.user_status.progress + else 0 ) - chosen_episode = next_episode_num - else: - # If the next episode isn't available, fall back to the last watched one - last_watched_num = str(progress) - if last_watched_num in available_episodes: + + # Calculate the next episode based on progress + next_episode_num = str(progress + 1) + + if next_episode_num in available_episodes: click.echo( - f"[cyan]Next episode ({next_episode_num}) not found. Falling back to last watched episode {last_watched_num}.[/cyan]" + f"[cyan]Continuing from AniList history. Auto-selecting episode {next_episode_num}.[/cyan]" ) - chosen_episode = last_watched_num + chosen_episode = next_episode_num else: - click.echo( - f"[yellow]Could not find episode based on your watch history. Please select manually.[/yellow]" - ) + # If the next episode isn't available, fall back to the last watched one + last_watched_num = str(progress) + if last_watched_num in available_episodes: + click.echo( + f"[cyan]Next episode ({next_episode_num}) not found. Falling back to last watched episode {last_watched_num}.[/cyan]" + ) + chosen_episode = last_watched_num + else: + click.echo( + f"[yellow]Could not find episode based on your watch history. Please select manually.[/yellow]" + ) if not chosen_episode: choices = [*sorted(available_episodes, key=float), "Back"] @@ -78,6 +91,15 @@ def episodes(ctx: Context, state: State) -> State | ControlFlow: chosen_episode = chosen_episode_str + # Track episode selection in watch history (if enabled in config) + if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local": + from ...utils.watch_history_tracker import track_episode_viewing + try: + episode_num = int(chosen_episode) + track_episode_viewing(anilist_anime, episode_num, start_tracking=True) + except (ValueError, AttributeError): + pass # Skip tracking if episode number is invalid + return State( menu_name="SERVERS", media_api=state.media_api, diff --git a/fastanime/cli/interactive/menus/main.py b/fastanime/cli/interactive/menus/main.py index 12650a7..dd5fdc0 100644 --- a/fastanime/cli/interactive/menus/main.py +++ b/fastanime/cli/interactive/menus/main.py @@ -58,8 +58,10 @@ def main(ctx: Context, state: State) -> State | ControlFlow: f"{'๐Ÿ” ' if icons else ''}Rewatching": _create_user_list_action( ctx, "REPEATING" ), + # --- Local Watch History --- + f"{'๐Ÿ“– ' if icons else ''}Local Watch History": lambda: ("WATCH_HISTORY", None), # --- Control Flow and Utility Options --- - f"{'๏ฟฝ ' if icons else ''}Session Management": lambda: ("SESSION_MANAGEMENT", None), + f"{'๐Ÿ”ง ' if icons else ''}Session Management": lambda: ("SESSION_MANAGEMENT", None), f"{'๏ฟฝ๐Ÿ“ ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None), f"{'โŒ ' if icons else ''}Exit": lambda: ("EXIT", None), } diff --git a/fastanime/cli/interactive/menus/media_actions.py b/fastanime/cli/interactive/menus/media_actions.py index 78d04fb..80457f3 100644 --- a/fastanime/cli/interactive/menus/media_actions.py +++ b/fastanime/cli/interactive/menus/media_actions.py @@ -36,6 +36,7 @@ def media_actions(ctx: Context, state: State) -> State | ControlFlow: f"{'๐Ÿ“ผ ' if icons else ''}Watch Trailer": _watch_trailer(ctx, state), f"{'โž• ' if icons else ''}Add/Update List": _add_to_list(ctx, state), f"{'โญ ' if icons else ''}Score Anime": _score_anime(ctx, state), + f"{'๐Ÿ“š ' if icons else ''}Add to Local History": _add_to_local_history(ctx, state), f"{'โ„น๏ธ ' if icons else ''}View Info": _view_info(ctx, state), f"{'๐Ÿ”™ ' if icons else ''}Back to Results": lambda: ControlFlow.BACK, } @@ -218,3 +219,71 @@ def _update_user_list_with_feedback( error_msg="Failed to update list entry", show_loading=False, ) + + +def _add_to_local_history(ctx: Context, state: State) -> MenuAction: + """Add anime to local watch history with status selection.""" + + def action() -> State | ControlFlow: + anime = state.media_api.anime + if not anime: + click.echo("[bold red]No anime data available.[/bold red]") + return ControlFlow.CONTINUE + + feedback = create_feedback_manager(ctx.config.general.icons) + + # Check if already in watch history + from ...utils.watch_history_manager import WatchHistoryManager + history_manager = WatchHistoryManager() + existing_entry = history_manager.get_entry(anime.id) + + if existing_entry: + # Ask if user wants to update existing entry + if not feedback.confirm(f"'{existing_entry.get_display_title()}' is already in your local watch history. Update it?"): + return ControlFlow.CONTINUE + + # Status selection + statuses = ["watching", "completed", "planning", "paused", "dropped"] + status_choices = [status.title() for status in statuses] + + chosen_status = ctx.selector.choose( + "Select status for local watch history:", + choices=status_choices + ["Cancel"] + ) + + if not chosen_status or chosen_status == "Cancel": + return ControlFlow.CONTINUE + + status = chosen_status.lower() + + # Episode number if applicable + episode = 0 + if status in ["watching", "completed"]: + if anime.episodes and anime.episodes > 1: + episode_str = ctx.selector.ask(f"Enter current episode (1-{anime.episodes}, default: 0):") + try: + episode = int(episode_str) if episode_str else 0 + episode = max(0, min(episode, anime.episodes)) + except ValueError: + episode = 0 + + # Mark as completed if status is completed + if status == "completed" and anime.episodes: + episode = anime.episodes + + # Add to watch history + from ...utils.watch_history_tracker import watch_tracker + success = watch_tracker.add_anime_to_history(anime, status) + + if success and episode > 0: + # Update episode progress + history_manager.mark_episode_watched(anime.id, episode, 1.0 if status == "completed" else 0.0) + + if success: + feedback.success(f"Added '{anime.title.english or anime.title.romaji}' to local watch history with status: {status}") + else: + feedback.error("Failed to add anime to local watch history") + + return ControlFlow.CONTINUE + + return action diff --git a/fastanime/cli/interactive/menus/player_controls.py b/fastanime/cli/interactive/menus/player_controls.py index 9f1afdf..2098d85 100644 --- a/fastanime/cli/interactive/menus/player_controls.py +++ b/fastanime/cli/interactive/menus/player_controls.py @@ -83,6 +83,14 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow: _update_progress_in_background( ctx, anilist_anime.id, int(current_episode_num) ) + + # Also update local watch history if enabled + if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local": + from ...utils.watch_history_tracker import update_episode_progress + try: + update_episode_progress(anilist_anime.id, int(current_episode_num), completion_pct) + except (ValueError, AttributeError): + pass # Skip if episode number conversion fails # --- Auto-Next Logic --- available_episodes = getattr( @@ -93,6 +101,15 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow: if config.stream.auto_next and current_index < len(available_episodes) - 1: console.print("[cyan]Auto-playing next episode...[/cyan]") next_episode_num = available_episodes[current_index + 1] + + # Track next episode in watch history + if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local" and anilist_anime: + from ...utils.watch_history_tracker import track_episode_viewing + try: + track_episode_viewing(anilist_anime, int(next_episode_num), start_tracking=True) + except (ValueError, AttributeError): + pass + return State( menu_name="SERVERS", media_api=state.media_api, @@ -105,6 +122,15 @@ def player_controls(ctx: Context, state: State) -> State | ControlFlow: def next_episode() -> State | ControlFlow: if current_index < len(available_episodes) - 1: next_episode_num = available_episodes[current_index + 1] + + # Track next episode in watch history + if config.stream.continue_from_watch_history and config.stream.preferred_watch_history == "local" and anilist_anime: + from ...utils.watch_history_tracker import track_episode_viewing + try: + track_episode_viewing(anilist_anime, int(next_episode_num), start_tracking=True) + except (ValueError, AttributeError): + pass + # Transition back to the SERVERS menu with the new episode number. return State( menu_name="SERVERS", diff --git a/fastanime/cli/interactive/menus/watch_history.py b/fastanime/cli/interactive/menus/watch_history.py new file mode 100644 index 0000000..c7b62c2 --- /dev/null +++ b/fastanime/cli/interactive/menus/watch_history.py @@ -0,0 +1,524 @@ +""" +Watch History Management Menu for the interactive CLI. +Provides comprehensive watch history viewing, editing, and management capabilities. +""" + +import logging +from pathlib import Path +from typing import Callable, Dict, List + +from rich.console import Console +from rich.table import Table +from rich.text import Text + +from ....core.constants import APP_DATA_DIR +from ...utils.feedback import create_feedback_manager +from ...utils.watch_history_manager import WatchHistoryManager +from ...utils.watch_history_types import WatchHistoryEntry +from ..session import Context, session +from ..state import ControlFlow, State + +logger = logging.getLogger(__name__) + +MenuAction = Callable[[], str] + + +@session.menu +def watch_history(ctx: Context, state: State) -> State | ControlFlow: + """ + Watch history management menu for viewing and managing local watch history. + """ + icons = ctx.config.general.icons + feedback = create_feedback_manager(icons) + console = Console() + console.clear() + + # Initialize watch history manager + history_manager = WatchHistoryManager() + + # Show watch history stats + _display_history_stats(console, history_manager, icons) + + options: Dict[str, MenuAction] = { + f"{'๐Ÿ“บ ' if icons else ''}Currently Watching": lambda: _view_watching(ctx, history_manager, feedback), + f"{'โœ… ' if icons else ''}Completed Anime": lambda: _view_completed(ctx, history_manager, feedback), + f"{'๐Ÿ•’ ' if icons else ''}Recently Watched": lambda: _view_recent(ctx, history_manager, feedback), + f"{'๐Ÿ“‹ ' if icons else ''}View All History": lambda: _view_all_history(ctx, history_manager, feedback), + f"{'๐Ÿ” ' if icons else ''}Search History": lambda: _search_history(ctx, history_manager, feedback), + f"{'โœ๏ธ ' if icons else ''}Edit Entry": lambda: _edit_entry(ctx, history_manager, feedback), + f"{'๐Ÿ—‘๏ธ ' if icons else ''}Remove Entry": lambda: _remove_entry(ctx, history_manager, feedback), + f"{'๐Ÿ“Š ' if icons else ''}View Statistics": lambda: _view_stats(ctx, history_manager, feedback), + f"{'๐Ÿ’พ ' if icons else ''}Export History": lambda: _export_history(ctx, history_manager, feedback), + f"{'๐Ÿ“ฅ ' if icons else ''}Import History": lambda: _import_history(ctx, history_manager, feedback), + f"{'๐Ÿงน ' if icons else ''}Clear All History": lambda: _clear_history(ctx, history_manager, feedback), + f"{'๐Ÿ”™ ' if icons else ''}Back to Main Menu": lambda: "BACK", + } + + choice_str = ctx.selector.choose( + prompt="Select Watch History Action", + choices=list(options.keys()), + header="Watch History Management", + ) + + if not choice_str: + return ControlFlow.BACK + + result = options[choice_str]() + + if result == "BACK": + return ControlFlow.BACK + else: + return ControlFlow.CONTINUE + + +def _display_history_stats(console: Console, history_manager: WatchHistoryManager, icons: bool): + """Display current watch history statistics.""" + stats = history_manager.get_stats() + + # Create a stats table + table = Table(title=f"{'๐Ÿ“Š ' if icons else ''}Watch History Overview") + table.add_column("Metric", style="cyan") + table.add_column("Count", style="green") + + table.add_row("Total Anime", str(stats["total_entries"])) + table.add_row("Currently Watching", str(stats["watching"])) + table.add_row("Completed", str(stats["completed"])) + table.add_row("Dropped", str(stats["dropped"])) + table.add_row("Paused", str(stats["paused"])) + table.add_row("Total Episodes", str(stats["total_episodes_watched"])) + table.add_row("Last Updated", stats["last_updated"]) + + console.print(table) + console.print() + + +def _view_watching(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """View currently watching anime.""" + entries = history_manager.get_watching_entries() + + if not entries: + feedback.info("No anime currently being watched") + return "CONTINUE" + + return _display_entries_list(ctx, entries, "Currently Watching", feedback) + + +def _view_completed(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """View completed anime.""" + entries = history_manager.get_completed_entries() + + if not entries: + feedback.info("No completed anime found") + return "CONTINUE" + + return _display_entries_list(ctx, entries, "Completed Anime", feedback) + + +def _view_recent(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """View recently watched anime.""" + entries = history_manager.get_recently_watched(20) + + if not entries: + feedback.info("No recent watch history found") + return "CONTINUE" + + return _display_entries_list(ctx, entries, "Recently Watched", feedback) + + +def _view_all_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """View all watch history entries.""" + entries = history_manager.get_all_entries() + + if not entries: + feedback.info("No watch history found") + return "CONTINUE" + + # Sort by last watched date + entries.sort(key=lambda x: x.last_watched, reverse=True) + + return _display_entries_list(ctx, entries, "All Watch History", feedback) + + +def _search_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """Search watch history by title.""" + query = ctx.selector.ask("Enter search query:") + + if not query: + return "CONTINUE" + + entries = history_manager.search_entries(query) + + if not entries: + feedback.info(f"No anime found matching '{query}'") + return "CONTINUE" + + return _display_entries_list(ctx, entries, f"Search Results for '{query}'", feedback) + + +def _display_entries_list(ctx: Context, entries: List[WatchHistoryEntry], title: str, feedback) -> str: + """Display a list of watch history entries and allow selection.""" + console = Console() + console.clear() + + # Create table for entries + table = Table(title=title) + table.add_column("Status", style="yellow", width=6) + table.add_column("Title", style="cyan") + table.add_column("Progress", style="green", width=12) + table.add_column("Last Watched", style="blue", width=12) + + choices = [] + entry_map = {} + + for i, entry in enumerate(entries): + # Format last watched date + last_watched = entry.last_watched.strftime("%Y-%m-%d") + + # Add to table + table.add_row( + entry.get_status_emoji(), + entry.get_display_title(), + entry.get_progress_display(), + last_watched + ) + + # Create choice for selector + choice_text = f"{entry.get_status_emoji()} {entry.get_display_title()} - {entry.get_progress_display()}" + choices.append(choice_text) + entry_map[choice_text] = entry + + console.print(table) + console.print() + + if not choices: + feedback.info("No entries to display") + feedback.pause_for_user() + return "CONTINUE" + + choices.append("Back") + + choice = ctx.selector.choose( + "Select an anime for details:", + choices=choices + ) + + if not choice or choice == "Back": + return "CONTINUE" + + selected_entry = entry_map[choice] + return _show_entry_details(ctx, selected_entry, feedback) + + +def _show_entry_details(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: + """Show detailed information about a watch history entry.""" + console = Console() + console.clear() + + # Display detailed entry information + console.print(f"[bold cyan]{entry.get_display_title()}[/bold cyan]") + console.print(f"Status: {entry.get_status_emoji()} {entry.status.title()}") + console.print(f"Progress: {entry.get_progress_display()}") + console.print(f"Times Watched: {entry.times_watched}") + console.print(f"First Watched: {entry.first_watched.strftime('%Y-%m-%d %H:%M')}") + console.print(f"Last Watched: {entry.last_watched.strftime('%Y-%m-%d %H:%M')}") + + if entry.notes: + console.print(f"Notes: {entry.notes}") + + # Show media details if available + media = entry.media_item + if media.description: + console.print(f"\nDescription: {media.description[:200]}{'...' if len(media.description) > 200 else ''}") + + if media.genres: + console.print(f"Genres: {', '.join(media.genres)}") + + if media.average_score: + console.print(f"Score: {media.average_score}/100") + + console.print() + + # Action options + actions = [ + "Mark Episode as Watched", + "Change Status", + "Edit Notes", + "Remove from History", + "Back to List" + ] + + choice = ctx.selector.choose( + "Select action:", + choices=actions + ) + + if choice == "Mark Episode as Watched": + return _mark_episode_watched(ctx, entry, feedback) + elif choice == "Change Status": + return _change_entry_status(ctx, entry, feedback) + elif choice == "Edit Notes": + return _edit_entry_notes(ctx, entry, feedback) + elif choice == "Remove from History": + return _confirm_remove_entry(ctx, entry, feedback) + else: + return "CONTINUE" + + +def _mark_episode_watched(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: + """Mark a specific episode as watched.""" + current_episode = entry.last_watched_episode + max_episodes = entry.media_item.episodes or 999 + + episode_str = ctx.selector.ask(f"Enter episode number (current: {current_episode}, max: {max_episodes}):") + + try: + episode = int(episode_str) + if episode < 1 or (max_episodes and episode > max_episodes): + feedback.error(f"Invalid episode number. Must be between 1 and {max_episodes}") + return "CONTINUE" + + history_manager = WatchHistoryManager() + success = history_manager.mark_episode_watched(entry.media_item.id, episode) + + if success: + feedback.success(f"Marked episode {episode} as watched") + else: + feedback.error("Failed to update watch progress") + + except ValueError: + feedback.error("Invalid episode number entered") + + return "CONTINUE" + + +def _change_entry_status(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: + """Change the status of a watch history entry.""" + statuses = ["watching", "completed", "paused", "dropped", "planning"] + current_status = entry.status + + choices = [f"{status.title()} {'(current)' if status == current_status else ''}" for status in statuses] + choices.append("Cancel") + + choice = ctx.selector.choose( + f"Select new status (current: {current_status}):", + choices=choices + ) + + if not choice or choice == "Cancel": + return "CONTINUE" + + new_status = choice.split()[0].lower() + + history_manager = WatchHistoryManager() + success = history_manager.change_status(entry.media_item.id, new_status) + + if success: + feedback.success(f"Changed status to {new_status}") + else: + feedback.error("Failed to update status") + + return "CONTINUE" + + +def _edit_entry_notes(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: + """Edit notes for a watch history entry.""" + current_notes = entry.notes or "" + + new_notes = ctx.selector.ask(f"Enter notes (current: '{current_notes}'):") + + if new_notes is None: # User cancelled + return "CONTINUE" + + history_manager = WatchHistoryManager() + success = history_manager.update_notes(entry.media_item.id, new_notes) + + if success: + feedback.success("Notes updated successfully") + else: + feedback.error("Failed to update notes") + + return "CONTINUE" + + +def _confirm_remove_entry(ctx: Context, entry: WatchHistoryEntry, feedback) -> str: + """Confirm and remove a watch history entry.""" + if feedback.confirm(f"Remove '{entry.get_display_title()}' from watch history?"): + history_manager = WatchHistoryManager() + success = history_manager.remove_entry(entry.media_item.id) + + if success: + feedback.success("Entry removed from watch history") + else: + feedback.error("Failed to remove entry") + + return "CONTINUE" + + +def _edit_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """Edit a watch history entry (select first).""" + entries = history_manager.get_all_entries() + + if not entries: + feedback.info("No watch history entries to edit") + return "CONTINUE" + + # Sort by title for easier selection + entries.sort(key=lambda x: x.get_display_title()) + + choices = [f"{entry.get_display_title()} - {entry.get_progress_display()}" for entry in entries] + choices.append("Cancel") + + choice = ctx.selector.choose( + "Select anime to edit:", + choices=choices + ) + + if not choice or choice == "Cancel": + return "CONTINUE" + + # Find the selected entry + choice_title = choice.split(" - ")[0] + selected_entry = next((entry for entry in entries if entry.get_display_title() == choice_title), None) + + if selected_entry: + return _show_entry_details(ctx, selected_entry, feedback) + + return "CONTINUE" + + +def _remove_entry(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """Remove a watch history entry (select first).""" + entries = history_manager.get_all_entries() + + if not entries: + feedback.info("No watch history entries to remove") + return "CONTINUE" + + # Sort by title for easier selection + entries.sort(key=lambda x: x.get_display_title()) + + choices = [f"{entry.get_display_title()} - {entry.get_progress_display()}" for entry in entries] + choices.append("Cancel") + + choice = ctx.selector.choose( + "Select anime to remove:", + choices=choices + ) + + if not choice or choice == "Cancel": + return "CONTINUE" + + # Find the selected entry + choice_title = choice.split(" - ")[0] + selected_entry = next((entry for entry in entries if entry.get_display_title() == choice_title), None) + + if selected_entry: + return _confirm_remove_entry(ctx, selected_entry, feedback) + + return "CONTINUE" + + +def _view_stats(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """View detailed watch history statistics.""" + console = Console() + console.clear() + + stats = history_manager.get_stats() + + # Create detailed stats table + table = Table(title="Detailed Watch History Statistics") + table.add_column("Metric", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Total Anime Entries", str(stats["total_entries"])) + table.add_row("Currently Watching", str(stats["watching"])) + table.add_row("Completed", str(stats["completed"])) + table.add_row("Dropped", str(stats["dropped"])) + table.add_row("Paused", str(stats["paused"])) + table.add_row("Total Episodes Watched", str(stats["total_episodes_watched"])) + table.add_row("Last Updated", stats["last_updated"]) + + # Calculate additional stats + if stats["total_entries"] > 0: + completion_rate = (stats["completed"] / stats["total_entries"]) * 100 + table.add_row("Completion Rate", f"{completion_rate:.1f}%") + + avg_episodes = stats["total_episodes_watched"] / stats["total_entries"] + table.add_row("Avg Episodes per Anime", f"{avg_episodes:.1f}") + + console.print(table) + feedback.pause_for_user() + + return "CONTINUE" + + +def _export_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """Export watch history to a file.""" + export_name = ctx.selector.ask("Enter export filename (without extension):") + + if not export_name: + return "CONTINUE" + + export_path = APP_DATA_DIR / f"{export_name}.json" + + if export_path.exists(): + if not feedback.confirm(f"File '{export_name}.json' already exists. Overwrite?"): + return "CONTINUE" + + success = history_manager.export_history(export_path) + + if success: + feedback.success(f"Watch history exported to {export_path}") + else: + feedback.error("Failed to export watch history") + + return "CONTINUE" + + +def _import_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """Import watch history from a file.""" + import_name = ctx.selector.ask("Enter import filename (without extension):") + + if not import_name: + return "CONTINUE" + + import_path = APP_DATA_DIR / f"{import_name}.json" + + if not import_path.exists(): + feedback.error(f"File '{import_name}.json' not found in {APP_DATA_DIR}") + return "CONTINUE" + + merge = feedback.confirm("Merge with existing history? (No = Replace existing history)") + + success = history_manager.import_history(import_path, merge=merge) + + if success: + action = "merged with" if merge else "replaced" + feedback.success(f"Watch history imported and {action} existing data") + else: + feedback.error("Failed to import watch history") + + return "CONTINUE" + + +def _clear_history(ctx: Context, history_manager: WatchHistoryManager, feedback) -> str: + """Clear all watch history with confirmation.""" + if not feedback.confirm("Are you sure you want to clear ALL watch history? This cannot be undone."): + return "CONTINUE" + + if not feedback.confirm("Final confirmation: Clear all watch history?"): + return "CONTINUE" + + # Create backup before clearing + backup_success = history_manager.backup_history() + if backup_success: + feedback.info("Backup created before clearing") + + success = history_manager.clear_history() + + if success: + feedback.success("All watch history cleared") + else: + feedback.error("Failed to clear watch history") + + return "CONTINUE" diff --git a/fastanime/cli/utils/__init__.py b/fastanime/cli/utils/__init__.py index e69de29..802dc73 100644 --- a/fastanime/cli/utils/__init__.py +++ b/fastanime/cli/utils/__init__.py @@ -0,0 +1,15 @@ +""" +Utility modules for the FastAnime CLI. +""" + +from .watch_history_manager import WatchHistoryManager +from .watch_history_tracker import WatchHistoryTracker, watch_tracker +from .watch_history_types import WatchHistoryEntry, WatchHistoryData + +__all__ = [ + "WatchHistoryManager", + "WatchHistoryTracker", + "watch_tracker", + "WatchHistoryEntry", + "WatchHistoryData", +] \ No newline at end of file diff --git a/fastanime/cli/utils/watch_history_manager.py b/fastanime/cli/utils/watch_history_manager.py new file mode 100644 index 0000000..a7097d5 --- /dev/null +++ b/fastanime/cli/utils/watch_history_manager.py @@ -0,0 +1,329 @@ +""" +Watch history manager for local storage operations. +Handles saving, loading, and managing local watch history data. +""" + +import json +import logging +from pathlib import Path +from typing import List, Optional + +from ...core.constants import USER_WATCH_HISTORY_PATH +from ...libs.api.types import MediaItem +from .watch_history_types import WatchHistoryData, WatchHistoryEntry + +logger = logging.getLogger(__name__) + + +class WatchHistoryManager: + """ + Manages local watch history storage and operations. + Provides comprehensive watch history management with error handling. + """ + + def __init__(self, history_file_path: Path = USER_WATCH_HISTORY_PATH): + self.history_file_path = history_file_path + self._data: Optional[WatchHistoryData] = None + self._ensure_history_file() + + def _ensure_history_file(self): + """Ensure the watch history file and directory exist.""" + try: + self.history_file_path.parent.mkdir(parents=True, exist_ok=True) + if not self.history_file_path.exists(): + # Create empty watch history file + empty_data = WatchHistoryData() + self._save_data(empty_data) + logger.info(f"Created new watch history file at {self.history_file_path}") + except Exception as e: + logger.error(f"Failed to ensure watch history file: {e}") + + def _load_data(self) -> WatchHistoryData: + """Load watch history data from file.""" + if self._data is not None: + return self._data + + try: + if not self.history_file_path.exists(): + self._data = WatchHistoryData() + return self._data + + with self.history_file_path.open('r', encoding='utf-8') as f: + data = json.load(f) + + self._data = WatchHistoryData.from_dict(data) + logger.debug(f"Loaded watch history with {len(self._data.entries)} entries") + return self._data + + except json.JSONDecodeError as e: + logger.error(f"Watch history file is corrupted: {e}") + # Create backup of corrupted file + backup_path = self.history_file_path.with_suffix('.backup') + self.history_file_path.rename(backup_path) + logger.info(f"Corrupted file moved to {backup_path}") + + # Create new empty data + self._data = WatchHistoryData() + self._save_data(self._data) + return self._data + + except Exception as e: + logger.error(f"Failed to load watch history: {e}") + self._data = WatchHistoryData() + return self._data + + def _save_data(self, data: WatchHistoryData) -> bool: + """Save watch history data to file.""" + try: + # Create backup of existing file + if self.history_file_path.exists(): + backup_path = self.history_file_path.with_suffix('.bak') + self.history_file_path.rename(backup_path) + + with self.history_file_path.open('w', encoding='utf-8') as f: + json.dump(data.to_dict(), f, indent=2, ensure_ascii=False) + + # Remove backup on successful save + backup_path = self.history_file_path.with_suffix('.bak') + if backup_path.exists(): + backup_path.unlink() + + logger.debug(f"Saved watch history with {len(data.entries)} entries") + return True + + except Exception as e: + logger.error(f"Failed to save watch history: {e}") + # Restore backup if save failed + backup_path = self.history_file_path.with_suffix('.bak') + if backup_path.exists(): + backup_path.rename(self.history_file_path) + return False + + def add_or_update_entry( + self, + media_item: MediaItem, + episode: int = 0, + progress: float = 0.0, + status: str = "watching", + notes: str = "" + ) -> bool: + """Add or update a watch history entry.""" + try: + data = self._load_data() + entry = data.add_or_update_entry(media_item, episode, progress, status) + if notes: + entry.notes = notes + + success = self._save_data(data) + if success: + self._data = data # Update cached data + logger.info(f"Updated watch history for {entry.get_display_title()}") + return success + + except Exception as e: + logger.error(f"Failed to add/update watch history entry: {e}") + return False + + def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]: + """Get a specific watch history entry.""" + try: + data = self._load_data() + return data.get_entry(media_id) + except Exception as e: + logger.error(f"Failed to get watch history entry: {e}") + return None + + def remove_entry(self, media_id: int) -> bool: + """Remove an entry from watch history.""" + try: + data = self._load_data() + removed = data.remove_entry(media_id) + + if removed: + success = self._save_data(data) + if success: + self._data = data + logger.info(f"Removed watch history entry for media ID {media_id}") + return success + return False + + except Exception as e: + logger.error(f"Failed to remove watch history entry: {e}") + return False + + def get_all_entries(self) -> List[WatchHistoryEntry]: + """Get all watch history entries.""" + try: + data = self._load_data() + return list(data.entries.values()) + except Exception as e: + logger.error(f"Failed to get all entries: {e}") + return [] + + def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]: + """Get entries by status (watching, completed, etc.).""" + try: + data = self._load_data() + return data.get_entries_by_status(status) + except Exception as e: + logger.error(f"Failed to get entries by status: {e}") + return [] + + def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]: + """Get recently watched entries.""" + try: + data = self._load_data() + return data.get_recently_watched(limit) + except Exception as e: + logger.error(f"Failed to get recently watched: {e}") + return [] + + def search_entries(self, query: str) -> List[WatchHistoryEntry]: + """Search entries by title.""" + try: + data = self._load_data() + return data.search_entries(query) + except Exception as e: + logger.error(f"Failed to search entries: {e}") + return [] + + def get_watching_entries(self) -> List[WatchHistoryEntry]: + """Get entries that are currently being watched.""" + return self.get_entries_by_status("watching") + + def get_completed_entries(self) -> List[WatchHistoryEntry]: + """Get completed entries.""" + return self.get_entries_by_status("completed") + + def mark_episode_watched(self, media_id: int, episode: int, progress: float = 1.0) -> bool: + """Mark a specific episode as watched.""" + entry = self.get_entry(media_id) + if entry: + return self.add_or_update_entry( + entry.media_item, + episode, + progress, + entry.status + ) + return False + + def mark_completed(self, media_id: int) -> bool: + """Mark an anime as completed.""" + entry = self.get_entry(media_id) + if entry: + entry.mark_completed() + data = self._load_data() + return self._save_data(data) + return False + + def change_status(self, media_id: int, new_status: str) -> bool: + """Change the status of an entry.""" + entry = self.get_entry(media_id) + if entry: + return self.add_or_update_entry( + entry.media_item, + entry.last_watched_episode, + entry.watch_progress, + new_status + ) + return False + + def update_notes(self, media_id: int, notes: str) -> bool: + """Update notes for an entry.""" + entry = self.get_entry(media_id) + if entry: + return self.add_or_update_entry( + entry.media_item, + entry.last_watched_episode, + entry.watch_progress, + entry.status, + notes + ) + return False + + def get_stats(self) -> dict: + """Get watch history statistics.""" + try: + data = self._load_data() + return data.get_stats() + except Exception as e: + logger.error(f"Failed to get stats: {e}") + return { + "total_entries": 0, + "watching": 0, + "completed": 0, + "dropped": 0, + "paused": 0, + "total_episodes_watched": 0, + "last_updated": "Unknown" + } + + def export_history(self, export_path: Path) -> bool: + """Export watch history to a file.""" + try: + data = self._load_data() + with export_path.open('w', encoding='utf-8') as f: + json.dump(data.to_dict(), f, indent=2, ensure_ascii=False) + logger.info(f"Exported watch history to {export_path}") + return True + except Exception as e: + logger.error(f"Failed to export watch history: {e}") + return False + + def import_history(self, import_path: Path, merge: bool = True) -> bool: + """Import watch history from a file.""" + try: + if not import_path.exists(): + logger.error(f"Import file does not exist: {import_path}") + return False + + with import_path.open('r', encoding='utf-8') as f: + import_data = json.load(f) + + imported_history = WatchHistoryData.from_dict(import_data) + + if merge: + # Merge with existing data + current_data = self._load_data() + for media_id, entry in imported_history.entries.items(): + current_data.entries[media_id] = entry + success = self._save_data(current_data) + else: + # Replace existing data + success = self._save_data(imported_history) + + if success: + self._data = None # Force reload on next access + logger.info(f"Imported watch history from {import_path}") + + return success + + except Exception as e: + logger.error(f"Failed to import watch history: {e}") + return False + + def clear_history(self) -> bool: + """Clear all watch history.""" + try: + empty_data = WatchHistoryData() + success = self._save_data(empty_data) + if success: + self._data = empty_data + logger.info("Cleared all watch history") + return success + except Exception as e: + logger.error(f"Failed to clear watch history: {e}") + return False + + def backup_history(self, backup_path: Path = None) -> bool: + """Create a backup of watch history.""" + try: + if backup_path is None: + from datetime import datetime + timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') + backup_path = self.history_file_path.parent / f"watch_history_backup_{timestamp}.json" + + return self.export_history(backup_path) + except Exception as e: + logger.error(f"Failed to backup watch history: {e}") + return False diff --git a/fastanime/cli/utils/watch_history_tracker.py b/fastanime/cli/utils/watch_history_tracker.py new file mode 100644 index 0000000..0177ae5 --- /dev/null +++ b/fastanime/cli/utils/watch_history_tracker.py @@ -0,0 +1,273 @@ +""" +Watch history tracking utilities for integration with episode viewing and player controls. +Provides automatic watch history updates during episode viewing. +""" + +import logging +from typing import Optional + +from ...libs.api.types import MediaItem +from ..utils.watch_history_manager import WatchHistoryManager + +logger = logging.getLogger(__name__) + + +class WatchHistoryTracker: + """ + Tracks watch history automatically during episode viewing. + Integrates with the episode selection and player control systems. + """ + + def __init__(self): + self.history_manager = WatchHistoryManager() + + def track_episode_start(self, media_item: MediaItem, episode: int) -> bool: + """ + Track when an episode starts being watched. + + Args: + media_item: The anime being watched + episode: Episode number being started + + Returns: + True if tracking was successful + """ + try: + # Update or create watch history entry + success = self.history_manager.add_or_update_entry( + media_item=media_item, + episode=episode, + progress=0.0, + status="watching" + ) + + if success: + logger.info(f"Started tracking episode {episode} of {media_item.title.english or media_item.title.romaji}") + + return success + + except Exception as e: + logger.error(f"Failed to track episode start: {e}") + return False + + def track_episode_progress(self, media_id: int, episode: int, progress: float) -> bool: + """ + Track progress within an episode. + + Args: + media_id: ID of the anime + episode: Episode number + progress: Progress within the episode (0.0-1.0) + + Returns: + True if tracking was successful + """ + try: + success = self.history_manager.mark_episode_watched(media_id, episode, progress) + + if success and progress >= 0.8: # Consider episode "watched" at 80% + logger.info(f"Episode {episode} marked as watched (progress: {progress:.1%})") + + return success + + except Exception as e: + logger.error(f"Failed to track episode progress: {e}") + return False + + def track_episode_completion(self, media_id: int, episode: int) -> bool: + """ + Track when an episode is completed. + + Args: + media_id: ID of the anime + episode: Episode number completed + + Returns: + True if tracking was successful + """ + try: + # Mark episode as fully watched + success = self.history_manager.mark_episode_watched(media_id, episode, 1.0) + + if success: + # Check if this was the final episode and mark as completed + entry = self.history_manager.get_entry(media_id) + if entry and entry.media_item.episodes and episode >= entry.media_item.episodes: + self.history_manager.mark_completed(media_id) + logger.info(f"Anime completed: {entry.get_display_title()}") + else: + logger.info(f"Episode {episode} completed") + + return success + + except Exception as e: + logger.error(f"Failed to track episode completion: {e}") + return False + + def get_watch_progress(self, media_id: int) -> Optional[dict]: + """ + Get current watch progress for an anime. + + Args: + media_id: ID of the anime + + Returns: + Dictionary with progress info or None if not found + """ + try: + entry = self.history_manager.get_entry(media_id) + if entry: + return { + "last_episode": entry.last_watched_episode, + "progress": entry.watch_progress, + "status": entry.status, + "next_episode": entry.last_watched_episode + 1, + "title": entry.get_display_title(), + } + return None + + except Exception as e: + logger.error(f"Failed to get watch progress: {e}") + return None + + def should_continue_from_history(self, media_id: int, available_episodes: list) -> Optional[str]: + """ + Determine if we should continue from watch history and which episode. + + Args: + media_id: ID of the anime + available_episodes: List of available episode numbers + + Returns: + Episode number to continue from, or None if no history + """ + try: + progress = self.get_watch_progress(media_id) + if not progress: + return None + + last_episode = progress["last_episode"] + next_episode = last_episode + 1 + + # Check if next episode is available + if str(next_episode) in available_episodes: + logger.info(f"Continuing from episode {next_episode} based on watch history") + return str(next_episode) + # Fall back to last watched episode if next isn't available + elif str(last_episode) in available_episodes and last_episode > 0: + logger.info(f"Next episode not available, falling back to episode {last_episode}") + return str(last_episode) + + return None + + except Exception as e: + logger.error(f"Failed to determine continue episode: {e}") + return None + + def update_anime_status(self, media_id: int, status: str) -> bool: + """ + Update the status of an anime in watch history. + + Args: + media_id: ID of the anime + status: New status (watching, completed, dropped, paused) + + Returns: + True if update was successful + """ + try: + success = self.history_manager.change_status(media_id, status) + if success: + logger.info(f"Updated anime status to {status}") + return success + + except Exception as e: + logger.error(f"Failed to update anime status: {e}") + return False + + def add_anime_to_history(self, media_item: MediaItem, status: str = "planning") -> bool: + """ + Add an anime to watch history without watching any episodes. + + Args: + media_item: The anime to add + status: Initial status + + Returns: + True if successful + """ + try: + success = self.history_manager.add_or_update_entry( + media_item=media_item, + episode=0, + progress=0.0, + status=status + ) + + if success: + logger.info(f"Added {media_item.title.english or media_item.title.romaji} to watch history") + + return success + + except Exception as e: + logger.error(f"Failed to add anime to history: {e}") + return False + + +# Global tracker instance for use throughout the application +watch_tracker = WatchHistoryTracker() + + +def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool: + """ + Convenience function to track episode viewing. + + Args: + media_item: The anime being watched + episode: Episode number + start_tracking: Whether to start tracking (True) or just update progress + + Returns: + True if tracking was successful + """ + if start_tracking: + return watch_tracker.track_episode_start(media_item, episode) + else: + return watch_tracker.track_episode_completion(media_item.id, episode) + + +def get_continue_episode(media_item: MediaItem, available_episodes: list, prefer_history: bool = True) -> Optional[str]: + """ + Get the episode to continue from based on watch history. + + Args: + media_item: The anime + available_episodes: List of available episodes + prefer_history: Whether to prefer local history over remote + + Returns: + Episode number to continue from + """ + if prefer_history: + return watch_tracker.should_continue_from_history(media_item.id, available_episodes) + return None + + +def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool: + """ + Update progress for an episode based on completion percentage. + + Args: + media_id: ID of the anime + episode: Episode number + completion_percentage: Completion percentage (0-100) + + Returns: + True if update was successful + """ + progress = completion_percentage / 100.0 + + if completion_percentage >= 80: # Consider episode completed at 80% + return watch_tracker.track_episode_completion(media_id, episode) + else: + return watch_tracker.track_episode_progress(media_id, episode, progress) diff --git a/fastanime/cli/utils/watch_history_types.py b/fastanime/cli/utils/watch_history_types.py new file mode 100644 index 0000000..2123efd --- /dev/null +++ b/fastanime/cli/utils/watch_history_types.py @@ -0,0 +1,296 @@ +""" +Watch history data models and types for the interactive CLI. +Provides comprehensive data structures for tracking and managing local watch history. +""" + +from __future__ import annotations + +import logging +from dataclasses import dataclass, field +from datetime import datetime +from typing import Dict, List, Optional + +from ...libs.api.types import MediaItem + +logger = logging.getLogger(__name__) + + +@dataclass +class WatchHistoryEntry: + """ + Represents a single entry in the watch history. + Contains media information and viewing progress. + """ + + media_item: MediaItem + last_watched_episode: int = 0 + watch_progress: float = 0.0 # Progress within the episode (0.0-1.0) + times_watched: int = 1 + first_watched: datetime = field(default_factory=datetime.now) + last_watched: datetime = field(default_factory=datetime.now) + status: str = "watching" # watching, completed, dropped, paused + notes: str = "" + + def to_dict(self) -> dict: + """Convert entry to dictionary for JSON serialization.""" + return { + "media_item": { + "id": self.media_item.id, + "id_mal": self.media_item.id_mal, + "type": self.media_item.type, + "title": { + "romaji": self.media_item.title.romaji, + "english": self.media_item.title.english, + "native": self.media_item.title.native, + }, + "status": self.media_item.status, + "format": self.media_item.format, + "cover_image": { + "large": self.media_item.cover_image.large if self.media_item.cover_image else None, + "medium": self.media_item.cover_image.medium if self.media_item.cover_image else None, + } if self.media_item.cover_image else None, + "banner_image": self.media_item.banner_image, + "description": self.media_item.description, + "episodes": self.media_item.episodes, + "duration": self.media_item.duration, + "genres": self.media_item.genres, + "synonyms": self.media_item.synonyms, + "average_score": self.media_item.average_score, + "popularity": self.media_item.popularity, + "favourites": self.media_item.favourites, + }, + "last_watched_episode": self.last_watched_episode, + "watch_progress": self.watch_progress, + "times_watched": self.times_watched, + "first_watched": self.first_watched.isoformat(), + "last_watched": self.last_watched.isoformat(), + "status": self.status, + "notes": self.notes, + } + + @classmethod + def from_dict(cls, data: dict) -> "WatchHistoryEntry": + """Create entry from dictionary.""" + from ...libs.api.types import MediaImage, MediaTitle + + media_data = data["media_item"] + + # Reconstruct MediaTitle + title_data = media_data.get("title", {}) + title = MediaTitle( + romaji=title_data.get("romaji"), + english=title_data.get("english"), + native=title_data.get("native"), + ) + + # Reconstruct MediaImage if present + cover_data = media_data.get("cover_image") + cover_image = None + if cover_data: + cover_image = MediaImage( + large=cover_data.get("large", ""), + medium=cover_data.get("medium"), + ) + + # Reconstruct MediaItem + media_item = MediaItem( + id=media_data["id"], + id_mal=media_data.get("id_mal"), + type=media_data.get("type", "ANIME"), + title=title, + status=media_data.get("status"), + format=media_data.get("format"), + cover_image=cover_image, + banner_image=media_data.get("banner_image"), + description=media_data.get("description"), + episodes=media_data.get("episodes"), + duration=media_data.get("duration"), + genres=media_data.get("genres", []), + synonyms=media_data.get("synonyms", []), + average_score=media_data.get("average_score"), + popularity=media_data.get("popularity"), + favourites=media_data.get("favourites"), + ) + + return cls( + media_item=media_item, + last_watched_episode=data.get("last_watched_episode", 0), + watch_progress=data.get("watch_progress", 0.0), + times_watched=data.get("times_watched", 1), + first_watched=datetime.fromisoformat(data.get("first_watched", datetime.now().isoformat())), + last_watched=datetime.fromisoformat(data.get("last_watched", datetime.now().isoformat())), + status=data.get("status", "watching"), + notes=data.get("notes", ""), + ) + + def update_progress(self, episode: int, progress: float = 0.0, status: str = None): + """Update watch progress for this entry.""" + self.last_watched_episode = max(self.last_watched_episode, episode) + self.watch_progress = progress + self.last_watched = datetime.now() + if status: + self.status = status + + def mark_completed(self): + """Mark this entry as completed.""" + self.status = "completed" + self.last_watched = datetime.now() + if self.media_item.episodes: + self.last_watched_episode = self.media_item.episodes + self.watch_progress = 1.0 + + def get_display_title(self) -> str: + """Get the best available title for display.""" + if self.media_item.title.english: + return self.media_item.title.english + elif self.media_item.title.romaji: + return self.media_item.title.romaji + elif self.media_item.title.native: + return self.media_item.title.native + else: + return f"Anime #{self.media_item.id}" + + def get_progress_display(self) -> str: + """Get a human-readable progress display.""" + if self.media_item.episodes: + return f"{self.last_watched_episode}/{self.media_item.episodes}" + else: + return f"Ep {self.last_watched_episode}" + + def get_status_emoji(self) -> str: + """Get emoji representation of status.""" + status_emojis = { + "watching": "๐Ÿ“บ", + "completed": "โœ…", + "dropped": "๐Ÿšฎ", + "paused": "โธ๏ธ", + "planning": "๐Ÿ“‘" + } + return status_emojis.get(self.status, "โ“") + + +@dataclass +class WatchHistoryData: + """Complete watch history data container.""" + + entries: Dict[int, WatchHistoryEntry] = field(default_factory=dict) + last_updated: datetime = field(default_factory=datetime.now) + format_version: str = "1.0" + + def to_dict(self) -> dict: + """Convert to dictionary for JSON serialization.""" + return { + "entries": {str(k): v.to_dict() for k, v in self.entries.items()}, + "last_updated": self.last_updated.isoformat(), + "format_version": self.format_version, + } + + @classmethod + def from_dict(cls, data: dict) -> "WatchHistoryData": + """Create from dictionary.""" + entries = {} + entries_data = data.get("entries", {}) + + for media_id_str, entry_data in entries_data.items(): + try: + media_id = int(media_id_str) + entry = WatchHistoryEntry.from_dict(entry_data) + entries[media_id] = entry + except (ValueError, KeyError) as e: + logger.warning(f"Skipping invalid watch history entry {media_id_str}: {e}") + + return cls( + entries=entries, + last_updated=datetime.fromisoformat(data.get("last_updated", datetime.now().isoformat())), + format_version=data.get("format_version", "1.0"), + ) + + def add_or_update_entry(self, media_item: MediaItem, episode: int = 0, progress: float = 0.0, status: str = "watching") -> WatchHistoryEntry: + """Add or update a watch history entry.""" + media_id = media_item.id + + if media_id in self.entries: + # Update existing entry + entry = self.entries[media_id] + entry.update_progress(episode, progress, status) + entry.times_watched += 1 + else: + # Create new entry + entry = WatchHistoryEntry( + media_item=media_item, + last_watched_episode=episode, + watch_progress=progress, + status=status, + ) + self.entries[media_id] = entry + + self.last_updated = datetime.now() + return entry + + def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]: + """Get a specific watch history entry.""" + return self.entries.get(media_id) + + def remove_entry(self, media_id: int) -> bool: + """Remove an entry from watch history.""" + if media_id in self.entries: + del self.entries[media_id] + self.last_updated = datetime.now() + return True + return False + + def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]: + """Get all entries with a specific status.""" + return [entry for entry in self.entries.values() if entry.status == status] + + def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]: + """Get recently watched entries.""" + sorted_entries = sorted( + self.entries.values(), + key=lambda x: x.last_watched, + reverse=True + ) + return sorted_entries[:limit] + + def get_watching_entries(self) -> List[WatchHistoryEntry]: + """Get entries that are currently being watched.""" + return self.get_entries_by_status("watching") + + def get_completed_entries(self) -> List[WatchHistoryEntry]: + """Get completed entries.""" + return self.get_entries_by_status("completed") + + def search_entries(self, query: str) -> List[WatchHistoryEntry]: + """Search entries by title.""" + query_lower = query.lower() + results = [] + + for entry in self.entries.values(): + title = entry.get_display_title().lower() + if query_lower in title: + results.append(entry) + + return results + + def get_stats(self) -> dict: + """Get watch history statistics.""" + total_entries = len(self.entries) + watching = len(self.get_entries_by_status("watching")) + completed = len(self.get_entries_by_status("completed")) + dropped = len(self.get_entries_by_status("dropped")) + paused = len(self.get_entries_by_status("paused")) + + total_episodes = sum( + entry.last_watched_episode + for entry in self.entries.values() + ) + + return { + "total_entries": total_entries, + "watching": watching, + "completed": completed, + "dropped": dropped, + "paused": paused, + "total_episodes_watched": total_episodes, + "last_updated": self.last_updated.strftime("%Y-%m-%d %H:%M:%S"), + } diff --git a/test_watch_history.py b/test_watch_history.py new file mode 100644 index 0000000..e99ae59 --- /dev/null +++ b/test_watch_history.py @@ -0,0 +1,116 @@ +#!/usr/bin/env python3 +""" +Test script for watch history management implementation. +Tests basic functionality without requiring full interactive session. +""" + +import sys +from pathlib import Path + +# Add the project root to Python path +project_root = Path(__file__).parent.parent +sys.path.insert(0, str(project_root)) + +from fastanime.cli.utils.watch_history_manager import WatchHistoryManager +from fastanime.cli.utils.watch_history_tracker import WatchHistoryTracker +from fastanime.libs.api.types import MediaItem, MediaTitle, MediaImage + + +def test_watch_history(): + """Test basic watch history functionality.""" + print("Testing Watch History Management System") + print("=" * 50) + + # Create test media item + test_anime = MediaItem( + id=123456, + id_mal=12345, + title=MediaTitle( + english="Test Anime", + romaji="Test Anime Romaji", + native="ใƒ†ใ‚นใƒˆใ‚ขใƒ‹ใƒก" + ), + episodes=24, + cover_image=MediaImage( + large="https://example.com/cover.jpg", + medium="https://example.com/cover_medium.jpg" + ), + genres=["Action", "Adventure"], + average_score=85.0 + ) + + # Test watch history manager + print("\n1. Testing WatchHistoryManager...") + history_manager = WatchHistoryManager() + + # Add anime to history + success = history_manager.add_or_update_entry( + test_anime, + episode=5, + progress=0.8, + status="watching", + notes="Great anime so far!" + ) + print(f" Added anime to history: {success}") + + # Get entry back + entry = history_manager.get_entry(123456) + if entry: + print(f" Retrieved entry: {entry.get_display_title()}") + print(f" Progress: {entry.get_progress_display()}") + print(f" Status: {entry.status}") + print(f" Notes: {entry.notes}") + else: + print(" Failed to retrieve entry") + + # Test tracker + print("\n2. Testing WatchHistoryTracker...") + tracker = WatchHistoryTracker() + + # Track episode viewing + success = tracker.track_episode_start(test_anime, 6) + print(f" Started tracking episode 6: {success}") + + # Complete episode + success = tracker.track_episode_completion(123456, 6) + print(f" Completed episode 6: {success}") + + # Get progress + progress = tracker.get_watch_progress(123456) + if progress: + print(f" Current progress: Episode {progress['last_episode']}") + print(f" Next episode: {progress['next_episode']}") + print(f" Status: {progress['status']}") + + # Test stats + print("\n3. Testing Statistics...") + stats = history_manager.get_stats() + print(f" Total entries: {stats['total_entries']}") + print(f" Watching: {stats['watching']}") + print(f" Total episodes watched: {stats['total_episodes_watched']}") + + # Test search + print("\n4. Testing Search...") + search_results = history_manager.search_entries("Test") + print(f" Search results for 'Test': {len(search_results)} found") + + # Test status updates + print("\n5. Testing Status Updates...") + success = history_manager.change_status(123456, "completed") + print(f" Changed status to completed: {success}") + + # Verify status change + entry = history_manager.get_entry(123456) + if entry: + print(f" New status: {entry.status}") + + print("\n" + "=" * 50) + print("Watch History Test Complete!") + + # Cleanup test data + history_manager.remove_entry(123456) + print("Test data cleaned up.") + + +if __name__ == "__main__": + test_watch_history()