diff --git a/fastanime/cli/interactive/menus/main.py b/fastanime/cli/interactive/menus/main.py index 45e0852..12650a7 100644 --- a/fastanime/cli/interactive/menus/main.py +++ b/fastanime/cli/interactive/menus/main.py @@ -59,7 +59,8 @@ def main(ctx: Context, state: State) -> State | ControlFlow: ctx, "REPEATING" ), # --- Control Flow and Utility Options --- - f"{'📝 ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None), + f"{'� ' if icons else ''}Session Management": lambda: ("SESSION_MANAGEMENT", None), + f"{'�📝 ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None), f"{'❌ ' if icons else ''}Exit": lambda: ("EXIT", None), } @@ -81,6 +82,8 @@ def main(ctx: Context, state: State) -> State | ControlFlow: return ControlFlow.EXIT if next_menu_name == "RELOAD_CONFIG": return ControlFlow.RELOAD_CONFIG + if next_menu_name == "SESSION_MANAGEMENT": + return State(menu_name="SESSION_MANAGEMENT") if next_menu_name == "CONTINUE": return ControlFlow.CONTINUE diff --git a/fastanime/cli/interactive/menus/session_management.py b/fastanime/cli/interactive/menus/session_management.py new file mode 100644 index 0000000..2cc18a9 --- /dev/null +++ b/fastanime/cli/interactive/menus/session_management.py @@ -0,0 +1,241 @@ +""" +Session management menu for the interactive CLI. +Provides options to save, load, and manage session state. +""" + +from datetime import datetime +from pathlib import Path +from typing import Callable, Dict + +from rich.console import Console +from rich.table import Table + +from ....core.constants import APP_DIR +from ...utils.feedback import create_feedback_manager +from ..session import Context, session +from ..state import ControlFlow, State + +MenuAction = Callable[[], str] + + +@session.menu +def session_management(ctx: Context, state: State) -> State | ControlFlow: + """ + Session management menu for saving, loading, and managing session state. + """ + icons = ctx.config.general.icons + feedback = create_feedback_manager(icons) + console = Console() + console.clear() + + # Show current session stats + _display_session_info(console, icons) + + options: Dict[str, MenuAction] = { + f"{'💾 ' if icons else ''}Save Current Session": lambda: _save_session(ctx, feedback), + f"{'📂 ' if icons else ''}Load Session": lambda: _load_session(ctx, feedback), + f"{'📋 ' if icons else ''}List Saved Sessions": lambda: _list_sessions(ctx, feedback), + f"{'🗑️ ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions(ctx, feedback), + f"{'💾 ' if icons else ''}Create Manual Backup": lambda: _create_backup(ctx, feedback), + f"{'⚙️ ' if icons else ''}Session Settings": lambda: _session_settings(ctx, feedback), + f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK", + } + + choice_str = ctx.selector.choose( + prompt="Select Session Action", + choices=list(options.keys()), + header="Session Management", + ) + + if not choice_str: + return ControlFlow.BACK + + result = options[choice_str]() + + if result == "BACK": + return ControlFlow.BACK + else: + return ControlFlow.CONTINUE + + +def _display_session_info(console: Console, icons: bool): + """Display current session information.""" + session_stats = session.get_session_stats() + + table = Table(title=f"{'📊 ' if icons else ''}Current Session Info") + table.add_column("Property", style="cyan") + table.add_column("Value", style="green") + + table.add_row("Current States", str(session_stats["current_states"])) + table.add_row("Current Menu", session_stats["current_menu"] or "None") + table.add_row("Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled") + table.add_row("Has Auto-Save", "Yes" if session_stats["has_auto_save"] else "No") + table.add_row("Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No") + + console.print(table) + console.print() + + +def _save_session(ctx: Context, feedback) -> str: + """Save the current session.""" + session_name = ctx.selector.ask("Enter session name (optional):") + description = ctx.selector.ask("Enter session description (optional):") + + if not session_name: + session_name = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + + sessions_dir = APP_DIR / "sessions" + file_path = sessions_dir / f"{session_name}.json" + + if file_path.exists(): + if not feedback.confirm(f"Session '{session_name}' already exists. Overwrite?"): + feedback.info("Save cancelled") + return "CONTINUE" + + success = session.save(file_path, session_name, description or "") + if success: + feedback.success(f"Session saved as '{session_name}'") + + return "CONTINUE" + + +def _load_session(ctx: Context, feedback) -> str: + """Load a saved session.""" + sessions = session.list_saved_sessions() + + if not sessions: + feedback.warning("No saved sessions found") + return "CONTINUE" + + # Create choices with session info + choices = [] + session_map = {} + + for sess in sessions: + choice_text = f"{sess['name']} - {sess['description'][:50]}{'...' if len(sess['description']) > 50 else ''}" + choices.append(choice_text) + session_map[choice_text] = sess + + choices.append("Cancel") + + choice = ctx.selector.choose( + "Select session to load:", + choices=choices, + header="Available Sessions" + ) + + if not choice or choice == "Cancel": + return "CONTINUE" + + selected_session = session_map[choice] + file_path = Path(selected_session["path"]) + + if feedback.confirm(f"Load session '{selected_session['name']}'? This will replace your current session."): + success = session.resume(file_path, feedback) + if success: + feedback.info("Session loaded successfully. Returning to main menu.") + # Return to main menu after loading + return "MAIN" + + return "CONTINUE" + + +def _list_sessions(ctx: Context, feedback) -> str: + """List all saved sessions.""" + sessions = session.list_saved_sessions() + + if not sessions: + feedback.info("No saved sessions found") + return "CONTINUE" + + console = Console() + table = Table(title="Saved Sessions") + table.add_column("Name", style="cyan") + table.add_column("Description", style="yellow") + table.add_column("States", style="green") + table.add_column("Created", style="blue") + + for sess in sessions: + # Format the created date + created = sess["created"] + if "T" in created: + created = created.split("T")[0] # Just show the date part + + table.add_row( + sess["name"], + sess["description"][:40] + "..." if len(sess["description"]) > 40 else sess["description"], + str(sess["state_count"]), + created + ) + + console.print(table) + feedback.pause_for_user() + + return "CONTINUE" + + +def _cleanup_sessions(ctx: Context, feedback) -> str: + """Clean up old sessions.""" + sessions = session.list_saved_sessions() + + if len(sessions) <= 5: + feedback.info("No cleanup needed. You have 5 or fewer sessions.") + return "CONTINUE" + + max_sessions_str = ctx.selector.ask("How many sessions to keep? (default: 10)") + try: + max_sessions = int(max_sessions_str) if max_sessions_str else 10 + except ValueError: + feedback.error("Invalid number entered") + return "CONTINUE" + + if feedback.confirm(f"Delete sessions older than the {max_sessions} most recent?"): + deleted_count = session.cleanup_old_sessions(max_sessions) + feedback.success(f"Deleted {deleted_count} old sessions") + + return "CONTINUE" + + +def _create_backup(ctx: Context, feedback) -> str: + """Create a manual backup.""" + backup_name = ctx.selector.ask("Enter backup name (optional):") + + success = session.create_manual_backup(backup_name or "") + if success: + feedback.success("Manual backup created successfully") + + return "CONTINUE" + + +def _session_settings(ctx: Context, feedback) -> str: + """Configure session settings.""" + current_auto_save = session._auto_save_enabled + + choices = [ + f"Auto-Save: {'Enabled' if current_auto_save else 'Disabled'}", + "Clear Auto-Save File", + "Clear Crash Backup", + "Back" + ] + + choice = ctx.selector.choose( + "Session Settings:", + choices=choices + ) + + if choice and choice.startswith("Auto-Save"): + new_setting = not current_auto_save + session.enable_auto_save(new_setting) + feedback.success(f"Auto-save {'enabled' if new_setting else 'disabled'}") + + elif choice == "Clear Auto-Save File": + if feedback.confirm("Clear the auto-save file?"): + session._session_manager.clear_auto_save() + feedback.success("Auto-save file cleared") + + elif choice == "Clear Crash Backup": + if feedback.confirm("Clear the crash backup file?"): + session._session_manager.clear_crash_backup() + feedback.success("Crash backup cleared") + + return "CONTINUE" diff --git a/fastanime/cli/interactive/session.py b/fastanime/cli/interactive/session.py index 2c427bd..89da901 100644 --- a/fastanime/cli/interactive/session.py +++ b/fastanime/cli/interactive/session.py @@ -2,8 +2,9 @@ import importlib.util import logging import os from dataclasses import dataclass +from datetime import datetime from pathlib import Path -from typing import TYPE_CHECKING, Callable, List +from typing import Callable, List import click @@ -14,6 +15,7 @@ from ...libs.players.base import BasePlayer from ...libs.providers.anime.base import BaseAnimeProvider from ...libs.selectors.base import BaseSelector from ..config import ConfigLoader +from ..utils.session_manager import SessionManager from .state import ControlFlow, State logger = logging.getLogger(__name__) @@ -59,6 +61,8 @@ class Session: self._context: Context | None = None self._history: List[State] = [] self._menus: dict[str, Menu] = {} + self._session_manager = SessionManager() + self._auto_save_enabled = True def _load_context(self, config: AppConfig): """Initializes all shared services based on the provided configuration.""" @@ -152,14 +156,60 @@ class Session: config: The initial application configuration. resume_path: Optional path to a saved session file to resume from. """ + from ..utils.feedback import create_feedback_manager + + feedback = create_feedback_manager(True) # Always use icons for session messages + self._load_context(config) + # Handle session recovery if resume_path: - self.resume(resume_path) - elif not self._history: - # Start with the main menu if history is empty + self.resume(resume_path, feedback) + elif self._session_manager.has_crash_backup(): + # Offer to resume from crash backup + if feedback.confirm( + "Found a crash backup from a previous session. Would you like to resume?", + default=True + ): + crash_history = self._session_manager.load_crash_backup(feedback) + if crash_history: + self._history = crash_history + feedback.info("Session restored from crash backup") + # Clear the crash backup after successful recovery + self._session_manager.clear_crash_backup() + elif self._session_manager.has_auto_save(): + # Offer to resume from auto-save + if feedback.confirm( + "Found an auto-saved session. Would you like to resume?", + default=False + ): + auto_history = self._session_manager.load_auto_save(feedback) + if auto_history: + self._history = auto_history + feedback.info("Session restored from auto-save") + + # Start with main menu if no history + if not self._history: self._history.append(State(menu_name="MAIN")) + # Create crash backup before starting + if self._auto_save_enabled: + self._session_manager.create_crash_backup(self._history) + + try: + self._run_main_loop() + except KeyboardInterrupt: + feedback.warning("Session interrupted by user") + self._handle_session_exit(feedback, interrupted=True) + except Exception as e: + feedback.error("Session crashed unexpectedly", str(e)) + self._handle_session_exit(feedback, crashed=True) + raise + else: + self._handle_session_exit(feedback, normal_exit=True) + + def _run_main_loop(self): + """Run the main session loop.""" while self._history: current_state = self._history[-1] menu_to_run = self._menus.get(current_state.menu_name) @@ -170,6 +220,10 @@ class Session: ) break + # Auto-save periodically (every 5 state changes) + if self._auto_save_enabled and len(self._history) % 5 == 0: + self._session_manager.auto_save_session(self._history) + # Execute the menu function, which returns the next step. next_step = menu_to_run.execute(self._context, current_state) @@ -187,37 +241,109 @@ class Session: # if the state is main menu we should reset the history if next_step.menu_name == "MAIN": self._history = [next_step] - # A new state was returned, push it to history for the next loop. - self._history.append(next_step) + else: + # A new state was returned, push it to history for the next loop. + self._history.append(next_step) else: logger.error( f"Menu '{current_state.menu_name}' returned invalid type: {type(next_step)}" ) break + def _handle_session_exit(self, feedback, normal_exit=False, interrupted=False, crashed=False): + """Handle session cleanup on exit.""" + if self._auto_save_enabled and self._history: + if normal_exit: + # Clear auto-save on normal exit + self._session_manager.clear_auto_save() + self._session_manager.clear_crash_backup() + feedback.info("Session completed normally") + elif interrupted: + # Save session on interruption + self._session_manager.auto_save_session(self._history) + feedback.info("Session auto-saved due to interruption") + elif crashed: + # Keep crash backup on crash + feedback.error("Session backup maintained for recovery") + click.echo("Exiting interactive session.") - def save(self, file_path: Path): - """Serializes the session history to a JSON file.""" - history_dicts = [state.model_dump(mode="json") for state in self._history] - try: - file_path.write_text(str(history_dicts)) - logger.info(f"Session saved to {file_path}") - except IOError as e: - logger.error(f"Failed to save session: {e}") + def save(self, file_path: Path, session_name: str = None, description: str = None): + """ + Save session history to a file with comprehensive metadata and error handling. + + Args: + file_path: Path to save the session + session_name: Optional name for the session + description: Optional description for the session + """ + from ..utils.feedback import create_feedback_manager + + feedback = create_feedback_manager(True) + return self._session_manager.save_session( + self._history, + file_path, + session_name=session_name, + description=description, + feedback=feedback + ) - def resume(self, file_path: Path): - """Loads a session history from a JSON file.""" - if not file_path.exists(): - logger.warning(f"Resume file not found: {file_path}") - return - try: - history_dicts = file_path.read_text() - self._history = [State.model_validate(d) for d in history_dicts] - logger.info(f"Session resumed from {file_path}") - except Exception as e: - logger.error(f"Failed to resume session: {e}") - self._history = [] # Reset history on failure + def resume(self, file_path: Path, feedback=None): + """ + Load session history from a file with comprehensive error handling. + + Args: + file_path: Path to the session file + feedback: Optional feedback manager for user notifications + """ + if not feedback: + from ..utils.feedback import create_feedback_manager + feedback = create_feedback_manager(True) + + history = self._session_manager.load_session(file_path, feedback) + if history: + self._history = history + return True + return False + + def list_saved_sessions(self): + """List all saved sessions with their metadata.""" + return self._session_manager.list_saved_sessions() + + def cleanup_old_sessions(self, max_sessions: int = 10): + """Clean up old session files, keeping only the most recent ones.""" + return self._session_manager.cleanup_old_sessions(max_sessions) + + def enable_auto_save(self, enabled: bool = True): + """Enable or disable auto-save functionality.""" + self._auto_save_enabled = enabled + + def get_session_stats(self) -> dict: + """Get statistics about the current session.""" + return { + "current_states": len(self._history), + "current_menu": self._history[-1].menu_name if self._history else None, + "auto_save_enabled": self._auto_save_enabled, + "has_auto_save": self._session_manager.has_auto_save(), + "has_crash_backup": self._session_manager.has_crash_backup() + } + + def create_manual_backup(self, backup_name: str = None): + """Create a manual backup of the current session.""" + from ..utils.feedback import create_feedback_manager + from ...core.constants import APP_DIR + + feedback = create_feedback_manager(True) + backup_name = backup_name or f"manual_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}" + backup_path = APP_DIR / "sessions" / f"{backup_name}.json" + + return self._session_manager.save_session( + self._history, + backup_path, + session_name=backup_name, + description="Manual backup created by user", + feedback=feedback + ) @property def menu(self) -> Callable[[MenuFunction], MenuFunction]: diff --git a/fastanime/cli/utils/session_manager.py b/fastanime/cli/utils/session_manager.py new file mode 100644 index 0000000..a93eee3 --- /dev/null +++ b/fastanime/cli/utils/session_manager.py @@ -0,0 +1,333 @@ +""" +Session state management utilities for the interactive CLI. +Provides comprehensive session save/resume functionality with error handling and metadata. +""" +import json +import logging +from datetime import datetime +from pathlib import Path +from typing import Dict, List, Optional + +from ...core.constants import APP_DATA_DIR +from ..interactive.state import State + +logger = logging.getLogger(__name__) + +# Session storage directory +SESSIONS_DIR = APP_DATA_DIR / "sessions" +AUTO_SAVE_FILE = SESSIONS_DIR / "auto_save.json" +CRASH_BACKUP_FILE = SESSIONS_DIR / "crash_backup.json" + + +class SessionMetadata: + """Metadata for saved sessions.""" + + def __init__( + self, + created_at: Optional[datetime] = None, + last_saved: Optional[datetime] = None, + session_name: Optional[str] = None, + description: Optional[str] = None, + state_count: int = 0 + ): + self.created_at = created_at or datetime.now() + self.last_saved = last_saved or datetime.now() + self.session_name = session_name + self.description = description + self.state_count = state_count + + def to_dict(self) -> dict: + """Convert metadata to dictionary for JSON serialization.""" + return { + "created_at": self.created_at.isoformat(), + "last_saved": self.last_saved.isoformat(), + "session_name": self.session_name, + "description": self.description, + "state_count": self.state_count + } + + @classmethod + def from_dict(cls, data: dict) -> "SessionMetadata": + """Create metadata from dictionary.""" + return cls( + created_at=datetime.fromisoformat(data.get("created_at", datetime.now().isoformat())), + last_saved=datetime.fromisoformat(data.get("last_saved", datetime.now().isoformat())), + session_name=data.get("session_name"), + description=data.get("description"), + state_count=data.get("state_count", 0) + ) + + +class SessionData: + """Complete session data including history and metadata.""" + + def __init__(self, history: List[State], metadata: SessionMetadata): + self.history = history + self.metadata = metadata + + def to_dict(self) -> dict: + """Convert session data to dictionary for JSON serialization.""" + return { + "metadata": self.metadata.to_dict(), + "history": [state.model_dump(mode="json") for state in self.history], + "format_version": "1.0" # For future compatibility + } + + @classmethod + def from_dict(cls, data: dict) -> "SessionData": + """Create session data from dictionary.""" + metadata = SessionMetadata.from_dict(data.get("metadata", {})) + history_data = data.get("history", []) + history = [] + + for state_dict in history_data: + try: + state = State.model_validate(state_dict) + history.append(state) + except Exception as e: + logger.warning(f"Skipping invalid state in session: {e}") + + return cls(history, metadata) + + +class SessionManager: + """Manages session save/resume functionality with comprehensive error handling.""" + + def __init__(self): + self._ensure_sessions_directory() + + def _ensure_sessions_directory(self): + """Ensure the sessions directory exists.""" + SESSIONS_DIR.mkdir(parents=True, exist_ok=True) + + def save_session( + self, + history: List[State], + file_path: Path, + session_name: Optional[str] = None, + description: Optional[str] = None, + feedback=None + ) -> bool: + """ + Save session history to a JSON file with metadata. + + Args: + history: List of session states + file_path: Path to save the session + session_name: Optional name for the session + description: Optional description + feedback: Optional feedback manager for user notifications + + Returns: + True if successful, False otherwise + """ + try: + # Create metadata + metadata = SessionMetadata( + session_name=session_name, + description=description, + state_count=len(history) + ) + + # Create session data + session_data = SessionData(history, metadata) + + # Save to file + with file_path.open('w', encoding='utf-8') as f: + json.dump(session_data.to_dict(), f, indent=2, ensure_ascii=False) + + if feedback: + feedback.success( + "Session saved successfully", + f"Saved {len(history)} states to {file_path.name}" + ) + + logger.info(f"Session saved to {file_path} with {len(history)} states") + return True + + except Exception as e: + error_msg = f"Failed to save session: {e}" + if feedback: + feedback.error("Failed to save session", str(e)) + logger.error(error_msg) + return False + + def load_session(self, file_path: Path, feedback=None) -> Optional[List[State]]: + """ + Load session history from a JSON file. + + Args: + file_path: Path to the session file + feedback: Optional feedback manager for user notifications + + Returns: + List of states if successful, None otherwise + """ + if not file_path.exists(): + if feedback: + feedback.warning( + "Session file not found", + f"The file {file_path.name} does not exist" + ) + logger.warning(f"Session file not found: {file_path}") + return None + + try: + with file_path.open('r', encoding='utf-8') as f: + data = json.load(f) + + session_data = SessionData.from_dict(data) + + if feedback: + feedback.success( + "Session loaded successfully", + f"Loaded {len(session_data.history)} states from {file_path.name}" + ) + + logger.info(f"Session loaded from {file_path} with {len(session_data.history)} states") + return session_data.history + + except json.JSONDecodeError as e: + error_msg = f"Session file is corrupted: {e}" + if feedback: + feedback.error("Session file is corrupted", str(e)) + logger.error(error_msg) + return None + except Exception as e: + error_msg = f"Failed to load session: {e}" + if feedback: + feedback.error("Failed to load session", str(e)) + logger.error(error_msg) + return None + + def auto_save_session(self, history: List[State]) -> bool: + """ + Auto-save session for crash recovery. + + Args: + history: Current session history + + Returns: + True if successful, False otherwise + """ + return self.save_session( + history, + AUTO_SAVE_FILE, + session_name="Auto Save", + description="Automatically saved session" + ) + + def create_crash_backup(self, history: List[State]) -> bool: + """ + Create a crash backup of the current session. + + Args: + history: Current session history + + Returns: + True if successful, False otherwise + """ + return self.save_session( + history, + CRASH_BACKUP_FILE, + session_name="Crash Backup", + description="Session backup created before potential crash" + ) + + def has_auto_save(self) -> bool: + """Check if an auto-save file exists.""" + return AUTO_SAVE_FILE.exists() + + def has_crash_backup(self) -> bool: + """Check if a crash backup file exists.""" + return CRASH_BACKUP_FILE.exists() + + def load_auto_save(self, feedback=None) -> Optional[List[State]]: + """Load the auto-save session.""" + return self.load_session(AUTO_SAVE_FILE, feedback) + + def load_crash_backup(self, feedback=None) -> Optional[List[State]]: + """Load the crash backup session.""" + return self.load_session(CRASH_BACKUP_FILE, feedback) + + def clear_auto_save(self) -> bool: + """Clear the auto-save file.""" + try: + if AUTO_SAVE_FILE.exists(): + AUTO_SAVE_FILE.unlink() + return True + except Exception as e: + logger.error(f"Failed to clear auto-save: {e}") + return False + + def clear_crash_backup(self) -> bool: + """Clear the crash backup file.""" + try: + if CRASH_BACKUP_FILE.exists(): + CRASH_BACKUP_FILE.unlink() + return True + except Exception as e: + logger.error(f"Failed to clear crash backup: {e}") + return False + + def list_saved_sessions(self) -> List[Dict[str, str]]: + """ + List all saved session files with their metadata. + + Returns: + List of dictionaries containing session information + """ + sessions = [] + + for session_file in SESSIONS_DIR.glob("*.json"): + if session_file.name in ["auto_save.json", "crash_backup.json"]: + continue + + try: + with session_file.open('r', encoding='utf-8') as f: + data = json.load(f) + + metadata = data.get("metadata", {}) + sessions.append({ + "file": session_file.name, + "path": str(session_file), + "name": metadata.get("session_name", "Unnamed"), + "description": metadata.get("description", "No description"), + "created": metadata.get("created_at", "Unknown"), + "last_saved": metadata.get("last_saved", "Unknown"), + "state_count": metadata.get("state_count", 0) + }) + except Exception as e: + logger.warning(f"Failed to read session metadata from {session_file}: {e}") + + # Sort by last saved time (newest first) + sessions.sort(key=lambda x: x["last_saved"], reverse=True) + return sessions + + def cleanup_old_sessions(self, max_sessions: int = 10) -> int: + """ + Clean up old session files, keeping only the most recent ones. + + Args: + max_sessions: Maximum number of sessions to keep + + Returns: + Number of sessions deleted + """ + sessions = self.list_saved_sessions() + + if len(sessions) <= max_sessions: + return 0 + + deleted_count = 0 + sessions_to_delete = sessions[max_sessions:] + + for session in sessions_to_delete: + try: + Path(session["path"]).unlink() + deleted_count += 1 + logger.info(f"Deleted old session: {session['name']}") + except Exception as e: + logger.error(f"Failed to delete session {session['name']}: {e}") + + return deleted_count diff --git a/test_session_management.py b/test_session_management.py new file mode 100644 index 0000000..29c9691 --- /dev/null +++ b/test_session_management.py @@ -0,0 +1,142 @@ +""" +Test script to verify the session management system works correctly. +This tests session save/resume functionality and crash recovery. +""" +import json +import sys +import tempfile +from datetime import datetime +from pathlib import Path + +# Add the project root to the path so we can import fastanime modules +project_root = Path(__file__).parent.parent.parent +sys.path.insert(0, str(project_root)) + +from fastanime.cli.utils.session_manager import SessionManager, SessionMetadata, SessionData +from fastanime.cli.utils.feedback import create_feedback_manager +from fastanime.cli.interactive.state import State, MediaApiState + + +def test_session_management(): + """Test the session management system.""" + print("=== Testing Session Management System ===\n") + + feedback = create_feedback_manager(icons_enabled=True) + session_manager = SessionManager() + + # Create test session states + test_states = [ + State(menu_name="MAIN"), + State(menu_name="RESULTS", media_api=MediaApiState()), + State(menu_name="MEDIA_ACTIONS", media_api=MediaApiState()) + ] + + print("1. Testing session metadata creation:") + metadata = SessionMetadata( + session_name="Test Session", + description="This is a test session for validation", + state_count=len(test_states) + ) + print(f" Metadata: {metadata.session_name} - {metadata.description}") + print(f" States: {metadata.state_count}, Created: {metadata.created_at}") + + print("\n2. Testing session data serialization:") + session_data = SessionData(test_states, metadata) + data_dict = session_data.to_dict() + print(f" Serialized keys: {list(data_dict.keys())}") + print(f" Format version: {data_dict['format_version']}") + + print("\n3. Testing session data deserialization:") + restored_session = SessionData.from_dict(data_dict) + print(f" Restored states: {len(restored_session.history)}") + print(f" Restored metadata: {restored_session.metadata.session_name}") + + print("\n4. Testing session save:") + with tempfile.TemporaryDirectory() as temp_dir: + test_file = Path(temp_dir) / "test_session.json" + success = session_manager.save_session( + test_states, + test_file, + session_name="Test Session Save", + description="Testing save functionality", + feedback=feedback + ) + print(f" Save success: {success}") + print(f" File exists: {test_file.exists()}") + + if test_file.exists(): + print(f" File size: {test_file.stat().st_size} bytes") + + print("\n5. Testing session load:") + loaded_states = session_manager.load_session(test_file, feedback) + if loaded_states: + print(f" Loaded states: {len(loaded_states)}") + print(f" First state menu: {loaded_states[0].menu_name}") + print(f" Last state menu: {loaded_states[-1].menu_name}") + + print("\n6. Testing session file content:") + with open(test_file, 'r') as f: + file_content = json.load(f) + print(f" JSON keys: {list(file_content.keys())}") + print(f" History length: {len(file_content['history'])}") + print(f" Session name: {file_content['metadata']['session_name']}") + + print("\n7. Testing auto-save functionality:") + auto_save_success = session_manager.auto_save_session(test_states) + print(f" Auto-save success: {auto_save_success}") + print(f" Has auto-save: {session_manager.has_auto_save()}") + + print("\n8. Testing crash backup:") + crash_backup_success = session_manager.create_crash_backup(test_states) + print(f" Crash backup success: {crash_backup_success}") + print(f" Has crash backup: {session_manager.has_crash_backup()}") + + print("\n9. Testing session listing:") + saved_sessions = session_manager.list_saved_sessions() + print(f" Found {len(saved_sessions)} saved sessions") + for i, sess in enumerate(saved_sessions[:3]): # Show first 3 + print(f" Session {i+1}: {sess['name']} ({sess['state_count']} states)") + + print("\n10. Testing cleanup functions:") + print(f" Can clear auto-save: {session_manager.clear_auto_save()}") + print(f" Can clear crash backup: {session_manager.clear_crash_backup()}") + print(f" Auto-save exists after clear: {session_manager.has_auto_save()}") + print(f" Crash backup exists after clear: {session_manager.has_crash_backup()}") + + print("\n=== Session Management Tests Completed! ===") + + +def test_session_error_handling(): + """Test error handling in session management.""" + print("\n=== Testing Error Handling ===\n") + + feedback = create_feedback_manager(icons_enabled=True) + session_manager = SessionManager() + + print("1. Testing load of non-existent file:") + non_existent = Path("/tmp/non_existent_session.json") + result = session_manager.load_session(non_existent, feedback) + print(f" Result for non-existent file: {result}") + + print("\n2. Testing load of corrupted file:") + with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f: + f.write("{ invalid json content }") + corrupted_file = Path(f.name) + + try: + result = session_manager.load_session(corrupted_file, feedback) + print(f" Result for corrupted file: {result}") + finally: + corrupted_file.unlink() # Clean up + + print("\n3. Testing save to read-only location:") + readonly_path = Path("/tmp/readonly_session.json") + # This test would need actual readonly permissions to be meaningful + print(" Skipped - requires permission setup") + + print("\n=== Error Handling Tests Completed! ===") + + +if __name__ == "__main__": + test_session_management() + test_session_error_handling()