"""File Utility Functions for ChefSystem. Helper functions for file operations and output management. """ import os import shutil import logging import subprocess import mimetypes from pathlib import Path from typing import Optional from datetime import datetime from src.database.models import Output logger = logging.getLogger(__name__) def get_recipe_output_dir(recipe_id: int) -> Path: """Get the output directory for a recipe, creating it if needed. Args: recipe_id: Recipe ID. Returns: Path: Path to recipe output directory (outputs/recipe_{id}). """ output_dir = Path("outputs") / f"recipe_{recipe_id}" try: output_dir.mkdir(parents=True, exist_ok=True) logger.debug(f"Recipe output directory ensured: {output_dir}") return output_dir except Exception as e: logger.error(f"Error creating recipe output directory: {e}", exc_info=True) raise def save_output_file(db_manager, source_path: str, recipe_id: int, execution_notes: str = "") -> Optional[Output]: """Save an output file for a recipe. Copies file to recipe output directory, generates unique name if needed, and creates Output record in database. Args: db_manager: DatabaseManager instance. source_path: Path to source file to copy. recipe_id: Recipe ID. execution_notes: Optional notes about this execution. Returns: Output: Created Output instance, or None on error. """ try: source = Path(source_path) # Validate source file exists if not source.exists(): logger.error(f"Source file does not exist: {source_path}") return None if not source.is_file(): logger.error(f"Source path is not a file: {source_path}") return None # Get recipe output directory output_dir = get_recipe_output_dir(recipe_id) # Generate destination filename (unique if already exists) dest_filename = source.name dest_path = output_dir / dest_filename # If file already exists, add timestamp prefix if dest_path.exists(): timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") dest_filename = f"{timestamp}_{source.name}" dest_path = output_dir / dest_filename logger.info(f"File already exists, using timestamped name: {dest_filename}") # Copy file (preserving metadata) shutil.copy2(source, dest_path) logger.info(f"Copied file: {source} → {dest_path}") # Get file info file_size = dest_path.stat().st_size file_type = dest_path.suffix # Create Output record in database # Use absolute path for dest_path, then make relative abs_dest = dest_path.absolute() abs_cwd = Path.cwd().absolute() relative_path = str(abs_dest.relative_to(abs_cwd)) output = Output.create( db_manager, recipe_id=recipe_id, filename=dest_filename, filepath=relative_path, file_type=file_type, file_size=file_size, execution_notes=execution_notes ) if output: logger.info(f"Created output record: {output.filename} (ID: {output.id})") else: logger.error(f"Failed to create output record for {dest_filename}") # Cleanup: delete copied file if DB insert failed dest_path.unlink(missing_ok=True) return output except Exception as e: logger.error(f"Error saving output file: {e}", exc_info=True) return None def open_file(filepath: str) -> bool: """Open a file with the system's default application. Args: filepath: Path to file to open. Returns: bool: True if successful, False otherwise. """ try: path = Path(filepath) # Validate file exists if not path.exists(): logger.error(f"Cannot open file: does not exist: {filepath}") return False if not path.is_file(): logger.error(f"Cannot open: not a file: {filepath}") return False # Open with system default application # Platform-specific handling if os.name == 'posix': # Linux/Unix subprocess.run(['xdg-open', str(path)], check=True) elif os.name == 'nt': # Windows os.startfile(str(path)) elif os.name == 'darwin': # macOS subprocess.run(['open', str(path)], check=True) else: logger.error(f"Unsupported platform: {os.name}") return False logger.info(f"Opened file: {filepath}") return True except subprocess.CalledProcessError as e: logger.error(f"Error opening file (no application found?): {e}") return False except Exception as e: logger.error(f"Error opening file: {e}", exc_info=True) return False def get_file_info(filepath: str) -> dict: """Get information about a file. Args: filepath: Path to file. Returns: dict: File information (size, extension, mime_type). """ try: path = Path(filepath) if not path.exists(): return { "size": 0, "extension": "", "mime_type": "unknown", "exists": False } # Get file size size = path.stat().st_size # Get extension extension = path.suffix # Get MIME type mime_type, _ = mimetypes.guess_type(str(path)) if mime_type is None: mime_type = "application/octet-stream" return { "size": size, "extension": extension, "mime_type": mime_type, "exists": True } except Exception as e: logger.error(f"Error getting file info: {e}", exc_info=True) return { "size": 0, "extension": "", "mime_type": "unknown", "exists": False, "error": str(e) } def delete_output_file(filepath: str) -> bool: """Delete an output file from the filesystem. Args: filepath: Path to file to delete. Returns: bool: True if successful, False otherwise. """ try: path = Path(filepath) if not path.exists(): logger.warning(f"File does not exist (already deleted?): {filepath}") return True # Already gone, consider success # Delete file path.unlink() logger.info(f"Deleted file: {filepath}") return True except Exception as e: logger.error(f"Error deleting file: {e}", exc_info=True) return False def format_file_size(size_bytes: int) -> str: """Format file size in human-readable format. Args: size_bytes: Size in bytes. Returns: str: Formatted size (e.g., "1.5 MB"). """ for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024.0: return f"{size_bytes:.1f} {unit}" size_bytes /= 1024.0 return f"{size_bytes:.1f} PB"