"""Comprehensive tests for database layer. Tests DatabaseManager, Recipe model, and Output model. """ import pytest import sqlite3 import tempfile import os from pathlib import Path from src.database.db_manager import DatabaseManager from src.database.models import Recipe, Output @pytest.fixture def test_db(): """Create a temporary test database. Yields: DatabaseManager: Initialized test database manager. """ # Create temporary database file temp_fd, temp_path = tempfile.mkstemp(suffix='.db') os.close(temp_fd) # Initialize database db = DatabaseManager(temp_path) db.init_database() yield db # Cleanup db.close() if Path(temp_path).exists(): os.unlink(temp_path) # Reset singleton for next test DatabaseManager._instance = None DatabaseManager._initialized = False class TestDatabaseManager: """Tests for DatabaseManager class.""" def test_singleton_pattern(self, test_db): """Verify that DatabaseManager implements singleton pattern.""" db1 = DatabaseManager("test1.db") db2 = DatabaseManager("test2.db") # Both should be the same instance assert db1 is db2 assert db1 is test_db def test_init_database(self, test_db): """Verify that database tables are created.""" conn = test_db.get_connection() cursor = conn.execute( "SELECT name FROM sqlite_master WHERE type='table' ORDER BY name" ) tables = [row[0] for row in cursor.fetchall()] assert 'recipes' in tables assert 'outputs' in tables def test_foreign_keys_enabled(self, test_db): """Verify that foreign key constraints are enabled.""" conn = test_db.get_connection() cursor = conn.execute("PRAGMA foreign_keys") result = cursor.fetchone() assert result[0] == 1 # Foreign keys should be ON (1) def test_context_manager_commit(self, test_db): """Test context manager commits on success.""" with test_db as conn: conn.execute( "INSERT INTO recipes (name, prompt_text) VALUES (?, ?)", ("Test Recipe", "Test prompt") ) # Verify data was committed recipes = Recipe.get_all(test_db) assert len(recipes) == 1 assert recipes[0].name == "Test Recipe" def test_context_manager_rollback(self, test_db): """Test context manager rolls back on error.""" try: with test_db as conn: conn.execute( "INSERT INTO recipes (name, prompt_text) VALUES (?, ?)", ("Test Recipe", "Test prompt") ) # Force an error raise Exception("Test error") except Exception: pass # Verify data was rolled back recipes = Recipe.get_all(test_db) assert len(recipes) == 0 class TestRecipe: """Tests for Recipe model.""" def test_create_recipe(self, test_db): """Test creating a new recipe.""" recipe = Recipe.create( test_db, name="Test Recipe", prompt_text="This is a test prompt", tags="test,python", description="A test recipe", notes="Some notes" ) assert recipe is not None assert recipe.id is not None assert recipe.name == "Test Recipe" assert recipe.prompt_text == "This is a test prompt" assert recipe.tags == "test,python" assert recipe.description == "A test recipe" assert recipe.notes == "Some notes" assert recipe.created_at is not None assert recipe.updated_at is not None def test_create_duplicate_name(self, test_db): """Test that creating a recipe with duplicate name fails.""" Recipe.create(test_db, name="Duplicate", prompt_text="First") duplicate = Recipe.create(test_db, name="Duplicate", prompt_text="Second") assert duplicate is None def test_get_by_id(self, test_db): """Test retrieving a recipe by ID.""" created = Recipe.create(test_db, name="Test", prompt_text="Prompt") retrieved = Recipe.get_by_id(test_db, created.id) assert retrieved is not None assert retrieved.id == created.id assert retrieved.name == created.name assert retrieved.prompt_text == created.prompt_text def test_get_by_id_not_found(self, test_db): """Test retrieving non-existent recipe returns None.""" result = Recipe.get_by_id(test_db, 9999) assert result is None def test_get_all(self, test_db): """Test retrieving all recipes.""" Recipe.create(test_db, name="Recipe 1", prompt_text="Prompt 1") Recipe.create(test_db, name="Recipe 2", prompt_text="Prompt 2") Recipe.create(test_db, name="Recipe 3", prompt_text="Prompt 3") recipes = Recipe.get_all(test_db) assert len(recipes) == 3 # Verify all recipes are present names = [r.name for r in recipes] assert "Recipe 1" in names assert "Recipe 2" in names assert "Recipe 3" in names def test_get_all_empty(self, test_db): """Test retrieving all recipes when none exist.""" recipes = Recipe.get_all(test_db) assert len(recipes) == 0 def test_update_recipe(self, test_db): """Test updating a recipe.""" recipe = Recipe.create(test_db, name="Original", prompt_text="Original prompt") original_updated_at = recipe.updated_at # Update the recipe success = Recipe.update( test_db, recipe.id, name="Updated", prompt_text="Updated prompt", tags="new,tags" ) assert success is True # Retrieve and verify updated = Recipe.get_by_id(test_db, recipe.id) assert updated.name == "Updated" assert updated.prompt_text == "Updated prompt" assert updated.tags == "new,tags" # updated_at should be set (checking it's not None is sufficient) assert updated.updated_at is not None # In fast tests, timestamps might be identical, so just verify it exists assert updated.updated_at >= original_updated_at def test_update_recipe_not_found(self, test_db): """Test updating non-existent recipe returns False.""" success = Recipe.update(test_db, 9999, name="Does not exist") assert success is False def test_update_recipe_duplicate_name(self, test_db): """Test updating recipe with duplicate name fails.""" Recipe.create(test_db, name="First", prompt_text="Prompt 1") recipe2 = Recipe.create(test_db, name="Second", prompt_text="Prompt 2") # Try to rename recipe2 to "First" success = Recipe.update(test_db, recipe2.id, name="First") assert success is False def test_delete_recipe(self, test_db): """Test deleting a recipe.""" recipe = Recipe.create(test_db, name="To Delete", prompt_text="Prompt") success = Recipe.delete(test_db, recipe.id) assert success is True # Verify it's gone result = Recipe.get_by_id(test_db, recipe.id) assert result is None def test_delete_recipe_not_found(self, test_db): """Test deleting non-existent recipe returns False.""" success = Recipe.delete(test_db, 9999) assert success is False def test_search_by_name(self, test_db): """Test searching recipes by name.""" Recipe.create(test_db, name="Python Tutorial", prompt_text="Prompt") Recipe.create(test_db, name="JavaScript Guide", prompt_text="Prompt") Recipe.create(test_db, name="Python Advanced", prompt_text="Prompt") # Search for "python" (case-insensitive) results = Recipe.search(test_db, query="python") assert len(results) == 2 names = [r.name for r in results] assert "Python Tutorial" in names assert "Python Advanced" in names def test_search_by_tags(self, test_db): """Test filtering recipes by tags.""" Recipe.create(test_db, name="Recipe 1", prompt_text="P", tags="python,web") Recipe.create(test_db, name="Recipe 2", prompt_text="P", tags="javascript") Recipe.create(test_db, name="Recipe 3", prompt_text="P", tags="python,data") # Search for recipes with "python" tag results = Recipe.search(test_db, tags=["python"]) assert len(results) == 2 names = [r.name for r in results] assert "Recipe 1" in names assert "Recipe 3" in names def test_search_combined(self, test_db): """Test combined name and tags search.""" Recipe.create(test_db, name="Python Web", prompt_text="P", tags="python,web") Recipe.create(test_db, name="Python Data", prompt_text="P", tags="python,data") Recipe.create(test_db, name="Web Design", prompt_text="P", tags="web,css") # Search for "Python" AND tag "web" results = Recipe.search(test_db, query="Python", tags=["web"]) assert len(results) == 1 assert results[0].name == "Python Web" def test_search_no_results(self, test_db): """Test search with no matches returns empty list.""" Recipe.create(test_db, name="Test", prompt_text="Prompt") results = Recipe.search(test_db, query="nonexistent") assert len(results) == 0 def test_to_dict(self, test_db): """Test recipe to_dict serialization.""" recipe = Recipe.create(test_db, name="Test", prompt_text="Prompt") data = recipe.to_dict() assert isinstance(data, dict) assert data['name'] == "Test" assert data['prompt_text'] == "Prompt" assert isinstance(data['created_at'], str) # Should be ISO format assert isinstance(data['updated_at'], str) def test_from_dict(self, test_db): """Test recipe from_dict deserialization.""" original = Recipe.create(test_db, name="Test", prompt_text="Prompt") data = original.to_dict() # Recreate from dict restored = Recipe.from_dict(data) assert restored.id == original.id assert restored.name == original.name assert restored.created_at == original.created_at class TestOutput: """Tests for Output model.""" def test_create_output(self, test_db): """Test creating a new output.""" # Create recipe first recipe = Recipe.create(test_db, name="Test Recipe", prompt_text="Prompt") # Create output output = Output.create( test_db, recipe_id=recipe.id, filename="output.pdf", filepath="recipe_1/output.pdf", file_type=".pdf", file_size=1024, execution_notes="Test execution" ) assert output is not None assert output.id is not None assert output.recipe_id == recipe.id assert output.filename == "output.pdf" assert output.filepath == "recipe_1/output.pdf" assert output.file_type == ".pdf" assert output.file_size == 1024 assert output.execution_notes == "Test execution" assert output.generated_at is not None def test_create_output_invalid_recipe(self, test_db): """Test creating output with invalid recipe_id fails.""" output = Output.create( test_db, recipe_id=9999, # Non-existent recipe filename="test.pdf", filepath="test.pdf" ) # Should fail due to foreign key constraint assert output is None def test_get_by_id(self, test_db): """Test retrieving output by ID.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") created = Output.create(test_db, recipe_id=recipe.id, filename="test.pdf", filepath="test.pdf") retrieved = Output.get_by_id(test_db, created.id) assert retrieved is not None assert retrieved.id == created.id assert retrieved.filename == created.filename def test_get_by_recipe(self, test_db): """Test retrieving all outputs for a recipe.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") # Create multiple outputs Output.create(test_db, recipe_id=recipe.id, filename="output1.pdf", filepath="path1") Output.create(test_db, recipe_id=recipe.id, filename="output2.pdf", filepath="path2") Output.create(test_db, recipe_id=recipe.id, filename="output3.pdf", filepath="path3") outputs = Output.get_by_recipe(test_db, recipe.id) assert len(outputs) == 3 filenames = [o.filename for o in outputs] assert "output1.pdf" in filenames assert "output2.pdf" in filenames assert "output3.pdf" in filenames def test_get_all(self, test_db): """Test retrieving all outputs.""" recipe1 = Recipe.create(test_db, name="Recipe 1", prompt_text="P1") recipe2 = Recipe.create(test_db, name="Recipe 2", prompt_text="P2") Output.create(test_db, recipe_id=recipe1.id, filename="out1.pdf", filepath="p1") Output.create(test_db, recipe_id=recipe2.id, filename="out2.pdf", filepath="p2") outputs = Output.get_all(test_db) assert len(outputs) == 2 def test_update_output(self, test_db): """Test updating an output.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") output = Output.create(test_db, recipe_id=recipe.id, filename="original.pdf", filepath="path") success = Output.update( test_db, output.id, filename="updated.pdf", execution_notes="Updated notes" ) assert success is True updated = Output.get_by_id(test_db, output.id) assert updated.filename == "updated.pdf" assert updated.execution_notes == "Updated notes" def test_delete_output(self, test_db): """Test deleting an output.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") output = Output.create(test_db, recipe_id=recipe.id, filename="test.pdf", filepath="path") success = Output.delete(test_db, output.id) assert success is True result = Output.get_by_id(test_db, output.id) assert result is None def test_cascade_delete(self, test_db): """Test CASCADE delete: deleting recipe deletes its outputs.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") # Create multiple outputs Output.create(test_db, recipe_id=recipe.id, filename="out1.pdf", filepath="p1") Output.create(test_db, recipe_id=recipe.id, filename="out2.pdf", filepath="p2") Output.create(test_db, recipe_id=recipe.id, filename="out3.pdf", filepath="p3") # Verify outputs exist outputs_before = Output.get_by_recipe(test_db, recipe.id) assert len(outputs_before) == 3 # Delete the recipe Recipe.delete(test_db, recipe.id) # Verify outputs are automatically deleted (CASCADE) outputs_after = Output.get_by_recipe(test_db, recipe.id) assert len(outputs_after) == 0 def test_to_dict(self, test_db): """Test output to_dict serialization.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") output = Output.create(test_db, recipe_id=recipe.id, filename="test.pdf", filepath="path") data = output.to_dict() assert isinstance(data, dict) assert data['filename'] == "test.pdf" assert isinstance(data['generated_at'], str) def test_from_dict(self, test_db): """Test output from_dict deserialization.""" recipe = Recipe.create(test_db, name="Recipe", prompt_text="Prompt") original = Output.create(test_db, recipe_id=recipe.id, filename="test.pdf", filepath="path") data = original.to_dict() restored = Output.from_dict(data) assert restored.id == original.id assert restored.filename == original.filename