feat: make allanime provider functional

This commit is contained in:
Benexl
2025-07-07 17:35:19 +03:00
parent 317fee916b
commit 783b63219f
25 changed files with 286 additions and 206 deletions

View File

@@ -7,9 +7,9 @@ PLATFORM = sys.platform
APP_NAME = os.environ.get("FASTANIME_APP_NAME", "fastanime")
try:
pkg = resources.files("fastanime")
APP_DIR = Path(str(resources.files("fastanime")))
ASSETS_DIR = pkg / "assets"
ASSETS_DIR = APP_DIR / "assets"
DEFAULTS = ASSETS_DIR / "defaults"
ICONS_DIR = ASSETS_DIR / "icons"
@@ -26,8 +26,8 @@ try:
except ModuleNotFoundError:
from pathlib import Path
pkg = Path(__file__).resolve().parent.parent
ASSETS_DIR = pkg / "assets"
APP_DIR = Path(__file__).resolve().parent.parent
ASSETS_DIR = APP_DIR / "assets"
DEFAULTS = ASSETS_DIR / "defaults"
ICONS_DIR = ASSETS_DIR / "icons"
@@ -56,19 +56,15 @@ try:
APP_DATA_DIR = Path(click.get_app_dir(APP_NAME, roaming=False))
except ModuleNotFoundError:
# TODO: change to path objects
if PLATFORM == "win32":
folder = os.environ.get("LOCALAPPDATA")
if folder is None:
folder = os.path.expanduser("~")
APP_DATA_DIR = os.path.join(folder, APP_NAME)
folder = Path.home()
APP_DATA_DIR = Path(folder) / APP_NAME
if PLATFORM == "darwin":
APP_DATA_DIR = os.path.join(
os.path.expanduser("~/Library/Application Support"), APP_NAME
)
APP_DATA_DIR = os.path.join(
os.environ.get("XDG_CONFIG_HOME", os.path.expanduser("~/.config")),
)
APP_DATA_DIR = Path("~/Library/Application Support") / APP_NAME
APP_DATA_DIR = Path(os.environ.get("XDG_CONFIG_HOME", "~/.config")) / APP_NAME
if PLATFORM == "win32":
APP_CACHE_DIR = APP_DATA_DIR / "cache"
@@ -79,10 +75,10 @@ elif PLATFORM == "darwin":
USER_VIDEOS_DIR = Path.home() / "Movies" / APP_NAME
else:
xdg_cache_home = Path(os.environ.get("XDG_CACHE_HOME", Path.home() / ".cache"))
xdg_cache_home = Path(os.environ.get("XDG_CACHE_HOME", "~/.cache"))
APP_CACHE_DIR = xdg_cache_home / APP_NAME
xdg_videos_dir = Path(os.environ.get("XDG_VIDEOS_DIR", Path.home() / "Videos"))
xdg_videos_dir = Path(os.environ.get("XDG_VIDEOS_DIR", "~/Videos"))
USER_VIDEOS_DIR = xdg_videos_dir / APP_NAME
APP_DATA_DIR.mkdir(parents=True, exist_ok=True)
@@ -94,4 +90,4 @@ USER_WATCH_HISTORY_PATH = APP_DATA_DIR / "watch_history.json"
USER_CONFIG_PATH = APP_DATA_DIR / "config.ini"
LOG_FILE_PATH = APP_CACHE_DIR / "fastanime.log"
ICON_PATH = ICONS_DIR / ("logo.ico" if PLATFORM == "Windows" else "logo.png")
ICON_PATH = ICONS_DIR / ("logo.ico" if PLATFORM == "Win32" else "logo.png")

View File

