"""Database management module for DaemonControl. This module provides a singleton DatabaseManager class that handles all SQLite database operations for job scheduling and execution tracking. """ import sqlite3 from contextlib import contextmanager from datetime import datetime from pathlib import Path from typing import Dict, List, Optional from .config import ConfigManager from .logger import setup_logger class DatabaseManager: """Singleton SQLite database manager. Manages the SQLite database for jobs, schedules, and execution history. Uses WAL mode for better concurrency and enforces foreign key constraints. """ _instance: Optional['DatabaseManager'] = None def __new__(cls) -> 'DatabaseManager': """Ensure only one instance exists (singleton pattern).""" if cls._instance is None: cls._instance = super().__new__(cls) return cls._instance def __init__(self) -> None: """Initialize the database manager. Creates database file and schema if they don't exist. Only initializes once due to singleton pattern. """ # Avoid re-initialization for singleton if hasattr(self, '_initialized'): return self._config = ConfigManager() self._logger = setup_logger('database') # Get database path from config db_path_str = self._config.get('database', 'path') self._db_path = Path(db_path_str).expanduser() # Create database directory if needed self._db_path.parent.mkdir(parents=True, exist_ok=True) # Initialize database and schema self._init_database() self._initialized = True self._logger.info(f"Database initialized at {self._db_path}") def _init_database(self) -> None: """Initialize database connection and create schema.""" with self.get_connection() as conn: # Enable WAL mode for better concurrency conn.execute("PRAGMA journal_mode=WAL") # Enable foreign key constraints conn.execute("PRAGMA foreign_keys=ON") # Create schema self._create_schema(conn) def _create_schema(self, conn: sqlite3.Connection) -> None: """Create database schema if it doesn't exist. Args: conn: SQLite connection object """ # Schema version tracking conn.execute(""" CREATE TABLE IF NOT EXISTS schema_version ( version INTEGER PRIMARY KEY, applied_at TEXT NOT NULL ) """) # Jobs definition conn.execute(""" CREATE TABLE IF NOT EXISTS jobs ( id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT NOT NULL UNIQUE, job_type TEXT NOT NULL, executable_path TEXT NOT NULL, working_directory TEXT, timeout INTEGER DEFAULT 3600, enabled INTEGER DEFAULT 1, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, description TEXT ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_name ON jobs(name) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_jobs_enabled ON jobs(enabled) """) # Schedules conn.execute(""" CREATE TABLE IF NOT EXISTS schedules ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id INTEGER NOT NULL, cron_expression TEXT NOT NULL, enabled INTEGER DEFAULT 1, created_at TEXT NOT NULL, updated_at TEXT NOT NULL, FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_schedules_job_id ON schedules(job_id) """) # Execution history conn.execute(""" CREATE TABLE IF NOT EXISTS executions ( id INTEGER PRIMARY KEY AUTOINCREMENT, job_id INTEGER NOT NULL, status TEXT NOT NULL, start_time TEXT NOT NULL, end_time TEXT, exit_code INTEGER, log_output TEXT, error_message TEXT, created_at TEXT NOT NULL, FOREIGN KEY (job_id) REFERENCES jobs(id) ON DELETE CASCADE ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_executions_job_id ON executions(job_id) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_executions_status ON executions(status) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_executions_start_time ON executions(start_time) """) # Fleet status table (for monitoring cache) conn.execute(""" CREATE TABLE IF NOT EXISTS fleet_status ( id INTEGER PRIMARY KEY AUTOINCREMENT, check_time TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP, check_type TEXT NOT NULL, status TEXT NOT NULL, data TEXT, message TEXT ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_fleet_status_type_time ON fleet_status(check_type, check_time DESC) """) # Insert schema version if not exists cursor = conn.execute("SELECT version FROM schema_version WHERE version = 1") if cursor.fetchone() is None: conn.execute( "INSERT INTO schema_version (version, applied_at) VALUES (?, ?)", (1, datetime.now().isoformat()) ) # Run migrations self._run_migrations(conn) def _run_migrations(self, conn: sqlite3.Connection) -> None: """Run database migrations to update schema. Args: conn: SQLite connection object """ # Check current schema version cursor = conn.execute("SELECT MAX(version) FROM schema_version") result = cursor.fetchone() current_version = result[0] if result[0] is not None else 0 # Migration to version 2: Add log_file_path to executions if current_version < 2: self._logger.info("Running migration to schema version 2...") try: # Check if column already exists cursor = conn.execute("PRAGMA table_info(executions)") columns = [row[1] for row in cursor.fetchall()] if 'log_file_path' not in columns: conn.execute(""" ALTER TABLE executions ADD COLUMN log_file_path TEXT """) self._logger.info("Added log_file_path column to executions table") # Update schema version conn.execute( "INSERT INTO schema_version (version, applied_at) VALUES (?, ?)", (2, datetime.now().isoformat()) ) self._logger.info("Schema migrated to version 2") except Exception as e: self._logger.error(f"Migration to version 2 failed: {e}") raise @contextmanager def get_connection(self): """Context manager for database connections. Automatically commits on success and rolls back on exception. Ensures connections are properly closed. Yields: sqlite3.Connection: Database connection Example: >>> with db.get_connection() as conn: ... conn.execute("INSERT INTO jobs ...") """ conn = sqlite3.connect(str(self._db_path)) conn.row_factory = sqlite3.Row # Enable column access by name try: yield conn conn.commit() except Exception as e: conn.rollback() self._logger.error(f"Database error: {e}") raise finally: conn.close() def get_schema_version(self) -> int: """Get current schema version. Returns: Current schema version number """ with self.get_connection() as conn: cursor = conn.execute("SELECT MAX(version) as version FROM schema_version") row = cursor.fetchone() return row['version'] if row['version'] is not None else 0 def get_enabled_jobs(self) -> List[Dict]: """Get all enabled jobs with their schedules. Returns: List of job dictionaries with schedule information """ with self.get_connection() as conn: cursor = conn.execute(""" SELECT j.id, j.name, j.job_type, j.executable_path, j.working_directory, j.timeout, j.description, s.id as schedule_id, s.cron_expression, s.enabled as schedule_enabled FROM jobs j LEFT JOIN schedules s ON j.id = s.job_id WHERE j.enabled = 1 ORDER BY j.name """) jobs = [] for row in cursor.fetchall(): jobs.append(dict(row)) return jobs def create_job( self, name: str, job_type: str, executable_path: str, working_directory: Optional[str] = None, timeout: int = 3600, enabled: bool = True, description: Optional[str] = None ) -> int: """Create a new job. Args: name: Unique job name job_type: Type of job ('script' or 'python_module') executable_path: Path to executable or module working_directory: Working directory for execution timeout: Execution timeout in seconds enabled: Whether job is enabled description: Job description Returns: ID of created job Raises: sqlite3.IntegrityError: If job name already exists """ now = datetime.now().isoformat() with self.get_connection() as conn: cursor = conn.execute(""" INSERT INTO jobs ( name, job_type, executable_path, working_directory, timeout, enabled, created_at, updated_at, description ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) """, ( name, job_type, executable_path, working_directory, timeout, 1 if enabled else 0, now, now, description )) job_id = cursor.lastrowid self._logger.info(f"Created job '{name}' with ID {job_id}") return job_id def create_execution(self, job_id: int, status: str = 'queued') -> int: """Create execution record. Args: job_id: ID of job being executed status: Initial status ('queued', 'running', etc.) Returns: ID of created execution record """ now = datetime.now().isoformat() with self.get_connection() as conn: cursor = conn.execute(""" INSERT INTO executions ( job_id, status, start_time, created_at ) VALUES (?, ?, ?, ?) """, (job_id, status, now, now)) execution_id = cursor.lastrowid self._logger.debug(f"Created execution {execution_id} for job {job_id}") return execution_id def update_execution( self, execution_id: int, status: Optional[str] = None, end_time: Optional[str] = None, exit_code: Optional[int] = None, log_output: Optional[str] = None, error_message: Optional[str] = None ) -> None: """Update execution record. Args: execution_id: ID of execution to update status: New status end_time: Execution end time (ISO 8601) exit_code: Process exit code log_output: Captured log output error_message: Error message if failed """ # Build dynamic UPDATE query based on provided parameters updates = [] params = [] if status is not None: updates.append("status = ?") params.append(status) if end_time is not None: updates.append("end_time = ?") params.append(end_time) if exit_code is not None: updates.append("exit_code = ?") params.append(exit_code) if log_output is not None: updates.append("log_output = ?") params.append(log_output) if error_message is not None: updates.append("error_message = ?") params.append(error_message) if not updates: return # Nothing to update params.append(execution_id) query = f"UPDATE executions SET {', '.join(updates)} WHERE id = ?" with self.get_connection() as conn: conn.execute(query, params) self._logger.debug(f"Updated execution {execution_id}") # =========================================================================== # Fleet Status Methods # =========================================================================== def save_fleet_status(self, check_type: str, status: str, data: str = None, message: str = None): """Save fleet monitoring status to database. Args: check_type: Type of check ('docker', 'tunnel', 'backup', 'disk', 'overall') status: Status ('healthy', 'warning', 'critical', 'unknown') data: JSON string with detailed data message: Human-readable message """ with self.get_connection() as conn: conn.execute(''' INSERT INTO fleet_status (check_type, status, data, message) VALUES (?, ?, ?, ?) ''', (check_type, status, data, message)) self._logger.debug(f"Saved fleet status: {check_type} = {status}") def get_latest_fleet_status(self) -> dict: """Get latest status for all check types. Returns: dict: Latest status for each check type { 'docker': {'status': 'healthy', 'data': {...}, 'message': '...', 'age_seconds': 123}, 'tunnel': {...}, 'backup': {...}, 'disk': {...}, 'overall': {...} } """ import json result = {} for check_type in ['docker', 'tunnel', 'backup', 'disk', 'overall']: with self.get_connection() as conn: cursor = conn.execute(''' SELECT status, data, message, check_time FROM fleet_status WHERE check_type = ? ORDER BY check_time DESC LIMIT 1 ''', (check_type,)) row = cursor.fetchone() if row: from datetime import datetime check_time = datetime.fromisoformat(row['check_time']) # Use UTC for age calculation since DB timestamps are in UTC age_seconds = (datetime.utcnow() - check_time).total_seconds() result[check_type] = { 'status': row['status'], 'data': json.loads(row['data']) if row['data'] else None, 'message': row['message'], 'check_time': row['check_time'], 'age_seconds': int(age_seconds) } else: # No data yet result[check_type] = { 'status': 'unknown', 'data': None, 'message': 'No data available yet', 'check_time': None, 'age_seconds': None } return result def cleanup_old_fleet_status(self, keep_hours: int = 24): """Delete old fleet status records. Args: keep_hours: Keep records from last N hours (default 24) """ with self.get_connection() as conn: conn.execute(''' DELETE FROM fleet_status WHERE check_time < datetime('now', '-' || ? || ' hours') ''', (keep_hours,)) self._logger.debug(f"Cleaned up fleet status older than {keep_hours} hours")