@@ -334,10 +334,7 @@ def _terminate_run(
334
334
335
335
def _get_internal_metrics (
336
336
self ,
337
- system_metrics_step : int | None ,
338
- emission_metrics_step : int | None ,
339
- res_measure_interval : int | None = None ,
340
- ems_measure_interval : int | None = None ,
337
+ system_metrics_step : int ,
341
338
) -> None :
342
339
"""Refresh resource and emissions metrics.
343
340
@@ -346,55 +343,49 @@ def _get_internal_metrics(
346
343
347
344
Parameters
348
345
----------
349
- system_metrics_step: int | None
350
- the current step for this resource metric record,
351
- None if skipping resource metrics.
352
- emission_metrics_step: int | None
353
- the current step for this emission metrics record,
354
- None if skipping emission metrics.
355
- res_measure_interval: int | None, optional
356
- the interval for resource metric gathering, default is None
357
- ems_measure_interval: int | None, optional
358
- the interval for emission metric gathering, default is None
346
+ system_metrics_step: int
347
+ The current step for this system metric record
359
348
360
349
Return
361
350
------
362
351
tuple[float, float]
363
352
new resource metric measure time
364
353
new emissions metric measure time
365
354
"""
355
+
356
+ # To get a resource metric reading at t=0 we cannot use the
357
+ # default interval of None, because there is no previous CPU
358
+ # reading yet, so the first measurement is taken over an
359
+ # interval of 1s.
366
360
_current_system_measure = SystemResourceMeasurement (
367
361
self .processes ,
368
- interval = res_measure_interval ,
369
- cpu_only = not system_metrics_step ,
362
+ interval = 1 if system_metrics_step == 0 else None ,
370
363
)
371
364
372
- if system_metrics_step is not None :
373
- # Set join on fail to false as if an error is thrown
374
- # join would be called on this thread and a thread cannot
375
- # join itself!
376
- self ._add_metrics_to_dispatch (
377
- _current_system_measure .to_dict (),
378
- join_on_fail = False ,
379
- step = system_metrics_step ,
380
- )
365
+ # Set join on fail to false as if an error is thrown
366
+ # join would be called on this thread and a thread cannot
367
+ # join itself!
368
+ self ._add_metrics_to_dispatch (
369
+ _current_system_measure .to_dict (),
370
+ join_on_fail = False ,
371
+ step = system_metrics_step ,
372
+ )
381
373
382
- if (
383
- self ._emissions_monitor
384
- and emission_metrics_step is not None
385
- and ems_measure_interval is not None
386
- and _current_system_measure .cpu_percent is not None
387
- ):
374
+ # For the first emissions metrics reading, the time interval to use
375
+ # is the time since the run started; otherwise just use the time between readings
376
+ if self ._emissions_monitor :
388
377
self ._emissions_monitor .estimate_co2_emissions (
389
378
process_id = f"{ self ._name } " ,
390
379
cpu_percent = _current_system_measure .cpu_percent ,
391
- measure_interval = ems_measure_interval ,
380
+ measure_interval = (time .time () - self ._start_time )
381
+ if system_metrics_step == 0
382
+ else self ._system_metrics_interval ,
392
383
gpu_percent = _current_system_measure .gpu_percent ,
393
384
)
394
385
self ._add_metrics_to_dispatch (
395
386
self ._emissions_monitor .simvue_metrics (),
396
387
join_on_fail = False ,
397
- step = emission_metrics_step ,
388
+ step = system_metrics_step ,
398
389
)
399
390
400
391
def _create_heartbeat_callback (
@@ -416,61 +407,29 @@ def _heartbeat(
416
407
raise RuntimeError ("Expected initialisation of heartbeat" )
417
408
418
409
last_heartbeat : float = 0
419
- last_res_metric_call : float = 0
420
- last_co2_metric_call : float = 0
421
-
422
- co2_step : int = 0
423
- res_step : int = 0
410
+ last_sys_metric_call : float = 0
424
411
425
- initial_ems_metrics_interval : float = time . time () - self . _start_time
412
+ sys_step : int = 0
426
413
427
414
while not heartbeat_trigger .is_set ():
428
415
with self ._configuration_lock :
429
416
_current_time : float = time .time ()
417
+
430
418
_update_system_metrics : bool = (
431
419
self ._system_metrics_interval is not None
432
- and _current_time - last_res_metric_call
433
- > self ._system_metrics_interval
434
- and self ._status == "running"
435
- )
436
- _update_emissions_metrics : bool = (
437
- self ._system_metrics_interval is not None
438
- and self ._emissions_monitor
439
- and _current_time - last_co2_metric_call
420
+ and _current_time - last_sys_metric_call
440
421
> self ._system_metrics_interval
441
422
and self ._status == "running"
442
423
)
443
424
444
- # In order to get a resource metric reading at t=0
445
- # because there is no previous CPU reading yet we cannot
446
- # use the default of None for the interval here, so we measure
447
- # at an interval of 1s. For emissions metrics the first step
448
- # is time since run start
449
- self ._get_internal_metrics (
450
- emission_metrics_step = co2_step
451
- if _update_emissions_metrics
452
- else None ,
453
- system_metrics_step = res_step
454
- if _update_system_metrics
455
- else None ,
456
- res_measure_interval = 1 if res_step == 0 else None ,
457
- ems_measure_interval = initial_ems_metrics_interval
458
- if co2_step == 0
459
- else self ._system_metrics_interval ,
460
- )
425
+ if _update_system_metrics :
426
+ self ._get_internal_metrics (system_metrics_step = sys_step )
427
+ sys_step += 1
461
428
462
- res_step += 1
463
- co2_step += 1
464
-
465
- last_res_metric_call = (
429
+ last_sys_metric_call = (
466
430
_current_time
467
431
if _update_system_metrics
468
- else last_res_metric_call
469
- )
470
- last_co2_metric_call = (
471
- _current_time
472
- if _update_emissions_metrics
473
- else last_co2_metric_call
432
+ else last_sys_metric_call
474
433
)
475
434
476
435
if time .time () - last_heartbeat < self ._heartbeat_interval :
@@ -1055,7 +1014,7 @@ def config(
1055
1014
queue_blocking : bool | None = None ,
1056
1015
system_metrics_interval : pydantic .PositiveInt | None = None ,
1057
1016
enable_emission_metrics : bool | None = None ,
1058
- disable_system_metrics : bool | None = None ,
1017
+ disable_resources_metrics : bool | None = None ,
1059
1018
storage_id : str | None = None ,
1060
1019
abort_on_alert : typing .Literal ["run" , "all" , "ignore" ] | bool | None = None ,
1061
1020
) -> bool :
@@ -1069,10 +1028,10 @@ def config(
1069
1028
queue_blocking : bool, optional
1070
1029
block thread queues during metric/event recording
1071
1030
system_metrics_interval : int, optional
1072
- frequency at which to collect resource metrics
1031
+ interval in seconds between collections of resource and emissions metrics, if enabled
1073
1032
enable_emission_metrics : bool, optional
1074
1033
enable monitoring of emission metrics
1075
- disable_system_metrics : bool, optional
1034
+ disable_resources_metrics : bool, optional
1076
1035
disable monitoring of resource metrics
1077
1036
storage_id : str, optional
1078
1037
identifier of storage to use, by default None
@@ -1095,17 +1054,30 @@ def config(
1095
1054
if queue_blocking is not None :
1096
1055
self ._queue_blocking = queue_blocking
1097
1056
1098
- if system_metrics_interval and disable_system_metrics :
1057
+ if system_metrics_interval and disable_resources_metrics :
1099
1058
self ._error (
1100
1059
"Setting of resource metric interval and disabling resource metrics is ambiguous"
1101
1060
)
1102
1061
return False
1103
1062
1104
- if disable_system_metrics :
1063
+ if system_metrics_interval :
1064
+ self ._system_metrics_interval = system_metrics_interval
1065
+
1066
+ if disable_resources_metrics :
1067
+ if self ._emissions_monitor :
1068
+ self ._error (
1069
+ "Emissions metrics require resource metrics collection."
1070
+ )
1071
+ return False
1105
1072
self ._pid = None
1106
1073
self ._system_metrics_interval = None
1107
1074
1108
1075
if enable_emission_metrics :
1076
+ if not self ._system_metrics_interval :
1077
+ self ._error (
1078
+ "Emissions metrics require resource metrics collection - make sure resource metrics are enabled!"
1079
+ )
1080
+ return False
1109
1081
if self ._user_config .run .mode == "offline" :
1110
1082
# Create an emissions monitor with no API calls
1111
1083
self ._emissions_monitor = CO2Monitor (
@@ -1130,9 +1102,6 @@ def config(
1130
1102
elif enable_emission_metrics is False and self ._emissions_monitor :
1131
1103
self ._error ("Cannot disable emissions monitor once it has been started" )
1132
1104
1133
- if system_metrics_interval :
1134
- self ._system_metrics_interval = system_metrics_interval
1135
-
1136
1105
if abort_on_alert is not None :
1137
1106
if isinstance (abort_on_alert , bool ):
1138
1107
warnings .warn (
0 commit comments