"""Evaluate stored bids against ISO day-ahead and real-time LMP data.

Running this module calls ``process_bids()``, which makes two passes over the
``Bid`` table:

1. "Submitted" bids whose timestamp has passed are compared with the
   day-ahead price and marked "Executed" or "Fail".
2. "Executed" bids whose delivery hour has finished are marked "Settled" with
   a PnL computed from the day-ahead and average real-time prices.
"""

import time
from datetime import UTC, datetime, timedelta
from zoneinfo import ZoneInfo

from gridstatus import ISONE, MISO, NYISO
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session

from db import SessionLocal
from models.bid import Bid
from models.auth import User  # noqa: F401  # unused here; kept from the original imports

# gridstatus client used to fetch LMP data for each supported market.
MARKET_ISOS = {
    "ISONE": ISONE(),
    "NYISO": NYISO(),
    "MISO": MISO(),
}

# Time zone each market's bid timestamps are converted to before use.
MARKET_TIMEZONES = {
    "ISONE": ZoneInfo("America/New_York"),
    "NYISO": ZoneInfo("America/New_York"),
    "MISO": ZoneInfo("America/Chicago"),
}

# Retry policy for commits that fail with "database is locked".
MAX_COMMIT_RETRIES = 3
COMMIT_RETRY_DELAY_SECONDS = 2


def _hour_window(target_time: datetime) -> tuple[datetime, datetime]:
    """Return the (start, end) of the clock hour containing ``target_time``.

    For aware datetimes the end is computed in UTC so the window is exactly
    one elapsed hour even across a DST transition; adding a timedelta to a
    zone-aware local time is wall-clock arithmetic and can span two real hours.
    """
    hour_start = target_time.replace(minute=0, second=0, microsecond=0)
    if hour_start.tzinfo is None:
        return hour_start, hour_start + timedelta(hours=1)
    return hour_start, hour_start.astimezone(UTC) + timedelta(hours=1)


def get_day_ahead_price(market: str, target_time: datetime) -> float:
    """Fetch the Day Ahead clearing price for the hour of target_time.

    The price is the mean LMP over all locations for the interval whose start
    lies within 30 minutes of the top of ``target_time``'s hour.

    Raises:
        KeyError: if ``market`` is not a key of ``MARKET_ISOS``.
        ValueError: if no matching interval is present in the fetched data.
    """
    iso = MARKET_ISOS[market]
    df = iso.get_lmp(
        date=target_time.date(), market="DAY_AHEAD_HOURLY", locations="ALL"
    )
    # Average across locations; groupby returns the intervals in sorted order.
    mean_lmp_by_interval = df.groupby("Interval Start")["LMP"].mean()

    hour_start = target_time.replace(minute=0, second=0, microsecond=0)
    tolerance = timedelta(minutes=30)
    for interval_start, lmp in mean_lmp_by_interval.items():
        if abs(interval_start - hour_start) < tolerance:
            return float(lmp)
    raise ValueError(f"No day ahead price found for {target_time} in {market}")


def get_real_time_prices(market: str, target_time: datetime) -> list[float]:
    """Fetch the Real Time 5-min prices during the hour of target_time.

    Each returned value is the mean LMP over all locations for one interval
    starting inside the hour. The list is empty when no interval falls in the
    hour.

    Raises:
        KeyError: if ``market`` is not a key of ``MARKET_ISOS``.
    """
    iso = MARKET_ISOS[market]
    df = iso.get_lmp(
        date=target_time.date(), market="REAL_TIME_5_MIN", locations="ALL"
    )
    mean_lmp_by_interval = df.groupby("Interval Start")["LMP"].mean()

    hour_start, hour_end = _hour_window(target_time)
    return [
        float(lmp)
        for interval_start, lmp in mean_lmp_by_interval.items()
        if hour_start <= interval_start < hour_end
    ]


