""" Price Fetcher for Portfolio Manager. Fetches stock/ETF prices from yfinance with smart caching. Includes PyQt6 threading wrapper for GUI integration. """ import logging from datetime import datetime, timedelta from typing import Dict, Optional, List import yfinance as yf try: from PyQt6.QtCore import QThread, pyqtSignal PYQT_AVAILABLE = True except ImportError: PYQT_AVAILABLE = False logger = logging.getLogger(__name__) logger.warning("PyQt6 not available, PriceFetcherThread will not work") logger = logging.getLogger(__name__) class PriceFetcher: """Fetches stock prices from yfinance with smart caching.""" def __init__(self, db_manager): """ Initialize price fetcher. Args: db_manager: DBManager instance for cache operations """ self.db_manager = db_manager def fetch_price(self, ticker: str, force_refresh: bool = False) -> Optional[float]: """ Fetch price for a single ticker. Args: ticker: Stock ticker symbol (e.g., 'VWCE.MI') force_refresh: Bypass cache and fetch fresh price Returns: Current price or None if fetch failed """ # Check cache first (unless force refresh) if not force_refresh: cached = self.db_manager.get_cached_price(ticker) if cached and self.is_cache_valid(ticker, max_age_hours=4): price, last_updated = cached logger.debug(f"Using cached price for {ticker}: {price} (cached at {last_updated})") return price # Fetch from yfinance try: logger.debug(f"Fetching price for {ticker} from yfinance...") stock = yf.Ticker(ticker) # Try to get current price from multiple sources price = None # Method 1: Try fast_info (fastest) try: if hasattr(stock, 'fast_info') and hasattr(stock.fast_info, 'last_price'): price = stock.fast_info.last_price except Exception: pass # Method 2: Try info dict if price is None: try: info = stock.info price = info.get('currentPrice') or info.get('regularMarketPrice') except Exception: pass # Method 3: Try history (most reliable but slower) if price is None: try: hist = stock.history(period='1d') if not hist.empty: price = hist['Close'].iloc[-1] except Exception: pass if price is not None and price > 0: # Update cache self.db_manager.update_price_cache(ticker, price) logger.info(f"Fetched price for {ticker}: {price}") return float(price) else: logger.warning(f"Invalid price for {ticker}: {price}") return None except Exception as e: logger.error(f"Failed to fetch price for {ticker}: {e}") return None def fetch_prices_batch( self, tickers: List[str], force_refresh: bool = False ) -> Dict[str, float]: """ Fetch prices for multiple tickers. Args: tickers: List of ticker symbols force_refresh: Bypass cache and fetch fresh prices Returns: Dictionary mapping ticker -> price """ results = {} for ticker in tickers: price = self.fetch_price(ticker, force_refresh=force_refresh) if price is not None: results[ticker] = price logger.info(f"Fetched {len(results)}/{len(tickers)} prices successfully") return results def is_cache_valid(self, ticker: str, max_age_hours: int = 4) -> bool: """ Check if cached price is still valid. Args: ticker: Ticker to check max_age_hours: Maximum age of cache in hours Returns: True if cache is valid, False otherwise """ cached = self.db_manager.get_cached_price(ticker) if not cached: return False price, last_updated = cached age = datetime.now() - last_updated is_valid = age < timedelta(hours=max_age_hours) if not is_valid: logger.debug(f"Cache for {ticker} is stale (age: {age})") return is_valid if PYQT_AVAILABLE: class PriceFetcherThread(QThread): """ PyQt6 thread for fetching prices in background. Signals: progress(int, int): Current ticker index and total count finished(dict): Dictionary of ticker -> price mapping error(str): Error message """ progress = pyqtSignal(int, int) # current, total finished = pyqtSignal(dict) # ticker -> price mapping error = pyqtSignal(str) # error message def __init__(self, price_fetcher: PriceFetcher, tickers: List[str], force_refresh: bool = False): """ Initialize thread. Args: price_fetcher: PriceFetcher instance tickers: List of tickers to fetch force_refresh: Bypass cache """ super().__init__() self.price_fetcher = price_fetcher self.tickers = tickers self.force_refresh = force_refresh self._is_running = True def run(self) -> None: """Run price fetching in background thread.""" try: results = {} total = len(self.tickers) for i, ticker in enumerate(self.tickers): if not self._is_running: logger.info("Price fetching cancelled by user") break # Emit progress self.progress.emit(i + 1, total) # Fetch price price = self.price_fetcher.fetch_price(ticker, force_refresh=self.force_refresh) if price is not None: results[ticker] = price # Emit results if self._is_running: self.finished.emit(results) logger.info(f"Price fetching thread completed: {len(results)}/{total} prices") except Exception as e: error_msg = f"Price fetching failed: {str(e)}" logger.error(error_msg, exc_info=True) self.error.emit(error_msg) def stop(self) -> None: """Stop the thread gracefully.""" self._is_running = False logger.info("Stopping price fetcher thread...") else: # Dummy class if PyQt6 not available class PriceFetcherThread: """Dummy class when PyQt6 is not available.""" def __init__(self, *args, **kwargs): raise RuntimeError("PyQt6 is not available. Install PyQt6 to use PriceFetcherThread.")