feat: add experimental manga support

This commit is contained in:
Benex254
2024-08-23 11:19:25 +03:00
parent 6a6efa9d56
commit 2dbbb1c4df
16 changed files with 756 additions and 434 deletions

105
fastanime/MangaProvider.py Normal file
View File

@@ -0,0 +1,105 @@
"""An abstraction over all providers offering added features with a simple and well typed api
[TODO:description]
"""
import importlib
import logging
from typing import TYPE_CHECKING
from .libs.manga_provider import manga_sources
if TYPE_CHECKING:
pass
logger = logging.getLogger(__name__)
class MangaProvider:
"""Class that manages all anime sources adding some extra functionality to them.
Attributes:
PROVIDERS: [TODO:attribute]
provider: [TODO:attribute]
provider: [TODO:attribute]
dynamic: [TODO:attribute]
retries: [TODO:attribute]
manga_provider: [TODO:attribute]
"""
PROVIDERS = list(manga_sources.keys())
provider = PROVIDERS[0]
def __init__(self, provider="mangadex", dynamic=False, retries=0) -> None:
self.provider = provider
self.dynamic = dynamic
self.retries = retries
self.lazyload_provider(self.provider)
def lazyload_provider(self, provider):
"""updates the current provider being used"""
_, anime_provider_cls_name = manga_sources[provider].split(".", 1)
package = f"fastanime.libs.manga_provider.{provider}"
provider_api = importlib.import_module(".api", package)
manga_provider = getattr(provider_api, anime_provider_cls_name)
self.manga_provider = manga_provider()
def search_for_manga(
self,
user_query,
nsfw=True,
unknown=True,
):
"""core abstraction over all providers search functionality
Args:
user_query ([TODO:parameter]): [TODO:description]
translation_type ([TODO:parameter]): [TODO:description]
nsfw ([TODO:parameter]): [TODO:description]
manga_provider ([TODO:parameter]): [TODO:description]
anilist_obj: [TODO:description]
Returns:
[TODO:return]
"""
manga_provider = self.manga_provider
try:
results = manga_provider.search_for_manga(user_query, nsfw, unknown)
except Exception as e:
logger.error(e)
results = None
return results
def get_manga(
self,
anime_id: str,
):
"""core abstraction over getting info of an anime from all providers
Args:
anime_id: [TODO:description]
anilist_obj: [TODO:description]
Returns:
[TODO:return]
"""
manga_provider = self.manga_provider
try:
results = manga_provider.get_manga(anime_id)
except Exception as e:
logger.error(e)
results = None
return results
def get_chapter_thumbnails(
self,
manga_id: str,
chapter: str,
):
manga_provider = self.manga_provider
try:
results = manga_provider.get_chapter_thumbnails(manga_id, chapter)
except Exception as e:
logger.error(e)
results = None
return results # pyright:ignore

View File

