"""Scheduler daemon module for DaemonControl. This module provides the main SchedulerDaemon class that orchestrates job scheduling using APScheduler and manages the daemon lifecycle. """ import signal import sys import time from typing import Optional from apscheduler.schedulers.background import BackgroundScheduler from apscheduler.triggers.cron import CronTrigger from core import ConfigManager, DatabaseManager, setup_logger from .job_executor import JobExecutor class SchedulerDaemon: """Main daemon process for job scheduling.""" def __init__(self): """Initialize daemon. Creates: - ConfigManager instance - DatabaseManager instance - Logger instance - JobExecutor instance - APScheduler BackgroundScheduler """ # Initialize core components self.config = ConfigManager() self.db = DatabaseManager() # Setup logger for daemon log_file = self.config.get('logging', 'daemon_log_file') log_level = self.config.get('logging', 'level', 'INFO') self.logger = setup_logger('daemon', log_file, log_level) self.logger.info("=" * 70) self.logger.info("Initializing DaemonControl Scheduler") self.logger.info("=" * 70) # Initialize job executor self.job_executor = JobExecutor(self.db, self.config, self.logger) # Initialize APScheduler self.scheduler = BackgroundScheduler( job_defaults={ 'coalesce': True, # If missed, run only once 'max_instances': 1, # One instance per job 'misfire_grace_time': 300 # 5 min grace period } ) self.running = False self.logger.info("Daemon initialized successfully") def start(self) -> None: """Start the daemon. Steps: 1. Load jobs from database 2. Create APScheduler triggers for each job 3. Start APScheduler 4. Setup signal handlers (SIGTERM, SIGINT) 5. Run forever (until stopped) """ try: self.logger.info("Starting daemon...") # Load and schedule jobs self._load_jobs() # Start APScheduler self.scheduler.start() self.logger.info("APScheduler started") # Setup signal handlers for graceful shutdown self._setup_signal_handlers() # Mark as running self.running = True self.logger.info("Daemon started successfully") self.logger.info("Press Ctrl+C to stop") # Run forever self._run_forever() except Exception as e: self.logger.error(f"Failed to start daemon: {e}", exc_info=True) raise def stop(self) -> None: """Stop the daemon gracefully. Steps: 1. Stop accepting new jobs 2. Wait for running jobs to complete (with timeout) 3. Shutdown APScheduler 4. Log shutdown """ if not self.running: return self.logger.info("=" * 70) self.logger.info("Stopping daemon...") self.logger.info("=" * 70) # Mark as not running to stop main loop self.running = False # Stop scheduler from triggering new jobs if self.scheduler.running: self.logger.info("Shutting down APScheduler...") self.scheduler.shutdown(wait=False) # Wait for active executions to complete active_count = self.job_executor.get_active_execution_count() if active_count > 0: self.logger.info( f"Waiting for {active_count} active job(s) to complete " f"(max 30 seconds)..." ) self.job_executor.wait_for_all(timeout=30) self.logger.info("Daemon stopped successfully") self.logger.info("=" * 70) def _load_jobs(self) -> None: """Load enabled jobs from database and schedule them. For each enabled job: 1. Get job details 2. Get schedule (cron expression) 3. Create CronTrigger from cron expression 4. Add job to APScheduler with trigger """ self.logger.info("Loading jobs from database...") jobs = self.db.get_enabled_jobs() if not jobs: self.logger.warning("No enabled jobs found in database") return self.logger.info(f"Found {len(jobs)} enabled job(s)") scheduled_count = 0 job_names = set() for job in jobs: # Skip duplicate job entries (jobs can have multiple schedules) if job['name'] in job_names: continue job_names.add(job['name']) # Get schedules for this job with self.db.get_connection() as conn: cursor = conn.execute( "SELECT id, cron_expression FROM schedules " "WHERE job_id = ? AND enabled = 1", (job['id'],) ) schedules = cursor.fetchall() if not schedules: self.logger.warning( f"Job '{job['name']}' has no enabled schedules, skipping" ) continue # Schedule job with each cron expression for schedule in schedules: cron_expr = schedule['cron_expression'] try: trigger = CronTrigger.from_crontab(cron_expr) self.scheduler.add_job( func=self._execute_job_callback, trigger=trigger, args=[job['id']], id=f"job_{job['id']}_schedule_{schedule['id']}", name=f"{job['name']} ({cron_expr})" ) scheduled_count += 1 self.logger.info( f" ✓ Scheduled '{job['name']}' with cron: {cron_expr}" ) except Exception as e: self.logger.error( f" ✗ Failed to schedule '{job['name']}': {e}", exc_info=True ) self.logger.info(f"Successfully scheduled {scheduled_count} job(s)") def _execute_job_callback(self, job_id: int) -> None: """Callback executed by APScheduler when trigger fires. Args: job_id: Database ID of job to execute """ try: # Fetch fresh job data from database with self.db.get_connection() as conn: cursor = conn.execute( "SELECT * FROM jobs WHERE id = ? AND enabled = 1", (job_id,) ) job = cursor.fetchone() if not job: self.logger.warning( f"Job {job_id} not found or disabled, skipping execution" ) return # Convert Row to dict job_dict = dict(job) # Check max concurrent jobs limit max_concurrent = self.config.get('daemon', 'max_concurrent_jobs', 3) active_count = self.job_executor.get_active_execution_count() if active_count >= max_concurrent: self.logger.warning( f"Max concurrent jobs ({max_concurrent}) reached, " f"deferring execution of '{job_dict['name']}'" ) return # Execute job self.logger.info( f"Triggering execution of '{job_dict['name']}' (job_id: {job_id})" ) execution_id = self.job_executor.execute_job(job_dict) self.logger.debug( f"Job '{job_dict['name']}' queued (execution_id: {execution_id})" ) except Exception as e: self.logger.error( f"Error in job callback for job_id {job_id}: {e}", exc_info=True ) def _setup_signal_handlers(self) -> None: """Setup graceful shutdown on SIGTERM and SIGINT.""" signal.signal(signal.SIGTERM, self._signal_handler) signal.signal(signal.SIGINT, self._signal_handler) self.logger.debug("Signal handlers registered (SIGTERM, SIGINT)") def _signal_handler(self, signum: int, frame) -> None: """Handle shutdown signals. Args: signum: Signal number frame: Current stack frame """ self.logger.info(f"Received signal {signum}, initiating graceful shutdown...") self.stop() sys.exit(0) def _run_forever(self) -> None: """Keep daemon running. Simple while loop with time.sleep(1) Exits when stop() is called (self.running = False) """ try: while self.running: time.sleep(1) except KeyboardInterrupt: # Ctrl+C pressed self.logger.info("Keyboard interrupt received") self.stop() def reload_jobs(self) -> None: """Reload jobs from database. This is useful for picking up configuration changes without restarting the daemon. Note: This will be used in future milestones for GUI integration. """ self.logger.info("Reloading jobs from database...") # Remove all existing jobs from scheduler self.scheduler.remove_all_jobs() # Reload jobs self._load_jobs() self.logger.info("Jobs reloaded successfully")