"""ChatMemory - conversational memory management backed by SQLite."""

import sqlite3
from contextlib import closing
from datetime import datetime
from typing import List, Dict, Optional, Any


class ChatMemory:
    """Persist chat messages and conversation summaries in a SQLite database.

    A new connection is opened for every operation and closed again before
    the method returns, even when the operation raises.

    NOTE: because each call opens its own connection, ``db_path`` should be
    a file path; with ``":memory:"`` every connection would see a separate,
    empty database.
    """

    def __init__(self, db_path: str):
        """Store the database path and create the schema if it is missing.

        Args:
            db_path: Path handed to ``sqlite3.connect``.
        """
        self.db_path = db_path
        self._init_db()

    def _connect(self):
        """Return a context manager yielding a connection that is always closed.

        ``sqlite3.Connection`` used directly as a context manager only
        commits or rolls back; it does not close. Wrapping it in ``closing``
        guarantees the handle is released on every exit path.
        """
        return closing(sqlite3.connect(self.db_path))

    def _init_db(self) -> None:
        """Create the tables and indexes used by this class (idempotent)."""
        # Outer context closes the connection; inner one commits on success
        # and rolls back if any statement fails.
        with self._connect() as conn, conn:
            conn.execute('''
                CREATE TABLE IF NOT EXISTS messages (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    role TEXT NOT NULL,
                    content TEXT NOT NULL,
                    timestamp DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_session_timestamp
                ON messages(session_id, timestamp)
            ''')
            # conversation_summaries table
            conn.execute('''
                CREATE TABLE IF NOT EXISTS conversation_summaries (
                    id INTEGER PRIMARY KEY AUTOINCREMENT,
                    session_id TEXT NOT NULL,
                    summary_text TEXT NOT NULL,
                    message_range_start INTEGER NOT NULL,
                    message_range_end INTEGER NOT NULL,
                    token_count INTEGER,
                    created_at DATETIME DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            conn.execute('''
                CREATE INDEX IF NOT EXISTS idx_summary_session
                ON conversation_summaries(session_id, created_at)
            ''')

    def add_message(self, session_id: str, role: str, content: str) -> None:
        """Append one message to the history of ``session_id``.

        Args:
            session_id: Identifier of the conversation.
            role: Role label stored with the message.
            content: Message text.
        """
        with self._connect() as conn, conn:
            conn.execute(
                'INSERT INTO messages (session_id, role, content) VALUES (?, ?, ?)',
                (session_id, role, content),
            )

    def get_recent_messages(self, session_id: str, limit: int = 10) -> List[Dict[str, str]]:
        """Return the latest ``limit`` messages of a session, oldest first.

        Args:
            session_id: Identifier of the conversation.
            limit: Maximum number of messages to return.

        Returns:
            A list of dicts with keys ``role``, ``content`` and ``timestamp``
            in chronological order (oldest to newest).
        """
        with self._connect() as conn:
            # CURRENT_TIMESTAMP has one-second resolution, so ``id`` breaks
            # ties between messages stored within the same second.
            rows = conn.execute('''
                SELECT role, content, timestamp
                FROM messages
                WHERE session_id = ?
                ORDER BY timestamp DESC, id DESC
                LIMIT ?
            ''', (session_id, limit)).fetchall()

        # The query yields newest-first; flip it to chronological order.
        return [
            {"role": role, "content": content, "timestamp": timestamp}
            for role, content, timestamp in reversed(rows)
        ]

    def clear_session(self, session_id: str) -> None:
        """Delete every stored message of ``session_id``.

        Only rows in ``messages`` are removed; rows in
        ``conversation_summaries`` for the session are left in place.
        """
        with self._connect() as conn, conn:
            conn.execute('DELETE FROM messages WHERE session_id = ?', (session_id,))

    def get_session_count(self, session_id: str) -> int:
        """Return the number of messages stored for ``session_id``."""
        with self._connect() as conn:
            row = conn.execute(
                'SELECT COUNT(*) FROM messages WHERE session_id = ?',
                (session_id,),
            ).fetchone()
        return row[0]

    # ===== SUMMARY MANAGEMENT METHODS =====

    def add_summary(self, session_id: str, summary_text: str,
                    msg_start: int, msg_end: int, token_count: int) -> None:
        """Store a summary covering a range of messages.

        Args:
            session_id: Identifier of the conversation.
            summary_text: Text of the summary.
            msg_start: Stored as ``message_range_start``.
            msg_end: Stored as ``message_range_end``;
                ``get_messages_since_last_summary`` compares message ids
                against this value.
            token_count: Token count recorded alongside the summary.
        """
        with self._connect() as conn, conn:
            conn.execute(
                '''INSERT INTO conversation_summaries
                   (session_id, summary_text, message_range_start, message_range_end, token_count)
                   VALUES (?, ?, ?, ?, ?)''',
                (session_id, summary_text, msg_start, msg_end, token_count),
            )

    def get_summaries(self, session_id: str, limit: int = 5) -> List[Dict[str, Any]]:
        """Return the latest ``limit`` summaries of a session, oldest first.

        Args:
            session_id: Identifier of the conversation.
            limit: Maximum number of summaries to return.

        Returns:
            A list of dicts with keys ``summary_text``, ``msg_start``,
            ``msg_end``, ``token_count`` and ``created_at`` in chronological
            order.
        """
        with self._connect() as conn:
            # ``id`` breaks ties between summaries created in the same second.
            rows = conn.execute('''
                SELECT summary_text, message_range_start, message_range_end,
                       token_count, created_at
                FROM conversation_summaries
                WHERE session_id = ?
                ORDER BY created_at DESC, id DESC
                LIMIT ?
            ''', (session_id, limit)).fetchall()

        # Newest-first from the query; reverse for chronological order.
        return [
            {
                "summary_text": row[0],
                "msg_start": row[1],
                "msg_end": row[2],
                "token_count": row[3],
                "created_at": row[4],
            }
            for row in reversed(rows)
        ]

    def get_messages_since_last_summary(self, session_id: str) -> List[Dict[str, Any]]:
        """Return the messages of a session that no summary covers yet.

        The most recently created summary of the session provides the
        cut-off: only messages whose ``id`` is greater than its
        ``message_range_end`` are returned. Without any summary, every
        message of the session is returned.

        Returns:
            A list of dicts with keys ``id``, ``role``, ``content`` and
            ``timestamp`` in chronological order.
        """
        with self._connect() as conn:
            # Find the latest summary; ``id`` disambiguates same-second rows.
            latest = conn.execute('''
                SELECT message_range_end
                FROM conversation_summaries
                WHERE session_id = ?
                ORDER BY created_at DESC, id DESC
                LIMIT 1
            ''', (session_id,)).fetchone()
            last_summarized_id = latest[0] if latest else 0

            # Fetch the messages stored after the summarized range.
            rows = conn.execute('''
                SELECT id, role, content, timestamp
                FROM messages
                WHERE session_id = ? AND id > ?
                ORDER BY timestamp ASC, id ASC
            ''', (session_id, last_summarized_id)).fetchall()

        return [
            {"id": row[0], "role": row[1], "content": row[2], "timestamp": row[3]}
            for row in rows
        ]