Skip to content

Conversation

@edsiper
Copy link
Member

@edsiper edsiper commented Oct 20, 2025

Add New Routing System with Conditional Logic (Logs)

Overview

This PR introduces a new routing system for Fluent Bit that provides content-aware routing based on actual data content rather than simple tags. The new system supports sophisticated conditional logic, multiple output support, and advanced telemetry handling.

Key Features

🆕 New Capabilities

  • Content-aware routing based on actual data content
  • Conditional logic with multiple operators (eq, gt, regex, in, etc.)
  • Multiple outputs per route
  • Default routes for unmatched data
  • Output fallbacks for reliability
  • Per-route processors for advanced processing
  • Signal-based organization (logs, metrics, traces)

⚠️ Important Notes

  • YAML-Only: Only available in YAML configuration format
  • Processing order: Routes are evaluated after input processors run
  • No classic support: Cannot be used with .conf format
  • Logs Only: Conditional routing is currently only available for logs signal type

Configuration Example

pipeline:
  inputs:
    - name: dummy
      dummy: '{"message": "Hello, Fluent Bit!", "level": "info"}'
      
      routes:
        logs:
          - name: error_logs
            condition:
              rules:
                - field: "$level"
                  op: eq
                  value: "error"
            to:
              outputs:
                - name: print_error

          - name: info_logs
            condition:
              rules:
                - field: "$level"
                  op: eq
                  value: "info"
            to:
              outputs:
                - name: print_info
                - name: print_info_secondary  # Multiple outputs

          - name: default_logs
            condition:
              default: true  # Catches unmatched data
            to:
              outputs:
                - name: print_default

  outputs:
    - name: stdout
      alias: print_info
    - name: stdout
      alias: print_error
    - name: stdout
      alias: print_default

Available Operators

Operator Description Example
eq Equal to $status eq 200
neq Not equal to $level neq "debug"
gt Greater than $metric.value gt 0.8
lt Less than $response_time lt 100
gte Greater than or equal $cpu_usage gte 90
lte Less than or equal $memory lte 1024
regex Regular expression $message regex "^ERROR"
not_regex Not matching regex $path not_regex "\.tmp$"
in In array $level in ["error", "warn"]
not_in Not in array $env not_in ["dev", "test"]

Signal Types

  • logs - for log data ✅ Conditionals supported
  • metrics - for metric data ❌ Conditionals not yet supported
  • traces - for trace data ❌ Conditionals not yet supported
  • any - for all telemetry types ❌ Conditionals not yet supported

Processing Order

  1. Input processors execute first (shared across all routes)
  2. Route conditions are evaluated in order (logs only)
  3. Default routes bypass rule evaluation
  4. Route processors execute after condition matches
  5. Outputs receive the processed data

Fluent Bit is licensed under Apache 2.0, by submitting this pull request I understand that this code will be released under the terms of that license.

Summary by CodeRabbit

  • New Features

    • Per-chunk/per-record routing for logs with richer condition operators (eq/neq/gt/lt/gte/lte/regex/in/not_in) and compiled/cached conditions for faster evaluation.
    • Route-aware payload splitting and tagging so logs can be routed and aggregated per-route before sending.
    • Path-level routing decisions to skip non-matching direct routes.
  • Tests

    • Added tests covering condition evaluation, IN operator, and path routing behavior.

@edsiper edsiper added this to the Fluent Bit v4.2 milestone Oct 20, 2025
@edsiper edsiper changed the title Router conditionals router: add conditional support for Logs Oct 20, 2025
@coderabbitai
Copy link

coderabbitai bot commented Oct 20, 2025

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.

Note

Other AI code review bot(s) detected

CodeRabbit has detected other AI code review bot(s) in this pull request and will avoid duplicating their findings in the review comments. This may lead to a less comprehensive review.

Walkthrough

Adds a per-chunk router context and per-record condition compilation/evaluation, extends route/condition data structures and parsing to support operators and value arrays, threads the context through routing evaluation and task creation, and adds log-encoding helpers to support route-aware log splitting.

Changes

Cohort / File(s) Summary
Router Header API Expansion
include/fluent-bit/flb_router.h
Added forward declarations, new flb_router_chunk_context struct, added fields to flb_router_path, flb_route_condition_rule, and flb_route_condition, added context lifecycle and prepare APIs, updated evaluation/path APIs to accept context, and included flb_conditionals.h.
Router Path and Route Initialization
src/flb_router.c
Initialize p->route = NULL in flb_router_connect and flb_router_connect_direct.
Condition Evaluation & Compilation
src/flb_router_condition.c
Implemented chunk-context init/reset/destroy and log preparation; added context-aware condition evaluation stubs for logs/metrics/traces; implemented operator parsing, numeric parsing, condition compilation/caching, and per-record evaluation; added flb_router_path_should_route and flb_router_route_get_condition.
Router Configuration and Parsing
src/flb_router_config.c
Parse/support operators (eq, neq, gt, lt, gte, lte, regex, not_regex, in, not_in) and value arrays; track op and compiled status; free compiled conditions on destruction; store route into path for direct connections; support per_record_routing parsing.
Task Direct Route Processing
src/flb_task.c
Initialize and cleanup router chunk context in task flow; gate direct-route acceptance via flb_router_path_should_route; allocate direct routes with calloc, track direct_count, drop task if no matching direct routes.
Input Log Routing & Encoding
src/flb_input_log.c
Added route payload structures and lifecycle, per-record encoding helpers, route-aware aggregation/splitting, and integration with routing to build per-route payloads and append to outputs.
Tests: Router Condition Scenarios
tests/internal/router_config.c
Added build_log_chunk and free_route_condition helpers; added tests for log condition evaluation (AND, IN) and path routing; adjusted existing tests to new APIs.
Minor Cleanup
src/flb_event.c
Removed an extraneous blank line between includes.

Sequence Diagram(s)

sequenceDiagram
    autonumber
    participant Task as Task/Create
    participant Router as Router
    participant Context as RouterChunkContext
    participant Cond as ConditionCompiler
    participant Eval as ConditionEval
    participant Input as InputLog (encoder)

    Task->>Router: create task (direct routes)
    Router->>Context: flb_router_chunk_context_init()
    Context-->>Router: context ready

    loop for each path
        Router->>Cond: flb_router_route_get_condition(route)
        alt compiled exists
            Cond-->>Router: compiled condition
        else compile
            Router->>Cond: route_condition_get_compiled()
            Cond-->>Router: compiled or failure
        end
        Router->>Eval: flb_router_path_should_route(chunk, Context, path)
        Eval->>Context: flb_router_chunk_context_prepare_logs() (if logs)
        Context->>Input: encode/read records
        Input-->>Eval: records iterator
        Eval->>Cond: evaluate per-record against compiled condition
        alt match
            Eval-->>Router: accept path
        else no match
            Eval-->>Router: reject path
        end
    end

    alt any accepted
        Router->>Task: create task with filtered routes
    else none accepted
        Router->>Context: flb_router_chunk_context_destroy()
        Router->>Task: drop task
    end

    Router->>Context: flb_router_chunk_context_destroy()
    Context-->>Router: cleaned
