Compare commits

...

5 Commits

17 changed files with 799 additions and 17 deletions

1
.gitignore vendored
View File

@@ -210,3 +210,4 @@ repomix-output.xml
.project/
result
.direnv
fastanime/libs/provider/anime/hianime/extractors/js/node_modules

View File

@@ -94,6 +94,9 @@ def media_actions(ctx: Context, state: State) -> State | InternalDirective:
f"{'🔘 ' if icons else ''}Toggle Translation Type (Current: {ctx.config.stream.translation_type.upper()})": _toggle_config_state(
ctx, state, "TRANSLATION_TYPE"
),
f"{'🔘 ' if icons else ''}Toggle Auto Skip (Current: {ctx.config.stream.auto_skip})": _toggle_config_state(
ctx, state, "AUTO_SKIP"
),
f"{'🔙 ' if icons else ''}Back to Results": lambda: InternalDirective.BACK,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
@@ -317,7 +320,11 @@ def _toggle_config_state(
ctx: Context,
state: State,
config_state: Literal[
"AUTO_ANIME", "AUTO_EPISODE", "CONTINUE_FROM_HISTORY", "TRANSLATION_TYPE"
"AUTO_ANIME",
"AUTO_EPISODE",
"CONTINUE_FROM_HISTORY",
"TRANSLATION_TYPE",
"AUTO_SKIP",
],
) -> MenuAction:
def action():
@@ -336,6 +343,8 @@ def _toggle_config_state(
ctx.config.stream.translation_type = (
"sub" if ctx.config.stream.translation_type == "dub" else "dub"
)
case "AUTO_SKIP":
ctx.config.stream.auto_skip = not ctx.config.stream.auto_skip
return InternalDirective.RELOAD
return action

View File

@@ -18,6 +18,7 @@ from typing import Any, Callable, Dict, List, Literal, Optional
from .....core.config.model import StreamConfig
from .....core.exceptions import FastAnimeError
from .....core.utils import formatter
from .....libs.aniskip.api import AniSkip, SkipTimeResult
from .....libs.media_api.types import MediaItem
from .....libs.player.base import BasePlayer
from .....libs.player.params import PlayerParams
@@ -252,6 +253,8 @@ class PlayerState:
class MpvIPCPlayer(BaseIPCPlayer):
"""MPV Player implementation using IPC for advanced features."""
_skip_times: Optional[SkipTimeResult] = None
_skipped_ids: set[str] = set() # To prevent re-skipping the same segment
stream_config: StreamConfig
mpv_process: subprocess.Popen
ipc_client: MPVIPCClient
@@ -283,6 +286,9 @@ class MpvIPCPlayer(BaseIPCPlayer):
registry: Optional[MediaRegistryService] = None,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
self._skip_times = None # Reset on each new play call
self._skipped_ids = set()
self.provider = provider
self.anime = anime
self.media_item = media_item
@@ -441,6 +447,7 @@ class MpvIPCPlayer(BaseIPCPlayer):
elif event == "client-message":
self._handle_client_message(message)
elif event == "file-loaded":
self._fetch_and_load_skip_times()
time.sleep(0.1)
self._configure_player()
elif event:
@@ -451,6 +458,8 @@ class MpvIPCPlayer(BaseIPCPlayer):
data = message.get("data")
if name == "time-pos" and isinstance(data, (int, float)):
self.player_state.stop_time_secs = data
self._check_for_skip(data)
elif name == "duration" and isinstance(data, (int, float)):
self.player_state.total_time_secs = data
elif name == "percent-pos" and isinstance(data, (int, float)):
@@ -698,3 +707,55 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _handle_select_quality(self, quality: Optional[str] = None):
self._show_text("Quality switching is not yet implemented.")
def _check_for_skip(self, current_time: float):
"""Checks if the current playback time falls within a skip interval."""
if (
not self.stream_config.auto_skip
or not self._skip_times
or not self._skip_times.found
):
return
for skip in self._skip_times.results:
if skip.skip_id in self._skipped_ids:
continue
start_time, end_time = skip.interval
# Trigger skip slightly after the start time
if start_time <= current_time < end_time:
logger.info(
f"Skipping {skip.skip_type.upper()} from {start_time} to {end_time}"
)
self._show_text(f"Skipping {skip.skip_type.upper()}...", duration=1500)
self.ipc_client.send_command(["set_property", "time-pos", end_time])
self._skipped_ids.add(skip.skip_id)
break
def _fetch_and_load_skip_times(self):
"""Fetches skip times for the current episode in a background thread."""
if (
not self.stream_config.auto_skip
or not self.media_item
or not self.media_item.id_mal
):
return
try:
episode_num = int(float(self.player_state.episode))
mal_id = self.media_item.id_mal
def task():
self._skip_times = AniSkip.get_skip_times(mal_id, episode_num)
if self._skip_times and self._skip_times.found:
logger.info(
f"Found {len(self._skip_times.results)} skip intervals for Ep {episode_num}"
)
self._show_text("Skip times loaded.", duration=2000)
# Run in a thread to not block playback
threading.Thread(target=task, daemon=True).start()
except (ValueError, TypeError):
logger.warning(
f"Could not parse episode number for Aniskip: {self.player_state.episode}"
)

View File

@@ -1,7 +1,10 @@
import os
import sys
from rich.traceback import install as rich_install
from ...core.constants import PROJECT_NAME
def custom_exception_hook(exc_type, exc_value, exc_traceback):
print(f"{exc_type.__name__}: {exc_value}")
@@ -16,6 +19,9 @@ def setup_exceptions_handler(
rich_traceback: bool | None,
rich_traceback_theme: str,
):
if dev:
# auto set env
os.environ[f"{PROJECT_NAME}_DEBUG"] = "1"
if trace or dev:
sys.excepthook = default_exception_hook
if rich_traceback:

View File

@@ -1,22 +1,71 @@
from httpx import get
# fastanime/libs/aniskip/api.py
ANISKIP_ENDPOINT = "https://api.aniskip.com/v1/skip-times"
import logging
from typing import List, Literal, Optional
import httpx
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
ANISKIP_API_URL = "https://api.aniskip.com/v2/skip-times"
class SkipTime(BaseModel):
"""Represents a single skip interval (e.g., an opening or ending)."""
interval: tuple[float, float]
skip_type: Literal["op", "ed"] = Field(alias="skipType")
skip_id: str = Field(alias="skipId")
episode_length: float = Field(alias="episodeLength")
class SkipTimeResult(BaseModel):
"""Represents the full response from the Aniskip API for an episode."""
found: bool
results: List[SkipTime] = Field(default_factory=list)
message: Optional[str] = None
status_code: int = Field(alias="statusCode")
# TODO: Finish own implementation of aniskip script
class AniSkip:
"""A client for fetching opening and ending skip times from the Aniskip API."""
@classmethod
def get_skip_times(
cls, mal_id: int, episode_number: float | int, types=["op", "ed"]
):
url = f"{ANISKIP_ENDPOINT}/{mal_id}/{episode_number}?types=op&types=ed"
response = get(url)
print(response.text)
return response.json()
cls,
mal_id: int,
episode_number: int,
types: List[Literal["op", "ed"]] = ["op", "ed"],
) -> Optional[SkipTimeResult]:
"""
Fetches skip times for a specific anime episode from Aniskip.
Args:
mal_id: The MyAnimeList ID of the anime.
episode_number: The episode number.
types: A list of types to fetch ('op' for opening, 'ed' for ending).
if __name__ == "__main__":
mal_id = input("Mal id: ")
episode_number = input("episode_number: ")
skip_times = AniSkip.get_skip_times(int(mal_id), float(episode_number))
print(skip_times)
Returns:
A SkipTimeResult object if the request is successful, otherwise None.
"""
if not mal_id or not episode_number:
return None
url = f"{ANISKIP_API_URL}/{mal_id}/{episode_number}"
params = [("type", t) for t in types]
try:
with httpx.Client() as client:
response = client.get(url, params=params, timeout=5)
# Aniskip can return 404 for not found, which is a valid response.
if response.status_code not in [200, 404]:
response.raise_for_status()
return SkipTimeResult.model_validate(response.json())
except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as e:
logger.error(
f"Aniskip API request failed for MAL ID {mal_id}, Ep {episode_number}: {e}"
)
return None

