"""Execution history tab"""
from PyQt6.QtWidgets import (
QWidget, QVBoxLayout, QHBoxLayout, QTableWidget,
QTableWidgetItem, QPushButton, QComboBox, QLabel,
QMessageBox, QHeaderView, QTextEdit, QDialog,
QDialogButtonBox
)
from PyQt6.QtCore import Qt
from core import DatabaseManager
from datetime import datetime, timedelta
from pathlib import Path
class HistoryTab(QWidget):
    """Execution history tab.

    Lists rows of the ``executions`` table (joined with ``jobs``) in a
    filterable, read-only table.  The user can open the log of the selected
    execution or delete records older than 30 days.
    """

    # Text shown in cells that have no value to display.
    _EMPTY_CELL = "—"

    # Period filter label -> look-back window in hours (None means no cutoff).
    # The dict order is the order of the entries in the combo box.
    _PERIOD_HOURS = {
        "Last 24 hours": 24,
        "Last 7 days": 24 * 7,
        "Last 30 days": 24 * 30,
        "All time": None,
    }

    # Raw ``executions.status`` value -> label shown in the "Status" column.
    _STATUS_LABELS = {
        'success': "✅ Success",
        'failed': "❌ Failed",
        'timeout': "⏱️ Timeout",
        'running': "▶️ Running",
    }
    # Any status not listed above is displayed as queued.
    _FALLBACK_STATUS_LABEL = "⏸️ Queued"

    _COLUMN_HEADERS = [
        "ID", "Job", "Status", "Start Time", "Duration", "Exit Code", "Log"
    ]
    # Index of the only column that stretches; the others fit their contents.
    _STRETCH_COLUMN = 1

    def __init__(self):
        """Create the database manager, build the widgets and load the data."""
        super().__init__()
        self.db = DatabaseManager()
        self._init_ui()
        self.refresh()

    # ------------------------------------------------------------------
    # UI construction
    # ------------------------------------------------------------------
    def _init_ui(self):
        """Create the UI layout: filter bar, table, then action buttons."""
        layout = QVBoxLayout()
        self.setLayout(layout)

        layout.addLayout(self._build_filter_bar())
        self._build_table()
        layout.addWidget(self.table)
        layout.addLayout(self._build_action_bar())

        # Connected last, once the button it toggles exists.
        self.table.itemSelectionChanged.connect(self._on_selection_changed)

    def _build_filter_bar(self):
        """Return the layout holding the job/status/period filters."""
        filter_layout = QHBoxLayout()

        # Every combo box is filled BEFORE its signal is connected so that
        # populating it cannot trigger refresh() while the table is missing.
        filter_layout.addWidget(QLabel("Job:"))
        self.job_filter = QComboBox()
        self.job_filter.addItem("(All Jobs)", None)
        self._populate_job_filter()
        self.job_filter.currentIndexChanged.connect(self.refresh)
        filter_layout.addWidget(self.job_filter)

        filter_layout.addWidget(QLabel("Status:"))
        self.status_filter = QComboBox()
        self.status_filter.addItems([
            "(All)", "success", "failed", "timeout", "running", "queued"
        ])
        self.status_filter.currentIndexChanged.connect(self.refresh)
        filter_layout.addWidget(self.status_filter)

        filter_layout.addWidget(QLabel("Period:"))
        self.period_filter = QComboBox()
        self.period_filter.addItems(list(self._PERIOD_HOURS))
        self.period_filter.currentIndexChanged.connect(self.refresh)
        filter_layout.addWidget(self.period_filter)

        filter_layout.addStretch()

        refresh_btn = QPushButton("🔄 Refresh")
        refresh_btn.clicked.connect(self.refresh)
        filter_layout.addWidget(refresh_btn)
        return filter_layout

    def _build_table(self):
        """Create ``self.table`` with its headers, sizing and selection mode."""
        self.table = QTableWidget()
        self.table.setColumnCount(len(self._COLUMN_HEADERS))
        self.table.setHorizontalHeaderLabels(self._COLUMN_HEADERS)

        header = self.table.horizontalHeader()
        for column in range(len(self._COLUMN_HEADERS)):
            if column == self._STRETCH_COLUMN:
                mode = QHeaderView.ResizeMode.Stretch
            else:
                mode = QHeaderView.ResizeMode.ResizeToContents
            header.setSectionResizeMode(column, mode)

        self.table.setSelectionBehavior(QTableWidget.SelectionBehavior.SelectRows)
        self.table.setSelectionMode(QTableWidget.SelectionMode.SingleSelection)
        # History is display-only.  Without this a double-click would also
        # open an in-place editor, and an edited ID cell would break
        # view_log(), which parses the ID back out of the cell text.
        self.table.setEditTriggers(QTableWidget.EditTrigger.NoEditTriggers)
        self.table.itemDoubleClicked.connect(self._on_row_double_clicked)

    def _build_action_bar(self):
        """Return the layout holding the action buttons and the stats label."""
        button_layout = QHBoxLayout()

        self.view_log_btn = QPushButton("📄 View Log")
        self.view_log_btn.clicked.connect(self.view_log)
        self.view_log_btn.setEnabled(False)  # enabled once a row is selected
        button_layout.addWidget(self.view_log_btn)

        self.clear_btn = QPushButton("🗑️ Clear Old History")
        self.clear_btn.clicked.connect(self.clear_old_history)
        button_layout.addWidget(self.clear_btn)

        button_layout.addStretch()

        self.stats_label = QLabel()
        button_layout.addWidget(self.stats_label)
        return button_layout

    def _populate_job_filter(self):
        """Append every job (ordered by name) to the job filter dropdown.

        The job id is stored as the item's user data so refresh() can filter
        on it.
        """
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("SELECT id, name FROM jobs ORDER BY name")
            jobs = cursor.fetchall()
        for job in jobs:
            self.job_filter.addItem(job['name'], job['id'])

    # ------------------------------------------------------------------
    # Data loading
    # ------------------------------------------------------------------
    def refresh(self):
        """Reload the table and the stats label from the database.

        Applies the current job, status and period filters.  At most 1000
        executions (newest first) are loaded, and the stats describe only
        the rows that were loaded.
        """
        self.table.setRowCount(0)

        query, params = self._build_query()
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(query, params)
            executions = cursor.fetchall()

        for execution in executions:
            self._append_row(execution)

        total = len(executions)
        success = sum(1 for e in executions if e['status'] == 'success')
        failed = sum(1 for e in executions if e['status'] == 'failed')
        self.stats_label.setText(
            f"Total: {total} | Success: {success} | Failed: {failed}"
        )

        # Keep the "View Log" button in step with the (now reset) table.
        self._on_selection_changed()

    def _build_query(self):
        """Return ``(sql, params)`` for the currently selected filters."""
        query = """
            SELECT e.id, j.name, e.status, e.start_time, e.end_time,
                   e.exit_code, e.log_file_path
            FROM executions e
            JOIN jobs j ON j.id = e.job_id
            WHERE 1=1
        """
        params = []

        job_id = self.job_filter.currentData()
        if job_id is not None:
            query += " AND e.job_id = ?"
            params.append(job_id)

        status = self.status_filter.currentText()
        if status != "(All)":
            query += " AND e.status = ?"
            params.append(status)

        # A label missing from the table yields None, i.e. no cutoff, rather
        # than an unbound local as with an if/elif chain.
        hours = self._PERIOD_HOURS.get(self.period_filter.currentText())
        if hours is not None:
            cutoff = datetime.now() - timedelta(hours=hours)
            query += " AND e.start_time >= ?"
            params.append(cutoff.isoformat())

        query += " ORDER BY e.start_time DESC LIMIT 1000"
        return query, params

    @staticmethod
    def _parse_timestamp(value):
        """Return *value* parsed as an ISO-format datetime, or None.

        None is returned for empty/NULL values and for text that
        ``datetime.fromisoformat`` rejects, so one bad row cannot abort the
        whole refresh.
        """
        if not value:
            return None
        try:
            return datetime.fromisoformat(value)
        except (TypeError, ValueError):
            return None

    def _append_row(self, execution):
        """Append one execution record as a new row at the end of the table."""
        row = self.table.rowCount()
        self.table.insertRow(row)

        status_text = self._STATUS_LABELS.get(
            execution['status'], self._FALLBACK_STATUS_LABEL
        )

        raw_start = execution['start_time']
        start_time = self._parse_timestamp(raw_start)
        end_time = self._parse_timestamp(execution['end_time'])

        if start_time is not None:
            start_text = start_time.strftime('%Y-%m-%d %H:%M:%S')
        elif raw_start:
            # Unparseable but present: show the stored text as-is.
            start_text = str(raw_start)
        else:
            start_text = self._EMPTY_CELL

        if start_time is not None and end_time is not None:
            # str(timedelta) is "H:MM:SS[.ffffff]"; drop the microseconds.
            duration_text = str(end_time - start_time).split('.')[0]
        else:
            duration_text = self._EMPTY_CELL

        exit_code = execution['exit_code']
        exit_code_text = (
            str(exit_code) if exit_code is not None else self._EMPTY_CELL
        )

        cells = [
            str(execution['id']),
            execution['name'],
            status_text,
            start_text,
            duration_text,
            exit_code_text,
            "📄 View",
        ]
        for column, text in enumerate(cells):
            self.table.setItem(row, column, QTableWidgetItem(text))

    # ------------------------------------------------------------------
    # Actions
    # ------------------------------------------------------------------
    def view_log(self):
        """Open the log viewer dialog for the currently selected execution.

        Does nothing when no row is current.  If the record has disappeared
        from the database since the table was loaded, the user is told and
        the table is refreshed.
        """
        row = self.table.currentRow()
        if row < 0:
            return
        id_item = self.table.item(row, 0)
        if id_item is None:
            return
        execution_id = int(id_item.text())

        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute("""
                SELECT e.*, j.name
                FROM executions e
                JOIN jobs j ON j.id = e.job_id
                WHERE e.id = ?
            """, (execution_id,))
            execution = cursor.fetchone()

        if execution is None:
            QMessageBox.warning(
                self,
                "Execution Not Found",
                "This execution record no longer exists."
            )
            self.refresh()
            return

        # The dialog calls .get() on the record.  Copy the row into a plain
        # dict so that works even if the row type (e.g. sqlite3.Row) only
        # supports indexing -- NOTE(review): row type depends on
        # DatabaseManager, which is not visible here.
        dialog = LogViewerDialog(dict(execution), self)
        dialog.exec()

    def clear_old_history(self):
        """Delete executions older than 30 days, after user confirmation."""
        reply = QMessageBox.question(
            self,
            "Confirm Clear History",
            "Delete execution history older than 30 days?\n\n"
            "This cannot be undone.",
            QMessageBox.StandardButton.Yes | QMessageBox.StandardButton.No
        )
        if reply != QMessageBox.StandardButton.Yes:
            return

        cutoff = datetime.now() - timedelta(days=30)
        # NOTE(review): no explicit commit here; this relies on
        # get_connection() committing when the block exits -- confirm.
        with self.db.get_connection() as conn:
            cursor = conn.cursor()
            cursor.execute(
                "DELETE FROM executions WHERE start_time < ?",
                (cutoff.isoformat(),)
            )
            deleted = cursor.rowcount

        QMessageBox.information(
            self,
            "History Cleared",
            f"Deleted {deleted} old execution records."
        )
        self.refresh()

    # ------------------------------------------------------------------
    # Signal handlers
    # ------------------------------------------------------------------
    def _on_selection_changed(self):
        """Enable the "View Log" button only while a row is current."""
        has_selection = self.table.currentRow() >= 0
        self.view_log_btn.setEnabled(has_selection)

    def _on_row_double_clicked(self, item):
        """Open the log viewer when a table row is double-clicked."""
        self.view_log()
