+
+
+ Upload a ZIP file containing a valid Huntarr backup to restore from
+
+
+
+
+
+
+
+ Warning: This action will permanently overwrite your current databases with the
+ uploaded backup.
+ All current data will be lost. Make sure you have a recent backup before proceeding.
+
+
+
+
+
+ Upload and Restore Backup
+
+
+
+
Available Backups
@@ -140,7 +198,6 @@
Available Backups
-
-
\ No newline at end of file
+
diff --git a/src/routes/backup_routes.py b/src/routes/backup_routes.py
index 000b3ebc..766cda11 100644
--- a/src/routes/backup_routes.py
+++ b/src/routes/backup_routes.py
@@ -9,9 +9,10 @@
import sqlite3
import time
import threading
+import zipfile
from datetime import datetime, timedelta
from pathlib import Path
-from flask import Blueprint, request, jsonify
+from flask import Blueprint, request, jsonify, send_file
from src.primary.utils.database import get_database
from src.primary.routes.common import get_user_for_request
import logging
@@ -159,13 +160,20 @@ def save_backup_settings(self, settings):
def create_backup(self, backup_type='manual', name=None):
"""Create a backup of all databases"""
try:
+ # Get current version
+ version_file = Path(__file__).parent.parent.parent / "version.txt"
+ version = "0.0.0" # Default in case version file is not found
+ if version_file.exists():
+ version = version_file.read_text().strip()
+
# Generate backup name if not provided
if not name:
- timestamp = datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
- name = f"{backup_type}_backup_{timestamp}"
+ timestamp = datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
+ name = f"huntarr_backup_v{version}_{timestamp}"
# Create backup folder with timestamp
backup_folder = self.backup_dir / name
+ logger.info(f"Creating backup folder: {backup_folder}")
backup_folder.mkdir(parents=True, exist_ok=True)
# Get all database paths
@@ -184,6 +192,7 @@ def create_backup(self, backup_type='manual', name=None):
for db_name, db_path in databases.items():
if Path(db_path).exists():
backup_db_path = backup_folder / f"{db_name}.db"
+ logger.info(f"Backing up {db_name} from {db_path} to {backup_db_path}")
# Force WAL checkpoint before backup
try:
@@ -213,9 +222,15 @@ def create_backup(self, backup_type='manual', name=None):
# Save backup metadata
metadata_path = backup_folder / "backup_info.json"
+ logger.info(f"Saving backup metadata to: {metadata_path}")
with open(metadata_path, 'w') as f:
json.dump(backup_info, f, indent=2)
+ # Verify that the backup folder and metadata exist
+ logger.info(f"Verifying backup folder contents:")
+ for item in backup_folder.iterdir():
+ logger.info(f" {item.name}")
+
# Clean up old backups based on retention policy
self._cleanup_old_backups()
@@ -266,8 +281,11 @@ def list_backups(self):
backups = []
if not self.backup_dir.exists():
+ logger.info(f"Backup directory does not exist: {self.backup_dir}")
return backups
+ logger.info(f"Looking for backups in: {self.backup_dir}")
+
for backup_folder in self.backup_dir.iterdir():
if backup_folder.is_dir():
metadata_path = backup_folder / "backup_info.json"
@@ -287,9 +305,12 @@ def list_backups(self):
'timestamp': datetime.fromtimestamp(backup_folder.stat().st_mtime).isoformat(),
'size': sum(f.stat().st_size for f in backup_folder.rglob('*.db') if f.is_file())
})
+ else:
+ logger.warning(f"Backup folder {backup_folder.name} does not contain backup_info.json")
# Sort by timestamp (newest first)
backups.sort(key=lambda x: x['timestamp'], reverse=True)
+ logger.info(f"Found {len(backups)} backups")
return backups
except Exception as e:
@@ -316,7 +337,15 @@ def restore_backup(self, backup_id):
databases = self._get_all_database_paths()
# Create backup of current databases before restore
- current_backup_name = f"pre_restore_backup_{int(time.time())}"
+ # Get current version
+ version_file = Path(__file__).parent.parent.parent / "version.txt"
+ version = "0.0.0" # Default in case version file is not found
+ if version_file.exists():
+ version = version_file.read_text().strip()
+
+ # Generate timestamp in the required format
+ timestamp = datetime.now().strftime("%Y.%m.%d_%H.%M.%S")
+ current_backup_name = f"huntarr_pre_restore_backup_v{version}_{timestamp}"
logger.info(f"Creating backup of current databases: {current_backup_name}")
self.create_backup('pre-restore', current_backup_name)
@@ -364,10 +393,79 @@ def restore_backup(self, backup_id):
def delete_backup(self, backup_id):
"""Delete a backup"""
try:
+ # Ensure we're using the exact backup ID that was stored
backup_folder = self.backup_dir / backup_id
+ # Debug: Log what we're looking for
+ logger.info(f"Looking for backup folder: {backup_folder}")
+ logger.info(f"Backup folder exists: {backup_folder.exists()}")
+
+ # If the exact path doesn't exist, let's see what files are actually there
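+ # Fallback lookup order used below: (1) any folder whose name contains
+ # backup_id, (2) uploaded_backup_* folders whose metadata name matches,
+ # (3) any folder whose metadata id or name matches backup_id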
if not backup_folder.exists():
- raise Exception(f"Backup not found: {backup_id}")
+ logger.info(f"Backup directory contents:")
+ if self.backup_dir.exists():
+ for item in self.backup_dir.iterdir():
+ logger.info(f" {item.name}")
+ if item.is_dir():
+ logger.info(f" Directory contents:")
+ for sub_item in item.iterdir():
+ logger.info(f" {sub_item.name}")
+ else:
+ logger.info("Backup directory does not exist!")
+
+ # Try to find a folder with a similar name (in case of encoding issues)
+ logger.info("Attempting to find backup folder with similar name...")
+ found_backup = None
+ for item in self.backup_dir.iterdir():
+ if item.is_dir() and backup_id in item.name:
+ logger.info(f"Found similar backup: {item.name}")
+ found_backup = item
+ break
+
+ # If we didn't find an exact match but we have backups with numeric IDs
+ # that might match the name pattern, let's also check for the numeric prefix
+ if not found_backup:
+ # Check if backup_id is a human-readable name and try to find a matching numeric backup
+ for item in self.backup_dir.iterdir():
+ if item.is_dir() and item.name.startswith("uploaded_backup_"):
+ # Check if the backup_info.json exists and has matching name
+ metadata_path = item / "backup_info.json"
+ if metadata_path.exists():
+ try:
+ with open(metadata_path, 'r') as f:
+ backup_info = json.load(f)
+ # Check if the backup name matches (case insensitive)
+ if backup_info.get('name', '').lower() == backup_id.lower():
+ logger.info(f"Found backup by name match: {item.name}")
+ found_backup = item
+ break
+ except Exception as e:
+ logger.warning(f"Could not read metadata for {item.name}: {e}")
+
+ if found_backup:
+ backup_folder = found_backup
+ else:
+ # Try a more flexible approach - check if backup_id might be a timestamp or pattern
+ # by looking for backups that have backup_info.json with matching name
+ for item in self.backup_dir.iterdir():
+ if item.is_dir():
+ metadata_path = item / "backup_info.json"
+ if metadata_path.exists():
+ try:
+ with open(metadata_path, 'r') as f:
+ backup_info = json.load(f)
+ # Check if backup_id matches the name or ID in the metadata
+ if (backup_info.get('id') == backup_id or
+ backup_info.get('name') == backup_id or
+ backup_id in backup_info.get('name', '')):
+ logger.info(f"Found backup by metadata match: {item.name}")
+ backup_folder = item
+ break
+ except Exception as e:
+ logger.warning(f"Could not read metadata for {item.name}: {e}")
+
+ if not backup_folder.exists():
+ raise Exception(f"Backup not found: {backup_id}")
shutil.rmtree(backup_folder)
logger.info(f"Backup deleted: {backup_id}")
@@ -626,4 +724,315 @@ def next_scheduled_backup():
except Exception as e:
logger.error(f"Error getting next backup time: {e}")
- return jsonify({"success": False, "error": str(e)}), 500
\ No newline at end of file
+ return jsonify({"success": False, "error": str(e)}), 500
+
+@backup_bp.route('/api/backup/download/<backup_id>', methods=['GET'])
+def download_backup(backup_id):
+ """Download a backup as a ZIP file"""
+ username = get_user_for_request()
+ if not username:
+ return jsonify({"success": False, "error": "Authentication required"}), 401
+
+ try:
+ # Validate backup exists
+ backup_folder = backup_manager.backup_dir / backup_id
+ if not backup_folder.exists():
+ return jsonify({"success": False, "error": "Backup not found"}), 404
+
+ # Create a temporary ZIP file
+ import tempfile
+ import uuid
+
+ # Create a unique temporary file name
+ temp_zip_path = Path(tempfile.gettempdir()) / f"backup_{uuid.uuid4()}.zip"
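+ # A uuid4-based name keeps concurrent downloads from colliding in the temp directory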
+
+ # Create ZIP file
+ with zipfile.ZipFile(temp_zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
+ # Add all files in backup folder to ZIP
+ for file_path in backup_folder.rglob('*'):
+ if file_path.is_file():
+ # Add file to ZIP with relative path
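+ # The arcname is relative to the parent of the backups directory, so the
+ # archive keeps the backups-folder and backup-folder names as nested path components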
+ zipf.write(file_path, file_path.relative_to(backup_manager.backup_dir.parent))
+
+ # Return ZIP file as download
+ return send_file(
+ str(temp_zip_path),
+ as_attachment=True,
+ download_name=f"{backup_id}.zip"
+ )
+
+ except Exception as e:
+ logger.error(f"Error downloading backup: {e}")
+ return jsonify({"success": False, "error": str(e)}), 500
+
+@backup_bp.route('/api/backup/upload', methods=['POST'])
+def upload_backup():
+ """Upload and restore a backup from a ZIP file"""
+ username = get_user_for_request()
+ if not username:
+ return jsonify({"success": False, "error": "Authentication required"}), 401
+
+ try:
+ if 'backup_file' not in request.files:
+ return jsonify({"success": False, "error": "No backup file provided"}), 400
+
+ file = request.files['backup_file']
+ if file.filename == '':
+ return jsonify({"success": False, "error": "No file selected"}), 400
+
+ if not file or not file.filename.endswith('.zip'):
+ return jsonify({"success": False, "error": "Invalid file type. Please upload a .zip file"}), 400
+
+ # Create a temporary directory to extract the backup
+ import tempfile
+ import uuid
+
+ temp_dir = Path(tempfile.gettempdir()) / f"upload_backup_{uuid.uuid4()}"
+ temp_dir.mkdir(parents=True, exist_ok=True)
+
+ # Save uploaded file temporarily
+ temp_zip_path = temp_dir / "backup.zip"
+ file.save(str(temp_zip_path))
+
+ # Extract ZIP file
+ with zipfile.ZipFile(temp_zip_path, 'r') as zipf:
+ zipf.extractall(str(temp_dir))
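+ # Archives produced by the download endpoint above extract to a top-level
+ # folder (the backups directory name) containing the backup folder with
+ # backup_info.json and the .db files; other layouts are handled by the search below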
+
+ # Find the backup metadata file. A single recursive search covers every
+ # layout the archive may have: backup_info.json at the top level, inside a
+ # backup_name/ folder, or nested one level deeper.
+ metadata_path = None
+
+ # Log the structure of the extracted directory for debugging
+ logger.info(f"Extracted directory structure: {list(temp_dir.rglob('*'))}")
+
+ for file_path in temp_dir.rglob('backup_info.json'):
+ metadata_path = file_path
+ break
+
+ # Log what we found for debugging
+ logger.info(f"Found metadata_path: {metadata_path}")
+ if metadata_path:
+ logger.info(f"Metadata path exists: {metadata_path.exists()}")
+
+ # Final check if we still don't have a valid metadata path
+ if not metadata_path or not metadata_path.exists():
+ # Let's list all directories and their contents for debugging
+ logger.info("Directories in temp_dir:")
+ for item in temp_dir.iterdir():
+ if item.is_dir():
+ logger.info(f" Directory: {item.name}")
+ logger.info(f" Contents: {list(item.iterdir())}")
+ backup_info_file = item / "backup_info.json"
+ logger.info(f" Has backup_info.json: {backup_info_file.exists()}")
+ db_files = list(item.rglob('*.db'))
+ logger.info(f" Has DB files: {len(db_files)}")
+ # Check subdirectories too
+ for sub_item in item.rglob('*'):
+ if sub_item.is_dir():
+ sub_backup_info = sub_item / "backup_info.json"
+ logger.info(f" Subdirectory {sub_item.name}: backup_info.json exists = {sub_backup_info.exists()}")
+
+ # Let's also check the zip contents to understand the structure
+ logger.info("ZIP contents:")
+ with zipfile.ZipFile(temp_zip_path, 'r') as zipf:
+ for info in zipf.infolist():
+ logger.info(f" ZIP entry: {info.filename}")
+
+ # Try one more approach - look for any directory with backup_info.json in it
+ # This handles cases where the backup structure might be different than expected
+ logger.info("Trying alternative approach to find backup directory...")
+ for item in temp_dir.rglob('backup_info.json'):
+ logger.info(f"Found backup_info.json in: {item}")
+ logger.info(f"Parent directory: {item.parent}")
+ logger.info(f"Parent directory contents: {list(item.parent.iterdir())}")
+ # Check if parent directory contains database files
+ db_files = list(item.parent.rglob('*.db'))
+ logger.info(f"Database files found: {len(db_files)}")
+ if db_files:
+ logger.info("Using this as backup directory")
+ metadata_path = item
+ break
+
+ # If we still haven't found it, raise the error with more details
+ if not metadata_path or not metadata_path.exists():
+ raise Exception("Invalid backup file: backup_info.json not found in expected location. " +
+ "The backup file structure may be corrupted or incompatible.")
+
+ # Load backup info
+ with open(metadata_path, 'r') as f:
+ backup_info = json.load(f)
+
+ # Create a new backup folder with the extracted content
+ backup_name = f"uploaded_backup_{int(time.time())}"
+ new_backup_dir = backup_manager.backup_dir / backup_name
+ new_backup_dir.mkdir(parents=True, exist_ok=True)
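+ # This folder name doubles as the backup_id handed to restore_backup() below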
+
+ # Move extracted files to the new backup directory
+ # The structure in the ZIP is: backup_name/backup_info.json (etc)
+ # We want to copy the contents of the backup_name directory to the new backup directory
+
+ # Find the actual backup directory in the extracted structure
+ backup_dir_in_extracted = None
+
+ # Look for the directory that contains backup_info.json (this should be the backup directory)
+ for item in temp_dir.iterdir():
+ if item.is_dir() and (item / "backup_info.json").exists():
+ backup_dir_in_extracted = item
+ break
+
+ # If not found, try to find it in subdirectories
+ if not backup_dir_in_extracted:
+ for item in temp_dir.rglob('backup_info.json'):
+ parent_dir = item.parent
+ # Make sure it's a direct child of temp_dir or a subdirectory of it
+ if parent_dir.parent == temp_dir or parent_dir.parent.parent == temp_dir:
+ backup_dir_in_extracted = parent_dir
+ break
+
+ # If we still don't have the backup directory, try a more robust approach
+ if not backup_dir_in_extracted:
+ # Look for any directory that contains database files and backup_info.json
+ for item in temp_dir.iterdir():
+ if item.is_dir():
+ db_files = list(item.rglob('*.db'))
+ backup_info_file = item / "backup_info.json"
+ if db_files and backup_info_file.exists():
+ backup_dir_in_extracted = item
+ break
+
+ # If we still don't have it, copy everything directly
+ if not backup_dir_in_extracted:
+ logger.info("Using fallback approach - copying all files directly")
+ # Just copy all files directly from temp_dir
+ for file_path in temp_dir.rglob('*'):
+ if file_path.is_file() and file_path != temp_zip_path:
+ relative_path = file_path.relative_to(temp_dir)
+ target_path = new_backup_dir / relative_path
+ target_path.parent.mkdir(parents=True, exist_ok=True)
+ shutil.copy2(file_path, target_path)
+ else:
+ # Copy the contents of the backup directory to the new backup directory
+ logger.info(f"Copying from backup directory: {backup_dir_in_extracted}")
+ for file_path in backup_dir_in_extracted.rglob('*'):
+ if file_path.is_file():
+ # Calculate relative path from the backup directory root
+ relative_path = file_path.relative_to(backup_dir_in_extracted)
+ target_path = new_backup_dir / relative_path
+ target_path.parent.mkdir(parents=True, exist_ok=True)
+ shutil.copy2(file_path, target_path)
+
+ # Verify that the backup_info.json exists in the new location
+ new_metadata_path = new_backup_dir / "backup_info.json"
+ if not new_metadata_path.exists():
+ logger.error(f"Metadata file not found in new backup directory: {new_metadata_path}")
+ raise Exception("Backup metadata not found in restored backup")
+
+ # Restore the backup
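+ # restore_backup() first snapshots the current databases as a huntarr_pre_restore_backup_* backup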
+ restore_info = backup_manager.restore_backup(backup_name)
+
+ # Clean up temporary files
+ shutil.rmtree(temp_dir, ignore_errors=True)
+
+ return jsonify({
+ 'success': True,
+ 'message': 'Backup uploaded and restored successfully',
+ 'restore_info': restore_info
+ })
+
+ except Exception as e:
+ logger.error(f"Error uploading backup: {e}")
+ # Clean up temporary files if any
+ try:
+ import tempfile
+ temp_dirs = [f for f in Path(tempfile.gettempdir()).iterdir() if 'upload_backup_' in str(f)]
+ for temp_dir in temp_dirs:
+ if temp_dir.is_dir():
+ shutil.rmtree(temp_dir, ignore_errors=True)
+ except Exception:
+ pass
+ return jsonify({"success": False, "error": str(e)}), 500