View File

@@ -0,0 +1,12 @@
# The base domain for HiAnime.
HIANIME_DOMAIN = "hianime.to"
HIANIME_BASE_URL = f"https://{HIANIME_DOMAIN}"
# The endpoint for making AJAX requests (fetching episodes, servers, etc.).
HIANIME_AJAX_URL = f"{HIANIME_BASE_URL}/ajax"
# The base URL for search queries.
SEARCH_URL = f"{HIANIME_BASE_URL}/search"
# The Referer header is crucial for making successful requests to the AJAX endpoints.
AJAX_REFERER_HEADER = f"{HIANIME_BASE_URL}/"

View File

@@ -0,0 +1,30 @@
import logging
from typing import Optional
from ....anime.types import Server
from .megacloud import MegaCloudExtractor
logger = logging.getLogger(__name__)
def extract_server(embed_url: str) -> Optional[Server]:
"""
Acts as a router to select the correct extractor based on the embed URL.
Args:
embed_url: The URL of the video host's embed page.
Returns:
A Server object containing the stream links, or None if extraction fails.
"""
hostname = embed_url.split("/")[2]
if "megacloud" in hostname or "megaplay" in hostname:
return MegaCloudExtractor().extract(embed_url)
# In the future, you could add other extractors here:
# if "streamsb" in hostname:
# return StreamSbExtractor().extract(embed_url)
logger.warning(f"No extractor found for hostname: {hostname}")
return None

