diff --git a/CHANGELOG.md b/CHANGELOG.md index d96c2000..6cd8fbb7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,5 +1,15 @@ # Change log +## [v2.0.0-alpha3](https://github.com/simvue-io/client/releases/tag/v2.0.0a3) - 2025-03-04 +* Updated codecarbon to work with new API +* Codecarbon now works with offline mode +* Codecarbon metadata dict is now nested +* Add PID to sender lock file so it can recover from crashes +* Add accept Gzip encoding +* Fixed list of processes to add / remove from existing list of objects +* Add step to resource metrics +* Fix bug where process user alerts should not be overridden if manually set by the user + ## [v2.0.0-alpha2](https://github.com/simvue-io/client/releases/tag/v2.0.0a2) - 2025-02-27 * Removed 'no config file' and 'unstaged changes' warnings from Offline mode as they do not apply * Made `staging_check` not apply in Offline mode diff --git a/CITATION.cff b/CITATION.cff index d2090269..3fc71db0 100644 --- a/CITATION.cff +++ b/CITATION.cff @@ -42,9 +42,9 @@ keywords: - alerting - simulation license: Apache-2.0 -commit: 83b9144abd2092d4be304bf742d72a249ad1d8ff -version: 2.0.0a2 -date-released: '2025-02-27' +commit: 64ff8a5344232d44fc7da5b6ff601d3023497977 +version: 2.0.0a3 +date-released: '2025-03-04' references: - title: mlco2/codecarbon version: v2.8.2 diff --git a/README.md b/README.md index 974a361e..6c27cc20 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ - Simvue + Simvue

