#!/usr/bin/env python3 """ European Expansion Test Script ================================ Tests CAC 40 (France) and DAX 40 (Germany) ticker integration with EUR currency validation. Usage: python scripts/test_european_expansion.py """ import sys from pathlib import Path sys.path.insert(0, str(Path(__file__).parent.parent)) import pandas as pd from loguru import logger from sqlalchemy import desc from src.database.db_manager import DatabaseManager from src.data_collector.yahoo_collector import YahooFinanceCollector from src.analysis.warren_analyzer import WarrenAnalyzer from src.database.models import Stock, FundamentalData, PriceData from src.config.cac40_stocks import CAC_40_STOCKS from src.config.dax40_stocks import DAX_40_STOCKS # Test subset: 5 tickers per market (diversified sectors) TEST_TICKERS = { "CAC 40": [ ("MC.PA", "LVMH", "Luxury"), ("AI.PA", "Air Liquide", "Basic Materials"), ("TTE.PA", "TotalEnergies", "Energy"), ("SAN.PA", "Sanofi", "Healthcare"), ("BNP.PA", "BNP Paribas", "Banking"), ], "DAX 40": [ ("SAP.DE", "SAP", "Technology"), ("SIE.DE", "Siemens", "Industrials"), ("ALV.DE", "Allianz", "Insurance"), ("MBG.DE", "Mercedes-Benz", "Automotive"), ("DTE.DE", "Deutsche Telekom", "Telecommunications"), ], } def collect_test_data(): """ Download fundamental and price data for test tickers. Returns: tuple: (success_count, fail_count, currency_fails) """ logger.info("=" * 70) logger.info("PHASE 1: Data Collection") logger.info("=" * 70) db = DatabaseManager() collector = YahooFinanceCollector(db) success_count = 0 fail_count = 0 currency_fails = [] for market, tickers in TEST_TICKERS.items(): logger.info(f"\nšŸ“Š Testing {market} tickers...") for ticker, name, sector in tickers: logger.info(f"\n Processing {ticker} ({name})...") # Step 1: Add stock to database (name/sector auto-fetched from Yahoo) if not collector.add_stock(ticker, market=market): logger.error(f" āŒ Failed to add {ticker}") fail_count += 1 continue # Step 2: Collect fundamental data (includes currency check) if not collector.collect_fundamental_data(ticker): logger.warning(f" ā›” Failed fundamental data collection for {ticker}") currency_fails.append(ticker) fail_count += 1 continue # Step 3: Collect price data (last 90 days) if not collector.collect_historical_prices(ticker, days=90): logger.warning(f" āš ļø Failed price data collection for {ticker}") # Don't count as total fail, fundamentals might be OK logger.success(f" āœ… {ticker} data collected successfully") success_count += 1 logger.info(f"\n{'=' * 70}") logger.info(f"Data Collection Summary:") logger.info(f" āœ… Success: {success_count}/10") logger.info(f" āŒ Failed: {fail_count}/10") logger.info(f" ā›” Currency Fails: {len(currency_fails)}") if currency_fails: logger.warning(f" {', '.join(currency_fails)}") logger.info(f"{'=' * 70}\n") return success_count, fail_count, currency_fails def analyze_stocks(): """ Run Warren analysis on collected tickers. Returns: pd.DataFrame: Analysis results """ logger.info("=" * 70) logger.info("PHASE 2: Warren Analysis") logger.info("=" * 70) db = DatabaseManager() analyzer = WarrenAnalyzer() results = [] with db.get_session() as session: for market, tickers in TEST_TICKERS.items(): logger.info(f"\nšŸ“ˆ Analyzing {market} tickers...") for ticker, name, sector in tickers: # Query stock data stock = session.query(Stock).filter_by(ticker=ticker).first() if not stock: logger.warning(f" āš ļø {ticker} not found in database") continue fundamentals = session.query(FundamentalData).filter_by( stock_id=stock.id ).order_by(desc(FundamentalData.date)).first() latest_price = session.query(PriceData).filter_by( stock_id=stock.id ).order_by(desc(PriceData.date)).first() if not fundamentals or not latest_price: logger.warning(f" āš ļø {ticker} missing data") continue # Build stock_data dict (Schema v2) stock_data = { 'ticker': ticker, 'name': stock.name, 'sector': stock.sector, 'price': latest_price.close, 'pe_ratio': fundamentals.pe_ratio, 'pb_ratio': fundamentals.pb_ratio, 'roe': fundamentals.roe, 'debt_to_equity': fundamentals.debt_to_equity, 'market_cap': fundamentals.market_cap, 'net_income': fundamentals.net_income, 'operating_cashflow': fundamentals.operating_cashflow, 'operating_margin': fundamentals.operating_margin, 'free_cashflow': fundamentals.free_cashflow, 'enterprise_value': fundamentals.enterprise_value, 'ebitda': fundamentals.ebitda, 'net_debt': fundamentals.net_debt, } # Run analysis analysis = analyzer.analyze(stock_data) # Store result results.append({ 'Market': market, 'Ticker': ticker, 'Name': name, 'Sector': sector, 'Score': analysis['score'], 'Rating': analysis['valutazione'], 'Fair Value': analysis['fair_value'], 'Price': analysis['current_price'], 'MoS (%)': analysis['margin_of_safety'], 'Reasoning': analysis['ragionamento'][:60] + "..." if len(analysis['ragionamento']) > 60 else analysis['ragionamento'] }) # Log result score_emoji = "🟢" if analysis['score'] >= 70 else "🟔" if analysis['score'] >= 50 else "šŸ”“" logger.info(f" {score_emoji} {ticker}: Score {analysis['score']}/100 - {analysis['valutazione']}") df = pd.DataFrame(results) return df def print_results(df: pd.DataFrame): """ Print formatted results with filtering. Args: df: Analysis results DataFrame """ logger.info("\n" + "=" * 70) logger.info("PHASE 3: Results Summary") logger.info("=" * 70) if df.empty: logger.warning("āš ļø No results to display") return # Filter: Score > 50 (BUY/STRONG BUY candidates) buy_candidates = df[df['Score'] > 50].copy() if buy_candidates.empty: logger.warning("\nāš ļø No BUY candidates found (Score > 50)") else: # Sort by Score DESC, then MoS DESC buy_candidates = buy_candidates.sort_values( ['Score', 'MoS (%)'], ascending=[False, False] ) logger.info(f"\nšŸ“Š BUY Candidates (Score > 50): {len(buy_candidates)}\n") # Format for console pd.set_option('display.max_columns', None) pd.set_option('display.width', 200) pd.set_option('display.max_colwidth', 30) print(buy_candidates[['Market', 'Ticker', 'Name', 'Sector', 'Score', 'Rating', 'Fair Value', 'Price', 'MoS (%)']].to_string(index=False)) # Critical Danger Count critical = df[df['Score'] == 5] if not critical.empty: logger.warning(f"\nā›” CRITICAL DANGER stocks: {len(critical)}") for _, row in critical.iterrows(): logger.warning(f" {row['Ticker']} - {row['Reasoning']}") # Overall stats logger.info(f"\n{'=' * 70}") logger.info("Overall Statistics:") logger.info(f" Total Analyzed: {len(df)}/10") logger.info(f" BUY/STRONG BUY (Score > 50): {len(buy_candidates)}") logger.info(f" AVOID (Score 25-50): {len(df[(df['Score'] >= 25) & (df['Score'] <= 50)])}") logger.info(f" CRITICAL DANGER (Score 5): {len(critical)}") logger.info(f" Average Score: {df['Score'].mean():.1f}/100") logger.info(f"{'=' * 70}\n") def main(): """Main execution flow""" logger.info("\n" + "=" * 70) logger.info("šŸ‡ŖšŸ‡ŗ EUROPEAN MARKET EXPANSION TEST") logger.info("CAC 40 (France) + DAX 40 (Germany)") logger.info("=" * 70) # Phase 1: Data Collection success, fail, currency_fails = collect_test_data() if success == 0: logger.error("āŒ No data collected successfully. Aborting analysis.") return 1 # Phase 2: Analysis results_df = analyze_stocks() # Phase 3: Results print_results(results_df) # Final verdict logger.info("\n" + "=" * 70) if success >= 8 and len(currency_fails) <= 2: logger.success("āœ… EUROPEAN EXPANSION TEST PASSED") logger.info(" System ready for CAC 40 and DAX 40 integration") else: logger.warning("āš ļø EUROPEAN EXPANSION TEST PARTIAL") logger.info(f" {success}/10 tickers successful") logger.info(f" Review currency failures: {', '.join(currency_fails) if currency_fails else 'None'}") logger.info("=" * 70 + "\n") return 0 if __name__ == "__main__": sys.exit(main())