feat:standardize the user data helper to kork for both cli and gui

This commit is contained in:
Benex254
2024-08-05 09:47:00 +03:00
parent 6b308d26d1
commit d1fb3e16c9
8 changed files with 192 additions and 51 deletions

View File

@@ -6,6 +6,7 @@ anime_normalizer = {
"1P": "one piece",
"Magia Record: Mahou Shoujo Madoka☆Magica Gaiden (TV)": "Mahou Shoujo Madoka☆Magica",
"Dungeon ni Deai o Motomeru no wa Machigatte Iru Darouka": "Dungeon ni Deai wo Motomeru no wa Machigatteiru Darou ka",
'Hazurewaku no "Joutai Ijou Skill" de Saikyou ni Natta Ore ga Subete wo Juurin suru made': "Hazure Waku no [Joutai Ijou Skill] de Saikyou ni Natta Ore ga Subete wo Juurin Suru made",
}

View File

@@ -1,32 +1,31 @@
import logging
from queue import Queue
from threading import Thread
import yt_dlp
from ... import USER_DOWNLOADS_DIR
from ..show_notification import show_notification
from ..utils import sanitize_filename
logger = logging.getLogger(__name__)
class MyLogger:
def debug(self, msg):
print(msg)
pass
def warning(self, msg):
print(msg)
pass
def error(self, msg):
print(msg)
pass
def main_progress_hook(data):
match data["status"]:
case "error":
show_notification(
"Something went wrong while downloading the video", data["filename"]
)
logger.error("sth went wrong")
case "finished":
show_notification("Downloaded", data["filename"])
logger.info("download complete")
# Options for yt-dlp
@@ -41,7 +40,7 @@ class YtDLPDownloader:
try:
task(*args)
except Exception as e:
show_notification("Something went wrong", f"Reason: {e}")
logger.error(f"Something went wrong {e}")
self.downloads_queue.task_done()
def __init__(self):
@@ -50,15 +49,18 @@ class YtDLPDownloader:
self._thread.start()
# Function to download the file
def _download_file(self, url: str, title, custom_progress_hook, silent):
def _download_file(
self, url: str, download_dir, title, custom_progress_hook, silent
):
anime_title = sanitize_filename(title[0])
ydl_opts = {
"outtmpl": f"{USER_DOWNLOADS_DIR}/{anime_title}/{anime_title}-episode {title[1]}.%(ext)s", # Specify the output path and template
"outtmpl": f"{download_dir}/{anime_title}/{anime_title}-episode {title[1]}.%(ext)s", # Specify the output path and template
"progress_hooks": [
main_progress_hook,
custom_progress_hook,
], # Progress hook
"silent": silent,
"verbose": False,
}
with yt_dlp.YoutubeDL(ydl_opts) as ydl:

View File

@@ -0,0 +1,36 @@
import json
import logging
import os
from .. import USER_DATA_PATH
logger = logging.getLogger(__name__)
class UserData:
user_data = {"watch_history": {}, "animelist": []}
def __init__(self):
try:
if os.path.isfile(USER_DATA_PATH):
with open(USER_DATA_PATH, "r") as f:
user_data = json.load(f)
self.user_data.update(user_data)
print(user_data, self.user_data)
except Exception as e:
logger.error(e)
def update_watch_history(self, watch_history: dict):
self.user_data["watch_history"] = watch_history
self._update_user_data()
def update_animelist(self, anime_list: list):
self.user_data["animelist"] = list(set(anime_list))
self._update_user_data()
def _update_user_data(self):
with open(USER_DATA_PATH, "w") as f:
json.dump(self.user_data, f)
user_data_helper = UserData()

View File