@@ -40,6 +40,7 @@ signal.signal(signal.SIGINT, handle_exit)
short_help="Stream Anime",
)
@click.version_option(__version__, "--version")
@click.option("--manga", "-m", help="Enable manga mode", is_flag=True)
@click.option("--log", help="Allow logging to stdout", is_flag=True)
@click.option("--log-file", help="Allow logging to a file", is_flag=True)
@click.option("--rich-traceback", help="Use rich to output tracebacks", is_flag=True)
@@ -150,6 +151,7 @@ signal.signal(signal.SIGINT, handle_exit)
@click.pass_context
def run_cli(
ctx: click.Context,
manga,
log,
log_file,
rich_traceback,
@@ -183,6 +185,7 @@ def run_cli(
from .config import Config
ctx.obj = Config()
ctx.obj.manga = manga
if log:
import logging

View File

@@ -56,26 +56,19 @@ def grab(
from thefuzz import fuzz
from ...AnimeProvider import AnimeProvider
logger = getLogger(__name__)
if config.manga:
manga_title = anime_titles[0]
from ...MangaProvider import MangaProvider
anime_provider = AnimeProvider(config.provider)
grabbed_animes = []
for anime_title in anime_titles:
# ---- search for anime ----
search_results = anime_provider.search_for_anime(
anime_title, translation_type=config.translation_type
)
if not search_results:
manga_provider = MangaProvider()
search_data = manga_provider.search_for_manga(manga_title)
if not search_data:
exit(1)
if search_results_only:
# grab only search results skipping all lines after this
grabbed_animes.append(search_results)
continue
search_results = search_results["results"]
print(json.dumps(search_data))
exit(0)
search_results = search_data["results"]
if not search_results:
logger.error("no results for your search")
exit(1)
@@ -83,83 +76,133 @@ def grab(
search_result["title"]: search_result for search_result in search_results
}
search_result = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title)
search_result_anime_title = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_titles[0])
)
# ---- fetch anime ----
anime = anime_provider.get_anime(search_results_[search_result]["id"])
if not anime:
exit(1)
manga_info = manga_provider.get_manga(
search_results_[search_result_anime_title]["id"]
)
if not manga_info:
return
if anime_info_only:
# grab only the anime data skipping all lines after this
grabbed_animes.append(anime)
continue
episodes = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
print(json.dumps(manga_info))
exit(0)
chapter_info = manga_provider.get_chapter_thumbnails(
manga_info["id"], str(episode_range)
)
if not chapter_info:
exit(1)
print(json.dumps(chapter_info))
# where the magic happens
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[int(episodes_start) : int(episodes_end)]
elif len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
else:
episodes_range = sorted(episodes, key=float)
if not episode_streams_only:
grabbed_anime = dict(anime)
grabbed_anime["requested_episodes"] = episodes_range
grabbed_anime["translation_type"] = config.translation_type
grabbed_anime["episodes_streams"] = {}
else:
grabbed_anime = {}
# lets download em
for episode in episodes_range:
try:
if episode not in episodes:
continue
streams = anime_provider.get_episode_streams(
anime, episode, config.translation_type
)
if not streams:
continue
episode_streams = {server["server"]: server for server in streams}
if episode_streams_only:
grabbed_anime[episode] = episode_streams
else:
grabbed_anime["episodes_streams"][ # pyright:ignore
episode
] = episode_streams
except Exception as e:
logger.error(e)
# grab the full data for single title and appen to final result or episode streams
grabbed_animes.append(grabbed_anime)
# print out the final result either {} or [] depending if more than one title os requested
if len(grabbed_animes) == 1:
print(json.dumps(grabbed_animes[0]))
else:
print(json.dumps(grabbed_animes))
from ...AnimeProvider import AnimeProvider
anime_provider = AnimeProvider(config.provider)
grabbed_animes = []
for anime_title in anime_titles:
# ---- search for anime ----
search_results = anime_provider.search_for_anime(
anime_title, translation_type=config.translation_type
)
if not search_results:
exit(1)
if search_results_only:
# grab only search results skipping all lines after this
grabbed_animes.append(search_results)
continue
search_results = search_results["results"]
if not search_results:
logger.error("no results for your search")
exit(1)
search_results_ = {
search_result["title"]: search_result
for search_result in search_results
}
search_result_anime_title = max(
search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title)
)
# ---- fetch anime ----
anime = anime_provider.get_anime(
search_results_[search_result_anime_title]["id"]
)
if not anime:
exit(1)
if anime_info_only:
# grab only the anime data skipping all lines after this
grabbed_animes.append(anime)
continue
episodes = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
# where the magic happens
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end)
]
elif len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
else:
episodes_range = sorted(episodes, key=float)
if not episode_streams_only:
grabbed_anime = dict(anime)
grabbed_anime["requested_episodes"] = episodes_range
grabbed_anime["translation_type"] = config.translation_type
grabbed_anime["episodes_streams"] = {}
else:
grabbed_anime = {}
# lets download em
for episode in episodes_range:
try:
if episode not in episodes:
continue
streams = anime_provider.get_episode_streams(
anime, episode, config.translation_type
)
if not streams:
continue
episode_streams = {server["server"]: server for server in streams}
if episode_streams_only:
grabbed_anime[episode] = episode_streams
else:
grabbed_anime["episodes_streams"][ # pyright:ignore
episode
] = episode_streams
except Exception as e:
logger.error(e)
# grab the full data for single title and appen to final result or episode streams
grabbed_animes.append(grabbed_anime)
# print out the final result either {} or [] depending if more than one title os requested
if len(grabbed_animes) == 1:
print(json.dumps(grabbed_animes[0]))
else:
print(json.dumps(grabbed_animes))

View File

