From 3011aa15751d42a5afb35961b3c5778c1219ad51 Mon Sep 17 00:00:00 2001
From: Stas Spiridonov
Date: Thu, 20 Nov 2025 14:53:27 -0500
Subject: [PATCH] wip

---
 .../internal/planner/logical/planner.go      |  32 ++++-
 .../internal/planner/logical/planner_test.go | 125 +++++++++++-------
 pkg/engine/internal/planner/planner_test.go  | 103 ++++++++-------
 3 files changed, 167 insertions(+), 93 deletions(-)

diff --git a/pkg/engine/internal/planner/logical/planner.go b/pkg/engine/internal/planner/logical/planner.go
index 8ae35c2aa12d2..49e6620a67b3f 100644
--- a/pkg/engine/internal/planner/logical/planner.go
+++ b/pkg/engine/internal/planner/logical/planner.go
@@ -187,6 +187,11 @@ func buildPlanForLogQuery(
 	// multiple parse stages. We will handle this in a future PR.
 	if hasLogfmtParser {
 		builder = builder.Parse(types.VariadicOpParseLogfmt, logfmtStrict, logfmtKeepEmpty)
+
+		// The old logfmt parse implementation does not seem to return errors, unlike the JSON parser, so drop the generated error columns.
+		builder = builder.ProjectDrop(
+			NewColumnRef(types.ColumnNameError, types.ColumnTypeGenerated), NewColumnRef(types.ColumnNameErrorDetails, types.ColumnTypeGenerated),
+		)
 	}
 	if hasJSONParser {
 		// JSON has no parameters
@@ -248,7 +253,15 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) (
 			return nil, errUnimplemented
 		}
 
-		builder = builder.Cast(unwrapIdentifier, unwrapOperation)
+		// Unwrap turns a column into a numerical `value` column, and the original column should be dropped from the result.
+		builder = builder.
+			Cast(unwrapIdentifier, unwrapOperation).
+			ProjectDrop(&ColumnRef{
+				Ref: types.ColumnRef{
+					Column: unwrapIdentifier,
+					Type:   types.ColumnTypeAmbiguous,
+				},
+			})
 	}
 
 	var rangeAggType types.RangeAggregationType
@@ -273,6 +286,23 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) (
 		return nil, errUnimplemented
 	}
 