Loading

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~45 minutes

Possibly related PRs

Suggested reviewers

  • fujimotos
  • koleini

Poem

🐰 I nibbled fields and stitched a chunk-sized cloak,
Compiled the rules where tiny log-threads spoke,
Per-record hops, I split the trail,
Routes found home on a moonlit trail,
A rabbit's patchwork router, light and woke.

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
Check name Status Explanation Resolution
Docstring Coverage ⚠️ Warning Docstring coverage is 0.00% which is insufficient. The required threshold is 80.00%. You can run @coderabbitai generate docstrings to improve docstring coverage.
✅ Passed checks (2 passed)
Check name Status Explanation
Description Check ✅ Passed Check skipped - CodeRabbit’s high-level summary is enabled.
Title Check ✅ Passed The PR title "router: add conditional support for Logs" directly and accurately describes the primary changes in this changeset. The modifications across all files are focused on implementing conditional routing logic for the logs signal type, including new router chunk context structures, route condition evaluation APIs, condition compilation and caching, and per-record evaluation mechanisms. The title is concise, specific, and uses clear terminology without vague or generic phrases, allowing a teammate to understand that the PR adds condition-based routing capabilities for log events.

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share

Comment @coderabbitai help to get the list of available commands and usage tips.

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 1220 to 1314
}

