Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 31 additions & 1 deletion pkg/engine/internal/planner/logical/planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,11 @@ func buildPlanForLogQuery(
// multiple parse stages. We will handle this in a future PR.
if hasLogfmtParser {
builder = builder.Parse(types.VariadicOpParseLogfmt, logfmtStrict, logfmtKeepEmpty)

// The old logfmt parse implementation does not seem to return errors, unlike json parse
builder = builder.ProjectDrop(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ben already added support to logfmt parser to support non-strict mode by default - it should not be adding error columns unless strict is explicitly requested with logfmt --strict

#19668

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are we still seeing errors from logfmt stage?

NewColumnRef(types.ColumnNameError, types.ColumnTypeGenerated), NewColumnRef(types.ColumnNameErrorDetails, types.ColumnTypeGenerated),
)
}
if hasJSONParser {
// JSON has no parameters
Expand Down Expand Up @@ -248,7 +253,15 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) (
return nil, errUnimplemented
}

builder = builder.Cast(unwrapIdentifier, unwrapOperation)
// Unwrap turns a column into numerical `value` column, and that original column shoul be dropped from the result.
builder = builder.
Cast(unwrapIdentifier, unwrapOperation).
ProjectDrop(&ColumnRef{
Ref: types.ColumnRef{
Column: unwrapIdentifier,
Type: types.ColumnTypeAmbiguous,
},
})
}

var rangeAggType types.RangeAggregationType
Expand All @@ -273,6 +286,23 @@ func walkRangeAggregation(e *syntax.RangeAggregationExpr, params logql.Params) (
return nil, errUnimplemented
}

// Filter out rows with any errors from parsing or unwrap stages.
builder = builder.Select(
&BinOp{
Left: &BinOp{
Left: NewColumnRef(types.ColumnNameError, types.ColumnTypeGenerated),
Right: NewLiteral(""),
Op: types.BinaryOpEq,
},
Right: &BinOp{
Left: NewColumnRef(types.ColumnNameErrorDetails, types.ColumnTypeGenerated),
Right: NewLiteral(""),
Op: types.BinaryOpEq,
},
Op: types.BinaryOpAnd,
},
)

