
Commit 37ba260

Merge pull request #664 from Open-EO/issue588-jm_add_costs
added costs as a column to tracking databases
2 parents: 34d42b8 + 1e7878e

4 files changed: +50 −19 lines

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
```diff
@@ -11,6 +11,8 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
 
 ### Changed
 
+- `MultiBackendJobManager`: `costs` has been added as a column in tracking databases ([#588](https://github.com/Open-EO/openeo-python-client/issues/588))
+
 ### Removed
 
 ### Fixed
```

openeo/extra/job_management.py

Lines changed: 3 additions & 0 deletions
```diff
@@ -207,6 +207,7 @@ def start_job(
         "cpu": _ColumnProperties(dtype="str"),
         "memory": _ColumnProperties(dtype="str"),
         "duration": _ColumnProperties(dtype="str"),
+        "costs": _ColumnProperties(dtype="float64"),
     }
 
     def __init__(
@@ -744,6 +745,8 @@ def _track_statuses(self, job_db: JobDatabaseInterface, stats: Optional[dict] =
                 for key in job_metadata.get("usage", {}).keys():
                     if key in active.columns:
                         active.loc[i, key] = _format_usage_stat(job_metadata, key)
+                if "costs" in job_metadata.keys():
+                    active.loc[i, "costs"] = job_metadata.get("costs")
 
             except OpenEoApiError as e:
                 # TODO: inspect status code and e.g. differentiate between 4xx/5xx
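```

The `float64` dtype means jobs that never report costs are simply left as NaN in that column, and `_track_statuses` copies the top-level `costs` field of the job metadata into the tracking database alongside the formatted usage stats. For context, here is a minimal usage sketch of the job manager with a CSV-backed job database; the backend URL, collection id and dataframe fields are placeholders, not part of this commit:

```python
import pandas as pd
import openeo
from openeo.extra.job_management import CsvJobDatabase, MultiBackendJobManager

# Hypothetical backend and workload; adapt to a real setup.
connection = openeo.connect("openeo.example.com").authenticate_oidc()

def start_job(row: pd.Series, connection: openeo.Connection, **kwargs):
    # Build and launch one batch job per dataframe row.
    cube = connection.load_collection("SENTINEL2_L2A", temporal_extent=[row["start"], row["end"]])
    return cube.create_job(title=f"job-{row['start'][:4]}")

df = pd.DataFrame({"start": ["2021-01-01", "2022-01-01"], "end": ["2021-12-31", "2022-12-31"]})

manager = MultiBackendJobManager()
manager.add_backend("example", connection=connection)
job_db = CsvJobDatabase(path="jobs.csv").initialize_from_df(df)
manager.run_jobs(start_job=start_job, job_db=job_db)

# Once jobs finish, the tracking CSV has the new "costs" column filled in.
print(pd.read_csv("jobs.csv")[["id", "status", "costs"]])
```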

openeo/rest/_testing.py

Lines changed: 9 additions & 1 deletion
```diff
@@ -225,11 +225,19 @@ def _handle_get_job(self, request, context):
         self.batch_jobs[job_id]["status"] = self._get_job_status(
             job_id=job_id, current_status=self.batch_jobs[job_id]["status"]
         )
-        return {
+        result = {
             # TODO: add some more required fields like "process" and "created"?
             "id": job_id,
             "status": self.batch_jobs[job_id]["status"],
         }
+        if self.batch_jobs[job_id]["status"] == "finished":  # HACK some realistic values for a small job
+            result["costs"] = 123
+            result["usage"] = {
+                "cpu": {"unit": "cpu-seconds", "value": 1234.5},
+                "memory": {"unit": "mb-seconds", "value": 34567.89},
+                "duration": {"unit": "seconds", "value": 2345},
+            }
+        return result
 
     def _handle_get_job_results(self, request, context):
         """Handler of `GET /job/{job_id}/results` (list batch job results)."""

tests/extra/test_job_management.py

Lines changed: 36 additions & 18 deletions
```diff
@@ -18,6 +18,7 @@
 # httpretty avoids this specific problem because it mocks at the socket level,
 # But I would rather not have two dependencies with almost the same goal.
 import httpretty
+import numpy as np
 import pandas
 import pandas as pd
 import pytest
```
```diff
@@ -166,12 +167,15 @@ def test_basic(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock):
             }
         )
 
-        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "finished", "foo"),
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs)
+            for r in pd.read_csv(job_db_path).itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
         ]
 
         # Check downloaded results and metadata.
```
```diff
@@ -204,6 +208,10 @@ def test_db_class(self, tmp_path, job_manager, job_manager_root_dir, sleep_mock,
         assert len(result) == 5
         assert set(result.status) == {"finished"}
         assert set(result.backend_name) == {"foo", "bar"}
+        assert set(result.cpu) == {"1234.5 cpu-seconds"}
+        assert set(result.memory) == {"34567.89 mb-seconds"}
+        assert set(result.duration) == {"2345 seconds"}
+        assert set(result.costs) == {123}
 
     @pytest.mark.parametrize(
         ["filename", "expected_db_class"],
```
```diff
@@ -254,12 +262,15 @@ def test_basic_threading(self, tmp_path, job_manager, job_manager_root_dir, slee
         # TODO #645 how to collect stats with the threaded run_job?
         assert sleep_mock.call_count > 10
 
-        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "finished", "foo"),
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs)
+            for r in pd.read_csv(job_db_path).itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
         ]
 
         # Check downloaded results and metadata.
```
```diff
@@ -283,6 +294,7 @@ def test_normalize_df(self):
                 "memory",
                 "duration",
                 "backend_name",
+                "costs",
             ]
         )
 
```
```diff
@@ -333,12 +345,15 @@ def start_worker_thread():
         )
 
         # Also check that we got sensible end results in the job db.
-        assert [(r.id, r.status, r.backend_name) for r in pd.read_csv(job_db_path).itertuples()] == [
-            ("job-2018", "finished", "foo"),
-            ("job-2019", "finished", "foo"),
-            ("job-2020", "finished", "bar"),
-            ("job-2021", "finished", "bar"),
-            ("job-2022", "error", "foo"),
+        results = pd.read_csv(job_db_path).replace({np.nan: None})  # np.nan's are replaced by None for easy comparison
+        assert [
+            (r.id, r.status, r.backend_name, r.cpu, r.memory, r.duration, r.costs) for r in results.itertuples()
+        ] == [
+            ("job-2018", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2019", "finished", "foo", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2020", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2021", "finished", "bar", "1234.5 cpu-seconds", "34567.89 mb-seconds", "2345 seconds", 123),
+            ("job-2022", "error", "foo", None, None, None, None),
         ]
 
         # Check downloaded results and metadata.
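```

The `replace({np.nan: None})` step matters here: pandas reads the empty CSV cells of the failed job back as NaN, and a fresh NaN never compares equal to an expected value, so the tuple for "job-2022" could not match without it. A quick illustration of the pitfall and the fix:

```python
import io

import numpy as np
import pandas as pd

# Empty CSV cells come back as NaN, which never equals anything (not even itself).
df = pd.read_csv(io.StringIO("id,costs\njob-2022,\n"))
value = df.loc[0, "costs"]
assert value != value  # NaN != NaN

# After replacing NaN with None, plain equality comparison works.
df = df.replace({np.nan: None})
assert df.loc[0, "costs"] is None
```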
```diff
@@ -673,6 +688,7 @@ def test_initialize_from_df(self, tmp_path, db_class):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         actual_columns = set(db_class(path).read().columns)
```
```diff
@@ -852,6 +868,7 @@ def test_initialize_from_df(self, tmp_path):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         # Raw file content check
```
```diff
@@ -930,6 +947,7 @@ def test_initialize_from_df(self, tmp_path):
             "memory",
             "duration",
             "backend_name",
+            "costs",
         }
 
         df_from_disk = ParquetJobDatabase(path).read()
```
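These `test_initialize_from_df` variants confirm that a freshly initialized job database, CSV or Parquet backed, already carries the `costs` column before any job has run. A small sketch mirroring that behavior; the input dataframe is a placeholder:

```python
import pandas as pd
from openeo.extra.job_management import CsvJobDatabase

# A bare input dataframe without any of the tracking columns.
df = pd.DataFrame({"year": [2021, 2022]})
db = CsvJobDatabase(path="jobs.csv").initialize_from_df(df)

# Normalization adds the standard tracking columns, "costs" included.
columns = set(db.read().columns)
assert {"id", "status", "costs"} <= columns
```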
