diff --git a/fastanime/cli/interactive/menu/media/media_actions.py b/fastanime/cli/interactive/menu/media/media_actions.py index b52eb1c..36d7f9a 100644 --- a/fastanime/cli/interactive/menu/media/media_actions.py +++ b/fastanime/cli/interactive/menu/media/media_actions.py @@ -1,15 +1,12 @@ -from typing import Callable, Dict, List, Literal, Optional, Union +from typing import Callable, Dict, Literal, Optional from .....libs.media_api.params import ( - MediaAiringScheduleParams, - MediaCharactersParams, MediaRecommendationParams, MediaRelationsParams, UpdateUserMediaListEntryParams, ) from .....libs.media_api.types import ( MediaItem, - MediaReview, MediaStatus, UserMediaListStatus, ) @@ -610,7 +607,9 @@ def _view_airing_schedule(ctx: Context, state: State) -> MenuAction: """Action to transition to the airing schedule menu.""" def action() -> State | InternalDirective: - return State(menu_name=MenuName.MEDIA_AIRING_SCHEDULE, media_api=state.media_api) + return State( + menu_name=MenuName.MEDIA_AIRING_SCHEDULE, media_api=state.media_api + ) return action diff --git a/fastanime/cli/interactive/menu/media/media_airing_schedule.py b/fastanime/cli/interactive/menu/media/media_airing_schedule.py index eaf5669..c327dbf 100644 --- a/fastanime/cli/interactive/menu/media/media_airing_schedule.py +++ b/fastanime/cli/interactive/menu/media/media_airing_schedule.py @@ -6,16 +6,15 @@ from ...state import InternalDirective, State @session.menu -def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDirective]: +def media_airing_schedule( + ctx: Context, state: State +) -> Union[State, InternalDirective]: """ Fetches and displays the airing schedule for an anime. Shows upcoming episodes with air dates and countdown timers. """ - from datetime import datetime from rich.console import Console from rich.panel import Panel - from rich.table import Table - from rich.text import Text feedback = ctx.feedback selector = ctx.selector @@ -28,9 +27,7 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi from .....libs.media_api.params import MediaAiringScheduleParams - loading_message = ( - f"Fetching airing schedule for {media_item.title.english or media_item.title.romaji}..." - ) + loading_message = f"Fetching airing schedule for {media_item.title.english or media_item.title.romaji}..." schedule_result: Optional[AiringScheduleResult] = None with feedback.progress(loading_message): @@ -41,7 +38,7 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi if not schedule_result or not schedule_result.schedule_items: feedback.warning( "No airing schedule found", - "This anime doesn't have upcoming episodes or airing data" + "This anime doesn't have upcoming episodes or airing data", ) return InternalDirective.BACK @@ -54,7 +51,7 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi display_name += f" - {airing_time.strftime('%Y-%m-%d %H:%M')}" if item.time_until_airing: display_name += f" (in {item.time_until_airing})" - + choice_map[display_name] = item choices = list(choice_map.keys()) + ["View Full Schedule", "Back"] @@ -65,7 +62,9 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi anime_title = media_item.title.english or media_item.title.romaji or "Unknown" with create_preview_context() as preview_ctx: - preview_command = preview_ctx.get_airing_schedule_preview(schedule_result, ctx.config, anime_title) + preview_command = preview_ctx.get_airing_schedule_preview( + schedule_result, ctx.config, anime_title + ) while True: chosen_title = selector.choose( @@ -76,11 +75,13 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi if not chosen_title or chosen_title == "Back": return InternalDirective.BACK - + if chosen_title == "View Full Schedule": console.clear() # Display airing schedule - anime_title = media_item.title.english or media_item.title.romaji or "Unknown" + anime_title = ( + media_item.title.english or media_item.title.romaji or "Unknown" + ) _display_airing_schedule(console, schedule_result, anime_title) selector.ask("\nPress Enter to return...") continue @@ -88,37 +89,40 @@ def media_airing_schedule(ctx: Context, state: State) -> Union[State, InternalDi # Show individual episode details selected_item = choice_map[chosen_title] console.clear() - - from rich.panel import Panel - from datetime import datetime - + episode_info = [] episode_info.append(f"[bold cyan]Episode {selected_item.episode}[/bold cyan]") - + if selected_item.airing_at: airing_time = selected_item.airing_at - episode_info.append(f"[green]Airs at:[/green] {airing_time.strftime('%Y-%m-%d %H:%M:%S')}") - + episode_info.append( + f"[green]Airs at:[/green] {airing_time.strftime('%Y-%m-%d %H:%M:%S')}" + ) + if selected_item.time_until_airing: - episode_info.append(f"[yellow]Time until airing:[/yellow] {selected_item.time_until_airing}") - + episode_info.append( + f"[yellow]Time until airing:[/yellow] {selected_item.time_until_airing}" + ) + episode_content = "\n".join(episode_info) - + console.print( Panel( episode_content, title=f"Episode Details - {media_item.title.english or media_item.title.romaji}", border_style="blue", - expand=True + expand=True, ) ) - + selector.ask("\nPress Enter to return to the schedule list...") return InternalDirective.BACK -def _display_airing_schedule(console, schedule_result: AiringScheduleResult, anime_title: str): +def _display_airing_schedule( + console, schedule_result: AiringScheduleResult, anime_title: str +): """Display the airing schedule in a formatted table.""" from datetime import datetime from rich.panel import Panel @@ -144,7 +148,7 @@ def _display_airing_schedule(console, schedule_result: AiringScheduleResult, ani # Format air date if episode.airing_at: formatted_date = episode.airing_at.strftime("%Y-%m-%d %H:%M") - + # Check if episode has already aired now = datetime.now() if episode.airing_at < now: @@ -181,15 +185,18 @@ def _display_airing_schedule(console, schedule_result: AiringScheduleResult, ani # Add summary information total_episodes = len(schedule_result.schedule_items) - upcoming_episodes = sum(1 for ep in schedule_result.schedule_items - if ep.airing_at and ep.airing_at > datetime.now()) - + upcoming_episodes = sum( + 1 + for ep in schedule_result.schedule_items + if ep.airing_at and ep.airing_at > datetime.now() + ) + summary_text = Text() - summary_text.append(f"Total episodes in schedule: ", style="bold") + summary_text.append("Total episodes in schedule: ", style="bold") summary_text.append(f"{total_episodes}", style="cyan") - summary_text.append(f"\nUpcoming episodes: ", style="bold") + summary_text.append("\nUpcoming episodes: ", style="bold") summary_text.append(f"{upcoming_episodes}", style="green") - + summary_panel = Panel( summary_text, title="[bold]Summary[/bold]", diff --git a/fastanime/cli/interactive/menu/media/media_characters.py b/fastanime/cli/interactive/menu/media/media_characters.py index 537f98f..4c101dd 100644 --- a/fastanime/cli/interactive/menu/media/media_characters.py +++ b/fastanime/cli/interactive/menu/media/media_characters.py @@ -1,5 +1,5 @@ import re -from typing import Dict, List, Optional, Union +from typing import Dict, Optional, Union from .....libs.media_api.types import Character, CharacterSearchResult from ...session import Context, session @@ -13,9 +13,6 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti Shows character details upon selection or in the preview pane. """ from rich.console import Console - from rich.panel import Panel - from rich.table import Table - from rich.text import Text feedback = ctx.feedback selector = ctx.selector @@ -29,9 +26,7 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti from .....libs.media_api.params import MediaCharactersParams - loading_message = ( - f"Fetching characters for {media_item.title.english or media_item.title.romaji}..." - ) + loading_message = f"Fetching characters for {media_item.title.english or media_item.title.romaji}..." characters_result: Optional[CharacterSearchResult] = None with feedback.progress(loading_message): @@ -45,7 +40,7 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti characters = characters_result.characters choice_map: Dict[str, Character] = {} - + # Create display names for characters for character in characters: display_name = character.name.full or character.name.first or "Unknown" @@ -53,7 +48,7 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti display_name += f" ({character.gender})" if character.age: display_name += f" - Age {character.age}" - + choice_map[display_name] = character choices = list(choice_map.keys()) + ["Back"] @@ -81,18 +76,16 @@ def media_characters(ctx: Context, state: State) -> Union[State, InternalDirecti # Display character details anime_title = media_item.title.english or media_item.title.romaji or "Unknown" _display_character_details(console, selected_character, anime_title) - + selector.ask("\nPress Enter to return to the character list...") def _display_character_details(console, character: Character, anime_title: str): """Display detailed character information in a formatted panel.""" from rich.columns import Columns - from rich.console import Console from rich.panel import Panel from rich.table import Table from rich.text import Text - import re # Character name panel name_text = Text() @@ -165,7 +158,7 @@ def _display_character_details(console, character: Character, anime_title: str): # Display everything console.print(name_panel) console.print() - + # Show panels side by side if there's basic info if info_table.rows: console.print(Columns([info_panel, description_panel], equal=True, expand=True)) diff --git a/fastanime/cli/service/feedback/service.py b/fastanime/cli/service/feedback/service.py index 1cc640e..6c4705a 100644 --- a/fastanime/cli/service/feedback/service.py +++ b/fastanime/cli/service/feedback/service.py @@ -1,4 +1,3 @@ -import time from contextlib import contextmanager from typing import Optional diff --git a/fastanime/cli/utils/preview_workers.py b/fastanime/cli/utils/preview_workers.py index d530c85..848d401 100644 --- a/fastanime/cli/utils/preview_workers.py +++ b/fastanime/cli/utils/preview_workers.py @@ -39,12 +39,12 @@ TEMPLATE_EPISODE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "episode-info.template.sh").re TEMPLATE_REVIEW_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "review-info.template.sh").read_text( encoding="utf-8" ) -TEMPLATE_CHARACTER_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "character-info.template.sh").read_text( - encoding="utf-8" -) -TEMPLATE_AIRING_SCHEDULE_INFO_SCRIPT = (FZF_SCRIPTS_DIR / "airing-schedule-info.template.sh").read_text( - encoding="utf-8" -) +TEMPLATE_CHARACTER_INFO_SCRIPT = ( + FZF_SCRIPTS_DIR / "character-info.template.sh" +).read_text(encoding="utf-8") +TEMPLATE_AIRING_SCHEDULE_INFO_SCRIPT = ( + FZF_SCRIPTS_DIR / "airing-schedule-info.template.sh" +).read_text(encoding="utf-8") class PreviewCacheWorker(ManagedBackgroundWorker): @@ -510,7 +510,9 @@ class CharacterCacheWorker(ManagedBackgroundWorker): hash_id = self._get_cache_hash(choice_str) info_path = self.characters_cache_dir / hash_id - preview_content = self._generate_character_preview_content(character, config) + preview_content = self._generate_character_preview_content( + character, config + ) self.submit_function(self._save_preview_content, preview_content, hash_id) def _generate_character_preview_content( @@ -519,18 +521,25 @@ class CharacterCacheWorker(ManagedBackgroundWorker): """ Generates the final, formatted preview content by injecting character data into the template. """ - character_name = character.name.full or character.name.first or "Unknown Character" + character_name = ( + character.name.full or character.name.first or "Unknown Character" + ) native_name = character.name.native or "N/A" gender = character.gender or "Unknown" age = str(character.age) if character.age else "Unknown" blood_type = character.blood_type or "N/A" favourites = f"{character.favourites:,}" if character.favourites else "0" - birthday = character.date_of_birth.strftime("%B %d, %Y") if character.date_of_birth else "N/A" - + birthday = ( + character.date_of_birth.strftime("%B %d, %Y") + if character.date_of_birth + else "N/A" + ) + # Clean and format description description = character.description or "No description available" if description: import re + description = re.sub(r"<[^>]+>", "", description) description = ( description.replace(""", '"') @@ -571,6 +580,7 @@ class CharacterCacheWorker(ManagedBackgroundWorker): def _get_cache_hash(self, text: str) -> str: from hashlib import sha256 + return sha256(text.encode("utf-8")).hexdigest() def _on_task_completed(self, task: WorkerTask, future) -> None: @@ -619,16 +629,21 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker): from datetime import datetime total_episodes = len(schedule_result.schedule_items) - upcoming_episodes = sum(1 for ep in schedule_result.schedule_items - if ep.airing_at and ep.airing_at > datetime.now()) + upcoming_episodes = sum( + 1 + for ep in schedule_result.schedule_items + if ep.airing_at and ep.airing_at > datetime.now() + ) # Generate schedule table text schedule_lines = [] - sorted_episodes = sorted(schedule_result.schedule_items, key=lambda x: x.episode) - + sorted_episodes = sorted( + schedule_result.schedule_items, key=lambda x: x.episode + ) + for episode in sorted_episodes[:10]: # Show next 10 episodes ep_num = str(episode.episode) - + if episode.airing_at: formatted_date = episode.airing_at.strftime("%Y-%m-%d %H:%M") now = datetime.now() @@ -650,13 +665,15 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker): elif hours > 0: time_str = f"{hours}h" else: - time_str = f"<1h" + time_str = "<1h" elif episode.airing_at and episode.airing_at < datetime.now(): time_str = "Aired" else: time_str = "Unknown" - schedule_lines.append(f"Episode {ep_num:>3}: {formatted_date} ({time_str}) - {status}") + schedule_lines.append( + f"Episode {ep_num:>3}: {formatted_date} ({time_str}) - {status}" + ) schedule_table = "\n".join(schedule_lines) @@ -681,11 +698,14 @@ class AiringScheduleCacheWorker(ManagedBackgroundWorker): f.write(content) logger.debug(f"Successfully cached airing schedule preview: {hash_id}") except IOError as e: - logger.error(f"Failed to write airing schedule preview cache for {hash_id}: {e}") + logger.error( + f"Failed to write airing schedule preview cache for {hash_id}: {e}" + ) raise def _get_cache_hash(self, text: str) -> str: from hashlib import sha256 + return sha256(text.encode("utf-8")).hexdigest() def _on_task_completed(self, task: WorkerTask, future) -> None: @@ -772,20 +792,29 @@ class PreviewWorkerManager: self._character_worker = CharacterCacheWorker(self.info_cache_dir) self._character_worker.start() - thread_manager.register_worker("character_cache_worker", self._character_worker) + thread_manager.register_worker( + "character_cache_worker", self._character_worker + ) return self._character_worker def get_airing_schedule_worker(self) -> AiringScheduleCacheWorker: """Get or create the airing schedule cache worker.""" - if self._airing_schedule_worker is None or not self._airing_schedule_worker.is_running(): + if ( + self._airing_schedule_worker is None + or not self._airing_schedule_worker.is_running() + ): if self._airing_schedule_worker: # Clean up old worker thread_manager.shutdown_worker("airing_schedule_cache_worker") - self._airing_schedule_worker = AiringScheduleCacheWorker(self.info_cache_dir) + self._airing_schedule_worker = AiringScheduleCacheWorker( + self.info_cache_dir + ) self._airing_schedule_worker.start() - thread_manager.register_worker("airing_schedule_cache_worker", self._airing_schedule_worker) + thread_manager.register_worker( + "airing_schedule_cache_worker", self._airing_schedule_worker + ) return self._airing_schedule_worker diff --git a/fastanime/libs/media_api/anilist/api.py b/fastanime/libs/media_api/anilist/api.py index 067d46e..c68149a 100644 --- a/fastanime/libs/media_api/anilist/api.py +++ b/fastanime/libs/media_api/anilist/api.py @@ -1,6 +1,6 @@ import logging from enum import Enum -from typing import Any, Dict, List, Optional +from typing import Any, List, Optional from httpx import Client @@ -230,7 +230,9 @@ class AniListApi(BaseApiClient): ) return mapper.to_generic_recommendations(response.json()) - def get_characters_of(self, params: MediaCharactersParams) -> Optional[CharacterSearchResult]: + def get_characters_of( + self, params: MediaCharactersParams + ) -> Optional[CharacterSearchResult]: variables = {"id": params.id, "type": "ANIME"} response = execute_graphql( ANILIST_ENDPOINT, self.http_client, gql.GET_MEDIA_CHARACTERS, variables diff --git a/fastanime/libs/media_api/anilist/mapper.py b/fastanime/libs/media_api/anilist/mapper.py index 951eed9..ca667b5 100644 --- a/fastanime/libs/media_api/anilist/mapper.py +++ b/fastanime/libs/media_api/anilist/mapper.py @@ -400,7 +400,7 @@ def _to_generic_character_name(anilist_name: Optional[Dict]) -> CharacterName: """Maps an AniList character name object to a generic CharacterName.""" if not anilist_name: return CharacterName() - + return CharacterName( first=anilist_name.get("first"), middle=anilist_name.get("middle"), @@ -410,11 +410,13 @@ def _to_generic_character_name(anilist_name: Optional[Dict]) -> CharacterName: ) -def _to_generic_character_image(anilist_image: Optional[Dict]) -> Optional[CharacterImage]: +def _to_generic_character_image( + anilist_image: Optional[Dict], +) -> Optional[CharacterImage]: """Maps an AniList character image object to a generic CharacterImage.""" if not anilist_image: return None - + return CharacterImage( medium=anilist_image.get("medium"), large=anilist_image.get("large"), @@ -425,7 +427,7 @@ def _to_generic_character(anilist_character: Dict) -> Optional[Character]: """Maps an AniList character object to a generic Character.""" if not anilist_character: return None - + # Parse date of birth if available date_of_birth = None if dob := anilist_character.get("dateOfBirth"): @@ -437,7 +439,7 @@ def _to_generic_character(anilist_character: Dict) -> Optional[Character]: date_of_birth = datetime(year, month, day) except (ValueError, TypeError): pass - + return Character( id=anilist_character.get("id"), name=_to_generic_character_name(anilist_character.get("name")), @@ -460,12 +462,12 @@ def to_generic_characters_result(data: Dict) -> Optional[CharacterSearchResult]: try: page_data = data["data"]["Page"]["media"][0] characters_data = page_data["characters"]["nodes"] - + characters = [] for char_data in characters_data: if character := _to_generic_character(char_data): characters.append(character) - + return CharacterSearchResult( characters=characters, page_info=None, # Characters don't typically have pagination @@ -475,18 +477,20 @@ def to_generic_characters_result(data: Dict) -> Optional[CharacterSearchResult]: return None -def _to_generic_airing_schedule_item(anilist_episode: Dict) -> Optional[AiringScheduleItem]: +def _to_generic_airing_schedule_item( + anilist_episode: Dict, +) -> Optional[AiringScheduleItem]: """Maps an AniList airing schedule episode to a generic AiringScheduleItem.""" if not anilist_episode: return None - + airing_at = None if airing_timestamp := anilist_episode.get("airingAt"): try: airing_at = datetime.fromtimestamp(airing_timestamp) except (ValueError, TypeError): pass - + return AiringScheduleItem( episode=anilist_episode.get("episode", 0), airing_at=airing_at, @@ -503,12 +507,12 @@ def to_generic_airing_schedule_result(data: Dict) -> Optional[AiringScheduleResu try: page_data = data["data"]["Page"]["media"][0] schedule_data = page_data["airingSchedule"]["nodes"] - + schedule_items = [] for episode_data in schedule_data: if item := _to_generic_airing_schedule_item(episode_data): schedule_items.append(item) - + return AiringScheduleResult( schedule_items=schedule_items, page_info=None, # Schedule doesn't typically have pagination diff --git a/fastanime/libs/media_api/base.py b/fastanime/libs/media_api/base.py index 9c3d788..0d7f0eb 100644 --- a/fastanime/libs/media_api/base.py +++ b/fastanime/libs/media_api/base.py @@ -14,7 +14,6 @@ from .params import ( ) from .types import ( AiringScheduleResult, - Character, CharacterSearchResult, MediaItem, MediaReview, @@ -73,7 +72,9 @@ class BaseApiClient(abc.ABC): pass @abc.abstractmethod - def get_characters_of(self, params: MediaCharactersParams) -> Optional[CharacterSearchResult]: + def get_characters_of( + self, params: MediaCharactersParams + ) -> Optional[CharacterSearchResult]: pass @abc.abstractmethod diff --git a/fastanime/libs/media_api/jikan/api.py b/fastanime/libs/media_api/jikan/api.py index ddee39d..93197af 100644 --- a/fastanime/libs/media_api/jikan/api.py +++ b/fastanime/libs/media_api/jikan/api.py @@ -1,7 +1,7 @@ from __future__ import annotations import logging -from typing import TYPE_CHECKING, Dict, List, Optional +from typing import TYPE_CHECKING, List, Optional from ..base import BaseApiClient from ..params import ( @@ -138,9 +138,13 @@ class JikanApi(BaseApiClient): logger.error(f"Failed to fetch recommendations for media {params.id}: {e}") return None - def get_characters_of(self, params: MediaCharactersParams) -> Optional[CharacterSearchResult]: + def get_characters_of( + self, params: MediaCharactersParams + ) -> Optional[CharacterSearchResult]: """Fetches characters for a given anime.""" - logger.warning("Jikan API does not support fetching character data in the standardized format.") + logger.warning( + "Jikan API does not support fetching character data in the standardized format." + ) return None def get_related_anime_for(