feat: recreate all allanime extractors

Benexl
2025-07-07 13:48:19 +03:00
parent f51ceaacd7
commit 870bb24e1b
9 changed files with 148 additions and 2694 deletions

File diff suppressed because it is too large

View File

@@ -1,528 +0,0 @@
import concurrent.futures
import logging
import os
import shutil
import subprocess
import textwrap
from hashlib import sha256
from threading import Thread
import requests
from yt_dlp.utils import clean_html
from ...constants import APP_CACHE_DIR, S_PLATFORM
from ...libs.anilist.types import AnilistBaseMediaDataSchema
from ...Utility import anilist_data_helper
from ..utils.scripts import bash_functions
from ..utils.utils import get_true_fg, which_bashlike
logger = logging.getLogger(__name__)
# ---- aniskip integration ----
def aniskip(mal_id: int, episode: str):
"""helper function to be used for setting and getting skip data
Args:
mal_id: mal id of the anime
episode: episode number
Returns:
mpv chapter options
"""
ANISKIP = shutil.which("ani-skip")
if not ANISKIP:
print("Aniskip not found, please install and try again")
return
args = [ANISKIP, "-q", str(mal_id), "-e", str(episode)]
aniskip_result = subprocess.run(
args, text=True, stdout=subprocess.PIPE, check=False
)
if aniskip_result.returncode != 0:
return
mpv_skip_args = aniskip_result.stdout.strip()
return mpv_skip_args.split(" ")
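# Illustrative sketch (commented out; not part of the original module): the
# returned list is meant to be spliced into an mpv invocation so the skip
# windows show up as chapters. The MAL id and stream URL are hypothetical.
# skip_args = aniskip(5114, "1")
# mpv_cmd = ["mpv", "https://example.com/ep1.m3u8"]
# if skip_args:
#     mpv_cmd.extend(skip_args)
# subprocess.run(mpv_cmd, check=False)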
# ---- preview stuff ----
# import tempfile
# NOTE: may switch this to a temp dir later; earlier attempts had issues
WORKING_DIR = APP_CACHE_DIR # tempfile.gettempdir()
_HEADER_COLOR = os.environ.get("FASTANIME_PREVIEW_HEADER_COLOR", "215,0,95").split(",")
HEADER_COLOR = _HEADER_COLOR[0], _HEADER_COLOR[1], _HEADER_COLOR[2]
_SEPARATOR_COLOR = os.environ.get(
"FASTANIME_PREVIEW_SEPARATOR_COLOR", "208,208,208"
).split(",")
SEPARATOR_COLOR = _SEPARATOR_COLOR[0], _SEPARATOR_COLOR[1], _SEPARATOR_COLOR[2]
SINGLE_QUOTE = "'"
IMAGES_CACHE_DIR = os.path.join(WORKING_DIR, "images")
if not os.path.exists(IMAGES_CACHE_DIR):
os.mkdir(IMAGES_CACHE_DIR)
ANIME_INFO_CACHE_DIR = os.path.join(WORKING_DIR, "info")
if not os.path.exists(ANIME_INFO_CACHE_DIR):
os.mkdir(ANIME_INFO_CACHE_DIR)
def save_image_from_url(url: str, file_name: str):
"""Helper function that downloads an image to the FastAnime images cache dir given its url and filename
Args:
url: image url to download
file_name: filename to use
"""
image = requests.get(url)
with open(
os.path.join(
IMAGES_CACHE_DIR, f"{sha256(file_name.encode('utf-8')).hexdigest()}.png"
),
"wb",
) as f:
f.write(image.content)
def save_info_from_str(info: str, file_name: str):
"""Helper function that writes text (anime details and info) to a file given its filename
Args:
info: the information anilist has on the anime
file_name: the filename to use
"""
with open(
os.path.join(
ANIME_INFO_CACHE_DIR,
sha256(file_name.encode("utf-8")).hexdigest(),
),
"w",
encoding="utf-8",
) as f:
f.write(info)
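# Illustrative sketch (commented out; not part of the original module): both
# caches key their files by the sha256 of the sanitized title, which is what
# the bash preview scripts recompute with `generate_sha256`:
# def _cache_paths(title: str) -> tuple[str, str]:
#     digest = sha256(title.encode("utf-8")).hexdigest()
#     return (
#         os.path.join(IMAGES_CACHE_DIR, f"{digest}.png"),
#         os.path.join(ANIME_INFO_CACHE_DIR, digest),
#     )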
def write_search_results(
anilist_results: list[AnilistBaseMediaDataSchema],
titles: list[str],
workers: int | None = None,
):
"""A helper function used by and run in a background thread by get_fzf_preview function in order to get the actual preview data to be displayed by fzf
Args:
anilist_results: the anilist results from an anilist action
titles: sanitized anime titles
workers:number of threads to use defaults to as many as possible
"""
# use concurrency to download and write as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
future_to_task = {}
for anime, title in zip(anilist_results, titles, strict=False):
# actual image url
image_url = ""
if os.environ.get("FASTANIME_IMAGE_PREVIEWS", "true").lower() == "true":
image_url = anime["coverImage"]["large"]
if not (
os.path.exists(
os.path.join(
IMAGES_CACHE_DIR,
f"{sha256(title.encode('utf-8')).hexdigest()}.png",
)
)
):
future_to_task[
executor.submit(save_image_from_url, image_url, title)
] = image_url
mediaListName = "Not in any of your lists"
progress = "UNKNOWN"
if anime_list := anime["mediaListEntry"]:
mediaListName = anime_list["status"]
progress = anime_list["progress"]
# handle the text data
template = f"""
image_url={image_url}
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
echo "{get_true_fg("Title(jp):", *HEADER_COLOR)} {(anime["title"]["romaji"] or "").replace('"', SINGLE_QUOTE)}"
echo "{get_true_fg("Title(eng):", *HEADER_COLOR)} {(anime["title"]["english"] or "").replace('"', SINGLE_QUOTE)}"
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
echo "{get_true_fg("Popularity:", *HEADER_COLOR)} {anilist_data_helper.format_number_with_commas(anime["popularity"])}"
echo "{get_true_fg("Favourites:", *HEADER_COLOR)} {anilist_data_helper.format_number_with_commas(anime["favourites"])}"
echo "{get_true_fg("Status:", *HEADER_COLOR)} {str(anime["status"]).replace('"', SINGLE_QUOTE)}"
echo "{get_true_fg("Next Episode:", *HEADER_COLOR)} {anilist_data_helper.extract_next_airing_episode(anime["nextAiringEpisode"]).replace('"', SINGLE_QUOTE)}"
echo "{get_true_fg("Genres:", *HEADER_COLOR)} {anilist_data_helper.format_list_data_with_comma(anime["genres"]).replace('"', SINGLE_QUOTE)}"
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
echo "{get_true_fg("Episodes:", *HEADER_COLOR)} {(anime["episodes"]) or "UNKNOWN"}"
echo "{get_true_fg("Start Date:", *HEADER_COLOR)} {anilist_data_helper.format_anilist_date_object(anime["startDate"]).replace('"', SINGLE_QUOTE)}"
echo "{get_true_fg("End Date:", *HEADER_COLOR)} {anilist_data_helper.format_anilist_date_object(anime["endDate"]).replace('"', SINGLE_QUOTE)}"
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
echo "{get_true_fg("Media List:", *HEADER_COLOR)} {mediaListName.replace('"', SINGLE_QUOTE)}"
echo "{get_true_fg("Progress:", *HEADER_COLOR)} {progress}"
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
# echo "{get_true_fg("Description:", *HEADER_COLOR).replace('"', SINGLE_QUOTE)}"
"""
template = textwrap.dedent(template)
template = f"""
{template}
echo "{textwrap.fill(clean_html((anime["description"]) or "").replace('"', SINGLE_QUOTE), width=45)}"
"""
future_to_task[executor.submit(save_info_from_str, template, title)] = title
# execute the jobs
for future in concurrent.futures.as_completed(future_to_task):
task = future_to_task[future]
try:
future.result()
except Exception as exc:
logger.error("%r generated an exception: %s" % (task, exc))
# get rofi icons
def get_rofi_icons(
anilist_results: list[AnilistBaseMediaDataSchema], titles, workers=None
):
"""A helper function to make sure that the images are downloaded so they can be used as icons
Args:
titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images
workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible
anilist_results: the anilist results from an anilist action
"""
# use concurrency to download the images as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# load the jobs
future_to_url = {}
for anime, title in zip(anilist_results, titles, strict=False):
# actual link to download image from
image_url = anime["coverImage"]["large"]
if not (
os.path.exists(
os.path.join(
IMAGES_CACHE_DIR,
f"{sha256(title.encode('utf-8')).hexdigest()}.png",
)
)
):
future_to_url[
executor.submit(save_image_from_url, image_url, title)
] = image_url
# execute the jobs
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
future.result()
except Exception as e:
logger.error("%r generated an exception: %s" % (url, e))
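# Illustrative sketch (commented out; not part of the original module): rofi
# accepts per-entry icons via its `text\0icon\x1fpath` metadata convention,
# so a cached cover could plausibly be attached like this:
# def rofi_entry(title: str) -> str:
#     digest = sha256(title.encode("utf-8")).hexdigest()
#     return f"{title}\0icon\x1f{os.path.join(IMAGES_CACHE_DIR, digest + '.png')}"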
# get fzf manga preview
def get_fzf_manga_preview(manga_results, workers=None, wait=False):
"""A helper function to make sure that the images are downloaded so they can be used as icons
Args:
titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images
workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible
anilist_results: the anilist results from an anilist action
"""
def _worker():
# use concurrency to download the images as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# load the jobs
future_to_url = {}
for manga in manga_results:
image_url = manga["poster"]
if not (
os.path.exists(
os.path.join(
IMAGES_CACHE_DIR,
f"{sha256(manga['title'].encode('utf-8')).hexdigest()}.png",
)
)
):
future_to_url[
executor.submit(
save_image_from_url,
image_url,
manga["title"],
)
] = image_url
# execute the jobs
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
future.result()
except Exception as e:
logger.error("%r generated an exception: %s" % (url, e))
background_worker = Thread(
target=_worker,
)
background_worker.daemon = True
# ensure images and info exists
background_worker.start()
# the preview script is bash, so make sure fzf doesn't use another shell to process it
os.environ["SHELL"] = shutil.which("bash") or "bash"
preview = """
%s
title="$(echo -n {})"
title="$(echo -n "$title" |generate_sha256)"
if [ -s "%s/$title" ]; then fzf_preview "%s/title"
else echo Loading...
fi
""" % (
bash_functions,
IMAGES_CACHE_DIR,
IMAGES_CACHE_DIR,
)
if wait:
background_worker.join()
return preview
# get fzf episode preview
def get_fzf_episode_preview(
anilist_result: AnilistBaseMediaDataSchema, episodes, workers=None, wait=False
):
"""A helper function to make sure that the images are downloaded so they can be used as icons
Args:
titles (list[str]): sanitized titles of the anime; NOTE: its important that they are sanitized since they are used as the filenames of the images
workers ([TODO:parameter]): Number of threads to use to download the images; defaults to as many as possible
anilist_results: the anilist results from an anilist action
Returns:
The fzf preview script to use or None if the bash is not found
"""
# HEADER_COLOR = 215, 0, 95
import re
def _worker():
# use concurrency to download the images as fast as possible
with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor:
# load the jobs
future_to_url = {}
for episode in episodes:
episode_title = ""
image_url = ""
for episode_detail in anilist_result["streamingEpisodes"]:
if re.match(f".*Episode {episode} .*", episode_detail["title"]):
episode_title = episode_detail["title"]
image_url = episode_detail["thumbnail"]
if episode_title and image_url:
future_to_url[
executor.submit(save_image_from_url, image_url, str(episode))
] = image_url
template = textwrap.dedent(
f"""
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
echo "{get_true_fg("Anime Title(eng):", *HEADER_COLOR)} {("" or anilist_result["title"]["english"]).replace('"', SINGLE_QUOTE)}"
echo "{get_true_fg("Anime Title(jp):", *HEADER_COLOR)} {(anilist_result["title"]["romaji"] or "").replace('"', SINGLE_QUOTE)}"
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
echo
echo "{str(episode_title).replace('"', SINGLE_QUOTE)}"
ll=2
while [ $ll -le $FZF_PREVIEW_COLUMNS ];do
echo -n -e "{get_true_fg("", *SEPARATOR_COLOR, bold=False)}"
((ll++))
done
"""
)
future_to_url[
executor.submit(save_info_from_str, template, str(episode))
] = str(episode)
# execute the jobs
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
try:
future.result()
except Exception as e:
logger.error("%r generated an exception: %s" % (url, e))
background_worker = Thread(
target=_worker,
)
background_worker.daemon = True
# ensure images and info exists
background_worker.start()
# the preview script is bash, so make sure fzf doesn't use another shell to process it
bash_path = which_bashlike()
if not bash_path:
return
os.environ["SHELL"] = bash_path
if S_PLATFORM == "win32":
preview = """
%s
title="$(echo -n {})"
title="$(echo -n "$title" |generate_sha256)"
dim=${FZF_PREVIEW_COLUMNS}x${FZF_PREVIEW_LINES}
if [ "$FASTANIME_IMAGE_PREVIEWS" = "True" ];then
if [ -s "%s\\\\\\${title}.png" ]; then
if command -v "chafa">/dev/null;then
chafa -s $dim "%s\\\\\\${title}.png"
else
echo please install chafa to enjoy image previews
fi
echo
else
echo Loading...
fi
fi
if [ -s "%s\\\\\\$title" ]; then source "%s\\\\\\$title"
else echo Loading...
fi
""" % (
bash_functions,
IMAGES_CACHE_DIR.replace("\\", "\\\\\\"),
IMAGES_CACHE_DIR.replace("\\", "\\\\\\"),
ANIME_INFO_CACHE_DIR.replace("\\", "\\\\\\"),
ANIME_INFO_CACHE_DIR.replace("\\", "\\\\\\"),
)
else:
preview = """
%s
title="$(echo -n {})"
title="$(echo -n "$title" |generate_sha256)"
if [ "$FASTANIME_IMAGE_PREVIEWS" = "True" ];then
if [ -s %s/${title}.png ]; then fzf_preview %s/${title}.png
else echo Loading...
fi
fi
if [ -f %s/${title} ]; then source %s/${title}
else echo Loading...
fi
""" % (
bash_functions,
IMAGES_CACHE_DIR,
IMAGES_CACHE_DIR,
ANIME_INFO_CACHE_DIR,
ANIME_INFO_CACHE_DIR,
)
if wait:
background_worker.join()
return preview
def get_fzf_anime_preview(
anilist_results: list[AnilistBaseMediaDataSchema], titles, wait=False
):
"""A helper function that constructs data to be used for the fzf preview
Args:
titles (list[str]): The sanitized titles to use, NOTE: its important that they are sanitized since thay will be used as filenames
wait (bool): whether to block the ui as we wait for preview defaults to false
anilist_results: the anilist results got from an anilist action
Returns:
The fzf preview script to use or None if the bash is not found
"""
# ensure images and info exists
background_worker = Thread(
target=write_search_results, args=(anilist_results, titles)
)
background_worker.daemon = True
background_worker.start()
# the preview script is bash, so make sure fzf doesn't use another shell to process it
bash_path = which_bashlike()
if not bash_path:
return
os.environ["SHELL"] = bash_path
if S_PLATFORM == "win32":
preview = """
%s
title="$(echo -n {})"
title="$(echo -n "$title" |generate_sha256)"
dim=${FZF_PREVIEW_COLUMNS}x${FZF_PREVIEW_LINES}
if [ "$FASTANIME_IMAGE_PREVIEWS" = "True" ];then
if [ -s "%s\\\\\\${title}.png" ]; then
if command -v "chafa">/dev/null;then
chafa -s $dim "%s\\\\\\${title}.png"
else
echo please install chafa to enjoy image previews
fi
echo
else
echo Loading...
fi
fi
if [ -s "%s\\\\\\$title" ]; then source "%s\\\\\\$title"
else echo Loading...
fi
""" % (
bash_functions,
IMAGES_CACHE_DIR.replace("\\", "\\\\\\"),
IMAGES_CACHE_DIR.replace("\\", "\\\\\\"),
ANIME_INFO_CACHE_DIR.replace("\\", "\\\\\\"),
ANIME_INFO_CACHE_DIR.replace("\\", "\\\\\\"),
)
else:
preview = """
%s
title="$(echo -n {})"
title="$(echo -n "$title" |generate_sha256)"
if [ "$FASTANIME_IMAGE_PREVIEWS" = "True" ];then
if [ -s "%s/${title}.png" ]; then fzf_preview "%s/${title}.png"
else echo Loading...
fi
fi
if [ -s "%s/$title" ]; then source "%s/$title"
else echo Loading...
fi
""" % (
bash_functions,
IMAGES_CACHE_DIR,
IMAGES_CACHE_DIR,
ANIME_INFO_CACHE_DIR,
ANIME_INFO_CACHE_DIR,
)
if wait:
background_worker.join()
return preview
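The functions above only build the preview script; a minimal sketch of the hand-off to fzf follows (the --preview-window value and the input wiring are illustrative, not lifted from this repo):

import subprocess

preview = get_fzf_anime_preview(anilist_results, titles)
if preview:
    result = subprocess.run(
        ["fzf", "--preview", preview, "--preview-window", "left,35%,wrap"],
        input="\n".join(titles),
        text=True,
        stdout=subprocess.PIPE,
        check=False,
    )
    chosen_title = result.stdout.strip()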

View File: extractor.py

@@ -5,6 +5,16 @@ from ...types import Server
 from ..types import AllAnimeEpisode, AllAnimeSource
 from ..utils import one_digit_symmetric_xor
 from .ak import AkExtractor
+from .dropbox import SakExtractor
+from .filemoon import FmHlsExtractor, OkExtractor
+from .gogoanime import Lufmp4Extractor
+from .mp4_upload import Mp4Extractor
+from .sharepoint import Smp4Extractor
+from .streamsb import SsHlsExtractor
+from .vid_mp4 import VidMp4Extractor
+from .we_transfer import KirExtractor
+from .wixmp import DefaultExtractor
+from .yt_mp4 import YtExtractor
 
 logger = getLogger(__name__)
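The one_digit_symmetric_xor helper carried along in the context lines is not shown in this commit. As a hedged sketch only: helpers with this name in AllAnime clients typically hex-decode the obfuscated source path and XOR every byte with a one-digit key, and since XOR is its own inverse the same routine encodes and decodes; the signature and key handling below are assumptions, not this repo's implementation.

def one_digit_symmetric_xor(password: int, target: str) -> str:
    # hex-decode two characters at a time, XOR each byte with the key
    decoded = bytes(
        int(target[i : i + 2], 16) ^ password for i in range(0, len(target), 2)
    )
    return decoded.decode("utf-8")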
@@ -17,15 +27,21 @@ class BaseExtractor(ABC):
     AVAILABLE_SOURCES = {
-        "Sak": AkExtractor,
-        "S-mp4": AkExtractor,
-        "Luf-mp4": AkExtractor,
-        "Default": AkExtractor,
-        "Yt-mp4": AkExtractor,
-        "Kir": AkExtractor,
-        "Mp4": AkExtractor,
+        "Sak": SakExtractor,
+        "S-mp4": Smp4Extractor,
+        "Luf-mp4": Lufmp4Extractor,
+        "Default": DefaultExtractor,
+        "Yt-mp4": YtExtractor,
+        "Kir": KirExtractor,
+        "Mp4": Mp4Extractor,
     }
-    OTHER_SOURCES = {"Ak": AkExtractor, "Vid-mp4": "", "Ok": "", "Ss-Hls": "", "Fm-Hls": ""}
+    OTHER_SOURCES = {
+        "Ak": AkExtractor,
+        "Vid-mp4": VidMp4Extractor,
+        "Ok": OkExtractor,
+        "Ss-Hls": SsHlsExtractor,
+        "Fm-Hls": FmHlsExtractor,
+    }
 
     def extract_server(
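With the registry pointing at real extractor classes, extract_server presumably reduces to a dictionary lookup plus a delegated call. A minimal sketch of that dispatch, assuming the surrounding variable names (source_name, client, and friends), which are not shown in this diff:

extractor_cls = BaseExtractor.AVAILABLE_SOURCES.get(source_name)
if extractor_cls is None:
    logger.debug("no extractor registered for source: %s", source_name)
else:
    server = extractor_cls.extract(url, client, episode_number, episode, source)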

View File: streamsb.py

@@ -1,21 +1,33 @@
 from ...types import EpisodeStream, Server
 from ..constants import API_BASE_URL
 from ..types import AllAnimeEpisode, AllAnimeSource
 from .extractor import BaseExtractor
-# TODO: requires some serious work i think : )
-response = self.session.get(
-    url,
-    timeout=10,
-)
-response.raise_for_status()
-embed_html = response.text.replace(" ", "").replace("\n", "")
-logger.debug("Found streams from Ss-Hls")
-return {
-    "server": "StreamSb",
-    "headers": {"Referer": f"https://{API_BASE_URL}/"},
-    "subtitles": [],
-    "episode_title": (allanime_episode["notes"] or f"{anime_title}")
-    + f"; Episode {episode_number}",
-    "links": give_random_quality(response.json()["links"]),
-}
 class SsHlsExtractor(BaseExtractor):
-    pass
+    @classmethod
+    def extract(
+        cls,
+        url,
+        client,
+        episode_number: str,
+        episode: AllAnimeEpisode,
+        source: AllAnimeSource,
+    ) -> Server:
+        # TODO: requires some serious work i think : )
+        response = client.get(
+            url,
+            timeout=10,
+        )
+        response.raise_for_status()
+        embed_html = response.text.replace(" ", "").replace("\n", "")
+        streams = response.json()
+        return Server(
+            name="StreamSb",
+            links=[
+                EpisodeStream(link=link, quality="1080") for link in streams["links"]
+            ],
+            episode_title=episode["notes"],
+            headers={"Referer": f"https://{API_BASE_URL}/"},
+        )
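Server and EpisodeStream come from the provider-level types module, which this commit does not touch. Judging only from the keyword arguments used above, they are plausibly shaped like the following sketch; the dataclass choice and the field defaults are assumptions:

from dataclasses import dataclass, field


@dataclass
class EpisodeStream:
    link: str
    quality: str


@dataclass
class Server:
    name: str
    links: list[EpisodeStream]
    episode_title: str | None = None
    headers: dict[str, str] = field(default_factory=dict)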

View File: vid_mp4.py

@@ -1,21 +1,33 @@
 from ...types import EpisodeStream, Server
 from ..constants import API_BASE_URL
 from ..types import AllAnimeEpisode, AllAnimeSource
 from .extractor import BaseExtractor
-# TODO: requires some serious work i think : )
-response = self.session.get(
-    url,
-    timeout=10,
-)
-response.raise_for_status()
-embed_html = response.text.replace(" ", "").replace("\n", "")
-logger.debug("Found streams from vid-mp4")
-return {
-    "server": "Vid-mp4",
-    "headers": {"Referer": f"https://{API_BASE_URL}/"},
-    "subtitles": [],
-    "episode_title": (allanime_episode["notes"] or f"{anime_title}")
-    + f"; Episode {episode_number}",
-    "links": give_random_quality(response.json()["links"]),
-}
+# TODO: requires some serious work i think : )
 class VidMp4Extractor(BaseExtractor):
-    pass
+    @classmethod
+    def extract(
+        cls,
+        url,
+        client,
+        episode_number: str,
+        episode: AllAnimeEpisode,
+        source: AllAnimeSource,
+    ) -> Server:
+        response = client.get(
+            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
+            timeout=10,
+        )
+        response.raise_for_status()
+        embed_html = response.text.replace(" ", "").replace("\n", "")
+        streams = response.json()
+        return Server(
+            name="Vid-mp4",
+            links=[
+                EpisodeStream(link=link, quality="1080") for link in streams["links"]
+            ],
+            episode_title=episode["notes"],
+            headers={"Referer": f"https://{API_BASE_URL}/"},
+        )
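Several of these extractors share the clock to clock.json rewrite, which turns the embed-page path AllAnime returns into the JSON endpoint that lists the actual streams. A worked example with a made-up host and path (neither is the real constant):

API_BASE_URL = "allanime.example"  # placeholder for the imported constant
url = "/apivtwo/clock?id=abc123"   # hypothetical source url
api_url = f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}"
# -> "https://allanime.example/apivtwo/clock.json?id=abc123"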

View File: we_transfer.py

@@ -1,22 +1,31 @@
 from ...types import EpisodeStream, Server
 from ..constants import API_BASE_URL
 from ..types import AllAnimeEpisode, AllAnimeSource
 from .extractor import BaseExtractor
-# get the stream url for an episode of the defined source names
-response = self.session.get(
-    f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
-    timeout=10,
-)
-response.raise_for_status()
-case "Kir":
-    logger.debug("Found streams from wetransfer")
-    return {
-        "server": "weTransfer",
-        "headers": {"Referer": f"https://{API_BASE_URL}/"},
-        "subtitles": [],
-        "episode_title": (allanime_episode["notes"] or f"{anime_title}")
-        + f"; Episode {episode_number}",
-        "links": give_random_quality(response.json()["links"]),
-    }
 class KirExtractor(BaseExtractor):
-    pass
+    @classmethod
+    def extract(
+        cls,
+        url,
+        client,
+        episode_number: str,
+        episode: AllAnimeEpisode,
+        source: AllAnimeSource,
+    ) -> Server:
+        response = client.get(
+            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
+            timeout=10,
+        )
+        response.raise_for_status()
+        streams = response.json()
+        return Server(
+            name="weTransfer",
+            links=[
+                EpisodeStream(link=link, quality="1080") for link in streams["links"]
+            ],
+            episode_title=episode["notes"],
+            headers={"Referer": f"https://{API_BASE_URL}/"},
+        )

View File: wixmp.py

@@ -1,22 +1,31 @@
 from ...types import EpisodeStream, Server
 from ..constants import API_BASE_URL
 from ..types import AllAnimeEpisode, AllAnimeSource
 from .extractor import BaseExtractor
-# get the stream url for an episode of the defined source names
-response = self.session.get(
-    f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
-    timeout=10,
-)
-response.raise_for_status()
-case "Sak":
-    logger.debug("Found streams from dropbox")
-    return {
-        "server": "dropbox",
-        "headers": {"Referer": f"https://{API_BASE_URL}/"},
-        "subtitles": [],
-        "episode_title": (allanime_episode["notes"] or f"{anime_title}")
-        + f"; Episode {episode_number}",
-        "links": give_random_quality(response.json()["links"]),
-    }
 class DefaultExtractor(BaseExtractor):
-    pass
+    @classmethod
+    def extract(
+        cls,
+        url,
+        client,
+        episode_number: str,
+        episode: AllAnimeEpisode,
+        source: AllAnimeSource,
+    ) -> Server:
+        response = client.get(
+            f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
+            timeout=10,
+        )
+        response.raise_for_status()
+        streams = response.json()
+        return Server(
+            name="wixmp",
+            links=[
+                EpisodeStream(link=link, quality="1080") for link in streams["links"]
+            ],
+            episode_title=episode["notes"],
+            headers={"Referer": f"https://{API_BASE_URL}/"},
+        )

View File: yt_mp4.py

@@ -1,17 +1,22 @@
 from ...types import EpisodeStream, Server
 from ..constants import API_BASE_URL
 from ..types import AllAnimeEpisode, AllAnimeSource
 from .extractor import BaseExtractor
-return {
-    "server": "Yt",
-    "episode_title": f"{anime_title}; Episode {episode_number}",
-    "headers": {"Referer": f"https://{API_BASE_URL}/"},
-    "subtitles": [],
-    "links": [
-        {
-            "link": url,
-            "quality": "1080",
-        }
-    ],
-}
 class YtExtractor(BaseExtractor):
-    pass
+    @classmethod
+    def extract(
+        cls,
+        url,
+        client,
+        episode_number: str,
+        episode: AllAnimeEpisode,
+        source: AllAnimeSource,
+    ) -> Server:
+        return Server(
+            name="Yt",
+            links=[EpisodeStream(link=url, quality="1080")],
+            episode_title=episode["notes"],
+            headers={"Referer": f"https://{API_BASE_URL}/"},
+        )