Mirror of https://github.com/Benexl/FastAnime.git (synced 2025-12-31 23:15:51 -08:00)

Merge pull request #177 from viu-media/dynamic-search-filters
Implement dynamic search enhancements (e.g. filters) and media info differentiation
viu_media/assets/scripts/fzf/_filter_parser.py (new file, 323 lines)
@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Filter Parser for Dynamic Search

This module provides a parser for the special filter syntax used in dynamic search.
Filter syntax allows users to add filters inline with their search query.

SYNTAX:
    @filter:value          - Apply a filter with the given value
    @filter:value1,value2  - Apply multiple values (for array filters)
    @filter:!value         - Exclude/negate a filter value

SUPPORTED FILTERS:
    @genre:action,comedy   - Filter by genres
    @genre:!hentai         - Exclude genre
    @status:airing         - Filter by status (airing, finished, upcoming, cancelled, hiatus)
    @year:2024             - Filter by season year
    @season:winter         - Filter by season (winter, spring, summer, fall)
    @format:tv,movie       - Filter by format (tv, movie, ova, ona, special, music)
    @sort:score            - Sort by (score, popularity, trending, title, date)
    @score:>80             - Minimum score
    @score:<50             - Maximum score
    @popularity:>10000     - Minimum popularity
    @onlist                - Only show anime on user's list
    @onlist:false          - Only show anime NOT on user's list

EXAMPLES:
    "naruto @genre:action @status:finished"
    "isekai @year:2024 @season:winter @sort:score"
    "@genre:action,adventure @status:airing"
    "romance @genre:!hentai @format:tv,movie"
"""

import re
from typing import Any, Dict, List, Optional, Tuple

# Mapping of user-friendly filter names to GraphQL variable names
FILTER_ALIASES = {
    # Status aliases
    "airing": "RELEASING",
    "releasing": "RELEASING",
    "finished": "FINISHED",
    "completed": "FINISHED",
    "upcoming": "NOT_YET_RELEASED",
    "not_yet_released": "NOT_YET_RELEASED",
    "unreleased": "NOT_YET_RELEASED",
    "cancelled": "CANCELLED",
    "canceled": "CANCELLED",
    "hiatus": "HIATUS",
    "paused": "HIATUS",
    # Format aliases
    "tv": "TV",
    "tv_short": "TV_SHORT",
    "tvshort": "TV_SHORT",
    "movie": "MOVIE",
    "film": "MOVIE",
    "ova": "OVA",
    "ona": "ONA",
    "special": "SPECIAL",
    "music": "MUSIC",
    # Season aliases
    "winter": "WINTER",
    "spring": "SPRING",
    "summer": "SUMMER",
    "fall": "FALL",
    "autumn": "FALL",
    # Sort aliases
    "score": "SCORE_DESC",
    "score_desc": "SCORE_DESC",
    "score_asc": "SCORE",
    "popularity": "POPULARITY_DESC",
    "popularity_desc": "POPULARITY_DESC",
    "popularity_asc": "POPULARITY",
    "trending": "TRENDING_DESC",
    "trending_desc": "TRENDING_DESC",
    "trending_asc": "TRENDING",
    "title": "TITLE_ROMAJI",
    "title_desc": "TITLE_ROMAJI_DESC",
    "date": "START_DATE_DESC",
    "date_desc": "START_DATE_DESC",
    "date_asc": "START_DATE",
    "newest": "START_DATE_DESC",
    "oldest": "START_DATE",
    "favourites": "FAVOURITES_DESC",
    "favorites": "FAVOURITES_DESC",
    "episodes": "EPISODES_DESC",
}

# Genre name normalization (lowercase -> proper case)
GENRE_NAMES = {
    "action": "Action",
    "adventure": "Adventure",
    "comedy": "Comedy",
    "drama": "Drama",
    "ecchi": "Ecchi",
    "fantasy": "Fantasy",
    "horror": "Horror",
    "mahou_shoujo": "Mahou Shoujo",
    "mahou": "Mahou Shoujo",
    "magical_girl": "Mahou Shoujo",
    "mecha": "Mecha",
    "music": "Music",
    "mystery": "Mystery",
    "psychological": "Psychological",
    "romance": "Romance",
    "sci-fi": "Sci-Fi",
    "scifi": "Sci-Fi",
    "sci_fi": "Sci-Fi",
    "slice_of_life": "Slice of Life",
    "sol": "Slice of Life",
    "sports": "Sports",
    "supernatural": "Supernatural",
    "thriller": "Thriller",
    "hentai": "Hentai",
}

# Filter pattern: @key:value or @key (boolean flags)
FILTER_PATTERN = re.compile(r"@(\w+)(?::([^\s]+))?", re.IGNORECASE)

# Comparison operators for numeric filters
COMPARISON_PATTERN = re.compile(r"^([<>]=?)?(\d+)$")

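# Illustrative examples of the two patterns (not part of the original module):
#   FILTER_PATTERN.findall("naruto @genre:action,comedy @onlist")
#   -> [("genre", "action,comedy"), ("onlist", "")]
#   COMPARISON_PATTERN.match(">=80").groups() -> (">=", "80")
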
def normalize_value(value: str, value_type: str) -> str:
    """Normalize a filter value based on its type."""
    value_lower = value.lower().strip()

    if value_type == "genre":
        return GENRE_NAMES.get(value_lower, value.title())
    elif value_type in ("status", "format", "season", "sort"):
        return FILTER_ALIASES.get(value_lower, value.upper())

    return value

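# Illustrative behaviour: normalize_value("airing", "status") -> "RELEASING",
# normalize_value("sci_fi", "genre") -> "Sci-Fi"; unknown values fall back to
# value.title() for genres and value.upper() for the enum-style filters.
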
def parse_value_list(value_str: str) -> Tuple[List[str], List[str]]:
    """
    Parse a comma-separated value string, separating includes from excludes.

    Returns:
        Tuple of (include_values, exclude_values)
    """
    includes = []
    excludes = []

    for val in value_str.split(","):
        val = val.strip()
        if not val:
            continue
        if val.startswith("!"):
            excludes.append(val[1:])
        else:
            includes.append(val)

    return includes, excludes

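# Illustrative example: parse_value_list("action,!hentai, comedy")
# -> (["action", "comedy"], ["hentai"])
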
def parse_comparison(value: str) -> Tuple[Optional[str], Optional[int]]:
    """
    Parse a comparison value like ">80" or "<50".

    Returns:
        Tuple of (operator, number) or (None, None) if invalid
    """
    match = COMPARISON_PATTERN.match(value)
    if match:
        operator = match.group(1) or ">"  # Default to greater than
        number = int(match.group(2))
        return operator, number
    return None, None

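# Illustrative examples: parse_comparison(">80") -> (">", 80);
# parse_comparison("80") -> (">", 80), since ">" is the default operator;
# parse_comparison("abc") -> (None, None).
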
def parse_filters(query: str) -> Tuple[str, Dict[str, Any]]:
    """
    Parse a search query and extract filter directives.

    Args:
        query: The full search query including filter syntax

    Returns:
        Tuple of (clean_query, filters_dict)
        - clean_query: The query with filter syntax removed
        - filters_dict: Dictionary of GraphQL variables to apply
    """
    filters: Dict[str, Any] = {}

    # Find all filter matches
    matches = list(FILTER_PATTERN.finditer(query))

    for match in matches:
        filter_name = match.group(1).lower()
        filter_value = match.group(2)  # May be None for boolean flags

        # Handle different filter types
        if filter_name == "genre":
            if filter_value:
                includes, excludes = parse_value_list(filter_value)
                if includes:
                    normalized = [normalize_value(v, "genre") for v in includes]
                    filters.setdefault("genre_in", []).extend(normalized)
                if excludes:
                    normalized = [normalize_value(v, "genre") for v in excludes]
                    filters.setdefault("genre_not_in", []).extend(normalized)

        elif filter_name == "status":
            if filter_value:
                includes, excludes = parse_value_list(filter_value)
                if includes:
                    normalized = [normalize_value(v, "status") for v in includes]
                    filters.setdefault("status_in", []).extend(normalized)
                if excludes:
                    normalized = [normalize_value(v, "status") for v in excludes]
                    filters.setdefault("status_not_in", []).extend(normalized)

        elif filter_name == "format":
            if filter_value:
                includes, _ = parse_value_list(filter_value)
                if includes:
                    normalized = [normalize_value(v, "format") for v in includes]
                    filters.setdefault("format_in", []).extend(normalized)

        elif filter_name == "year":
            if filter_value:
                try:
                    filters["seasonYear"] = int(filter_value)
                except ValueError:
                    pass  # Invalid year, skip

        elif filter_name == "season":
            if filter_value:
                filters["season"] = normalize_value(filter_value, "season")

        elif filter_name == "sort":
            if filter_value:
                sort_val = normalize_value(filter_value, "sort")
                filters["sort"] = [sort_val]

        elif filter_name == "score":
            if filter_value:
                op, num = parse_comparison(filter_value)
                if num is not None:
                    if op in (">", ">="):
                        filters["averageScore_greater"] = num
                    elif op in ("<", "<="):
                        filters["averageScore_lesser"] = num

        elif filter_name == "popularity":
            if filter_value:
                op, num = parse_comparison(filter_value)
                if num is not None:
                    if op in (">", ">="):
                        filters["popularity_greater"] = num
                    elif op in ("<", "<="):
                        filters["popularity_lesser"] = num

        elif filter_name == "onlist":
            if filter_value is None or filter_value.lower() in ("true", "yes", "1"):
                filters["on_list"] = True
            elif filter_value.lower() in ("false", "no", "0"):
                filters["on_list"] = False

        elif filter_name == "tag":
            if filter_value:
                includes, excludes = parse_value_list(filter_value)
                if includes:
                    # Tags use title case typically
                    normalized = [v.replace("_", " ").title() for v in includes]
                    filters.setdefault("tag_in", []).extend(normalized)
                if excludes:
                    normalized = [v.replace("_", " ").title() for v in excludes]
                    filters.setdefault("tag_not_in", []).extend(normalized)

    # Remove filter syntax from query to get clean search text
    clean_query = FILTER_PATTERN.sub("", query).strip()
    # Clean up multiple spaces
    clean_query = re.sub(r"\s+", " ", clean_query).strip()

    return clean_query, filters

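# Illustrative example: parse_filters("naruto @genre:action @score:>80 @onlist")
# -> ("naruto", {"genre_in": ["Action"], "averageScore_greater": 80, "on_list": True})
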
def get_help_text() -> str:
    """Return a help string describing the filter syntax."""
    return """
╭─────────────────── Filter Syntax Help ───────────────────╮
│                                                           │
│  @genre:action,comedy       Filter by genres              │
│  @genre:!hentai             Exclude genre                 │
│  @status:airing             Status: airing, finished,     │
│                             upcoming, cancelled, hiatus   │
│  @year:2024                 Filter by year                │
│  @season:winter             winter, spring, summer, fall  │
│  @format:tv,movie           tv, movie, ova, ona, special  │
│  @sort:score                score, popularity, trending,  │
│                             date, title, newest, oldest   │
│  @score:>80                 Minimum score                 │
│  @score:<50                 Maximum score                 │
│  @popularity:>10000         Minimum popularity            │
│  @onlist                    Only on your list             │
│  @onlist:false              Not on your list              │
│  @tag:isekai,reincarnation  Filter by tags                │
│                                                           │
│  Examples:                                                │
│    naruto @genre:action @status:finished                  │
│    @genre:action,adventure @year:2024 @sort:score         │
│    isekai @season:winter @year:2024                       │
│                                                           │
╰──────────────────────────────────────────────────────────╯
""".strip()

if __name__ == "__main__":
    # Test the parser
    import json
    import sys

    if len(sys.argv) > 1:
        test_query = " ".join(sys.argv[1:])
        clean, filters = parse_filters(test_query)
        print(f"Original: {test_query}")
        print(f"Clean query: {clean}")
        print(f"Filters: {json.dumps(filters, indent=2)}")
    else:
        print(get_help_text())
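
For reference, the module doubles as a small CLI for testing the parser; per the
__main__ block above, a session would look roughly like this (illustrative):

    $ python _filter_parser.py "isekai @year:2024 @season:winter @sort:score"
    Original: isekai @year:2024 @season:winter @sort:score
    Clean query: isekai
    Filters: {
      "seasonYear": 2024,
      "season": "WINTER",
      "sort": [
        "SCORE_DESC"
      ]
    }
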
@@ -45,6 +45,15 @@ def format_number(num):
     return f"{num:,}"


+def format_score_stars(score):
+    """Format score as stars out of 6."""
+    if score is None:
+        return "N/A"
+    # Convert 0-100 score to 0-6 stars, capped at 6 for consistency
+    stars = min(round(score * 6 / 100), 6)
+    return "⭐" * stars + f" ({score}/100)"


 def format_date(date_obj):
     """Format date object to string."""
     if not date_obj or date_obj == "null":
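(For example, the new helper renders a score of 83 as "⭐⭐⭐⭐⭐ (83/100)",
since round(83 * 6 / 100) = 5.)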
@@ -342,31 +351,68 @@ def main():
     # Extract data
     status = media.get("status", "Unknown")
     format_type = media.get("format", "Unknown")
-    episodes = media.get("episodes", "?")
+    episodes = media.get("episodes", "??")
     duration = media.get("duration")
-    duration_str = f"{duration} min" if duration else "Unknown"
+    duration_str = f"{duration} min/ep" if duration else "Unknown"

     score = media.get("averageScore")
-    score_str = f"{score}/100" if score else "N/A"
+    score_str = format_score_stars(score)

     favourites = format_number(media.get("favourites", 0))
     popularity = format_number(media.get("popularity", 0))

-    genres = ", ".join(media.get("genres", [])[:5]) or "Unknown"
+    genres = ", ".join(media.get("genres", [])) or "Unknown"

     start_date = format_date(media.get("startDate"))
     end_date = format_date(media.get("endDate"))

     studios_list = media.get("studios", {}).get("nodes", [])
-    studios = ", ".join([s.get("name", "") for s in studios_list[:3]]) or "Unknown"
+    # Studios are those with isAnimationStudio=true
+    studios = ", ".join([s["name"] for s in studios_list if s.get("name") and s.get("isAnimationStudio")]) or "N/A"
+    # Producers are those with isAnimationStudio=false
+    producers = ", ".join([s["name"] for s in studios_list if s.get("name") and not s.get("isAnimationStudio")]) or "N/A"

     synonyms_list = media.get("synonyms", [])
-    synonyms = ", ".join(synonyms_list[:3]) or "N/A"
+    # Include romaji in synonyms if different from title
+    romaji = title_obj.get("romaji")
+    if romaji and romaji != title and romaji not in synonyms_list:
+        synonyms_list = [romaji] + synonyms_list
+    synonyms = ", ".join(synonyms_list) or "N/A"

+    # Tags
+    tags_list = media.get("tags", [])
+    tags = ", ".join([t.get("name", "") for t in tags_list if t.get("name")]) or "N/A"

+    # Next airing episode
+    next_airing = media.get("nextAiringEpisode")
+    if next_airing:
+        next_ep = next_airing.get("episode", "?")
+        airing_at = next_airing.get("airingAt")
+        if airing_at:
+            from datetime import datetime
+            try:
+                dt = datetime.fromtimestamp(airing_at)
+                next_episode_str = f"Episode {next_ep} on {dt.strftime('%A, %d %B %Y at %H:%M')}"
+            except (ValueError, OSError):
+                next_episode_str = f"Episode {next_ep}"
+        else:
+            next_episode_str = f"Episode {next_ep}"
+    else:
+        next_episode_str = "N/A"

+    # User list status
+    media_list_entry = media.get("mediaListEntry")
+    if media_list_entry:
+        user_status = media_list_entry.get("status", "NOT_ON_LIST")
+        user_progress = f"Episode {media_list_entry.get('progress', 0)}"
+    else:
+        user_status = "NOT_ON_LIST"
+        user_progress = "0"

     description = media.get("description", "No description available.")
     description = strip_markdown(description)

-    # Print sections matching media_info.py structure
+    # Print sections matching media_info.py structure exactly
     rows = [
         ("Score", score_str),
         ("Favorites", favourites),
@@ -376,16 +422,17 @@ def main():

     print_rule(SEPARATOR_COLOR)
     for key, value in rows:
-        print_table_row(key, value, HEADER_COLOR, 0, 0)
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

     rows = [
         ("Episodes", str(episodes)),
         ("Duration", duration_str),
+        ("Next Episode", next_episode_str),
     ]

     print_rule(SEPARATOR_COLOR)
     for key, value in rows:
-        print_table_row(key, value, HEADER_COLOR, 0, 0)
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

     rows = [
         ("Genres", genres),
@@ -394,7 +441,16 @@ def main():

     print_rule(SEPARATOR_COLOR)
     for key, value in rows:
-        print_table_row(key, value, HEADER_COLOR, 0, 0)
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

+    rows = [
+        ("List Status", user_status),
+        ("Progress", user_progress),
+    ]

+    print_rule(SEPARATOR_COLOR)
+    for key, value in rows:
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

     rows = [
         ("Start Date", start_date),
@@ -403,15 +459,16 @@ def main():

     print_rule(SEPARATOR_COLOR)
     for key, value in rows:
-        print_table_row(key, value, HEADER_COLOR, 0, 0)
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

     rows = [
         ("Studios", studios),
+        ("Producers", producers),
     ]

     print_rule(SEPARATOR_COLOR)
     for key, value in rows:
-        print_table_row(key, value, HEADER_COLOR, 0, 0)
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

     rows = [
         ("Synonyms", synonyms),
@@ -419,7 +476,15 @@ def main():

     print_rule(SEPARATOR_COLOR)
     for key, value in rows:
-        print_table_row(key, value, HEADER_COLOR, 0, 0)
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

+    rows = [
+        ("Tags", tags),
+    ]

+    print_rule(SEPARATOR_COLOR)
+    for key, value in rows:
+        print_table_row(key, value, HEADER_COLOR, 15, term_width - 20)

     print_rule(SEPARATOR_COLOR)
     print(wrap_text(description, term_width))

@@ -67,6 +67,7 @@ for key, value in rows:

 rows = [
     ("Studios", "{STUDIOS}"),
+    ("Producers", "{PRODUCERS}"),
 ]

 print_rule(SEPARATOR_COLOR)

@@ -5,6 +5,18 @@
 # This script is a template for dynamic search functionality in fzf.
 # The placeholders in curly braces, like {GRAPHQL_ENDPOINT} are dynamically
 # filled by Python using .replace() during runtime.
+#
+# FILTER SYNTAX:
+#   @genre:action,comedy      Filter by genres
+#   @genre:!hentai            Exclude genre
+#   @status:airing            Status: airing, finished, upcoming, cancelled, hiatus
+#   @year:2024                Filter by year
+#   @season:winter            winter, spring, summer, fall
+#   @format:tv,movie          tv, movie, ova, ona, special
+#   @sort:score               score, popularity, trending, date, title
+#   @score:>80 / @score:<50   Min/max score
+#   @onlist / @onlist:false   Filter by list status
+#   @tag:isekai               Filter by tags

 import json
 import sys
@@ -12,9 +24,13 @@ from pathlib import Path
 from urllib import request
 from urllib.error import URLError

+# Import the filter parser
+from _filter_parser import parse_filters

 # --- Template Variables (Injected by Python) ---
 GRAPHQL_ENDPOINT = "{GRAPHQL_ENDPOINT}"
 SEARCH_RESULTS_FILE = Path("{SEARCH_RESULTS_FILE}")
+LAST_QUERY_FILE = Path("{LAST_QUERY_FILE}")
 AUTH_HEADER = "{AUTH_HEADER}"

 # The GraphQL query is injected as a properly escaped JSON string
@@ -22,17 +38,29 @@ GRAPHQL_QUERY = "{GRAPHQL_QUERY}"

 # --- Get Query from fzf ---
 # fzf passes the current query as the first argument when using --bind change:reload
-QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
+RAW_QUERY = sys.argv[1] if len(sys.argv) > 1 else ""

-# If query is empty, exit with empty results
-if not QUERY.strip():
-    print("")
+# Parse the query to extract filters and clean search text
+QUERY, PARSED_FILTERS = parse_filters(RAW_QUERY)

+# If query is empty and no filters, show help hint
+if not RAW_QUERY.strip():
+    print("💡 Tip: Use @genre:action @status:airing for filters (type @help for syntax)")
     sys.exit(0)

+# Show filter help if requested
+if RAW_QUERY.strip().lower() in ("@help", "@?", "@h"):
+    from _filter_parser import get_help_text
+    print(get_help_text())
+    sys.exit(0)

+# If we only have filters (no search text), that's valid - we'll search with filters only
+# But if we have neither query nor filters, we already showed the help hint above


 def make_graphql_request(
     endpoint: str, query: str, variables: dict, auth_token: str = ""
-) -> dict | None:
+) -> tuple[dict | None, str | None]:
     """
     Make a GraphQL request to the specified endpoint.
@@ -43,7 +71,7 @@ def make_graphql_request(
         auth_token: Optional authorization token (Bearer token)

     Returns:
-        Response JSON as a dictionary, or None if request fails
+        Tuple of (Response JSON, error message) - one will be None
     """
     payload = {"query": query, "variables": variables}
@@ -61,10 +89,13 @@ def make_graphql_request(
         )

         with request.urlopen(req, timeout=10) as response:
-            return json.loads(response.read().decode("utf-8"))
-    except (URLError, json.JSONDecodeError, Exception) as e:
-        print(f"❌ Request failed: {e}", file=sys.stderr)
-        return None
+            return json.loads(response.read().decode("utf-8")), None
+    except URLError as e:
+        return None, f"Network error: {e.reason}"
+    except json.JSONDecodeError as e:
+        return None, f"Invalid response: {e}"
+    except Exception as e:
+        return None, f"Request error: {e}"


 def extract_title(media_item: dict) -> str:
@@ -90,34 +121,67 @@ def main():
     # Ensure parent directory exists
     SEARCH_RESULTS_FILE.parent.mkdir(parents=True, exist_ok=True)

-    # Create GraphQL variables
+    # Base GraphQL variables
     variables = {
-        "query": QUERY,
         "type": "ANIME",
         "per_page": 50,
-        "genre_not_in": ["Hentai"],
+        "genre_not_in": ["Hentai"],  # Default exclusion
     }

+    # Add search query if provided
+    if QUERY:
+        variables["query"] = QUERY

+    # Apply parsed filters from the filter syntax
+    for key, value in PARSED_FILTERS.items():
+        # Handle array merging for _in and _not_in fields
+        if key.endswith("_in") or key.endswith("_not_in"):
+            if key in variables:
+                # Merge arrays, avoiding duplicates
+                existing = set(variables[key])
+                existing.update(value)
+                variables[key] = list(existing)
+            else:
+                variables[key] = value
+        else:
+            variables[key] = value

     # Make the GraphQL request
-    response = make_graphql_request(
+    response, error = make_graphql_request(
         GRAPHQL_ENDPOINT, GRAPHQL_QUERY, variables, AUTH_HEADER
     )

-    if response is None:
-        print("❌ Search failed")
+    if error:
+        print(f"❌ {error}")
+        # Also show what we tried to search for debugging
+        print(f" Query: {QUERY or '(none)'}")
+        print(f" Filters: {json.dumps(PARSED_FILTERS) if PARSED_FILTERS else '(none)'}")
         sys.exit(1)

+    if response is None:
+        print("❌ Search failed: No response received")
+        sys.exit(1)

+    # Check for GraphQL errors first (these come in the response body)
+    if "errors" in response:
+        errors = response["errors"]
+        if errors:
+            # Extract error messages
+            error_msgs = [e.get("message", str(e)) for e in errors]
+            print(f"❌ API Error: {'; '.join(error_msgs)}")
+            # Show variables for debugging
+            print(f" Filters used: {json.dumps(PARSED_FILTERS, indent=2) if PARSED_FILTERS else '(none)'}")
+            sys.exit(1)

     # Save the raw response for later processing by dynamic_search.py
     try:
         with open(SEARCH_RESULTS_FILE, "w", encoding="utf-8") as f:
             json.dump(response, f, ensure_ascii=False, indent=2)
+        # Also save the raw query so it can be restored when going back
+        with open(LAST_QUERY_FILE, "w", encoding="utf-8") as f:
+            f.write(RAW_QUERY)
     except IOError as e:
-        print(f"❌ Failed to save results: {e}", file=sys.stderr)
-        sys.exit(1)
-
-    # Parse and display results
-    if "errors" in response:
-        print(f"❌ Search error: {response['errors']}")
+        print(f"❌ Failed to save results: {e}")
         sys.exit(1)

     # Navigate the response structure
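(Worked example, illustrative: for the raw query "one piece @genre:!ecchi", the
merge loop above folds the parsed "genre_not_in": ["Ecchi"] into the default
["Hentai"], giving variables = {"type": "ANIME", "per_page": 50,
"genre_not_in": ["Hentai", "Ecchi"], "query": "one piece"}; the set-based merge
avoids duplicates but does not preserve order.)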
@@ -126,7 +190,9 @@ def main():
     media_list = page.get("media", [])

     if not media_list:
-        print("❌ No results found")
+        print("🔍 No results found")
+        if PARSED_FILTERS:
+            print(" Try adjusting your filters")
         sys.exit(0)

     # Output titles for fzf (one per line)
@@ -141,5 +207,5 @@ if __name__ == "__main__":
     except KeyboardInterrupt:
         sys.exit(0)
     except Exception as e:
-        print(f"❌ Unexpected error: {e}", file=sys.stderr)
+        print(f"❌ Unexpected error: {type(e).__name__}: {e}")
         sys.exit(1)

@@ -1,5 +1,6 @@
 import json
 import logging
 import shutil
 from pathlib import Path

 from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
@@ -12,8 +13,36 @@ logger = logging.getLogger(__name__)

 SEARCH_CACHE_DIR = APP_CACHE_DIR / "previews" / "dynamic-search"
 SEARCH_RESULTS_FILE = SEARCH_CACHE_DIR / "current_search_results.json"
+LAST_QUERY_FILE = SEARCH_CACHE_DIR / "last_query.txt"
+RESTORE_MODE_FILE = SEARCH_CACHE_DIR / ".restore_mode"
 FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
 SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.py").read_text(encoding="utf-8")
+FILTER_PARSER_SCRIPT = FZF_SCRIPTS_DIR / "_filter_parser.py"


+def _load_cached_titles() -> list[str]:
+    """Load titles from cached search results for display in fzf."""
+    if not SEARCH_RESULTS_FILE.exists():
+        return []
+
+    try:
+        with open(SEARCH_RESULTS_FILE, "r", encoding="utf-8") as f:
+            data = json.load(f)
+
+        media_list = data.get("data", {}).get("Page", {}).get("media", [])
+        titles = []
+        for media in media_list:
+            title_obj = media.get("title", {})
+            title = (
+                title_obj.get("english")
+                or title_obj.get("romaji")
+                or title_obj.get("native")
+                or "Unknown"
+            )
+            titles.append(title)
+        return titles
+    except (IOError, json.JSONDecodeError):
+        return []


 @session.menu
@@ -25,6 +54,12 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
     # Ensure cache directory exists
     SEARCH_CACHE_DIR.mkdir(parents=True, exist_ok=True)

+    # Check if we're in restore mode (coming back from media_actions)
+    restore_mode = RESTORE_MODE_FILE.exists()
+    if restore_mode:
+        # Clear the restore flag
+        RESTORE_MODE_FILE.unlink(missing_ok=True)

     # Read the GraphQL search query
     from .....libs.media_api.anilist import gql

@@ -44,6 +79,7 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
         "GRAPHQL_ENDPOINT": "https://graphql.anilist.co",
         "GRAPHQL_QUERY": search_query_json,
         "SEARCH_RESULTS_FILE": SEARCH_RESULTS_FILE.as_posix(),
+        "LAST_QUERY_FILE": LAST_QUERY_FILE.as_posix(),
         "AUTH_HEADER": auth_header,
     }

@@ -54,12 +90,34 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
     search_script_file = SEARCH_CACHE_DIR / "search.py"
     search_script_file.write_text(search_command, encoding="utf-8")

+    # Copy the filter parser module to the cache directory
+    # This is required for the search script to import it
+    filter_parser_dest = SEARCH_CACHE_DIR / "_filter_parser.py"
+    if FILTER_PARSER_SCRIPT.exists():
+        shutil.copy2(FILTER_PARSER_SCRIPT, filter_parser_dest)

     # Make the search script executable by calling it with python3
     # fzf will pass the query as {q} which becomes the first argument
     search_command_final = (
         f"{Path(get_python_executable()).as_posix()} {search_script_file.as_posix()} {{q}}"
     )

+    # Header hint for filter syntax
+    filter_hint = "💡 Filters: @genre:action @status:airing @year:2024 @sort:score (type @help for more)"

+    # Only load previous query if we're in restore mode (coming back from media_actions)
+    initial_query = None
+    cached_results = None
+    if restore_mode:
+        # Load previous query
+        if LAST_QUERY_FILE.exists():
+            try:
+                initial_query = LAST_QUERY_FILE.read_text(encoding="utf-8").strip()
+            except IOError:
+                pass
+        # Load cached results to display immediately without network request
+        cached_results = _load_cached_titles()

     try:
         # Prepare preview functionality
         preview_command = None
@@ -73,11 +131,17 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
             prompt="Search Anime",
             search_command=search_command_final,
             preview=preview_command,
+            header=filter_hint,
+            initial_query=initial_query,
+            initial_results=cached_results,
         )
     else:
         choice = ctx.selector.search(
             prompt="Search Anime",
             search_command=search_command_final,
+            header=filter_hint,
+            initial_query=initial_query,
+            initial_results=cached_results,
         )
     except NotImplementedError:
         feedback.error("Dynamic search is not supported by your current selector")
@@ -116,6 +180,9 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
         logger.error(f"Could not find selected media for choice: {choice}")
         return InternalDirective.MAIN

+    # Set restore mode flag so we can restore state when user goes back
+    RESTORE_MODE_FILE.touch()

     # Navigate to media actions with the selected item
     return State(
         menu_name=MenuName.MEDIA_ACTIONS,

@@ -189,7 +189,12 @@ class PreviewCacheWorker(ManagedBackgroundWorker):
             ),
             "STUDIOS": formatter.shell_safe(
                 formatter.format_list_with_commas(
-                    [t.name for t in media_item.studios if t.name]
+                    [t.name for t in media_item.studios if t.name and t.is_animation_studio]
                )
            ),
+            "PRODUCERS": formatter.shell_safe(
+                formatter.format_list_with_commas(
+                    [t.name for t in media_item.studios if t.name and not t.is_animation_studio]
+                )
+            ),
             "SYNONYMNS": formatter.shell_safe(

@@ -184,13 +184,22 @@ def format_score(score: Optional[float]) -> str:

 def shell_safe(text: Optional[str]) -> str:
     """
-    Escapes a string for safe inclusion in a shell script,
-    specifically for use within double quotes. It escapes backticks,
-    double quotes, and dollar signs.
+    Escapes a string for safe inclusion in a Python script string literal.
+    This is used when generating Python cache scripts with embedded text content.
+
+    For Python triple-quoted strings, we need to:
+    - Escape backslashes first (so existing backslashes don't interfere)
+    - Escape triple quotes (to not break the string literal)
+    - Remove or replace problematic characters
     """
     if not text:
         return ""
-    return text.replace("`", "\\`").replace('"', '\\"').replace("$", "\\$")
+    # Escape backslashes first
+    result = text.replace("\\", "\\\\")
+    # Escape triple quotes (both types) for Python triple-quoted string literals
+    result = result.replace('"""', r'\"\"\"')
+    result = result.replace("'''", r"\'\'\'")
+    return result


 def extract_episode_number(title: str) -> Optional[float]:
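(Illustration of the new behaviour: shell_safe('x"""y') returns x\"\"\"y, i.e.
each quote in the triple gains a backslash so it can sit inside a generated
triple-quoted Python literal, while a lone double quote now passes through
unescaped - safe for the new Python-literal context, though it would not have
been for the old double-quoted shell context.)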
@@ -88,6 +88,8 @@ class BaseSelector(ABC):
         *,
         preview: Optional[str] = None,
         header: Optional[str] = None,
+        initial_query: Optional[str] = None,
+        initial_results: Optional[List[str]] = None,
     ) -> str | None:
         """
         Provides dynamic search functionality that reloads results based on user input.
@@ -97,6 +99,8 @@ class BaseSelector(ABC):
             search_command: The command to execute for searching/reloading results.
             preview: An optional command or string for a preview window.
             header: An optional header to display above the choices.
+            initial_query: An optional initial query to pre-populate the search.
+            initial_results: Optional list of results to display initially (avoids network request).

         Returns:
             The string of the chosen item.
@@ -117,26 +117,42 @@ class FzfSelector(BaseSelector):
         lines = result.stdout.strip().splitlines()
         return lines[-1] if lines else (default or "")

-    def search(self, prompt, search_command, *, preview=None, header=None):
+    def search(self, prompt, search_command, *, preview=None, header=None, initial_query=None, initial_results=None):
         """Enhanced search using fzf's --reload flag for dynamic search."""
+        # Build the header with optional custom header line
+        display_header = self.header
+        if header:
+            display_header = f"{self.header}\n{header}"

         commands = [
             self.executable,
             "--prompt",
             f"{prompt.title()}: ",
             "--header",
-            self.header,
+            display_header,
             "--header-first",
             "--disabled",  # Disable local filtering - rely on external search command
             "--bind",
             f"change:reload({search_command})",
             "--ansi",
         ]

+        # If there's an initial query, set it
+        if initial_query:
+            commands.extend(["--query", initial_query])
+        # Only trigger reload on start if we don't have cached results
+        if not initial_results:
+            commands.extend(["--bind", f"start:reload({search_command})"])

         if preview:
             commands.extend(["--preview", preview])

+        # Use cached results as initial input if provided (avoids network request)
+        fzf_input = "\n".join(initial_results) if initial_results else ""

         result = subprocess.run(
             commands,
-            input="",
+            input=fzf_input,
             stdout=subprocess.PIPE,
             text=True,
             encoding="utf-8",