Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
33cdbbe
router: implement new router paths and conditions
edsiper Oct 20, 2025
ac42f67
router_config: add support for rules
edsiper Oct 20, 2025
839a397
router_condition: add conditional logic for logs
edsiper Oct 20, 2025
3e147e1
task: add handling for direct route
edsiper Oct 20, 2025
a4ace4e
tests: internal: router_config: extend test for conditional logs
edsiper Oct 20, 2025
947c874
wip
edsiper Oct 20, 2025
840e2ba
wip
edsiper Oct 21, 2025
10058f2
wip
edsiper Oct 21, 2025
eb798a7
wip
edsiper Oct 21, 2025
e662be2
wip
edsiper Oct 21, 2025
38b010a
router: use cfl_list for router path
edsiper Oct 27, 2025
966387c
input_log: code cleanup and fix types for linked lists
edsiper Oct 27, 2025
b923d4a
input: fix data type for routes_direct
edsiper Oct 27, 2025
4163712
router_config: fix data type for linked list
edsiper Oct 27, 2025
55f9e0e
task: fix data type for linked list
edsiper Oct 27, 2025
2c2a5cf
input_log: preserve non-conditional routes when conditional routing i…
edsiper Oct 27, 2025
20eeb44
input_log: handle deferred chunk creation in threaded inputs with con…
edsiper Oct 27, 2025
49e41fe
input: include cfl header
edsiper Oct 27, 2025
b727628
router: use cfl_list intead of mk_list
edsiper Oct 27, 2025
28946e8
input: fix llist initialization
edsiper Oct 27, 2025
4a82d5d
sosreport: fix list iterator api
edsiper Oct 27, 2025
cb67d1c
tests: internal: router_config: fix lists API
edsiper Oct 27, 2025
c7949cc
input_chunk: expose flb_input_chunk_get_real_size()
edsiper Oct 28, 2025
ac76b20
tests: internal: input_chunk: remove unused code
edsiper Oct 28, 2025
a7e4c92
input_log: implement conditional routing with non-conditional route p…
edsiper Oct 28, 2025
8183d93
input_log: recompute per-output storage accounting
edsiper Oct 28, 2025
fde4e06
input_log: remove debug messages and cleanup
edsiper Oct 30, 2025
8d2105f
routes_mask: correct memcmp byte count in flb_routes_mask_is_empty
edsiper Oct 30, 2025
cb2ce79
router_config: remove debug message
edsiper Oct 30, 2025
90bfb20
task: remove routes_mask check from direct-route branch
edsiper Oct 30, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 6 additions & 4 deletions include/fluent-bit/flb_input.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,8 @@

#include <cmetrics/cmetrics.h>
#include <monkey/mk_core.h>
#include <cfl/cfl.h>

#include <msgpack.h>
#include <inttypes.h>

