"""Job execution module for DaemonControl.

This module provides the JobExecutor class that handles executing jobs as
subprocesses with logging, timeout management, and execution tracking.
"""

import os
import subprocess
import sys
import threading
import time
from datetime import datetime
from pathlib import Path
from typing import Dict, Optional, Tuple


class JobExecutor:
    """Execute jobs as subprocesses with logging and timeout."""

    # Horizontal rule used to frame the header/footer sections of each
    # execution log file.
    _LOG_SEPARATOR = "=" * 70

    # Fallback when the config has no logging.execution_log_dir value; this
    # is the location documented in _create_log_file_path.
    _DEFAULT_EXECUTION_LOG_DIR = "~/.config/daemon-control/logs/executions"

    def __init__(self, db_manager, config_manager, logger):
        """Initialize job executor.

        Args:
            db_manager: DatabaseManager instance
            config_manager: ConfigManager instance
            logger: Logger instance
        """
        self.db = db_manager
        self.config = config_manager
        self.logger = logger
        self.active_executions = {}  # execution_id -> threading.Thread

    def execute_job(self, job: Dict) -> int:
        """Execute a job asynchronously.

        Creates a 'queued' execution record and runs the job on a daemon
        thread. If anything fails before the thread is running, the failure is
        recorded on the execution record instead of being raised.

        Args:
            job: Job dict from database with keys: id, name, job_type,
                executable_path, working_directory, timeout

        Returns:
            execution_id: Database ID of the execution record. On failure this
            is the same record, marked 'failed'. A new failed record is created
            only if the queued one could not be created at all.
        """
        execution_id = None
        try:
            # Create execution record with queued status
            execution_id = self.db.create_execution(job['id'], status='queued')
            self.logger.info(
                f"Queuing job '{job['name']}' (execution_id: {execution_id})"
            )

            # Register before start(): the thread removes its own entry when
            # it finishes, so registering afterwards could leave a stale entry.
            thread = threading.Thread(
                target=self._run_job_thread,
                args=(execution_id, job),
                daemon=True,
            )
            self.active_executions[execution_id] = thread
            thread.start()
            return execution_id

        except Exception as e:
            self.logger.error(f"Failed to execute job '{job['name']}': {e}")
            if execution_id is None:
                # The queued record was never created; create a failed one.
                execution_id = self.db.create_execution(job['id'], status='failed')
            else:
                # Reuse the existing record instead of leaving it stuck in
                # 'queued'. The thread never ran, so its finally-cleanup will
                # not remove the registry entry; do it here.
                self.active_executions.pop(execution_id, None)
            self.db.update_execution(
                execution_id,
                status='failed',
                end_time=datetime.now().isoformat(),
                error_message=str(e),
            )
            return execution_id

    def _run_job_thread(self, execution_id: int, job: Dict) -> None:
        """Run job in subprocess (called in separate thread).

        Marks the execution 'running', runs the subprocess, and stores the
        final status, exit code, end time and log file path on the record.
        Any unexpected exception is recorded as a 'failed' execution.

        Args:
            execution_id: Database execution record ID
            job: Job dictionary with execution details
        """
        try:
            # Update status to running
            self.db.update_execution(execution_id, status='running')
            self.logger.info(f"Starting execution {execution_id}: {job['name']}")

            working_dir = job.get('working_directory')
            python_exe = self._get_python_executable(working_dir)
            cmd = self._build_command(job, python_exe)
            log_file = self._create_log_file_path(job['name'])

            # Job timeout wins; a missing/falsy value falls back to config.
            timeout = job.get('timeout') or self.config.get(
                'daemon', 'default_timeout', 3600
            )

            exit_code, status, error_msg = self._execute_subprocess(
                cmd, log_file, working_dir, timeout
            )

            # Update execution record with results
            self.db.update_execution(
                execution_id,
                status=status,
                exit_code=exit_code,
                end_time=datetime.now().isoformat(),
                log_output=str(log_file),
                error_message=error_msg,
            )

            log_fn = self.logger.info if status == 'success' else self.logger.error
            log_fn(
                f"Execution {execution_id} completed: "
                f"status={status}, exit_code={exit_code}"
            )

        except Exception as e:
            self.logger.error(f"Execution {execution_id} failed with exception: {e}")
            # Update as failed
            self.db.update_execution(
                execution_id,
                status='failed',
                end_time=datetime.now().isoformat(),
                error_message=str(e),
            )

        finally:
            # pop() with a default avoids a check-then-delete sequence.
            self.active_executions.pop(execution_id, None)

    def _get_python_executable(self, working_directory: Optional[str]) -> str:
        """Detect Python executable with venv support.

        Priority:
            1. working_directory/venv/bin/python (Linux venv layout)
            2. $VIRTUAL_ENV/bin/python (if the env var is set)
            3. sys.executable (the interpreter running this daemon)

        Args:
            working_directory: Optional working directory path

        Returns:
            Full path to Python executable
        """
        # Priority 1: Check for venv in working directory
        if working_directory:
            venv_python = Path(working_directory) / 'venv' / 'bin' / 'python'
            if venv_python.exists():
                self.logger.debug(f"Using venv Python: {venv_python}")
                return str(venv_python)

        # Priority 2: Check VIRTUAL_ENV environment variable
        venv_path = os.environ.get('VIRTUAL_ENV')
        if venv_path:
            venv_python = Path(venv_path) / 'bin' / 'python'
            if venv_python.exists():
                self.logger.debug(f"Using VIRTUAL_ENV Python: {venv_python}")
                return str(venv_python)

        # Priority 3: System Python
        self.logger.debug(f"Using system Python: {sys.executable}")
        return sys.executable

    def _build_command(self, job: Dict, python_exe: str) -> list:
        """Build subprocess command based on job type.

        'script' runs executable_path as a file, and 'python_module' runs it
        with ``-m``. Any other job_type is logged as a warning and treated as
        'script'.

        Args:
            job: Job dictionary (reads 'job_type' and 'executable_path')
            python_exe: Path to Python executable

        Returns:
            Command as list of strings (suitable for shell-less Popen)
        """
        job_type = job['job_type']
        executable_path = job['executable_path']

        if job_type == 'script':
            # Execute Python script directly
            cmd = [python_exe, executable_path]
        elif job_type == 'python_module':
            # Execute as Python module
            cmd = [python_exe, '-m', executable_path]
        else:
            # Fallback: treat as script
            self.logger.warning(
                f"Unknown job_type '{job_type}', treating as script"
            )
            cmd = [python_exe, executable_path]

        self.logger.debug(f"Built command: {' '.join(cmd)}")
        return cmd

    def _create_log_file_path(self, job_name: str) -> Path:
        """Create path for execution log file, creating its directory.

        Format: ~/.config/daemon-control/logs/executions/{job_name}_{timestamp}.log
        (the directory comes from the logging.execution_log_dir config value,
        falling back to the path above when it is unset).

        Path separators in job_name are replaced with '_' so the file always
        lands directly inside the log directory.

        NOTE: the timestamp has one-second resolution, so two runs of the same
        job started within the same second map to the same path.

        Args:
            job_name: Name of the job

        Returns:
            Path object for log file
        """
        log_dir = (
            self.config.get('logging', 'execution_log_dir')
            or self._DEFAULT_EXECUTION_LOG_DIR
        )
        log_dir_path = Path(log_dir).expanduser()

        # Create directory if it doesn't exist
        log_dir_path.mkdir(parents=True, exist_ok=True)

        # Keep the name a single path component (no escaping the log dir).
        safe_name = job_name.replace('/', '_').replace('\\', '_')

        timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')
        return log_dir_path / f"{safe_name}_{timestamp}.log"

    def _execute_subprocess(
        self,
        cmd: list,
        log_file: Path,
        working_dir: Optional[str],
        timeout: int
    ) -> Tuple[int, str, Optional[str]]:
        """Execute subprocess with timeout and logging.

        The child's stdout and stderr are both written to log_file, framed by
        a header and a footer (or a timeout notice). On timeout the child
        process is killed.

        Args:
            cmd: Command list
            log_file: Path to log file (overwritten if it exists)
            working_dir: Working directory (or None)
            timeout: Timeout in seconds

        Returns:
            Tuple of (exit_code, status, error_message)
            status: 'success', 'failed', or 'timeout'; exit_code is -1 when
            the process timed out or could not be started.
        """
        sep = self._LOG_SEPARATOR
        try:
            # Explicit UTF-8 so header text (paths, commands) cannot fail to
            # encode under a non-UTF-8 locale.
            with open(log_file, 'w', encoding='utf-8') as f:
                # Write header to log
                f.write(sep + "\n")
                f.write("=== DaemonControl Job Execution Log ===\n")
                f.write(sep + "\n")
                f.write(f"Command: {' '.join(cmd)}\n")
                f.write(f"Working Directory: {working_dir or 'None'}\n")
                f.write(f"Timeout: {timeout}s\n")
                f.write(f"Start Time: {datetime.now().isoformat()}\n")
                f.write(sep + "\n\n")
                # Flush so the header precedes anything the child writes to
                # the same file descriptor.
                f.flush()

                process = subprocess.Popen(
                    cmd,
                    stdout=f,
                    stderr=subprocess.STDOUT,
                    cwd=working_dir,
                    text=True,
                )

                try:
                    exit_code = process.wait(timeout=timeout)
                    status = 'success' if exit_code == 0 else 'failed'
                    error_msg = None if exit_code == 0 else f"Exit code: {exit_code}"

                    # Write footer
                    f.write("\n" + sep + "\n")
                    f.write(f"End Time: {datetime.now().isoformat()}\n")
                    f.write(f"Exit Code: {exit_code}\n")
                    f.write(f"Status: {status.upper()}\n")
                    f.write(sep + "\n")

                except subprocess.TimeoutExpired:
                    self.logger.warning(
                        f"Process timeout after {timeout}s, killing process"
                    )
                    process.kill()
                    process.wait()  # Reap the killed process

                    exit_code = -1
                    status = 'timeout'
                    error_msg = f"Timeout after {timeout} seconds"

                    # Write timeout to log
                    f.write("\n" + sep + "\n")
                    f.write(f"!!! TIMEOUT after {timeout} seconds !!!\n")
                    f.write("Process was killed.\n")
                    f.write(sep + "\n")

        except FileNotFoundError as e:
            self.logger.error(f"Executable not found: {cmd[0]}")
            exit_code = -1
            status = 'failed'
            error_msg = f"Executable not found: {str(e)}"

        except PermissionError as e:
            self.logger.error(f"Permission denied: {str(e)}")
            exit_code = -1
            status = 'failed'
            error_msg = f"Permission denied: {str(e)}"

        except Exception as e:
            self.logger.error(f"Subprocess execution failed: {str(e)}")
            exit_code = -1
            status = 'failed'
            error_msg = str(e)

        return exit_code, status, error_msg

    def get_active_execution_count(self) -> int:
        """Get number of currently running executions.

        Returns:
            Number of registered execution threads
        """
        return len(self.active_executions)

    def wait_for_all(self, timeout: Optional[float] = None) -> None:
        """Wait for all active executions to complete.

        Args:
            timeout: Maximum total time to wait in seconds across all
                executions (None = wait forever). On timeout, a warning lists
                the executions still running.
        """
        # Snapshot: worker threads remove their own entries concurrently.
        pending = list(self.active_executions.items())
        if not pending:
            return

        self.logger.info(
            f"Waiting for {len(pending)} active executions to complete"
        )

        # One shared deadline, so the total wait is bounded by `timeout`
        # rather than timeout * number_of_threads.
        deadline = None if timeout is None else time.monotonic() + timeout
        for _execution_id, thread in pending:
            # is_alive() also skips threads registered but not yet started,
            # which join() would reject.
            if not thread.is_alive():
                continue
            if deadline is None:
                thread.join()
                continue
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                break
            thread.join(timeout=remaining)

        still_running = [eid for eid, thread in pending if thread.is_alive()]
        if still_running:
            self.logger.warning(
                f"Timed out waiting for executions: {still_running}"
            )
        else:
            self.logger.info("All executions completed")