@spiridonov spiridonov commented Nov 19, 2025

What this PR does / why we need it:

This PR introduces proper groupings (a list of columns plus a by/without mode). Previously a grouping was represented only as a list of columns, and without was not supported at all.

  • The aggregator now aggregates by an arbitrary, variable list of labels. The resulting record contains the union of all columns seen during aggregation.
  • Pushdown optimizations are changed. For example, if a range aggregation uses without (), all columns must be read from the data objects and nothing can be pushed down.
  • Printers are changed to reflect the grouping mode.
  • Proto changes for physical plans.

Which issue(s) this PR fixes:
Fixes #

Special notes for your reviewer:

Checklist

  • Reviewed the CONTRIBUTING.md guide (required)
  • Documentation added
  • Tests updated
  • Title matches the required conventional commits format, see here
    • Note that Promtail is considered to be feature complete, and future development for logs collection will be in Grafana Alloy. As such, feat PRs are unlikely to be accepted unless a case can be made for the feature actually being a bug fix to existing behavior.
  • Changes that require user attention or interaction to upgrade are documented in docs/sources/setup/upgrade/_index.md
  • If the change is deprecating or removing a configuration option, update the deprecated-config.yaml and deleted-config.yaml files respectively in the tools/deprecated-config-checker directory. Example PR

@spiridonov spiridonov changed the title from "chore: Aggregation grouping and min/max/avg functions" to "chore: Aggregation groupings for by() and without()" Nov 20, 2025
@spiridonov spiridonov marked this pull request as ready for review November 20, 2025 15:42
@spiridonov spiridonov requested a review from a team as a code owner November 20, 2025 15:42

// One of the parsed columns
case ident.ColumnType() == types.ColumnTypeParsed:
case ident.ColumnType() == types.ColumnTypeParsed || (ident.ColumnType() == types.ColumnTypeGenerated &&

is this added to handle error columns that are of type Generated?

fmt.Sprintf(`count_over_time(%s[%s])`, selector, rangeInterval),
fmt.Sprintf(`count_over_time(%s | detected_level=~"error|warn" [%s])`, selector, rangeInterval),
fmt.Sprintf(`count_over_time(%s |= "level" [%s])`, selector, rangeInterval),
//fmt.Sprintf(`avg_over_time(%s | json | unwrap rows_affected [%s])`, selector, rangeInterval),

i think we should uncomment these if tests are passing or do that in a follow-up after fixing the correctness issues


const (
GroupingModeInvalid GroupingMode = iota
GroupingModeByEmptySet // Grouping by empty label set: <operation> by () (<expr>)

do we need 4 of these? GroupingModeByEmptySet is the same as GroupingModeByLabelSet without any groupings, right?

This can also be a flag. without=(true|false)
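
A rough sketch of the flag-based alternative suggested here, not the type in this PR. `Grouping`, `Columns`, and `Without` are hypothetical names. An empty by () and an empty without () are then told apart by the flag alone:

```go
package main

import (
	"fmt"
	"strings"
)

// Grouping is a hypothetical two-field representation: the columns plus
// a flag selecting without semantics.
type Grouping struct {
	Columns []string
	Without bool
}

// String renders the grouping the way it appears in a query.
func (g Grouping) String() string {
	mode := "by"
	if g.Without {
		mode = "without"
	}
	return fmt.Sprintf("%s (%s)", mode, strings.Join(g.Columns, ", "))
}

func main() {
	fmt.Println(Grouping{})                                  // by ()
	fmt.Println(Grouping{Without: true})                     // without ()
	fmt.Println(Grouping{Columns: []string{"app", "level"}}) // by (app, level)
}
```

One trade-off of this shape is that the zero value reads as by (), so there is no separate invalid state like GroupingModeInvalid. Whether that matters depends on how the planner distinguishes a missing grouping.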


// Aggregation operation to perform over the underlying range vector.
- AggregateVectorOp operation = 2;
+ AggregateVectorOp operation = 3;

nit: no need to reorder this entry

panic("len(labels) != len(labelValues)")
}

for _, label := range labels {

this can be expensive as we run it for each row. can we check the benchmarks for a metric test with this change?


t.Run("basic SUM aggregation with record building", func(t *testing.T) {
- agg := newAggregator(groupBy, 10, aggregationOperationSum)
+ agg := newAggregator(10, aggregationOperationSum)

can we add more aggregation tests that call Add() with different labels as the existing ones assume the same labels each time?

// rangeAggregationOperations holds the mapping of range aggregation types to operations for an aggregator.
rangeAggregationOperations = map[types.RangeAggregationType]aggregationOperation{
types.RangeAggregationTypeSum: aggregationOperationSum,
types.RangeAggregationTypeCount: aggregationOperationCount,

any reason why these are removed?


for _, w := range windows {
- r.aggregator.Add(w.end, value, labelValues)
+ r.aggregator.Add(w.end, value, labels, labelValues)

as we know the labels of a record, can we give a hint to the aggregator once per record to update its label set instead of doing it once per row?
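
One way the per-record hint could look, as a sketch under assumptions rather than the aggregator in this PR. `hintedAggregator`, `SetLabels`, and this `Add` signature are hypothetical, only by-style grouping is covered, and timestamps are omitted. Label positions are resolved once per record, so the per-row call only indexes into `labelValues`.

```go
package main

import "fmt"

// hintedAggregator is a hypothetical aggregator that resolves label
// positions once per record instead of once per row.
type hintedAggregator struct {
	groupBy []string // labels to group by
	index   []int    // position of each groupBy label in the current record, -1 if absent
	sums    map[string]float64
}

func newHintedAggregator(groupBy []string) *hintedAggregator {
	return &hintedAggregator{groupBy: groupBy, sums: make(map[string]float64)}
}

// SetLabels is called once per record with that record's label columns.
func (a *hintedAggregator) SetLabels(labels []string) {
	a.index = a.index[:0]
	for _, g := range a.groupBy {
		pos := -1
		for i, l := range labels {
			if l == g {
				pos = i
				break
			}
		}
		a.index = append(a.index, pos)
	}
}

// Add is called once per row; it does no label-name lookups.
func (a *hintedAggregator) Add(value float64, labelValues []string) {
	key := ""
	for i, pos := range a.index {
		v := "" // a label missing from this record contributes an empty value
		if pos >= 0 {
			v = labelValues[pos]
		}
		key += a.groupBy[i] + "=" + v + ","
	}
	a.sums[key] += value
}

func main() {
	agg := newHintedAggregator([]string{"app"})

	agg.SetLabels([]string{"pod", "app"}) // first record
	agg.Add(1, []string{"a", "api"})

	agg.SetLabels([]string{"app"}) // second record, different column layout
	agg.Add(2, []string{"api"})

	fmt.Println(agg.sums)
}
```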
