Skip to content

Commit

Permalink
Update lambda handler to act on ObjectRemoved* s3 events
Browse files Browse the repository at this point in the history
This changeset adds the stub of a delete_model_output function to the
ModelOutputHandler and calls it in the lambda handler.
  • Loading branch information
bsweger committed Jan 23, 2025
1 parent ce4d7fc commit eaa3338
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 19 deletions.
40 changes: 21 additions & 19 deletions faas/lambda_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,27 +27,29 @@ def lambda_handler(event, context):
bucket = event["Records"][0]["s3"]["bucket"]["name"]
key = urllib.parse.unquote_plus(event["Records"][0]["s3"]["object"]["key"], encoding="utf-8")

# Below is some old testing code that we were using to ignore all file
# types that don't have a supported extension. It's commented-out now, to ensure
# that we don't have to update this handler every time we add support for a new
# file type in the model-output transforms.
# extensions = [".csv", ".parquet"]
# if not any(ext in key.lower() for ext in extensions):
# print(f"{key} is not a supported file type, skipping")
# return

# Until we implement ModelOutputHandler functionality to act on deleted model-output files, ignore any ObjectDelete
# events that are triggered by by a hub's S3 bucket.
if "objectcreated" not in event_name.lower():
logger.info(f"Event type {event_source}:{event_name} is not supported, skipping")
return

logger.info(f"Transforming file: {bucket}/{key}")
try:
mo = ModelOutputHandler.from_s3(bucket, key)
mo.transform_model_output()
if "objectcreated" in event_name.lower():
logger.info(f"Adding file: {bucket}/{key}")
mo.add_model_output()
elif "objectremoved" in event_name.lower():
logger.info("Deleting file: {bucket}/{key}")
mo.delete_model_output()
else:
logger.info({
"msg": "Event type not supported, skipping",
"event_source": event_source,
"event": event_name,
"file": f"{bucket}/{key}"
})
return
except UserWarning:
pass
except Exception as e:
logger.exception(f"Error transforming file: {bucket}/{key}")
raise e
logger.exception({
"msg": "Error handling file",
"event": event_name,
"event_source": event_source,
"file": f"{bucket}/{key}",
"error": str(e)
})
4 changes: 4 additions & 0 deletions src/hubverse_transform/model_output.py
Original file line number Diff line number Diff line change
Expand Up @@ -259,3 +259,7 @@ def add_model_output(self) -> str:
transformed_file_path = self.write_parquet(updated_model_output_table)

return transformed_file_path

def delete_model_output(self) -> str:
"""Delete specified model-output file."""
return "Not implemented"

0 comments on commit eaa3338

Please sign in to comment.