@@ -1,8 +1,12 @@
from typing import TYPE_CHECKING
import click
from ...cli.config import Config
from ..completion_functions import anime_titles_shell_complete
if TYPE_CHECKING:
from ...cli.config import Config
@click.command(
help="This subcommand directly interacts with the provider to enable basic streaming. Useful for binging anime.",
@@ -23,240 +27,335 @@ from ..completion_functions import anime_titles_shell_complete
help="A range of episodes to binge (start-end)",
)
@click.pass_obj
def search(config: Config, anime_titles: str, episode_range: str):
def search(config: "Config", anime_titles: str, episode_range: str):
from click import clear
from rich import print
from rich.progress import Progress
from thefuzz import fuzz
from ...AnimeProvider import AnimeProvider
from ...libs.anime_provider.types import Anime
from ...libs.fzf import fzf
from ...libs.rofi import Rofi
from ...Utility.data import anime_normalizer
from ..utils.mpv import run_mpv
from ..utils.tools import exit_app
from ..utils.utils import (
filter_by_quality,
fuzzy_inquirer,
move_preferred_subtitle_lang_to_top,
)
from ..utils.utils import fuzzy_inquirer
anime_provider = AnimeProvider(config.provider)
anilist_anime_info = None
if config.manga:
from InquirerPy.prompts.number import NumberPrompt
from yt_dlp.utils import sanitize_filename
from ...MangaProvider import MangaProvider
from ..utils.feh import feh_manga_viewer
manga_title = anime_titles[0]
manga_provider = MangaProvider()
search_data = manga_provider.search_for_manga(manga_title)
if not search_data:
print("No search results")
exit(1)
search_results = search_data["results"]
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = anime_provider.search_for_anime(
anime_title, config.translation_type
)
if not search_results:
print("Search results not found")
input("Enter to retry")
search(config, anime_title, episode_range)
return
search_results = search_results["results"]
if not search_results:
print("Anime not found :cry:")
exit_app()
search_results_ = {
search_result["title"]: search_result for search_result in search_results
sanitize_filename(search_result["title"]): search_result
for search_result in search_results
}
if config.auto_select:
search_result = max(
search_result_manga_title = max(
search_results_.keys(),
key=lambda title: fuzz.ratio(
anime_normalizer.get(title, title), anime_title
),
key=lambda title: fuzz.ratio(title, manga_title),
)
print("[cyan]Auto Selecting:[/] ", search_result)
print("[cyan]Auto Selecting:[/] ", search_result_manga_title)
else:
choices = list(search_results_.keys())
preview = None
if config.preview:
from ..interfaces.utils import get_fzf_manga_preview
preview = get_fzf_manga_preview(search_results)
if config.use_fzf:
search_result = fzf.run(choices, "Please Select title: ", "FastAnime")
search_result_manga_title = fzf.run(
choices, "Please Select title: ", preview=preview
)
elif config.use_rofi:
search_result = Rofi.run(choices, "Please Select Title")
search_result_manga_title = Rofi.run(choices, "Please Select Title")
else:
search_result = fuzzy_inquirer(
search_result_manga_title = fuzzy_inquirer(
choices,
"Please Select Title",
)
# ---- fetch selected anime ----
with Progress() as progress:
progress.add_task("Fetching Anime...", total=None)
anime: Anime | None = anime_provider.get_anime(
search_results_[search_result]["id"]
anilist_id = search_results_[search_result_manga_title]["id"]
manga_info = manga_provider.get_manga(anilist_id)
if not manga_info:
print("No manga info")
exit(1)
anilist_helper = None
if config.user:
from ...anilist import AniList
AniList.login_user(config.user["token"])
anilist_helper = AniList
def _manga_viewer():
chapter_number = NumberPrompt("Select a chapter number").execute()
chapter_info = manga_provider.get_chapter_thumbnails(
manga_info["id"], str(chapter_number)
)
if not anime:
print("Sth went wring anime no found")
input("Enter to continue...")
search(config, anime_title, episode_range)
return
episodes_range = []
episodes: list[str] = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
if not chapter_info:
print("No chapter info")
input("Enter to retry...")
_manga_viewer()
return
print(
f"[purple bold]Now Reading: [/] {search_result_manga_title} [cyan bold]Chapter:[/] {chapter_info['title']}"
)
feh_manga_viewer(chapter_info["thumbnails"], chapter_info["title"])
if anilist_helper:
anilist_helper.update_anime_list(
{"mediaId": anilist_id, "progress": chapter_number}
)
_manga_viewer()
_manga_viewer()
else:
from ...AnimeProvider import AnimeProvider
from ...libs.anime_provider.types import Anime
from ...Utility.data import anime_normalizer
from ..utils.mpv import run_mpv
from ..utils.utils import filter_by_quality, move_preferred_subtitle_lang_to_top
anime_provider = AnimeProvider(config.provider)
anilist_anime_info = None
print(f"[green bold]Streaming:[/] {anime_titles}")
for anime_title in anime_titles:
# ---- search for anime ----
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = anime_provider.search_for_anime(
anime_title, config.translation_type
)
if not search_results:
print("Search results not found")
input("Enter to retry")
search(config, anime_title, episode_range)
return
search_results = search_results["results"]
if not search_results:
print("Anime not found :cry:")
exit_app()
search_results_ = {
search_result["title"]: search_result
for search_result in search_results
}
if config.auto_select:
search_result_manga_title = max(
search_results_.keys(),
key=lambda title: fuzz.ratio(
anime_normalizer.get(title, title), anime_title
),
)
print("[cyan]Auto Selecting:[/] ", search_result_manga_title)
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[int(episodes_start) : int(episodes_end)]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
episodes_range = iter(episodes_range)
if config.normalize_titles:
from ...libs.common.mini_anilist import get_basic_anime_info_by_title
anilist_anime_info = get_basic_anime_info_by_title(anime["title"])
def stream_anime():
clear()
episode = None
if episodes_range:
try:
episode = next(episodes_range) # pyright:ignore
print(
f"[cyan]Auto selecting:[/] {search_result} [cyan]Episode:[/] {episode}"
)
except StopIteration:
print("[green]Completed binge sequence[/]:smile:")
return
if not episode or episode not in episodes:
choices = [*episodes, "end"]
choices = list(search_results_.keys())
if config.use_fzf:
episode = fzf.run(
choices, "Select an episode: ", header=search_result
search_result_manga_title = fzf.run(
choices, "Please Select title: ", "FastAnime"
)
elif config.use_rofi:
episode = Rofi.run(choices, "Select an episode")
search_result_manga_title = Rofi.run(choices, "Please Select Title")
else:
episode = fuzzy_inquirer(
search_result_manga_title = fuzzy_inquirer(
choices,
"Select episode",
"Please Select Title",
)
if episode == "end":
return
# ---- fetch streams ----
# ---- fetch selected anime ----
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
streams = anime_provider.get_episode_streams(
anime, episode, config.translation_type
progress.add_task("Fetching Anime...", total=None)
anime: Anime | None = anime_provider.get_anime(
search_results_[search_result_manga_title]["id"]
)
if not streams:
print("Failed to get streams")
if not anime:
print("Sth went wring anime no found")
input("Enter to continue...")
search(config, anime_title, episode_range)
return
episodes_range = []
episodes: list[str] = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
if episode_range:
if ":" in episode_range:
ep_range_tuple = episode_range.split(":")
if len(ep_range_tuple) == 3 and all(ep_range_tuple):
episodes_start, episodes_end, step = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end) : int(step)
]
elif len(ep_range_tuple) == 2 and all(ep_range_tuple):
episodes_start, episodes_end = ep_range_tuple
episodes_range = episodes[
int(episodes_start) : int(episodes_end)
]
else:
episodes_start, episodes_end = ep_range_tuple
if episodes_start.strip():
episodes_range = episodes[int(episodes_start) :]
elif episodes_end.strip():
episodes_range = episodes[: int(episodes_end)]
else:
episodes_range = episodes
else:
episodes_range = episodes[int(episode_range) :]
episodes_range = iter(episodes_range)
if config.normalize_titles:
from ...libs.common.mini_anilist import get_basic_anime_info_by_title
anilist_anime_info = get_basic_anime_info_by_title(anime["title"])
def stream_anime():
clear()
episode = None
if episodes_range:
try:
episode = next(episodes_range) # pyright:ignore
print(
f"[cyan]Auto selecting:[/] {search_result_manga_title} [cyan]Episode:[/] {episode}"
)
except StopIteration:
print("[green]Completed binge sequence[/]:smile:")
return
if not episode or episode not in episodes:
choices = [*episodes, "end"]
if config.use_fzf:
episode = fzf.run(
choices,
"Select an episode: ",
header=search_result_manga_title,
)
elif config.use_rofi:
episode = Rofi.run(choices, "Select an episode")
else:
episode = fuzzy_inquirer(
choices,
"Select episode",
)
if episode == "end":
return
try:
# ---- fetch servers ----
if config.server == "top":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
print("Sth went wrong when fetching the episode")
# ---- fetch streams ----
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
streams = anime_provider.get_episode_streams(
anime, episode, config.translation_type
)
if not streams:
print("Failed to get streams")
return
try:
# ---- fetch servers ----
if config.server == "top":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
print("Sth went wrong when fetching the episode")
input("Enter to continue")
stream_anime()
return
stream_link = filter_by_quality(config.quality, server["links"])
if not stream_link:
print("Quality not found")
input("Enter to continue")
stream_anime()
return
stream_link = filter_by_quality(config.quality, server["links"])
if not stream_link:
print("Quality not found")
input("Enter to continue")
stream_anime()
return
link = stream_link["link"]
subtitles = server["subtitles"]
stream_headers = server["headers"]
episode_title = server["episode_title"]
else:
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
# prompt for server selection
servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server = config.server
link = stream_link["link"]
subtitles = server["subtitles"]
stream_headers = server["headers"]
episode_title = server["episode_title"]
else:
if config.use_fzf:
server = fzf.run(servers_names, "Select an link: ")
elif config.use_rofi:
server = Rofi.run(servers_names, "Select an link")
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
# prompt for server selection
servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server = config.server
else:
server = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server]["links"]
)
if not stream_link:
print("Quality not found")
input("Enter to continue")
stream_anime()
return
link = stream_link["link"]
stream_headers = servers[server]["headers"]
subtitles = servers[server]["subtitles"]
episode_title = servers[server]["episode_title"]
if config.use_fzf:
server = fzf.run(servers_names, "Select an link: ")
elif config.use_rofi:
server = Rofi.run(servers_names, "Select an link")
else:
server = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server]["links"]
)
if not stream_link:
print("Quality not found")
input("Enter to continue")
stream_anime()
return
link = stream_link["link"]
stream_headers = servers[server]["headers"]
subtitles = servers[server]["subtitles"]
episode_title = servers[server]["episode_title"]
selected_anime_title = search_result
if anilist_anime_info:
selected_anime_title = (
anilist_anime_info["title"][config.preferred_language]
or anilist_anime_info["title"]["romaji"]
or anilist_anime_info["title"]["english"]
)
import re
selected_anime_title = search_result_manga_title
if anilist_anime_info:
selected_anime_title = (
anilist_anime_info["title"][config.preferred_language]
or anilist_anime_info["title"]["romaji"]
or anilist_anime_info["title"]["english"]
)
import re
for episode_detail in anilist_anime_info["episodes"]:
if re.match(f"Episode {episode} ", episode_detail["title"]):
episode_title = episode_detail["title"]
break
print(
f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}"
)
subtitles = move_preferred_subtitle_lang_to_top(
subtitles, config.sub_lang
)
if config.sync_play:
from ..utils.syncplay import SyncPlayer
for episode_detail in anilist_anime_info["episodes"]:
if re.match(f"Episode {episode} ", episode_detail["title"]):
episode_title = episode_detail["title"]
break
print(
f"[purple]Now Playing:[/] {selected_anime_title} Episode {episode}"
)
subtitles = move_preferred_subtitle_lang_to_top(
subtitles, config.sub_lang
)
if config.sync_play:
from ..utils.syncplay import SyncPlayer
SyncPlayer(
link,
episode_title,
headers=stream_headers,
subtitles=subtitles,
)
else:
run_mpv(
link,
episode_title,
headers=stream_headers,
subtitles=subtitles,
)
except IndexError as e:
print(e)
input("Enter to continue")
stream_anime()
SyncPlayer(
link, episode_title, headers=stream_headers, subtitles=subtitles
)
else:
run_mpv(
link, episode_title, headers=stream_headers, subtitles=subtitles
)
except IndexError as e:
print(e)
input("Enter to continue")
stream_anime()
stream_anime()

