Skip to content

Commit bd10c44

Browse files
committed
Started simplifying ecoclient
1 parent 8f79810 commit bd10c44

File tree

3 files changed

+195
-208
lines changed

3 files changed

+195
-208
lines changed

simvue/eco/emissions_monitor.py

Lines changed: 28 additions & 118 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,15 @@
66
"""
77

88
__author__ = "Kristian Zarebski"
9-
__version__ = "0.1.0"
10-
__license__ = "MIT"
119
__date__ = "2025-02-27"
1210

1311
import datetime
1412
import json
1513
import pydantic
1614
import dataclasses
17-
import threading
18-
import time
1915
import logging
20-
import typing
21-
import psutil
2216
import humanfriendly
2317
import pathlib
24-
import os
2518

2619
from simvue.eco.api_client import APIClient, CO2SignalResponse
2720

@@ -30,7 +23,6 @@
3023

3124
@dataclasses.dataclass
3225
class ProcessData:
33-
process: psutil.Process
3426
cpu_percentage: float = 0.0
3527
power_usage: float = 0.0
3628
total_energy: float = 0.0
@@ -145,45 +137,23 @@ def __init__(self, *args, **kwargs) -> None:
145137
)
146138
self._processes: dict[str, ProcessData] = {}
147139

148-
@pydantic.validate_call(config={"arbitrary_types_allowed": True})
149-
def attach_process(
150-
self, process: psutil.Process | None = None, label: str | None = None
151-
) -> str:
152-
"""
153-
Attach a process to the CO2 Monitor.
154-
155-
Parameters
156-
----------
157-
process : psutil.Process | None
158-
The process to monitor, if None measures the current running process. Default is None.
159-
label : str | None
160-
The label to assign to the process. Default is process_<pid>.
161-
162-
Returns
163-
-------
164-
int
165-
The PID of the process.
166-
"""
167-
if process is None:
168-
process = psutil.Process(pid=os.getpid())
169-
170-
self._logger.info(f"📎 Attaching process with PID {process.pid}")
171-
172-
label = label or f"process_{process.pid}"
173-
self._processes[label] = ProcessData(process=process)
174-
175-
return label
176-
177-
def estimate_co2_emissions(self) -> None:
140+
def estimate_co2_emissions(
141+
self, process_id: str, cpu_percent: float, cpu_interval: float
142+
) -> None:
178143
"""Estimate the CO2 emissions"""
179-
self._logger.info("📐 Measuring CPU usage and power.")
144+
self._logger.debug(
145+
f"📐 Estimating CO2 emissions from CPU usage of {cpu_percent}% in interval {cpu_interval}s."
146+
)
180147

181148
if self._local_data is None:
182149
raise RuntimeError("Expected local data to be initialised.")
183150

184151
if not self._data_file_path:
185152
raise RuntimeError("Expected local data file to be defined.")
186153

154+
if not (_process := self._processes.get(process_id)):
155+
self._processes[process_id] = (_process := ProcessData())
156+
187157
if (
188158
not self.co2_intensity
189159
and not self._local_data.setdefault(self._client.country_code, {})
@@ -207,86 +177,30 @@ def estimate_co2_emissions(self) -> None:
207177
_current_co2_intensity = self._current_co2_data.data.carbon_intensity
208178
_co2_units = self._current_co2_data.carbon_intensity_units
209179

210-
for label, process in self._processes.items():
211-
process.cpu_percentage = process.process.cpu_percent(
212-
interval=self.cpu_interval
213-
)
214-
_previous_energy: float = process.total_energy
215-
process.power_usage = min(
216-
self.cpu_idle_power,
217-
(process.cpu_percentage / 100.0) * self.thermal_design_power_per_core,
218-
)
219-
process.total_energy += process.power_usage * self.cpu_interval
220-
process.energy_delta = process.total_energy - _previous_energy
221-
222-
# Measured value is in g/kWh, convert to kg/kWs
223-
_carbon_intensity_kgpws: float = _current_co2_intensity / (60 * 60 * 1e3)
224-
225-
_previous_emission: float = process.co2_emission
180+
_process.cpu_percentage = cpu_percent
181+
_previous_energy: float = _process.total_energy
182+
_process.power_usage = min(
183+
self.cpu_idle_power,
184+
(_process.cpu_percentage / 100.0) * self.thermal_design_power_per_core,
185+
)
186+
_process.total_energy += _process.power_usage * self.cpu_interval
187+
_process.energy_delta = _process.total_energy - _previous_energy
226188

227-
process.co2_delta = (
228-
process.power_usage * _carbon_intensity_kgpws * self.cpu_interval
229-
)
189+
# Measured value is in g/kWh, convert to kg/kWs
190+
_carbon_intensity_kgpws: float = _current_co2_intensity / (60 * 60 * 1e3)
230191

231-
process.co2_emission += process.co2_delta
192+
_previous_emission: float = _process.co2_emission
232193

233-
self._logger.debug(
234-
f"📝 For process '{label}', recorded: CPU={process.cpu_percentage}%, "
235-
f"Power={process.power_usage}W, CO2={process.co2_emission}{_co2_units}"
236-
)
194+
_process.co2_delta = (
195+
_process.power_usage * _carbon_intensity_kgpws * self.cpu_interval
196+
)
237197

238-
@pydantic.validate_call(config={"arbitrary_types_allowed": True})
239-
def run(
240-
self,
241-
termination_trigger: threading.Event,
242-
callback: typing.Callable,
243-
measure_interval: pydantic.PositiveFloat = pydantic.Field(default=10.0, gt=2.0),
244-
return_all: bool = False,
245-
) -> None:
246-
"""Run the API client in a thread.
198+
_process.co2_emission += _process.co2_delta
247199

