From 384d326fa821adf3f6cf03b558487ecc94841df9 Mon Sep 17 00:00:00 2001 From: Benexl Date: Tue, 22 Jul 2025 14:55:38 +0300 Subject: [PATCH] feat: cleanup --- fastanime/assets/normalizer.json | 20 +- fastanime/cli/cli.py | 9 +- fastanime/cli/commands/__init__.py | 5 - fastanime/cli/commands/anilist/download.py | 178 -------- fastanime/cli/commands/anilist/downloads.py | 381 ------------------ fastanime/cli/commands/cache.py | 56 --- fastanime/cli/commands/grab.py | 239 ----------- fastanime/cli/commands/helpers.py | 38 -- fastanime/cli/commands/update.py | 55 --- .../libs/providers/anime/hianime/__init__.py | 0 .../libs/providers/anime/hianime/constants.py | 26 -- .../providers/anime/hianime/extractors.py | 191 --------- .../libs/providers/anime/hianime/provider.py | 274 ------------- .../libs/providers/anime/hianime/types.py | 26 -- .../libs/providers/anime/nyaa/__init__.py | 0 .../libs/providers/anime/nyaa/constants.py | 1 - .../libs/providers/anime/nyaa/provider.py | 342 ---------------- fastanime/libs/providers/anime/nyaa/utils.py | 126 ------ fastanime/libs/providers/anime/provider.py | 3 +- .../libs/providers/anime/utils/common.py | 15 - fastanime/libs/providers/anime/utils/data.py | 33 -- fastanime/libs/providers/anime/utils/store.py | 114 ------ fastanime/libs/providers/anime/utils/utils.py | 70 ---- .../libs/providers/anime/utils/utils_1.py | 48 --- .../libs/providers/anime/yugen/__init__.py | 0 .../libs/providers/anime/yugen/constants.py | 4 - .../libs/providers/anime/yugen/provider.py | 223 ---------- 27 files changed, 20 insertions(+), 2457 deletions(-) delete mode 100644 fastanime/cli/commands/anilist/download.py delete mode 100644 fastanime/cli/commands/anilist/downloads.py delete mode 100644 fastanime/cli/commands/cache.py delete mode 100644 fastanime/cli/commands/grab.py delete mode 100644 fastanime/cli/commands/helpers.py delete mode 100644 fastanime/cli/commands/update.py delete mode 100644 fastanime/libs/providers/anime/hianime/__init__.py delete mode 100644 fastanime/libs/providers/anime/hianime/constants.py delete mode 100644 fastanime/libs/providers/anime/hianime/extractors.py delete mode 100644 fastanime/libs/providers/anime/hianime/provider.py delete mode 100644 fastanime/libs/providers/anime/hianime/types.py delete mode 100644 fastanime/libs/providers/anime/nyaa/__init__.py delete mode 100644 fastanime/libs/providers/anime/nyaa/constants.py delete mode 100644 fastanime/libs/providers/anime/nyaa/provider.py delete mode 100644 fastanime/libs/providers/anime/nyaa/utils.py delete mode 100644 fastanime/libs/providers/anime/utils/common.py delete mode 100644 fastanime/libs/providers/anime/utils/data.py delete mode 100644 fastanime/libs/providers/anime/utils/store.py delete mode 100644 fastanime/libs/providers/anime/utils/utils.py delete mode 100644 fastanime/libs/providers/anime/utils/utils_1.py delete mode 100644 fastanime/libs/providers/anime/yugen/__init__.py delete mode 100644 fastanime/libs/providers/anime/yugen/constants.py delete mode 100644 fastanime/libs/providers/anime/yugen/provider.py diff --git a/fastanime/assets/normalizer.json b/fastanime/assets/normalizer.json index 50cf9d2..e662e0b 100644 --- a/fastanime/assets/normalizer.json +++ b/fastanime/assets/normalizer.json @@ -1,5 +1,17 @@ { - "allanime":{ - "1p":"One Piece" - } -} \ No newline at end of file + "allanime": { + "1P": "one piece", + "Magia Record: Mahou Shoujo Madoka☆Magica Gaiden (TV)": "Mahou Shoujo Madoka☆Magica", + "Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka", + "Hazurewaku no \"Joutai Ijou Skill\" de Saikyou ni Natta Ore ga Subete wo Juurin suru made": "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made", + "Re:Zero kara Hajimeru Isekai Seikatsu Season 3": "Re:Zero kara Hajimeru Isekai Seikatsu 3rd Season" + }, + "hianime": { + "My Star": "Oshi no Ko" + }, + "animepahe": { + "Azumanga Daiou The Animation": "Azumanga Daioh", + "Mairimashita! Iruma-kun 2nd Season": "Mairimashita! Iruma-kun 2", + "Mairimashita! Iruma-kun 3rd Season": "Mairimashita! Iruma-kun 3" + } +} diff --git a/fastanime/cli/cli.py b/fastanime/cli/cli.py index 37718db..7665fa2 100644 --- a/fastanime/cli/cli.py +++ b/fastanime/cli/cli.py @@ -27,12 +27,9 @@ if TYPE_CHECKING: commands = { - "config": ".config", - "search": ".search", - "download": ".download", - "anilist": ".anilist", - "queue": ".queue", - "service": ".service", + "config": "config.config", + "search": "search.search", + "anilist": "anilist.anilist", } diff --git a/fastanime/cli/commands/__init__.py b/fastanime/cli/commands/__init__.py index b8c591c..e69de29 100644 --- a/fastanime/cli/commands/__init__.py +++ b/fastanime/cli/commands/__init__.py @@ -1,5 +0,0 @@ -from .anilist import anilist -from .config import config -from .search import search - -__all__ = ["config", "search", "anilist"] diff --git a/fastanime/cli/commands/anilist/download.py b/fastanime/cli/commands/anilist/download.py deleted file mode 100644 index c4f987f..0000000 --- a/fastanime/cli/commands/anilist/download.py +++ /dev/null @@ -1,178 +0,0 @@ -""" -Single download command for the anilist CLI. - -Handles downloading specific episodes or continuing from watch history. -""" - -import click -from pathlib import Path -from typing import List, Optional - -from ....core.config.model import AppConfig -from ....libs.api.types import MediaItem -from ...services.downloads import get_download_manager -from ...services.watch_history.manager import WatchHistoryManager - - -def parse_episode_range(range_str: str) -> List[int]: - """Parse episode range string into list of episode numbers.""" - episodes = [] - - for part in range_str.split(','): - part = part.strip() - if '-' in part: - start, end = map(int, part.split('-', 1)) - episodes.extend(range(start, end + 1)) - else: - episodes.append(int(part)) - - return sorted(set(episodes)) # Remove duplicates and sort - - -@click.command(name="download") -@click.argument("query", required=False) -@click.option("--episode", "-e", type=int, help="Specific episode number") -@click.option("--range", "-r", help="Episode range (e.g., 1-12, 5,7,9)") -@click.option("--quality", "-q", - type=click.Choice(["360", "480", "720", "1080", "best"]), - help="Preferred download quality") -@click.option("--continue", "continue_watch", is_flag=True, - help="Continue from watch history") -@click.option("--background", "-b", is_flag=True, - help="Download in background") -@click.option("--path", type=click.Path(exists=True, file_okay=False, dir_okay=True), - help="Custom download location") -@click.option("--subtitles/--no-subtitles", default=None, - help="Include subtitles (overrides config)") -@click.option("--priority", type=int, default=0, - help="Download priority (higher number = higher priority)") -@click.pass_context -def download(ctx: click.Context, query: Optional[str], episode: Optional[int], - range: Optional[str], quality: Optional[str], continue_watch: bool, - background: bool, path: Optional[str], subtitles: Optional[bool], - priority: int): - """ - Download anime episodes with tracking. - - Examples: - - \b - # Download specific episode - fastanime anilist download "Attack on Titan" --episode 1 - - \b - # Download episode range - fastanime anilist download "Naruto" --range "1-5,10,15-20" - - \b - # Continue from watch history - fastanime anilist download --continue - - \b - # Download with custom quality - fastanime anilist download "One Piece" --episode 1000 --quality 720 - """ - - config: AppConfig = ctx.obj - download_manager = get_download_manager(config.downloads) - - try: - # Handle continue from watch history - if continue_watch: - if query: - click.echo("--continue flag cannot be used with a search query", err=True) - ctx.exit(1) - - # Get current watching anime from history - watch_manager = WatchHistoryManager() - current_watching = watch_manager.get_currently_watching() - - if not current_watching: - click.echo("No anime currently being watched found in history", err=True) - ctx.exit(1) - - if len(current_watching) == 1: - media_item = current_watching[0].media_item - next_episode = current_watching[0].last_watched_episode + 1 - episodes_to_download = [next_episode] - else: - # Multiple anime, let user choose - click.echo("Multiple anime found in watch history:") - for i, entry in enumerate(current_watching): - title = entry.media_item.title.english or entry.media_item.title.romaji - next_ep = entry.last_watched_episode + 1 - click.echo(f" {i + 1}. {title} (next episode: {next_ep})") - - choice = click.prompt("Select anime to download", type=int) - if choice < 1 or choice > len(current_watching): - click.echo("Invalid selection", err=True) - ctx.exit(1) - - selected_entry = current_watching[choice - 1] - media_item = selected_entry.media_item - next_episode = selected_entry.last_watched_episode + 1 - episodes_to_download = [next_episode] - - else: - # Search for anime - if not query: - click.echo("Query is required when not using --continue", err=True) - ctx.exit(1) - - # TODO: Integrate with search functionality - # For now, this is a placeholder - you'll need to integrate with your existing search system - click.echo(f"Searching for: {query}") - click.echo("Note: Search integration not yet implemented in this example") - ctx.exit(1) - - # Determine episodes to download - if episode: - episodes_to_download = [episode] - elif range: - try: - episodes_to_download = parse_episode_range(range) - except ValueError as e: - click.echo(f"Invalid episode range: {e}", err=True) - ctx.exit(1) - elif not continue_watch: - # Default to episode 1 if nothing specified - episodes_to_download = [1] - - # Validate episodes - if not episodes_to_download: - click.echo("No episodes specified for download", err=True) - ctx.exit(1) - - if media_item.episodes and max(episodes_to_download) > media_item.episodes: - click.echo(f"Episode {max(episodes_to_download)} exceeds total episodes ({media_item.episodes})", err=True) - ctx.exit(1) - - # Use quality from config if not specified - if not quality: - quality = config.downloads.preferred_quality - - # Add to download queue - success = download_manager.add_to_queue( - media_item=media_item, - episodes=episodes_to_download, - quality=quality, - priority=priority - ) - - if success: - title = media_item.title.english or media_item.title.romaji - episode_text = f"episode {episodes_to_download[0]}" if len(episodes_to_download) == 1 else f"{len(episodes_to_download)} episodes" - - click.echo(f"✓ Added {episode_text} of '{title}' to download queue") - - if background: - click.echo("Download will continue in the background") - else: - click.echo("Run 'fastanime anilist downloads status' to monitor progress") - else: - click.echo("Failed to add episodes to download queue", err=True) - ctx.exit(1) - - except Exception as e: - click.echo(f"Error: {e}", err=True) - ctx.exit(1) diff --git a/fastanime/cli/commands/anilist/downloads.py b/fastanime/cli/commands/anilist/downloads.py deleted file mode 100644 index fc0a2db..0000000 --- a/fastanime/cli/commands/anilist/downloads.py +++ /dev/null @@ -1,381 +0,0 @@ -""" -Downloads management commands for the anilist CLI. - -Provides comprehensive download management including listing, status monitoring, -cleanup, and verification operations. -""" - -import click -import json -from datetime import datetime -from pathlib import Path -from typing import Optional - -from ....core.config.model import AppConfig -from ...services.downloads import get_download_manager -from ...services.downloads.validator import DownloadValidator - - -def format_size(size_bytes: int) -> str: - """Format file size in human-readable format.""" - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: - if size_bytes < 1024.0: - return f"{size_bytes:.1f} {unit}" - size_bytes /= 1024.0 - return f"{size_bytes:.1f} PB" - - -def format_duration(seconds: Optional[float]) -> str: - """Format duration in human-readable format.""" - if seconds is None: - return "Unknown" - - if seconds < 60: - return f"{seconds:.0f}s" - elif seconds < 3600: - return f"{seconds/60:.0f}m {seconds%60:.0f}s" - else: - hours = seconds // 3600 - minutes = (seconds % 3600) // 60 - return f"{hours:.0f}h {minutes:.0f}m" - - -@click.group(name="downloads") -@click.pass_context -def downloads(ctx: click.Context): - """Manage downloaded anime.""" - pass - - -@downloads.command() -@click.option("--status", - type=click.Choice(["all", "completed", "active", "failed", "paused"]), - default="all", - help="Filter by download status") -@click.option("--format", "output_format", - type=click.Choice(["table", "json", "simple"]), - default="table", - help="Output format") -@click.option("--limit", type=int, help="Limit number of results") -@click.pass_context -def list(ctx: click.Context, status: str, output_format: str, limit: Optional[int]): - """List all downloads.""" - - config: AppConfig = ctx.obj - download_manager = get_download_manager(config.downloads) - - try: - # Get download records - status_filter = None if status == "all" else status - records = download_manager.list_downloads(status_filter=status_filter, limit=limit) - - if not records: - click.echo("No downloads found") - return - - if output_format == "json": - # JSON output - output_data = [] - for record in records: - output_data.append({ - "media_id": record.media_item.id, - "title": record.display_title, - "status": record.status, - "episodes_downloaded": record.total_episodes_downloaded, - "total_episodes": record.media_item.episodes or 0, - "completion_percentage": record.completion_percentage, - "total_size_gb": record.total_size_gb, - "last_updated": record.last_updated.isoformat() - }) - - click.echo(json.dumps(output_data, indent=2)) - - elif output_format == "simple": - # Simple text output - for record in records: - title = record.display_title - status_emoji = { - "completed": "✓", - "active": "⬇", - "failed": "✗", - "paused": "⏸" - }.get(record.status, "?") - - click.echo(f"{status_emoji} {title} ({record.total_episodes_downloaded}/{record.media_item.episodes or 0} episodes)") - - else: - # Table output (default) - click.echo() - click.echo("Downloads:") - click.echo("=" * 80) - - # Header - header = f"{'Title':<30} {'Status':<10} {'Episodes':<12} {'Size':<10} {'Updated':<15}" - click.echo(header) - click.echo("-" * 80) - - # Rows - for record in records: - title = record.display_title - if len(title) > 28: - title = title[:25] + "..." - - status_display = record.status.capitalize() - - episodes_display = f"{record.total_episodes_downloaded}/{record.media_item.episodes or '?'}" - - size_display = format_size(record.total_size_bytes) - - updated_display = record.last_updated.strftime("%Y-%m-%d") - - row = f"{title:<30} {status_display:<10} {episodes_display:<12} {size_display:<10} {updated_display:<15}" - click.echo(row) - - click.echo("-" * 80) - click.echo(f"Total: {len(records)} anime") - - except Exception as e: - click.echo(f"Error listing downloads: {e}", err=True) - ctx.exit(1) - - -@downloads.command() -@click.pass_context -def status(ctx: click.Context): - """Show download queue status and statistics.""" - - config: AppConfig = ctx.obj - download_manager = get_download_manager(config.downloads) - - try: - # Get statistics - stats = download_manager.get_download_stats() - - click.echo() - click.echo("Download Statistics:") - click.echo("=" * 40) - click.echo(f"Total Anime: {stats.get('total_anime', 0)}") - click.echo(f"Total Episodes: {stats.get('total_episodes', 0)}") - click.echo(f"Total Size: {stats.get('total_size_gb', 0):.2f} GB") - click.echo(f"Queue Size: {stats.get('queue_size', 0)}") - - # Show completion stats - completion_stats = stats.get('completion_stats', {}) - if completion_stats: - click.echo() - click.echo("Status Breakdown:") - click.echo("-" * 20) - for status, count in completion_stats.items(): - click.echo(f" {status.capitalize()}: {count}") - - # Show active downloads - queue = download_manager._load_queue() - if queue.items: - click.echo() - click.echo("Download Queue:") - click.echo("-" * 30) - for item in queue.items[:5]: # Show first 5 items - title = f"Media {item.media_id}" # Would need to lookup title - click.echo(f" Episode {item.episode_number} of {title} ({item.quality_preference})") - - if len(queue.items) > 5: - click.echo(f" ... and {len(queue.items) - 5} more items") - - except Exception as e: - click.echo(f"Error getting download status: {e}", err=True) - ctx.exit(1) - - -@downloads.command() -@click.option("--dry-run", is_flag=True, help="Show what would be cleaned without doing it") -@click.pass_context -def clean(ctx: click.Context, dry_run: bool): - """Clean up failed downloads and orphaned entries.""" - - config: AppConfig = ctx.obj - download_manager = get_download_manager(config.downloads) - - try: - if dry_run: - click.echo("Dry run mode - no changes will be made") - click.echo() - - # Clean up failed downloads - if not dry_run: - failed_count = download_manager.cleanup_failed_downloads() - click.echo(f"Cleaned up {failed_count} failed downloads") - else: - click.echo("Would clean up failed downloads older than retention period") - - # Clean up orphaned files - validator = DownloadValidator(download_manager) - if not dry_run: - orphaned_count = validator.cleanup_orphaned_files() - click.echo(f"Cleaned up {orphaned_count} orphaned files") - else: - click.echo("Would clean up orphaned files and fix index inconsistencies") - - if dry_run: - click.echo() - click.echo("Run without --dry-run to perform actual cleanup") - - except Exception as e: - click.echo(f"Error during cleanup: {e}", err=True) - ctx.exit(1) - - -@downloads.command() -@click.argument("media_id", type=int, required=False) -@click.option("--all", "verify_all", is_flag=True, help="Verify all downloads") -@click.pass_context -def verify(ctx: click.Context, media_id: Optional[int], verify_all: bool): - """Verify download integrity for specific anime or all downloads.""" - - config: AppConfig = ctx.obj - download_manager = get_download_manager(config.downloads) - - try: - validator = DownloadValidator(download_manager) - - if verify_all: - click.echo("Generating comprehensive validation report...") - report = validator.generate_validation_report() - - click.echo() - click.echo("Validation Report:") - click.echo("=" * 50) - click.echo(f"Total Records: {report['total_records']}") - click.echo(f"Valid Records: {report['valid_records']}") - click.echo(f"Invalid Records: {report['invalid_records']}") - click.echo(f"Integrity Issues: {report['integrity_issues']}") - click.echo(f"Path Issues: {report['path_issues']}") - click.echo(f"Orphaned Files: {report['orphaned_files']}") - - if report['details']['invalid_files']: - click.echo() - click.echo("Invalid Files:") - for file_path in report['details']['invalid_files']: - click.echo(f" - {file_path}") - - if report['details']['integrity_failures']: - click.echo() - click.echo("Integrity Failures:") - for failure in report['details']['integrity_failures']: - click.echo(f" - {failure['title']}: Episodes {failure['failed_episodes']}") - - elif media_id: - record = download_manager.get_download_record(media_id) - if not record: - click.echo(f"No download record found for media ID {media_id}", err=True) - ctx.exit(1) - - click.echo(f"Verifying downloads for: {record.display_title}") - - # Verify integrity - integrity_results = validator.verify_file_integrity(record) - - # Verify paths - path_issues = validator.validate_file_paths(record) - - # Display results - click.echo() - click.echo("Episode Verification:") - click.echo("-" * 30) - - for episode_num, episode_download in record.episodes.items(): - status_emoji = "✓" if integrity_results.get(episode_num, False) else "✗" - click.echo(f" {status_emoji} Episode {episode_num} ({episode_download.status})") - - if not integrity_results.get(episode_num, False): - if not episode_download.file_path.exists(): - click.echo(f" - File missing: {episode_download.file_path}") - elif episode_download.checksum and not episode_download.verify_integrity(): - click.echo(f" - Checksum mismatch") - - if path_issues: - click.echo() - click.echo("Path Issues:") - for issue in path_issues: - click.echo(f" - {issue}") - - else: - click.echo("Specify --all to verify all downloads or provide a media ID", err=True) - ctx.exit(1) - - except Exception as e: - click.echo(f"Error during verification: {e}", err=True) - ctx.exit(1) - - -@downloads.command() -@click.argument("output_file", type=click.Path()) -@click.option("--format", "export_format", - type=click.Choice(["json", "csv"]), - default="json", - help="Export format") -@click.pass_context -def export(ctx: click.Context, output_file: str, export_format: str): - """Export download list to a file.""" - - config: AppConfig = ctx.obj - download_manager = get_download_manager(config.downloads) - - try: - records = download_manager.list_downloads() - output_path = Path(output_file) - - if export_format == "json": - export_data = [] - for record in records: - export_data.append({ - "media_id": record.media_item.id, - "title": record.display_title, - "status": record.status, - "episodes": { - str(ep_num): { - "episode_number": ep.episode_number, - "file_path": str(ep.file_path), - "file_size": ep.file_size, - "quality": ep.quality, - "status": ep.status, - "download_date": ep.download_date.isoformat() - } - for ep_num, ep in record.episodes.items() - }, - "download_path": str(record.download_path), - "created_date": record.created_date.isoformat(), - "last_updated": record.last_updated.isoformat() - }) - - with open(output_path, 'w', encoding='utf-8') as f: - json.dump(export_data, f, indent=2, ensure_ascii=False) - - elif export_format == "csv": - import csv - - with open(output_path, 'w', newline='', encoding='utf-8') as f: - writer = csv.writer(f) - - # Write header - writer.writerow([ - "Media ID", "Title", "Status", "Episodes Downloaded", - "Total Episodes", "Total Size (GB)", "Last Updated" - ]) - - # Write data - for record in records: - writer.writerow([ - record.media_item.id, - record.display_title, - record.status, - record.total_episodes_downloaded, - record.media_item.episodes or 0, - f"{record.total_size_gb:.2f}", - record.last_updated.strftime("%Y-%m-%d %H:%M:%S") - ]) - - click.echo(f"Exported {len(records)} download records to {output_path}") - - except Exception as e: - click.echo(f"Error exporting downloads: {e}", err=True) - ctx.exit(1) diff --git a/fastanime/cli/commands/cache.py b/fastanime/cli/commands/cache.py deleted file mode 100644 index d1e31e2..0000000 --- a/fastanime/cli/commands/cache.py +++ /dev/null @@ -1,56 +0,0 @@ -import click - - -@click.command( - help="Helper command to manage cache", - epilog=""" -\b -\b\bExamples: - # delete everything in the cache dir - fastanime cache --clean -\b - # print the path to the cache dir and exit - fastanime cache --path -\b - # print the current size of the cache dir and exit - fastanime cache --size -\b - # open the cache dir and exit - fastanime cache -""", -) -@click.option("--clean", help="Clean the cache dir", is_flag=True) -@click.option("--path", help="The path to the cache dir", is_flag=True) -@click.option("--size", help="The size of the cache dir", is_flag=True) -def cache(clean, path, size): - from ...constants import APP_CACHE_DIR - - if path: - print(APP_CACHE_DIR) - elif clean: - import shutil - - from rich.prompt import Confirm - - if Confirm.ask( - f"Are you sure you want to clean the following path: {APP_CACHE_DIR};(NOTE: !!The action is irreversible and will clean your cache!!)", - default=False, - ): - print("Cleaning...") - shutil.rmtree(APP_CACHE_DIR) - print("Successfully removed: ", APP_CACHE_DIR) - elif size: - import os - - from ..utils.utils import format_bytes_to_human - - total_size = 0 - for dirpath, dirnames, filenames in os.walk(APP_CACHE_DIR): - for f in filenames: - fp = os.path.join(dirpath, f) - total_size += os.path.getsize(fp) - print("Total Size: ", format_bytes_to_human(total_size)) - else: - import click - - click.launch(APP_CACHE_DIR) diff --git a/fastanime/cli/commands/grab.py b/fastanime/cli/commands/grab.py deleted file mode 100644 index 6aee380..0000000 --- a/fastanime/cli/commands/grab.py +++ /dev/null @@ -1,239 +0,0 @@ -from typing import TYPE_CHECKING - -import click - -from ..utils.completions import anime_titles_shell_complete - -if TYPE_CHECKING: - from ..config import Config - - -@click.command( - help="Helper command to get streams for anime to use externally in a non-python application", - short_help="Print anime streams to standard out", - epilog=""" -\b -\b\bExamples: - # --- print anime info + episode streams --- -\b - # multiple titles can be specified with the -t option - fastanime grab -t -t - # -- or -- - # print all available episodes - fastanime grab -t -r ':' -\b - # print the latest episode - fastanime grab -t -r '-1' -\b - # print a specific episode range - # be sure to observe the range Syntax - fastanime grab -t -r ':' -\b - fastanime grab -t -r '::' -\b - fastanime grab -t -r ':' -\b - fastanime grab -t -r ':' -\b - # --- grab options --- -\b - # print search results only - fastanime grab -t -r --search-results-only -\b - # print anime info only - fastanime grab -t -r --anime-info-only -\b - # print episode streams only - fastanime grab -t -r --episode-streams-only -""", -) -@click.option( - "--anime-titles", - "--anime_title", - "-t", - required=True, - shell_complete=anime_titles_shell_complete, - multiple=True, - help="Specify which anime to download", -) -@click.option( - "--episode-range", - "-r", - help="A range of episodes to download (start-end)", -) -@click.option( - "--search-results-only", - "-s", - help="print only the search results to stdout", - is_flag=True, -) -@click.option( - "--anime-info-only", "-i", help="print only selected anime title info", is_flag=True -) -@click.option( - "--episode-streams-only", - "-e", - help="print only selected anime episodes streams of given range", - is_flag=True, -) -@click.pass_obj -def grab( - config: "Config", - anime_titles: tuple, - episode_range, - search_results_only, - anime_info_only, - episode_streams_only, -): - import json - from logging import getLogger - from sys import exit - - from thefuzz import fuzz - - logger = getLogger(__name__) - if config.manga: - manga_title = anime_titles[0] - from ...MangaProvider import MangaProvider - - manga_provider = MangaProvider() - search_data = manga_provider.search_for_manga(manga_title) - if not search_data: - exit(1) - if search_results_only: - print(json.dumps(search_data)) - exit(0) - search_results = search_data["results"] - if not search_results: - logger.error("no results for your search") - exit(1) - search_results_ = { - search_result["title"]: search_result for search_result in search_results - } - - search_result_anime_title = max( - search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_titles[0]) - ) - manga_info = manga_provider.get_manga( - search_results_[search_result_anime_title]["id"] - ) - if not manga_info: - return - if anime_info_only: - print(json.dumps(manga_info)) - exit(0) - - chapter_info = manga_provider.get_chapter_thumbnails( - manga_info["id"], str(episode_range) - ) - if not chapter_info: - exit(1) - print(json.dumps(chapter_info)) - - else: - from ...BaseAnimeProvider import BaseAnimeProvider - - anime_provider = BaseAnimeProvider(config.provider) - - grabbed_animes = [] - for anime_title in anime_titles: - # ---- search for anime ---- - search_results = anime_provider.search_for_anime( - anime_title, translation_type=config.translation_type - ) - if not search_results: - exit(1) - if search_results_only: - # grab only search results skipping all lines after this - grabbed_animes.append(search_results) - continue - - search_results = search_results["results"] - if not search_results: - logger.error("no results for your search") - exit(1) - search_results_ = { - search_result["title"]: search_result - for search_result in search_results - } - - search_result_anime_title = max( - search_results_.keys(), key=lambda title: fuzz.ratio(title, anime_title) - ) - - # ---- fetch anime ---- - anime = anime_provider.get_anime( - search_results_[search_result_anime_title]["id"] - ) - if not anime: - exit(1) - if anime_info_only: - # grab only the anime data skipping all lines after this - grabbed_animes.append(anime) - continue - episodes = sorted( - anime["availableEpisodesDetail"][config.translation_type], key=float - ) - - # where the magic happens - if episode_range: - if ":" in episode_range: - ep_range_tuple = episode_range.split(":") - if len(ep_range_tuple) == 2 and all(ep_range_tuple): - episodes_start, episodes_end = ep_range_tuple - episodes_range = episodes[ - int(episodes_start) : int(episodes_end) - ] - elif len(ep_range_tuple) == 3 and all(ep_range_tuple): - episodes_start, episodes_end, step = ep_range_tuple - episodes_range = episodes[ - int(episodes_start) : int(episodes_end) : int(step) - ] - else: - episodes_start, episodes_end = ep_range_tuple - if episodes_start.strip(): - episodes_range = episodes[int(episodes_start) :] - elif episodes_end.strip(): - episodes_range = episodes[: int(episodes_end)] - else: - episodes_range = episodes - else: - episodes_range = episodes[int(episode_range) :] - - else: - episodes_range = sorted(episodes, key=float) - - if not episode_streams_only: - grabbed_anime = dict(anime) - grabbed_anime["requested_episodes"] = episodes_range - grabbed_anime["translation_type"] = config.translation_type - grabbed_anime["episodes_streams"] = {} - else: - grabbed_anime = {} - - # lets download em - for episode in episodes_range: - if episode not in episodes: - continue - streams = anime_provider.get_episode_streams( - anime["id"], episode, config.translation_type - ) - if not streams: - continue - episode_streams = {server["server"]: server for server in streams} - - if episode_streams_only: - grabbed_anime[episode] = episode_streams - else: - grabbed_anime["episodes_streams"][ # pyright:ignore - episode - ] = episode_streams - - # grab the full data for single title and appen to final result or episode streams - grabbed_animes.append(grabbed_anime) - - # print out the final result either {} or [] depending if more than one title os requested - if len(grabbed_animes) == 1: - print(json.dumps(grabbed_animes[0])) - else: - print(json.dumps(grabbed_animes)) diff --git a/fastanime/cli/commands/helpers.py b/fastanime/cli/commands/helpers.py deleted file mode 100644 index 5424f1d..0000000 --- a/fastanime/cli/commands/helpers.py +++ /dev/null @@ -1,38 +0,0 @@ -import click - -from ...core.config import AppConfig -from ...libs.api.factory import create_api_client -from ...libs.api.params import ApiSearchParams - - -@click.group(hidden=True) -def helpers_cmd(): - """A hidden group for helper commands called by shell scripts.""" - pass - - -@helpers_cmd.command("search-as-you-type") -@click.argument("query", required=False, default="") -@click.pass_obj -def search_as_you_type(config: AppConfig, query: str): - """ - Performs a live search on AniList and prints results formatted for fzf. - Called by an fzf `reload` binding. - """ - if not query or len(query) < 3: - # Don't search for very short queries to avoid spamming the API - return - - api_client = create_api_client(config.general.media_api, config) - search_params = ApiSearchParams(query=query, per_page=25) - results = api_client.search_media(search_params) - - if not results or not results.media: - return - - # Format output for fzf: one line per item. - for item in results.media: - title = item.title.english or item.title.romaji or "Unknown Title" - score = f"{item.average_score / 10 if item.average_score else 'N/A'}" - # Use a unique, parsable format. The title must come last for the preview helper. - click.echo(f"{item.id} | Score: {score} | {title}") diff --git a/fastanime/cli/commands/update.py b/fastanime/cli/commands/update.py deleted file mode 100644 index 8b13bb2..0000000 --- a/fastanime/cli/commands/update.py +++ /dev/null @@ -1,55 +0,0 @@ -import click - - -@click.command( - help="Helper command to update fastanime to latest", - epilog=""" -\b -\b\bExamples: - # update fastanime to latest - fastanime update -\b - # check for latest release - fastanime update --check - - # Force an update regardless of the current version - fastanime update --force -""", -) -@click.option("--check", "-c", help="Check for the latest release", is_flag=True) -@click.option("--force", "-c", help="Force update", is_flag=True) -def update(check, force): - from rich.console import Console - from rich.markdown import Markdown - - from ... import __version__ - from ..app_updater import check_for_updates, update_app - - def _print_release(release_data): - console = Console() - body = Markdown(release_data["body"]) - tag = github_release_data["tag_name"] - tag_title = release_data["name"] - github_page_url = release_data["html_url"] - console.print(f"Release Page: {github_page_url}") - console.print(f"Tag: {tag}") - console.print(f"Title: {tag_title}") - console.print(body) - - if check: - is_latest, github_release_data = check_for_updates() - if not is_latest: - print( - f"You are running an older version ({__version__}) of fastanime please update to get the latest features" - ) - _print_release(github_release_data) - else: - print(f"You are running the latest version ({__version__}) of fastanime") - _print_release(github_release_data) - else: - success, github_release_data = update_app(force) - _print_release(github_release_data) - if success: - print("Successfully updated") - else: - print("failed to update") diff --git a/fastanime/libs/providers/anime/hianime/__init__.py b/fastanime/libs/providers/anime/hianime/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/fastanime/libs/providers/anime/hianime/constants.py b/fastanime/libs/providers/anime/hianime/constants.py deleted file mode 100644 index 17706e7..0000000 --- a/fastanime/libs/providers/anime/hianime/constants.py +++ /dev/null @@ -1,26 +0,0 @@ -SERVERS_AVAILABLE = ["HD1", "HD2", "StreamSB", "StreamTape"] -"""" - | "hd-1" - | "hd-2" - | "megacloud" - | "streamsb" - | "streamtape"; - -""" - - -""" - VidStreaming = "hd-1", - MegaCloud = "megacloud", - StreamSB = "streamsb", - StreamTape = "streamtape", - VidCloud = "hd-2", - AsianLoad = "asianload", - GogoCDN = "gogocdn", - MixDrop = "mixdrop", - UpCloud = "upcloud", - VizCloud = "vizcloud", - MyCloud = "mycloud", - Filemoon = "filemoon", - -""" diff --git a/fastanime/libs/providers/anime/hianime/extractors.py b/fastanime/libs/providers/anime/hianime/extractors.py deleted file mode 100644 index fa118f7..0000000 --- a/fastanime/libs/providers/anime/hianime/extractors.py +++ /dev/null @@ -1,191 +0,0 @@ -import hashlib -import json -import re -import time -from base64 import b64decode -from typing import TYPE_CHECKING - -from Crypto.Cipher import AES - -if TYPE_CHECKING: - from ...common.requests_cacher import CachedRequestsSession - - -# Constants -megacloud = { - "script": "https://megacloud.tv/js/player/a/prod/e1-player.min.js?v=", - "sources": "https://megacloud.tv/embed-2/ajax/e-1/getSources?id=", -} - - -class HiAnimeError(Exception): - def __init__(self, message, context, status_code): - super().__init__(f"{context}: {message} (Status: {status_code})") - self.context = context - self.status_code = status_code - - -# Adapted from https://github.com/ghoshRitesh12/aniwatch -class MegaCloud: - def __init__(self, session): - self.session: CachedRequestsSession = session - - def extract(self, video_url: str) -> dict: - try: - extracted_data = { - "tracks": [], - "intro": {"start": 0, "end": 0}, - "outro": {"start": 0, "end": 0}, - "sources": [], - } - - video_id = video_url.split("/")[-1].split("?")[0] - response = self.session.get( - megacloud["sources"] + video_id, - headers={ - "Accept": "*/*", - "X-Requested-With": "XMLHttpRequest", - "Referer": video_url, - }, - fresh=1, # pyright: ignore - ) - srcs_data = response.json() - - if not srcs_data: - raise HiAnimeError( - "Url may have an invalid video id", "getAnimeEpisodeSources", 400 - ) - - encrypted_string = srcs_data["sources"] - if not srcs_data["encrypted"] and isinstance(encrypted_string, list): - extracted_data.update( - { - "intro": srcs_data["intro"], - "outro": srcs_data["outro"], - "tracks": srcs_data["tracks"], - "sources": [ - {"url": s["file"], "type": s["type"]} - for s in encrypted_string - ], - } - ) - return extracted_data - - # Fetch decryption script - script_response = self.session.get( - megacloud["script"] + str(int(time.time() * 1000)), - fresh=1, # pyright: ignore - ) - script_text = script_response.text - if not script_text: - raise HiAnimeError( - "Couldn't fetch script to decrypt resource", - "getAnimeEpisodeSources", - 500, - ) - - vars_ = self.extract_variables(script_text) - if not vars_: - raise Exception( - "Can't find variables. Perhaps the extractor is outdated." - ) - - secret, encrypted_source = self.get_secret(encrypted_string, vars_) - decrypted = self.decrypt(encrypted_source, secret) - - try: - sources = json.loads(decrypted) - extracted_data.update( - { - "intro": srcs_data["intro"], - "outro": srcs_data["outro"], - "tracks": srcs_data["tracks"], - "sources": [ - {"url": s["file"], "type": s["type"]} for s in sources - ], - } - ) - return extracted_data - except Exception: - raise HiAnimeError( - "Failed to decrypt resource", "getAnimeEpisodeSources", 500 - ) - except Exception as err: - raise err - - def extract_variables(self, text: str) -> list[list[int]]: - regex = r"case\s*0x[0-9a-f]+:(?![^;]*=partKey)\s*\w+\s*=\s*(\w+)\s*,\s*\w+\s*=\s*(\w+);" - matches = re.finditer(regex, text) - vars_ = [] - for match in matches: - key1 = self.matching_key(match[1], text) - key2 = self.matching_key(match[2], text) - try: - vars_.append([int(key1, 16), int(key2, 16)]) - except ValueError: - continue - return vars_ - - def get_secret( - self, encrypted_string: str, values: list[list[int]] - ) -> tuple[str, str]: - secret = [] - encrypted_source_array = list(encrypted_string) - current_index = 0 - - for start, length in values: - start += current_index - end = start + length - secret.extend(encrypted_string[start:end]) - encrypted_source_array[start:end] = [""] * length - current_index += length - - encrypted_source = "".join(encrypted_source_array) # .replace("\x00", "") - return ("".join(secret), encrypted_source) - - def decrypt(self, encrypted: str, key_or_secret: str, maybe_iv: str = "") -> str: - if maybe_iv: - key = key_or_secret.encode() - iv = maybe_iv.encode() - contents = encrypted - else: - # Decode the Base64 string - cypher = b64decode(encrypted) - - # Extract the salt from the cypher text - salt = cypher[8:16] - - # Combine the key_or_secret with the salt - password = key_or_secret.encode() + salt - - # Generate MD5 hashes - md5_hashes = [] - digest = password - for _ in range(3): - md5 = hashlib.md5() - md5.update(digest) - md5_hashes.append(md5.digest()) - digest = md5_hashes[-1] + password - - # Derive the key and IV - key = md5_hashes[0] + md5_hashes[1] - iv = md5_hashes[2] - - # Extract the encrypted contents - contents = cypher[16:] - - # Initialize the AES decipher - decipher = AES.new(key, AES.MODE_CBC, iv) - - # Decrypt and decode - decrypted = decipher.decrypt(contents).decode("utf-8") # pyright: ignore - - # Remove any padding (PKCS#7) - pad = ord(decrypted[-1]) - return decrypted[:-pad] - - def matching_key(self, value: str, script: str) -> str: - match = re.search(rf",{value}=((?:0x)?[0-9a-fA-F]+)", script) - if match: - return match.group(1).replace("0x", "") - raise Exception("Failed to match the key") diff --git a/fastanime/libs/providers/anime/hianime/provider.py b/fastanime/libs/providers/anime/hianime/provider.py deleted file mode 100644 index a2ab2d2..0000000 --- a/fastanime/libs/providers/anime/hianime/provider.py +++ /dev/null @@ -1,274 +0,0 @@ -import logging -import re -from html.parser import HTMLParser -from itertools import cycle -from urllib.parse import quote_plus - -from yt_dlp.utils import ( - clean_html, - extract_attributes, - get_element_by_class, - get_element_html_by_class, - get_elements_by_class, - get_elements_html_by_class, -) - -from ..base import BaseAnimeProvider -from ..decorators import debug_provider -from ..utils.utils import give_random_quality -from .constants import SERVERS_AVAILABLE -from .extractors import MegaCloud -from .types import HiAnimeStream - -logger = logging.getLogger(__name__) - -LINK_TO_STREAMS_REGEX = re.compile(r".*://(.*)/embed-(2|4|6)/e-([0-9])/(.*)\?.*") -IMAGE_HTML_ELEMENT_REGEX = re.compile(r"") - - -class ParseAnchorAndImgTag(HTMLParser): - def __init__(self): - super().__init__() - self.img_tag = None - self.a_tag = None - - def handle_starttag(self, tag, attrs): - if tag == "img": - self.img_tag = {attr[0]: attr[1] for attr in attrs} - if tag == "a": - self.a_tag = {attr[0]: attr[1] for attr in attrs} - - -class HiAnime(BaseAnimeProvider): - # HEADERS = {"Referer": "https://hianime.to/home"} - - @debug_provider - def search_for_anime(self, anime_title: str, translation_type, **kwargs): - query = quote_plus(anime_title) - url = f"https://hianime.to/search?keyword={query}" - response = self.session.get(url) - if not response.ok: - return - search_page = response.text - search_results_html_items = get_elements_by_class("flw-item", search_page) - results = [] - for search_results_html_item in search_results_html_items: - film_poster_html = get_element_by_class( - "film-poster", search_results_html_item - ) - - if not film_poster_html: - continue - # get availableEpisodes - episodes_html = get_element_html_by_class("tick-sub", film_poster_html) - episodes = clean_html(episodes_html) or 12 - - # get anime id and poster image url - parser = ParseAnchorAndImgTag() - parser.feed(film_poster_html) - image_data = parser.img_tag - anime_link_data = parser.a_tag - if not image_data or not anime_link_data: - continue - - episodes = int(episodes) - - # finally!! - image_link = image_data["data-src"] - anime_id = anime_link_data["data-id"] - title = anime_link_data["title"] - - result = { - "availableEpisodes": list(range(1, episodes)), - "id": anime_id, - "title": title, - "poster": image_link, - } - - results.append(result) - - self.store.set(result["id"], "search_result", result) - return {"pageInfo": {}, "results": results} - - @debug_provider - def get_anime(self, hianime_id, **kwargs): - anime_result = {} - if d := self.store.get(str(hianime_id), "search_result"): - anime_result = d - anime_url = f"https://hianime.to/ajax/v2/episode/list/{hianime_id}" - response = self.session.get(anime_url, timeout=10) - if response.ok: - response_json = response.json() - hianime_anime_page = response_json["html"] - episodes_info_container_html = get_element_html_by_class( - "ss-list", hianime_anime_page - ) - episodes_info_html_list = get_elements_html_by_class( - "ep-item", episodes_info_container_html - ) - # keys: [ data-number: episode_number, data-id: episode_id, title: episode_title , href:episode_page_url] - episodes_info_dicts = [ - extract_attributes(episode_dict) - for episode_dict in episodes_info_html_list - ] - episodes = [episode["data-number"] for episode in episodes_info_dicts] - episodes_info = [ - { - "id": episode["data-id"], - "title": ( - (episode["title"] or "").replace( - f"Episode {episode['data-number']}", "" - ) - or anime_result["title"] - ) - + f"; Episode {episode['data-number']}", - "episode": episode["data-number"], - } - for episode in episodes_info_dicts - ] - self.store.set( - str(hianime_id), - "anime_info", - episodes_info, - ) - return { - "id": hianime_id, - "availableEpisodesDetail": { - "dub": episodes, - "sub": episodes, - "raw": episodes, - }, - "poster": anime_result["poster"], - "title": anime_result["title"], - "episodes_info": episodes_info, - } - - @debug_provider - def get_episode_streams(self, anime_id, episode, translation_type, **kwargs): - if d := self.store.get(str(anime_id), "anime_info"): - episodes_info = d - episode_details = [ - episode_details - for episode_details in episodes_info - if episode_details["episode"] == episode - ] - if not episode_details: - return - episode_details = episode_details[0] - episode_url = f"https://hianime.to/ajax/v2/episode/servers?episodeId={episode_details['id']}" - response = self.session.get(episode_url) - if response.ok: - response_json = response.json() - episode_page_html = response_json["html"] - servers_containers_html = get_elements_html_by_class( - "ps__-list", episode_page_html - ) - if not servers_containers_html: - return - # sub servers - try: - servers_html_sub = get_elements_html_by_class( - "server-item", servers_containers_html[0] - ) - except Exception: - logger.warning("HiAnime: sub not found") - servers_html_sub = None - - # dub servers - try: - servers_html_dub = get_elements_html_by_class( - "server-item", servers_containers_html[1] - ) - except Exception: - logger.warning("HiAnime: dub not found") - servers_html_dub = None - - if translation_type == "dub": - servers_html = servers_html_dub - else: - servers_html = servers_html_sub - if not servers_html: - return - - @debug_provider - def _get_server(server_name, server_html): - # keys: [ data-type: translation_type, data-id: embed_id, data-server-id: server_id ] - servers_info = extract_attributes(server_html) - server_id = servers_info["data-id"] - embed_url = ( - f"https://hianime.to/ajax/v2/episode/sources?id={server_id}" - ) - embed_response = self.session.get(embed_url) - if embed_response.ok: - embed_json = embed_response.json() - raw_link_to_streams = embed_json["link"] - match server_name: - # TODO: Finish the other servers - case "HD2": - data = MegaCloud(self.session).extract( - raw_link_to_streams - ) - return { - "headers": {}, - "subtitles": [ - { - "url": track["file"], - "language": track["label"], - } - for track in data["tracks"] - if track["kind"] == "captions" - ], - "server": server_name, - "episode_title": episode_details["title"], - "links": give_random_quality( - [ - {"link": link["url"]} - for link in data["sources"] - ] - ), - } - case _: - # NOTE: THIS METHOD DOES'NT WORK will get the other servers later - match = LINK_TO_STREAMS_REGEX.match(raw_link_to_streams) - if not match: - return - provider_domain = match.group(1) - embed_type = match.group(2) - episode_number = match.group(3) - source_id = match.group(4) - - link_to_streams = f"https://{provider_domain}/embed-{embed_type}/ajax/e-{episode_number}/getSources?id={source_id}" - link_to_streams_response = self.session.get( - link_to_streams - ) - if link_to_streams_response.ok: - juicy_streams_json: HiAnimeStream = ( - link_to_streams_response.json() - ) - - return { - "headers": {}, - "subtitles": [ - { - "url": track["file"], - "language": track["label"], - } - for track in juicy_streams_json["tracks"] - if track["kind"] == "captions" - ], - "server": server_name, - "episode_title": episode_details["title"], - "links": give_random_quality( - [ - {"link": link["file"]} - for link in juicy_streams_json["tracks"] - ] - ), - } - - for server_name, server_html in zip( - cycle(SERVERS_AVAILABLE), servers_html - ): - if server_name == "HD2": - if server := _get_server(server_name, server_html): - yield server diff --git a/fastanime/libs/providers/anime/hianime/types.py b/fastanime/libs/providers/anime/hianime/types.py deleted file mode 100644 index 1f0cff1..0000000 --- a/fastanime/libs/providers/anime/hianime/types.py +++ /dev/null @@ -1,26 +0,0 @@ -from typing import Literal, TypedDict - - -class HiAnimeSkipTime(TypedDict): - start: int - end: int - - -class HiAnimeSource(TypedDict): - file: str - type: str - - -class HiAnimeTrack(TypedDict): - file: str - label: str - kind: Literal["captions", "thumbnails", "audio"] - - -class HiAnimeStream(TypedDict): - sources: list[HiAnimeSource] - tracks: list[HiAnimeTrack] - encrypted: bool - intro: HiAnimeSkipTime - outro: HiAnimeSkipTime - server: int diff --git a/fastanime/libs/providers/anime/nyaa/__init__.py b/fastanime/libs/providers/anime/nyaa/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/fastanime/libs/providers/anime/nyaa/constants.py b/fastanime/libs/providers/anime/nyaa/constants.py deleted file mode 100644 index eda1cd3..0000000 --- a/fastanime/libs/providers/anime/nyaa/constants.py +++ /dev/null @@ -1 +0,0 @@ -NYAA_ENDPOINT = "https://nyaa.si" diff --git a/fastanime/libs/providers/anime/nyaa/provider.py b/fastanime/libs/providers/anime/nyaa/provider.py deleted file mode 100644 index 214268f..0000000 --- a/fastanime/libs/providers/anime/nyaa/provider.py +++ /dev/null @@ -1,342 +0,0 @@ -import os -import re -from logging import getLogger - -from yt_dlp.utils import ( - extract_attributes, - get_element_html_by_attribute, - get_element_html_by_class, - get_element_text_and_html_by_tag, - get_elements_html_by_class, -) - -from ...common.mini_anilist import search_for_anime_with_anilist -from ..base import BaseAnimeProvider -from ..decorators import debug_provider -from ..types import SearchResults -from .constants import NYAA_ENDPOINT - -logger = getLogger(__name__) - -EXTRACT_USEFUL_INFO_PATTERN_1 = re.compile( - r"\[(\w+)\] (.+) - (\d+) [\[\(](\d+)p[\]\)].*" -) - -EXTRACT_USEFUL_INFO_PATTERN_2 = re.compile( - r"\[(\w+)\] (.+)E(\d+) [\[\(]?(\d+)p.*[\]\)]?.*" -) - - -class Nyaa(BaseAnimeProvider): - search_results: SearchResults - - @debug_provider - def search_for_anime(self, user_query: str, *args, **_): - self.search_results = search_for_anime_with_anilist(user_query, True) # pyright: ignore - self.user_query = user_query - return self.search_results - - @debug_provider - def get_anime(self, anilist_id: str, *_): - for anime in self.search_results["results"]: - if anime["id"] == anilist_id: - self.titles = [anime["title"], *anime["otherTitles"], self.user_query] - return { - "id": anime["id"], - "title": anime["title"], - "poster": anime["poster"], - "availableEpisodesDetail": { - "dub": anime["availableEpisodes"], - "sub": anime["availableEpisodes"], - "raw": anime["availableEpisodes"], - }, - } - - @debug_provider - def get_episode_streams( - self, - anime_id: str, - episode_number: str, - translation_type: str, - trusted_only=bool(int(os.environ.get("FA_NYAA_TRUSTED_ONLY", "0"))), - allow_dangerous=bool(int(os.environ.get("FA_NYAA_ALLOW_DANGEROUS", "0"))), - sort_by="seeders", - *args, - ): - anime_title = self.titles[0] - logger.debug(f"Searching nyaa for query: '{anime_title} {episode_number}'") - servers = {} - - torrents_table = "" - for title in self.titles: - try: - url_arguments: dict[str, str] = { - "c": "1_2", # Language (English) - "q": f"{title} {'0' if len(episode_number) == 1 else ''}{episode_number}", # Search Query - } - # url_arguments["q"] = anime_title - - # if trusted_only: - # url_arguments["f"] = "2" # Trusted uploaders only - - # What to sort torrents by - if sort_by == "seeders": - url_arguments["s"] = "seeders" - elif sort_by == "date": - url_arguments["s"] = "id" - elif sort_by == "size": - url_arguments["s"] = "size" - elif sort_by == "comments": - url_arguments["s"] = "comments" - - logger.debug(f"URL Arguments: {url_arguments}") - - response = self.session.get(NYAA_ENDPOINT, params=url_arguments) - if not response.ok: - logger.error(f"[NYAA]: {response.text}") - return - - try: - torrents_table = get_element_text_and_html_by_tag( - "table", response.text - ) - except Exception as e: - logger.error(f"[NYAA]: {e}") - continue - - if not torrents_table: - continue - - for anime_torrent in get_elements_html_by_class( - "success", torrents_table[1] - ): - td_title = get_element_html_by_attribute( - "colspan", "2", anime_torrent - ) - if not td_title: - continue - title_anchor_tag = get_element_text_and_html_by_tag("a", td_title) - - if not title_anchor_tag: - continue - title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1]) - if not title_anchor_tag_attrs: - continue - if "class" in title_anchor_tag_attrs: - td_title = td_title.replace(title_anchor_tag[1], "") - title_anchor_tag = get_element_text_and_html_by_tag( - "a", td_title - ) - - if not title_anchor_tag: - continue - title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1]) - if not title_anchor_tag_attrs: - continue - anime_title_info = title_anchor_tag_attrs["title"] - if not anime_title_info: - continue - match = EXTRACT_USEFUL_INFO_PATTERN_1.search( - anime_title_info.strip() - ) - if not match: - continue - server = match[1] - match[2] - _episode_number = match[3] - quality = match[4] - if float(episode_number) != float(_episode_number): - continue - - links_td = get_element_html_by_class("text-center", anime_torrent) - if not links_td: - continue - torrent_anchor_tag = get_element_text_and_html_by_tag("a", links_td) - if not torrent_anchor_tag: - continue - torrent_anchor_tag_atrrs = extract_attributes(torrent_anchor_tag[1]) - if not torrent_anchor_tag_atrrs: - continue - torrent_file_url = ( - f"{NYAA_ENDPOINT}{torrent_anchor_tag_atrrs['href']}" - ) - if server in servers: - link = { - "translation_type": "sub", - "link": torrent_file_url, - "quality": quality, - } - if link not in servers[server]["links"]: - servers[server]["links"].append(link) - else: - servers[server] = { - "server": server, - "headers": {}, - "episode_title": f"{anime_title}; Episode {episode_number}", - "subtitles": [], - "links": [ - { - "translation_type": "sub", - "link": torrent_file_url, - "quality": quality, - } - ], - } - for anime_torrent in get_elements_html_by_class( - "default", torrents_table[1] - ): - td_title = get_element_html_by_attribute( - "colspan", "2", anime_torrent - ) - if not td_title: - continue - title_anchor_tag = get_element_text_and_html_by_tag("a", td_title) - - if not title_anchor_tag: - continue - title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1]) - if not title_anchor_tag_attrs: - continue - if "class" in title_anchor_tag_attrs: - td_title = td_title.replace(title_anchor_tag[1], "") - title_anchor_tag = get_element_text_and_html_by_tag( - "a", td_title - ) - - if not title_anchor_tag: - continue - title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1]) - if not title_anchor_tag_attrs: - continue - anime_title_info = title_anchor_tag_attrs["title"] - if not anime_title_info: - continue - match = EXTRACT_USEFUL_INFO_PATTERN_2.search( - anime_title_info.strip() - ) - if not match: - continue - server = match[1] - match[2] - _episode_number = match[3] - quality = match[4] - if float(episode_number) != float(_episode_number): - continue - - links_td = get_element_html_by_class("text-center", anime_torrent) - if not links_td: - continue - torrent_anchor_tag = get_element_text_and_html_by_tag("a", links_td) - if not torrent_anchor_tag: - continue - torrent_anchor_tag_atrrs = extract_attributes(torrent_anchor_tag[1]) - if not torrent_anchor_tag_atrrs: - continue - torrent_file_url = ( - f"{NYAA_ENDPOINT}{torrent_anchor_tag_atrrs['href']}" - ) - if server in servers: - link = { - "translation_type": "sub", - "link": torrent_file_url, - "quality": quality, - } - if link not in servers[server]["links"]: - servers[server]["links"].append(link) - else: - servers[server] = { - "server": server, - "headers": {}, - "episode_title": f"{anime_title}; Episode {episode_number}", - "subtitles": [], - "links": [ - { - "translation_type": "sub", - "link": torrent_file_url, - "quality": quality, - } - ], - } - if not allow_dangerous: - break - for anime_torrent in get_elements_html_by_class( - "danger", torrents_table[1] - ): - td_title = get_element_html_by_attribute( - "colspan", "2", anime_torrent - ) - if not td_title: - continue - title_anchor_tag = get_element_text_and_html_by_tag("a", td_title) - - if not title_anchor_tag: - continue - title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1]) - if not title_anchor_tag_attrs: - continue - if "class" in title_anchor_tag_attrs: - td_title = td_title.replace(title_anchor_tag[1], "") - title_anchor_tag = get_element_text_and_html_by_tag( - "a", td_title - ) - - if not title_anchor_tag: - continue - title_anchor_tag_attrs = extract_attributes(title_anchor_tag[1]) - if not title_anchor_tag_attrs: - continue - anime_title_info = title_anchor_tag_attrs["title"] - if not anime_title_info: - continue - match = EXTRACT_USEFUL_INFO_PATTERN_2.search( - anime_title_info.strip() - ) - if not match: - continue - server = match[1] - match[2] - _episode_number = match[3] - quality = match[4] - if float(episode_number) != float(_episode_number): - continue - - links_td = get_element_html_by_class("text-center", anime_torrent) - if not links_td: - continue - torrent_anchor_tag = get_element_text_and_html_by_tag("a", links_td) - if not torrent_anchor_tag: - continue - torrent_anchor_tag_atrrs = extract_attributes(torrent_anchor_tag[1]) - if not torrent_anchor_tag_atrrs: - continue - torrent_file_url = ( - f"{NYAA_ENDPOINT}{torrent_anchor_tag_atrrs['href']}" - ) - if server in servers: - link = { - "translation_type": "sub", - "link": torrent_file_url, - "quality": quality, - } - if link not in servers[server]["links"]: - servers[server]["links"].append(link) - else: - servers[server] = { - "server": server, - "headers": {}, - "episode_title": f"{anime_title}; Episode {episode_number}", - "subtitles": [], - "links": [ - { - "translation_type": "sub", - "link": torrent_file_url, - "quality": quality, - } - ], - } - except Exception as e: - logger.error(f"[NYAA]: {e}") - continue - - for server in servers: - yield servers[server] diff --git a/fastanime/libs/providers/anime/nyaa/utils.py b/fastanime/libs/providers/anime/nyaa/utils.py deleted file mode 100644 index aab47c7..0000000 --- a/fastanime/libs/providers/anime/nyaa/utils.py +++ /dev/null @@ -1,126 +0,0 @@ -import logging -import os -import sys -import time - -import libtorrent # pyright: ignore -from rich import print -from rich.progress import ( - BarColumn, - DownloadColumn, - Progress, - TextColumn, - TimeRemainingColumn, - TransferSpeedColumn, -) - -logger = logging.getLogger("nyaa") - - -def download_torrent( - filename: str, - result_filename: str | None = None, - show_progress: bool = True, - base_path: str = "Anime", -) -> str: - session = libtorrent.session({"listen_interfaces": "0.0.0.0:6881"}) - logger.debug("Started libtorrent session") - - base_path = os.path.expanduser(base_path) - logger.debug(f"Downloading output to: '{base_path}'") - - info = libtorrent.torrent_info(filename) - - logger.debug("Started downloading torrent") - handle: libtorrent.torrent_handle = session.add_torrent( - {"ti": info, "save_path": base_path} - ) - - status: libtorrent.session_status = handle.status() - - progress_bar = Progress( - "[progress.description]{task.description}", - BarColumn(bar_width=None), - "[progress.percentage]{task.percentage:>3.1f}%", - "•", - DownloadColumn(), - "•", - TransferSpeedColumn(), - "•", - TimeRemainingColumn(), - "•", - TextColumn("[green]Peers: {task.fields[peers]}[/green]"), - ) - - if show_progress: - with progress_bar: - download_task = progress_bar.add_task( - "downloading", - filename=status.name, - total=status.total_wanted, - peers=0, - start=False, - ) - - while not status.total_done: - # Checking files - status = handle.status() - description = "[bold yellow]Checking files[/bold yellow]" - progress_bar.update( - download_task, - completed=status.total_done, - peers=status.num_peers, - description=description, - ) - - # Started download - progress_bar.start_task(download_task) - description = f"[bold blue]Downloading[/bold blue] [bold yellow]{result_filename}[/bold yellow]" - - while not status.is_seeding: - status = handle.status() - - progress_bar.update( - download_task, - completed=status.total_done, - peers=status.num_peers, - description=description, - ) - - alerts = session.pop_alerts() - - alert: libtorrent.alert - for alert in alerts: - if ( - alert.category() - & libtorrent.alert.category_t.error_notification - ): - logger.debug(f"[Alert] {alert}") - - time.sleep(1) - - progress_bar.update( - download_task, - description=f"[bold blue]Finished Downloading[/bold blue] [bold green]{result_filename}[/bold green]", - completed=status.total_wanted, - ) - - if result_filename: - old_name = f"{base_path}/{status.name}" - new_name = f"{base_path}/{result_filename}" - - os.rename(old_name, new_name) - - logger.debug(f"Finished torrent download, renamed '{old_name}' to '{new_name}'") - - return new_name - - return "" - - -if __name__ == "__main__": - if len(sys.argv) < 2: - print("You need to pass in the .torrent file path.") - sys.exit(1) - - download_torrent(sys.argv[1]) diff --git a/fastanime/libs/providers/anime/provider.py b/fastanime/libs/providers/anime/provider.py index bbbb1c9..7689bf2 100644 --- a/fastanime/libs/providers/anime/provider.py +++ b/fastanime/libs/providers/anime/provider.py @@ -7,7 +7,6 @@ from yt_dlp.utils.networking import random_user_agent from .allanime.constants import SERVERS_AVAILABLE as ALLANIME_SERVERS from .animepahe.constants import SERVERS_AVAILABLE as ANIMEPAHE_SERVERS from .base import BaseAnimeProvider -from .hianime.constants import SERVERS_AVAILABLE as HIANIME_SERVERS logger = logging.getLogger(__name__) @@ -18,7 +17,7 @@ PROVIDERS_AVAILABLE = { "nyaa": "provider.Nyaa", "yugen": "provider.Yugen", } -SERVERS_AVAILABLE = ["TOP", *ALLANIME_SERVERS, *ANIMEPAHE_SERVERS, *HIANIME_SERVERS] +SERVERS_AVAILABLE = ["TOP", *ALLANIME_SERVERS, *ANIMEPAHE_SERVERS] class AnimeProviderFactory: diff --git a/fastanime/libs/providers/anime/utils/common.py b/fastanime/libs/providers/anime/utils/common.py deleted file mode 100644 index 8ff6b57..0000000 --- a/fastanime/libs/providers/anime/utils/common.py +++ /dev/null @@ -1,15 +0,0 @@ -import logging - -from requests import get - -logger = logging.getLogger(__name__) - - -def fetch_anime_info_from_bal(anilist_id): - try: - url = f"https://raw.githubusercontent.com/bal-mackup/mal-backup/master/anilist/anime/{anilist_id}.json" - response = get(url, timeout=11) - if response.status_code == 200: - return response.json() - except Exception as e: - logger.error(e) diff --git a/fastanime/libs/providers/anime/utils/data.py b/fastanime/libs/providers/anime/utils/data.py deleted file mode 100644 index bfe2a85..0000000 --- a/fastanime/libs/providers/anime/utils/data.py +++ /dev/null @@ -1,33 +0,0 @@ -""" -Just contains some useful data used across the codebase -""" - -# useful incases where the anilist title is too different from the provider title -anime_normalizer_raw = { - "allanime": { - "1P": "one piece", - "Magia Record: Mahou Shoujo Madoka☆Magica Gaiden (TV)": "Mahou Shoujo Madoka☆Magica", - "Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka", - 'Hazurewaku no "Joutai Ijou Skill" de Saikyou ni Natta Ore ga Subete wo Juurin suru made': "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made", - "Re:Zero kara Hajimeru Isekai Seikatsu Season 3": "Re:Zero kara Hajimeru Isekai Seikatsu 3rd Season", - }, - "hianime": {"My Star": "Oshi no Ko"}, - "animepahe": { - "Azumanga Daiou The Animation": "Azumanga Daioh", - "Mairimashita! Iruma-kun 2nd Season": "Mairimashita! Iruma-kun 2", - "Mairimashita! Iruma-kun 3rd Season": "Mairimashita! Iruma-kun 3", - }, - "nyaa": {}, - "yugen": {}, -} - - -def get_anime_normalizer(): - """Used because there are different providers""" - import os - - current_provider = os.environ.get("FASTANIME_PROVIDER", "allanime") - return anime_normalizer_raw[current_provider] - - -anime_normalizer = get_anime_normalizer() diff --git a/fastanime/libs/providers/anime/utils/store.py b/fastanime/libs/providers/anime/utils/store.py deleted file mode 100644 index 1936973..0000000 --- a/fastanime/libs/providers/anime/utils/store.py +++ /dev/null @@ -1,114 +0,0 @@ -import json -import logging -import time - -logger = logging.getLogger(__name__) - - -class ProviderStoreDB: - def __init__( - self, - provider_name, - cache_db_path: str, - max_lifetime: int = 604800, - max_size: int = (1024**2) * 10, - table_name: str = "fastanime_providers_store", - clean_db=False, - ): - from ..common.sqlitedb_helper import SqliteDB - - self.cache_db_path = cache_db_path - self.clean_db = clean_db - self.provider_name = provider_name - self.max_lifetime = max_lifetime - self.max_size = max_size - self.table_name = table_name - self.sqlite_db_connection = SqliteDB(self.cache_db_path) - - # Prepare the cache table if it doesn't exist - self._create_store_table() - - def _create_store_table(self): - """Create cache table if it doesn't exist.""" - with self.sqlite_db_connection as conn: - conn.execute( - f""" - CREATE TABLE IF NOT EXISTS {self.table_name} ( - id TEXT, - data_type TEXT, - provider_name TEXT, - data TEXT, - cache_expiry INTEGER - )""" - ) - - def get(self, id: str, data_type: str, default=None): - with self.sqlite_db_connection as conn: - cursor = conn.cursor() - cursor.execute( - f""" - SELECT - data - FROM {self.table_name} - WHERE - id = ? - AND data_type = ? - AND provider_name = ? - AND cache_expiry > ? - """, - (id, data_type, self.provider_name, int(time.time())), - ) - cached_data = cursor.fetchone() - - if cached_data: - logger.debug("Found existing request in cache") - (json_data,) = cached_data - return json.loads(json_data) - return default - - def set(self, id: str, data_type: str, data): - with self.sqlite_db_connection as connection: - cursor = connection.cursor() - cursor.execute( - f""" - INSERT INTO {self.table_name} - VALUES ( ?, ?,?, ?, ?) - """, - ( - id, - data_type, - self.provider_name, - json.dumps(data), - int(time.time()) + self.max_lifetime, - ), - ) - - -class ProviderStoreMem: - def __init__(self) -> None: - from collections import defaultdict - - self._store = defaultdict(dict) - - def get(self, id: str, data_type: str, default=None): - return self._store[id][data_type] - - def set(self, id: str, data_type: str, data): - self._store[id][data_type] = data - - -def ProviderStore(store_type, *args, **kwargs): - if store_type == "persistent": - return ProviderStoreDB(*args, **kwargs) - else: - return ProviderStoreMem() - - -if __name__ == "__main__": - store = ProviderStore("persistent", "test_provider", "provider_store") - store.set("123", "test", {"hello": "world"}) - print(store.get("123", "test")) - print("-------------------------------") - store = ProviderStore("memory") - store.set("1", "test", {"hello": "world"}) - print(store.get("1", "test")) diff --git a/fastanime/libs/providers/anime/utils/utils.py b/fastanime/libs/providers/anime/utils/utils.py deleted file mode 100644 index 3dee3fc..0000000 --- a/fastanime/libs/providers/anime/utils/utils.py +++ /dev/null @@ -1,70 +0,0 @@ -import re -from itertools import cycle - -# Dictionary to map hex values to characters -hex_to_char = { - "01": "9", - "08": "0", - "05": "=", - "0a": "2", - "0b": "3", - "0c": "4", - "07": "?", - "00": "8", - "5c": "d", - "0f": "7", - "5e": "f", - "17": "/", - "54": "l", - "09": "1", - "48": "p", - "4f": "w", - "0e": "6", - "5b": "c", - "5d": "e", - "0d": "5", - "53": "k", - "1e": "&", - "5a": "b", - "59": "a", - "4a": "r", - "4c": "t", - "4e": "v", - "57": "o", - "51": "i", -} - - -def give_random_quality(links): - qualities = cycle(["1080", "720", "480", "360"]) - - return [ - {**episode_stream, "quality": quality} - for episode_stream, quality in zip(links, qualities, strict=False) - ] - - -def one_digit_symmetric_xor(password: int, target: str): - def genexp(): - for segment in bytearray.fromhex(target): - yield segment ^ password - - return bytes(genexp()).decode("utf-8") - - -def decode_hex_string(hex_string): - """some of the sources encrypt the urls into hex codes this function decrypts the urls - - Args: - hex_string ([TODO:parameter]): [TODO:description] - - Returns: - [TODO:return] - """ - # Split the hex string into pairs of characters - hex_pairs = re.findall("..", hex_string) - - # Decode each hex pair - decoded_chars = [hex_to_char.get(pair.lower(), pair) for pair in hex_pairs] - - return "".join(decoded_chars) diff --git a/fastanime/libs/providers/anime/utils/utils_1.py b/fastanime/libs/providers/anime/utils/utils_1.py deleted file mode 100644 index 379551a..0000000 --- a/fastanime/libs/providers/anime/utils/utils_1.py +++ /dev/null @@ -1,48 +0,0 @@ -import logging -from typing import TYPE_CHECKING - -from thefuzz import fuzz - -from .data import anime_normalizer - -if TYPE_CHECKING: - from ..libs.anilist.types import AnilistBaseMediaDataSchema - -logger = logging.getLogger(__name__) - - -def sort_by_episode_number(filename: str): - import re - - match = re.search(r"\d+", filename) - return int(match.group()) if match else 0 - - -def anime_title_percentage_match( - possible_user_requested_anime_title: str, anime: "AnilistBaseMediaDataSchema" -) -> float: - """Returns the percentage match between the possible title and user title - - Args: - possible_user_requested_anime_title (str): an Animdl search result title - title (str): the anime title the user wants - - Returns: - int: the percentage match - """ - possible_user_requested_anime_title = anime_normalizer.get( - possible_user_requested_anime_title, possible_user_requested_anime_title - ) - # compares both the romaji and english names and gets highest Score - title_a = str(anime["title"]["romaji"]) - title_b = str(anime["title"]["english"]) - percentage_ratio = max( - *[ - fuzz.ratio(title.lower(), possible_user_requested_anime_title.lower()) - for title in anime["synonyms"] - ], - fuzz.ratio(title_a.lower(), possible_user_requested_anime_title.lower()), - fuzz.ratio(title_b.lower(), possible_user_requested_anime_title.lower()), - ) - logger.info(f"{locals()}") - return percentage_ratio diff --git a/fastanime/libs/providers/anime/yugen/__init__.py b/fastanime/libs/providers/anime/yugen/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/fastanime/libs/providers/anime/yugen/constants.py b/fastanime/libs/providers/anime/yugen/constants.py deleted file mode 100644 index f286fc9..0000000 --- a/fastanime/libs/providers/anime/yugen/constants.py +++ /dev/null @@ -1,4 +0,0 @@ -YUGEN_ENDPOINT: str = "https://yugenanime.tv" - -SEARCH_URL = YUGEN_ENDPOINT + "/api/discover/" -SERVERS_AVAILABLE = ["gogoanime"] diff --git a/fastanime/libs/providers/anime/yugen/provider.py b/fastanime/libs/providers/anime/yugen/provider.py deleted file mode 100644 index 621f095..0000000 --- a/fastanime/libs/providers/anime/yugen/provider.py +++ /dev/null @@ -1,223 +0,0 @@ -import base64 -import re -from itertools import cycle - -from yt_dlp.utils import ( - extract_attributes, - get_element_by_attribute, - get_element_text_and_html_by_tag, - get_elements_text_and_html_by_attribute, -) -from yt_dlp.utils.traversal import get_element_html_by_attribute - -from ..base import BaseAnimeProvider -from ..decorators import debug_provider -from .constants import SEARCH_URL, YUGEN_ENDPOINT - - -# ** Adapted from anipy-cli ** -class Yugen(BaseAnimeProvider): - """ - Provides a fast and effective interface to YugenApi site. - """ - - api_endpoint = YUGEN_ENDPOINT - # HEADERS = { - # "Referer": ALLANIME_REFERER, - # } - - @debug_provider - def search_for_anime( - self, - user_query: str, - translation_type: str = "sub", - nsfw=True, - unknown=True, - **kwargs, - ): - results = [] - has_next = True - page = 0 - while has_next: - page += 1 - response = self.session.get( - SEARCH_URL, params={"q": user_query, "page": page} - ) - search_results = response.json() - has_next = search_results["hasNext"] - - results_html = search_results["query"] - anime = get_elements_text_and_html_by_attribute( - "class", "anime-meta", results_html, tag="a" - ) - id_regex = re.compile(r"(\d+)\/([^\/]+)") - for _a in anime: - if not _a: - continue - a = extract_attributes(_a[1]) - - if not a: - continue - uri = a["href"] - identifier = id_regex.search(uri) # pyright:ignore - if identifier is None: - continue - - if len(identifier.groups()) != 2: - continue - - identifier = base64.b64encode( - f"{identifier.group(1)}/{identifier.group(2)}".encode() - ).decode() - - anime_title = a["title"] - languages = {"sub": 1, "dub": 0} - excl = get_element_by_attribute( - "class", "ani-exclamation", _a[1], tag="div" - ) - if excl is not None: - if "dub" in excl.lower(): - languages["dub"] = 1 - results.append( - { - "id": identifier, - "title": anime_title, - "availableEpisodes": languages, - } - ) - - page += 1 - - return { - "pageInfo": {"total": len(results)}, - "results": results, - } - - @debug_provider - def get_anime(self, anime_id: str, **kwargs): - identifier = base64.b64decode(anime_id).decode() - response = self.session.get(f"{YUGEN_ENDPOINT}/anime/{identifier}") - html_page = response.text - data_map = { - "id": anime_id, - "title": None, - "poster": None, - "genres": [], - "synopsis": None, - "release_year": None, - "status": None, - "otherTitles": [], - "availableEpisodesDetail": {}, - } - - sub_match = re.search( - r'
Episodes
(\d+)', - html_page, - ) - - if sub_match: - eps = int(sub_match.group(1)) - data_map["availableEpisodesDetail"]["sub"] = list( - map(str, range(1, eps + 1)) - ) - - dub_match = re.search( - r'
Episodes \(Dub\)
(\d+)', - html_page, - ) - - if dub_match: - eps = int(dub_match.group(1)) - data_map["availableEpisodesDetail"]["dub"] = list( - map(str, range(1, eps + 1)) - ) - - name = get_element_text_and_html_by_tag("h1", html_page) - if name is not None: - data_map["title"] = name[0].strip() - - synopsis = get_element_by_attribute("class", "description", html_page, tag="p") - if synopsis is not None: - data_map["synopsis"] = synopsis - - # FIXME: This is not working because ytdl is too strict on also getting a closing tag - try: - image = get_element_html_by_attribute( - "class", "cover", html_page, tag="img" - ) - img_attrs = extract_attributes(image) - if img_attrs is not None: - data_map["image"] = img_attrs.get("src") - except Exception: - pass - - data = get_elements_text_and_html_by_attribute( - "class", "data", html_page, tag="div" - ) - for d in data: - title = get_element_text_and_html_by_tag("div", d[1]) - desc = get_element_text_and_html_by_tag("span", d[1]) - if title is None or desc is None: - continue - title = title[0] - desc = desc[0] - if title in ["Native", "Romaji"]: - data_map["alternative_names"].append(desc) - elif title == "Synonyms": - data_map["alternative_names"].extend(desc.split(",")) - elif title == "Premiered": - try: - data_map["release_year"] = int(desc.split()[-1]) - except (ValueError, TypeError): - pass - elif title == "Status": - data_map["status"] = title - elif title == "Genres": - data_map["genres"].extend([g.strip() for g in desc.split(",")]) - - return data_map - - @debug_provider - def get_episode_streams( - self, anime_id, episode_number: str, translation_type="sub" - ): - """get the streams of an episode - - Args: - translation_type ([TODO:parameter]): [TODO:description] - anime: [TODO:description] - episode_number: [TODO:description] - - Yields: - [TODO:description] - """ - - identifier = base64.b64decode(anime_id).decode() - - id_num, anime_title = identifier.split("/") - if translation_type == "dub": - video_query = f"{id_num}|{episode_number}|dub" - else: - video_query = f"{id_num}|{episode_number}" - - res = self.session.post( - f"{YUGEN_ENDPOINT}/api/embed/", - data={ - "id": base64.b64encode(video_query.encode()).decode(), - "ac": "0", - }, - headers={"x-requested-with": "XMLHttpRequest"}, - ) - res = res.json() - yield { - "server": "gogoanime", - "episode_title": f"{anime_title}; Episode {episode_number}", - "headers": {}, - "subtitles": [], - "links": [ - {"quality": quality, "link": link} - for quality, link in zip( - cycle(["1080", "720", "480", "360"]), res["hls"] - ) - ], - }