Skip to content

Commit 947c874

Browse files
committed
wip
Signed-off-by: Eduardo Silva <[email protected]>
1 parent a4ace4e commit 947c874

File tree

5 files changed

+215
-69
lines changed

5 files changed

+215
-69
lines changed

include/fluent-bit/flb_router.h

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,16 @@
3030
#include <cfl/cfl.h>
3131
#include <monkey/mk_core.h>
3232

33+
struct flb_mp_chunk_cobj;
34+
struct flb_log_event_encoder;
35+
struct flb_log_event_decoder;
36+
37+
struct flb_router_chunk_context {
38+
struct flb_mp_chunk_cobj *chunk_cobj;
39+
struct flb_log_event_encoder *log_encoder;
40+
struct flb_log_event_decoder *log_decoder;
41+
};
42+
3343
struct flb_router_path {
3444
struct flb_output_instance *ins;
3545
struct flb_route *route;
@@ -135,15 +145,26 @@ void flb_router_exit(struct flb_config *config);
135145

136146
uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk);
137147

148+
int flb_router_chunk_context_init(struct flb_router_chunk_context *context);
149+
void flb_router_chunk_context_reset(struct flb_router_chunk_context *context);
150+
void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context);
151+
int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context,
152+
struct flb_event_chunk *chunk);
153+
138154
int flb_route_condition_eval(struct flb_event_chunk *chunk,
155+
struct flb_router_chunk_context *context,
139156
struct flb_route *route);
140157
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
158+
struct flb_router_chunk_context *context,
141159
struct flb_route *route);
142160
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
161+
struct flb_router_chunk_context *context,
143162
struct flb_route *route);
144163
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
164+
struct flb_router_chunk_context *context,
145165
struct flb_route *route);
146166
int flb_router_path_should_route(struct flb_event_chunk *chunk,
167+
struct flb_router_chunk_context *context,
147168
struct flb_router_path *path);
148169

149170
struct flb_cf;

src/flb_event.c

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@
2323