248-
Parameters
249-
----------
250-
termination_trigger : threading.Event
251-
thread event used to terminate monitor
252-
callback : typing.Callable
253-
callback to execute on measured results
254-
measure_interval : float, optional
255-
interval of measurement, note the API is limited at a rate of 30 requests per
256-
hour, therefore any interval less than 2 minutes will use the previously recorded CO2 intensity.
257-
Default is 10 seconds.
258-
return_all : bool, optional
259-
whether to return all processes or just the current. Default is False.
260-
261-
Returns
262-
-------
263-
ProcessData | dict[str, ProcessData]
264-
Either the process data for the current process or for all processes.
265-
"""
266-
self._logger.info("🧵 Launching monitor in multi-threaded mode.")
267-
self._logger.info(f"⌚ Will record at interval of {measure_interval}s.")
268-
269-
def _run(
270-
monitor: "CO2Monitor" = self,
271-
callback: typing.Callable = callback,
272-
return_all: bool = return_all,
273-
) -> None:
274-
if not return_all and not monitor.last_process:
275-
raise ValueError("No processes attached to monitor.")
276-
277-
while not termination_trigger.is_set():
278-
monitor.estimate_co2_emissions()
279-
# Depending on user choice either
280-
# return all process data or just the last
281-
callback(
282-
monitor.process_data
283-
if return_all
284-
else monitor.process_data[monitor.last_process] # type: ignore
285-
)
286-
time.sleep(measure_interval)
287-
288-
_thread = threading.Thread(target=_run)
289-
_thread.start()
200+
self._logger.debug(
201+
f"📝 For _process '{process_id}', recorded: CPU={_process.cpu_percentage}%, "
202+
f"Power={_process.power_usage}W, CO2={_process.co2_emission}{_co2_units}"
203+
)
290204

291205
@property
292206
def last_process(self) -> str | None:
@@ -300,10 +214,6 @@ def process_data(self) -> dict[str, ProcessData]:
300214
def current_carbon_intensity(self) -> float:
301215
return self._client.get().data.carbon_intensity
302216

303-
@property
304-
def total_cpu_percentage(self) -> float:
305-
return sum(process.cpu_percentage for process in self._processes.values())
306-
307217
@property
308218
def total_power_usage(self) -> float:
309219
return sum(process.power_usage for process in self._processes.values())

simvue/metrics.py

Lines changed: 36 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@
1010
import logging
1111
import psutil
1212

13+
1314
from .pynvml import (
1415
nvmlDeviceGetComputeRunningProcesses,
1516
nvmlDeviceGetCount,
@@ -21,6 +22,8 @@
2122
nvmlShutdown,
2223
)
2324

25+
RESOURCES_METRIC_PREFIX: str = "resources"
26+
2427
logger = logging.getLogger(__name__)
2528

2629

@@ -65,11 +68,11 @@ def is_gpu_used(handle, processes: list[psutil.Process]) -> bool:
6568
return len(list(set(gpu_pids) & set(pids))) > 0
6669

6770

68-
def get_gpu_metrics(processes: list[psutil.Process]) -> dict[str, float]:
71+
def get_gpu_metrics(processes: list[psutil.Process]) -> list[tuple[float, float]]:
6972
"""
7073
Get GPU metrics
7174
"""
72-
gpu_metrics: dict[str, float] = {}
75+
gpu_metrics: list[tuple[float, float]] = []
7376

7477
with contextlib.suppress(Exception):
7578
nvmlInit()
@@ -80,11 +83,38 @@ def get_gpu_metrics(processes: list[psutil.Process]) -> dict[str, float]:
8083
utilisation_percent = nvmlDeviceGetUtilizationRates(handle).gpu
8184
memory = nvmlDeviceGetMemoryInfo(handle)
8285
memory_percent = 100 * memory.free / memory.total
83-
gpu_metrics[f"resources/gpu.utilisation.percent.{i}"] = (
84-
utilisation_percent
85-
)
86-
gpu_metrics[f"resources/gpu.memory.percent.{i}"] = memory_percent
86+
gpu_metrics.append((utilisation_percent, memory_percent))
8787

8888
nvmlShutdown()
8989

9090
return gpu_metrics
91+
92+
93+
class SystemResourceMeasurement:
94+
def __init__(
95+
self,
96+
processes: list[psutil.Process],
97+
interval: float | None,
98+
cpu_only: bool = False,
99+
) -> None:
100+
self.cpu_percent: float | None = get_process_cpu(processes, interval=interval)
101+
self.cpu_memory: float | None = get_process_memory(processes)
102+
self.gpus: list[dict[str, float]] = (
103+
None if cpu_only else get_gpu_metrics(processes)
104+
)
105+
106+
def to_dict(self) -> dict[str, float]:
107+
_metrics: dict[str, float] = {
108+
f"{RESOURCES_METRIC_PREFIX}/cpu.usage.percentage": self.cpu_percent,
109+
f"{RESOURCES_METRIC_PREFIX}/cpu.usage.memory": self.cpu_memory,
110+
}
111+
112+
for i, gpu in enumerate(self.gpus):
113+
_metrics[f"{RESOURCES_METRIC_PREFIX}/gpu.utilisation.percent.{i}"] = gpu[
114+
"utilisation"
115+
]
116+
_metrics[f"{RESOURCES_METRIC_PREFIX}/gpu.utilisation.memory.{i}"] = gpu[
117+
"memory"
118+
]
119+
120+
return _metrics

0 commit comments

Comments
 (0)