mirror of
https://github.com/Benexl/FastAnime.git
synced 2026-01-17 07:11:50 -08:00
renamed app to anixstream
This commit is contained in:
4
anixstream/Utility/__init__.py
Normal file
4
anixstream/Utility/__init__.py
Normal file
@@ -0,0 +1,4 @@
|
||||
from .media_card_loader import MediaCardLoader
|
||||
from .show_notification import show_notification
|
||||
from .data import themes_available
|
||||
from .utils import write_crash
|
||||
35
anixstream/Utility/anilist_data_helper.py
Normal file
35
anixstream/Utility/anilist_data_helper.py
Normal file
@@ -0,0 +1,35 @@
|
||||
from datetime import datetime
|
||||
|
||||
from libs.anilist.anilist_data_schema import (
|
||||
AnilistDateObject,
|
||||
AnilistMediaNextAiringEpisode,
|
||||
)
|
||||
|
||||
|
||||
# TODO: Add formating options for the final date
|
||||
def format_anilist_date_object(anilist_date_object: AnilistDateObject):
|
||||
if anilist_date_object:
|
||||
return f"{anilist_date_object['day']}/{anilist_date_object['month']}/{anilist_date_object['year']}"
|
||||
else:
|
||||
return "Unknown"
|
||||
|
||||
|
||||
def format_anilist_timestamp(anilist_timestamp: int | None):
|
||||
if anilist_timestamp:
|
||||
return datetime.fromtimestamp(anilist_timestamp).strftime("%d/%m/%Y %H:%M:%S")
|
||||
else:
|
||||
return "Unknown"
|
||||
|
||||
|
||||
def format_list_data_with_comma(data: list | None):
|
||||
if data:
|
||||
return ", ".join(data)
|
||||
else:
|
||||
return "None"
|
||||
|
||||
|
||||
def extract_next_airing_episode(airing_episode: AnilistMediaNextAiringEpisode):
|
||||
if airing_episode:
|
||||
return f"Episode: {airing_episode['episode']} on {format_anilist_timestamp(airing_episode['airingAt'])}"
|
||||
else:
|
||||
return "Completed"
|
||||
41
anixstream/Utility/animdl_config_manager.py
Normal file
41
anixstream/Utility/animdl_config_manager.py
Normal file
@@ -0,0 +1,41 @@
|
||||
from typing import TypedDict
|
||||
import plyer
|
||||
import os
|
||||
|
||||
from .yaml_parser import YamlParser
|
||||
|
||||
|
||||
class AnimdlConfig(TypedDict):
|
||||
default_player: str
|
||||
default_provider: str
|
||||
quality_string: str
|
||||
|
||||
|
||||
if local_data_path:=os.getenv("LOCALAPPDATA"):
|
||||
config_dir = os.path.join(local_data_path,".config")
|
||||
if not os.path.exists(config_dir):
|
||||
os.mkdir(config_dir)
|
||||
animdl_config_folder_location = os.path.join(config_dir, "animdl")
|
||||
else:
|
||||
user_profile_path = plyer.storagepath.get_home_dir() # type: ignore
|
||||
animdl_config_folder_location = os.path.join(user_profile_path, ".animdl")
|
||||
if not os.path.exists(animdl_config_folder_location):
|
||||
os.mkdir(animdl_config_folder_location)
|
||||
|
||||
animdl_config_location = os.path.join(animdl_config_folder_location, "config.yml")
|
||||
# print(animdl_config_location)
|
||||
animdl_config = YamlParser(
|
||||
animdl_config_location,
|
||||
{"default_player": "mpv", "default_provider": "allanime", "quality_string": "best"},
|
||||
AnimdlConfig,
|
||||
)
|
||||
|
||||
|
||||
def update_animdl_config(field_to_update: str, value):
|
||||
current_data = animdl_config.data
|
||||
current_data[f"{field_to_update}"] = value
|
||||
animdl_config.write(current_data)
|
||||
|
||||
|
||||
def get_animdl_config() -> AnimdlConfig:
|
||||
return animdl_config.data
|
||||
6
anixstream/Utility/data.py
Normal file
6
anixstream/Utility/data.py
Normal file
@@ -0,0 +1,6 @@
|
||||
"""
|
||||
Just contains some useful data used across the codebase
|
||||
"""
|
||||
|
||||
themes_available = ['Aliceblue', 'Antiquewhite', 'Aqua', 'Aquamarine', 'Azure', 'Beige', 'Bisque', 'Black', 'Blanchedalmond', 'Blue', 'Blueviolet', 'Brown', 'Burlywood', 'Cadetblue', 'Chartreuse', 'Chocolate', 'Coral', 'Cornflowerblue', 'Cornsilk', 'Crimson', 'Cyan', 'Darkblue', 'Darkcyan', 'Darkgoldenrod', 'Darkgray', 'Darkgrey', 'Darkgreen', 'Darkkhaki', 'Darkmagenta', 'Darkolivegreen', 'Darkorange', 'Darkorchid', 'Darkred', 'Darksalmon', 'Darkseagreen', 'Darkslateblue', 'Darkslategray', 'Darkslategrey', 'Darkturquoise', 'Darkviolet', 'Deeppink', 'Deepskyblue', 'Dimgray', 'Dimgrey', 'Dodgerblue', 'Firebrick', 'Floralwhite', 'Forestgreen', 'Fuchsia', 'Gainsboro', 'Ghostwhite', 'Gold', 'Goldenrod', 'Gray', 'Grey', 'Green', 'Greenyellow', 'Honeydew', 'Hotpink', 'Indianred', 'Indigo', 'Ivory', 'Khaki', 'Lavender', 'Lavenderblush', 'Lawngreen', 'Lemonchiffon', 'Lightblue', 'Lightcoral', 'Lightcyan', 'Lightgoldenrodyellow', 'Lightgreen', 'Lightgray', 'Lightgrey', 'Lightpink', 'Lightsalmon', 'Lightseagreen', 'Lightskyblue', 'Lightslategray', 'Lightslategrey', 'Lightsteelblue', 'Lightyellow', 'Lime', 'Limegreen', 'Linen', 'Magenta', 'Maroon', 'Mediumaquamarine', 'Mediumblue', 'Mediumorchid', 'Mediumpurple', 'Mediumseagreen', 'Mediumslateblue', 'Mediumspringgreen', 'Mediumturquoise', 'Mediumvioletred', 'Midnightblue', 'Mintcream', 'Mistyrose', 'Moccasin', 'Navajowhite', 'Navy', 'Oldlace', 'Olive', 'Olivedrab', 'Orange', 'Orangered', 'Orchid', 'Palegoldenrod', 'Palegreen', 'Paleturquoise', 'Palevioletred', 'Papayawhip', 'Peachpuff', 'Peru', 'Pink', 'Plum', 'Powderblue', 'Purple', 'Red', 'Rosybrown', 'Royalblue', 'Saddlebrown', 'Salmon', 'Sandybrown', 'Seagreen', 'Seashell', 'Sienna', 'Silver', 'Skyblue', 'Slateblue', 'Slategray', 'Slategrey', 'Snow', 'Springgreen', 'Steelblue', 'Tan', 'Teal', 'Thistle', 'Tomato', 'Turquoise', 'Violet', 'Wheat', 'White', 'Whitesmoke', 'Yellow',
|
||||
'Yellowgreen']
|
||||
54
anixstream/Utility/kivy_markup_helper.py
Normal file
54
anixstream/Utility/kivy_markup_helper.py
Normal file
@@ -0,0 +1,54 @@
|
||||
"""
|
||||
Contains helper functions to make your life easy when adding kivy markup to text
|
||||
"""
|
||||
|
||||
from kivy.utils import get_hex_from_color
|
||||
|
||||
|
||||
def bolden(text: str):
|
||||
return f"[b]{text}[/b]"
|
||||
|
||||
|
||||
def italicize(text: str):
|
||||
return f"[i]{text}[/i]"
|
||||
|
||||
|
||||
def underline(text: str):
|
||||
return f"[u]{text}[/u]"
|
||||
|
||||
|
||||
def strike_through(text: str):
|
||||
return f"[s]{text}[/s]"
|
||||
|
||||
|
||||
def sub_script(text: str):
|
||||
return f"[sub]{text}[/sub]"
|
||||
|
||||
|
||||
def super_script(text: str):
|
||||
return f"[sup]{text}[/sup]"
|
||||
|
||||
|
||||
def color_text(text: str, color: tuple):
|
||||
hex_color = get_hex_from_color(color)
|
||||
return f"[color={hex_color}]{text}[/color]"
|
||||
|
||||
|
||||
def font(text: str, font_name: str):
|
||||
return f"[font={font_name}]{text}[/font]"
|
||||
|
||||
|
||||
def font_family(text: str, family: str):
|
||||
return f"[font_family={family}]{text}[/font_family]"
|
||||
|
||||
|
||||
def font_context(text: str, context: str):
|
||||
return f"[font_context={context}]{text}[/font_context]"
|
||||
|
||||
|
||||
def font_size(text: str, size: int):
|
||||
return f"[size={size}]{text}[/size]"
|
||||
|
||||
|
||||
def text_ref(text: str, ref: str):
|
||||
return f"[ref={ref}]{text}[/ref]"
|
||||
276
anixstream/Utility/media_card_loader.py
Normal file
276
anixstream/Utility/media_card_loader.py
Normal file
@@ -0,0 +1,276 @@
|
||||
from collections import deque
|
||||
from time import sleep
|
||||
import threading
|
||||
|
||||
from pytube import YouTube
|
||||
|
||||
from kivy.clock import Clock
|
||||
from kivy.cache import Cache
|
||||
from kivy.loader import _ThreadPool
|
||||
from kivy.logger import Logger
|
||||
|
||||
from View.components import MediaCard
|
||||
from Utility import anilist_data_helper, user_data_helper
|
||||
from libs.anilist.anilist_data_schema import AnilistBaseMediaDataSchema
|
||||
|
||||
|
||||
# Register anime cache in memory
|
||||
Cache.register("yt_stream_links.anime")
|
||||
|
||||
user_anime_list = user_data_helper.get_user_anime_list()
|
||||
|
||||
yt_stream_links = user_data_helper.get_anime_trailer_cache()
|
||||
for link in yt_stream_links:
|
||||
Cache.append("yt_stream_links.anime", link[0], tuple(link[1]))
|
||||
|
||||
|
||||
# TODO: Make this process more efficient
|
||||
# for youtube video links gotten from from pytube which is blocking
|
||||
class MediaCardDataLoader(object):
|
||||
"""this class loads an anime media card and gets the trailer url from pytube"""
|
||||
|
||||
def __init__(self):
|
||||
self._resume_cond = threading.Condition()
|
||||
self._num_workers = 5
|
||||
self._max_upload_per_frame = 5
|
||||
self._paused = False
|
||||
self._q_load = deque()
|
||||
self._q_done = deque()
|
||||
self._client = []
|
||||
self._running = False
|
||||
self._start_wanted = False
|
||||
self._trigger_update = Clock.create_trigger(self._update)
|
||||
|
||||
def start(self):
|
||||
"""Start the loader thread/process."""
|
||||
self._running = True
|
||||
|
||||
def run(self, *largs):
|
||||
"""Main loop for the loader."""
|
||||
pass
|
||||
|
||||
def stop(self):
|
||||
"""Stop the loader thread/process."""
|
||||
self._running = False
|
||||
|
||||
def pause(self):
|
||||
"""Pause the loader, can be useful during interactions.
|
||||
|
||||
.. versionadded:: 1.6.0
|
||||
"""
|
||||
self._paused = True
|
||||
|
||||
def resume(self):
|
||||
"""Resume the loader, after a :meth:`pause`.
|
||||
|
||||
.. versionadded:: 1.6.0
|
||||
"""
|
||||
self._paused = False
|
||||
self._resume_cond.acquire()
|
||||
self._resume_cond.notify_all()
|
||||
self._resume_cond.release()
|
||||
|
||||
def _wait_for_resume(self):
|
||||
while self._running and self._paused:
|
||||
self._resume_cond.acquire()
|
||||
self._resume_cond.wait(0.25)
|
||||
self._resume_cond.release()
|
||||
|
||||
def cached_fetch_data(self, yt_watch_url):
|
||||
data: tuple = Cache.get("yt_stream_links.anime", yt_watch_url) # type: ignore # trailer_url is the yt_watch_link
|
||||
|
||||
if not data[0]:
|
||||
yt = YouTube(yt_watch_url)
|
||||
preview_image = yt.thumbnail_url
|
||||
try:
|
||||
video_stream_url = yt.streams.filter(
|
||||
progressive=True, file_extension="mp4"
|
||||
)[-1].url
|
||||
data = preview_image, video_stream_url
|
||||
yt_stream_links.append((yt_watch_url, data))
|
||||
user_data_helper.update_anime_trailer_cache(yt_stream_links)
|
||||
except:
|
||||
data = preview_image, None
|
||||
return data
|
||||
|
||||
def _load(self, kwargs):
|
||||
while len(self._q_done) >= (self._max_upload_per_frame * self._num_workers):
|
||||
sleep(0.1) # type: ignore
|
||||
self._wait_for_resume()
|
||||
yt_watch_link = kwargs["yt_watch_link"]
|
||||
try:
|
||||
data = self.cached_fetch_data(yt_watch_link)
|
||||
except Exception as e:
|
||||
data = None
|
||||
Logger.error("Pytube:{e}")
|
||||
|
||||
self._q_done.appendleft((yt_watch_link, data))
|
||||
self._trigger_update()
|
||||
|
||||
def _update(self, *largs):
|
||||
if self._start_wanted:
|
||||
if not self._running:
|
||||
self.start()
|
||||
self._start_wanted = False
|
||||
|
||||
# in pause mode, don't unqueue anything.
|
||||
if self._paused:
|
||||
self._trigger_update()
|
||||
return
|
||||
|
||||
for _ in range(self._max_upload_per_frame):
|
||||
try:
|
||||
yt_watch_link, data = self._q_done.pop()
|
||||
except IndexError:
|
||||
return
|
||||
# update client
|
||||
for c_yt_watch_link, client in self._client[:]:
|
||||
if yt_watch_link != c_yt_watch_link:
|
||||
continue
|
||||
|
||||
# got one client to update
|
||||
if data:
|
||||
trailer_url = data[1]
|
||||
if trailer_url:
|
||||
client.set_trailer_url(trailer_url)
|
||||
Logger.info(f"Pytube:Found trailer url for {client.title}")
|
||||
Cache.append("yt_stream_links.anime", yt_watch_link, data)
|
||||
self._client.remove((c_yt_watch_link, client))
|
||||
|
||||
self._trigger_update()
|
||||
|
||||
def media_card(
|
||||
self,
|
||||
anime_item: AnilistBaseMediaDataSchema,
|
||||
load_callback=None,
|
||||
post_callback=None,
|
||||
**kwargs,
|
||||
):
|
||||
|
||||
media_card = MediaCard()
|
||||
media_card.anime_id = anime_id = anime_item["id"]
|
||||
|
||||
# TODO: ADD language preference
|
||||
if anime_item["title"].get("english"):
|
||||
media_card.title = anime_item["title"]["english"]
|
||||
else:
|
||||
media_card.title = anime_item["title"]["romaji"]
|
||||
|
||||
media_card.cover_image_url = anime_item["coverImage"]["medium"]
|
||||
|
||||
media_card.popularity = str(anime_item["popularity"])
|
||||
|
||||
media_card.favourites = str(anime_item["favourites"])
|
||||
|
||||
media_card.episodes = str(anime_item["episodes"])
|
||||
|
||||
if anime_item.get("description"):
|
||||
media_card.description = anime_item["description"]
|
||||
else:
|
||||
media_card.description = "None"
|
||||
|
||||
# TODO: switch to season and year
|
||||
media_card.first_aired_on = (
|
||||
f'{anilist_data_helper.format_anilist_date_object(anime_item["startDate"])}'
|
||||
)
|
||||
|
||||
media_card.studios = anilist_data_helper.format_list_data_with_comma(
|
||||
[
|
||||
studio["name"]
|
||||
for studio in anime_item["studios"]["nodes"]
|
||||
if studio["isAnimationStudio"]
|
||||
]
|
||||
)
|
||||
|
||||
media_card.producers = anilist_data_helper.format_list_data_with_comma(
|
||||
[
|
||||
studio["name"]
|
||||
for studio in anime_item["studios"]["nodes"]
|
||||
if not studio["isAnimationStudio"]
|
||||
]
|
||||
)
|
||||
|
||||
media_card.next_airing_episode = "{}".format(
|
||||
anilist_data_helper.extract_next_airing_episode(
|
||||
anime_item["nextAiringEpisode"]
|
||||
)
|
||||
)
|
||||
if anime_item.get("tags"):
|
||||
media_card.tags = anilist_data_helper.format_list_data_with_comma(
|
||||
[tag["name"] for tag in anime_item["tags"]]
|
||||
)
|
||||
|
||||
media_card.media_status = anime_item["status"]
|
||||
|
||||
if anime_item.get("genres"):
|
||||
media_card.genres = anilist_data_helper.format_list_data_with_comma(
|
||||
anime_item["genres"]
|
||||
)
|
||||
|
||||
if anime_id in user_anime_list:
|
||||
media_card.is_in_my_list = True
|
||||
|
||||
if anime_item["averageScore"]:
|
||||
stars = int(anime_item["averageScore"] / 100 * 6)
|
||||
if stars:
|
||||
for i in range(stars):
|
||||
media_card.stars[i] = 1
|
||||
|
||||
# Setting up trailer info to be gotten if available
|
||||
if anime_item["trailer"]:
|
||||
yt_watch_link = "https://youtube.com/watch?v=" + anime_item["trailer"]["id"]
|
||||
data = Cache.get("yt_stream_links.anime", yt_watch_link) # type: ignore # trailer_url is the yt_watch_link
|
||||
if data:
|
||||
if data[1] not in (None, False):
|
||||
media_card.set_preview_image(data[0])
|
||||
media_card.set_trailer_url(data[1])
|
||||
return media_card
|
||||
else:
|
||||
# if data is None, this is really the first time
|
||||
self._client.append((yt_watch_link, media_card))
|
||||
self._q_load.appendleft(
|
||||
{
|
||||
"yt_watch_link": yt_watch_link,
|
||||
"load_callback": load_callback,
|
||||
"post_callback": post_callback,
|
||||
"current_anime": anime_item["id"],
|
||||
"kwargs": kwargs,
|
||||
}
|
||||
)
|
||||
if not kwargs.get("nocache", False):
|
||||
Cache.append("yt_stream_links.anime", yt_watch_link, (False, False))
|
||||
self._start_wanted = True
|
||||
self._trigger_update()
|
||||
return media_card
|
||||
|
||||
|
||||
class LoaderThreadPool(MediaCardDataLoader):
|
||||
def __init__(self):
|
||||
super(LoaderThreadPool, self).__init__()
|
||||
self.pool: _ThreadPool | None = None
|
||||
|
||||
def start(self):
|
||||
super(LoaderThreadPool, self).start()
|
||||
self.pool = _ThreadPool(self._num_workers)
|
||||
Clock.schedule_interval(self.run, 0)
|
||||
|
||||
def stop(self):
|
||||
super(LoaderThreadPool, self).stop()
|
||||
Clock.unschedule(self.run)
|
||||
self.pool.stop() # type: ignore
|
||||
|
||||
def run(self, *largs):
|
||||
while self._running:
|
||||
try:
|
||||
parameters = self._q_load.pop()
|
||||
except:
|
||||
return
|
||||
self.pool.add_task(self._load, parameters) # type: ignore
|
||||
|
||||
|
||||
MediaCardLoader = LoaderThreadPool()
|
||||
Logger.info(
|
||||
"MediaCardLoader: using a thread pool of {} workers".format(
|
||||
MediaCardLoader._num_workers
|
||||
)
|
||||
)
|
||||
17
anixstream/Utility/observer.py
Normal file
17
anixstream/Utility/observer.py
Normal file
@@ -0,0 +1,17 @@
|
||||
|
||||
# Of course, "very flexible Python" allows you to do without an abstract
|
||||
# superclass at all or use the clever exception `NotImplementedError`. In my
|
||||
# opinion, this can negatively affect the architecture of the application.
|
||||
# I would like to point out that using Kivy, one could use the on-signaling
|
||||
# model. In this case, when the state changes, the model will send a signal
|
||||
# that can be received by all attached observers. This approach seems less
|
||||
# universal - you may want to use a different library in the future.
|
||||
|
||||
|
||||
class Observer:
|
||||
"""Abstract superclass for all observers."""
|
||||
|
||||
def model_is_changed(self):
|
||||
"""
|
||||
The method that will be called on the observer when the model changes.
|
||||
"""
|
||||
29
anixstream/Utility/show_notification.py
Normal file
29
anixstream/Utility/show_notification.py
Normal file
@@ -0,0 +1,29 @@
|
||||
from kivymd.uix.snackbar import MDSnackbar, MDSnackbarText, MDSnackbarSupportingText
|
||||
from kivy.clock import Clock
|
||||
|
||||
|
||||
def show_notification(title, details):
|
||||
"""helper function to display notifications
|
||||
|
||||
Args:
|
||||
title (str): the title of your message
|
||||
details (str): the details of your message
|
||||
"""
|
||||
|
||||
def _show(dt):
|
||||
MDSnackbar(
|
||||
MDSnackbarText(
|
||||
text=title,
|
||||
adaptive_height=True,
|
||||
),
|
||||
MDSnackbarSupportingText(
|
||||
text=details, shorten=False, max_lines=0, adaptive_height=True
|
||||
),
|
||||
duration=5,
|
||||
y="10dp",
|
||||
pos_hint={"bottom": 1, "right": 0.99},
|
||||
padding=[0, 0, "8dp", "8dp"],
|
||||
size_hint_x=0.4,
|
||||
).open()
|
||||
|
||||
Clock.schedule_once(_show, 1)
|
||||
82
anixstream/Utility/user_data_helper.py
Normal file
82
anixstream/Utility/user_data_helper.py
Normal file
@@ -0,0 +1,82 @@
|
||||
"""
|
||||
Contains Helper functions to read and write the user data files
|
||||
"""
|
||||
|
||||
from kivy.storage.jsonstore import JsonStore
|
||||
from datetime import date, datetime
|
||||
from kivy.logger import Logger
|
||||
from kivy.resources import resource_find
|
||||
|
||||
today = date.today()
|
||||
now = datetime.now()
|
||||
|
||||
user_data = JsonStore(resource_find("user_data.json"))
|
||||
yt_cache = JsonStore(resource_find("yt_cache.json"))
|
||||
|
||||
|
||||
# Get the user data
|
||||
def get_user_anime_list() -> list:
|
||||
try:
|
||||
return user_data.get("user_anime_list")[
|
||||
"user_anime_list"
|
||||
] # returns a list of anime ids
|
||||
except Exception as e:
|
||||
Logger.warning(f"User Data:Read failure:{e}")
|
||||
return []
|
||||
|
||||
|
||||
def update_user_anime_list(updated_list: list):
|
||||
try:
|
||||
updated_list_ = list(set(updated_list))
|
||||
user_data.put("user_anime_list", user_anime_list=updated_list_)
|
||||
except Exception as e:
|
||||
Logger.warning(f"User Data:Update failure:{e}")
|
||||
|
||||
|
||||
# Get the user data
|
||||
def get_user_downloads() -> list:
|
||||
try:
|
||||
return user_data.get("user_downloads")[
|
||||
"user_downloads"
|
||||
] # returns a list of anime ids
|
||||
except Exception as e:
|
||||
Logger.warning(f"User Data:Read failure:{e}")
|
||||
return []
|
||||
|
||||
|
||||
def update_user_downloads(updated_list: list):
|
||||
try:
|
||||
user_data.put("user_downloads", user_downloads=list(set(updated_list)))
|
||||
except Exception as e:
|
||||
Logger.warning(f"User Data:Update failure:{e}")
|
||||
|
||||
|
||||
# Yt persistent anime trailer cache
|
||||
t = 1
|
||||
if now.hour <= 6:
|
||||
t = 1
|
||||
elif now.hour <= 12:
|
||||
t = 2
|
||||
elif now.hour <= 18:
|
||||
t = 3
|
||||
else:
|
||||
t = 4
|
||||
|
||||
yt_anime_trailer_cache_name = f"{today}{t}"
|
||||
|
||||
|
||||
def get_anime_trailer_cache() -> list:
|
||||
try:
|
||||
return yt_cache["yt_stream_links"][f"{yt_anime_trailer_cache_name}"]
|
||||
except Exception as e:
|
||||
Logger.warning(f"User Data:Read failure:{e}")
|
||||
return []
|
||||
|
||||
|
||||
def update_anime_trailer_cache(yt_stream_links: list):
|
||||
try:
|
||||
yt_cache.put(
|
||||
"yt_stream_links", **{f"{yt_anime_trailer_cache_name}": yt_stream_links}
|
||||
)
|
||||
except Exception as e:
|
||||
Logger.warning(f"User Data:Update failure:{e}")
|
||||
34
anixstream/Utility/utils.py
Normal file
34
anixstream/Utility/utils.py
Normal file
@@ -0,0 +1,34 @@
|
||||
from datetime import datetime
|
||||
import shutil
|
||||
import os
|
||||
# TODO: make it use color_text instead of fixed vals
|
||||
# from .kivy_markup_helper import color_text
|
||||
|
||||
|
||||
# utility functions
|
||||
def write_crash(e: Exception):
|
||||
index = datetime.today()
|
||||
error = f"[b][color=#fa0000][ {index} ]:[/color][/b]\n(\n\n{e}\n\n)\n"
|
||||
try:
|
||||
with open("crashdump.txt", "a") as file:
|
||||
file.write(error)
|
||||
except:
|
||||
with open("crashdump.txt", "w") as file:
|
||||
file.write(error)
|
||||
return index
|
||||
|
||||
|
||||
def read_crash_file():
|
||||
crash_file_path = "./crashfile.txt"
|
||||
if not os.path.exists(crash_file_path):
|
||||
return None
|
||||
else:
|
||||
with open(crash_file_path,"r") as file:
|
||||
return file.read()
|
||||
|
||||
def move_file(source_path, dest_path):
|
||||
try:
|
||||
path = shutil.move(source_path, dest_path)
|
||||
return (1, path)
|
||||
except Exception as e:
|
||||
return (0, e)
|
||||
34
anixstream/Utility/yaml_parser.py
Normal file
34
anixstream/Utility/yaml_parser.py
Normal file
@@ -0,0 +1,34 @@
|
||||
import yaml
|
||||
|
||||
import os
|
||||
|
||||
|
||||
class YamlParser:
|
||||
"""makes managing yaml files easier"""
|
||||
|
||||
data = {}
|
||||
|
||||
def __init__(self, file_path: str, default, data_type):
|
||||
self.file_path: str = file_path
|
||||
self.data: data_type
|
||||
if os.path.exists(file_path):
|
||||
try:
|
||||
with open(self.file_path, "r") as yaml_file:
|
||||
self.data = yaml.safe_load(yaml_file)
|
||||
except:
|
||||
self.data = default
|
||||
with open(file_path, "w") as yaml_file:
|
||||
yaml.dump(default, yaml_file)
|
||||
else:
|
||||
self.data = default
|
||||
with open(file_path, "w") as yaml_file:
|
||||
yaml.dump(default, yaml_file)
|
||||
|
||||
def read(self):
|
||||
with open(self.file_path, "r") as yaml_file:
|
||||
self.data = yaml.safe_load(yaml_file)
|
||||
return self.data
|
||||
|
||||
def write(self, new_obj):
|
||||
with open(self.file_path, "w") as yaml_file:
|
||||
yaml.dump(new_obj, yaml_file)
|
||||
Reference in New Issue
Block a user