#!/usr/bin/env python3
"""
Quick intraday backtest of Warren Analyzer recommendations on historical data.

Notes:
- Fundamental data in the DB starts around ~2022-06-30, so the backtest
  covers that date onward.
- The backtest is read-only: it does not modify the production DB.
- Output: CSV at logs/backtest_results.csv with all recommendations for
  monthly snapshots.
"""

import contextlib
import csv
import os
import sqlite3
import sys
from datetime import datetime, date, timedelta
from typing import Dict, List, Optional, Tuple

# Ensure repo root is on sys.path (kept at module level so the lazy project
# imports inside main() resolve when the script is run directly).
ROOT = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)

DB_PATH = "data/trading_system.db"
OUTPUT_CSV = "logs/backtest_results.csv"
SNAPSHOT_STEP_DAYS = 30  # approx monthly snapshots

# Only these table names may ever be interpolated into SQL text
# (see last_on_or_before); everything else is rejected.
_ALLOWED_TABLES = frozenset({"price_data", "fundamental_data"})

# CSV column order for the output file.
_CSV_FIELDS = [
    "snapshot_date",
    "ticker",
    "name",
    "score",
    "valutazione",
    "price",
    "fair_value",
    "margin",
]


def get_stock_ids(conn) -> Dict[str, int]:
    """Return a mapping of ticker -> stocks.id for every row in `stocks`."""
    cur = conn.cursor()
    cur.execute("SELECT id, ticker FROM stocks")
    return {row[1]: row[0] for row in cur.fetchall()}


def get_date_bounds(conn) -> Tuple[Optional[date], Optional[date]]:
    """Return the (earliest, latest) dates present in `fundamental_data`.

    Returns (None, None) when the table is empty (MIN/MAX yield NULL).
    """
    cur = conn.cursor()
    cur.execute("SELECT MIN(date), MAX(date) FROM fundamental_data")
    min_dt, max_dt = cur.fetchone()
    start = datetime.fromisoformat(min_dt).date() if min_dt else None
    end = datetime.fromisoformat(max_dt).date() if max_dt else None
    return start, end


def last_on_or_before(conn, table: str, stock_id: int, snap: date) -> Optional[sqlite3.Row]:
    """Return the most recent row of `table` for `stock_id` dated <= `snap`.

    Args:
        conn: open sqlite3 connection (row_factory should be sqlite3.Row).
        table: one of the known data tables; interpolated into the SQL text,
            hence restricted to a whitelist.
        stock_id: stocks.id foreign key.
        snap: snapshot cutoff date (inclusive).

    Returns:
        The latest matching row, or None if no row exists on/before `snap`.

    Raises:
        ValueError: if `table` is not a whitelisted table name.
    """
    if table not in _ALLOWED_TABLES:
        raise ValueError(f"Unexpected table name: {table!r}")
    cur = conn.cursor()
    # Bind the date as an ISO-8601 string: the implicit sqlite3 date adapter
    # is deprecated since Python 3.12, and ISO dates compare correctly as
    # TEXT because they sort lexicographically.
    cur.execute(
        f"SELECT * FROM {table} WHERE stock_id = ? AND date <= ? "
        "ORDER BY date DESC LIMIT 1",
        (stock_id, snap.isoformat()),
    )
    return cur.fetchone()


def build_stock_data(price_row, fund_row, sector: str) -> Dict:
    """Assemble the stock_data dict consumed by WarrenAnalyzer.analyze().

    `price_row` must expose "close"/"date" and `fund_row` the fundamental
    columns (sqlite3.Row or any mapping). "ticker"/"name" are left as None
    for the caller to fill in.
    """
    return {
        "ticker": None,  # filled by caller
        "name": None,  # filled by caller
        "sector": sector or "N/A",
        "price": price_row["close"],
        "price_date": price_row["date"],
        "pe_ratio": fund_row["pe_ratio"],
        "pb_ratio": fund_row["pb_ratio"],
        "roe": fund_row["roe"],
        "debt_to_equity": fund_row["debt_to_equity"],
        "dividend_yield": fund_row["dividend_yield"],
        "revenue_growth": fund_row["revenue_growth"],
        "earnings_growth": fund_row["earnings_growth"],
        "market_cap": fund_row["market_cap"],
        "beta": fund_row["beta"],
        "free_cashflow": fund_row["free_cashflow"],
        "enterprise_value": fund_row["enterprise_value"],
        "ebitda": fund_row["ebitda"],
        "gross_margin": fund_row["gross_margin"],
        "operating_margin": fund_row["operating_margin"],
        "net_debt": fund_row["net_debt"],
        "dividend_rate": fund_row["dividend_rate"],
        "shares_outstanding": fund_row["shares_outstanding"],
        "total_debt": fund_row["total_debt"],
        "total_cash": fund_row["total_cash"],
        "avg_fcf_yield_3y": None,  # historical average is not rebuilt here
        "debt_to_equity_reported": fund_row["debt_to_equity"],
    }


