import concurrent.futures import logging import os import shutil import subprocess import textwrap from threading import Thread import requests from ...constants import APP_CACHE_DIR from ...libs.anilist.anilist_data_schema import AnilistBaseMediaDataSchema from ...Utility import anilist_data_helper from ...Utility.utils import remove_html_tags from ..utils.utils import get_true_fg logger = logging.getLogger(__name__) fzf_preview = r""" # # The purpose of this script is to demonstrate how to preview a file or an # image in the preview window of fzf. # # Dependencies: # - https://github.com/sharkdp/bat # - https://github.com/hpjansson/chafa # - https://iterm2.com/utilities/imgcat fzf-preview(){ if [[ $# -ne 1 ]]; then >&2 echo "usage: $0 FILENAME" exit 1 fi file=${1/#\~\//$HOME/} type=$(file --dereference --mime -- "$file") if [[ ! $type =~ image/ ]]; then if [[ $type =~ =binary ]]; then file "$1" exit fi # Sometimes bat is installed as batcat. if command -v batcat > /dev/null; then batname="batcat" elif command -v bat > /dev/null; then batname="bat" else cat "$1" exit fi ${batname} --style="${BAT_STYLE:-numbers}" --color=always --pager=never -- "$file" exit fi dim=${FZF_PREVIEW_COLUMNS}x${FZF_PREVIEW_LINES} if [[ $dim = x ]]; then dim=$(stty size < /dev/tty | awk '{print $2 "x" $1}') elif ! [[ $KITTY_WINDOW_ID ]] && (( FZF_PREVIEW_TOP + FZF_PREVIEW_LINES == $(stty size < /dev/tty | awk '{print $1}') )); then # Avoid scrolling issue when the Sixel image touches the bottom of the screen # * https://github.com/junegunn/fzf/issues/2544 dim=${FZF_PREVIEW_COLUMNS}x$((FZF_PREVIEW_LINES - 1)) fi # 1. Use kitty icat on kitty terminal if [[ $KITTY_WINDOW_ID ]]; then # 1. 'memory' is the fastest option but if you want the image to be scrollable, # you have to use 'stream'. # # 2. The last line of the output is the ANSI reset code without newline. # This confuses fzf and makes it render scroll offset indicator. # So we remove the last line and append the reset code to its previous line. kitty icat --clear --transfer-mode=memory --unicode-placeholder --stdin=no --place="$dim@0x0" "$file" | sed '$d' | sed $'$s/$/\e[m/' # 2. Use chafa with Sixel output elif command -v chafa > /dev/null; then chafa -f sixel -s "$dim" "$file" # Add a new line character so that fzf can display multiple images in the preview window echo # 3. If chafa is not found but imgcat is available, use it on iTerm2 elif command -v imgcat > /dev/null; then # NOTE: We should use https://iterm2.com/utilities/it2check to check if the # user is running iTerm2. But for the sake of simplicity, we just assume # that's the case here. imgcat -W "${dim%%x*}" -H "${dim##*x}" "$file" # 4. Cannot find any suitable method to preview the image else file "$file" fi } """ # ---- aniskip intergration ---- def aniskip(mal_id, episode): ANISKIP = shutil.which("ani-skip") if not ANISKIP: print("Aniskip not found, please install and try again") return args = [ANISKIP, "-q", str(mal_id), "-e", str(episode)] aniskip_result = subprocess.run(args, text=True, stdout=subprocess.PIPE) if aniskip_result.returncode != 0: return mpv_skip_args = aniskip_result.stdout.strip() return mpv_skip_args.split(" ") # ---- prevew stuff ---- # import tempfile WORKING_DIR = APP_CACHE_DIR # tempfile.gettempdir() IMAGES_DIR = os.path.join(WORKING_DIR, "images") if not os.path.exists(IMAGES_DIR): os.mkdir(IMAGES_DIR) INFO_DIR = os.path.join(WORKING_DIR, "info") if not os.path.exists(INFO_DIR): os.mkdir(INFO_DIR) def save_image_from_url(url: str, file_name: str): image = requests.get(url) with open(f"{IMAGES_DIR}/{file_name}", "wb") as f: f.write(image.content) def save_info_from_str(info: str, file_name: str): with open(f"{INFO_DIR}/{file_name}", "w") as f: f.write(info) def write_search_results( search_results: list[AnilistBaseMediaDataSchema], titles, workers=None, ): H_COLOR = 215, 0, 95 S_COLOR = 208, 208, 208 S_WIDTH = 45 with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: future_to_task = {} for anime, title in zip(search_results, titles): image_url = anime["coverImage"]["large"] future_to_task[executor.submit(save_image_from_url, image_url, title)] = ( image_url ) # handle the text data template = f""" {get_true_fg("-"*S_WIDTH,*S_COLOR,bold=False)} {get_true_fg('Title(jp):',*H_COLOR)} {anime['title']['romaji']} {get_true_fg('Title(eng):',*H_COLOR)} {anime['title']['english']} {get_true_fg('Popularity:',*H_COLOR)} {anime['popularity']} {get_true_fg('Favourites:',*H_COLOR)} {anime['favourites']} {get_true_fg('Status:',*H_COLOR)} {anime['status']} {get_true_fg('Episodes:',*H_COLOR)} {anime['episodes']} {get_true_fg('Genres:',*H_COLOR)} {anilist_data_helper.format_list_data_with_comma(anime['genres'])} {get_true_fg('Next Episode:',*H_COLOR)} {anilist_data_helper.extract_next_airing_episode(anime['nextAiringEpisode'])} {get_true_fg('Start Date:',*H_COLOR)} {anilist_data_helper.format_anilist_date_object(anime['startDate'])} {get_true_fg('End Date:',*H_COLOR)} {anilist_data_helper.format_anilist_date_object(anime['endDate'])} {get_true_fg("-"*S_WIDTH,*S_COLOR,bold=False)} {get_true_fg('Description:',*H_COLOR)} """ template = textwrap.dedent(template) template = f""" {template} {textwrap.fill(remove_html_tags( str(anime['description'])), width=45)} """ future_to_task[executor.submit(save_info_from_str, template, title)] = title # execute the jobs for future in concurrent.futures.as_completed(future_to_task): task = future_to_task[future] try: future.result() except Exception as exc: logger.error("%r generated an exception: %s" % (task, exc)) # get rofi icons def get_icons(search_results: list[AnilistBaseMediaDataSchema], titles, workers=None): with concurrent.futures.ThreadPoolExecutor(max_workers=workers) as executor: # load the jobs future_to_url = {} for anime, title in zip(search_results, titles): image_url = anime["coverImage"]["large"] future_to_url[executor.submit(save_image_from_url, image_url, title)] = ( image_url ) # execute the jobs for future in concurrent.futures.as_completed(future_to_url): url = future_to_url[future] try: future.result() except Exception as exc: logger.error("%r generated an exception: %s" % (url, exc)) def get_preview(search_results: list[AnilistBaseMediaDataSchema], titles, wait=False): # ensure images and info exists background_worker = Thread( target=write_search_results, args=(search_results, titles) ) background_worker.daemon = True background_worker.start() os.environ["SHELL"] = shutil.which("bash") or "sh" preview = """ %s if [ -s %s/{} ]; then fzf-preview %s/{} else echo Loading... fi if [ -s %s/{} ]; then cat %s/{} else echo Loading... fi """ % ( fzf_preview, IMAGES_DIR, IMAGES_DIR, INFO_DIR, INFO_DIR, ) # preview.replace("\n", ";") if wait: background_worker.join() return preview