""" Transaction Management Module. Handles BUY, SELL, and DIVIDEND transactions. Updates portfolio holdings automatically. """ import logging from typing import List, Dict, Tuple, Optional from datetime import datetime logger = logging.getLogger(__name__) class TransactionManager: """Manages portfolio transactions and updates holdings.""" def __init__(self, db_manager, portfolio): """ Initialize transaction manager. Args: db_manager: DBManager instance portfolio: Portfolio instance """ self.db_manager = db_manager self.portfolio = portfolio def process_buy( self, ticker: str, quantity: float, price: float, date: str, notes: str = "" ) -> None: """ Process a BUY transaction. Updates holding quantity and average price using weighted average. Args: ticker: Stock ticker quantity: Number of shares bought price: Price per share date: Transaction date (YYYY-MM-DD) notes: Optional notes """ amount = quantity * price # Get existing holding (if any) holding_data = self.db_manager.get_holding(ticker) if holding_data: # Update existing holding old_qty = holding_data['quantity'] old_avg_price = holding_data['avg_price'] # Calculate new weighted average price new_qty = old_qty + quantity new_avg_price = self.calculate_new_avg_price( old_qty, old_avg_price, quantity, price ) # Update holding in database self.db_manager.update_holding( ticker, quantity=new_qty, avg_price=new_avg_price ) # Update portfolio object self.portfolio.update_holding_quantity(ticker, new_qty, new_avg_price) logger.info(f"Updated holding {ticker}: {old_qty} -> {new_qty} @ {new_avg_price}") else: # Create new holding (should rarely happen - use add_holding dialog instead) logger.warning(f"BUY transaction for non-existent holding {ticker}, creating new holding") self.portfolio.add_holding( ticker=ticker, name=ticker, # Use ticker as name (user should edit via dialog) asset_type='Stock', # Default to Stock quantity=quantity, avg_price=price ) # Add transaction to log self.db_manager.add_transaction( ticker=ticker, tx_type='BUY', date=date, quantity=quantity, price=price, amount=amount, notes=notes ) logger.info(f"Processed BUY: {ticker} {quantity} @ {price} = {amount}") def process_sell( self, ticker: str, quantity: float, price: float, date: str, notes: str = "" ) -> Tuple[float, float]: """ Process a SELL transaction. Updates holding quantity and calculates realized P&L. Args: ticker: Stock ticker quantity: Number of shares sold price: Price per share date: Transaction date notes: Optional notes Returns: Tuple of (realized_pnl_amount, realized_pnl_percent) Raises: ValueError: If insufficient quantity to sell """ amount = quantity * price # Get existing holding holding_data = self.db_manager.get_holding(ticker) if not holding_data: raise ValueError(f"Cannot sell {ticker}: holding not found") old_qty = holding_data['quantity'] avg_price = holding_data['avg_price'] if quantity > old_qty: raise ValueError(f"Cannot sell {quantity} shares of {ticker}: only {old_qty} available") # Calculate realized P&L cost_basis = quantity * avg_price realized_pnl = amount - cost_basis realized_pnl_pct = (realized_pnl / cost_basis) * 100 if cost_basis > 0 else 0.0 # Update holding quantity new_qty = old_qty - quantity if new_qty > 0: # Partial sell: keep holding with reduced quantity self.db_manager.update_holding(ticker, quantity=new_qty) self.portfolio.update_holding_quantity(ticker, new_qty, avg_price) logger.info(f"Partial sell {ticker}: {old_qty} -> {new_qty}") else: # Complete sell: remove holding self.db_manager.delete_holding(ticker) self.portfolio.holdings = [h for h in self.portfolio.holdings if h.ticker != ticker] self.portfolio.calculate_weights() logger.info(f"Complete sell {ticker}: holding removed") # Add transaction to log (amount is positive for sells) self.db_manager.add_transaction( ticker=ticker, tx_type='SELL', date=date, quantity=quantity, price=price, amount=amount, notes=f"Realized P&L: {realized_pnl:+.2f} ({realized_pnl_pct:+.1f}%). {notes}".strip() ) logger.info(f"Processed SELL: {ticker} {quantity} @ {price} = {amount} (P&L: {realized_pnl:+.2f})") return (realized_pnl, realized_pnl_pct) def process_dividend( self, ticker: str, amount: float, date: str, notes: str = "" ) -> None: """ Process a DIVIDEND transaction. Records dividend income (doesn't affect holdings). Args: ticker: Stock ticker amount: Dividend amount received date: Transaction date notes: Optional notes (e.g., "Q4 2025") """ # Verify holding exists holding_data = self.db_manager.get_holding(ticker) if not holding_data: logger.warning(f"DIVIDEND for non-existent holding {ticker}") # Add transaction to log (quantity and price are NULL for dividends) self.db_manager.add_transaction( ticker=ticker, tx_type='DIVIDEND', date=date, quantity=None, price=None, amount=amount, notes=notes ) logger.info(f"Processed DIVIDEND: {ticker} {amount} ({notes})") def get_transaction_history( self, ticker: Optional[str] = None, tx_type: Optional[str] = None ) -> List[Dict]: """ Get transaction history with optional filtering. Args: ticker: Filter by ticker (None for all) tx_type: Filter by type (None for all) Returns: List of transactions as dictionaries """ return self.db_manager.get_transactions(ticker=ticker, tx_type=tx_type) def calculate_new_avg_price( self, old_qty: float, old_avg: float, buy_qty: float, buy_price: float ) -> float: """ Calculate new weighted average price after a buy. Formula: (old_qty * old_avg + buy_qty * buy_price) / (old_qty + buy_qty) Args: old_qty: Current quantity held old_avg: Current average price buy_qty: Quantity being bought buy_price: Price of new purchase Returns: New weighted average price """ total_cost = (old_qty * old_avg) + (buy_qty * buy_price) total_qty = old_qty + buy_qty if total_qty > 0: new_avg = total_cost / total_qty else: new_avg = buy_price logger.debug(f"Calculated new avg price: {old_avg} -> {new_avg}") return new_avg def delete_transaction(self, transaction_id: int) -> None: """ Delete a transaction from the log. WARNING: This does NOT reverse the transaction effects on holdings. Use with caution - manual adjustment of holdings may be needed. Args: transaction_id: Transaction ID to delete """ # Get transaction details for logging transactions = self.db_manager.get_transactions() tx = next((t for t in transactions if t['id'] == transaction_id), None) if tx: logger.warning(f"Deleting transaction {transaction_id}: {tx['transaction_type']} {tx['ticker']} (manual holding adjustment may be needed)") self.db_manager.delete_transaction(transaction_id) def get_total_dividends(self, ticker: Optional[str] = None) -> float: """ Get total dividends received. Args: ticker: Filter by ticker (None for all) Returns: Total dividend amount """ transactions = self.db_manager.get_transactions(ticker=ticker, tx_type='DIVIDEND') total = sum(tx['amount'] for tx in transactions) logger.debug(f"Total dividends: {total} (ticker={ticker or 'all'})") return total def get_realized_pnl(self, ticker: Optional[str] = None) -> float: """ Get total realized P&L from SELL transactions. Note: This parses P&L from transaction notes (format: "Realized P&L: +123.45") Args: ticker: Filter by ticker (None for all) Returns: Total realized P&L """ transactions = self.db_manager.get_transactions(ticker=ticker, tx_type='SELL') total_pnl = 0.0 for tx in transactions: # Try to parse P&L from notes notes = tx.get('notes', '') if 'Realized P&L:' in notes: try: # Extract number after "Realized P&L:" pnl_str = notes.split('Realized P&L:')[1].split('(')[0].strip() pnl = float(pnl_str) total_pnl += pnl except (ValueError, IndexError): logger.warning(f"Failed to parse P&L from transaction {tx['id']}: {notes}") logger.debug(f"Total realized P&L: {total_pnl} (ticker={ticker or 'all'})") return total_pnl