builder = builder.RangeAggregation(
nil, rangeAggType, params.Start(), params.End(), params.Step(), rangeInterval,
)
Expand Down
125 changes: 79 additions & 46 deletions pkg/engine/internal/planner/logical/planner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,14 @@ func TestConvertAST_MetricQuery_Success(t *testing.T) {
%8 = LT builtin.timestamp 1970-01-01T02:00:00Z
%9 = SELECT %7 [predicate=%8]
%10 = SELECT %9 [predicate=%4]
%11 = RANGE_AGGREGATION %10 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%12 = VECTOR_AGGREGATION %11 [operation=sum, group_by=(ambiguous.level)]
%13 = LOGQL_COMPAT %12
RETURN %13
%11 = EQ generated.__error__ ""
%12 = EQ generated.__error_details__ ""
%13 = AND %11 %12
%14 = SELECT %10 [predicate=%13]
%15 = RANGE_AGGREGATION %14 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%16 = VECTOR_AGGREGATION %15 [operation=sum, group_by=(ambiguous.level)]
%17 = LOGQL_COMPAT %16
RETURN %17
`

require.Equal(t, expected, logicalPlan.String())
Expand Down Expand Up @@ -178,11 +182,15 @@ RETURN %13
%6 = SELECT %4 [predicate=%5]
%7 = LT builtin.timestamp 1970-01-01T02:00:00Z
%8 = SELECT %6 [predicate=%7]
%9 = RANGE_AGGREGATION %8 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%10 = DIV %9 300
%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)]
%12 = LOGQL_COMPAT %11
RETURN %12
%9 = EQ generated.__error__ ""
%10 = EQ generated.__error_details__ ""
%11 = AND %9 %10
%12 = SELECT %8 [predicate=%11]
%13 = RANGE_AGGREGATION %12 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%14 = DIV %13 300
%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)]
%16 = LOGQL_COMPAT %15
RETURN %16
`

require.Equal(t, expected, logicalPlan.String())
Expand Down Expand Up @@ -211,13 +219,17 @@ RETURN %12
%4 = SELECT %2 [predicate=%3]
%5 = LT builtin.timestamp 1970-01-01T02:00:00Z
%6 = SELECT %4 [predicate=%5]
%7 = RANGE_AGGREGATION %6 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%8 = DIV %7 300
%9 = SUB %8 100
%10 = POW %9 2
%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)]
%12 = LOGQL_COMPAT %11
RETURN %12
%7 = EQ generated.__error__ ""
%8 = EQ generated.__error_details__ ""
%9 = AND %7 %8
%10 = SELECT %6 [predicate=%9]
%11 = RANGE_AGGREGATION %10 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%12 = DIV %11 300
%13 = SUB %12 100
%14 = POW %13 2
%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)]
%16 = LOGQL_COMPAT %15
RETURN %16
`

require.Equal(t, expected, logicalPlan.String())
Expand Down Expand Up @@ -405,10 +417,15 @@ func TestPlannerCreatesCastOperationForUnwrap(t *testing.T) {
%5 = LT builtin.timestamp 1970-01-01T02:00:00Z
%6 = SELECT %4 [predicate=%5]
%7 = PROJECT %6 [mode=*E, expr=CAST_DURATION(ambiguous.response_time)]
%8 = RANGE_AGGREGATION %7 [operation=sum, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%9 = VECTOR_AGGREGATION %8 [operation=sum, group_by=(ambiguous.status)]
%10 = LOGQL_COMPAT %9
RETURN %10
%8 = PROJECT %7 [mode=*D, expr=ambiguous.response_time]
%9 = EQ generated.__error__ ""
%10 = EQ generated.__error_details__ ""
%11 = AND %9 %10
%12 = SELECT %8 [predicate=%11]
%13 = RANGE_AGGREGATION %12 [operation=sum, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%14 = VECTOR_AGGREGATION %13 [operation=sum, group_by=(ambiguous.status)]
%15 = LOGQL_COMPAT %14
RETURN %15
`
require.Equal(t, expected, plan.String())
})
Expand Down Expand Up @@ -437,12 +454,17 @@ func TestPlannerCreatesProjectionWithParseOperation(t *testing.T) {
%5 = LT builtin.timestamp 1970-01-01T02:00:00Z
%6 = SELECT %4 [predicate=%5]
%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)]
%8 = EQ ambiguous.level "error"
%9 = SELECT %7 [predicate=%8]
%10 = RANGE_AGGREGATION %9 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)]
%12 = LOGQL_COMPAT %11
RETURN %12
%8 = PROJECT %7 [mode=*D, expr=generated.__error__, expr=generated.__error_details__]
%9 = EQ ambiguous.level "error"
%10 = SELECT %8 [predicate=%9]
%11 = EQ generated.__error__ ""
%12 = EQ generated.__error_details__ ""
%13 = AND %11 %12
%14 = SELECT %10 [predicate=%13]
%15 = RANGE_AGGREGATION %14 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%16 = VECTOR_AGGREGATION %15 [operation=sum, group_by=(ambiguous.level)]
%17 = LOGQL_COMPAT %16
RETURN %17
`
require.Equal(t, expected, plan.String())
})
Expand All @@ -468,11 +490,12 @@ RETURN %12
%5 = LT builtin.timestamp 1970-01-01T02:00:00Z
%6 = SELECT %4 [predicate=%5]
%7 = PROJECT %6 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)]
%8 = EQ ambiguous.level "error"
%9 = SELECT %7 [predicate=%8]
%10 = TOPK %9 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
%11 = LOGQL_COMPAT %10
RETURN %11
%8 = PROJECT %7 [mode=*D, expr=generated.__error__, expr=generated.__error_details__]
%9 = EQ ambiguous.level "error"
%10 = SELECT %8 [predicate=%9]
%11 = TOPK %10 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
%12 = LOGQL_COMPAT %11
RETURN %12
`
require.Equal(t, expected, plan.String())
})
Expand Down Expand Up @@ -500,10 +523,14 @@ RETURN %11
%7 = PROJECT %6 [mode=*E, expr=PARSE_JSON(builtin.message, [], false, false)]
%8 = EQ ambiguous.level "error"
%9 = SELECT %7 [predicate=%8]
%10 = RANGE_AGGREGATION %9 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%11 = VECTOR_AGGREGATION %10 [operation=sum, group_by=(ambiguous.level)]
%12 = LOGQL_COMPAT %11
RETURN %12
%10 = EQ generated.__error__ ""
%11 = EQ generated.__error_details__ ""
%12 = AND %10 %11
%13 = SELECT %9 [predicate=%12]
%14 = RANGE_AGGREGATION %13 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)]
%16 = LOGQL_COMPAT %15
RETURN %16
`
require.Equal(t, expected, plan.String())
})
Expand Down Expand Up @@ -566,11 +593,12 @@ RETURN %11
%9 = SELECT %8 [predicate=%2]
%10 = SELECT %9 [predicate=%3]
%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)]
%12 = EQ ambiguous.level "debug"
%13 = SELECT %11 [predicate=%12]
%14 = TOPK %13 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
%15 = LOGQL_COMPAT %14
RETURN %15
%12 = PROJECT %11 [mode=*D, expr=generated.__error__, expr=generated.__error_details__]
%13 = EQ ambiguous.level "debug"
%14 = SELECT %12 [predicate=%13]
%15 = TOPK %14 [sort_by=builtin.timestamp, k=1000, asc=false, nulls_first=false]
%16 = LOGQL_COMPAT %15
RETURN %16
`

require.Equal(t, expected, plan.String(), "Operations should be in the correct order: LineFilter before Parse, LabelFilter after Parse")
Expand Down Expand Up @@ -602,12 +630,17 @@ RETURN %15
%9 = SELECT %8 [predicate=%2]
%10 = SELECT %9 [predicate=%3]
%11 = PROJECT %10 [mode=*E, expr=PARSE_LOGFMT(builtin.message, [], false, false)]
%12 = EQ ambiguous.level "debug"
%13 = SELECT %11 [predicate=%12]
%14 = RANGE_AGGREGATION %13 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%15 = VECTOR_AGGREGATION %14 [operation=sum, group_by=(ambiguous.level)]
%16 = LOGQL_COMPAT %15
RETURN %16
%12 = PROJECT %11 [mode=*D, expr=generated.__error__, expr=generated.__error_details__]
%13 = EQ ambiguous.level "debug"
%14 = SELECT %12 [predicate=%13]
%15 = EQ generated.__error__ ""
%16 = EQ generated.__error_details__ ""
%17 = AND %15 %16
%18 = SELECT %14 [predicate=%17]
%19 = RANGE_AGGREGATION %18 [operation=count, start_ts=1970-01-01T01:00:00Z, end_ts=1970-01-01T02:00:00Z, step=0s, range=5m0s]
%20 = VECTOR_AGGREGATION %19 [operation=sum, group_by=(ambiguous.level)]
%21 = LOGQL_COMPAT %20
RETURN %21
`

require.Equal(t, expected, plan.String(), "Metric query should preserve operation order: filters before parse, then parse, then filters after parse")
Expand Down
Loading
Loading