mirror of
https://github.com/Benexl/FastAnime.git
synced 2026-01-04 00:37:04 -08:00
278 lines
9.9 KiB
Python
278 lines
9.9 KiB
Python
import json
|
|
import logging
|
|
from datetime import datetime
|
|
from pathlib import Path
|
|
from typing import Dict, Generator, List, Optional
|
|
|
|
from ....core.config.model import MediaRegistryConfig
|
|
from ....core.exceptions import FastAnimeError
|
|
from ....core.utils.file import AtomicWriter, FileLock, check_file_modified
|
|
from ....libs.api.params import MediaSearchParams
|
|
from ....libs.api.types import (
|
|
MediaItem,
|
|
MediaSearchResult,
|
|
PageInfo,
|
|
UserMediaListStatus,
|
|
)
|
|
from .filters import MediaFilter
|
|
from .models import (
|
|
REGISTRY_VERSION,
|
|
MediaRecord,
|
|
MediaRegistryIndex,
|
|
MediaRegistryIndexEntry,
|
|
)
|
|
|
|
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class MediaRegistryService:
    """File-backed registry of media records for a single media API.

    Every media record is stored as ``<media_id>.json`` inside
    ``config.media_dir / media_api``. An index file
    (``config.index_dir / "registry.json"``) holds one
    ``MediaRegistryIndexEntry`` per media, keyed by
    ``"<media_api>_<media_id>"``. Both kinds of file are written through
    ``AtomicWriter`` while holding a ``FileLock`` on
    ``config.media_dir / "registry.lock"``.
    """

    def __init__(self, media_api: str, config: MediaRegistryConfig):
        """Set up paths, ensure the directories exist and load the index.

        Args:
            media_api: Name of the media API; used as the record
                sub-directory name and as the prefix of every index key.
            config: Registry configuration providing ``media_dir`` and
                ``index_dir``.

        Raises:
            FastAnimeError: If the stored index has an incompatible version
                (raised by ``_load_index``).
        """
        self.config = config
        self.media_registry_dir = self.config.media_dir / media_api
        self._media_api = media_api
        self._ensure_directories()
        self._index: Optional[MediaRegistryIndex] = None
        self._index_file = self.config.index_dir / "registry.json"
        # 0 means "never read"; updated by check_file_modified in _load_index.
        self._index_file_modified_time = 0
        _lock_file = self.config.media_dir / "registry.lock"
        self._lock = FileLock(_lock_file)
        self._load_index()

    def _index_key(self, media_id: int) -> str:
        """Return the key under which ``media_id`` is stored in the index."""
        return f"{self._media_api}_{media_id}"

    def _ensure_directories(self) -> None:
        """Ensure registry directories exist.

        Best effort: a failure is logged and not re-raised.
        """
        try:
            self.media_registry_dir.mkdir(parents=True, exist_ok=True)
            self.config.index_dir.mkdir(parents=True, exist_ok=True)
        except Exception as e:
            logger.error(f"Failed to create registry directories: {e}")

    def _load_index(self) -> MediaRegistryIndex:
        """Load or create the registry index.

        The cached index is returned when ``check_file_modified`` reports no
        change since the last read. Otherwise the index file is parsed, or a
        new empty index is created and saved when the file does not exist.

        Returns:
            The current registry index.

        Raises:
            FastAnimeError: If the first component of the stored version
                differs from that of ``REGISTRY_VERSION``.
        """
        self._index_file_modified_time, is_modified = check_file_modified(
            self._index_file, self._index_file_modified_time
        )
        if not is_modified and self._index is not None:
            return self._index

        created = not self._index_file.exists()
        if created:
            index = MediaRegistryIndex()
        else:
            with self._index_file.open("r", encoding="utf-8") as f:
                data = json.load(f)
            index = MediaRegistryIndex.model_validate(data)

        # check if there was a major change in the registry
        if index.version[0] != REGISTRY_VERSION[0]:
            # Drop any cached index so later calls re-run this check instead
            # of silently returning an incompatible (or stale) index.
            self._index = None
            raise FastAnimeError(
                f"Incompatible registry version of {index.version}. Current registry supports version {REGISTRY_VERSION}. Please migrate your registry using the migrator"
            )

        self._index = index
        if created:
            self._save_index(index)

        logger.debug(f"Loaded registry index with {index.media_count} entries")
        return index

    def _save_index(self, index: MediaRegistryIndex):
        """Save the registry index.

        Stamps ``index.last_updated`` with the current time and writes the
        index as JSON through ``AtomicWriter`` while holding the registry
        lock.
        """
        with self._lock:
            index.last_updated = datetime.now()
            with AtomicWriter(self._index_file) as f:
                json.dump(index.model_dump(mode="json"), f, indent=2)

        logger.debug("saved registry index")

    def get_media_index_entry(self, media_id: int) -> Optional[MediaRegistryIndexEntry]:
        """Return the index entry for ``media_id``, or ``None`` if absent."""
        index = self._load_index()
        return index.media_index.get(self._index_key(media_id))

    def _get_media_file_path(self, media_id: int) -> Path:
        """Get file path for media record."""
        return self.media_registry_dir / f"{media_id}.json"

    def get_media_record(self, media_id: int) -> Optional[MediaRecord]:
        """Load the stored record for ``media_id``.

        Returns:
            The validated ``MediaRecord``, or ``None`` when no record file
            exists for ``media_id``.
        """
        record_file = self._get_media_file_path(media_id)
        if not record_file.exists():
            return None

        # Use a context manager so the file handle is closed deterministically.
        with record_file.open(mode="r", encoding="utf-8") as f:
            data = json.load(f)

        record = MediaRecord.model_validate(data)

        logger.debug(f"Loaded media record for {media_id}")
        return record

    def get_or_create_index_entry(self, media_id: int) -> MediaRegistryIndexEntry:
        """Return the index entry for ``media_id``, creating and saving one if missing."""
        index_entry = self.get_media_index_entry(media_id)
        if index_entry is not None:
            return index_entry

        index = self._load_index()
        index_entry = MediaRegistryIndexEntry(
            media_id=media_id,
            media_api=self._media_api,  # pyright:ignore
        )
        index.media_index[self._index_key(media_id)] = index_entry
        self._save_index(index)
        return index_entry

    def save_media_index_entry(self, index_entry: MediaRegistryIndexEntry) -> bool:
        """Store ``index_entry`` in the index and persist the index.

        Returns:
            Always ``True``.
        """
        index = self._load_index()
        index.media_index[self._index_key(index_entry.media_id)] = index_entry
        self._save_index(index)

        logger.debug(f"Saved media index entry for {index_entry.media_id}")
        return True

    def save_media_record(self, record: MediaRecord) -> bool:
        """Write ``record`` to its JSON file, creating its index entry if needed.

        Returns:
            Always ``True``.
        """
        media_id = record.media_item.id
        # Done before taking the lock: creating the entry saves the index,
        # and _save_index acquires the same lock itself.
        self.get_or_create_index_entry(media_id)

        with self._lock:
            record_file = self._get_media_file_path(media_id)
            with AtomicWriter(record_file) as f:
                json.dump(record.model_dump(mode="json"), f, indent=2, default=str)

        logger.debug(f"Saved media record for {media_id}")
        return True

    def get_or_create_record(self, media_item: MediaItem) -> MediaRecord:
        """Return the record for ``media_item``, refreshed with the given item.

        A new record is created when none is stored; an existing record gets
        its ``media_item`` replaced. The record is saved in both cases.
        """
        record = self.get_media_record(media_item.id)
        if record is None:
            record = MediaRecord(media_item=media_item)
        else:
            record.media_item = media_item
        self.save_media_record(record)

        return record

    def update_media_index_entry(
        self,
        media_id: int,
        watched: bool = False,
        media_item: Optional[MediaItem] = None,
        progress: Optional[str] = None,
        status: Optional[UserMediaListStatus] = None,
        last_watch_position: Optional[str] = None,
        total_duration: Optional[str] = None,
        score: Optional[float] = None,
        repeat: Optional[int] = None,
        notes: Optional[str] = None,
        last_notified_episode: Optional[str] = None,
    ) -> None:
        """Update record from player feedback.

        Only truthy arguments are applied; a falsy value (``None``, ``""``,
        ``0``) leaves the corresponding field untouched.

        Args:
            media_id: Id of the media whose index entry is updated. The entry
                is created when it does not exist yet.
            watched: When true, ``last_watched`` is set to the current time.
            media_item: When given, its record is created or refreshed first.
            progress: New progress value.
            status: New list status. When omitted and the entry has no
                status, the status defaults to ``WATCHING``.
            last_watch_position: New last watch position.
            total_duration: New total duration.
            score: New score.
            repeat: New repeat count.
            notes: New notes.
            last_notified_episode: New last notified episode.
        """
        if media_item:
            self.get_or_create_record(media_item)

        key = self._index_key(media_id)
        index = self._load_index()
        index_entry = index.media_index.get(key)
        if index_entry is None:
            # No entry yet (and no media_item supplied to create one):
            # start a fresh entry instead of failing with KeyError.
            index_entry = MediaRegistryIndexEntry(
                media_id=media_id,
                media_api=self._media_api,  # pyright:ignore
            )

        if progress:
            index_entry.progress = progress

        if status:
            # An explicit status from the caller always wins.
            index_entry.status = status
        elif not index_entry.status:
            # Nothing recorded and nothing supplied: default to WATCHING.
            index_entry.status = UserMediaListStatus.WATCHING

        if last_watch_position:
            index_entry.last_watch_position = last_watch_position
        if total_duration:
            index_entry.total_duration = total_duration
        if score:
            index_entry.score = score
        if repeat:
            index_entry.repeat = repeat
        if notes:
            index_entry.notes = notes
        if last_notified_episode:
            index_entry.last_notified_episode = last_notified_episode

        if watched:
            index_entry.last_watched = datetime.now()

        index.media_index[key] = index_entry
        self._save_index(index)

    # TODO: standardize params passed to this
    def get_recently_watched(self, limit: Optional[int] = None) -> MediaSearchResult:
        """Get recently watched anime.

        Args:
            limit: Accepted for interface compatibility; currently NOT
                applied, so every entry that has a stored record is returned.

        Returns:
            A ``MediaSearchResult`` whose media are ordered by
            ``last_watched``, newest first, and whose ``page_info.total`` is
            the number of index entries (including those without a record).
        """
        index = self._load_index()

        # NOTE(review): the tuple key sorts entries whose last_watched is
        # None after all dated ones instead of raising TypeError; whether the
        # model permits None here should be confirmed. Ordering is unchanged
        # when every entry has a timestamp.
        sorted_entries = sorted(
            index.media_index.values(),
            key=lambda entry: (entry.last_watched is not None, entry.last_watched),
            reverse=True,
        )

        recent_media: List[MediaItem] = []
        for entry in sorted_entries:
            record = self.get_media_record(entry.media_id)
            if record:
                recent_media.append(record.media_item)

        page_info = PageInfo(
            total=len(sorted_entries),
        )
        return MediaSearchResult(page_info=page_info, media=recent_media)

    def get_registry_stats(self) -> Dict:
        """Get comprehensive registry statistics.

        Returns:
            A dict with ``total_media_breakdown``, ``status_breakdown`` and a
            formatted ``last_updated`` timestamp, or an empty dict when the
            statistics could not be gathered (the error is logged).
        """
        try:
            index = self._load_index()

            return {
                "total_media_breakdown": index.media_count_breakdown,
                "status_breakdown": index.status_breakdown,
                "last_updated": index.last_updated.strftime("%Y-%m-%d %H:%M:%S"),
            }

        except Exception as e:
            logger.error(f"Failed to get registry stats: {e}")
            return {}

    def get_all_media_records(self) -> Generator[MediaRecord, None, List[MediaRecord]]:
        """Yield every media record stored in the registry directory.

        Each regular file is treated as ``<media_id>.json``; anything that is
        not a file, or that fails to load, is logged as a warning and skipped.

        Returns:
            On exhaustion, the generator's return value is the list of all
            records that were yielded.
        """
        records = []
        for record_file in self.media_registry_dir.iterdir():
            try:
                if record_file.is_file():
                    # The file stem is the media id; int() rejects stray files.
                    media_id = record_file.stem
                    if record := self.get_media_record(int(media_id)):
                        records.append(record)
                        yield record
                else:
                    logger.warning(
                        f"{self.media_registry_dir} is impure; ignoring folder: {record_file}"
                    )
            except Exception as e:
                logger.warning(f"{self.media_registry_dir} is impure which caused: {e}")
        return records

    def search_for_media(self, params: MediaSearchParams) -> List[MediaItem]:
        """Return the stored media items that match ``params``.

        All records are loaded and passed through ``MediaFilter.apply``. Any
        failure is logged and results in an empty list.
        """
        try:
            # TODO: enhance performance
            media_items = [record.media_item for record in self.get_all_media_records()]

            return MediaFilter.apply(media_items, params)

        except Exception as e:
            logger.error(f"Failed to search media: {e}")
            return []

    def remove_media_record(self, media_id: int):
        """Delete the record file and the index entry for ``media_id``.

        Missing files or index entries are ignored.
        """
        with self._lock:
            record_file = self._get_media_file_path(media_id)
            if record_file.exists():
                record_file.unlink()
                try:
                    # Removes the record directory only if it is now empty.
                    record_file.parent.rmdir()
                except OSError:
                    pass

        # The index is updated after releasing the lock because _save_index
        # acquires the same lock itself.
        index = self._load_index()
        key = self._index_key(media_id)
        if key in index.media_index:
            del index.media_index[key]
            self._save_index(index)

        logger.debug(f"Removed media record {media_id}")
|