def _save_bid_outcome(db: Session, bid: Bid, status: str, pnl: float | None) -> None:
    """Set ``bid.status`` / ``bid.pnl`` and commit, retrying on a locked database.

    The attributes are re-applied on every attempt because the rollback that
    follows a failed commit discards the pending changes; retrying the commit
    alone could succeed without writing anything.

    Raises:
        OperationalError: for any operational error other than a locked database.
        Exception: when the database is still locked after the last attempt.
    """
    for attempt in range(1, MAX_COMMIT_RETRIES + 1):
        bid.status = status
        bid.pnl = pnl
        try:
            db.commit()
            return
        except OperationalError as exc:
            # Return the session to a clean state before retrying or propagating.
            db.rollback()
            if "database is locked" not in str(exc).lower():
                raise
            if attempt == MAX_COMMIT_RETRIES:
                raise Exception(
                    "Database is locked, and retries exhausted."
                ) from exc
            time.sleep(COMMIT_RETRY_DELAY_SECONDS)


def _process_submitted_bid(db: Session, bid: Bid, now_utc: datetime) -> None:
    """Mark one "Submitted" bid "Executed" or "Fail" once its time has passed."""
    market = bid.market
    market_tz = MARKET_TIMEZONES[market]
    # NOTE(review): astimezone() treats a naive timestamp as system-local
    # time; confirm that Bid.timestamp is stored timezone-aware.
    target_time = bid.timestamp.astimezone(market_tz)

    # Only process bids whose timestamp is already in the past. Compare in
    # UTC: two datetimes sharing one tzinfo are compared by wall clock, which
    # is ambiguous during the repeated hour at the end of DST.
    if target_time.astimezone(UTC) > now_utc:
        print(f"Skipping future bid {bid.id} at {target_time} ({market})")
        return

    print(f"Processing submitted bid {bid.id} for {target_time} ({market})")
    day_ahead_price = get_day_ahead_price(market, target_time)

    if bid.price >= day_ahead_price:
        status = "Executed"  # PnL will be calculated later
        print(f"Bid {bid.id}: Executed (waiting for real-time PnL)")
    else:
        status = "Fail"
        print(f"Bid {bid.id}: Failed (price too low)")
    _save_bid_outcome(db, bid, status, None)


def _settle_executed_bid(db: Session, bid: Bid, now_utc: datetime) -> None:
    """Compute PnL for one "Executed" bid once its delivery hour has finished."""
    market = bid.market
    market_tz = MARKET_TIMEZONES[market]
    target_time = bid.timestamp.astimezone(market_tz)

    # Elapsed-time check done in UTC so DST transitions cannot skew it.
    if now_utc < target_time.astimezone(UTC) + timedelta(hours=1):
        print(f"Skipping bid {bid.id}, real-time window not complete yet")
        return

    print(f"Settling executed bid {bid.id} for {target_time} ({market})")
    real_time_prices = get_real_time_prices(market, target_time)
    if not real_time_prices:
        print(f"No real-time prices available for bid {bid.id} yet")
        return  # Retry on a later run

    avg_real_time_price = sum(real_time_prices) / len(real_time_prices)
    day_ahead_price = get_day_ahead_price(market, target_time)
    pnl = (day_ahead_price - avg_real_time_price) * bid.quantity

    print(f"Bid {bid.id} Settled | PnL: ${pnl:.2f}")
    _save_bid_outcome(db, bid, "Settled", pnl)


def process_bids():
    """Run both bid-processing passes in a single database session.

    A failure on one bid is printed and rolled back so the remaining bids are
    still processed. The session is closed even if a query raises.
    """
    db: Session = SessionLocal()
    try:
        now_utc = datetime.now(UTC)

        # Step 1: Process Submitted bids
        submitted_bids = db.query(Bid).filter(Bid.status == "Submitted").all()
        for bid in submitted_bids:
            # Read the id up front so the error path never touches the ORM.
            bid_id = bid.id
            try:
                _process_submitted_bid(db, bid, now_utc)
            except Exception as e:
                db.rollback()
                print(f"Error processing submitted bid {bid_id}: {e}")

        # Step 2: Process Executed bids (calculate PnL)
        executed_bids = db.query(Bid).filter(Bid.status == "Executed").all()
        for bid in executed_bids:
            bid_id = bid.id
            try:
                _settle_executed_bid(db, bid, now_utc)
            except Exception as e:
                db.rollback()
                print(f"Error settling executed bid {bid_id}: {e}")
    finally:
        db.close()


if __name__ == "__main__":
    process_bids()