diff --git a/CHANGELOG.md b/CHANGELOG.md index 4f59e0e8..81bda63e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -23,6 +23,7 @@ and start a new "In Progress" section above it. - Have `integrations.s3` for interaction with Object Storage that follows the S3 API. - `ElasticJobRegistry`: add support for pre-serialization of process graph ([Open-EO/openeo-geopyspark-driver#1232](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1232)) +- Support persisting results metadata URI in job registry ([Open-EO/openeo-geopyspark-driver#1255](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1255)) ## 0.134.0 diff --git a/openeo_driver/jobregistry.py b/openeo_driver/jobregistry.py index d32b2659..21033369 100644 --- a/openeo_driver/jobregistry.py +++ b/openeo_driver/jobregistry.py @@ -146,6 +146,11 @@ def set_results_metadata( ) -> None: raise NotImplementedError + def set_results_metadata_uri( + self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str + ) -> None: + raise NotImplementedError + def list_user_jobs( self, user_id: str, @@ -610,6 +615,11 @@ def set_proxy_user(self, job_id: str, *, user_id: Optional[str] = None, proxy_us def set_application_id(self, job_id: str, *, user_id: Optional[str] = None, application_id: str) -> None: self._update(job_id=job_id, data={"application_id": application_id}) + def set_results_metadata_uri( + self, job_id: str, *, user_id: Optional[str] = None, results_metadata_uri: str + ) -> None: + self._update(job_id=job_id, data={"results_metadata_uri": results_metadata_uri}) + def _search(self, query: dict, fields: Optional[List[str]] = None) -> List[JobDict]: # TODO: sorting, pagination? fields = set(fields or []) diff --git a/tests/test_jobregistry.py b/tests/test_jobregistry.py index 4122b68b..6fd876e3 100644 --- a/tests/test_jobregistry.py +++ b/tests/test_jobregistry.py @@ -21,6 +21,7 @@ EjrApiResponseError, EjrError, ElasticJobRegistry, + ejr_job_info_to_metadata, get_ejr_credentials_from_env, ) from openeo_driver.testing import ( @@ -98,6 +99,17 @@ def test_get_partial_job_status(): assert PARTIAL_JOB_STATUS.for_job_status(JOB_STATUS.CANCELED) == 'canceled' +def test_ejr_job_info_to_metadata(): + job_info = { + "job_id": "j-123", + "status": "running", + "results_metadata_uri": "s3://bucket/path/to/job_metadata.json", + } + + metadata = ejr_job_info_to_metadata(job_info) + assert metadata.status == JOB_STATUS.RUNNING + + class TestElasticJobRegistry: EJR_API_URL = "https://ejr.test" @@ -992,6 +1004,15 @@ def test_set_application_id(self, requests_mock, oidc_mock, ejr): ejr.set_application_id(job_id="job-123", application_id="app-456") assert patch_mock.call_count == 1 + def test_set_results_metadata_uri(self, requests_mock, oidc_mock, ejr): + handler = self._handle_patch_jobs( + oidc_mock=oidc_mock, expected_data={"results_metadata_uri": "s3://bucket/path/to/job_metadata.json"} + ) + patch_mock = requests_mock.patch(f"{self.EJR_API_URL}/jobs/job-123", json=handler) + + ejr.set_results_metadata_uri(job_id="job-123", results_metadata_uri="s3://bucket/path/to/job_metadata.json") + assert patch_mock.call_count == 1 + def test_set_results_metadata(self, requests_mock, oidc_mock, ejr): handler = self._handle_patch_jobs( oidc_mock=oidc_mock, expected_data={