if (flb_router_connect_direct(input_ins, output_ins) == 0) {
struct flb_router_path *path;

path = mk_list_entry_last(&input_ins->routes_direct,
struct flb_router_path, _head);
path->route = route;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Do not skip additional routes to the same output

The new conditional router connects direct paths in a loop, but the early input_has_direct_route() check drops every route after the first one that targets the same output instance. With conditional routing it is valid to define multiple routes (e.g. an error rule and a default rule) that all forward to the same output but with different conditions or processors. Because only the first route is connected, later routes are silently ignored and chunks that don’t satisfy the first route’s condition are dropped in flb_task_create (no direct routes match and the task is destroyed). This makes common configurations unusable whenever two routes share the same output alias. The router should allow multiple direct routes per output or include the route identity in the deduplication.

Useful? React with 👍 / 👎.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 5

🧹 Nitpick comments (7)
src/flb_router.c (1)

157-159: Route pointer init: good; consider zero-initializing the struct.

Setting p->route = NULL prevents dangling reads. Optionally switch to flb_calloc and drop the explicit assignment for future fields.

Apply (optional):

-    p = flb_malloc(sizeof(struct flb_router_path));
+    p = flb_calloc(1, sizeof(struct flb_router_path));
@@
-    p->route = NULL;

Also applies to: 175-177

tests/internal/router_config.c (3)

32-86: Helper looks correct; minor DRY improvement.

build_log_chunk is fine. You can reduce repeated error cleanup with a single exit path to improve readability.


88-129: Destructor mirrors production; avoid duplication long‑term.

free_route_condition fully cleans nested allocations (incl. compiled). Consider exposing a small public/test helper to avoid divergence from route_condition_destroy.

Would you prefer a tiny test-only header that reuses the production destroy routine?


1127-1203: path_should_route test: LGTM; consider also default-route case.

Add a small case asserting flb_router_path_should_route(..., path with route==NULL) returns FLB_TRUE.

include/fluent-bit/flb_router.h (1)

29-30: New dependency include: OK; consider forward-decl.

If feasible, forward-declare struct flb_condition and enum to avoid tight header coupling to flb_conditionals.h.

src/flb_router_config.c (2)

548-550: Rule parsing for array values: LGTM; add early operator validation.

Array handling and cleanup paths are correct. Consider validating rule->op against the supported set (eq, neq, gt, lt, gte, lte, regex, not_regex, in, not_in) here to fail fast and give clearer config errors.

Example:

@@
-    rule->op = copy_from_cfl_sds(op_var->data.as_string);
+    rule->op = copy_from_cfl_sds(op_var->data.as_string);
     if (!rule->op) {
         ...
     }
+    /* optional: early operator validation */
+    if (!is_valid_rule_op(rule->op)) {
+        flb_sds_destroy(rule->op);
+        flb_sds_destroy(rule->field);
+        flb_free(rule);
+        return NULL;
+    }

Implement is_valid_rule_op() locally or reuse existing helpers if available.

Also applies to: 564-569, 571-610


1310-1315: Assigning path->route: verify lifetime; guard access.

Linking path->route to a route is correct, but it assumes these route objects live at least as long as input.routes_direct. If routes are later destroyed (e.g., via flb_router_routes_destroy) while paths persist, this becomes a dangling pointer.

  • Confirm lifecycle: when and where are input_routes freed relative to path usage?
  • If not guaranteed, either:
    • Retain routes until paths are torn down, or
    • Store only condition/compiled state on the path, or
    • Null path->route on teardown to make flb_router_path_should_route() return FLB_TRUE (safe default).

Additionally, consider a defensive NULL-check on path retrieved via mk_list_entry_last to avoid undefined behavior if list ops change in the future.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 65722e8 and a4ace4e.

📒 Files selected for processing (6)
  • include/fluent-bit/flb_router.h (3 hunks)
  • src/flb_router.c (2 hunks)
  • src/flb_router_condition.c (4 hunks)
  • src/flb_router_config.c (7 hunks)
  • src/flb_task.c (3 hunks)
  • tests/internal/router_config.c (4 hunks)
🧰 Additional context used
🧬 Code graph analysis (5)
tests/internal/router_config.c (4)
src/flb_log_event_encoder.c (5)
  • flb_log_event_encoder_init (42-74)
  • flb_log_event_encoder_begin_record (246-254)
  • flb_log_event_encoder_destroy (99-116)
  • flb_log_event_encoder_set_current_timestamp (289-292)
  • flb_log_event_encoder_commit_record (256-274)
src/flb_sds.c (2)
  • flb_sds_destroy (389-399)
  • flb_sds_create (78-90)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
src/flb_router_condition.c (2)
  • flb_condition_eval_logs (54-119)
  • flb_router_path_should_route (179-191)
src/flb_task.c (3)
src/flb_router_condition.c (1)
  • flb_router_path_should_route (179-191)
include/fluent-bit/flb_mem.h (1)
  • flb_calloc (84-96)
src/flb_routes_mask.c (1)
  • flb_routes_mask_get_bit (125-140)
src/flb_router_condition.c (4)
src/flb_log_event_decoder.c (4)
  • flb_log_event_decoder_init (99-116)
  • flb_log_event_decoder_read_groups (85-97)
  • flb_log_event_decoder_next (310-406)
  • flb_log_event_decoder_destroy (147-179)
src/flb_mp.c (1)
  • flb_mp_object_to_cfl (899-934)
src/flb_conditionals.c (4)
  • flb_condition_evaluate (359-406)
  • flb_condition_create (219-233)
  • flb_condition_destroy (257-274)
  • flb_condition_add_rule (235-255)
lib/cfl/src/cfl_object.c (1)
  • cfl_object_destroy (95-106)
include/fluent-bit/flb_router.h (1)
src/flb_router_condition.c (1)
  • flb_router_path_should_route (179-191)
src/flb_router_config.c (5)
src/flb_sds.c (1)
  • flb_sds_destroy (389-399)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
src/flb_conditionals.c (1)
  • flb_condition_destroy (257-274)
lib/cfl/include/cfl/cfl_array.h (2)
  • cfl_array_size (46-49)
  • cfl_array_fetch_by_index (36-44)
lib/cfl/src/cfl_kvlist.c (1)
  • cfl_kvlist_fetch (418-421)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (30)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_COVERAGE=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (20)
tests/internal/router_config.c (3)

954-1026: Logs condition “eq” test: LGTM.

Covers positive/negative paths with proper encoder lifecycle.


1028-1125: “in” operator test: LGTM.

Exercises multi-value path correctly.


1214-1217: Test registrations: LGTM.

src/flb_task.c (3)

419-454: Direct-route gating: LGTM.

Correctly filters direct paths and initializes routes; drops empty-direct tasks to save work.

Please confirm that flb_router_path->route remains valid for the engine lifetime (or until the path is removed). Otherwise flb_router_path_should_route() could dereference a stale pointer if routes are freed (e.g., via flb_router_routes_destroy) while paths persist.


444-450: Early drop path: OK.

Cleanup is consistent (NULLing event_chunk->data before destroy). No further action.


467-472: Minor consistency nit.

Both code paths now use flb_calloc for struct flb_task_route; good consistency.

include/fluent-bit/flb_router.h (4)

34-37: route back-reference on path: LGTM.

Enables content-aware routing decisions at task time.


75-82: Multi-value rule support: LGTM.

values/values_count fields are appropriate for in/not_in.


84-90: Condition fields for op/compiled state: LGTM.

Matches compiled-eval flow.


146-148: New API declaration: LGTM.

src/flb_router_config.c (3)

32-32: Conditionals include: LGTM.


294-297: Compiled-condition cleanup: LGTM.

Ensures compiled objects are destroyed with the route condition.


647-671: Condition op parsing: LGTM.

Default AND, accepts “and”/“or”, rejects others.

src/flb_router_condition.c (7)

152-164: Default-route semantics vs signals mask: confirm order

Currently, signals filtering runs before default-route short‑circuiting. If the intent is “default routes bypass rule evaluation but still honor the route’s signals,” this is correct. If default routes should catch any signal, move the is_default check before the signals mask.

Option if default should ignore signals:

- signal = flb_router_signal_from_chunk(chunk);
- if (signal == 0) {
-     return FLB_FALSE;
- }
-
- if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) {
-     return FLB_FALSE;
- }
-
- if (route->condition->is_default) {
+ if (route->condition->is_default) {
     return FLB_TRUE;
 }
+
+ signal = flb_router_signal_from_chunk(chunk);
+ if (signal == 0) {
+     return FLB_FALSE;
+ }
+ if ((route->signals != 0) && (route->signals != FLB_ROUTER_SIGNAL_ANY) && ((route->signals & signal) == 0)) {
+     return FLB_FALSE;
+ }

Also applies to: 157-159


179-191: Path-level routing gate looks good

Null path → false, path without route → true, otherwise delegate to route evaluation. Matches the intended behavior.


193-235: Operator parsing is fine; ensure portability is covered by headers

The mapping is clear and case‑insensitive. The earlier header fix covers strcasecmp. No further changes required.


237-254: Numeric parsing is strict; OK once headers are added

The ERANGE and trailing‑garbage checks are solid. After adding stdlib.h/errno.h, this is good.


303-309: Numeric value lifetime: confirm add_rule copies the value

You pass &numeric_value (stack address) to flb_condition_add_rule(). If add_rule/rule_create does not deep‑copy the double, this becomes a dangling pointer.

If it does not copy, apply:

-            ret = flb_condition_add_rule(compiled, rule->field, op,
-                                         &numeric_value, 1, RECORD_CONTEXT_BODY);
+            double *nv = flb_malloc(sizeof(*nv));
+            if (nv == NULL) {
+                cfl_errno();
+                flb_condition_destroy(compiled);
+                return NULL;
+            }
+            *nv = numeric_value;
+            ret = flb_condition_add_rule(compiled, rule->field, op,
+                                         nv, 1, RECORD_CONTEXT_BODY);

316-321: IN/NOT_IN values ownership: confirm deep copy

rule->values is passed directly. Please confirm flb_condition_add_rule/rule_create duplicates the array and its elements. Otherwise, the route’s config buffers must outlive condition use.


360-375: Record cleanup helper looks correct

Properly destroys CFL objects and nulls pointers. LGTM.

Comment on lines 20 to 27
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_conditionals.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Add missing standard headers for strcasecmp/strtod/errno

This file uses strcasecmp, strtod and errno but doesn’t include their headers. Some platforms won’t get these via transitive includes, causing build failures.

Apply:

 #include <fluent-bit/flb_mp_chunk.h>
 
+#include <strings.h>  /* strcasecmp */
+#include <stdlib.h>   /* strtod */
+#include <errno.h>    /* errno, ERANGE */
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_conditionals.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>
#include <fluent-bit/flb_mem.h>
#include <fluent-bit/flb_log.h>
#include <fluent-bit/flb_router.h>
#include <fluent-bit/flb_conditionals.h>
#include <fluent-bit/flb_log_event_decoder.h>
#include <fluent-bit/flb_mp.h>
#include <fluent-bit/flb_mp_chunk.h>
#include <strings.h> /* strcasecmp */
#include <stdlib.h> /* strtod */
#include <errno.h> /* errno, ERANGE */
🤖 Prompt for AI Agents
In src/flb_router_condition.c around lines 20 to 27, the file uses strcasecmp,
strtod and errno but lacks their standard headers; add the missing includes at
the top with the other headers: include <string.h> for strcasecmp, <stdlib.h>
for strtod, and <errno.h> for errno so builds on platforms without transitive
includes succeed.

Comment on lines 85 to 86
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Avoid reading group meta records; skip them and check the return

Setting read_groups = TRUE surfaces group open/close meta records which can be (incorrectly) evaluated by conditions. Decoder already propagates group context to normal records without needing read_groups = TRUE.

- flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);
+ if (flb_log_event_decoder_read_groups(&decoder, FLB_FALSE) != 0) {
+     flb_log_event_decoder_destroy(&decoder);
+     return FLB_FALSE;
+ }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE);
if (flb_log_event_decoder_read_groups(&decoder, FLB_FALSE) != 0) {
flb_log_event_decoder_destroy(&decoder);
return FLB_FALSE;
}
🤖 Prompt for AI Agents
In src/flb_router_condition.c around lines 85-86, the call to
flb_log_event_decoder_read_groups(&decoder, FLB_TRUE) causes group open/close
meta records to be surfaced and wrongly evaluated; change this to disable
reading group meta records (use FLB_FALSE / remove the read_groups enable) so
the decoder only propagates group context to normal records, and check the
function's return value for errors—handle a non-success return by logging an
error and returning/propagating failure appropriately.