Expand Down Expand Up @@ -335,10 +337,10 @@ struct flb_input_instance {

struct mk_list _head; /* link to config->inputs */

struct mk_list routes_direct; /* direct routes set by API */
struct mk_list routes; /* flb_router_path's list */
struct mk_list properties; /* properties / configuration */
struct mk_list collectors; /* collectors */
struct cfl_list routes_direct; /* direct routes set by API */
struct cfl_list routes; /* flb_router_path's list */
struct mk_list properties; /* properties / configuration */
struct mk_list collectors; /* collectors */

/* Storage Chunks */
struct mk_list chunks; /* linked list of all chunks */
Expand Down
1 change: 1 addition & 0 deletions include/fluent-bit/flb_input_chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ int flb_input_chunk_get_tag(struct flb_input_chunk *ic,
void flb_input_chunk_ring_buffer_cleanup(struct flb_input_instance *ins);
void flb_input_chunk_ring_buffer_collector(struct flb_config *ctx, void *data);
ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic);
ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);
size_t flb_input_chunk_set_limits(struct flb_input_instance *in);
size_t flb_input_chunk_total_size(struct flb_input_instance *in);
struct flb_input_chunk *flb_input_chunk_map(struct flb_input_instance *in,
Expand Down
35 changes: 34 additions & 1 deletion include/fluent-bit/flb_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,24 @@
#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_sds.h>
#include <fluent-bit/flb_config.h>
#include <fluent-bit/flb_conditionals.h>
#include <cfl/cfl.h>
#include <monkey/mk_core.h>

struct flb_mp_chunk_cobj;
struct flb_log_event_encoder;
struct flb_log_event_decoder;

struct flb_router_chunk_context {
struct flb_mp_chunk_cobj *chunk_cobj;
struct flb_log_event_encoder *log_encoder;
struct flb_log_event_decoder *log_decoder;
};

struct flb_router_path {
struct flb_output_instance *ins;
struct mk_list _head;
struct flb_route *route;
struct cfl_list _head;
};

static inline int flb_router_match_type(int in_event_type,
Expand Down Expand Up @@ -74,12 +86,17 @@ struct flb_route_condition_rule {
flb_sds_t field;
flb_sds_t op;
flb_sds_t value;
flb_sds_t *values;
size_t values_count;
struct cfl_list _head;
};

struct flb_route_condition {
struct cfl_list rules;
int is_default;
enum flb_condition_operator op;
struct flb_condition *compiled;
int compiled_status;
};

struct flb_route_output {
Expand All @@ -104,6 +121,7 @@ struct flb_route {
flb_sds_t name;
uint32_t signals;
struct flb_route_condition *condition;
int per_record_routing;
struct cfl_list outputs;
struct cfl_list processors;
struct cfl_list _head;
Expand All @@ -128,14 +146,29 @@ void flb_router_exit(struct flb_config *config);

uint32_t flb_router_signal_from_chunk(struct flb_event_chunk *chunk);

int flb_router_chunk_context_init(struct flb_router_chunk_context *context);
void flb_router_chunk_context_reset(struct flb_router_chunk_context *context);
void flb_router_chunk_context_destroy(struct flb_router_chunk_context *context);
int flb_router_chunk_context_prepare_logs(struct flb_router_chunk_context *context,
struct flb_event_chunk *chunk);

int flb_route_condition_eval(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_route *route);
int flb_condition_eval_logs(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_route *route);
int flb_condition_eval_metrics(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_route *route);
int flb_condition_eval_traces(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_route *route);
int flb_router_path_should_route(struct flb_event_chunk *chunk,
struct flb_router_chunk_context *context,
struct flb_router_path *path);

struct flb_condition *flb_router_route_get_condition(struct flb_route *route);

struct flb_cf;

Expand Down
31 changes: 27 additions & 4 deletions src/flb_conditionals.c
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,8 @@ int flb_condition_evaluate(struct flb_condition *cond,
struct flb_condition_rule *rule;
struct cfl_variant *record_variant;
int result;
int any_rule_evaluated = FLB_FALSE;
int any_rule_matched = FLB_FALSE;

if (!cond || !record) {
flb_trace("[condition] NULL condition or record, returning TRUE");
Expand All @@ -382,9 +384,16 @@ int flb_condition_evaluate(struct flb_condition *cond,

/* Get the variant for this rule's context */
record_variant = get_record_variant(record, rule->context);
any_rule_evaluated = FLB_TRUE;
if (!record_variant) {
flb_trace("[condition] no record variant found for context %d", rule->context);
continue;
if (cond->op == FLB_COND_OP_AND) {
flb_trace("[condition] AND condition missing field, returning FALSE");
return FLB_FALSE;
}
else {
continue;
}
}

flb_trace("[condition] evaluating rule against record");
Expand All @@ -399,8 +408,22 @@ int flb_condition_evaluate(struct flb_condition *cond,
flb_trace("[condition] OR condition with TRUE result, short-circuiting");
return FLB_TRUE;
}

if (result == FLB_TRUE) {
any_rule_matched = FLB_TRUE;
}
}

if (cond->op == FLB_COND_OP_OR) {
flb_trace("[condition] final evaluation result: %d", any_rule_matched);
return any_rule_matched;
}

flb_trace("[condition] final evaluation result: %d", (cond->op == FLB_COND_OP_AND) ? FLB_TRUE : FLB_FALSE);
return (cond->op == FLB_COND_OP_AND) ? FLB_TRUE : FLB_FALSE;
}
if (any_rule_evaluated == FLB_FALSE) {
flb_trace("[condition] no rules evaluated, defaulting to FALSE for AND condition");
return FLB_FALSE;
}

flb_trace("[condition] final evaluation result: TRUE");
return FLB_TRUE;
}
1 change: 0 additions & 1 deletion src/flb_event.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@

#include <fluent-bit/flb_event.h>
#include <fluent-bit/flb_sds.h>

struct flb_event_chunk *flb_event_chunk_create(int type,
int total_events,
char *tag_buf, int tag_len,
Expand Down
4 changes: 2 additions & 2 deletions src/flb_input.c
Original file line number Diff line number Diff line change
Expand Up @@ -352,8 +352,8 @@ struct flb_input_instance *flb_input_new(struct flb_config *config,
instance->host.ipv6 = FLB_FALSE;

/* Initialize list heads */
mk_list_init(&instance->routes_direct);
mk_list_init(&instance->routes);
cfl_list_init(&instance->routes_direct);
cfl_list_init(&instance->routes);
mk_list_init(&instance->tasks);
mk_list_init(&instance->chunks);
mk_list_init(&instance->collectors);
Expand Down
3 changes: 1 addition & 2 deletions src/flb_input_chunk.c
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ static int flb_input_chunk_drop_task_route(
struct flb_output_instance *o_ins,
ssize_t *dropped_record_count);

static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic);

static ssize_t get_input_chunk_record_count(struct flb_input_chunk *input_chunk)
{
Expand Down Expand Up @@ -283,7 +282,7 @@ ssize_t flb_input_chunk_get_size(struct flb_input_chunk *ic)
* is used to track the size of chunks in filesystem so we need to call
* cio_chunk_get_real_size to return the original size in the file system
*/
static ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic)
ssize_t flb_input_chunk_get_real_size(struct flb_input_chunk *ic)
{
ssize_t meta_size;
ssize_t size;
Expand Down
Loading
Loading