avoid unnecessary I/O in mounted batch job directory
* uploading temporary STAC files for export_workspace is unnecessary #1031

* avoid unnecessary writes of job_metadata.json #1031
bossie authored Feb 4, 2025
1 parent 0d5f3e0 commit 4f57b9b
Showing 3 changed files with 26 additions and 64 deletions.
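
As context for the bullets above, here is a minimal, illustrative sketch of the flow this commit moves to (not the actual batch_job.py code): asset hrefs are rewritten to s3:// URLs on a copy of the metadata dict in memory, job_metadata.json is written exactly once, and only that file is uploaded afterwards, with no temporary STAC files involved. The helper names mirror the real ones, but to_s3_url, the kube_deploy flag and the bucket name below are simplified stand-ins, not the driver's real config plumbing.

import json
from copy import deepcopy
from pathlib import Path


def to_s3_url(path: str, bucket: str = "my-bucket") -> str:
    # Illustrative stand-in for the driver's real to_s3_url helper.
    return f"s3://{bucket}/{path.lstrip('/')}"


def convert_asset_outputs_to_s3_urls(job_metadata: dict) -> dict:
    # Pure function: return an updated copy instead of mutating the input or
    # rewriting a file on disk, as _convert_asset_outputs_to_s3_urls now does.
    job_metadata = deepcopy(job_metadata)
    for asset in job_metadata.get("assets", {}).values():
        href = asset.get("href")
        if href and not href.startswith("s3://"):
            asset["href"] = to_s3_url(href)
    return job_metadata


def write_metadata(metadata: dict, metadata_file: Path, kube_deploy: bool) -> None:
    # Convert hrefs up front (on Kubernetes deploys) and write the file once;
    # the subsequent upload of metadata_file itself is omitted from this sketch.
    if kube_deploy:
        metadata = convert_asset_outputs_to_s3_urls(metadata)
    metadata_file.write_text(json.dumps(metadata))


if __name__ == "__main__":
    md = {"assets": {"openEO_2017-11-21Z.tif": {"href": "/batch_jobs/j-123/openEO_2017-11-21Z.tif"}}}
    write_metadata(md, Path("job_metadata.json"), kube_deploy=True)
    print(json.loads(Path("job_metadata.json").read_text()))

On deployments where the batch job S3 bucket is fuse-mounted, the single write to the mounted job directory already lands in its final location, which appears to be what makes the extra rewrites and uploads avoidable.
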
46 changes: 21 additions & 25 deletions openeogeotrellis/deploy/batch_job.py
@@ -59,15 +59,13 @@
from openeogeotrellis.deploy import load_custom_processes
from openeogeotrellis.deploy.batch_job_metadata import (
_assemble_result_metadata,
_convert_job_metadatafile_outputs_to_s3_urls,
_convert_asset_outputs_to_s3_urls,
_get_tracker_metadata,
_transform_stac_metadata,
)
from openeogeotrellis.integrations.gdal import get_abs_path_of_asset
from openeogeotrellis.integrations.hadoop import setup_kerberos_auth
from openeogeotrellis.udf import (
UDF_PYTHON_DEPENDENCIES_ARCHIVE_NAME,
UDF_PYTHON_DEPENDENCIES_FOLDER_NAME,
build_python_udf_dependencies_archive,
collect_python_udf_dependencies,
install_python_udf_dependencies,
@@ -272,8 +270,6 @@ def run_job(
):
result_metadata = {}

# while creating the stac metadata, hrefs are temporary local paths.
stac_file_paths = []
try:
# We actually expect type Path, but in reality paths as strings tend to
# slip in anyway, so we better catch them and convert them.
@@ -504,7 +500,7 @@ def result_write_assets(result_arg) -> dict:

assert len(results) == len(assets_metadata)
for result, result_assets_metadata in zip(results, assets_metadata):
stac_file_paths += _export_to_workspaces(
_export_to_workspaces(
result,
result_metadata,
result_assets_metadata=result_assets_metadata,
@@ -513,31 +509,32 @@ def result_write_assets(result_arg) -> dict:
enable_merge=job_options.get("export-workspace-enable-merge", False),
)
finally:
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, stac_file_paths)
write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)


def write_metadata(metadata: dict, metadata_file: Path):
def log_asset_hrefs(context: str):
asset_hrefs = {asset_key: asset.get("href") for asset_key, asset in metadata.get("assets", {}).items()}
logger.info(f"{context} asset hrefs: {asset_hrefs!r}")

log_asset_hrefs("input")
if ConfigParams().is_kube_deploy:
metadata = _convert_asset_outputs_to_s3_urls(metadata)
log_asset_hrefs("output")

def write_metadata(metadata, metadata_file, stac_file_paths: List[Union[str, Path]] = None):
with open(metadata_file, 'w') as f:
json.dump(metadata, f, default=json_default)
add_permissions(metadata_file, stat.S_IWGRP)
logger.info("wrote metadata to %s" % metadata_file)
if ConfigParams().is_kube_deploy:
if not get_backend_config().fuse_mount_batchjob_s3_bucket:
from openeogeotrellis.utils import s3_client

_convert_job_metadatafile_outputs_to_s3_urls(metadata_file)
if ConfigParams().is_kube_deploy and not get_backend_config().fuse_mount_batchjob_s3_bucket:
from openeogeotrellis.utils import s3_client

bucket = os.environ.get('SWIFT_BUCKET')
s3_instance = s3_client()
bucket = os.environ.get("SWIFT_BUCKET")
s3_instance = s3_client()

paths = [metadata_file] + (stac_file_paths or [])
# asset files are already uploaded by Scala code
logger.info(f"Writing results to object storage. paths={paths}")
for file_path in paths:
file_path = urlparse(str(file_path)).path
s3_instance.upload_file(file_path, bucket, file_path.strip("/"))
else:
_convert_job_metadatafile_outputs_to_s3_urls(metadata_file)
# asset files are already uploaded by Scala code
s3_instance.upload_file(str(metadata_file), bucket, str(metadata_file).strip("/"))


def _export_to_workspaces(
@@ -547,15 +544,15 @@ def _export_to_workspaces(
job_dir: Path,
remove_exported_assets: bool,
enable_merge: bool,
) -> List[str]:
):
workspace_repository: WorkspaceRepository = backend_config_workspace_repository
workspace_exports = sorted(
list(result.workspace_exports),
key=lambda export: export.workspace_id + (export.merge or ""), # arbitrary but deterministic order of hrefs
)

if not workspace_exports:
return []
return

stac_hrefs = [
f"file:{path}"
@@ -625,7 +622,6 @@ def _export_to_workspaces(

if alternate:
result_metadata["assets"][asset_key]["alternate"] = alternate
return stac_hrefs


def _export_to_workspace(
20 changes: 4 additions & 16 deletions openeogeotrellis/deploy/batch_job_metadata.py
@@ -1,3 +1,4 @@
from copy import deepcopy
import json
import logging
import os
@@ -312,30 +313,17 @@ def convert_bbox_to_lat_long(bbox: List[int], bbox_crs: Optional[Union[str, int,
return bbox


def _convert_job_metadatafile_outputs_to_s3_urls(metadata_file: Path):
"""Convert each asset's output_dir value to a URL on S3, in the job metadata file."""
with open(metadata_file, "rt") as mdf:
metadata_to_update = json.load(mdf)
with open(metadata_file, "wt") as mdf:
_convert_asset_outputs_to_s3_urls(metadata_to_update)
json.dump(metadata_to_update, mdf)


def _convert_asset_outputs_to_s3_urls(job_metadata: dict):
def _convert_asset_outputs_to_s3_urls(job_metadata: dict) -> dict:
"""Convert each asset's output_dir value to a URL on S3 in the metadata dictionary."""

def log_asset_hrefs(context: str):
asset_hrefs = {asset_key: asset.get("href") for asset_key, asset in job_metadata.get("assets", {}).items()}
logger.info(f"{context} asset hrefs: {asset_hrefs!r}")

log_asset_hrefs("input")
job_metadata = deepcopy(job_metadata)

out_assets = job_metadata.get("assets", {})
for asset in out_assets.values():
if "href" in asset and not asset["href"].startswith("s3://"):
asset["href"] = to_s3_url(asset["href"])

log_asset_hrefs("output")
return job_metadata


def _transform_stac_metadata(job_dir: Path):
24 changes: 1 addition & 23 deletions tests/deploy/test_batch_job.py
@@ -32,7 +32,6 @@
)
from openeogeotrellis.deploy.batch_job_metadata import (
_convert_asset_outputs_to_s3_urls,
_convert_job_metadatafile_outputs_to_s3_urls,
_get_tracker,
extract_result_metadata,
)
@@ -1853,33 +1852,12 @@ def test_convert_asset_outputs_to_s3_urls():
"""Test that it converts a metadata dict, translating each output_dir to a URL with the s3:// scheme."""

metadata = get_job_metadata_without_s3(Path("/data/projects/OpenEO/6d11e901-bb5d-4589-b600-8dfb50524740/"))
_convert_asset_outputs_to_s3_urls(metadata)
metadata = _convert_asset_outputs_to_s3_urls(metadata)

assert metadata['assets']['openEO_2017-11-21Z.tif']["href"].startswith("s3://")
assert metadata['assets']['a-second-asset-file.tif']["href"].startswith("s3://")


def test_convert_job_metadatafile_outputs_to_s3_urls(tmp_path):
"""Test that it processes the file on disk, converting each output_dir to a URL with the s3:// scheme."""

job_id = "6d11e901-bb5d-4589-b600-8dfb50524740"
job_dir = (tmp_path / job_id)
metadata_path = job_dir / "whatever.json"
job_dir.mkdir(parents=True)
metadata = get_job_metadata_without_s3(job_dir)

with open(metadata_path, "wt") as md_file:
json.dump(metadata, md_file)

_convert_job_metadatafile_outputs_to_s3_urls(metadata_path)

with open(metadata_path, "rt") as md_file:
converted_metadata = json.load(md_file)

assert converted_metadata['assets']['openEO_2017-11-21Z.tif']["href"].startswith("s3://")
assert converted_metadata['assets']['a-second-asset-file.tif']["href"].startswith("s3://")


class TestUdfDependenciesHandling:
def _get_process_graph(self) -> dict:
"""
