# Jarvis-Cognitive/scripts/indicizza_documenti.py
"""Index a PDF document for RAG using the ParentDocumentRetriever strategy.

Loads a PDF, splits it into parent/child chunks, stores child-chunk embeddings
in a Chroma vector store and the parent documents in a pickle-encoded
file-backed docstore, so small-chunk matches can be expanded back to their
full parent context at retrieval time.
"""
import os
import sys
import argparse
import hashlib
import shutil
import pickle

import yaml
from dotenv import load_dotenv
from langchain_community.document_loaders import PyMuPDFLoader
from langchain_chroma import Chroma
from langchain_google_genai import GoogleGenerativeAIEmbeddings
from langchain.storage import LocalFileStore, EncoderBackedStore
from langchain.retrievers import ParentDocumentRetriever
from langchain.text_splitter import RecursiveCharacterTextSplitter


def _project_root() -> str:
    """Return the project root directory (parent of this script's folder)."""
    return os.path.dirname(os.path.dirname(os.path.abspath(__file__)))


# --- API key loading ---
# This script runs as a separate process, so it must load the environment
# variables from the .env file by itself.
project_root_for_env = _project_root()
env_path = os.path.join(project_root_for_env, 'data', '.env')
if os.path.exists(env_path):
    load_dotenv(dotenv_path=env_path)
    print(f"[INDICIZZATORE] Variabili d'ambiente caricate da: {env_path}")
# --- End loading ---