def _load_stock_meta(conn) -> Dict[int, Tuple[str, str]]:
    """Return stocks.id -> (name, sector), queried once.

    Hoisted out of the snapshot loop so we don't hit the DB once per ticker
    per snapshot.
    """
    cur = conn.cursor()
    cur.execute("SELECT id, name, sector FROM stocks")
    return {row["id"]: (row["name"], row["sector"]) for row in cur.fetchall()}


def _write_csv(rows: List[Dict]) -> None:
    """Write all recommendation rows (with header) to OUTPUT_CSV."""
    with open(OUTPUT_CSV, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=_CSV_FIELDS)
        writer.writeheader()
        writer.writerows(rows)


def main():
    """Run the backtest over monthly snapshots and write the results CSV."""
    # Project imports are done lazily so this module stays importable
    # (e.g. for tests) even when the repo packages are not on the path.
    from src.analysis.warren_analyzer import WarrenAnalyzer  # type: ignore
    from warren_scan import FTSE_MIB_STOCKS  # reuse universe

    if not os.path.exists(DB_PATH):
        raise SystemExit(f"DB non trovato: {DB_PATH}")
    os.makedirs("logs", exist_ok=True)

    rows: List[Dict] = []
    # closing() guarantees the connection is released even on error;
    # the backtest is read-only, so no commit is needed.
    with contextlib.closing(sqlite3.connect(DB_PATH)) as conn:
        conn.row_factory = sqlite3.Row

        stock_ids = get_stock_ids(conn)
        if not stock_ids:
            raise SystemExit("Nessun ticker trovato nel DB.")

        start_date, end_date = get_date_bounds(conn)
        if not start_date or not end_date:
            raise SystemExit("Dati fondamentali non disponibili per definire il periodo di backtest.")

        # Cap at end of 2024 as requested
        backtest_end = min(end_date, date(2024, 12, 31))
        meta = _load_stock_meta(conn)
        analyzer = WarrenAnalyzer()

        snap = start_date
        while snap <= backtest_end:
            for ticker in FTSE_MIB_STOCKS:
                stock_id = stock_ids.get(ticker)
                if stock_id is None:
                    continue
                price_row = last_on_or_before(conn, "price_data", stock_id, snap)
                fund_row = last_on_or_before(conn, "fundamental_data", stock_id, snap)
                if not price_row or not fund_row:
                    continue  # skip if missing data

                # Fall back to the ticker / "N/A" when metadata is absent.
                name, sector = meta.get(stock_id, (ticker, "N/A"))

                stock_data = build_stock_data(price_row, fund_row, sector)
                stock_data["ticker"] = ticker
                stock_data["name"] = name

                try:
                    result = analyzer.analyze(stock_data)
                except Exception as e:
                    # log minimal info and continue
                    print(f"[{snap}] Skip {ticker}: {e}")
                    continue

                rows.append(
                    {
                        "snapshot_date": snap.isoformat(),
                        "ticker": ticker,
                        "name": name,
                        "score": result["score"],
                        "valutazione": result["valutazione"],
                        "price": result["current_price"],
                        "fair_value": result["fair_value"],
                        "margin": result["margin_of_safety"],
                    }
                )
            snap += timedelta(days=SNAPSHOT_STEP_DAYS)

    _write_csv(rows)

    # Simple aggregate summary
    total_snapshots = len({r["snapshot_date"] for r in rows})
    total_rows = len(rows)
    buy = sum(1 for r in rows if r["valutazione"] in ("BUY", "STRONG BUY"))
    print(f"Backtest completato: {total_snapshots} snapshot, {total_rows} raccomandazioni.")
    print(f"BUY/STRONG BUY totali: {buy}")
    print(f"Output CSV: {OUTPUT_CSV}")


if __name__ == "__main__":
    main()