From fd19d15b6df13c8ca847906ab0acb4ac34dd3f4c Mon Sep 17 00:00:00 2001
From: Jan Van den bosch
Date: Fri, 20 Jun 2025 15:08:15 +0200
Subject: [PATCH 1/4] support results_metadata_uri in job entity

https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255
---
 openeo_driver/backend.py     |  1 +
 openeo_driver/jobregistry.py | 11 +++++++++++
 tests/test_jobregistry.py    |  9 +++++++++
 3 files changed, 21 insertions(+)

diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py
index 70c002bd..b90bb13e 100644
--- a/openeo_driver/backend.py
+++ b/openeo_driver/backend.py
@@ -355,6 +355,7 @@ class BatchJobMetadata(NamedTuple):
     # TODO #190 the STAC projection extension fields "proj:..." are not batch job metadata, but batch job *result* metadata:
     proj_shape: Optional[List[int]] = None
     proj_bbox: Optional[List[int]] = None
+    results_metadata_uri: Optional[str] = None
 
     @property
     def duration(self) -> Union[timedelta, None]:
diff --git a/openeo_driver/jobregistry.py b/openeo_driver/jobregistry.py
index d32b2659..87b774ae 100644
--- a/openeo_driver/jobregistry.py
+++ b/openeo_driver/jobregistry.py
@@ -146,6 +146,11 @@ def set_results_metadata(
     ) -> None:
         raise NotImplementedError
 
+    def set_results_metadata_uri(
+        self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str
+    ) -> None:
+        raise NotImplementedError
+
     def list_user_jobs(
         self,
         user_id: str,
@@ -227,6 +232,7 @@ def map_results_metadata_safe(result_metadata_prop: str, f):
         costs=job_info.get("costs"),
         proj_shape=get_results_metadata("proj:shape"),
         proj_bbox=get_results_metadata("proj:bbox"),
+        results_metadata_uri=job_info.get("results_metadata_uri"),
     )
 
 
@@ -610,6 +616,11 @@ def set_proxy_user(self, job_id: str, *, user_id: Optional[str] = None, proxy_us
     def set_application_id(self, job_id: str, *, user_id: Optional[str] = None, application_id: str) -> None:
         self._update(job_id=job_id, data={"application_id": application_id})
 
+    def set_results_metadata_uri(
+        self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str
+    ) -> None:
+        self._update(job_id=job_id, data={"results_metadata_uri": results_metadata_uri})
+
     def _search(self, query: dict, fields: Optional[List[str]] = None) -> List[JobDict]:
         # TODO: sorting, pagination?
         fields = set(fields or [])
diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py
index 4122b68b..8962fe4f 100644
--- a/tests/test_jobregistry.py
+++ b/tests/test_jobregistry.py
@@ -992,6 +992,15 @@ def test_set_application_id(self, requests_mock, oidc_mock, ejr):
         ejr.set_application_id(job_id="job-123", application_id="app-456")
         assert patch_mock.call_count == 1
 
+    def test_set_results_metadata_uri(self, requests_mock, oidc_mock, ejr):
+        handler = self._handle_patch_jobs(
+            oidc_mock=oidc_mock, expected_data={"results_metadata_uri": "s3://bucket/path/to/job_metadata.json"}
+        )
+        patch_mock = requests_mock.patch(f"{self.EJR_API_URL}/jobs/job-123", json=handler)
+
+        ejr.set_results_metadata_uri(job_id="job-123", results_metadata_uri="s3://bucket/path/to/job_metadata.json")
+        assert patch_mock.call_count == 1
+
     def test_set_results_metadata(self, requests_mock, oidc_mock, ejr):
         handler = self._handle_patch_jobs(
             oidc_mock=oidc_mock, expected_data={
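
Note: patch 1 above adds the `results_metadata_uri` field to `BatchJobMetadata` and a matching `set_results_metadata_uri` setter that issues the same kind of partial job update as `set_application_id`. A minimal usage sketch follows; the helper name, the bucket/key layout and the way the `registry` instance is obtained are assumptions for illustration, not part of the patch:

    from openeo_driver.jobregistry import ElasticJobRegistry

    def record_results_metadata_location(
        registry: ElasticJobRegistry, job_id: str, user_id: str, bucket: str
    ) -> None:
        # Assumed layout: the job_metadata.json document was already uploaded to this key.
        results_metadata_uri = f"s3://{bucket}/{job_id}/job_metadata.json"
        # The setter added in this patch PATCHes the job document in the Elastic Job Registry.
        registry.set_results_metadata_uri(
            job_id=job_id, user_id=user_id, results_metadata_uri=results_metadata_uri
        )
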
From 078d60db0352141f4927231bd98efafe04599ee2 Mon Sep 17 00:00:00 2001
From: Jan Van den bosch
Date: Fri, 20 Jun 2025 15:28:10 +0200
Subject: [PATCH 2/4] test reading of results_metadata_uri

https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255
---
 tests/test_jobregistry.py | 12 ++++++++++++
 1 file changed, 12 insertions(+)

diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py
index 8962fe4f..c120c877 100644
--- a/tests/test_jobregistry.py
+++ b/tests/test_jobregistry.py
@@ -21,6 +21,7 @@
     EjrApiResponseError,
     EjrError,
     ElasticJobRegistry,
+    ejr_job_info_to_metadata,
     get_ejr_credentials_from_env,
 )
 from openeo_driver.testing import (
@@ -98,6 +99,17 @@ def test_get_partial_job_status():
     assert PARTIAL_JOB_STATUS.for_job_status(JOB_STATUS.CANCELED) == 'canceled'
 
 
+def test_ejr_job_info_to_metadata():
+    job_info = {
+        "job_id": "j-123",
+        "status": "running",
+        "results_metadata_uri": "s3://bucket/path/to/job_metadata.json",
+    }
+
+    metadata = ejr_job_info_to_metadata(job_info)
+    assert metadata.results_metadata_uri == "s3://bucket/path/to/job_metadata.json"
+
+
 class TestElasticJobRegistry:
     EJR_API_URL = "https://ejr.test"
 

From 7a7f785a00c228af0c96615458d773f109988bd8 Mon Sep 17 00:00:00 2001
From: Jan Van den bosch
Date: Fri, 20 Jun 2025 16:04:42 +0200
Subject: [PATCH 3/4] adapt version and CHANGELOG

https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255
---
 CHANGELOG.md              | 1 +
 openeo_driver/_version.py | 2 +-
 2 files changed, 2 insertions(+), 1 deletion(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 4f59e0e8..b310ca8d 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,6 +23,7 @@ and start a new "In Progress" section above it.
 
 - Have `integrations.s3` for interaction with Object Storage that follows the S3 API.
 - `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232))
+- Support `results_metadata_uri` in batch job entity ([[Open-EO/openeo-geopyspark-driver#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255)])
 
 ## 0.134.0
 
diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py
index 4f450854..7fd448da 100644
--- a/openeo_driver/_version.py
+++ b/openeo_driver/_version.py
@@ -1 +1 @@
-__version__ = "0.135.0a3"
+__version__ = "0.135.0a4"
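
Note: patches 2 and 3 cover the read path and the release bookkeeping. After patches 1 and 2, the URI survives the round trip from the job document in the registry to `BatchJobMetadata`, as the new test asserts. A short sketch of that read path; the `job_info` contents are illustrative, and patch 4 below narrows this again:

    from openeo_driver.jobregistry import ejr_job_info_to_metadata

    # Illustrative job document as it would come back from the Elastic Job Registry.
    job_info = {
        "job_id": "j-20250620abc",
        "status": "running",
        "results_metadata_uri": "s3://bucket/path/to/job_metadata.json",
    }

    metadata = ejr_job_info_to_metadata(job_info)
    # At this point the URI is available alongside the other batch job metadata fields.
    print(metadata.status, metadata.results_metadata_uri)
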
From 4211e264dade7024170176f708b931ef76e803da Mon Sep 17 00:00:00 2001
From: Jan Van den bosch
Date: Fri, 27 Jun 2025 13:45:00 +0200
Subject: [PATCH 4/4] avoid implementation detail "results_metadata_uri" in API

#1255
---
 CHANGELOG.md                 | 2 +-
 openeo_driver/backend.py     | 1 -
 openeo_driver/jobregistry.py | 1 -
 tests/test_jobregistry.py    | 2 +-
 4 files changed, 2 insertions(+), 4 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index b310ca8d..81bda63e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -23,7 +23,7 @@ and start a new "In Progress" section above it.
 
 - Have `integrations.s3` for interaction with Object Storage that follows the S3 API.
 - `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232))
-- Support `results_metadata_uri` in batch job entity ([[Open-EO/openeo-geopyspark-driver#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255)])
+- Support persisting results metadata URI in job registry ([Open-EO/openeo-geopyspark-driver#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255))
 
 ## 0.134.0
 
diff --git a/openeo_driver/backend.py b/openeo_driver/backend.py
index b90bb13e..70c002bd 100644
--- a/openeo_driver/backend.py
+++ b/openeo_driver/backend.py
@@ -355,7 +355,6 @@ class BatchJobMetadata(NamedTuple):
     # TODO #190 the STAC projection extension fields "proj:..." are not batch job metadata, but batch job *result* metadata:
     proj_shape: Optional[List[int]] = None
     proj_bbox: Optional[List[int]] = None
-    results_metadata_uri: Optional[str] = None
 
     @property
     def duration(self) -> Union[timedelta, None]:
diff --git a/openeo_driver/jobregistry.py b/openeo_driver/jobregistry.py
index 87b774ae..21033369 100644
--- a/openeo_driver/jobregistry.py
+++ b/openeo_driver/jobregistry.py
@@ -232,7 +232,6 @@ def map_results_metadata_safe(result_metadata_prop: str, f):
         costs=job_info.get("costs"),
         proj_shape=get_results_metadata("proj:shape"),
         proj_bbox=get_results_metadata("proj:bbox"),
-        results_metadata_uri=job_info.get("results_metadata_uri"),
     )
 
 
diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py
index c120c877..6fd876e3 100644
--- a/tests/test_jobregistry.py
+++ b/tests/test_jobregistry.py
@@ -107,7 +107,7 @@ def test_ejr_job_info_to_metadata():
     }
 
     metadata = ejr_job_info_to_metadata(job_info)
-    assert metadata.results_metadata_uri == "s3://bucket/path/to/job_metadata.json"
+    assert metadata.status == JOB_STATUS.RUNNING
 
 
 class TestElasticJobRegistry:
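
Note: this last patch keeps `set_results_metadata_uri` on the job registry but drops the field from `BatchJobMetadata` and from `ejr_job_info_to_metadata`, so the URI stays an internal detail of the job document rather than part of the metadata API. Code that still needs it has to read the raw job document; a minimal sketch follows, deliberately leaving out how that document is fetched since no read accessor is part of this series:

    from typing import Optional

    def results_metadata_uri_from_job_info(job_info: dict) -> Optional[str]:
        # The raw job document in the Elastic Job Registry still carries the field;
        # BatchJobMetadata deliberately does not, so callers go through the dict directly.
        return job_info.get("results_metadata_uri")

    uri = results_metadata_uri_from_job_info(
        {"job_id": "j-123", "results_metadata_uri": "s3://bucket/path/to/job_metadata.json"}
    )
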