mirror of
https://github.com/Benexl/FastAnime.git
synced 2026-01-18 23:55:40 -08:00
feat(download screen):implement download capabilities
This commit is contained in:
@@ -1,4 +1,10 @@
|
||||
from threading import Thread
|
||||
from queue import Queue
|
||||
|
||||
import yt_dlp
|
||||
from ... import downloads_dir
|
||||
from ..utils import sanitize_filename
|
||||
from ..show_notification import show_notification
|
||||
|
||||
|
||||
class MyLogger:
|
||||
@@ -12,27 +18,55 @@ class MyLogger:
|
||||
print(msg)
|
||||
|
||||
|
||||
def my_hook(d):
|
||||
if d["status"] == "finished":
|
||||
print("Done downloading, now converting ...")
|
||||
def main_progress_hook(data):
|
||||
match data["status"]:
|
||||
case "error":
|
||||
show_notification(
|
||||
"Something went wrong while downloading the video", data["filename"]
|
||||
)
|
||||
case "finished":
|
||||
show_notification("Downloaded", data["filename"])
|
||||
|
||||
|
||||
# URL of the file you want to download
|
||||
url = "http://example.com/path/to/file.mp4"
|
||||
|
||||
# Options for yt-dlp
|
||||
ydl_opts = {
|
||||
"outtmpl": "/path/to/downloaded/file.%(ext)s", # Specify the output path and template
|
||||
"logger": MyLogger(), # Custom logger
|
||||
"progress_hooks": [my_hook], # Progress hook
|
||||
}
|
||||
|
||||
|
||||
# Function to download the file
|
||||
def download_file(url, options):
|
||||
with yt_dlp.YoutubeDL(options) as ydl:
|
||||
ydl.download([url])
|
||||
class YtDLPDownloader:
|
||||
downloads_queue = Queue()
|
||||
|
||||
def _worker(self):
|
||||
while True:
|
||||
task, args = self.downloads_queue.get()
|
||||
try:
|
||||
task(*args)
|
||||
except Exception as e:
|
||||
show_notification("Something went wrong", f"Reason: {e}")
|
||||
self.downloads_queue.task_done()
|
||||
|
||||
def __init__(self):
|
||||
self._thread = Thread(target=self._worker)
|
||||
self._thread.daemon = True
|
||||
self._thread.start()
|
||||
|
||||
# Function to download the file
|
||||
def _download_file(self, url: str, title, custom_progress_hook, silent):
|
||||
anime_title = sanitize_filename(title[0])
|
||||
ydl_opts = {
|
||||
"outtmpl": f"{downloads_dir}/{anime_title}/{anime_title}-episode {title[1]}.%(ext)s", # Specify the output path and template
|
||||
"progress_hooks": [
|
||||
main_progress_hook,
|
||||
custom_progress_hook,
|
||||
], # Progress hook
|
||||
"silent": silent,
|
||||
}
|
||||
|
||||
with yt_dlp.YoutubeDL(ydl_opts) as ydl:
|
||||
ydl.download([url])
|
||||
|
||||
def download_file(self, url: str, title, custom_progress_hook, silent=True):
|
||||
self.downloads_queue.put(
|
||||
(self._download_file, (url, title, custom_progress_hook, silent))
|
||||
)
|
||||
|
||||
|
||||
# Call the function
|
||||
download_file(url, ydl_opts)
|
||||
downloader = YtDLPDownloader()
|
||||
|
||||
@@ -1,7 +1,7 @@
|
||||
import os
|
||||
import shutil
|
||||
from datetime import datetime
|
||||
|
||||
import re
|
||||
# TODO: make it use color_text instead of fixed vals
|
||||
# from .kivy_markup_helper import color_text
|
||||
|
||||
@@ -34,3 +34,62 @@ def move_file(source_path, dest_path):
|
||||
return (1, path)
|
||||
except Exception as e:
|
||||
return (0, e)
|
||||
|
||||
|
||||
def sanitize_filename(filename: str):
|
||||
"""
|
||||
Sanitize a string to be safe for use as a file name.
|
||||
|
||||
:param filename: The original filename string.
|
||||
:return: A sanitized filename string.
|
||||
"""
|
||||
# List of characters not allowed in filenames on various operating systems
|
||||
invalid_chars = r'[<>:"/\\|?*\0]'
|
||||
reserved_names = {
|
||||
"CON",
|
||||
"PRN",
|
||||
"AUX",
|
||||
"NUL",
|
||||
"COM1",
|
||||
"COM2",
|
||||
"COM3",
|
||||
"COM4",
|
||||
"COM5",
|
||||
"COM6",
|
||||
"COM7",
|
||||
"COM8",
|
||||
"COM9",
|
||||
"LPT1",
|
||||
"LPT2",
|
||||
"LPT3",
|
||||
"LPT4",
|
||||
"LPT5",
|
||||
"LPT6",
|
||||
"LPT7",
|
||||
"LPT8",
|
||||
"LPT9",
|
||||
}
|
||||
|
||||
# Replace invalid characters with an underscore
|
||||
sanitized = re.sub(invalid_chars, "_", filename)
|
||||
|
||||
# Remove leading and trailing whitespace
|
||||
sanitized = sanitized.strip()
|
||||
|
||||
# Check for reserved filenames
|
||||
name, ext = os.path.splitext(sanitized)
|
||||
if name.upper() in reserved_names:
|
||||
name += "_file"
|
||||
sanitized = name + ext
|
||||
|
||||
# Ensure the filename is not empty
|
||||
if not sanitized:
|
||||
sanitized = "default_filename"
|
||||
|
||||
return sanitized
|
||||
|
||||
|
||||
# Example usage
|
||||
unsafe_filename = "CON:example?file*name.txt"
|
||||
safe_filename = sanitize_filename(unsafe_filename)
|
||||
print(safe_filename) # Output: 'CON_example_file_name.txt'
|
||||
|
||||
Reference in New Issue
Block a user