""" Warren Buffett Style Stock Analyzer Deterministic fundamental analysis without LLM dependency """ from typing import Dict, Optional, Any from datetime import datetime from loguru import logger class WarrenAnalyzer: """ Analyzes stocks using Warren Buffett-style value investing principles. Uses deterministic formulas instead of LLM for fast, reliable, and cost-free analysis. """ # Financial tickers (special handling: no D/E penalties, P/B focus) FINANCIALS_TICKERS = [ 'ISP.MI', 'UCG.MI', 'BAMI.MI', 'BPE.MI', 'BMPS.MI', 'FBK.MI', 'MB.MI', 'PST.MI', 'UNI.MI', 'G.MI', 'AZM.MI', # Azimut (Asset Management - valutato con P/B come le banche) ] # Utilities/Energy tickers (special handling: cashflow/debt dynamics) UTILITY_TICKERS = ['A2A.MI', 'ENEL.MI', 'IG.MI', 'SRG.MI', 'TRN.MI', 'ERG.MI'] # Automotive / Industrial tickers (cashflow can be lumpy year to year) AUTOMOTIVE_TICKERS = ['RACE.MI', 'STLAM.MI', 'IVG.MI', 'PIRC.MI'] INDUSTRIAL_TICKERS = ['PRY.MI', 'TEN.MI', 'IP.MI', 'BZU.MI', 'LDO.MI'] # Combined list for special treatment (D/E and advanced metrics tolerance) SPECIAL_TREATMENT_TICKERS = FINANCIALS_TICKERS + UTILITY_TICKERS # Hard fail thresholds HARD_FAIL_FCF_YIELD_THRESHOLD = -0.20 # -20% HARD_FAIL_NET_DEBT_EBITDA_THRESHOLD = 10.0 # Valuation targets (parametrized for easier tuning) TARGET_FCF_YIELD = 0.08 # 8% FCF yield (più conservativo) TARGET_EV_EBITDA = 10.0 # Standard EV/EBITDA TARGET_EV_EBITDA_LUXURY = 20.0 # Premium EV/EBITDA for luxury/brand moat # Special ratings/scores SCORE_CRITICAL_DANGER = 5 SCORE_DATA_INSUFFICIENT = 10 RATING_CRITICAL_DANGER = "CRITICAL DANGER" RATING_DATA_INSUFFICIENT = "DATA INSUFFICIENT" # Scoring weights (total 100 points) VALUATION_WEIGHT = 30 # Price metrics QUALITY_WEIGHT = 40 # Business quality GROWTH_WEIGHT = 30 # Growth metrics # Recommendation thresholds BUY_SCORE = 80 STRONG_BUY_MARGIN = 20.0 # Minimum 20% margin of safety for STRONG BUY BUY_MARGIN = 15.0 # Minimum 15% margin of safety for BUY HOLD_SCORE = 60 WATCH_SCORE = 70 # Nuova soglia per WATCH (high quality but overvalued) RATING_WATCH = "WATCH" # Nuova categoria raccomandazione def __init__(self): """Initialize the Warren Analyzer""" pass def analyze(self, stock_data: Dict) -> Dict: """ Analyze a stock and return Warren Buffett-style recommendation Args: stock_data: Dictionary containing: - ticker: Stock symbol - name: Company name - sector: Industry sector - price: Current price - pe_ratio: P/E ratio - pb_ratio: P/B ratio - roe: Return on Equity (as decimal, e.g., 0.15 = 15%) - debt_to_equity: Debt to Equity ratio - dividend_yield: Dividend yield (as decimal) - revenue_growth: Revenue growth rate (as decimal) - earnings_growth: Earnings growth rate (as decimal) - market_cap: Market capitalization Returns: Dictionary with: - ticker: Stock symbol - name: Company name - score: Quality score (0-100) - valutazione: BUY/HOLD/AVOID/CRITICAL DANGER/DATA INSUFFICIENT - fair_value: Estimated fair value per share - current_price: Current market price - margin_of_safety: % discount/premium - ragionamento: Explanation text """ # === STEP 1: EXTRACT BASE DATA === ticker = stock_data.get('ticker', '') sector = stock_data.get('sector', '') fcf = stock_data.get('free_cashflow') market_cap = stock_data.get('market_cap') ev = stock_data.get('enterprise_value') ebitda = stock_data.get('ebitda') net_debt = stock_data.get('net_debt') # === STEP 2: CALCULATE ADVANCED QUALITY METRICS (ALWAYS) === # These metrics are calculated FIRST, before any rating classification # This ensures we have data even for CRITICAL DANGER or DATA INSUFFICIENT stocks logger.debug(f"{ticker}: Calculating Schema v4 advanced quality metrics...") roic = self.calculate_roic(ticker) interest_coverage = self.calculate_interest_coverage(ticker, sector) piotroski_fscore = self.calculate_piotroski_fscore(ticker) logger.debug(f"{ticker}: Advanced metrics - ROIC: {roic}, Interest Coverage: {interest_coverage}, F-Score: {piotroski_fscore}") # === STEP 3: EARLY CLASSIFICATION CHECKS (v2 metrics) === # Compute FCF yield if possible fcf_yield = None if fcf is not None and market_cap: fcf_yield = fcf / market_cap # Compute net debt / EBITDA net_debt_ebitda = None if net_debt is not None and ebitda: if ebitda != 0: net_debt_ebitda = net_debt / ebitda # Sector-specific FCF yield handling for automotive/industrials (use 3y avg, softer threshold) is_auto_industrial = self._is_auto_industrial(ticker, sector) fcf_yield_to_check = fcf_yield hard_fail_fcf_threshold = self.HARD_FAIL_FCF_YIELD_THRESHOLD if is_auto_industrial: avg_fcf_yield = stock_data.get('avg_fcf_yield_3y') if avg_fcf_yield is not None: fcf_yield_to_check = avg_fcf_yield hard_fail_fcf_threshold = -0.05 # -5% on average over 3y # Hard fail condition if (fcf_yield_to_check is not None and fcf_yield_to_check < hard_fail_fcf_threshold) or \ (net_debt_ebitda is not None and net_debt_ebitda > self.HARD_FAIL_NET_DEBT_EBITDA_THRESHOLD): # PST.MI ha struttura ibrida: declassa a DATA INSUFFICIENT invece di CRITICAL if stock_data.get('ticker') == 'PST.MI': return { 'ticker': stock_data.get('ticker'), 'name': stock_data.get('name'), 'score': self.SCORE_DATA_INSUFFICIENT, 'valutazione': self.RATING_DATA_INSUFFICIENT, 'fair_value': round(stock_data.get('price', 0), 2), 'current_price': round(stock_data.get('price', 0), 2), 'margin_of_safety': 0.0, 'ragionamento': "PST.MI: Criticità FCF/Debito non standard per settore finanziario/ibrido. Classificazione declassata a Dati Insufficienti.", # Schema v4 metrics (calculated before classification) 'roic': roic, 'interest_coverage': interest_coverage, 'piotroski_fscore': piotroski_fscore } return { 'ticker': stock_data.get('ticker'), 'name': stock_data.get('name'), 'score': self.SCORE_CRITICAL_DANGER, 'valutazione': self.RATING_CRITICAL_DANGER, 'fair_value': round(stock_data.get('price', 0), 2), 'current_price': round(stock_data.get('price', 0), 2), 'margin_of_safety': 0.0, 'ragionamento': "Dati critici: FCF yield o Net Debt/EBITDA fuori soglia.", # Schema v4 metrics (calculated before classification) 'roic': roic, 'interest_coverage': interest_coverage, 'piotroski_fscore': piotroski_fscore } # Data completeness check (conditional on sector) missing_advanced = 0 if stock_data.get('ticker') not in self.SPECIAL_TREATMENT_TICKERS: for v in [fcf, ev, ebitda, net_debt]: if v is None: missing_advanced += 1 else: # For special tickers (financials/utilities), ignore FCF/EBITDA gaps # but require basic valuation inputs for v in [stock_data.get('pe_ratio'), stock_data.get('pb_ratio'), stock_data.get('roe'), stock_data.get('market_cap')]: if v is None: missing_advanced += 1 if missing_advanced > 2: return { 'ticker': stock_data.get('ticker'), 'name': stock_data.get('name'), 'score': self.SCORE_DATA_INSUFFICIENT, 'valutazione': self.RATING_DATA_INSUFFICIENT, 'fair_value': round(stock_data.get('price', 0), 2), 'current_price': round(stock_data.get('price', 0), 2), 'margin_of_safety': 0.0, 'ragionamento': "Dati incompleti: per questo settore mancano metriche fondamentali necessarie (non richiesti FCF/EBITDA per financial/utility).", # Schema v4 metrics (calculated before classification) 'roic': roic, 'interest_coverage': interest_coverage, 'piotroski_fscore': piotroski_fscore } # === CIO QUALITY GATEKEEPER === cio_failures = self._check_cio_gatekeeper(stock_data, ticker, sector) if cio_failures: logger.warning(f"{ticker}: CIO Quality Fail - {', '.join(cio_failures)}") return { 'ticker': stock_data.get('ticker'), 'name': stock_data.get('name'), 'score': self.SCORE_CRITICAL_DANGER, 'valutazione': self.RATING_CRITICAL_DANGER, 'fair_value': round(stock_data.get('price', 0), 2), 'current_price': round(stock_data.get('price', 0), 2), 'margin_of_safety': 0.0, 'ragionamento': f"CIO Quality Fail: {', '.join(cio_failures)}", # Schema v4 metrics (calculated before classification) 'roic': roic, 'interest_coverage': interest_coverage, 'piotroski_fscore': piotroski_fscore } # === STEP 4: CALCULATE VALUATION === # v4.1 returns Dict with fair_value, methods, adjustments, parameters fair_value_result = self.calculate_fair_value(stock_data) fair_value = fair_value_result['fair_value'] fair_value_methods = fair_value_result['methods'] fair_value_method_weights = fair_value_result['method_weights'] fair_value_adjustments = fair_value_result['adjustments'] fair_value_parameters = fair_value_result['parameters'] current_price = stock_data.get('price', 0) margin_of_safety = self._calculate_margin_of_safety(current_price, fair_value) # === STEP 5: CALCULATE SCORES === # Calculate base quality score (0-100) - v4.1 returns Dict score_result = self.calculate_score(stock_data) base_score = score_result['score'] score_breakdown = score_result['breakdown'] score_parameters = score_result['parameters'] # Calculate advanced quality score (0-20) advanced_quality = self.calculate_advanced_quality_score(roic, interest_coverage, piotroski_fscore) # Combine scores and normalize to 100 # Max raw score: base (~110 with bonuses) + advanced (20) = ~130 raw_score = base_score + advanced_quality final_score = min(100, int((raw_score / 130) * 100)) # === STEP 6: DETERMINE RATING AND GENERATE REASONING === valutazione = self._get_recommendation(final_score, margin_of_safety) ragionamento = self._generate_reasoning(stock_data, final_score, fair_value, margin_of_safety, valutazione) return { 'ticker': stock_data.get('ticker'), 'name': stock_data.get('name'), 'score': final_score, 'valutazione': valutazione, 'fair_value': round(fair_value, 2), 'current_price': round(current_price, 2), 'margin_of_safety': round(margin_of_safety, 1), 'ragionamento': ragionamento, # Schema v4 metrics 'roic': roic, 'interest_coverage': interest_coverage, 'piotroski_fscore': piotroski_fscore, # Schema v4.1 tracking - scoring 'score_breakdown': score_breakdown, 'score_parameters': score_parameters, # Schema v4.1 tracking - fair value 'fair_value_methods': fair_value_methods, 'fair_value_method_weights': fair_value_method_weights, 'fair_value_adjustments': fair_value_adjustments, 'fair_value_parameters': fair_value_parameters } def calculate_fair_value(self, stock_data: Dict) -> Dict[str, Any]: """ Calculate fair value using multiple valuation methods (Schema v2) Uses a weighted average of: 1. P/E based valuation (35%) 2. P/B based valuation (25%) 3. FCF Yield valuation (20%) [NEW - Schema v2] 4. EV/EBITDA valuation (15%) [NEW - Schema v2] 5. Dividend discount model (5%) Weights auto-normalize when data is missing. Special handling for luxury/growth stocks (Ferrari, Moncler, etc.) Returns Dict with fair_value, methods breakdown, adjustments, and parameters (v4.1) """ price = stock_data.get('price', 0) pe = stock_data.get('pe_ratio') pb = stock_data.get('pb_ratio') roe = stock_data.get('roe') earnings_growth = stock_data.get('earnings_growth') dividend_yield = stock_data.get('dividend_yield') sector = stock_data.get('sector', '') ticker = stock_data.get('ticker', '') # Schema v2 fields market_cap = stock_data.get('market_cap') free_cashflow = stock_data.get('free_cashflow') enterprise_value = stock_data.get('enterprise_value') ebitda = stock_data.get('ebitda') # === v4.1 TRACKING INITIALIZATION === methods = { 'pe': 0, 'pb': 0, 'fcf_yield': 0, 'ev_ebitda': 0, 'dividend': 0 } method_weights = { 'pe': 0, 'pb': 0, 'fcf_yield': 0, 'ev_ebitda': 0, 'dividend': 0 } adjustments = { 'base_fair_value': 0, 'utility_bonus': 0, 'quality_premium': 0, 'country_penalty': 0 } parameters = { 'growth_rate_original': 0, 'growth_rate_used': 0, 'is_growth_capped': False, 'graham_multiplier': 0, 'is_luxury': False, 'is_utility': False, 'is_financial': False, 'is_mature_sector': False } valuations = [] weights = [] # Detect luxury/brand-premium stocks is_luxury = self._is_luxury_brand(ticker, sector) is_utility = self._is_utility_sector(ticker, sector) parameters['is_luxury'] = is_luxury parameters['is_utility'] = is_utility net_debt_val = stock_data.get('net_debt') roe_val = stock_data.get('roe') # Method 1: P/E based (Benjamin Graham formula) components = {} if pe and pe > 0 and earnings_growth is not None: # Handle negative earnings growth conservatively if earnings_growth < -0.2: logger.warning( f"{ticker}: Negative earnings growth ({earnings_growth:.1%}). " f"Using conservative P/E approach." ) growth_rate = 0 # Don't penalize too much, but don't reward parameters['growth_rate_original'] = earnings_growth * 100 parameters['growth_rate_used'] = 0 parameters['is_growth_capped'] = True else: growth_rate_original = earnings_growth * 100 # Convert to percentage growth_rate = max(0, growth_rate_original) # Floor at 0 parameters['growth_rate_original'] = growth_rate_original # === MATURE SECTOR GROWTH CAP (Schema v3) === # Cap growth at 4% (nominal GDP growth) for mature sectors mature_sectors = ['financial services', 'utilities', 'energy', 'banks'] is_mature = any(keyword in sector.lower() for keyword in mature_sectors) if sector else False parameters['is_mature_sector'] = is_mature if is_mature: growth_rate_capped = min(growth_rate, 4.0) # 4% cap for mature sectors parameters['is_growth_capped'] = (growth_rate != growth_rate_capped) growth_rate = growth_rate_capped logger.debug(f"{ticker}: Mature sector ({sector}) - growth capped at 4%") else: growth_rate_capped = min(growth_rate, 5.0) # 5% cap for other sectors parameters['is_growth_capped'] = (growth_rate != growth_rate_capped) growth_rate = growth_rate_capped parameters['growth_rate_used'] = growth_rate # === SECTOR-AWARE GRAHAM MULTIPLIER (Schema v3) === # Financial sectors: defensive 15.0x base multiplier # Other sectors: standard 22.5x multiplier (10 + 1.5 * g, capped at 22.5) sector_l = sector.lower() if sector else '' is_financial = any(k in sector_l for k in ['financial services', 'banks', 'insurance', 'asset management']) parameters['is_financial'] = is_financial if is_luxury: # Luxury brands: premium multiples fair_pe = min(30, 20 + 2.0 * growth_rate) elif is_financial: # Financial sectors: defensive 15.0x multiplier fair_pe = 15.0 # Conservative fixed multiplier, no growth premium logger.debug(f"{ticker}: Financial sector - using defensive 15.0x P/E multiplier") else: # Standard Graham formula: 10 + 1.5 * g, capped at 22.5 fair_pe = min(22.5, 10 + 1.5 * growth_rate) parameters['graham_multiplier'] = fair_pe pe_fair_value = price * (fair_pe / pe) # Sanity check: don't let fair value be > 3x current price pe_fair_value = min(pe_fair_value, price * 3.0) valuations.append(pe_fair_value) weights.append(0.35) # 35% weight (reduced from 40%) components["pe"] = pe_fair_value methods['pe'] = pe_fair_value method_weights['pe'] = 0.35 # Method 2: P/B and ROE based if pb and pb > 0 and roe and roe > 0: # Fair P/B ratio based on ROE roe_pct = roe * 100 # Luxury brands can sustain higher P/B multiples if is_luxury: fair_pb = max(2.0, min(6.0, roe_pct / 5)) else: fair_pb = max(1.0, min(2.0, roe_pct / 12)) # Cap più conservativo pb_fair_value = price * (fair_pb / pb) # Sanity check pb_fair_value = min(pb_fair_value, price * 3.0) valuations.append(pb_fair_value) weights.append(0.25) # 25% weight (reduced from 30%) components["pb"] = pb_fair_value methods['pb'] = pb_fair_value method_weights['pb'] = 0.25 # Method 3: FCF Yield valuation (Schema v2) - 20% weight if free_cashflow and market_cap and market_cap > 0: # Calculate FCF Yield fcf_yield = free_cashflow / market_cap # Target FCF yield for quality stocks: 5-8% # Lower yield = higher valuation (stock is expensive) # Higher yield = lower valuation (stock is cheap) if fcf_yield > 0: target_fcf_yield = self.TARGET_FCF_YIELD # Fair value based on FCF yield normalization (lower yield -> higher FV) fcf_fair_value = price * (fcf_yield / target_fcf_yield) # Sanity check fcf_fair_value = min(fcf_fair_value, price * 3.0) fcf_fair_value = max(fcf_fair_value, price * 0.3) # Floor at 30% valuations.append(fcf_fair_value) weights.append(0.20) components["fcf_yield"] = fcf_fair_value methods['fcf_yield'] = fcf_fair_value method_weights['fcf_yield'] = 0.20 # Method 4: EV/EBITDA valuation (Schema v2) - 15% weight if enterprise_value and ebitda and ebitda > 0 and enterprise_value > 0: # Calculate current EV/EBITDA multiple ev_ebitda_ratio = enterprise_value / ebitda # Target EV/EBITDA for quality stocks # Luxury brands can sustain higher multiples target_ev_ebitda = self.TARGET_EV_EBITDA_LUXURY if is_luxury else self.TARGET_EV_EBITDA # Fair value based on EV/EBITDA normalization # If current multiple is high, stock is expensive ev_fair_value = price * (target_ev_ebitda / ev_ebitda_ratio) # Sanity check ev_fair_value = min(ev_fair_value, price * 2.5) # Slightly tighter cap ev_fair_value = max(ev_fair_value, price * 0.4) # Slightly higher floor valuations.append(ev_fair_value) weights.append(0.15) components["ev_ebitda"] = ev_fair_value methods['ev_ebitda'] = ev_fair_value method_weights['ev_ebitda'] = 0.15 # Method 5: Dividend-based (for dividend payers) - 5% weight if dividend_yield and dividend_yield > 0: # Normalize dividend yield normalized_div_yield = dividend_yield if dividend_yield < 1 else dividend_yield / 100 # Fair dividend yield for quality stocks: 2-4% target_yield = 0.03 # 3% # Only use dividend valuation if yield is reasonable (0.5% - 15%) if 0.005 < normalized_div_yield < 0.15: div_fair_value = price * (target_yield / normalized_div_yield) # Sanity check div_fair_value = min(div_fair_value, price * 2.0) valuations.append(div_fair_value) weights.append(0.05) # 5% weight (reduced from 30%) components["dividend"] = div_fair_value methods['dividend'] = div_fair_value method_weights['dividend'] = 0.05 # Calculate weighted average if valuations: total_weight = sum(weights) fair_value = sum(v * w for v, w in zip(valuations, weights)) / total_weight else: # Fallback: use current price if no data fair_value = price # Final floor: Fair value should be at least 20% of current price # (prevents absurd undervaluations for luxury/growth stocks) min_fair_value = price * 0.2 fair_value = max(min_fair_value, fair_value) base_fair_value = fair_value adjustments['base_fair_value'] = base_fair_value # Utilities defensive bonus: premia bassa beta o alto dividend yield if is_utility: beta = stock_data.get('beta') dividend_yield_val = stock_data.get('dividend_yield') bonus = 0.0 if beta is not None and beta < 0.8: bonus += 0.05 # +5% fair value per bassa volatilità if dividend_yield_val: normalized_div_yield = dividend_yield_val if dividend_yield_val < 1 else dividend_yield_val / 100 if normalized_div_yield > 0.05: bonus += 0.05 # +5% per yield sostenuto if bonus > 0: bonus_capped = min(bonus, 0.10) # Cap bonus al 10% fair_value *= (1 + bonus_capped) adjustments['utility_bonus'] = bonus_capped # Quality Premium: premia aziende best-in-class (alto ROE + net cash) if fair_value > 0: roe_pct = roe_val * 100 if roe_val is not None else None multiplier = 1.0 # Esclusione finanziari dal premio ROE (ROE spesso alto per leva/regolazione) sector_l = sector.lower() if sector else '' is_financial = self._is_financial_sector(ticker, sector) or any( kw in sector_l for kw in ['financial', 'bank', 'insurance', 'asset'] ) # ROE premium solo per non-financial if roe_pct is not None and not is_financial: if roe_pct > 25: multiplier += 0.25 # +25% FV per ROE eccellente elif roe_pct > 15: multiplier += 0.10 # +10% FV per buon ROE # Net cash premium (valido per tutti) if net_debt_val is not None and net_debt_val < 0: multiplier += 0.10 # +10% FV per posizione net cash # Brand/lusso/consumer cyclical premium (solo non-financial) if roe_pct is not None and not is_financial: if (is_luxury or any(k in sector_l for k in ['luxury', 'leisure', 'consumer cyclical'])) and roe_pct > 20: multiplier += 0.15 multiplier = min(multiplier, 1.3) # Cap al +30% (era 1.5/+50%, più conservativo) quality_premium = multiplier - 1.0 # Store premium as percentage (0.0 to 0.3) fair_value *= multiplier adjustments['quality_premium'] = quality_premium logger.debug( f"{ticker}: Quality premium applied", extra={ "ticker": ticker, "roe_pct": roe_pct, "net_debt": net_debt_val, "is_financial": is_financial, "is_luxury": is_luxury, "multiplier": multiplier, }, ) # === COUNTRY RISK PENALTY (Schema v3) === # Penalize Italian stocks (-20%), French/German stocks (-10%) # Rationale: systematic risk, lower liquidity, political/economic volatility country_penalty = 0.0 if ticker.endswith('.MI'): # Italian stocks: -20% fair value penalty country_penalty = 0.20 logger.debug(f"{ticker}: Italian stock - applying -20% country risk penalty") elif ticker.endswith('.PA') or ticker.endswith('.DE'): # French/German stocks: -10% fair value penalty country_penalty = 0.10 logger.debug(f"{ticker}: French/German stock - applying -10% country risk penalty") fair_value *= (1 - country_penalty) adjustments['country_penalty'] = country_penalty logger.debug( f"{ticker}: fair value components", extra={ "ticker": ticker, "components": components, "weights": weights, "base_fair_value": round(base_fair_value, 4), "country_penalty": country_penalty, "final_fair_value": round(fair_value, 4), }, ) final_fair_value = max(0.01, fair_value) # Ensure positive value # === v4.1 RETURN DICT === return { 'fair_value': final_fair_value, 'methods': methods, 'method_weights': method_weights, 'adjustments': adjustments, 'parameters': parameters } def _debt_penalty_sector_aware(self, ticker: str, debt_equity: Optional[float], net_debt_ebitda: Optional[float], sector: str) -> int: """ Sector-aware debt penalty: - Automotive/Consumer Cyclical/Financial Services: ignore D/E (spesso falsato), usa Net Debt/EBITDA più tollerante. - Altri settori: soglie standard su D/E e Net Debt/EBITDA. """ penalty = 0 sector_l = sector.lower() if sector else '' is_auto_fin = ( ticker in self.AUTOMOTIVE_TICKERS or any(k in sector_l for k in ['automotive', 'auto', 'consumer cyclical', 'financial services']) ) if is_auto_fin: if net_debt_ebitda is not None: if net_debt_ebitda > 5.0: penalty += 15 elif net_debt_ebitda > 3.5: penalty += 5 else: if debt_equity is not None: # Normalize D/E before checking threshold (Yahoo inconsistency fix) de_ratio = debt_equity / 100.0 if debt_equity > 10 else debt_equity if de_ratio > 1.5: penalty += 10 if net_debt_ebitda is not None and net_debt_ebitda > 3.0: penalty += 10 return penalty def _is_luxury_brand(self, ticker: str, sector: str) -> bool: """ Detect if stock is a luxury/brand-premium company These stocks trade at premium multiples due to: - Strong brand moat - Pricing power - Scarcity/exclusivity """ # Explicit luxury brand tickers (even if sector classification is generic) luxury_tickers = ['RACE.MI', 'MONC.MI'] # Ferrari, Moncler # Sector keywords that indicate luxury positioning luxury_keywords = ['Luxury', 'Consumer Cyclical'] return ticker in luxury_tickers or any(keyword in sector for keyword in luxury_keywords) def _is_auto_industrial(self, ticker: str, sector: str) -> bool: """Detect automotive/industrial sectors where FCF can be volatile YoY.""" sector_l = sector.lower() if sector else '' keywords = ['automotive', 'industrial', 'construction', 'machinery', 'capital goods'] return ( ticker in self.AUTOMOTIVE_TICKERS or ticker in self.INDUSTRIAL_TICKERS or any(k in sector_l for k in keywords) ) def _is_financial_sector(self, ticker: str, sector: str) -> bool: """ Detect if stock is in BANKING/INSURANCE sector (NOT asset management) These stocks need special metrics: - D/E is meaningless (deposits/reserves counted as debt) - Use P/B ratio instead - Different valuation multiples NOTE: Excludes asset management firms (like Azimut) which operate differently """ # ONLY banks and insurance - NOT asset managers! financial_tickers = self.FINANCIALS_TICKERS # Keywords that identify BANKS and INSURANCE only financial_keywords = [ 'Banks', 'Insurance', 'Diversified Banks', 'Regional Banks' ] return ticker in financial_tickers or any(keyword in sector for keyword in financial_keywords) def _is_utility_sector(self, ticker: str, sector: str) -> bool: """ Detect if stock is a utility company (Schema v2) Utilities have special characteristics: - High infrastructure costs → high debt is normal - Stable cashflows → dividends should be covered by FCF - Regulated pricing → lower growth but predictable Uses hybrid detection (hardcoded list + keywords) """ # Hardcoded utility tickers (per user configuration) utility_tickers = [ 'ENEL.MI', # Enel (energy) 'TRN.MI', # Terna (grid operator) 'SRG.MI', # Snam (gas infrastructure) 'HER.MI', # Hera (multi-utility) 'A2A.MI', # A2A (multi-utility) 'IG.MI', # Italgas (gas distribution) ] # Sector keywords for utilities utility_keywords = [ 'Utilities', 'Energy', 'Oil & Gas', 'Electric', 'Gas', ] return ticker in utility_tickers or any(keyword in sector for keyword in utility_keywords) def _check_cio_gatekeeper(self, stock_data: Dict, ticker: str, sector: str) -> list: """ CIO Quality Trio: Gatekeeper to avoid value traps. Returns list of failure reasons (empty = pass all checks). Three quality checks using snapshot data (Schema v2): 1. Cash King: OCF should support earnings (OCF >= Net Income) 2. Margin Check: Operating margin must be positive 3. Solvency Check: D/E within sector-appropriate thresholds """ failures = [] # CRITERIO 1: Cash King (Quality of Earnings) net_income = stock_data.get('net_income') operating_cf = stock_data.get('operating_cashflow') if net_income is not None and net_income > 0 and operating_cf is not None: if operating_cf < net_income: failures.append("Cash Flow Quality (OCF < Net Income)") # CRITERIO 2: Margin Check (Profitability) operating_margin = stock_data.get('operating_margin') if operating_margin is not None and operating_margin < 0: failures.append("Negative Operating Margin") # CRITERIO 3: Solvency Check (Debt Sector-Aware) debt_to_equity = stock_data.get('debt_to_equity') if debt_to_equity is not None: # Normalize D/E ratio: Yahoo Finance is inconsistent # Some tickers return ratio (0.5-3.0), others percentage (50-300) # Heuristic: if value > 10, assume it's percentage → divide by 100 de_ratio = debt_to_equity / 100.0 if debt_to_equity > 10 else debt_to_equity # Skip for Financials (D/E not meaningful for banks) if ticker in self.FINANCIALS_TICKERS: pass # No D/E check for financial institutions # High tolerance for Utilities (infrastructure-heavy) elif ticker in self.UTILITY_TICKERS: if de_ratio > 2.5: failures.append(f"High Debt/Equity ({de_ratio:.2f} > 2.5)") # Standard threshold for Industrial/Services else: if de_ratio > 1.5: failures.append(f"High Debt/Equity ({de_ratio:.2f} > 1.5)") return failures def calculate_score(self, stock_data: Dict) -> Dict[str, Any]: """ Calculate Warren Buffett quality score (0-100) Breakdown: - Valuation (30 points): P/E, P/B, Dividend yield - Quality (40 points): ROE, Debt/Equity - Growth (30 points): Revenue growth, Earnings growth Returns: Dict with final_score, raw_score, breakdown, and parameters (Schema v4.1) """ # === v4.1 TRACKING INITIALIZATION === breakdown = { 'valuation': {'total': 0, 'pe': 0, 'pb': 0, 'dividend': 0}, 'quality': {'total': 0, 'roe': 0, 'debt': 0}, 'growth': {'total': 0, 'revenue': 0, 'earnings': 0}, 'bonuses': {'total': 0, 'margins': 0, 'debt_coverage': 0, 'fcf_payout': 0, 'peg': 0, 'ev_ebitda': 0}, 'penalties': {'total': 0, 'roe_negative': 0, 'debt_excess': 0}, 'advanced_quality': {'total': 0, 'roic': 0, 'interest_coverage': 0, 'piotroski': 0} } parameters = { 'growth_rate_original': 0, 'growth_rate_used': 0, 'is_growth_capped': False, 'graham_multiplier': 0, 'is_financial': False, 'is_utility': False, 'is_luxury': False, 'is_auto_industrial': False } score = 0 # === VALUATION SCORE (30 points) === pe = stock_data.get('pe_ratio') pb = stock_data.get('pb_ratio') div_yield = stock_data.get('dividend_yield', 0) # P/E scoring (15 points) if pe: if pe < 12: score += 15 breakdown['valuation']['pe'] = 15 elif pe < 18: score += 12 breakdown['valuation']['pe'] = 12 elif pe < 25: score += 8 breakdown['valuation']['pe'] = 8 elif pe < 35: score += 4 breakdown['valuation']['pe'] = 4 # P/B scoring (10 points) if pb: if pb < 1.5: score += 10 breakdown['valuation']['pb'] = 10 elif pb < 2.5: score += 7 breakdown['valuation']['pb'] = 7 elif pb < 4: score += 4 breakdown['valuation']['pb'] = 4 elif pb < 6: score += 2 breakdown['valuation']['pb'] = 2 # Dividend yield (5 points) if div_yield: # Normalize dividend yield (sometimes stored as percentage, sometimes as decimal) normalized_div = div_yield if div_yield < 1 else div_yield / 100 div_pct = normalized_div * 100 if div_pct > 4: score += 5 breakdown['valuation']['dividend'] = 5 elif div_pct > 2.5: score += 4 breakdown['valuation']['dividend'] = 4 elif div_pct > 1: score += 2 breakdown['valuation']['dividend'] = 2 # === QUALITY SCORE (40 points) === roe = stock_data.get('roe') debt_to_equity = stock_data.get('debt_to_equity') pb = stock_data.get('pb_ratio') sector = stock_data.get('sector', '') ticker = stock_data.get('ticker', '') # ROE scoring (25 points) - Most important for Warren # VALUE TRAP DETECTION: Penalize negative ROE if roe is not None: roe_pct = roe * 100 if roe_pct > 20: score += 25 breakdown['quality']['roe'] = 25 elif roe_pct > 15: score += 20 breakdown['quality']['roe'] = 20 elif roe_pct > 10: score += 15 breakdown['quality']['roe'] = 15 elif roe_pct > 5: score += 8 breakdown['quality']['roe'] = 8 elif roe_pct > 0: score += 3 breakdown['quality']['roe'] = 3 elif roe_pct > -5: score -= 5 # Small penalty for slightly negative ROE breakdown['quality']['roe'] = -5 breakdown['penalties']['roe_negative'] = 5 # Track as positive else: score -= 15 # HEAVY penalty for deeply negative ROE (value trap!) breakdown['quality']['roe'] = -15 breakdown['penalties']['roe_negative'] = 15 # Track as positive # Debt to Equity (15 points) - Warren prefers low debt # SPECIAL HANDLING: Financials sector (banks, insurance) # Treat utilities as special too is_financial = self._is_financial_sector(ticker, sector) or ticker in self.UTILITY_TICKERS parameters['is_financial'] = is_financial # Track flag if is_financial: # For financials, use P/B ratio instead of D/E # Banks/insurance have high D/E by nature (deposits/reserves) if pb is not None: if pb < 0.8: score += 15 # Trading below book value breakdown['quality']['debt'] = 15 elif pb < 1.2: score += 12 # Around book value breakdown['quality']['debt'] = 12 elif pb < 1.5: score += 8 breakdown['quality']['debt'] = 8 elif pb < 2.0: score += 4 breakdown['quality']['debt'] = 4 else: # Normal D/E scoring for non-financials if debt_to_equity is not None: # CRITICAL FIX: Normalize D/E (same heuristic as CIO gatekeeper line 598) # Yahoo Finance inconsistency: some tickers return percentage (>10), others ratio de_ratio = debt_to_equity / 100.0 if debt_to_equity > 10 else debt_to_equity if de_ratio < 0.3: score += 15 breakdown['quality']['debt'] = 15 elif de_ratio < 0.6: score += 12 breakdown['quality']['debt'] = 12 elif de_ratio < 1.0: score += 8 breakdown['quality']['debt'] = 8 elif de_ratio < 2.0: score += 4 breakdown['quality']['debt'] = 4 # Penalize very high debt (>3.0) elif de_ratio > 3.0: score -= 5 breakdown['quality']['debt'] = -5 breakdown['penalties']['debt_excess'] = 5 # Track as positive # === GROWTH SCORE (30 points) === revenue_growth = stock_data.get('revenue_growth') earnings_growth = stock_data.get('earnings_growth') # Revenue growth (15 points) if revenue_growth is not None: rev_pct = revenue_growth * 100 if rev_pct > 15: score += 15 breakdown['growth']['revenue'] = 15 elif rev_pct > 10: score += 12 breakdown['growth']['revenue'] = 12 elif rev_pct > 5: score += 9 breakdown['growth']['revenue'] = 9 elif rev_pct > 0: score += 5 breakdown['growth']['revenue'] = 5 elif rev_pct > -5: score += 2 breakdown['growth']['revenue'] = 2 # Earnings growth (15 points) if earnings_growth is not None: earn_pct = earnings_growth * 100 if earn_pct > 15: score += 15 breakdown['growth']['earnings'] = 15 elif earn_pct > 10: score += 12 breakdown['growth']['earnings'] = 12 elif earn_pct > 5: score += 9 breakdown['growth']['earnings'] = 9 elif earn_pct > 0: score += 5 breakdown['growth']['earnings'] = 5 elif earn_pct > -5: score += 2 breakdown['growth']['earnings'] = 2 # === SCHEMA V2 ENHANCEMENTS === # Detect utility sector once for all v2 logic is_utility = self._is_utility_sector(ticker, sector) parameters['is_utility'] = is_utility # Margin-based scoring bonus (up to +10 points) gross_margin = stock_data.get('gross_margin') operating_margin = stock_data.get('operating_margin') if gross_margin is not None and operating_margin is not None: gross_pct = gross_margin * 100 operating_pct = operating_margin * 100 # High margins indicate pricing power and efficiency margin_bonus = 0 if gross_pct > 40 and operating_pct > 15: margin_bonus = 10 # Excellent margins (Ferrari-like) elif gross_pct > 30 and operating_pct > 10: margin_bonus = 7 # Strong margins elif gross_pct > 20 and operating_pct > 5: margin_bonus = 4 # Good margins score += margin_bonus breakdown['bonuses']['margins'] = margin_bonus # Net Debt/EBITDA scoring (up to +5 points bonus, -10 penalty) net_debt = stock_data.get('net_debt') ebitda = stock_data.get('ebitda') net_debt_ebitda_ratio = None if net_debt is not None and ebitda is not None and ebitda > 0: net_debt_ebitda_ratio = net_debt / ebitda # Sector-specific thresholds if is_utility: # Utilities: infrastructure-heavy, higher debt is normal if net_debt_ebitda_ratio < 2.0: score += 5 # Very low debt for utility breakdown['bonuses']['debt_coverage'] = 5 elif net_debt_ebitda_ratio < 4.0: score += 3 # Normal debt for utility breakdown['bonuses']['debt_coverage'] = 3 elif net_debt_ebitda_ratio > 6.0: score -= 10 # Excessive debt even for utility breakdown['bonuses']['debt_coverage'] = -10 breakdown['penalties']['debt_excess'] += 10 else: # Non-utilities: lower debt preferred if net_debt_ebitda_ratio < 1.0: score += 5 # Excellent debt coverage breakdown['bonuses']['debt_coverage'] = 5 elif net_debt_ebitda_ratio < 2.0: score += 3 # Good debt coverage breakdown['bonuses']['debt_coverage'] = 3 elif net_debt_ebitda_ratio > 4.0: score -= 10 # Excessive debt breakdown['bonuses']['debt_coverage'] = -10 breakdown['penalties']['debt_excess'] += 10 # Sector-aware debt penalty (applied after bonuses/penalties above) debt_penalty = self._debt_penalty_sector_aware(ticker, debt_to_equity, net_debt_ebitda_ratio, sector) score -= debt_penalty if debt_penalty > 0: breakdown['penalties']['debt_excess'] += debt_penalty # Utilities-specific: Payout sustainability (FCF vs Dividend) if is_utility: free_cashflow = stock_data.get('free_cashflow') dividend_rate = stock_data.get('dividend_rate') shares_outstanding = stock_data.get('shares_outstanding') if free_cashflow and dividend_rate and shares_outstanding: total_dividends = dividend_rate * shares_outstanding payout_ratio_fcf = total_dividends / free_cashflow if free_cashflow > 0 else 0 # Utilities should pay dividends from FCF, not debt if payout_ratio_fcf < 0.7: score += 5 # Sustainable payout (room for growth) breakdown['bonuses']['fcf_payout'] = 5 elif payout_ratio_fcf < 0.9: score += 2 # Acceptable payout breakdown['bonuses']['fcf_payout'] = 2 elif payout_ratio_fcf > 1.2: score -= 10 # Paying dividends from debt (unsustainable!) breakdown['bonuses']['fcf_payout'] = -10 breakdown['penalties']['debt_excess'] += 10 # === FORWARD-LOOKING BONUSES (Schema v3 enhancement) === # Compensate growth stocks penalized by traditional P/E scoring bonus_forward = 0 # PEG Ratio Bonus (up to +5 points) peg_ratio = stock_data.get('peg_ratio') if peg_ratio is not None and peg_ratio > 0: if peg_ratio < 1.0: bonus_forward += 5 # Undervalued relative to growth breakdown['bonuses']['peg'] = 5 elif peg_ratio < 1.5: bonus_forward += 3 # Reasonable valuation for growth breakdown['bonuses']['peg'] = 3 # EV/EBITDA Bonus (up to +5 points) ev_to_ebitda = stock_data.get('ev_to_ebitda') if ev_to_ebitda is not None and ev_to_ebitda > 0: if ev_to_ebitda < 8.0: bonus_forward += 5 # Very cheap enterprise valuation breakdown['bonuses']['ev_ebitda'] = 5 elif ev_to_ebitda < 12.0: bonus_forward += 3 # Cheap enterprise valuation breakdown['bonuses']['ev_ebitda'] = 3 score += bonus_forward # === v4.1 CALCULATE SECTION TOTALS === breakdown['valuation']['total'] = sum([ breakdown['valuation']['pe'], breakdown['valuation']['pb'], breakdown['valuation']['dividend'] ]) breakdown['quality']['total'] = sum([ breakdown['quality']['roe'], breakdown['quality']['debt'] ]) breakdown['growth']['total'] = sum([ breakdown['growth']['revenue'], breakdown['growth']['earnings'] ]) breakdown['bonuses']['total'] = sum([ breakdown['bonuses']['margins'], breakdown['bonuses']['debt_coverage'], breakdown['bonuses']['fcf_payout'], breakdown['bonuses']['peg'], breakdown['bonuses']['ev_ebitda'] ]) breakdown['penalties']['total'] = sum([ breakdown['penalties']['roe_negative'], breakdown['penalties']['debt_excess'] ]) # Clamp score to [0, 100] to avoid negative values after penalties final_score = max(0, min(100, score)) # === v4.1 RETURN DICT === return { 'score': final_score, 'breakdown': breakdown, 'parameters': parameters } # === SCHEMA v4: ADVANCED QUALITY METRICS === def _safe_get_field(self, df, field_names: list, period_index: int = 0): """ Safely extract field from yfinance DataFrame with fuzzy name matching Args: df: DataFrame (balance_sheet, financials, cashflow) field_names: List of possible field names period_index: Column index (0 = most recent) Returns: Field value or None if not found """ if df is None or df.empty or len(df.columns) <= period_index: return None for name in field_names: name_normalized = name.lower().replace(' ', '').replace('_', '').replace('-', '') for idx in df.index: idx_normalized = str(idx).lower().replace(' ', '').replace('_', '').replace('-', '') if name_normalized in idx_normalized or idx_normalized in name_normalized: try: value = df.loc[idx, df.columns[period_index]] if value is not None and not (isinstance(value, float) and (value != value)): # Check for NaN return value except: continue return None def calculate_roic(self, ticker: str) -> Optional[float]: """ Calculate ROIC = NOPAT / Invested Capital NOPAT = EBIT × (1 - Tax Rate) Invested Capital = Total Assets - Current Liabilities - Cash OR Debt + Equity (fallback) Returns: ROIC as percentage (e.g., 15.5) or None if not calculable """ try: import yfinance as yf ticker_obj = yf.Ticker(ticker) balance_sheet = ticker_obj.balance_sheet income_stmt = ticker_obj.financials if balance_sheet.empty or income_stmt.empty: logger.debug(f"{ticker}: ROIC - Empty balance sheet or income statement") return None # === STEP 1: Calculate EBIT (with fallback) === ebit = self._safe_get_field(income_stmt, [ 'EBIT', 'Operating Income', 'OperatingIncome', 'Total Operating Income As Reported', 'Normalized EBITDA' ]) if ebit is None: # FALLBACK: EBIT = Net Income + Interest Expense + Tax Provision logger.debug(f"{ticker}: ROIC - Primary EBIT not found, trying fallback calculation") net_income = self._safe_get_field(income_stmt, ['Net Income', 'NetIncome', 'Net Income Common Stockholders']) interest_exp = self._safe_get_field(income_stmt, ['Interest Expense', 'InterestExpense', 'Net Interest Income']) tax_prov = self._safe_get_field(income_stmt, ['Tax Provision', 'TaxProvision', 'Income Tax Expense']) if net_income is not None: ebit = net_income if interest_exp is not None: ebit += abs(interest_exp) if tax_prov is not None: ebit += abs(tax_prov) logger.debug(f"{ticker}: ROIC - Calculated EBIT via fallback: {ebit}") else: logger.debug(f"{ticker}: ROIC - Cannot calculate EBIT (no Net Income)") return None # === STEP 2: Calculate Tax Rate (with fallback) === tax_prov = self._safe_get_field(income_stmt, ['Tax Provision', 'TaxProvision', 'Income Tax Expense']) net_income = self._safe_get_field(income_stmt, ['Net Income', 'NetIncome', 'Net Income Common Stockholders']) if tax_prov and net_income and (net_income + abs(tax_prov)) != 0: tax_rate = abs(tax_prov) / (net_income + abs(tax_prov)) logger.debug(f"{ticker}: ROIC - Calculated tax rate from financials: {tax_rate:.2%}") else: # Fallback: country-aware tax rate if '.MI' in ticker: # Italy tax_rate = 0.24 elif '.PA' in ticker: # France tax_rate = 0.25 elif '.DE' in ticker: # Germany tax_rate = 0.30 else: # USA and others tax_rate = 0.21 logger.debug(f"{ticker}: ROIC - Using country-aware fallback tax rate: {tax_rate:.2%}") # === STEP 3: Calculate NOPAT === nopat = ebit * (1 - tax_rate) logger.debug(f"{ticker}: ROIC - NOPAT: {nopat}") # === STEP 4: Calculate Invested Capital (with fallback) === total_assets = self._safe_get_field(balance_sheet, ['Total Assets', 'TotalAssets']) current_liab = self._safe_get_field(balance_sheet, [ 'Current Liabilities', 'CurrentLiabilities', 'Total Current Liabilities' ]) cash = self._safe_get_field(balance_sheet, [ 'Cash And Cash Equivalents', 'Cash', 'Cash Cash Equivalents And Short Term Investments', 'CashAndCashEquivalents' ]) invested_capital = None # PRIMARY METHOD: Total Assets - Current Liabilities - Cash if total_assets is not None and current_liab is not None: invested_capital = total_assets - current_liab if cash: invested_capital -= cash logger.debug(f"{ticker}: ROIC - Invested Capital (primary method): {invested_capital}") # FALLBACK METHOD: Total Debt + Total Equity if invested_capital is None or invested_capital <= 0: logger.debug(f"{ticker}: ROIC - Primary Invested Capital failed, trying fallback (Debt + Equity)") total_debt = self._safe_get_field(balance_sheet, [ 'Total Debt', 'TotalDebt', 'Long Term Debt', 'Net Debt', 'Short Long Term Debt Total' ]) stockholders_equity = self._safe_get_field(balance_sheet, [ 'Stockholders Equity', 'StockholdersEquity', 'Total Equity Gross Minority Interest', 'Common Stock Equity' ]) if total_debt is not None and stockholders_equity is not None: invested_capital = abs(total_debt) + stockholders_equity logger.debug(f"{ticker}: ROIC - Invested Capital (fallback Debt+Equity): {invested_capital}") elif stockholders_equity is not None: # Ultra-fallback: use just equity if debt not available invested_capital = stockholders_equity logger.debug(f"{ticker}: ROIC - Invested Capital (ultra-fallback, Equity only): {invested_capital}") if invested_capital is None or invested_capital <= 0: logger.debug(f"{ticker}: ROIC - Cannot calculate Invested Capital (all methods failed)") return None # === STEP 5: Calculate ROIC === roic = (nopat / invested_capital) * 100 # Sanity check for anomalous values if abs(roic) > 1000: # ROIC > 1000% is likely data error logger.warning(f"{ticker}: ROIC calculation resulted in anomalous value: {roic}% - returning None") return None logger.debug(f"{ticker}: ROIC - Final ROIC: {roic:.2f}%") return round(roic, 2) except Exception as e: logger.debug(f"{ticker}: Error calculating ROIC: {e}") return None def calculate_interest_coverage(self, ticker: str, sector: str) -> Optional[float]: """ Calculate Interest Coverage = EBIT / Interest Expense Returns: Coverage ratio (e.g., 5.2) or None """ # Skip for financials and utilities if self._is_financial_sector(ticker, sector) or ticker in self.UTILITY_TICKERS: logger.debug(f"{ticker}: Interest Coverage - Skipped (Financial/Utility sector)") return None try: import yfinance as yf ticker_obj = yf.Ticker(ticker) income_stmt = ticker_obj.financials if income_stmt.empty: logger.debug(f"{ticker}: Interest Coverage - Empty income statement") return None ebit = self._safe_get_field(income_stmt, [ 'EBIT', 'Operating Income', 'OperatingIncome', 'Total Operating Income As Reported', 'Normalized EBITDA' ]) interest = self._safe_get_field(income_stmt, [ 'Interest Expense', 'InterestExpense', 'Interest Expense Non Operating', 'Net Interest Income' ]) if ebit is None: logger.debug(f"{ticker}: Interest Coverage - EBIT not found") return None if interest is None: logger.debug(f"{ticker}: Interest Coverage - Interest Expense not found") return None interest_abs = abs(interest) # Interest expense usually negative if interest_abs == 0: logger.debug(f"{ticker}: Interest Coverage - Interest Expense is zero") return None # Avoid division by zero coverage = ebit / interest_abs logger.debug(f"{ticker}: Interest Coverage - Calculated: {coverage:.2f}x") return round(coverage, 2) except Exception as e: logger.debug(f"{ticker}: Error calculating Interest Coverage: {e}") return None def calculate_piotroski_fscore(self, ticker: str) -> Optional[int]: """ Calculate Piotroski F-Score (0-9 points) Requires 2+ years of historical data for YoY comparisons 9 Criteria: Profitability (4): ROA>0, OCF>0, ΔROA>0, Accruals<0 Leverage (3): ΔLTD≤0, ΔCR>0, NoNewEquity Efficiency (2): ΔGM>0, ΔAT>0 """ try: import yfinance as yf ticker_obj = yf.Ticker(ticker) bs = ticker_obj.balance_sheet inc = ticker_obj.financials cf = ticker_obj.cashflow # Need at least 2 years if bs.empty or inc.empty or cf.empty: logger.debug(f"{ticker}: F-Score - Missing financial statements") return None if len(bs.columns) < 2 or len(inc.columns) < 2: logger.debug(f"{ticker}: F-Score - Insufficient historical data (need 2+ years)") return None score = 0 criteria_met = [] # === PROFITABILITY (4 points) === # 1. ROA > 0 net_income = self._safe_get_field(inc, ['Net Income', 'NetIncome'], 0) total_assets = self._safe_get_field(bs, ['Total Assets', 'TotalAssets'], 0) if net_income and total_assets and total_assets > 0: roa_t0 = net_income / total_assets if roa_t0 > 0: score += 1 criteria_met.append("ROA>0") # 3. ΔROA > 0 net_income_t1 = self._safe_get_field(inc, ['Net Income', 'NetIncome'], 1) total_assets_t1 = self._safe_get_field(bs, ['Total Assets', 'TotalAssets'], 1) if net_income_t1 and total_assets_t1 and total_assets_t1 > 0: roa_t1 = net_income_t1 / total_assets_t1 if roa_t0 > roa_t1: score += 1 criteria_met.append("ΔROA>0") # 2. Operating Cash Flow > 0 ocf = self._safe_get_field(cf, ['Operating Cash Flow', 'OperatingCashFlow', 'Total Cash From Operating Activities'], 0) if ocf and ocf > 0: score += 1 criteria_met.append("OCF>0") # 4. Accruals < 0 (OCF > Net Income = quality) if net_income and ocf > net_income: score += 1 criteria_met.append("Accruals<0") # === LEVERAGE/LIQUIDITY (3 points) === # 5. ΔLong-term Debt ≤ 0 ltd_t0 = self._safe_get_field(bs, ['Long Term Debt', 'LongTermDebt'], 0) ltd_t1 = self._safe_get_field(bs, ['Long Term Debt', 'LongTermDebt'], 1) if ltd_t0 is not None and ltd_t1 is not None: if ltd_t0 <= ltd_t1: score += 1 # 6. ΔCurrent Ratio > 0 curr_assets_t0 = self._safe_get_field(bs, ['Current Assets', 'CurrentAssets'], 0) curr_liab_t0 = self._safe_get_field(bs, ['Current Liabilities', 'CurrentLiabilities'], 0) curr_assets_t1 = self._safe_get_field(bs, ['Current Assets', 'CurrentAssets'], 1) curr_liab_t1 = self._safe_get_field(bs, ['Current Liabilities', 'CurrentLiabilities'], 1) if all([curr_assets_t0, curr_liab_t0, curr_assets_t1, curr_liab_t1]): if curr_liab_t0 > 0 and curr_liab_t1 > 0: cr_t0 = curr_assets_t0 / curr_liab_t0 cr_t1 = curr_assets_t1 / curr_liab_t1 if cr_t0 > cr_t1: score += 1 # 7. No New Equity (shares not increased) shares_t0 = self._safe_get_field(bs, [ 'Share Issued', 'Ordinary Shares Number', 'Common Stock Shares Outstanding', 'ShareIssued' ], 0) shares_t1 = self._safe_get_field(bs, [ 'Share Issued', 'Ordinary Shares Number', 'Common Stock Shares Outstanding', 'ShareIssued' ], 1) if shares_t0 and shares_t1: if shares_t0 <= shares_t1: score += 1 # === OPERATING EFFICIENCY (2 points) === # 8. ΔGross Margin > 0 gross_profit_t0 = self._safe_get_field(inc, ['Gross Profit', 'GrossProfit'], 0) revenue_t0 = self._safe_get_field(inc, ['Total Revenue', 'TotalRevenue'], 0) gross_profit_t1 = self._safe_get_field(inc, ['Gross Profit', 'GrossProfit'], 1) revenue_t1 = self._safe_get_field(inc, ['Total Revenue', 'TotalRevenue'], 1) if all([gross_profit_t0, revenue_t0, gross_profit_t1, revenue_t1]): if revenue_t0 > 0 and revenue_t1 > 0: gm_t0 = gross_profit_t0 / revenue_t0 gm_t1 = gross_profit_t1 / revenue_t1 if gm_t0 > gm_t1: score += 1 # 9. ΔAsset Turnover > 0 if revenue_t0 and total_assets and revenue_t1: total_assets_t1 = self._safe_get_field(bs, ['Total Assets', 'TotalAssets'], 1) if total_assets > 0 and total_assets_t1 and total_assets_t1 > 0: at_t0 = revenue_t0 / total_assets at_t1 = revenue_t1 / total_assets_t1 if at_t0 > at_t1: score += 1 logger.debug(f"{ticker}: F-Score - Calculated: {score}/9 (criteria met: {len(criteria_met)})") return score except Exception as e: logger.debug(f"{ticker}: Error calculating Piotroski F-Score: {e}") return None def calculate_advanced_quality_score(self, roic: Optional[float], interest_coverage: Optional[float], fscore: Optional[int]) -> float: """ Calculate Advanced Quality Score (max 20 points) Scoring: - ROIC: 10 pts (≥15%: 10, ≥10%: 7, ≥5%: 4, <5%: 0) - Interest Coverage: 5 pts (≥5x: 5, ≥3x: 3, ≥1.5x: 1, <1.5x: 0, None: 2.5) - F-Score: 5 pts (8-9: 5, 6-7: 3, 4-5: 1, 0-3: 0) """ score = 0.0 # ROIC Score (10 pts) if roic is not None: if roic >= 15: score += 10 elif roic >= 10: score += 7 elif roic >= 5: score += 4 # Interest Coverage Score (5 pts) if interest_coverage is not None: if interest_coverage >= 5.0: score += 5 elif interest_coverage >= 3.0: score += 3 elif interest_coverage >= 1.5: score += 1 elif interest_coverage is None: # Neutral for excluded sectors (financials, utilities) score += 2.5 # Piotroski F-Score (5 pts) if fscore is not None: if fscore >= 8: score += 5 elif fscore >= 6: score += 3 elif fscore >= 4: score += 1 return score def _calculate_margin_of_safety(self, current_price: float, fair_value: float) -> float: """Calculate margin of safety as percentage""" if fair_value <= 0 or current_price <= 0: return -100.0 margin = ((fair_value - current_price) / fair_value) * 100 # Cap extreme values to prevent display issues return max(-99.9, min(99.9, margin)) def _get_recommendation(self, score: int, margin_of_safety: float) -> str: """ Determine recommendation based on score and margin of safety. Hierarchy (Deep Value Philosophy): - STRONG BUY: Excellent business (80+) with strong safety margin (20%+) - BUY: Excellent business (80+) with good safety margin (15%+) - HOLD: Good business (60+) trading at or below fair value (0%+ margin) - WATCH: High-quality business (70+) but overvalued (negative margin) - AVOID: Below quality threshold or significantly overvalued """ if score >= self.BUY_SCORE and margin_of_safety >= self.STRONG_BUY_MARGIN: return "STRONG BUY" elif score >= self.BUY_SCORE and margin_of_safety >= self.BUY_MARGIN: return "BUY" elif score >= self.HOLD_SCORE and margin_of_safety >= 0.0: return "HOLD" elif score >= self.WATCH_SCORE and margin_of_safety < 0.0: return self.RATING_WATCH else: return "AVOID" def _generate_reasoning( self, stock_data: Dict, score: int, fair_value: float, margin_of_safety: float, valutazione: str ) -> str: """Generate human-readable reasoning for the recommendation""" ticker = stock_data.get('ticker', '') name = stock_data.get('name', '') sector = stock_data.get('sector', 'N/A') price = stock_data.get('price', 0) pe = stock_data.get('pe_ratio') roe = stock_data.get('roe', 0) debt_eq = stock_data.get('debt_to_equity') debt_eq_reported = stock_data.get('debt_to_equity_reported') net_debt_val = stock_data.get('net_debt') # Start with valuation assessment if valutazione == "STRONG BUY": reasoning = f"{name} è un'eccezionale opportunità di valore: score {score}/100 con margin of safety straordinario del {margin_of_safety:.1f}%! " elif valutazione == "BUY": reasoning = f"{name} è un'opportunità eccellente: score {score}/100 con margin of safety del {margin_of_safety:.1f}%. " elif margin_of_safety > 0: # Positive margin = fair value > current price = undervalued if score >= 60: if margin_of_safety >= self.BUY_MARGIN: reasoning = ( f"{name} è un business di qualità (score {score}/100). " f"Il prezzo attuale di €{price:.2f} incorpora uno sconto del {margin_of_safety:.1f}% rispetto al fair value di €{fair_value:.2f}, " f"ma il punteggio non raggiunge ancora la soglia BUY ({self.BUY_SCORE}). " ) else: reasoning = ( f"{name} è un business di qualità (score {score}/100). " f"Il prezzo di €{price:.2f} offre uno sconto limitato del {margin_of_safety:.1f}% rispetto al fair value di €{fair_value:.2f}. " ) else: reasoning = ( f"{name} (score {score}/100) quota a €{price:.2f}, con uno sconto del {margin_of_safety:.1f}% rispetto al fair value stimato di €{fair_value:.2f}, " "ma la qualità del business non soddisfa i nostri standard. " ) else: # Negative margin = current price > fair value = overvalued reasoning = f"{name} presenta un premio del {abs(margin_of_safety):.1f}% rispetto al fair value stimato di €{fair_value:.2f}. " # Add quality assessment if roe: roe_pct = roe * 100 if roe_pct > 15: reasoning += f"Eccellente ROE del {roe_pct:.1f}% indica un business di qualità superiore. " elif roe_pct > 10: reasoning += f"ROE del {roe_pct:.1f}% mostra una buona redditività. " else: reasoning += f"ROE del {roe_pct:.1f}% è al di sotto degli standard desiderati. " # Add valuation comment if pe: if pe < 15: reasoning += f"P/E di {pe:.1f}x è attraente. " elif pe < 25: reasoning += f"P/E di {pe:.1f}x è nella norma. " else: reasoning += f"P/E di {pe:.1f}x è elevato. " # Add debt assessment if net_debt_val is not None and net_debt_val <= 0 and debt_eq_reported is not None and debt_eq_reported > 1.0: reasoning += f"Posizione net cash: D/E riportato {debt_eq_reported:.2f} distorto dalla cassa/depositi. " elif debt_eq is not None: display_de = debt_eq_reported if debt_eq_reported is not None else debt_eq if debt_eq < 0.5: reasoning += f"Bilancio solido con debt/equity di {display_de:.2f}. " elif debt_eq < 1.5: reasoning += f"Debt/equity di {display_de:.2f} è accettabile. " else: reasoning += f"Debt/equity di {display_de:.2f} richiede cautela. " # Final recommendation if valutazione == "STRONG BUY": reasoning += f"Questa è un'occasione rara - Acquistare immediatamente! Il margine di sicurezza del {margin_of_safety:.1f}% offre protezione eccezionale." elif valutazione == "BUY": reasoning += f"Raccomando l'acquisto a prezzi attuali." elif valutazione == "HOLD": reasoning += ( f"{name} presenta fondamentali discreti (Warren Score: {score}/100) " f"e sta trattando a €{price:.2f}, sostanzialmente in linea " f"con il fair value stimato di €{fair_value:.2f}. " ) reasoning += ( f"\n\n📊 **Nota**: Prezzo congruo ma **manca un ampio margine di sicurezza**. " f"Opportunità di acquisto limitata. " f"Considerare ingresso solo se scende sotto €{fair_value * 0.90:.2f} (-10% da fair value)." ) elif valutazione == self.RATING_WATCH: # Alta qualità ma prezzo eccessivo reasoning = f"{name} è un'azienda eccellente (Warren Score: {score}/100) con fondamentali solidi " if roe: roe_pct = roe * 100 if roe_pct > 20: reasoning += f"(ROE {roe_pct:.1f}%), " profit_margin = stock_data.get('profit_margin') if profit_margin and profit_margin > 0.15: reasoning += f"margini operativi {profit_margin * 100:.1f}%, " reasoning += ( f"ma il prezzo attuale di €{price:.2f} incorpora un premio " f"eccessivo ({abs(margin_of_safety):.1f}% sopra il fair value di €{fair_value:.2f}). " ) reasoning += ( f"\n\n📊 **Strategia Consigliata**: Inserire in **Watchlist** e attendere storni del mercato. " f"Livelli di ingresso interessanti: €{fair_value * 1.05:.2f} (fair value) " f"o €{fair_value * 0.85:.2f} (con 15% margin of safety)." ) else: reasoning += f"Evitare al prezzo attuale, non soddisfa i criteri di qualità e prezzo." return reasoning