fix(registry-cmds): pyright errors

This commit is contained in:
Benexl
2025-10-26 21:30:10 +03:00
parent 39bd7bed61
commit a5e99122f5
2 changed files with 72 additions and 24 deletions

View File

@@ -113,6 +113,7 @@ def _create_tar_backup(
api: str,
):
"""Create a tar-based backup."""
# TODO: Add support for bz2/xz compression if needed
mode = "w:gz" if compress else "w"
with tarfile.open(output_path, mode) as tar:

View File

@@ -5,6 +5,7 @@ Registry restore command - restore registry from backup files
import json
import shutil
import tarfile
import zipfile
from datetime import datetime
from pathlib import Path
@@ -25,6 +26,11 @@ from ....service.registry.service import MediaRegistryService
is_flag=True,
help="Create backup of current registry before restoring",
)
@click.option(
"--backup-current-tar-compression-fmt",
type=click.Choice(["gz", "bz2", "xz"], case_sensitive=False),
help="The compression format to use for the current registry backup (if enabled)",
)
@click.option("--verify", is_flag=True, help="Verify backup integrity before restoring")
@click.option(
"--api",
@@ -38,6 +44,7 @@ def restore(
backup_file: Path,
force: bool,
backup_current: bool,
backup_current_compression_fmt: str,
verify: bool,
api: str,
):
@@ -61,7 +68,7 @@ def restore(
"Verification Failed",
"Backup file appears to be corrupted or invalid",
)
raise click.Abort()
return
feedback.success("Verification", "Backup file integrity verified")
# Check if current registry exists
@@ -77,7 +84,13 @@ def restore(
# Create backup of current registry if requested
if backup_current and registry_exists:
_backup_current_registry(registry_service, api, feedback)
_backup_current_registry(
registry_service,
api,
feedback,
backup_format=backup_format,
compression_fmt=backup_current_compression_fmt,
)
# Show restore summary
_show_restore_summary(backup_file, backup_format, feedback)
@@ -110,7 +123,13 @@ def restore(
def _detect_backup_format(backup_file: Path) -> str:
"""Detect backup file format."""
suffixes = "".join(backup_file.suffixes).lower()
if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes:
if (
".tar" in suffixes
or ".gz" in suffixes
or ".tgz" in suffixes
or ".bz2" in suffixes
or ".xz" in suffixes
):
return "tar"
elif ".zip" in suffixes:
return "zip"
@@ -122,25 +141,38 @@ def _verify_backup(
) -> bool:
"""Verify backup file integrity."""
try:
metadata = {}
has_registry = has_index = has_metadata = False
if format_type == "tar":
with tarfile.open(backup_file, "r:*") as tar:
names = tar.getnames()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
for name in names:
if name == "registry/":
has_registry = True
continue
if name == "index/":
has_index = True
continue
if name == "backup_metadata.json":
has_metadata = True
continue
if has_metadata:
metadata_member = tar.getmember("backup_metadata.json")
if metadata_file := tar.extractfile(metadata_member):
metadata = json.load(metadata_file)
else: # zip
import zipfile
with zipfile.ZipFile(backup_file, "r") as zip_file:
names = zip_file.namelist()
has_registry = any("registry/" in name for name in names)
has_index = any("index/" in name for name in names)
has_metadata = "backup_metadata.json" in names
for name in names:
if name == "registry/":
has_registry = True
continue
if name == "index/":
has_index = True
continue
if name == "backup_metadata.json":
has_metadata = True
continue
if has_metadata:
with zip_file.open("backup_metadata.json") as metadata_file:
metadata = json.load(metadata_file)
@@ -163,27 +195,42 @@ def _verify_backup(
def _check_registry_exists(registry_service: MediaRegistryService) -> bool:
"""Check if a registry already exists."""
try:
stats = registry_service.get_registry_stats()
return stats.get("total_media", 0) > 0
except Exception:
return False
# TODO: Improve this check to be more robust
return registry_service.media_registry_dir.exists() and any(
registry_service.media_registry_dir.iterdir()
)
def _backup_current_registry(
registry_service: MediaRegistryService, api: str, feedback: FeedbackService
registry_service: MediaRegistryService,
api: str,
feedback: FeedbackService,
backup_format: str,
compression_fmt: str,
):
"""Create backup of current registry before restoring."""
from .backup import _create_tar_backup
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
if backup_format == "tar":
from .backup import _create_tar_backup
try:
_create_tar_backup(registry_service, backup_path, True, False, feedback, api)
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning("Backup Warning", f"Failed to backup current registry: {e}")
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz")
try:
_create_tar_backup(
registry_service, backup_path, True, False, feedback, api
)
feedback.success("Current Registry Backed Up", f"Saved to {backup_path}")
except Exception as e:
feedback.warning(
"Backup Warning", f"Failed to backup current registry: {e}"
)
else:
from .backup import _create_zip_backup
backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.zip")
_create_zip_backup(registry_service, backup_path, True, feedback, api)
def _show_restore_summary(