mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-05 20:40:09 -08:00
feat: watch history service
This commit is contained in:
@@ -5,7 +5,7 @@ from typing import Dict, Literal, Optional
|
||||
|
||||
from pydantic import BaseModel, Field, computed_field
|
||||
|
||||
from ....libs.api.types import MediaItem
|
||||
from ....libs.api.types import MediaItem, UserListStatusType
|
||||
from ...utils import converters
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -14,7 +14,6 @@ logger = logging.getLogger(__name__)
|
||||
DownloadStatus = Literal[
|
||||
"not_downloaded", "queued", "downloading", "completed", "failed", "paused"
|
||||
]
|
||||
MediaUserStatus = Literal["planning", "watching", "completed", "dropped", "paused"]
|
||||
REGISTRY_VERSION = "1.0"
|
||||
|
||||
|
||||
@@ -36,7 +35,7 @@ class MediaRegistryIndexEntry(BaseModel):
|
||||
media_id: int
|
||||
media_api: Literal["anilist", "NONE", "jikan"] = "NONE"
|
||||
|
||||
status: MediaUserStatus = "watching"
|
||||
status: UserListStatusType = "watching"
|
||||
progress: str = "0"
|
||||
last_watch_position: Optional[str] = None
|
||||
last_watched: datetime = Field(default_factory=datetime.now)
|
||||
|
||||
@@ -5,12 +5,13 @@ from pathlib import Path
|
||||
from typing import Dict, Generator, List, Optional
|
||||
|
||||
from ....core.config.model import MediaRegistryConfig
|
||||
from ....core.exceptions import FastAnimeError
|
||||
from ....core.utils.file import AtomicWriter, FileLock, check_file_modified
|
||||
from ....libs.api.params import ApiSearchParams
|
||||
from ....libs.api.types import MediaItem
|
||||
from ....libs.players.types import PlayerResult
|
||||
from ....libs.api.types import MediaItem, UserListStatusType
|
||||
from .filters import MediaFilter
|
||||
from .models import (
|
||||
REGISTRY_VERSION,
|
||||
MediaRecord,
|
||||
MediaRegistryIndex,
|
||||
MediaRegistryIndexEntry,
|
||||
@@ -53,6 +54,12 @@ class MediaRegistryService:
|
||||
self._index = MediaRegistryIndex()
|
||||
self._save_index(self._index)
|
||||
|
||||
# check if there was a major change in the registry
|
||||
if self._index.version[0] != REGISTRY_VERSION[0]:
|
||||
raise FastAnimeError(
|
||||
f"Incompatible registry version of {self._index.version}. Current registry supports version {REGISTRY_VERSION}. Please migrate your registry using the migrator"
|
||||
)
|
||||
|
||||
logger.debug(f"Loaded registry index with {self._index.media_count} entries")
|
||||
return self._index
|
||||
|
||||
@@ -131,21 +138,52 @@ class MediaRegistryService:
|
||||
|
||||
return record
|
||||
|
||||
def update_from_player_result(
|
||||
self, media_item: MediaItem, episode_number: str, player_result: PlayerResult
|
||||
def update_media_index_entry(
|
||||
self,
|
||||
media_id: int,
|
||||
watched: bool = False,
|
||||
media_item: Optional[MediaItem] = None,
|
||||
progress: Optional[str] = None,
|
||||
status: Optional[UserListStatusType] = None,
|
||||
last_watch_position: Optional[str] = None,
|
||||
total_duration: Optional[str] = None,
|
||||
score: Optional[float] = None,
|
||||
repeat: Optional[int] = None,
|
||||
notes: Optional[str] = None,
|
||||
last_notified_episode: Optional[str] = None,
|
||||
):
|
||||
"""Update record from player feedback."""
|
||||
self.get_or_create_record(media_item)
|
||||
if media_item:
|
||||
self.get_or_create_record(media_item)
|
||||
|
||||
index = self._load_index()
|
||||
index_entry = index.media_index[f"{self._media_api}_{media_item.id}"]
|
||||
index_entry = index.media_index[f"{self._media_api}_{media_id}"]
|
||||
|
||||
index_entry.last_watch_position = player_result.stop_time
|
||||
index_entry.total_duration = player_result.total_time
|
||||
index_entry.progress = episode_number
|
||||
index_entry.last_watched = datetime.now()
|
||||
if progress:
|
||||
index_entry.progress = progress
|
||||
if index_entry.status:
|
||||
if status:
|
||||
index_entry.status = status
|
||||
else:
|
||||
index_entry.status = "watching"
|
||||
|
||||
index.media_index[f"{self._media_api}_{media_item.id}"] = index_entry
|
||||
if last_watch_position:
|
||||
index_entry.last_watch_position = last_watch_position
|
||||
if total_duration:
|
||||
index_entry.total_duration = total_duration
|
||||
if score:
|
||||
index_entry.score = score
|
||||
if repeat:
|
||||
index_entry.repeat = repeat
|
||||
if notes:
|
||||
index_entry.notes = notes
|
||||
if last_notified_episode:
|
||||
index_entry.last_notified_episode = last_notified_episode
|
||||
|
||||
if watched:
|
||||
index_entry.last_watched = datetime.now()
|
||||
|
||||
index.media_index[f"{self._media_api}_{media_id}"] = index_entry
|
||||
self._save_index(index)
|
||||
|
||||
def get_recently_watched(self, limit: int) -> List[MediaRecord]:
|
||||
|
||||
@@ -1,351 +1,72 @@
|
||||
"""
|
||||
Watch history manager for local storage operations.
|
||||
Handles saving, loading, and managing local watch history data.
|
||||
"""
|
||||
|
||||
import json
|
||||
import logging
|
||||
from pathlib import Path
|
||||
from typing import List, Optional
|
||||
from typing import Optional
|
||||
|
||||
from ....core.constants import USER_WATCH_HISTORY_PATH
|
||||
from ....libs.api.types import MediaItem
|
||||
from .types import WatchHistoryData, WatchHistoryEntry
|
||||
from ....core.config.model import AppConfig
|
||||
from ....libs.api.base import BaseApiClient
|
||||
from ....libs.api.params import UpdateListEntryParams
|
||||
from ....libs.api.types import MediaItem, UserListStatusType
|
||||
from ....libs.players.types import PlayerResult
|
||||
from ..registry import MediaRegistryService
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
# TODO: Implement stuff like syncing btw local and remote
|
||||
class WatchHistoryService:
|
||||
"""
|
||||
Manages local watch history storage and operations.
|
||||
Provides comprehensive watch history management with error handling.
|
||||
"""
|
||||
|
||||
def __init__(self, history_file_path: Path = USER_WATCH_HISTORY_PATH):
|
||||
self.history_file_path = history_file_path
|
||||
self._data: Optional[WatchHistoryData] = None
|
||||
self._ensure_history_file()
|
||||
|
||||
def _ensure_history_file(self):
|
||||
"""Ensure the watch history file and directory exist."""
|
||||
try:
|
||||
self.history_file_path.parent.mkdir(parents=True, exist_ok=True)
|
||||
if not self.history_file_path.exists():
|
||||
# Create empty watch history file
|
||||
empty_data = WatchHistoryData()
|
||||
self._save_data(empty_data)
|
||||
logger.info(
|
||||
f"Created new watch history file at {self.history_file_path}"
|
||||
)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to ensure watch history file: {e}")
|
||||
|
||||
def _load_data(self) -> WatchHistoryData:
|
||||
"""Load watch history data from file."""
|
||||
if self._data is not None:
|
||||
return self._data
|
||||
|
||||
try:
|
||||
if not self.history_file_path.exists():
|
||||
self._data = WatchHistoryData()
|
||||
return self._data
|
||||
|
||||
with self.history_file_path.open("r", encoding="utf-8") as f:
|
||||
data = json.load(f)
|
||||
|
||||
self._data = WatchHistoryData.from_dict(data)
|
||||
logger.debug(f"Loaded watch history with {len(self._data.entries)} entries")
|
||||
return self._data
|
||||
|
||||
except json.JSONDecodeError as e:
|
||||
logger.error(f"Watch history file is corrupted: {e}")
|
||||
# Create backup of corrupted file
|
||||
backup_path = self.history_file_path.with_suffix(".backup")
|
||||
self.history_file_path.rename(backup_path)
|
||||
logger.info(f"Corrupted file moved to {backup_path}")
|
||||
|
||||
# Create new empty data
|
||||
self._data = WatchHistoryData()
|
||||
self._save_data(self._data)
|
||||
return self._data
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to load watch history: {e}")
|
||||
self._data = WatchHistoryData()
|
||||
return self._data
|
||||
|
||||
def _save_data(self, data: WatchHistoryData) -> bool:
|
||||
"""Save watch history data to file."""
|
||||
try:
|
||||
# Create backup of existing file
|
||||
if self.history_file_path.exists():
|
||||
backup_path = self.history_file_path.with_suffix(".bak")
|
||||
self.history_file_path.rename(backup_path)
|
||||
|
||||
with self.history_file_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(data.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
|
||||
# Remove backup on successful save
|
||||
backup_path = self.history_file_path.with_suffix(".bak")
|
||||
if backup_path.exists():
|
||||
backup_path.unlink()
|
||||
|
||||
logger.debug(f"Saved watch history with {len(data.entries)} entries")
|
||||
return True
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to save watch history: {e}")
|
||||
# Restore backup if save failed
|
||||
backup_path = self.history_file_path.with_suffix(".bak")
|
||||
if backup_path.exists():
|
||||
backup_path.rename(self.history_file_path)
|
||||
return False
|
||||
|
||||
def add_or_update_entry(
|
||||
def __init__(
|
||||
self,
|
||||
media_item: MediaItem,
|
||||
episode: int = 0,
|
||||
progress: float = 0.0,
|
||||
status: str = "watching",
|
||||
notes: str = "",
|
||||
) -> bool:
|
||||
"""Add or update a watch history entry."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
entry = data.add_or_update_entry(media_item, episode, progress, status)
|
||||
if notes:
|
||||
entry.notes = notes
|
||||
config: AppConfig,
|
||||
media_registry: MediaRegistryService,
|
||||
media_api: Optional[BaseApiClient] = None,
|
||||
):
|
||||
self.config = config
|
||||
self.media_registry = media_registry
|
||||
self.media_api = media_api
|
||||
|
||||
success = self._save_data(data)
|
||||
if success:
|
||||
self._data = data # Update cached data
|
||||
logger.info(f"Updated watch history for {entry.get_display_title()}")
|
||||
return success
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to add/update watch history entry: {e}")
|
||||
return False
|
||||
|
||||
def get_entry(self, media_id: int) -> Optional[WatchHistoryEntry]:
|
||||
"""Get a specific watch history entry."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
return data.get_entry(media_id)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get watch history entry: {e}")
|
||||
return None
|
||||
|
||||
def remove_entry(self, media_id: int) -> bool:
|
||||
"""Remove an entry from watch history."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
removed = data.remove_entry(media_id)
|
||||
|
||||
if removed:
|
||||
success = self._save_data(data)
|
||||
if success:
|
||||
self._data = data
|
||||
logger.info(f"Removed watch history entry for media ID {media_id}")
|
||||
return success
|
||||
return False
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to remove watch history entry: {e}")
|
||||
return False
|
||||
|
||||
def get_all_entries(self) -> List[WatchHistoryEntry]:
|
||||
"""Get all watch history entries."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
return list(data.entries.values())
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get all entries: {e}")
|
||||
return []
|
||||
|
||||
def get_entries_by_status(self, status: str) -> List[WatchHistoryEntry]:
|
||||
"""Get entries by status (watching, completed, etc.)."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
return data.get_entries_by_status(status)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get entries by status: {e}")
|
||||
return []
|
||||
|
||||
def get_recently_watched(self, limit: int = 10) -> List[WatchHistoryEntry]:
|
||||
"""Get recently watched entries."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
return data.get_recently_watched(limit)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get recently watched: {e}")
|
||||
return []
|
||||
|
||||
def search_entries(self, query: str) -> List[WatchHistoryEntry]:
|
||||
"""Search entries by title."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
return data.search_entries(query)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to search entries: {e}")
|
||||
return []
|
||||
|
||||
def get_watching_entries(self) -> List[WatchHistoryEntry]:
|
||||
"""Get entries that are currently being watched."""
|
||||
return self.get_entries_by_status("watching")
|
||||
|
||||
def get_completed_entries(self) -> List[WatchHistoryEntry]:
|
||||
"""Get completed entries."""
|
||||
return self.get_entries_by_status("completed")
|
||||
|
||||
def mark_episode_watched(
|
||||
self, media_id: int, episode: int, progress: float = 1.0
|
||||
) -> bool:
|
||||
"""Mark a specific episode as watched."""
|
||||
entry = self.get_entry(media_id)
|
||||
if entry:
|
||||
return self.add_or_update_entry(
|
||||
entry.media_item, episode, progress, entry.status
|
||||
)
|
||||
return False
|
||||
|
||||
def mark_completed(self, media_id: int) -> bool:
|
||||
"""Mark an anime as completed."""
|
||||
entry = self.get_entry(media_id)
|
||||
if entry:
|
||||
entry.mark_completed()
|
||||
data = self._load_data()
|
||||
return self._save_data(data)
|
||||
return False
|
||||
|
||||
def change_status(self, media_id: int, new_status: str) -> bool:
|
||||
"""Change the status of an entry."""
|
||||
entry = self.get_entry(media_id)
|
||||
if entry:
|
||||
return self.add_or_update_entry(
|
||||
entry.media_item,
|
||||
entry.last_watched_episode,
|
||||
entry.watch_progress,
|
||||
new_status,
|
||||
)
|
||||
return False
|
||||
|
||||
def update_notes(self, media_id: int, notes: str) -> bool:
|
||||
"""Update notes for an entry."""
|
||||
entry = self.get_entry(media_id)
|
||||
if entry:
|
||||
return self.add_or_update_entry(
|
||||
entry.media_item,
|
||||
entry.last_watched_episode,
|
||||
entry.watch_progress,
|
||||
entry.status,
|
||||
notes,
|
||||
)
|
||||
return False
|
||||
|
||||
def get_stats(self) -> dict:
|
||||
"""Get watch history statistics."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
return data.get_stats()
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to get stats: {e}")
|
||||
return {
|
||||
"total_entries": 0,
|
||||
"watching": 0,
|
||||
"completed": 0,
|
||||
"dropped": 0,
|
||||
"paused": 0,
|
||||
"total_episodes_watched": 0,
|
||||
"last_updated": "Unknown",
|
||||
}
|
||||
|
||||
def export_history(self, export_path: Path) -> bool:
|
||||
"""Export watch history to a file."""
|
||||
try:
|
||||
data = self._load_data()
|
||||
with export_path.open("w", encoding="utf-8") as f:
|
||||
json.dump(data.to_dict(), f, indent=2, ensure_ascii=False)
|
||||
logger.info(f"Exported watch history to {export_path}")
|
||||
return True
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to export watch history: {e}")
|
||||
return False
|
||||
|
||||
def import_history(self, import_path: Path, merge: bool = True) -> bool:
|
||||
"""Import watch history from a file."""
|
||||
try:
|
||||
if not import_path.exists():
|
||||
logger.error(f"Import file does not exist: {import_path}")
|
||||
return False
|
||||
|
||||
with import_path.open("r", encoding="utf-8") as f:
|
||||
import_data = json.load(f)
|
||||
|
||||
imported_history = WatchHistoryData.from_dict(import_data)
|
||||
|
||||
if merge:
|
||||
# Merge with existing data
|
||||
current_data = self._load_data()
|
||||
for media_id, entry in imported_history.entries.items():
|
||||
current_data.entries[media_id] = entry
|
||||
success = self._save_data(current_data)
|
||||
else:
|
||||
# Replace existing data
|
||||
success = self._save_data(imported_history)
|
||||
|
||||
if success:
|
||||
self._data = None # Force reload on next access
|
||||
logger.info(f"Imported watch history from {import_path}")
|
||||
|
||||
return success
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to import watch history: {e}")
|
||||
return False
|
||||
|
||||
def clear_history(self) -> bool:
|
||||
"""Clear all watch history."""
|
||||
try:
|
||||
empty_data = WatchHistoryData()
|
||||
success = self._save_data(empty_data)
|
||||
if success:
|
||||
self._data = empty_data
|
||||
logger.info("Cleared all watch history")
|
||||
return success
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to clear watch history: {e}")
|
||||
return False
|
||||
|
||||
def backup_history(self, backup_path: Path = None) -> bool:
|
||||
"""Create a backup of watch history."""
|
||||
try:
|
||||
if backup_path is None:
|
||||
from datetime import datetime
|
||||
|
||||
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
|
||||
backup_path = (
|
||||
self.history_file_path.parent
|
||||
/ f"watch_history_backup_{timestamp}.json"
|
||||
)
|
||||
|
||||
return self.export_history(backup_path)
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to backup watch history: {e}")
|
||||
return False
|
||||
|
||||
def get_entry_by_media_id(self, media_id: int) -> Optional[WatchHistoryEntry]:
|
||||
"""Get watch history entry by media ID (alias for get_entry)."""
|
||||
return self.get_entry(media_id)
|
||||
|
||||
def save_entry(self, entry: WatchHistoryEntry) -> bool:
|
||||
"""Save a watch history entry (alias for add_or_update_entry)."""
|
||||
return self.add_or_update_entry(
|
||||
entry.media_item,
|
||||
entry.status,
|
||||
entry.last_watched_episode,
|
||||
entry.watch_progress,
|
||||
def track(self, media_item: MediaItem, episode: str, player_result: PlayerResult):
|
||||
status = None
|
||||
self.media_registry.update_media_index_entry(
|
||||
media_id=media_item.id,
|
||||
watched=True,
|
||||
media_item=media_item,
|
||||
last_watch_position=player_result.stop_time,
|
||||
total_duration=player_result.total_time,
|
||||
progress=episode,
|
||||
status=status,
|
||||
)
|
||||
|
||||
def get_currently_watching(self) -> List[WatchHistoryEntry]:
|
||||
"""Get entries that are currently being watched."""
|
||||
return self.get_watching_entries()
|
||||
if self.media_api:
|
||||
self.media_api.update_list_entry(
|
||||
UpdateListEntryParams(
|
||||
media_id=media_item.id,
|
||||
progress=episode,
|
||||
status=status,
|
||||
)
|
||||
)
|
||||
|
||||
def update(
|
||||
self,
|
||||
media_item: MediaItem,
|
||||
progress: Optional[str] = None,
|
||||
status: Optional[UserListStatusType] = None,
|
||||
score: Optional[float] = None,
|
||||
notes: Optional[str] = None,
|
||||
):
|
||||
self.media_registry.update_media_index_entry(
|
||||
media_id=media_item.id,
|
||||
media_item=media_item,
|
||||
progress=progress,
|
||||
status=status,
|
||||
score=score,
|
||||
notes=notes,
|
||||
)
|
||||
|
||||
if self.media_api:
|
||||
self.media_api.update_list_entry(
|
||||
UpdateListEntryParams(
|
||||
media_id=media_item.id,
|
||||
status=status,
|
||||
score=score,
|
||||
progress=progress,
|
||||
)
|
||||
)
|
||||
|
||||
@@ -6,7 +6,6 @@ from httpx import Client
|
||||
from ....core.config import AnilistConfig
|
||||
from ....core.utils.graphql import (
|
||||
execute_graphql,
|
||||
execute_graphql_query_with_get_request,
|
||||
)
|
||||
from ..base import ApiSearchParams, BaseApiClient, UpdateListEntryParams, UserListParams
|
||||
from ..types import MediaSearchResult, UserProfile
|
||||
@@ -16,6 +15,16 @@ logger = logging.getLogger(__name__)
|
||||
ANILIST_ENDPOINT = "https://graphql.anilist.co"
|
||||
|
||||
|
||||
status_map = {
|
||||
"watching": "CURRENT",
|
||||
"planning": "PLANNING",
|
||||
"completed": "COMPLETED",
|
||||
"dropped": "DROPPED",
|
||||
"paused": "PAUSED",
|
||||
"repeating": "REPEATING",
|
||||
}
|
||||
|
||||
|
||||
class AniListApi(BaseApiClient):
|
||||
"""AniList API implementation of the BaseApiClient contract."""
|
||||
|
||||
@@ -70,8 +79,8 @@ class AniListApi(BaseApiClient):
|
||||
score_raw = int(params.score * 10) if params.score is not None else None
|
||||
variables = {
|
||||
"mediaId": params.media_id,
|
||||
"status": params.status,
|
||||
"progress": params.progress,
|
||||
"status": status_map[params.status] if params.status else None,
|
||||
"progress": int(float(params.progress)) if params.progress else None,
|
||||
"scoreRaw": score_raw,
|
||||
}
|
||||
variables = {k: v for k, v in variables.items() if v is not None}
|
||||
|
||||
@@ -1,6 +1,6 @@
|
||||
import logging
|
||||
from datetime import datetime
|
||||
from typing import List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from ..types import (
|
||||
AiringSchedule,
|
||||
@@ -23,6 +23,7 @@ from .types import (
|
||||
AnilistImage,
|
||||
AnilistMediaList,
|
||||
AnilistMediaLists,
|
||||
AnilistMediaListStatus,
|
||||
AnilistMediaNextAiringEpisode,
|
||||
AnilistMediaTag,
|
||||
AnilistMediaTitle,
|
||||
@@ -37,6 +38,15 @@ from .types import (
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
status_map = {
|
||||
"CURRENT": "watching",
|
||||
"PLANNING": "planning",
|
||||
"COMPLETED": "completed",
|
||||
"DROPPED": "dropped",
|
||||
"PAUSED": "paused",
|
||||
"REPEATING": "repeating",
|
||||
}
|
||||
|
||||
|
||||
def _to_generic_media_title(anilist_title: AnilistMediaTitle) -> MediaTitle:
|
||||
"""Maps an AniList title object to a generic MediaTitle."""
|
||||
@@ -76,9 +86,6 @@ def _to_generic_airing_schedule(
|
||||
anilist_schedule: AnilistMediaNextAiringEpisode,
|
||||
) -> Optional[AiringSchedule]:
|
||||
"""Maps an AniList nextAiringEpisode object to a generic AiringSchedule."""
|
||||
if not anilist_schedule:
|
||||
return
|
||||
|
||||
return AiringSchedule(
|
||||
airing_at=datetime.fromtimestamp(anilist_schedule["airingAt"])
|
||||
if anilist_schedule.get("airingAt")
|
||||
@@ -126,7 +133,7 @@ def _to_generic_user_status(
|
||||
"""Maps an AniList mediaListEntry to a generic UserListStatus."""
|
||||
if anilist_list_entry:
|
||||
return UserListStatus(
|
||||
status=anilist_list_entry["status"],
|
||||
status=status_map[anilist_list_entry["status"]], # pyright: ignore
|
||||
progress=anilist_list_entry["progress"],
|
||||
score=anilist_list_entry["score"],
|
||||
repeat=anilist_list_entry["repeat"],
|
||||
|
||||
@@ -242,7 +242,7 @@ class AnilistNotifications(TypedDict):
|
||||
|
||||
class AnilistMediaList(TypedDict):
|
||||
media: AnilistBaseMediaDataSchema
|
||||
status: str
|
||||
status: AnilistMediaListStatus
|
||||
progress: int
|
||||
score: int
|
||||
repeat: int
|
||||
|
||||
@@ -1,6 +1,8 @@
|
||||
from dataclasses import dataclass
|
||||
from typing import List, Literal, Optional, Union
|
||||
|
||||
from .types import UserListStatusType
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class ApiSearchParams:
|
||||
@@ -66,8 +68,6 @@ class UserListParams:
|
||||
@dataclass(frozen=True)
|
||||
class UpdateListEntryParams:
|
||||
media_id: int
|
||||
status: Optional[
|
||||
Literal["CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"]
|
||||
] = None
|
||||
progress: Optional[int] = None
|
||||
status: Optional[UserListStatusType] = None
|
||||
progress: Optional[str] = None
|
||||
score: Optional[float] = None
|
||||
|
||||
@@ -1,5 +1,3 @@
|
||||
from __future__ import annotations
|
||||
|
||||
from datetime import datetime
|
||||
from typing import List, Literal, Optional
|
||||
|
||||
@@ -11,10 +9,10 @@ MediaType = Literal["ANIME", "MANGA"]
|
||||
MediaStatus = Literal[
|
||||
"FINISHED", "RELEASING", "NOT_YET_RELEASED", "CANCELLED", "HIATUS"
|
||||
]
|
||||
UserListStatusType = Literal[
|
||||
"CURRENT", "PLANNING", "COMPLETED", "DROPPED", "PAUSED", "REPEATING"
|
||||
]
|
||||
|
||||
UserListStatusType = Literal[
|
||||
"planning", "watching", "completed", "dropped", "paused", "repeating"
|
||||
]
|
||||
# --- Generic Data Models ---
|
||||
|
||||
|
||||
@@ -83,7 +81,7 @@ class UserListStatus(BaseApiModel):
|
||||
|
||||
id: int | None = None
|
||||
|
||||
status: Optional[str] = None
|
||||
status: Optional[UserListStatusType] = None
|
||||
progress: Optional[int] = None
|
||||
score: Optional[float] = None
|
||||
repeat: Optional[int] = None
|
||||
|
||||
Reference in New Issue
Block a user