Compare commits

...

12 Commits

Author SHA1 Message Date
Benex254
de4ddf2f3a chore: bump version 2024-08-19 00:21:48 +03:00
Benex254
9c94d824d1 fix: rearrange servers available 2024-08-19 00:21:16 +03:00
Benex254
495f3cfbf6 chore: bump version 2024-08-18 23:59:30 +03:00
Benex254
b56c9ae3dd docs: update reamde 2024-08-18 23:59:16 +03:00
Benex254
5e9ef87526 feat: improve provider api 2024-08-18 23:55:29 +03:00
Benex254
b68d6d6fe9 feat: accomodate subtitle streams 2024-08-18 23:54:59 +03:00
Benex254
5870cc6640 feat: accomodate subtitle streams 2024-08-18 23:54:36 +03:00
Benex254
7a43d58d82 fix: command order 2024-08-18 23:54:16 +03:00
Benex254
fc7efebc8d feat: accomodate subtitle streams 2024-08-18 23:53:36 +03:00
Benex254
528be74194 feat(aniwatch): init 2024-08-18 23:52:18 +03:00
Benex254
ab782acf2f chore: bump version 2024-08-18 15:47:44 +03:00
Benex254
45836d1ebc fix: handle no matches for search results 2024-08-18 15:47:29 +03:00
27 changed files with 534 additions and 83 deletions

View File

