"""Execution history tab""" from PyQt6.QtWidgets import ( QWidget, QVBoxLayout, QHBoxLayout, QTableWidget, QTableWidgetItem, QPushButton, QComboBox, QLabel, QMessageBox, QHeaderView, QTextEdit, QDialog, QDialogButtonBox ) from PyQt6.QtCore import Qt from core import DatabaseManager from datetime import datetime, timedelta from pathlib import Path class HistoryTab(QWidget): """Execution history tab""" def __init__(self): """Initialize history tab""" super().__init__() self.db = DatabaseManager() self._init_ui() self.refresh() def _init_ui(self): """Create UI layout""" layout = QVBoxLayout() self.setLayout(layout) # Filter bar filter_layout = QHBoxLayout() filter_layout.addWidget(QLabel("Job:")) self.job_filter = QComboBox() self.job_filter.addItem("(All Jobs)", None) self._populate_job_filter() self.job_filter.currentIndexChanged.connect(self.refresh) filter_layout.addWidget(self.job_filter) filter_layout.addWidget(QLabel("Status:")) self.status_filter = QComboBox() self.status_filter.addItems([ "(All)", "success", "failed", "timeout", "running", "queued" ]) self.status_filter.currentIndexChanged.connect(self.refresh) filter_layout.addWidget(self.status_filter) filter_layout.addWidget(QLabel("Period:")) self.period_filter = QComboBox() self.period_filter.addItems([ "Last 24 hours", "Last 7 days", "Last 30 days", "All time" ]) self.period_filter.currentIndexChanged.connect(self.refresh) filter_layout.addWidget(self.period_filter) filter_layout.addStretch() refresh_btn = QPushButton("πŸ”„ Refresh") refresh_btn.clicked.connect(self.refresh) filter_layout.addWidget(refresh_btn) layout.addLayout(filter_layout) # Table self.table = QTableWidget() self.table.setColumnCount(7) self.table.setHorizontalHeaderLabels([ "ID", "Job", "Status", "Start Time", "Duration", "Exit Code", "Log" ]) # Column sizing header = self.table.horizontalHeader() header.setSectionResizeMode(0, QHeaderView.ResizeMode.ResizeToContents) header.setSectionResizeMode(1, QHeaderView.ResizeMode.Stretch) header.setSectionResizeMode(2, QHeaderView.ResizeMode.ResizeToContents) header.setSectionResizeMode(3, QHeaderView.ResizeMode.ResizeToContents) header.setSectionResizeMode(4, QHeaderView.ResizeMode.ResizeToContents) header.setSectionResizeMode(5, QHeaderView.ResizeMode.ResizeToContents) header.setSectionResizeMode(6, QHeaderView.ResizeMode.ResizeToContents) self.table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows) self.table.setSelectionMode(QTableWidget.SelectionMode.SingleSelection) self.table.itemDoubleClicked.connect(self._on_row_double_clicked) layout.addWidget(self.table) # Action buttons button_layout = QHBoxLayout() self.view_log_btn = QPushButton("πŸ“„ View Log") self.view_log_btn.clicked.connect(self.view_log) self.view_log_btn.setEnabled(False) button_layout.addWidget(self.view_log_btn) self.clear_btn = QPushButton("πŸ—‘οΈ Clear Old History") self.clear_btn.clicked.connect(self.clear_old_history) button_layout.addWidget(self.clear_btn) button_layout.addStretch() # Stats label self.stats_label = QLabel() button_layout.addWidget(self.stats_label) layout.addLayout(button_layout) # Connect selection self.table.itemSelectionChanged.connect(self._on_selection_changed) def _populate_job_filter(self): """Populate job filter dropdown with all jobs""" with self.db.get_connection() as conn: cursor = conn.cursor() cursor.execute("SELECT id, name FROM jobs ORDER BY name") jobs = cursor.fetchall() for job in jobs: self.job_filter.addItem(job['name'], job['id']) def refresh(self): """Refresh execution history from database""" self.table.setRowCount(0) # Build query based on filters query = """ SELECT e.id, j.name, e.status, e.start_time, e.end_time, e.exit_code, e.log_file_path FROM executions e JOIN jobs j ON j.id = e.job_id WHERE 1=1 """ params = [] # Job filter job_id = self.job_filter.currentData() if job_id is not None: query += " AND e.job_id = ?" params.append(job_id) # Status filter status = self.status_filter.currentText() if status != "(All)": query += " AND e.status = ?" params.append(status) # Period filter period = self.period_filter.currentText() if period != "All time": if period == "Last 24 hours": hours = 24 elif period == "Last 7 days": hours = 24 * 7 elif period == "Last 30 days": hours = 24 * 30 cutoff = datetime.now() - timedelta(hours=hours) query += " AND e.start_time >= ?" params.append(cutoff.isoformat()) query += " ORDER BY e.start_time DESC LIMIT 1000" with self.db.get_connection() as conn: cursor = conn.cursor() cursor.execute(query, params) executions = cursor.fetchall() # Populate table for execution in executions: row = self.table.rowCount() self.table.insertRow(row) # ID self.table.setItem(row, 0, QTableWidgetItem(str(execution['id']))) # Job name self.table.setItem(row, 1, QTableWidgetItem(execution['name'])) # Status with emoji status = execution['status'] if status == 'success': status_text = "βœ… Success" elif status == 'failed': status_text = "❌ Failed" elif status == 'timeout': status_text = "⏱️ Timeout" elif status == 'running': status_text = "▢️ Running" else: status_text = "⏸️ Queued" self.table.setItem(row, 2, QTableWidgetItem(status_text)) # Start time start_time = datetime.fromisoformat(execution['start_time']) self.table.setItem(row, 3, QTableWidgetItem( start_time.strftime('%Y-%m-%d %H:%M:%S') )) # Duration if execution['end_time']: end_time = datetime.fromisoformat(execution['end_time']) duration = end_time - start_time duration_text = str(duration).split('.')[0] # Remove microseconds else: duration_text = "β€”" self.table.setItem(row, 4, QTableWidgetItem(duration_text)) # Exit code exit_code = execution['exit_code'] exit_code_text = str(exit_code) if exit_code is not None else "β€”" self.table.setItem(row, 5, QTableWidgetItem(exit_code_text)) # Log button log_item = QTableWidgetItem("πŸ“„ View") self.table.setItem(row, 6, log_item) # Update stats total = len(executions) success = sum(1 for e in executions if e['status'] == 'success') failed = sum(1 for e in executions if e['status'] == 'failed') self.stats_label.setText( f"Total: {total} | Success: {success} | Failed: {failed}" ) def view_log(self): """View execution log for selected execution""" row = self.table.currentRow() if row < 0: return execution_id = int(self.table.item(row, 0).text()) # Get execution details with self.db.get_connection() as conn: cursor = conn.cursor() cursor.execute(""" SELECT e.*, j.name FROM executions e JOIN jobs j ON j.id = e.job_id WHERE e.id = ? """, (execution_id,)) execution = cursor.fetchone() # Show log dialog dialog = LogViewerDialog(execution, self) dialog.exec() def clear_old_history(self): """Clear execution history older than 30 days""" reply = QMessageBox.question( self, "Confirm Clear History", "Delete execution history older than 30 days?\n\n" "This cannot be undone.", QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No ) if reply == QMessageBox.StandardButton.Yes: cutoff = datetime.now() - timedelta(days=30) with self.db.get_connection() as conn: cursor = conn.cursor() cursor.execute( "DELETE FROM executions WHERE start_time < ?", (cutoff.isoformat(),) ) deleted = cursor.rowcount QMessageBox.information( self, "History Cleared", f"Deleted {deleted} old execution records." ) self.refresh() def _on_selection_changed(self): """Handle selection change""" has_selection = self.table.currentRow() >= 0 self.view_log_btn.setEnabled(has_selection) def _on_row_double_clicked(self, item): """Handle row double-click""" self.view_log() class LogViewerDialog(QDialog): """Dialog to view execution log""" def __init__(self, execution, parent=None): """ Initialize log viewer. Args: execution: Execution record dict parent: Parent widget """ super().__init__(parent) self.execution = execution self.setWindowTitle(f"Execution Log - {execution['name']}") self.setMinimumSize(800, 600) self._init_ui() self._load_log() def _init_ui(self): """Create UI""" layout = QVBoxLayout() self.setLayout(layout) # Info bar info_text = ( f"Job: {self.execution['name']} | " f"Status: {self.execution['status']} | " f"Exit Code: {self.execution['exit_code'] or 'N/A'} | " f"Start: {self.execution['start_time']}" ) info_label = QLabel(info_text) layout.addWidget(info_label) # Log text self.log_text = QTextEdit() self.log_text.setReadOnly(True) self.log_text.setFontFamily("Monospace") layout.addWidget(self.log_text) # Buttons buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Close) buttons.rejected.connect(self.reject) layout.addWidget(buttons) def _load_log(self): """Load log file content""" log_path = self.execution.get('log_file_path') if not log_path or not Path(log_path).exists(): self.log_text.setPlainText("Log file not found.") return try: with open(log_path, 'r') as f: log_content = f.read() self.log_text.setPlainText(log_content) except Exception as e: self.log_text.setPlainText(f"Error reading log file: {e}")