Compare commits

...

1 Commits

Author SHA1 Message Date
Benexl
26f6ccc91e feat(aniskip): scaffhold with gemini 2025-08-12 12:59:51 +03:00
3 changed files with 134 additions and 15 deletions

View File

@@ -94,6 +94,9 @@ def media_actions(ctx: Context, state: State) -> State | InternalDirective:
f"{'🔘 ' if icons else ''}Toggle Translation Type (Current: {ctx.config.stream.translation_type.upper()})": _toggle_config_state(
ctx, state, "TRANSLATION_TYPE"
),
f"{'🔘 ' if icons else ''}Toggle Auto Skip (Current: {ctx.config.stream.auto_skip})": _toggle_config_state(
ctx, state, "AUTO_SKIP"
),
f"{'🔙 ' if icons else ''}Back to Results": lambda: InternalDirective.BACK,
f"{'' if icons else ''}Exit": lambda: InternalDirective.EXIT,
}
@@ -317,7 +320,11 @@ def _toggle_config_state(
ctx: Context,
state: State,
config_state: Literal[
"AUTO_ANIME", "AUTO_EPISODE", "CONTINUE_FROM_HISTORY", "TRANSLATION_TYPE"
"AUTO_ANIME",
"AUTO_EPISODE",
"CONTINUE_FROM_HISTORY",
"TRANSLATION_TYPE",
"AUTO_SKIP",
],
) -> MenuAction:
def action():
@@ -336,6 +343,8 @@ def _toggle_config_state(
ctx.config.stream.translation_type = (
"sub" if ctx.config.stream.translation_type == "dub" else "dub"
)
case "AUTO_SKIP":
ctx.config.stream.auto_skip = not ctx.config.stream.auto_skip
return InternalDirective.RELOAD
return action

View File

@@ -18,6 +18,7 @@ from typing import Any, Callable, Dict, List, Literal, Optional
from .....core.config.model import StreamConfig
from .....core.exceptions import FastAnimeError
from .....core.utils import formatter
from .....libs.aniskip.api import AniSkip, SkipTimeResult
from .....libs.media_api.types import MediaItem
from .....libs.player.base import BasePlayer
from .....libs.player.params import PlayerParams
@@ -252,6 +253,8 @@ class PlayerState:
class MpvIPCPlayer(BaseIPCPlayer):
"""MPV Player implementation using IPC for advanced features."""
_skip_times: Optional[SkipTimeResult] = None
_skipped_ids: set[str] = set() # To prevent re-skipping the same segment
stream_config: StreamConfig
mpv_process: subprocess.Popen
ipc_client: MPVIPCClient
@@ -283,6 +286,9 @@ class MpvIPCPlayer(BaseIPCPlayer):
registry: Optional[MediaRegistryService] = None,
media_item: Optional[MediaItem] = None,
) -> PlayerResult:
self._skip_times = None # Reset on each new play call
self._skipped_ids = set()
self.provider = provider
self.anime = anime
self.media_item = media_item
@@ -441,6 +447,7 @@ class MpvIPCPlayer(BaseIPCPlayer):
elif event == "client-message":
self._handle_client_message(message)
elif event == "file-loaded":
self._fetch_and_load_skip_times()
time.sleep(0.1)
self._configure_player()
elif event:
@@ -451,6 +458,8 @@ class MpvIPCPlayer(BaseIPCPlayer):
data = message.get("data")
if name == "time-pos" and isinstance(data, (int, float)):
self.player_state.stop_time_secs = data
self._check_for_skip(data)
elif name == "duration" and isinstance(data, (int, float)):
self.player_state.total_time_secs = data
elif name == "percent-pos" and isinstance(data, (int, float)):
@@ -698,3 +707,55 @@ class MpvIPCPlayer(BaseIPCPlayer):
def _handle_select_quality(self, quality: Optional[str] = None):
self._show_text("Quality switching is not yet implemented.")
def _check_for_skip(self, current_time: float):
"""Checks if the current playback time falls within a skip interval."""
if (
not self.stream_config.auto_skip
or not self._skip_times
or not self._skip_times.found
):
return
for skip in self._skip_times.results:
if skip.skip_id in self._skipped_ids:
continue
start_time, end_time = skip.interval
# Trigger skip slightly after the start time
if start_time <= current_time < end_time:
logger.info(
f"Skipping {skip.skip_type.upper()} from {start_time} to {end_time}"
)
self._show_text(f"Skipping {skip.skip_type.upper()}...", duration=1500)
self.ipc_client.send_command(["set_property", "time-pos", end_time])
self._skipped_ids.add(skip.skip_id)
break
def _fetch_and_load_skip_times(self):
"""Fetches skip times for the current episode in a background thread."""
if (
not self.stream_config.auto_skip
or not self.media_item
or not self.media_item.id_mal
):
return
try:
episode_num = int(float(self.player_state.episode))
mal_id = self.media_item.id_mal
def task():
self._skip_times = AniSkip.get_skip_times(mal_id, episode_num)
if self._skip_times and self._skip_times.found:
logger.info(
f"Found {len(self._skip_times.results)} skip intervals for Ep {episode_num}"
)
self._show_text("Skip times loaded.", duration=2000)
# Run in a thread to not block playback
threading.Thread(target=task, daemon=True).start()
except (ValueError, TypeError):
logger.warning(
f"Could not parse episode number for Aniskip: {self.player_state.episode}"
)

