"""Aggregator module for combining multiple markdown files."""

import os
from pathlib import Path
from typing import Any, Callable, Dict, List, Optional

from .exceptions import InvalidDirectoryError, FileOperationError


class Aggregator:
    """Combines multiple .md files into a single aggregated file.

    Each source file is written to the output preceded by a separator line
    built from ``SEPARATOR_FORMAT`` (the file's path relative to the source
    directory, using forward slashes) and followed by a blank line.
    """

    # Header line written before each file's content; ``{}`` receives the
    # file's path relative to the source directory.
    SEPARATOR_FORMAT = "--- FILE: {} ---"

    def __init__(self):
        """Initialize the Aggregator."""

    def aggregate(
        self,
        source_dir: str,
        output_file: str,
        progress_callback: Optional[Callable[[int, str], None]] = None
    ) -> Dict[str, Any]:
        """
        Aggregate all .md files from source directory into a single file.

        Files are discovered recursively and processed in sorted path order.
        The output file itself is never treated as an input, even when it
        lives inside ``source_dir`` and has a ``.md`` extension.

        A source file that cannot be read (I/O or decoding error) does not
        abort the run: an ``[Error reading file: ...]`` marker is written in
        its place and processing continues. Such files are still counted in
        ``files_processed``.

        Args:
            source_dir: Path to directory containing .md files
            output_file: Path to output aggregated file
            progress_callback: Optional callback function(percentage, message).
                Called before each file is processed and once with 100 when
                at least one file was aggregated.

        Returns:
            Dictionary with the keys ``files_processed`` (number of .md files
            found), ``total_size`` (total number of characters read from
            successfully decoded files), ``source_dir`` and ``output_file``
            (both echoed back as given).

        Raises:
            InvalidDirectoryError: If source directory doesn't exist or is
                not a directory
            FileOperationError: If file operations fail
        """
        source_path = Path(source_dir)

        # Validate source directory
        if not source_path.exists():
            raise InvalidDirectoryError(f"Source directory does not exist: {source_dir}")
        if not source_path.is_dir():
            raise InvalidDirectoryError(f"Path is not a directory: {source_dir}")

        # Find all .md files, leaving out the output file itself. Without
        # this, an output located inside source_dir (e.g. from a previous
        # run) would be truncated by open(..., 'w') and then read back as
        # one of its own inputs.
        output_resolved = self._safe_resolve(Path(output_file))
        md_files = [
            candidate
            for candidate in self._find_md_files(source_path)
            if output_resolved is None
            or self._safe_resolve(candidate) != output_resolved
        ]
        total_files = len(md_files)

        if total_files == 0:
            # Create empty output file
            try:
                Path(output_file).write_text("", encoding="utf-8")
            except Exception as e:
                raise FileOperationError(f"Failed to create output file: {e}") from e
            return {
                "files_processed": 0,
                "total_size": 0,
                "source_dir": source_dir,
                "output_file": output_file
            }

        # Aggregate files. The broad ``except`` is deliberate: callers rely on
        # every failure in this phase (including one raised by the progress
        # callback) surfacing as FileOperationError.
        try:
            total_size = 0
            with open(output_file, 'w', encoding='utf-8') as out_f:
                for index, file_path in enumerate(md_files):
                    # Relative path with forward slashes on every platform
                    rel_path_str = file_path.relative_to(source_path).as_posix()

                    # Update progress; integer arithmetic avoids float
                    # truncation such as int(29 / 100 * 100) == 28.
                    if progress_callback:
                        percentage = index * 100 // total_files
                        progress_callback(percentage, f"Processing {file_path.name}")

                    # Write separator
                    out_f.write(self.SEPARATOR_FORMAT.format(rel_path_str) + '\n')

                    # Read the source file. Only the read is guarded so that
                    # a failure writing to the output is not mistaken for an
                    # unreadable input.
                    try:
                        content = file_path.read_text(encoding='utf-8')
                    except (OSError, ValueError) as e:
                        # ValueError covers UnicodeDecodeError. Skip files
                        # that can't be read, leaving a marker in the output.
                        out_f.write(f"[Error reading file: {e}]\n\n")
                        continue

                    out_f.write(content)
                    # Ensure newline after content if not present
                    if content and not content.endswith('\n'):
                        out_f.write('\n')
                    # Add blank line between files for readability
                    out_f.write('\n')
                    total_size += len(content)

            # Final progress update
            if progress_callback:
                progress_callback(100, "Aggregation complete")
        except Exception as e:
            raise FileOperationError(f"Failed to write aggregated file: {e}") from e

        return {
            "files_processed": total_files,
            "total_size": total_size,
            "source_dir": source_dir,
            "output_file": output_file
        }

    @staticmethod
    def _safe_resolve(path: Path) -> Optional[Path]:
        """
        Resolve a path to its absolute, symlink-free form.

        Args:
            path: Path to resolve (it does not need to exist)

        Returns:
            The resolved Path, or None if resolution fails (for example on a
            symlink loop), so callers can treat the path as "not comparable".
        """
        try:
            return path.resolve()
        except (OSError, RuntimeError):
            return None

    def _find_md_files(self, source_path: Path) -> List[Path]:
        """
        Recursively find all .md files in the source directory.

        Matching is a case-sensitive check that the file name ends with
        ``.md``.

        Args:
            source_path: Path to search

        Returns:
            Sorted list of Path objects for .md files
        """
        md_files = [
            Path(root) / file_name
            for root, _dirs, files in os.walk(source_path)
            for file_name in files
            if file_name.endswith('.md')
        ]

        # Sort for consistent ordering
        return sorted(md_files)