Source code for neosqlite.migration

"""
Auto-vacuum migration utilities for NeoSQLite.

Provides functionality to migrate SQLite databases to different auto_vacuum modes
while preserving all data.
"""

from __future__ import annotations

import logging
import os
import shutil

from ._sqlite import sqlite3

logger = logging.getLogger(__name__)


[docs] def needs_migration( db: sqlite3.Connection, target_autovacuum: int, ) -> bool: """ Check if the database needs auto_vacuum migration. Args: db: SQLite connection target_autovacuum: Desired auto_vacuum mode Returns: True if migration is needed, False otherwise """ current = db.execute("PRAGMA auto_vacuum").fetchone()[0] return current != target_autovacuum
[docs] def get_journal_mode(db: sqlite3.Connection) -> str: """Get the current journal mode of a database.""" return db.execute("PRAGMA journal_mode").fetchone()[0]
[docs] def checkpoint_and_prepare_for_migration( db: sqlite3.Connection, ) -> list[str]: """ Ensure database is ready for migration by checkpointing WAL. Args: db: SQLite connection Returns: List of files that need to be backed up (main db + wal/shm if exist) """ # Ensure all WAL data is written to main file try: db.execute("PRAGMA wal_checkpoint(FULL)") except Exception as e: logger.warning(f"WAL checkpoint failed during migration: {e}") pass try: db.execute("COMMIT") except Exception as e: logger.warning(f"COMMIT failed during migration: {e}") pass return []
[docs] def migrate_autovacuum( db_path: str, target_autovacuum: int, target_journal_mode: str = "WAL", extra_conn_kwargs: dict | None = None, ) -> bool: """ Migrate a database to a different auto_vacuum mode. This function: 1. Check if migration is actually needed 2. Checkpoints any WAL data 3. Backs up all database files (main + WAL + SHM) 4. Closes the database 5. Opens backup and VACUUM INTO new file with desired auto_vacuum 6. Replaces original with vacuumed file Args: db_path: Path to the database file target_autovacuum: Desired auto_vacuum mode (0, 1, or 2) target_journal_mode: Desired journal mode (default: WAL) extra_conn_kwargs: Extra arguments for sqlite3.connect() Returns: True if migration succeeded, False if skipped (already correct) """ conn = sqlite3.connect(db_path) try: if not needs_migration(conn, target_autovacuum): return False finally: conn.close() import time timestamp = int(time.time() * 1000) wal_path = f"{db_path}-wal" shm_path = f"{db_path}-shm" files_to_backup = [db_path] if os.path.exists(wal_path): files_to_backup.append(wal_path) if os.path.exists(shm_path): files_to_backup.append(shm_path) backup_files: dict[str, str] = {} for file_path in files_to_backup: backup_path = f"{file_path}.backup_{timestamp}" shutil.copy2(file_path, backup_path) backup_files[file_path] = backup_path original_backup_path = backup_files[db_path] temp_new_path = f"{db_path}.temp_autovacuum_{timestamp}" try: conn = sqlite3.connect(original_backup_path) conn.isolation_level = None old_journal = conn.execute("PRAGMA journal_mode").fetchone()[0] if old_journal != "wal": conn.execute("PRAGMA journal_mode=WAL") conn.execute(f"PRAGMA auto_vacuum={target_autovacuum}") if os.path.exists(temp_new_path): os.remove(temp_new_path) conn.execute(f"VACUUM INTO '{temp_new_path}'") conn.close() shutil.move(temp_new_path, db_path) for backup_path in backup_files.values(): if os.path.exists(backup_path): os.remove(backup_path) if os.path.exists(wal_path): os.remove(wal_path) if os.path.exists(shm_path): os.remove(shm_path) return True except Exception as e: logger.error(f"Migration failed, restoring backup: {e}") for original_path, backup_path in backup_files.items(): if os.path.exists(backup_path): shutil.move(backup_path, original_path) raise
[docs] def should_migrate() -> bool: """ Check if auto_vacuum migration is enabled via environment variable. Returns: True if AUTOVACUUM_MIGRATION=1 is set, False otherwise """ return os.environ.get("AUTOVACUUM_MIGRATION", "0") == "1"