from datetime import datetime, timedelta, UTC from sqlalchemy.orm import Session from db import SessionLocal from models.market import MarketDataDB from gridstatus import ISONE, MISO, NYISO from typing import Dict, Type MARKET_CLASSES: Dict[str, Type] = { "ISONE": ISONE, "MISO": MISO, "NYISO": NYISO, } def get_iso_instance(market: str): market = market.upper() if market not in MARKET_CLASSES: raise ValueError( f"Unsupported market '{market}'. Supported: {list(MARKET_CLASSES.keys())}" ) return MARKET_CLASSES[market]() def update_market_data(): db: Session = SessionLocal() for market_name in MARKET_CLASSES.keys(): print(f"Processing {market_name}") iso = get_iso_instance(market_name) last_realtime = ( db.query(MarketDataDB) .filter(MarketDataDB.market == market_name, MarketDataDB.type == "REALTIME") .order_by(MarketDataDB.timestamp.desc()) .first() ) if not last_realtime or ( datetime.now(UTC) - last_realtime.timestamp.replace(tzinfo=UTC) > timedelta(minutes=5) ): print(f"Getting realtime data for {market_name}") df = iso.get_lmp(date="latest", market="REAL_TIME_5_MIN", locations="ALL") df["Interval Start"] = df["Interval Start"].dt.tz_convert("UTC") grouped = ( df.groupby("Interval Start")[["LMP", "Energy", "Congestion", "Loss"]] .mean() .reset_index() ) for _, row in grouped.iterrows(): if last_realtime and row[ "Interval Start" ] <= last_realtime.timestamp.replace(tzinfo=UTC): continue # Skip old data entry = MarketDataDB( timestamp=row["Interval Start"], lmp=row["LMP"], energy=row["Energy"], congestion=row["Congestion"], loss=row["Loss"], market=market_name, type="REALTIME", ) db.add(entry) last_dayahead = ( db.query(MarketDataDB) .filter(MarketDataDB.market == market_name, MarketDataDB.type == "DAYAHEAD") .order_by(MarketDataDB.timestamp.desc()) .first() ) if not last_dayahead or ( datetime.now(UTC) - last_dayahead.timestamp.replace(tzinfo=UTC) > timedelta(hours=1) ): print(f"Getting day-ahead data for {market_name}") now_utc = datetime.now(UTC) day_ahead_date = now_utc.date() if ( now_utc.hour >= 18 ): # After 6PM UTC, markets usually publish next day's data day_ahead_date += timedelta(days=1) df = iso.get_lmp( date=day_ahead_date, market="DAY_AHEAD_HOURLY", locations="ALL" ) df["Interval Start"] = df["Interval Start"].dt.tz_convert("UTC") grouped = ( df.groupby("Interval Start")[["LMP", "Energy", "Congestion", "Loss"]] .mean() .reset_index() ) for _, row in grouped.iterrows(): if last_dayahead and row[ "Interval Start" ] <= last_dayahead.timestamp.replace(tzinfo=UTC): continue entry = MarketDataDB( timestamp=row["Interval Start"], lmp=row["LMP"], energy=row["Energy"], congestion=row["Congestion"], loss=row["Loss"], market=market_name, type="DAYAHEAD", ) db.add(entry) db.commit() db.close() if __name__ == "__main__": update_market_data()