uploading temporary STAC files for export_workspace is unnecessary #1031
bossie committed Feb 4, 2025
1 parent 0d5f3e0 commit 509ad92
Showing 1 changed file with 7 additions and 13 deletions.
openeogeotrellis/deploy/batch_job.py (20 changes: 7 additions & 13 deletions)
@@ -272,8 +272,6 @@ def run_job(
 ):
     result_metadata = {}
 
-    # while creating the stac metadata, hrefs are temporary local paths.
-    stac_file_paths = []
     try:
         # We actually expect type Path, but in reality paths as strings tend to
         # slip in anyway, so we better catch them and convert them.
@@ -504,7 +502,7 @@ def result_write_assets(result_arg) -> dict:
 
         assert len(results) == len(assets_metadata)
         for result, result_assets_metadata in zip(results, assets_metadata):
-            stac_file_paths += _export_to_workspaces(
+            _export_to_workspaces(
                 result,
                 result_metadata,
                 result_assets_metadata=result_assets_metadata,
@@ -513,14 +511,15 @@ def result_write_assets(result_arg) -> dict:
                 enable_merge=job_options.get("export-workspace-enable-merge", False),
             )
     finally:
-        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file, stac_file_paths)
+        write_metadata({**result_metadata, **_get_tracker_metadata("")}, metadata_file)
 
 
-def write_metadata(metadata, metadata_file, stac_file_paths: List[Union[str, Path]] = None):
+def write_metadata(metadata: dict, metadata_file: Path):
     with open(metadata_file, 'w') as f:
         json.dump(metadata, f, default=json_default)
     add_permissions(metadata_file, stat.S_IWGRP)
     logger.info("wrote metadata to %s" % metadata_file)
+
     if ConfigParams().is_kube_deploy:
         if not get_backend_config().fuse_mount_batchjob_s3_bucket:
             from openeogeotrellis.utils import s3_client
@@ -530,12 +529,8 @@ def write_metadata(metadata, metadata_file, stac_file_paths: List[Union[str, Path]] = None):
             bucket = os.environ.get('SWIFT_BUCKET')
             s3_instance = s3_client()
 
-            paths = [metadata_file] + (stac_file_paths or [])
-            # asset files are already uploaded by Scala code
-            logger.info(f"Writing results to object storage. paths={paths}")
-            for file_path in paths:
-                file_path = urlparse(str(file_path)).path
-                s3_instance.upload_file(file_path, bucket, file_path.strip("/"))
+            s3_instance.upload_file(str(metadata_file), bucket, str(metadata_file).strip("/"))
         else:
             _convert_job_metadatafile_outputs_to_s3_urls(metadata_file)

@@ -547,15 +542,15 @@ def _export_to_workspaces(
     job_dir: Path,
     remove_exported_assets: bool,
     enable_merge: bool,
-) -> List[str]:
+):
     workspace_repository: WorkspaceRepository = backend_config_workspace_repository
     workspace_exports = sorted(
         list(result.workspace_exports),
         key=lambda export: export.workspace_id + (export.merge or ""),  # arbitrary but deterministic order of hrefs
     )
 
     if not workspace_exports:
-        return []
+        return
 
     stac_hrefs = [
         f"file:{path}"
Expand Down Expand Up @@ -625,7 +620,6 @@ def _export_to_workspaces(

if alternate:
result_metadata["assets"][asset_key]["alternate"] = alternate
return stac_hrefs


def _export_to_workspace(
Expand Down
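
For reference, a sketch of how the simplified write_metadata reads after this commit, assembled from the hunks above. The helpers it relies on (json_default, add_permissions, ConfigParams, get_backend_config, s3_client, _convert_job_metadatafile_outputs_to_s3_urls) live elsewhere in the openeo-geotrellis code base, so this is an illustrative reconstruction rather than a verbatim copy of batch_job.py:

    import json
    import logging
    import os
    import stat
    from pathlib import Path

    logger = logging.getLogger(__name__)

    def write_metadata(metadata: dict, metadata_file: Path):
        # Write the collected result metadata to the local job metadata file.
        with open(metadata_file, 'w') as f:
            json.dump(metadata, f, default=json_default)  # json_default: project helper
        add_permissions(metadata_file, stat.S_IWGRP)  # project helper
        logger.info("wrote metadata to %s" % metadata_file)

        if ConfigParams().is_kube_deploy:
            if not get_backend_config().fuse_mount_batchjob_s3_bucket:
                from openeogeotrellis.utils import s3_client

                bucket = os.environ.get('SWIFT_BUCKET')
                s3_instance = s3_client()

                # Only the job metadata file is uploaded here: asset files are
                # uploaded by the Scala side, and the temporary STAC files written
                # for export_workspace no longer need to go to object storage.
                s3_instance.upload_file(str(metadata_file), bucket, str(metadata_file).strip("/"))
            else:
                _convert_job_metadatafile_outputs_to_s3_urls(metadata_file)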