2424
#include <fluent-bit/flb_event.h>
2525
#include <fluent-bit/flb_sds.h>
26-
2726
struct flb_event_chunk *flb_event_chunk_create(int type,
2827
int total_events,
2928
char *tag_buf, int tag_len,

src/flb_router_condition.c

Lines changed: 135 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -21,15 +21,117 @@
2121
#include <fluent-bit/flb_log.h>
2222
#include <fluent-bit/flb_router.h>
2323
#include <fluent-bit/flb_conditionals.h>
24+
#include <fluent-bit/flb_log_event_encoder.h>
2425
#include <fluent-bit/flb_log_event_decoder.h>
25-
#include <fluent-bit/flb_mp.h>
2626
#include <fluent-bit/flb_mp_chunk.h>
2727

2828
#define FLB_ROUTE_CONDITION_COMPILED_SUCCESS 1
2929
#define FLB_ROUTE_CONDITION_COMPILED_FAILURE -1
3030

3131
static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition);
32-
static void route_condition_record_destroy(struct flb_mp_chunk_record *record);
32+
33+
int flb_router_chunk_context_init(struct flb_router_chunk_context *context)
34+
{
35+
if (!context) {
36+
return -1;
37+
}
38+
39+
context->chunk_cobj = NULL;
40+
context->log_encoder = NULL;
41+
context->log_decoder = NULL;
42+
43+
return 0;
44+
}
45+
46+
void flb_router_chunk_context_reset(struct flb_router_chunk_context *context)
47+
{
48+
if (!context) {
49+
return;
50+
}
51+
52+
if (context->chunk_cobj) {
53+
flb_mp_chunk_cobj_destroy(context->chunk_cobj);
54+
context->chunk_cobj = NULL;
55+
}
56+
57+
if (context->log_decoder) {
58+
flb_log_event_decoder_destroy(context->log_decoder);
59+
context->log_decoder = NULL;
60+
}
61+
62+
if (context->log_encoder) {
63+
flb_log_event_encoder_destroy(context->log_encoder);
64+
context->log_encoder = NULL;
65+
}
66+
}
67+
68+
void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context)
69+
{
70+
flb_router_chunk_context_reset(context);
71+
}
72+
73+
int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context,
74+
struct flb_event_chunk *chunk)
75+
{
76+
int ret;
77+
struct flb_mp_chunk_record *record;
78+
79+
if (!context || !chunk) {
80+
return -1;
81+
}
82+
83+
if (chunk->type != FLB_EVENT_TYPE_LOGS) {
84+
return 0;
85+
}
86+
87+
if (context->chunk_cobj) {
88+
return 0;
89+
}
90+
91+
if (!chunk->data || chunk->size == 0) {
92+
return -1;
93+
}
94+
95+
if (!context->log_encoder) {
96+
context->log_encoder = flb_log_event_encoder_create(FLB_LOG_EVENT_FORMAT_DEFAULT);
97+
if (!context->log_encoder) {
98+
return -1;
99+
}
100+
}
101+
102+
if (!context->log_decoder) {
103+
context->log_decoder = flb_log_event_decoder_create(NULL, 0);
104+
if (!context->log_decoder) {
105+
flb_router_chunk_context_reset(context);
106+
return -1;
107+
}
108+
flb_log_event_decoder_read_groups(context->log_decoder, FLB_TRUE);
109+
}
110+
111+
flb_log_event_decoder_reset(context->log_decoder, chunk->data, chunk->size);
112+
113+
context->chunk_cobj = flb_mp_chunk_cobj_create(context->log_encoder,
114+
context->log_decoder);
115+
if (!context->chunk_cobj) {
116+
flb_router_chunk_context_reset(context);
117+
return -1;
118+
}
119+
120+
while ((ret = flb_mp_chunk_cobj_record_next(context->chunk_cobj, &record)) ==
121+
FLB_MP_CHUNK_RECORD_OK) {
122+
continue;
123+
}
124+
125+
if (ret != FLB_MP_CHUNK_RECORD_EOF) {
126+
flb_router_chunk_context_reset(context);
127+
return -1;
128+
}
129+
130+
context->chunk_cobj->record_pos = NULL;
131+
context->chunk_cobj->condition = NULL;
132+
133+
return 0;
134+
}
33135

34136
uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
35137
{
@@ -52,21 +154,16 @@ uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk)
52154
}
53155

54156
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
157+
struct flb_router_chunk_context *context,
55158
struct flb_route *route)
56159
{
57-
int ret;
58160
int result = FLB_FALSE;
59161
struct flb_route_condition *condition;
60162
struct flb_condition *compiled;
61-
struct flb_log_event_decoder decoder;
62-
struct flb_log_event event;
63-
struct flb_mp_chunk_record record;
64-
65-
if (!chunk || !route || !route->condition) {
66-
return FLB_FALSE;
67-
}
163+
struct flb_mp_chunk_record *record;
164+
struct cfl_list *head;
68165

69-
if (!chunk->data || chunk->size == 0) {
166+
if (!chunk || !context || !route || !route->condition) {
70167
return FLB_FALSE;
71168
}
72169

@@ -77,66 +174,50 @@ int flb_condition_eval_logs(struct flb_event_chunk *chunk,
77174
return FLB_FALSE;
78175
}
79176

80-
ret = flb_log_event_decoder_init(&decoder, chunk->data, chunk->size);
81-
if (ret != FLB_EVENT_DECODER_SUCCESS) {
177+
if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) {
82178
return FLB_FALSE;
83179
}
84180

85-
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);
86-
87-
while ((ret = flb_log_event_decoder_next(&decoder, &event)) == FLB_EVENT_DECODER_SUCCESS) {
88-
memset(&record, 0, sizeof(record));
89-
record.event = event;
90-
91-
if (event.metadata) {
92-
record.cobj_metadata = flb_mp_object_to_cfl(event.metadata);
93-
if (!record.cobj_metadata) {
94-
route_condition_record_destroy(&record);
95-
break;
96-
}
97-
}
181+
if (!context->chunk_cobj) {
182+
return FLB_FALSE;
183+
}
98184

99-
if (event.body) {
100-
record.cobj_record = flb_mp_object_to_cfl(event.body);
101-
if (!record.cobj_record) {
102-
route_condition_record_destroy(&record);
103-
break;
104-
}
105-
}
185+
cfl_list_foreach(head, &context->chunk_cobj->records) {
186+
record = cfl_list_entry(head, struct flb_mp_chunk_record, _head);
106187

107-
if (flb_condition_evaluate(compiled, &record) == FLB_TRUE) {
188+
if (flb_condition_evaluate(compiled, record) == FLB_TRUE) {
108189
result = FLB_TRUE;
109-
route_condition_record_destroy(&record);
110190
break;
111191
}
112-
113-
route_condition_record_destroy(&record);
114192
}
115193

116-
flb_log_event_decoder_destroy(&decoder);
117-
118194
return result;
119195
}
120196

121197
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
198+
struct flb_router_chunk_context *context,
122199
struct flb_route *route)
123200
{
124201
(void) chunk;
202+
(void) context;
125203
(void) route;
126204

127205
return FLB_FALSE;
128206
}
129207

130208
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
209+
struct flb_router_chunk_context *context,
131210
struct flb_route *route)
132211
{
133212
(void) chunk;
213+
(void) context;
134214
(void) route;
135215

136216
return FLB_FALSE;
137217
}
138218

