From a5e99122f545d4ec054f4ba33c5694b9fb8e159e Mon Sep 17 00:00:00 2001 From: Benexl Date: Sun, 26 Oct 2025 21:30:10 +0300 Subject: [PATCH] fix(registry-cmds): pyright errors --- .../cli/commands/registry/commands/backup.py | 1 + .../cli/commands/registry/commands/restore.py | 95 ++++++++++++++----- 2 files changed, 72 insertions(+), 24 deletions(-) diff --git a/viu_media/cli/commands/registry/commands/backup.py b/viu_media/cli/commands/registry/commands/backup.py index 4b3f6b1..14fa678 100644 --- a/viu_media/cli/commands/registry/commands/backup.py +++ b/viu_media/cli/commands/registry/commands/backup.py @@ -113,6 +113,7 @@ def _create_tar_backup( api: str, ): """Create a tar-based backup.""" + # TODO: Add support for bz2/xz compression if needed mode = "w:gz" if compress else "w" with tarfile.open(output_path, mode) as tar: diff --git a/viu_media/cli/commands/registry/commands/restore.py b/viu_media/cli/commands/registry/commands/restore.py index d04ea37..a2bd50f 100644 --- a/viu_media/cli/commands/registry/commands/restore.py +++ b/viu_media/cli/commands/registry/commands/restore.py @@ -5,6 +5,7 @@ Registry restore command - restore registry from backup files import json import shutil import tarfile +import zipfile from datetime import datetime from pathlib import Path @@ -25,6 +26,11 @@ from ....service.registry.service import MediaRegistryService is_flag=True, help="Create backup of current registry before restoring", ) +@click.option( + "--backup-current-tar-compression-fmt", + type=click.Choice(["gz", "bz2", "xz"], case_sensitive=False), + help="The compression format to use for the current registry backup (if enabled)", +) @click.option("--verify", is_flag=True, help="Verify backup integrity before restoring") @click.option( "--api", @@ -38,6 +44,7 @@ def restore( backup_file: Path, force: bool, backup_current: bool, + backup_current_compression_fmt: str, verify: bool, api: str, ): @@ -61,7 +68,7 @@ def restore( "Verification Failed", "Backup file appears to be corrupted or invalid", ) - raise click.Abort() + return feedback.success("Verification", "Backup file integrity verified") # Check if current registry exists @@ -77,7 +84,13 @@ def restore( # Create backup of current registry if requested if backup_current and registry_exists: - _backup_current_registry(registry_service, api, feedback) + _backup_current_registry( + registry_service, + api, + feedback, + backup_format=backup_format, + compression_fmt=backup_current_compression_fmt, + ) # Show restore summary _show_restore_summary(backup_file, backup_format, feedback) @@ -110,7 +123,13 @@ def restore( def _detect_backup_format(backup_file: Path) -> str: """Detect backup file format.""" suffixes = "".join(backup_file.suffixes).lower() - if ".tar" in suffixes or ".gz" in suffixes or ".tgz" in suffixes: + if ( + ".tar" in suffixes + or ".gz" in suffixes + or ".tgz" in suffixes + or ".bz2" in suffixes + or ".xz" in suffixes + ): return "tar" elif ".zip" in suffixes: return "zip" @@ -122,25 +141,38 @@ def _verify_backup( ) -> bool: """Verify backup file integrity.""" try: + metadata = {} has_registry = has_index = has_metadata = False if format_type == "tar": with tarfile.open(backup_file, "r:*") as tar: names = tar.getnames() - has_registry = any("registry/" in name for name in names) - has_index = any("index/" in name for name in names) - has_metadata = "backup_metadata.json" in names + for name in names: + if name == "registry/": + has_registry = True + continue + if name == "index/": + has_index = True + continue + if name == "backup_metadata.json": + has_metadata = True + continue if has_metadata: metadata_member = tar.getmember("backup_metadata.json") if metadata_file := tar.extractfile(metadata_member): metadata = json.load(metadata_file) else: # zip - import zipfile - with zipfile.ZipFile(backup_file, "r") as zip_file: names = zip_file.namelist() - has_registry = any("registry/" in name for name in names) - has_index = any("index/" in name for name in names) - has_metadata = "backup_metadata.json" in names + for name in names: + if name == "registry/": + has_registry = True + continue + if name == "index/": + has_index = True + continue + if name == "backup_metadata.json": + has_metadata = True + continue if has_metadata: with zip_file.open("backup_metadata.json") as metadata_file: metadata = json.load(metadata_file) @@ -163,27 +195,42 @@ def _verify_backup( def _check_registry_exists(registry_service: MediaRegistryService) -> bool: """Check if a registry already exists.""" - try: - stats = registry_service.get_registry_stats() - return stats.get("total_media", 0) > 0 - except Exception: - return False + # TODO: Improve this check to be more robust + return registry_service.media_registry_dir.exists() and any( + registry_service.media_registry_dir.iterdir() + ) def _backup_current_registry( - registry_service: MediaRegistryService, api: str, feedback: FeedbackService + registry_service: MediaRegistryService, + api: str, + feedback: FeedbackService, + backup_format: str, + compression_fmt: str, ): """Create backup of current registry before restoring.""" - from .backup import _create_tar_backup timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") - backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz") + if backup_format == "tar": + from .backup import _create_tar_backup - try: - _create_tar_backup(registry_service, backup_path, True, False, feedback, api) - feedback.success("Current Registry Backed Up", f"Saved to {backup_path}") - except Exception as e: - feedback.warning("Backup Warning", f"Failed to backup current registry: {e}") + backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.tar.gz") + + try: + _create_tar_backup( + registry_service, backup_path, True, False, feedback, api + ) + feedback.success("Current Registry Backed Up", f"Saved to {backup_path}") + except Exception as e: + feedback.warning( + "Backup Warning", f"Failed to backup current registry: {e}" + ) + else: + from .backup import _create_zip_backup + + backup_path = Path(f"viu_registry_pre_restore_{api}_{timestamp}.zip") + + _create_zip_backup(registry_service, backup_path, True, feedback, api) def _show_restore_summary(