"""Test suite for DaemonControl daemon functionality. Tests cover: - JobExecutor initialization and execution - Python executable detection - Command building - SchedulerDaemon initialization - Job loading and scheduling """ import os import sys import tempfile import time from datetime import datetime from pathlib import Path import pytest # Add parent directory to path sys.path.insert(0, str(Path(__file__).parent.parent)) from core import ConfigManager, DatabaseManager, setup_logger from daemon import JobExecutor, SchedulerDaemon class TestJobExecutor: """Tests for JobExecutor class.""" @pytest.fixture def executor(self, tmp_path, monkeypatch): """Create JobExecutor with temporary database.""" # Setup temp database db_path = tmp_path / "test_daemon.db" def mock_get(section, key, default=None): if section == 'database' and key == 'path': return str(db_path) if section == 'logging' and key == 'execution_log_dir': return str(tmp_path / "logs" / "executions") if section == 'daemon' and key == 'default_timeout': return 30 return default # Reset singletons ConfigManager._instance = None DatabaseManager._instance = None monkeypatch.setattr(ConfigManager, 'get', mock_get) # Create instances config = ConfigManager() db = DatabaseManager() logger = setup_logger('test_executor', None, 'DEBUG') executor = JobExecutor(db, config, logger) yield executor # Cleanup ConfigManager._instance = None DatabaseManager._instance = None def test_job_executor_initialization(self, executor): """Test JobExecutor can be instantiated.""" assert executor is not None assert executor.db is not None assert executor.config is not None assert executor.logger is not None assert isinstance(executor.active_executions, dict) def test_python_executable_detection(self, executor): """Test _get_python_executable finds system Python.""" python_exe = executor._get_python_executable(None) assert python_exe is not None assert 'python' in python_exe.lower() assert Path(python_exe).exists() def test_python_executable_venv_detection(self, executor, tmp_path): """Test venv detection in working directory.""" # Create fake venv structure venv_dir = tmp_path / "venv" / "bin" venv_dir.mkdir(parents=True) # Create fake python executable venv_python = venv_dir / "python" venv_python.write_text("#!/bin/bash\necho 'fake python'\n") venv_python.chmod(0o755) # Test detection python_exe = executor._get_python_executable(str(tmp_path)) assert str(venv_python) == python_exe def test_build_command_script(self, executor): """Test command building for script type.""" job = { 'job_type': 'script', 'executable_path': '/tmp/test.py' } cmd = executor._build_command(job, '/usr/bin/python3') assert cmd == ['/usr/bin/python3', '/tmp/test.py'] def test_build_command_python_module(self, executor): """Test command building for python_module type.""" job = { 'job_type': 'python_module', 'executable_path': 'mymodule.main' } cmd = executor._build_command(job, '/usr/bin/python3') assert cmd == ['/usr/bin/python3', '-m', 'mymodule.main'] def test_create_log_file_path(self, executor, tmp_path): """Test log file path creation.""" log_file = executor._create_log_file_path('test_job') assert log_file is not None assert 'test_job' in str(log_file) assert log_file.suffix == '.log' # Check parent directory was created assert log_file.parent.exists() def test_execute_simple_job(self, executor): """Test executing a real job.""" # Create a simple echo job job_id = executor.db.create_job( name="test_echo", job_type="script", executable_path="/usr/bin/echo", description="Test echo job" ) # Get job from database with executor.db.get_connection() as conn: cursor = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) job = dict(cursor.fetchone()) # Execute job execution_id = executor.execute_job(job) assert execution_id > 0 # Wait for execution to complete (max 5 seconds) for _ in range(50): with executor.db.get_connection() as conn: cursor = conn.execute( "SELECT status FROM executions WHERE id = ?", (execution_id,) ) result = cursor.fetchone() if result and result['status'] in ('success', 'failed', 'timeout'): break time.sleep(0.1) # Verify execution record with executor.db.get_connection() as conn: cursor = conn.execute( "SELECT * FROM executions WHERE id = ?", (execution_id,) ) execution = dict(cursor.fetchone()) assert execution is not None assert execution['status'] in ('success', 'failed') assert execution['job_id'] == job_id def test_get_active_execution_count(self, executor): """Test getting active execution count.""" count = executor.get_active_execution_count() assert isinstance(count, int) assert count >= 0 class TestSchedulerDaemon: """Tests for SchedulerDaemon class.""" @pytest.fixture def daemon(self, tmp_path, monkeypatch): """Create SchedulerDaemon with temporary database.""" # Setup temp paths db_path = tmp_path / "test_daemon.db" log_file = tmp_path / "daemon.log" def mock_get(section, key, default=None): if section == 'database' and key == 'path': return str(db_path) if section == 'logging' and key == 'daemon_log_file': return str(log_file) if section == 'logging' and key == 'execution_log_dir': return str(tmp_path / "logs" / "executions") if section == 'logging' and key == 'level': return 'DEBUG' if section == 'daemon' and key == 'default_timeout': return 30 if section == 'daemon' and key == 'max_concurrent_jobs': return 3 return default # Reset singletons ConfigManager._instance = None DatabaseManager._instance = None monkeypatch.setattr(ConfigManager, 'get', mock_get) daemon = SchedulerDaemon() yield daemon # Cleanup if daemon.scheduler.running: daemon.scheduler.shutdown(wait=False) ConfigManager._instance = None DatabaseManager._instance = None def test_scheduler_daemon_initialization(self, daemon): """Test SchedulerDaemon instantiates correctly.""" assert daemon is not None assert daemon.config is not None assert daemon.db is not None assert daemon.logger is not None assert daemon.job_executor is not None assert daemon.scheduler is not None assert daemon.running is False def test_load_jobs_no_jobs(self, daemon): """Test _load_jobs with no jobs in database.""" # Should not raise exception daemon._load_jobs() # Verify no jobs scheduled jobs = daemon.scheduler.get_jobs() assert len(jobs) == 0 def test_load_jobs_with_schedule(self, daemon): """Test _load_jobs loads from database.""" # Create test job job_id = daemon.db.create_job( name="test_scheduled_job", job_type="script", executable_path="/usr/bin/echo", description="Test job with schedule" ) # Add schedule now = datetime.now().isoformat() with daemon.db.get_connection() as conn: conn.execute( "INSERT INTO schedules " "(job_id, cron_expression, enabled, created_at, updated_at) " "VALUES (?, ?, 1, ?, ?)", (job_id, "0 2 * * *", now, now) # Daily at 2 AM ) # Load jobs daemon._load_jobs() # Verify job was scheduled jobs = daemon.scheduler.get_jobs() assert len(jobs) == 1 assert "test_scheduled_job" in jobs[0].name def test_reload_jobs(self, daemon): """Test reload_jobs clears and reloads.""" # Create initial job job_id = daemon.db.create_job( name="initial_job", job_type="script", executable_path="/usr/bin/echo" ) now = datetime.now().isoformat() with daemon.db.get_connection() as conn: conn.execute( "INSERT INTO schedules " "(job_id, cron_expression, enabled, created_at, updated_at) " "VALUES (?, ?, 1, ?, ?)", (job_id, "* * * * *", now, now) ) # Load jobs daemon._load_jobs() assert len(daemon.scheduler.get_jobs()) == 1 # Reload daemon.reload_jobs() assert len(daemon.scheduler.get_jobs()) == 1 class TestIntegration: """Integration tests for daemon functionality.""" def test_daemon_can_import(self): """Test daemon modules can be imported.""" from daemon import JobExecutor, SchedulerDaemon assert JobExecutor is not None assert SchedulerDaemon is not None def test_create_and_execute_job_manually(self, tmp_path): """Test creating and manually executing a job.""" # Reset singletons ConfigManager._instance = None DatabaseManager._instance = None # Create temp script test_script = tmp_path / "test_script.py" test_script.write_text( "#!/usr/bin/env python3\n" "print('Hello from test script')\n" ) test_script.chmod(0o755) # Create job db = DatabaseManager() job_id = db.create_job( name="manual_test", job_type="script", executable_path=str(test_script), working_directory=str(tmp_path) ) # Create executor config = ConfigManager() logger = setup_logger('integration', None, 'INFO') executor = JobExecutor(db, config, logger) # Get job with db.get_connection() as conn: cursor = conn.execute("SELECT * FROM jobs WHERE id = ?", (job_id,)) job = dict(cursor.fetchone()) # Execute execution_id = executor.execute_job(job) assert execution_id > 0 # Wait for completion time.sleep(2) # Verify with db.get_connection() as conn: cursor = conn.execute( "SELECT status FROM executions WHERE id = ?", (execution_id,) ) result = cursor.fetchone() # Cleanup ConfigManager._instance = None DatabaseManager._instance = None # May still be running, so check if exists assert result is not None if __name__ == '__main__': pytest.main([__file__, '-v'])