"""Refresh stored real-time and day-ahead LMP data for every supported ISO market."""

import time
from datetime import UTC, datetime, timedelta
from typing import Dict, Type

from gridstatus import ISONE, MISO, NYISO
from sqlalchemy.exc import OperationalError
from sqlalchemy.orm import Session

from db import SessionLocal
from models.market import MarketDataDB

MARKET_CLASSES: Dict[str, Type] = {
    "ISONE": ISONE,
    "MISO": MISO,
    "NYISO": NYISO,
}

# Price columns that are averaged over all rows sharing an interval start.
_LMP_COLUMNS = ["LMP", "Energy", "Congestion", "Loss"]

# How far the newest stored row may lag behind "now" before data is re-fetched.
_REALTIME_MAX_AGE = timedelta(minutes=5)
_DAYAHEAD_MAX_AGE = timedelta(hours=1)

_COMMIT_MAX_RETRIES = 3
_COMMIT_RETRY_DELAY_SECONDS = 2


def get_iso_instance(market: str):
    """Return a new ISO client instance for *market* (case-insensitive).

    Raises:
        ValueError: If *market* is not a key of ``MARKET_CLASSES``.
    """
    market = market.upper()
    if market not in MARKET_CLASSES:
        raise ValueError(
            f"Unsupported market '{market}'. Supported: {list(MARKET_CLASSES.keys())}"
        )
    return MARKET_CLASSES[market]()


def _latest_entry(db: Session, market_name: str, data_type: str):
    """Return the newest stored row for the market/type pair, or ``None``."""
    return (
        db.query(MarketDataDB)
        .filter(MarketDataDB.market == market_name, MarketDataDB.type == data_type)
        .order_by(MarketDataDB.timestamp.desc())
        .first()
    )


def _needs_refresh(last_entry, max_age: timedelta) -> bool:
    """Return True when nothing is stored yet or the newest row is older than *max_age*."""
    if last_entry is None:
        return True
    # Stored timestamps are treated as UTC wall-clock values.
    return datetime.now(UTC) - last_entry.timestamp.replace(tzinfo=UTC) > max_age


def _day_ahead_date(now_utc: datetime):
    """Return the date to request day-ahead prices for, given the current UTC time."""
    target = now_utc.date()
    if now_utc.hour >= 18:
        # After 6PM UTC, markets usually publish next day's data
        target += timedelta(days=1)
    return target


def _build_entries(df, market_name: str, data_type: str, last_entry) -> list:
    """Turn an LMP frame into ``MarketDataDB`` rows newer than *last_entry*.

    The frame's "Interval Start" column is converted to UTC, the price
    columns are averaged per interval, and intervals at or before the
    timestamp of *last_entry* (when there is one) are skipped.
    The returned objects are not attached to any session.
    """
    df["Interval Start"] = df["Interval Start"].dt.tz_convert("UTC")
    grouped = df.groupby("Interval Start")[_LMP_COLUMNS].mean().reset_index()

    # Computed once instead of on every row.
    cutoff = None
    if last_entry is not None:
        cutoff = last_entry.timestamp.replace(tzinfo=UTC)

    entries = []
    for _, row in grouped.iterrows():
        if cutoff is not None and row["Interval Start"] <= cutoff:
            continue  # Skip old data
        entries.append(
            MarketDataDB(
                timestamp=row["Interval Start"],
                lmp=row["LMP"],
                energy=row["Energy"],
                congestion=row["Congestion"],
                loss=row["Loss"],
                market=market_name,
                type=data_type,
            )
        )
    return entries


def _commit_with_retry(db: Session, entries: list) -> None:
    """Add *entries* to the session and commit, retrying while the database is locked.

    Raises:
        OperationalError: For any operational error other than a locked database.
        Exception: When the database is still locked after the last attempt.
    """
    for attempt in range(_COMMIT_MAX_RETRIES):
        try:
            # Re-added on every attempt: rolling back a failed transaction
            # detaches the objects that were pending in it, so a bare second
            # commit() would succeed without writing anything.
            db.add_all(entries)
            db.commit()
            return
        except OperationalError as e:
            db.rollback()
            if "database is locked" not in str(e).lower():
                raise
            if attempt == _COMMIT_MAX_RETRIES - 1:
                raise Exception("Database is locked, and retries exhausted.") from e
            time.sleep(_COMMIT_RETRY_DELAY_SECONDS)


def update_market_data() -> None:
    """Fetch and store new real-time and day-ahead LMP rows for each market.

    For every market in ``MARKET_CLASSES`` the newest stored row of each type
    is looked up; when it is missing or too old, fresh data is requested from
    the ISO client and only intervals newer than the stored one are kept.
    New rows are committed once per market, and the session is closed even
    when a fetch, query or commit raises.
    """
    db: Session = SessionLocal()
    try:
        for market_name in MARKET_CLASSES:
            print(f"Processing {market_name}")
            iso = get_iso_instance(market_name)
            # Rows are collected here and only attached to the session inside
            # the retried commit, so every write is covered by the retry.
            entries = []

            last_realtime = _latest_entry(db, market_name, "REALTIME")
            if _needs_refresh(last_realtime, _REALTIME_MAX_AGE):
                print(f"Getting realtime data for {market_name}")
                df = iso.get_lmp(
                    date="latest", market="REAL_TIME_5_MIN", locations="ALL"
                )
                entries += _build_entries(df, market_name, "REALTIME", last_realtime)

            last_dayahead = _latest_entry(db, market_name, "DAYAHEAD")
            if _needs_refresh(last_dayahead, _DAYAHEAD_MAX_AGE):
                print(f"Getting day-ahead data for {market_name}")
                df = iso.get_lmp(
                    date=_day_ahead_date(datetime.now(UTC)),
                    market="DAY_AHEAD_HOURLY",
                    locations="ALL",
                )
                entries += _build_entries(df, market_name, "DAYAHEAD", last_dayahead)

            _commit_with_retry(db, entries)
    finally:
        db.close()


if __name__ == "__main__":
    update_market_data()