15
15
import logging
16
16
import humanfriendly
17
17
import pathlib
18
+ import os .path
18
19
19
20
from simvue .eco .api_client import APIClient , CO2SignalResponse
20
21
@@ -46,6 +47,7 @@ class CO2Monitor(pydantic.BaseModel):
46
47
intensity_refresh_interval : int | None | str
47
48
co2_intensity : float | None
48
49
co2_signal_api_token : str | None
50
+ offline : bool = False
49
51
50
52
def now (self ) -> str :
51
53
"""Return data file timestamp for the current time"""
@@ -97,7 +99,9 @@ def __init__(self, *args, **kwargs) -> None:
97
99
disable using RestAPIs to retrieve CO2 intensity and instead use this value.
98
100
Default is None, use remote data. Value is in kgCO2/kWh
99
101
co2_signal_api_token: str
100
- RECOMMENDED. The API token for CO2 signal, default is None.
102
+ The API token for CO2 signal, default is None.
103
+ offline: bool, optional
104
+ Run without any server interaction
101
105
"""
102
106
_logger = logging .getLogger (self .__class__ .__name__ )
103
107
@@ -113,6 +117,7 @@ def __init__(self, *args, **kwargs) -> None:
113
117
"⚠️ No TDP value provided for current GPUs, will use arbitrary value of 130W."
114
118
)
115
119
super ().__init__ (* args , ** kwargs )
120
+ self ._last_local_write = datetime .datetime .now ()
116
121
117
122
if self .intensity_refresh_interval and isinstance (
118
123
self .intensity_refresh_interval , str
@@ -144,7 +149,7 @@ def __init__(self, *args, **kwargs) -> None:
144
149
self ._logger = _logger
145
150
self ._client : APIClient | None = (
146
151
None
147
- if self .co2_intensity
152
+ if self .co2_intensity or self . offline
148
153
else APIClient (co2_api_token = self .co2_signal_api_token , timeout = 10 )
149
154
)
150
155
self ._processes : dict [str , ProcessData ] = {}
@@ -158,10 +163,25 @@ def check_refresh(self) -> bool:
158
163
whether a refresh of the CO2 intensity was requested
159
164
from the CO2 Signal API.
160
165
"""
166
+ # Need to check if the local cache has been modified
167
+ # even if running offline
161
168
if (
162
- not self ._local_data .setdefault (self ._client .country_code , {})
163
- or self .outdated
169
+ self ._data_file_path .exists ()
170
+ and (
171
+ _check_write := datetime .datetime .fromtimestamp (
172
+ os .path .getmtime (f"{ self ._data_file_path } " )
173
+ )
174
+ )
175
+ > self ._last_local_write
164
176
):
177
+ self ._last_local_write = _check_write
178
+ with self ._data_file_path .open ("r" ) as in_f :
179
+ self ._local_data = json .load (in_f )
180
+
181
+ if (
182
+ self ._client
183
+ and not self ._local_data .setdefault (self ._client .country_code , {})
184
+ ) or self .outdated :
165
185
self ._logger .info ("🌍 CO2 emission outdated, calling API." )
166
186
_data : CO2SignalResponse = self ._client .get ()
167
187
self ._local_data [self ._client .country_code ] = _data .model_dump (mode = "json" )
@@ -200,8 +220,23 @@ def estimate_co2_emissions(
200
220
_co2_units = "kgCO2/kWh"
201
221
else :
202
222
self .check_refresh ()
223
+ if self ._client :
224
+ _country_code = self ._client .country_code
225
+ else :
226
+ # If no local data yet then return
227
+ if not (_country_codes := list (self ._local_data .keys ())):
228
+ self ._logger .warning (
229
+ "No CO2 emission data recorded as no CO2 intensity value "
230
+ "has been provided and there is no local intensity data available."
231
+ )
232
+ return
233
+
234
+ _country_code = _country_codes [0 ]
235
+ self ._logger .debug (
236
+ f"🗂️ Using data for region '{ _country_code } ' from local cache for offline estimation."
237
+ )
203
238
self ._current_co2_data = CO2SignalResponse (
204
- ** self ._local_data [self . _client . country_code ]
239
+ ** self ._local_data [_country_code ]
205
240
)
206
241
_current_co2_intensity = self ._current_co2_data .data .carbon_intensity
207
242
_co2_units = self ._current_co2_data .carbon_intensity_units
0 commit comments