@@ -334,10 +334,7 @@ def _terminate_run(
334
334
335
335
def _get_internal_metrics (
336
336
self ,
337
- system_metrics_step : int | None ,
338
- emission_metrics_step : int | None ,
339
- res_measure_interval : int | None = None ,
340
- ems_measure_interval : int | None = None ,
337
+ system_metrics_step : int ,
341
338
) -> None :
342
339
"""Refresh resource and emissions metrics.
343
340
@@ -346,55 +343,49 @@ def _get_internal_metrics(
346
343
347
344
Parameters
348
345
----------
349
- system_metrics_step: int | None
350
- the current step for this resource metric record,
351
- None if skipping resource metrics.
352
- emission_metrics_step: int | None
353
- the current step for this emission metrics record,
354
- None if skipping emission metrics.
355
- res_measure_interval: int | None, optional
356
- the interval for resource metric gathering, default is None
357
- ems_measure_interval: int | None, optional
358
- the interval for emission metric gathering, default is None
346
+ system_metrics_step: int
347
+ The current step for this system metric record
359
348
360
349
Return
361
350
------
362
351
tuple[float, float]
363
352
new resource metric measure time
364
353
new emissions metric measure time
365
354
"""
355
+
356
+ # In order to get a resource metric reading at t=0
357
+ # because there is no previous CPU reading yet we cannot
358
+ # use the default of None for the interval here, so we measure
359
+ # at an interval of 1s.
366
360
_current_system_measure = SystemResourceMeasurement (
367
361
self .processes ,
368
- interval = res_measure_interval ,
369
- cpu_only = not system_metrics_step ,
362
+ interval = 1 if system_metrics_step == 0 else None ,
370
363
)
371
364
372
- if system_metrics_step is not None :
373
- # Set join on fail to false as if an error is thrown
374
- # join would be called on this thread and a thread cannot
375
- # join itself!
376
- self ._add_metrics_to_dispatch (
377
- _current_system_measure .to_dict (),
378
- join_on_fail = False ,
379
- step = system_metrics_step ,
380
- )
365
+ # Set join on fail to false as if an error is thrown
366
+ # join would be called on this thread and a thread cannot
367
+ # join itself!
368
+ self ._add_metrics_to_dispatch (
369
+ _current_system_measure .to_dict (),
370
+ join_on_fail = False ,
371
+ step = system_metrics_step ,
372
+ )
381
373
382
- if (
383
- self ._emissions_monitor
384
- and emission_metrics_step is not None
385
- and ems_measure_interval is not None
386
- and _current_system_measure .cpu_percent is not None
387
- ):
374
+ # For the first emissions metrics reading, the time interval to use
375
+ # Is the time since the run started, otherwise just use the time between readings
376
+ if self ._emissions_monitor :
388
377
self ._emissions_monitor .estimate_co2_emissions (
389
378
process_id = f"{ self ._name } " ,
390
379
cpu_percent = _current_system_measure .cpu_percent ,
391
- measure_interval = ems_measure_interval ,
380
+ measure_interval = (time .time () - self ._start_time )
381
+ if system_metrics_step == 0
382
+ else self ._system_metrics_interval ,
392
383
gpu_percent = _current_system_measure .gpu_percent ,
393
384
)
394
385
self ._add_metrics_to_dispatch (
395
386
self ._emissions_monitor .simvue_metrics (),
396
387
join_on_fail = False ,
397
- step = emission_metrics_step ,
388
+ step = system_metrics_step ,
398
389
)
399
390
400
391
def _create_heartbeat_callback (
@@ -416,61 +407,30 @@ def _heartbeat(
416
407
raise RuntimeError ("Expected initialisation of heartbeat" )
417
408
418
409
last_heartbeat : float = 0
419
- last_res_metric_call : float = 0
420
- last_co2_metric_call : float = 0
421
-
422
- co2_step : int = 0
423
- res_step : int = 0
410
+ last_sys_metric_call : float = 0
424
411
425
- initial_ems_metrics_interval : float = time . time () - self . _start_time
412
+ sys_step : int = 0
426
413
427
414
while not heartbeat_trigger .is_set ():
428
415
with self ._configuration_lock :
429
416
_current_time : float = time .time ()
417
+
430
418
_update_system_metrics : bool = (
431
419
self ._system_metrics_interval is not None
432
- and _current_time - last_res_metric_call
433
- > self ._system_metrics_interval
434
- and self ._status == "running"
435
- )
436
- _update_emissions_metrics : bool = (
437
- self ._system_metrics_interval is not None
438
- and self ._emissions_monitor
439
- and _current_time - last_co2_metric_call
420
+ and _current_time - last_sys_metric_call
440
421
> self ._system_metrics_interval
441
422
and self ._status == "running"
442
423
)
443
424
444
- # In order to get a resource metric reading at t=0
445
- # because there is no previous CPU reading yet we cannot
446
- # use the default of None for the interval here, so we measure
447
- # at an interval of 1s. For emissions metrics the first step
448
- # is time since run start
449
- self ._get_internal_metrics (
450
- emission_metrics_step = co2_step
451
- if _update_emissions_metrics
452
- else None ,
453
- system_metrics_step = res_step
454
- if _update_system_metrics
455
- else None ,
456
- res_measure_interval = 1 if res_step == 0 else None ,
457
- ems_measure_interval = initial_ems_metrics_interval
458
- if co2_step == 0
459
- else self ._system_metrics_interval ,
460
- )
425
+ if _update_system_metrics :
426
+ self ._get_internal_metrics (system_metrics_step = sys_step )
461
427
462
- res_step += 1
463
- co2_step += 1
428
+ sys_step += 1
464
429
465
- last_res_metric_call = (
430
+ last_sys_metric_call = (
466
431
_current_time
467
432
if _update_system_metrics
468
- else last_res_metric_call
469
- )
470
- last_co2_metric_call = (
471
- _current_time
472
- if _update_emissions_metrics
473
- else last_co2_metric_call
433
+ else last_sys_metric_call
474
434
)
475
435
476
436
if time .time () - last_heartbeat < self ._heartbeat_interval :
@@ -1055,7 +1015,7 @@ def config(
1055
1015
queue_blocking : bool | None = None ,
1056
1016
system_metrics_interval : pydantic .PositiveInt | None = None ,
1057
1017
enable_emission_metrics : bool | None = None ,
1058
- disable_system_metrics : bool | None = None ,
1018
+ disable_resources_metrics : bool | None = None ,
1059
1019
storage_id : str | None = None ,
1060
1020
abort_on_alert : typing .Literal ["run" , "all" , "ignore" ] | bool | None = None ,
1061
1021
) -> bool :
@@ -1069,10 +1029,10 @@ def config(
1069
1029
queue_blocking : bool, optional
1070
1030
block thread queues during metric/event recording
1071
1031
system_metrics_interval : int, optional
1072
- frequency at which to collect resource metrics
1032
+ frequency at which to collect resource and emissions metrics, if enabled
1073
1033
enable_emission_metrics : bool, optional
1074
1034
enable monitoring of emission metrics
1075
- disable_system_metrics : bool, optional
1035
+ disable_resources_metrics : bool, optional
1076
1036
disable monitoring of resource metrics
1077
1037
storage_id : str, optional
1078
1038
identifier of storage to use, by default None
@@ -1095,17 +1055,30 @@ def config(
1095
1055
if queue_blocking is not None :
1096
1056
self ._queue_blocking = queue_blocking
1097
1057
1098
- if system_metrics_interval and disable_system_metrics :
1058
+ if system_metrics_interval and disable_resources_metrics :
1099
1059
self ._error (
1100
1060
"Setting of resource metric interval and disabling resource metrics is ambiguous"
1101
1061
)
1102
1062
return False
1103
1063
1104
- if disable_system_metrics :
1064
+ if system_metrics_interval :
1065
+ self ._system_metrics_interval = system_metrics_interval
1066
+
1067
+ if disable_resources_metrics :
1068
+ if self ._emissions_monitor :
1069
+ self ._error (
1070
+ "Emissions metrics require resource metrics collection."
1071
+ )
1072
+ return False
1105
1073
self ._pid = None
1106
1074
self ._system_metrics_interval = None
1107
1075
1108
1076
if enable_emission_metrics :
1077
+ if not self ._system_metrics_interval :
1078
+ self ._error (
1079
+ "Emissions metrics require resource metrics collection - make sure resource metrics are enabled!"
1080
+ )
1081
+ return False
1109
1082
if self ._user_config .run .mode == "offline" :
1110
1083
# Create an emissions monitor with no API calls
1111
1084
self ._emissions_monitor = CO2Monitor (
@@ -1130,9 +1103,6 @@ def config(
1130
1103
elif enable_emission_metrics is False and self ._emissions_monitor :
1131
1104
self ._error ("Cannot disable emissions monitor once it has been started" )
1132
1105
1133
- if system_metrics_interval :
1134
- self ._system_metrics_interval = system_metrics_interval
1135
-
1136
1106
if abort_on_alert is not None :
1137
1107
if isinstance (abort_on_alert , bool ):
1138
1108
warnings .warn (
0 commit comments