66 */
77package org .elasticsearch .xpack .esql .expression .function .aggregate ;
88
9+ import org .elasticsearch .TransportVersion ;
910import org .elasticsearch .common .io .stream .StreamInput ;
1011import org .elasticsearch .common .io .stream .StreamOutput ;
1112import org .elasticsearch .xpack .esql .capabilities .PostAnalysisPlanVerificationAware ;
1718import org .elasticsearch .xpack .esql .core .expression .Literal ;
1819import org .elasticsearch .xpack .esql .core .expression .TypeResolutions ;
1920import org .elasticsearch .xpack .esql .core .expression .function .Function ;
20- import org .elasticsearch .xpack .esql .core .tree .NodeInfo ;
2121import org .elasticsearch .xpack .esql .core .tree .Source ;
22- import org .elasticsearch .xpack .esql .core .type .DataType ;
2322import org .elasticsearch .xpack .esql .core .util .CollectionUtils ;
2423import org .elasticsearch .xpack .esql .io .stream .PlanStreamInput ;
2524import org .elasticsearch .xpack .esql .plan .logical .Aggregate ;
2625import org .elasticsearch .xpack .esql .plan .logical .LogicalPlan ;
2726
2827import java .io .IOException ;
28+ import java .time .Duration ;
2929import java .util .List ;
3030import java .util .Objects ;
3131import java .util .function .BiConsumer ;
3838
3939/**
4040 * A type of {@code Function} that takes multiple values and extracts a single value out of them. For example, {@code AVG()}.
41+ * - Aggregate functions can have an optional filter and window, which default to {@code Literal.TRUE} and {@code NO_WINDOW}.
42+ * - The aggregation function should be composed as: source, field, filter, window, parameters.
43+ * Extra parameters should go to the parameters after the filter and window.
4144 */
4245public abstract class AggregateFunction extends Function implements PostAnalysisPlanVerificationAware {
46+ public static final Literal NO_WINDOW = Literal .timeDuration (Source .EMPTY , Duration .ZERO );
47+ public static final TransportVersion WINDOW_INTERVAL = TransportVersion .fromName ("aggregation_window" );
4348
4449 private final Expression field ;
4550 private final List <? extends Expression > parameters ;
4651 private final Expression filter ;
52+ private final Expression window ;
4753
4854 protected AggregateFunction (Source source , Expression field ) {
49- this (source , field , Literal .TRUE , emptyList ());
55+ this (source , field , Literal .TRUE , NO_WINDOW , emptyList ());
5056 }
5157
5258 protected AggregateFunction (Source source , Expression field , List <? extends Expression > parameters ) {
53- this (source , field , Literal .TRUE , parameters );
59+ this (source , field , Literal .TRUE , NO_WINDOW , parameters );
5460 }
5561
56- protected AggregateFunction (Source source , Expression field , Expression filter , List <? extends Expression > parameters ) {
57- super (source , CollectionUtils .combine (asList (field , filter ), parameters ));
62+ protected AggregateFunction (
63+ Source source ,
64+ Expression field ,
65+ Expression filter ,
66+ Expression window ,
67+ List <? extends Expression > parameters
68+ ) {
69+ super (source , CollectionUtils .combine (asList (field , filter , window ), parameters ));
5870 this .field = field ;
5971 this .filter = filter ;
72+ this .window = Objects .requireNonNull (window , "[window] must be specified; use NO_WINDOW instead" );
6073 this .parameters = parameters ;
6174 }
6275
@@ -65,48 +78,27 @@ protected AggregateFunction(StreamInput in) throws IOException {
6578 Source .readFrom ((PlanStreamInput ) in ),
6679 in .readNamedWriteable (Expression .class ),
6780 in .readNamedWriteable (Expression .class ),
81+ readWindow (in ),
6882 in .readNamedWriteableCollectionAsList (Expression .class )
6983 );
7084 }
7185
72- /**
73- * Read a generic AggregateFunction from the stream input. This is used for BWC when the subclass requires a generic instance;
74- * then convert the parameters to the specific ones.
75- */
76- protected static AggregateFunction readGenericAggregateFunction (StreamInput in ) throws IOException {
77- return new AggregateFunction (in ) {
78- @ Override
79- public AggregateFunction withFilter (Expression filter ) {
80- throw new UnsupportedOperationException ();
81- }
82-
83- @ Override
84- public DataType dataType () {
85- throw new UnsupportedOperationException ();
86- }
87-
88- @ Override
89- public Expression replaceChildren (List <Expression > newChildren ) {
90- throw new UnsupportedOperationException ();
91- }
92-
93- @ Override
94- protected NodeInfo <? extends Expression > info () {
95- throw new UnsupportedOperationException ();
96- }
97-
98- @ Override
99- public String getWriteableName () {
100- throw new UnsupportedOperationException ();
101- }
102- };
86+ protected static Expression readWindow (StreamInput in ) throws IOException {
87+ if (in .getTransportVersion ().supports (WINDOW_INTERVAL )) {
88+ return in .readNamedWriteable (Expression .class );
89+ } else {
90+ return NO_WINDOW ;
91+ }
10392 }
10493
10594 @ Override
10695 public final void writeTo (StreamOutput out ) throws IOException {
10796 source ().writeTo (out );
10897 out .writeNamedWriteable (field );
10998 out .writeNamedWriteable (filter );
99+ if (out .getTransportVersion ().supports (WINDOW_INTERVAL )) {
100+ out .writeNamedWriteable (window );
101+ }
110102 out .writeNamedWriteableCollection (parameters );
111103 }
112104
@@ -144,6 +136,23 @@ public AggregateFunction withParameters(List<? extends Expression> parameters) {
144136 return (AggregateFunction ) replaceChildren (CollectionUtils .combine (asList (field , filter ), parameters ));
145137 }
146138
139+ /**
140+ * Return the window associated with the aggregate function.
141+ */
142+ public Expression window () {
143+ return window ;
144+ }
145+
146+ /**
147+ * Whether the aggregate function has a window different than NO_WINDOW.
148+ */
149+ public boolean hasWindow () {
150+ if (window instanceof Literal lit && lit .value () instanceof Duration duration ) {
151+ return duration .isZero () == false ;
152+ }
153+ return true ;
154+ }
155+
147156 /**
148157 * Returns the set of input attributes required by this aggregate function, excluding those referenced by the filter.
149158 */
@@ -168,6 +177,7 @@ public boolean equals(Object obj) {
168177 AggregateFunction other = (AggregateFunction ) obj ;
169178 return Objects .equals (other .field (), field ())
170179 && Objects .equals (other .filter (), filter ())
180+ && Objects .equals (other .window (), window ())
171181 && Objects .equals (other .parameters (), parameters ());
172182 }
173183 return false ;
0 commit comments