Copy link

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
src/flb_router_config.c (2)

503-510: Fix processor cleanup: wrong destructor used on properties list (type confusion/UAF).

On add_processor_properties() failure you call route_processors_destroy(&processor->properties), but that function expects a list of struct flb_route_processor, not struct flb_route_processor_property. This can corrupt memory. Clean up the properties list explicitly instead.

@@
-        if (parse_processors(processors_var, &route->processors, config) != 0) {
-            if (route->condition) {
-                route_condition_destroy(route->condition);
-            }
-            flb_sds_destroy(route->name);
-            flb_free(route);
-            return -1;
-        }
+        if (parse_processors(processors_var, &route->processors, config) != 0) {
+            if (route->condition) {
+                route_condition_destroy(route->condition);
+            }
+            flb_sds_destroy(route->name);
+            flb_free(route);
+            return -1;
+        }
@@
-        if (add_processor_properties(processor, kvlist) != 0) {
-            route_processors_destroy(&processor->properties);
-            if (processor->name) {
-                flb_sds_destroy(processor->name);
-            }
-            flb_free(processor);
-            return -1;
-        }
+        if (add_processor_properties(processor, kvlist) != 0) {
+            /* Clean up properties list correctly (list of properties, not processors) */
+            struct cfl_list *p_head;
+            struct cfl_list *p_tmp;
+            struct flb_route_processor_property *prop;
+            cfl_list_foreach_safe(p_head, p_tmp, &processor->properties) {
+                prop = cfl_list_entry(p_head, struct flb_route_processor_property, _head);
+                cfl_list_del(&prop->_head);
+                if (prop->key) { flb_sds_destroy(prop->key); }
+                if (prop->value) { flb_sds_destroy(prop->value); }
+                flb_free(prop);
+            }
+            if (processor->name) {
+                flb_sds_destroy(processor->name);
+            }
+            flb_free(processor);
+            return -1;
+        }

Optionally, introduce a static helper route_processor_destroy() to reuse in other paths.


1231-1248: Signal mask bug: ANY check matches every non-zero mask.

(signals & FLB_ROUTER_SIGNAL_ANY) is true for any non-zero signals since ANY is a bitwise union. Outputs will be accepted even when they don’t support requested signals.