@@ -51,7 +51,6 @@ Heavily inspired by [animdl](https://github.com/justfoolingaround/animdl), [magi
- [Key Bindings](#key-bindings)
- [Script Messages](#script-messages)
- [Configuration](#configuration)
- [The python api](#the-python-api)
- [Contributing](#contributing)
- [Receiving Support](#receiving-support)
- [Supporting the Project](#supporting-the-project)
@@ -59,7 +58,7 @@ Heavily inspired by [animdl](https://github.com/justfoolingaround/animdl), [magi
> [!IMPORTANT]
>
> This project currently scrapes allanime and animepahe. The site is in the public domain and can be accessed by any one with a browser.
> This project currently scrapes allanime, aniwatch and animepahe. The site is in the public domain and can be accessed by any one with a browser.
## Installation
@@ -234,6 +233,7 @@ Available options for the fastanime include:
- `--use-mpv-mod/--use-default-player` whether to use python-mpv
- `--provider <allanime/animepahe>` anime site of choice to scrape from
- `--sync-play` or `-sp` use syncplay for streaming anime so you can watch with your friends
- `--sub_lang <en/or any other common shortform for country>` regex is used to determine the appropriate. Only works when provider is aniwatch.
Example usage of the above options
@@ -654,28 +654,6 @@ notification_duration=2
# Not implemented yet
```
## The python api
The project offers a python api that can be used in other python programs.
```python
from fastanime.AnimeProvider import AnimeProvider
# all output is typed, so will be easy to work with
# providers include [allanime, animepahe]
provider = AnimeProvider(provider="allanime")
# to search for anime
provider.search_for_anime()
# to get anime info
provider.get_anime()
# to get streams of an episode
provider.get_episode_streams()
```
## Contributing
We welcome your issues and feature requests. However, due to time constraints, we currently do not plan to add another provider.

View File

@@ -38,6 +38,7 @@ class YtDLPDownloader:
force_unknown_ext=False,
verbose=False,
headers={},
sub="",
):
"""Helper function that downloads anime given url and path details
@@ -60,9 +61,11 @@ class YtDLPDownloader:
"format": vid_format,
"compat_opts": ("allow-unsafe-ext",) if force_unknown_ext else tuple(),
}
urls = [url]
if sub:
urls.append(sub)
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
ydl.download([url])
ydl.download(urls)
# WARN: May remove this legacy functionality
def download_file(self, url: str, title, silent=True):

View File

@@ -6,7 +6,7 @@ if sys.version_info < (3, 10):
) # noqa: F541
__version__ = "v2.2.5"
__version__ = "v2.3.1"
APP_NAME = "FastAnime"
AUTHOR = "Benex254"

View File

@@ -98,6 +98,11 @@ signal.signal(signal.SIGINT, handle_exit)
type=click.Choice(["dub", "sub"]),
help="Anime language[dub/sub]",
)
@click.option(
"-sl",
"--sub-lang",
help="Set the preferred language for subs",
)
@click.option(
"-A/-no-A",
"--auto-next/--no-auto-next",
@@ -156,6 +161,7 @@ def run_cli(
local_history,
skip,
translation_type,
sub_lang,
quality,
auto_next,
auto_select,
@@ -216,6 +222,8 @@ def run_cli(
ctx.obj.server = server
if format:
ctx.obj.format = format
if sub_lang:
ctx.obj.sub_lang = sub_lang
if ctx.get_parameter_source("continue_") == click.core.ParameterSource.COMMANDLINE:
ctx.obj.continue_from_history = continue_
if ctx.get_parameter_source("skip") == click.core.ParameterSource.COMMANDLINE:

View File

@@ -59,7 +59,11 @@ def download(
from ...libs.fzf import fzf
from ...Utility.downloader.downloader import downloader
from ..utils.tools import exit_app
from ..utils.utils import filter_by_quality, fuzzy_inquirer
from ..utils.utils import (
filter_by_quality,
fuzzy_inquirer,
move_preferred_subtitle_lang_to_top,
)
anime_provider = AnimeProvider(config.provider)
@@ -83,6 +87,9 @@ def download(
)
return
search_results = search_results["results"]
if not search_results:
print("Nothing muches your search term")
exit_app(1)
search_results_ = {
search_result["title"]: search_result for search_result in search_results
}
@@ -165,37 +172,12 @@ def download(
if config.server == "top":
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
server = next(streams, None)
if not server:
server_name = next(streams, None)
if not server_name:
print("Sth went wrong when fetching the server")
continue
stream_link = filter_by_quality(config.quality, server["links"])
if not stream_link:
print("[yellow bold]WARNING:[/] No streams found")
time.sleep(1)
print("Continuing...")
continue
link = stream_link["link"]
provider_headers = server["headers"]
episode_title = server["episode_title"]
else:
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
# prompt for server selection
servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server = config.server
else:
if config.use_fzf:
server = fzf.run(servers_names, "Select an link: ")
else:
server = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server]["links"]
config.quality, server_name["links"]
)
if not stream_link:
print("[yellow bold]WARNING:[/] No streams found")
@@ -203,11 +185,42 @@ def download(
print("Continuing...")
continue
link = stream_link["link"]
provider_headers = servers[server]["headers"]
provider_headers = server_name["headers"]
episode_title = server_name["episode_title"]
subtitles = server_name["subtitles"]
else:
with Progress() as progress:
progress.add_task("Fetching servers", total=None)
# prompt for server selection
servers = {server["server"]: server for server in streams}
servers_names = list(servers.keys())
if config.server in servers_names:
server_name = config.server
else:
if config.use_fzf:
server_name = fzf.run(servers_names, "Select an link: ")
else:
server_name = fuzzy_inquirer(
servers_names,
"Select link",
)
stream_link = filter_by_quality(
config.quality, servers[server_name]["links"]
)
if not stream_link:
print("[yellow bold]WARNING:[/] No streams found")
time.sleep(1)
print("Continuing...")
continue
link = stream_link["link"]
provider_headers = servers[server_name]["headers"]
episode_title = servers[server]["episode_title"]
subtitles = servers[server_name]["subtitles"]
episode_title = servers[server_name]["episode_title"]
print(f"[purple]Now Downloading:[/] {search_result} Episode {episode}")
subtitles = move_preferred_subtitle_lang_to_top(
subtitles, config.sub_lang
)
downloader._download_file(
link,
anime["title"],
@@ -218,6 +231,7 @@ def download(
force_unknown_ext,
verbose,
headers=provider_headers,
sub=subtitles[0]["url"] if subtitles else "",
)
except Exception as e:
print(e)

View File

@@ -76,6 +76,9 @@ def grab(
continue
search_results = search_results["results"]
if not search_results:
logger.error("no results for your search")
exit(1)
search_results_ = {
search_result["title"]: search_result for search_result in search_results
}
@@ -88,13 +91,13 @@ def grab(
anime = anime_provider.get_anime(search_results_[search_result]["id"])
if not anime:
exit(1)
episodes = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
if anime_info_only:
# grab only the anime data skipping all lines after this
grabbed_animes.append(anime)
continue
episodes = sorted(
anime["availableEpisodesDetail"][config.translation_type], key=float
)
# where the magic happens
if episode_range:

View File

@@ -35,7 +35,11 @@ def search(config: Config, anime_titles: str, episode_range: str):
from ...libs.rofi import Rofi
from ..utils.mpv import run_mpv
from ..utils.tools import exit_app
from ..utils.utils import filter_by_quality, fuzzy_inquirer
from ..utils.utils import (
filter_by_quality,
fuzzy_inquirer,
move_preferred_subtitle_lang_to_top,
)
anime_provider = AnimeProvider(config.provider)
@@ -177,6 +181,7 @@ def search(config: Config, anime_titles: str, episode_range: str):
stream_anime()
return
link = stream_link["link"]
subtitles = server["subtitles"]
stream_headers = server["headers"]
episode_title = server["episode_title"]
else:
@@ -207,15 +212,23 @@ def search(config: Config, anime_titles: str, episode_range: str):
return
link = stream_link["link"]
stream_headers = servers[server]["headers"]
subtitles = servers[server]["subtitles"]
episode_title = servers[server]["episode_title"]
print(f"[purple]Now Playing:[/] {search_result} Episode {episode}")
subtitles = move_preferred_subtitle_lang_to_top(
subtitles, config.sub_lang
)
if config.sync_play:
from ..utils.syncplay import SyncPlayer
SyncPlayer(link, episode_title, headers=stream_headers)
SyncPlayer(
link, episode_title, headers=stream_headers, subtitles=subtitles
)
else:
run_mpv(link, episode_title, headers=stream_headers)
run_mpv(
link, episode_title, headers=stream_headers, subtitles=subtitles
)
except IndexError as e:
print(e)
input("Enter to continue")

View File

@@ -96,6 +96,7 @@ class Config(object):
"rofi_theme_input": "",
"rofi_theme_confirm": "",
"ffmpegthumnailer_seek_time": "-1",
"sub_lang": "eng",
}
)
self.configparser.add_section("stream")
@@ -109,6 +110,7 @@ class Config(object):
# --- set config values from file or using defaults ---
self.downloads_dir = self.get_downloads_dir()
self.sub_lang = self.get_sub_lang()
self.provider = self.get_provider()
self.use_fzf = self.get_use_fzf()
self.use_rofi = self.get_use_rofi()
@@ -187,6 +189,9 @@ class Config(object):
def get_preferred_language(self):
return self.configparser.get("general", "preferred_language")
def get_sub_lang(self):
return self.configparser.get("general", "sub_lang")
def get_downloads_dir(self):
return self.configparser.get("general", "downloads_dir")

View File

@@ -113,6 +113,7 @@ def media_player_controls(
current_episode_number,
):
custom_args.extend(args)
subtitles = selected_server["subtitles"]
if config.sync_play:
from ..utils.syncplay import SyncPlayer
@@ -120,6 +121,7 @@ def media_player_controls(
current_episode_stream_link,
selected_server["episode_title"],
headers=selected_server["headers"],
subtitles=subtitles,
)
elif config.use_mpv_mod:
from ..utils.player import player
@@ -141,6 +143,11 @@ def media_player_controls(
mpv._set_property("script-opts", script_opts[1])
if not start_time == "0":
mpv.start = start_time
mpv.wait_until_playing()
if subtitles:
mpv.sub_add(
subtitles[0]["url"], "select", None, subtitles[0]["language"]
)
mpv.wait_for_shutdown()
mpv.terminate()
stop_time = player.last_stop_time
@@ -152,6 +159,7 @@ def media_player_controls(
start_time=start_time,
custom_args=custom_args,
headers=selected_server["headers"],
subtitles=subtitles,
)
# either update the watch history to the next episode or current depending on progress
@@ -509,6 +517,7 @@ def provider_anime_episode_servers_menu(
current_episode_number,
):
custom_args.extend(args)
subtitles = selected_server["subtitles"]
if config.sync_play:
from ..utils.syncplay import SyncPlayer
@@ -516,6 +525,7 @@ def provider_anime_episode_servers_menu(
current_stream_link,
selected_server["episode_title"],
headers=selected_server["headers"],
subtitles=subtitles,
)
elif config.use_mpv_mod:
from ..utils.player import player
@@ -537,6 +547,13 @@ def provider_anime_episode_servers_menu(
mpv._set_property("script-opts", script_opts[1])
if not start_time == "0" and episode_in_history == current_episode_number:
mpv.start = start_time
mpv.wait_until_playing()
if subtitles:
# subs = ""
# for subtitle in subtitles:
# subs += f"{subtitle['url']},"
mpv.sub_add(subtitles[0]["url"], "select", None, subtitles[0]["language"])
# mpv.sub_files = subs
mpv.wait_for_shutdown()
mpv.terminate()
stop_time = player.last_stop_time
@@ -551,6 +568,7 @@ def provider_anime_episode_servers_menu(
start_time=start_time,
custom_args=custom_args,
headers=selected_server["headers"],
subtitles=subtitles,
)
print("Finished at: ", stop_time)

View File

@@ -55,6 +55,7 @@ def run_mpv(
ytdl_format="",
custom_args=[],
headers={},
subtitles=[],
):
# Determine if mpv is available
MPV = shutil.which("mpv")
@@ -108,6 +109,8 @@ def run_mpv(
for header_name, header_value in headers.items():
mpv_headers += f"{header_name}:{header_value},"
mpv_args.append(mpv_headers)
for subtitle in subtitles:
mpv_args.append(f"--sub-file={subtitle['url']}")
if start_time != "0":
mpv_args.append(f"--start={start_time}")
if title:

View File

@@ -4,7 +4,7 @@ import subprocess
from .tools import exit_app
def SyncPlayer(url: str, anime_title=None, headers={}, *args):
def SyncPlayer(url: str, anime_title=None, headers={}, subtitles=[], *args):
# TODO: handle m3u8 multi quality streams
#
# check for SyncPlay
@@ -20,6 +20,8 @@ def SyncPlayer(url: str, anime_title=None, headers={}, *args):
for header_name, header_value in headers.items():
mpv_headers += f"{header_name}:{header_value},"
mpv_args.append(mpv_headers)
for subtitle in subtitles:
mpv_args.append(f"--sub-file={subtitle['url']}")
if not anime_title:
subprocess.run(
[

View File

@@ -19,6 +19,25 @@ BG_GREEN = "\033[48;2;120;233;12;m"
GREEN = "\033[38;2;45;24;45;m"
def move_preferred_subtitle_lang_to_top(sub_list, lang_str):
"""Moves the dictionary with the given ID to the front of the list.
Args:
sub_list: list of subs
lang_str: the sub lang pref
Returns:
The modified list.
"""
import re
for i, d in enumerate(sub_list):
if re.search(lang_str, d["language"], re.IGNORECASE):
sub_list.insert(0, sub_list.pop(i))
break
return sub_list
def filter_by_quality(quality: str, stream_links: "list[EpisodeStream]", default=True):
"""Helper function used to filter a list of EpisodeStream objects to one that has a corresponding quality

View File

@@ -1,13 +1,10 @@
from .allanime import SERVERS_AVAILABLE as ALLANIME_SERVERS
from .animepahe import SERVERS_AVAILABLE as ANIMEPAHESERVERS
from .aniwatch import SERVERS_AVAILABLE as ANIWATCHSERVERS
anime_sources = {
"allanime": "api.AllAnimeAPI",
"animepahe": "api.AnimePaheApi",
"aniwatch": "api.AniWatchApi",
}
SERVERS_AVAILABLE = [
"sharepoint",
"dropbox",
"gogoanime",
"weTransfer",
"wixmp",
"kwik",
"Yt",
]
SERVERS_AVAILABLE = [*ALLANIME_SERVERS, *ANIMEPAHESERVERS, *ANIWATCHSERVERS]

View File

@@ -0,0 +1 @@
SERVERS_AVAILABLE = ["sharepoint", "dropbox", "gogoanime", "weTransfer", "wixmp", "Yt"]

View File

@@ -231,6 +231,7 @@ class AllAnimeAPI(AnimeProvider):
"server": "Yt",
"episode_title": f'{anime["title"]}; Episode {episode_number}',
"headers": {"Referer": f"https://{ALLANIME_BASE}/"},
"subtitles": [],
"links": [
{
"link": url,
@@ -260,6 +261,7 @@ class AllAnimeAPI(AnimeProvider):
yield {
"server": "gogoanime",
"headers": {},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f'{anime["title"]}'
)
@@ -271,6 +273,7 @@ class AllAnimeAPI(AnimeProvider):
yield {
"server": "wetransfer",
"headers": {},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f'{anime["title"]}'
)
@@ -282,6 +285,7 @@ class AllAnimeAPI(AnimeProvider):
yield {
"server": "sharepoint",
"headers": {},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f'{anime["title"]}'
)
@@ -293,6 +297,7 @@ class AllAnimeAPI(AnimeProvider):
yield {
"server": "dropbox",
"headers": {},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f'{anime["title"]}'
)
@@ -304,6 +309,7 @@ class AllAnimeAPI(AnimeProvider):
yield {
"server": "wixmp",
"headers": {},
"subtitles": [],
"episode_title": (
allanime_episode["notes"] or f'{anime["title"]}'
)

View File

@@ -4,4 +4,3 @@ ALLANIME_BASE = "allanime.day"
ALLANIME_REFERER = "https://allanime.to/"
ALLANIME_API_ENDPOINT = "https://api.{}/api/".format(ALLANIME_BASE)
USER_AGENT = random_user_agent()
SERVERS_AVAILABLE = ["sharepoint", "dropbox", "gogoanime", "weTransfer", "wixmp"]

View File

@@ -0,0 +1 @@
SERVERS_AVAILABLE = ["kwik"]

View File

@@ -136,7 +136,7 @@ class AnimePaheApi(AnimeProvider):
},
"episodesInfo": [
{
"title": episode["title"] or f"{title};{episode['episode']}",
"title": f"{episode['title'] or title};{episode['episode']}",
"episode": episode["episode"],
"id": episode["session"],
"translation_type": episode["audio"],
@@ -190,6 +190,7 @@ class AnimePaheApi(AnimeProvider):
"server": "kwik",
"links": [],
"episode_title": episode_title,
"subtitles": [],
"headers": {},
}
for res_dict in res_dicts:

View File

@@ -0,0 +1 @@
SERVERS_AVAILABLE = ["HD1", "HD2", "StreamSB", "StreamTape"]

View File

@@ -0,0 +1,167 @@
import logging
import re
from itertools import cycle
from yt_dlp.utils import (
extract_attributes,
get_element_html_by_class,
get_elements_html_by_class,
)
from ..base_provider import AnimeProvider
from ..common import fetch_anime_info_from_bal
from ..mini_anilist import search_for_anime_with_anilist
from ..utils import give_random_quality
from . import SERVERS_AVAILABLE
from .types import AniWatchStream
logger = logging.getLogger(__name__)
LINK_TO_STREAMS_REGEX = re.compile(r".*://(.*)/embed-(2|4|6)/e-([0-9])/(.*)\?.*")
class AniWatchApi(AnimeProvider):
def search_for_anime(self, anime_title: str, *args):
try:
return search_for_anime_with_anilist(anime_title)
except Exception as e:
logger.error(e)
def get_anime(self, anilist_id, *args):
try:
bal_results = fetch_anime_info_from_bal(anilist_id)
if not bal_results:
return
ZORO = bal_results["Sites"]["Zoro"]
aniwatch_id = list(ZORO.keys())[0]
anime_url = f"https://hianime.to/ajax/v2/episode/list/{aniwatch_id}"
response = self.session.get(anime_url, timeout=10)
if response.status_code == 200:
response_json = response.json()
aniwatch_anime_page = response_json["html"]
episodes_info_container_html = get_element_html_by_class(
"ss-list", aniwatch_anime_page
)
episodes_info_html_list = get_elements_html_by_class(
"ep-item", episodes_info_container_html
)
# keys: [ data-number: episode_number, data-id: episode_id, title: episode_title , href:episode_page_url]
episodes_info_dicts = [
extract_attributes(episode_dict)
for episode_dict in episodes_info_html_list
]
episodes = [episode["data-number"] for episode in episodes_info_dicts]
self.episodes_info = [
{
"id": episode["data-id"],
"title": f"{episode['title'] or ZORO['title']}; Episode {episode['data-number']}",
"episode": episode["data-number"],
}
for episode in episodes_info_dicts
]
return {
"id": aniwatch_id,
"availableEpisodesDetail": {
"dub": episodes,
"sub": episodes,
"raw": episodes,
},
"poster": ZORO[aniwatch_id]["image"],
"title": ZORO[aniwatch_id]["title"],
"episodes_info": self.episodes_info,
}
except Exception as e:
logger.error(e)
def get_episode_streams(self, anime, episode, translation_type, *args):
try:
episode_details = [
episode_details
for episode_details in self.episodes_info
if episode_details["episode"] == episode
]
if not episode_details:
return
episode_details = episode_details[0]
episode_url = f"https://hianime.to/ajax/v2/episode/servers?episodeId={episode_details['id']}"
response = self.session.get(episode_url)
if response.status_code == 200:
response_json = response.json()
episode_page_html = response_json["html"]
servers_containers_html = get_elements_html_by_class(
"ps__-list", episode_page_html
)
if not servers_containers_html:
return
# sub servers
try:
servers_html_sub = get_elements_html_by_class(
"server-item", servers_containers_html[0]
)
except Exception:
logger.warn("AniWatch: sub not found")
servers_html_sub = None
# dub servers
try:
servers_html_dub = get_elements_html_by_class(
"server-item", servers_containers_html[1]
)
except Exception:
logger.warn("AniWatch: dub not found")
servers_html_dub = None
if translation_type == "dub":
servers_html = servers_html_dub
else:
servers_html = servers_html_sub
if not servers_html:
return
for server_name, server_html in zip(
cycle(SERVERS_AVAILABLE), servers_html
):
try:
# keys: [ data-type: translation_type, data-id: embed_id, data-server-id: server_id ]
servers_info = extract_attributes(server_html)
embed_url = f"https://hianime.to/ajax/v2/episode/sources?id={servers_info['data-id']}"
embed_response = self.session.get(embed_url)
if embed_response.status_code == 200:
embed_json = embed_response.json()
raw_link_to_streams = embed_json["link"]
match = LINK_TO_STREAMS_REGEX.match(raw_link_to_streams)
if not match:
continue
provider_domain = match.group(1)
embed_type = match.group(2)
episode_number = match.group(3)
source_id = match.group(4)
link_to_streams = f"https://{provider_domain}/embed-{embed_type}/ajax/e-{episode_number}/getSources?id={source_id}"
link_to_streams_response = self.session.get(link_to_streams)
if link_to_streams_response.status_code == 200:
juicy_streams_json: "AniWatchStream" = (
link_to_streams_response.json()
)
yield {
"headers": {},
"subtitles": [
{
"url": track["file"],
"language": track["label"],
}
for track in juicy_streams_json["tracks"]
if track["kind"] == "captions"
],
"server": server_name,
"episode_title": episode_details["title"],
"links": give_random_quality(
[
{"link": link["file"], "type": link["type"]}
for link in juicy_streams_json["sources"]
]
),
}
except Exception as e:
logger.error(e)
except Exception as e:
logger.error(e)

View File

@@ -0,0 +1,26 @@
from typing import Literal, TypedDict
class AniWatchSkipTime(TypedDict):
start: int
end: int
class AniWatchSource(TypedDict):
file: str
type: str
class AniWatchTrack(TypedDict):
file: str
label: str
kind: Literal["captions", "thumbnails", "audio"]
class AniWatchStream(TypedDict):
sources: list[AniWatchSource]
tracks: list[AniWatchTrack]
encrypted: bool
intro: AniWatchSkipTime
outro: AniWatchSkipTime
server: int

View File

@@ -0,0 +1,15 @@
import logging
from requests import get
logger = logging.getLogger(__name__)
def fetch_anime_info_from_bal(anilist_id):
try:
url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json"
response = get(url, timeout=11)
if response.status_code == 200:
return response.json()
except Exception as e:
logger.error(e)

View File

@@ -0,0 +1,153 @@
import logging
from typing import TYPE_CHECKING
from requests import post
from thefuzz import fuzz
if TYPE_CHECKING:
from ..anilist.types import AnilistDataSchema
logger = logging.getLogger(__name__)
ANILIST_ENDPOINT = "https://graphql.anilist.co"
"""
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
episodes
status
nextAiringEpisode {
timeUntilAiring
airingAt
episode
}
}
}
}
"""
def search_for_anime_with_anilist(anime_title: str):
query = """
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
episodes
status
nextAiringEpisode {
timeUntilAiring
airingAt
episode
}
}
}
}
"""
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": {"query": anime_title}},
timeout=10,
)
if response.status_code == 200:
anilist_data: "AnilistDataSchema" = response.json()
return {
"pageInfo": anilist_data["data"]["Page"]["pageInfo"],
"results": [
{
"id": anime_result["id"],
"title": anime_result["title"]["romaji"]
or anime_result["title"]["english"],
"type": "anime",
"availableEpisodes": list(
range(
1,
(
anime_result["episodes"]
if not anime_result["status"] == "RELEASING"
and anime_result["episodes"]
else (
anime_result["nextAiringEpisode"]["episode"] - 1
if anime_result["nextAiringEpisode"]
else 0
)
),
)
),
}
for anime_result in anilist_data["data"]["Page"]["media"]
],
}
def get_mal_id_and_anilist_id(anime_title: str) -> "dict[str,int] | None":
"""the abstraction over all none authenticated requests and that returns data of a similar type
Args:
query: the anilist query
variables: the anilist api variables
Returns:
a boolean indicating success and none or an anilist object depending on success
"""
query = """
query($query:String){
Page(perPage:50){
pageInfo{
total
currentPage
hasNextPage
}
media(search:$query,type:ANIME){
id
idMal
title{
romaji
english
}
}
}
}
"""
try:
variables = {"query": anime_title}
response = post(
ANILIST_ENDPOINT,
json={"query": query, "variables": variables},
timeout=10,
)
anilist_data: "AnilistDataSchema" = response.json()
if response.status_code == 200:
anime = max(
anilist_data["data"]["Page"]["media"],
key=lambda anime: max(
(
fuzz.ratio(anime, str(anime["title"]["romaji"])),
fuzz.ratio(anime_title, str(anime["title"]["english"])),
)
),
)
return {"id_anilist": anime["id"], "id_mal": anime["idMal"]}
except Exception as e:
logger.error(f"Something unexpected occured {e}")

View File

@@ -39,9 +39,20 @@ class AnimeEpisodeDetails(TypedDict):
raw: list[str]
class AnimeEpisode(TypedDict):
#
# class AnimeEpisode(TypedDict):
# id: str
# title: str
#
class AnimeEpisodeInfo(TypedDict):
id: str
title: str
episode: str
poster: str | None
duration: str | None
translation_type: str | None
class Anime(TypedDict):
@@ -49,7 +60,7 @@ class Anime(TypedDict):
title: str
availableEpisodesDetail: AnimeEpisodeDetails
type: str | None
episodesInfo: list[AnimeEpisode] | None
episodesInfo: list[AnimeEpisodeInfo] | None
poster: str
year: str
@@ -64,8 +75,15 @@ class EpisodeStream(TypedDict):
translation_type: Literal["dub", "sub"]
class Subtitle(TypedDict):
url: str
language: str
class Server(TypedDict):
headers: dict
subtitles: list[Subtitle]
audio: list
server: str
episode_title: str
links: list[EpisodeStream]

View File

@@ -35,12 +35,12 @@ hex_to_char = {
}
def give_random_quality(links: list[dict]):
def give_random_quality(links):
qualities = cycle(["1080", "720", "480", "360"])
return [
{"link": link["link"], "quality": quality}
for link, quality in zip(links, qualities)
{**episode_stream, "quality": quality}
for episode_stream, quality in zip(links, qualities)
]

View File

@@ -1,6 +1,6 @@
[tool.poetry]
name = "fastanime"
version = "2.2.5"
version = "2.3.1"
description = "A browser anime site experience from the terminal"
authors = ["Benextempest <benextempest@gmail.com>"]
license = "UNLICENSE"