class LogViewerDialog(QDialog):
    """Modal dialog showing one execution's summary line and its log file."""

    def __init__(self, execution, parent=None):
        """
        Initialize log viewer.

        Args:
            execution: Execution record, as a mapping with at least the keys
                ``name``, ``status``, ``exit_code`` and ``start_time``;
                ``log_file_path`` is optional.
            parent: Parent widget
        """
        super().__init__(parent)
        # Copy into a plain dict: _load_log() uses .get(), which database row
        # objects that only support indexing (e.g. sqlite3.Row) do not offer.
        self.execution = dict(execution)
        self.setWindowTitle(f"Execution Log - {self.execution['name']}")
        self.setMinimumSize(800, 600)
        self._init_ui()
        self._load_log()

    def _init_ui(self):
        """Create the info bar, the read-only log view and the Close button."""
        layout = QVBoxLayout()
        self.setLayout(layout)

        # Test against None explicitly: an exit code of 0 is a real value
        # and must not be displayed as "N/A".
        exit_code = self.execution['exit_code']
        exit_code_text = exit_code if exit_code is not None else 'N/A'
        info_text = (
            f"Job: {self.execution['name']} | "
            f"Status: {self.execution['status']} | "
            f"Exit Code: {exit_code_text} | "
            f"Start: {self.execution['start_time']}"
        )
        layout.addWidget(QLabel(info_text))

        self.log_text = QTextEdit()
        self.log_text.setReadOnly(True)
        self.log_text.setFontFamily("Monospace")
        layout.addWidget(self.log_text)

        # The Close button emits rejected(), which dismisses the dialog.
        buttons = QDialogButtonBox(QDialogButtonBox.StandardButton.Close)
        buttons.rejected.connect(self.reject)
        layout.addWidget(buttons)

    def _load_log(self):
        """Load the log file into the text view.

        Shows "Log file not found." when no path is recorded or the file is
        missing, and an error message when the file cannot be read.
        """
        log_path = self.execution.get('log_file_path')
        if not log_path:
            self.log_text.setPlainText("Log file not found.")
            return

        try:
            # errors="replace" keeps the log readable even when it holds
            # bytes that are invalid in the decoding used.
            # NOTE(review): encoding is left at the platform default, as
            # before; pin it once the log writer's encoding is confirmed.
            with open(log_path, 'r', errors='replace') as f:
                log_content = f.read()
        except FileNotFoundError:
            self.log_text.setPlainText("Log file not found.")
        except OSError as e:
            self.log_text.setPlainText(f"Error reading log file: {e}")
        else:
            self.log_text.setPlainText(log_content)