refactor: streamline authentication and search command preparation in dynamic search

This commit is contained in:
Benexl
2025-07-28 14:25:36 +03:00
parent 6e7e75b514
commit adf75f65b2
2 changed files with 76 additions and 108 deletions

View File

@@ -1,7 +1,5 @@
import json import json
import logging import logging
import os
import tempfile
from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from .....libs.media_api.params import MediaSearchParams from .....libs.media_api.params import MediaSearchParams
@@ -36,129 +34,99 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
# Prepare the search script # Prepare the search script
auth_header = "" auth_header = ""
if ctx.media_api.is_authenticated() and hasattr(ctx.media_api, "token"): profile = ctx.auth.get_auth()
auth_header = f"Bearer {ctx.media_api.token}" if ctx.media_api.is_authenticated() and profile:
auth_header = f"Bearer {profile.token}"
# Create a temporary search script search_command = SEARCH_TEMPLATE_SCRIPT
with tempfile.NamedTemporaryFile(
mode="w", suffix=".sh", delete=False
) as temp_script:
script_content = SEARCH_TEMPLATE_SCRIPT
replacements = { replacements = {
"GRAPHQL_ENDPOINT": "https://graphql.anilist.co", "GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
"GRAPHQL_QUERY": search_query_escaped, "GRAPHQL_QUERY": search_query_escaped,
"CACHE_DIR": str(SEARCH_CACHE_DIR), "CACHE_DIR": str(SEARCH_CACHE_DIR),
"SEARCH_RESULTS_FILE": str(SEARCH_RESULTS_FILE), "SEARCH_RESULTS_FILE": str(SEARCH_RESULTS_FILE),
"AUTH_HEADER": auth_header, "AUTH_HEADER": auth_header,
} }
for key, value in replacements.items(): for key, value in replacements.items():
script_content = script_content.replace(f"{{{key}}}", str(value)) search_command = search_command.replace(f"{{{key}}}", str(value))
temp_script.write(script_content)
temp_script_path = temp_script.name
try: try:
# Make the script executable # Prepare preview functionality
os.chmod(temp_script_path, 0o755) preview_command = None
if ctx.config.general.preview != "none":
from ....utils.preview import create_preview_context
# Use the selector's search functionality with create_preview_context() as preview_ctx:
try: preview_command = preview_ctx.get_dynamic_anime_preview(ctx.config)
# Prepare preview functionality
preview_command = None
if ctx.config.general.preview != "none":
from ....utils.preview import create_preview_context
with create_preview_context() as preview_ctx:
preview_command = preview_ctx.get_dynamic_anime_preview(ctx.config)
choice = ctx.selector.search(
prompt="Search Anime",
search_command=f"bash {temp_script_path} {{q}}",
preview=preview_command,
header="Type to search for anime dynamically",
)
else:
choice = ctx.selector.search( choice = ctx.selector.search(
prompt="Search Anime", prompt="Search Anime",
search_command=f"bash {temp_script_path} {{q}}", search_command=search_command,
header="Type to search for anime dynamically", preview=preview_command,
) )
except NotImplementedError: else:
feedback.error("Dynamic search is not supported by your current selector") choice = ctx.selector.search(
feedback.info( prompt="Search Anime",
"Please use the regular search option or switch to fzf selector" search_command=search_command,
) )
return InternalDirective.MAIN except NotImplementedError:
feedback.error("Dynamic search is not supported by your current selector")
feedback.info("Please use the regular search option or switch to fzf selector")
return InternalDirective.MAIN
if not choice: if not choice:
return InternalDirective.MAIN return InternalDirective.MAIN
# Read the cached search results # Read the cached search results
if not SEARCH_RESULTS_FILE.exists(): if not SEARCH_RESULTS_FILE.exists():
logger.error("Search results file not found") logger.error("Search results file not found")
return InternalDirective.MAIN return InternalDirective.MAIN
try: with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f:
with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f: raw_data = json.load(f)
raw_data = json.load(f)
# Transform the raw data into MediaSearchResult # Transform the raw data into MediaSearchResult
search_result = ctx.media_api.transform_raw_search_data(raw_data) search_result = ctx.media_api.transform_raw_search_data(raw_data)
if not search_result or not search_result.media: if not search_result or not search_result.media:
feedback.info("No results found") feedback.info("No results found")
return InternalDirective.MAIN return InternalDirective.MAIN
# Find the selected media item by matching the choice with the displayed format # Find the selected media item by matching the choice with the displayed format
selected_media = None selected_media = None
for media_item in search_result.media: for media_item in search_result.media:
title = ( title = (
media_item.title.english media_item.title.english
or media_item.title.romaji or media_item.title.romaji
or media_item.title.native or media_item.title.native
or "Unknown" or "Unknown"
) )
year = ( year = media_item.start_date.year if media_item.start_date else "Unknown"
media_item.start_date.year if media_item.start_date else "Unknown" status = media_item.status.value if media_item.status else "Unknown"
) genres = (
status = media_item.status.value if media_item.status else "Unknown" ", ".join([genre.value for genre in media_item.genres[:3]])
genres = ( if media_item.genres
", ".join([genre.value for genre in media_item.genres[:3]]) else "Unknown"
if media_item.genres )
else "Unknown"
)
display_format = f"{title} ({year}) [{status}] - {genres}" display_format = f"{title} ({year}) [{status}] - {genres}"
if choice.strip() == display_format.strip(): if choice.strip() == display_format.strip():
selected_media = media_item selected_media = media_item
break break
if not selected_media: if not selected_media:
logger.error(f"Could not find selected media for choice: {choice}") logger.error(f"Could not find selected media for choice: {choice}")
return InternalDirective.MAIN return InternalDirective.MAIN
# Navigate to media actions with the selected item # Navigate to media actions with the selected item
return State( return State(
menu_name=MenuName.MEDIA_ACTIONS, menu_name=MenuName.MEDIA_ACTIONS,
media_api=MediaApiState( media_api=MediaApiState(
search_result={selected_media.id: selected_media}, search_result={selected_media.id: selected_media},
media_id=selected_media.id, media_id=selected_media.id,
search_params=MediaSearchParams(), search_params=MediaSearchParams(),
page_info=search_result.page_info, page_info=search_result.page_info,
), ),
) )
except (json.JSONDecodeError, KeyError, Exception) as e:
logger.error(f"Error processing search results: {e}")
feedback.error("Failed to process search results")
return InternalDirective.MAIN
finally:
# Clean up the temporary script
try:
os.unlink(temp_script_path)
except OSError:
pass

View File

@@ -122,7 +122,7 @@ class FzfSelector(BaseSelector):
"--prompt", "--prompt",
f"{prompt.title()}: ", f"{prompt.title()}: ",
"--header", "--header",
header or self.header, self.header,
"--header-first", "--header-first",
"--bind", "--bind",
f"change:reload({search_command})", f"change:reload({search_command})",