From a2da6974fa0ad931665fd2b1c86c0ff77f2db5ae Mon Sep 17 00:00:00 2001 From: Benexl Date: Sun, 13 Jul 2025 01:39:52 +0300 Subject: [PATCH] feat: cli search --- fastanime/cli/cli.py | 1 + fastanime/cli/commands/__init__.py | 3 +- fastanime/cli/commands/search.py | 474 +++++++++-------------------- fastanime/libs/players/player.py | 4 +- 4 files changed, 156 insertions(+), 326 deletions(-) diff --git a/fastanime/cli/cli.py b/fastanime/cli/cli.py index c07e449..f808579 100644 --- a/fastanime/cli/cli.py +++ b/fastanime/cli/cli.py @@ -11,6 +11,7 @@ from .utils.logging import setup_logging commands = { "config": ".config", + "search": ".search", } diff --git a/fastanime/cli/commands/__init__.py b/fastanime/cli/commands/__init__.py index abcccd3..26d4f94 100644 --- a/fastanime/cli/commands/__init__.py +++ b/fastanime/cli/commands/__init__.py @@ -1,3 +1,4 @@ from .config import config +from .search import search -__all__ = ["config"] +__all__ = ["config", "search"] diff --git a/fastanime/cli/commands/search.py b/fastanime/cli/commands/search.py index f644145..e0afd35 100644 --- a/fastanime/cli/commands/search.py +++ b/fastanime/cli/commands/search.py @@ -1,11 +1,24 @@ from typing import TYPE_CHECKING import click +from fastanime.core.exceptions import FastAnimeError +from ...core.config import AppConfig from ..utils.completion_functions import anime_titles_shell_complete if TYPE_CHECKING: - from ...cli.config import Config + from typing import TypedDict + + from typing_extensions import Unpack + + from ...libs.players.base import BasePlayer + from ...libs.providers.anime.base import BaseAnimeProvider + from ...libs.providers.anime.types import Anime + from ...libs.selectors.base import BaseSelector + + class Options(TypedDict): + anime_title: list[str] + episode_range: str | None @click.command( @@ -36,8 +49,7 @@ if TYPE_CHECKING: """, ) @click.option( - "--anime-titles", - "--anime_title", + "--anime-title", "-t", required=True, shell_complete=anime_titles_shell_complete, @@ -50,341 +62,157 @@ if TYPE_CHECKING: help="A range of episodes to binge (start-end)", ) @click.pass_obj -def search(config: "Config", anime_titles: str, episode_range: str): - from click import clear +def search(config: AppConfig, **options: "Unpack[Options]"): from rich import print from rich.progress import Progress - from thefuzz import fuzz - from ...libs.fzf import fzf - from ...libs.rofi import Rofi - from ..utils.tools import exit_app - from ..utils.utils import fuzzy_inquirer + from ...core.exceptions import FastAnimeError + from ...libs.players.player import create_player + from ...libs.providers.anime.params import ( + AnimeParams, + SearchParams, + ) + from ...libs.providers.anime.provider import create_provider + from ...libs.selectors.selector import create_selector - if config.manga: - from InquirerPy.prompts.number import NumberPrompt - from yt_dlp.utils import sanitize_filename + provider = create_provider(config.general.provider) + player = create_player(config) + selector = create_selector(config) - from ...MangaProvider import MangaProvider - from ..utils.feh import feh_manga_viewer - from ..utils.icat import icat_manga_viewer + anime_titles = options["anime_title"] + print(f"[green bold]Streaming:[/] {anime_titles}") + for anime_title in anime_titles: + # ---- search for anime ---- + print(f"[green bold]Searching for:[/] {anime_title}") + with Progress() as progress: + progress.add_task("Fetching Search Results...", total=None) + search_results = provider.search( + SearchParams( + query=anime_title, translation_type=config.stream.translation_type + ) + ) + if not search_results: + raise FastAnimeError("No results were found matching your query") - manga_title = anime_titles[0] - - manga_provider = MangaProvider() - search_data = manga_provider.search_for_manga(manga_title) - if not search_data: - print("No search results") - exit(1) - - search_results = search_data["results"] - - search_results_ = { - sanitize_filename(search_result["title"]): search_result - for search_result in search_results + _search_results = { + search_result.title: search_result + for search_result in search_results.results } - if config.auto_select: - search_result_manga_title = max( - search_results_.keys(), - key=lambda title: fuzz.ratio(title, manga_title), - ) - print("[cyan]Auto Selecting:[/] ", search_result_manga_title) + selected_anime_title = selector.choose( + "Select Anime", list(_search_results.keys()) + ) + if not selected_anime_title: + raise FastAnimeError("No title selected") + anime_result = _search_results[selected_anime_title] + # ---- fetch selected anime ---- + with Progress() as progress: + progress.add_task("Fetching Anime...", total=None) + anime = provider.get(AnimeParams(id=anime_result.id)) + + if not anime: + raise FastAnimeError(f"Failed to fetch anime {anime_result.title}") + episodes_range = [] + episodes: list[str] = sorted( + getattr(anime.episodes, config.stream.translation_type), key=float + ) + if options["episode_range"]: + if ":" in options["episode_range"]: + ep_range_tuple = options["episode_range"].split(":") + if len(ep_range_tuple) == 3 and all(ep_range_tuple): + episodes_start, episodes_end, step = ep_range_tuple + episodes_range = episodes[ + int(episodes_start) : int(episodes_end) : int(step) + ] + + elif len(ep_range_tuple) == 2 and all(ep_range_tuple): + episodes_start, episodes_end = ep_range_tuple + episodes_range = episodes[int(episodes_start) : int(episodes_end)] + else: + episodes_start, episodes_end = ep_range_tuple + if episodes_start.strip(): + episodes_range = episodes[int(episodes_start) :] + elif episodes_end.strip(): + episodes_range = episodes[: int(episodes_end)] + else: + episodes_range = episodes + else: + episodes_range = episodes[int(options["episode_range"]) :] + + episodes_range = iter(episodes_range) + + for episode in episodes_range: + stream_anime(config, provider, selector, player, anime, episode) else: - choices = list(search_results_.keys()) - preview = None - if config.preview: - from ..interfaces.utils import get_fzf_manga_preview + episode = selector.choose( + "Select Episode", + getattr(anime.episodes, config.stream.translation_type), + ) + if not episode: + raise FastAnimeError("No episode selected") + stream_anime(config, provider, selector, player, anime, episode) - preview = get_fzf_manga_preview(search_results) - if config.use_fzf: - search_result_manga_title = fzf.run( - choices, "Please Select title", preview=preview - ) - elif config.use_rofi: - search_result_manga_title = Rofi.run(choices, "Please Select Title") - else: - search_result_manga_title = fuzzy_inquirer( - choices, - "Please Select Title", - ) - anilist_id = search_results_[search_result_manga_title]["id"] - manga_info = manga_provider.get_manga(anilist_id) - if not manga_info: - print("No manga info") - exit(1) +def stream_anime( + config: AppConfig, + provider: "BaseAnimeProvider", + selector: "BaseSelector", + player: "BasePlayer", + anime: "Anime", + episode: str, +): + from rich import print + from rich.progress import Progress - anilist_helper = None - if config.user: - from ...anilist import AniList + from ...libs.players.params import PlayerParams + from ...libs.providers.anime.params import EpisodeStreamsParams - AniList.login_user(config.user["token"]) - anilist_helper = AniList - - def _manga_viewer(): - chapter_number = NumberPrompt("Select a chapter number").execute() - chapter_info = manga_provider.get_chapter_thumbnails( - manga_info["id"], str(chapter_number) + with Progress() as progress: + progress.add_task("Fetching Episode Streams...", total=None) + streams = provider.episode_streams( + EpisodeStreamsParams( + anime_id=anime.id, + episode=episode, + translation_type=config.stream.translation_type, + ) + ) + if not streams: + raise FastAnimeError( + f"Failed to get streams for anime: {anime.title}, episode: {episode}" ) - if not chapter_info: - print("No chapter info") - input("Enter to retry...") - _manga_viewer() - return - print( - f"[purple bold]Now Reading: [/] {search_result_manga_title} [cyan bold]Chapter:[/] {chapter_info['title']}" - ) - if config.manga_viewer == "feh": - feh_manga_viewer(chapter_info["thumbnails"], str(chapter_info["title"])) - elif config.manga_viewer == "icat": - icat_manga_viewer( - chapter_info["thumbnails"], str(chapter_info["title"]) + if config.stream.server == "TOP": + with Progress() as progress: + progress.add_task("Fetching top server...", total=None) + server = next(streams, None) + if not server: + raise FastAnimeError( + f"Failed to get server for anime: {anime.title}, episode: {episode}" ) - if anilist_helper: - anilist_helper.update_anime_list( - {"mediaId": anilist_id, "progress": chapter_number} - ) - _manga_viewer() - - _manga_viewer() else: - from ...BaseAnimeProvider import BaseAnimeProvider - from ...libs.anime_provider.types import Anime - from ...Utility.data import anime_normalizer - from ..utils.mpv import run_mpv - from ..utils.utils import filter_by_quality, move_preferred_subtitle_lang_to_top - - anime_provider = BaseAnimeProvider(config.provider) - anilist_anime_info = None - - print(f"[green bold]Streaming:[/] {anime_titles}") - for anime_title in anime_titles: - # ---- search for anime ---- - with Progress() as progress: - progress.add_task("Fetching Search Results...", total=None) - search_results = anime_provider.search_for_anime( - anime_title, config.translation_type - ) - if not search_results: - print("Search results not found") - input("Enter to retry") - search(config, anime_title, episode_range) - return - search_results = search_results["results"] - if not search_results: - print("Anime not found :cry:") - exit_app() - search_results_ = { - search_result["title"]: search_result - for search_result in search_results - } - - if config.auto_select: - search_result_manga_title = max( - search_results_.keys(), - key=lambda title: fuzz.ratio( - anime_normalizer.get(title, title), anime_title - ), - ) - print("[cyan]Auto Selecting:[/] ", search_result_manga_title) - - else: - choices = list(search_results_.keys()) - if config.use_fzf: - search_result_manga_title = fzf.run( - choices, "Please Select title", "FastAnime" - ) - elif config.use_rofi: - search_result_manga_title = Rofi.run(choices, "Please Select Title") - else: - search_result_manga_title = fuzzy_inquirer( - choices, - "Please Select Title", - ) - - # ---- fetch selected anime ---- - with Progress() as progress: - progress.add_task("Fetching Anime...", total=None) - anime: Anime | None = anime_provider.get_anime( - search_results_[search_result_manga_title]["id"] - ) - - if not anime: - print("Sth went wring anime no found") - input("Enter to continue...") - search(config, anime_title, episode_range) - return - episodes_range = [] - episodes: list[str] = sorted( - anime["availableEpisodesDetail"][config.translation_type], key=float - ) - if episode_range: - if ":" in episode_range: - ep_range_tuple = episode_range.split(":") - if len(ep_range_tuple) == 3 and all(ep_range_tuple): - episodes_start, episodes_end, step = ep_range_tuple - episodes_range = episodes[ - int(episodes_start) : int(episodes_end) : int(step) - ] - - elif len(ep_range_tuple) == 2 and all(ep_range_tuple): - episodes_start, episodes_end = ep_range_tuple - episodes_range = episodes[ - int(episodes_start) : int(episodes_end) - ] - else: - episodes_start, episodes_end = ep_range_tuple - if episodes_start.strip(): - episodes_range = episodes[int(episodes_start) :] - elif episodes_end.strip(): - episodes_range = episodes[: int(episodes_end)] - else: - episodes_range = episodes - else: - episodes_range = episodes[int(episode_range) :] - - episodes_range = iter(episodes_range) - - if config.normalize_titles: - from ...libs.common.mini_anilist import get_basic_anime_info_by_title - - anilist_anime_info = get_basic_anime_info_by_title(anime["title"]) - - def stream_anime(anime: "Anime"): - clear() - episode = None - - if episodes_range: - try: - episode = next(episodes_range) # pyright:ignore - print( - f"[cyan]Auto selecting:[/] {search_result_manga_title} [cyan]Episode:[/] {episode}" - ) - except StopIteration: - print("[green]Completed binge sequence[/]:smile:") - return - - if not episode or episode not in episodes: - choices = [*episodes, "end"] - if config.use_fzf: - episode = fzf.run( - choices, - "Select an episode", - header=search_result_manga_title, - ) - elif config.use_rofi: - episode = Rofi.run(choices, "Select an episode") - else: - episode = fuzzy_inquirer( - choices, - "Select episode", - ) - if episode == "end": - return - - # ---- fetch streams ---- - with Progress() as progress: - progress.add_task("Fetching Episode Streams...", total=None) - streams = anime_provider.get_episode_streams( - anime["id"], episode, config.translation_type - ) - if not streams: - print("Failed to get streams") - return - - try: - # ---- fetch servers ---- - if config.server == "top": - with Progress() as progress: - progress.add_task("Fetching top server...", total=None) - server = next(streams, None) - if not server: - print("Sth went wrong when fetching the episode") - input("Enter to continue") - stream_anime(anime) - return - stream_link = filter_by_quality(config.quality, server["links"]) - if not stream_link: - print("Quality not found") - input("Enter to continue") - stream_anime(anime) - return - link = stream_link["link"] - subtitles = server["subtitles"] - stream_headers = server["headers"] - episode_title = server["episode_title"] - else: - with Progress() as progress: - progress.add_task("Fetching servers", total=None) - # prompt for server selection - servers = {server["server"]: server for server in streams} - servers_names = list(servers.keys()) - if config.server in servers_names: - server = config.server - elif config.use_fzf: - server = fzf.run(servers_names, "Select an link") - elif config.use_rofi: - server = Rofi.run(servers_names, "Select an link") - else: - server = fuzzy_inquirer( - servers_names, - "Select link", - ) - stream_link = filter_by_quality( - config.quality, servers[server]["links"] - ) - if not stream_link: - print("Quality not found") - input("Enter to continue") - stream_anime(anime) - return - link = stream_link["link"] - stream_headers = servers[server]["headers"] - subtitles = servers[server]["subtitles"] - episode_title = servers[server]["episode_title"] - - selected_anime_title = search_result_manga_title - if anilist_anime_info: - selected_anime_title = ( - anilist_anime_info["title"][config.preferred_language] - or anilist_anime_info["title"]["romaji"] - or anilist_anime_info["title"]["english"] - ) - import re - - for episode_detail in anilist_anime_info["episodes"]: - if re.match(f"Episode {episode} ", episode_detail["title"]): - episode_title = episode_detail["title"] - break - print( - f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}" - ) - subtitles = move_preferred_subtitle_lang_to_top( - subtitles, config.sub_lang - ) - if config.sync_play: - from ..utils.syncplay import SyncPlayer - - SyncPlayer( - link, - episode_title, - headers=stream_headers, - subtitles=subtitles, - ) - else: - run_mpv( - link, - episode_title, - headers=stream_headers, - subtitles=subtitles, - player=config.player, - ) - except IndexError as e: - print(e) - input("Enter to continue") - stream_anime(anime) - - stream_anime(anime) + with Progress() as progress: + progress.add_task("Fetching servers", total=None) + servers = {server.name: server for server in streams} + servers_names = list(servers.keys()) + if config.stream.server in servers_names: + server = servers[config.stream.server] + else: + server_name = selector.choose("Select Server", servers_names) + if not server_name: + raise FastAnimeError("Server not selected") + server = servers[server_name] + stream_link = server.links[0].link + if not stream_link: + raise FastAnimeError( + f"Failed to get stream link for anime: {anime.title}, episode: {episode}" + ) + print(f"[green bold]Now Streaming:[/] {anime.title} Episode: {episode}") + player.play( + PlayerParams( + url=stream_link, + title=f"{anime.title}; Episode {episode}", + subtitles=server.subtitles, # type:ignore + headers=server.headers, + ) + ) diff --git a/fastanime/libs/players/player.py b/fastanime/libs/players/player.py index 8d886fb..38f5cdd 100644 --- a/fastanime/libs/players/player.py +++ b/fastanime/libs/players/player.py @@ -6,12 +6,11 @@ PLAYERS = ["mpv", "vlc", "syncplay"] class PlayerFactory: @staticmethod - def create(player_name: str, config: AppConfig) -> BasePlayer: + def create(config: AppConfig) -> BasePlayer: """ Factory method to create a player instance based on its name. Args: - player_name: The name of the player (e.g., 'mpv', 'vlc'). config: The full application configuration object. Returns: @@ -20,6 +19,7 @@ class PlayerFactory: Raises: ValueError: If the player_name is not supported. """ + player_name = config.stream.player if player_name not in PLAYERS: raise ValueError(