Files
FastAnime/fastanime/cli/interfaces/anilist_interfaces.py

1002 lines
36 KiB
Python

from __future__ import annotations
import os
import random
from datetime import datetime
from typing import TYPE_CHECKING
from click import clear
from InquirerPy import inquirer
from InquirerPy.validator import EmptyInputValidator
from rich import print
from rich.progress import Progress
from rich.prompt import Confirm, Prompt
from ...anilist import AniList
from ...constants import USER_CONFIG_PATH
from ...libs.fzf import fzf
from ...libs.rofi import Rofi
from ...Utility.data import anime_normalizer
from ...Utility.utils import anime_title_percentage_match, sanitize_filename
from ..utils.mpv import run_mpv
from ..utils.tools import QueryDict, exit_app
from ..utils.utils import fuzzy_inquirer
from .utils import aniskip
if TYPE_CHECKING:
from ...libs.anilist.types import AnilistBaseMediaDataSchema
from ...libs.anime_provider.types import Anime, SearchResult, Server
from ..config import Config
def calculate_time_delta(start_time, end_time):
time_format = "%H:%M:%S"
# Convert string times to datetime objects
start = datetime.strptime(start_time, time_format)
end = datetime.strptime(end_time, time_format)
# Calculate the difference
delta = end - start
return delta
def player_controls(config: "Config", anilist_config: QueryDict):
# user config
config.translation_type.lower()
# internal config
current_episode: str = anilist_config.episode_number
episodes: list = sorted(anilist_config.episodes, key=float)
links: list = anilist_config.current_stream_links
current_link: str = anilist_config.current_stream_link
anime_title: str = anilist_config.anime_title
anime_id: int = anilist_config.anime_id
def _servers():
config.server = ""
fetch_streams(config, anilist_config)
def _replay():
selected_server: "Server" = anilist_config.current_server
print(
"[bold magenta]Now Replaying:[/]",
anime_title,
"[bold magenta] Episode: [/]",
current_episode,
)
start_time = config.watch_history[str(anime_id)]["start_time"]
print("[green]Continuing from:[/] ", start_time)
custom_args = []
if config.skip:
if args := aniskip(
anilist_config.selected_anime_anilist["idMal"], current_episode
):
custom_args.extend(args)
if config.use_mpv_mod:
from ..utils.player import player
mpv = player.create_player(
current_link,
config.anime_provider,
anilist_config,
config,
selected_server["episode_title"],
)
if custom_args and None:
chapters_file = custom_args[0].split("=", 1)
script_opts = custom_args[1].split("=", 1)
mpv._set_property("chapters-file", chapters_file[1])
mpv._set_property("script-opts", script_opts[1])
mpv.start = start_time
mpv.wait_for_shutdown()
mpv.terminate()
stop_time = player.last_stop_time
total_time = player.last_total_time
else:
stop_time, total_time = run_mpv(
current_link,
selected_server["episode_title"],
start_time=start_time,
custom_args=custom_args,
)
if stop_time == "0" or total_time == "0":
episode = str(int(current_episode) + 1)
else:
error = 5 * 60
delta = calculate_time_delta(stop_time, total_time)
if delta.total_seconds() > error:
episode = current_episode
else:
episode = str(int(current_episode) + 1)
stop_time = "0"
total_time = "0"
clear()
config.update_watch_history(anime_id, episode, stop_time, total_time)
player_controls(config, anilist_config)
def _next_episode():
# ensures you dont accidentally erase your progress for an in complete episode
stop_time = config.watch_history.get(str(anime_id), {}).get("start_time", "0")
total_time = config.watch_history.get(str(anime_id), {}).get("total_time", "0")
error = config.error * 60
if stop_time == "0" or total_time == "0":
dt = 0
else:
delta = calculate_time_delta(stop_time, total_time)
dt = delta.total_seconds()
if dt > error:
if config.auto_next:
if config.use_rofi:
if not Rofi.confirm(
"Are you sure you wish to continue to the next episode you haven't completed the current episode?"
):
anilist_options(config, anilist_config)
return
else:
if not Confirm.ask(
"Are you sure you wish to continue to the next episode you haven't completed the current episode?",
default=False,
):
anilist_options(config, anilist_config)
return
elif not config.use_rofi:
if not Confirm.ask(
"Are you sure you wish to continue to the next episode, your progress for the current episodes will be erased?",
default=True,
):
player_controls(config, anilist_config)
return
# all checks have passed lets go to the next episode
next_episode = episodes.index(current_episode) + 1
if next_episode >= len(episodes):
next_episode = len(episodes) - 1
# updateinternal config
anilist_config.episode_number = episodes[next_episode]
# update user config
config.update_watch_history(anime_id, episodes[next_episode])
# call interface
fetch_streams(config, anilist_config)
def _episodes():
# reset watch_history
config.continue_from_history = False
# call interface
fetch_episode(config, anilist_config)
def _previous_episode():
prev_episode = episodes.index(current_episode) - 1
if prev_episode <= 0:
prev_episode = 0
# anilist_config.episode_title = episode["title"]
anilist_config.episode_number = episodes[prev_episode]
# update user config
config.update_watch_history(anime_id, episodes[prev_episode])
# call interface
fetch_streams(config, anilist_config)
def _change_quality():
# extract the actual link urls
options = [link["link"] for link in links]
# prompt for new quality
if config.use_fzf:
quality = fzf.run(
options, prompt="Select Quality:", header="Quality Options"
)
elif config.use_rofi:
quality = Rofi.run(options, "Select Quality")
else:
quality = fuzzy_inquirer("Select Quality", options)
config.quality = options.index(quality) # set quality
player_controls(config, anilist_config)
def _change_translation_type():
# prompt for new translation type
options = ["sub", "dub"]
if config.use_fzf:
translation_type = fzf.run(
options, prompt="Select Translation Type: ", header="Lang Options"
).lower()
elif config.use_rofi:
translation_type = Rofi.run(options, "Select Translation Type")
else:
translation_type = fuzzy_inquirer(
"Select Translation Type", options
).lower()
# update internal config
config.translation_type = translation_type.lower()
# reload to controls
player_controls(config, anilist_config)
icons = config.icons
options = {
f"{'🔂 ' if icons else ''}Replay": _replay,
f"{'' if icons else ''}Next Episode": _next_episode,
f"{'' if icons else ''}Previous Episode": _previous_episode,
f"{'🗃️ ' if icons else ''}Episodes": _episodes,
f"{'📀 ' if icons else ''}Change Quality": _change_quality,
f"{'🎧 ' if icons else ''}Change Translation Type": _change_translation_type,
f"{'💽 ' if icons else ''}Servers": _servers,
f"{'📱 ' if icons else ''}Main Menu": lambda: anilist(config, anilist_config),
f"{'📜 ' if icons else ''}Anime Options Menu": lambda: anilist_options(
config, anilist_config
),
f"{'🔎 ' if icons else ''}Search Results": lambda: select_anime(
config, anilist_config
),
f"{'' if icons else ''}Exit": exit_app,
}
if config.auto_next:
print("Auto selecting next episode")
_next_episode()
return
if config.use_fzf:
action = fzf.run(
list(options.keys()), prompt="Select Action:", header="Player Controls"
)
elif config.use_rofi:
action = Rofi.run(list(options.keys()), "Select Action")
else:
action = fuzzy_inquirer("Select Action", options.keys())
options[action]()
def fetch_streams(config: "Config", anilist_config: QueryDict):
# user config
quality: int = config.quality
# internal config
episode_number: str = anilist_config.episode_number
anime_title: str = anilist_config.anime_title
anime_id: int = anilist_config.anime_id
anime: "Anime" = anilist_config.anime
translation_type = config.translation_type
anime_provider = config.anime_provider
server = None
# get streams for episode from provider
with Progress() as progress:
progress.add_task("Fetching Episode Streams...", total=None)
episode_streams = anime_provider.get_episode_streams(
anime,
episode_number,
translation_type,
anilist_config.selected_anime_anilist,
)
if not episode_streams:
if not config.use_rofi:
print("Failed to fetch :cry:")
input("Enter to retry...")
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
return fetch_streams(config, anilist_config)
if config.server == "top":
# no need to get all servers if top just works
with Progress() as progress:
progress.add_task("Fetching top server...", total=None)
selected_server = next(episode_streams)
server = "top"
else:
with Progress() as progress:
progress.add_task("Fetching servers...", total=None)
episode_streams_dict = {
episode_stream["server"]: episode_stream
for episode_stream in episode_streams
}
# prompt for preferred server
if config.server and config.server in episode_streams_dict.keys():
server = config.server
if not server:
choices = [*episode_streams_dict.keys(), "top", "Back"]
if config.use_fzf:
server = fzf.run(
choices,
prompt="Select Server: ",
header="Servers",
)
elif config.use_rofi:
server = Rofi.run(choices, "Select Server")
else:
server = fuzzy_inquirer("Select Server", choices)
if server == "Back":
# reset watch_history
config.update_watch_history(anime_id, None)
fetch_episode(config, anilist_config)
return
elif server == "top":
selected_server = episode_streams_dict[list(episode_streams_dict.keys())[0]]
else:
selected_server = episode_streams_dict[server]
links = selected_server["links"]
if quality > len(links) - 1:
quality = config.quality = len(links) - 1
elif quality < 0:
quality = config.quality = 0
stream_link = links[quality]["link"]
# update internal config
anilist_config.current_stream_links = links
anilist_config.current_stream_link = stream_link
anilist_config.current_server = selected_server
anilist_config.current_server_name = server
# play video
print(
"[bold magenta]Now playing:[/]",
anime_title,
"[bold magenta] Episode: [/]",
episode_number,
)
# -- update anilist info if user --
if config.user and episode_number:
AniList.update_anime_list(
{
"mediaId": anime_id,
# "status": "CURRENT",
"progress": episode_number,
}
)
start_time = config.watch_history.get(str(anime_id), {}).get("start_time", "0")
if start_time != "0":
print("[green]Continuing from:[/] ", start_time)
custom_args = []
if config.skip:
if args := aniskip(
anilist_config.selected_anime_anilist["idMal"], episode_number
):
custom_args.extend(args)
if config.use_mpv_mod:
from ..utils.player import player
mpv = player.create_player(
stream_link,
anime_provider,
anilist_config,
config,
selected_server["episode_title"],
)
if custom_args and None:
chapters_file = custom_args[0].split("=", 1)
script_opts = custom_args[1].split("=", 1)
mpv._set_property("chapters-file", chapters_file[1])
mpv._set_property("script-opts", script_opts[1])
mpv.start = start_time
mpv.wait_for_shutdown()
mpv.terminate()
stop_time = player.last_stop_time
total_time = player.last_total_time
else:
stop_time, total_time = run_mpv(
stream_link,
selected_server["episode_title"],
start_time=start_time,
custom_args=custom_args,
)
print("Finished at: ", stop_time)
# update_watch_history
if stop_time == "0" or total_time == "0":
episode = str(int(episode_number) + 1)
else:
error = config.error * 60
delta = calculate_time_delta(stop_time, total_time)
if delta.total_seconds() > error:
episode = episode_number
else:
episode = str(int(episode_number) + 1)
stop_time = "0"
total_time = "0"
config.update_watch_history(
anime_id, episode, start_time=stop_time, total_time=total_time
)
# switch to controls
clear()
player_controls(config, anilist_config)
def fetch_episode(config: "Config", anilist_config: QueryDict):
# user config
translation_type: str = config.translation_type.lower()
continue_from_history: bool = config.continue_from_history
user_watch_history: dict = config.watch_history
anime_id: int = anilist_config.anime_id
anime_title: str = anilist_config.anime_title
# internal config
anime: "Anime" = anilist_config.anime
_anime: "SearchResult" = anilist_config._anime
selected_anime_anilist: "AnilistBaseMediaDataSchema" = (
anilist_config.selected_anime_anilist
)
# prompt for episode number
episodes = anime["availableEpisodesDetail"][translation_type]
episode_number = ""
if continue_from_history:
if user_watch_history.get(str(anime_id), {}).get("episode") in episodes:
episode_number = user_watch_history[str(anime_id)]["episode"]
print(f"[bold cyan]Continuing from Episode:[/] [bold]{episode_number}[/]")
elif selected_anime_anilist["mediaListEntry"]:
episode_number = str(
(selected_anime_anilist["mediaListEntry"] or {"progress": ""}).get(
"progress"
)
)
if episode_number not in episodes:
episode_number = ""
print(f"[bold cyan]Continuing from Episode:[/] [bold]{episode_number}[/]")
else:
episode_number = ""
if not episode_number:
choices = [*episodes, "Back"]
if config.use_fzf:
episode_number = fzf.run(
choices,
prompt="Select Episode:",
header=anime_title,
)
elif config.use_rofi:
episode_number = Rofi.run(choices, "Select Episode")
else:
episode_number = fuzzy_inquirer("Select Episode", choices)
if episode_number == "Back":
anilist_options(config, anilist_config)
return
start_time = user_watch_history.get(str(anime_id), {}).get("start_time", "0")
config.update_watch_history(anime_id, episode_number, start_time=start_time)
# update internal config
anilist_config.episodes = episodes
# anilist_config.episode_title = episode["title"]
anilist_config.episode_number = episode_number
# next interface
fetch_streams(config, anilist_config)
def fetch_anime_episode(config, anilist_config: QueryDict):
selected_anime: "SearchResult" = anilist_config._anime
anime_provider = config.anime_provider
with Progress() as progress:
progress.add_task("Fetching Anime Info...", total=None)
anilist_config.anime = anime_provider.get_anime(
selected_anime["id"], anilist_config.selected_anime_anilist
)
if not anilist_config.anime:
print(
"Sth went wrong :cry: this could mean the provider is down or your internet"
)
if not config.use_rofi:
input("Enter to continue...")
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
fetch_anime_episode(config, anilist_config)
return
fetch_episode(config, anilist_config)
def provide_anime(config: "Config", anilist_config: QueryDict):
# user config
translation_type = config.translation_type.lower()
# internal config
selected_anime_title = anilist_config.selected_anime_title
anime_data: "AnilistBaseMediaDataSchema" = anilist_config.selected_anime_anilist
anime_provider = config.anime_provider
# search and get the requested title from provider
with Progress() as progress:
progress.add_task("Fetching Search Results...", total=None)
search_results = anime_provider.search_for_anime(
selected_anime_title,
translation_type,
anilist_config.selected_anime_anilist,
)
if not search_results:
print(
"Sth went wrong :cry: while fetching this could mean you have poor internet connection or the provider is down"
)
if not config.use_rofi:
input("Enter to continue...")
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
provide_anime(config, anilist_config)
return
search_results = {anime["title"]: anime for anime in search_results["results"]}
_title = None
if _title := next(
(
original
for original, normalized in anime_normalizer.items()
if normalized.lower() == selected_anime_title.lower()
),
None,
):
_title = _title
if config.auto_select:
anime_title = max(
search_results.keys(),
key=lambda title: anime_title_percentage_match(title, anime_data),
)
print(f"[cyan]Auto selecting[/]: {anime_title}")
else:
choices = [*search_results.keys(), "Back"]
if config.use_fzf:
anime_title = fzf.run(
choices,
prompt="Select Search Result:",
header="Anime Search Results",
)
elif config.use_rofi:
anime_title = Rofi.run(choices, "Select Search Result")
else:
anime_title = fuzzy_inquirer("Select Search Result", choices)
if anime_title == "Back":
anilist_options(config, anilist_config)
return
anilist_config.anime_title = anime_normalizer.get(anime_title) or anime_title
anilist_config._anime = search_results[anime_title]
fetch_anime_episode(config, anilist_config)
def anilist_options(config, anilist_config: QueryDict):
selected_anime: "AnilistBaseMediaDataSchema" = anilist_config.selected_anime_anilist
selected_anime_title: str = anilist_config.selected_anime_title
progress = (selected_anime["mediaListEntry"] or {"progress": 0}).get("progress", 0)
episodes_total = selected_anime["episodes"] or "Inf"
def _watch_trailer(config: "Config", anilist_config: QueryDict):
if trailer := selected_anime.get("trailer"):
trailer_url = "https://youtube.com/watch?v=" + trailer["id"]
print("[bold magenta]Watching Trailer of:[/]", selected_anime_title)
run_mpv(
trailer_url,
ytdl_format=config.format,
)
anilist_options(config, anilist_config)
else:
if not config.use_rofi:
print("no trailer available :confused:")
input("Enter to continue...")
else:
if not Rofi.confirm("No trailler found!!Enter to continue"):
exit(0)
anilist_options(config, anilist_config)
def _add_to_list(config: "Config", anilist_config: QueryDict):
# config.update_anime_list(anilist_config.anime_id)
anime_lists = {
"Watching": "CURRENT",
"Paused": "PAUSED",
"Planning": "PLANNING",
"Dropped": "DROPPED",
"Rewatching": "REPEATING",
"Completed": "COMPLETED",
}
if config.use_fzf:
anime_list = fzf.run(
list(anime_lists.keys()),
"Choose the list you want to add to",
"Add your animelist",
)
elif config.use_rofi:
anime_list = Rofi.run(
list(anime_lists.keys()), "Choose list you want to add to"
)
else:
anime_list = fuzzy_inquirer(
"Choose the list you want to add to", list(anime_lists.keys())
)
result = AniList.update_anime_list(
{"status": anime_lists[anime_list], "mediaId": selected_anime["id"]}
)
if not result[0]:
print("Failed to update", result)
else:
print(
f"Successfully added {selected_anime_title} to your {anime_list} list :smile:"
)
if not config.use_rofi:
input("Enter to continue...")
anilist_options(config, anilist_config)
def _score_anime(config: "Config", anilist_config: QueryDict):
if config.use_rofi:
score = Rofi.ask("Enter Score", is_int=True)
score = max(100, min(0, score))
else:
score = inquirer.number(
message="Enter the score:",
min_allowed=0,
max_allowed=100,
validate=EmptyInputValidator(),
).execute()
result = AniList.update_anime_list(
{"scoreRaw": score, "mediaId": selected_anime["id"]}
)
if not result[0]:
print("Failed to update", result)
else:
print(f"Successfully scored {selected_anime_title}; score: {score}")
if not config.use_rofi:
input("Enter to continue...")
anilist_options(config, anilist_config)
def _remove_from_list(config: "Config", anilist_config: QueryDict):
if Confirm.ask(
f"Are you sure you want to procede, the folowing action will permanently remove {selected_anime_title} from your list and your progress will be erased",
default=False,
):
success, data = AniList.delete_medialist_entry(selected_anime["id"])
if not success or not data:
print("Failed to delete", data)
elif not data.get("deleted"):
print("Failed to delete", data)
else:
print("Successfully deleted :cry:", selected_anime_title)
else:
print(selected_anime_title, ":relieved:")
if not config.use_rofi:
input("Enter to continue...")
anilist_options(config, anilist_config)
def _change_translation_type(config: "Config", anilist_config: QueryDict):
# prompt for new translation type
options = ["Sub", "Dub"]
if config.use_fzf:
translation_type = fzf.run(
options, prompt="Select Translation Type:", header="Language Options"
)
elif config.use_rofi:
translation_type = Rofi.run(options, "Select Translation Type")
else:
translation_type = fuzzy_inquirer("Select translation type", options)
# update internal config
config.translation_type = translation_type.lower()
anilist_options(config, anilist_config)
def _view_info(config, anilist_config):
from rich.console import Console
from rich.prompt import Confirm
from ...Utility import anilist_data_helper
from ...Utility.utils import remove_html_tags
from ..utils.print_img import print_img
clear()
console = Console()
print_img(selected_anime["coverImage"]["large"])
console.print("[bold cyan]Title(jp): ", selected_anime["title"]["romaji"])
console.print("[bold cyan]Title(eng): ", selected_anime["title"]["english"])
console.print("[bold cyan]Popularity: ", selected_anime["popularity"])
console.print("[bold cyan]Favourites: ", selected_anime["favourites"])
console.print("[bold cyan]Status: ", selected_anime["status"])
console.print(
"[bold cyan]Start Date: ",
anilist_data_helper.format_anilist_date_object(selected_anime["startDate"]),
)
console.print(
"[bold cyan]End Date: ",
anilist_data_helper.format_anilist_date_object(selected_anime["endDate"]),
)
# console.print("[bold cyan]Season: ", selected_anime["season"])
console.print("[bold cyan]Episodes: ", selected_anime["episodes"])
console.print(
"[bold cyan]Tags: ",
anilist_data_helper.format_list_data_with_comma(
[tag["name"] for tag in selected_anime["tags"]]
),
)
console.print(
"[bold cyan]Genres: ",
anilist_data_helper.format_list_data_with_comma(selected_anime["genres"]),
)
# console.print("[bold cyan]Type: ", selected_anime["st"])
if selected_anime["nextAiringEpisode"]:
console.print(
"[bold cyan]Next Episode: ",
anilist_data_helper.extract_next_airing_episode(
selected_anime["nextAiringEpisode"]
),
)
console.print(
"[bold underline cyan]Description\n[/]",
remove_html_tags(str(selected_anime["description"])),
)
if Confirm.ask("Enter to continue...", default=True):
anilist_options(config, anilist_config)
return
def _toggle_auto_select(config, anilist_config):
config.auto_select = not config.auto_select
anilist_options(config, anilist_config)
def _toggle_auto_next(config, anilist_config):
config.auto_select = not config.auto_select
anilist_options(config, anilist_config)
icons = config.icons
options = {
f"{'📽️ ' if icons else ''}Stream ({progress}/{episodes_total})": provide_anime,
f"{'📼 ' if icons else ''}Watch Trailer": _watch_trailer,
f"{'' if icons else ''}Score Anime": _score_anime,
f"{'📥 ' if icons else ''}Add to List": _add_to_list,
f"{'📤 ' if icons else ''}Remove from List": _remove_from_list,
f"{'📖 ' if icons else ''}View Info": _view_info,
f"{'🎧 ' if icons else ''}Change Translation Type": _change_translation_type,
f"{'🔘 ' if icons else ''}Toggle auto select anime": _toggle_auto_select, # problematic if you choose an anime that doesnt match id
f"{'💠 ' if icons else ''}Toggle auto next episode": _toggle_auto_next,
f"{'🔙 ' if icons else ''}Back": select_anime,
f"{'' if icons else ''}Exit": exit_app,
}
if config.use_fzf:
action = fzf.run(
list(options.keys()), prompt="Select Action:", header="Anime Menu"
)
elif config.use_rofi:
action = Rofi.run(list(options.keys()), "Select Action")
else:
action = fuzzy_inquirer("Select Action", options.keys())
options[action](config, anilist_config)
def select_anime(config: "Config", anilist_config: QueryDict):
search_results = anilist_config.data["data"]["Page"]["media"]
anime_data = {}
for anime in search_results:
anime: "AnilistBaseMediaDataSchema"
progress = (anime["mediaListEntry"] or {"progress": 0}).get("progress", 0)
episodes_total = anime["episodes"] or "Inf"
title = str(
anime["title"][config.preferred_language] or anime["title"]["romaji"]
)
title = sanitize_filename(f"{title} ({progress} of {episodes_total})")
# Check if the anime is currently airing and has new/unwatched episodes
if (
anime["status"] == "RELEASING"
and anime["nextAiringEpisode"]
and progress > 0
):
last_aired_episode = anime["nextAiringEpisode"]["episode"] - 1
if last_aired_episode - progress > 0:
title += f" 🔹{last_aired_episode - progress} new episode(s)🔹"
anime_data[title] = anime
choices = [*anime_data.keys(), "Back"]
if config.use_fzf:
if config.preview:
from .utils import get_preview
preview = get_preview(search_results, anime_data.keys())
selected_anime_title = fzf.run(
choices,
prompt="Select Anime: ",
header="Search Results",
preview=preview,
)
else:
selected_anime_title = fzf.run(
choices,
prompt="Select Anime: ",
header="Search Results",
)
elif config.use_rofi:
# TODO: Make this faster
if config.preview:
from .utils import IMAGES_DIR, get_icons
get_icons(search_results, anime_data.keys())
choices = []
for title in anime_data.keys():
icon_path = os.path.join(IMAGES_DIR, title)
choices.append(f"{title}\0icon\x1f{icon_path}")
choices.append("Back")
selected_anime_title = Rofi.run_with_icons(choices, "Select Anime")
else:
selected_anime_title = Rofi.run(choices, "Select Anime")
else:
selected_anime_title = fuzzy_inquirer("Select Anime", choices)
# "bat %s/{}" % SEARCH_RESULTS_CACHE
if selected_anime_title == "Back":
anilist(config, anilist_config)
return
selected_anime: "AnilistBaseMediaDataSchema" = anime_data[selected_anime_title]
anilist_config.selected_anime_anilist = selected_anime
anilist_config.selected_anime_title = (
selected_anime["title"]["romaji"] or selected_anime["title"]["english"]
)
anilist_config.anime_id = selected_anime["id"]
anilist_options(config, anilist_config)
def handle_animelist(anilist_config, config: "Config", list_type: str):
if not config.user:
if not config.use_rofi:
print("You haven't logged in please run: fastanime anilist login")
input("Enter to continue...")
else:
if not Rofi.confirm("You haven't logged in!!Enter to continue"):
exit(1)
anilist(config, anilist_config)
return
match list_type:
case "Watching":
status = "CURRENT"
case "Planned":
status = "PLANNING"
case "Completed":
status = "COMPLETED"
case "Dropped":
status = "DROPPED"
case "Paused":
status = "PAUSED"
case "Repeating":
status = "REPEATING"
case _:
return
anime_list = AniList.get_anime_list(status)
if not anime_list:
print("Sth went wrong", anime_list)
if not config.use_rofi:
input("Enter to continue")
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
anilist(config, anilist_config)
return
if not anime_list[0] or not anime_list[1]:
print("Sth went wrong", anime_list)
if not config.use_rofi:
input("Enter to continue")
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
anilist(config, anilist_config)
return
media = [
mediaListItem["media"]
for mediaListItem in anime_list[1]["data"]["Page"]["mediaList"]
] # pyright:ignore
anime_list[1]["data"]["Page"]["media"] = media # pyright:ignore
return anime_list
def anilist(config: "Config", anilist_config: QueryDict):
def _anilist_search():
if config.use_rofi:
search_term = str(Rofi.ask("Search for"))
else:
search_term = Prompt.ask("[cyan]Search for[/]")
return AniList.search(query=search_term)
def _anilist_random():
random_anime = range(1, 15000)
random_anime = random.sample(random_anime, k=50)
return AniList.search(id_in=list(random_anime))
def _watch_history():
watch_history = list(map(int, config.watch_history.keys()))
return AniList.search(id_in=watch_history, sort="TRENDING_DESC")
# NOTE: Will probably be depracated
def _anime_list():
anime_list = config.anime_list
return AniList.search(id_in=anime_list)
def edit_config():
from click import edit
edit(filename=USER_CONFIG_PATH)
if config.use_rofi:
config.load_config()
config.use_rofi = True
config.use_fzf = False
else:
config.load_config()
anilist(config, anilist_config)
icons = config.icons
options = {
f"{'🔥 ' if icons else ''}Trending": AniList.get_trending,
f"{'📺 ' if icons else ''}Watching": lambda x="Watching": handle_animelist(
anilist_config, config, x
),
f"{'' if icons else ''}Paused": lambda x="Paused": handle_animelist(
anilist_config, config, x
),
f"{'🚮 ' if icons else ''}Dropped": lambda x="Dropped": handle_animelist(
anilist_config, config, x
),
f"{'📑 ' if icons else ''}Planned": lambda x="Planned": handle_animelist(
anilist_config, config, x
),
f"{'' if icons else ''}Completed": lambda x="Completed": handle_animelist(
anilist_config, config, x
),
f"{'🔁 ' if icons else ''}Rewatching": lambda x="Repeating": handle_animelist(
anilist_config, config, x
),
f"{'🔔 ' if icons else ''}Recently Updated Anime": AniList.get_most_recently_updated,
f"{'🔎 ' if icons else ''}Search": _anilist_search,
f"{'🎞️ ' if icons else ''}Watch History": _watch_history,
# "AnimeList": _anime_list💯,
f"{'🎲 ' if icons else ''}Random Anime": _anilist_random,
f"{'🌟 ' if icons else ''}Most Popular Anime": AniList.get_most_popular,
f"{'💖 ' if icons else ''}Most Favourite Anime": AniList.get_most_favourite,
f"{'' if icons else ''}Most Scored Anime": AniList.get_most_scored,
f"{'🎬 ' if icons else ''}Upcoming Anime": AniList.get_upcoming_anime,
f"{'📝 ' if icons else ''}Edit Config": edit_config,
f"{'' if icons else ''}Exit": exit_app,
}
if config.use_fzf:
action = fzf.run(
list(options.keys()),
prompt="Select Action: ",
header="Anilist Menu",
)
elif config.use_rofi:
action = Rofi.run(list(options.keys()), "Select Action")
else:
action = fuzzy_inquirer("Select Action", options.keys())
anilist_data = options[action]()
if anilist_data[0]:
anilist_config.data = anilist_data[1]
select_anime(config, anilist_config)
else:
print(anilist_data[1])
if not config.use_rofi:
input("Enter to continue...")
else:
if not Rofi.confirm("Sth went wrong!!Enter to continue..."):
exit(1)
anilist(config, anilist_config)