-    /* Handle ANY signal - if ANY is present in the bitmask, allow all signals */
-    if (signals == 0 || (signals & FLB_ROUTER_SIGNAL_ANY)) {
+    /* Handle ANY signal: no specific signals or explicit 'any' */
+    if (signals == 0 || signals == FLB_ROUTER_SIGNAL_ANY) {
         return FLB_TRUE;
     }
src/flb_mp.c (1)

1556-1560: Avoid double list removal in record destroy.

flb_router_condition.c already cfl_list_del() before calling flb_mp_chunk_cobj_record_destroy(NULL, rec). Your destructor always calls cfl_list_del(), causing double-del and list corruption.

-    cfl_list_del(&record->_head);
+    if (chunk_cobj) {
+        cfl_list_del(&record->_head);
+    }
     flb_free(record);

Alternatively, ensure callers never cfl_list_del() and let destroy handle it, but the guarded approach is safer across call sites.

♻️ Duplicate comments (4)
src/flb_router_config.c (1)

1205-1223: Do not dedup direct routes by output only; allow multiple routes per output.

Current input_has_direct_route() drops later routes to the same output, breaking common configs (e.g., error and default routes pointing to one output). This issue was already raised earlier and still applies. Include route identity in deduplication.

@@
-static int input_has_direct_route(struct flb_input_instance *in,
-                                  struct flb_output_instance *out)
+static int input_has_direct_route(struct flb_input_instance *in,
+                                  struct flb_output_instance *out,
+                                  struct flb_route *route)
 {
@@
-        if (path->ins == out) {
-            return FLB_TRUE;
-        }
+        if (path->ins == out) {
+            /* Allow multiple paths to the same output as long as they are for different routes */
+            if (path->route == NULL || path->route == route) {
+                return FLB_TRUE;
+            }
+        }
@@
-                if (input_has_direct_route(input_ins, output_ins)) {
+                if (input_has_direct_route(input_ins, output_ins, route)) {
                     continue;
                 }
@@
                 if (flb_router_connect_direct(input_ins, output_ins) == 0) {
                     struct flb_router_path *path;
 
                     path = mk_list_entry_last(&input_ins->routes_direct,
                                               struct flb_router_path, _head);
                     path->route = route;

This preserves dedup for exact same route/out pair while enabling multiple conditional/default routes to share the output.

Also applies to: 1314-1324

src/flb_router_condition.c (3)

20-27: Add missing standard headers (build break across platforms).

This file uses strcasecmp, strtod, errno, and strlen but doesn’t include their headers. Add them to avoid implicit declarations and portability issues.

 #include <fluent-bit/flb_mp_chunk.h>
 
+/* std headers required by this file */
+#include <string.h>   /* strlen */
+#include <strings.h>  /* strcasecmp */
+#include <stdlib.h>   /* strtod */
+#include <errno.h>    /* errno, ERANGE */

102-109: Don’t surface group meta records; disable read_groups and check return.

Evaluating group open/close meta records can cause false matches. Disable them and handle errors.

-        flb_log_event_decoder_read_groups(context->log_decoder, FLB_TRUE);
+        if (flb_log_event_decoder_read_groups(context->log_decoder, FLB_FALSE) != 0) {
+            flb_router_chunk_context_reset(context);
+            return -1;
+        }

630-653: Racy lazy compilation; synchronize or precompile to avoid data races.

route_condition_get_compiled() is unsynchronized; multiple workers can compile and assign concurrently, leading to leaks or corruption. Precompile during config load (preferred), or guard lazy init with a lock.

Minimal lock around lazy compile:

+#include <pthread.h>
+static pthread_mutex_t g_route_cond_compile_lock = PTHREAD_MUTEX_INITIALIZER;
 ...
 static struct flb_condition *route_condition_get_compiled(struct flb_route_condition *condition)
 {
     if (!condition) {
         return NULL;
     }
@@
-    condition->compiled = route_condition_compile(condition);
-    if (!condition->compiled) {
-        condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE;
-        return NULL;
-    }
-
-    condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS;
-    return condition->compiled;
+    pthread_mutex_lock(&g_route_cond_compile_lock);
+    if (condition->compiled_status == FLB_ROUTE_CONDITION_COMPILED_SUCCESS && condition->compiled) {
+        pthread_mutex_unlock(&g_route_cond_compile_lock);
+        return condition->compiled;
+    }
+    condition->compiled = route_condition_compile(condition);
+    if (!condition->compiled) {
+        condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_FAILURE;
+        pthread_mutex_unlock(&g_route_cond_compile_lock);
+        return NULL;
+    }
+    condition->compiled_status = FLB_ROUTE_CONDITION_COMPILED_SUCCESS;
+    pthread_mutex_unlock(&g_route_cond_compile_lock);
+    return condition->compiled;
 }

Alternative: compile eagerly when parsing the route and drop lazy compile here.

🧹 Nitpick comments (6)
src/flb_task.c (1)

398-406: Initialize router context lazily only when needed (direct routes path).

Small perf cleanup: avoid creating/destroying router_context when there are no direct routes or for non-log signals.

@@
-    if (flb_router_chunk_context_init(&router_context) != 0) {
-        flb_error("[task] failed to initialize router chunk context");
-        flb_event_chunk_destroy(evc);
-        flb_free(task);
-        *err = FLB_TRUE;
-        return NULL;
-    }
-    router_context_initialized = FLB_TRUE;
@@
-    if (mk_list_size(&i_ins->routes_direct) > 0) {
+    if (mk_list_size(&i_ins->routes_direct) > 0) {
+        if (flb_router_chunk_context_init(&router_context) != 0) {
+            flb_error("[task] failed to initialize router chunk context");
+            flb_event_chunk_destroy(evc);
+            flb_free(task);
+            *err = FLB_TRUE;
+            return NULL;
+        }
+        router_context_initialized = FLB_TRUE;
@@
-        if (router_context_initialized) {
+        if (router_context_initialized) {
             flb_router_chunk_context_destroy(&router_context);
             router_context_initialized = FLB_FALSE;
         }

Also applies to: 429-479, 512-524

src/flb_router_condition.c (5)

356-384: Prefer skipping only the bad record instead of aborting the whole route chunk.

On a single record’s encode/convert failure you return -1, dropping all prior matches. Safer UX is to log and continue with remaining records.

If a conversion/encode fails:

  • Reset the in-progress record (e.g., flb_log_event_encoder_reset_record if available) and continue; or
  • Guard converts with “if (…) { … } else { continue; }” and only commit successful ones.

Let me know if you want a concrete patch once you confirm the preferred reset API.


291-294: Use flb_sds_len for tag length (avoid strlen on SDS).

Minor perf/clarity win and avoids surprises if tags ever contain embedded NULs.

-    new_chunk = flb_event_chunk_create(chunk->type, match_count,
-                                      chunk->tag, strlen(chunk->tag),
+    new_chunk = flb_event_chunk_create(chunk->type, match_count,
+                                      chunk->tag, flb_sds_len(chunk->tag),
                                       out_buf, out_size);

Add include:

 #include <fluent-bit/flb_mp_chunk.h>
+#include <fluent-bit/flb_sds.h>

91-94: Treat empty log chunks as a no-op instead of an error.

Returning -1 for empty data short-circuits routing; consider returning 0 so evaluation proceeds gracefully.

-    if (!context || !chunk) {
+    if (!context || !chunk) {
         return -1;
     }
 ...
-    if (!chunk->data || chunk->size == 0) {
-        return -1;
-    }
+    if (!chunk->data || chunk->size == 0) {
+        return 0; /* nothing to prepare */
+    }

471-486: Avoid unnecessary decode when path has no route.

If path->route is NULL you return TRUE; move that check before preparing the logs context to save work.

-    if (chunk && chunk->type == FLB_EVENT_TYPE_LOGS) {
-        if (!context) {
-            return FLB_FALSE;
-        }
-        if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) {
-            return FLB_FALSE;
-        }
-    }
-
-    if (!path->route) {
-        return FLB_TRUE;
-    }
+    if (!path->route) {
+        return FLB_TRUE;
+    }
+
+    if (chunk && chunk->type == FLB_EVENT_TYPE_LOGS) {
+        if (!context) {
+            return FLB_FALSE;
+        }
+        if (flb_router_chunk_context_prepare_logs(context, chunk) != 0) {
+            return FLB_FALSE;
+        }
+    }

317-323: Unused parameters in flb_router_create_chunk_from_records.

tag and event_type aren’t used in this function. Either remove them or document why they’re needed.

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a4ace4e and 7be7826.

📒 Files selected for processing (8)
  • include/fluent-bit/flb_mp_chunk.h (1 hunks)
  • include/fluent-bit/flb_router.h (4 hunks)
  • src/flb_event.c (0 hunks)
  • src/flb_mp.c (1 hunks)
  • src/flb_router_condition.c (3 hunks)
  • src/flb_router_config.c (9 hunks)
  • src/flb_task.c (6 hunks)
  • tests/internal/router_config.c (5 hunks)
💤 Files with no reviewable changes (1)
  • src/flb_event.c
🚧 Files skipped from review as they are similar to previous changes (1)
  • tests/internal/router_config.c
🧰 Additional context used
🧬 Code graph analysis (6)
src/flb_task.c (4)
src/flb_router_condition.c (3)
  • flb_router_chunk_context_init (33-44)
  • flb_router_path_should_route (463-486)
  • flb_router_chunk_context_destroy (68-71)
src/flb_event.c (1)
  • flb_event_chunk_destroy (69-79)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
src/flb_routes_mask.c (1)
  • flb_routes_mask_get_bit (125-140)
include/fluent-bit/flb_mp_chunk.h (1)
src/flb_mp.c (1)
  • flb_mp_chunk_record_dup (1438-1511)
src/flb_router_config.c (5)
src/flb_sds.c (1)
  • flb_sds_destroy (389-399)
include/fluent-bit/flb_mem.h (2)
  • flb_free (126-128)
  • flb_calloc (84-96)
src/flb_conditionals.c (1)
  • flb_condition_destroy (257-274)
lib/cfl/include/cfl/cfl_array.h (2)
  • cfl_array_size (46-49)
  • cfl_array_fetch_by_index (36-44)
lib/cfl/src/cfl_kvlist.c (1)
  • cfl_kvlist_fetch (418-421)
src/flb_router_condition.c (6)
src/flb_mp.c (5)
  • flb_mp_chunk_cobj_create (1065-1087)
  • flb_mp_chunk_cobj_record_next (1243-1435)
  • flb_mp_chunk_record_dup (1438-1511)
  • flb_mp_chunk_cobj_record_destroy (1513-1560)
  • flb_mp_cfl_to_msgpack (1017-1045)
src/flb_log_event_decoder.c (4)
  • flb_log_event_decoder_destroy (147-179)
  • flb_log_event_decoder_create (118-145)
  • flb_log_event_decoder_read_groups (85-97)
  • flb_log_event_decoder_reset (67-83)
src/flb_log_event_encoder.c (5)
  • flb_log_event_encoder_destroy (99-116)
  • flb_log_event_encoder_create (76-97)
  • flb_log_event_encoder_begin_record (246-254)
  • flb_log_event_encoder_set_timestamp (276-287)
  • flb_log_event_encoder_commit_record (256-274)
src/flb_conditionals.c (4)
  • flb_condition_evaluate (359-406)
  • flb_condition_create (219-233)
  • flb_condition_destroy (257-274)
  • flb_condition_add_rule (235-255)
src/flb_event.c (1)
  • flb_event_chunk_create (26-57)
include/fluent-bit/flb_mem.h (1)
  • flb_free (126-128)
include/fluent-bit/flb_router.h (1)
src/flb_router_condition.c (12)
  • flb_router_chunk_context_init (33-44)
  • flb_router_chunk_context_reset (46-66)
  • flb_router_chunk_context_destroy (68-71)
  • flb_router_chunk_context_prepare_logs (73-134)
  • flb_route_condition_eval (422-461)
  • flb_condition_eval_logs (156-195)
  • flb_condition_eval_logs_per_record (198-244)
  • flb_router_route_per_record (247-315)
  • flb_router_create_chunk_from_records (318-398)
  • flb_condition_eval_metrics (400-409)
  • flb_condition_eval_traces (411-420)
  • flb_router_path_should_route (463-486)
src/flb_mp.c (2)
include/fluent-bit/flb_mem.h (2)
  • flb_calloc (84-96)
  • flb_free (126-128)
lib/cfl/src/cfl_object.c (1)
  • cfl_object_destroy (95-106)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (27)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit (Arm64), amd64_arm64, -DCMAKE_SYSTEM_NAME=Windows -DCMA...
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 32bit, x86, x86-windows-static, 3.31.6)
  • GitHub Check: pr-windows-build / call-build-windows-package (Windows 64bit, x64, x64-windows-static, 3.31.6)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_THREAD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SIMD=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_ARROW=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SANITIZE_MEMORY=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_ADDRESS=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, gcc, g++)
  • GitHub Check: run-ubuntu-unit-tests (-DSANITIZE_UNDEFINED=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_SMALL=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=On, 3.31.6, clang, clang++)
  • GitHub Check: run-ubuntu-unit-tests (-DFLB_JEMALLOC=Off, 3.31.6, gcc, g++)
  • GitHub Check: PR - fuzzing test
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-22.04, clang-12)
  • GitHub Check: pr-compile-without-cxx (3.31.6)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, clang, clang++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-system-libs (-DFLB_PREFER_SYSTEM_LIBS=On, 3.31.6, gcc, g++, ubuntu-24.04, clang-14)
  • GitHub Check: pr-compile-centos-7
🔇 Additional comments (5)
src/flb_router_config.c (1)

280-289: LGTM on condition memory management and path->route association.

  • Properly free rule->values and compiled condition in route_condition_destroy.
  • Initialize rule->values/values_count.
  • Validate/marshal array values safely.
  • Default op=AND and op parsing are correct.
  • Associate created path with route via path->route.

Ensure route_condition_get_compiled() destroys/refreshes compiled on config reloads to avoid stale pointers.

Also applies to: 294-297, 548-550, 564-611, 613-618, 647-650, 653-672, 1319-1324

include/fluent-bit/flb_router.h (1)

45-46: Header additions look consistent.

  • route pointer on flb_router_path
  • condition values/op/compiled fields
  • per_record_routing and context APIs

Also applies to: 85-101, 120-128, 149-174, 180-183

include/fluent-bit/flb_mp_chunk.h (1)

63-64: New duplication API declaration is fine.

Matches implementation intent for per-record routing. Ensure ownership semantics are documented (caller frees via flb_mp_chunk_cobj_record_destroy).

Document whether the duplicate's group objects are owned by the record (they should be).

src/flb_router_condition.c (2)

281-286: The NULL parameter is intentionally supported for standalone duplicated records.

The implementation of flb_mp_chunk_cobj_record_destroy() properly handles NULL as the first parameter. All dereferences of chunk_cobj are guarded by conditional checks (lines 1519, 1523–1525). The actual record cleanup (lines 1537–1546) destroys the record's own object copies (cobj_metadata, cobj_record, cobj_group_metadata) independently of chunk_cobj.

Since flb_mp_chunk_record_dup() creates independent record copies with duplicated object data (via cfl_object_dup()), these duplicates own their resources and do not depend on chunk_cobj for cleanup. Passing NULL is correct for these standalone records and matches the existing usage pattern in plugins/processor_opentelemetry_envelope/otel_envelope.c:137.


598-604: Review comment is incorrect—no dangling pointer issue exists.

The concern about numeric_value being a stack variable with a dangling pointer is unfounded. In rule_create (src/flb_conditionals.c:128-131), the numeric value is handled via:

rule->value.num_val = *(double *)value;

This dereferences the pointer and copies the value into the num_val union member. The stack variable's contents are copied into the allocated rule structure—the pointer is never stored. Once the copy completes, the stack variable going out of scope causes no issue.

This is safe memory management and requires no allocation or deallocation changes.

src/flb_mp.c Outdated
Comment on lines 1437 to 1511
/* Duplicate a chunk record for per-record routing */
struct flb_mp_chunk_record *flb_mp_chunk_record_dup(struct flb_mp_chunk_record *src)
{
struct flb_mp_chunk_record *dup;

if (!src) {
return NULL;
}

dup = flb_calloc(1, sizeof(struct flb_mp_chunk_record));
if (!dup) {
return NULL;
}

/* Copy basic fields */
dup->event = src->event;
dup->owns_group_metadata = FLB_FALSE;
dup->owns_group_attributes = FLB_FALSE;

/* Duplicate metadata if present */
if (src->cobj_metadata) {
dup->cobj_metadata = cfl_object_dup(src->cobj_metadata);
if (!dup->cobj_metadata) {
flb_free(dup);
return NULL;
}
}

/* Duplicate record body if present */
if (src->cobj_record) {
dup->cobj_record = cfl_object_dup(src->cobj_record);
if (!dup->cobj_record) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
flb_free(dup);
return NULL;
}
}

/* Duplicate group metadata if present */
if (src->cobj_group_metadata) {
dup->cobj_group_metadata = cfl_object_dup(src->cobj_group_metadata);
if (!dup->cobj_group_metadata) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
if (dup->cobj_record) {
cfl_object_destroy(dup->cobj_record);
}
flb_free(dup);
return NULL;
}
}

/* Duplicate group attributes if present */
if (src->cobj_group_attributes) {
dup->cobj_group_attributes = cfl_object_dup(src->cobj_group_attributes);
if (!dup->cobj_group_attributes) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
if (dup->cobj_record) {
cfl_object_destroy(dup->cobj_record);
}
if (dup->cobj_group_metadata) {
cfl_object_destroy(dup->cobj_group_metadata);
}
flb_free(dup);
return NULL;
}
}

return dup;
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Leak: duplicated group objects not freed (owns_group_ not set).*

flb_mp_chunk_record_dup() deep-copies cobj_group_metadata/attributes but leaves owns_group_metadata/owns_group_attributes = FLB_FALSE, so the destructor won’t free them.

@@
-    dup->owns_group_metadata = FLB_FALSE;
-    dup->owns_group_attributes = FLB_FALSE;
+    dup->owns_group_metadata = FLB_FALSE;
+    dup->owns_group_attributes = FLB_FALSE;
@@
     if (src->cobj_group_metadata) {
         dup->cobj_group_metadata = cfl_object_dup(src->cobj_group_metadata);
         if (!dup->cobj_group_metadata) {
@@
         }
+        dup->owns_group_metadata = FLB_TRUE;
     }
@@
     if (src->cobj_group_attributes) {
         dup->cobj_group_attributes = cfl_object_dup(src->cobj_group_attributes);
         if (!dup->cobj_group_attributes) {
@@
         }
+        dup->owns_group_attributes = FLB_TRUE;
     }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/* Duplicate a chunk record for per-record routing */
struct flb_mp_chunk_record *flb_mp_chunk_record_dup(struct flb_mp_chunk_record *src)
{
struct flb_mp_chunk_record *dup;
if (!src) {
return NULL;
}
dup = flb_calloc(1, sizeof(struct flb_mp_chunk_record));
if (!dup) {
return NULL;
}
/* Copy basic fields */
dup->event = src->event;
dup->owns_group_metadata = FLB_FALSE;
dup->owns_group_attributes = FLB_FALSE;
/* Duplicate metadata if present */
if (src->cobj_metadata) {
dup->cobj_metadata = cfl_object_dup(src->cobj_metadata);
if (!dup->cobj_metadata) {
flb_free(dup);
return NULL;
}
}
/* Duplicate record body if present */
if (src->cobj_record) {
dup->cobj_record = cfl_object_dup(src->cobj_record);
if (!dup->cobj_record) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
flb_free(dup);
return NULL;
}
}
/* Duplicate group metadata if present */
if (src->cobj_group_metadata) {
dup->cobj_group_metadata = cfl_object_dup(src->cobj_group_metadata);
if (!dup->cobj_group_metadata) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
if (dup->cobj_record) {
cfl_object_destroy(dup->cobj_record);
}
flb_free(dup);
return NULL;
}
}
/* Duplicate group attributes if present */
if (src->cobj_group_attributes) {
dup->cobj_group_attributes = cfl_object_dup(src->cobj_group_attributes);
if (!dup->cobj_group_attributes) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
if (dup->cobj_record) {
cfl_object_destroy(dup->cobj_record);
}
if (dup->cobj_group_metadata) {
cfl_object_destroy(dup->cobj_group_metadata);
}
flb_free(dup);
return NULL;
}
}
return dup;
}
/* Duplicate a chunk record for per-record routing */
struct flb_mp_chunk_record *flb_mp_chunk_record_dup(struct flb_mp_chunk_record *src)
{
struct flb_mp_chunk_record *dup;
if (!src) {
return NULL;
}
dup = flb_calloc(1, sizeof(struct flb_mp_chunk_record));
if (!dup) {
return NULL;
}
/* Copy basic fields */
dup->event = src->event;
dup->owns_group_metadata = FLB_FALSE;
dup->owns_group_attributes = FLB_FALSE;
/* Duplicate metadata if present */
if (src->cobj_metadata) {
dup->cobj_metadata = cfl_object_dup(src->cobj_metadata);
if (!dup->cobj_metadata) {
flb_free(dup);
return NULL;
}
}
/* Duplicate record body if present */
if (src->cobj_record) {
dup->cobj_record = cfl_object_dup(src->cobj_record);
if (!dup->cobj_record) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
flb_free(dup);
return NULL;
}
}
/* Duplicate group metadata if present */
if (src->cobj_group_metadata) {
dup->cobj_group_metadata = cfl_object_dup(src->cobj_group_metadata);
if (!dup->cobj_group_metadata) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
if (dup->cobj_record) {
cfl_object_destroy(dup->cobj_record);
}
flb_free(dup);
return NULL;
}
dup->owns_group_metadata = FLB_TRUE;
}
/* Duplicate group attributes if present */
if (src->cobj_group_attributes) {
dup->cobj_group_attributes = cfl_object_dup(src->cobj_group_attributes);
if (!dup->cobj_group_attributes) {
if (dup->cobj_metadata) {
cfl_object_destroy(dup->cobj_metadata);
}
if (dup->cobj_record) {
cfl_object_destroy(dup->cobj_record);
}
if (dup->cobj_group_metadata) {
cfl_object_destroy(dup->cobj_group_metadata);
}
flb_free(dup);
return NULL;
}
dup->owns_group_attributes = FLB_TRUE;
}
return dup;
}