View File

@@ -0,0 +1,55 @@
const CryptoJS = require("crypto-js");
/**
* Extracts a secret key from an encrypted string based on an array of index pairs,
* then uses that key to decrypt the rest of the string.
* @param {string} encryptedString - The full encrypted sources string.
* @param {string} varsJson - A JSON string representing an array of [start, length] pairs.
* @returns {string} The decrypted JSON string of video sources.
*/
function getSecretAndDecrypt(encryptedString, varsJson) {
const values = JSON.parse(varsJson);
let secret = "";
let encryptedSource = "";
let encryptedSourceArray = encryptedString.split("");
let currentIndex = 0;
for (const index of values) {
const start = index[0] + currentIndex;
const end = start + index[1];
for (let i = start; i < end; i++) {
secret += encryptedString[i];
encryptedSourceArray[i] = "";
}
currentIndex += index[1];
}
encryptedSource = encryptedSourceArray.join("");
const decrypted = CryptoJS.AES.decrypt(encryptedSource, secret).toString(
CryptoJS.enc.Utf8,
);
return decrypted;
}
// Main execution logic
const args = process.argv.slice(2);
if (args.length < 2) {
console.error(
"Usage: node megacloud_decrypt.js <encryptedString> '<varsJson>'",
);
process.exit(1);
}
const encryptedString = args[0];
const varsJson = args[1];
try {
const result = getSecretAndDecrypt(encryptedString, varsJson);
// The result is already a JSON string of the sources, just print it to stdout.
console.log(result);
} catch (e) {
console.error(e.message);
process.exit(1);
}

View File

@@ -0,0 +1,21 @@
{
"name": "hianime-extractor-helper",
"version": "1.0.0",
"lockfileVersion": 3,
"requires": true,
"packages": {
"": {
"name": "hianime-extractor-helper",
"version": "1.0.0",
"dependencies": {
"crypto-js": "^4.2.0"
}
},
"node_modules/crypto-js": {
"version": "4.2.0",
"resolved": "https://registry.npmjs.org/crypto-js/-/crypto-js-4.2.0.tgz",
"integrity": "sha512-KALDyEYgpY+Rlob/iriUtjV6d5Eq+Y191A5g4UqLAi8CyGP9N1+FdVbkc1SxKc2r4YAYqG8JzO2KGL+AizD70Q==",
"license": "MIT"
}
}
}

