feat: implement dynamic search filter parser and enhance search script with filter syntax

benexl
2025-12-31 17:59:04 +03:00
parent 0524af6e26
commit 5db33d2fa0
4 changed files with 429 additions and 25 deletions

View File

@@ -0,0 +1,323 @@
#!/usr/bin/env python3
"""
Filter Parser for Dynamic Search
This module provides a parser for the special filter syntax used in dynamic search.
Filter syntax allows users to add filters inline with their search query.
SYNTAX:
@filter:value - Apply a filter with the given value
@filter:value1,value2 - Apply multiple values (for array filters)
@filter:!value - Exclude/negate a filter value
SUPPORTED FILTERS:
@genre:action,comedy - Filter by genres
@genre:!hentai - Exclude genre
@status:airing - Filter by status (airing, finished, upcoming, cancelled, hiatus)
@year:2024 - Filter by season year
@season:winter - Filter by season (winter, spring, summer, fall)
@format:tv,movie - Filter by format (tv, movie, ova, ona, special, music)
@sort:score - Sort by (score, popularity, trending, title, date)
@score:>80 - Minimum score
@score:<50 - Maximum score
@popularity:>10000 - Minimum popularity
@onlist - Only show anime on user's list
@onlist:false - Only show anime NOT on user's list
EXAMPLES:
"naruto @genre:action @status:finished"
"isekai @year:2024 @season:winter @sort:score"
"@genre:action,adventure @status:airing"
"romance @genre:!hentai @format:tv,movie"
"""
import re
from typing import Any, Dict, List, Optional, Tuple
# Mapping of user-friendly filter values to GraphQL enum values
FILTER_ALIASES = {
# Status aliases
"airing": "RELEASING",
"releasing": "RELEASING",
"finished": "FINISHED",
"completed": "FINISHED",
"upcoming": "NOT_YET_RELEASED",
"not_yet_released": "NOT_YET_RELEASED",
"unreleased": "NOT_YET_RELEASED",
"cancelled": "CANCELLED",
"canceled": "CANCELLED",
"hiatus": "HIATUS",
"paused": "HIATUS",
# Format aliases
"tv": "TV",
"tv_short": "TV_SHORT",
"tvshort": "TV_SHORT",
"movie": "MOVIE",
"film": "MOVIE",
"ova": "OVA",
"ona": "ONA",
"special": "SPECIAL",
"music": "MUSIC",
# Season aliases
"winter": "WINTER",
"spring": "SPRING",
"summer": "SUMMER",
"fall": "FALL",
"autumn": "FALL",
# Sort aliases
"score": "SCORE_DESC",
"score_desc": "SCORE_DESC",
"score_asc": "SCORE",
"popularity": "POPULARITY_DESC",
"popularity_desc": "POPULARITY_DESC",
"popularity_asc": "POPULARITY",
"trending": "TRENDING_DESC",
"trending_desc": "TRENDING_DESC",
"trending_asc": "TRENDING",
"title": "TITLE_ROMAJI",
"title_desc": "TITLE_ROMAJI_DESC",
"date": "START_DATE_DESC",
"date_desc": "START_DATE_DESC",
"date_asc": "START_DATE",
"newest": "START_DATE_DESC",
"oldest": "START_DATE",
"favourites": "FAVOURITES_DESC",
"favorites": "FAVOURITES_DESC",
"episodes": "EPISODES_DESC",
}
# Genre name normalization (lowercase -> proper case)
GENRE_NAMES = {
"action": "Action",
"adventure": "Adventure",
"comedy": "Comedy",
"drama": "Drama",
"ecchi": "Ecchi",
"fantasy": "Fantasy",
"horror": "Horror",
"mahou_shoujo": "Mahou Shoujo",
"mahou": "Mahou Shoujo",
"magical_girl": "Mahou Shoujo",
"mecha": "Mecha",
"music": "Music",
"mystery": "Mystery",
"psychological": "Psychological",
"romance": "Romance",
"sci-fi": "Sci-Fi",
"scifi": "Sci-Fi",
"sci_fi": "Sci-Fi",
"slice_of_life": "Slice of Life",
"sol": "Slice of Life",
"sports": "Sports",
"supernatural": "Supernatural",
"thriller": "Thriller",
"hentai": "Hentai",
}
# Filter pattern: @key:value or @key (boolean flags)
FILTER_PATTERN = re.compile(r"@(\w+)(?::([^\s]+))?", re.IGNORECASE)
# Comparison operators for numeric filters
COMPARISON_PATTERN = re.compile(r"^([<>]=?)?(\d+)$")
def normalize_value(value: str, value_type: str) -> str:
"""Normalize a filter value based on its type."""
value_lower = value.lower().strip()
if value_type == "genre":
return GENRE_NAMES.get(value_lower, value.title())
elif value_type in ("status", "format", "season", "sort"):
return FILTER_ALIASES.get(value_lower, value.upper())
return value
def parse_value_list(value_str: str) -> Tuple[List[str], List[str]]:
"""
Parse a comma-separated value string, separating includes from excludes.
Returns:
Tuple of (include_values, exclude_values)
"""
includes = []
excludes = []
for val in value_str.split(","):
val = val.strip()
if not val:
continue
if val.startswith("!"):
excludes.append(val[1:])
else:
includes.append(val)
return includes, excludes
def parse_comparison(value: str) -> Tuple[Optional[str], Optional[int]]:
"""
Parse a comparison value like ">80" or "<50".
Returns:
Tuple of (operator, number) or (None, None) if invalid
"""
match = COMPARISON_PATTERN.match(value)
if match:
operator = match.group(1) or ">" # Default to greater than
number = int(match.group(2))
return operator, number
return None, None
def parse_filters(query: str) -> Tuple[str, Dict[str, Any]]:
"""
Parse a search query and extract filter directives.
Args:
query: The full search query including filter syntax
Returns:
Tuple of (clean_query, filters_dict)
- clean_query: The query with filter syntax removed
- filters_dict: Dictionary of GraphQL variables to apply
"""
filters: Dict[str, Any] = {}
# Find all filter matches
matches = list(FILTER_PATTERN.finditer(query))
for match in matches:
filter_name = match.group(1).lower()
filter_value = match.group(2) # May be None for boolean flags
# Handle different filter types
if filter_name == "genre":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "genre") for v in includes]
filters.setdefault("genre_in", []).extend(normalized)
if excludes:
normalized = [normalize_value(v, "genre") for v in excludes]
filters.setdefault("genre_not_in", []).extend(normalized)
elif filter_name == "status":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "status") for v in includes]
filters.setdefault("status_in", []).extend(normalized)
if excludes:
normalized = [normalize_value(v, "status") for v in excludes]
filters.setdefault("status_not_in", []).extend(normalized)
elif filter_name == "format":
if filter_value:
includes, _ = parse_value_list(filter_value)
if includes:
normalized = [normalize_value(v, "format") for v in includes]
filters.setdefault("format_in", []).extend(normalized)
elif filter_name == "year":
if filter_value:
try:
filters["seasonYear"] = int(filter_value)
except ValueError:
pass # Invalid year, skip
elif filter_name == "season":
if filter_value:
filters["season"] = normalize_value(filter_value, "season")
elif filter_name == "sort":
if filter_value:
sort_val = normalize_value(filter_value, "sort")
filters["sort"] = [sort_val]
elif filter_name == "score":
if filter_value:
op, num = parse_comparison(filter_value)
if num is not None:
if op in (">", ">="):
filters["averageScore_greater"] = num
elif op in ("<", "<="):
filters["averageScore_lesser"] = num
elif filter_name == "popularity":
if filter_value:
op, num = parse_comparison(filter_value)
if num is not None:
if op in (">", ">="):
filters["popularity_greater"] = num
elif op in ("<", "<="):
filters["popularity_lesser"] = num
elif filter_name == "onlist":
if filter_value is None or filter_value.lower() in ("true", "yes", "1"):
filters["on_list"] = True
elif filter_value.lower() in ("false", "no", "0"):
filters["on_list"] = False
elif filter_name == "tag":
if filter_value:
includes, excludes = parse_value_list(filter_value)
if includes:
# Tags are typically Title Case
normalized = [v.replace("_", " ").title() for v in includes]
filters.setdefault("tag_in", []).extend(normalized)
if excludes:
normalized = [v.replace("_", " ").title() for v in excludes]
filters.setdefault("tag_not_in", []).extend(normalized)
# Remove filter syntax from query to get clean search text
clean_query = FILTER_PATTERN.sub("", query).strip()
# Clean up multiple spaces
clean_query = re.sub(r"\s+", " ", clean_query).strip()
return clean_query, filters
def get_help_text() -> str:
"""Return a help string describing the filter syntax."""
return """
╭─────────────────── Filter Syntax Help ───────────────────╮
│ │
│ @genre:action,comedy Filter by genres │
│ @genre:!hentai Exclude genre │
│ @status:airing Status: airing, finished, │
│ upcoming, cancelled, hiatus │
│ @year:2024 Filter by year │
│ @season:winter winter, spring, summer, fall │
│ @format:tv,movie tv, movie, ova, ona, special │
│ @sort:score score, popularity, trending, │
│ date, title, newest, oldest │
│ @score:>80 Minimum score │
│ @score:<50 Maximum score │
│ @popularity:>10000 Minimum popularity │
│ @onlist Only on your list │
│ @onlist:false Not on your list │
│ @tag:isekai,reincarnation Filter by tags │
│ │
│ Examples: │
│ naruto @genre:action @status:finished │
│ @genre:action,adventure @year:2024 @sort:score │
│ isekai @season:winter @year:2024 │
│ │
╰──────────────────────────────────────────────────────────╯
""".strip()
if __name__ == "__main__":
# Test the parser
import json
import sys
if len(sys.argv) > 1:
test_query = " ".join(sys.argv[1:])
clean, filters = parse_filters(test_query)
print(f"Original: {test_query}")
print(f"Clean query: {clean}")
print(f"Filters: {json.dumps(filters, indent=2)}")
else:
print(get_help_text())
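
As a quick illustration of the syntax described in the docstring, calling parse_filters on a combined query yields roughly the following (traced from the code above; the comments show the expected output):

from _filter_parser import parse_filters

clean, filters = parse_filters("isekai @genre:action,!hentai @year:2024 @season:winter @sort:score")
# clean   -> "isekai"
# filters -> {"genre_in": ["Action"], "genre_not_in": ["Hentai"],
#             "seasonYear": 2024, "season": "WINTER", "sort": ["SCORE_DESC"]}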

View File

@@ -5,6 +5,18 @@
# This script is a template for dynamic search functionality in fzf.
# The placeholders in curly braces, like {GRAPHQL_ENDPOINT}, are dynamically
# filled in by Python using .replace() at runtime.
#
# FILTER SYNTAX:
# @genre:action,comedy Filter by genres
# @genre:!hentai Exclude genre
# @status:airing Status: airing, finished, upcoming, cancelled, hiatus
# @year:2024 Filter by year
# @season:winter winter, spring, summer, fall
# @format:tv,movie tv, movie, ova, ona, special
# @sort:score score, popularity, trending, date, title
# @score:>80 / @score:<50 Min/max score
# @onlist / @onlist:false Filter by list status
# @tag:isekai Filter by tags
import json
import sys
@@ -12,6 +24,9 @@ from pathlib import Path
from urllib import request
from urllib.error import URLError
# Import the filter parser
from _filter_parser import parse_filters
# --- Template Variables (Injected by Python) ---
GRAPHQL_ENDPOINT = "{GRAPHQL_ENDPOINT}"
SEARCH_RESULTS_FILE = Path("{SEARCH_RESULTS_FILE}")
@@ -22,17 +37,29 @@ GRAPHQL_QUERY = "{GRAPHQL_QUERY}"
# --- Get Query from fzf ---
# fzf passes the current query as the first argument when using --bind change:reload
QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
RAW_QUERY = sys.argv[1] if len(sys.argv) > 1 else ""
# If query is empty, exit with empty results
if not QUERY.strip():
print("")
# Parse the query to extract filters and clean search text
QUERY, PARSED_FILTERS = parse_filters(RAW_QUERY)
# If query is empty and no filters, show help hint
if not RAW_QUERY.strip():
print("💡 Tip: Use @genre:action @status:airing for filters (type @help for syntax)")
sys.exit(0)
# Show filter help if requested
if RAW_QUERY.strip().lower() in ("@help", "@?", "@h"):
from _filter_parser import get_help_text
print(get_help_text())
sys.exit(0)
# If we only have filters (no search text), that's valid - we'll search with filters only
# But if we have neither query nor filters, we already showed the help hint above
def make_graphql_request(
endpoint: str, query: str, variables: dict, auth_token: str = ""
) -> dict | None:
) -> tuple[dict | None, str | None]:
"""
Make a GraphQL request to the specified endpoint.
@@ -43,7 +70,7 @@ def make_graphql_request(
auth_token: Optional authorization token (Bearer token)
Returns:
Response JSON as a dictionary, or None if request fails
Tuple of (Response JSON, error message) - one will be None
"""
payload = {"query": query, "variables": variables}
@@ -61,10 +88,13 @@ def make_graphql_request(
)
with request.urlopen(req, timeout=10) as response:
return json.loads(response.read().decode("utf-8"))
except (URLError, json.JSONDecodeError, Exception) as e:
print(f"❌ Request failed: {e}", file=sys.stderr)
return None
return json.loads(response.read().decode("utf-8")), None
except URLError as e:
return None, f"Network error: {e.reason}"
except json.JSONDecodeError as e:
return None, f"Invalid response: {e}"
except Exception as e:
return None, f"Request error: {e}"
def extract_title(media_item: dict) -> str:
@@ -90,34 +120,64 @@ def main():
# Ensure parent directory exists
SEARCH_RESULTS_FILE.parent.mkdir(parents=True, exist_ok=True)
# Create GraphQL variables
# Base GraphQL variables
variables = {
"query": QUERY,
"type": "ANIME",
"per_page": 50,
"genre_not_in": ["Hentai"],
"genre_not_in": ["Hentai"], # Default exclusion
}
# Add search query if provided
if QUERY:
variables["query"] = QUERY
# Apply parsed filters from the filter syntax
for key, value in PARSED_FILTERS.items():
# Handle array merging for _in and _not_in fields
if key.endswith("_in") or key.endswith("_not_in"):
if key in variables:
# Merge arrays, avoiding duplicates
existing = set(variables[key])
existing.update(value)
variables[key] = list(existing)
else:
variables[key] = value
else:
variables[key] = value
# Make the GraphQL request
response = make_graphql_request(
response, error = make_graphql_request(
GRAPHQL_ENDPOINT, GRAPHQL_QUERY, variables, AUTH_HEADER
)
if response is None:
print("Search failed")
if error:
print(f"{error}")
# Also show what we tried to search, to help with debugging
print(f" Query: {QUERY or '(none)'}")
print(f" Filters: {json.dumps(PARSED_FILTERS) if PARSED_FILTERS else '(none)'}")
sys.exit(1)
if response is None:
print("❌ Search failed: No response received")
sys.exit(1)
# Check for GraphQL errors first (these come in the response body)
if "errors" in response:
errors = response["errors"]
if errors:
# Extract error messages
error_msgs = [e.get("message", str(e)) for e in errors]
print(f"❌ API Error: {'; '.join(error_msgs)}")
# Show the parsed filters for debugging
print(f" Filters used: {json.dumps(PARSED_FILTERS, indent=2) if PARSED_FILTERS else '(none)'}")
sys.exit(1)
# Save the raw response for later processing by dynamic_search.py
try:
with open(SEARCH_RESULTS_FILE, "w", encoding="utf-8") as f:
json.dump(response, f, ensure_ascii=False, indent=2)
except IOError as e:
print(f"❌ Failed to save results: {e}", file=sys.stderr)
sys.exit(1)
# Parse and display results
if "errors" in response:
print(f"❌ Search error: {response['errors']}")
print(f"❌ Failed to save results: {e}")
sys.exit(1)
# Navigate the response structure
@@ -126,7 +186,9 @@ def main():
media_list = page.get("media", [])
if not media_list:
print(" No results found")
print("🔍 No results found")
if PARSED_FILTERS:
print(f" Try adjusting your filters")
sys.exit(0)
# Output titles for fzf (one per line)
@@ -141,5 +203,5 @@ if __name__ == "__main__":
except KeyboardInterrupt:
sys.exit(0)
except Exception as e:
print(f"❌ Unexpected error: {e}", file=sys.stderr)
print(f"❌ Unexpected error: {type(e).__name__}: {e}")
sys.exit(1)
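
To make the filter-merging in main() concrete, here is a hedged walkthrough for one sample query; the dict below is traced from the code above, while the GraphQL query template and endpoint are injected elsewhere and not shown:

# Typed into fzf: "naruto @genre:action @status:finished @score:>80"
# parse_filters() returns QUERY = "naruto" and
# PARSED_FILTERS = {"genre_in": ["Action"], "status_in": ["FINISHED"], "averageScore_greater": 80}
# After merging with the defaults, the request variables are roughly:
variables = {
    "type": "ANIME",
    "per_page": 50,
    "genre_not_in": ["Hentai"],   # default exclusion, kept alongside the user's filters
    "query": "naruto",
    "genre_in": ["Action"],
    "status_in": ["FINISHED"],
    "averageScore_greater": 80,
}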

View File

@@ -1,5 +1,6 @@
import json
import logging
import shutil
from pathlib import Path
from .....core.constants import APP_CACHE_DIR, SCRIPTS_DIR
@@ -14,6 +15,7 @@ SEARCH_CACHE_DIR = APP_CACHE_DIR / "previews" / "dynamic-search"
SEARCH_RESULTS_FILE = SEARCH_CACHE_DIR / "current_search_results.json"
FZF_SCRIPTS_DIR = SCRIPTS_DIR / "fzf"
SEARCH_TEMPLATE_SCRIPT = (FZF_SCRIPTS_DIR / "search.py").read_text(encoding="utf-8")
FILTER_PARSER_SCRIPT = FZF_SCRIPTS_DIR / "_filter_parser.py"
@session.menu
@@ -54,12 +56,21 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
search_script_file = SEARCH_CACHE_DIR / "search.py"
search_script_file.write_text(search_command, encoding="utf-8")
# Copy the filter parser module to the cache directory
# This is required for the search script to import it
filter_parser_dest = SEARCH_CACHE_DIR / "_filter_parser.py"
if FILTER_PARSER_SCRIPT.exists():
shutil.copy2(FILTER_PARSER_SCRIPT, filter_parser_dest)
# Build the command that runs the search script with the Python interpreter
# fzf substitutes the current query for {q}, which becomes the script's first argument
search_command_final = (
f"{Path(get_python_executable()).as_posix()} {search_script_file.as_posix()} {{q}}"
)
# Header hint for filter syntax
filter_hint = "💡 Filters: @genre:action @status:airing @year:2024 @sort:score (type @help for more)"
try:
# Prepare preview functionality
preview_command = None
@@ -73,11 +84,13 @@ def dynamic_search(ctx: Context, state: State) -> State | InternalDirective:
prompt="Search Anime",
search_command=search_command_final,
preview=preview_command,
header=filter_hint,
)
else:
choice = ctx.selector.search(
prompt="Search Anime",
search_command=search_command_final,
header=filter_hint,
)
except NotImplementedError:
feedback.error("Dynamic search is not supported by your current selector")
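
The copy step earlier in this hunk is what lets the import (from _filter_parser import parse_filters) in the rendered script resolve: Python prepends the executed script's directory to sys.path, so both files only need to sit in the same cache directory. A minimal sanity check of that layout might look like this (the cache root is a hypothetical stand-in for APP_CACHE_DIR; the subdirectory and file names come from the code above):

from pathlib import Path

cache_dir = Path.home() / ".cache" / "your-app" / "previews" / "dynamic-search"  # hypothetical APP_CACHE_DIR
for name in ("search.py", "_filter_parser.py", "current_search_results.json"):
    print(name, (cache_dir / name).exists())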

View File

@@ -119,13 +119,19 @@ class FzfSelector(BaseSelector):
def search(self, prompt, search_command, *, preview=None, header=None):
"""Enhanced search using fzf's --reload flag for dynamic search."""
# Build the header with optional custom header line
display_header = self.header
if header:
display_header = f"{self.header}\n{header}"
commands = [
self.executable,
"--prompt",
f"{prompt.title()}: ",
"--header",
self.header,
display_header,
"--header-first",
"--disabled", # Disable local filtering - rely on external search command
"--bind",
f"change:reload({search_command})",
"--ansi",