"""
Yahoo Finance Data Collector

Sprint 1: Foundation - Market data collection from Yahoo Finance
"""

import time
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Tuple

import pandas as pd
import yfinance as yf
from loguru import logger

from src.database.db_manager import DatabaseManager
from src.database.models import FundamentalData, PriceData, Stock


class YahooFinanceCollector:
    """Collects market data from Yahoo Finance for Italian stocks.

    Company info, daily OHLCV prices and fundamentals are downloaded with
    ``yfinance`` and persisted through sessions obtained from the injected
    ``DatabaseManager``.
    """

    def __init__(self, db_manager: DatabaseManager, max_retries: int = 3, retry_delay: int = 5):
        """
        Initialize Yahoo Finance collector

        Args:
            db_manager: Database manager instance
            max_retries: Maximum number of attempts for each network call
            retry_delay: Base delay between retries in seconds; doubled after
                every failed attempt (exponential backoff)
        """
        self.db_manager = db_manager
        self.max_retries = max_retries
        self.retry_delay = retry_delay

    # ------------------------------------------------------------------
    # Internal helpers
    # ------------------------------------------------------------------

    def _download_with_retry(self, ticker: str, start: datetime, end: datetime,
                             progress: bool = False, auto_adjust: bool = False):
        """
        Download price data with retry logic for network failures.

        Args:
            ticker: Stock ticker symbol
            start: Start date
            end: End date
            progress: Show progress bar
            auto_adjust: Auto adjust prices

        Returns:
            DataFrame returned by ``yf.download``, or an empty DataFrame when
            every attempt raised (or ``max_retries`` < 1).
        """
        for attempt in range(self.max_retries):
            try:
                return yf.download(ticker, start=start, end=end, progress=progress, auto_adjust=auto_adjust)
            except Exception as e:
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)  # Exponential backoff
                    logger.warning(
                        f"Download failed for {ticker} (attempt {attempt + 1}/{self.max_retries}): {str(e)}. "
                        f"Retrying in {wait_time}s..."
                    )
                    time.sleep(wait_time)
                else:
                    logger.error(f"Download failed for {ticker} after {self.max_retries} attempts: {str(e)}")
                    return pd.DataFrame()  # Return empty DataFrame on final failure
        return pd.DataFrame()

    @staticmethod
    def _flatten_columns(df: pd.DataFrame) -> pd.DataFrame:
        """Collapse MultiIndex columns (as ``yf.download`` may return) to their first level."""
        if isinstance(df.columns, pd.MultiIndex):
            df.columns = df.columns.get_level_values(0)
        return df

    @staticmethod
    def _row_value(row: pd.Series, col: str):
        """
        Return ``row[col]`` as a scalar, or None when the column is absent or NaN.

        After flattening MultiIndex columns a label can appear more than once,
        in which case ``row[col]`` is itself a Series; the first element is used.
        """
        try:
            val = row[col]
            if isinstance(val, pd.Series):
                val = val.iloc[0] if len(val) > 0 else None
            return val if pd.notna(val) else None
        except (KeyError, IndexError):
            return None

    @classmethod
    def _price_record_from_row(cls, stock_id, date, row: pd.Series) -> Optional[PriceData]:
        """
        Build an unsaved PriceData object from one downloaded OHLCV row.

        Returns:
            The PriceData object, or None when the row has no Close price.
            Close is treated as mandatory; rows without it (e.g. NaN-filled
            days) are skipped by callers instead of aborting the whole batch.
        """
        close = cls._row_value(row, "Close")
        if close is None:
            return None

        def as_float(col: str) -> Optional[float]:
            val = cls._row_value(row, col)
            return float(val) if val is not None else None

        volume = cls._row_value(row, "Volume")
        return PriceData(
            stock_id=stock_id,
            date=date,
            open=as_float("Open"),
            high=as_float("High"),
            low=as_float("Low"),
            close=float(close),
            volume=int(volume) if volume is not None else None,
            adj_close=as_float("Adj Close"),
            data_source="yfinance",
        )

    # ------------------------------------------------------------------
    # Stocks
    # ------------------------------------------------------------------

    def add_stock(self, ticker: str, priority: str = "primary", market: str = "Borsa Italiana") -> Optional[Stock]:
        """
        Add a new stock to the database

        Args:
            ticker: Stock ticker symbol (e.g., 'ENEL.MI', 'MC.PA', 'SAP.DE')
            priority: Stock priority level ('primary' or 'secondary')
            market: Stock market name (default: 'Borsa Italiana')

        Returns:
            Stock object (detached from its session) if the stock exists or was
            added, None otherwise
        """
        try:
            # Check the database before touching the network: a stock that is
            # already tracked needs no Yahoo lookup, and a Yahoo outage must not
            # make it look missing.
            with self.db_manager.get_session() as session:
                existing_stock = session.query(Stock).filter_by(ticker=ticker).first()
                if existing_stock:
                    logger.info(f"Stock {ticker} already exists in database")
                    # Expunge to avoid detached instance error
                    session.expunge(existing_stock)
                    return existing_stock

            # Fetch stock info outside any session, since retries may sleep.
            stock_info = self._get_stock_info(ticker)
            if not stock_info:
                logger.warning(f"Could not fetch info for ticker {ticker}")
                return None

            with self.db_manager.get_session() as session:
                stock = Stock(
                    ticker=ticker,
                    name=stock_info.get("longName", ticker),
                    sector=stock_info.get("sector"),
                    industry=stock_info.get("industry"),
                    market=market,
                    priority=priority,
                    is_active=True,
                )
                session.add(stock)
                session.commit()
                session.refresh(stock)
                logger.info(f"Added stock: {ticker} - {stock.name}")
                # Expunge to avoid detached instance error when session closes
                session.expunge(stock)
                return stock

        except Exception as e:
            logger.error(f"Error adding stock {ticker}: {e}")
            return None

    # ------------------------------------------------------------------
    # Prices
    # ------------------------------------------------------------------

    def collect_historical_prices(
        self, ticker: str, days: int = 365, force_update: bool = False
    ) -> bool:
        """
        Collect historical price data for a stock

        Without ``force_update`` the call is a no-op (returning True) when any
        price rows already exist for the stock. With ``force_update`` the
        existing history is replaced, but only after a non-empty download; the
        delete and the inserts are committed together.

        Args:
            ticker: Stock ticker symbol
            days: Number of days of historical data to collect
            force_update: Replace existing price history with a fresh download

        Returns:
            True if successful, False otherwise
        """
        try:
            with self.db_manager.get_session() as session:
                stock = session.query(Stock).filter_by(ticker=ticker).first()
                if not stock:
                    logger.error(f"Stock {ticker} not found in database")
                    return False

                if not force_update:
                    existing_count = (
                        session.query(PriceData)
                        .filter_by(stock_id=stock.id)
                        .count()
                    )
                    if existing_count > 0:
                        logger.info(f"Price data for {ticker} already exists ({existing_count} records)")
                        return True

                # Download BEFORE deleting anything, so a failed download in
                # force_update mode leaves the existing history intact.
                end_date = datetime.now()
                start_date = end_date - timedelta(days=days)
                logger.info(f"Downloading price data for {ticker} from {start_date.date()} to {end_date.date()}")
                df = self._download_with_retry(ticker, start=start_date, end=end_date, progress=False, auto_adjust=False)

                if df.empty:
                    logger.warning(f"No price data available for {ticker}")
                    return False

                logger.info(f"Downloaded {len(df)} rows from Yahoo Finance for {ticker}")

                # Validation: ~252 trading days per year
                expected_rows = days / 365.0 * 252
                if len(df) < expected_rows * 0.5:  # Less than 50% of expected
                    logger.warning(
                        f"⚠️ WARNING: Downloaded only {len(df)} rows for {ticker}, "
                        f"expected ~{int(expected_rows)} rows for {days} days. "
                        f"Yahoo Finance may have limited data for this ticker."
                    )
                else:
                    logger.info(f"✓ Data validation passed: {len(df)} rows is adequate for {days} days period")

                df = self._flatten_columns(df)
                # At this point the stock has no stored prices (either none existed
                # or they are deleted below), so the only possible clash is a date
                # repeated inside the download itself; keep its last occurrence.
                df = df[~df.index.duplicated(keep="last")]

                if force_update:
                    # Not committed separately: it is committed together with the
                    # inserts below. NOTE(review): relies on get_session() not
                    # committing on error — confirm in DatabaseManager.
                    deleted_count = (
                        session.query(PriceData)
                        .filter_by(stock_id=stock.id)
                        .delete()
                    )
                    if deleted_count > 0:
                        logger.info(f"🗑️ Deleted {deleted_count} existing price records for {ticker} (force_update mode)")
                    else:
                        logger.info(f"🆕 No existing data for {ticker}, storing fresh download")

                records_added = 0
                skipped = 0
                for date, row in df.iterrows():
                    price_data = self._price_record_from_row(stock.id, date, row)
                    if price_data is None:
                        skipped += 1
                        continue
                    session.add(price_data)
                    records_added += 1

                session.commit()

                if skipped:
                    logger.warning(f"Skipped {skipped} rows without a Close price for {ticker}")
                logger.info(f"Added {records_added} price records for {ticker}")
                return True

        except Exception as e:
            logger.error(f"Error collecting historical prices for {ticker}: {e}")
            return False

    def update_daily_prices(self, tickers: List[str]) -> Dict[str, bool]:
        """
        Update daily prices for multiple stocks

        For each ticker, downloads from the day after the latest stored price
        (or the last 7 days when none is stored) and inserts only dates not
        already present.

        Args:
            tickers: List of stock ticker symbols

        Returns:
            Dictionary with ticker as key and success status as value
        """
        results = {}

        for ticker in tickers:
            try:
                logger.info(f"Updating daily prices for {ticker}")

                with self.db_manager.get_session() as session:
                    stock = session.query(Stock).filter_by(ticker=ticker).first()
                    if not stock:
                        logger.warning(f"Stock {ticker} not found, skipping")
                        results[ticker] = False
                        continue

                    latest_data = (
                        session.query(PriceData)
                        .filter_by(stock_id=stock.id)
                        .order_by(PriceData.date.desc())
                        .first()
                    )

                    # Determine start date for update
                    if latest_data:
                        start_date = latest_data.date + timedelta(days=1)
                    else:
                        start_date = datetime.now() - timedelta(days=7)

                    end_date = datetime.now()

                    # Guard: if start_date is in the future (e.g., last record is today), clamp to today
                    if start_date.date() > end_date.date():
                        start_date = end_date - timedelta(days=1)

                    df = self._download_with_retry(ticker, start=start_date, end=end_date, progress=False, auto_adjust=False)

                    if df.empty:
                        # If the latest stored data is recent, the market was most
                        # likely just closed (weekend/holiday) — not a failure.
                        if latest_data:
                            days_since_last = (datetime.now().date() - latest_data.date.date()).days
                            if days_since_last <= 3:  # Weekend = 2-3 days, accept up to 3
                                logger.info(f"No new data for {ticker} (likely market closed, last data: {latest_data.date.date()})")
                                results[ticker] = True
                                continue

                        logger.warning(f"No new data for {ticker} (possible data issue or delisting)")
                        results[ticker] = False
                        continue

                    df = self._flatten_columns(df)

                    records_added = 0
                    skipped = 0
                    for date, row in df.iterrows():
                        existing = (
                            session.query(PriceData)
                            .filter_by(stock_id=stock.id, date=date)
                            .first()
                        )
                        if existing:
                            continue

                        price_data = self._price_record_from_row(stock.id, date, row)
                        if price_data is None:
                            skipped += 1
                            continue
                        session.add(price_data)
                        records_added += 1

                    session.commit()
                    if skipped:
                        logger.warning(f"Skipped {skipped} rows without a Close price for {ticker}")
                    logger.info(f"Added {records_added} new records for {ticker}")
                    results[ticker] = True

            except Exception as e:
                logger.error(f"Error updating {ticker}: {e}")
                results[ticker] = False

        return results

    # ------------------------------------------------------------------
    # Fundamentals
    # ------------------------------------------------------------------

    @staticmethod
    def _fetch_shareholders_equity(ticker: str) -> Optional[float]:
        """
        Return shareholders' equity from the most recent yfinance balance-sheet column.

        Tries, in order, 'Stockholders Equity', 'Total Equity Gross Minority
        Interest' and 'Stockholder Equity'; the first label present is used.
        NaN is returned as None. Network/parsing errors propagate to the caller.
        """
        balance_sheet = yf.Ticker(ticker).balance_sheet
        if balance_sheet is None or balance_sheet.empty or len(balance_sheet.columns) == 0:
            return None

        latest = balance_sheet.columns[0]  # Most recent period
        for label in ("Stockholders Equity", "Total Equity Gross Minority Interest", "Stockholder Equity"):
            if label in balance_sheet.index:
                value = balance_sheet.loc[label, latest]
                return None if pd.isna(value) else float(value)
        return None

    @staticmethod
    def _resolve_debt_to_equity(
        ticker: str,
        debt_to_equity_calculated: Optional[float],
        debt_to_equity_raw: Optional[float],
    ) -> Tuple[Optional[float], str]:
        """
        Pick the D/E value to store and report where it came from.

        Priority: the balance-sheet calculation, then Yahoo's ``debtToEquity``.
        Yahoo values above 10 are assumed to be percentages and divided by 100.

        Returns:
            (final value or None, source) where source is one of
            'balance_sheet', 'yahoo_normalized', 'yahoo_raw', 'missing'.
        """
        if debt_to_equity_calculated is not None:
            # Cross-validate with Yahoo's reported value (logging only)
            if debt_to_equity_raw is not None:
                yahoo_normalized = debt_to_equity_raw / 100.0 if debt_to_equity_raw > 10 else debt_to_equity_raw
                discrepancy = abs(debt_to_equity_calculated - yahoo_normalized)
                if discrepancy > 0.5:  # absolute gap of more than 0.5 in the ratio
                    logger.warning(
                        f"{ticker}: D/E discrepancy detected - "
                        f"Balance Sheet: {debt_to_equity_calculated:.4f}, "
                        f"Yahoo: {yahoo_normalized:.4f} (raw: {debt_to_equity_raw:.2f}), "
                        f"Diff: {discrepancy:.4f}"
                    )
            return debt_to_equity_calculated, "balance_sheet"

        if debt_to_equity_raw is not None:
            if debt_to_equity_raw > 10:
                # Likely percentage format (e.g., 143.2 = 1.432)
                debt_to_equity_final = debt_to_equity_raw / 100.0
                logger.warning(
                    f"{ticker}: D/E from Yahoo appears to be percentage "
                    f"({debt_to_equity_raw:.2f}), normalizing to {debt_to_equity_final:.4f}"
                )
                return debt_to_equity_final, "yahoo_normalized"
            # Use as-is (likely ratio format)
            return debt_to_equity_raw, "yahoo_raw"

        logger.debug(f"{ticker}: No D/E data available from any source")
        return None, "missing"

    def collect_fundamental_data(self, ticker: str) -> bool:
        """
        Collect fundamental data for a stock

        Stores one FundamentalData row (schema_version=4) dated now, then
        backfills up to 3 historical free-cash-flow periods. Stocks whose
        Yahoo currency is not EUR or USD are skipped.

        Args:
            ticker: Stock ticker symbol

        Returns:
            True if successful, False otherwise
        """
        try:
            with self.db_manager.get_session() as session:
                stock = session.query(Stock).filter_by(ticker=ticker).first()
                if not stock:
                    logger.error(f"Stock {ticker} not found in database")
                    return False

                stock_info = self._get_stock_info(ticker)
                if not stock_info:
                    logger.warning(f"No fundamental data available for {ticker}")
                    return False

                # Currency Normalization Protocol
                ACCEPTED_CURRENCIES = ["EUR", "USD"]
                currency = stock_info.get("currency")
                if currency not in ACCEPTED_CURRENCIES:
                    logger.warning(f"⛔ Skipping {ticker}: Currency is '{currency}', expected {ACCEPTED_CURRENCIES}.")
                    return False

                # Extract base fields
                total_debt = stock_info.get("totalDebt")
                total_cash = stock_info.get("totalCash")
                operating_cf = stock_info.get("operatingCashflow")
                capex = stock_info.get("capitalExpenditures")

                # Calculate derived fields
                net_debt = None
                if total_debt is not None and total_cash is not None:
                    net_debt = total_debt - total_cash

                # Free cashflow: prefer direct value, fallback to calculation
                free_cashflow = stock_info.get("freeCashflow")
                if free_cashflow is None and operating_cf is not None and capex is not None:
                    # CapEx is usually negative, so we add it (which subtracts)
                    free_cashflow = operating_cf + capex

                # DEBT/EQUITY: Yahoo's reported value (may be in percent) plus an
                # independent calculation from the balance sheet (primary source).
                debt_to_equity_raw = stock_info.get("debtToEquity")
                shareholders_equity = None
                debt_to_equity_calculated = None
                try:
                    shareholders_equity = self._fetch_shareholders_equity(ticker)
                    if total_debt is not None and shareholders_equity is not None and shareholders_equity > 0:
                        debt_to_equity_calculated = total_debt / shareholders_equity
                        logger.debug(
                            f"{ticker}: D/E calculated from balance sheet: {debt_to_equity_calculated:.4f} "
                            f"(debt={total_debt:,.0f}, equity={shareholders_equity:,.0f})"
                        )
                except Exception as e:
                    logger.warning(f"{ticker}: Could not fetch balance sheet for D/E calculation: {e}")

                debt_to_equity_final, debt_to_equity_source = self._resolve_debt_to_equity(
                    ticker, debt_to_equity_calculated, debt_to_equity_raw
                )

                dividend_yield_raw = stock_info.get("dividendYield")

                fundamental = FundamentalData(
                    stock_id=stock.id,
                    date=datetime.now(),
                    # Basic valuation metrics
                    pe_ratio=stock_info.get("trailingPE"),
                    pb_ratio=stock_info.get("priceToBook"),
                    ps_ratio=stock_info.get("priceToSalesTrailing12Months"),
                    peg_ratio=stock_info.get("pegRatio"),
                    ev_to_ebitda=self._safe_extract_ratio(stock_info.get("enterpriseToEbitda")),
                    # Profitability metrics
                    profit_margin=stock_info.get("profitMargins"),
                    operating_margin=stock_info.get("operatingMargins"),
                    roe=stock_info.get("returnOnEquity"),
                    roa=stock_info.get("returnOnAssets"),
                    # Dividend metrics. NOTE(review): dividing by 100 assumes Yahoo
                    # reports dividendYield in percent — verify for the pinned yfinance.
                    dividend_yield=dividend_yield_raw / 100 if dividend_yield_raw else None,
                    payout_ratio=stock_info.get("payoutRatio"),
                    # Financial health
                    debt_to_equity=debt_to_equity_final,  # Final processed value
                    debt_to_equity_raw=debt_to_equity_raw,  # Original Yahoo value
                    debt_to_equity_calculated=debt_to_equity_calculated,  # Calculated from balance sheet
                    debt_to_equity_source=debt_to_equity_source,  # Source tracking
                    shareholders_equity=shareholders_equity,  # Equity from balance sheet
                    current_ratio=stock_info.get("currentRatio"),
                    quick_ratio=stock_info.get("quickRatio"),
                    # Growth metrics
                    revenue_growth=stock_info.get("revenueGrowth"),
                    earnings_growth=stock_info.get("earningsGrowth"),
                    # Market metrics
                    market_cap=stock_info.get("marketCap"),
                    beta=stock_info.get("beta"),
                    # === SCHEMA V2: ADVANCED FUNDAMENTAL METRICS ===
                    # Capital structure
                    total_debt=total_debt,
                    total_cash=total_cash,
                    net_debt=net_debt,
                    enterprise_value=stock_info.get("enterpriseValue"),
                    # Cash flow metrics
                    operating_cashflow=operating_cf,
                    capital_expenditures=capex,
                    free_cashflow=free_cashflow,
                    # Margin metrics
                    gross_margin=stock_info.get("grossMargins"),
                    ebitda_margin=stock_info.get("ebitdaMargins"),
                    ebitda=stock_info.get("ebitda"),
                    # Revenue/earnings
                    total_revenue=stock_info.get("totalRevenue"),
                    ebit=stock_info.get("ebit"),
                    net_income=stock_info.get("netIncomeToCommon"),
                    trailing_eps=stock_info.get("trailingEps"),
                    # Shares/dividend details
                    shares_outstanding=stock_info.get("sharesOutstanding"),
                    dividend_rate=stock_info.get("dividendRate"),
                    ex_dividend_date=self._parse_date(stock_info.get("exDividendDate")),
                    dividend_payment_date=self._parse_date(stock_info.get("lastDividendDate")),
                    # Schema version
                    schema_version=4,  # Updated to v4 for advanced quality metrics
                    data_source="yfinance",
                )

                # === SCHEMA V4: Calculate and save advanced quality metrics ===
                try:
                    from src.analysis.warren_analyzer import WarrenAnalyzer
                    analyzer = WarrenAnalyzer()

                    fundamental.roic = analyzer.calculate_roic(ticker)
                    fundamental.interest_coverage = analyzer.calculate_interest_coverage(ticker, stock.sector or '')
                    fundamental.piotroski_fscore = analyzer.calculate_piotroski_fscore(ticker)

                    logger.debug(f"{ticker}: Schema v4 metrics - ROIC: {fundamental.roic}, "
                                 f"Interest Coverage: {fundamental.interest_coverage}, "
                                 f"F-Score: {fundamental.piotroski_fscore}")
                except Exception as e:
                    logger.warning(f"{ticker}: Could not calculate Schema v4 metrics: {e}")
                    # Continue anyway - metrics will be None

                session.add(fundamental)
                session.commit()

                # Backfill historical FCF (last 3 available periods) to support 3y averages
                self._backfill_historical_fcf(session, stock, stock_info)

                logger.info(f"Added fundamental data for {ticker}")
                return True

        except Exception as e:
            logger.error(f"Error collecting fundamental data for {ticker}: {e}")
            return False

    def _backfill_historical_fcf(self, session, stock: Stock, stock_info: Dict):
        """
        Store historical free cash flow entries (up to 3 periods) so that the
        analyzer can compute a 3y average FCF yield for cyclical sectors.

        Uses the yfinance cashflow statement. Periods already stored for the
        stock are skipped. Any error is logged as a warning and swallowed
        (best effort).
        """
        try:
            cashflow_df = yf.Ticker(stock.ticker).cashflow
            if cashflow_df is None or cashflow_df.empty:
                return

            # Locate the free cash flow row (case-insensitive)
            row_keys = [idx for idx in cashflow_df.index if str(idx).lower() in ("freecashflow", "free cash flow")]
            if not row_keys:
                return

            fcf_row = cashflow_df.loc[row_keys[0]]

            # Columns are periods; take the first 3 as returned by yfinance
            for col in list(cashflow_df.columns)[:3]:
                value = fcf_row.get(col)
                if value is None or pd.isna(value):
                    continue

                try:
                    period_date = pd.to_datetime(col).to_pydatetime()
                except Exception:
                    continue

                existing = (
                    session.query(FundamentalData)
                    .filter_by(stock_id=stock.id, date=period_date)
                    .first()
                )
                if existing:
                    continue

                session.add(FundamentalData(
                    stock_id=stock.id,
                    date=period_date,
                    free_cashflow=float(value),
                    market_cap=stock_info.get("marketCap"),
                    enterprise_value=stock_info.get("enterpriseValue"),
                    schema_version=2,
                    data_source="yfinance_cashflow",
                ))

            session.commit()
        except Exception as e:
            logger.warning(f"Could not backfill historical FCF for {stock.ticker}: {e}")

    # ------------------------------------------------------------------
    # Yahoo info / parsing helpers
    # ------------------------------------------------------------------

    def _get_stock_info(self, ticker: str) -> Optional[Dict]:
        """
        Get stock information from Yahoo Finance with retry logic.

        Uses ``yf.Ticker(ticker).info``. A response that is empty or lacks a
        "symbol" key is treated as invalid and retried.

        Args:
            ticker: Stock ticker symbol

        Returns:
            Dictionary with stock information or None
        """
        for attempt in range(self.max_retries):
            try:
                info = yf.Ticker(ticker).info

                if not info or "symbol" not in info:
                    if attempt < self.max_retries - 1:
                        wait_time = self.retry_delay * (2 ** attempt)
                        logger.warning(
                            f"Invalid data for {ticker} (attempt {attempt + 1}/{self.max_retries}). "
                            f"Retrying in {wait_time}s..."
                        )
                        time.sleep(wait_time)
                        continue
                    logger.warning(f"Invalid data received for {ticker} after {self.max_retries} attempts")
                    return None

                return info

            except Exception as e:
                if attempt < self.max_retries - 1:
                    wait_time = self.retry_delay * (2 ** attempt)
                    logger.warning(f"Error fetching info for {ticker} (attempt {attempt + 1}/{self.max_retries}): {e}. Retrying in {wait_time}s...")
                    time.sleep(wait_time)
                else:
                    logger.error(f"Error fetching stock info for {ticker} after {self.max_retries} attempts: {e}")
                    return None

        return None

    def _parse_date(self, timestamp: Optional[int]) -> Optional[datetime]:
        """
        Parse a Unix timestamp (seconds since epoch) from Yahoo Finance.

        Args:
            timestamp: Unix timestamp (int) or None

        Returns:
            Naive local-time datetime (via ``datetime.fromtimestamp``) or None
        """
        if timestamp is None:
            return None
        try:
            return datetime.fromtimestamp(timestamp)
        except (ValueError, TypeError, OSError) as e:
            logger.warning(f"Failed to parse timestamp {timestamp}: {e}")
            return None

    def _safe_extract_ratio(self, value) -> Optional[float]:
        """
        Safely extract a numeric ratio from Yahoo Finance.

        Handles None, Infinity, NaN, and non-numeric strings.

        Args:
            value: Raw value from Yahoo Finance

        Returns:
            Finite float value or None
        """
        if value is None:
            return None
        try:
            float_val = float(value)
        except (ValueError, TypeError):
            return None
        # Reject infinity and NaN (NaN is the only value not equal to itself)
        if float_val in (float('inf'), float('-inf')) or float_val != float_val:
            return None
        return float_val

    def get_latest_price(self, ticker: str) -> Optional[Tuple[datetime, float]]:
        """
        Get the latest stored price for a stock from the database

        Args:
            ticker: Stock ticker symbol

        Returns:
            Tuple of (date, close) for the most recent PriceData row, or None
        """
        try:
            with self.db_manager.get_session() as session:
                stock = session.query(Stock).filter_by(ticker=ticker).first()
                if not stock:
                    return None

                latest = (
                    session.query(PriceData)
                    .filter_by(stock_id=stock.id)
                    .order_by(PriceData.date.desc())
                    .first()
                )

                if latest:
                    return (latest.date, latest.close)
                return None

        except Exception as e:
            logger.error(f"Error getting latest price for {ticker}: {e}")
            return None