View File

@@ -0,0 +1,9 @@
{
"name": "hianime-extractor-helper",
"version": "1.0.0",
"description": "Helper script to decrypt MegaCloud sources for FastAnime.",
"main": "megacloud_decrypt.js",
"dependencies": {
"crypto-js": "^4.2.0"
}
}

View File

@@ -0,0 +1,180 @@
import json
import logging
import re
import subprocess
from pathlib import Path
from typing import List, Optional
import httpx
from ...types import EpisodeStream, Server, Subtitle
from ..types import HiAnimeSource
logger = logging.getLogger(__name__)
# The path to our Node.js decryption script, relative to this file.
DECRYPT_SCRIPT_PATH = Path(__file__).parent / "js" / "megacloud_decrypt.js"
class MegaCloudExtractor:
"""
Extractor for MegaCloud streams.
It works by:
1. Fetching the embed page.
2. Finding the encrypted sources data and the URL to a JavaScript file.
3. Fetching the JavaScript file and using regex to find decryption keys.
4. Calling an external Node.js script to perform the decryption.
5. Parsing the decrypted result to get the final stream URLs.
"""
def _run_node_script(self, encrypted_string: str, vars_json: str) -> Optional[dict]:
"""
Executes the Node.js decryption script as a subprocess.
Args:
encrypted_string: The large encrypted sources string.
vars_json: A JSON string of the array of indexes for key extraction.
Returns:
The decrypted data as a dictionary, or None on failure.
"""
if not DECRYPT_SCRIPT_PATH.exists():
logger.error(
f"Node.js decryption script not found at: {DECRYPT_SCRIPT_PATH}"
)
return None
command = ["node", str(DECRYPT_SCRIPT_PATH), encrypted_string, vars_json]
try:
process = subprocess.run(
command,
capture_output=True,
text=True,
check=True,
cwd=DECRYPT_SCRIPT_PATH.parent, # Run from the 'js' directory
)
return json.loads(process.stdout)
except subprocess.CalledProcessError as e:
logger.error(f"Node.js script failed with error: {e.stderr}")
except json.JSONDecodeError:
logger.error("Failed to parse JSON output from Node.js script.")
except Exception as e:
logger.error(
f"An unexpected error occurred while running Node.js script: {e}"
)
return None
def extract_vars_from_script(self, script_content: str) -> Optional[str]:
"""
Uses regex to find the variable array needed for decryption from the script content.
This pattern is based on the logic from the TypeScript project.
"""
# This regex is a Python adaptation of the one in the TypeScript source.
# It looks for the specific pattern that initializes the decryption keys.
regex = r"case\s*0x[0-9a-f]+:(?![^;]*=partKey)\s*\w+\s*=\s*(\w+)\s*,\s*\w+\s*=\s*(\w+);"
matches = re.findall(regex, script_content)
if not matches:
logger.error("Could not find decryption variables in the script.")
return None
def matching_key(value: str, script: str) -> Optional[str]:
# This nested function replicates the `matchingKey` logic from the TS file.
key_regex = re.compile(f",{value}=((?:0x)?([0-9a-fA-F]+))")
match = key_regex.search(script)
return match.group(1) if match else None
vars_array = []
for match in matches:
try:
key1_hex = matching_key(match[0], script_content)
key2_hex = matching_key(match[1], script_content)
if key1_hex and key2_hex:
vars_array.append([int(key1_hex, 16), int(key2_hex, 16)])
except (ValueError, TypeError):
logger.warning(
f"Could not parse hex values from script for match: {match}"
)
continue
return json.dumps(vars_array) if vars_array else None
def extract(self, embed_url: str) -> Optional[Server]:
"""
Main extraction method.
Args:
embed_url: The URL of the MegaCloud embed page.
Returns:
A Server object containing stream links and subtitles.
"""
try:
with httpx.Client() as client:
# 1. Get the embed page content
embed_response = client.get(
embed_url, headers={"Referer": constants.HIANIME_BASE_URL}
)
embed_response.raise_for_status()
embed_html = embed_response.text
# 2. Find the encrypted sources and the script URL
# The data is usually stored in a script tag as `var sources = [...]`.
sources_match = re.search(r"var sources = ([^;]+);", embed_html)
script_url_match = re.search(
r'src="(/js/player/a/prod/e1-player.min.js\?[^"]+)"', embed_html
)
if not sources_match or not script_url_match:
logger.error("Could not find sources or script URL in embed page.")
return None
encrypted_sources_data = json.loads(sources_match.group(1))
script_url = "https:" + script_url_match.group(1)
encrypted_string = encrypted_sources_data.get("sources")
if not isinstance(encrypted_string, str) or not encrypted_string:
logger.error("Encrypted sources string is missing or invalid.")
return None
# 3. Fetch the script and extract decryption variables
script_response = client.get(script_url)
script_response.raise_for_status()
vars_json = self.extract_vars_from_script(script_response.text)
if not vars_json:
return None
# 4. Decrypt using the Node.js script
decrypted_data = self._run_node_script(encrypted_string, vars_json)
if not decrypted_data or not isinstance(decrypted_data, list):
logger.error("Decryption failed or returned invalid data.")
return None
# 5. Map to generic models
streams = [
EpisodeStream(
link=source["file"], quality="auto", format=source["type"]
)
for source in decrypted_data
]
subtitles = [
Subtitle(url=track["file"], language=track.get("label", "en"))
for track in encrypted_sources_data.get("tracks", [])
if track.get("kind") == "captions"
]
return Server(
name="MegaCloud",
links=streams,
subtitles=subtitles,
headers={"Referer": "https://megacloud.tv/"},
)
except Exception as e:
logger.error(f"MegaCloud extraction failed: {e}", exc_info=True)
return None

