Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
271 changes: 216 additions & 55 deletions plugins/out_vivo_exporter/vivo.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,61 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_kv.h>
#include <fluent-bit/flb_pack.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_log_event_encoder.h>
#include <string.h>

#include "vivo.h"
#include "vivo_http.h"
#include "vivo_stream.h"

static flb_sds_t format_logs(struct flb_event_chunk *event_chunk, struct flb_config *config)
static msgpack_object *find_map_value(msgpack_object *map,
const char *key, size_t key_len)
{
struct flb_log_event_decoder log_decoder;
struct flb_log_event log_event;
size_t i;

if (!map || map->type != MSGPACK_OBJECT_MAP) {
return NULL;
}

for (i = 0; i < map->via.map.size; i++) {
if (map->via.map.ptr[i].key.type != MSGPACK_OBJECT_STR) {
continue;
}

if (map->via.map.ptr[i].key.via.str.size == key_len &&
strncmp(map->via.map.ptr[i].key.via.str.ptr, key, key_len) == 0) {
return &map->via.map.ptr[i].val;
}
}

return NULL;
}

static flb_sds_t format_logs(struct flb_input_instance *src_ins,
struct flb_event_chunk *event_chunk, struct flb_config *config)
{
int len;
int result;
int i;
char *name;
flb_sds_t out_js;
flb_sds_t out_buf = NULL;
msgpack_sbuffer tmp_sbuf;
msgpack_packer tmp_pck;
int group_mismatch = FLB_FALSE;
int is_otlp = FLB_FALSE;
struct flb_log_event log_event;
struct flb_log_event_decoder log_decoder;
struct flb_mp_map_header mh;
struct flb_mp_map_header root_map;
struct flb_mp_map_header otlp_map;
struct flb_mp_map_header group_map;
msgpack_object *group_metadata = NULL;
msgpack_object *group_attributes = NULL;
msgpack_object *schema_value = NULL;
msgpack_object *resource_value = NULL;
msgpack_object *scope_value = NULL;

result = flb_log_event_decoder_init(&log_decoder,
(char *) event_chunk->data,
Expand All @@ -56,90 +94,212 @@ static flb_sds_t format_logs(struct flb_event_chunk *event_chunk, struct flb_con
msgpack_sbuffer_init(&tmp_sbuf);
msgpack_packer_init(&tmp_pck, &tmp_sbuf, msgpack_sbuffer_write);

/*
* Here is an example of the packaging done for Logs
*
* {
* "source_type": "forward",
* "source_name": "forward.0",
* "tag": "dummy.0",
* "records": [
* {
* "timestamp": 1759591426808913765,
* "metadata": {
* "level": "info"
* },
* "message": "dummy"
* },
* {
* "timestamp": 1759591426908563348,
* "metadata": {
* "level": "debug",
* "service": "auth"
* },
* "message": "dummy"
* }
* ]
* }
*/

flb_mp_map_header_init(&root_map, &tmp_pck);

/* source_type: internal type of the plugin */
flb_mp_map_header_append(&root_map);
name = src_ins->p->name;
len = strlen(name);

msgpack_pack_str(&tmp_pck, 11);
msgpack_pack_str_body(&tmp_pck, "source_type", 11);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck, name, len);

/* source_name: internal name or alias set by the user */
flb_mp_map_header_append(&root_map);
name = (char *) flb_input_name(src_ins);
len = strlen(name);
msgpack_pack_str(&tmp_pck, 11);
msgpack_pack_str_body(&tmp_pck, "source_name", 11);
msgpack_pack_str(&tmp_pck, len);
msgpack_pack_str_body(&tmp_pck, name, len);

/* tag */
flb_mp_map_header_append(&root_map);
msgpack_pack_str(&tmp_pck, 3);
msgpack_pack_str_body(&tmp_pck, "tag", 3);
msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag));
msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag));

/* records */
flb_mp_map_header_append(&root_map);
msgpack_pack_str(&tmp_pck, 7);
msgpack_pack_str_body(&tmp_pck, "records", 7);

flb_mp_array_header_init(&mh, &tmp_pck);

