feat: working with ai is a mess lol

This commit is contained in:
Benexl
2025-07-16 00:46:02 +03:00
parent 490f8b0e8b
commit 49cdd440df
50 changed files with 964 additions and 2364 deletions

View File

@@ -2,9 +2,9 @@
Utility modules for the FastAnime CLI.
"""
from .watch_history_manager import WatchHistoryManager
from .watch_history_tracker import WatchHistoryTracker, watch_tracker
from .watch_history_types import WatchHistoryEntry, WatchHistoryData
from ..services.watch_history.manager import WatchHistoryManager
from ..services.watch_history.tracker import WatchHistoryTracker, watch_tracker
from ..services.watch_history.types import WatchHistoryEntry, WatchHistoryData
__all__ = [
"WatchHistoryManager",

View File

@@ -1,168 +0,0 @@
"""
Authentication utilities for the interactive CLI.
Provides functions to check authentication status and display user information.
"""
from typing import Optional
from ...libs.api.base import BaseApiClient
from ...libs.api.types import UserProfile
from .feedback import FeedbackManager
def get_auth_status_indicator(
    api_client: BaseApiClient, icons_enabled: bool = True
) -> tuple[str, Optional[UserProfile]]:
    """Build the login-status line shown in interactive menus.

    Returns:
        tuple of (status_text, user_profile or None)
    """
    profile = getattr(api_client, "user_profile", None)
    if not profile:
        # No profile attached to the client: treat as logged out.
        prefix = "🔴 " if icons_enabled else ""
        return f"{prefix}Not logged in", None
    prefix = "🟢 " if icons_enabled else ""
    return f"{prefix}Logged in as {profile.name}", profile
def format_user_info_header(
    user_profile: Optional[UserProfile], icons_enabled: bool = True
) -> str:
    """Render the user line for a menu header.

    Returns:
        Formatted string with user info, or "" when no profile is given.
    """
    prefix = "👤 " if icons_enabled else ""
    return (
        f"{prefix}User: {user_profile.name} (ID: {user_profile.id})"
        if user_profile
        else ""
    )
def check_authentication_required(
    api_client: BaseApiClient,
    feedback: FeedbackManager,
    operation_name: str = "this action",
) -> bool:
    """Return True when the user is logged in; otherwise warn and return False.

    Args:
        api_client: Client whose ``user_profile`` attribute indicates login.
        feedback: Manager used to surface the warning to the user.
        operation_name: Human-readable name of the gated operation.
    """
    if getattr(api_client, "user_profile", None):
        return True
    # Not authenticated: tell the user how to log in before failing the check.
    feedback.warning(
        f"Authentication required for {operation_name}",
        "Please log in to your AniList account using 'fastanime anilist auth' to access this feature",
    )
    return False
def format_auth_menu_header(
    api_client: BaseApiClient, base_header: str, icons_enabled: bool = True
) -> str:
    """Append the current authentication status line to *base_header*.

    Args:
        api_client: The API client to check authentication status
        base_header: Base header text (e.g., "FastAnime Main Menu")
        icons_enabled: Whether to show icons

    Returns:
        Header text with the status line on a second row.
    """
    status_line, profile = get_auth_status_indicator(api_client, icons_enabled)
    suffix = "" if profile else " - Some features require authentication"
    return f"{base_header}\n{status_line}{suffix}"
def prompt_for_authentication(
    feedback: FeedbackManager, operation_name: str = "continue"
) -> bool:
    """Explain that login is needed and ask whether to proceed without it.

    Returns:
        True if the user wants to continue anyway, False otherwise.
    """
    messages = (
        (
            "Authentication Required",
            f"To {operation_name}, you need to log in to your AniList account",
        ),
        (
            "How to authenticate:",
            "Run 'fastanime anilist auth' in your terminal to log in",
        ),
    )
    for title, detail in messages:
        feedback.info(title, detail)
    return feedback.confirm("Continue without authentication?", default=False)
def show_authentication_instructions(feedback: FeedbackManager, icons_enabled: bool = True) -> None:
    """Show detailed instructions for authenticating with AniList.

    Args:
        feedback: Feedback manager used to print the header and each line.
        icons_enabled: Whether to prefix the header with a lock icon.
    """
    icon = "🔐 " if icons_enabled else ""
    feedback.info(
        f"{icon}AniList Authentication Required",
        "To access personalized features, you need to authenticate with your AniList account"
    )
    instructions = [
        "1. Go to the interactive menu: 'Authentication' option",
        "2. Select 'Login to AniList'",
        "3. Follow the OAuth flow in your browser",
        "4. Copy and paste the token when prompted",
        "",
        "Alternatively, use the CLI command:",
        "fastanime anilist auth"
    ]
    # The previous version branched on empty strings, but both branches made
    # exactly the same call (feedback.info("", instruction)); one unconditional
    # call per line is equivalent and simpler.
    for instruction in instructions:
        feedback.info("", instruction)
def get_authentication_prompt_message(operation_name: str, icons_enabled: bool = True) -> str:
    """Return a one-line prompt asking the user to log in before *operation_name*."""
    lock = "🔒 " if icons_enabled else ""
    return f"{lock}Authentication required to {operation_name}. Please log in to continue."
def format_login_success_message(user_name: str, icons_enabled: bool = True) -> str:
    """Build the confirmation line shown after a successful login."""
    prefix = "🎉 " if icons_enabled else ""
    return f"{prefix}Successfully logged in as {user_name}!"
def format_logout_success_message(icons_enabled: bool = True) -> str:
    """Build the confirmation line shown after logging out."""
    prefix = "👋 " if icons_enabled else ""
    return f"{prefix}Successfully logged out!"

View File

@@ -1,208 +0,0 @@
"""
Download queue management system for FastAnime.
Handles queuing, processing, and tracking of download jobs.
"""
import json
import logging
from datetime import datetime
from enum import Enum
from pathlib import Path
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from ...core.constants import APP_DATA_DIR
logger = logging.getLogger(__name__)
class DownloadStatus(str, Enum):
    """Status of a download job.

    Subclasses str so each member compares equal to, and serializes as,
    its plain string value.
    """

    PENDING = "pending"          # queued, not yet started
    DOWNLOADING = "downloading"  # currently in progress
    COMPLETED = "completed"      # finished successfully
    FAILED = "failed"            # terminated with an error
    CANCELLED = "cancelled"      # aborted before completion
class DownloadJob(BaseModel):
    """Represents a single download job in the queue.

    status and the started_at/completed_at timestamps are mutated in place
    by QueueManager.update_job_status as the job moves through its lifecycle.
    """

    id: str = Field(description="Unique identifier for the job")
    anime_title: str = Field(description="Title of the anime")
    episode: str = Field(description="Episode number or identifier")
    media_id: Optional[int] = Field(default=None, description="AniList media ID if available")
    provider_id: Optional[str] = Field(default=None, description="Provider-specific anime ID")
    quality: str = Field(default="1080", description="Preferred quality")
    translation_type: str = Field(default="sub", description="sub or dub")
    priority: int = Field(default=5, description="Priority level (1-10, lower is higher priority)")
    status: DownloadStatus = Field(default=DownloadStatus.PENDING)
    created_at: datetime = Field(default_factory=datetime.now)
    started_at: Optional[datetime] = Field(default=None)
    completed_at: Optional[datetime] = Field(default=None)
    error_message: Optional[str] = Field(default=None)
    retry_count: int = Field(default=0)
    auto_added: bool = Field(default=False, description="Whether this was auto-added by the service")
class DownloadQueue(BaseModel):
    """Container for all download jobs, keyed by job id."""

    jobs: Dict[str, DownloadJob] = Field(default_factory=dict)
    max_concurrent: int = Field(default=3, description="Maximum concurrent downloads")
    auto_retry_count: int = Field(default=3, description="Maximum retry attempts")
class QueueManager:
    """Manages the download queue operations.

    The queue is persisted as JSON at *queue_file_path* and lazily loaded on
    first access.  Mutating operations save the queue back to disk and report
    success via their boolean return value.
    """

    def __init__(self, queue_file_path: Optional[Path] = None):
        # Default to the shared application data directory.
        self.queue_file_path = queue_file_path or APP_DATA_DIR / "download_queue.json"
        self._queue: Optional[DownloadQueue] = None  # lazy-loaded cache

    def _load_queue(self) -> DownloadQueue:
        """Load the queue from file, falling back to an empty queue on error."""
        if self.queue_file_path.exists():
            try:
                with open(self.queue_file_path, 'r', encoding='utf-8') as f:
                    data = json.load(f)
                return DownloadQueue.model_validate(data)
            except (json.JSONDecodeError, ValueError) as e:
                logger.error(f"Failed to load queue from {self.queue_file_path}: {e}")
                return DownloadQueue()
        return DownloadQueue()

    def _save_queue(self, queue: DownloadQueue) -> bool:
        """Save the queue to file. Returns True on success."""
        try:
            with open(self.queue_file_path, 'w', encoding='utf-8') as f:
                # default=str stringifies the datetime fields on each job
                json.dump(queue.model_dump(), f, indent=2, default=str)
            return True
        except Exception as e:
            logger.error(f"Failed to save queue to {self.queue_file_path}: {e}")
            return False

    @property
    def queue(self) -> DownloadQueue:
        """Get the current queue, loading it if necessary."""
        if self._queue is None:
            self._queue = self._load_queue()
        return self._queue

    def add_job(self, job: DownloadJob) -> bool:
        """Add a new download job to the queue and persist it."""
        try:
            self.queue.jobs[job.id] = job
            success = self._save_queue(self.queue)
            if success:
                logger.info(f"Added download job: {job.anime_title} Episode {job.episode}")
            return success
        except Exception as e:
            logger.error(f"Failed to add job to queue: {e}")
            return False

    def remove_job(self, job_id: str) -> bool:
        """Remove a job from the queue. Returns False for unknown ids."""
        try:
            if job_id in self.queue.jobs:
                job = self.queue.jobs.pop(job_id)
                success = self._save_queue(self.queue)
                if success:
                    logger.info(f"Removed download job: {job.anime_title} Episode {job.episode}")
                return success
            return False
        except Exception as e:
            logger.error(f"Failed to remove job from queue: {e}")
            return False

    def update_job_status(self, job_id: str, status: DownloadStatus, error_message: Optional[str] = None) -> bool:
        """Update the status of a job, stamping start/completion times."""
        try:
            if job_id in self.queue.jobs:
                job = self.queue.jobs[job_id]
                job.status = status
                if error_message:
                    job.error_message = error_message
                if status == DownloadStatus.DOWNLOADING:
                    job.started_at = datetime.now()
                elif status in (DownloadStatus.COMPLETED, DownloadStatus.FAILED, DownloadStatus.CANCELLED):
                    job.completed_at = datetime.now()
                return self._save_queue(self.queue)
            return False
        except Exception as e:
            logger.error(f"Failed to update job status: {e}")
            return False

    def get_pending_jobs(self, limit: Optional[int] = None) -> List[DownloadJob]:
        """Get pending jobs sorted by priority and creation time."""
        pending = [
            job for job in self.queue.jobs.values()
            if job.status == DownloadStatus.PENDING
        ]
        # Sort by priority (lower number = higher priority), then by creation time
        pending.sort(key=lambda x: (x.priority, x.created_at))
        if limit:
            return pending[:limit]
        return pending

    def get_active_jobs(self) -> List[DownloadJob]:
        """Get currently downloading jobs."""
        return [
            job for job in self.queue.jobs.values()
            if job.status == DownloadStatus.DOWNLOADING
        ]

    def get_job_by_id(self, job_id: str) -> Optional[DownloadJob]:
        """Get a specific job by ID, or None if absent."""
        return self.queue.jobs.get(job_id)

    def get_all_jobs(self) -> List[DownloadJob]:
        """Get all jobs regardless of status."""
        return list(self.queue.jobs.values())

    def clean_completed_jobs(self, max_age_days: int = 7) -> int:
        """Remove finished jobs older than *max_age_days* days.

        Returns:
            The number of jobs removed.
        """
        from datetime import timedelta  # module-level import brings in datetime only

        # BUGFIX: the previous implementation computed the cutoff with
        # `cutoff_date.replace(day=cutoff_date.day - max_age_days)`, which
        # raises ValueError whenever the current day-of-month is <= max_age_days
        # (e.g. on the 5th with the default of 7) and cannot cross month
        # boundaries.  timedelta arithmetic handles both correctly.
        midnight = datetime.now().replace(hour=0, minute=0, second=0, microsecond=0)
        cutoff_date = midnight - timedelta(days=max_age_days)
        jobs_to_remove = []
        for job_id, job in self.queue.jobs.items():
            if (job.status in (DownloadStatus.COMPLETED, DownloadStatus.FAILED, DownloadStatus.CANCELLED)
                    and job.completed_at and job.completed_at < cutoff_date):
                jobs_to_remove.append(job_id)
        for job_id in jobs_to_remove:
            del self.queue.jobs[job_id]
        if jobs_to_remove:
            self._save_queue(self.queue)
            logger.info(f"Cleaned {len(jobs_to_remove)} old completed jobs")
        return len(jobs_to_remove)

    def get_queue_stats(self) -> Dict[str, int]:
        """Get per-status job counts plus the total."""
        stats = {
            "total": len(self.queue.jobs),
            "pending": 0,
            "downloading": 0,
            "completed": 0,
            "failed": 0,
            "cancelled": 0
        }
        for job in self.queue.jobs.values():
            if job.status == DownloadStatus.PENDING:
                stats["pending"] += 1
            elif job.status == DownloadStatus.DOWNLOADING:
                stats["downloading"] += 1
            elif job.status == DownloadStatus.COMPLETED:
                stats["completed"] += 1
            elif job.status == DownloadStatus.FAILED:
                stats["failed"] += 1
            elif job.status == DownloadStatus.CANCELLED:
                stats["cancelled"] += 1
        return stats

View File

@@ -1,68 +0,0 @@
# Shell helpers sourced into fzf preview / hashing commands at runtime.
# NOTE(review): this raw string is executed by a POSIX shell — its contents
# must stay byte-for-byte intact; only edit with a shell to test against.
# generate_sha256: hashes stdin or $1 with whichever sha256 tool exists,
# falling back to a base64 pseudo-hash when none is available.
# fzf_preview: renders an image file inside the fzf preview pane, preferring
# kitty's icat protocol, then chafa, then imgcat.
bash_functions = r"""
generate_sha256() {
local input
# Check if input is passed as an argument or piped
if [ -n "$1" ]; then
input="$1"
else
input=$(cat)
fi
if command -v sha256sum &>/dev/null; then
echo -n "$input" | sha256sum | awk '{print $1}'
elif command -v shasum &>/dev/null; then
echo -n "$input" | shasum -a 256 | awk '{print $1}'
elif command -v sha256 &>/dev/null; then
echo -n "$input" | sha256 | awk '{print $1}'
elif command -v openssl &>/dev/null; then
echo -n "$input" | openssl dgst -sha256 | awk '{print $2}'
else
echo -n "$input" | base64 | tr '/+' '_-' | tr -d '\n'
fi
}
fzf_preview() {
file=$1
dim=${FZF_PREVIEW_COLUMNS}x${FZF_PREVIEW_LINES}
if [ "$dim" = x ]; then
dim=$(stty size </dev/tty | awk "{print \$2 \"x\" \$1}")
fi
if ! [ "$FASTANIME_IMAGE_RENDERER" = "icat" ] && [ -z "$KITTY_WINDOW_ID" ] && [ "$((FZF_PREVIEW_TOP + FZF_PREVIEW_LINES))" -eq "$(stty size </dev/tty | awk "{print \$1}")" ]; then
dim=${FZF_PREVIEW_COLUMNS}x$((FZF_PREVIEW_LINES - 1))
fi
if [ "$FASTANIME_IMAGE_RENDERER" = "icat" ] && [ -z "$GHOSTTY_BIN_DIR" ]; then
if command -v kitten >/dev/null 2>&1; then
kitten icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
elif command -v icat >/dev/null 2>&1; then
icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
else
kitty icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
fi
elif [ -n "$GHOSTTY_BIN_DIR" ]; then
if command -v kitten >/dev/null 2>&1; then
kitten icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
elif command -v icat >/dev/null 2>&1; then
icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed "\$d" | sed "$(printf "\$s/\$/\033[m/")"
else
chafa -s "$dim" "$file"
fi
elif command -v chafa >/dev/null 2>&1; then
case "$PLATFORM" in
android) chafa -s "$dim" "$file" ;;
windows) chafa -f sixel -s "$dim" "$file" ;;
*) chafa -s "$dim" "$file" ;;
esac
echo
elif command -v imgcat >/dev/null; then
imgcat -W "${dim%%x*}" -H "${dim##*x}" "$file"
else
echo please install a terminal image viewer
echo either icat for kitty terminal and wezterm or imgcat or chafa
fi
}
"""

View File

@@ -1,333 +0,0 @@
"""
Session state management utilities for the interactive CLI.
Provides comprehensive session save/resume functionality with error handling and metadata.
"""
import json
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, List, Optional
from ...core.constants import APP_DATA_DIR
from ..interactive.state import State
logger = logging.getLogger(__name__)
# Session storage directory
SESSIONS_DIR = APP_DATA_DIR / "sessions"
# Rolling auto-save written by SessionManager.auto_save_session for crash recovery.
AUTO_SAVE_FILE = SESSIONS_DIR / "auto_save.json"
# Snapshot written by SessionManager.create_crash_backup before a potential crash.
CRASH_BACKUP_FILE = SESSIONS_DIR / "crash_backup.json"
class SessionMetadata:
    """Descriptive metadata stored alongside a saved session."""

    def __init__(
        self,
        created_at: Optional[datetime] = None,
        last_saved: Optional[datetime] = None,
        session_name: Optional[str] = None,
        description: Optional[str] = None,
        state_count: int = 0
    ):
        # Missing timestamps default to the moment of construction.
        self.created_at = created_at or datetime.now()
        self.last_saved = last_saved or datetime.now()
        self.session_name = session_name
        self.description = description
        self.state_count = state_count

    def to_dict(self) -> dict:
        """Serialize the metadata into a JSON-compatible dictionary."""
        return dict(
            created_at=self.created_at.isoformat(),
            last_saved=self.last_saved.isoformat(),
            session_name=self.session_name,
            description=self.description,
            state_count=self.state_count,
        )

    @classmethod
    def from_dict(cls, data: dict) -> "SessionMetadata":
        """Rebuild metadata from a dictionary, tolerating missing keys."""

        def _timestamp(key: str) -> datetime:
            # Absent timestamps fall back to "now", mirroring __init__.
            return datetime.fromisoformat(data.get(key, datetime.now().isoformat()))

        return cls(
            created_at=_timestamp("created_at"),
            last_saved=_timestamp("last_saved"),
            session_name=data.get("session_name"),
            description=data.get("description"),
            state_count=data.get("state_count", 0),
        )
class SessionData:
    """A saved session: the state history together with its metadata."""

    def __init__(self, history: List[State], metadata: SessionMetadata):
        self.history = history
        self.metadata = metadata

    def to_dict(self) -> dict:
        """Serialize the session into a JSON-compatible dictionary."""
        serialized_states = [state.model_dump(mode="json") for state in self.history]
        return {
            "metadata": self.metadata.to_dict(),
            "history": serialized_states,
            "format_version": "1.0"  # For future compatibility
        }

    @classmethod
    def from_dict(cls, data: dict) -> "SessionData":
        """Rebuild a session from a dictionary, dropping states that fail validation."""
        meta = SessionMetadata.from_dict(data.get("metadata", {}))
        states: List[State] = []
        for raw_state in data.get("history", []):
            try:
                states.append(State.model_validate(raw_state))
            except Exception as e:
                # A single bad state should not discard the whole session.
                logger.warning(f"Skipping invalid state in session: {e}")
        return cls(states, meta)
class SessionManager:
    """Manages session save/resume functionality with comprehensive error handling."""

    def __init__(self):
        # Guarantee the on-disk location exists before any save/load call.
        self._ensure_sessions_directory()

    def _ensure_sessions_directory(self):
        """Ensure the sessions directory exists."""
        SESSIONS_DIR.mkdir(parents=True, exist_ok=True)

    def save_session(
        self,
        history: List[State],
        file_path: Path,
        session_name: Optional[str] = None,
        description: Optional[str] = None,
        feedback=None
    ) -> bool:
        """
        Save session history to a JSON file with metadata.

        Args:
            history: List of session states
            file_path: Path to save the session
            session_name: Optional name for the session
            description: Optional description
            feedback: Optional feedback manager for user notifications

        Returns:
            True if successful, False otherwise
        """
        try:
            # Create metadata
            metadata = SessionMetadata(
                session_name=session_name,
                description=description,
                state_count=len(history)
            )
            # Create session data
            session_data = SessionData(history, metadata)
            # Save to file
            with file_path.open('w', encoding='utf-8') as f:
                json.dump(session_data.to_dict(), f, indent=2, ensure_ascii=False)
            if feedback:
                feedback.success(
                    "Session saved successfully",
                    f"Saved {len(history)} states to {file_path.name}"
                )
            logger.info(f"Session saved to {file_path} with {len(history)} states")
            return True
        except Exception as e:
            error_msg = f"Failed to save session: {e}"
            if feedback:
                feedback.error("Failed to save session", str(e))
            logger.error(error_msg)
            return False

    def load_session(self, file_path: Path, feedback=None) -> Optional[List[State]]:
        """
        Load session history from a JSON file.

        Args:
            file_path: Path to the session file
            feedback: Optional feedback manager for user notifications

        Returns:
            List of states if successful, None otherwise
        """
        if not file_path.exists():
            if feedback:
                feedback.warning(
                    "Session file not found",
                    f"The file {file_path.name} does not exist"
                )
            logger.warning(f"Session file not found: {file_path}")
            return None
        try:
            with file_path.open('r', encoding='utf-8') as f:
                data = json.load(f)
            # Invalid individual states are skipped inside from_dict.
            session_data = SessionData.from_dict(data)
            if feedback:
                feedback.success(
                    "Session loaded successfully",
                    f"Loaded {len(session_data.history)} states from {file_path.name}"
                )
            logger.info(f"Session loaded from {file_path} with {len(session_data.history)} states")
            return session_data.history
        except json.JSONDecodeError as e:
            # Malformed JSON gets a dedicated, clearer error message.
            error_msg = f"Session file is corrupted: {e}"
            if feedback:
                feedback.error("Session file is corrupted", str(e))
            logger.error(error_msg)
            return None
        except Exception as e:
            error_msg = f"Failed to load session: {e}"
            if feedback:
                feedback.error("Failed to load session", str(e))
            logger.error(error_msg)
            return None

    def auto_save_session(self, history: List[State]) -> bool:
        """
        Auto-save session for crash recovery.

        Args:
            history: Current session history

        Returns:
            True if successful, False otherwise
        """
        return self.save_session(
            history,
            AUTO_SAVE_FILE,
            session_name="Auto Save",
            description="Automatically saved session"
        )

    def create_crash_backup(self, history: List[State]) -> bool:
        """
        Create a crash backup of the current session.

        Args:
            history: Current session history

        Returns:
            True if successful, False otherwise
        """
        return self.save_session(
            history,
            CRASH_BACKUP_FILE,
            session_name="Crash Backup",
            description="Session backup created before potential crash"
        )

    def has_auto_save(self) -> bool:
        """Check if an auto-save file exists."""
        return AUTO_SAVE_FILE.exists()

    def has_crash_backup(self) -> bool:
        """Check if a crash backup file exists."""
        return CRASH_BACKUP_FILE.exists()

    def load_auto_save(self, feedback=None) -> Optional[List[State]]:
        """Load the auto-save session."""
        return self.load_session(AUTO_SAVE_FILE, feedback)

    def load_crash_backup(self, feedback=None) -> Optional[List[State]]:
        """Load the crash backup session."""
        return self.load_session(CRASH_BACKUP_FILE, feedback)

    def clear_auto_save(self) -> bool:
        """Clear the auto-save file. Returns True when the file is gone."""
        try:
            if AUTO_SAVE_FILE.exists():
                AUTO_SAVE_FILE.unlink()
            return True
        except Exception as e:
            logger.error(f"Failed to clear auto-save: {e}")
            return False

    def clear_crash_backup(self) -> bool:
        """Clear the crash backup file. Returns True when the file is gone."""
        try:
            if CRASH_BACKUP_FILE.exists():
                CRASH_BACKUP_FILE.unlink()
            return True
        except Exception as e:
            logger.error(f"Failed to clear crash backup: {e}")
            return False

    def list_saved_sessions(self) -> List[Dict[str, str]]:
        """
        List all saved session files with their metadata.

        Returns:
            List of dictionaries containing session information
        """
        sessions = []
        for session_file in SESSIONS_DIR.glob("*.json"):
            # Skip the two special files managed automatically.
            if session_file.name in ["auto_save.json", "crash_backup.json"]:
                continue
            try:
                with session_file.open('r', encoding='utf-8') as f:
                    data = json.load(f)
                metadata = data.get("metadata", {})
                sessions.append({
                    "file": session_file.name,
                    "path": str(session_file),
                    "name": metadata.get("session_name", "Unnamed"),
                    "description": metadata.get("description", "No description"),
                    "created": metadata.get("created_at", "Unknown"),
                    "last_saved": metadata.get("last_saved", "Unknown"),
                    "state_count": metadata.get("state_count", 0)
                })
            except Exception as e:
                logger.warning(f"Failed to read session metadata from {session_file}: {e}")
        # Sort by last saved time (newest first)
        # NOTE(review): ISO strings sort chronologically, but "Unknown"
        # placeholders are compared as plain text — verify desired ordering.
        sessions.sort(key=lambda x: x["last_saved"], reverse=True)
        return sessions

    def cleanup_old_sessions(self, max_sessions: int = 10) -> int:
        """
        Clean up old session files, keeping only the most recent ones.

        Args:
            max_sessions: Maximum number of sessions to keep

        Returns:
            Number of sessions deleted
        """
        sessions = self.list_saved_sessions()
        if len(sessions) <= max_sessions:
            return 0
        deleted_count = 0
        # list_saved_sessions returns newest first, so the tail is oldest.
        sessions_to_delete = sessions[max_sessions:]
        for session in sessions_to_delete:
            try:
                Path(session["path"]).unlink()
                deleted_count += 1
                logger.info(f"Deleted old session: {session['name']}")
            except Exception as e:
                logger.error(f"Failed to delete session {session['name']}: {e}")
        return deleted_count

View File

@@ -1,329 +0,0 @@
"""
Watch history manager for local storage operations.
Handles saving, loading, and managing local watch history data.
"""
import json
import logging
from pathlib import Path
from typing import List, Optional
from ...core.constants import USER_WATCH_HISTORY_PATH
from ...libs.api.types import MediaItem
from .watch_history_types import WatchHistoryData, WatchHistoryEntry
logger = logging.getLogger(__name__)
class WatchHistoryManager:
    """
    Manages local watch history storage and operations.
    Provides comprehensive watch history management with error handling.

    Data is cached in memory after the first load and persisted as JSON at
    *history_file_path*; every mutating call saves back to disk.
    """

    def __init__(self, history_file_path: Path = USER_WATCH_HISTORY_PATH):
        self.history_file_path = history_file_path
        self._data: Optional[WatchHistoryData] = None  # lazy in-memory cache
        self._ensure_history_file()

    def _ensure_history_file(self):
        """Ensure the watch history file and directory exist."""
        try:
            self.history_file_path.parent.mkdir(parents=True, exist_ok=True)
            if not self.history_file_path.exists():
                # Create empty watch history file
                empty_data = WatchHistoryData()
                self._save_data(empty_data)
                logger.info(f"Created new watch history file at {self.history_file_path}")
        except Exception as e:
            logger.error(f"Failed to ensure watch history file: {e}")

    def _load_data(self) -> WatchHistoryData:
        """Load watch history data from file, using the in-memory cache when set."""
        if self._data is not None:
            return self._data
        try:
            if not self.history_file_path.exists():
                self._data = WatchHistoryData()
                return self._data
            with self.history_file_path.open('r', encoding='utf-8') as f:
                data = json.load(f)
            self._data = WatchHistoryData.from_dict(data)
            logger.debug(f"Loaded watch history with {len(self._data.entries)} entries")
            return self._data
        except json.JSONDecodeError as e:
            logger.error(f"Watch history file is corrupted: {e}")
            # Create backup of corrupted file so the user's data is not lost.
            backup_path = self.history_file_path.with_suffix('.backup')
            self.history_file_path.rename(backup_path)
            logger.info(f"Corrupted file moved to {backup_path}")
            # Create new empty data
            self._data = WatchHistoryData()
            self._save_data(self._data)
            return self._data
        except Exception as e:
            logger.error(f"Failed to load watch history: {e}")
            self._data = WatchHistoryData()
            return self._data

    def _save_data(self, data: WatchHistoryData) -> bool:
        """Save watch history data to file, using a .bak file for crash safety."""
        try:
            # Create backup of existing file
            if self.history_file_path.exists():
                backup_path = self.history_file_path.with_suffix('.bak')
                self.history_file_path.rename(backup_path)
            with self.history_file_path.open('w', encoding='utf-8') as f:
                json.dump(data.to_dict(), f, indent=2, ensure_ascii=False)
            # Remove backup on successful save
            backup_path = self.history_file_path.with_suffix('.bak')
            if backup_path.exists():
                backup_path.unlink()
            logger.debug(f"Saved watch history with {len(data.entries)} entries")
            return True
        except Exception as e:
            logger.error(f"Failed to save watch history: {e}")
            # Restore backup if save failed
            backup_path = self.history_file_path.with_suffix('.bak')
            if backup_path.exists():
                backup_path.rename(self.history_file_path)
            return False

    def add_or_update_entry(
        self,
        media_item: MediaItem,
        episode: int = 0,
        progress: float = 0.0,
        status: str = "watching",
        notes: str = ""
    ) -> bool:
        """Add or update a watch history entry and persist the change."""
        try:
            data = self._load_data()
            entry = data.add_or_update_entry(media_item, episode, progress, status)
            if notes:
                entry.notes = notes
            success = self._save_data(data)
            if success:
                self._data = data  # Update cached data
                logger.info(f"Updated watch history for {entry.get_display_title()}")
            return success
        except Exception as e:
            logger.error(f"Failed to add/update watch history entry: {e}")
            return False

    def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]:
        """Get a specific watch history entry, or None on error/absence."""
        try:
            data = self._load_data()
            return data.get_entry(media_id)
        except Exception as e:
            logger.error(f"Failed to get watch history entry: {e}")
            return None

    def remove_entry(self, media_id: int) -> bool:
        """Remove an entry from watch history."""
        try:
            data = self._load_data()
            removed = data.remove_entry(media_id)
            if removed:
                success = self._save_data(data)
                if success:
                    self._data = data
                    logger.info(f"Removed watch history entry for media ID {media_id}")
                return success
            return False
        except Exception as e:
            logger.error(f"Failed to remove watch history entry: {e}")
            return False

    def get_all_entries(self) -> List[WatchHistoryEntry]:
        """Get all watch history entries (empty list on error)."""
        try:
            data = self._load_data()
            return list(data.entries.values())
        except Exception as e:
            logger.error(f"Failed to get all entries: {e}")
            return []

    def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]:
        """Get entries by status (watching, completed, etc.)."""
        try:
            data = self._load_data()
            return data.get_entries_by_status(status)
        except Exception as e:
            logger.error(f"Failed to get entries by status: {e}")
            return []

    def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]:
        """Get recently watched entries."""
        try:
            data = self._load_data()
            return data.get_recently_watched(limit)
        except Exception as e:
            logger.error(f"Failed to get recently watched: {e}")
            return []

    def search_entries(self, query: str) -> List[WatchHistoryEntry]:
        """Search entries by title."""
        try:
            data = self._load_data()
            return data.search_entries(query)
        except Exception as e:
            logger.error(f"Failed to search entries: {e}")
            return []

    def get_watching_entries(self) -> List[WatchHistoryEntry]:
        """Get entries that are currently being watched."""
        return self.get_entries_by_status("watching")

    def get_completed_entries(self) -> List[WatchHistoryEntry]:
        """Get completed entries."""
        return self.get_entries_by_status("completed")

    def mark_episode_watched(self, media_id: int, episode: int, progress: float = 1.0) -> bool:
        """Mark a specific episode as watched; no-op (False) for unknown media."""
        entry = self.get_entry(media_id)
        if entry:
            return self.add_or_update_entry(
                entry.media_item,
                episode,
                progress,
                entry.status
            )
        return False

    def mark_completed(self, media_id: int) -> bool:
        """Mark an anime as completed."""
        entry = self.get_entry(media_id)
        if entry:
            entry.mark_completed()
            # NOTE(review): relies on get_entry and _load_data sharing the same
            # cached WatchHistoryData object so the mutation above is persisted
            # by the save below — confirm against WatchHistoryData semantics.
            data = self._load_data()
            return self._save_data(data)
        return False

    def change_status(self, media_id: int, new_status: str) -> bool:
        """Change the status of an entry."""
        entry = self.get_entry(media_id)
        if entry:
            return self.add_or_update_entry(
                entry.media_item,
                entry.last_watched_episode,
                entry.watch_progress,
                new_status
            )
        return False

    def update_notes(self, media_id: int, notes: str) -> bool:
        """Update notes for an entry."""
        entry = self.get_entry(media_id)
        if entry:
            return self.add_or_update_entry(
                entry.media_item,
                entry.last_watched_episode,
                entry.watch_progress,
                entry.status,
                notes
            )
        return False

    def get_stats(self) -> dict:
        """Get watch history statistics (zeroed placeholder dict on error)."""
        try:
            data = self._load_data()
            return data.get_stats()
        except Exception as e:
            logger.error(f"Failed to get stats: {e}")
            return {
                "total_entries": 0,
                "watching": 0,
                "completed": 0,
                "dropped": 0,
                "paused": 0,
                "total_episodes_watched": 0,
                "last_updated": "Unknown"
            }

    def export_history(self, export_path: Path) -> bool:
        """Export watch history to a file."""
        try:
            data = self._load_data()
            with export_path.open('w', encoding='utf-8') as f:
                json.dump(data.to_dict(), f, indent=2, ensure_ascii=False)
            logger.info(f"Exported watch history to {export_path}")
            return True
        except Exception as e:
            logger.error(f"Failed to export watch history: {e}")
            return False

    def import_history(self, import_path: Path, merge: bool = True) -> bool:
        """Import watch history from a file.

        Args:
            import_path: Source JSON file.
            merge: When True, imported entries overwrite same-id entries but
                keep the rest; when False, the entire history is replaced.
        """
        try:
            if not import_path.exists():
                logger.error(f"Import file does not exist: {import_path}")
                return False
            with import_path.open('r', encoding='utf-8') as f:
                import_data = json.load(f)
            imported_history = WatchHistoryData.from_dict(import_data)
            if merge:
                # Merge with existing data
                current_data = self._load_data()
                for media_id, entry in imported_history.entries.items():
                    current_data.entries[media_id] = entry
                success = self._save_data(current_data)
            else:
                # Replace existing data
                success = self._save_data(imported_history)
            if success:
                self._data = None  # Force reload on next access
                logger.info(f"Imported watch history from {import_path}")
            return success
        except Exception as e:
            logger.error(f"Failed to import watch history: {e}")
            return False

    def clear_history(self) -> bool:
        """Clear all watch history."""
        try:
            empty_data = WatchHistoryData()
            success = self._save_data(empty_data)
            if success:
                self._data = empty_data
                logger.info("Cleared all watch history")
            return success
        except Exception as e:
            logger.error(f"Failed to clear watch history: {e}")
            return False

    def backup_history(self, backup_path: Optional[Path] = None) -> bool:
        """Create a backup of watch history.

        When *backup_path* is None a timestamped file is created next to the
        history file.
        """
        try:
            if backup_path is None:
                from datetime import datetime
                timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
                backup_path = self.history_file_path.parent / f"watch_history_backup_{timestamp}.json"
            return self.export_history(backup_path)
        except Exception as e:
            logger.error(f"Failed to backup watch history: {e}")
            return False

View File

@@ -1,273 +0,0 @@
"""
Watch history tracking utilities for integration with episode viewing and player controls.
Provides automatic watch history updates during episode viewing.
"""
import logging
from typing import Optional
from ...libs.api.types import MediaItem
from ..utils.watch_history_manager import WatchHistoryManager
logger = logging.getLogger(__name__)
class WatchHistoryTracker:
    """
    Tracks watch history automatically during episode viewing.
    Integrates with the episode selection and player control systems.

    All methods are best-effort: failures are logged and reported via the
    boolean return value rather than raised, so tracking never interrupts
    playback. Logging uses lazy %-style arguments so messages are only
    formatted when the log level is enabled.
    """

    def __init__(self):
        # Local persistence backend used by every tracking operation.
        self.history_manager = WatchHistoryManager()

    def track_episode_start(self, media_item: MediaItem, episode: int) -> bool:
        """
        Track when an episode starts being watched.

        Args:
            media_item: The anime being watched
            episode: Episode number being started

        Returns:
            True if tracking was successful
        """
        try:
            # Create the entry if new, otherwise bump its progress/status.
            success = self.history_manager.add_or_update_entry(
                media_item=media_item,
                episode=episode,
                progress=0.0,
                status="watching",
            )
            if success:
                logger.info(
                    "Started tracking episode %s of %s",
                    episode,
                    media_item.title.english or media_item.title.romaji,
                )
            return success
        except Exception as e:
            logger.error("Failed to track episode start: %s", e)
            return False

    def track_episode_progress(self, media_id: int, episode: int, progress: float) -> bool:
        """
        Track progress within an episode.

        Args:
            media_id: ID of the anime
            episode: Episode number
            progress: Progress within the episode (0.0-1.0)

        Returns:
            True if tracking was successful
        """
        try:
            success = self.history_manager.mark_episode_watched(media_id, episode, progress)
            # 80% completion is the heuristic threshold for "watched".
            if success and progress >= 0.8:
                logger.info(
                    "Episode %s marked as watched (progress: %.1f%%)",
                    episode,
                    progress * 100,
                )
            return success
        except Exception as e:
            logger.error("Failed to track episode progress: %s", e)
            return False

    def track_episode_completion(self, media_id: int, episode: int) -> bool:
        """
        Track when an episode is completed.

        Args:
            media_id: ID of the anime
            episode: Episode number completed

        Returns:
            True if tracking was successful
        """
        try:
            # Record the episode as fully watched (progress 1.0).
            success = self.history_manager.mark_episode_watched(media_id, episode, 1.0)
            if success:
                # If this was the final episode, promote the whole anime
                # to "completed".
                entry = self.history_manager.get_entry(media_id)
                if entry and entry.media_item.episodes and episode >= entry.media_item.episodes:
                    self.history_manager.mark_completed(media_id)
                    logger.info("Anime completed: %s", entry.get_display_title())
                else:
                    logger.info("Episode %s completed", episode)
            return success
        except Exception as e:
            logger.error("Failed to track episode completion: %s", e)
            return False

    def get_watch_progress(self, media_id: int) -> Optional[dict]:
        """
        Get current watch progress for an anime.

        Args:
            media_id: ID of the anime

        Returns:
            Dictionary with progress info or None if not found
        """
        try:
            entry = self.history_manager.get_entry(media_id)
            if entry:
                return {
                    "last_episode": entry.last_watched_episode,
                    "progress": entry.watch_progress,
                    "status": entry.status,
                    "next_episode": entry.last_watched_episode + 1,
                    "title": entry.get_display_title(),
                }
            return None
        except Exception as e:
            logger.error("Failed to get watch progress: %s", e)
            return None

    def should_continue_from_history(self, media_id: int, available_episodes: list) -> Optional[str]:
        """
        Determine if we should continue from watch history and which episode.

        Args:
            media_id: ID of the anime
            available_episodes: List of available episode numbers
                (episode numbers are compared as strings)

        Returns:
            Episode number to continue from, or None if no history
        """
        try:
            progress = self.get_watch_progress(media_id)
            if not progress:
                return None
            last_episode = progress["last_episode"]
            next_episode = last_episode + 1
            # Prefer the next unwatched episode when it is available.
            if str(next_episode) in available_episodes:
                logger.info("Continuing from episode %s based on watch history", next_episode)
                return str(next_episode)
            # Fall back to re-watching the last episode if the next one
            # is not available yet.
            elif str(last_episode) in available_episodes and last_episode > 0:
                logger.info("Next episode not available, falling back to episode %s", last_episode)
                return str(last_episode)
            return None
        except Exception as e:
            logger.error("Failed to determine continue episode: %s", e)
            return None

    def update_anime_status(self, media_id: int, status: str) -> bool:
        """
        Update the status of an anime in watch history.

        Args:
            media_id: ID of the anime
            status: New status (watching, completed, dropped, paused)

        Returns:
            True if update was successful
        """
        try:
            success = self.history_manager.change_status(media_id, status)
            if success:
                logger.info("Updated anime status to %s", status)
            return success
        except Exception as e:
            logger.error("Failed to update anime status: %s", e)
            return False

    def add_anime_to_history(self, media_item: MediaItem, status: str = "planning") -> bool:
        """
        Add an anime to watch history without watching any episodes.

        Args:
            media_item: The anime to add
            status: Initial status

        Returns:
            True if successful
        """
        try:
            # Episode 0 / progress 0.0 records the anime without marking
            # anything as watched.
            success = self.history_manager.add_or_update_entry(
                media_item=media_item,
                episode=0,
                progress=0.0,
                status=status,
            )
            if success:
                logger.info(
                    "Added %s to watch history",
                    media_item.title.english or media_item.title.romaji,
                )
            return success
        except Exception as e:
            logger.error("Failed to add anime to history: %s", e)
            return False
# Module-level singleton tracker shared across the application; the
# convenience functions below delegate to this instance.
watch_tracker = WatchHistoryTracker()
def track_episode_viewing(media_item: MediaItem, episode: int, start_tracking: bool = True) -> bool:
    """Convenience wrapper around the global ``watch_tracker``.

    Args:
        media_item: The anime being watched.
        episode: Episode number.
        start_tracking: When True, record the start of viewing; when
            False, record the episode as completed.
            NOTE(review): the original docstring said "just update
            progress", but the False branch records completion — confirm
            intended semantics with callers.

    Returns:
        True if tracking was successful.
    """
    if not start_tracking:
        return watch_tracker.track_episode_completion(media_item.id, episode)
    return watch_tracker.track_episode_start(media_item, episode)
def get_continue_episode(media_item: MediaItem, available_episodes: list, prefer_history: bool = True) -> Optional[str]:
    """Pick the episode to continue from based on local watch history.

    Args:
        media_item: The anime.
        available_episodes: List of available episode numbers.
        prefer_history: Whether to consult local history at all; when
            False, no suggestion is made.

    Returns:
        Episode number (as a string) to continue from, or None.
    """
    if not prefer_history:
        return None
    return watch_tracker.should_continue_from_history(media_item.id, available_episodes)
def update_episode_progress(media_id: int, episode: int, completion_percentage: float) -> bool:
    """Record progress for an episode from a completion percentage.

    Args:
        media_id: ID of the anime.
        episode: Episode number.
        completion_percentage: Completion percentage in the 0-100 range.

    Returns:
        True if the update was successful.
    """
    # At 80% or more the episode counts as fully watched.
    if completion_percentage >= 80:
        return watch_tracker.track_episode_completion(media_id, episode)
    # Otherwise store fractional progress (0.0-1.0).
    return watch_tracker.track_episode_progress(media_id, episode, completion_percentage / 100.0)

View File

@@ -1,183 +0,0 @@
"""
Watch history data models and types for the interactive CLI.
Provides comprehensive data structures for tracking and managing local watch history.
"""
from __future__ import annotations
import logging
from datetime import datetime
from typing import Dict, List, Optional
from pydantic import BaseModel, Field
from ...libs.api.types import MediaItem
logger = logging.getLogger(__name__)
class WatchHistoryEntry(BaseModel):
"""
Represents a single entry in the watch history.
Contains media information and viewing progress.
"""
media_item: MediaItem
last_watched_episode: int = 0
watch_progress: float = 0.0 # Progress within the episode (0.0-1.0)
times_watched: int = 1
first_watched: datetime = Field(default_factory=datetime.now)
last_watched: datetime = Field(default_factory=datetime.now)
status: str = "watching" # watching, completed, dropped, paused
notes: str = ""
# With Pydantic, serialization is automatic!
# No need for manual to_dict() and from_dict() methods
# Use: entry.model_dump() and WatchHistoryEntry.model_validate(data)
def update_progress(self, episode: int, progress: float = 0.0, status: Optional[str] = None):
"""Update watch progress for this entry."""
self.last_watched_episode = max(self.last_watched_episode, episode)
self.watch_progress = progress
self.last_watched = datetime.now()
if status:
self.status = status
def mark_completed(self):
"""Mark this entry as completed."""
self.status = "completed"
self.last_watched = datetime.now()
if self.media_item.episodes:
self.last_watched_episode = self.media_item.episodes
self.watch_progress = 1.0
def get_display_title(self) -> str:
"""Get the best available title for display."""
if self.media_item.title.english:
return self.media_item.title.english
elif self.media_item.title.romaji:
return self.media_item.title.romaji
elif self.media_item.title.native:
return self.media_item.title.native
else:
return f"Anime #{self.media_item.id}"
def get_progress_display(self) -> str:
"""Get a human-readable progress display."""
if self.media_item.episodes:
return f"{self.last_watched_episode}/{self.media_item.episodes}"
else:
return f"Ep {self.last_watched_episode}"
def get_status_emoji(self) -> str:
"""Get emoji representation of status."""
status_emojis = {
"watching": "📺",
"completed": "",
"dropped": "🚮",
"paused": "⏸️",
"planning": "📑"
}
return status_emojis.get(self.status, "")
class WatchHistoryData(BaseModel):
"""Complete watch history data container."""
entries: Dict[int, WatchHistoryEntry] = Field(default_factory=dict)
last_updated: datetime = Field(default_factory=datetime.now)
format_version: str = "1.0"
# With Pydantic, serialization is automatic!
# No need for manual to_dict() and from_dict() methods
# Use: data.model_dump() and WatchHistoryData.model_validate(data)
def add_or_update_entry(self, media_item: MediaItem, episode: int = 0, progress: float = 0.0, status: str = "watching") -> WatchHistoryEntry:
"""Add or update a watch history entry."""
media_id = media_item.id
if media_id in self.entries:
# Update existing entry
entry = self.entries[media_id]
entry.update_progress(episode, progress, status)
entry.times_watched += 1
else:
# Create new entry
entry = WatchHistoryEntry(
media_item=media_item,
last_watched_episode=episode,
watch_progress=progress,
status=status,
)
self.entries[media_id] = entry
self.last_updated = datetime.now()
return entry
def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]:
"""Get a specific watch history entry."""
return self.entries.get(media_id)
def remove_entry(self, media_id: int) -> bool:
"""Remove an entry from watch history."""
if media_id in self.entries:
del self.entries[media_id]
self.last_updated = datetime.now()
return True
return False
def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]:
"""Get all entries with a specific status."""
return [entry for entry in self.entries.values() if entry.status == status]
def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]:
"""Get recently watched entries."""
sorted_entries = sorted(
self.entries.values(),
key=lambda x: x.last_watched,
reverse=True
)
return sorted_entries[:limit]
def get_watching_entries(self) -> List[WatchHistoryEntry]:
"""Get entries that are currently being watched."""
return self.get_entries_by_status("watching")
def get_completed_entries(self) -> List[WatchHistoryEntry]:
"""Get completed entries."""
return self.get_entries_by_status("completed")
def search_entries(self, query: str) -> List[WatchHistoryEntry]:
"""Search entries by title."""
query_lower = query.lower()
results = []
for entry in self.entries.values():
title = entry.get_display_title().lower()
if query_lower in title:
results.append(entry)
return results
def get_stats(self) -> dict:
"""Get watch history statistics."""
total_entries = len(self.entries)
watching = len(self.get_entries_by_status("watching"))
completed = len(self.get_entries_by_status("completed"))
dropped = len(self.get_entries_by_status("dropped"))
paused = len(self.get_entries_by_status("paused"))
total_episodes = sum(
entry.last_watched_episode
for entry in self.entries.values()
)
return {
"total_entries": total_entries,
"watching": watching,
"completed": completed,
"dropped": dropped,
"paused": paused,
"total_episodes_watched": total_episodes,
"last_updated": self.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
}