diff --git a/CHANGELOG.md b/CHANGELOG.md index e9807fed..112fa4b2 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -22,6 +22,7 @@ and start a new "In Progress" section above it. ## In progress: 0.137.0 - Add `ephemeral_flask_server` testing utility (`openeo_driver.testing`) for request mocking based on a Flask app. Allows to do request/response mocking independently from actual request library (`requests`, `urllib`, `urllib3`, etc.) through a well-documented API (Flask). +- Support exposing auxiliary (non-asset) files as links ([Open-EO/openeo-geopyspark-driver#1278](https://github.com/Open-EO/openeo-geopyspark-driver/issues/1278)) ## 0.136.0 diff --git a/openeo_driver/_version.py b/openeo_driver/_version.py index b2ec02c3..ebb30f54 100644 --- a/openeo_driver/_version.py +++ b/openeo_driver/_version.py @@ -1 +1 @@ -__version__ = "0.137.0a1" +__version__ = "0.137.0a2" diff --git a/openeo_driver/constants.py b/openeo_driver/constants.py index 6713b502..04bb8601 100644 --- a/openeo_driver/constants.py +++ b/openeo_driver/constants.py @@ -61,3 +61,7 @@ class JOB_STATUS: DEFAULT_LOG_LEVEL_PROCESSING = "info" # Default value for `level in `GET /jobs/{job_id}/logs`, `GET /services/{service_id}/logs` requests DEFAULT_LOG_LEVEL_RETRIEVAL = "debug" + + +class ITEM_LINK_PROPERTY: + EXPOSE_AUXILIARY = "_expose_auxiliary" diff --git a/openeo_driver/views.py b/openeo_driver/views.py index ffccedc1..173d05fa 100644 --- a/openeo_driver/views.py +++ b/openeo_driver/views.py @@ -9,6 +9,7 @@ import typing from collections import defaultdict, namedtuple from typing import Callable, List, Optional, Tuple, Union +from urllib.parse import urlparse import flask import flask_cors @@ -48,6 +49,7 @@ from openeo_driver.constants import ( DEFAULT_LOG_LEVEL_PROCESSING, DEFAULT_LOG_LEVEL_RETRIEVAL, + ITEM_LINK_PROPERTY, JOB_STATUS, STAC_EXTENSION, ) @@ -1442,6 +1444,42 @@ def _get_job_result_item11(job_id, item_id, user_id): geometry = BoundingBox.from_wsen_tuple(job_info.proj_bbox, job_info.epsg).as_polygon() geometry = mapping(reproject_geometry(geometry, CRS.from_epsg(job_info.epsg), CRS.from_epsg(4326))) + exposable_links = [ + link for link in item_metadata.get("links", []) if link.get(ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY, False) + ] + for link in exposable_links: + link.pop(ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY) + auxiliary_filename = urlparse(link["href"]).path.split("/")[-1] # TODO: assumes file is not nested + + if link["href"].startswith("s3://"): + link["href"] = backend_implementation.config.asset_url.build_url( + asset_metadata={"href": link["href"]}, # TODO: clean up this hack to support s3proxy + asset_name=auxiliary_filename, + job_id=job_id, + user_id=user_id, + ) + else: + signer = get_backend_config().url_signer + if signer: + expires = signer.get_expires() + secure_key = signer.sign_job_asset( + job_id=job_id, user_id=user_id, filename=auxiliary_filename, expires=expires + ) + user_base64 = user_id_b64_encode(user_id) + link["href"] = flask.url_for( + ".download_job_auxiliary_file_signed", + job_id=job_id, + user_base64=user_base64, + filename=auxiliary_filename, + expires=expires, + secure_key=secure_key, + _external=True, + ) + else: + link["href"] = flask.url_for( + ".download_job_auxiliary_file", job_id=job_id, filename=auxiliary_filename, _external=True + ) + stac_item = { "type": "Feature", "stac_version": "1.1.0", @@ -1466,7 +1504,8 @@ def _get_job_result_item11(job_id, item_id, user_id): "href": url_for(".list_job_results", job_id=job_id, _external=True), # SHOULD be absolute "type": "application/json", }, - ], + ] + + exposable_links, "assets": assets, "collection": job_id, } @@ -1483,6 +1522,42 @@ def _get_job_result_item11(job_id, item_id, user_id): resp.mimetype = stac_item_media_type return resp + @blueprint.route("/jobs//results/aux///", methods=["GET"]) + def download_job_auxiliary_file_signed(job_id, user_base64, secure_key, filename): + expires = request.args.get("expires") + signer = get_backend_config().url_signer + user_id = user_id_b64_decode(user_base64) + signer.verify_job_asset( + signature=secure_key, job_id=job_id, user_id=user_id, filename=filename, expires=expires + ) + return _download_job_auxiliary_file(job_id=job_id, filename=filename, user_id=user_id) + + @blueprint.route("/jobs//results/aux/", methods=["GET"]) + @auth_handler.requires_bearer_auth + def download_job_auxiliary_file(job_id, filename, user: User): + return _download_job_auxiliary_file(job_id, filename, user.user_id) + + def _download_job_auxiliary_file(job_id, filename, user_id): + metadata = backend_implementation.batch_jobs.get_result_metadata(job_id=job_id, user_id=user_id) + + auxiliary_links = [ + link + for item in metadata.items.values() + for link in item.get("links", []) + if link.get(ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY, False) and link["href"].endswith(f"/{filename}") + ] + + if not auxiliary_links: + raise FilePathInvalidException(f"invalid file {filename!r}") + + auxiliary_link = auxiliary_links[0] + uri_parts = urlparse(auxiliary_link["href"]) + + # S3 URIs are handled by s3proxy + assert uri_parts.scheme in ["", "file"], f"unexpected scheme {uri_parts.scheme}" + + auxiliary_file = pathlib.Path(uri_parts.path) + return send_from_directory(auxiliary_file.parent, auxiliary_file.name, mimetype=auxiliary_link.get("type")) def _get_job_result_item(job_id, item_id, user_id): if item_id == DriverMlModel.METADATA_FILE_NAME: diff --git a/tests/test_views.py b/tests/test_views.py index de944d03..74c94b99 100644 --- a/tests/test_views.py +++ b/tests/test_views.py @@ -31,6 +31,7 @@ not_implemented, ) from openeo_driver.config import OpenEoBackendConfig +from openeo_driver.constants import ITEM_LINK_PROPERTY from openeo_driver.datacube import DriverVectorCube from openeo_driver.dummy import dummy_backend, dummy_config from openeo_driver.dummy.dummy_backend import DummyBackendImplementation, DummyProcessing, DummyProcessRegistry @@ -3004,6 +3005,14 @@ def test_get_stac_1_1_item(self, api110, backend_implementation, backend_config_ "id": "5d2db643-5cc3-4b27-8ef3-11f7d203b221_2023-12-31T21:41:00Z", "properties": {"datetime": "2023-12-31T21:41:00Z"}, "bbox": [3.359808992021044, 51.08284561357965, 4.690166134878123, 51.88641704215104], + "links": [ + { + "rel": "custom", + "href": "/data/projects/OpenEO/07024ee9-7847-4b8a-b260-6c879a2b3cdc/07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json", + "type": "application/json", + ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY: True, + }, + ], } } ), @@ -3042,10 +3051,16 @@ def test_get_stac_1_1_item(self, api110, backend_implementation, backend_config_ 'type': 'application/geo+json' }, { - 'href': 'http://oeo.net/openeo/1.1.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results', - 'rel': 'collection', - 'type': 'application/json' - } + "href": "http://oeo.net/openeo/1.1.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results", + "rel": "collection", + "type": "application/json", + }, + { + "rel": "custom", + # TODO: what does the URL look like? Currently /aux instead of /assets; should /items be in there? + "href": "http://oeo.net/openeo/1.1.0/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/aux/TXIuVGVzdA==/a0274432f627ca9cf9b4ff79d57c61bd/07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json", + "type": "application/json", + }, ], 'properties': {'datetime': '2023-12-31T21:41:00Z'}, 'stac_extensions': ['https://stac-extensions.github.io/eo/v1.1.0/schema.json', @@ -3055,6 +3070,99 @@ def test_get_stac_1_1_item(self, api110, backend_implementation, backend_config_ 'type': 'Feature' } + @mock.patch("time.time", mock.MagicMock(return_value=1234)) + @pytest.mark.parametrize("backend_config_overrides", [{"url_signer": UrlSigner(secret="123&@#", expiration=1000)}]) + def test_download_job_auxiliary_file_signed_with_expiration(self, api110, tmp_path, backend_config_overrides): + job_id = "07024ee9-7847-4b8a-b260-6c879a2b3cdc" + job_dir = tmp_path + auxiliary_file = job_dir / "07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json" + + with open(auxiliary_file, "w") as f: + f.write("aux") + + with self._fresh_job_registry(): + dummy_backend.DummyBatchJobs.set_result_metadata( + job_id=job_id, + user_id=TEST_USER, + metadata=BatchJobResultMetadata( + items={ + "5d2db643-5cc3-4b27-8ef3-11f7d203b221_2023-12-31T21:41:00Z": { + "geometry": { + "coordinates": [ + [ + [3.359808992021044, 51.08284561357965], + [3.359808992021044, 51.88641704215104], + [4.690166134878123, 51.88641704215104], + [4.690166134878123, 51.08284561357965], + [3.359808992021044, 51.08284561357965], + ] + ], + "type": "Polygon", + }, + "assets": { + "openEO": { + "datetime": "2023-12-31T21:41:00Z", + "roles": ["data"], + "bbox": [ + 3.359808992021044, + 51.08284561357965, + 4.690166134878123, + 51.88641704215104, + ], + "geometry": { + "coordinates": [ + [ + [3.359808992021044, 51.08284561357965], + [3.359808992021044, 51.88641704215104], + [4.690166134878123, 51.88641704215104], + [4.690166134878123, 51.08284561357965], + [3.359808992021044, 51.08284561357965], + ] + ], + "type": "Polygon", + }, + "href": "s3://openeo-data-staging-waw4-1/batch_jobs/j-250605095828442799fdde3c29b5b047/openEO_20231231T214100Z.tif", + "nodata": "nan", + "type": "image/tiff; application=geotiff", + "bands": [ + {"name": "LST", "common_name": "surface_temperature", "aliases": ["LST_in:LST"]} + ], + "raster:bands": [ + { + "name": "LST", + "statistics": { + "valid_percent": 66.88, + "maximum": 281.04800415039, + "stddev": 19.598456945276, + "minimum": 224.46798706055, + "mean": 259.57087672984, + }, + } + ], + } + }, + "id": "5d2db643-5cc3-4b27-8ef3-11f7d203b221_2023-12-31T21:41:00Z", + "properties": {"datetime": "2023-12-31T21:41:00Z"}, + "bbox": [3.359808992021044, 51.08284561357965, 4.690166134878123, 51.88641704215104], + "links": [ + { + "rel": "custom", + "href": str(auxiliary_file), + "type": "application/json", + ITEM_LINK_PROPERTY.EXPOSE_AUXILIARY: True, + }, + ], + } + } + ), + ) + + resp = api110.get( + "/jobs/07024ee9-7847-4b8a-b260-6c879a2b3cdc/results/aux/TXIuVGVzdA==/5b3d0f30d2ad8ef3146dc0785821aac3/07024ee9-7847-4b8a-b260-6c879a2b3cdc_input_items_9569134155392213115.json?expires=2234", + ) + + assert resp.text == "aux" + def test_get_job_results_invalid_job(self, api): api.get("/jobs/deadbeef-f00/results", headers=self.AUTH_HEADER).assert_error(404, "JobNotFound")