View File

@@ -55,6 +55,7 @@ class Config(object):
user: [TODO:attribute]
"""
manga = False
sync_play = False
anime_list: list
watch_history: dict

View File

@@ -7,7 +7,7 @@ import textwrap
from threading import Thread
import requests
from yt_dlp.utils import clean_html
from yt_dlp.utils import clean_html, sanitize_filename
from ...constants import APP_CACHE_DIR
from ...libs.anilist.types import AnilistBaseMediaDataSchema
@@ -168,6 +168,63 @@ def get_rofi_icons(
logger.error("%r generated an exception: %s" % (url, e))
# get rofi icons
def get_fzf_manga_preview(manga_results, workers=None, wait=False):
"""A helper function to make sure that the images are downloaded so they can be used as icons
Args:
titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images
workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible
anilist_results: the anilist results from an anilist action
"""
def _worker():
# use concurrency to download the images as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# load the jobs
future_to_url = {}
for manga in manga_results:
image_url = manga["poster"]
future_to_url[
executor.submit(
save_image_from_url,
image_url,
sanitize_filename(manga["title"]),
)
] = image_url
# execute the jobs
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
future.result()
except Exception as e:
logger.error("%r generated an exception: %s" % (url, e))
background_worker = Thread(
target=_worker,
)
# ensure images and info exists
background_worker.daemon = True
background_worker.start()
# the preview script is in bash so making sure fzf doesnt use any other shell lang to process the preview script
os.environ["SHELL"] = shutil.which("bash") or "bash"
preview = """
%s
if [ -s %s/{} ]; then fzf-preview %s/{}
else echo Loading...
fi
""" % (
fzf_preview,
IMAGES_CACHE_DIR,
IMAGES_CACHE_DIR,
)
if wait:
background_worker.join()
return preview
# get rofi icons
def get_fzf_episode_preview(
anilist_result: AnilistBaseMediaDataSchema, episodes, workers=None, wait=False

View File

@@ -0,0 +1,12 @@
import shutil
import subprocess
from sys import exit
def feh_manga_viewer(image_links: list[str], window_title: str):
FEH_EXECUTABLE = shutil.which("feh")
if not FEH_EXECUTABLE:
print("feh not found")
exit(1)
commands = [FEH_EXECUTABLE, *image_links, "--title", window_title]
subprocess.run(commands)

View File

@@ -165,6 +165,7 @@ class AnilistBaseMediaDataSchema(TypedDict):
nextAiringEpisode: AnilistMediaNextAiringEpisode
season: str
streamingEpisodes: list[StreamingEpisode]
chapters: int
seasonYear: int
duration: int
synonyms: list[str]

View File

@@ -1,153 +0,0 @@
import logging
from typing import TYPE_CHECKING
from requests import post
from thefuzz import fuzz
if TYPE_CHECKING:
from ..anilist.types import AnilistDataSchema
logger = logging.getLogger(__name__)
ANILIST_ENDPOINT = "https://graphql.anilist.co"
"""
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
episodes
status
nextAiringEpisode {
timeUntilAiring
airingAt
episode
}
}
}
}
"""
def search_for_anime_with_anilist(anime_title: str):
query = """
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
episodes
status
nextAiringEpisode {
timeUntilAiring
airingAt
episode
}
}
}
}
"""
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": {"query": anime_title}},
timeout=10,
)
if response.status_code == 200:
anilist_data: "AnilistDataSchema" = response.json()
return {
"pageInfo": anilist_data["data"]["Page"]["pageInfo"],
"results": [
{
"id": anime_result["id"],
"title": anime_result["title"]["romaji"]
or anime_result["title"]["english"],
"type": "anime",
"availableEpisodes": list(
range(
1,
(
anime_result["episodes"]
if not anime_result["status"] == "RELEASING"
and anime_result["episodes"]
else (
anime_result["nextAiringEpisode"]["episode"] - 1
if anime_result["nextAiringEpisode"]
else 0
)
),
)
),
}
for anime_result in anilist_data["data"]["Page"]["media"]
],
}
def get_mal_id_and_anilist_id(anime_title: str) -> "dict[str,int] | None":
"""the abstraction over all none authenticated requests and that returns data of a similar type
Args:
query: the anilist query
variables: the anilist api variables
Returns:
a boolean indicating success and none or an anilist object depending on success
"""
query = """
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
}
}
}
"""
try:
variables = {"query": anime_title}
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": variables},
timeout=10,
)
anilist_data: "AnilistDataSchema" = response.json()
if response.status_code == 200:
anime = max(
anilist_data["data"]["Page"]["media"],
key=lambda anime: max(
(
fuzz.ratio(anime, str(anime["title"]["romaji"])),
fuzz.ratio(anime_title, str(anime["title"]["english"])),
)
),
)
return {"id_anilist": anime["id"], "id_mal": anime["idMal"]}
except Exception as e:
logger.error(f"Something unexpected occured {e}")

View File

@@ -0,0 +1,15 @@
import logging
from requests import get
logger = logging.getLogger(__name__)
def fetch_anime_info_from_bal(anilist_id):
try:
url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json"
response = get(url, timeout=11)
if response.status_code == 200:
return response.json()
except Exception as e:
logger.error(e)

View File

@@ -37,7 +37,66 @@ query($query:String){
"""
def search_foranime_with_anilist(anime_title: str):
def search_for_manga_with_anilist(manga_title: str):
query = """
query($query:String){
Page(perPage:50){
pageInfo{
currentPage
}
media(search:$query,type:MANGA){
id
idMal
title{
romaji
english
}
chapters
status
coverImage{
medium
large
}
}
}
}
"""
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": {"query": manga_title}},
timeout=10,
)
if response.status_code == 200:
anilist_data: "AnilistDataSchema" = response.json()
return {
"pageInfo": anilist_data["data"]["Page"]["pageInfo"],
"results": [
{
"id": anime_result["id"],
"poster": anime_result["coverImage"]["large"],
"title": (
anime_result["title"]["romaji"]
or anime_result["title"]["english"]
)
+ f" [Chapters: {anime_result['chapters']}]",
"type": "manga",
"availableChapters": list(
range(
1,
(
anime_result["chapters"]
if anime_result["chapters"]
else 0
),
)
),
}
for anime_result in anilist_data["data"]["Page"]["media"]
],
}
def search_for_anime_with_anilist(anime_title: str):
query = """
query($query:String){
Page(perPage:50){

View File

@@ -0,0 +1 @@
manga_sources = {"mangadex": "api.MangaDexApi"}

View File

@@ -0,0 +1,13 @@
import requests
from yt_dlp.utils.networking import random_user_agent
class MangaProvider:
session: requests.Session
USER_AGENT = random_user_agent()
HEADERS = {}
def __init__(self) -> None:
self.session = requests.session()
self.session.headers.update({"User-Agent": self.USER_AGENT, **self.HEADERS})

View File

@@ -0,0 +1,15 @@
import logging
from requests import get
logger = logging.getLogger(__name__)
def fetch_manga_info_from_bal(anilist_id):
try:
url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/manga/{anilist_id}.json"
response = get(url, timeout=11)
if response.ok:
return response.json()
except Exception as e:
logger.error(e)

View File

@@ -0,0 +1,51 @@
import logging
from ...common.mini_anilist import search_for_manga_with_anilist
from ..base_provider import MangaProvider
from ..common import fetch_manga_info_from_bal
logger = logging.getLogger(__name__)
class MangaDexApi(MangaProvider):
def search_for_manga(self, title: str, *args):
try:
search_results = search_for_manga_with_anilist(title)
return search_results
except Exception as e:
logger.error(f"[MANGADEX-ERROR]: {e}")
def get_manga(self, anilist_manga_id: str):
bal_data = fetch_manga_info_from_bal(anilist_manga_id)
if not bal_data:
return
manga_id, MangaDexManga = next(iter(bal_data["Sites"]["Mangadex"].items()))
return {
"id": manga_id,
"title": MangaDexManga["title"],
"poster": MangaDexManga["image"],
"availableChapters": [],
}
def get_chapter_thumbnails(self, manga_id, chapter):
chapter_info_url = f"https://api.mangadex.org/chapter?manga={manga_id}&translatedLanguage[]=en&chapter={chapter}&includeEmptyPages=0"
chapter_info_response = self.session.get(chapter_info_url)
if not chapter_info_response.ok:
return
chapter_info = next(iter(chapter_info_response.json()["data"]))
chapters_thumbnails_url = (
f"https://api.mangadex.org/at-home/server/{chapter_info['id']}"
)
chapter_thumbnails_response = self.session.get(chapters_thumbnails_url)
if not chapter_thumbnails_response.ok:
return
chapter_thumbnails_info = chapter_thumbnails_response.json()
base_url = chapter_thumbnails_info["baseUrl"]
hash = chapter_thumbnails_info["chapter"]["hash"]
return {
"thumbnails": [
f"{base_url}/data/{hash}/{chapter_thumbnail}"
for chapter_thumbnail in chapter_thumbnails_info["chapter"]["data"]
],
"title": chapter_info["attributes"]["title"],
}