@@ -8,7 +8,7 @@ from rich.traceback import install
install(show_locals=True)
# Create a logger instance
logger = logging.getLogger(__name__)
# initiate constants
__version__ = "v0.30.0"
@@ -36,11 +36,12 @@ if not APP_DATA_DIR:
USER_DATA_PATH = os.path.join(APP_DATA_DIR, "user_data.json")
USER_CONFIG_PATH = os.path.join(APP_DATA_DIR, "config.ini")
USER_WATCH_HISTORY = os.path.join(APP_DATA_DIR, "watch_history.json")
# video dir
USER_DOWNLOADS_DIR = dirs.user_downloads_dir
USER_DOWNLOADS_DIR = dirs.user_videos_dir
print(f"USER_DOWNLOADS_DIR: {USER_DOWNLOADS_DIR}")
def FastAnime(gui=False):
@@ -52,15 +53,18 @@ def FastAnime(gui=False):
if "--gui" in sys.argv:
gui = True
sys.argv.remove("--gui")
if "--log" in sys.argv:
# Configure logging
from rich.logging import RichHandler
logging.getLogger(__name__)
logging.basicConfig(
level=logging.DEBUG, # Set the logging level to DEBUG
format="%(message)s", # Use a simple message format
datefmt="[%X]", # Use a custom date format
handlers=[RichHandler()], # Use RichHandler to format the logs
)
sys.argv.remove("--log")
if gui:
from .gui import run_gui

View File

@@ -1,6 +1,68 @@
import click
from ...libs.anime_provider.allanime.api import anime_provider
from ...Utility.downloader.downloader import downloader
from ..config import Config
from ..utils.utils import clear, fuzzy_inquirer
@click.command()
def download():
print("download")
@click.option("--anime-title", prompt="Enter the anime title", required=True)
@click.option("--episode-start", prompt="Enter the episode start", required=True)
@click.option("--episode-end", prompt="Enter the episode end", required=True)
@click.pass_obj
def download(config: Config, anime_title, episode_start, episode_end):
translation_type = config.translation_type
download_dir = config.downloads_dir
quality = config.quality
search_results = anime_provider.search_for_anime(
anime_title, translation_type=translation_type
)
episodes_to_download = range(int(episode_start), int(episode_end) + 1)
options = {show["name"]: show for show in search_results["shows"]["edges"]}
anime = fuzzy_inquirer("Please select the anime:", options.keys())
anime_data = options[anime]
availableEpisodesDetail = anime_data["availableEpisodes"]
episodes = availableEpisodesDetail[translation_type]
server = config.server
for episode_number in episodes_to_download:
if episode_number not in range(episodes):
print(f"Episode {episode_number} not available")
continue
print(f"Downloading episode {episode_number} of {anime_data['name']}")
episode = anime_provider.get_anime_episode(
anime_data["_id"], str(episode_number), translation_type
)
# get streams for episode from provider
episode_streams = anime_provider.get_episode_streams(episode)
episode_streams = {
episode_stream[0]: episode_stream[1] for episode_stream in episode_streams
}
# prompt for preferred server
if not server or server not in episode_streams.keys():
server = fuzzy_inquirer("Please select server:", episode_streams.keys())
print(episode)
print(episode_streams)
selected_server = episode_streams[server]
links = selected_server["links"]
if quality > len(links) - 1:
quality = config.quality = len(links) - 1
elif quality < 0:
quality = config.quality = 0
stream_link = links[quality]["link"]
downloader._download_file(
stream_link,
download_dir,
(anime_data["name"], str(episode_number)),
lambda *_: "",
silent=True,
)
clear()

View File

@@ -1,8 +1,8 @@
import json
import os
from configparser import ConfigParser
from .. import USER_CONFIG_PATH, USER_DOWNLOADS_DIR, USER_WATCH_HISTORY
from .. import USER_CONFIG_PATH, USER_DOWNLOADS_DIR
from ..Utility import user_data_helper
class Config(object):
@@ -37,17 +37,17 @@ class Config(object):
self.server = self.get_server()
self.preferred_language = self.get_preferred_language()
# ---- setup history ------
if not os.path.exists(USER_WATCH_HISTORY):
self.watch_history = {}
else:
with open(USER_WATCH_HISTORY, "r") as history:
self.watch_history = json.load(history)
# ---- setup user data ------
self.watch_history = user_data_helper.user_data.get("watch_history", {})
self.anime_list = user_data_helper.user_data.get("animelist", [])
def update_watch_history(self, title, episode):
with open(USER_WATCH_HISTORY, "w") as history:
self.watch_history[title] = episode
json.dump(self.watch_history, history)
def update_watch_history(self, anime_id: int, episode: str | None):
self.watch_history.update({str(anime_id): episode})
user_data_helper.update_watch_history(self.watch_history)
def update_anime_list(self, anime_id: int):
self.anime_list.append(anime_id)
user_data_helper.update_animelist(self.anime_list)
def get_downloads_dir(self):
return self.configparser.get("general", "downloads_dir")