diff --git a/pyproject.toml b/pyproject.toml index c49ce77c..7b03d9e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "simvue" -version = "2.0.0a2" +version = "2.0.0a3" description = "Simulation tracking and monitoring" authors = [ {name = "Simvue Development Team", email = "info@simvue.io"} @@ -16,10 +16,10 @@ classifiers = [ "Operating System :: Unix", "Operating System :: Microsoft :: Windows", "Programming Language :: Python :: 3", - "Programming Language :: Python :: 3.9", "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", "Programming Language :: Python :: 3.12", + "Programming Language :: Python :: 3.13", "Topic :: Scientific/Engineering", "Topic :: System :: Monitoring", "Topic :: Utilities", diff --git a/simvue/api/objects/base.py b/simvue/api/objects/base.py index c5930995..84de6e1e 100644 --- a/simvue/api/objects/base.py +++ b/simvue/api/objects/base.py @@ -171,6 +171,7 @@ def __init__( { "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}", "User-Agent": _user_agent or f"Simvue Python client {__version__}", + "Accept-Encoding": "gzip", } if not self._offline else {} diff --git a/simvue/client.py b/simvue/client.py index 4bc7888d..453c1f75 100644 --- a/simvue/client.py +++ b/simvue/client.py @@ -84,7 +84,8 @@ def __init__( logger.warning(f"No {label} specified") self._headers: dict[str, str] = { - "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}" + "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}", + "Accept-Encoding": "gzip", } @prettify_pydantic diff --git a/simvue/config/parameters.py b/simvue/config/parameters.py index c6d65c93..9e0b38bc 100644 --- a/simvue/config/parameters.py +++ b/simvue/config/parameters.py @@ -48,6 +48,7 @@ def check_token(cls, v: typing.Any) -> str | None: class OfflineSpecifications(pydantic.BaseModel): cache: pathlib.Path | None = None + country_iso_code: str | None = None class MetricsSpecifications(pydantic.BaseModel): diff --git a/simvue/eco.py b/simvue/eco.py index 6ff7023b..50db508f 100644 --- a/simvue/eco.py +++ b/simvue/eco.py @@ -2,8 +2,8 @@ import logging import datetime -from codecarbon import EmissionsTracker -from codecarbon.output_methods.base_output import BaseOutput as cc_BaseOutput +from codecarbon import EmissionsTracker, OfflineEmissionsTracker +from codecarbon.output import BaseOutput as cc_BaseOutput from simvue.utilities import simvue_timestamp if typing.TYPE_CHECKING: @@ -32,30 +32,43 @@ def out( if meta_update: logger.debug("Logging CodeCarbon metadata") - self._simvue_run.update_metadata( - { - "codecarbon.country": total.country_name, - "codecarbon.country_iso_code": total.country_iso_code, - "codecarbon.region": total.region, - "codecarbon.version": total.codecarbon_version, - } + try: + self._simvue_run.update_metadata( + { + "codecarbon": { + "country": total.country_name, + "country_iso_code": total.country_iso_code, + "region": total.region, + "version": total.codecarbon_version, + } + } + ) + except AttributeError as e: + logger.error(f"Failed to update metadata: {e}") + try: + _cc_timestamp = datetime.datetime.strptime( + total.timestamp, "%Y-%m-%dT%H:%M:%S" ) - - _cc_timestamp: datetime.datetime = datetime.datetime.strptime( - total.timestamp, "%Y-%m-%dT%H:%M:%S" - ) + except ValueError as e: + logger.error(f"Error parsing timestamp: {e}") + return logger.debug("Logging CodeCarbon metrics") - self._simvue_run.log_metrics( - metrics={ - "codecarbon.total.emissions": total.emissions, - "codecarbon.total.energy_consumed": total.energy_consumed, - "codecarbon.delta.emissions": delta.emissions, - "codecarbon.delta.energy_consumed": delta.energy_consumed, - }, - step=self._metrics_step, - timestamp=simvue_timestamp(_cc_timestamp), - ) + try: + self._simvue_run.log_metrics( + metrics={ + "codecarbon.total.emissions": total.emissions, + "codecarbon.total.energy_consumed": total.energy_consumed, + "codecarbon.delta.emissions": delta.emissions, + "codecarbon.delta.energy_consumed": delta.energy_consumed, + }, + step=self._metrics_step, + timestamp=simvue_timestamp(_cc_timestamp), + ) + except ArithmeticError as e: + logger.error(f"Failed to log metrics: {e}") + return + self._metrics_step += 1 def live_out(self, total: "EmissionsData", delta: "EmissionsData") -> None: @@ -71,6 +84,36 @@ def __init__( super().__init__( project_name=project_name, measure_power_secs=metrics_interval, + api_call_interval=1, + experiment_id=None, + experiment_name=None, + logging_logger=CodeCarbonOutput(simvue_run), + save_to_logger=True, + allow_multiple_runs=True, + log_level="error", + ) + + def set_measure_interval(self, interval: int) -> None: + """Set the measure interval""" + self._set_from_conf(interval, "measure_power_secs") + + def post_init(self) -> None: + self._set_from_conf(self._simvue_run._id, "experiment_id") + self._set_from_conf(self._simvue_run._name, "experiment_name") + self.start() + + +class OfflineSimvueEmissionsTracker(OfflineEmissionsTracker): + def __init__( + self, project_name: str, simvue_run: "Run", metrics_interval: int + ) -> None: + self._simvue_run = simvue_run + logger.setLevel(logging.ERROR) + super().__init__( + country_iso_code=simvue_run._user_config.offline.country_iso_code, + project_name=project_name, + measure_power_secs=metrics_interval, + api_call_interval=1, experiment_id=None, experiment_name=None, logging_logger=CodeCarbonOutput(simvue_run), diff --git a/simvue/executor.py b/simvue/executor.py index df1dee37..951e3c16 100644 --- a/simvue/executor.py +++ b/simvue/executor.py @@ -22,6 +22,7 @@ import pathlib import time import typing +from simvue.api.objects.alert.user import UserAlert if typing.TYPE_CHECKING: import simvue @@ -113,6 +114,7 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None: self._alert_ids: dict[str, str] = {} self.command_str: dict[str, str] = {} self._processes: dict[str, subprocess.Popen] = {} + self._all_processes: list[psutil.Process] = [] def std_out(self, process_id: str) -> str | None: if not os.path.exists(out_file := f"{self._runner.name}_{process_id}.out"): @@ -270,19 +272,48 @@ def processes(self) -> list[psutil.Process]: if not self._processes: return [] - _all_processes: list[psutil.Process] = [] + _current_processes: list[psutil.Process] = [] for process in self._processes.values(): with contextlib.suppress(psutil.NoSuchProcess): - _all_processes.append(psutil.Process(process.pid)) + _current_processes.append(psutil.Process(process.pid)) with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess): - for process in _all_processes: + for process in _current_processes: for child in process.children(recursive=True): - if child not in _all_processes: - _all_processes.append(child) + if child not in _current_processes: + _current_processes.append(child) - return list(set(_all_processes)) + _current_pids = set([_process.pid for _process in _current_processes]) + _previous_pids = set([_process.pid for _process in self._all_processes]) + + # Find processes which used to exist, which are no longer running + _expired_process_pids = _previous_pids - _current_pids + + # Remove these processes from list of all processes + self._all_processes = [ + _process + for _process in self._all_processes + if _process.pid not in _expired_process_pids + ] + + # Find new processes + _new_process_pids = _current_pids - _previous_pids + _new_processes = [ + _process + for _process in _current_processes + if _process.pid in _new_process_pids + ] + + # Get CPU usage stats for each of those new processes, so that next time it's measured by the heartbeat the value is accurate + if _new_processes: + [_process.cpu_percent() for _process in _new_processes] + time.sleep(0.1) + + # Add these to the list of all processes + self._all_processes += _new_processes + + return self._all_processes @property def success(self) -> int: @@ -342,17 +373,26 @@ def _update_alerts(self) -> None: # allowing the executor to finish (and as such the run instance to exit) _wait_limit: float = 1 for proc_id, process in self._processes.items(): + # We don't want to override the user's setting for the alert status + # This is so that if a process incorrectly reports its return code, + # the user can manually set the correct status depending on logs etc. + _alert = UserAlert(identifier=self._alert_ids[proc_id]) + _is_set = _alert.get_status(run_id=self._runner._id) + if process.returncode != 0: # If the process fails then purge the dispatcher event queue # and ensure that the stderr event is sent before the run closes if self._runner._dispatcher: self._runner._dispatcher.purge() - - self._runner.log_alert( - identifier=self._alert_ids[proc_id], state="critical" - ) + if not _is_set: + self._runner.log_alert( + identifier=self._alert_ids[proc_id], state="critical" + ) else: - self._runner.log_alert(identifier=self._alert_ids[proc_id], state="ok") + if not _is_set: + self._runner.log_alert( + identifier=self._alert_ids[proc_id], state="ok" + ) _current_time: float = 0 while ( diff --git a/simvue/run.py b/simvue/run.py index ec574013..52861375 100644 --- a/simvue/run.py +++ b/simvue/run.py @@ -43,7 +43,7 @@ from .models import FOLDER_REGEX, NAME_REGEX, MetricKeyString from .system import get_system from .metadata import git_info, environment -from .eco import SimvueEmissionsTracker +from .eco import SimvueEmissionsTracker, OfflineSimvueEmissionsTracker from .utilities import ( skip_if_failed, validate_timestamp, @@ -186,7 +186,8 @@ def __init__( ) self._headers: dict[str, str] = ( { - "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}" + "Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}", + "Accept-Encoding": "gzip", } if mode != "offline" else {} @@ -208,11 +209,28 @@ def __init__( ) else self._user_config.metrics.emission_metrics_interval ) - self._emissions_tracker: SimvueEmissionsTracker | None = ( - SimvueEmissionsTracker("simvue", self, self._emission_metrics_interval) - if self._user_config.metrics.enable_emission_metrics - else None - ) + if mode == "offline": + if ( + self._user_config.metrics.enable_emission_metrics + and not self._user_config.offline.country_iso_code + ): + raise ValueError( + "Country ISO code must be provided if tracking emissions metrics in offline mode." + ) + + self._emissions_tracker: OfflineSimvueEmissionsTracker | None = ( + OfflineSimvueEmissionsTracker( + "simvue", self, self._emission_metrics_interval + ) + if self._user_config.metrics.enable_emission_metrics + else None + ) + else: + self._emissions_tracker: SimvueEmissionsTracker | None = ( + SimvueEmissionsTracker("simvue", self, self._emission_metrics_interval) + if self._user_config.metrics.enable_emission_metrics + else None + ) def __enter__(self) -> Self: return self @@ -296,16 +314,11 @@ def processes(self) -> list[psutil.Process]: return process_list process_list += [self._parent_process] - - # Attach child processes relating to the process set by set_pid - with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess): - for child in self._parent_process.children(recursive=True): - if child not in process_list: - process_list.append(child) + process_list += self._child_processes return list(set(process_list)) - def _get_sysinfo(self) -> dict[str, typing.Any]: + def _get_sysinfo(self, interval: float | None = None) -> dict[str, typing.Any]: """Retrieve system administration Parameters @@ -319,7 +332,7 @@ def _get_sysinfo(self) -> dict[str, typing.Any]: retrieved system specifications """ processes = self.processes - cpu = get_process_cpu(processes, interval=0.1) + cpu = get_process_cpu(processes, interval=interval) memory = get_process_memory(processes) gpu = get_gpu_metrics(processes) data: dict[str, typing.Any] = {} @@ -358,7 +371,10 @@ def _heartbeat( last_res_metric_call = time.time() if self._resources_metrics_interval: - self._add_metrics_to_dispatch(self._get_sysinfo(), join_on_fail=False) + self._add_metrics_to_dispatch( + self._get_sysinfo(interval=1), join_on_fail=False, step=0 + ) + res_step = 1 while not heartbeat_trigger.is_set(): time.sleep(0.1) @@ -373,9 +389,10 @@ def _heartbeat( # join would be called on this thread and a thread cannot # join itself! self._add_metrics_to_dispatch( - self._get_sysinfo(), join_on_fail=False + self._get_sysinfo(), join_on_fail=False, step=res_step ) last_res_metric_call = res_time + res_step += 1 if time.time() - last_heartbeat < self._heartbeat_interval: continue @@ -489,6 +506,9 @@ def _start(self, reconnect: bool = False) -> bool: self._pid = os.getpid() self._parent_process = psutil.Process(self._pid) if self._pid else None + self._child_processes = ( + self._get_child_processes() if self._parent_process else None + ) self._shutdown_event = threading.Event() self._heartbeat_termination_trigger = threading.Event() @@ -903,6 +923,16 @@ def kill_all_processes(self) -> None: ) self._executor.kill_all() + def _get_child_processes(self) -> list[psutil.Process]: + _process_list = [] + # Attach child processes relating to the process set by set_pid + with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess): + for child in self._parent_process.children(recursive=True): + if child not in _process_list: + _process_list.append(child) + + return list(set(_process_list)) + @property def executor(self) -> Executor: """Return the executor for this run""" @@ -958,6 +988,13 @@ def set_pid(self, pid: int) -> None: """ self._pid = pid self._parent_process = psutil.Process(self._pid) + self._child_processes = self._get_child_processes() + # Get CPU usage stats for each of those new processes, so that next time it's measured by the heartbeat the value is accurate + [ + _process.cpu_percent() + for _process in self._child_processes + [self._parent_process] + ] + time.sleep(0.1) @skip_if_failed("_aborted", "_suppress_errors", False) @pydantic.validate_call @@ -1028,9 +1065,22 @@ def config( self._emission_metrics_interval = emission_metrics_interval if enable_emission_metrics: - self._emissions_tracker = SimvueEmissionsTracker( - "simvue", self, self._emission_metrics_interval - ) + if self._user_config.run.mode == "offline": + if not self._user_config.offline.country_iso_code: + self._error( + "Country ISO code must be provided if tracking emissions metrics in offline mode." + ) + self._emissions_tracker: OfflineSimvueEmissionsTracker = ( + OfflineSimvueEmissionsTracker( + "simvue", self, self._emission_metrics_interval + ) + ) + else: + self._emissions_tracker: SimvueEmissionsTracker = ( + SimvueEmissionsTracker( + "simvue", self, self._emission_metrics_interval + ) + ) # If the main Run API object is initialised the run is active # hence the tracker should start too @@ -1656,6 +1706,11 @@ def add_alerts( names = names or [] if names and not ids: + if self._user_config.run.mode == "offline": + self._error( + "Cannot retrieve alerts based on names in offline mode - please use IDs instead." + ) + return False try: if alerts := Alert.get(offline=self._user_config.run.mode == "offline"): ids += [id for id, alert in alerts if alert.name in names] @@ -1956,6 +2011,12 @@ def log_alert( self._error("Please specify alert to update either by ID or by name.") return False + if name and self._user_config.run.mode == "offline": + self._error( + "Cannot retrieve alerts based on names in offline mode - please use IDs instead." + ) + return False + if name: try: if alerts := Alert.get(offline=self._user_config.run.mode == "offline"): diff --git a/simvue/sender.py b/simvue/sender.py index 8a552f6e..d747dc9b 100644 --- a/simvue/sender.py +++ b/simvue/sender.py @@ -11,6 +11,7 @@ from concurrent.futures import ThreadPoolExecutor import threading import requests +import psutil from simvue.config.user import SimvueConfiguration import simvue.api.objects @@ -167,13 +168,14 @@ def sender( cache_dir = cache_dir or _user_config.offline.cache cache_dir.joinpath("server_ids").mkdir(parents=True, exist_ok=True) + _lock_path = cache_dir.joinpath("sender.lock") # Check that no other sender is already currently running... - if cache_dir.joinpath("sender.lock").exists(): + if _lock_path.exists() and psutil.pid_exists(int(_lock_path.read_text())): raise RuntimeError("A sender is already running for this cache!") # Create lock file to prevent other senders running while this one isn't finished - cache_dir.joinpath("sender.lock").touch() + _lock_path.write_text(str(psutil.Process().pid)) _id_mapping: dict[str, str] = { file_path.name.split(".")[0]: file_path.read_text() @@ -233,5 +235,5 @@ def sender( _heartbeat_files, ) # Remove lock file to allow another sender to start in the future - cache_dir.joinpath("sender.lock").unlink() + _lock_path.unlink() return _id_mapping diff --git a/tests/functional/test_run_class.py b/tests/functional/test_run_class.py index e3654e06..e4f6f55a 100644 --- a/tests/functional/test_run_class.py +++ b/tests/functional/test_run_class.py @@ -55,8 +55,21 @@ def test_run_with_emissions() -> None: run_created.config(enable_emission_metrics=True, emission_metrics_interval=1) time.sleep(5) _run = RunObject(identifier=run_created.id) - assert list(_run.metrics) - + _metric_names = [item[0] for item in _run.metrics] + client = sv_cl.Client() + for _metric in ["emissions", "energy_consumed"]: + _total_metric_name = f'codecarbon.total.{_metric}' + _delta_metric_name = f'codecarbon.delta.{_metric}' + assert _total_metric_name in _metric_names + assert _delta_metric_name in _metric_names + _metric_values = client.get_metric_values(metric_names=[_total_metric_name, _delta_metric_name], xaxis="time", output_format="dataframe", run_ids=[run_created.id]) + + # Check that total = previous total + latest delta + _total_values = _metric_values[_total_metric_name].tolist() + _delta_values = _metric_values[_delta_metric_name].tolist() + assert len(_total_values) > 1 + for i in range(1, len(_total_values)): + assert _total_values[i] == _total_values[i-1] + _delta_values[i] @pytest.mark.run @pytest.mark.parametrize("timestamp", (datetime.datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f"), None), ids=("timestamp", "no_timestamp"))