feat: stabilize the interactive workflow

This commit is contained in:
Benexl
2025-07-14 20:09:57 +03:00
parent e8491e3723
commit d1dfddf290
24 changed files with 617 additions and 406 deletions

View File

@@ -0,0 +1,29 @@
# Define ANSI escape codes as constants
RESET = "\033[0m"
BOLD = "\033[1m"
INVISIBLE_CURSOR = "\033[?25l"
VISIBLE_CURSOR = "\033[?25h"
UNDERLINE = "\033[4m"
def get_true_fg(color: list[str], bold: bool = True) -> str:
"""Custom helper function that enables colored text in the terminal
Args:
bold: whether to bolden the text
string: string to color
r: red
g: green
b: blue
Returns:
colored string
"""
# NOTE: Currently only supports terminals that support true color
r = color[0]
g = color[1]
b = color[2]
if bold:
return f"{BOLD}\033[38;2;{r};{g};{b};m"
else:
return f"\033[38;2;{r};{g};{b};m"

View File

@@ -0,0 +1,63 @@
import re
from typing import TYPE_CHECKING, List, Optional
from yt_dlp.utils import clean_html as ytdlp_clean_html
from ...libs.api.types import AiringSchedule, MediaItem
COMMA_REGEX = re.compile(r"([0-9]{3})(?=\d)")
def clean_html(raw_html: str) -> str:
"""A wrapper around yt-dlp's clean_html to handle None inputs."""
return ytdlp_clean_html(raw_html) if raw_html else ""
def format_number_with_commas(number: Optional[int]) -> str:
"""Formats an integer with commas for thousands separation."""
if number is None:
return "N/A"
return COMMA_REGEX.sub(r"\1,", str(number)[::-1])[::-1]
def format_airing_schedule(airing: Optional[AiringSchedule]) -> str:
"""Formats the next airing episode information into a readable string."""
if not airing or not airing.airing_at:
return "N/A"
# Get a human-readable date and time
air_date = airing.airing_at.strftime("%a, %b %d at %I:%M %p")
return f"Ep {airing.episode} on {air_date}"
def format_genres(genres: List[str]) -> str:
"""Joins a list of genres into a single, comma-separated string."""
return ", ".join(genres) if genres else "N/A"
def format_score_stars_full(score: Optional[float]) -> str:
"""Formats an AniList score (0-100) to a 0-10 scale using full stars."""
if score is None:
return "N/A"
# Convert 0-100 to 0-10, then to a whole number of stars
num_stars = min(round(score * 6 / 100), 6)
return "" * num_stars
def format_score(score: Optional[float]) -> str:
"""Formats an AniList score (0-100) to a 0-10 scale."""
if score is None:
return "N/A"
return f"{score / 10.0:.1f} / 10"
def shell_safe(text: Optional[str]) -> str:
"""
Escapes a string for safe inclusion in a shell script,
specifically for use within double quotes. It escapes backticks,
double quotes, and dollar signs.
"""
if not text:
return ""
return text.replace("`", "\\`").replace('"', '\\"').replace("$", "\\$")

View File

@@ -0,0 +1,87 @@
# fastanime/cli/utils/image.py
from __future__ import annotations
import logging
import shutil
import subprocess
from typing import Optional
import click
import httpx
logger = logging.getLogger(__name__)
def render_image(url: str, capture: bool = False, size: str = "30x30") -> Optional[str]:
"""
Renders an image from a URL in the terminal using icat or chafa.
This function automatically detects the best available tool.
Args:
url: The URL of the image to render.
capture: If True, returns the terminal-formatted image as a string
instead of printing it. Defaults to False.
size: The size parameter to pass to the rendering tool (e.g., "WxH").
Returns:
If capture is True, returns the image data as a string.
If capture is False, prints directly to the terminal and returns None.
Returns None on any failure.
"""
# --- Common subprocess arguments ---
subprocess_kwargs = {
"check": False, # We will handle errors manually
"capture_output": capture,
"text": capture, # Decode stdout/stderr as text if capturing
}
# --- Try icat (Kitty terminal) first ---
if icat_executable := shutil.which("icat"):
process = subprocess.run(
[icat_executable, "--align", "left", url], **subprocess_kwargs
)
if process.returncode == 0:
return process.stdout if capture else None
logger.warning(f"icat failed for URL {url} with code {process.returncode}")
# --- Fallback to chafa ---
if chafa_executable := shutil.which("chafa"):
try:
# Chafa requires downloading the image data first
with httpx.Client() as client:
response = client.get(url, follow_redirects=True, timeout=20)
response.raise_for_status()
img_bytes = response.content
# Add stdin input to the subprocess arguments
subprocess_kwargs["input"] = img_bytes
process = subprocess.run(
[chafa_executable, f"--size={size}", "-"], **subprocess_kwargs
)
if process.returncode == 0:
return process.stdout if capture else None
logger.warning(f"chafa failed for URL {url} with code {process.returncode}")
except httpx.HTTPStatusError as e:
logger.error(
f"HTTP error fetching image for chafa: {e.response.status_code}"
)
click.echo(
f"[dim]Error fetching image: {e.response.status_code}[/dim]", err=True
)
except Exception as e:
logger.error(f"An exception occurred while running chafa: {e}")
return None
# --- Final fallback if no tool is found ---
if not capture:
# Only show this message if the user expected to see something.
click.echo(
"[dim](Image preview skipped: icat or chafa not found)[/dim]", err=True
)
return None