Comment on lines 344 to 391
ret = flb_log_event_encoder_begin_record(encoder);
if (ret == -1) {
flb_log_event_encoder_destroy(encoder);
return -1;
}

ret = flb_log_event_encoder_set_timestamp(encoder, &record->event.timestamp);
if (ret == -1) {
flb_log_event_encoder_destroy(encoder);
return -1;
}

/* Encode metadata */
if (record->cobj_metadata) {
ret = flb_mp_cfl_to_msgpack(record->cobj_metadata, &mp_buf, &mp_size);
if (ret == -1) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
ret = flb_log_event_encoder_set_metadata_from_raw_msgpack(encoder, mp_buf, mp_size);
flb_free(mp_buf);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
}

/* Encode record body */
if (record->cobj_record) {
ret = flb_mp_cfl_to_msgpack(record->cobj_record, &mp_buf, &mp_size);
if (ret == -1) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
ret = flb_log_event_encoder_set_body_from_raw_msgpack(encoder, mp_buf, mp_size);
flb_free(mp_buf);
if (ret != FLB_EVENT_ENCODER_SUCCESS) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
}

ret = flb_log_event_encoder_commit_record(encoder);
if (ret == -1) {
flb_log_event_encoder_destroy(encoder);
return -1;
}
}
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

Encoder API checks are wrong; compare against FLB_EVENT_ENCODER_SUCCESS.