@@ -1,10 +1,10 @@
from __future__ import annotations
import json
import logging
from pathlib import Path
from typing import TYPE_CHECKING
from httpx import Client, Response
from .networking import TIMEOUT
if TYPE_CHECKING:
@@ -32,53 +32,17 @@ def load_graphql_from_file(file: Path) -> str:
def execute_graphql_query(
url: str, httpx_client: Client, graphql_file: Path, variables: dict
) -> dict | None:
"""
Executes a GraphQL query using a GET request with query parameters.
Suitable for read-only operations.
Args:
url: The base GraphQL endpoint URL.
httpx_client: The httpx.Client instance to use.
graphql_file: Path to the .gql file containing the query.
variables: A dictionary of variables for the query.
Returns:
The JSON response as a dictionary, or None on failure.
"""
) -> Response:
query = load_graphql_from_file(graphql_file)
params = {"query": query, "variables": json.dumps(variables)}
try:
response = httpx_client.get(url, params=params, timeout=TIMEOUT)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"GraphQL GET request failed for {graphql_file.name}: {e}")
return None
response = httpx_client.get(url, params=params, timeout=TIMEOUT)
return response
def execute_graphql_mutation(
url: str, httpx_client: Client, graphql_file: Path, variables: dict
) -> dict | None:
"""
Executes a GraphQL mutation using a POST request with a JSON body.
Suitable for write/update operations.
Args:
url: The GraphQL endpoint URL.
httpx_client: The httpx.Client instance to use.
graphql_file: Path to the .gql file containing the mutation.
variables: A dictionary of variables for the mutation.
Returns:
The JSON response as a dictionary, or None on failure.
"""
) -> Response:
query = load_graphql_from_file(graphql_file)
json_body = {"query": query, "variables": variables}
try:
response = httpx_client.post(url, json=json_body, timeout=TIMEOUT)
response.raise_for_status()
return response.json()
except Exception as e:
logger.error(f"GraphQL POST request failed for {graphql_file.name}: {e}")
return None
response = httpx_client.post(url, json=json_body, timeout=TIMEOUT)
return response

View File

