1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
|
from datetime import datetime, timedelta, UTC
from sqlalchemy.orm import Session
from db import SessionLocal
from models.market import MarketDataDB
from gridstatus import ISONE, MISO, NYISO
from typing import Dict, Type
MARKET_CLASSES: Dict[str, Type] = {
"ISONE": ISONE,
"MISO": MISO,
"NYISO": NYISO,
}
def get_iso_instance(market: str):
market = market.upper()
if market not in MARKET_CLASSES:
raise ValueError(
f"Unsupported market '{market}'. Supported: {list(MARKET_CLASSES.keys())}"
)
return MARKET_CLASSES[market]()
def update_market_data():
db: Session = SessionLocal()
for market_name in MARKET_CLASSES.keys():
print(f"Processing {market_name}")
iso = get_iso_instance(market_name)
last_realtime = (
db.query(MarketDataDB)
.filter(MarketDataDB.market == market_name, MarketDataDB.type == "REALTIME")
.order_by(MarketDataDB.timestamp.desc())
.first()
)
if not last_realtime or (
datetime.now(UTC) - last_realtime.timestamp.replace(tzinfo=UTC)
> timedelta(minutes=5)
):
print(f"Getting realtime data for {market_name}")
df = iso.get_lmp(date="latest", market="REAL_TIME_5_MIN", locations="ALL")
df["Interval Start"] = df["Interval Start"].dt.tz_convert("UTC")
grouped = (
df.groupby("Interval Start")[["LMP", "Energy", "Congestion", "Loss"]]
.mean()
.reset_index()
)
for _, row in grouped.iterrows():
if last_realtime and row[
"Interval Start"
] <= last_realtime.timestamp.replace(tzinfo=UTC):
continue # Skip old data
entry = MarketDataDB(
timestamp=row["Interval Start"],
lmp=row["LMP"],
energy=row["Energy"],
congestion=row["Congestion"],
loss=row["Loss"],
market=market_name,
type="REALTIME",
)
db.add(entry)
last_dayahead = (
db.query(MarketDataDB)
.filter(MarketDataDB.market == market_name, MarketDataDB.type == "DAYAHEAD")
.order_by(MarketDataDB.timestamp.desc())
.first()
)
if not last_dayahead or (
datetime.now(UTC) - last_dayahead.timestamp.replace(tzinfo=UTC)
> timedelta(hours=1)
):
print(f"Getting day-ahead data for {market_name}")
now_utc = datetime.now(UTC)
day_ahead_date = now_utc.date()
if (
now_utc.hour >= 18
): # After 6PM UTC, markets usually publish next day's data
day_ahead_date += timedelta(days=1)
df = iso.get_lmp(
date=day_ahead_date, market="DAY_AHEAD_HOURLY", locations="ALL"
)
df["Interval Start"] = df["Interval Start"].dt.tz_convert("UTC")
grouped = (
df.groupby("Interval Start")[["LMP", "Energy", "Congestion", "Loss"]]
.mean()
.reset_index()
)
for _, row in grouped.iterrows():
if last_dayahead and row[
"Interval Start"
] <= last_dayahead.timestamp.replace(tzinfo=UTC):
continue
entry = MarketDataDB(
timestamp=row["Interval Start"],
lmp=row["LMP"],
energy=row["Energy"],
congestion=row["Congestion"],
loss=row["Loss"],
market=market_name,
type="DAYAHEAD",
)
db.add(entry)
db.commit()
db.close()
if __name__ == "__main__":
update_market_data()
|