Skip to content

Commit dbb5d14

Browse files
committed
in_tail: Implement long line truncation
To avoid skipping long lines altogether, the plugin sometimes needs to consume input up to the buffer limit. This offers an alternative mitigation for handling long lines. Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 65722e8 commit dbb5d14

File tree

4 files changed

+86
-2
lines changed

4 files changed

+86
-2
lines changed

plugins/in_tail/tail.c

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -719,6 +719,12 @@ static struct flb_config_map config_map[] = {
719719
0, FLB_TRUE, offsetof(struct flb_tail_config, skip_empty_lines),
720720
"Allows to skip empty lines."
721721
},
722+
723+
{
724+
FLB_CONFIG_MAP_BOOL, "truncate_long_lines", "false",
725+
0, FLB_TRUE, offsetof(struct flb_tail_config, truncate_long_lines),
726+
"Truncate overlong lines after input encoding to UTF-8"
727+
},
722728
#ifdef __linux__
723729
{
724730
FLB_CONFIG_MAP_BOOL, "file_cache_advise", "true",

plugins/in_tail/tail_config.c

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -485,6 +485,13 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
485485
"multiline_truncated_total",
486486
"Total number of truncated occurences for multilines",
487487
1, (char *[]) {"name"});
488+
ctx->cmt_long_line_truncated = \
489+
cmt_counter_create(ins->cmt,
490+
"fluentbit", "input",
491+
"long_line_truncated_total",
492+
"Total number of truncated occurences for long lines",
493+
1, (char *[]) {"name"});
494+
488495
/* OLD metrics */
489496
flb_metrics_add(FLB_TAIL_METRIC_F_OPENED,
490497
"files_opened", ctx->ins->metrics);
@@ -494,6 +501,8 @@ struct flb_tail_config *flb_tail_config_create(struct flb_input_instance *ins,
494501
"files_rotated", ctx->ins->metrics);
495502
flb_metrics_add(FLB_TAIL_METRIC_M_TRUNCATED,
496503
"multiline_truncated", ctx->ins->metrics);
504+
flb_metrics_add(FLB_TAIL_METRIC_L_TRUNCATED,
505+
"long_line_truncated", ctx->ins->metrics);
497506
#endif
498507

499508
return ctx;

plugins/in_tail/tail_config.h

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
#define FLB_TAIL_METRIC_F_CLOSED 101 /* number of closed files */
4343
#define FLB_TAIL_METRIC_F_ROTATED 102 /* number of rotated files */
4444
#define FLB_TAIL_METRIC_M_TRUNCATED 103 /* number of truncated occurrences of multiline */
45+
#define FLB_TAIL_METRIC_L_TRUNCATED 104 /* number of truncated occurrences of long lines */
4546
#endif
4647

