feat: dynamic search rewrite in python

Author: Benexl
Date: 2025-12-02 14:36:03 +03:00
Parent: c8c4e1b2c0
Commit: 80771f65ea
4 changed files with 661 additions and 27 deletions


@@ -0,0 +1,474 @@
#!/usr/bin/env python3
#
# FZF Dynamic Preview Script for Search Results
#
# This script handles previews for dynamic search by reading from the cached
# search results JSON and generating preview content on-the-fly.
# Template variables are injected by Python using .replace()
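#
# Illustrative only (not part of this script): the caller is expected to fill the
# {PLACEHOLDER} tokens roughly like this before writing the script to the cache:
#
#     script = template.replace("{SEARCH_RESULTS_FILE}", "/path/to/current_search_results.json")
#     script = script.replace("{PREVIEW_MODE}", "full")
#
# See get_dynamic_anime_preview() further down in this commit for the real call.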
import json
import os
import shutil
import subprocess
import sys
from hashlib import sha256
from pathlib import Path
# Import the utility functions
try:
    from _ansi_utils import (
        get_terminal_width,
        print_rule,
        print_table_row,
        strip_markdown,
        wrap_text,
    )

    ANSI_UTILS_AVAILABLE = True
except ImportError:
    ANSI_UTILS_AVAILABLE = False

    # Fallback if _ansi_utils is not available
    def get_terminal_width():
        return int(os.environ.get("FZF_PREVIEW_COLUMNS", "80"))

    def print_rule(sep_color):
        r, g, b = map(int, sep_color.split(","))
        width = get_terminal_width()
print(f"\x1b[38;2;{r};{g};{b}m" + ("" * width) + "\x1b[0m")
    def print_table_row(key, value, header_color, _key_width, _value_width):
        r, g, b = map(int, header_color.split(","))
        print(f"\x1b[38;2;{r};{g};{b};1m{key}\x1b[0m: {value}")

    def strip_markdown(text):
        import re

        text = re.sub(r'\*\*(.+?)\*\*', r'\1', text)
        text = re.sub(r'__(.+?)__', r'\1', text)
        text = re.sub(r'\*(.+?)\*', r'\1', text)
        text = re.sub(r'_(.+?)_', r'\1', text)
        return text

    def wrap_text(text, width):
        import textwrap

        return '\n'.join(textwrap.wrap(text, width))
# --- Template Variables (Injected by Python) ---
SEARCH_RESULTS_FILE = Path("{SEARCH_RESULTS_FILE}")
IMAGE_CACHE_DIR = Path("{IMAGE_CACHE_DIR}")
PREVIEW_MODE = "{PREVIEW_MODE}"
IMAGE_RENDERER = "{IMAGE_RENDERER}"
HEADER_COLOR = "{HEADER_COLOR}"
SEPARATOR_COLOR = "{SEPARATOR_COLOR}"
SCALE_UP = "{SCALE_UP}" == "True"
# --- Arguments ---
# sys.argv[1] is the selected anime title from fzf
SELECTED_TITLE = sys.argv[1] if len(sys.argv) > 1 else ""
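# Illustrative only: the generated preview command is expected to look roughly like
#
#     python3 /path/to/cache/dynamic-search-preview-script.py {}
#
# where fzf substitutes the currently highlighted line (the anime title) for {},
# so it arrives here as sys.argv[1].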
def format_number(num):
    """Format number with thousand separators."""
    if num is None:
        return "N/A"
    return f"{num:,}"


def format_date(date_obj):
    """Format date object to string."""
    if not date_obj or date_obj == "null":
        return "N/A"
    year = date_obj.get("year")
    month = date_obj.get("month")
    day = date_obj.get("day")
    if not year:
        return "N/A"
    if month and day:
        return f"{day}/{month}/{year}"
    if month:
        return f"{month}/{year}"
    return str(year)
def get_media_from_results(title):
    """Find media item in search results by title."""
    if not SEARCH_RESULTS_FILE.exists():
        return None
    try:
        with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f:
            data = json.load(f)
        media_list = data.get("data", {}).get("Page", {}).get("media", [])
        for media in media_list:
            title_obj = media.get("title", {})
            eng = title_obj.get("english")
            rom = title_obj.get("romaji")
            nat = title_obj.get("native")
            if title in (eng, rom, nat):
                return media
        return None
    except Exception as e:
        print(f"Error reading search results: {e}", file=sys.stderr)
        return None
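# For reference, get_media_from_results() assumes the cached file has the usual
# AniList page shape (abridged, illustrative values only):
#
#     {"data": {"Page": {"media": [
#         {"title": {"english": "...", "romaji": "...", "native": "..."},
#          "coverImage": {"large": "https://..."},
#          "averageScore": 85, "episodes": 12, "genres": ["Action"]}
#     ]}}}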
def download_image(url: str, output_path: Path) -> bool:
    """Download image from URL and save to file."""
    try:
        # Try using urllib (stdlib)
        from urllib import request

        req = request.Request(url, headers={"User-Agent": "viu/1.0"})
        with request.urlopen(req, timeout=5) as response:
            data = response.read()
        output_path.write_bytes(data)
        return True
    except Exception:
        # Silently fail - preview will just not show image
        return False


def which(cmd):
    """Check if command exists."""
    return shutil.which(cmd)
def get_terminal_dimensions():
    """Get terminal dimensions from FZF environment."""
    fzf_cols = os.environ.get("FZF_PREVIEW_COLUMNS")
    fzf_lines = os.environ.get("FZF_PREVIEW_LINES")
    if fzf_cols and fzf_lines:
        return int(fzf_cols), int(fzf_lines)
    try:
        rows, cols = (
            subprocess.check_output(
                ["stty", "size"], text=True, stderr=subprocess.DEVNULL
            )
            .strip()
            .split()
        )
        return int(cols), int(rows)
    except Exception:
        return 80, 24
def render_kitty(file_path, width, height, scale_up):
    """Render using the Kitty Graphics Protocol (kitten/icat)."""
    cmd = []
    if which("kitten"):
        cmd = ["kitten", "icat"]
    elif which("icat"):
        cmd = ["icat"]
    elif which("kitty"):
        cmd = ["kitty", "+kitten", "icat"]
    if not cmd:
        return False

    args = [
        "--clear",
        "--transfer-mode=memory",
        "--unicode-placeholder",
        "--stdin=no",
        f"--place={width}x{height}@0x0",
    ]
    if scale_up:
        args.append("--scale-up")
    args.append(file_path)

    subprocess.run(cmd + args, stdout=sys.stdout, stderr=sys.stderr)
    return True
def render_sixel(file_path, width, height):
    """Render using Sixel."""
    if which("chafa"):
        subprocess.run(
            ["chafa", "-f", "sixel", "-s", f"{width}x{height}", file_path],
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        return True
    if which("img2sixel"):
        pixel_width = width * 10
        pixel_height = height * 20
        subprocess.run(
            [
                "img2sixel",
                f"--width={pixel_width}",
                f"--height={pixel_height}",
                file_path,
            ],
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        return True
    return False


def render_iterm(file_path, width, height):
    """Render using iTerm2 Inline Image Protocol."""
    if which("imgcat"):
        subprocess.run(
            ["imgcat", "-W", str(width), "-H", str(height), file_path],
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        return True
    if which("chafa"):
        subprocess.run(
            ["chafa", "-f", "iterm", "-s", f"{width}x{height}", file_path],
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        return True
    return False


def render_timg(file_path, width, height):
    """Render using timg."""
    if which("timg"):
        subprocess.run(
            ["timg", f"-g{width}x{height}", "--upscale", file_path],
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        return True
    return False


def render_chafa_auto(file_path, width, height):
    """Render using Chafa in auto mode."""
    if which("chafa"):
        subprocess.run(
            ["chafa", "-s", f"{width}x{height}", file_path],
            stdout=sys.stdout,
            stderr=sys.stderr,
        )
        return True
    return False
def fzf_image_preview(file_path: str):
    """Main dispatch function to choose the best renderer."""
    cols, lines = get_terminal_dimensions()
    width = cols
    height = lines

    # Check explicit configuration
    if IMAGE_RENDERER == "icat" or IMAGE_RENDERER == "system-kitty":
        if render_kitty(file_path, width, height, SCALE_UP):
            return
    elif IMAGE_RENDERER == "sixel" or IMAGE_RENDERER == "system-sixels":
        if render_sixel(file_path, width, height):
            return
    elif IMAGE_RENDERER == "imgcat":
        if render_iterm(file_path, width, height):
            return
    elif IMAGE_RENDERER == "timg":
        if render_timg(file_path, width, height):
            return
    elif IMAGE_RENDERER == "chafa":
        if render_chafa_auto(file_path, width, height):
            return

    # Auto-detection / Fallback
    if os.environ.get("KITTY_WINDOW_ID") or os.environ.get("GHOSTTY_BIN_DIR"):
        if render_kitty(file_path, width, height, SCALE_UP):
            return
    if os.environ.get("TERM_PROGRAM") == "iTerm.app":
        if render_iterm(file_path, width, height):
            return

    # Try standard tools in order of quality/preference
    if render_kitty(file_path, width, height, SCALE_UP):
        return
    if render_sixel(file_path, width, height):
        return
    if render_timg(file_path, width, height):
        return
    if render_chafa_auto(file_path, width, height):
        return

    print("⚠️ No suitable image renderer found (icat, chafa, timg, img2sixel).")
def main():
    if not SELECTED_TITLE:
        print("No selection")
        return

    # Get the media data from cached search results
    media = get_media_from_results(SELECTED_TITLE)
    if not media:
        print("Loading preview...")
        return

    term_width = get_terminal_width()

    # Extract media information
    title_obj = media.get("title", {})
    title = title_obj.get("english") or title_obj.get("romaji") or title_obj.get("native") or "Unknown"

    # Show image if in image or full mode
    if PREVIEW_MODE in ("image", "full"):
        cover_image = media.get("coverImage", {}).get("large", "")
        if cover_image:
            # Ensure image cache directory exists
            IMAGE_CACHE_DIR.mkdir(parents=True, exist_ok=True)

            # Generate hash matching the preview worker pattern
            # Use "anime-" prefix and hash of just the title (no KEY prefix for dynamic search)
            hash_id = f"anime-{sha256(SELECTED_TITLE.encode('utf-8')).hexdigest()}"
            image_file = IMAGE_CACHE_DIR / f"{hash_id}.png"

            # Download image if not cached
            if not image_file.exists():
                download_image(cover_image, image_file)

            # Try to render the image
            if image_file.exists():
                fzf_image_preview(str(image_file))
                print()  # Spacer
            else:
                print("🖼️ Loading image...")
                print()
    # Show text info if in text or full mode
    if PREVIEW_MODE in ("text", "full"):
        # Separator line
        r, g, b = map(int, SEPARATOR_COLOR.split(","))
        separator = f"\x1b[38;2;{r};{g};{b}m" + ("─" * term_width) + "\x1b[0m"
        print(separator, flush=True)

        # Title centered
        print(title.center(term_width))
        # Extract data
        status = media.get("status", "Unknown")
        format_type = media.get("format", "Unknown")
        episodes = media.get("episodes", "?")
        duration = media.get("duration")
        duration_str = f"{duration} min" if duration else "Unknown"
        score = media.get("averageScore")
        score_str = f"{score}/100" if score else "N/A"
        favourites = format_number(media.get("favourites", 0))
        popularity = format_number(media.get("popularity", 0))
        genres = ", ".join(media.get("genres", [])[:5]) or "Unknown"
        start_date = format_date(media.get("startDate"))
        end_date = format_date(media.get("endDate"))
        studios_list = media.get("studios", {}).get("nodes", [])
        studios = ", ".join([s.get("name", "") for s in studios_list[:3]]) or "Unknown"
        synonyms_list = media.get("synonyms", [])
        synonyms = ", ".join(synonyms_list[:3]) or "N/A"
        description = media.get("description", "No description available.")
        description = strip_markdown(description)

        # Print sections matching media_info.py structure
        rows = [
            ("Score", score_str),
            ("Favorites", favourites),
            ("Popularity", popularity),
            ("Status", status),
        ]
        print_rule(SEPARATOR_COLOR)
        for key, value in rows:
            if ANSI_UTILS_AVAILABLE:
                print_table_row(key, value, HEADER_COLOR, 0, 0)
            else:
                print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

        rows = [
            ("Episodes", str(episodes)),
            ("Duration", duration_str),
        ]
        print_rule(SEPARATOR_COLOR)
        for key, value in rows:
            if ANSI_UTILS_AVAILABLE:
                print_table_row(key, value, HEADER_COLOR, 0, 0)
            else:
                print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

        rows = [
            ("Genres", genres),
            ("Format", format_type),
        ]
        print_rule(SEPARATOR_COLOR)
        for key, value in rows:
            if ANSI_UTILS_AVAILABLE:
                print_table_row(key, value, HEADER_COLOR, 0, 0)
            else:
                print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

        rows = [
            ("Start Date", start_date),
            ("End Date", end_date),
        ]
        print_rule(SEPARATOR_COLOR)
        for key, value in rows:
            if ANSI_UTILS_AVAILABLE:
                print_table_row(key, value, HEADER_COLOR, 0, 0)
            else:
                print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

        rows = [
            ("Studios", studios),
        ]
        print_rule(SEPARATOR_COLOR)
        for key, value in rows:
            if ANSI_UTILS_AVAILABLE:
                print_table_row(key, value, HEADER_COLOR, 0, 0)
            else:
                print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

        rows = [
            ("Synonyms", synonyms),
        ]
        print_rule(SEPARATOR_COLOR)
        for key, value in rows:
            if ANSI_UTILS_AVAILABLE:
                print_table_row(key, value, HEADER_COLOR, 0, 0)
            else:
                print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

        print_rule(SEPARATOR_COLOR)
        print(wrap_text(description, term_width))
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        pass
    except Exception as e:
        print(f"Preview Error: {e}", file=sys.stderr)


@@ -0,0 +1,153 @@
#!/usr/bin/env python3
#
# FZF Dynamic Search Script Template
#
# This script is a template for the dynamic search functionality in fzf.
# The placeholders in curly braces, like {GRAPHQL_ENDPOINT}, are filled in
# dynamically by Python using .replace() at runtime.
import json
import os
import sys
from pathlib import Path
from urllib import request
from urllib.error import URLError
# --- Template Variables (Injected by Python) ---
GRAPHQL_ENDPOINT = "{GRAPHQL_ENDPOINT}"
SEARCH_RESULTS_FILE = Path("{SEARCH_RESULTS_FILE}")
AUTH_HEADER = "{AUTH_HEADER}"
# The GraphQL query is injected as a properly escaped JSON string
GRAPHQL_QUERY = {GRAPHQL_QUERY}
# --- Get Query from fzf ---
# fzf passes the current query as the first argument when using --bind change:reload
QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
# If query is empty, exit with empty results
if not QUERY.strip():
    print("")
    sys.exit(0)
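# Illustrative only: the dynamic_search() flow elsewhere in this commit wires
# this script into fzf roughly as
#
#     --bind "change:reload(python3 <cache>/search-script.py {q})"
#
# so every keystroke re-runs this script with the current query string as {q}.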
def make_graphql_request(endpoint: str, query: str, variables: dict, auth_token: str = "") -> dict | None:
    """
    Make a GraphQL request to the specified endpoint.

    Args:
        endpoint: GraphQL API endpoint URL
        query: GraphQL query string
        variables: Query variables as a dictionary
        auth_token: Optional authorization token (Bearer token)

    Returns:
        Response JSON as a dictionary, or None if the request fails
    """
    payload = {
        "query": query,
        "variables": variables
    }
    headers = {
        "Content-Type": "application/json",
        "User-Agent": "viu/1.0"
    }
    if auth_token:
        headers["Authorization"] = auth_token

    try:
        req = request.Request(
            endpoint,
            data=json.dumps(payload).encode("utf-8"),
            headers=headers,
            method="POST"
        )
        with request.urlopen(req, timeout=10) as response:
            return json.loads(response.read().decode("utf-8"))
    except (URLError, json.JSONDecodeError, Exception) as e:
        print(f"❌ Request failed: {e}", file=sys.stderr)
        return None
def extract_title(media_item: dict) -> str:
    """
    Extract the best available title from a media item.

    Args:
        media_item: Media object from GraphQL response

    Returns:
        Title string (english > romaji > native > "Unknown")
    """
    title_obj = media_item.get("title", {})
    return (
        title_obj.get("english") or
        title_obj.get("romaji") or
        title_obj.get("native") or
        "Unknown"
    )
def main():
    # Ensure parent directory exists
    SEARCH_RESULTS_FILE.parent.mkdir(parents=True, exist_ok=True)

    # Create GraphQL variables
    variables = {
        "query": QUERY,
        "type": "ANIME",
        "per_page": 50,
        "genre_not_in": ["Hentai"]
    }

    # Make the GraphQL request
    response = make_graphql_request(
        GRAPHQL_ENDPOINT,
        GRAPHQL_QUERY,
        variables,
        AUTH_HEADER
    )
    if response is None:
        print("❌ Search failed")
        sys.exit(1)

    # Save the raw response for later processing by dynamic_search.py
    try:
        with open(SEARCH_RESULTS_FILE, "w", encoding="utf-8") as f:
            json.dump(response, f, ensure_ascii=False, indent=2)
    except IOError as e:
        print(f"❌ Failed to save results: {e}", file=sys.stderr)
        sys.exit(1)

    # Parse and display results
    if "errors" in response:
        print(f"❌ Search error: {response['errors']}")
        sys.exit(1)

    # Navigate the response structure
    data = response.get("data", {})
    page = data.get("Page", {})
    media_list = page.get("media", [])
    if not media_list:
        print("❌ No results found")
        sys.exit(0)

    # Output titles for fzf (one per line)
    for media in media_list:
        title = extract_title(media)
        print(title)
if __name__ == "__main__":
    try:
        main()
    except KeyboardInterrupt:
        sys.exit(0)
    except Exception as e:
        print(f"❌ Unexpected error: {e}", file=sys.stderr)
        sys.exit(1)


@@ -1,5 +1,6 @@
import json
import logging
+import sys
from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
from .....libs.media_api.params import MediaSearchParams
@@ -11,7 +12,7 @@ logger = logging.getLogger(__name__)
SEARCH_CACHE_DIR = APP_CACHE_DIR / "search"
SEARCH_RESULTS_FILE = SEARCH_CACHE_DIR / "current_search_results.json"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
-SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.template.sh").read_text(
+SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.py").read_text(
    encoding="utf-8"
)
@@ -29,8 +30,8 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
    from .....libs.media_api.anilist import gql

    search_query = gql.SEARCH_MEDIA.read_text(encoding="utf-8")
-    # Properly escape the GraphQL query for JSON
-    search_query_escaped = json.dumps(search_query)
+    # Escape the GraphQL query as a JSON string literal for the Python script
+    search_query_json = json.dumps(search_query)
    # Prepare the search script
    auth_header = ""
@@ -42,8 +43,7 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
    replacements = {
        "GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
-        "GRAPHQL_QUERY": search_query_escaped,
-        "CACHE_DIR": str(SEARCH_CACHE_DIR),
+        "GRAPHQL_QUERY": search_query_json,
        "SEARCH_RESULTS_FILE": str(SEARCH_RESULTS_FILE),
        "AUTH_HEADER": auth_header,
    }
@@ -51,6 +51,14 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
    for key, value in replacements.items():
        search_command = search_command.replace(f"{{{key}}}", str(value))

+    # Write the filled template to a cache file
+    search_script_file = SEARCH_CACHE_DIR / "search-script.py"
+    search_script_file.write_text(search_command, encoding="utf-8")
+
+    # Run the script with the current Python interpreter; fzf substitutes the
+    # current query for {q}, which becomes the script's first argument
+    search_command_final = f"{sys.executable} {search_script_file} {{q}}"
    try:
        # Prepare preview functionality
        preview_command = None
@@ -62,13 +70,13 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
            choice = ctx.selector.search(
                prompt="Search Anime",
-                search_command=search_command,
+                search_command=search_command_final,
                preview=preview_command,
            )
        else:
            choice = ctx.selector.search(
                prompt="Search Anime",
-                search_command=search_command,
+                search_command=search_command_final,
            )
    except NotImplementedError:
        feedback.error("Dynamic search is not supported by your current selector")
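Note on the json.dumps step above: because the GraphQL query is pasted directly into Python source via the {GRAPHQL_QUERY} placeholder, it has to be embedded as a quoted, escaped string literal. A minimal sketch of the idea (hypothetical values, not the project's real query):

    import json

    template = 'GRAPHQL_QUERY = {GRAPHQL_QUERY}'
    query = 'query ($search: String) { Page { media(search: $search) { id } } }'
    filled = template.replace("{GRAPHQL_QUERY}", json.dumps(query))
    # filled is now valid Python: GRAPHQL_QUERY = "query ($search: String) { ... }"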


@@ -127,7 +127,9 @@ INFO_CACHE_DIR = PREVIEWS_CACHE_DIR / "info"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
TEMPLATE_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "preview.py").read_text(encoding="utf-8")

-DYNAMIC_PREVIEW_SCRIPT = ""
+DYNAMIC_PREVIEW_SCRIPT = (FZF_SCRIPTS_DIR / "dynamic_preview.py").read_text(
+    encoding="utf-8"
+)

EPISODE_PATTERN = re.compile(r"^Episode\s+(\d+)\s-\s.*")
@@ -534,13 +536,13 @@ def get_dynamic_anime_preview(config: AppConfig) -> str:
    This is different from regular anime preview because:
    1. We don't have media items upfront
    2. The preview needs to work with search results as they come in
-    3. Preview is handled entirely in shell by parsing JSON results
+    3. Preview script dynamically loads data from search results JSON

    Args:
        config: Application configuration

    Returns:
-        Preview script content for fzf dynamic search
+        Preview script command for fzf dynamic search
    """
    # Ensure cache directories exist
    IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
@@ -556,30 +558,27 @@ def get_dynamic_anime_preview(config: AppConfig) -> str:
    search_cache_dir = APP_CACHE_DIR / "search"
    search_results_file = search_cache_dir / "current_search_results.json"

-    # Prepare values to inject into the template
-    path_sep = "\\" if PLATFORM == "win32" else "/"
-    # Format the template with the dynamic values
+    # Prepare replacements for the template
    replacements = {
-        "PREVIEW_MODE": config.general.preview,
-        "IMAGE_CACHE_PATH": str(IMAGES_CACHE_DIR),
-        "INFO_CACHE_PATH": str(INFO_CACHE_DIR),
-        "PATH_SEP": path_sep,
-        "IMAGE_RENDERER": config.general.image_renderer,
        "SEARCH_RESULTS_FILE": str(search_results_file),
-        # Color codes
-        "C_TITLE": ansi.get_true_fg(HEADER_COLOR, bold=True),
-        "C_KEY": ansi.get_true_fg(HEADER_COLOR, bold=True),
-        "C_VALUE": ansi.get_true_fg(HEADER_COLOR, bold=True),
-        "C_RULE": ansi.get_true_fg(SEPARATOR_COLOR, bold=True),
-        "RESET": ansi.RESET,
-        "SCALE_UP": " --scale-up" if config.general.preview_scale_up else "",
+        "IMAGE_CACHE_DIR": str(IMAGES_CACHE_DIR),
+        "PREVIEW_MODE": config.general.preview,
+        "IMAGE_RENDERER": config.general.image_renderer,
+        "HEADER_COLOR": ",".join(HEADER_COLOR),
+        "SEPARATOR_COLOR": ",".join(SEPARATOR_COLOR),
+        "SCALE_UP": str(config.general.preview_scale_up),
    }

    for key, value in replacements.items():
        preview_script = preview_script.replace(f"{{{key}}}", value)

-    return preview_script
+    # Write the preview script to cache
+    preview_file = PREVIEWS_CACHE_DIR / "dynamic-search-preview-script.py"
+    preview_file.write_text(preview_script, encoding="utf-8")
+
+    # Return the command to execute the preview script
+    preview_script_final = f"{sys.executable} {preview_file} {{}}"
+    return preview_script_final
def _get_preview_manager() -> PreviewWorkerManager:
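
Taken together, dynamic_search() and get_dynamic_anime_preview() hand two commands to the selector: a reload command that re-queries AniList on every keystroke ({q}), and a preview command that reads the cached JSON for the highlighted title ({}). A rough sketch of how an fzf-based selector could wire them together (illustrative flags and a hypothetical run_dynamic_fzf helper; the real wiring lives in ctx.selector.search(), which is not part of this diff):

    import subprocess
    import sys

    def run_dynamic_fzf(search_script: str, preview_script: str) -> str:
        """Drive fzf with a reload-on-change search command and a per-item preview."""
        result = subprocess.run(
            [
                "fzf",
                "--disabled",  # keystrokes edit the query instead of filtering locally
                "--prompt", "Search Anime> ",
                "--bind", f"change:reload({sys.executable} {search_script} {{q}})",
                "--preview", f"{sys.executable} {preview_script} {{}}",
            ],
            stdin=subprocess.DEVNULL,  # start with an empty list; reload fills it in
            capture_output=True,
            text=True,
        )
        # fzf prints the selected title on stdout when the user confirms
        return result.stdout.strip()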