# services/cognitiveservice/cognitiveservice.py import os import pickle from langchain_google_genai import ChatGoogleGenerativeAI from langchain.agents import AgentExecutor, create_react_agent from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_community.chat_message_histories.sql import SQLChatMessageHistory from langchain_core.runnables.history import RunnableWithMessageHistory from langchain_community.tools.tavily_search import TavilySearchResults from langchain.retrievers import ParentDocumentRetriever from langchain.storage import LocalFileStore, EncoderBackedStore from langchain.text_splitter import RecursiveCharacterTextSplitter from langchain_chroma import Chroma from langchain_google_genai import GoogleGenerativeAIEmbeddings from langchain.tools import Tool class CognitiveService: """ Il "cervello" di Jarvis. Gestisce l'agente conversazionale, la memoria, e l'accesso agli strumenti (RAG, ricerca web, ecc.). Questa versione è ottimizzata per l'interazione sincrona tramite API. """ def __init__(self, core_api, config): self.core = core_api self.config = config self.agent_executor = None self.log("Servizio cognitivo inizializzato.") def start(self): """ Avvia il servizio, preparando l'agente conversazionale. Questa versione è sincrona e non si basa più su eventi del filesystem. """ self.log("Avvio servizio cognitivo...") try: self._initialize_agent() self.log("Agente conversazionale pronto.") # NOTA: Non ci iscriviamo più a eventi del filesystem. # La comunicazione ora avviene tramite il metodo 'ask'. except Exception as e: self.log(f"ERRORE CRITICO durante l'inizializzazione dell'agente: {e}", 'error') def stop(self): """Ferma il servizio.""" self.log("Servizio cognitivo fermato.") def log(self, message, level='info'): """Metodo di logging standard per il servizio.""" self.core.log(f"[COGNITIVE] {message}", level) def _initialize_agent(self): """ Costruisce l'intera catena dell'agente: LLM, strumenti, memoria e prompt. """ llm = ChatGoogleGenerativeAI( model=self.config.get('model_name', 'gemini-2.5-flash'), temperature=self.config.get('temperature', 0.7) ) # --- Inizializzazione Strumenti --- # Toggle debug degli step/tool (latency & intermediate steps) self.debug_intermediate = bool(self.config.get('debug_intermediate_steps', False)) # 1. Strumento di ricerca web (versione aggiornata) tavily_tool = TavilySearchResults(max_results=3) # 2. Strumento RAG (Ricerca nella base di conoscenza) vectorstore_rel = self.config.get('vectorstore_path', 'chroma_db') docstore_rel = self.config.get('docstore_path', 'doc_store') vectorstore_path = self.core.get_data_path(vectorstore_rel) # Selezione provider embeddings (google/hf) embeddings_provider = self.config.get('embedding_provider', 'google').lower() if embeddings_provider == 'hf': try: from langchain_community.embeddings import HuggingFaceEmbeddings except Exception as e: self.log(f"Errore caricamento HuggingFaceEmbeddings: {e}. Ricado su Google embeddings.", 'warning') embeddings_provider = 'google' if embeddings_provider == 'hf': model_name = self.config.get('embedding_model', 'sentence-transformers/all-MiniLM-L6-v2') embeddings = HuggingFaceEmbeddings(model_name=model_name) else: embeddings = GoogleGenerativeAIEmbeddings(model=self.config.get('embedding_model')) docstore_path = self.core.get_data_path(docstore_rel) # Fallback compatibilità legacy per 'aurelio': se i nuovi percorsi non esistono, # prova automaticamente i vecchi percorsi storici per evitare rotture. try: active = getattr(self.core, 'active_profile', None) except Exception: active = None if active == 'aurelio': if not os.path.exists(vectorstore_path): legacy_vec = self.core.get_data_path('chroma_db') if os.path.exists(legacy_vec): self.log("Percorso vectorstore nuovo assente, uso legacy per Aurelio.", 'warning') vectorstore_path = legacy_vec if not os.path.exists(docstore_path): legacy_doc = self.core.get_data_path('doc_store') if os.path.exists(legacy_doc): self.log("Percorso docstore nuovo assente, uso legacy per Aurelio.", 'warning') docstore_path = legacy_doc # --- SOLUZIONE FINALE: Ripristino del ParentDocumentRetriever --- # Il test di debug ha confermato che il database è leggibile. Ora ripristiniamo # la logica avanzata del "Palazzo Mentale" per fornire all'agente il contesto completo. if os.path.exists(vectorstore_path) and os.path.exists(docstore_path): self.log("Trovato database RAG esistente. Caricamento del ParentDocumentRetriever.") vectorstore = Chroma(collection_name="split_parents", persist_directory=vectorstore_path, embedding_function=embeddings) fs = LocalFileStore(docstore_path) store = EncoderBackedStore(fs, lambda key: key, pickle.dumps, pickle.loads) retriever = ParentDocumentRetriever( vectorstore=vectorstore, docstore=store, child_splitter=RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40), ) rag_description = self.config.get('rag_tool_description') self.memory_available = True else: # MIGLIORAMENTO: Warning dettagliato e descrizione tool dinamica self.log("=" * 80, level='warning') self.log("ATTENZIONE: DATABASE RAG NON TROVATO", level='warning') self.log("La memoria stoica di Aurelio è VUOTA", level='warning') self.log(f"Esegui indicizzazione di 'marco_aurelio.pdf'", level='warning') self.log("=" * 80, level='warning') # Creiamo una funzione fittizia che funge da retriever def dummy_retriever(query: str) -> list: return [] # Restituisce una lista vuota di documenti retriever = dummy_retriever # Descrizione tool che comunica all'agente che la memoria è vuota rag_description = "MEMORIA NON DISPONIBILE - database vuoto. Non usare questo tool." self.memory_available = False # --- FINE SOLUZIONE --- # Funzione helper per formattare l'output del RAG in una stringa pulita e leggibile def run_rag_retriever(query: str) -> str: # Invochiamo il retriever in modo diverso a seconda del suo tipo if callable(retriever) and not hasattr(retriever, 'invoke'): # Se ?? il nostro dummy_retriever docs = retriever(query) else: # Se è il ParentDocumentRetriever docs = retriever.invoke(query) if not docs: return ( "Nessuna informazione trovata nella mia memoria per questa domanda. " "Suggerimento: riformula la domanda oppure indicizza i testi (scripts/indicizza_documenti.py)." ) # Uniamo il contenuto dei documenti in un unico blocco di testo. # Questo è molto più pulito per l'LLM rispetto a un dump di oggetti Document. # Deduplica rapida dei contenuti: normalizza e rimuove ripetizioni seen = set() unique_contents = [] for doc in docs: content = (doc.page_content or "") content_norm = " ".join(content.split()).lower() if content_norm and content_norm not in seen: seen.add(content_norm) unique_contents.append(content) return "\n\n---\n\n".join(unique_contents) # MODIFICA CHIAVE: Usiamo la classe 'Tool' per creare lo strumento RAG in modo robusto. # Usa la descrizione dinamica basata sulla disponibilità del database rag_tool = Tool( name=self.config.get('rag_tool_name', 'knowledge_base_search'), description=rag_description, func=run_rag_retriever ) # Sostituiamo Tavily con un wrapper sicuro che gestisce i fallback try: tavily_safe # type: ignore # definito sopra se patch applicata interamente except NameError: # Se il wrapper non è stato inserito sopra per qualsiasi motivo, # definiamolo qui per garantire la robustezza. def tavily_safe(query: str) -> str: try: return tavily_tool.run(query) except Exception as e: self.log(f"Tavily non disponibile o errore durante la ricerca: {e}", 'warning') return ( "Ricerca web non disponibile al momento. " "Suggerimenti: 1) verifica 'TAVILY_API_KEY'; 2) controlla la rete; 3) riprova tra poco." ) tavily_safe_tool = Tool( name="Ricerca_Web_e_Calcolatrice_Avanzata", description=( "La tua finestra sul mondo moderno. Usalo per trovare informazioni non presenti " "nei tuoi scritti. Essenziale per dati in tempo reale, news e calcoli. " "Include fallback con messaggi guidati quando il servizio non è disponibile." ), func=tavily_safe, ) # Se il debug è attivo, fornisci versioni dei tool con timing if getattr(self, 'debug_intermediate', False): def _timed(name, fn): def _wrapped(*args, **kwargs): import time t0 = time.perf_counter() try: return fn(*args, **kwargs) finally: dt = (time.perf_counter() - t0) * 1000.0 self.log(f"Tool '{name}' eseguito in {dt:.0f} ms", 'info') return _wrapped # Ricrea i tool con funzioni cronometrate rag_tool = Tool( name=self.config.get('rag_tool_name', 'knowledge_base_search'), description=rag_description, func=_timed("rag_retriever", run_rag_retriever) ) tavily_safe_tool = Tool( name="Ricerca_Web_e_Calcolatrice_Avanzata", description=( "La tua finestra sul mondo moderno. Usalo per trovare informazioni non presenti " "nei tuoi scritti. Essenziale per dati in tempo reale, news e calcoli. " "Include fallback con messaggi guidati quando il servizio non ?? disponibile." ), func=_timed("tavily_search", tavily_safe), ) tools = [tavily_safe_tool, rag_tool] # Prompt ReAct conforme ai requisiti di LangChain prompt_template = ChatPromptTemplate.from_messages([ ("system", "{agent_prompt}\n\n" "Hai accesso ai seguenti strumenti:\n\n" "{tools}\n\n" "Usa ESATTAMENTE questo formato:\n\n" "Thought: [il tuo ragionamento su cosa fare]\n" "Action: [una tra: {tool_names}]\n" "Action Input: [l'input da passare allo strumento]\n" "Observation: [il risultato dello strumento verra inserito qui]\n" "... (questo ciclo Thought/Action/Action Input/Observation puo ripetersi N volte)\n" "Thought: Ho trovato la risposta finale\n" "Final Answer: [la tua risposta completa in italiano]\n" "Begin!"), MessagesPlaceholder(variable_name="chat_history"), ("human", "Domanda: {input}"), ("assistant", "{agent_scratchpad}"), ]).partial(agent_prompt=self.config.get('agent_prompt')) agent = create_react_agent(llm, tools, prompt_template) self.agent_executor = AgentExecutor( agent=agent, tools=tools, verbose=bool(getattr(self, 'debug_intermediate', False)), # Verbose solo in debug handle_parsing_errors=True, max_iterations=6, # Maggior margine per ricerche web max_execution_time=45, # Timeout di sicurezza esteso early_stopping_method="generate", # Prova comunque a generare la risposta finale return_intermediate_steps=bool(getattr(self, 'debug_intermediate', False)), ) # Inizializza il path per la memoria conversazionale chat_memory_path = self.core.get_data_path(self.config.get('chat_memory_db', 'memory/memoria_chat.sqlite')) self.chat_memory_path = chat_memory_path self.log("AgentExecutor con prompt ReAct creato.") def ask(self, query: str, session_id: str = "default"): """ Metodo pubblico per interrogare l'agente. Punto di ingresso per le richieste API. Gestisce manualmente la memoria conversazionale. """ if not self.agent_executor: self.log("ERRORE: L'agente non è inizializzato.", 'error') return "Mi dispiace, il mio nucleo cognitivo non è attualmente operativo." # Se la domanda è chiaramente su temi stoici ma la memoria non è disponibile, # comunichiamo esplicitamente lo stato invece di rispondere di modello. try: if self._is_stoic_query(query) and not getattr(self, 'memory_available', False): return ( "La mia memoria stoica non è disponibile o non è stata indicizzata. " "Esegui l'indicizzazione con: 'python scripts/indicizza_documenti.py data/marco_aurelio.pdf' " "e riprova." ) except Exception: pass try: # Carica la storia della conversazione per questa sessione chat_history = SQLChatMessageHistory( session_id=session_id, connection=f"sqlite:///{self.chat_memory_path}" ) # Passa la cronologia come lista di messaggi al prompt ReAct history_messages = list(chat_history.messages) response = self.agent_executor.invoke({ "input": query, "chat_history": history_messages }) # Log intermedio opzionale: azioni svolte if getattr(self, 'debug_intermediate', False) and isinstance(response, dict) and 'intermediate_steps' in response: try: steps = response.get('intermediate_steps', []) for i, step in enumerate(steps, start=1): try: action, observation = step except Exception: action, observation = step, None tool_name = getattr(action, 'tool', 'unknown') self.log(f"[DEBUG] Step {i}: tool={tool_name}", 'info') except Exception: pass # Salva la nuova interazione nella memoria chat_history.add_user_message(query) chat_history.add_ai_message(response.get("output", "")) output = response.get("output", "Non sono riuscito a formulare una risposta.") # Messaggio chiaro in caso di saturazione passi/tempo if isinstance(output, str): out_l = output.strip().lower() if (out_l.startswith("agent stopped due to iteration") or ("time limit" in out_l)): return ( "Risposta non conclusa per limite di passi/tempo. " "Riprova oppure riformula la domanda (più specifica)." ) return output except Exception as e: self.log(f"ERRORE durante l'invocazione dell'agente per la sessione '{session_id}': {e}", 'error') return f"Si è verificato un errore cognitivo: {e}" def _is_stoic_query(self, text: str) -> bool: if not text: return False t = text.lower() keywords = [ "stoic", "stoica", "virtù", "virtu", "logos", "meditazioni", "marco aurelio", "aurelio", "epitteto", "seneca", "saggio stoico" ] return any(k in t for k in keywords)