diff --git a/server/src/main/resources/transport/definitions/referable/aggregation_window.csv b/server/src/main/resources/transport/definitions/referable/aggregation_window.csv new file mode 100644 index 0000000000000..0135b6d28dffd --- /dev/null +++ b/server/src/main/resources/transport/definitions/referable/aggregation_window.csv @@ -0,0 +1 @@ +9208000 diff --git a/server/src/main/resources/transport/upper_bounds/9.3.csv b/server/src/main/resources/transport/upper_bounds/9.3.csv index 8b97057346bc7..6bac5d55fcd84 100644 --- a/server/src/main/resources/transport/upper_bounds/9.3.csv +++ b/server/src/main/resources/transport/upper_bounds/9.3.csv @@ -1 +1 @@ -search_shards_resolved_index_expressions,9207000 +aggregation_window,9208000 diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java index 9909a7bc1e9ef..b6134609825a0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/analysis/Analyzer.java @@ -1080,13 +1080,19 @@ private LogicalPlan resolveFuse(Fuse fuse, List childrenOutput) { Expression aggFilter = new Literal(source, true, DataType.BOOLEAN); List aggregates = new ArrayList<>(); - aggregates.add(new Alias(source, score.name(), new Sum(source, score, aggFilter, SummationMode.COMPENSATED_LITERAL))); + aggregates.add( + new Alias( + source, + score.name(), + new Sum(source, score, aggFilter, AggregateFunction.NO_WINDOW, SummationMode.COMPENSATED_LITERAL) + ) + ); for (Attribute attr : childrenOutput) { if (attr.name().equals(score.name())) { continue; } - var valuesAgg = new Values(source, attr, aggFilter); + var valuesAgg = new Values(source, attr, aggFilter, AggregateFunction.NO_WINDOW); // Use VALUES only on supported fields. // FuseScoreEval will check that the input contains only columns with supported data types // and will fail with an appropriate error message if it doesn't. diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java index ee45267c7bbf1..9031aed5b9e52 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/EsqlFunctionRegistry.java @@ -531,11 +531,11 @@ private static FunctionDefinition[][] functions() { def(Score.class, uni(Score::new), "score") }, // time-series functions new FunctionDefinition[] { - defTS(Rate.class, Rate::new, "rate"), - defTS(Irate.class, Irate::new, "irate"), - defTS(Idelta.class, Idelta::new, "idelta"), - defTS(Delta.class, Delta::new, "delta"), - defTS(Increase.class, Increase::new, "increase"), + defTS(Rate.class, bi(Rate::new), "rate"), + defTS(Irate.class, bi(Irate::new), "irate"), + defTS(Idelta.class, bi(Idelta::new), "idelta"), + defTS(Delta.class, bi(Delta::new), "delta"), + defTS(Increase.class, bi(Increase::new), "increase"), def(MaxOverTime.class, uni(MaxOverTime::new), "max_over_time"), def(MinOverTime.class, uni(MinOverTime::new), "min_over_time"), def(SumOverTime.class, uni(SumOverTime::new), "sum_over_time"), @@ -546,8 +546,8 @@ private static FunctionDefinition[][] functions() { def(PresentOverTime.class, uni(PresentOverTime::new), "present_over_time"), def(AbsentOverTime.class, uni(AbsentOverTime::new), "absent_over_time"), def(AvgOverTime.class, uni(AvgOverTime::new), "avg_over_time"), - defTS(LastOverTime.class, LastOverTime::new, "last_over_time"), - defTS(FirstOverTime.class, FirstOverTime::new, "first_over_time"), + defTS(LastOverTime.class, bi(LastOverTime::new), "last_over_time"), + defTS(FirstOverTime.class, bi(FirstOverTime::new), "first_over_time"), def(PercentileOverTime.class, bi(PercentileOverTime::new), "percentile_over_time"), // dense vector function def(TextEmbedding.class, bi(TextEmbedding::new), "text_embedding") } }; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Absent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Absent.java index 7337d55b8f7df..febf4e3b2499b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Absent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Absent.java @@ -84,11 +84,11 @@ public Absent( description = "Expression that outputs values to be checked for absence." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Absent(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Absent(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Absent(StreamInput in) throws IOException { @@ -102,17 +102,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Absent::new, field(), filter()); + return NodeInfo.create(this, Absent::new, field(), filter(), window()); } @Override public AggregateFunction withFilter(Expression filter) { - return new Absent(source(), field(), filter); + return new Absent(source(), field(), filter, window()); } @Override public Absent replaceChildren(List newChildren) { - return new Absent(source(), newChildren.get(0), newChildren.get(1)); + return new Absent(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -138,6 +138,6 @@ protected TypeResolution resolveType() { @Override public Expression surrogate() { - return new Not(source(), new Present(source(), field(), filter())); + return new Not(source(), new Present(source(), field(), filter(), window())); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AbsentOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AbsentOverTime.java index 072d98a461855..5ecc0a9618884 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AbsentOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AbsentOverTime.java @@ -24,8 +24,6 @@ import java.io.IOException; import java.util.List; -import static java.util.Collections.emptyList; - /** * Similar to {@link Absent}, but it is used to check the absence of values over a time series in the given field. */ @@ -70,11 +68,11 @@ public AbsentOverTime( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public AbsentOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public AbsentOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, List.of()); } private AbsentOverTime(StreamInput in) throws IOException { @@ -88,17 +86,17 @@ public String getWriteableName() { @Override public AbsentOverTime withFilter(Expression filter) { - return new AbsentOverTime(source(), field(), filter); + return new AbsentOverTime(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, AbsentOverTime::new, field(), filter()); + return NodeInfo.create(this, AbsentOverTime::new, field(), filter(), window()); } @Override public AbsentOverTime replaceChildren(List newChildren) { - return new AbsentOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new AbsentOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -113,6 +111,6 @@ public DataType dataType() { @Override public Absent perTimeSeriesAggregation() { - return new Absent(source(), field(), filter()); + return new Absent(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java index 97b82444441b5..097bbaae99aa7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AggregateFunction.java @@ -6,6 +6,7 @@ */ package org.elasticsearch.xpack.esql.expression.function.aggregate; +import org.elasticsearch.TransportVersion; import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.xpack.esql.capabilities.PostAnalysisPlanVerificationAware; @@ -17,15 +18,14 @@ import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.expression.TypeResolutions; import org.elasticsearch.xpack.esql.core.expression.function.Function; -import org.elasticsearch.xpack.esql.core.tree.NodeInfo; import org.elasticsearch.xpack.esql.core.tree.Source; -import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.util.CollectionUtils; import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import org.elasticsearch.xpack.esql.plan.logical.Aggregate; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; import java.io.IOException; +import java.time.Duration; import java.util.List; import java.util.Objects; import java.util.function.BiConsumer; @@ -38,25 +38,38 @@ /** * A type of {@code Function} that takes multiple values and extracts a single value out of them. For example, {@code AVG()}. + * - Aggregate functions can have an optional filter and window, which default to {@code Literal.TRUE} and {@code NO_WINDOW}. + * - The aggregation function should be composed as: source, field, filter, window, parameters. + * Extra parameters should go to the parameters after the filter and window. */ public abstract class AggregateFunction extends Function implements PostAnalysisPlanVerificationAware { + public static final Literal NO_WINDOW = Literal.timeDuration(Source.EMPTY, Duration.ZERO); + public static final TransportVersion WINDOW_INTERVAL = TransportVersion.fromName("aggregation_window"); private final Expression field; private final List parameters; private final Expression filter; + private final Expression window; protected AggregateFunction(Source source, Expression field) { - this(source, field, Literal.TRUE, emptyList()); + this(source, field, Literal.TRUE, NO_WINDOW, emptyList()); } protected AggregateFunction(Source source, Expression field, List parameters) { - this(source, field, Literal.TRUE, parameters); + this(source, field, Literal.TRUE, NO_WINDOW, parameters); } - protected AggregateFunction(Source source, Expression field, Expression filter, List parameters) { - super(source, CollectionUtils.combine(asList(field, filter), parameters)); + protected AggregateFunction( + Source source, + Expression field, + Expression filter, + Expression window, + List parameters + ) { + super(source, CollectionUtils.combine(asList(field, filter, window), parameters)); this.field = field; this.filter = filter; + this.window = Objects.requireNonNull(window, "[window] must be specified; use NO_WINDOW instead"); this.parameters = parameters; } @@ -65,41 +78,17 @@ protected AggregateFunction(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), + readWindow(in), in.readNamedWriteableCollectionAsList(Expression.class) ); } - /** - * Read a generic AggregateFunction from the stream input. This is used for BWC when the subclass requires a generic instance; - * then convert the parameters to the specific ones. - */ - protected static AggregateFunction readGenericAggregateFunction(StreamInput in) throws IOException { - return new AggregateFunction(in) { - @Override - public AggregateFunction withFilter(Expression filter) { - throw new UnsupportedOperationException(); - } - - @Override - public DataType dataType() { - throw new UnsupportedOperationException(); - } - - @Override - public Expression replaceChildren(List newChildren) { - throw new UnsupportedOperationException(); - } - - @Override - protected NodeInfo info() { - throw new UnsupportedOperationException(); - } - - @Override - public String getWriteableName() { - throw new UnsupportedOperationException(); - } - }; + protected static Expression readWindow(StreamInput in) throws IOException { + if (in.getTransportVersion().supports(WINDOW_INTERVAL)) { + return in.readNamedWriteable(Expression.class); + } else { + return NO_WINDOW; + } } @Override @@ -107,6 +96,9 @@ public final void writeTo(StreamOutput out) throws IOException { source().writeTo(out); out.writeNamedWriteable(field); out.writeNamedWriteable(filter); + if (out.getTransportVersion().supports(WINDOW_INTERVAL)) { + out.writeNamedWriteable(window); + } out.writeNamedWriteableCollection(parameters); } @@ -144,6 +136,23 @@ public AggregateFunction withParameters(List parameters) { return (AggregateFunction) replaceChildren(CollectionUtils.combine(asList(field, filter), parameters)); } + /** + * Return the window associated with the aggregate function. + */ + public Expression window() { + return window; + } + + /** + * Whether the aggregate function has a window different than NO_WINDOW. + */ + public boolean hasWindow() { + if (window instanceof Literal lit && lit.value() instanceof Duration duration) { + return duration.isZero() == false; + } + return true; + } + /** * Returns the set of input attributes required by this aggregate function, excluding those referenced by the filter. */ @@ -168,6 +177,7 @@ public boolean equals(Object obj) { AggregateFunction other = (AggregateFunction) obj; return Objects.equals(other.field(), field()) && Objects.equals(other.filter(), filter()) + && Objects.equals(other.window(), window()) && Objects.equals(other.parameters(), parameters()); } return false; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java index 5616b0947077c..b8b2170617c44 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Avg.java @@ -21,6 +21,7 @@ import org.elasticsearch.xpack.esql.expression.function.Param; import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvAvg; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import java.io.IOException; import java.util.List; @@ -30,7 +31,7 @@ import static org.elasticsearch.xpack.esql.core.type.DataType.AGGREGATE_METRIC_DOUBLE; public class Avg extends AggregateFunction implements SurrogateExpression { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Avg", Avg::readFrom); + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Avg", Avg::new); private final Expression summationMode; @FunctionInfo( @@ -55,11 +56,11 @@ public Avg( description = "Expression that outputs values to average." ) Expression field ) { - this(source, field, Literal.TRUE, SummationMode.COMPENSATED_LITERAL); + this(source, field, Literal.TRUE, NO_WINDOW, SummationMode.COMPENSATED_LITERAL); } - public Avg(Source source, Expression field, Expression filter, Expression summationMode) { - super(source, field, filter, List.of(summationMode)); + public Avg(Source source, Expression field, Expression filter, Expression window, Expression summationMode) { + super(source, field, filter, window, List.of(summationMode)); this.summationMode = summationMode; } @@ -78,12 +79,19 @@ protected Expression.TypeResolution resolveType() { ); } - private static Avg readFrom(StreamInput in) throws IOException { - // For BWC and to ensure parameters always include the summation mode, first read a generic AggregateFunction, then convert to AVG. - var fn = readGenericAggregateFunction(in); - var parameters = fn.parameters(); - var summationMode = parameters.isEmpty() ? SummationMode.COMPENSATED_LITERAL : parameters.getFirst(); - return new Avg(fn.source(), fn.field(), fn.filter(), summationMode); + private Avg(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + readWindow(in), + readSummationMode(in) + ); + } + + private static Expression readSummationMode(StreamInput in) throws IOException { + List parameters = in.readNamedWriteableCollectionAsList(Expression.class); + return parameters.isEmpty() ? SummationMode.COMPENSATED_LITERAL : parameters.getFirst(); } @Override @@ -98,17 +106,17 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Avg::new, field(), filter(), summationMode); + return NodeInfo.create(this, Avg::new, field(), filter(), window(), summationMode); } @Override public Avg replaceChildren(List newChildren) { - return new Avg(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Avg(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Avg withFilter(Expression filter) { - return new Avg(source(), field(), filter, summationMode); + return new Avg(source(), field(), filter, window(), summationMode); } @Override @@ -116,11 +124,15 @@ public Expression surrogate() { var s = source(); var field = field(); if (field.dataType() == AGGREGATE_METRIC_DOUBLE) { - return new Div(s, new Sum(s, field, filter(), summationMode).surrogate(), new Count(s, field, filter()).surrogate()); + return new Div( + s, + new Sum(s, field, filter(), window(), summationMode).surrogate(), + new Count(s, field, filter(), window()).surrogate() + ); } if (field.foldable()) { return new MvAvg(s, field); } - return new Div(s, new Sum(s, field, filter(), summationMode), new Count(s, field, filter()), dataType()); + return new Div(s, new Sum(s, field, filter(), window(), summationMode), new Count(s, field, filter(), window()), dataType()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java index d0f68ae87d00a..9ae9466c6c7b6 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgOverTime.java @@ -20,6 +20,7 @@ import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle; import org.elasticsearch.xpack.esql.expression.function.FunctionInfo; import org.elasticsearch.xpack.esql.expression.function.FunctionType; +import org.elasticsearch.xpack.esql.expression.function.OptionalArgument; import org.elasticsearch.xpack.esql.expression.function.Param; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Div; @@ -31,7 +32,7 @@ /** * Similar to {@link Avg}, but it is used to calculate the average value over a time series of values from the given field. */ -public class AvgOverTime extends TimeSeriesAggregateFunction implements SurrogateExpression { +public class AvgOverTime extends TimeSeriesAggregateFunction implements OptionalArgument, SurrogateExpression { public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry( Expression.class, "AvgOverTime", @@ -54,11 +55,11 @@ public AvgOverTime( description = "Expression that outputs values to average." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public AvgOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public AvgOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private AvgOverTime(StreamInput in) throws IOException { @@ -82,28 +83,28 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, AvgOverTime::new, field(), filter()); + return NodeInfo.create(this, AvgOverTime::new, field(), filter(), window()); } @Override public AvgOverTime replaceChildren(List newChildren) { - return new AvgOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new AvgOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public AvgOverTime withFilter(Expression filter) { - return new AvgOverTime(source(), field(), filter); + return new AvgOverTime(source(), field(), filter, window()); } @Override public Expression surrogate() { Source s = source(); Expression f = field(); - return new Div(s, new SumOverTime(s, f, filter()), new CountOverTime(s, f, filter()), dataType()); + return new Div(s, new SumOverTime(s, f, filter(), window()), new CountOverTime(s, f, filter(), window()), dataType()); } @Override public AggregateFunction perTimeSeriesAggregation() { - return new Avg(source(), field(), filter(), SummationMode.LOSSY_LITERAL); + return new Avg(source(), field(), filter(), window(), SummationMode.LOSSY_LITERAL); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java index 5ee5f371ca0fa..80c9ccec320b7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Count.java @@ -96,11 +96,11 @@ public Count( description = "Expression that outputs values to be counted. If omitted, equivalent to `COUNT(*)` (the number of rows)." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Count(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Count(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Count(StreamInput in) throws IOException { @@ -114,17 +114,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Count::new, field(), filter()); + return NodeInfo.create(this, Count::new, field(), filter(), window()); } @Override public AggregateFunction withFilter(Expression filter) { - return new Count(source(), field(), filter); + return new Count(source(), field(), filter, window()); } @Override public Count replaceChildren(List newChildren) { - return new Count(source(), newChildren.get(0), newChildren.get(1)); + return new Count(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -161,7 +161,9 @@ public Expression surrogate() { return new Sum( s, FromAggregateMetricDouble.withMetric(source(), field, AggregateMetricDoubleBlockBuilder.Metric.COUNT), - filter() + filter(), + window(), + SummationMode.COMPENSATED_LITERAL ); } @@ -179,7 +181,7 @@ public Expression surrogate() { return new Mul( s, new Coalesce(s, new MvCount(s, field), List.of(new Literal(s, 0, DataType.INTEGER))), - new Count(s, Literal.keyword(s, StringUtils.WILDCARD), filter()) + new Count(s, Literal.keyword(s, StringUtils.WILDCARD), filter(), window()) ); } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java index 64f567e6a2d9e..136631fa68e09 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinct.java @@ -131,15 +131,15 @@ public CountDistinct( + "same effect as a threshold of 40000. The default value is 3000." ) Expression precision ) { - this(source, field, Literal.TRUE, precision); + this(source, field, Literal.TRUE, NO_WINDOW, precision); } - public CountDistinct(Source source, Expression field, Expression filter, Expression precision) { - this(source, field, filter, precision != null ? List.of(precision) : List.of()); + public CountDistinct(Source source, Expression field, Expression filter, Expression window, Expression precision) { + this(source, field, filter, window, precision != null ? List.of(precision) : List.of()); } - private CountDistinct(Source source, Expression field, Expression filter, List params) { - super(source, field, filter, params); + private CountDistinct(Source source, Expression field, Expression filter, Expression window, List params) { + super(source, field, filter, window, params); this.precision = params.size() > 0 ? params.get(0) : null; } @@ -148,6 +148,7 @@ private CountDistinct(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), + readWindow(in), in.readNamedWriteableCollectionAsList(Expression.class) ); } @@ -159,17 +160,18 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, CountDistinct::new, field(), filter(), precision); + return NodeInfo.create(this, CountDistinct::new, field(), filter(), window(), precision); } @Override public CountDistinct replaceChildren(List newChildren) { - return new CountDistinct(source(), newChildren.get(0), newChildren.get(1), newChildren.size() > 2 ? newChildren.get(2) : null); + Expression precision = newChildren.size() > 3 ? newChildren.get(3) : null; + return new CountDistinct(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), precision); } @Override public CountDistinct withFilter(Expression filter) { - return new CountDistinct(source(), field(), filter, precision); + return new CountDistinct(source(), field(), filter, window(), precision); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java index 2396e4e69bad8..3dda46b5ac538 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountDistinctOverTime.java @@ -60,11 +60,11 @@ public CountDistinctOverTime( + "same effect as a threshold of 40000. The default value is 3000." ) Expression precision ) { - this(source, field, Literal.TRUE, precision); + this(source, field, Literal.TRUE, NO_WINDOW, precision); } - public CountDistinctOverTime(Source source, Expression field, Expression filter, Expression precision) { - super(source, field, filter, precision == null ? List.of() : List.of(precision)); + public CountDistinctOverTime(Source source, Expression field, Expression filter, Expression window, Expression precision) { + super(source, field, filter, window, precision == null ? List.of() : List.of(precision)); this.precision = precision; } @@ -80,20 +80,18 @@ public String getWriteableName() { @Override public CountDistinctOverTime withFilter(Expression filter) { - return new CountDistinctOverTime(source(), field(), filter, precision); + return new CountDistinctOverTime(source(), field(), filter, window(), precision); } @Override protected NodeInfo info() { - return NodeInfo.create(this, CountDistinctOverTime::new, field(), filter(), precision); + return NodeInfo.create(this, CountDistinctOverTime::new, field(), filter(), window(), precision); } @Override public CountDistinctOverTime replaceChildren(List newChildren) { - if (newChildren.size() < 3) { - return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), null); - } - return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + Expression precision = newChildren.size() > 3 ? newChildren.get(3) : null; + return new CountDistinctOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), precision); } @Override @@ -108,6 +106,6 @@ public DataType dataType() { @Override public CountDistinct perTimeSeriesAggregation() { - return new CountDistinct(source(), field(), filter(), precision); + return new CountDistinct(source(), field(), filter(), window(), precision); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java index 0d75c1f082f74..b285d372a83a8 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/CountOverTime.java @@ -70,11 +70,11 @@ public CountOverTime( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public CountOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public CountOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private CountOverTime(StreamInput in) throws IOException { @@ -88,17 +88,17 @@ public String getWriteableName() { @Override public CountOverTime withFilter(Expression filter) { - return new CountOverTime(source(), field(), filter); + return new CountOverTime(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, CountOverTime::new, field(), filter()); + return NodeInfo.create(this, CountOverTime::new, field(), filter(), window()); } @Override public CountOverTime replaceChildren(List newChildren) { - return new CountOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new CountOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -113,6 +113,6 @@ public DataType dataType() { @Override public Count perTimeSeriesAggregation() { - return new Count(source(), field(), filter()); + return new Count(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Delta.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Delta.java index ae0d44ae61a94..19636e5845a04 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Delta.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Delta.java @@ -51,16 +51,11 @@ public class Delta extends TimeSeriesAggregateFunction implements OptionalArgume examples = { @Example(file = "k8s-timeseries-delta", tag = "delta") } ) public Delta(Source source, @Param(name = "field", type = { "long", "integer", "double" }) Expression field, Expression timestamp) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private Delta(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private Delta(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + private Delta(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -69,7 +64,8 @@ public Delta(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -85,16 +81,12 @@ protected NodeInfo info() { @Override public Delta replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new Delta(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Delta(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Delta withFilter(Expression filter) { - return new Delta(source(), field(), filter, timestamp); + return new Delta(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/DimensionValues.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/DimensionValues.java index f92ee87f3964c..7ff575a2f2d87 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/DimensionValues.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/DimensionValues.java @@ -50,7 +50,7 @@ public class DimensionValues extends AggregateFunction implements ToAggregator { public static final TransportVersion DIMENSION_VALUES_VERSION = TransportVersion.fromName("dimension_values"); public DimensionValues(Source source, Expression field) { - super(source, field, Literal.TRUE, emptyList()); + super(source, field, Literal.TRUE, NO_WINDOW, emptyList()); } private DimensionValues(StreamInput in) throws IOException { @@ -87,7 +87,7 @@ public DataType dataType() { @Override protected TypeResolution resolveType() { - return new Values(source(), field(), filter()).resolveType(); + return new Values(source(), field(), filter(), window()).resolveType(); } @Override @@ -96,6 +96,6 @@ public AggregatorFunctionSupplier supplier() { if (supplier != null) { return supplier.get(); } - return new Values(source(), field(), filter()).supplier(); + return new Values(source(), field(), filter(), window()).supplier(); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java index 83522080f13ad..281c817931fb4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/First.java @@ -59,11 +59,11 @@ public First( ) Expression field, @Param(name = "sort", type = { "date", "date_nanos" }, description = "Sort key") Expression sort ) { - this(source, field, Literal.TRUE, sort); + this(source, field, Literal.TRUE, NO_WINDOW, sort); } - private First(Source source, Expression field, Expression filter, Expression sort) { - super(source, field, filter, List.of(sort)); + private First(Source source, Expression field, Expression filter, Expression window, Expression sort) { + super(source, field, filter, window, List.of(sort)); this.sort = sort; } @@ -71,9 +71,10 @@ private static First readFrom(StreamInput in) throws IOException { Source source = Source.readFrom((PlanStreamInput) in); Expression field = in.readNamedWriteable(Expression.class); Expression filter = in.readNamedWriteable(Expression.class); + Expression window = readWindow(in); List params = in.readNamedWriteableCollectionAsList(Expression.class); Expression sort = params.getFirst(); - return new First(source, field, filter, sort); + return new First(source, field, filter, window, sort); } @Override @@ -88,12 +89,12 @@ protected NodeInfo info() { @Override public First replaceChildren(List newChildren) { - return new First(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new First(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public First withFilter(Expression filter) { - return new First(source(), field(), filter, sort); + return new First(source(), field(), filter, window(), sort); } public Expression sort() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstDocId.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstDocId.java index 83ed55af49c3c..b6580c33c409a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstDocId.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstDocId.java @@ -29,11 +29,11 @@ public class FirstDocId extends AggregateFunction implements ToAggregator { public FirstDocId(Source source, Expression v) { - this(source, v, Literal.TRUE); + this(source, v, Literal.TRUE, NO_WINDOW); } - public FirstDocId(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public FirstDocId(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } @Override @@ -43,17 +43,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, FirstDocId::new, field(), filter()); + return NodeInfo.create(this, FirstDocId::new, field(), filter(), window()); } @Override public FirstDocId replaceChildren(List newChildren) { - return new FirstDocId(source(), newChildren.get(0), newChildren.get(1)); + return new FirstDocId(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public FirstDocId withFilter(Expression filter) { - return new FirstDocId(source(), field(), filter); + return new FirstDocId(source(), field(), filter, window()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java index 5cbe7d699e18f..67ab4de0dc17a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTime.java @@ -64,16 +64,11 @@ public FirstOverTime( ) Expression field, Expression timestamp ) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private FirstOverTime(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private FirstOverTime(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + public FirstOverTime(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -82,7 +77,8 @@ public FirstOverTime(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -93,21 +89,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, FirstOverTime::new, field(), timestamp); + return NodeInfo.create(this, FirstOverTime::new, field(), filter(), window(), timestamp); } @Override public FirstOverTime replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new FirstOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new FirstOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public FirstOverTime withFilter(Expression filter) { - return new FirstOverTime(source(), field(), filter, timestamp); + return new FirstOverTime(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Idelta.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Idelta.java index b9a4ee5ac5ca4..c70a1b4487b76 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Idelta.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Idelta.java @@ -53,16 +53,11 @@ public class Idelta extends TimeSeriesAggregateFunction implements OptionalArgum examples = { @Example(file = "k8s-timeseries-idelta", tag = "idelta") } ) public Idelta(Source source, @Param(name = "field", type = { "long", "integer", "double" }) Expression field, Expression timestamp) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private Idelta(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private Idelta(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + public Idelta(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -71,7 +66,8 @@ public Idelta(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -82,21 +78,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Idelta::new, field(), timestamp); + return NodeInfo.create(this, Idelta::new, field(), filter(), window(), timestamp); } @Override public Idelta replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new Idelta(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Idelta(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Idelta withFilter(Expression filter) { - return new Idelta(source(), field(), filter, timestamp); + return new Idelta(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java index 549858cb72829..543ecf97327d7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Increase.java @@ -60,16 +60,11 @@ public Increase( @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field, Expression timestamp ) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private Increase(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private Increase(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + private Increase(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -78,7 +73,8 @@ public Increase(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -94,16 +90,12 @@ protected NodeInfo info() { @Override public Increase replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new Increase(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Increase(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Increase withFilter(Expression filter) { - return new Increase(source(), field(), filter, timestamp); + return new Increase(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java index a8b6dfb4ffa13..74bc8082beb9d 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Irate.java @@ -56,16 +56,11 @@ public Irate( @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field, Expression timestamp ) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private Irate(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private Irate(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + private Irate(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -74,7 +69,8 @@ public Irate(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -90,16 +86,12 @@ protected NodeInfo info() { @Override public Irate replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new Irate(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Irate(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Irate withFilter(Expression filter) { - return new Irate(source(), field(), filter, timestamp); + return new Irate(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java index d042c7528dcdd..1315a4b80f35a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Last.java @@ -59,11 +59,11 @@ public Last( ) Expression field, @Param(name = "sort", type = { "date", "date_nanos" }, description = "Sort key") Expression sort ) { - this(source, field, Literal.TRUE, sort); + this(source, field, Literal.TRUE, NO_WINDOW, sort); } - private Last(Source source, Expression field, Expression filter, Expression sort) { - super(source, field, filter, List.of(sort)); + private Last(Source source, Expression field, Expression filter, Expression window, Expression sort) { + super(source, field, filter, window, List.of(sort)); this.sort = sort; } @@ -71,9 +71,10 @@ private static Last readFrom(StreamInput in) throws IOException { Source source = Source.readFrom((PlanStreamInput) in); Expression field = in.readNamedWriteable(Expression.class); Expression filter = in.readNamedWriteable(Expression.class); + Expression window = readWindow(in); List params = in.readNamedWriteableCollectionAsList(Expression.class); Expression sort = params.getFirst(); - return new Last(source, field, filter, sort); + return new Last(source, field, filter, window, sort); } @Override @@ -88,12 +89,12 @@ protected NodeInfo info() { @Override public Last replaceChildren(List newChildren) { - return new Last(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Last(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Last withFilter(Expression filter) { - return new Last(source(), field(), filter, sort); + return new Last(source(), field(), filter, window(), sort); } public Expression sort() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java index 87727bf08e1f5..4469a44242ecd 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/LastOverTime.java @@ -65,16 +65,11 @@ public LastOverTime( ) Expression field, Expression timestamp ) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private LastOverTime(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private LastOverTime(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + public LastOverTime(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -83,7 +78,8 @@ public LastOverTime(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -94,21 +90,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, LastOverTime::new, field(), timestamp); + return NodeInfo.create(this, LastOverTime::new, field(), filter(), window(), timestamp); } @Override public LastOverTime replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new LastOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new LastOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public LastOverTime withFilter(Expression filter) { - return new LastOverTime(source(), field(), filter, timestamp); + return new LastOverTime(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java index 932ee08bdbed6..46a76eb241d5c 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Max.java @@ -91,11 +91,11 @@ public Max( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Max(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Max(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Max(StreamInput in) throws IOException { @@ -109,17 +109,17 @@ public String getWriteableName() { @Override public Max withFilter(Expression filter) { - return new Max(source(), field(), filter); + return new Max(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, Max::new, field(), filter()); + return NodeInfo.create(this, Max::new, field(), filter(), window()); } @Override public Max replaceChildren(List newChildren) { - return new Max(source(), newChildren.get(0), newChildren.get(1)); + return new Max(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -163,7 +163,8 @@ public Expression surrogate() { return new Max( source(), FromAggregateMetricDouble.withMetric(source(), field(), AggregateMetricDoubleBlockBuilder.Metric.MAX), - filter() + filter(), + window() ); } return field().foldable() ? new MvMax(source(), field()) : null; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxOverTime.java index 2bc011eecfc36..fe875eb0fd7db 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MaxOverTime.java @@ -63,11 +63,11 @@ public MaxOverTime( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public MaxOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public MaxOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private MaxOverTime(StreamInput in) throws IOException { @@ -81,17 +81,17 @@ public String getWriteableName() { @Override public MaxOverTime withFilter(Expression filter) { - return new MaxOverTime(source(), field(), filter); + return new MaxOverTime(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, MaxOverTime::new, field(), filter()); + return NodeInfo.create(this, MaxOverTime::new, field(), filter(), window()); } @Override public MaxOverTime replaceChildren(List newChildren) { - return new MaxOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new MaxOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -106,6 +106,6 @@ public DataType dataType() { @Override public Max perTimeSeriesAggregation() { - return new Max(source(), field(), filter()); + return new Max(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java index cd8b244a3d81e..2dc0af5eac27a 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Median.java @@ -63,11 +63,11 @@ public Median( description = "Expression that outputs values to calculate the median of." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Median(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Median(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } @Override @@ -97,17 +97,17 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Median::new, field(), filter()); + return NodeInfo.create(this, Median::new, field(), filter(), window()); } @Override public Median replaceChildren(List newChildren) { - return new Median(source(), newChildren.get(0), newChildren.get(1)); + return new Median(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public AggregateFunction withFilter(Expression filter) { - return new Median(source(), field(), filter); + return new Median(source(), field(), filter, window()); } @Override @@ -117,6 +117,6 @@ public Expression surrogate() { return field.foldable() ? new MvMedian(s, new ToDouble(s, field)) - : new Percentile(source(), field(), filter(), new Literal(source(), (int) QuantileStates.MEDIAN, DataType.INTEGER)); + : new Percentile(source(), field(), filter(), window(), new Literal(source(), (int) QuantileStates.MEDIAN, DataType.INTEGER)); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MedianAbsoluteDeviation.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MedianAbsoluteDeviation.java index f25d95a7df8c8..e5e69fa182c64 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MedianAbsoluteDeviation.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MedianAbsoluteDeviation.java @@ -67,11 +67,11 @@ public class MedianAbsoluteDeviation extends NumericAggregate implements Surroga ), } ) public MedianAbsoluteDeviation(Source source, @Param(name = "number", type = { "double", "integer", "long" }) Expression field) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public MedianAbsoluteDeviation(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public MedianAbsoluteDeviation(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private MedianAbsoluteDeviation(StreamInput in) throws IOException { @@ -85,17 +85,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, MedianAbsoluteDeviation::new, field(), filter()); + return NodeInfo.create(this, MedianAbsoluteDeviation::new, field(), filter(), window()); } @Override public MedianAbsoluteDeviation replaceChildren(List newChildren) { - return new MedianAbsoluteDeviation(source(), newChildren.get(0), newChildren.get(1)); + return new MedianAbsoluteDeviation(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public MedianAbsoluteDeviation withFilter(Expression filter) { - return new MedianAbsoluteDeviation(source(), field(), filter); + return new MedianAbsoluteDeviation(source(), field(), filter, window()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Min.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Min.java index 7ea6241f896c0..c50b437867f2f 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Min.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Min.java @@ -91,11 +91,11 @@ public Min( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Min(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Min(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Min(StreamInput in) throws IOException { @@ -109,17 +109,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Min::new, field(), filter()); + return NodeInfo.create(this, Min::new, field(), filter(), window()); } @Override public Min replaceChildren(List newChildren) { - return new Min(source(), newChildren.get(0), newChildren.get(1)); + return new Min(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public Min withFilter(Expression filter) { - return new Min(source(), field(), filter); + return new Min(source(), field(), filter, window()); } @Override @@ -163,7 +163,8 @@ public Expression surrogate() { return new Min( source(), FromAggregateMetricDouble.withMetric(source(), field(), AggregateMetricDoubleBlockBuilder.Metric.MIN), - filter() + filter(), + window() ); } return field().foldable() ? new MvMin(source(), field()) : null; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinOverTime.java index 3258688cb1844..b94ff220eeac4 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/MinOverTime.java @@ -63,11 +63,11 @@ public MinOverTime( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public MinOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public MinOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private MinOverTime(StreamInput in) throws IOException { @@ -81,17 +81,17 @@ public String getWriteableName() { @Override public MinOverTime withFilter(Expression filter) { - return new MinOverTime(source(), field(), filter); + return new MinOverTime(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, MinOverTime::new, field(), filter()); + return NodeInfo.create(this, MinOverTime::new, field(), filter(), window()); } @Override public MinOverTime replaceChildren(List newChildren) { - return new MinOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new MinOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -106,6 +106,6 @@ public DataType dataType() { @Override public Min perTimeSeriesAggregation() { - return new Min(source(), field(), filter()); + return new Min(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java index 3289e1aded4ea..c57ab0dcf54da 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/NumericAggregate.java @@ -45,16 +45,8 @@ */ public abstract class NumericAggregate extends AggregateFunction implements ToAggregator { - NumericAggregate(Source source, Expression field, List parameters) { - super(source, field, parameters); - } - - NumericAggregate(Source source, Expression field, Expression filter, List parameters) { - super(source, field, filter, parameters); - } - - NumericAggregate(Source source, Expression field) { - super(source, field); + NumericAggregate(Source source, Expression field, Expression filter, Expression window, List parameters) { + super(source, field, filter, window, parameters); } NumericAggregate(StreamInput in) throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java index e3418aa17c3a3..23b55c429c009 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Percentile.java @@ -77,11 +77,11 @@ public Percentile( @Param(name = "number", type = { "double", "integer", "long" }) Expression field, @Param(name = "percentile", type = { "double", "integer", "long" }) Expression percentile ) { - this(source, field, Literal.TRUE, percentile); + this(source, field, Literal.TRUE, NO_WINDOW, percentile); } - public Percentile(Source source, Expression field, Expression filter, Expression percentile) { - super(source, field, filter, singletonList(percentile)); + public Percentile(Source source, Expression field, Expression filter, Expression window, Expression percentile) { + super(source, field, filter, window, singletonList(percentile)); this.percentile = percentile; } @@ -90,7 +90,8 @@ private Percentile(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class).get(0) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -101,17 +102,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Percentile::new, field(), filter(), percentile); + return NodeInfo.create(this, Percentile::new, field(), filter(), window(), percentile); } @Override public Percentile replaceChildren(List newChildren) { - return new Percentile(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Percentile(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Percentile withFilter(Expression filter) { - return new Percentile(source(), field(), filter, percentile); + return new Percentile(source(), field(), filter, window(), percentile); } public Expression percentile() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileOverTime.java index 155296fc0edbd..e2148fee019c7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PercentileOverTime.java @@ -7,7 +7,6 @@ package org.elasticsearch.xpack.esql.expression.function.aggregate; -import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.xpack.esql.core.expression.Expression; import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.NodeInfo; @@ -20,7 +19,6 @@ import org.elasticsearch.xpack.esql.expression.function.FunctionType; import org.elasticsearch.xpack.esql.expression.function.Param; -import java.io.IOException; import java.util.List; /** @@ -44,15 +42,11 @@ public PercentileOverTime( ) Expression field, @Param(name = "percentile", type = { "double", "integer", "long" }) Expression percentile ) { - this(source, field, Literal.TRUE, percentile); + this(source, field, Literal.TRUE, NO_WINDOW, percentile); } - public PercentileOverTime(Source source, Expression field, Expression filter, Expression percentile) { - super(source, field, filter, List.of(percentile)); - } - - private PercentileOverTime(StreamInput in) throws IOException { - super(in); + public PercentileOverTime(Source source, Expression field, Expression filter, Expression window, Expression percentile) { + super(source, field, filter, window, List.of(percentile)); } @Override @@ -70,22 +64,22 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, PercentileOverTime::new, field(), filter(), children().get(2)); + return NodeInfo.create(this, PercentileOverTime::new, field(), filter(), window(), children().get(3)); } @Override public PercentileOverTime replaceChildren(List newChildren) { - return new PercentileOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new PercentileOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public PercentileOverTime withFilter(Expression filter) { - return new PercentileOverTime(source(), field(), filter, children().get(2)); + return new PercentileOverTime(source(), field(), filter, window(), children().get(3)); } @Override public AggregateFunction perTimeSeriesAggregation() { - return new Percentile(source(), field(), filter(), children().get(2)); + return new Percentile(source(), field(), filter(), window(), children().get(3)); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Present.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Present.java index c65601211b544..f3fe8dd8786a2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Present.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Present.java @@ -85,11 +85,11 @@ public Present( description = "Expression that outputs values to be checked for presence." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Present(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Present(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Present(StreamInput in) throws IOException { @@ -103,17 +103,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Present::new, field(), filter()); + return NodeInfo.create(this, Present::new, field(), filter(), window()); } @Override public AggregateFunction withFilter(Expression filter) { - return new Present(source(), field(), filter); + return new Present(source(), field(), filter, window()); } @Override public Present replaceChildren(List newChildren) { - return new Present(source(), newChildren.get(0), newChildren.get(1)); + return new Present(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PresentOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PresentOverTime.java index 805a4b09d6da5..e922efca42602 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PresentOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/PresentOverTime.java @@ -70,11 +70,11 @@ public PresentOverTime( "version" } ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public PresentOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public PresentOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private PresentOverTime(StreamInput in) throws IOException { @@ -88,17 +88,17 @@ public String getWriteableName() { @Override public PresentOverTime withFilter(Expression filter) { - return new PresentOverTime(source(), field(), filter); + return new PresentOverTime(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, PresentOverTime::new, field(), filter()); + return NodeInfo.create(this, PresentOverTime::new, field(), filter(), window()); } @Override public PresentOverTime replaceChildren(List newChildren) { - return new PresentOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new PresentOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -113,6 +113,6 @@ public DataType dataType() { @Override public Present perTimeSeriesAggregation() { - return new Present(source(), field(), filter()); + return new Present(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java index 59ccbfefc329d..48880199f188b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Rate.java @@ -60,16 +60,11 @@ public Rate( @Param(name = "field", type = { "counter_long", "counter_integer", "counter_double" }) Expression field, Expression timestamp ) { - this(source, field, Literal.TRUE, timestamp); + this(source, field, Literal.TRUE, NO_WINDOW, timestamp); } - // compatibility constructor used when reading from the stream - private Rate(Source source, Expression field, Expression filter, List children) { - this(source, field, filter, children.getFirst()); - } - - private Rate(Source source, Expression field, Expression filter, Expression timestamp) { - super(source, field, filter, List.of(timestamp)); + public Rate(Source source, Expression field, Expression filter, Expression window, Expression timestamp) { + super(source, field, filter, window, List.of(timestamp)); this.timestamp = timestamp; } @@ -78,7 +73,8 @@ public Rate(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), - in.readNamedWriteableCollectionAsList(Expression.class) + readWindow(in), + in.readNamedWriteableCollectionAsList(Expression.class).getFirst() ); } @@ -89,21 +85,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Rate::new, field(), timestamp); + return NodeInfo.create(this, Rate::new, field(), filter(), window(), timestamp); } @Override public Rate replaceChildren(List newChildren) { - if (newChildren.size() != 3) { - assert false : "expected 3 children for field, filter, @timestamp; got " + newChildren; - throw new IllegalArgumentException("expected 3 children for field, filter, @timestamp; got " + newChildren); - } - return new Rate(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Rate(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Rate withFilter(Expression filter) { - return new Rate(source(), field(), filter, timestamp); + return new Rate(source(), field(), filter, window(), timestamp); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sample.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sample.java index 4f7ae2f3a2a34..5067c90d62441 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sample.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sample.java @@ -101,11 +101,7 @@ public Sample( ) Expression field, @Param(name = "limit", type = { "integer" }, description = "The maximum number of values to collect.") Expression limit ) { - this(source, field, Literal.TRUE, limit); - } - - public Sample(Source source, Expression field, Expression filter, Expression limit) { - this(source, field, filter, limit, new Literal(Source.EMPTY, Randomness.get().nextLong(), DataType.LONG)); + this(source, field, Literal.TRUE, NO_WINDOW, limit, new Literal(Source.EMPTY, Randomness.get().nextLong(), DataType.LONG)); } /** @@ -113,8 +109,8 @@ public Sample(Source source, Expression field, Expression filter, Expression lim * samples of size N. The uuid is used to ensure that the optimizer does not optimize both * expressions to one, resulting in identical samples. */ - public Sample(Source source, Expression field, Expression filter, Expression limit, Expression uuid) { - super(source, field, filter, List.of(limit, uuid)); + public Sample(Source source, Expression field, Expression filter, Expression window, Expression limit, Expression uuid) { + super(source, field, filter, window, List.of(limit, uuid)); } private Sample(StreamInput in) throws IOException { @@ -151,12 +147,12 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Sample::new, field(), filter(), limitField(), uuid()); + return NodeInfo.create(this, Sample::new, field(), filter(), window(), limitField(), uuid()); } @Override public Sample replaceChildren(List newChildren) { - return new Sample(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); + return new Sample(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3), newChildren.get(4)); } @Override @@ -173,7 +169,7 @@ public AggregatorFunctionSupplier supplier() { @Override public Sample withFilter(Expression filter) { - return new Sample(source(), field(), filter, limitField(), uuid()); + return new Sample(source(), field(), filter, window(), limitField(), uuid()); } Expression limitField() { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java index 248c151bcf948..3fe02f2df739b 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialAggregateFunction.java @@ -28,8 +28,14 @@ public abstract class SpatialAggregateFunction extends AggregateFunction implements LicenseAware { protected final FieldExtractPreference fieldExtractPreference; - protected SpatialAggregateFunction(Source source, Expression field, Expression filter, FieldExtractPreference fieldExtractPreference) { - super(source, field, filter, emptyList()); + protected SpatialAggregateFunction( + Source source, + Expression field, + Expression filter, + Expression window, + FieldExtractPreference fieldExtractPreference + ) { + super(source, field, filter, window, emptyList()); this.fieldExtractPreference = fieldExtractPreference; } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialCentroid.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialCentroid.java index ac7475f9ca98a..be297b1898452 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialCentroid.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialCentroid.java @@ -54,11 +54,11 @@ public class SpatialCentroid extends SpatialAggregateFunction implements ToAggre examples = @Example(file = "spatial", tag = "st_centroid_agg-airports") ) public SpatialCentroid(Source source, @Param(name = "field", type = { "geo_point", "cartesian_point" }) Expression field) { - this(source, field, Literal.TRUE, NONE); + this(source, field, Literal.TRUE, NO_WINDOW, NONE); } - private SpatialCentroid(Source source, Expression field, Expression filter, FieldExtractPreference preference) { - super(source, field, filter, preference); + private SpatialCentroid(Source source, Expression field, Expression filter, Expression window, FieldExtractPreference preference) { + super(source, field, filter, window, preference); } private SpatialCentroid(StreamInput in) throws IOException { @@ -72,12 +72,12 @@ public String getWriteableName() { @Override public SpatialCentroid withFilter(Expression filter) { - return new SpatialCentroid(source(), field(), filter, fieldExtractPreference); + return new SpatialCentroid(source(), field(), filter, window(), fieldExtractPreference); } @Override public SpatialCentroid withFieldExtractPreference(FieldExtractPreference preference) { - return new SpatialCentroid(source(), field(), filter(), preference); + return new SpatialCentroid(source(), field(), filter(), window(), preference); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialExtent.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialExtent.java index 9fe0d6a9b5474..53dcbef285984 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialExtent.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SpatialExtent.java @@ -60,11 +60,11 @@ public SpatialExtent( Source source, @Param(name = "field", type = { "geo_point", "cartesian_point", "geo_shape", "cartesian_shape" }) Expression field ) { - this(source, field, Literal.TRUE, FieldExtractPreference.NONE); + this(source, field, Literal.TRUE, NO_WINDOW, FieldExtractPreference.NONE); } - private SpatialExtent(Source source, Expression field, Expression filter, FieldExtractPreference preference) { - super(source, field, filter, preference); + private SpatialExtent(Source source, Expression field, Expression filter, Expression window, FieldExtractPreference preference) { + super(source, field, filter, window, preference); } private SpatialExtent(StreamInput in) throws IOException { @@ -78,12 +78,12 @@ public String getWriteableName() { @Override public SpatialExtent withFilter(Expression filter) { - return new SpatialExtent(source(), field(), filter, fieldExtractPreference); + return new SpatialExtent(source(), field(), filter, window(), fieldExtractPreference); } @Override public SpatialExtent withFieldExtractPreference(FieldExtractPreference preference) { - return new SpatialExtent(source(), field(), filter(), preference); + return new SpatialExtent(source(), field(), filter(), window(), preference); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDev.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDev.java index af4428199e322..f2883fbe33894 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDev.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDev.java @@ -50,11 +50,11 @@ public class StdDev extends AggregateFunction implements ToAggregator { ) } ) public StdDev(Source source, @Param(name = "number", type = { "double", "integer", "long" }) Expression field) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public StdDev(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public StdDev(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private StdDev(StreamInput in) throws IOException { @@ -84,16 +84,16 @@ protected Expression.TypeResolution resolveType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, StdDev::new, field(), filter()); + return NodeInfo.create(this, StdDev::new, field(), filter(), window()); } @Override public StdDev replaceChildren(List newChildren) { - return new StdDev(source(), newChildren.get(0), newChildren.get(1)); + return new StdDev(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } public StdDev withFilter(Expression filter) { - return new StdDev(source(), field(), filter); + return new StdDev(source(), field(), filter, window()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTime.java index c6ee0f3df5ad3..bc49cdb8948eb 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTime.java @@ -43,11 +43,11 @@ public StdDevOverTime( description = "Expression that outputs values to calculate the standard deviation of." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public StdDevOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public StdDevOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } @Override @@ -67,21 +67,21 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, StdDevOverTime::new, field(), filter()); + return NodeInfo.create(this, StdDevOverTime::new, field(), filter(), window()); } @Override public StdDevOverTime replaceChildren(List newChildren) { - return new StdDevOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new StdDevOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public StdDevOverTime withFilter(Expression filter) { - return new StdDevOverTime(source(), field(), filter); + return new StdDevOverTime(source(), field(), filter, window()); } @Override public AggregateFunction perTimeSeriesAggregation() { - return new StdDev(source(), field(), filter()); + return new StdDev(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java index bf78b0c5f8fdf..03a8978605d09 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Sum.java @@ -29,6 +29,7 @@ import org.elasticsearch.xpack.esql.expression.function.scalar.convert.FromAggregateMetricDouble; import org.elasticsearch.xpack.esql.expression.function.scalar.multivalue.MvSum; import org.elasticsearch.xpack.esql.expression.predicate.operator.arithmetic.Mul; +import org.elasticsearch.xpack.esql.io.stream.PlanStreamInput; import java.io.IOException; import java.util.List; @@ -44,7 +45,7 @@ * Sum all values of a field in matching documents. */ public class Sum extends NumericAggregate implements SurrogateExpression { - public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Sum", Sum::readFrom); + public static final NamedWriteableRegistry.Entry ENTRY = new NamedWriteableRegistry.Entry(Expression.class, "Sum", Sum::new); private final Expression summationMode; @@ -63,28 +64,27 @@ public class Sum extends NumericAggregate implements SurrogateExpression { ) } ) public Sum(Source source, @Param(name = "number", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field) { - this(source, field, Literal.TRUE, SummationMode.COMPENSATED_LITERAL); + this(source, field, Literal.TRUE, NO_WINDOW, SummationMode.COMPENSATED_LITERAL); } - public Sum( - Source source, - @Param(name = "number", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field, - Expression filter - ) { - this(source, field, filter, SummationMode.COMPENSATED_LITERAL); + public Sum(Source source, Expression field, Expression filter, Expression window, Expression summationMode) { + super(source, field, filter, window, List.of(summationMode)); + this.summationMode = summationMode; } - public Sum(Source source, Expression field, Expression filter, Expression summationMode) { - super(source, field, filter, List.of(summationMode)); - this.summationMode = summationMode; + private Sum(StreamInput in) throws IOException { + this( + Source.readFrom((PlanStreamInput) in), + in.readNamedWriteable(Expression.class), + in.readNamedWriteable(Expression.class), + readWindow(in), + readSummationMode(in) + ); } - private static Sum readFrom(StreamInput in) throws IOException { - // For BWC and to ensure parameters always include the summation mode, first read a generic AggregateFunction, then convert to SUM. - var fn = readGenericAggregateFunction(in); - var parameters = fn.parameters(); - var summationMode = parameters.isEmpty() ? SummationMode.COMPENSATED_LITERAL : parameters.getFirst(); - return new Sum(fn.source(), fn.field(), fn.filter(), summationMode); + private static Expression readSummationMode(StreamInput in) throws IOException { + List parameters = in.readNamedWriteableCollectionAsList(Expression.class); + return parameters.isEmpty() ? SummationMode.COMPENSATED_LITERAL : parameters.getFirst(); } @Override @@ -94,17 +94,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Sum::new, field(), filter(), summationMode); + return NodeInfo.create(this, Sum::new, field(), filter(), window(), summationMode); } @Override public Sum replaceChildren(List newChildren) { - return new Sum(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new Sum(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public Sum withFilter(Expression filter) { - return new Sum(source(), field(), filter, summationMode); + return new Sum(source(), field(), filter, window(), summationMode); } @Override @@ -166,11 +166,14 @@ public Expression surrogate() { s, FromAggregateMetricDouble.withMetric(source(), field, AggregateMetricDoubleBlockBuilder.Metric.SUM), filter(), + window(), summationMode ); } // SUM(const) is equivalent to MV_SUM(const)*COUNT(*). - return field.foldable() ? new Mul(s, new MvSum(s, field), new Count(s, Literal.keyword(s, StringUtils.WILDCARD), filter())) : null; + return field.foldable() + ? new Mul(s, new MvSum(s, field), new Count(s, Literal.keyword(s, StringUtils.WILDCARD), filter(), window())) + : null; } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTime.java index c8968a63b38dd..cc9509082d1c0 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumOverTime.java @@ -48,11 +48,11 @@ public SumOverTime( Source source, @Param(name = "field", type = { "aggregate_metric_double", "double", "integer", "long" }) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public SumOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public SumOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private SumOverTime(StreamInput in) throws IOException { @@ -66,17 +66,17 @@ public String getWriteableName() { @Override public SumOverTime withFilter(Expression filter) { - return new SumOverTime(source(), field(), filter); + return new SumOverTime(source(), field(), filter, window()); } @Override protected NodeInfo info() { - return NodeInfo.create(this, SumOverTime::new, field(), filter()); + return NodeInfo.create(this, SumOverTime::new, field(), filter(), window()); } @Override public SumOverTime replaceChildren(List newChildren) { - return new SumOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new SumOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override @@ -91,6 +91,6 @@ public DataType dataType() { @Override public Sum perTimeSeriesAggregation() { - return new Sum(source(), field(), filter(), SummationMode.LOSSY_LITERAL); + return new Sum(source(), field(), filter(), window(), SummationMode.LOSSY_LITERAL); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/TimeSeriesAggregateFunction.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/TimeSeriesAggregateFunction.java index 175c867e34714..7f28e5a447c99 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/TimeSeriesAggregateFunction.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/TimeSeriesAggregateFunction.java @@ -26,8 +26,14 @@ */ public abstract class TimeSeriesAggregateFunction extends AggregateFunction { - protected TimeSeriesAggregateFunction(Source source, Expression field, Expression filter, List parameters) { - super(source, field, filter, parameters); + protected TimeSeriesAggregateFunction( + Source source, + Expression field, + Expression filter, + Expression window, + List parameters + ) { + super(source, field, filter, window, parameters); } protected TimeSeriesAggregateFunction(StreamInput in) throws IOException { diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Top.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Top.java index 6a12f63ada117..6c32932c7cd19 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Top.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Top.java @@ -83,11 +83,11 @@ public Top( description = "The order to calculate the top values. Either `asc` or `desc`, and defaults to `asc` if omitted." ) Expression order ) { - this(source, field, Literal.TRUE, limit, order == null ? Literal.keyword(source, ORDER_ASC) : order); + this(source, field, Literal.TRUE, NO_WINDOW, limit, order == null ? Literal.keyword(source, ORDER_ASC) : order); } - public Top(Source source, Expression field, Expression filter, Expression limit, Expression order) { - super(source, field, filter, asList(limit, order)); + public Top(Source source, Expression field, Expression filter, Expression window, Expression limit, Expression order) { + super(source, field, filter, window, asList(limit, order)); } private Top(StreamInput in) throws IOException { @@ -95,13 +95,14 @@ private Top(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), + readWindow(in), in.readNamedWriteableCollectionAsList(Expression.class) ); } @Override public Top withFilter(Expression filter) { - return new Top(source(), field(), filter, limitField(), orderField()); + return new Top(source(), field(), filter, window(), limitField(), orderField()); } @Override @@ -247,12 +248,12 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Top::new, field(), filter(), limitField(), orderField()); + return NodeInfo.create(this, Top::new, field(), filter(), window(), limitField(), orderField()); } @Override public Top replaceChildren(List newChildren) { - return new Top(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); + return new Top(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3), newChildren.get(4)); } @Override @@ -284,9 +285,9 @@ public Expression surrogate() { var s = source(); if (orderField() instanceof Literal && limitField() instanceof Literal && limitValue() == 1) { if (orderValue()) { - return new Min(s, field(), filter()); + return new Min(s, field(), filter(), window()); } else { - return new Max(s, field(), filter()); + return new Max(s, field(), filter(), window()); } } return null; diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Values.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Values.java index a5dabf01cbd51..867f438aa53f2 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Values.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Values.java @@ -127,11 +127,11 @@ public Values( "version" } ) Expression v ) { - this(source, v, Literal.TRUE); + this(source, v, Literal.TRUE, NO_WINDOW); } - public Values(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Values(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Values(StreamInput in) throws IOException { @@ -145,17 +145,17 @@ public String getWriteableName() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Values::new, field(), filter()); + return NodeInfo.create(this, Values::new, field(), filter(), window()); } @Override public Values replaceChildren(List newChildren) { - return new Values(source(), newChildren.get(0), newChildren.get(1)); + return new Values(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public Values withFilter(Expression filter) { - return new Values(source(), field(), filter); + return new Values(source(), field(), filter, window()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Variance.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Variance.java index bcb4734875650..d6d350d0f72f7 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Variance.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/Variance.java @@ -42,11 +42,11 @@ public class Variance extends AggregateFunction implements ToAggregator { examples = { @Example(file = "stats", tag = "variance") } ) public Variance(Source source, @Param(name = "number", type = { "double", "integer", "long" }) Expression field) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public Variance(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public Variance(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } private Variance(StreamInput in) throws IOException { @@ -76,16 +76,16 @@ protected Expression.TypeResolution resolveType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, Variance::new, field(), filter()); + return NodeInfo.create(this, Variance::new, field(), filter(), window()); } @Override public Variance replaceChildren(List newChildren) { - return new Variance(source(), newChildren.get(0), newChildren.get(1)); + return new Variance(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } public Variance withFilter(Expression filter) { - return new Variance(source(), field(), filter); + return new Variance(source(), field(), filter, window()); } @Override diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/VarianceOverTime.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/VarianceOverTime.java index 61745619f98d0..7dbdb7283eff9 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/VarianceOverTime.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/VarianceOverTime.java @@ -43,11 +43,11 @@ public VarianceOverTime( description = "Expression for which to calculate the variance over time." ) Expression field ) { - this(source, field, Literal.TRUE); + this(source, field, Literal.TRUE, NO_WINDOW); } - public VarianceOverTime(Source source, Expression field, Expression filter) { - super(source, field, filter, emptyList()); + public VarianceOverTime(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, emptyList()); } @Override @@ -67,21 +67,21 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, VarianceOverTime::new, field(), filter()); + return NodeInfo.create(this, VarianceOverTime::new, field(), filter(), window()); } @Override public VarianceOverTime replaceChildren(List newChildren) { - return new VarianceOverTime(source(), newChildren.get(0), newChildren.get(1)); + return new VarianceOverTime(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override public VarianceOverTime withFilter(Expression filter) { - return new VarianceOverTime(source(), field(), filter); + return new VarianceOverTime(source(), field(), filter, window()); } @Override public AggregateFunction perTimeSeriesAggregation() { - return new Variance(source(), field(), filter()); + return new Variance(source(), field(), filter(), window()); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/WeightedAvg.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/WeightedAvg.java index d535b665eb267..264d6feeabfb1 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/WeightedAvg.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/expression/function/aggregate/WeightedAvg.java @@ -55,11 +55,11 @@ public WeightedAvg( @Param(name = "number", type = { "double", "integer", "long" }, description = "A numeric value.") Expression field, @Param(name = "weight", type = { "double", "integer", "long" }, description = "A numeric weight.") Expression weight ) { - this(source, field, Literal.TRUE, weight); + this(source, field, Literal.TRUE, NO_WINDOW, weight); } - public WeightedAvg(Source source, Expression field, Expression filter, Expression weight) { - super(source, field, filter, List.of(weight)); + public WeightedAvg(Source source, Expression field, Expression filter, Expression window, Expression weight) { + super(source, field, filter, window, List.of(weight)); this.weight = weight; } @@ -68,6 +68,7 @@ private WeightedAvg(StreamInput in) throws IOException { Source.readFrom((PlanStreamInput) in), in.readNamedWriteable(Expression.class), in.readNamedWriteable(Expression.class), + readWindow(in), in.readNamedWriteableCollectionAsList(Expression.class).get(0) ); } @@ -128,17 +129,17 @@ public DataType dataType() { @Override protected NodeInfo info() { - return NodeInfo.create(this, WeightedAvg::new, field(), filter(), weight); + return NodeInfo.create(this, WeightedAvg::new, field(), filter(), window(), weight); } @Override public WeightedAvg replaceChildren(List newChildren) { - return new WeightedAvg(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); + return new WeightedAvg(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2), newChildren.get(3)); } @Override public WeightedAvg withFilter(Expression filter) { - return new WeightedAvg(source(), field(), filter, weight()); + return new WeightedAvg(source(), field(), filter, window(), weight()); } @Override @@ -151,9 +152,19 @@ public Expression surrogate() { return new MvAvg(s, field); } if (weight.foldable()) { - return new Div(s, new Sum(s, field, filter()), new Count(s, field, filter()), dataType()); + return new Div( + s, + new Sum(s, field, filter(), window(), SummationMode.COMPENSATED_LITERAL), + new Count(s, field, filter(), window()), + dataType() + ); } else { - return new Div(s, new Sum(s, new Mul(s, field, weight), filter()), new Sum(s, weight, filter()), dataType()); + return new Div( + s, + new Sum(s, new Mul(s, field, weight), filter(), window(), SummationMode.COMPENSATED_LITERAL), + new Sum(s, weight, filter(), window(), SummationMode.COMPENSATED_LITERAL), + dataType() + ); } } diff --git a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java index c848be878fd3e..87052943ca696 100644 --- a/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java +++ b/x-pack/plugin/esql/src/main/java/org/elasticsearch/xpack/esql/plan/logical/fuse/FuseScoreEval.java @@ -28,6 +28,7 @@ import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.core.util.Holder; +import org.elasticsearch.xpack.esql.expression.function.aggregate.AggregateFunction; import org.elasticsearch.xpack.esql.expression.function.aggregate.Values; import org.elasticsearch.xpack.esql.plan.logical.ExecutesOn; import org.elasticsearch.xpack.esql.plan.logical.LogicalPlan; @@ -151,7 +152,7 @@ private void validateInput(Failures failures) { Expression aggFilter = new Literal(source(), true, DataType.BOOLEAN); for (Attribute attr : child().output()) { - var valuesAgg = new Values(source(), attr, aggFilter); + var valuesAgg = new Values(source(), attr, aggFilter, AggregateFunction.NO_WINDOW); if (valuesAgg.resolved() == false) { failures.add( diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java index 47bd2ec45141f..e31bfdd5364b2 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/analysis/AnalyzerTests.java @@ -1448,7 +1448,7 @@ public void testAggsOverGroupingKey() throws Exception { assertThat(output, hasSize(2)); var aggs = agg.aggregates(); var min = as(Alias.unwrap(aggs.get(0)), Min.class); - assertThat(min.arguments(), hasSize(2)); // field + filter + assertThat(min.arguments(), hasSize(3)); // field + filter + window var group = Alias.unwrap(agg.groupings().get(0)); assertEquals(min.arguments().get(0), group); } @@ -1470,7 +1470,7 @@ public void testAggsOverGroupingKeyWithAlias() throws Exception { assertThat(output, hasSize(2)); var aggs = agg.aggregates(); var min = as(Alias.unwrap(aggs.get(0)), Min.class); - assertThat(min.arguments(), hasSize(2)); // field + filter + assertThat(min.arguments(), hasSize(3)); // field + filter assertEquals(Expressions.attribute(min.arguments().get(0)), Expressions.attribute(agg.groupings().get(0))); } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgSerializationTests.java index d9f30f5dd8b05..d6c3fb679b199 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/AvgSerializationTests.java @@ -27,26 +27,28 @@ public class AvgSerializationTests extends AbstractExpressionSerializationTests { @Override protected Avg createTestInstance() { - return new Avg(randomSource(), randomChild(), randomChild(), randomChild()); + return new Avg(randomSource(), randomChild(), randomChild(), randomChild(), randomChild()); } @Override protected Avg mutateInstance(Avg instance) throws IOException { Expression field = instance.field(); Expression filter = instance.filter(); + Expression window = instance.window(); Expression summationMode = instance.summationMode(); - switch (randomIntBetween(0, 2)) { + switch (randomIntBetween(0, 3)) { case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild); case 1 -> filter = randomValueOtherThan(filter, AbstractExpressionSerializationTests::randomChild); - case 2 -> summationMode = randomValueOtherThan(summationMode, AbstractExpressionSerializationTests::randomChild); + case 2 -> window = randomValueOtherThan(window, AbstractExpressionSerializationTests::randomChild); + case 3 -> summationMode = randomValueOtherThan(summationMode, AbstractExpressionSerializationTests::randomChild); default -> throw new AssertionError("unexpected value"); } - return new Avg(instance.source(), field, filter, summationMode); + return new Avg(instance.source(), field, filter, window, summationMode); } public static class OldAvg extends AggregateFunction { public OldAvg(Source source, Expression field, Expression filter) { - super(source, field, filter, List.of()); + super(source, field, filter, NO_WINDOW, List.of()); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java index dc5c463e1ac5a..af0efff1e754e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/FirstOverTimeTests.java @@ -11,6 +11,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase; @@ -57,7 +58,7 @@ public static Iterable parameters() { @Override protected Expression build(Source source, List args) { - return new FirstOverTime(source, args.get(0), args.get(1)); + return new FirstOverTime(source, args.get(0), Literal.TRUE, AggregateFunction.NO_WINDOW, args.get(1)); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/IdeltaTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/IdeltaTests.java index b174cf7154f13..508776402a5b5 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/IdeltaTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/IdeltaTests.java @@ -11,6 +11,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase; @@ -54,7 +55,7 @@ public static Iterable parameters() { @Override protected Expression build(Source source, List args) { - return new Idelta(source, args.get(0), args.get(1)); + return new Idelta(source, args.get(0), Literal.TRUE, AggregateFunction.NO_WINDOW, args.get(1)); } @Override diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateSerializationTests.java index d3130f9ae5f44..fd3299f4a55e6 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateSerializationTests.java @@ -18,8 +18,10 @@ public class RateSerializationTests extends AbstractExpressionSerializationTests protected Rate createTestInstance() { Source source = randomSource(); Expression field = randomChild(); + Expression filter = randomChild(); + Expression window = randomChild(); Expression timestamp = randomChild(); - return new Rate(source, field, timestamp); + return new Rate(source, field, filter, window, timestamp); } @Override @@ -27,10 +29,15 @@ protected Rate mutateInstance(Rate instance) throws IOException { Source source = randomSource(); Expression field = instance.field(); Expression timestamp = instance.timestamp(); - switch (between(0, 1)) { + Expression filter = instance.filter(); + Expression window = instance.window(); + switch (between(0, 3)) { case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild); case 1 -> timestamp = randomValueOtherThan(timestamp, AbstractExpressionSerializationTests::randomChild); + case 2 -> filter = randomValueOtherThan(filter, AbstractExpressionSerializationTests::randomChild); + case 3 -> window = randomValueOtherThan(window, AbstractExpressionSerializationTests::randomChild); + default -> throw new AssertionError("unexpected value"); } - return new Rate(source, field, timestamp); + return new Rate(source, field, filter, window, timestamp); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateTests.java index 88e25ca47d913..dd79cf6c46c2e 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/RateTests.java @@ -11,10 +11,12 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.core.type.DataType; import org.elasticsearch.xpack.esql.expression.function.AbstractAggregationTestCase; import org.elasticsearch.xpack.esql.expression.function.DocsV3Support; +import org.elasticsearch.xpack.esql.expression.function.FunctionAppliesToLifecycle; import org.elasticsearch.xpack.esql.expression.function.MultiRowTestCaseSupplier; import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; import org.hamcrest.Matcher; @@ -25,6 +27,7 @@ import java.util.Objects; import java.util.function.Supplier; +import static org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier.appliesTo; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -54,7 +57,7 @@ public static Iterable parameters() { @Override protected Expression build(Source source, List args) { - return new Rate(source, args.get(0), args.get(1)); + return new Rate(source, args.get(0), Literal.TRUE, args.get(1), Rate.NO_WINDOW); } @Override @@ -155,6 +158,8 @@ public static List signatureTypes(List assertThat(params.get(1).dataType(), equalTo(DataType.DATETIME)); assertThat(params.get(2).dataType(), equalTo(DataType.INTEGER)); assertThat(params.get(3).dataType(), equalTo(DataType.LONG)); + var unavailable = appliesTo(FunctionAppliesToLifecycle.UNAVAILABLE, "9.3.0", "", false); + DocsV3Support.Param windowParam = new DocsV3Support.Param(DataType.TIME_DURATION, List.of(unavailable)); return List.of(params.get(0)); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTimeTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTimeTests.java index e99401d41062c..c13b1c264cb46 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTimeTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/StdDevOverTimeTests.java @@ -10,6 +10,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; import org.elasticsearch.xpack.esql.core.expression.Expression; +import org.elasticsearch.xpack.esql.core.expression.Literal; import org.elasticsearch.xpack.esql.core.tree.Source; import org.elasticsearch.xpack.esql.expression.function.AbstractFunctionTestCase; import org.elasticsearch.xpack.esql.expression.function.TestCaseSupplier; @@ -34,6 +35,6 @@ public static Iterable parameters() { @Override protected Expression build(Source source, List args) { - return new StdDevOverTime(source, args.get(0)); + return new StdDevOverTime(source, args.get(0), Literal.TRUE, AggregateFunction.NO_WINDOW); } } diff --git a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumSerializationTests.java b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumSerializationTests.java index a30c075efbc1f..25592151a8034 100644 --- a/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumSerializationTests.java +++ b/x-pack/plugin/esql/src/test/java/org/elasticsearch/xpack/esql/expression/function/aggregate/SumSerializationTests.java @@ -27,31 +27,33 @@ public class SumSerializationTests extends AbstractExpressionSerializationTests { @Override protected Sum createTestInstance() { - return new Sum(randomSource(), randomChild(), randomChild(), randomChild()); + return new Sum(randomSource(), randomChild(), randomChild(), randomChild(), randomChild()); } @Override protected Sum mutateInstance(Sum instance) throws IOException { Expression field = instance.field(); Expression filter = instance.filter(); + Expression window = instance.window(); Expression summationMode = instance.summationMode(); - switch (randomIntBetween(0, 2)) { + switch (randomIntBetween(0, 3)) { case 0 -> field = randomValueOtherThan(field, AbstractExpressionSerializationTests::randomChild); case 1 -> filter = randomValueOtherThan(filter, AbstractExpressionSerializationTests::randomChild); - case 2 -> summationMode = randomValueOtherThan(summationMode, AbstractExpressionSerializationTests::randomChild); + case 2 -> window = randomValueOtherThan(window, AbstractExpressionSerializationTests::randomChild); + case 3 -> summationMode = randomValueOtherThan(summationMode, AbstractExpressionSerializationTests::randomChild); default -> throw new AssertionError("unexpected value"); } - return new Sum(instance.source(), field, filter, summationMode); + return new Sum(instance.source(), field, filter, window, summationMode); } public static class OldSum extends AggregateFunction { - public OldSum(Source source, Expression field, Expression filter) { - super(source, field, filter, List.of()); + public OldSum(Source source, Expression field, Expression filter, Expression window) { + super(source, field, filter, window, List.of()); } @Override public AggregateFunction withFilter(Expression filter) { - return new OldSum(source(), filter, filter); + return new OldSum(source(), filter, filter, window()); } @Override @@ -61,12 +63,12 @@ public DataType dataType() { @Override public Expression replaceChildren(List newChildren) { - return new OldSum(source(), newChildren.get(0), newChildren.get(1)); + return new OldSum(source(), newChildren.get(0), newChildren.get(1), newChildren.get(2)); } @Override protected NodeInfo info() { - return NodeInfo.create(this, OldSum::new, field(), filter()); + return NodeInfo.create(this, OldSum::new, field(), filter(), window()); } @Override @@ -76,7 +78,7 @@ public String getWriteableName() { } public void testSerializeOldSum() throws IOException { - var oldSum = new OldSum(randomSource(), randomChild(), randomChild()); + var oldSum = new OldSum(randomSource(), randomChild(), randomChild(), randomChild()); try (BytesStreamOutput out = new BytesStreamOutput()) { PlanStreamOutput planOut = new PlanStreamOutput(out, configuration()); planOut.writeNamedWriteable(oldSum);