Enhance AutoBackup plugin with new features

Refactor AutoBackup plugin to support additional features and improve functionality.

Signed-off-by: wpa-2 <9049886+wpa-2@users.noreply.github.com>
This commit is contained in:
wpa-2
2026-02-04 20:39:31 +00:00
committed by GitHub
parent 16b421ac93
commit 9e9b0497d3

View File

@@ -5,148 +5,333 @@ import os
import subprocess
import time
import socket
import threading
import glob
from flask import render_template_string
class AutoBackup(plugins.Plugin):
# NOTE(review): this text is a scraped diff view -- removed and added lines are
# interleaved without +/- markers, and indentation was lost. The duplicated
# assignments below (__version__, __description__) are the old line followed
# immediately by its v2.2 replacement.
__author__ = 'WPA2'
# Old version string (pre-refactor side of the diff):
__version__ = '1.1.3'
# New version string:
__version__ = '2.2'
__license__ = 'GPL3'
# Old description:
__description__ = 'Backs up files when internet is available, with support for excludes.'
# New description:
__description__ = 'Backs up Pwnagotchi configuration and data, keeping recent backups.'
# Hardcoded defaults for Pwnagotchi
# Default set of paths archived when the user configures no 'files' option.
DEFAULT_FILES = [
"/root/settings.yaml",
"/root/client_secrets.json",
"/root/.api-report.json",
"/root/.ssh",
"/root/.bashrc",
"/root/.profile",
"/root/peers",
"/etc/pwnagotchi/",
"/usr/local/share/pwnagotchi/custom-plugins",
"/etc/ssh/",
"/home/pi/handshakes/",
"/home/pi/.bashrc",
"/home/pi/.profile",
"/home/pi/.wpa_sec_uploads",
]
# Default backup cadence (seconds) and retention count.
DEFAULT_INTERVAL_SECONDS = 60 * 60 # 60 minutes
DEFAULT_MAX_BACKUPS = 3
# Glob patterns excluded from the archive by default.
DEFAULT_EXCLUDE = [
"/etc/pwnagotchi/logs/*",
"*.bak",
"*.tmp",
]
def __init__(self):
    """Initialise in-memory plugin state before configuration is loaded."""
    # Gate flags: nothing runs until on_loaded() has validated the config,
    # and only one backup may be in flight at a time.
    self.ready = False
    self.backup_in_progress = False
    # Failure accounting and throttling of the "not due yet" log line.
    self.tries = 0
    self.last_not_due_logged = 0
    # The status file's mtime records when the last successful backup ran.
    self.status_file = '/root/.auto-backup'
    self.status = StatusFile(self.status_file)
    # Shared-state lock, device identity, and the agent handle captured later
    # in on_ready() for use by the scheduler and the web UI.
    self.lock = threading.Lock()
    self.hostname = socket.gethostname()
    self._agent = None