View File

@@ -0,0 +1,149 @@
import re
from typing import List, Optional
from ....provider.anime.types import (
Anime,
AnimeEpisodes,
PageInfo,
SearchResult,
SearchResults,
)
from ....provider.scraping.html_parser import (
extract_attributes,
get_element_by_class,
get_elements_by_class,
)
def _parse_episodes(element_html: str) -> AnimeEpisodes:
"""Helper function to parse sub/dub episode counts from an anime item."""
sub_text = get_element_by_class("tick-sub", element_html)
dub_text = get_element_by_class("tick-dub", element_html)
sub_count = 0
dub_count = 0
if sub_text:
match = re.search(r"\d+", sub_text)
if match:
sub_count = int(match.group(0))
if dub_text:
match = re.search(r"\d+", dub_text)
if match:
dub_count = int(match.group(0))
# Generate a list of episode numbers as strings
sub_list = [str(i) for i in range(1, sub_count + 1)]
dub_list = [str(i) for i in range(1, dub_count + 1)]
return AnimeEpisodes(sub=sub_list, dub=dub_list, raw=[])
def map_to_search_results(
anime_elements: List[str], full_html: str
) -> Optional[SearchResults]:
"""
Maps a list of HTML elements from a HiAnime search page to a generic SearchResults object.
Args:
anime_elements: A list of raw HTML strings, each representing an anime (.flw-item).
full_html: The full HTML content of the search page for parsing pagination.
Returns:
A SearchResults object or None if parsing fails.
"""
results = []
for element in anime_elements:
title_element = get_element_by_class("dynamic-name", element)
if not title_element:
continue
attrs = extract_attributes(title_element)
title = title_element.split(">")[1].split("<")[0].strip()
anime_id = attrs.get("href", "").lstrip("/")
poster_element = get_element_by_class("film-poster-img", element)
poster_attrs = extract_attributes(poster_element or "")
results.append(
SearchResult(
id=anime_id,
title=title,
poster=poster_attrs.get("data-src"),
episodes=_parse_episodes(element),
)
)
# Parse pagination to determine total pages
total_pages = 1
# Use a simpler selector that is less prone to parsing issues.
pagination_elements = get_elements_by_class("page-item", full_html)
if pagination_elements:
# Find the last page number from all pagination links
last_page_num = 0
for el in pagination_elements:
attrs = extract_attributes(el)
href = attrs.get("href", "")
if "?page=" in href:
try:
num = int(href.split("?page=")[-1])
if num > last_page_num:
last_page_num = num
except (ValueError, IndexError):
continue
if last_page_num > 0:
total_pages = last_page_num
page_info = PageInfo(total=total_pages)
return SearchResults(page_info=page_info, results=results)
def map_to_anime_result(anime_id_slug: str, episode_list_html: str) -> Optional[Anime]:
"""
Maps the AJAX response for an episode list to a generic Anime object.
Args:
anime_id_slug: The anime's unique ID string (e.g., "steinsgate-3").
episode_list_html: The raw HTML snippet containing the list of episodes.
Returns:
An Anime object containing the episode list, or None.
"""
episodes = get_elements_by_class("ssl-item", episode_list_html)
episode_numbers_sub = []
# Note: HiAnime's episode list doesn't differentiate sub/dub, so we assume all are sub for now.
# The user selects sub/dub when choosing a server later.
for ep_element in episodes:
attrs = extract_attributes(ep_element)
ep_num = attrs.get("data-number")
if ep_num:
episode_numbers_sub.append(ep_num)
# The title isn't in this AJAX response, so we derive a placeholder from the slug.
# The application's state usually carries the real title from the search/list step.
placeholder_title = anime_id_slug.replace("-", " ").title()
return Anime(
id=anime_id_slug,
title=placeholder_title,
episodes=AnimeEpisodes(
sub=episode_numbers_sub,
dub=[], # We don't know dub count from this endpoint
raw=[],
),
)
def map_to_server_id(server_element_html: str) -> Optional[str]:
"""
Extracts the server's unique data-id from its HTML element.
Args:
server_element_html: The raw HTML of a server-item.
Returns:
The server ID string, or None.
"""
attrs = extract_attributes(server_element_html)
return attrs.get("data-id")

