From 2dbbb1c4df33baa8a1d31dd5ba26c24129e023c4 Mon Sep 17 00:00:00 2001 From: Benex254 Date: Fri, 23 Aug 2024 11:19:25 +0300 Subject: [PATCH] feat: add experimental manga support --- fastanime/MangaProvider.py | 105 ++++ fastanime/cli/__init__.py | 3 + fastanime/cli/commands/grab.py | 225 +++++---- fastanime/cli/commands/search.py | 475 +++++++++++------- fastanime/cli/config.py | 1 + fastanime/cli/interfaces/utils.py | 59 ++- fastanime/cli/utils/feh.py | 12 + fastanime/libs/anilist/types.py | 1 + fastanime/libs/anime_provider/mini_anilist.py | 153 ------ fastanime/libs/common/common.py | 15 + fastanime/libs/common/mini_anilist.py | 61 ++- fastanime/libs/manga_provider/__init__.py | 1 + .../libs/manga_provider/base_provider.py | 13 + fastanime/libs/manga_provider/common.py | 15 + .../libs/manga_provider/mangadex/__init__.py | 0 fastanime/libs/manga_provider/mangadex/api.py | 51 ++ 16 files changed, 756 insertions(+), 434 deletions(-) create mode 100644 fastanime/MangaProvider.py create mode 100644 fastanime/cli/utils/feh.py delete mode 100644 fastanime/libs/anime_provider/mini_anilist.py create mode 100644 fastanime/libs/common/common.py create mode 100644 fastanime/libs/manga_provider/__init__.py create mode 100644 fastanime/libs/manga_provider/base_provider.py create mode 100644 fastanime/libs/manga_provider/common.py create mode 100644 fastanime/libs/manga_provider/mangadex/__init__.py create mode 100644 fastanime/libs/manga_provider/mangadex/api.py diff --git a/fastanime/MangaProvider.py b/fastanime/MangaProvider.py new file mode 100644 index 0000000..0ff8c9a --- /dev/null +++ b/fastanime/MangaProvider.py @@ -0,0 +1,105 @@ +"""An abstraction over all providers offering added features with a simple and well typed api + +[TODO:description] +""" + +import importlib +import logging +from typing import TYPE_CHECKING + +from .libs.manga_provider import manga_sources + +if TYPE_CHECKING: + pass + + +logger = logging.getLogger(__name__) + + +class MangaProvider: + """Class that manages all anime sources adding some extra functionality to them. + Attributes: + PROVIDERS: [TODO:attribute] + provider: [TODO:attribute] + provider: [TODO:attribute] + dynamic: [TODO:attribute] + retries: [TODO:attribute] + manga_provider: [TODO:attribute] + """ + + PROVIDERS = list(manga_sources.keys()) + provider = PROVIDERS[0] + + def __init__(self, provider="mangadex", dynamic=False, retries=0) -> None: + self.provider = provider + self.dynamic = dynamic + self.retries = retries + self.lazyload_provider(self.provider) + + def lazyload_provider(self, provider): + """updates the current provider being used""" + _, anime_provider_cls_name = manga_sources[provider].split(".", 1) + package = f"fastanime.libs.manga_provider.{provider}" + provider_api = importlib.import_module(".api", package) + manga_provider = getattr(provider_api, anime_provider_cls_name) + self.manga_provider = manga_provider() + + def search_for_manga( + self, + user_query, + nsfw=True, + unknown=True, + ): + """core abstraction over all providers search functionality + + Args: + user_query ([TODO:parameter]): [TODO:description] + translation_type ([TODO:parameter]): [TODO:description] + nsfw ([TODO:parameter]): [TODO:description] + manga_provider ([TODO:parameter]): [TODO:description] + anilist_obj: [TODO:description] + + Returns: + [TODO:return] + """ + manga_provider = self.manga_provider + try: + results = manga_provider.search_for_manga(user_query, nsfw, unknown) + except Exception as e: + logger.error(e) + results = None + return results + + def get_manga( + self, + anime_id: str, + ): + """core abstraction over getting info of an anime from all providers + + Args: + anime_id: [TODO:description] + anilist_obj: [TODO:description] + + Returns: + [TODO:return] + """ + manga_provider = self.manga_provider + try: + results = manga_provider.get_manga(anime_id) + except Exception as e: + logger.error(e) + results = None + return results + + def get_chapter_thumbnails( + self, + manga_id: str, + chapter: str, + ): + manga_provider = self.manga_provider + try: + results = manga_provider.get_chapter_thumbnails(manga_id, chapter) + except Exception as e: + logger.error(e) + results = None + return results # pyright:ignore diff --git a/fastanime/cli/__init__.py b/fastanime/cli/__init__.py index ea65240..360bc79 100644 --- a/fastanime/cli/__init__.py +++ b/fastanime/cli/__init__.py @@ -40,6 +40,7 @@ signal.signal(signal.SIGINT, handle_exit) short_help="Stream Anime", ) @click.version_option(__version__, "--version") +@click.option("--manga", "-m", help="Enable manga mode", is_flag=True) @click.option("--log", help="Allow logging to stdout", is_flag=True) @click.option("--log-file", help="Allow logging to a file", is_flag=True) @click.option("--rich-traceback", help="Use rich to output tracebacks", is_flag=True) @@ -150,6 +151,7 @@ signal.signal(signal.SIGINT, handle_exit) @click.pass_context def run_cli( ctx: click.Context, + manga, log, log_file, rich_traceback, @@ -183,6 +185,7 @@ def run_cli( from .config import Config ctx.obj = Config() + ctx.obj.manga = manga if log: import logging diff --git a/fastanime/cli/commands/grab.py b/fastanime/cli/commands/grab.py index 4185212..8854caa 100644 --- a/fastanime/cli/commands/grab.py +++ b/fastanime/cli/commands/grab.py @@ -56,26 +56,19 @@ def grab( from thefuzz import fuzz - from ...AnimeProvider import AnimeProvider - logger = getLogger(__name__) + if config.manga: + manga_title = anime_titles[0] + from ...MangaProvider import MangaProvider - anime_provider = AnimeProvider(config.provider) - - grabbed_animes = [] - for anime_title in anime_titles: - # ---- search for anime ---- - search_results = anime_provider.search_for_anime( - anime_title, translation_type=config.translation_type - ) - if not search_results: + manga_provider = MangaProvider() + search_data = manga_provider.search_for_manga(manga_title) + if not search_data: exit(1) if search_results_only: - # grab only search results skipping all lines after this - grabbed_animes.append(search_results) - continue - - search_results = search_results["results"] + print(json.dumps(search_data)) + exit(0) + search_results = search_data["results"] if not search_results: logger.error("no results for your search") exit(1) @@ -83,83 +76,133 @@ def grab( search_result["title"]: search_result for search_result in search_results } - search_result = max( - search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title) + search_result_anime_title = max( + search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_titles[0]) ) - - # ---- fetch anime ---- - anime = anime_provider.get_anime(search_results_[search_result]["id"]) - if not anime: - exit(1) + manga_info = manga_provider.get_manga( + search_results_[search_result_anime_title]["id"] + ) + if not manga_info: + return if anime_info_only: - # grab only the anime data skipping all lines after this - grabbed_animes.append(anime) - continue - episodes = sorted( - anime["availableEpisodesDetail"][config.translation_type], key=float + print(json.dumps(manga_info)) + exit(0) + + chapter_info = manga_provider.get_chapter_thumbnails( + manga_info["id"], str(episode_range) ) + if not chapter_info: + exit(1) + print(json.dumps(chapter_info)) - # where the magic happens - if episode_range: - if ":" in episode_range: - ep_range_tuple = episode_range.split(":") - if len(ep_range_tuple) == 2 and all(ep_range_tuple): - episodes_start, episodes_end = ep_range_tuple - episodes_range = episodes[int(episodes_start) : int(episodes_end)] - elif len(ep_range_tuple) == 3 and all(ep_range_tuple): - episodes_start, episodes_end, step = ep_range_tuple - episodes_range = episodes[ - int(episodes_start) : int(episodes_end) : int(step) - ] - else: - episodes_start, episodes_end = ep_range_tuple - if episodes_start.strip(): - episodes_range = episodes[int(episodes_start) :] - elif episodes_end.strip(): - episodes_range = episodes[: int(episodes_end)] - else: - episodes_range = episodes - else: - episodes_range = episodes[int(episode_range) :] - - else: - episodes_range = sorted(episodes, key=float) - - if not episode_streams_only: - grabbed_anime = dict(anime) - grabbed_anime["requested_episodes"] = episodes_range - grabbed_anime["translation_type"] = config.translation_type - grabbed_anime["episodes_streams"] = {} - else: - grabbed_anime = {} - - # lets download em - for episode in episodes_range: - try: - if episode not in episodes: - continue - streams = anime_provider.get_episode_streams( - anime, episode, config.translation_type - ) - if not streams: - continue - episode_streams = {server["server"]: server for server in streams} - - if episode_streams_only: - grabbed_anime[episode] = episode_streams - else: - grabbed_anime["episodes_streams"][ # pyright:ignore - episode - ] = episode_streams - - except Exception as e: - logger.error(e) - - # grab the full data for single title and appen to final result or episode streams - grabbed_animes.append(grabbed_anime) - - # print out the final result either {} or [] depending if more than one title os requested - if len(grabbed_animes) == 1: - print(json.dumps(grabbed_animes[0])) else: - print(json.dumps(grabbed_animes)) + from ...AnimeProvider import AnimeProvider + + anime_provider = AnimeProvider(config.provider) + + grabbed_animes = [] + for anime_title in anime_titles: + # ---- search for anime ---- + search_results = anime_provider.search_for_anime( + anime_title, translation_type=config.translation_type + ) + if not search_results: + exit(1) + if search_results_only: + # grab only search results skipping all lines after this + grabbed_animes.append(search_results) + continue + + search_results = search_results["results"] + if not search_results: + logger.error("no results for your search") + exit(1) + search_results_ = { + search_result["title"]: search_result + for search_result in search_results + } + + search_result_anime_title = max( + search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title) + ) + + # ---- fetch anime ---- + anime = anime_provider.get_anime( + search_results_[search_result_anime_title]["id"] + ) + if not anime: + exit(1) + if anime_info_only: + # grab only the anime data skipping all lines after this + grabbed_animes.append(anime) + continue + episodes = sorted( + anime["availableEpisodesDetail"][config.translation_type], key=float + ) + + # where the magic happens + if episode_range: + if ":" in episode_range: + ep_range_tuple = episode_range.split(":") + if len(ep_range_tuple) == 2 and all(ep_range_tuple): + episodes_start, episodes_end = ep_range_tuple + episodes_range = episodes[ + int(episodes_start) : int(episodes_end) + ] + elif len(ep_range_tuple) == 3 and all(ep_range_tuple): + episodes_start, episodes_end, step = ep_range_tuple + episodes_range = episodes[ + int(episodes_start) : int(episodes_end) : int(step) + ] + else: + episodes_start, episodes_end = ep_range_tuple + if episodes_start.strip(): + episodes_range = episodes[int(episodes_start) :] + elif episodes_end.strip(): + episodes_range = episodes[: int(episodes_end)] + else: + episodes_range = episodes + else: + episodes_range = episodes[int(episode_range) :] + + else: + episodes_range = sorted(episodes, key=float) + + if not episode_streams_only: + grabbed_anime = dict(anime) + grabbed_anime["requested_episodes"] = episodes_range + grabbed_anime["translation_type"] = config.translation_type + grabbed_anime["episodes_streams"] = {} + else: + grabbed_anime = {} + + # lets download em + for episode in episodes_range: + try: + if episode not in episodes: + continue + streams = anime_provider.get_episode_streams( + anime, episode, config.translation_type + ) + if not streams: + continue + episode_streams = {server["server"]: server for server in streams} + + if episode_streams_only: + grabbed_anime[episode] = episode_streams + else: + grabbed_anime["episodes_streams"][ # pyright:ignore + episode + ] = episode_streams + + except Exception as e: + logger.error(e) + + # grab the full data for single title and appen to final result or episode streams + grabbed_animes.append(grabbed_anime) + + # print out the final result either {} or [] depending if more than one title os requested + if len(grabbed_animes) == 1: + print(json.dumps(grabbed_animes[0])) + else: + print(json.dumps(grabbed_animes)) diff --git a/fastanime/cli/commands/search.py b/fastanime/cli/commands/search.py index eab62e3..f062f8f 100644 --- a/fastanime/cli/commands/search.py +++ b/fastanime/cli/commands/search.py @@ -1,8 +1,12 @@ +from typing import TYPE_CHECKING + import click -from ...cli.config import Config from ..completion_functions import anime_titles_shell_complete +if TYPE_CHECKING: + from ...cli.config import Config + @click.command( help="This subcommand directly interacts with the provider to enable basic streaming. Useful for binging anime.", @@ -23,240 +27,335 @@ from ..completion_functions import anime_titles_shell_complete help="A range of episodes to binge (start-end)", ) @click.pass_obj -def search(config: Config, anime_titles: str, episode_range: str): +def search(config: "Config", anime_titles: str, episode_range: str): from click import clear from rich import print from rich.progress import Progress from thefuzz import fuzz - from ...AnimeProvider import AnimeProvider - from ...libs.anime_provider.types import Anime from ...libs.fzf import fzf from ...libs.rofi import Rofi - from ...Utility.data import anime_normalizer - from ..utils.mpv import run_mpv from ..utils.tools import exit_app - from ..utils.utils import ( - filter_by_quality, - fuzzy_inquirer, - move_preferred_subtitle_lang_to_top, - ) + from ..utils.utils import fuzzy_inquirer - anime_provider = AnimeProvider(config.provider) - anilist_anime_info = None + if config.manga: + from InquirerPy.prompts.number import NumberPrompt + from yt_dlp.utils import sanitize_filename + + from ...MangaProvider import MangaProvider + from ..utils.feh import feh_manga_viewer + + manga_title = anime_titles[0] + + manga_provider = MangaProvider() + search_data = manga_provider.search_for_manga(manga_title) + if not search_data: + print("No search results") + exit(1) + + search_results = search_data["results"] - print(f"[green bold]Streaming:[/] {anime_titles}") - for anime_title in anime_titles: - # ---- search for anime ---- - with Progress() as progress: - progress.add_task("Fetching Search Results...", total=None) - search_results = anime_provider.search_for_anime( - anime_title, config.translation_type - ) - if not search_results: - print("Search results not found") - input("Enter to retry") - search(config, anime_title, episode_range) - return - search_results = search_results["results"] - if not search_results: - print("Anime not found :cry:") - exit_app() search_results_ = { - search_result["title"]: search_result for search_result in search_results + sanitize_filename(search_result["title"]): search_result + for search_result in search_results } if config.auto_select: - search_result = max( + search_result_manga_title = max( search_results_.keys(), - key=lambda title: fuzz.ratio( - anime_normalizer.get(title, title), anime_title - ), + key=lambda title: fuzz.ratio(title, manga_title), ) - print("[cyan]Auto Selecting:[/] ", search_result) + print("[cyan]Auto Selecting:[/] ", search_result_manga_title) else: choices = list(search_results_.keys()) + preview = None + if config.preview: + from ..interfaces.utils import get_fzf_manga_preview + + preview = get_fzf_manga_preview(search_results) if config.use_fzf: - search_result = fzf.run(choices, "Please Select title: ", "FastAnime") + search_result_manga_title = fzf.run( + choices, "Please Select title: ", preview=preview + ) elif config.use_rofi: - search_result = Rofi.run(choices, "Please Select Title") + search_result_manga_title = Rofi.run(choices, "Please Select Title") else: - search_result = fuzzy_inquirer( + search_result_manga_title = fuzzy_inquirer( choices, "Please Select Title", ) - # ---- fetch selected anime ---- - with Progress() as progress: - progress.add_task("Fetching Anime...", total=None) - anime: Anime | None = anime_provider.get_anime( - search_results_[search_result]["id"] + anilist_id = search_results_[search_result_manga_title]["id"] + manga_info = manga_provider.get_manga(anilist_id) + if not manga_info: + print("No manga info") + exit(1) + + anilist_helper = None + if config.user: + from ...anilist import AniList + + AniList.login_user(config.user["token"]) + anilist_helper = AniList + + def _manga_viewer(): + chapter_number = NumberPrompt("Select a chapter number").execute() + chapter_info = manga_provider.get_chapter_thumbnails( + manga_info["id"], str(chapter_number) ) - if not anime: - print("Sth went wring anime no found") - input("Enter to continue...") - search(config, anime_title, episode_range) - return - episodes_range = [] - episodes: list[str] = sorted( - anime["availableEpisodesDetail"][config.translation_type], key=float - ) - if episode_range: - if ":" in episode_range: - ep_range_tuple = episode_range.split(":") - if len(ep_range_tuple) == 3 and all(ep_range_tuple): - episodes_start, episodes_end, step = ep_range_tuple - episodes_range = episodes[ - int(episodes_start) : int(episodes_end) : int(step) - ] + if not chapter_info: + print("No chapter info") + input("Enter to retry...") + _manga_viewer() + return + print( + f"[purple bold]Now Reading: [/] {search_result_manga_title} [cyan bold]Chapter:[/] {chapter_info['title']}" + ) + feh_manga_viewer(chapter_info["thumbnails"], chapter_info["title"]) + if anilist_helper: + anilist_helper.update_anime_list( + {"mediaId": anilist_id, "progress": chapter_number} + ) + _manga_viewer() + + _manga_viewer() + else: + from ...AnimeProvider import AnimeProvider + from ...libs.anime_provider.types import Anime + from ...Utility.data import anime_normalizer + from ..utils.mpv import run_mpv + from ..utils.utils import filter_by_quality, move_preferred_subtitle_lang_to_top + + anime_provider = AnimeProvider(config.provider) + anilist_anime_info = None + + print(f"[green bold]Streaming:[/] {anime_titles}") + for anime_title in anime_titles: + # ---- search for anime ---- + with Progress() as progress: + progress.add_task("Fetching Search Results...", total=None) + search_results = anime_provider.search_for_anime( + anime_title, config.translation_type + ) + if not search_results: + print("Search results not found") + input("Enter to retry") + search(config, anime_title, episode_range) + return + search_results = search_results["results"] + if not search_results: + print("Anime not found :cry:") + exit_app() + search_results_ = { + search_result["title"]: search_result + for search_result in search_results + } + + if config.auto_select: + search_result_manga_title = max( + search_results_.keys(), + key=lambda title: fuzz.ratio( + anime_normalizer.get(title, title), anime_title + ), + ) + print("[cyan]Auto Selecting:[/] ", search_result_manga_title) - elif len(ep_range_tuple) == 2 and all(ep_range_tuple): - episodes_start, episodes_end = ep_range_tuple - episodes_range = episodes[int(episodes_start) : int(episodes_end)] - else: - episodes_start, episodes_end = ep_range_tuple - if episodes_start.strip(): - episodes_range = episodes[int(episodes_start) :] - elif episodes_end.strip(): - episodes_range = episodes[: int(episodes_end)] - else: - episodes_range = episodes else: - episodes_range = episodes[int(episode_range) :] - - episodes_range = iter(episodes_range) - - if config.normalize_titles: - from ...libs.common.mini_anilist import get_basic_anime_info_by_title - - anilist_anime_info = get_basic_anime_info_by_title(anime["title"]) - - def stream_anime(): - clear() - episode = None - - if episodes_range: - try: - episode = next(episodes_range) # pyright:ignore - print( - f"[cyan]Auto selecting:[/] {search_result} [cyan]Episode:[/] {episode}" - ) - except StopIteration: - print("[green]Completed binge sequence[/]:smile:") - return - - if not episode or episode not in episodes: - choices = [*episodes, "end"] + choices = list(search_results_.keys()) if config.use_fzf: - episode = fzf.run( - choices, "Select an episode: ", header=search_result + search_result_manga_title = fzf.run( + choices, "Please Select title: ", "FastAnime" ) elif config.use_rofi: - episode = Rofi.run(choices, "Select an episode") + search_result_manga_title = Rofi.run(choices, "Please Select Title") else: - episode = fuzzy_inquirer( + search_result_manga_title = fuzzy_inquirer( choices, - "Select episode", + "Please Select Title", ) - if episode == "end": - return - # ---- fetch streams ---- + # ---- fetch selected anime ---- with Progress() as progress: - progress.add_task("Fetching Episode Streams...", total=None) - streams = anime_provider.get_episode_streams( - anime, episode, config.translation_type + progress.add_task("Fetching Anime...", total=None) + anime: Anime | None = anime_provider.get_anime( + search_results_[search_result_manga_title]["id"] ) - if not streams: - print("Failed to get streams") + + if not anime: + print("Sth went wring anime no found") + input("Enter to continue...") + search(config, anime_title, episode_range) + return + episodes_range = [] + episodes: list[str] = sorted( + anime["availableEpisodesDetail"][config.translation_type], key=float + ) + if episode_range: + if ":" in episode_range: + ep_range_tuple = episode_range.split(":") + if len(ep_range_tuple) == 3 and all(ep_range_tuple): + episodes_start, episodes_end, step = ep_range_tuple + episodes_range = episodes[ + int(episodes_start) : int(episodes_end) : int(step) + ] + + elif len(ep_range_tuple) == 2 and all(ep_range_tuple): + episodes_start, episodes_end = ep_range_tuple + episodes_range = episodes[ + int(episodes_start) : int(episodes_end) + ] + else: + episodes_start, episodes_end = ep_range_tuple + if episodes_start.strip(): + episodes_range = episodes[int(episodes_start) :] + elif episodes_end.strip(): + episodes_range = episodes[: int(episodes_end)] + else: + episodes_range = episodes + else: + episodes_range = episodes[int(episode_range) :] + + episodes_range = iter(episodes_range) + + if config.normalize_titles: + from ...libs.common.mini_anilist import get_basic_anime_info_by_title + + anilist_anime_info = get_basic_anime_info_by_title(anime["title"]) + + def stream_anime(): + clear() + episode = None + + if episodes_range: + try: + episode = next(episodes_range) # pyright:ignore + print( + f"[cyan]Auto selecting:[/] {search_result_manga_title} [cyan]Episode:[/] {episode}" + ) + except StopIteration: + print("[green]Completed binge sequence[/]:smile:") + return + + if not episode or episode not in episodes: + choices = [*episodes, "end"] + if config.use_fzf: + episode = fzf.run( + choices, + "Select an episode: ", + header=search_result_manga_title, + ) + elif config.use_rofi: + episode = Rofi.run(choices, "Select an episode") + else: + episode = fuzzy_inquirer( + choices, + "Select episode", + ) + if episode == "end": return - try: - # ---- fetch servers ---- - if config.server == "top": - with Progress() as progress: - progress.add_task("Fetching top server...", total=None) - server = next(streams, None) - if not server: - print("Sth went wrong when fetching the episode") + # ---- fetch streams ---- + with Progress() as progress: + progress.add_task("Fetching Episode Streams...", total=None) + streams = anime_provider.get_episode_streams( + anime, episode, config.translation_type + ) + if not streams: + print("Failed to get streams") + return + + try: + # ---- fetch servers ---- + if config.server == "top": + with Progress() as progress: + progress.add_task("Fetching top server...", total=None) + server = next(streams, None) + if not server: + print("Sth went wrong when fetching the episode") + input("Enter to continue") + stream_anime() + return + stream_link = filter_by_quality(config.quality, server["links"]) + if not stream_link: + print("Quality not found") input("Enter to continue") stream_anime() return - stream_link = filter_by_quality(config.quality, server["links"]) - if not stream_link: - print("Quality not found") - input("Enter to continue") - stream_anime() - return - link = stream_link["link"] - subtitles = server["subtitles"] - stream_headers = server["headers"] - episode_title = server["episode_title"] - else: - with Progress() as progress: - progress.add_task("Fetching servers", total=None) - # prompt for server selection - servers = {server["server"]: server for server in streams} - servers_names = list(servers.keys()) - if config.server in servers_names: - server = config.server + link = stream_link["link"] + subtitles = server["subtitles"] + stream_headers = server["headers"] + episode_title = server["episode_title"] else: - if config.use_fzf: - server = fzf.run(servers_names, "Select an link: ") - elif config.use_rofi: - server = Rofi.run(servers_names, "Select an link") + with Progress() as progress: + progress.add_task("Fetching servers", total=None) + # prompt for server selection + servers = {server["server"]: server for server in streams} + servers_names = list(servers.keys()) + if config.server in servers_names: + server = config.server else: - server = fuzzy_inquirer( - servers_names, - "Select link", - ) - stream_link = filter_by_quality( - config.quality, servers[server]["links"] - ) - if not stream_link: - print("Quality not found") - input("Enter to continue") - stream_anime() - return - link = stream_link["link"] - stream_headers = servers[server]["headers"] - subtitles = servers[server]["subtitles"] - episode_title = servers[server]["episode_title"] + if config.use_fzf: + server = fzf.run(servers_names, "Select an link: ") + elif config.use_rofi: + server = Rofi.run(servers_names, "Select an link") + else: + server = fuzzy_inquirer( + servers_names, + "Select link", + ) + stream_link = filter_by_quality( + config.quality, servers[server]["links"] + ) + if not stream_link: + print("Quality not found") + input("Enter to continue") + stream_anime() + return + link = stream_link["link"] + stream_headers = servers[server]["headers"] + subtitles = servers[server]["subtitles"] + episode_title = servers[server]["episode_title"] - selected_anime_title = search_result - if anilist_anime_info: - selected_anime_title = ( - anilist_anime_info["title"][config.preferred_language] - or anilist_anime_info["title"]["romaji"] - or anilist_anime_info["title"]["english"] - ) - import re + selected_anime_title = search_result_manga_title + if anilist_anime_info: + selected_anime_title = ( + anilist_anime_info["title"][config.preferred_language] + or anilist_anime_info["title"]["romaji"] + or anilist_anime_info["title"]["english"] + ) + import re - for episode_detail in anilist_anime_info["episodes"]: - if re.match(f"Episode {episode} ", episode_detail["title"]): - episode_title = episode_detail["title"] - break - print( - f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}" - ) - subtitles = move_preferred_subtitle_lang_to_top( - subtitles, config.sub_lang - ) - if config.sync_play: - from ..utils.syncplay import SyncPlayer + for episode_detail in anilist_anime_info["episodes"]: + if re.match(f"Episode {episode} ", episode_detail["title"]): + episode_title = episode_detail["title"] + break + print( + f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}" + ) + subtitles = move_preferred_subtitle_lang_to_top( + subtitles, config.sub_lang + ) + if config.sync_play: + from ..utils.syncplay import SyncPlayer + + SyncPlayer( + link, + episode_title, + headers=stream_headers, + subtitles=subtitles, + ) + else: + run_mpv( + link, + episode_title, + headers=stream_headers, + subtitles=subtitles, + ) + except IndexError as e: + print(e) + input("Enter to continue") + stream_anime() - SyncPlayer( - link, episode_title, headers=stream_headers, subtitles=subtitles - ) - else: - run_mpv( - link, episode_title, headers=stream_headers, subtitles=subtitles - ) - except IndexError as e: - print(e) - input("Enter to continue") stream_anime() - - stream_anime() diff --git a/fastanime/cli/config.py b/fastanime/cli/config.py index 915a735..74efe5f 100644 --- a/fastanime/cli/config.py +++ b/fastanime/cli/config.py @@ -55,6 +55,7 @@ class Config(object): user: [TODO:attribute] """ + manga = False sync_play = False anime_list: list watch_history: dict diff --git a/fastanime/cli/interfaces/utils.py b/fastanime/cli/interfaces/utils.py index d9559d9..d2317fb 100644 --- a/fastanime/cli/interfaces/utils.py +++ b/fastanime/cli/interfaces/utils.py @@ -7,7 +7,7 @@ import textwrap from threading import Thread import requests -from yt_dlp.utils import clean_html +from yt_dlp.utils import clean_html, sanitize_filename from ...constants import APP_CACHE_DIR from ...libs.anilist.types import AnilistBaseMediaDataSchema @@ -168,6 +168,63 @@ def get_rofi_icons( logger.error("%r generated an exception: %s" % (url, e)) +# get rofi icons +def get_fzf_manga_preview(manga_results, workers=None, wait=False): + """A helper function to make sure that the images are downloaded so they can be used as icons + + Args: + titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images + workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible + anilist_results: the anilist results from an anilist action + """ + + def _worker(): + # use concurrency to download the images as fast as possible + with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: + # load the jobs + future_to_url = {} + for manga in manga_results: + image_url = manga["poster"] + future_to_url[ + executor.submit( + save_image_from_url, + image_url, + sanitize_filename(manga["title"]), + ) + ] = image_url + + # execute the jobs + for future in concurrent.futures.as_completed(future_to_url): + url = future_to_url[future] + try: + future.result() + except Exception as e: + logger.error("%r generated an exception: %s" % (url, e)) + + background_worker = Thread( + target=_worker, + ) + # ensure images and info exists + background_worker.daemon = True + background_worker.start() + + # the preview script is in bash so making sure fzf doesnt use any other shell lang to process the preview script + os.environ["SHELL"] = shutil.which("bash") or "bash" + preview = """ + %s + if [ -s %s/{} ]; then fzf-preview %s/{} + else echo Loading... + fi + """ % ( + fzf_preview, + IMAGES_CACHE_DIR, + IMAGES_CACHE_DIR, + ) + if wait: + background_worker.join() + return preview + + # get rofi icons def get_fzf_episode_preview( anilist_result: AnilistBaseMediaDataSchema, episodes, workers=None, wait=False diff --git a/fastanime/cli/utils/feh.py b/fastanime/cli/utils/feh.py new file mode 100644 index 0000000..4b9bfc7 --- /dev/null +++ b/fastanime/cli/utils/feh.py @@ -0,0 +1,12 @@ +import shutil +import subprocess +from sys import exit + + +def feh_manga_viewer(image_links: list[str], window_title: str): + FEH_EXECUTABLE = shutil.which("feh") + if not FEH_EXECUTABLE: + print("feh not found") + exit(1) + commands = [FEH_EXECUTABLE, *image_links, "--title", window_title] + subprocess.run(commands) diff --git a/fastanime/libs/anilist/types.py b/fastanime/libs/anilist/types.py index 06aef88..a1f2cb9 100644 --- a/fastanime/libs/anilist/types.py +++ b/fastanime/libs/anilist/types.py @@ -165,6 +165,7 @@ class AnilistBaseMediaDataSchema(TypedDict): nextAiringEpisode: AnilistMediaNextAiringEpisode season: str streamingEpisodes: list[StreamingEpisode] + chapters: int seasonYear: int duration: int synonyms: list[str] diff --git a/fastanime/libs/anime_provider/mini_anilist.py b/fastanime/libs/anime_provider/mini_anilist.py deleted file mode 100644 index e30f9f9..0000000 --- a/fastanime/libs/anime_provider/mini_anilist.py +++ /dev/null @@ -1,153 +0,0 @@ -import logging -from typing import TYPE_CHECKING - -from requests import post -from thefuzz import fuzz - -if TYPE_CHECKING: - from ..anilist.types import AnilistDataSchema -logger = logging.getLogger(__name__) - -ANILIST_ENDPOINT = "https://graphql.anilist.co" -""" -query($query:String){ - Page(perPage:50){ - pageInfo{ - total - currentPage - hasNextPage - } - media(search:$query,type:ANIME){ - id - idMal - title{ - romaji - english - } - episodes - status - nextAiringEpisode { - timeUntilAiring - airingAt - episode - } - } - } -} -""" - - -def search_for_anime_with_anilist(anime_title: str): - query = """ - query($query:String){ - Page(perPage:50){ - pageInfo{ - total - currentPage - hasNextPage - } - media(search:$query,type:ANIME){ - id - idMal - title{ - romaji - english - } - episodes - status - nextAiringEpisode { - timeUntilAiring - airingAt - episode - } - } - } - } - """ - response = post( - ANILIST_ENDPOINT, - json={"query": query, "variables": {"query": anime_title}}, - timeout=10, - ) - if response.status_code == 200: - anilist_data: "AnilistDataSchema" = response.json() - return { - "pageInfo": anilist_data["data"]["Page"]["pageInfo"], - "results": [ - { - "id": anime_result["id"], - "title": anime_result["title"]["romaji"] - or anime_result["title"]["english"], - "type": "anime", - "availableEpisodes": list( - range( - 1, - ( - anime_result["episodes"] - if not anime_result["status"] == "RELEASING" - and anime_result["episodes"] - else ( - anime_result["nextAiringEpisode"]["episode"] - 1 - if anime_result["nextAiringEpisode"] - else 0 - ) - ), - ) - ), - } - for anime_result in anilist_data["data"]["Page"]["media"] - ], - } - - -def get_mal_id_and_anilist_id(anime_title: str) -> "dict[str,int] | None": - """the abstraction over all none authenticated requests and that returns data of a similar type - - Args: - query: the anilist query - variables: the anilist api variables - - Returns: - a boolean indicating success and none or an anilist object depending on success - """ - query = """ - query($query:String){ - Page(perPage:50){ - pageInfo{ - total - currentPage - hasNextPage - } - media(search:$query,type:ANIME){ - id - idMal - title{ - romaji - english - } - } - } - } - """ - - try: - variables = {"query": anime_title} - response = post( - ANILIST_ENDPOINT, - json={"query": query, "variables": variables}, - timeout=10, - ) - anilist_data: "AnilistDataSchema" = response.json() - if response.status_code == 200: - anime = max( - anilist_data["data"]["Page"]["media"], - key=lambda anime: max( - ( - fuzz.ratio(anime, str(anime["title"]["romaji"])), - fuzz.ratio(anime_title, str(anime["title"]["english"])), - ) - ), - ) - return {"id_anilist": anime["id"], "id_mal": anime["idMal"]} - except Exception as e: - logger.error(f"Something unexpected occured {e}") diff --git a/fastanime/libs/common/common.py b/fastanime/libs/common/common.py new file mode 100644 index 0000000..8ff6b57 --- /dev/null +++ b/fastanime/libs/common/common.py @@ -0,0 +1,15 @@ +import logging + +from requests import get + +logger = logging.getLogger(__name__) + + +def fetch_anime_info_from_bal(anilist_id): + try: + url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json" + response = get(url, timeout=11) + if response.status_code == 200: + return response.json() + except Exception as e: + logger.error(e) diff --git a/fastanime/libs/common/mini_anilist.py b/fastanime/libs/common/mini_anilist.py index 90e0a77..8d6d47c 100644 --- a/fastanime/libs/common/mini_anilist.py +++ b/fastanime/libs/common/mini_anilist.py @@ -37,7 +37,66 @@ query($query:String){ """ -def search_foranime_with_anilist(anime_title: str): +def search_for_manga_with_anilist(manga_title: str): + query = """ + query($query:String){ + Page(perPage:50){ + pageInfo{ + currentPage + } + media(search:$query,type:MANGA){ + id + idMal + title{ + romaji + english + } + chapters + status + coverImage{ + medium + large + } + } + } + } + """ + response = post( + ANILIST_ENDPOINT, + json={"query": query, "variables": {"query": manga_title}}, + timeout=10, + ) + if response.status_code == 200: + anilist_data: "AnilistDataSchema" = response.json() + return { + "pageInfo": anilist_data["data"]["Page"]["pageInfo"], + "results": [ + { + "id": anime_result["id"], + "poster": anime_result["coverImage"]["large"], + "title": ( + anime_result["title"]["romaji"] + or anime_result["title"]["english"] + ) + + f" [Chapters: {anime_result['chapters']}]", + "type": "manga", + "availableChapters": list( + range( + 1, + ( + anime_result["chapters"] + if anime_result["chapters"] + else 0 + ), + ) + ), + } + for anime_result in anilist_data["data"]["Page"]["media"] + ], + } + + +def search_for_anime_with_anilist(anime_title: str): query = """ query($query:String){ Page(perPage:50){ diff --git a/fastanime/libs/manga_provider/__init__.py b/fastanime/libs/manga_provider/__init__.py new file mode 100644 index 0000000..a5c7e04 --- /dev/null +++ b/fastanime/libs/manga_provider/__init__.py @@ -0,0 +1 @@ +manga_sources = {"mangadex": "api.MangaDexApi"} diff --git a/fastanime/libs/manga_provider/base_provider.py b/fastanime/libs/manga_provider/base_provider.py new file mode 100644 index 0000000..59c6bce --- /dev/null +++ b/fastanime/libs/manga_provider/base_provider.py @@ -0,0 +1,13 @@ +import requests +from yt_dlp.utils.networking import random_user_agent + + +class MangaProvider: + session: requests.Session + + USER_AGENT = random_user_agent() + HEADERS = {} + + def __init__(self) -> None: + self.session = requests.session() + self.session.headers.update({"User-Agent": self.USER_AGENT, **self.HEADERS}) diff --git a/fastanime/libs/manga_provider/common.py b/fastanime/libs/manga_provider/common.py new file mode 100644 index 0000000..43d780a --- /dev/null +++ b/fastanime/libs/manga_provider/common.py @@ -0,0 +1,15 @@ +import logging + +from requests import get + +logger = logging.getLogger(__name__) + + +def fetch_manga_info_from_bal(anilist_id): + try: + url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/manga/{anilist_id}.json" + response = get(url, timeout=11) + if response.ok: + return response.json() + except Exception as e: + logger.error(e) diff --git a/fastanime/libs/manga_provider/mangadex/__init__.py b/fastanime/libs/manga_provider/mangadex/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/fastanime/libs/manga_provider/mangadex/api.py b/fastanime/libs/manga_provider/mangadex/api.py new file mode 100644 index 0000000..237fbdb --- /dev/null +++ b/fastanime/libs/manga_provider/mangadex/api.py @@ -0,0 +1,51 @@ +import logging + +from ...common.mini_anilist import search_for_manga_with_anilist +from ..base_provider import MangaProvider +from ..common import fetch_manga_info_from_bal + +logger = logging.getLogger(__name__) + + +class MangaDexApi(MangaProvider): + def search_for_manga(self, title: str, *args): + try: + search_results = search_for_manga_with_anilist(title) + return search_results + except Exception as e: + logger.error(f"[MANGADEX-ERROR]: {e}") + + def get_manga(self, anilist_manga_id: str): + bal_data = fetch_manga_info_from_bal(anilist_manga_id) + if not bal_data: + return + manga_id, MangaDexManga = next(iter(bal_data["Sites"]["Mangadex"].items())) + return { + "id": manga_id, + "title": MangaDexManga["title"], + "poster": MangaDexManga["image"], + "availableChapters": [], + } + + def get_chapter_thumbnails(self, manga_id, chapter): + chapter_info_url = f"https://api.mangadex.org/chapter?manga={manga_id}&translatedLanguage[]=en&chapter={chapter}&includeEmptyPages=0" + chapter_info_response = self.session.get(chapter_info_url) + if not chapter_info_response.ok: + return + chapter_info = next(iter(chapter_info_response.json()["data"])) + chapters_thumbnails_url = ( + f"https://api.mangadex.org/at-home/server/{chapter_info['id']}" + ) + chapter_thumbnails_response = self.session.get(chapters_thumbnails_url) + if not chapter_thumbnails_response.ok: + return + chapter_thumbnails_info = chapter_thumbnails_response.json() + base_url = chapter_thumbnails_info["baseUrl"] + hash = chapter_thumbnails_info["chapter"]["hash"] + return { + "thumbnails": [ + f"{base_url}/data/{hash}/{chapter_thumbnail}" + for chapter_thumbnail in chapter_thumbnails_info["chapter"]["data"] + ], + "title": chapter_info["attributes"]["title"], + }