while ((result = flb_log_event_decoder_next(
&log_decoder,
&log_event)) == FLB_EVENT_DECODER_SUCCESS) {

if (log_event.group_metadata != NULL) {
if (group_metadata == NULL) {
group_metadata = log_event.group_metadata;
}
else if (group_metadata != log_event.group_metadata) {
group_mismatch = FLB_TRUE;
}
}

if (log_event.group_attributes != NULL) {
if (group_attributes == NULL) {
group_attributes = log_event.group_attributes;
}
else if (group_attributes != log_event.group_attributes) {
group_mismatch = FLB_TRUE;
}
}

flb_mp_array_header_append(&mh);

/*
* If the caller specified FLB_PACK_JSON_DATE_FLUENT, we format the data
* by using the following structure:
*
* [[TIMESTAMP, {"_tag": "...", ...MORE_METADATA}], {RECORD CONTENT}]
* [[TIMESTAMP, {"....": "...", ...MORE_METADATA}], {RECORD CONTENT}]
*/
msgpack_pack_array(&tmp_pck, 2);
msgpack_pack_array(&tmp_pck, 2);
msgpack_pack_uint64(&tmp_pck, flb_time_to_nanosec(&log_event.timestamp));

/* add tag only */
msgpack_pack_map(&tmp_pck, 1 + log_event.metadata->via.map.size);
/* pack metadata */
msgpack_pack_object(&tmp_pck, *log_event.metadata);

msgpack_pack_str(&tmp_pck, 4);
msgpack_pack_str_body(&tmp_pck, "_tag", 4);
/* pack the remaining content */
msgpack_pack_object(&tmp_pck, *log_event.body);
}

msgpack_pack_str(&tmp_pck, flb_sds_len(event_chunk->tag));
msgpack_pack_str_body(&tmp_pck, event_chunk->tag, flb_sds_len(event_chunk->tag));
flb_mp_array_header_end(&mh);

