mirror of
https://github.com/Benexl/FastAnime.git
synced 2025-12-05 20:40:09 -08:00
feat(media-api): media reviews
This commit is contained in:
19
fastanime/assets/scripts/fzf/review-info.template.sh
Normal file
19
fastanime/assets/scripts/fzf/review-info.template.sh
Normal file
@@ -0,0 +1,19 @@
|
||||
#!/bin/sh
|
||||
#
|
||||
# FastAnime Review Info Script Template
|
||||
# This script formats and displays review details in the FZF preview pane.
|
||||
# Python injects the actual data values into the placeholders.
|
||||
|
||||
draw_rule
|
||||
|
||||
print_kv "Review By" "{REVIEWER_NAME}"
|
||||
|
||||
draw_rule
|
||||
|
||||
print_kv "Summary" "{REVIEW_SUMMARY}"
|
||||
|
||||
draw_rule
|
||||
|
||||
echo "{REVIEW_BODY}" | fold -s -w "$WIDTH"
|
||||
|
||||
draw_rule
|
||||
75
fastanime/assets/scripts/fzf/review-preview.template.sh
Normal file
75
fastanime/assets/scripts/fzf/review-preview.template.sh
Normal file
@@ -0,0 +1,75 @@
|
||||
#!/bin/sh
|
||||
#
|
||||
# FZF Preview Script Template
|
||||
#
|
||||
# This script is a template. The placeholders in curly braces, like {NAME}
|
||||
# are dynamically filled by python using .replace()
|
||||
|
||||
WIDTH=${FZF_PREVIEW_COLUMNS:-80} # Set a fallback width of 80
|
||||
IMAGE_RENDERER="{IMAGE_RENDERER}"
|
||||
|
||||
generate_sha256() {
|
||||
local input
|
||||
|
||||
# Check if input is passed as an argument or piped
|
||||
if [ -n "$1" ]; then
|
||||
input="$1"
|
||||
else
|
||||
input=$(cat)
|
||||
fi
|
||||
|
||||
if command -v sha256sum &>/dev/null; then
|
||||
echo -n "$input" | sha256sum | awk '{print $1}'
|
||||
elif command -v shasum &>/dev/null; then
|
||||
echo -n "$input" | shasum -a 256 | awk '{print $1}'
|
||||
elif command -v sha256 &>/dev/null; then
|
||||
echo -n "$input" | sha256 | awk '{print $1}'
|
||||
elif command -v openssl &>/dev/null; then
|
||||
echo -n "$input" | openssl dgst -sha256 | awk '{print $2}'
|
||||
else
|
||||
echo -n "$input" | base64 | tr '/+' '_-' | tr -d '\n'
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
print_kv() {
|
||||
local key="$1"
|
||||
local value="$2"
|
||||
local key_len=${#key}
|
||||
local value_len=${#value}
|
||||
local multiplier="${3:-1}"
|
||||
|
||||
# Correctly calculate padding by accounting for the key, the ": ", and the value.
|
||||
local padding_len=$((WIDTH - key_len - 2 - value_len * multiplier))
|
||||
|
||||
# If the text is too long to fit, just add a single space for separation.
|
||||
if [ "$padding_len" -lt 1 ]; then
|
||||
padding_len=1
|
||||
value=$(echo $value| fold -s -w "$((WIDTH - key_len - 3))")
|
||||
printf "{C_KEY}%s:{RESET}%*s%s\\n" "$key" "$padding_len" "" " $value"
|
||||
else
|
||||
printf "{C_KEY}%s:{RESET}%*s%s\\n" "$key" "$padding_len" "" " $value"
|
||||
fi
|
||||
}
|
||||
|
||||
|
||||
draw_rule(){
|
||||
ll=2
|
||||
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
|
||||
echo -n -e "{C_RULE}─{RESET}"
|
||||
((ll++))
|
||||
done
|
||||
echo
|
||||
}
|
||||
|
||||
title={}
|
||||
hash=$(generate_sha256 "$title")
|
||||
|
||||
if [ "{PREVIEW_MODE}" = "full" ] || [ "{PREVIEW_MODE}" = "text" ]; then
|
||||
info_file="{INFO_CACHE_DIR}{PATH_SEP}$hash"
|
||||
if [ -f "$info_file" ]; then
|
||||
source "$info_file"
|
||||
else
|
||||
echo "📝 Loading details..."
|
||||
fi
|
||||
fi
|
||||
@@ -1,4 +1,4 @@
|
||||
from typing import Callable, Dict, Literal, Optional
|
||||
from typing import Callable, Dict, List, Literal, Optional, Union
|
||||
|
||||
from .....libs.media_api.params import (
|
||||
MediaAiringScheduleParams,
|
||||
@@ -7,7 +7,12 @@ from .....libs.media_api.params import (
|
||||
MediaRelationsParams,
|
||||
UpdateUserMediaListEntryParams,
|
||||
)
|
||||
from .....libs.media_api.types import MediaItem, MediaStatus, UserMediaListStatus
|
||||
from .....libs.media_api.types import (
|
||||
MediaItem,
|
||||
MediaReview,
|
||||
MediaStatus,
|
||||
UserMediaListStatus,
|
||||
)
|
||||
from .....libs.player.params import PlayerParams
|
||||
from ...session import Context, session
|
||||
from ...state import InternalDirective, MediaApiState, MenuName, State
|
||||
@@ -40,6 +45,7 @@ def media_actions(ctx: Context, state: State) -> State | InternalDirective:
|
||||
f"{'🔄 ' if icons else ''}Related Anime": _view_relations(ctx, state),
|
||||
f"{'👥 ' if icons else ''}Characters": _view_characters(ctx, state),
|
||||
f"{'📅 ' if icons else ''}Airing Schedule": _view_airing_schedule(ctx, state),
|
||||
f"{'📝 ' if icons else ''}View Reviews": _view_reviews(ctx, state),
|
||||
f"{'➕ ' if icons else ''}Add/Update List": _manage_user_media_list(ctx, state),
|
||||
f"{'⭐ ' if icons else ''}Score Anime": _score_anime(ctx, state),
|
||||
f"{'ℹ️ ' if icons else ''}View Info": _view_info(ctx, state),
|
||||
@@ -127,7 +133,9 @@ def _watch_trailer(ctx: Context, state: State) -> MenuAction:
|
||||
else:
|
||||
trailer_url = f"https://www.youtube.com/watch?v={media_item.trailer.id}"
|
||||
|
||||
ctx.player.play(PlayerParams(url=trailer_url, title=""))
|
||||
ctx.player.play(
|
||||
PlayerParams(url=trailer_url, query="", episode="", title="")
|
||||
)
|
||||
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
@@ -774,3 +782,12 @@ def _view_airing_schedule(ctx: Context, state: State) -> MenuAction:
|
||||
return InternalDirective.RELOAD
|
||||
|
||||
return action
|
||||
|
||||
|
||||
def _view_reviews(ctx: Context, state: State) -> MenuAction:
|
||||
"""Action to transition to the review selection menu."""
|
||||
|
||||
def action() -> State | InternalDirective:
|
||||
return State(menu_name=MenuName.MEDIA_REVIEW, media_api=state.media_api)
|
||||
|
||||
return action
|
||||
|
||||
83
fastanime/cli/interactive/menu/media/media_review.py
Normal file
83
fastanime/cli/interactive/menu/media/media_review.py
Normal file
@@ -0,0 +1,83 @@
|
||||
from typing import Dict, List, Optional, Union
|
||||
|
||||
from .....libs.media_api.types import MediaReview
|
||||
from ...session import Context, session
|
||||
from ...state import InternalDirective, State
|
||||
|
||||
|
||||
@session.menu
|
||||
def media_review(ctx: Context, state: State) -> Union[State, InternalDirective]:
|
||||
"""
|
||||
Fetches and displays a list of reviews for the user to select from.
|
||||
Shows the full review body upon selection or in the preview pane.
|
||||
"""
|
||||
from rich.console import Console
|
||||
from rich.markdown import Markdown
|
||||
from rich.panel import Panel
|
||||
|
||||
feedback = ctx.feedback
|
||||
selector = ctx.selector
|
||||
console = Console()
|
||||
config = ctx.config
|
||||
media_item = state.media_api.media_item
|
||||
|
||||
if not media_item:
|
||||
feedback.error("Media item is not in state.")
|
||||
return InternalDirective.BACK
|
||||
|
||||
from .....libs.media_api.params import MediaReviewsParams
|
||||
|
||||
loading_message = (
|
||||
f"Fetching reviews for {media_item.title.english or media_item.title.romaji}..."
|
||||
)
|
||||
reviews: Optional[List[MediaReview]] = None
|
||||
|
||||
with feedback.progress(loading_message):
|
||||
reviews = ctx.media_api.get_reviews_for(
|
||||
MediaReviewsParams(id=media_item.id, per_page=15)
|
||||
)
|
||||
|
||||
if not reviews:
|
||||
feedback.warning("No reviews found for this anime.")
|
||||
selector.ask("Press Enter to return...")
|
||||
return InternalDirective.BACK
|
||||
|
||||
choice_map: Dict[str, MediaReview] = {
|
||||
f"By {review.user.name}: {(review.summary or 'No summary')[:80]}": review
|
||||
for review in reviews
|
||||
}
|
||||
choices = list(choice_map.keys()) + ["Back"]
|
||||
|
||||
preview_command = None
|
||||
if config.general.preview != "none":
|
||||
from ....utils.preview import create_preview_context
|
||||
|
||||
with create_preview_context() as preview_ctx:
|
||||
preview_command = preview_ctx.get_review_preview(choice_map, ctx.config)
|
||||
|
||||
while True:
|
||||
chosen_title = selector.choose(
|
||||
prompt="Select a review to read",
|
||||
choices=choices,
|
||||
preview=preview_command,
|
||||
)
|
||||
|
||||
if not chosen_title or chosen_title == "Back":
|
||||
return InternalDirective.BACK
|
||||
|
||||
selected_review = choice_map[chosen_title]
|
||||
console.clear()
|
||||
|
||||
reviewer_name = f"[bold magenta]{selected_review.user.name}[/bold magenta]"
|
||||
review_summary = (
|
||||
f"[italic green]'{selected_review.summary}'[/italic green]"
|
||||
if selected_review.summary
|
||||
else ""
|
||||
)
|
||||
panel_title = f"Review by {reviewer_name} - {review_summary}"
|
||||
review_body = Markdown(selected_review.body)
|
||||
|
||||
console.print(
|
||||
Panel(review_body, title=panel_title, border_style="blue", expand=True)
|
||||
)
|
||||
selector.ask("\nPress Enter to return to the review list...")
|
||||
@@ -41,6 +41,7 @@ class MenuName(Enum):
|
||||
MEDIA_ACTIONS = "MEDIA_ACTIONS"
|
||||
DOWNLOADS = "DOWNLOADS"
|
||||
DYNAMIC_SEARCH = "DYNAMIC_SEARCH"
|
||||
MEDIA_REVIEW = "MEDIA_REVIEW"
|
||||
|
||||
|
||||
class StateModel(BaseModel):
|
||||
|
||||
@@ -22,9 +22,14 @@ class PlayerService:
|
||||
self.player = create_player(app_config)
|
||||
|
||||
def play(
|
||||
self, params: PlayerParams, anime: Anime, media_item: Optional[MediaItem] = None
|
||||
self,
|
||||
params: PlayerParams,
|
||||
anime: Optional[Anime] = None,
|
||||
media_item: Optional[MediaItem] = None,
|
||||
) -> PlayerResult:
|
||||
if self.app_config.stream.use_ipc:
|
||||
if not anime:
|
||||
raise FastAnimeError("Anime object is required to run with ipc support")
|
||||
return self._play_with_ipc(params, anime, media_item)
|
||||
else:
|
||||
return self.player.play(params)
|
||||
|
||||
@@ -1,11 +1,11 @@
|
||||
import logging
|
||||
import os
|
||||
import re
|
||||
from typing import List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
from ...core.config import AppConfig
|
||||
from ...core.constants import APP_CACHE_DIR, PLATFORM, SCRIPTS_DIR
|
||||
from ...libs.media_api.types import MediaItem
|
||||
from ...libs.media_api.types import MediaItem, MediaReview
|
||||
from . import ansi
|
||||
from .preview_workers import PreviewWorkerManager
|
||||
|
||||
@@ -16,12 +16,16 @@ os.environ["SHELL"] = "bash"
|
||||
PREVIEWS_CACHE_DIR = APP_CACHE_DIR / "previews"
|
||||
IMAGES_CACHE_DIR = PREVIEWS_CACHE_DIR / "images"
|
||||
INFO_CACHE_DIR = PREVIEWS_CACHE_DIR / "info"
|
||||
REVIEWS_CACHE_DIR = PREVIEWS_CACHE_DIR / "reviews"
|
||||
|
||||
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
|
||||
TEMPLATE_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "preview.template.sh").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
DYNAMIC_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "dynamic_preview.template.sh").read_text(
|
||||
TEMPLATE_REVIEW_PREVIEW_SCRIPT = (
|
||||
FZF_SCRIPTS_DIR / "review-preview.template.sh"
|
||||
).read_text(encoding="utf-8")
|
||||
DYNAMIC_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "dynamic-preview.template.sh").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
@@ -88,6 +92,14 @@ class PreviewContext:
|
||||
self._manager = _get_preview_manager()
|
||||
return get_dynamic_anime_preview(config)
|
||||
|
||||
def get_review_preview(
|
||||
self, choice_map: Dict[str, MediaReview], config: AppConfig
|
||||
) -> str:
|
||||
"""Get review preview script with managed workers."""
|
||||
if not self._manager:
|
||||
self._manager = _get_preview_manager()
|
||||
return get_review_preview(choice_map, config)
|
||||
|
||||
def cancel_all_tasks(self) -> int:
|
||||
"""Cancel all running preview tasks."""
|
||||
if not self._manager:
|
||||
@@ -247,9 +259,6 @@ def get_dynamic_anime_preview(config: AppConfig) -> str:
|
||||
# Use the dynamic preview script template
|
||||
preview_script = DYNAMIC_PREVIEW_SCRIPT
|
||||
|
||||
# We need to return the path to the search results file
|
||||
from ...core.constants import APP_CACHE_DIR
|
||||
|
||||
search_cache_dir = APP_CACHE_DIR / "search"
|
||||
search_results_file = search_cache_dir / "current_search_results.json"
|
||||
|
||||
@@ -282,7 +291,9 @@ def _get_preview_manager() -> PreviewWorkerManager:
|
||||
"""Get or create the global preview worker manager."""
|
||||
global _preview_manager
|
||||
if _preview_manager is None:
|
||||
_preview_manager = PreviewWorkerManager(IMAGES_CACHE_DIR, INFO_CACHE_DIR)
|
||||
_preview_manager = PreviewWorkerManager(
|
||||
IMAGES_CACHE_DIR, INFO_CACHE_DIR, REVIEWS_CACHE_DIR
|
||||
)
|
||||
return _preview_manager
|
||||
|
||||
|
||||
@@ -306,3 +317,38 @@ def get_preview_worker_status() -> dict:
|
||||
if _preview_manager:
|
||||
return _preview_manager.get_status()
|
||||
return {"preview_worker": None, "episode_worker": None}
|
||||
|
||||
|
||||
def get_review_preview(choice_map: Dict[str, MediaReview], config: AppConfig) -> str:
|
||||
"""
|
||||
Generate the generic loader script for review previews and start background caching.
|
||||
"""
|
||||
|
||||
REVIEWS_CACHE_DIR.mkdir(parents=True, exist_ok=True)
|
||||
preview_manager = _get_preview_manager()
|
||||
worker = preview_manager.get_review_worker()
|
||||
worker.cache_review_previews(choice_map, config)
|
||||
logger.debug("Started background caching for review previews")
|
||||
|
||||
# Use the generic loader script
|
||||
preview_script = TEMPLATE_REVIEW_PREVIEW_SCRIPT
|
||||
path_sep = "\\" if PLATFORM == "win32" else "/"
|
||||
|
||||
# Inject the correct cache path and color codes
|
||||
replacements = {
|
||||
"PREVIEW_MODE": config.general.preview,
|
||||
"INFO_CACHE_DIR": str(REVIEWS_CACHE_DIR),
|
||||
"PATH_SEP": path_sep,
|
||||
"C_TITLE": ansi.get_true_fg(config.fzf.header_color.split(","), bold=True),
|
||||
"C_KEY": ansi.get_true_fg(config.fzf.header_color.split(","), bold=True),
|
||||
"C_VALUE": ansi.get_true_fg(config.fzf.header_color.split(","), bold=True),
|
||||
"C_RULE": ansi.get_true_fg(
|
||||
config.fzf.preview_separator_color.split(","), bold=True
|
||||
),
|
||||
"RESET": ansi.RESET,
|
||||
}
|
||||
|
||||
for key, value in replacements.items():
|
||||
preview_script = preview_script.replace(f"{{{key}}}", value)
|
||||
|
||||
return preview_script
|
||||
|
||||
@@ -6,7 +6,7 @@ including image downloads and info text generation with proper lifecycle managem
|
||||
"""
|
||||
|
||||
import logging
|
||||
from typing import List, Optional
|
||||
from typing import Dict, List, Optional
|
||||
|
||||
import httpx
|
||||
|
||||
@@ -19,17 +19,21 @@ from ...core.utils.concurrency import (
|
||||
thread_manager,
|
||||
)
|
||||
from ...core.utils.file import AtomicWriter
|
||||
from ...libs.media_api.types import MediaItem
|
||||
from ...libs.media_api.types import MediaItem, MediaReview
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
|
||||
|
||||
|
||||
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
|
||||
TEMPLATE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "info.template.sh").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
TEMPLATE_EPISODE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "episode-info.template.sh").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
TEMPLATE_REVIEW_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "review-info.template.sh").read_text(
|
||||
encoding="utf-8"
|
||||
)
|
||||
|
||||
|
||||
class PreviewCacheWorker(ManagedBackgroundWorker):
|
||||
@@ -394,6 +398,81 @@ class EpisodeCacheWorker(ManagedBackgroundWorker):
|
||||
logger.debug("Episode cache task completed successfully")
|
||||
|
||||
|
||||
class ReviewCacheWorker(ManagedBackgroundWorker):
|
||||
"""
|
||||
Specialized background worker for caching fully-rendered media review previews.
|
||||
"""
|
||||
|
||||
def __init__(self, reviews_cache_dir, max_workers: int = 10):
|
||||
super().__init__(max_workers=max_workers, name="ReviewCacheWorker")
|
||||
self.reviews_cache_dir = reviews_cache_dir
|
||||
|
||||
def cache_review_previews(
|
||||
self, choice_map: Dict[str, MediaReview], config: AppConfig
|
||||
) -> None:
|
||||
"""
|
||||
Creates cache files containing the final, formatted preview content for each review.
|
||||
|
||||
Args:
|
||||
choice_map: Dictionary mapping the fzf choice string to the MediaReview object.
|
||||
config: The application configuration.
|
||||
"""
|
||||
if not self.is_running():
|
||||
raise RuntimeError("ReviewCacheWorker is not running")
|
||||
|
||||
for choice_str, review in choice_map.items():
|
||||
hash_id = self._get_cache_hash(choice_str)
|
||||
info_path = self.reviews_cache_dir / hash_id
|
||||
|
||||
preview_content = self._generate_review_preview_content(review, config)
|
||||
self.submit_function(self._save_preview_content, preview_content, hash_id)
|
||||
|
||||
def _generate_review_preview_content(
|
||||
self, review: MediaReview, config: AppConfig
|
||||
) -> str:
|
||||
"""
|
||||
Generates the final, formatted preview content by injecting data into the template.
|
||||
"""
|
||||
|
||||
# Prepare the data for injection
|
||||
reviewer = review.user.name
|
||||
summary = review.summary or "N/A"
|
||||
body = review.body
|
||||
|
||||
# Inject data into the presentation template
|
||||
template = TEMPLATE_REVIEW_INFO_SCRIPT
|
||||
replacements = {
|
||||
"REVIEWER_NAME": formatter.shell_safe(reviewer),
|
||||
"REVIEW_SUMMARY": formatter.shell_safe(summary),
|
||||
"REVIEW_BODY": formatter.shell_safe(body),
|
||||
}
|
||||
for key, value in replacements.items():
|
||||
template = template.replace(f"{{{key}}}", value)
|
||||
|
||||
return template
|
||||
|
||||
def _save_preview_content(self, content: str, hash_id: str) -> None:
|
||||
"""Saves the final preview content to the cache."""
|
||||
try:
|
||||
info_path = self.reviews_cache_dir / hash_id
|
||||
with AtomicWriter(info_path) as f:
|
||||
f.write(content)
|
||||
logger.debug(f"Successfully cached review preview: {hash_id}")
|
||||
except IOError as e:
|
||||
logger.error(f"Failed to write review preview cache for {hash_id}: {e}")
|
||||
raise
|
||||
|
||||
def _get_cache_hash(self, text: str) -> str:
|
||||
from hashlib import sha256
|
||||
|
||||
return sha256(text.encode("utf-8")).hexdigest()
|
||||
|
||||
def _on_task_completed(self, task: WorkerTask, future) -> None:
|
||||
super()._on_task_completed(task, future)
|
||||
if future.exception():
|
||||
logger.warning(f"Review cache task failed: {future.exception()}")
|
||||
|
||||
|
||||
class PreviewWorkerManager:
|
||||
"""
|
||||
High-level manager for preview caching workers.
|
||||
@@ -402,7 +481,7 @@ class PreviewWorkerManager:
|
||||
caching workers with automatic lifecycle management.
|
||||
"""
|
||||
|
||||
def __init__(self, images_cache_dir, info_cache_dir):
|
||||
def __init__(self, images_cache_dir, info_cache_dir, reviews_cache_dir):
|
||||
"""
|
||||
Initialize the preview worker manager.
|
||||
|
||||
@@ -412,8 +491,10 @@ class PreviewWorkerManager:
|
||||
"""
|
||||
self.images_cache_dir = images_cache_dir
|
||||
self.info_cache_dir = info_cache_dir
|
||||
self.reviews_cache_dir = reviews_cache_dir
|
||||
self._preview_worker: Optional[PreviewCacheWorker] = None
|
||||
self._episode_worker: Optional[EpisodeCacheWorker] = None
|
||||
self._review_worker: Optional[ReviewCacheWorker] = None
|
||||
|
||||
def get_preview_worker(self) -> PreviewCacheWorker:
|
||||
"""Get or create the preview cache worker."""
|
||||
@@ -445,6 +526,19 @@ class PreviewWorkerManager:
|
||||
|
||||
return self._episode_worker
|
||||
|
||||
def get_review_worker(self) -> ReviewCacheWorker:
|
||||
"""Get or create the episode cache worker."""
|
||||
if self._review_worker is None or not self._review_worker.is_running():
|
||||
if self._episode_worker:
|
||||
# Clean up old worker
|
||||
thread_manager.shutdown_worker("review_cache_worker")
|
||||
|
||||
self._review_worker = ReviewCacheWorker(self.reviews_cache_dir)
|
||||
self._review_worker.start()
|
||||
thread_manager.register_worker("review_cache_worker", self._review_worker)
|
||||
|
||||
return self._review_worker
|
||||
|
||||
def shutdown_all(self, wait: bool = True, timeout: Optional[float] = 30.0) -> None:
|
||||
"""Shutdown all managed workers."""
|
||||
thread_manager.shutdown_worker(
|
||||
|
||||
@@ -14,11 +14,18 @@ from ..params import (
|
||||
MediaCharactersParams,
|
||||
MediaRecommendationParams,
|
||||
MediaRelationsParams,
|
||||
MediaReviewsParams,
|
||||
MediaSearchParams,
|
||||
UpdateUserMediaListEntryParams,
|
||||
UserMediaListSearchParams,
|
||||
)
|
||||
from ..types import MediaItem, MediaSearchResult, UserMediaListStatus, UserProfile
|
||||
from ..types import (
|
||||
MediaItem,
|
||||
MediaReview,
|
||||
MediaSearchResult,
|
||||
UserMediaListStatus,
|
||||
UserProfile,
|
||||
)
|
||||
from . import gql, mapper
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
@@ -248,6 +255,21 @@ class AniListApi(BaseApiClient):
|
||||
# TODO: standardize airing schedule type
|
||||
return response.json()
|
||||
|
||||
def get_reviews_for(
|
||||
self, params: MediaReviewsParams
|
||||
) -> Optional[List[MediaReview]]:
|
||||
variables = {
|
||||
"id": params.id,
|
||||
"page": params.page,
|
||||
"per_page": params.per_page or 10, # Default to 10 reviews
|
||||
}
|
||||
response = execute_graphql(
|
||||
ANILIST_ENDPOINT, self.http_client, gql.GET_REVIEWS, variables
|
||||
)
|
||||
if response and "errors" not in response.json():
|
||||
return mapper.to_generic_reviews_list(response.json())
|
||||
return None
|
||||
|
||||
def transform_raw_search_data(self, raw_data: Any) -> Optional[MediaSearchResult]:
|
||||
"""
|
||||
Transform raw AniList API response data into a MediaSearchResult.
|
||||
|
||||
@@ -12,6 +12,7 @@ from ..types import (
|
||||
MediaGenre,
|
||||
MediaImage,
|
||||
MediaItem,
|
||||
MediaReview,
|
||||
MediaSearchResult,
|
||||
MediaStatus,
|
||||
MediaTag,
|
||||
@@ -19,6 +20,7 @@ from ..types import (
|
||||
MediaTitle,
|
||||
MediaTrailer,
|
||||
PageInfo,
|
||||
Reviewer,
|
||||
StreamingEpisode,
|
||||
Studio,
|
||||
UserListItem,
|
||||
@@ -38,6 +40,8 @@ from .types import (
|
||||
AnilistMediaTitle,
|
||||
AnilistMediaTrailer,
|
||||
AnilistPageInfo,
|
||||
AnilistReview,
|
||||
AnilistReviews,
|
||||
AnilistStudioNodes,
|
||||
AnilistViewerData,
|
||||
)
|
||||
@@ -349,3 +353,38 @@ def to_generic_recommendations(data: dict) -> Optional[List[MediaItem]]:
|
||||
continue
|
||||
|
||||
return result if result else None
|
||||
|
||||
|
||||
def _to_generic_reviewer(anilist_user: AnilistCurrentlyLoggedInUser) -> Reviewer:
|
||||
"""Maps an AniList user object to a generic Reviewer."""
|
||||
return Reviewer(
|
||||
name=anilist_user["name"],
|
||||
avatar_url=anilist_user["avatar"]["large"]
|
||||
if anilist_user.get("avatar")
|
||||
else None,
|
||||
)
|
||||
|
||||
|
||||
def _to_generic_review(anilist_review: AnilistReview) -> MediaReview:
|
||||
"""Maps a single AniList review to a generic Review."""
|
||||
return MediaReview(
|
||||
summary=anilist_review.get("summary"),
|
||||
body=anilist_review.get("body", "No review body provided.") or "",
|
||||
user=_to_generic_reviewer(anilist_review["user"]),
|
||||
)
|
||||
|
||||
|
||||
def to_generic_reviews_list(data: AnilistReviews) -> Optional[List[MediaReview]]:
|
||||
"""Top-level mapper for a list of reviews."""
|
||||
if not data or "data" not in data:
|
||||
return None
|
||||
|
||||
page_data = data["data"].get("Page", {})
|
||||
if not page_data:
|
||||
return None
|
||||
|
||||
raw_reviews = page_data.get("reviews", [])
|
||||
if not raw_reviews:
|
||||
return []
|
||||
|
||||
return [_to_generic_review(review) for review in raw_reviews if review]
|
||||
|
||||
@@ -1,4 +1,11 @@
|
||||
from typing import Literal, TypedDict
|
||||
from typing import Literal, Optional, TypedDict
|
||||
|
||||
|
||||
class AnilistPageInfo(TypedDict):
|
||||
total: int
|
||||
perPage: int
|
||||
currentPage: int
|
||||
hasNextPage: bool
|
||||
|
||||
|
||||
class AnilistMediaTitle(TypedDict):
|
||||
@@ -16,7 +23,7 @@ class AnilistImage(TypedDict):
|
||||
class AnilistCurrentlyLoggedInUser(TypedDict):
|
||||
id: int
|
||||
name: str
|
||||
bannerImage: str | None
|
||||
bannerImage: Optional[str]
|
||||
avatar: AnilistImage
|
||||
token: str
|
||||
|
||||
@@ -31,9 +38,9 @@ class AnilistViewerData(TypedDict):
|
||||
|
||||
class AnilistUser(TypedDict):
|
||||
name: str
|
||||
about: str | None
|
||||
about: Optional[str]
|
||||
avatar: AnilistImage
|
||||
bannerImage: str | None
|
||||
bannerImage: Optional[str]
|
||||
|
||||
|
||||
class AnilistUserInfo(TypedDict):
|
||||
@@ -76,15 +83,6 @@ class AnilistMediaNextAiringEpisode(TypedDict):
|
||||
episode: int
|
||||
|
||||
|
||||
class AnilistReview(TypedDict):
|
||||
summary: str
|
||||
user: AnilistCurrentlyLoggedInUser
|
||||
|
||||
|
||||
class AnilistReviewNodes(TypedDict):
|
||||
nodes: list[AnilistReview]
|
||||
|
||||
|
||||
class AnilistMediaRanking(TypedDict):
|
||||
rank: int
|
||||
context: str
|
||||
@@ -151,6 +149,29 @@ class StreamingEpisode(TypedDict):
|
||||
thumbnail: str
|
||||
|
||||
|
||||
class AnilistReview(TypedDict):
|
||||
summary: str
|
||||
user: AnilistCurrentlyLoggedInUser
|
||||
body: Optional[str]
|
||||
|
||||
|
||||
class AnilistReviewPage(TypedDict):
|
||||
pageInfo: AnilistPageInfo
|
||||
reviews: list[AnilistReview]
|
||||
|
||||
|
||||
class AnilistReviewPages(TypedDict):
|
||||
Page: AnilistReviewPage
|
||||
|
||||
|
||||
class AnilistReviews(TypedDict):
|
||||
data: AnilistReviewPages
|
||||
|
||||
|
||||
class AnilistReviewNodes(TypedDict):
|
||||
nodes: list[AnilistReview]
|
||||
|
||||
|
||||
class AnilistBaseMediaDataSchema(TypedDict):
|
||||
"""
|
||||
This a convenience class is used to type the received Anilist data to enhance dev experience
|
||||
@@ -192,13 +213,6 @@ class AnilistBaseMediaDataSchema(TypedDict):
|
||||
mediaListEntry: AnilistMediaList_ | None
|
||||
|
||||
|
||||
class AnilistPageInfo(TypedDict):
|
||||
total: int
|
||||
perPage: int
|
||||
currentPage: int
|
||||
hasNextPage: bool
|
||||
|
||||
|
||||
class AnilistPage(TypedDict):
|
||||
media: list[AnilistBaseMediaDataSchema]
|
||||
pageInfo: AnilistPageInfo
|
||||
|
||||
@@ -7,11 +7,12 @@ from .params import (
|
||||
MediaCharactersParams,
|
||||
MediaRecommendationParams,
|
||||
MediaRelationsParams,
|
||||
MediaReviewsParams,
|
||||
MediaSearchParams,
|
||||
UpdateUserMediaListEntryParams,
|
||||
UserMediaListSearchParams,
|
||||
)
|
||||
from .types import MediaItem, MediaSearchResult, UserProfile
|
||||
from .types import MediaItem, MediaReview, MediaSearchResult, UserProfile
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from httpx import Client
|
||||
@@ -79,6 +80,12 @@ class BaseApiClient(abc.ABC):
|
||||
) -> Optional[Dict]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def get_reviews_for(
|
||||
self, params: MediaReviewsParams
|
||||
) -> Optional[List[MediaReview]]:
|
||||
pass
|
||||
|
||||
@abc.abstractmethod
|
||||
def transform_raw_search_data(self, raw_data: Dict) -> Optional[MediaSearchResult]:
|
||||
"""
|
||||
|
||||
@@ -105,3 +105,10 @@ class MediaRelationsParams:
|
||||
@dataclass(frozen=True)
|
||||
class MediaAiringScheduleParams:
|
||||
id: int
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class MediaReviewsParams:
|
||||
id: int
|
||||
page: int = 1
|
||||
per_page: Optional[int] = None
|
||||
|
||||
@@ -199,7 +199,24 @@ class UserProfile(BaseMediaApiModel):
|
||||
banner_url: Optional[str] = None
|
||||
|
||||
|
||||
class Reviewer(BaseMediaApiModel):
|
||||
"""A generic representation of a user who wrote a review."""
|
||||
|
||||
name: str
|
||||
avatar_url: Optional[str] = None
|
||||
|
||||
|
||||
class MediaReview(BaseMediaApiModel):
|
||||
"""A generic representation of a media review."""
|
||||
|
||||
summary: Optional[str] = None
|
||||
body: str
|
||||
user: Reviewer
|
||||
|
||||
|
||||
# ENUMS
|
||||
|
||||
|
||||
class MediaTag(Enum):
|
||||
# Cast
|
||||
POLYAMOROUS = "Polyamorous"
|
||||
|
||||
Reference in New Issue
Block a user