flb_log_event_encoder_* functions don’t return -1 reliably. Use the FLB_EVENT_ENCODER_SUCCESS constant to detect errors.

-        ret = flb_log_event_encoder_begin_record(encoder);
-        if (ret == -1) {
+        ret = flb_log_event_encoder_begin_record(encoder);
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
             flb_log_event_encoder_destroy(encoder);
             return -1;
         }
 
-        ret = flb_log_event_encoder_set_timestamp(encoder, &record->event.timestamp);
-        if (ret == -1) {
+        ret = flb_log_event_encoder_set_timestamp(encoder, &record->event.timestamp);
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
             flb_log_event_encoder_destroy(encoder);
             return -1;
         }
 ...
-        ret = flb_log_event_encoder_commit_record(encoder);
-        if (ret == -1) {
+        ret = flb_log_event_encoder_commit_record(encoder);
+        if (ret != FLB_EVENT_ENCODER_SUCCESS) {
             flb_log_event_encoder_destroy(encoder);
             return -1;
         }
🤖 Prompt for AI Agents
In src/flb_router_condition.c around lines 344 to 391, the code is checking
flb_log_event_encoder_* return values against -1 but the encoder API signals
success via FLB_EVENT_ENCODER_SUCCESS; update all checks that compare return
values to -1 to instead compare against FLB_EVENT_ENCODER_SUCCESS (i.e. treat
any return != FLB_EVENT_ENCODER_SUCCESS as an error), and keep the existing
cleanup (flb_log_event_encoder_destroy and flb_free) and early returns intact.

Comment on lines 895 to 902
/* Parse per_record_routing option */
{
struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing");
if (per_record_var && per_record_var->type == CFL_VARIANT_BOOLEAN) {
route->per_record_routing = per_record_var->data.as_boolean;
}
}

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🔴 Critical

🧩 Analysis chain

per_record_routing parsing uses non-existent CFL_VARIANT_BOOLEAN/as_boolean.

This will not compile and won’t accept string booleans. Use variant_to_bool() or CFL_VARIANT_BOOL/as_bool.

-    {
-        struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing");
-        if (per_record_var && per_record_var->type == CFL_VARIANT_BOOLEAN) {
-            route->per_record_routing = per_record_var->data.as_boolean;
-        }
-    }
+    {
+        struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing");
+        int prr;
+        if (per_record_var && variant_to_bool(per_record_var, &prr) == 0) {
+            route->per_record_routing = prr;
+        }
+    }

🏁 Script executed:

#!/bin/bash
# Find any other accidental uses of non-existent CFL_VARIANT_BOOLEAN/as_boolean
rg -n 'CFL_VARIANT_BOOLEAN|as_boolean|BOOLEAN' -S

Length of output: 14519



🏁 Script executed:

#!/bin/bash
# Search for CFL variant type definitions and how booleans are handled
rg -n 'CFL_VARIANT_BOOL|variant_to_bool|CFL_VARIANT_' -C 2 --type c | head -100

Length of output: 5138


🏁 Script executed:

#!/bin/bash
# Find cfl header files and check for variant type definitions
fd -e h 'cfl' | xargs rg -l 'CFL_VARIANT' | head -10

Length of output: 33


🏁 Script executed:

#!/bin/bash
# Search for other uses of cfl_kvlist_fetch to see the pattern
rg -n 'cfl_kvlist_fetch' -C 3 --type c | head -80

Length of output: 4362



🏁 Script executed:

#!/bin/bash
# Check the CFL variant header for correct definitions
cat lib/cfl/include/cfl/cfl_variant.h | head -150

Length of output: 2975


🏁 Script executed:

#!/bin/bash
# Search for variant_to_bool function definition/usage
rg -n 'variant_to_bool' --type c -C 3

Length of output: 661


CFL_VARIANT_BOOLEAN and as_boolean do not exist; use CFL_VARIANT_BOOL with variant_to_bool().

The code at lines 898-899 references non-existent CFL variant type and field. The correct type is CFL_VARIANT_BOOL with union field as_bool. The variant_to_bool() function already exists in this file and handles both boolean types and string conversion, making it the appropriate solution.

    /* Parse per_record_routing option */
    {
        struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing");
-       if (per_record_var && per_record_var->type == CFL_VARIANT_BOOLEAN) {
-           route->per_record_routing = per_record_var->data.as_boolean;
+       int prr;
+       if (per_record_var && variant_to_bool(per_record_var, &prr) == 0) {
+           route->per_record_routing = prr;
        }
    }
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
/* Parse per_record_routing option */
{
struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing");
if (per_record_var && per_record_var->type == CFL_VARIANT_BOOLEAN) {
route->per_record_routing = per_record_var->data.as_boolean;
}
}
/* Parse per_record_routing option */
{
struct cfl_variant *per_record_var = cfl_kvlist_fetch(kvlist, "per_record_routing");
int prr;
if (per_record_var && variant_to_bool(per_record_var, &prr) == 0) {
route->per_record_routing = prr;
}
}
🤖 Prompt for AI Agents
In src/flb_router_config.c around lines 895 to 902, the code incorrectly checks
for CFL_VARIANT_BOOLEAN and uses per_record_var->data.as_boolean; replace this
with a CFL_VARIANT_BOOL check and use the existing variant_to_bool() helper to
obtain the boolean value, assigning that result to route->per_record_routing;
specifically, ensure you check per_record_var->type == CFL_VARIANT_BOOL (or
simply test for per_record_var != NULL) and call variant_to_bool(per_record_var)
to set route->per_record_routing so both proper variant type and string-to-bool
conversions are handled.

Signed-off-by: Eduardo Silva <[email protected]>
Signed-off-by: Eduardo Silva <[email protected]>
@edsiper
Copy link
Member Author

edsiper commented Oct 27, 2025

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines 112 to 123
ret = flb_hash_table_get(ins->ht_log_chunks,
payload->tag,
flb_sds_len(payload->tag),
(void **) &chunk,
&out_size);
if (ret == -1 || !chunk || !chunk->routes_mask) {
/* For threaded inputs, chunk may not exist yet - this is expected */
if (flb_input_is_threaded(ins)) {
/* In threaded mode, routing will be handled when chunk is materialized */
flb_plg_debug(ins, "chunk not yet materialized for threaded input, "
"routing will be handled asynchronously");
return 0; /* Success - don't treat as error */

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Conditional routing never applied for threaded inputs

When route_payload_apply_outputs is invoked the new code assumes threaded inputs will have their route masks patched later if the chunk is not yet present in ht_log_chunks. However there is no follow‑up code that performs that asynchronous update. Returning success at lines 117‑123 leaves the split chunk with the default mask calculated by flb_routes_mask_set_by_tag (usually empty because the synthetic tag is the route name). For threaded inputs this means conditional payloads are either dropped or routed to all default outputs, defeating the conditional routing logic introduced in this change. The function needs to schedule or perform the mask update once the chunk is materialized rather than bailing out early.

Useful? React with 👍 / 👎.

Disable conditional routing for threaded inputs to prevent data corruption,
complete mk_list to cfl_list migration, and fix return value logic differences.

Signed-off-by: Eduardo Silva <[email protected]>
@edsiper
Copy link
Member Author

edsiper commented Oct 27, 2025

@codex review

Copy link

@chatgpt-codex-connector chatgpt-codex-connector bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💡 Codex Review

Here are some automated review suggestions for this pull request.

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment on lines +757 to 772
ret = split_and_append_route_payloads(ins, records, tag, tag_len,
out_buf, out_size);
if (ret < 0) {
if (processor_is_active && buf != out_buf) {
flb_free(out_buf);
}
return -1;
}

/*
* Always call flb_input_chunk_append_raw to ensure non-conditional routes
* receive data even when conditional routes exist. The conditional routing
* should be additive, not exclusive.
*/
ret = flb_input_chunk_append_raw(ins, FLB_INPUT_LOGS, records,
tag, tag_len, out_buf, out_size);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Route-split logs still pass through original direct routes

After split_and_append_route_payloads() has already emitted per‑route chunks, the code still appends the original chunk (flb_input_chunk_append_raw(ins, …, out_buf, out_size)) without removing the conditional routes from its route mask. During task creation flb_router_path_should_route() evaluates the same conditions again and the original chunk matches because it still contains the records that satisfied the rule, so each conditional output receives both the filtered chunk and the original one. This duplicates every log routed conditionally and defeats the point of per‑route filtering. Consider clearing conditional outputs from the base chunk’s routes mask (or skipping the base append when conditional routing handled the record) so records are delivered exactly once.

Useful? React with 👍 / 👎.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant