simplify writing of job_metadata.json #1035

Merged
46 changes: 21 additions & 25 deletions openeogeotrellis/deploy/batch_job.py
@@ -59,15 +59,13 @@
from openeogeotrellis.deploy import load_custom_processes
from openeogeotrellis.deploy.batch_job_metadata import (
_assemble_result_metadata,
_convert_job_metadatafile_outputs_to_s3_urls,
_convert_asset_outputs_to_s3_urls,
_get_tracker_metadata,
_transform_stac_metadata,
)
from openeogeotrellis.integrations.gdal import get_abs_path_of_asset
from openeogeotrellis.integrations.hadoop import setup_kerberos_auth
from openeogeotrellis.udf import (
UDF_PYTHON_DEPENDENCIES_ARCHIVE_NAME,
UDF_PYTHON_DEPENDENCIES_FOLDER_NAME,
build_python_udf_dependencies_archive,
collect_python_udf_dependencies,
install_python_udf_dependencies,
@@ -272,8 +270,6 @@ def run_job(
):
result_metadata = {}

# while creating the stac metadata, hrefs are temporary local paths.
stac_file_paths = []
try:
# We actually expect type Path, but in reality paths as strings tend to
# slip in anyway, so we better catch them and convert them.
@@ -504,7 +500,7 @@ def result_write_assets(result_arg) -> dict:

assert len(results) == len(assets_metadata)
for result, result_assets_metadata in zip(results, assets_metadata):
stac_file_paths += _export_to_workspaces(
_export_to_workspaces(
result,
result_metadata,
result_assets_metadata=result_assets_metadata,
@@ -513,31 +509,32 @@
enable_merge=job_options.get("export-workspace-enable-merge", False),
)
finally:
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, stac_file_paths)
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)


def write_metadata(metadata: dict, metadata_file: Path):
def log_asset_hrefs(context: str):
asset_hrefs = {asset_key: asset.get("href") for asset_key, asset in metadata.get("assets", {}).items()}
logger.info(f"{context} asset hrefs: {asset_hrefs!r}")

log_asset_hrefs("input")
if ConfigParams().is_kube_deploy:
metadata = _convert_asset_outputs_to_s3_urls(metadata)
log_asset_hrefs("output")

def write_metadata(metadata, metadata_file, stac_file_paths: List[Union[str, Path]] = None):
with open(metadata_file, 'w') as f:
json.dump(metadata, f, default=json_default)
add_permissions(metadata_file, stat.S_IWGRP)
logger.info("wrote metadata to %s" % metadata_file)
if ConfigParams().is_kube_deploy:
if not get_backend_config().fuse_mount_batchjob_s3_bucket:
from openeogeotrellis.utils import s3_client

_convert_job_metadatafile_outputs_to_s3_urls(metadata_file)
if ConfigParams().is_kube_deploy and not get_backend_config().fuse_mount_batchjob_s3_bucket:
from openeogeotrellis.utils import s3_client

bucket = os.environ.get('SWIFT_BUCKET')
s3_instance = s3_client()
bucket = os.environ.get("SWIFT_BUCKET")
s3_instance = s3_client()

paths = [metadata_file] + (stac_file_paths or [])
# asset files are already uploaded by Scala code
logger.info(f"Writing results to object storage. paths={paths}")
for file_path in paths:
file_path = urlparse(str(file_path)).path
s3_instance.upload_file(file_path, bucket, file_path.strip("/"))
else:
_convert_job_metadatafile_outputs_to_s3_urls(metadata_file)
# asset files are already uploaded by Scala code
s3_instance.upload_file(str(metadata_file), bucket, str(metadata_file).strip("/"))


def _export_to_workspaces(
@@ -547,15 +544,15 @@ def _export_to_workspaces(
job_dir: Path,
remove_exported_assets: bool,
enable_merge: bool,
) -> List[str]:
):
workspace_repository: WorkspaceRepository = backend_config_workspace_repository
workspace_exports = sorted(
list(result.workspace_exports),
key=lambda export: export.workspace_id + (export.merge or ""), # arbitrary but deterministic order of hrefs
)

if not workspace_exports:
return []
return

stac_hrefs = [
f"file:{path}"
@@ -625,7 +622,6 @@ def _export_to_workspaces(

if alternate:
result_metadata["assets"][asset_key]["alternate"] = alternate
return stac_hrefs


def _export_to_workspace(
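
Taken together, the hunks above consolidate the metadata post-processing into write_metadata itself: run_job no longer collects stac_file_paths, and the Kubernetes-specific S3 handling shrinks to a single upload of the metadata file. A rough reconstruction of the resulting function, pieced together from the diff context above (the merged module also provides logger, json_default, add_permissions and the other names used here):

```python
def write_metadata(metadata: dict, metadata_file: Path):
    def log_asset_hrefs(context: str):
        asset_hrefs = {asset_key: asset.get("href") for asset_key, asset in metadata.get("assets", {}).items()}
        logger.info(f"{context} asset hrefs: {asset_hrefs!r}")

    log_asset_hrefs("input")
    if ConfigParams().is_kube_deploy:
        # rewrite asset hrefs to s3:// URLs before the metadata is written to disk
        metadata = _convert_asset_outputs_to_s3_urls(metadata)
        log_asset_hrefs("output")

    with open(metadata_file, "w") as f:
        json.dump(metadata, f, default=json_default)
    add_permissions(metadata_file, stat.S_IWGRP)
    logger.info("wrote metadata to %s" % metadata_file)

    if ConfigParams().is_kube_deploy and not get_backend_config().fuse_mount_batchjob_s3_bucket:
        from openeogeotrellis.utils import s3_client

        bucket = os.environ.get("SWIFT_BUCKET")
        s3_instance = s3_client()

        # asset files are already uploaded by Scala code; only the metadata file itself remains
        s3_instance.upload_file(str(metadata_file), bucket, str(metadata_file).strip("/"))
```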
20 changes: 4 additions & 16 deletions openeogeotrellis/deploy/batch_job_metadata.py
@@ -1,3 +1,4 @@
from copy import deepcopy
import json
import logging
import os
@@ -312,30 +313,17 @@ def convert_bbox_to_lat_long(bbox: List[int], bbox_crs: Optional[Union[str, int,
return bbox


def _convert_job_metadatafile_outputs_to_s3_urls(metadata_file: Path):
"""Convert each asset's output_dir value to a URL on S3, in the job metadata file."""
with open(metadata_file, "rt") as mdf:
metadata_to_update = json.load(mdf)
with open(metadata_file, "wt") as mdf:
_convert_asset_outputs_to_s3_urls(metadata_to_update)
json.dump(metadata_to_update, mdf)


def _convert_asset_outputs_to_s3_urls(job_metadata: dict):
def _convert_asset_outputs_to_s3_urls(job_metadata: dict) -> dict:
"""Convert each asset's output_dir value to a URL on S3 in the metadata dictionary."""

def log_asset_hrefs(context: str):
asset_hrefs = {asset_key: asset.get("href") for asset_key, asset in job_metadata.get("assets", {}).items()}
logger.info(f"{context} asset hrefs: {asset_hrefs!r}")

log_asset_hrefs("input")
job_metadata = deepcopy(job_metadata)

out_assets = job_metadata.get("assets", {})
for asset in out_assets.values():
if "href" in asset and not asset["href"].startswith("s3://"):
asset["href"] = to_s3_url(asset["href"])

log_asset_hrefs("output")
return job_metadata


def _transform_stac_metadata(job_dir: Path):
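
With the deepcopy added above, _convert_asset_outputs_to_s3_urls now behaves as a pure function: it returns a converted copy instead of mutating its argument, which is what allows write_metadata to simply rebind metadata and makes the file-based helper _convert_job_metadatafile_outputs_to_s3_urls redundant. A minimal illustration (the asset key and local path are invented for the example; to_s3_url is assumed to map a local job path to an s3:// URL as in production):

```python
metadata = {"assets": {"openEO_2017-11-21Z.tif": {"href": "/data/projects/OpenEO/j-123/openEO_2017-11-21Z.tif"}}}

converted = _convert_asset_outputs_to_s3_urls(metadata)

# the returned copy carries s3:// hrefs ...
assert converted["assets"]["openEO_2017-11-21Z.tif"]["href"].startswith("s3://")
# ... while the input dict is left untouched
assert metadata["assets"]["openEO_2017-11-21Z.tif"]["href"].startswith("/data/")
```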
24 changes: 1 addition & 23 deletions tests/deploy/test_batch_job.py
@@ -32,7 +32,6 @@
)
from openeogeotrellis.deploy.batch_job_metadata import (
_convert_asset_outputs_to_s3_urls,
_convert_job_metadatafile_outputs_to_s3_urls,
_get_tracker,
extract_result_metadata,
)
Expand Down Expand Up @@ -1853,33 +1852,12 @@ def test_convert_asset_outputs_to_s3_urls():
"""Test that it converts a metadata dict, translating each output_dir to a URL with the s3:// scheme."""

metadata = get_job_metadata_without_s3(Path("/data/projects/OpenEO/6d11e901-bb5d-4589-b600-8dfb50524740/"))
_convert_asset_outputs_to_s3_urls(metadata)
metadata = _convert_asset_outputs_to_s3_urls(metadata)

assert metadata['assets']['openEO_2017-11-21Z.tif']["href"].startswith("s3://")
assert metadata['assets']['a-second-asset-file.tif']["href"].startswith("s3://")


def test_convert_job_metadatafile_outputs_to_s3_urls(tmp_path):
"""Test that it processes the file on disk, converting each output_dir to a URL with the s3:// scheme."""

job_id = "6d11e901-bb5d-4589-b600-8dfb50524740"
job_dir = (tmp_path / job_id)
metadata_path = job_dir / "whatever.json"
job_dir.mkdir(parents=True)
metadata = get_job_metadata_without_s3(job_dir)

with open(metadata_path, "wt") as md_file:
json.dump(metadata, md_file)

_convert_job_metadatafile_outputs_to_s3_urls(metadata_path)

with open(metadata_path, "rt") as md_file:
converted_metadata = json.load(md_file)

assert converted_metadata['assets']['openEO_2017-11-21Z.tif']["href"].startswith("s3://")
assert converted_metadata['assets']['a-second-asset-file.tif']["href"].startswith("s3://")


class TestUdfDependenciesHandling:
def _get_process_graph(self) -> dict:
"""