mirror of
https://github.com/Benexl/FastAnime.git
synced 2026-02-04 11:07:48 -08:00
feat: implement session management functionality with save/load capabilities and error handling
This commit is contained in:
@@ -59,7 +59,8 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
|
||||
ctx, "REPEATING"
|
||||
),
|
||||
# --- Control Flow and Utility Options ---
|
||||
f"{'📝 ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None),
|
||||
f"{'<EFBFBD> ' if icons else ''}Session Management": lambda: ("SESSION_MANAGEMENT", None),
|
||||
f"{'<EFBFBD>📝 ' if icons else ''}Edit Config": lambda: ("RELOAD_CONFIG", None),
|
||||
f"{'❌ ' if icons else ''}Exit": lambda: ("EXIT", None),
|
||||
}
|
||||
|
||||
@@ -81,6 +82,8 @@ def main(ctx: Context, state: State) -> State | ControlFlow:
|
||||
return ControlFlow.EXIT
|
||||
if next_menu_name == "RELOAD_CONFIG":
|
||||
return ControlFlow.RELOAD_CONFIG
|
||||
if next_menu_name == "SESSION_MANAGEMENT":
|
||||
return State(menu_name="SESSION_MANAGEMENT")
|
||||
if next_menu_name == "CONTINUE":
|
||||
return ControlFlow.CONTINUE
|
||||
|
||||
|
||||
241
fastanime/cli/interactive/menus/session_management.py
Normal file
241
fastanime/cli/interactive/menus/session_management.py
Normal file
@@ -0,0 +1,241 @@
|
||||
"""
|
||||
Session management menu for the interactive CLI.
|
||||
Provides options to save, load, and manage session state.
|
||||
"""
|
||||
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Callable, Dict
|
||||
|
||||
from rich.console import Console
|
||||
from rich.table import Table
|
||||
|
||||
from ....core.constants import APP_DIR
|
||||
from ...utils.feedback import create_feedback_manager
|
||||
from ..session import Context, session
|
||||
from ..state import ControlFlow, State
|
||||
|
||||
MenuAction = Callable[[], str]
|
||||
|
||||
|
||||
@session.menu
|
||||
def session_management(ctx: Context, state: State) -> State | ControlFlow:
|
||||
"""
|
||||
Session management menu for saving, loading, and managing session state.
|
||||
"""
|
||||
icons = ctx.config.general.icons
|
||||
feedback = create_feedback_manager(icons)
|
||||
console = Console()
|
||||
console.clear()
|
||||
|
||||
# Show current session stats
|
||||
_display_session_info(console, icons)
|
||||
|
||||
options: Dict[str, MenuAction] = {
|
||||
f"{'💾 ' if icons else ''}Save Current Session": lambda: _save_session(ctx, feedback),
|
||||
f"{'📂 ' if icons else ''}Load Session": lambda: _load_session(ctx, feedback),
|
||||
f"{'📋 ' if icons else ''}List Saved Sessions": lambda: _list_sessions(ctx, feedback),
|
||||
f"{'🗑️ ' if icons else ''}Cleanup Old Sessions": lambda: _cleanup_sessions(ctx, feedback),
|
||||
f"{'💾 ' if icons else ''}Create Manual Backup": lambda: _create_backup(ctx, feedback),
|
||||
f"{'⚙️ ' if icons else ''}Session Settings": lambda: _session_settings(ctx, feedback),
|
||||
f"{'🔙 ' if icons else ''}Back to Main Menu": lambda: "BACK",
|
||||
}
|
||||
|
||||
choice_str = ctx.selector.choose(
|
||||
prompt="Select Session Action",
|
||||
choices=list(options.keys()),
|
||||
header="Session Management",
|
||||
)
|
||||
|
||||
if not choice_str:
|
||||
return ControlFlow.BACK
|
||||
|
||||
result = options[choice_str]()
|
||||
|
||||
if result == "BACK":
|
||||
return ControlFlow.BACK
|
||||
else:
|
||||
return ControlFlow.CONTINUE
|
||||
|
||||
|
||||
def _display_session_info(console: Console, icons: bool):
|
||||
"""Display current session information."""
|
||||
session_stats = session.get_session_stats()
|
||||
|
||||
table = Table(title=f"{'📊 ' if icons else ''}Current Session Info")
|
||||
table.add_column("Property", style="cyan")
|
||||
table.add_column("Value", style="green")
|
||||
|
||||
table.add_row("Current States", str(session_stats["current_states"]))
|
||||
table.add_row("Current Menu", session_stats["current_menu"] or "None")
|
||||
table.add_row("Auto-Save", "Enabled" if session_stats["auto_save_enabled"] else "Disabled")
|
||||
table.add_row("Has Auto-Save", "Yes" if session_stats["has_auto_save"] else "No")
|
||||
table.add_row("Has Crash Backup", "Yes" if session_stats["has_crash_backup"] else "No")
|
||||
|
||||
console.print(table)
|
||||
console.print()
|
||||
|
||||
|
||||
def _save_session(ctx: Context, feedback) -> str:
|
||||
"""Save the current session."""
|
||||
session_name = ctx.selector.ask("Enter session name (optional):")
|
||||
description = ctx.selector.ask("Enter session description (optional):")
|
||||
|
||||
if not session_name:
|
||||
session_name = f"session_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
|
||||
sessions_dir = APP_DIR / "sessions"
|
||||
file_path = sessions_dir / f"{session_name}.json"
|
||||
|
||||
if file_path.exists():
|
||||
if not feedback.confirm(f"Session '{session_name}' already exists. Overwrite?"):
|
||||
feedback.info("Save cancelled")
|
||||
return "CONTINUE"
|
||||
|
||||
success = session.save(file_path, session_name, description or "")
|
||||
if success:
|
||||
feedback.success(f"Session saved as '{session_name}'")
|
||||
|
||||
return "CONTINUE"
|
||||
|
||||
|
||||
def _load_session(ctx: Context, feedback) -> str:
|
||||
"""Load a saved session."""
|
||||
sessions = session.list_saved_sessions()
|
||||
|
||||
if not sessions:
|
||||
feedback.warning("No saved sessions found")
|
||||
return "CONTINUE"
|
||||
|
||||
# Create choices with session info
|
||||
choices = []
|
||||
session_map = {}
|
||||
|
||||
for sess in sessions:
|
||||
choice_text = f"{sess['name']} - {sess['description'][:50]}{'...' if len(sess['description']) > 50 else ''}"
|
||||
choices.append(choice_text)
|
||||
session_map[choice_text] = sess
|
||||
|
||||
choices.append("Cancel")
|
||||
|
||||
choice = ctx.selector.choose(
|
||||
"Select session to load:",
|
||||
choices=choices,
|
||||
header="Available Sessions"
|
||||
)
|
||||
|
||||
if not choice or choice == "Cancel":
|
||||
return "CONTINUE"
|
||||
|
||||
selected_session = session_map[choice]
|
||||
file_path = Path(selected_session["path"])
|
||||
|
||||
if feedback.confirm(f"Load session '{selected_session['name']}'? This will replace your current session."):
|
||||
success = session.resume(file_path, feedback)
|
||||
if success:
|
||||
feedback.info("Session loaded successfully. Returning to main menu.")
|
||||
# Return to main menu after loading
|
||||
return "MAIN"
|
||||
|
||||
return "CONTINUE"
|
||||
|
||||
|
||||
def _list_sessions(ctx: Context, feedback) -> str:
|
||||
"""List all saved sessions."""
|
||||
sessions = session.list_saved_sessions()
|
||||
|
||||
if not sessions:
|
||||
feedback.info("No saved sessions found")
|
||||
return "CONTINUE"
|
||||
|
||||
console = Console()
|
||||
table = Table(title="Saved Sessions")
|
||||
table.add_column("Name", style="cyan")
|
||||
table.add_column("Description", style="yellow")
|
||||
table.add_column("States", style="green")
|
||||
table.add_column("Created", style="blue")
|
||||
|
||||
for sess in sessions:
|
||||
# Format the created date
|
||||
created = sess["created"]
|
||||
if "T" in created:
|
||||
created = created.split("T")[0] # Just show the date part
|
||||
|
||||
table.add_row(
|
||||
sess["name"],
|
||||
sess["description"][:40] + "..." if len(sess["description"]) > 40 else sess["description"],
|
||||
str(sess["state_count"]),
|
||||
created
|
||||
)
|
||||
|
||||
console.print(table)
|
||||
feedback.pause_for_user()
|
||||
|
||||
return "CONTINUE"
|
||||
|
||||
|
||||
def _cleanup_sessions(ctx: Context, feedback) -> str:
|
||||
"""Clean up old sessions."""
|
||||
sessions = session.list_saved_sessions()
|
||||
|
||||
if len(sessions) <= 5:
|
||||
feedback.info("No cleanup needed. You have 5 or fewer sessions.")
|
||||
return "CONTINUE"
|
||||
|
||||
max_sessions_str = ctx.selector.ask("How many sessions to keep? (default: 10)")
|
||||
try:
|
||||
max_sessions = int(max_sessions_str) if max_sessions_str else 10
|
||||
except ValueError:
|
||||
feedback.error("Invalid number entered")
|
||||
return "CONTINUE"
|
||||
|
||||
if feedback.confirm(f"Delete sessions older than the {max_sessions} most recent?"):
|
||||
deleted_count = session.cleanup_old_sessions(max_sessions)
|
||||
feedback.success(f"Deleted {deleted_count} old sessions")
|
||||
|
||||
return "CONTINUE"
|
||||
|
||||
|
||||
def _create_backup(ctx: Context, feedback) -> str:
|
||||
"""Create a manual backup."""
|
||||
backup_name = ctx.selector.ask("Enter backup name (optional):")
|
||||
|
||||
success = session.create_manual_backup(backup_name or "")
|
||||
if success:
|
||||
feedback.success("Manual backup created successfully")
|
||||
|
||||
return "CONTINUE"
|
||||
|
||||
|
||||
def _session_settings(ctx: Context, feedback) -> str:
|
||||
"""Configure session settings."""
|
||||
current_auto_save = session._auto_save_enabled
|
||||
|
||||
choices = [
|
||||
f"Auto-Save: {'Enabled' if current_auto_save else 'Disabled'}",
|
||||
"Clear Auto-Save File",
|
||||
"Clear Crash Backup",
|
||||
"Back"
|
||||
]
|
||||
|
||||
choice = ctx.selector.choose(
|
||||
"Session Settings:",
|
||||
choices=choices
|
||||
)
|
||||
|
||||
if choice and choice.startswith("Auto-Save"):
|
||||
new_setting = not current_auto_save
|
||||
session.enable_auto_save(new_setting)
|
||||
feedback.success(f"Auto-save {'enabled' if new_setting else 'disabled'}")
|
||||
|
||||
elif choice == "Clear Auto-Save File":
|
||||
if feedback.confirm("Clear the auto-save file?"):
|
||||
session._session_manager.clear_auto_save()
|
||||
feedback.success("Auto-save file cleared")
|
||||
|
||||
elif choice == "Clear Crash Backup":
|
||||
if feedback.confirm("Clear the crash backup file?"):
|
||||
session._session_manager.clear_crash_backup()
|
||||
feedback.success("Crash backup cleared")
|
||||
|
||||
return "CONTINUE"
|
||||
@@ -2,8 +2,9 @@ import importlib.util
|
||||
import logging
|
||||
import os
|
||||
from dataclasses import dataclass
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import TYPE_CHECKING, Callable, List
|
||||
from typing import Callable, List
|
||||
|
||||
import click
|
||||
|
||||
@@ -14,6 +15,7 @@ from ...libs.players.base import BasePlayer
|
||||
from ...libs.providers.anime.base import BaseAnimeProvider
|
||||
from ...libs.selectors.base import BaseSelector
|
||||
from ..config import ConfigLoader
|
||||
from ..utils.session_manager import SessionManager
|
||||
from .state import ControlFlow, State
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -59,6 +61,8 @@ class Session:
|
||||
self._context: Context | None = None
|
||||
self._history: List[State] = []
|
||||
self._menus: dict[str, Menu] = {}
|
||||
self._session_manager = SessionManager()
|
||||
self._auto_save_enabled = True
|
||||
|
||||
def _load_context(self, config: AppConfig):
|
||||
"""Initializes all shared services based on the provided configuration."""
|
||||
@@ -152,14 +156,60 @@ class Session:
|
||||
config: The initial application configuration.
|
||||
resume_path: Optional path to a saved session file to resume from.
|
||||
"""
|
||||
from ..utils.feedback import create_feedback_manager
|
||||
|
||||
feedback = create_feedback_manager(True) # Always use icons for session messages
|
||||
|
||||
self._load_context(config)
|
||||
|
||||
# Handle session recovery
|
||||
if resume_path:
|
||||
self.resume(resume_path)
|
||||
elif not self._history:
|
||||
# Start with the main menu if history is empty
|
||||
self.resume(resume_path, feedback)
|
||||
elif self._session_manager.has_crash_backup():
|
||||
# Offer to resume from crash backup
|
||||
if feedback.confirm(
|
||||
"Found a crash backup from a previous session. Would you like to resume?",
|
||||
default=True
|
||||
):
|
||||
crash_history = self._session_manager.load_crash_backup(feedback)
|
||||
if crash_history:
|
||||
self._history = crash_history
|
||||
feedback.info("Session restored from crash backup")
|
||||
# Clear the crash backup after successful recovery
|
||||
self._session_manager.clear_crash_backup()
|
||||
elif self._session_manager.has_auto_save():
|
||||
# Offer to resume from auto-save
|
||||
if feedback.confirm(
|
||||
"Found an auto-saved session. Would you like to resume?",
|
||||
default=False
|
||||
):
|
||||
auto_history = self._session_manager.load_auto_save(feedback)
|
||||
if auto_history:
|
||||
self._history = auto_history
|
||||
feedback.info("Session restored from auto-save")
|
||||
|
||||
# Start with main menu if no history
|
||||
if not self._history:
|
||||
self._history.append(State(menu_name="MAIN"))
|
||||
|
||||
# Create crash backup before starting
|
||||
if self._auto_save_enabled:
|
||||
self._session_manager.create_crash_backup(self._history)
|
||||
|
||||
try:
|
||||
self._run_main_loop()
|
||||
except KeyboardInterrupt:
|
||||
feedback.warning("Session interrupted by user")
|
||||
self._handle_session_exit(feedback, interrupted=True)
|
||||
except Exception as e:
|
||||
feedback.error("Session crashed unexpectedly", str(e))
|
||||
self._handle_session_exit(feedback, crashed=True)
|
||||
raise
|
||||
else:
|
||||
self._handle_session_exit(feedback, normal_exit=True)
|
||||
|
||||
def _run_main_loop(self):
|
||||
"""Run the main session loop."""
|
||||
while self._history:
|
||||
current_state = self._history[-1]
|
||||
menu_to_run = self._menus.get(current_state.menu_name)
|
||||
@@ -170,6 +220,10 @@ class Session:
|
||||
)
|
||||
break
|
||||
|
||||
# Auto-save periodically (every 5 state changes)
|
||||
if self._auto_save_enabled and len(self._history) % 5 == 0:
|
||||
self._session_manager.auto_save_session(self._history)
|
||||
|
||||
# Execute the menu function, which returns the next step.
|
||||
next_step = menu_to_run.execute(self._context, current_state)
|
||||
|
||||
@@ -187,37 +241,109 @@ class Session:
|
||||
# if the state is main menu we should reset the history
|
||||
if next_step.menu_name == "MAIN":
|
||||
self._history = [next_step]
|
||||
# A new state was returned, push it to history for the next loop.
|
||||
self._history.append(next_step)
|
||||
else:
|
||||
# A new state was returned, push it to history for the next loop.
|
||||
self._history.append(next_step)
|
||||
else:
|
||||
logger.error(
|
||||
f"Menu '{current_state.menu_name}' returned invalid type: {type(next_step)}"
|
||||
)
|
||||
break
|
||||
|
||||
def _handle_session_exit(self, feedback, normal_exit=False, interrupted=False, crashed=False):
|
||||
"""Handle session cleanup on exit."""
|
||||
if self._auto_save_enabled and self._history:
|
||||
if normal_exit:
|
||||
# Clear auto-save on normal exit
|
||||
self._session_manager.clear_auto_save()
|
||||
self._session_manager.clear_crash_backup()
|
||||
feedback.info("Session completed normally")
|
||||
elif interrupted:
|
||||
# Save session on interruption
|
||||
self._session_manager.auto_save_session(self._history)
|
||||
feedback.info("Session auto-saved due to interruption")
|
||||
elif crashed:
|
||||
# Keep crash backup on crash
|
||||
feedback.error("Session backup maintained for recovery")
|
||||
|
||||
click.echo("Exiting interactive session.")
|
||||
|
||||
def save(self, file_path: Path):
|
||||
"""Serializes the session history to a JSON file."""
|
||||
history_dicts = [state.model_dump(mode="json") for state in self._history]
|
||||
try:
|
||||
file_path.write_text(str(history_dicts))
|
||||
logger.info(f"Session saved to {file_path}")
|
||||
except IOError as e:
|
||||
logger.error(f"Failed to save session: {e}")
|
||||
def save(self, file_path: Path, session_name: str = None, description: str = None):
|
||||
"""
|
||||
Save session history to a file with comprehensive metadata and error handling.
|
||||
|
||||
Args:
|
||||
file_path: Path to save the session
|
||||
session_name: Optional name for the session
|
||||
description: Optional description for the session
|
||||
"""
|
||||
from ..utils.feedback import create_feedback_manager
|
||||
|
||||
feedback = create_feedback_manager(True)
|
||||
return self._session_manager.save_session(
|
||||
self._history,
|
||||
file_path,
|
||||
session_name=session_name,
|
||||
description=description,
|
||||
feedback=feedback
|
||||
)
|
||||
|
||||
def resume(self, file_path: Path):
|
||||
"""Loads a session history from a JSON file."""
|
||||
if not file_path.exists():
|
||||
logger.warning(f"Resume file not found: {file_path}")
|
||||
return
|
||||
try:
|
||||
history_dicts = file_path.read_text()
|
||||
self._history = [State.model_validate(d) for d in history_dicts]
|
||||
logger.info(f"Session resumed from {file_path}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to resume session: {e}")
|
||||
self._history = [] # Reset history on failure
|
||||
def resume(self, file_path: Path, feedback=None):
|
||||
"""
|
||||
Load session history from a file with comprehensive error handling.
|
||||
|
||||
Args:
|
||||
file_path: Path to the session file
|
||||
feedback: Optional feedback manager for user notifications
|
||||
"""
|
||||
if not feedback:
|
||||
from ..utils.feedback import create_feedback_manager
|
||||
feedback = create_feedback_manager(True)
|
||||
|
||||
history = self._session_manager.load_session(file_path, feedback)
|
||||
if history:
|
||||
self._history = history
|
||||
return True
|
||||
return False
|
||||
|
||||
def list_saved_sessions(self):
|
||||
"""List all saved sessions with their metadata."""
|
||||
return self._session_manager.list_saved_sessions()
|
||||
|
||||
def cleanup_old_sessions(self, max_sessions: int = 10):
|
||||
"""Clean up old session files, keeping only the most recent ones."""
|
||||
return self._session_manager.cleanup_old_sessions(max_sessions)
|
||||
|
||||
def enable_auto_save(self, enabled: bool = True):
|
||||
"""Enable or disable auto-save functionality."""
|
||||
self._auto_save_enabled = enabled
|
||||
|
||||
def get_session_stats(self) -> dict:
|
||||
"""Get statistics about the current session."""
|
||||
return {
|
||||
"current_states": len(self._history),
|
||||
"current_menu": self._history[-1].menu_name if self._history else None,
|
||||
"auto_save_enabled": self._auto_save_enabled,
|
||||
"has_auto_save": self._session_manager.has_auto_save(),
|
||||
"has_crash_backup": self._session_manager.has_crash_backup()
|
||||
}
|
||||
|
||||
def create_manual_backup(self, backup_name: str = None):
|
||||
"""Create a manual backup of the current session."""
|
||||
from ..utils.feedback import create_feedback_manager
|
||||
from ...core.constants import APP_DIR
|
||||
|
||||
feedback = create_feedback_manager(True)
|
||||
backup_name = backup_name or f"manual_backup_{datetime.now().strftime('%Y%m%d_%H%M%S')}"
|
||||
backup_path = APP_DIR / "sessions" / f"{backup_name}.json"
|
||||
|
||||
return self._session_manager.save_session(
|
||||
self._history,
|
||||
backup_path,
|
||||
session_name=backup_name,
|
||||
description="Manual backup created by user",
|
||||
feedback=feedback
|
||||
)
|
||||
|
||||
@property
|
||||
def menu(self) -> Callable[[MenuFunction], MenuFunction]:
|
||||
|
||||
333
fastanime/cli/utils/session_manager.py
Normal file
333
fastanime/cli/utils/session_manager.py
Normal file
@@ -0,0 +1,333 @@
|
||||
"""
|
||||
Session state management utilities for the interactive CLI.
|
||||
Provides comprehensive session save/resume functionality with error handling and metadata.
|
||||
"""
|
||||
import json
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from ...core.constants import APP_DATA_DIR
|
||||
from ..interactive.state import State
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Session storage directory
|
||||
SESSIONS_DIR = APP_DATA_DIR / "sessions"
|
||||
AUTO_SAVE_FILE = SESSIONS_DIR / "auto_save.json"
|
||||
CRASH_BACKUP_FILE = SESSIONS_DIR / "crash_backup.json"
|
||||
|
||||
|
||||
class SessionMetadata:
|
||||
"""Metadata for saved sessions."""
|
||||
|
||||
def __init__(
|
||||
self,
|
||||
created_at: Optional[datetime] = None,
|
||||
last_saved: Optional[datetime] = None,
|
||||
session_name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
state_count: int = 0
|
||||
):
|
||||
self.created_at = created_at or datetime.now()
|
||||
self.last_saved = last_saved or datetime.now()
|
||||
self.session_name = session_name
|
||||
self.description = description
|
||||
self.state_count = state_count
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert metadata to dictionary for JSON serialization."""
|
||||
return {
|
||||
"created_at": self.created_at.isoformat(),
|
||||
"last_saved": self.last_saved.isoformat(),
|
||||
"session_name": self.session_name,
|
||||
"description": self.description,
|
||||
"state_count": self.state_count
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "SessionMetadata":
|
||||
"""Create metadata from dictionary."""
|
||||
return cls(
|
||||
created_at=datetime.fromisoformat(data.get("created_at", datetime.now().isoformat())),
|
||||
last_saved=datetime.fromisoformat(data.get("last_saved", datetime.now().isoformat())),
|
||||
session_name=data.get("session_name"),
|
||||
description=data.get("description"),
|
||||
state_count=data.get("state_count", 0)
|
||||
)
|
||||
|
||||
|
||||
class SessionData:
|
||||
"""Complete session data including history and metadata."""
|
||||
|
||||
def __init__(self, history: List[State], metadata: SessionMetadata):
|
||||
self.history = history
|
||||
self.metadata = metadata
|
||||
|
||||
def to_dict(self) -> dict:
|
||||
"""Convert session data to dictionary for JSON serialization."""
|
||||
return {
|
||||
"metadata": self.metadata.to_dict(),
|
||||
"history": [state.model_dump(mode="json") for state in self.history],
|
||||
"format_version": "1.0" # For future compatibility
|
||||
}
|
||||
|
||||
@classmethod
|
||||
def from_dict(cls, data: dict) -> "SessionData":
|
||||
"""Create session data from dictionary."""
|
||||
metadata = SessionMetadata.from_dict(data.get("metadata", {}))
|
||||
history_data = data.get("history", [])
|
||||
history = []
|
||||
|
||||
for state_dict in history_data:
|
||||
try:
|
||||
state = State.model_validate(state_dict)
|
||||
history.append(state)
|
||||
except Exception as e:
|
||||
logger.warning(f"Skipping invalid state in session: {e}")
|
||||
|
||||
return cls(history, metadata)
|
||||
|
||||
|
||||
class SessionManager:
|
||||
"""Manages session save/resume functionality with comprehensive error handling."""
|
||||
|
||||
def __init__(self):
|
||||
self._ensure_sessions_directory()
|
||||
|
||||
def _ensure_sessions_directory(self):
|
||||
"""Ensure the sessions directory exists."""
|
||||
SESSIONS_DIR.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
def save_session(
|
||||
self,
|
||||
history: List[State],
|
||||
file_path: Path,
|
||||
session_name: Optional[str] = None,
|
||||
description: Optional[str] = None,
|
||||
feedback=None
|
||||
) -> bool:
|
||||
"""
|
||||
Save session history to a JSON file with metadata.
|
||||
|
||||
Args:
|
||||
history: List of session states
|
||||
file_path: Path to save the session
|
||||
session_name: Optional name for the session
|
||||
description: Optional description
|
||||
feedback: Optional feedback manager for user notifications
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
try:
|
||||
# Create metadata
|
||||
metadata = SessionMetadata(
|
||||
session_name=session_name,
|
||||
description=description,
|
||||
state_count=len(history)
|
||||
)
|
||||
|
||||
# Create session data
|
||||
session_data = SessionData(history, metadata)
|
||||
|
||||
# Save to file
|
||||
with file_path.open('w', encoding='utf-8') as f:
|
||||
json.dump(session_data.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
|
||||
if feedback:
|
||||
feedback.success(
|
||||
"Session saved successfully",
|
||||
f"Saved {len(history)} states to {file_path.name}"
|
||||
)
|
||||
|
||||
logger.info(f"Session saved to {file_path} with {len(history)} states")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to save session: {e}"
|
||||
if feedback:
|
||||
feedback.error("Failed to save session", str(e))
|
||||
logger.error(error_msg)
|
||||
return False
|
||||
|
||||
def load_session(self, file_path: Path, feedback=None) -> Optional[List[State]]:
|
||||
"""
|
||||
Load session history from a JSON file.
|
||||
|
||||
Args:
|
||||
file_path: Path to the session file
|
||||
feedback: Optional feedback manager for user notifications
|
||||
|
||||
Returns:
|
||||
List of states if successful, None otherwise
|
||||
"""
|
||||
if not file_path.exists():
|
||||
if feedback:
|
||||
feedback.warning(
|
||||
"Session file not found",
|
||||
f"The file {file_path.name} does not exist"
|
||||
)
|
||||
logger.warning(f"Session file not found: {file_path}")
|
||||
return None
|
||||
|
||||
try:
|
||||
with file_path.open('r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
session_data = SessionData.from_dict(data)
|
||||
|
||||
if feedback:
|
||||
feedback.success(
|
||||
"Session loaded successfully",
|
||||
f"Loaded {len(session_data.history)} states from {file_path.name}"
|
||||
)
|
||||
|
||||
logger.info(f"Session loaded from {file_path} with {len(session_data.history)} states")
|
||||
return session_data.history
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
error_msg = f"Session file is corrupted: {e}"
|
||||
if feedback:
|
||||
feedback.error("Session file is corrupted", str(e))
|
||||
logger.error(error_msg)
|
||||
return None
|
||||
except Exception as e:
|
||||
error_msg = f"Failed to load session: {e}"
|
||||
if feedback:
|
||||
feedback.error("Failed to load session", str(e))
|
||||
logger.error(error_msg)
|
||||
return None
|
||||
|
||||
def auto_save_session(self, history: List[State]) -> bool:
|
||||
"""
|
||||
Auto-save session for crash recovery.
|
||||
|
||||
Args:
|
||||
history: Current session history
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
return self.save_session(
|
||||
history,
|
||||
AUTO_SAVE_FILE,
|
||||
session_name="Auto Save",
|
||||
description="Automatically saved session"
|
||||
)
|
||||
|
||||
def create_crash_backup(self, history: List[State]) -> bool:
|
||||
"""
|
||||
Create a crash backup of the current session.
|
||||
|
||||
Args:
|
||||
history: Current session history
|
||||
|
||||
Returns:
|
||||
True if successful, False otherwise
|
||||
"""
|
||||
return self.save_session(
|
||||
history,
|
||||
CRASH_BACKUP_FILE,
|
||||
session_name="Crash Backup",
|
||||
description="Session backup created before potential crash"
|
||||
)
|
||||
|
||||
def has_auto_save(self) -> bool:
|
||||
"""Check if an auto-save file exists."""
|
||||
return AUTO_SAVE_FILE.exists()
|
||||
|
||||
def has_crash_backup(self) -> bool:
|
||||
"""Check if a crash backup file exists."""
|
||||
return CRASH_BACKUP_FILE.exists()
|
||||
|
||||
def load_auto_save(self, feedback=None) -> Optional[List[State]]:
|
||||
"""Load the auto-save session."""
|
||||
return self.load_session(AUTO_SAVE_FILE, feedback)
|
||||
|
||||
def load_crash_backup(self, feedback=None) -> Optional[List[State]]:
|
||||
"""Load the crash backup session."""
|
||||
return self.load_session(CRASH_BACKUP_FILE, feedback)
|
||||
|
||||
def clear_auto_save(self) -> bool:
|
||||
"""Clear the auto-save file."""
|
||||
try:
|
||||
if AUTO_SAVE_FILE.exists():
|
||||
AUTO_SAVE_FILE.unlink()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to clear auto-save: {e}")
|
||||
return False
|
||||
|
||||
def clear_crash_backup(self) -> bool:
|
||||
"""Clear the crash backup file."""
|
||||
try:
|
||||
if CRASH_BACKUP_FILE.exists():
|
||||
CRASH_BACKUP_FILE.unlink()
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to clear crash backup: {e}")
|
||||
return False
|
||||
|
||||
def list_saved_sessions(self) -> List[Dict[str, str]]:
|
||||
"""
|
||||
List all saved session files with their metadata.
|
||||
|
||||
Returns:
|
||||
List of dictionaries containing session information
|
||||
"""
|
||||
sessions = []
|
||||
|
||||
for session_file in SESSIONS_DIR.glob("*.json"):
|
||||
if session_file.name in ["auto_save.json", "crash_backup.json"]:
|
||||
continue
|
||||
|
||||
try:
|
||||
with session_file.open('r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
metadata = data.get("metadata", {})
|
||||
sessions.append({
|
||||
"file": session_file.name,
|
||||
"path": str(session_file),
|
||||
"name": metadata.get("session_name", "Unnamed"),
|
||||
"description": metadata.get("description", "No description"),
|
||||
"created": metadata.get("created_at", "Unknown"),
|
||||
"last_saved": metadata.get("last_saved", "Unknown"),
|
||||
"state_count": metadata.get("state_count", 0)
|
||||
})
|
||||
except Exception as e:
|
||||
logger.warning(f"Failed to read session metadata from {session_file}: {e}")
|
||||
|
||||
# Sort by last saved time (newest first)
|
||||
sessions.sort(key=lambda x: x["last_saved"], reverse=True)
|
||||
return sessions
|
||||
|
||||
def cleanup_old_sessions(self, max_sessions: int = 10) -> int:
|
||||
"""
|
||||
Clean up old session files, keeping only the most recent ones.
|
||||
|
||||
Args:
|
||||
max_sessions: Maximum number of sessions to keep
|
||||
|
||||
Returns:
|
||||
Number of sessions deleted
|
||||
"""
|
||||
sessions = self.list_saved_sessions()
|
||||
|
||||
if len(sessions) <= max_sessions:
|
||||
return 0
|
||||
|
||||
deleted_count = 0
|
||||
sessions_to_delete = sessions[max_sessions:]
|
||||
|
||||
for session in sessions_to_delete:
|
||||
try:
|
||||
Path(session["path"]).unlink()
|
||||
deleted_count += 1
|
||||
logger.info(f"Deleted old session: {session['name']}")
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to delete session {session['name']}: {e}")
|
||||
|
||||
return deleted_count
|
||||
142
test_session_management.py
Normal file
142
test_session_management.py
Normal file
@@ -0,0 +1,142 @@
|
||||
"""
|
||||
Test script to verify the session management system works correctly.
|
||||
This tests session save/resume functionality and crash recovery.
|
||||
"""
|
||||
import json
|
||||
import sys
|
||||
import tempfile
|
||||
from datetime import datetime
|
||||
from pathlib import Path
|
||||
|
||||
# Add the project root to the path so we can import fastanime modules
|
||||
project_root = Path(__file__).parent.parent.parent
|
||||
sys.path.insert(0, str(project_root))
|
||||
|
||||
from fastanime.cli.utils.session_manager import SessionManager, SessionMetadata, SessionData
|
||||
from fastanime.cli.utils.feedback import create_feedback_manager
|
||||
from fastanime.cli.interactive.state import State, MediaApiState
|
||||
|
||||
|
||||
def test_session_management():
|
||||
"""Test the session management system."""
|
||||
print("=== Testing Session Management System ===\n")
|
||||
|
||||
feedback = create_feedback_manager(icons_enabled=True)
|
||||
session_manager = SessionManager()
|
||||
|
||||
# Create test session states
|
||||
test_states = [
|
||||
State(menu_name="MAIN"),
|
||||
State(menu_name="RESULTS", media_api=MediaApiState()),
|
||||
State(menu_name="MEDIA_ACTIONS", media_api=MediaApiState())
|
||||
]
|
||||
|
||||
print("1. Testing session metadata creation:")
|
||||
metadata = SessionMetadata(
|
||||
session_name="Test Session",
|
||||
description="This is a test session for validation",
|
||||
state_count=len(test_states)
|
||||
)
|
||||
print(f" Metadata: {metadata.session_name} - {metadata.description}")
|
||||
print(f" States: {metadata.state_count}, Created: {metadata.created_at}")
|
||||
|
||||
print("\n2. Testing session data serialization:")
|
||||
session_data = SessionData(test_states, metadata)
|
||||
data_dict = session_data.to_dict()
|
||||
print(f" Serialized keys: {list(data_dict.keys())}")
|
||||
print(f" Format version: {data_dict['format_version']}")
|
||||
|
||||
print("\n3. Testing session data deserialization:")
|
||||
restored_session = SessionData.from_dict(data_dict)
|
||||
print(f" Restored states: {len(restored_session.history)}")
|
||||
print(f" Restored metadata: {restored_session.metadata.session_name}")
|
||||
|
||||
print("\n4. Testing session save:")
|
||||
with tempfile.TemporaryDirectory() as temp_dir:
|
||||
test_file = Path(temp_dir) / "test_session.json"
|
||||
success = session_manager.save_session(
|
||||
test_states,
|
||||
test_file,
|
||||
session_name="Test Session Save",
|
||||
description="Testing save functionality",
|
||||
feedback=feedback
|
||||
)
|
||||
print(f" Save success: {success}")
|
||||
print(f" File exists: {test_file.exists()}")
|
||||
|
||||
if test_file.exists():
|
||||
print(f" File size: {test_file.stat().st_size} bytes")
|
||||
|
||||
print("\n5. Testing session load:")
|
||||
loaded_states = session_manager.load_session(test_file, feedback)
|
||||
if loaded_states:
|
||||
print(f" Loaded states: {len(loaded_states)}")
|
||||
print(f" First state menu: {loaded_states[0].menu_name}")
|
||||
print(f" Last state menu: {loaded_states[-1].menu_name}")
|
||||
|
||||
print("\n6. Testing session file content:")
|
||||
with open(test_file, 'r') as f:
|
||||
file_content = json.load(f)
|
||||
print(f" JSON keys: {list(file_content.keys())}")
|
||||
print(f" History length: {len(file_content['history'])}")
|
||||
print(f" Session name: {file_content['metadata']['session_name']}")
|
||||
|
||||
print("\n7. Testing auto-save functionality:")
|
||||
auto_save_success = session_manager.auto_save_session(test_states)
|
||||
print(f" Auto-save success: {auto_save_success}")
|
||||
print(f" Has auto-save: {session_manager.has_auto_save()}")
|
||||
|
||||
print("\n8. Testing crash backup:")
|
||||
crash_backup_success = session_manager.create_crash_backup(test_states)
|
||||
print(f" Crash backup success: {crash_backup_success}")
|
||||
print(f" Has crash backup: {session_manager.has_crash_backup()}")
|
||||
|
||||
print("\n9. Testing session listing:")
|
||||
saved_sessions = session_manager.list_saved_sessions()
|
||||
print(f" Found {len(saved_sessions)} saved sessions")
|
||||
for i, sess in enumerate(saved_sessions[:3]): # Show first 3
|
||||
print(f" Session {i+1}: {sess['name']} ({sess['state_count']} states)")
|
||||
|
||||
print("\n10. Testing cleanup functions:")
|
||||
print(f" Can clear auto-save: {session_manager.clear_auto_save()}")
|
||||
print(f" Can clear crash backup: {session_manager.clear_crash_backup()}")
|
||||
print(f" Auto-save exists after clear: {session_manager.has_auto_save()}")
|
||||
print(f" Crash backup exists after clear: {session_manager.has_crash_backup()}")
|
||||
|
||||
print("\n=== Session Management Tests Completed! ===")
|
||||
|
||||
|
||||
def test_session_error_handling():
|
||||
"""Test error handling in session management."""
|
||||
print("\n=== Testing Error Handling ===\n")
|
||||
|
||||
feedback = create_feedback_manager(icons_enabled=True)
|
||||
session_manager = SessionManager()
|
||||
|
||||
print("1. Testing load of non-existent file:")
|
||||
non_existent = Path("/tmp/non_existent_session.json")
|
||||
result = session_manager.load_session(non_existent, feedback)
|
||||
print(f" Result for non-existent file: {result}")
|
||||
|
||||
print("\n2. Testing load of corrupted file:")
|
||||
with tempfile.NamedTemporaryFile(mode='w', suffix='.json', delete=False) as f:
|
||||
f.write("{ invalid json content }")
|
||||
corrupted_file = Path(f.name)
|
||||
|
||||
try:
|
||||
result = session_manager.load_session(corrupted_file, feedback)
|
||||
print(f" Result for corrupted file: {result}")
|
||||
finally:
|
||||
corrupted_file.unlink() # Clean up
|
||||
|
||||
print("\n3. Testing save to read-only location:")
|
||||
readonly_path = Path("/tmp/readonly_session.json")
|
||||
# This test would need actual readonly permissions to be meaningful
|
||||
print(" Skipped - requires permission setup")
|
||||
|
||||
print("\n=== Error Handling Tests Completed! ===")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
test_session_management()
|
||||
test_session_error_handling()
|
||||
Reference in New Issue
Block a user