View File

@@ -9,6 +9,7 @@ from ...libs.anilist.anilist import AniList
from ...libs.anilist.anilist_data_schema import AnilistBaseMediaDataSchema
from ...libs.anime_provider.allanime.api import anime_provider
from ...libs.anime_provider.allanime.data_types import AllAnimeEpisode, AllAnimeShow
from ...Utility.data import anime_normalizer
from ..config import Config
from ..utils.mpv import mpv
from ..utils.tools import QueryDict
@@ -22,10 +23,11 @@ def player_controls(config: Config, anilist_config: QueryDict):
# internal config
_anime: AllAnimeShow = anilist_config._anime
current_episode: str = anilist_config.episode_number
episodes: list = anilist_config.episodes
episodes: list = sorted(anilist_config.episodes, key=float)
links: list = anilist_config.current_stream_links
current_link: str = anilist_config.current_stream_link
anime_title: str = anilist_config.anime_title
anime_id: int = anilist_config.anime_id
def _back():
fetch_streams(config, anilist_config)
@@ -38,8 +40,6 @@ def player_controls(config: Config, anilist_config: QueryDict):
current_episode,
)
config.update_watch_history(anime_title, current_episode)
mpv(current_link)
clear()
player_controls(config, anilist_config)
@@ -57,14 +57,14 @@ def player_controls(config: Config, anilist_config: QueryDict):
anilist_config.episode_number = episodes[next_episode]
# update user config
config.update_watch_history(anime_title, episodes[next_episode])
config.update_watch_history(anime_id, episodes[next_episode])
# call interface
fetch_streams(config, anilist_config)
def _episodes():
# reset watch_history
config.update_watch_history(anime_title, None)
config.update_watch_history(anime_id, None)
# call interface
fetch_episode(config, anilist_config)
@@ -83,7 +83,7 @@ def player_controls(config: Config, anilist_config: QueryDict):
anilist_config.episode_number = episodes[prev_episode]
# update user config
config.update_watch_history(anime_title, episodes[prev_episode])
config.update_watch_history(anime_id, episodes[prev_episode])
# call interface
fetch_streams(config, anilist_config)
@@ -122,10 +122,11 @@ def player_controls(config: Config, anilist_config: QueryDict):
"exit": sys.exit,
}
if config.auto_next:
_next_episode()
return
action = fuzzy_inquirer("Select Action:", options.keys())
# update_watch_history
config.update_watch_history(anime_title, current_episode)
options[action]()
@@ -137,6 +138,7 @@ def fetch_streams(config: Config, anilist_config: QueryDict):
episode: AllAnimeEpisode = anilist_config.episode
episode_number: str = anilist_config.episode_number
anime_title: str = anilist_config.anime_title
anime_id: int = anilist_config.anime_id
# get streams for episode from provider
episode_streams = anime_provider.get_episode_streams(episode)
@@ -145,10 +147,16 @@ def fetch_streams(config: Config, anilist_config: QueryDict):
}
# prompt for preferred server
server = fuzzy_inquirer("Select Server:", [*episode_streams.keys(), "back"])
server = None
if config.server and config.server in episode_streams.keys():
server = config.server
if config.server == "top":
server = list(episode_streams.keys())[0]
if not server:
server = fuzzy_inquirer("Select Server:", [*episode_streams.keys(), "back"])
if server == "back":
# reset watch_history
config.update_watch_history(anime_title, None)
config.update_watch_history(anime_id, None)
fetch_episode(config, anilist_config)
return
@@ -177,8 +185,12 @@ def fetch_streams(config: Config, anilist_config: QueryDict):
mpv(stream_link)
# update_watch_history
config.update_watch_history(anime_id, str(int(episode_number) + 1))
# switch to controls
clear()
player_controls(config, anilist_config)
@@ -186,18 +198,17 @@ def fetch_episode(config: Config, anilist_config: QueryDict):
# user config
translation_type: str = config.translation_type
continue_from_history: bool = config.continue_from_history
user_watch_history = config.watch_history
anime_title = anilist_config.anime_title
user_watch_history: dict = config.watch_history
anime_id: int = anilist_config.anime_id
# internal config
anime = anilist_config.anime
_anime: AllAnimeShow = anilist_config._anime
# prompt for episode number
# TODO: Load episode number from cache
episodes = anime["show"]["availableEpisodesDetail"][translation_type]
if continue_from_history and user_watch_history.get(anime_title) in episodes:
episode_number = user_watch_history[anime_title]
if continue_from_history and user_watch_history.get(str(anime_id)) in episodes:
episode_number = user_watch_history[str(anime_id)]
print(f"[bold cyan]Continuing from Episode:[/] [bold]{episode_number}[/]")
else:
episode_number = fuzzy_inquirer(
@@ -208,7 +219,7 @@ def fetch_episode(config: Config, anilist_config: QueryDict):
if episode_number == "back":
provide_anime(config, anilist_config)
return
config.update_watch_history(anime_title, episode_number)
config.update_watch_history(anime_id, episode_number)
# get the episode info from provider
episode = anime_provider.get_anime_episode(
@@ -247,16 +258,27 @@ def provide_anime(config: Config, anilist_config: QueryDict):
search_results = {
anime["name"]: anime for anime in search_results["shows"]["edges"]
}
_title = None
if _title := next(
(
original
for original, normalized in anime_normalizer.items()
if normalized.lower() == selected_anime_title.lower()
),
None,
):
_title = _title
anime_title = fuzzy_inquirer(
"Select Search Result:",
[*search_results.keys(), "back"],
default=selected_anime_title,
default=_title or selected_anime_title,
)
if anime_title == "back":
anilist_options(config, anilist_config)
return
anilist_config.anime_title = anime_title
anilist_config.anime_title = anime_normalizer.get(anime_title) or anime_title
anilist_config._anime = search_results[anime_title]
fetch_anime_epiosode(config, anilist_config)
@@ -373,9 +395,12 @@ def select_anime(config: Config, anilist_config: QueryDict):
anilist(config, anilist_config)
return
selected_anime = anime_data[selected_anime_title]
selected_anime: AnilistBaseMediaDataSchema = anime_data[selected_anime_title]
anilist_config.selected_anime_anilist = selected_anime
anilist_config.selected_anime_title = selected_anime_title
anilist_config.selected_anime_title = (
selected_anime["title"]["romaji"] or selected_anime["title"]["english"]
)
anilist_config.anime_id = selected_anime["id"]
anilist_options(config, anilist_config)
@@ -388,14 +413,25 @@ def anilist(config: Config, anilist_config: QueryDict):
return AniList.search(query=search_term)
def _watch_history():
watch_history = list(map(int, config.watch_history.keys()))
print(watch_history)
return AniList.search(id_in=watch_history)
def _anime_list():
anime_list = config.anime_list
return AniList.search(id_in=anime_list)
options = {
"trending": AniList.get_trending,
"recently updated anime": AniList.get_most_recently_updated,
"search": _anilist_search,
"Watch History": _watch_history,
"AnimeList": _anime_list,
"most popular anime": AniList.get_most_popular,
"most favourite anime": AniList.get_most_favourite,
"most scored anime": AniList.get_most_scored,
"upcoming anime": AniList.get_upcoming_anime,
"recently updated anime": AniList.get_most_recently_updated,
"exit": sys.exit,
}
action = fuzzy_inquirer("Select Action:", options.keys())

View File

@@ -323,7 +323,7 @@ query{
most_recently_updated_query = """
query{
Page(perPage:15){
media(sort:UPDATED_AT_DESC,type:ANIME,averageScore_greater:50,genre_not_in:["hentai"]){
media(sort:UPDATED_AT_DESC,type:ANIME,averageScore_greater:60,genre_not_in:["hentai"]){
id
title{
romaji