Skip to content

2.0.0a3 release #742

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 19 commits into from
Mar 4, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,15 @@
# Change log

## [v2.0.0-alpha3](https://github.com/simvue-io/client/releases/tag/v2.0.0a3) - 2025-03-04
* Updated codecarbon to work with new API
* Codecarbon now works with offline mode
* Codecarbon metadata dict is now nested
* Add PID to sender lock file so it can recover from crashes
* Add accept Gzip encoding
* Fixed list of processes to add / remove from existing list of objects
* Add step to resource metrics
* Fix bug where process user alerts should not be overridden if manually set by the user

## [v2.0.0-alpha2](https://github.com/simvue-io/client/releases/tag/v2.0.0a2) - 2025-02-27
* Removed 'no config file' and 'unstaged changes' warnings from Offline mode as they do not apply
* Made `staging_check` not apply in Offline mode
Expand Down
6 changes: 3 additions & 3 deletions CITATION.cff
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,9 @@ keywords:
- alerting
- simulation
license: Apache-2.0
commit: 83b9144abd2092d4be304bf742d72a249ad1d8ff
version: 2.0.0a2
date-released: '2025-02-27'
commit: 64ff8a5344232d44fc7da5b6ff601d3023497977
version: 2.0.0a3
date-released: '2025-03-04'
references:
- title: mlco2/codecarbon
version: v2.8.2
Expand Down
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "simvue"
version = "2.0.0a2"
version = "2.0.0a3"
description = "Simulation tracking and monitoring"
authors = [
{name = "Simvue Development Team", email = "[email protected]"}
Expand Down
1 change: 1 addition & 0 deletions simvue/api/objects/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ def __init__(
{
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}",
"User-Agent": _user_agent or f"Simvue Python client {__version__}",
"Accept-Encoding": "gzip",
}
if not self._offline
else {}
Expand Down
3 changes: 2 additions & 1 deletion simvue/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ def __init__(
logger.warning(f"No {label} specified")

self._headers: dict[str, str] = {
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}"
"Authorization": f"Bearer {self._user_config.server.token.get_secret_value()}",
"Accept-Encoding": "gzip",
}

@prettify_pydantic
Expand Down
1 change: 1 addition & 0 deletions simvue/config/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def check_token(cls, v: typing.Any) -> str | None:

class OfflineSpecifications(pydantic.BaseModel):
cache: pathlib.Path | None = None
country_iso_code: str | None = None


class MetricsSpecifications(pydantic.BaseModel):
Expand Down
89 changes: 66 additions & 23 deletions simvue/eco.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@
import logging
import datetime

from codecarbon import EmissionsTracker
from codecarbon.output_methods.base_output import BaseOutput as cc_BaseOutput
from codecarbon import EmissionsTracker, OfflineEmissionsTracker
from codecarbon.output import BaseOutput as cc_BaseOutput
from simvue.utilities import simvue_timestamp

if typing.TYPE_CHECKING:
Expand Down Expand Up @@ -32,30 +32,43 @@ def out(

if meta_update:
logger.debug("Logging CodeCarbon metadata")
self._simvue_run.update_metadata(
{
"codecarbon.country": total.country_name,
"codecarbon.country_iso_code": total.country_iso_code,
"codecarbon.region": total.region,
"codecarbon.version": total.codecarbon_version,
}
try:
self._simvue_run.update_metadata(
{
"codecarbon": {
"country": total.country_name,
"country_iso_code": total.country_iso_code,
"region": total.region,
"version": total.codecarbon_version,
}
}
)
except AttributeError as e:
logger.error(f"Failed to update metadata: {e}")
try:
_cc_timestamp = datetime.datetime.strptime(
total.timestamp, "%Y-%m-%dT%H:%M:%S"
)

_cc_timestamp: datetime.datetime = datetime.datetime.strptime(
total.timestamp, "%Y-%m-%dT%H:%M:%S"
)
except ValueError as e:
logger.error(f"Error parsing timestamp: {e}")
return

logger.debug("Logging CodeCarbon metrics")
self._simvue_run.log_metrics(
metrics={
"codecarbon.total.emissions": total.emissions,
"codecarbon.total.energy_consumed": total.energy_consumed,
"codecarbon.delta.emissions": delta.emissions,
"codecarbon.delta.energy_consumed": delta.energy_consumed,
},
step=self._metrics_step,
timestamp=simvue_timestamp(_cc_timestamp),
)
try:
self._simvue_run.log_metrics(
metrics={
"codecarbon.total.emissions": total.emissions,
"codecarbon.total.energy_consumed": total.energy_consumed,
"codecarbon.delta.emissions": delta.emissions,
"codecarbon.delta.energy_consumed": delta.energy_consumed,
},
step=self._metrics_step,
timestamp=simvue_timestamp(_cc_timestamp),
)
except ArithmeticError as e:
logger.error(f"Failed to log metrics: {e}")
return

self._metrics_step += 1

def live_out(self, total: "EmissionsData", delta: "EmissionsData") -> None:
Expand All @@ -71,6 +84,36 @@ def __init__(
super().__init__(
project_name=project_name,
measure_power_secs=metrics_interval,
api_call_interval=1,
experiment_id=None,
experiment_name=None,
logging_logger=CodeCarbonOutput(simvue_run),
save_to_logger=True,
allow_multiple_runs=True,
log_level="error",
)

def set_measure_interval(self, interval: int) -> None:
"""Set the measure interval"""
self._set_from_conf(interval, "measure_power_secs")

def post_init(self) -> None:
self._set_from_conf(self._simvue_run._id, "experiment_id")
self._set_from_conf(self._simvue_run._name, "experiment_name")
self.start()


class OfflineSimvueEmissionsTracker(OfflineEmissionsTracker):
def __init__(
self, project_name: str, simvue_run: "Run", metrics_interval: int
) -> None:
self._simvue_run = simvue_run
logger.setLevel(logging.ERROR)
super().__init__(
country_iso_code=simvue_run._user_config.offline.country_iso_code,
project_name=project_name,
measure_power_secs=metrics_interval,
api_call_interval=1,
experiment_id=None,
experiment_name=None,
logging_logger=CodeCarbonOutput(simvue_run),
Expand Down
62 changes: 51 additions & 11 deletions simvue/executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import pathlib
import time
import typing
from simvue.api.objects.alert.user import UserAlert

if typing.TYPE_CHECKING:
import simvue
Expand Down Expand Up @@ -113,6 +114,7 @@ def __init__(self, simvue_runner: "simvue.Run", keep_logs: bool = True) -> None:
self._alert_ids: dict[str, str] = {}
self.command_str: dict[str, str] = {}
self._processes: dict[str, subprocess.Popen] = {}
self._all_processes: list[psutil.Process] = []

def std_out(self, process_id: str) -> str | None:
if not os.path.exists(out_file := f"{self._runner.name}_{process_id}.out"):
Expand Down Expand Up @@ -270,19 +272,48 @@ def processes(self) -> list[psutil.Process]:
if not self._processes:
return []

_all_processes: list[psutil.Process] = []
_current_processes: list[psutil.Process] = []

for process in self._processes.values():
with contextlib.suppress(psutil.NoSuchProcess):
_all_processes.append(psutil.Process(process.pid))
_current_processes.append(psutil.Process(process.pid))

with contextlib.suppress(psutil.NoSuchProcess, psutil.ZombieProcess):
for process in _all_processes:
for process in _current_processes:
for child in process.children(recursive=True):
if child not in _all_processes:
_all_processes.append(child)
if child not in _current_processes:
_current_processes.append(child)

return list(set(_all_processes))
_current_pids = set([_process.pid for _process in _current_processes])
_previous_pids = set([_process.pid for _process in self._all_processes])

# Find processes which used to exist, which are no longer running
_expired_process_pids = _previous_pids - _current_pids

# Remove these processes from list of all processes
self._all_processes = [
_process
for _process in self._all_processes
if _process.pid not in _expired_process_pids
]

# Find new processes
_new_process_pids = _current_pids - _previous_pids
_new_processes = [
_process
for _process in _current_processes
if _process.pid in _new_process_pids
]

# Get CPU usage stats for each of those new processes, so that next time it's measured by the heartbeat the value is accurate
if _new_processes:
[_process.cpu_percent() for _process in _new_processes]
time.sleep(0.1)

# Add these to the list of all processes
self._all_processes += _new_processes

return self._all_processes

@property
def success(self) -> int:
Expand Down Expand Up @@ -342,17 +373,26 @@ def _update_alerts(self) -> None:
# allowing the executor to finish (and as such the run instance to exit)
_wait_limit: float = 1
for proc_id, process in self._processes.items():
# We don't want to override the user's setting for the alert status
# This is so that if a process incorrectly reports its return code,
# the user can manually set the correct status depending on logs etc.
_alert = UserAlert(identifier=self._alert_ids[proc_id])
_is_set = _alert.get_status(run_id=self._runner._id)

if process.returncode != 0:
# If the process fails then purge the dispatcher event queue
# and ensure that the stderr event is sent before the run closes
if self._runner._dispatcher:
self._runner._dispatcher.purge()

self._runner.log_alert(
identifier=self._alert_ids[proc_id], state="critical"
)
if not _is_set:
self._runner.log_alert(
identifier=self._alert_ids[proc_id], state="critical"
)
else:
self._runner.log_alert(identifier=self._alert_ids[proc_id], state="ok")
if not _is_set:
self._runner.log_alert(
identifier=self._alert_ids[proc_id], state="ok"
)

_current_time: float = 0
while (
Expand Down
Loading
Loading