def load_profile_embedding_config(profile: str):
    """Load embedding provider and model from config/config.yaml for a profile.

    Args:
        profile: Agent profile name looked up under the config's 'profiles' key.

    Returns:
        (provider, model) tuple; falls back to ('google', None) when the
        config file is unreadable or the profile has no CognitiveService entry.
    """
    cfg_path = os.path.join(_project_root(), 'config', 'config.yaml')
    provider = 'google'
    model = None
    try:
        with open(cfg_path, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        profile_services = (cfg or {}).get('profiles', {}).get(profile or '', [])
        for svc in profile_services:
            if svc.get('service_name') == 'CognitiveService':
                c = svc.get('config', {})
                provider = c.get('embedding_provider', provider)
                model = c.get('embedding_model', model)
                break
    except Exception:
        # Best-effort: keep the defaults if the config cannot be read/parsed.
        pass
    return (provider or 'google'), model


def _load_default_profile_from_config():
    """Return the top-level 'profile' value from config/config.yaml, or None."""
    try:
        cfg_path = os.path.join(_project_root(), 'config', 'config.yaml')
        with open(cfg_path, 'r', encoding='utf-8') as f:
            cfg = yaml.safe_load(f)
        return (cfg or {}).get('profile')
    except Exception:
        # A missing/invalid config is not fatal: callers have other fallbacks.
        return None


def resolve_paths(args):
    """Resolve vectorstore/docstore directories based on args/env/defaults.

    Priority: explicit CLI args > AGENT_PROFILE env / config default profile >
    legacy flat layout. Both directories are created if missing.

    Returns:
        (vectorstore_path, docstore_path) absolute paths under data/.
    """
    project_root = _project_root()
    profile = args.profile or os.getenv('AGENT_PROFILE') or _load_default_profile_from_config()

    if args.vectorstore:
        vector_rel = args.vectorstore
    elif profile:
        vector_rel = os.path.join('agents', profile, 'chroma_db')
    else:
        vector_rel = 'chroma_db'

    if args.docstore:
        doc_rel = args.docstore
    elif profile:
        doc_rel = os.path.join('agents', profile, 'doc_store')
    else:
        doc_rel = 'doc_store'

    vectorstore_path = os.path.join(project_root, 'data', vector_rel)
    docstore_path = os.path.join(project_root, 'data', doc_rel)
    os.makedirs(vectorstore_path, exist_ok=True)
    os.makedirs(docstore_path, exist_ok=True)
    return vectorstore_path, docstore_path


def build_embeddings(args):
    """Instantiate the embeddings backend based on args/config/env.

    Resolution order for both provider and model: CLI args > environment
    variables (EMBEDDING_PROVIDER / EMBEDDING_MODEL) > profile config >
    hard-coded defaults.

    Raises:
        ImportError: if provider is 'hf' and sentence-transformers is missing.
    """
    provider_default, model_default = load_profile_embedding_config(
        args.profile or os.getenv('AGENT_PROFILE') or 'aurelio'
    )
    provider = (getattr(args, "embedding_provider", None)
                or os.getenv('EMBEDDING_PROVIDER')
                or provider_default
                or 'google').lower()
    model = getattr(args, "embedding_model", None) or os.getenv('EMBEDDING_MODEL') or model_default

    if provider == 'hf':
        try:
            from langchain_community.embeddings import HuggingFaceEmbeddings
        except Exception as e:
            print(f"[INDICIZZATORE] ERRORE: manca HuggingFaceEmbeddings ({e}). Installare 'sentence-transformers'.")
            raise
        if not model:
            model = 'sentence-transformers/all-MiniLM-L6-v2'
        print(f"[INDICIZZATORE] Embeddings provider=hf, model={model}")
        return HuggingFaceEmbeddings(model_name=model)

    # Default: Google embeddings. GoogleGenerativeAIEmbeddings is already
    # imported at module level, so no local re-import is needed.
    if not model:
        model = 'models/text-embedding-004'
    print(f"[INDICIZZATORE] Embeddings provider=google, model={model}")
    return GoogleGenerativeAIEmbeddings(model=model)


def _profile_from_args_env(args):
    """Resolve the agent profile: CLI arg > env var > config default > 'aurelio'."""
    return args.profile or os.getenv('AGENT_PROFILE') or _load_default_profile_from_config() or 'aurelio'


def _ensure_source_copy(profile: str, file_path: str) -> str:
    """Save a copy of the original file under data/agents/<profile>/source_docs.

    The copy's name is prefixed with the first 16 hex chars of the file's
    SHA-256 so identical files are deduplicated.

    Returns:
        Path to the saved file (existing or newly copied).
    """
    target_dir = os.path.join(_project_root(), 'data', 'agents', profile, 'source_docs')
    os.makedirs(target_dir, exist_ok=True)

    # Hash in 1 MiB chunks so large PDFs are not read into memory at once.
    h = hashlib.sha256()
    with open(file_path, 'rb') as f:
        for chunk in iter(lambda: f.read(1024 * 1024), b''):
            h.update(chunk)
    digest = h.hexdigest()[:16]

    base = os.path.basename(file_path)
    target_name = f"{digest}__{base}"
    target_path = os.path.join(target_dir, target_name)
    if not os.path.exists(target_path):
        shutil.copy2(file_path, target_path)
        print(f"[INDICIZZATORE] Copia sorgente salvata: {target_path}")
    else:
        print(f"[INDICIZZATORE] Copia sorgente già presente: {target_path}")
    return target_path


def main(file_path, args):
    """Index a document using the ParentDocumentRetriever strategy."""
    vectorstore_path, docstore_path = resolve_paths(args)
    print(f"--- Avvio Indicizzazione Avanzata per: {os.path.basename(file_path)} ---")

    # 0. Save a copy of the source file (enabled by default).
    if getattr(args, 'save_source', True):
        profile = _profile_from_args_env(args)
        file_path = _ensure_source_copy(profile, file_path)

    # 1. Load the document.
    print("1/5: Caricamento del documento...")
    loader = PyMuPDFLoader(file_path)
    docs = loader.load()

    # 2. Set up the vector store and the document store.
    print("2/5: Setup degli store...")
    embeddings = build_embeddings(args)
    vectorstore = Chroma(
        collection_name="split_parents",
        embedding_function=embeddings,
        persist_directory=vectorstore_path,
    )
    # EncoderBackedStore is required to serialize Document objects to disk:
    # LocalFileStore alone expects bytes, not Document objects, and would
    # raise a TypeError. The langchain version in use requires an explicit
    # key_encoder; our keys (document IDs) are already strings, so a
    # pass-through function suffices.
    fs = LocalFileStore(docstore_path)
    store = EncoderBackedStore(fs, lambda key: key, pickle.dumps, pickle.loads)

    # 3. Define the splitters.
    # The parent splitter produces the large "parent" documents.
    parent_splitter = RecursiveCharacterTextSplitter(chunk_size=2000, chunk_overlap=200)
    # The child splitter produces the small "child" chunks used for search.
    child_splitter = RecursiveCharacterTextSplitter(chunk_size=400, chunk_overlap=40)

    # 4. Create the ParentDocumentRetriever.
    print("3/5: Creazione del retriever avanzato...")
    retriever = ParentDocumentRetriever(
        vectorstore=vectorstore,
        docstore=store,
        child_splitter=child_splitter,
        parent_splitter=parent_splitter,
    )

    # 5. Run the indexing.
    print("4/5: Aggiunta dei documenti al retriever...")
    retriever.add_documents(docs, ids=None)
    print("5/5: Indicizzazione completata con successo!")
    print("--- Fine Indicizzazione ---")


if __name__ == "__main__":
    parser = argparse.ArgumentParser(description='Indicizza un documento PDF per il RAG (ParentDocumentRetriever).')
    parser.add_argument('file_path', help='Percorso del file da indicizzare (PDF)')
    parser.add_argument('--vectorstore', help='Percorso relativo sotto data/ per il vector store', default=None)
    parser.add_argument('--docstore', help='Percorso relativo sotto data/ per il document store', default=None)
    parser.add_argument('--profile', help='Nome profilo agente (usa cartelle in data/agents//...)', default=None)
    # Fix: build_embeddings() already reads args.embedding_provider and
    # args.embedding_model via getattr(), but the parser never defined these
    # options, so the CLI path was unreachable (env vars were the only way).
    parser.add_argument('--embedding-provider', dest='embedding_provider',
                        help="Provider embeddings ('google' o 'hf')", default=None)
    parser.add_argument('--embedding-model', dest='embedding_model',
                        help='Modello embeddings', default=None)
    parser.add_argument('--no-save-source', dest='save_source', action='store_false',
                        help='Non salvare copia sorgente (default: salva)')
    parser.set_defaults(save_source=True)
    args = parser.parse_args()

    if args.file_path:
        main(args.file_path, args)
    else:
        print("Errore: Fornire il percorso del file da indicizzare come argomento.")