aboutsummaryrefslogtreecommitdiff
path: root/server/services/get_data.py
blob: fbbf12fab6b7574e38c408ffcb72108f20bbb9e4 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from datetime import datetime, timedelta, UTC
from sqlalchemy.orm import Session
from db import SessionLocal
from models.market import MarketDataDB
from gridstatus import ISONE, MISO, NYISO
from typing import Dict, Type
from sqlalchemy.exc import OperationalError
import time

# Registry of supported markets: upper-case market name -> gridstatus client
# class. get_iso_instance() looks names up here and update_market_data()
# iterates it (in this insertion order) to decide which ISOs to poll.
MARKET_CLASSES: Dict[str, Type] = dict(
    ISONE=ISONE,
    MISO=MISO,
    NYISO=NYISO,
)


def get_iso_instance(market: str):
    """Create a client instance for the named market.

    The name is matched case-insensitively against ``MARKET_CLASSES`` and the
    matching class is instantiated with no arguments.

    Raises:
        ValueError: if the (upper-cased) name is not a key of MARKET_CLASSES;
            the message lists the supported names.
    """
    key = market.upper()
    if key in MARKET_CLASSES:
        return MARKET_CLASSES[key]()
    supported = list(MARKET_CLASSES)
    raise ValueError(f"Unsupported market '{key}'. Supported: {supported}")


def _latest_timestamp(db: "Session", market_name: str, data_type: str):
    """Return the newest stored timestamp for (market_name, data_type), or None.

    The stored value is re-labelled as UTC via ``replace(tzinfo=UTC)``.
    NOTE(review): this assumes the column holds naive UTC datetimes (e.g. SQLite
    dropping tzinfo) -- confirm against the MarketDataDB column definition.
    """
    latest = (
        db.query(MarketDataDB)
        .filter(MarketDataDB.market == market_name, MarketDataDB.type == data_type)
        .order_by(MarketDataDB.timestamp.desc())
        .first()
    )
    if latest is None:
        return None
    return latest.timestamp.replace(tzinfo=UTC)


def _is_stale(latest, max_age: timedelta) -> bool:
    """True when nothing is stored yet or *latest* is more than *max_age* ago."""
    return latest is None or datetime.now(UTC) - latest > max_age


def _aggregate_lmp(df):
    """Average the LMP components over all locations for each interval.

    Converts "Interval Start" to UTC (in place on *df*), then returns one row
    per interval with the mean LMP, Energy, Congestion and Loss.
    """
    df["Interval Start"] = df["Interval Start"].dt.tz_convert("UTC")
    return (
        df.groupby("Interval Start")[["LMP", "Energy", "Congestion", "Loss"]]
        .mean()
        .reset_index()
    )


def _build_entries(grouped, market_name: str, data_type: str, after) -> list:
    """Turn aggregated rows into MarketDataDB objects newer than *after*.

    Rows whose interval start is at or before *after* (the newest stored
    timestamp) are skipped so already-stored intervals are not duplicated.
    When *after* is None every row is kept.
    """
    entries = []
    for _, row in grouped.iterrows():
        if after is not None and row["Interval Start"] <= after:
            continue  # Skip old data
        entries.append(
            MarketDataDB(
                timestamp=row["Interval Start"],
                lmp=row["LMP"],
                energy=row["Energy"],
                congestion=row["Congestion"],
                loss=row["Loss"],
                market=market_name,
                type=data_type,
            )
        )
    return entries


def _commit_with_retry(
    db: "Session", entries: list, max_retries: int = 3, delay_seconds: float = 2
) -> None:
    """Add *entries* to *db* and commit, retrying while the database is locked.

    After a failed flush/commit the Session must be rolled back before it can
    be used again, and the rollback expunges the pending objects. So each
    attempt re-adds the entries instead of just calling commit() again.

    Raises:
        Exception: if the database is still locked after *max_retries* attempts.
        OperationalError: for any other operational error (re-raised as-is).
    """
    for attempt in range(max_retries):
        db.add_all(entries)
        try:
            db.commit()
            return
        except OperationalError as e:
            db.rollback()
            if "database is locked" not in str(e).lower():
                raise
            if attempt == max_retries - 1:
                raise Exception("Database is locked, and retries exhausted.") from e
            time.sleep(delay_seconds)


def update_market_data() -> None:
    """Fetch recent LMP data for every market and store location-averaged rows.

    For each market in MARKET_CLASSES:
      * REAL_TIME_5_MIN data for date="latest" is fetched when no REALTIME row
        exists or the newest one is more than 5 minutes old;
      * DAY_AHEAD_HOURLY data is fetched when no DAYAHEAD row exists or the
        newest one is more than 1 hour old (tomorrow's date from 18:00 UTC on).
    Only intervals newer than the newest stored row of that type are inserted.
    All new rows are committed together at the end, with up to 3 attempts
    (2 s apart) if the database reports it is locked. The session is always
    closed, even when a fetch or the commit fails.

    Raises:
        Exception: if the database stays locked after all retries.
        OperationalError: for other database operational errors.
        Any exception raised by the market clients while fetching.
    """
    db: Session = SessionLocal()
    try:
        # Rows are only added to the session at commit time: adding them inside
        # the loop would let later queries autoflush them, taking the write lock
        # and holding it across the remaining network fetches.
        new_entries = []

        for market_name in MARKET_CLASSES:
            print(f"Processing {market_name}")
            iso = get_iso_instance(market_name)

            last_realtime = _latest_timestamp(db, market_name, "REALTIME")
            if _is_stale(last_realtime, timedelta(minutes=5)):
                print(f"Getting realtime data for {market_name}")
                df = iso.get_lmp(
                    date="latest", market="REAL_TIME_5_MIN", locations="ALL"
                )
                new_entries += _build_entries(
                    _aggregate_lmp(df), market_name, "REALTIME", last_realtime
                )

            last_dayahead = _latest_timestamp(db, market_name, "DAYAHEAD")
            # NOTE(review): day-ahead rows can be future-dated (tomorrow's hours
            # are fetched after 18:00 UTC), so this only fires once the newest
            # stored hour is >1h in the past -- confirm that is intended.
            if _is_stale(last_dayahead, timedelta(hours=1)):
                print(f"Getting day-ahead data for {market_name}")
                now_utc = datetime.now(UTC)
                day_ahead_date = now_utc.date()
                if now_utc.hour >= 18:
                    # After 6PM UTC, markets usually publish next day's data
                    day_ahead_date += timedelta(days=1)
                df = iso.get_lmp(
                    date=day_ahead_date, market="DAY_AHEAD_HOURLY", locations="ALL"
                )
                new_entries += _build_entries(
                    _aggregate_lmp(df), market_name, "DAYAHEAD", last_dayahead
                )

        _commit_with_retry(db, new_entries)
    finally:
        db.close()


if __name__ == "__main__":
    # Run one ingestion pass when this module is executed directly.
    update_market_data()