View File

@@ -1,11 +1,11 @@
import concurrent.futures
import logging
import textwrap
import os
import shutil
from hashlib import sha256
from io import StringIO
from pathlib import Path
from threading import Thread
from typing import TYPE_CHECKING, List
from typing import List
import httpx
from rich.console import Console
@@ -13,11 +13,9 @@ from rich.panel import Panel
from rich.text import Text
from ...core.config import AppConfig
from ...core.constants import APP_DIR, PLATFORM
from .scripts import bash_functions
if TYPE_CHECKING:
from ...libs.api.types import MediaItem
from ...core.constants import APP_CACHE_DIR, APP_DIR, PLATFORM
from ...libs.api.types import MediaItem
from . import ansi, formatters
logger = logging.getLogger(__name__)
@@ -27,15 +25,7 @@ IMAGES_CACHE_DIR = PREVIEWS_CACHE_DIR / "images"
INFO_CACHE_DIR = PREVIEWS_CACHE_DIR / "info"
FZF_SCRIPTS_DIR = APP_DIR / "libs" / "selectors" / "fzf" / "scripts"
PREVIEW_SCRIPT_TEMPLATE_PATH = FZF_SCRIPTS_DIR / "preview.sh"
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# The helper functions (_get_cache_hash, _save_image_from_url, _save_info_text,
# _format_info_text, and _cache_worker) remain exactly the same as before.
# I am including them here for completeness.
INFO_SCRIPT_TEMPLATE_PATH = FZF_SCRIPTS_DIR / "info.sh"
def _get_cache_hash(text: str) -> str:
@@ -45,9 +35,9 @@ def _get_cache_hash(text: str) -> str:
def _save_image_from_url(url: str, hash_id: str):
"""Downloads an image using httpx and saves it to the cache."""
temp_image_path = IMAGES_CACHE_DIR / f"{hash_id}.png.tmp"
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
try:
temp_image_path = IMAGES_CACHE_DIR / f"{hash_id}.png.tmp"
image_path = IMAGES_CACHE_DIR / f"{hash_id}.png"
with httpx.stream("GET", url, follow_redirects=True, timeout=20) as response:
response.raise_for_status()
with temp_image_path.open("wb") as f:
@@ -69,25 +59,40 @@ def _save_info_text(info_text: str, hash_id: str):
logger.error(f"Failed to write info cache for {hash_id}: {e}")
def _format_info_text(item: MediaItem) -> str:
"""Uses Rich to format a media item's details into a string."""
from .anilist import anilist_data_helper
def _populate_info_template(item: MediaItem, config: AppConfig) -> str:
"""
Takes the info.sh template and injects formatted, shell-safe data.
"""
template = INFO_SCRIPT_TEMPLATE_PATH.read_text(encoding="utf-8")
description = formatters.clean_html(item.description or "No description available.")
io_buffer = StringIO()
console = Console(file=io_buffer, force_terminal=True, color_system="truecolor")
title = Text(
item.title.english or item.title.romaji or "Unknown Title", style="bold cyan"
)
description = anilist_data_helper.clean_html(
item.description or "No description available."
)
description = (description[:350] + "...") if len(description) > 350 else description
genres = f"[bold]Genres:[/bold] {', '.join(item.genres)}"
status = f"[bold]Status:[/bold] {item.status}"
score = f"[bold]Score:[/bold] {item.average_score / 10 if item.average_score else 'N/A'}"
panel_content = f"{genres}\n{status}\n{score}\n\n{description}"
console.print(Panel(panel_content, title=title, border_style="dim"))
return io_buffer.getvalue()
HEADER_COLOR = config.fzf.preview_header_color.split(",")
SEPARATOR_COLOR = config.fzf.preview_separator_color.split(",")
# Escape all variables before injecting them into the script
replacements = {
"TITLE": formatters.shell_safe(item.title.english or item.title.romaji),
"SCORE": formatters.shell_safe(
formatters.format_score_stars_full(item.average_score)
),
"STATUS": formatters.shell_safe(item.status),
"FAVOURITES": formatters.shell_safe(
formatters.format_number_with_commas(item.favourites)
),
"GENRES": formatters.shell_safe(formatters.format_genres(item.genres)),
"SYNOPSIS": formatters.shell_safe(description),
# Color codes
"C_TITLE": ansi.get_true_fg(HEADER_COLOR, bold=True),
"C_KEY": ansi.get_true_fg(HEADER_COLOR, bold=True),
"C_VALUE": ansi.get_true_fg(HEADER_COLOR, bold=True),
"C_RULE": ansi.get_true_fg(SEPARATOR_COLOR, bold=True),
"RESET": ansi.RESET,
}
for key, value in replacements.items():
template = template.replace(f"{{{key}}}", value)
return template
def _cache_worker(items: List[MediaItem], titles: List[str], config: AppConfig):
@@ -102,7 +107,7 @@ def _cache_worker(items: List[MediaItem], titles: List[str], config: AppConfig):
)
if config.general.preview in ("full", "text"):
if not (INFO_CACHE_DIR / hash_id).exists():
info_text = _format_info_text(item)
info_text = _populate_info_template(item, config)
executor.submit(_save_info_text, info_text, hash_id)
@@ -114,6 +119,10 @@ def get_anime_preview(
Starts a background task to cache preview data and returns the fzf preview command
by formatting a shell script template.
"""
# Ensure cache directories exist on startup
IMAGES_CACHE_DIR.mkdir(parents=True, exist_ok=True)
INFO_CACHE_DIR.mkdir(parents=True, exist_ok=True)
# Start the non-blocking background Caching
Thread(target=_cache_worker, args=(items, titles, config), daemon=True).start()
@@ -130,15 +139,17 @@ def get_anime_preview(
path_sep = "\\" if PLATFORM == "win32" else "/"
# Format the template with the dynamic values
final_script = template.format(
bash_functions=bash_functions,
preview_mode=config.general.preview,
image_cache_path=str(IMAGES_CACHE_DIR),
info_cache_path=str(INFO_CACHE_DIR),
path_sep=path_sep,
final_script = (
template.replace("{preview_mode}", config.general.preview)
.replace("{image_cache_path}", str(IMAGES_CACHE_DIR))
.replace("{info_cache_path}", str(INFO_CACHE_DIR))
.replace("{path_sep}", path_sep)
.replace("{image_renderer}", config.general.image_renderer)
)
# )
# Return the command for fzf to execute. `sh -c` is used to run the script string.
# The -- "{}" ensures that the selected item is passed as the first argument ($1)
# to the script, even if it contains spaces or special characters.
return f'sh -c {final_script!r} -- "{{}}"'
os.environ["SHELL"] = "bash"
return final_script

View File

@@ -1,33 +0,0 @@
import shutil
import subprocess
import requests
def print_img(url: str):
"""helper function to print an image given its url
Args:
url: [TODO:description]
"""
if EXECUTABLE := shutil.which("icat"):
subprocess.run([EXECUTABLE, url], check=False)
else:
EXECUTABLE = shutil.which("chafa")
if EXECUTABLE is None:
print("chafanot found")
return
res = requests.get(url)
if res.status_code != 200:
print("Error fetching image")
return
img_bytes = res.content
"""
Change made in call to chafa. Chafa dev dropped ability
to pull from urls. Keeping old line here just in case.
subprocess.run([EXECUTABLE, url, "--size=15x15"], input=img_bytes)
"""
subprocess.run([EXECUTABLE, "--size=15x15"], input=img_bytes, check=False)