diff --git a/fastanime/cli/service/player/ipc/mpv.py b/fastanime/cli/service/player/ipc/mpv.py index 1661a41..e57bf06 100644 --- a/fastanime/cli/service/player/ipc/mpv.py +++ b/fastanime/cli/service/player/ipc/mpv.py @@ -91,9 +91,12 @@ class MPVIPCClient: try: if not self.socket: break + # A blocking recv is efficient as the thread will sleep until data is available. data = self.socket.recv(4096) if not data: logger.info("MPV IPC socket closed.") + # Put a special event to signal the main loop that MPV has shut down. + self._event_queue.put({"event": "shutdown"}) break self._message_buffer += data @@ -114,13 +117,14 @@ class MPVIPCClient: try: message = json.loads(message_data.decode("utf-8")) - if "request_id" in message: + # Responses have a 'request_id' and 'error' field, events do not. + if "request_id" in message and "error" in message: req_id = message["request_id"] with self._lock: self._response_dict[req_id] = message if req_id in self._response_events: self._response_events[req_id].set() - else: + else: # It's an event self._event_queue.put(message) except (json.JSONDecodeError, UnicodeDecodeError) as e: logger.warning( @@ -247,24 +251,25 @@ class MpvIPCPlayer(BaseIPCPlayer): """MPV Player implementation using IPC for advanced features.""" stream_config: StreamConfig - mpv_process: subprocess.Popen ipc_client: MPVIPCClient - player_state: PlayerState player_fetching: bool = False player_first_run: bool = True - event_handlers: Dict[str, List[Callable]] = {} property_observers: Dict[str, List[Callable]] = {} key_bindings: Dict[str, Callable] = {} message_handlers: Dict[str, Callable] = {} - provider: BaseAnimeProvider anime: Anime - media_item: Optional[MediaItem] + def __init__(self, stream_config: StreamConfig): + super().__init__(stream_config) + self.socket_path: Optional[str] = None + self._fetch_thread: Optional[threading.Thread] = None + self._fetch_result_queue: Queue = Queue() + def play( self, player, player_params, provider, anime, media_item=None ) -> PlayerResult: @@ -289,31 +294,34 @@ class MpvIPCPlayer(BaseIPCPlayer): self._setup_key_bindings() self._setup_message_handlers() self._wait_for_playback() + + return PlayerResult( + episode=self.player_state.episode, + stop_time=self.player_state.stop_time, + total_time=self.player_state.total_time, + ) except MPVIPCError as e: logger.warning( f"IPC connection failed: {e}. Falling back to non-IPC playback." ) - self._cleanup() - choice = input( - "FAILED TO PLAY WITH IPC WOULD YOU LIKE TO RESUME PLAYBACK WITHOUT IT(Y/n): " - ) - if choice != "n": + if ( + input("Failed to play with IPC. Continue without it? (Y/n): ").lower() + != "n" + ): return player.play(params) + else: + return PlayerResult( + episode=params.episode, stop_time=None, total_time=None + ) finally: self._cleanup() - return PlayerResult( - episode=self.player_state.episode, - stop_time=self.player_state.stop_time, - total_time=self.player_state.total_time, - ) - def _start_mpv_process(self, player: BasePlayer, params: PlayerParams) -> None: """Start MPV process with IPC enabled.""" temp_dir = Path(tempfile.gettempdir()) self.socket_path = str(temp_dir / f"mpv_ipc_{time.time()}.sock") self.mpv_process = player.play_with_ipc(params, self.socket_path) - time.sleep(1.0) # Give MPV time to start and create the socket + time.sleep(1.0) def _connect_ipc(self): if not self.socket_path: @@ -380,13 +388,38 @@ class MpvIPCPlayer(BaseIPCPlayer): ) def _wait_for_playback(self): + """A non-blocking loop that checks for MPV process exit and processes events.""" if not self.ipc_client: return + + should_stop = False try: - while self.mpv_process and self.mpv_process.poll() is None: - message = self.ipc_client.get_event(block=True, timeout=0.1) - if message: + while not should_stop: + if self.mpv_process and self.mpv_process.poll() is not None: + logger.info("MPV process has exited.") + break + + while True: + message = self.ipc_client.get_event(block=False) + if message is None: + break + + if message.get("event") == "shutdown": + should_stop = True + break + self._handle_mpv_message(message) + + try: + fetch_result = self._fetch_result_queue.get(block=False) + self._handle_fetch_result(fetch_result) + except Empty: + pass + + if should_stop: + break + time.sleep(0.05) + except KeyboardInterrupt: logger.info("Playback interrupted by user") @@ -410,7 +443,8 @@ class MpvIPCPlayer(BaseIPCPlayer): self.player_state.total_time_secs = data elif name == "percent-pos" and isinstance(data, (int, float)): if ( - data >= self.stream_config.episode_complete_at + self.stream_config.auto_next + and data >= self.stream_config.episode_complete_at and not self.player_fetching ): self._auto_next_episode() @@ -451,39 +485,46 @@ class MpvIPCPlayer(BaseIPCPlayer): self.player_fetching = True self._show_text(f"Fetching {episode_type} episode...") + self._fetch_thread = threading.Thread( + target=self._fetch_episode_task, args=(episode_type, ep_no), daemon=True + ) + self._fetch_thread.start() + + def _fetch_episode_task( + self, + episode_type: Literal["next", "previous", "reload", "custom"], + ep_no: Optional[str] = None, + ): + """This function runs in a background thread to fetch episode streams.""" try: available_episodes = getattr( self.anime.episodes, self.stream_config.translation_type ) if not available_episodes: - self._show_text( + raise ValueError( f"No {self.stream_config.translation_type} episodes available." ) - return current_index = available_episodes.index(self.player_state.episode) if episode_type == "next": if current_index >= len(available_episodes) - 1: - self._show_text("Already at the last episode.") - return + raise ValueError("Already at the last episode.") target_episode = available_episodes[current_index + 1] elif episode_type == "previous": if current_index <= 0: - self._show_text("Already at first episode") - return + raise ValueError("Already at first episode") target_episode = available_episodes[current_index - 1] elif episode_type == "reload": target_episode = self.player_state.episode elif episode_type == "custom": if not ep_no or ep_no not in available_episodes: - self._show_text( + raise ValueError( f"Invalid episode. Available: {', '.join(available_episodes)}" ) - return target_episode = ep_no else: - return # Should not happen + return stream_params = EpisodeStreamsParams( anime_id=self.anime.id, @@ -491,38 +532,42 @@ class MpvIPCPlayer(BaseIPCPlayer): episode=target_episode, translation_type=self.stream_config.translation_type, ) - episode_streams = self.provider.episode_streams(stream_params) + # This is the blocking network call, now safely in a thread + episode_streams = list(self.provider.episode_streams(stream_params) or []) if not episode_streams: - self._show_text(f"No streams found for episode {target_episode}") - return + raise ValueError(f"No streams found for episode {target_episode}") - self.player_state.servers = { - ProviderServer(server.name): server for server in episode_streams + result = { + "type": "success", + "target_episode": target_episode, + "servers": {ProviderServer(s.name): s for s in episode_streams}, } - self.player_state.episode = target_episode - self.player_state.reset() - self._show_text( - f"Fetched {self.player_state.episode_title or 'Episode ' + target_episode}" - ) + self._fetch_result_queue.put(result) - except ValueError: - self._show_text("Current episode not found in list.") except Exception as e: - self._show_text(f"Error: {e}") - finally: - self.player_fetching = False + logger.error(f"Episode fetch task failed: {e}") + self._fetch_result_queue.put({"type": "error", "message": str(e)}) + + def _handle_fetch_result(self, result: Dict[str, Any]): + """Handles the result from the background fetch thread in the main thread.""" + self.player_fetching = False + if result["type"] == "success": + self.player_state.episode = result["target_episode"] + self.player_state.servers = result["servers"] + self.player_state.reset() + self._show_text(f"Fetched {self.player_state.episode_title}") + self._load_current_stream() + else: + self._show_text(f"Error: {result['message']}") def _next_episode(self): self._get_episode("next") - self._load_current_stream() def _previous_episode(self): self._get_episode("previous") - self._load_current_stream() def _reload_episode(self): self._get_episode("reload") - self._load_current_stream() def _auto_next_episode(self): if self.stream_config.auto_next: @@ -551,7 +596,7 @@ class MpvIPCPlayer(BaseIPCPlayer): if not self.ipc_client or not self.player_state.stream_subtitles: return - time.sleep(0.5) # Allow file to load + time.sleep(0.5) for i, sub_url in enumerate(self.player_state.stream_subtitles): flag = "select" if i == 0 else "auto" self.ipc_client.send_command(["sub-add", sub_url, flag]) @@ -565,21 +610,12 @@ class MpvIPCPlayer(BaseIPCPlayer): def _toggle_translation_type(self): new_type = "sub" if self.stream_config.translation_type == "dub" else "dub" self._show_text(f"Switching to {new_type}...") - old_type = self.stream_config.translation_type self.stream_config.translation_type = new_type - - self._get_episode("reload") - if self.player_state.stream_url: - self._load_current_stream() - self._show_text(f"Switched to {new_type}") - else: - self.stream_config.translation_type = old_type - self._show_text(f"Failed to switch. Reverting to {old_type}") + self._reload_episode() def _handle_select_episode(self, episode: Optional[str] = None): if episode: self._get_episode("custom", episode) - self._load_current_stream() def _handle_select_server(self, server: Optional[str] = None): if not server or not self.player_state: @@ -592,8 +628,11 @@ class MpvIPCPlayer(BaseIPCPlayer): else: self._show_text(f"Server '{server}' not available.") except ValueError: + available_servers = ", ".join( + [s.value for s in self.player_state.servers.keys()] + ) self._show_text( - f"Invalid server name: {server}. Servers are {', '.join([server.value for server in self.player_state.servers])}" + f"Invalid server name: {server}. Available: {available_servers}" ) def _handle_select_quality(self, quality: Optional[str] = None): diff --git a/fastanime/libs/player/mpv/player.py b/fastanime/libs/player/mpv/player.py index 09af7ff..97027f0 100644 --- a/fastanime/libs/player/mpv/player.py +++ b/fastanime/libs/player/mpv/player.py @@ -63,7 +63,7 @@ class MpvPlayer(BasePlayer): subprocess.run(args) - return PlayerResult() + return PlayerResult(params.episode) def _play_on_desktop(self, params) -> PlayerResult: if not self.executable: @@ -100,7 +100,9 @@ class MpvPlayer(BasePlayer): stop_time = match.group(1) total_time = match.group(2) break - return PlayerResult(total_time=total_time, stop_time=stop_time) + return PlayerResult( + episode=params.episode, total_time=total_time, stop_time=stop_time + ) def play_with_ipc(self, params: PlayerParams, socket_path: str) -> subprocess.Popen: """Stream using IPC player for enhanced features.""" @@ -120,9 +122,7 @@ class MpvPlayer(BasePlayer): logger.info(f"Starting MPV with IPC socket: {socket_path}") - process = subprocess.Popen( - pre_args + mpv_args, stdout=subprocess.PIPE, stderr=subprocess.PIPE - ) + process = subprocess.Popen(pre_args + mpv_args) return process @@ -141,7 +141,7 @@ class MpvPlayer(BasePlayer): args.extend(mpv_args) subprocess.run(args) - return PlayerResult() + return PlayerResult(params.episode) # TODO: Get people with real friends to do this lol def _stream_on_desktop_with_syncplay(self, params: PlayerParams) -> PlayerResult: @@ -156,7 +156,7 @@ class MpvPlayer(BasePlayer): args.extend(mpv_args) subprocess.run(args) - return PlayerResult() + return PlayerResult(params.episode) def _create_mpv_cli_options(self, params: PlayerParams) -> list[str]: mpv_args = [] @@ -185,5 +185,5 @@ if __name__ == "__main__": print(APP_ASCII_ART) url = input("Enter the url you would like to stream: ") mpv = MpvPlayer(MpvConfig()) - player_result = mpv.play(PlayerParams(url=url, title="")) + player_result = mpv.play(PlayerParams(episode="", query="", url=url, title="")) print(player_result)