"""Test suite for DaemonControl foundation infrastructure. Tests cover: - Database creation and schema - Configuration management - Logger setup - Singleton patterns - CRUD operations """ import os import sqlite3 import tempfile from pathlib import Path import pytest # Import modules to test import sys sys.path.insert(0, str(Path(__file__).parent.parent)) from core import ConfigManager, DatabaseManager, setup_logger class TestDatabaseManager: """Tests for DatabaseManager class.""" @pytest.fixture def temp_db_path(self, tmp_path, monkeypatch): """Create temporary database path for testing.""" db_path = tmp_path / "test_daemon.db" # Mock ConfigManager to use temp path def mock_get(section, key, default=None): if section == 'database' and key == 'path': return str(db_path) return default # Reset DatabaseManager singleton DatabaseManager._instance = None monkeypatch.setattr(ConfigManager, 'get', mock_get) yield db_path # Cleanup singleton DatabaseManager._instance = None def test_database_creation(self, temp_db_path): """Test database creates schema correctly.""" db = DatabaseManager() # Check database file was created assert temp_db_path.exists() # Check schema version assert db.get_schema_version() == 1 # Verify tables exist with db.get_connection() as conn: cursor = conn.execute( "SELECT name FROM sqlite_master WHERE type='table'" ) tables = {row['name'] for row in cursor.fetchall()} expected_tables = {'schema_version', 'jobs', 'schedules', 'executions'} assert expected_tables.issubset(tables) def test_database_singleton(self, temp_db_path): """Test DatabaseManager is singleton.""" db1 = DatabaseManager() db2 = DatabaseManager() assert db1 is db2 def test_foreign_keys_enabled(self, temp_db_path): """Test foreign key constraints are enabled.""" db = DatabaseManager() with db.get_connection() as conn: cursor = conn.execute("PRAGMA foreign_keys") result = cursor.fetchone() assert result[0] == 1 # Foreign keys enabled def test_create_job(self, temp_db_path): """Test creating a job in database.""" db = DatabaseManager() job_id = db.create_job( name="test_job", job_type="script", executable_path="/usr/bin/echo", description="Test job" ) assert job_id > 0 # Verify job was created with db.get_connection() as conn: cursor = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) job = cursor.fetchone() assert job is not None assert job['name'] == "test_job" assert job['job_type'] == "script" assert job['executable_path'] == "/usr/bin/echo" assert job['enabled'] == 1 def test_create_duplicate_job(self, temp_db_path): """Test creating job with duplicate name fails.""" db = DatabaseManager() db.create_job( name="duplicate", job_type="script", executable_path="/bin/true" ) # Second job with same name should fail with pytest.raises(sqlite3.IntegrityError): db.create_job( name="duplicate", job_type="script", executable_path="/bin/false" ) def test_get_enabled_jobs(self, temp_db_path): """Test querying enabled jobs.""" db = DatabaseManager() # Create enabled job db.create_job( name="enabled_job", job_type="script", executable_path="/bin/true", enabled=True ) # Create disabled job db.create_job( name="disabled_job", job_type="script", executable_path="/bin/false", enabled=False ) jobs = db.get_enabled_jobs() # Should only return enabled job assert len(jobs) == 1 assert jobs[0]['name'] == "enabled_job" def test_create_execution(self, temp_db_path): """Test creating execution record.""" db = DatabaseManager() # Create job first job_id = db.create_job( name="exec_test", job_type="script", executable_path="/bin/true" ) # Create execution exec_id = db.create_execution(job_id, status='queued') assert exec_id > 0 # Verify execution with db.get_connection() as conn: cursor = conn.execute("SELECT * FROM executions WHERE id = ?", (exec_id,)) execution = cursor.fetchone() assert execution is not None assert execution['job_id'] == job_id assert execution['status'] == 'queued' def test_update_execution(self, temp_db_path): """Test updating execution record.""" db = DatabaseManager() job_id = db.create_job( name="update_test", job_type="script", executable_path="/bin/true" ) exec_id = db.create_execution(job_id) # Update execution db.update_execution( exec_id, status='success', exit_code=0, error_message=None ) # Verify update with db.get_connection() as conn: cursor = conn.execute("SELECT * FROM executions WHERE id = ?", (exec_id,)) execution = cursor.fetchone() assert execution['status'] == 'success' assert execution['exit_code'] == 0 class TestConfigManager: """Tests for ConfigManager class.""" @pytest.fixture def temp_config_path(self, tmp_path, monkeypatch): """Create temporary config path for testing.""" config_dir = tmp_path / ".config" / "daemon-control" config_file = config_dir / "config.json" # Mock the config file path monkeypatch.setattr( ConfigManager, '_ConfigManager__init__', lambda self: None # Skip init ) # Reset singleton ConfigManager._instance = None yield config_dir, config_file # Cleanup ConfigManager._instance = None def test_config_singleton(self): """Test ConfigManager is singleton.""" # Reset singleton ConfigManager._instance = None cfg1 = ConfigManager() cfg2 = ConfigManager() assert cfg1 is cfg2 # Cleanup ConfigManager._instance = None def test_config_defaults(self): """Test ConfigManager loads defaults.""" # Reset singleton ConfigManager._instance = None cfg = ConfigManager() # Check default values exist db_path = cfg.get('database', 'path') assert db_path is not None assert 'daemon.db' in db_path log_level = cfg.get('logging', 'level') assert log_level == 'INFO' # Cleanup ConfigManager._instance = None def test_config_path_expansion(self): """Test ~ and env var expansion.""" # Reset singleton ConfigManager._instance = None cfg = ConfigManager() # Test ~ expansion db_path = cfg.get('database', 'path') assert '~' not in db_path assert db_path.startswith('/') # Cleanup ConfigManager._instance = None def test_config_get_default(self): """Test get with default value.""" # Reset singleton ConfigManager._instance = None cfg = ConfigManager() # Non-existent key should return default value = cfg.get('nonexistent', 'key', default='default_value') assert value == 'default_value' # Cleanup ConfigManager._instance = None class TestLogger: """Tests for logger setup.""" def test_logger_creation(self, tmp_path): """Test logger creates log files.""" log_file = tmp_path / "test.log" logger = setup_logger('test_logger', str(log_file), 'INFO') assert logger is not None assert logger.name == 'test_logger' # Write a log message logger.info("Test message") # Check log file was created assert log_file.exists() # Check log content content = log_file.read_text() assert "Test message" in content assert "INFO" in content def test_logger_console_only(self): """Test logger works without file output.""" logger = setup_logger('console_logger', None, 'DEBUG') assert logger is not None # Should not raise exception logger.debug("Console only message") def test_logger_levels(self, tmp_path): """Test different log levels.""" log_file = tmp_path / "levels.log" logger = setup_logger('level_test', str(log_file), 'WARNING') logger.debug("Debug message") # Should not appear logger.info("Info message") # Should not appear logger.warning("Warning message") # Should appear logger.error("Error message") # Should appear content = log_file.read_text() assert "Debug message" not in content assert "Info message" not in content assert "Warning message" in content assert "Error message" in content class TestIntegration: """Integration tests for all components together.""" def test_full_workflow(self, tmp_path, monkeypatch): """Test complete workflow: config -> database -> job creation.""" # Setup temp paths db_path = tmp_path / "integration.db" log_file = tmp_path / "integration.log" # Mock config def mock_get(section, key, default=None): if section == 'database' and key == 'path': return str(db_path) if section == 'logging' and key == 'daemon_log_file': return str(log_file) return default # Reset singletons ConfigManager._instance = None DatabaseManager._instance = None monkeypatch.setattr(ConfigManager, 'get', mock_get) # Create instances config = ConfigManager() logger = setup_logger('integration', str(log_file)) db = DatabaseManager() # Create job job_id = db.create_job( name="integration_job", job_type="python_module", executable_path="test.module", timeout=600, description="Integration test job" ) assert job_id > 0 # Verify job jobs = db.get_enabled_jobs() assert len(jobs) == 1 assert jobs[0]['name'] == "integration_job" # Create execution exec_id = db.create_execution(job_id, 'running') # Update execution db.update_execution(exec_id, status='success', exit_code=0) # Verify everything worked assert db_path.exists() assert log_file.exists() # Cleanup ConfigManager._instance = None DatabaseManager._instance = None if __name__ == '__main__': pytest.main([__file__, '-v'])