aboutsummaryrefslogtreecommitdiff
path: root/server/services/process_bids.py
blob: edac780f756e43a792352d81fa3e73d4c9bd8661 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
from datetime import UTC, datetime, timedelta
from sqlalchemy.orm import Session
from db import SessionLocal
from models.bid import Bid
from models.auth import User
from gridstatus import ISONE, NYISO, MISO
from zoneinfo import ZoneInfo
from sqlalchemy.exc import OperationalError
import time

# One gridstatus client per supported market, keyed by the market code stored
# on each bid. The clients are instantiated once, when this module is imported.
MARKET_ISOS = {
    market_code: client_cls()
    for market_code, client_cls in (
        ("ISONE", ISONE),
        ("NYISO", NYISO),
        ("MISO", MISO),
    )
}

# Time zone used to convert bid timestamps for each market, keyed by the same
# market codes as MARKET_ISOS.
MARKET_TIMEZONES = {
    market_code: ZoneInfo(zone_name)
    for market_code, zone_name in (
        ("ISONE", "America/New_York"),
        ("NYISO", "America/New_York"),
        ("MISO", "America/Chicago"),
    )
}


def get_day_ahead_price(market: str, target_time: datetime) -> float:
    """Fetch the Day Ahead clearing price for the hour of target_time.

    The hourly day-ahead LMPs for ``target_time``'s calendar date are requested
    for all locations and averaged per interval, so the returned price is the
    mean LMP across every location in the response.

    Args:
        market: Key into ``MARKET_ISOS`` selecting the ISO client to query.
        target_time: Moment whose containing hour should be priced. It is
            compared against the "Interval Start" values of the response.

    Returns:
        The location-averaged day-ahead LMP for the matching hourly interval.

    Raises:
        KeyError: If ``market`` is not a key of ``MARKET_ISOS``.
        ValueError: If no interval starts within 30 minutes of the top of
            ``target_time``'s hour.
    """
    iso = MARKET_ISOS[market]
    df = iso.get_lmp(
        date=target_time.date(), market="DAY_AHEAD_HOURLY", locations="ALL"
    )
    # Series indexed by interval start; groupby keeps the intervals sorted.
    hourly_mean = df.groupby("Interval Start")["LMP"].mean()

    # Loop invariants: computed once instead of once per row.
    hour_start = target_time.replace(minute=0, second=0, microsecond=0)
    tolerance = timedelta(minutes=30)

    for interval_start, lmp in hourly_mean.items():
        if abs(interval_start - hour_start) < tolerance:
            # Coerce to a builtin float so the value matches the annotation.
            return float(lmp)

    raise ValueError(f"No day ahead price found for {target_time} in {market}")


def get_real_time_prices(market: str, target_time: datetime) -> list[float]:
    """Return the real-time 5-minute prices that fall in target_time's hour.

    The 5-minute real-time LMPs for ``target_time``'s calendar date are
    requested for all locations and averaged per interval. Only intervals whose
    start lies in ``[top of the hour, top of the hour + 1h)`` are kept.

    Args:
        market: Key into ``MARKET_ISOS`` selecting the ISO client to query.
        target_time: Moment whose containing hour defines the window.

    Returns:
        The location-averaged LMPs of the matching intervals, ordered by
        interval start. The list is empty when no interval falls in the window.
    """
    client = MARKET_ISOS[market]
    lmp_frame = client.get_lmp(
        date=target_time.date(), market="REAL_TIME_5_MIN", locations="ALL"
    )
    mean_by_interval = lmp_frame.groupby("Interval Start")["LMP"].mean()

    window_open = target_time.replace(minute=0, second=0, microsecond=0)
    window_close = window_open + timedelta(hours=1)

    return [
        price
        for interval_start, price in mean_by_interval.items()
        if window_open <= interval_start < window_close
    ]


def _commit_bid_update(
    db: "Session",
    bid: "Bid",
    status: str,
    pnl: float | None,
    max_retries: int = 3,
    retry_delay: float = 2,
) -> None:
    """Stage ``status``/``pnl`` on ``bid`` and commit, retrying while locked.

    Args:
        db: Session that ``bid`` was loaded from.
        bid: Bid row to update.
        status: New value for ``bid.status``.
        pnl: New value for ``bid.pnl`` (``None`` when not yet known).
        max_retries: Total number of commit attempts.
        retry_delay: Seconds to sleep between attempts.

    Raises:
        OperationalError: Re-raised unchanged when the failure is anything
            other than "database is locked".
        Exception: When the database is still locked after the last attempt.
    """
    for attempt in range(max_retries):
        # Staged inside the loop: the rollback below discards pending
        # attribute changes, so every attempt has to apply them again.
        bid.status = status
        bid.pnl = pnl
        try:
            db.commit()
            return
        except OperationalError as exc:
            # A session whose commit failed has to be rolled back before it
            # can be used again, whether we retry or give up.
            db.rollback()
            if "database is locked" not in str(exc).lower():
                raise
            if attempt == max_retries - 1:
                raise Exception(
                    "Database is locked, and retries exhausted."
                ) from exc
            time.sleep(retry_delay)