/* Append remaining keys/values */
for (i = 0;
i < log_event.metadata->via.map.size;
i++) {
msgpack_pack_object(&tmp_pck,
log_event.metadata->via.map.ptr[i].key);
msgpack_pack_object(&tmp_pck,
log_event.metadata->via.map.ptr[i].val);
if (group_mismatch == FLB_FALSE &&
(group_metadata != NULL || group_attributes != NULL)) {
if (group_metadata != NULL) {
schema_value = find_map_value(group_metadata, "schema", 6);
}

/* pack the remaining content */
msgpack_pack_map(&tmp_pck, log_event.body->via.map.size);

/* Append remaining keys/values */
for (i = 0;
i < log_event.body->via.map.size;
i++) {
msgpack_pack_object(&tmp_pck,
log_event.body->via.map.ptr[i].key);
msgpack_pack_object(&tmp_pck,
log_event.body->via.map.ptr[i].val);
if (schema_value &&
schema_value->type == MSGPACK_OBJECT_STR &&
schema_value->via.str.size == 4 &&
strncmp(schema_value->via.str.ptr, "otlp", 4) == 0) {
is_otlp = FLB_TRUE;
}

/* Concatenate by using break lines */
out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size,
config->json_escape_unicode);
if (!out_js) {
flb_sds_destroy(out_buf);
msgpack_sbuffer_destroy(&tmp_sbuf);
flb_log_event_decoder_destroy(&log_decoder);
return NULL;
if (is_otlp == FLB_TRUE) {
resource_value = NULL;
scope_value = NULL;

if (group_attributes != NULL &&
group_attributes->type == MSGPACK_OBJECT_MAP) {
resource_value = find_map_value(group_attributes, "resource", 8);
scope_value = find_map_value(group_attributes, "scope", 5);
}

flb_mp_map_header_append(&root_map);
msgpack_pack_str(&tmp_pck, 4);
msgpack_pack_str_body(&tmp_pck, "otlp", 4);

flb_mp_map_header_init(&otlp_map, &tmp_pck);

if (resource_value != NULL) {
flb_mp_map_header_append(&otlp_map);
msgpack_pack_str(&tmp_pck, 8);
msgpack_pack_str_body(&tmp_pck, "resource", 8);
msgpack_pack_object(&tmp_pck, *resource_value);
}

if (scope_value != NULL) {
flb_mp_map_header_append(&otlp_map);
msgpack_pack_str(&tmp_pck, 5);
msgpack_pack_str_body(&tmp_pck, "scope", 5);
msgpack_pack_object(&tmp_pck, *scope_value);
}

flb_mp_map_header_end(&otlp_map);
}
else {
flb_mp_map_header_append(&root_map);
msgpack_pack_str(&tmp_pck, 9);
msgpack_pack_str_body(&tmp_pck, "flb_group", 9);

flb_mp_map_header_init(&group_map, &tmp_pck);

if (group_metadata != NULL) {
flb_mp_map_header_append(&group_map);
msgpack_pack_str(&tmp_pck, 8);
msgpack_pack_str_body(&tmp_pck, "metadata", 8);
msgpack_pack_object(&tmp_pck, *group_metadata);
}

if (group_attributes != NULL) {
flb_mp_map_header_append(&group_map);
msgpack_pack_str(&tmp_pck, 4);
msgpack_pack_str_body(&tmp_pck, "body", 4);
msgpack_pack_object(&tmp_pck, *group_attributes);
}

flb_mp_map_header_end(&group_map);
}

/*
* One map record has been converted, now append it to the
* outgoing out_buf sds variable.
*/
flb_sds_cat_safe(&out_buf, out_js, flb_sds_len(out_js));
flb_sds_cat_safe(&out_buf, "\n", 1);

flb_sds_destroy(out_js);
msgpack_sbuffer_clear(&tmp_sbuf);
}

flb_mp_map_header_end(&root_map);

/* Release the unpacker */
flb_log_event_decoder_destroy(&log_decoder);

/* Convert the complete msgpack structure to JSON */
out_js = flb_msgpack_raw_to_json_sds(tmp_sbuf.data, tmp_sbuf.size,
config->json_escape_unicode);

msgpack_sbuffer_destroy(&tmp_sbuf);

return out_buf;
/* append a newline */
flb_sds_cat_safe(&out_js, "\n", 1);

if (!out_js) {
Comment on lines +274 to +283

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Guard newline append against failed JSON conversion

If flb_msgpack_raw_to_json_sds fails, out_js is NULL but flb_sds_cat_safe(&out_js, "\n", 1) is called before checking for the error. flb_sds_cat_safe dereferences the pointer, so this path will segfault instead of propagating a graceful error whenever the JSON conversion runs out of memory or receives invalid data. The previous implementation checked the return value before appending, so the crash is new to this change.

Useful? React with 👍 / 👎.

flb_sds_destroy(out_buf);
return NULL;
}

/* Replace out_buf with the complete JSON */
flb_sds_destroy(out_buf);
return out_js;
}

static int logs_event_chunk_append(struct vivo_exporter *ctx,
struct flb_input_instance *src_ins,
struct flb_event_chunk *event_chunk,
struct flb_config *config)
{
size_t len;
flb_sds_t json;
struct vivo_stream_entry *entry;


json = format_logs(event_chunk, config);
json = format_logs(src_ins, event_chunk, config);
if (!json) {
flb_plg_error(ctx->ins, "cannot convert logs chunk to JSON");
return -1;
Expand Down Expand Up @@ -207,6 +367,7 @@ static int cb_vivo_init(struct flb_output_instance *ins,
return -1;
}
ctx->ins = ins;
ctx->config = config;

ret = flb_output_config_map_set(ins, (void *) ctx);
if (ret == -1) {
Expand Down Expand Up @@ -272,7 +433,7 @@ static void cb_vivo_flush(struct flb_event_chunk *event_chunk,
}
#endif
if (event_chunk->type == FLB_EVENT_TYPE_LOGS) {
ret = logs_event_chunk_append(ctx, event_chunk, config);
ret = logs_event_chunk_append(ctx, ins, event_chunk, config);
}
else if (event_chunk->type == FLB_EVENT_TYPE_TRACES) {
ret = metrics_traces_event_chunk_append(ctx, ctx->stream_traces, event_chunk, config);
Expand Down
3 changes: 3 additions & 0 deletions plugins/out_vivo_exporter/vivo.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
#include <fluent-bit/flb_output_plugin.h>
#include <fluent-bit/flb_ring_buffer.h>

struct flb_config;

#define VIVO_RING_BUFFER_SIZE 10

/* Plugin context */
Expand All @@ -40,6 +42,7 @@ struct vivo_exporter {

/* instance context */
struct flb_output_instance *ins;
struct flb_config *config;
};

#endif
Loading
Loading