@@ -1,6 +1,6 @@
import re
from importlib import resources
from pathlib import Path
from .....core.constants import APP_DIR
SERVERS_AVAILABLE = [
"sharepoint",
@@ -28,7 +28,7 @@ MP4_SERVER_JUICY_STREAM_REGEX = re.compile(
)
# graphql files
GQLS = resources.files("fastanime.libs.providers.anime.allanime") / "queries"
SEARCH_GQL = Path(str(GQLS / "search.gql"))
ANIME_GQL = Path(str(GQLS / "anime.gql"))
EPISODE_GQL = Path(str(GQLS / "episode.gql"))
GQLS = APP_DIR / "libs" / "providers" / "anime" / "allanime" / "queries"
SEARCH_GQL = GQLS / "search.gql"
ANIME_GQL = GQLS / "anime.gql"
EPISODE_GQL = GQLS / "episodes.gql"

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
class AkExtractor(BaseExtractor):

View File

@@ -0,0 +1,20 @@
from abc import ABC, abstractmethod
from httpx import Client
from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
class BaseExtractor(ABC):
@classmethod
@abstractmethod
def extract(
cls,
url: str,
client: Client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server | None:
pass

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
class SakExtractor(BaseExtractor):

View File

@@ -1,9 +1,8 @@
from abc import ABC, abstractmethod
from logging import getLogger
from httpx import Client
from ...types import Server
from ..types import AllAnimeEpisode, AllAnimeSource
from ..utils import one_digit_symmetric_xor
from ..utils import debug_extractor, logger, one_digit_symmetric_xor
from .ak import AkExtractor
from .dropbox import SakExtractor
from .filemoon import FmHlsExtractor, OkExtractor
@@ -16,16 +15,6 @@ from .we_transfer import KirExtractor
from .wixmp import DefaultExtractor
from .yt_mp4 import YtExtractor
logger = getLogger(__name__)
class BaseExtractor(ABC):
@abstractmethod
@classmethod
def extract(cls, url, client, episode_number, episode, source) -> Server:
pass
AVAILABLE_SOURCES = {
"Sak": SakExtractor,
"S-mp4": Smp4Extractor,
@@ -44,8 +33,12 @@ OTHER_SOURCES = {
}
@debug_extractor
def extract_server(
client, episode_number: str, episode: AllAnimeEpisode, source: AllAnimeSource
client: Client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server | None:
url = source.get("sourceUrl")
if not url:

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
# TODO: requires decoding obsfucated js (filemoon)

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
class Lufmp4Extractor(BaseExtractor):

View File

@@ -1,30 +1,31 @@
import logging
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL, MP4_SERVER_JUICY_STREAM_REGEX
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from ..constants import MP4_SERVER_JUICY_STREAM_REGEX
from ..utils import logger
from .base import BaseExtractor
class Mp4Extractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
def extract(cls, url, client, episode_number, episode, source):
response = client.get(url, timeout=10, follow_redirects=True)
response.raise_for_status()
streams = response.json()
embed_html = response.text.replace(" ", "").replace("\n", "")
# NOTE: some of the video were deleted so the embed html will just be "Filewasdeleted"
vid = MP4_SERVER_JUICY_STREAM_REGEX.search(embed_html)
if not vid:
raise Exception("")
if embed_html == "Filewasdeleted":
logger.debug(
"Failed to extract stream url from mp4-uploads. Reason: Filewasdeleted"
)
return
logger.debug(
f"Failed to extract stream url from mp4-uploads. Reason: unknown. Embed html: {embed_html}"
)
return
return Server(
name="mp4-upload",
links=[EpisodeStream(link=vid.group(1), quality="1080")],

View File

@@ -1,30 +1,27 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from ..types import AllAnimeEpisodeStreams
from .base import BaseExtractor
class Smp4Extractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
def extract(cls, url, client, episode_number, episode, source):
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="sharepoint",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
EpisodeStream(
link=stream["link"],
quality="1080",
format=stream["resolutionStr"],
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
class SsHlsExtractor(BaseExtractor):

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
# TODO: requires some serious work i think : )

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
class KirExtractor(BaseExtractor):

View File

@@ -1,30 +1,25 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from ..types import AllAnimeEpisodeStreams
from .base import BaseExtractor
class DefaultExtractor(BaseExtractor):
@classmethod
def extract(
cls,
url,
client,
episode_number: str,
episode: AllAnimeEpisode,
source: AllAnimeSource,
) -> Server:
def extract(cls, url, client, episode_number, episode, source):
response = client.get(
f"https://{API_BASE_URL}{url.replace('clock', 'clock.json')}",
timeout=10,
)
response.raise_for_status()
streams = response.json()
streams: AllAnimeEpisodeStreams = response.json()
return Server(
name="wixmp",
links=[
EpisodeStream(link=link, quality="1080") for link in streams["links"]
EpisodeStream(
link=stream["link"], quality="1080", format=stream["resolutionStr"]
)
for stream in streams["links"]
],
episode_title=episode["notes"],
headers={"Referer": f"https://{API_BASE_URL}/"},

View File

@@ -1,7 +1,7 @@
from ...types import EpisodeStream, Server
from ..constants import API_BASE_URL
from ..types import AllAnimeEpisode, AllAnimeSource
from .extractor import BaseExtractor
from .base import BaseExtractor
class YtExtractor(BaseExtractor):

View File

@@ -17,7 +17,11 @@ def map_to_search_results(response: Response) -> SearchResults:
id=result["_id"],
title=result["name"],
media_type=result["__typename"],
available_episodes=AnimeEpisodes(sub=result["availableEpisodes"]),
episodes=AnimeEpisodes(
sub=generate_list(result["availableEpisodes"]["sub"]),
dub=generate_list(result["availableEpisodes"]["dub"]),
raw=generate_list(result["availableEpisodes"]["raw"]),
),
)
for result in search_results["shows"]["edges"]
],
@@ -30,9 +34,9 @@ def map_to_anime_result(response: Response) -> Anime:
id=anime["_id"],
title=anime["name"],
episodes=AnimeEpisodes(
sub=generate_list(anime["availableEpisodesDetail"]["sub"]),
dub=generate_list(anime["availableEpisodesDetail"]["dub"]),
raw=generate_list(anime["availableEpisodesDetail"]["raw"]),
sub=anime["availableEpisodesDetail"]["sub"],
dub=anime["availableEpisodesDetail"]["dub"],
raw=anime["availableEpisodesDetail"]["raw"],
),
type=anime.get("__typename"),
)

View File

@@ -3,10 +3,9 @@ from typing import TYPE_CHECKING
from .....core.utils.graphql import execute_graphql_query
from ..base import BaseAnimeProvider
from ..utils.decorators import debug_provider
from ..utils.debug import debug_provider
from .constants import (
ANIME_GQL,
API_BASE_URL,
API_GRAPHQL_ENDPOINT,
API_GRAPHQL_REFERER,
EPISODE_GQL,
@@ -27,7 +26,7 @@ class AllAnime(BaseAnimeProvider):
HEADERS = {"Referer": API_GRAPHQL_REFERER}
@debug_provider
def search_for_anime(self, params):
def search(self, params):
response = execute_graphql_query(
API_GRAPHQL_ENDPOINT,
self.client,
@@ -47,19 +46,19 @@ class AllAnime(BaseAnimeProvider):
return map_to_search_results(response)
@debug_provider
def get_anime(self, params):
def get(self, params):
response = execute_graphql_query(
API_GRAPHQL_ENDPOINT,
self.client,
ANIME_GQL,
variables={"showId": params.anime_id},
variables={"showId": params.id},
)
return map_to_anime_result(response)
@debug_provider
def get_episode_streams(self, params):
def episode_streams(self, params):
episode_response = execute_graphql_query(
API_BASE_URL,
API_GRAPHQL_ENDPOINT,
self.client,
EPISODE_GQL,
variables={
@@ -72,3 +71,9 @@ class AllAnime(BaseAnimeProvider):
for source in episode["sourceUrls"]:
if server := extract_server(self.client, params.episode, episode, source):
yield server
if __name__ == "__main__":
from ..utils.debug import test_anime_provider
test_anime_provider(AllAnime)

View File

@@ -2,6 +2,12 @@ from typing import Literal, TypedDict
class AllAnimeEpisodesDetail(TypedDict):
dub: list[str]
sub: list[str]
raw: list[str]
class AllAnimeEpisodes(TypedDict):
dub: int
sub: int
raw: int
@@ -21,7 +27,7 @@ class AllAnimeShow(TypedDict):
class AllAnimeSearchResult(TypedDict):
_id: str
name: str
availableEpisodes: list[str]
availableEpisodes: AllAnimeEpisodesDetail
__typename: str | None
@@ -63,6 +69,17 @@ class AllAnimeSource(TypedDict):
downloads: AllAnimeSourceDownload
class AllAnimeEpisodeStream(TypedDict):
link: str
hls: bool
resolutionStr: str
fromCache: str
class AllAnimeEpisodeStreams(TypedDict):
links: [AllAnimeEpisodeStream]
Server = Literal["gogoanime", "dropbox", "wetransfer", "sharepoint"]

View File

@@ -1,6 +1,11 @@
import functools
import logging
import os
import re
from itertools import cycle
logger = logging.getLogger(__name__)
# Dictionary to map hex values to characters
hex_to_char = {
"01": "9",
@@ -35,6 +40,22 @@ hex_to_char = {
}
def debug_extractor(extractor_function):
@functools.wraps(extractor_function)
def _provider_function_wrapper(*args):
if not os.environ.get("FASTANIME_DEBUG"):
try:
return extractor_function(*args)
except Exception as e:
logger.error(
f"[AllAnime@Server={args[3].get('sourceName', 'UNKNOWN')}]: {e}"
)
else:
return extractor_function(*args, **kwargs)
return _provider_function_wrapper
def give_random_quality(links):
qualities = cycle(["1080", "720", "480", "360"])

View File

@@ -40,4 +40,4 @@ class EpisodeStreamsParams:
class AnimeParams:
"""Parameters for fetching anime details."""
anime_id: str
id: str

View File

@@ -1,26 +1,28 @@
from dataclasses import dataclass
from typing import Literal
from pydantic import BaseModel
@dataclass
class PageInfo:
class BaseAnimeProviderModel(BaseModel):
pass
class PageInfo(BaseAnimeProviderModel):
total: int | None = None
per_page: int | None = None
current_page: int | None = None
@dataclass
class AnimeEpisodes:
class AnimeEpisodes(BaseAnimeProviderModel):
sub: list[str]
dub: list[str] = []
raw: list[str] = []
@dataclass
class SearchResult:
class SearchResult(BaseAnimeProviderModel):
id: str
title: str
available_episodes: AnimeEpisodes
episodes: AnimeEpisodes
other_titles: list[str] = []
media_type: str | None = None
score: int | None = None
@@ -29,24 +31,20 @@ class SearchResult:
poster: str | None = None
@dataclass
class SearchResults:
class SearchResults(BaseAnimeProviderModel):
page_info: PageInfo
results: list[SearchResult]
@dataclass
class AnimeEpisodeInfo:
class AnimeEpisodeInfo(BaseAnimeProviderModel):
id: str
title: str
episode: str
poster: str | None
duration: str | None
translation_type: str | None
title: str | None = None
poster: str | None = None
duration: str | None = None
@dataclass
class Anime:
class Anime(BaseAnimeProviderModel):
id: str
title: str
episodes: AnimeEpisodes
@@ -56,25 +54,23 @@ class Anime:
year: str | None = None
@dataclass
class EpisodeStream:
class EpisodeStream(BaseAnimeProviderModel):
link: str
title: str | None = None
quality: Literal["360", "480", "720", "1080"] = "720"
translation_type: Literal["dub", "sub"] = "sub"
resolution: str | None = None
format: str | None = None
hls: bool | None = None
mp4: bool | None = None
priority: int | None = None
@dataclass
class Subtitle:
class Subtitle(BaseAnimeProviderModel):
url: str
language: str | None = None
@dataclass
class Server:
class Server(BaseAnimeProviderModel):
name: str
links: list[EpisodeStream]
episode_title: str | None = None

View File

@@ -0,0 +1,64 @@
import functools
import logging
import os
from typing import Type
from ..base import BaseAnimeProvider
logger = logging.getLogger(__name__)
def debug_provider(provider_function):
@functools.wraps(provider_function)
def _provider_function_wrapper(self, *args, **kwargs):
provider_name = self.__class__.__name__.upper()
if not os.environ.get("FASTANIME_DEBUG"):
try:
return provider_function(self, *args, **kwargs)
except Exception as e:
logger.error(f"[{provider_name}@{provider_function.__name__}]: {e}")
else:
return provider_function(self, *args, **kwargs)
return _provider_function_wrapper
def test_anime_provider(AnimeProvider: Type[BaseAnimeProvider]):
from httpx import Client
from yt_dlp.utils.networking import random_user_agent
from .....core.constants import APP_ASCII_ART
from ..params import AnimeParams, EpisodeStreamsParams, SearchParams
anime_provider = AnimeProvider(
Client(headers={"User-Agent": random_user_agent(), **AnimeProvider.HEADERS})
)
print(APP_ASCII_ART)
query = input("What anime would you like to stream: ")
search_results = anime_provider.search(SearchParams(query=query))
if not search_results:
return
for i, search_result in enumerate(search_results.results):
print(f"{i + 1}: {search_result.title}")
result = search_results.results[
int(input(f"Select result (1-{len(search_results.results)}): ")) - 1
]
anime = anime_provider.get(AnimeParams(id=result.id))
if not anime:
return
translation_type = input("Preferred Translation Type: [dub,sub,raw]: ")
for episode in getattr(anime.episodes, translation_type):
print(episode)
episode_number = input("What episode do you wish to watch: ")
episode_streams = anime_provider.episode_streams(
EpisodeStreamsParams(
anime_id=anime.id,
episode=episode_number,
translation_type=translation_type, # type:ignore
)
)
if not episode_streams:
return
print(list(episode_streams))

View File

@@ -1,37 +0,0 @@
import functools
import logging
import os
logger = logging.getLogger(__name__)
def debug_provider(provider_function):
@functools.wraps(provider_function)
def _provider_function_wrapper(self, *args, **kwargs):
provider_name = self.__class__.__name__.upper()
if not os.environ.get("FASTANIME_DEBUG"):
try:
return provider_function(self, *args, **kwargs)
except Exception as e:
logger.error(f"[{provider_name}@{provider_function.__name__}]: {e}")
else:
return provider_function(self, *args, **kwargs)
return _provider_function_wrapper
def ensure_internet_connection(provider_function):
@functools.wraps(provider_function)
def _wrapper(*args, **kwargs):
import requests
try:
requests.get("https://google.com", timeout=5)
except requests.ConnectionError:
from sys import exit
print("You are not connected to the internet;Aborting...")
exit(1)
return provider_function(*args, **kwargs)
return _wrapper

44
product_validation.py Normal file
View File

@@ -0,0 +1,44 @@
from dataclasses import dataclass
@dataclass
class Product:
name: str
price: float
quantity: int
def __post_init__(self):
if not isinstance(self.name, str):
raise TypeError(f"Expected 'name' to be a string, got {type(self.name).__name__}")
if not isinstance(self.price, (int, float)):
raise TypeError(f"Expected 'price' to be a number, got {type(self.price).__name__}")
if not isinstance(self.quantity, int):
raise TypeError(f"Expected 'quantity' to be an integer, got {type(self.quantity).__name__}")
if self.price < 0:
raise ValueError("Price cannot be negative.")
if self.quantity < 0:
raise ValueError("Quantity cannot be negative.")
# Valid usage
try:
p1 = Product(name="Laptop", price=1200.50, quantity=10)
print(p1)
except (TypeError, ValueError) as e:
print(f"Error creating product: {e}")
print("-" * 20)
# Invalid type for price
try:
p2 = Product(name="Mouse", price="fifty", quantity=5)
print(p2)
except (TypeError, ValueError) as e:
print(f"Error creating product: {e}")
print("-" * 20)
# Invalid value for quantity
try:
p3 = Product(name="Keyboard", price=75.00, quantity=-2)
print(p3)
except (TypeError, ValueError) as e:
print(f"Error creating product: {e}")