View File

@@ -1,22 +1,71 @@
from httpx import get
# fastanime/libs/aniskip/api.py
ANISKIP_ENDPOINT = "https://api.aniskip.com/v1/skip-times"
import logging
from typing import List, Literal, Optional
import httpx
from pydantic import BaseModel, Field
logger = logging.getLogger(__name__)
ANISKIP_API_URL = "https://api.aniskip.com/v2/skip-times"
class SkipTime(BaseModel):
"""Represents a single skip interval (e.g., an opening or ending)."""
interval: tuple[float, float]
skip_type: Literal["op", "ed"] = Field(alias="skipType")
skip_id: str = Field(alias="skipId")
episode_length: float = Field(alias="episodeLength")
class SkipTimeResult(BaseModel):
"""Represents the full response from the Aniskip API for an episode."""
found: bool
results: List[SkipTime] = Field(default_factory=list)
message: Optional[str] = None
status_code: int = Field(alias="statusCode")
# TODO: Finish own implementation of aniskip script
class AniSkip:
"""A client for fetching opening and ending skip times from the Aniskip API."""
@classmethod
def get_skip_times(
cls, mal_id: int, episode_number: float | int, types=["op", "ed"]
):
url = f"{ANISKIP_ENDPOINT}/{mal_id}/{episode_number}?types=op&types=ed"
response = get(url)
print(response.text)
return response.json()
cls,
mal_id: int,
episode_number: int,
types: List[Literal["op", "ed"]] = ["op", "ed"],
) -> Optional[SkipTimeResult]:
"""
Fetches skip times for a specific anime episode from Aniskip.
Args:
mal_id: The MyAnimeList ID of the anime.
episode_number: The episode number.
types: A list of types to fetch ('op' for opening, 'ed' for ending).
if __name__ == "__main__":
mal_id = input("Mal id: ")
episode_number = input("episode_number: ")
skip_times = AniSkip.get_skip_times(int(mal_id), float(episode_number))
print(skip_times)
Returns:
A SkipTimeResult object if the request is successful, otherwise None.
"""
if not mal_id or not episode_number:
return None
url = f"{ANISKIP_API_URL}/{mal_id}/{episode_number}"
params = [("type", t) for t in types]
try:
with httpx.Client() as client:
response = client.get(url, params=params, timeout=5)
# Aniskip can return 404 for not found, which is a valid response.
if response.status_code not in [200, 404]:
response.raise_for_status()
return SkipTimeResult.model_validate(response.json())
except (httpx.RequestError, httpx.HTTPStatusError, ValueError) as e:
logger.error(
f"Aniskip API request failed for MAL ID {mal_id}, Ep {episode_number}: {e}"
)
return None