def _process_submitted_bid(db: "Session", bid: "Bid", now_utc: datetime) -> None:
    """Mark one Submitted bid as Executed or Fail using the day-ahead price."""
    market = bid.market
    market_tz = MARKET_TIMEZONES[market]
    # NOTE(review): datetime.astimezone() interprets a naive datetime as
    # system-local time -- confirm bid.timestamp is loaded timezone-aware.
    target_time = bid.timestamp.astimezone(market_tz)
    now = now_utc.astimezone(market_tz)

    # Only process bids whose timestamp is already in the past
    if target_time > now:
        print(f"Skipping future bid {bid.id} at {target_time} ({market})")
        return

    print(f"Processing submitted bid {bid.id} for {target_time} ({market})")

    day_ahead_price = get_day_ahead_price(market, target_time)

    if bid.price >= day_ahead_price:
        new_status = "Executed"
        print(f"Bid {bid.id}: Executed (waiting for real-time PnL)")
    else:
        new_status = "Fail"
        print(f"Bid {bid.id}: Failed (price too low)")

    # PnL stays empty here; executed bids get it in the settlement step.
    _commit_bid_update(db, bid, new_status, None)


def _settle_executed_bid(db: "Session", bid: "Bid", now_utc: datetime) -> None:
    """Compute PnL for one Executed bid once its real-time hour is over."""
    market = bid.market
    market_tz = MARKET_TIMEZONES[market]
    # NOTE(review): same naive-timestamp caveat as in _process_submitted_bid.
    target_time = bid.timestamp.astimezone(market_tz)
    now = now_utc.astimezone(market_tz)

    if now < target_time + timedelta(hours=1):
        print(f"Skipping bid {bid.id}, real-time window not complete yet")
        return

    print(f"Settling executed bid {bid.id} for {target_time} ({market})")

    real_time_prices = get_real_time_prices(market, target_time)

    if not real_time_prices:
        # Leave the bid as Executed so a later run can retry it.
        print(f"No real-time prices available for bid {bid.id} yet")
        return

    avg_real_time_price = sum(real_time_prices) / len(real_time_prices)
    day_ahead_price = get_day_ahead_price(market, target_time)

    # NOTE(review): this is positive when real-time clears below day-ahead;
    # confirm that sign matches the intended position direction.
    pnl = (day_ahead_price - avg_real_time_price) * bid.quantity

    print(f"Bid {bid.id} Settled | PnL: ${pnl:.2f}")
    _commit_bid_update(db, bid, "Settled", pnl)


def process_bids():
    """Advance every pending bid through its lifecycle in two passes.

    Step 1 takes bids with status "Submitted" whose timestamp has passed and
    marks them "Executed" when the bid price is at least the day-ahead price,
    otherwise "Fail". Step 2 takes bids with status "Executed" whose hour has
    fully elapsed and marks them "Settled" with a PnL derived from the
    day-ahead price and the average real-time price.

    Each bid is handled on a best-effort basis: an error is printed, the
    session is rolled back so the failure cannot affect later bids, and
    processing moves on. The session is always closed on exit.
    """
    db: Session = SessionLocal()
    try:
        now_utc = datetime.now(UTC)

        # Step 1: Process Submitted bids
        submitted_bids = db.query(Bid).filter(Bid.status == "Submitted").all()
        for bid in submitted_bids:
            # Captured up front so the error message never has to touch the
            # (possibly expired) ORM object after a failure.
            bid_id = None
            try:
                bid_id = bid.id
                _process_submitted_bid(db, bid, now_utc)
            except Exception as e:
                print(f"Error processing submitted bid {bid_id}: {e}")
                # Reset the session so this failure is not carried over.
                db.rollback()

        # Step 2: Process Executed bids (calculate PnL)
        executed_bids = db.query(Bid).filter(Bid.status == "Executed").all()
        for bid in executed_bids:
            bid_id = None
            try:
                bid_id = bid.id
                _settle_executed_bid(db, bid, now_utc)
            except Exception as e:
                print(f"Error settling executed bid {bid_id}: {e}")
                db.rollback()
    finally:
        db.close()


# Run a single bid-processing pass when this file is executed as a script.
if __name__ == "__main__":
    process_bids()