def on_loaded(self):
# NOTE(review): scraped diff view -- the old (v1.1.3) and new (v2.2) bodies of
# on_loaded(), plus the removed get_interval_seconds() helper, are interleaved
# below without +/- markers. Old required-options validation first:
required_options = ['files', 'interval', 'backup_location', 'max_tries']
for opt in required_options:
if opt not in self.options or self.options[opt] is None:
logging.error(f"AUTO-BACKUP: Option '{opt}' is not set.")
return
# New validation (v2.2): only backup_location is mandatory.
"""Validate only required option: backup_location"""
if 'backup_location' not in self.options or self.options['backup_location'] is None:
logging.error("AUTO-BACKUP: Option 'backup_location' is not set.")
return
# Old default-command handling (shell template string, removed in v2.2):
# If no custom command(s) are provided, use the default plain tar command.
# The command includes a placeholder for {excludes} so that if no excludes are set, it will be empty.
if 'commands' not in self.options or not self.options['commands']:
self.options['commands'] = ["tar cf {backup_file} {excludes} {files}"]
self.ready = True
logging.info("AUTO-BACKUP: Successfully loaded.")
# Old helper, removed in v2.2 (the interval is now plain seconds):
def get_interval_seconds(self):
"""
Convert the interval option into seconds.
Supports:
- "daily" for 24 hours,
- "hourly" for 60 minutes,
- or a numeric value (interpreted as minutes).
"""
interval = self.options['interval']
if isinstance(interval, str):
if interval.lower() == "daily":
return 24 * 60 * 60
elif interval.lower() == "hourly":
return 60 * 60
else:
try:
minutes = float(interval)
return minutes * 60
except ValueError:
logging.error("AUTO-BACKUP: Invalid interval format. Defaulting to daily interval.")
return 24 * 60 * 60
elif isinstance(interval, (int, float)):
return float(interval) * 60
# New on_loaded() continues here (v2.2 side of the diff): cache config
# values on self with class defaults, without mutating self.options.
self.hostname = socket.gethostname()
# Read config with internal defaults - DO NOT modify self.options
self.files = self.options.get('files', self.DEFAULT_FILES)
self.interval_seconds = self.options.get('interval_seconds', self.DEFAULT_INTERVAL_SECONDS)
self.max_backups = self.options.get('max_backups_to_keep', self.DEFAULT_MAX_BACKUPS)
self.exclude = self.options.get('exclude', self.DEFAULT_EXCLUDE)
self.include = self.options.get('include', [])
# Handle commands: if old format, use correct default internally
commands = self.options.get('commands', ["tar", "czf"])
# A string, or a one-element list containing '{' placeholders, is the old
# shell-template format -- fall back to the argv-list default.
if isinstance(commands, str) or (isinstance(commands, list) and len(commands) == 1 and isinstance(commands[0], str) and '{' in str(commands)):
logging.warning("AUTO-BACKUP: Old command format detected in config, using default: tar czf")
self.commands = ["tar", "czf"]
elif not commands:
self.commands = ["tar", "czf"]
else:
# NOTE(review): the two lines below belong to the OLD get_interval_seconds()
# (its final else branch); diff interleaving placed them inside the new
# commands handling. The `self.commands = commands` line is the real
# else-branch body of the new code.
logging.error("AUTO-BACKUP: Unrecognized type for interval. Defaulting to daily interval.")
return 24 * 60 * 60
self.commands = commands
# Validate include paths if specified
if self.include:
if not isinstance(self.include, list):
self.include = [self.include]
for path in self.include:
if not os.path.exists(path):
logging.warning(f"AUTO-BACKUP: include path '{path}' does not exist, will skip if still missing at backup time")
self.ready = True
include_msg = f", includes: {len(self.include)} additional path(s)" if self.include else ""
logging.info(f"AUTO-BACKUP: Plugin loaded for host '{self.hostname}'. Interval: 60min, Backups kept: {self.max_backups}{include_msg}")
def is_backup_due(self):
# NOTE(review): diff artifact -- the old docstring/body and the new one-liner
# are interleaved here; the method ends up with two return statements.
"""
Determines if enough time has passed since the last backup.
If the status file does not exist, a backup is due.
"""
# Old line (used the removed get_interval_seconds() helper):
interval_sec = self.get_interval_seconds()
"""Check if backup is due based on interval."""
try:
last_backup = os.path.getmtime(self.status_file)
except OSError:
# Status file doesn't exist—backup is due.
return True
now = time.time()
# Old return (minutes-based interval):
return (now - last_backup) >= interval_sec
# New return (self.interval_seconds is set in on_loaded):
return (time.time() - last_backup) >= self.interval_seconds
# NOTE(review): orphaned signature of the removed on_internet_available()
# hook; its old body appears further down, interleaved into
# _periodic_backup_check().
def on_internet_available(self, agent):
def _cleanup_old_backups(self):
"""Deletes the oldest backups if we exceed the limit."""
try:
backup_dir = self.options['backup_location']
max_keep = self.max_backups
# Filter by this device's hostname
search_pattern = os.path.join(backup_dir, f"{self.hostname}-backup-*.tar.gz")
files = glob.glob(search_pattern)
if not files:
logging.debug("AUTO-BACKUP: No backup files found for cleanup")
return
# Sort files by modification time (oldest first)
files.sort(key=os.path.getmtime)
# Calculate how many to delete
if len(files) > max_keep:
num_to_delete = len(files) - max_keep
logging.info(f"AUTO-BACKUP: Found {len(files)} backups, keeping {max_keep}, deleting {num_to_delete} old backup(s)...")
for old_file in files[:num_to_delete]:
try:
os.remove(old_file)
logging.info(f"AUTO-BACKUP: Deleted: {os.path.basename(old_file)}")
except OSError as e:
logging.error(f"AUTO-BACKUP: Failed to delete {old_file}: {e}")
except Exception as e:
logging.error(f"AUTO-BACKUP: Cleanup error: {e}")
def _run_backup_thread(self, agent, existing_files):
"""Execute backup in separate thread."""
try:
backup_location = self.options['backup_location']
# Create backup directory if it doesn't exist
if not os.path.exists(backup_location):
try:
os.makedirs(backup_location)
logging.info(f"AUTO-BACKUP: Created backup directory: {backup_location}")
except OSError as e:
logging.error(f"AUTO-BACKUP: Failed to create backup directory: {e}")
return
# Add timestamp to filename
timestamp = time.strftime("%Y%m%d-%H%M%S")
backup_file = os.path.join(backup_location, f"{self.hostname}-backup-{timestamp}.tar.gz")
# Try to update display if agent is available
if agent:
try:
display = agent.view()
display.set('status', 'Backing up...')
display.update()
except:
pass
logging.info(f"AUTO-BACKUP: Starting backup to {backup_file}...")
# Build command
command_list = list(self.commands)
command_list.append(backup_file)
# Add exclusions
for pattern in self.exclude:
command_list.append(f"--exclude={pattern}")
# Add files to backup
command_list.extend(existing_files)
# Execute backup command
process = subprocess.Popen(
command_list,
shell=False,
stdin=None,
stdout=open("/dev/null", "w"),
stderr=subprocess.PIPE
)
_, stderr_output = process.communicate()
if process.returncode != 0:
raise OSError(f"Backup command failed with code {process.returncode}: {stderr_output.decode('utf-8').strip()}")
logging.info(f"AUTO-BACKUP: Backup successful: {backup_file}")
# Run cleanup after successful backup
self._cleanup_old_backups()
# Try to update display if agent is available
if agent:
try:
display = agent.view()
display.set('status', 'Backup done!')
display.update()
except:
pass
# Update status file timestamp
self.status.update()
# Reset try counter on success
self.tries = 0
except Exception as e:
self.tries += 1
logging.error(f"AUTO-BACKUP: Backup error (attempt {self.tries}): {e}")
finally:
self.backup_in_progress = False
def on_ready(self, agent):
"""Called when Pwnagotchi is ready. Set up backup scheduler."""
if not self.ready:
return
# NOTE(review): the next two lines are leftovers of the old max_tries guard
# from the pre-refactor version (diff interleaving); the new code caps
# retries at 3 inside _periodic_backup_check() instead.
if self.options['max_tries'] and self.tries >= self.options['max_tries']:
logging.info("AUTO-BACKUP: Maximum tries reached, skipping backup.")
# Remember the agent so the scheduler thread and webhook can reach it.
self._agent = agent
# Start background scheduler thread
scheduler_thread = threading.Thread(
target=self._backup_scheduler_loop,
daemon=True,
name="AutoBackupScheduler"
)
scheduler_thread.start()
logging.info("AUTO-BACKUP: Periodic backup scheduler started")
def on_webhook(self, path, request):
    """Serve the plugin's web UI.

    GET "/" (or empty path) renders a status page with the current
    configuration and a manual-backup button; POST "/backup" triggers a
    manual backup and shows the outcome. Anything else returns "Not found".
    """
    if request.method == "GET":
        if path == "/" or not path:
            # Build the form action so it works whether or not the current
            # request path already ends in /backup.
            action_path = request.path if request.path.endswith("/backup") else "%s/backup" % request.path
            ret = '<html><head><title>AUTO Backup</title><meta name="csrf_token" content="{{ csrf_token() }}"></head><body>'
            ret += '<h1>AUTO Backup</h1>'
            ret += '<p>Status: '
            if self.backup_in_progress:
                ret += '<b>Backup in progress...</b>'
            else:
                ret += '<b>Ready</b>'
            ret += '</p>'
            ret += '<form method="POST" action="%s">' % action_path
            ret += '<input id="csrf_token" name="csrf_token" type="hidden" value="{{ csrf_token() }}">'
            ret += '<input type="submit" value="Start Manual Backup" style="padding: 10px 20px; font-size: 16px; background-color: #4CAF50; color: white; border: none; border-radius: 4px; cursor: pointer;">'
            ret += '</form>'
            ret += '<hr>'
            ret += '<h2>Configuration</h2>'
            ret += '<table border="1" cellpadding="5">'
            # str() guards the concatenation against non-string config values.
            ret += '<tr><td><b>Backup Location:</b></td><td>' + str(self.options.get('backup_location', 'Not set')) + '</td></tr>'
            # Fixed: removed a stray closing </b> after "minutes" that had no
            # matching opening tag.
            ret += '<tr><td><b>Interval:</b></td><td>' + str(self.interval_seconds // 60) + ' minutes</td></tr>'
            ret += '<tr><td><b>Max Backups:</b></td><td>' + str(self.max_backups) + '</td></tr>'
            ret += '<tr><td><b>Include Paths:</b></td><td>' + (', '.join(self.include) if self.include else 'None') + '</td></tr>'
            ret += '</table>'
            ret += '</body></html>'
            return render_template_string(ret)
    elif request.method == "POST":
        if path == "backup" or path == "/backup":
            result = self.manual_backup(self._agent)
            ret = '<html><head><title>AUTO Backup</title><meta name="csrf_token" content="{{ csrf_token() }}"></head><body>'
            ret += '<h1>AUTO Backup</h1>'
            ret += '<p><b>' + result['status'] + '</b></p>'
            ret += '<a href="/plugins/auto_backup/">Back</a>'
            ret += '</body></html>'
            return render_template_string(ret)
    return "Not found"
def _backup_scheduler_loop(self):
    """Background thread loop: checks once a minute whether a backup is due.

    Runs forever as a daemon thread; any exception from the periodic check
    is logged and the loop continues.
    """
    while True:
        try:
            if self.ready:
                agent = getattr(self, '_agent', None)
                self._periodic_backup_check(agent)
        except Exception as e:
            logging.error(f"AUTO-BACKUP: Scheduler error: {e}")
        # Sleep OUTSIDE the try block: previously a failing check skipped the
        # sleep, turning this loop into a hot spin that flooded the log.
        time.sleep(60)
def _get_backup_files(self):
"""Collect all files to backup."""
existing_files = list(filter(os.path.exists, self.files))
if self.include:
for path in self.include:
if os.path.exists(path):
existing_files.append(path)
logging.debug(f"AUTO-BACKUP: Added include path: {path}")
return existing_files
def _periodic_backup_check(self, agent=None):
"""Periodic backup check."""
if agent is None:
agent = getattr(self, '_agent', None)
# Never start a second backup while one is running.
if not self.ready or self.backup_in_progress:
return
# Hard-coded cap of 3 failed attempts; tries is bumped on error and reset
# on success in _run_backup_thread.
if self.tries >= 3:
return
if not self.is_backup_due():
now = time.time()
# Log "backup not due" only once every 600 seconds.
if now - self.last_not_due_logged > 600:
logging.info("AUTO-BACKUP: Backup not due yet based on the interval.")
self.last_not_due_logged = now
return
# NOTE(review): from here the scraped diff interleaves the OLD
# on_internet_available() body (shell-template tar command, single fixed
# backup file, synchronous execution) with the NEW threaded code.
# Old file-collection line:
# Only include files/directories that exist to prevent errors.
existing_files = list(filter(lambda f: os.path.exists(f), self.options['files']))
# New replacement line:
existing_files = self._get_backup_files()
if not existing_files:
# Old warning:
logging.warning("AUTO-BACKUP: No files found to backup.")
# New warning:
logging.warning("AUTO-BACKUP: No files to backup exist")
return
# ---- OLD synchronous implementation (removed in v2.2) begins ----
files_to_backup = " ".join(existing_files)
# Build excludes string if configured.
# Use get() so that if 'exclude' is missing or empty, we default to an empty list.
excludes = ""
exclude_list = self.options.get('exclude', [])
if exclude_list:
for pattern in exclude_list:
excludes += f" --exclude='{pattern}'"
# Get the backup location from config.
backup_location = self.options['backup_location']
# Retrieve the global config from agent. If agent.config is callable, call it.
global_config = getattr(agent, 'config', None)
if callable(global_config):
global_config = global_config()
if global_config is None:
global_config = {}
pwnagotchi_name = global_config.get('main', {}).get('name', socket.gethostname())
backup_file = os.path.join(backup_location, f"{pwnagotchi_name}-backup.tar")
try:
display = agent.view()
logging.info("AUTO-BACKUP: Starting backup process...")
display.set('status', 'Backing up ...')
display.update()
# Execute each backup command.
for cmd in self.options['commands']:
formatted_cmd = cmd.format(backup_file=backup_file, files=files_to_backup, excludes=excludes)
logging.info(f"AUTO-BACKUP: Running command: {formatted_cmd}")
process = subprocess.Popen(
formatted_cmd,
shell=True,
stdin=None,
stdout=open("/dev/null", "w"),
stderr=subprocess.STDOUT,
executable="/bin/bash"
)
process.wait()
if process.returncode > 0:
raise OSError(f"Command failed with return code: {process.returncode}")
logging.info(f"AUTO-BACKUP: Backup completed successfully. File created at {backup_file}")
display.set('status', 'Backup done!')
display.update()
self.status.update()
except OSError as os_e:
self.tries += 1
logging.error(f"AUTO-BACKUP: Backup error: {os_e}")
display.set('status', 'Backup failed!')
display.update()
# ---- OLD implementation ends; NEW implementation hands the work to a
# daemon worker thread instead of blocking this check ----
self.backup_in_progress = True
backup_thread = threading.Thread(
target=self._run_backup_thread,
args=(agent, existing_files),
daemon=True,
name="AutoBackupThread"
)
backup_thread.start()
logging.debug("AUTO-BACKUP: Backup thread started")
def manual_backup(self, agent):
    """Kick off a user-requested backup in a daemon worker thread.

    Returns a dict with a single 'status' message describing the outcome:
    already running, nothing to back up, or backup started.
    """
    # Refuse to overlap with a backup that is already running.
    if self.backup_in_progress:
        return {"status": "Backup already in progress"}
    targets = self._get_backup_files()
    if not targets:
        return {"status": "No files to backup"}
    # Claim the in-progress flag before starting the worker; the worker
    # clears it again in its finally block.
    self.backup_in_progress = True
    worker = threading.Thread(
        target=self._run_backup_thread,
        args=(agent, targets),
        name="AutoBackupThread",
        daemon=True,
    )
    worker.start()
    logging.info("AUTO-BACKUP: Manual backup triggered")
    return {"status": "Backup started - check logs for details"}