""" UserProfileManager - Gestisce il profilo utente persistente estratto dalle conversazioni """ import json import os from datetime import datetime from typing import Dict, Any class UserProfileManager: """Gestisce il profilo utente persistente estratto dalle conversazioni""" def __init__(self, profile_path: str): self.profile_path = profile_path self.profile = self._load_or_create() def _load_or_create(self) -> Dict[str, Any]: """Carica profilo esistente o crea nuovo""" if os.path.exists(self.profile_path): try: with open(self.profile_path, 'r', encoding='utf-8') as f: return json.load(f) except (json.JSONDecodeError, IOError) as e: print(f"[WARNING] Errore caricamento profilo: {e}. Creo nuovo profilo.") return self._create_empty_profile() else: return self._create_empty_profile() def _create_empty_profile(self) -> Dict[str, Any]: """Crea struttura profilo vuoto""" return { "personal": { "name": None, "family": {}, "residence": None }, "professional": { "company": None, "role": None, "colleagues": [] }, "preferences": { "topics_of_interest": [], "conversation_style": None }, "metadata": { "created_at": datetime.now().isoformat(), "last_updated": datetime.now().isoformat(), "total_conversations": 0 } } def save(self): """Persiste il profilo su disco""" try: self.profile["metadata"]["last_updated"] = datetime.now().isoformat() # Crea directory se non esiste os.makedirs(os.path.dirname(self.profile_path), exist_ok=True) # Salva con formattazione leggibile with open(self.profile_path, 'w', encoding='utf-8') as f: json.dump(self.profile, f, indent=2, ensure_ascii=False) except IOError as e: print(f"[ERROR] Impossibile salvare profilo: {e}") def get_profile_summary(self) -> str: """Genera rappresentazione testuale del profilo per system prompt""" parts = [] # Personal info if self.profile["personal"]["name"]: parts.append(f"Nome utente: {self.profile['personal']['name']}") if self.profile["personal"]["family"]: family_items = [] for k, v in self.profile["personal"]["family"].items(): if isinstance(v, list): family_items.append(f"{k}: {', '.join(v)}") else: family_items.append(f"{k}: {v}") if family_items: parts.append(f"Famiglia: {'; '.join(family_items)}") if self.profile["personal"]["residence"]: parts.append(f"Residenza: {self.profile['personal']['residence']}") # Professional info if self.profile["professional"]["company"]: role_text = self.profile["professional"]["role"] if self.profile["professional"]["role"] else "ruolo non specificato" parts.append(f"Lavoro: {role_text} presso {self.profile['professional']['company']}") if self.profile["professional"]["colleagues"]: colleagues_str = ", ".join(self.profile["professional"]["colleagues"]) parts.append(f"Colleghi: {colleagues_str}") # Interests if self.profile["preferences"]["topics_of_interest"]: topics_str = ", ".join(self.profile["preferences"]["topics_of_interest"]) parts.append(f"Interessi: {topics_str}") if not parts: return "Nessuna informazione sul profilo utente disponibile." return "\n".join(parts) def update_from_extraction(self, extracted_info: Dict[str, Any]): """Aggiorna profilo con info estratte da Claude""" updated = False # Nome if "name" in extracted_info and extracted_info["name"]: if self.profile["personal"]["name"] != extracted_info["name"]: self.profile["personal"]["name"] = extracted_info["name"] updated = True # Famiglia if "family" in extracted_info and extracted_info["family"]: for key, value in extracted_info["family"].items(): if value: # Solo se non รจ null o vuoto if key not in self.profile["personal"]["family"] or self.profile["personal"]["family"][key] != value: self.profile["personal"]["family"][key] = value updated = True # Residenza if "residence" in extracted_info and extracted_info["residence"]: if self.profile["personal"]["residence"] != extracted_info["residence"]: self.profile["personal"]["residence"] = extracted_info["residence"] updated = True # Azienda if "company" in extracted_info and extracted_info["company"]: if self.profile["professional"]["company"] != extracted_info["company"]: self.profile["professional"]["company"] = extracted_info["company"] updated = True # Ruolo if "role" in extracted_info and extracted_info["role"]: if self.profile["professional"]["role"] != extracted_info["role"]: self.profile["professional"]["role"] = extracted_info["role"] updated = True # Colleghi if "colleagues" in extracted_info and extracted_info["colleagues"]: for colleague in extracted_info["colleagues"]: if colleague and colleague not in self.profile["professional"]["colleagues"]: self.profile["professional"]["colleagues"].append(colleague) updated = True # Interessi if "interests" in extracted_info and extracted_info["interests"]: for interest in extracted_info["interests"]: if interest and interest not in self.profile["preferences"]["topics_of_interest"]: self.profile["preferences"]["topics_of_interest"].append(interest) updated = True # Salva se ci sono stati aggiornamenti if updated: self.save() return updated