139219
int flb_route_condition_eval(struct flb_event_chunk *chunk,
220+
struct flb_router_chunk_context *context,
140221
struct flb_route *route)
141222
{
142223
uint32_t signal;
@@ -164,11 +245,11 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk,
164245

165246
switch (signal) {
166247
case FLB_ROUTER_SIGNAL_LOGS:
167-
return flb_condition_eval_logs(chunk, route);
248+
return flb_condition_eval_logs(chunk, context, route);
168249
case FLB_ROUTER_SIGNAL_METRICS:
169-
return flb_condition_eval_metrics(chunk, route);
250+
return flb_condition_eval_metrics(chunk, context, route);
170251
case FLB_ROUTER_SIGNAL_TRACES:
171-
return flb_condition_eval_traces(chunk, route);
252+
return flb_condition_eval_traces(chunk, context, route);
172253
default:
173254
break;
174255
}
@@ -177,17 +258,28 @@ int flb_route_condition_eval(struct flb_event_chunk *chunk,
177258
}
178259

179260
int flb_router_path_should_route(struct flb_event_chunk *chunk,
261+
struct flb_router_chunk_context *context,
180262
struct flb_router_path *path)
181263
{
182264
if (!path) {
183265
return FLB_FALSE;
184266
}
185267

268+
if (chunk && chunk->type == FLB_EVENT_TYPE_LOGS) {
269+
if (!context) {
270+
return FLB_FALSE;
271+
}
272+
273+
if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) {
274+
return FLB_FALSE;
275+
}
276+
}
277+
186278
if (!path->route) {
187279
return FLB_TRUE;
188280
}
189281

190-
return flb_route_condition_eval(chunk, path->route);
282+
return flb_route_condition_eval(chunk, context, path->route);
191283
}
192284

193285
static int parse_rule_operator(const flb_sds_t op_str,
@@ -357,20 +449,3 @@ static struct flb_condition *route_condition_get_compiled(struct flb_route_condi
357449
return condition->compiled;
358450
}
359451

360-
static void route_condition_record_destroy(struct flb_mp_chunk_record *record)
361-
{
362-
if (!record) {
363-
return;
364-
}
365-
366-
if (record->cobj_record) {
367-
cfl_object_destroy(record->cobj_record);
368-
record->cobj_record = NULL;
369-
}
370-
371-
if (record->cobj_metadata) {
372-
cfl_object_destroy(record->cobj_metadata);
373-
record->cobj_metadata = NULL;
374-
}
375-
}
376-

0 commit comments

Comments
 (0)