4748
struct flb_tail_config {
@@ -54,6 +55,7 @@ struct flb_tail_config {
5455
/* Buffer Config */
5556
size_t buf_chunk_size; /* allocation chunks */
5657
size_t buf_max_size; /* max size of a buffer */
58+
int truncate_long_lines; /* truncate long lines after re-encode */
5759

5860
/* Static files processor */
5961
size_t static_batch_size;
@@ -169,6 +171,7 @@ struct flb_tail_config {
169171
struct cmt_counter *cmt_files_closed;
170172
struct cmt_counter *cmt_files_rotated;
171173
struct cmt_counter *cmt_multiline_truncated;
174+
struct cmt_counter *cmt_long_line_truncated;
172175

173176
/* Hash: hash tables for quick access to registered files */
174177
struct flb_hash_table *static_hash;

plugins/in_tail/tail_file.c

Lines changed: 68 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,24 @@ static FLB_INLINE const char *flb_skip_leading_zeros_simd(const char *data, cons
457457
return data;
458458
}
459459

460+
/*
 * Return a UTF-8 safe cut position <= max.
 *
 * s   : buffer to be truncated (only read when len > max)
 * len : number of valid bytes in 's'
 * max : maximum number of bytes allowed to be kept
 *
 * When the input already fits (len <= max) the full length is returned.
 * Otherwise the cut starts at 'max' and moves backwards while the first
 * dropped byte, s[cut], is a UTF-8 continuation byte (10xxxxxx), so the
 * kept prefix never ends in the middle of a multi-byte sequence.
 *
 * The result can be 0, e.g. when max is 0 or when every byte up to 'max'
 * is a continuation byte; callers must handle an empty prefix.
 */
static size_t utf8_safe_truncate_pos(const char *s, size_t len, size_t max)
{
    size_t cut;

    /* Nothing to trim: the whole input fits within the limit */
    if (len <= max) {
        return len;
    }

    /* Here max < len, so s[cut] is always a valid index (cut <= max) */
    cut = max;

    /* backtrack over continuation bytes 10xxxxxx */
    while (cut > 0 && ((unsigned char) s[cut] & 0xC0) == 0x80) {
        cut--;
    }

    return cut;
}
477+
460478
static int process_content(struct flb_tail_file *file, size_t *bytes)
461479
{
462480
size_t len;
@@ -481,6 +499,12 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
481499
#ifdef FLB_HAVE_UNICODE_ENCODER
482500
size_t decoded_len;
483501
#endif
502+
size_t cut = 0;
503+
size_t dec_len = 0;
504+
size_t window = 0;
505+
int truncation_happened = FLB_FALSE;
506+
size_t bytes_override = 0;
507+
void *nl = NULL;
484508
#ifdef FLB_HAVE_METRICS
485509
uint64_t ts;
486510
char *name;
@@ -542,6 +566,43 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
542566
data = (char *)flb_skip_leading_zeros_simd(data, end, &processed_bytes);
543567
}
544568

569+
if (ctx->truncate_long_lines == FLB_TRUE) {
570+
dec_len = (size_t)(end - data);
571+
window = ctx->buf_max_size + 1;
572+
if (window > dec_len) {
573+
window = dec_len;
574+
}
575+
576+
nl = memchr(data, '\n', window);
577+
if (nl == NULL && dec_len > ctx->buf_max_size) {
578+
cut = utf8_safe_truncate_pos(data, dec_len, ctx->buf_max_size);
579+
580+
if (cut > 0) {
581+
if (ctx->multiline == FLB_TRUE) {
582+
flb_tail_mult_flush(file, ctx);
583+
}
584+
585+
flb_tail_file_pack_line(NULL, data, cut, file, processed_bytes);
586+
587+
#ifdef FLB_HAVE_METRICS
588+
cmt_counter_inc(ctx->cmt_long_line_truncated,
589+
cfl_time_now(), 1,
590+
(char*[]){ (char*) flb_input_name(ctx->ins) });
591+
#endif
592+
file->skip_next = FLB_TRUE;
593+
594+
bytes_override = (original_len > 0) ? original_len : file->buf_len;
595+
truncation_happened = FLB_TRUE;
596+
597+
lines++;
598+
goto truncation_end;
599+
}
600+
else {
601+
file->skip_next = FLB_TRUE;
602+
}
603+
}
604+
}
605+
545606
while (data < end && (p = memchr(data, '\n', end - data))) {
546607
len = (p - data);
547608
crlf = 0;
@@ -700,6 +761,7 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
700761
file->last_processed_bytes = processed_bytes;
701762
}
702763

764+
truncation_end:
703765
if (decoded) {
704766
flb_free(decoded);
705767
decoded = NULL;
@@ -709,9 +771,13 @@ static int process_content(struct flb_tail_file *file, size_t *bytes)
709771

710772
if (lines > 0) {
711773
/* Append buffer content to a chunk */
712-
if (original_len > 0) {
774+
if (truncation_happened) {
775+
*bytes = bytes_override;
776+
}
777+
else if (original_len > 0) {
713778
*bytes = original_len;
714-
} else {
779+
}
780+
else {
715781
*bytes = processed_bytes;
716782
}
717783

0 commit comments

Comments
 (0)