View File

@@ -0,0 +1,168 @@
import logging
from typing import Iterator, Optional
from ....provider.anime.base import BaseAnimeProvider
from ....provider.anime.params import AnimeParams, EpisodeStreamsParams, SearchParams
from ....provider.anime.types import Anime, SearchResults, Server
from ....provider.scraping.html_parser import get_elements_by_class
from . import constants, mappers
from .extractors import extract_server
logger = logging.getLogger(__name__)
class HiAnime(BaseAnimeProvider):
"""
Provider for scraping anime data from HiAnime.
This provider implements the search, get, and episode_streams methods
to fetch anime information and video stream URLs from HiAnime's website
and internal AJAX APIs.
"""
HEADERS = {"Referer": constants.HIANIME_BASE_URL}
def search(self, params: SearchParams) -> Optional[SearchResults]:
"""
Searches HiAnime for a given query.
Args:
params: The search parameters containing the query.
Returns:
A SearchResults object containing the found anime, or None.
"""
search_url = f"{constants.SEARCH_URL}?keyword={params.query}"
try:
response = self.client.get(search_url, follow_redirects=True)
response.raise_for_status()
# The search results are rendered in the HTML. We use our HTML parser
# to find all elements with the class '.flw-item', which represent
# individual anime search results.
anime_elements = get_elements_by_class("flw-item", response.text)
if not anime_elements:
return None
# The mapper will convert the raw HTML elements into our generic SearchResults model.
return mappers.map_to_search_results(anime_elements, response.text)
except Exception as e:
logger.error(
f"Failed to perform search on HiAnime for query '{params.query}': {e}"
)
return None
def get(self, params: AnimeParams) -> Optional[Anime]:
"""
Retrieves detailed information and a list of episodes for a specific anime.
Args:
params: The parameters containing the anime ID (slug).
Returns:
An Anime object with a full episode list, or None.
"""
try:
# The numeric ID is the last part of the slug.
clean_id_slug = params.id.split("?")[0]
anime_id_numeric = clean_id_slug.split("-")[-1]
if not anime_id_numeric.isdigit():
raise ValueError("Could not extract numeric ID from anime slug.")
# HiAnime loads episodes via an AJAX request.
episodes_url = (
f"{constants.HIANIME_AJAX_URL}/v2/episode/list/{anime_id_numeric}"
)
response = self.client.get(
episodes_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
response.raise_for_status()
# The response is JSON containing an 'html' key with the episode list.
html_snippet = response.json().get("html", "")
if not html_snippet:
return None
# We pass the original anime ID (slug) and the HTML snippet to the mapper.
return mappers.map_to_anime_result(params.id, html_snippet)
except Exception as e:
logger.error(f"Failed to get anime details for '{params.id}': {e}")
return None
def episode_streams(
self, params: EpisodeStreamsParams
) -> Optional[Iterator[Server]]:
"""
Fetches the actual video stream URLs for a given episode.
This is a multi-step process:
1. Get the list of available servers (e.g., MegaCloud, StreamSB).
2. For each server, get the embed URL.
3. Pass the embed URL to an extractor to get the final stream URL.
Args:
params: The parameters containing the episode ID.
Yields:
A Server object for each available video source.
"""
try:
# The episode ID is in the format 'anime-slug?ep=12345'
episode_id_numeric = params.episode.split("?ep=")[-1]
if not episode_id_numeric.isdigit():
raise ValueError("Could not extract numeric episode ID.")
# 1. Get available servers for the episode.
servers_url = f"{constants.HIANIME_AJAX_URL}/v2/episode/servers?episodeId={episode_id_numeric}"
servers_response = self.client.get(
servers_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
servers_response.raise_for_status()
server_elements = get_elements_by_class(
"server-item", servers_response.json().get("html", "")
)
for server_element in server_elements:
try:
# 2. Extract the server's unique ID.
server_id = mappers.map_to_server_id(server_element)
if not server_id:
continue
# 3. Get the embed URL for this server.
sources_url = f"{constants.HIANIME_AJAX_URL}/v2/episode/sources?id={server_id}"
sources_response = self.client.get(
sources_url,
headers={
"X-Requested-With": "XMLHttpRequest",
"Referer": constants.AJAX_REFERER_HEADER,
},
)
sources_response.raise_for_status()
embed_url = sources_response.json().get("link")
if not embed_url:
continue
# 4. Use an extractor to get the final stream URLs from the embed page.
# The extractor handles the complex, host-specific logic.
server = extract_server(embed_url)
if server:
yield server
except Exception as e:
logger.warning(
f"Failed to process a server for episode '{params.episode}': {e}"
)
continue
except Exception as e:
logger.error(f"Failed to get episode streams for '{params.episode}': {e}")
return None

View File

@@ -0,0 +1,33 @@
from typing import List, Literal, TypedDict
class HiAnimeEpisode(TypedDict):
"""
Represents a single episode entry returned by the
`/ajax/v2/episode/list/{anime_id}` endpoint.
"""
title: str | None
episodeId: str | None
number: int
isFiller: bool
class HiAnimeEpisodeServer(TypedDict):
"""
Represents a single server entry returned by the
`/ajax/v2/episode/servers?episodeId={episode_id}` endpoint.
"""
serverName: str
serverId: int | None
class HiAnimeSource(TypedDict):
"""
Represents the JSON response from the
`/ajax/v2/episode/sources?id={server_id}` endpoint,
which contains the link to the extractor's embed page.
"""
link: str

View File

@@ -12,8 +12,6 @@ PROVIDERS_AVAILABLE = {
"allanime": "provider.AllAnime",
"animepahe": "provider.AnimePahe",
"hianime": "provider.HiAnime",
"nyaa": "provider.Nyaa",
"yugen": "provider.Yugen",
}

View File

@@ -11,6 +11,7 @@ from pydantic import BaseModel, ConfigDict
class ProviderName(Enum):
ALLANIME = "allanime"
ANIMEPAHE = "animepahe"
HIANIME = "hianime"
class ProviderServer(Enum):