Skip to content

amazon_security_lake: fix handling of JSON encoded flattened fields #14323

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jun 30, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions packages/amazon_security_lake/changelog.yml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
# newer versions go on top
- version: "2.5.2"
changes:
- description: Fix handling of `ocsf.api.request.data` and `ocsf.api.response.data` when they are JSON-encoded objects.
type: bugfix
link: https://github.com/elastic/integrations/pull/14323
- version: "2.5.1"
changes:
- description: Fix handling of SQS worker count configuration.
Expand Down

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,40 @@ processors:
value: 'Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}'
source: >-
def convertToMilliseconds(long timestamp) {
if ((long)1e19 - 1 < timestamp) {
throw new IllegalArgumentException("Timestamp format not recognized: " + timestamp);
} else if ((long)1e16 - 1 < timestamp) {
return timestamp / 1000000; // Convert nanoseconds to milliseconds
} else if ((long)1e13 - 1 < timestamp) {
return timestamp / 1000; // Convert microseconds to milliseconds
} else if ((long)1e10 - 1 < timestamp) {
return timestamp; // Already in milliseconds, no conversion needed
} else {
return timestamp * 1000; // Convert seconds to milliseconds
}
if ((long)1e19 - 1 < timestamp) {
throw new IllegalArgumentException("Timestamp format not recognized: " + timestamp);
} else if ((long)1e16 - 1 < timestamp) {
return timestamp / 1000000; // Convert nanoseconds to milliseconds
} else if ((long)1e13 - 1 < timestamp) {
return timestamp / 1000; // Convert microseconds to milliseconds
} else if ((long)1e10 - 1 < timestamp) {
return timestamp; // Already in milliseconds, no conversion needed
} else {
return timestamp * 1000; // Convert seconds to milliseconds
}
}

def processFields(Map fields) {
for (entry in fields.entrySet()) {
def fieldName = entry.getKey();
def fieldValue = entry.getValue();
// Check if the field is a nested object (Map)
if (fieldValue instanceof Map) {
// Recursively process nested objects
processFields((Map) fieldValue);
} else if (fieldName.endsWith('time') || fieldName.endsWith('_time')) {
// If the field name ends with "time" or "_time" and is a number, convert it
if (fieldValue instanceof Number) {
fields[fieldName] = convertToMilliseconds(((Number) fieldValue).longValue());
}
}
def processFields(Map fields) {
if (fields == null) {
return null;
}
for (entry in fields.entrySet()) {
def fieldName = entry.getKey();
def fieldValue = entry.getValue();
// Check if the field is a nested object (Map)
if (fieldValue instanceof Map) {
// Recursively process nested objects
processFields((Map) fieldValue);
} else if (fieldName.endsWith('time') || fieldName.endsWith('_time')) {
// If the field name ends with "time" or "_time" and is a number, convert it
if (fieldValue instanceof Number) {
fields[fieldName] = convertToMilliseconds(((Number) fieldValue).longValue());
}
return null;
}
}
processFields(ctx.ocsf);
return null;
}
processFields(ctx.ocsf);

- rename:
field: ocsf.resource
Expand Down Expand Up @@ -402,6 +405,28 @@ processors:
- append:
field: error.message
value: 'Processor {{{_ingest.on_failure_processor_type}}} with tag {{{_ingest.on_failure_processor_tag}}} in pipeline {{{_ingest.on_failure_pipeline}}} failed with message: {{{_ingest.on_failure_message}}}'
- json:
field: ocsf.api.request.data
if: ctx.ocsf?.api?.request?.data instanceof String
on_failure:
- rename:
field: ocsf.api.request.data
target_field: ocsf.api.request.data.value
- rename:
field: ocsf.api.request.data
target_field: ocsf.api.request.data.value
if: ctx.ocsf?.api?.request?.data != null && !(ctx.ocsf.api.request.data instanceof Map)
- json:
field: ocsf.api.response.data
if: ctx.ocsf?.api?.response?.data instanceof String
on_failure:
- rename:
field: ocsf.api.response.data
target_field: ocsf.api.response.data.value
- rename:
field: ocsf.api.response.data
target_field: ocsf.api.response.data.value
if: ctx.ocsf?.api?.response?.data != null && !(ctx.ocsf.api.response.data instanceof Map)
- convert:
field: ocsf.activity_id
tag: convert_activity_id_to_string
Expand Down
2 changes: 1 addition & 1 deletion packages/amazon_security_lake/manifest.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
format_version: "3.0.3"
name: amazon_security_lake
title: Amazon Security Lake
version: "2.5.1"
version: "2.5.2"
description: Collect logs from Amazon Security Lake with Elastic Agent.
type: integration
categories: ["aws", "security"]
Expand Down