+	// Filter out rows with any errors from parsing or unwrap stages.
+ builder = builder.Select( + &BinOp{ + Left: &BinOp{ + Left: NewColumnRef(types.ColumnNameError, types.ColumnTypeGenerated), + Right: NewLiteral(""), + Op: types.BinaryOpEq, + }, + Right: &BinOp{ + Left: NewColumnRef(types.ColumnNameErrorDetails, types.ColumnTypeGenerated), + Right: NewLiteral(""), + Op: types.BinaryOpEq, + }, + Op: types.BinaryOpAnd, + }, + ) + builder = builder.RangeAggregation( nil, rangeAggType, params.Start(), params.End(), params.Step(), rangeInterval, ) diff --git a/pkg/engine/internal/planner/logical/planner_test.go b/pkg/engine/internal/planner/logical/planner_test.go index 9ecdab5fb3ffe..c49d6c97912f5 100644 --- a/pkg/engine/internal/planner/logical/planner_test.go +++ b/pkg/engine/internal/planner/logical/planner_test.go @@ -144,10 +144,14 @@ func TestConvertAST_MetricQuery_Success(t *testing.T) { %8 = LT builtin.timestamp 1970-01-01T02:00:00Z %9 = SELECT %7 [predicate=%8] %10 = SELECT %9 [predicate=%4] -%11 = RANGE_AGGREGATION %10 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%12 = VECTOR_AGGREGATION %11 [operation=sum, group_by=(ambiguous.level)] -%13 = LOGQL_COMPAT %12 -RETURN %13 +%11 = EQ generated.__error__ "" +%12 = EQ generated.__error_details__ "" +%13 = AND %11 %12 +%14 = SELECT %10 [predicate=%13] +%15 = RANGE_AGGREGATION %14 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%16 = VECTOR_AGGREGATION %15 [operation=sum, group_by=(ambiguous.level)] +%17 = LOGQL_COMPAT %16 +RETURN %17 ` require.Equal(t, expected, logicalPlan.String()) @@ -178,11 +182,15 @@ RETURN %13 %6 = SELECT %4 [predicate=%5] %7 = LT builtin.timestamp 1970-01-01T02:00:00Z %8 = SELECT %6 [predicate=%7] -%9 = RANGE_AGGREGATION %8 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%10 = DIV %9 300 -%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)] -%12 = LOGQL_COMPAT %11 -RETURN %12 +%9 = EQ generated.__error__ "" +%10 = EQ generated.__error_details__ "" +%11 = AND %9 %10 +%12 = SELECT %8 [predicate=%11] +%13 = RANGE_AGGREGATION %12 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%14 = DIV %13 300 +%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)] +%16 = LOGQL_COMPAT %15 +RETURN %16 ` require.Equal(t, expected, logicalPlan.String()) @@ -211,13 +219,17 @@ RETURN %12 %4 = SELECT %2 [predicate=%3] %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] -%7 = RANGE_AGGREGATION %6 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%8 = DIV %7 300 -%9 = SUB %8 100 -%10 = POW %9 2 -%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)] -%12 = LOGQL_COMPAT %11 -RETURN %12 +%7 = EQ generated.__error__ "" +%8 = EQ generated.__error_details__ "" +%9 = AND %7 %8 +%10 = SELECT %6 [predicate=%9] +%11 = RANGE_AGGREGATION %10 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%12 = DIV %11 300 +%13 = SUB %12 100 +%14 = POW %13 2 +%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)] +%16 = LOGQL_COMPAT %15 +RETURN %16 ` require.Equal(t, expected, logicalPlan.String()) @@ -405,10 +417,15 @@ func TestPlannerCreatesCastOperationForUnwrap(t *testing.T) { %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] %7 = PROJECT %6 [mode=*E, expr=CAST_DURATION(ambiguous.response_time)] -%8 = 
RANGE_AGGREGATION %7 [operation=sum, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%9 = VECTOR_AGGREGATION %8 [operation=sum, group_by=(ambiguous.status)] -%10 = LOGQL_COMPAT %9 -RETURN %10 +%8 = PROJECT %7 [mode=*D, expr=ambiguous.response_time] +%9 = EQ generated.__error__ "" +%10 = EQ generated.__error_details__ "" +%11 = AND %9 %10 +%12 = SELECT %8 [predicate=%11] +%13 = RANGE_AGGREGATION %12 [operation=sum, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%14 = VECTOR_AGGREGATION %13 [operation=sum, group_by=(ambiguous.status)] +%15 = LOGQL_COMPAT %14 +RETURN %15 ` require.Equal(t, expected, plan.String()) }) @@ -437,12 +454,17 @@ func TestPlannerCreatesProjectionWithParseOperation(t *testing.T) { %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] %7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] -%8 = EQ ambiguous.level "error" -%9 = SELECT %7 [predicate=%8] -%10 = RANGE_AGGREGATION %9 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)] -%12 = LOGQL_COMPAT %11 -RETURN %12 +%8 = PROJECT %7 [mode=*D, expr=generated.__error__, expr=generated.__error_details__] +%9 = EQ ambiguous.level "error" +%10 = SELECT %8 [predicate=%9] +%11 = EQ generated.__error__ "" +%12 = EQ generated.__error_details__ "" +%13 = AND %11 %12 +%14 = SELECT %10 [predicate=%13] +%15 = RANGE_AGGREGATION %14 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%16 = VECTOR_AGGREGATION %15 [operation=sum, group_by=(ambiguous.level)] +%17 = LOGQL_COMPAT %16 +RETURN %17 ` require.Equal(t, expected, plan.String()) }) @@ -468,11 +490,12 @@ RETURN %12 %5 = LT builtin.timestamp 1970-01-01T02:00:00Z %6 = SELECT %4 [predicate=%5] %7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] -%8 = EQ ambiguous.level "error" -%9 = SELECT %7 [predicate=%8] -%10 = TOPK %9 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false] -%11 = LOGQL_COMPAT %10 -RETURN %11 +%8 = PROJECT %7 [mode=*D, expr=generated.__error__, expr=generated.__error_details__] +%9 = EQ ambiguous.level "error" +%10 = SELECT %8 [predicate=%9] +%11 = TOPK %10 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false] +%12 = LOGQL_COMPAT %11 +RETURN %12 ` require.Equal(t, expected, plan.String()) }) @@ -500,10 +523,14 @@ RETURN %11 %7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message, [], false, false)] %8 = EQ ambiguous.level "error" %9 = SELECT %7 [predicate=%8] -%10 = RANGE_AGGREGATION %9 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)] -%12 = LOGQL_COMPAT %11 -RETURN %12 +%10 = EQ generated.__error__ "" +%11 = EQ generated.__error_details__ "" +%12 = AND %10 %11 +%13 = SELECT %9 [predicate=%12] +%14 = RANGE_AGGREGATION %13 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)] +%16 = LOGQL_COMPAT %15 +RETURN %16 ` require.Equal(t, expected, plan.String()) }) @@ -566,11 +593,12 @@ RETURN %11 %9 = SELECT %8 [predicate=%2] %10 = SELECT %9 [predicate=%3] %11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] -%12 = EQ ambiguous.level "debug" -%13 = SELECT %11 
[predicate=%12] -%14 = TOPK %13 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false] -%15 = LOGQL_COMPAT %14 -RETURN %15 +%12 = PROJECT %11 [mode=*D, expr=generated.__error__, expr=generated.__error_details__] +%13 = EQ ambiguous.level "debug" +%14 = SELECT %12 [predicate=%13] +%15 = TOPK %14 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false] +%16 = LOGQL_COMPAT %15 +RETURN %16 ` require.Equal(t, expected, plan.String(), "Operations should be in the correct order: LineFilter before Parse, LabelFilter after Parse") @@ -602,12 +630,17 @@ RETURN %15 %9 = SELECT %8 [predicate=%2] %10 = SELECT %9 [predicate=%3] %11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)] -%12 = EQ ambiguous.level "debug" -%13 = SELECT %11 [predicate=%12] -%14 = RANGE_AGGREGATION %13 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] -%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)] -%16 = LOGQL_COMPAT %15 -RETURN %16 +%12 = PROJECT %11 [mode=*D, expr=generated.__error__, expr=generated.__error_details__] +%13 = EQ ambiguous.level "debug" +%14 = SELECT %12 [predicate=%13] +%15 = EQ generated.__error__ "" +%16 = EQ generated.__error_details__ "" +%17 = AND %15 %16 +%18 = SELECT %14 [predicate=%17] +%19 = RANGE_AGGREGATION %18 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s] +%20 = VECTOR_AGGREGATION %19 [operation=sum, group_by=(ambiguous.level)] +%21 = LOGQL_COMPAT %20 +RETURN %21 ` require.Equal(t, expected, plan.String(), "Metric query should preserve operation order: filters before parse, then parse, then filters after parse") diff --git a/pkg/engine/internal/planner/planner_test.go b/pkg/engine/internal/planner/planner_test.go index 90a00dc2d4324..28fde2b229bc5 100644 --- a/pkg/engine/internal/planner/planner_test.go +++ b/pkg/engine/internal/planner/planner_test.go @@ -168,12 +168,13 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Parallelize └── TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Filter predicate[0]=EQ(ambiguous.level, "error") - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) predicate[2]=MATCH_STR(builtin.message, "bar") - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Projection all=true drop=(generated.__error__, generated.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) predicate[2]=MATCH_STR(builtin.message, "bar") + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, { @@ -184,12 +185,13 @@ TopK sort_by=builtin.timestamp 
ascending=false nulls_first=false k=1000 └── Parallelize └── TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Projection all=true drop=(ambiguous.service_name, ambiguous.__error__) - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Projection all=true drop=(generated.__error__, generated.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, { @@ -203,14 +205,17 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 VectorAggregation operation=sum group_by=(ambiguous.bar) └── RangeAggregation operation=sum start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s partition_by=(ambiguous.bar) └── Parallelize - └── Projection all=true expand=(CAST_DURATION(ambiguous.request_duration)) - └── Filter predicate[0]=NEQ(ambiguous.request_duration, "") - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar, request_duration], false, false)) - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, ambiguous.request_duration, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Filter predicate[0]=AND(EQ(generated.__error__, ""), EQ(generated.__error_details__, "")) + └── Projection all=true drop=(ambiguous.request_duration) + └── Projection all=true expand=(CAST_DURATION(ambiguous.request_duration)) + └── Filter predicate[0]=NEQ(ambiguous.request_duration, "") + └── Projection all=true drop=(generated.__error__, generated.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar, request_duration], false, false)) + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, ambiguous.request_duration, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj 
streams=5 section_id=0 projections=() `, }, { @@ -220,16 +225,18 @@ VectorAggregation operation=sum group_by=(ambiguous.bar) VectorAggregation operation=sum └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s └── Parallelize - └── Projection all=true drop=(ambiguous.__error__, ambiguous.__error_details__) - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_JSON(builtin.message, [], false, false)) - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) - └── Filter predicate[0]=EQ(ambiguous.detected_level, "error") - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 projections=(ambiguous.detected_level, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Filter predicate[0]=AND(EQ(generated.__error__, ""), EQ(generated.__error_details__, "")) + └── Projection all=true drop=(ambiguous.__error__, ambiguous.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_JSON(builtin.message, [], false, false)) + └── Projection all=true drop=(generated.__error__, generated.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) + └── Filter predicate[0]=EQ(ambiguous.detected_level, "error") + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 projections=(ambiguous.detected_level, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, @@ -241,10 +248,11 @@ VectorAggregation operation=sum group_by=(ambiguous.bar) └── Projection all=true expand=(DIV(generated.value, 300)) └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s partition_by=(ambiguous.bar) └── Parallelize - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Filter predicate[0]=AND(EQ(generated.__error__, ""), EQ(generated.__error_details__, "")) + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject 
location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, { @@ -254,12 +262,13 @@ VectorAggregation operation=sum group_by=(ambiguous.bar) TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 └── Parallelize └── TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Projection all=true drop=(generated.__error__, generated.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [], false, false)) + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 predicate[0]=GTE(builtin.timestamp, 2025-01-01T00:00:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, { @@ -269,12 +278,14 @@ TopK sort_by=builtin.timestamp ascending=false nulls_first=false k=1000 VectorAggregation operation=sum group_by=(ambiguous.bar) └── RangeAggregation operation=count start=2025-01-01T00:00:00Z end=2025-01-01T01:00:00Z step=0s range=1m0s partition_by=(ambiguous.bar) └── Parallelize - └── Compat src=parsed dst=parsed collision=label - └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar], false, false)) - └── Compat src=metadata dst=metadata collision=label - └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) - ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() - └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() + └── Filter predicate[0]=AND(EQ(generated.__error__, ""), EQ(generated.__error_details__, "")) + └── Projection all=true drop=(generated.__error__, generated.__error_details__) + └── Compat src=parsed dst=parsed collision=label + └── Projection all=true expand=(PARSE_LOGFMT(builtin.message, [bar], false, false)) + └── Compat src=metadata dst=metadata collision=label + └── ScanSet num_targets=2 projections=(ambiguous.bar, builtin.message, builtin.timestamp) predicate[0]=GTE(builtin.timestamp, 2024-12-31T23:59:00Z) predicate[1]=LT(builtin.timestamp, 2025-01-01T01:00:00Z) + ├── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=1 projections=() + └── @target type=ScanTypeDataObject location=objects/00/0000000000.dataobj streams=5 section_id=0 projections=() `, }, }