diff --git a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java
index 5a165f12a7ec..f5abf48d2f19 100644
--- a/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java
+++ b/pinot-common/src/main/java/org/apache/pinot/common/function/scalar/ArrayFunctions.java
@@ -365,4 +365,29 @@ public static String arrayToString(String[] values, String delimiter, String nul
             .map(s -> s == null || s.equals(NullValuePlaceHolder.STRING) ? nullString : s)
             .toArray(String[]::new));
   }
+
+  @ScalarFunction
+  public static int arrayLengthInt(int[] values) {
+    return values.length;
+  }
+
+  @ScalarFunction
+  public static int arrayLengthLong(long[] values) {
+    return values.length;
+  }
+
+  @ScalarFunction
+  public static int arrayLengthFloat(float[] values) {
+    return values.length;
+  }
+
+  @ScalarFunction
+  public static int arrayLengthDouble(double[] values) {
+    return values.length;
+  }
+
+  @ScalarFunction
+  public static int arrayLengthString(String[] values) {
+    return values.length;
+  }
 }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
index 379c697f76ab..8650ae55bf63 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/ObjectSerDeUtils.java
@@ -77,6 +77,7 @@
 import org.apache.pinot.common.CustomObject;
 import org.apache.pinot.common.utils.HashUtil;
 import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWithExtraFields;
 import org.apache.pinot.core.query.aggregation.utils.exprminmax.ExprMinMaxObject;
 import org.apache.pinot.core.query.utils.idset.IdSet;
 import org.apache.pinot.core.query.utils.idset.IdSets;
@@ -1747,6 +1748,63 @@ public PriorityQueue<FunnelStepEvent> deserialize(ByteBuffer byteBuffer) {
         }
       };
 
+  public static final ObjectSerDe<PriorityQueue<FunnelStepEventWithExtraFields>>
+      FUNNEL_STEP_EVENT_WITH_EXTRA_FIELDS_ACCUMULATOR_SER_DE =
+      new ObjectSerDe<PriorityQueue<FunnelStepEventWithExtraFields>>() {
+
+        @Override
+        public byte[] serialize(PriorityQueue<FunnelStepEventWithExtraFields> funnelStepEvents) {
+          int numEvents = funnelStepEvents.size();
+          List<byte[]> serializedEvents = new ArrayList<>(numEvents);
+          long bufferSize = Integer.BYTES; // Start with size for number of events
+
+          // First pass: Serialize each event and calculate total buffer size
+          for (FunnelStepEventWithExtraFields funnelStepEvent : funnelStepEvents) {
+            byte[] eventBytes = funnelStepEvent.getBytes(); // Costly operation, compute only once
+            serializedEvents.add(eventBytes); // Store serialized form
+            bufferSize += Integer.BYTES; // Add size for storing length
+            bufferSize += eventBytes.length; // Add size of serialized content
+          }
+
+          // Ensure the total buffer size doesn't exceed 2GB
+          Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");
+
+          // Allocate buffer
+          byte[] bytes = new byte[(int) bufferSize];
+          ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+
+          // Second pass: Write data to the buffer
+          byteBuffer.putInt(numEvents); // Write number of events
+          for (byte[] eventBytes : serializedEvents) {
+            byteBuffer.putInt(eventBytes.length); // Write length of each event
+            byteBuffer.put(eventBytes); // Write event content
+          }
+
+          return bytes;
+        }
+
+        @Override
+        public PriorityQueue<FunnelStepEventWithExtraFields> deserialize(byte[] bytes) {
+          return deserialize(ByteBuffer.wrap(bytes));
+        }
+
+        @Override
+        public PriorityQueue<FunnelStepEventWithExtraFields> deserialize(ByteBuffer byteBuffer) {
+          int size = byteBuffer.getInt();
+          if (size == 0) {
+            return new PriorityQueue<>();
+          }
+          PriorityQueue<FunnelStepEventWithExtraFields> funnelStepEvents = new PriorityQueue<>(size);
+          for (int i = 0; i < size; i++) {
+            int funnelStepEventWithExtraFieldsByteSize = byteBuffer.getInt();
+            byte[] bytes = new byte[funnelStepEventWithExtraFieldsByteSize];
+            byteBuffer.get(bytes);
+            funnelStepEvents.add(new FunnelStepEventWithExtraFields(bytes));
+          }
+          return funnelStepEvents;
+        }
+      };
+
   // NOTE: DO NOT change the order, it has to be the same order as the ObjectType
   //@formatter:off
   private static final ObjectSerDe[] SER_DES = {
@@ -1802,6 +1860,7 @@ public PriorityQueue<FunnelStepEvent> deserialize(ByteBuffer byteBuffer) {
       DATA_SKETCH_CPC_ACCUMULATOR_SER_DE,
       ORDERED_STRING_SET_SER_DE,
       FUNNEL_STEP_EVENT_ACCUMULATOR_SER_DE,
+      FUNNEL_STEP_EVENT_WITH_EXTRA_FIELDS_ACCUMULATOR_SER_DE,
   };
   //@formatter:on
 
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
index c795f5af2544..0c746328e200 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/common/datablock/DataBlockBuilder.java
@@ -19,6 +19,10 @@
 package org.apache.pinot.core.common.datablock;
 
 import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.floats.FloatArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
 import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
 import java.io.IOException;
@@ -121,16 +125,32 @@ public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSch
             break;
           // Multi-value column
           case INT_ARRAY:
-            setColumn(fixedSize, varSize, (int[]) value);
+            if (value instanceof IntArrayList) {
+              setColumn(fixedSize, varSize, ((IntArrayList) value).elements());
+            } else {
+              setColumn(fixedSize, varSize, (int[]) value);
+            }
             break;
           case LONG_ARRAY:
-            setColumn(fixedSize, varSize, (long[]) value);
+            if (value instanceof LongArrayList) {
+              setColumn(fixedSize, varSize, ((LongArrayList) value).elements());
+            } else {
+              setColumn(fixedSize, varSize, (long[]) value);
+            }
             break;
           case FLOAT_ARRAY:
-            setColumn(fixedSize, varSize, (float[]) value);
+            if (value instanceof FloatArrayList) {
+              setColumn(fixedSize, varSize, ((FloatArrayList) value).elements());
+            } else {
+              setColumn(fixedSize, varSize, (float[]) value);
+            }
             break;
           case DOUBLE_ARRAY:
-            setColumn(fixedSize, varSize, (double[]) value);
+            if (value instanceof DoubleArrayList) {
+              setColumn(fixedSize, varSize, ((DoubleArrayList) value).elements());
+            } else {
+              setColumn(fixedSize, varSize, (double[]) value);
+            }
             break;
           case STRING_ARRAY:
             setColumn(fixedSize, varSize, (String[]) value, dictionary);
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java
index c18cd3acb486..400b9f5d2b5d 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/operator/docvalsets/RowBasedBlockValSet.java
@@ -18,6 +18,10 @@
  */
 package org.apache.pinot.core.operator.docvalsets;
 
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import it.unimi.dsi.fastutil.ints.IntArrayList;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
 import java.math.BigDecimal;
 import java.util.Arrays;
 import java.util.List;
@@ -435,6 +439,30 @@ public int[][] getIntValuesMV() {
         for (int j = 0; j < stringArray.length; j++) {
           values[i][j] = Integer.parseInt(stringArray[j]);
         }
+      } else if (storedValue instanceof IntArrayList) {
+        IntArrayList intArrayList = (IntArrayList) storedValue;
+        values[i] = new int[intArrayList.size()];
+        for (int j = 0; j < intArrayList.size(); j++) {
+          values[i][j] = intArrayList.getInt(j);
+        }
+      } else if (storedValue instanceof LongArrayList) {
+        LongArrayList longArrayList = (LongArrayList) storedValue;
+        values[i] = new int[longArrayList.size()];
+        for (int j = 0; j < longArrayList.size(); j++) {
+          values[i][j] = (int) longArrayList.getLong(j);
+        }
+      } else if (storedValue instanceof DoubleArrayList) {
+        DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
+        values[i] = new int[doubleArrayList.size()];
+        for (int j = 0; j < doubleArrayList.size(); j++) {
+          values[i][j] = (int) doubleArrayList.getDouble(j);
+        }
+      } else if (storedValue instanceof ObjectArrayList) {
+        ObjectArrayList list = (ObjectArrayList) storedValue;
+        values[i] = new int[list.size()];
+        for (int j = 0; j < list.size(); j++) {
+          values[i][j] = Integer.parseInt(list.get(j).toString());
+        }
       } else {
         throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
       }
@@ -475,6 +503,30 @@ public long[][] getLongValuesMV() {
         for (int j = 0; j < stringArray.length; j++) {
           values[i][j] = Long.parseLong(stringArray[j]);
         }
+      } else if (storedValue instanceof IntArrayList) {
+        IntArrayList intArrayList = (IntArrayList) storedValue;
+        values[i] = new long[intArrayList.size()];
+        for (int j = 0; j < intArrayList.size(); j++) {
+          values[i][j] = intArrayList.getInt(j);
+        }
+      } else if (storedValue instanceof LongArrayList) {
+        LongArrayList longArrayList = (LongArrayList) storedValue;
+        values[i] = new long[longArrayList.size()];
+        for (int j = 0; j < longArrayList.size(); j++) {
+          values[i][j] = longArrayList.getLong(j);
+        }
+      } else if (storedValue instanceof DoubleArrayList) {
+        DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
+        values[i] = new long[doubleArrayList.size()];
+        for (int j = 0; j < doubleArrayList.size(); j++) {
+          values[i][j] = (long) doubleArrayList.getDouble(j);
+        }
+      } else if (storedValue instanceof ObjectArrayList) {
+        ObjectArrayList list = (ObjectArrayList) storedValue;
+        values[i] = new long[list.size()];
+        for (int j = 0; j < list.size(); j++) {
+          values[i][j] = Long.parseLong(list.get(j).toString());
+        }
       } else {
         throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
       }
@@ -515,6 +567,30 @@ public float[][] getFloatValuesMV() {
         for (int j = 0; j < stringArray.length; j++) {
           values[i][j] = Float.parseFloat(stringArray[j]);
         }
+      } else if (storedValue instanceof IntArrayList) {
+        IntArrayList intArrayList = (IntArrayList) storedValue;
+        values[i] = new float[intArrayList.size()];
+        for (int j = 0; j < intArrayList.size(); j++) {
+          values[i][j] = intArrayList.getInt(j);
+        }
+      } else if (storedValue instanceof LongArrayList) {
+        LongArrayList longArrayList = (LongArrayList) storedValue;
+        values[i] = new float[longArrayList.size()];
+        for (int j = 0; j < longArrayList.size(); j++) {
+          values[i][j] = longArrayList.getLong(j);
+        }
+      } else if (storedValue instanceof DoubleArrayList) {
+        DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
+        values[i] = new float[doubleArrayList.size()];
+        for (int j = 0; j < doubleArrayList.size(); j++) {
+          values[i][j] = (float) doubleArrayList.getDouble(j);
+        }
+      } else if (storedValue instanceof ObjectArrayList) {
+        ObjectArrayList list = (ObjectArrayList) storedValue;
+        values[i] = new float[list.size()];
+        for (int j = 0; j < list.size(); j++) {
+          values[i][j] = Float.parseFloat(list.get(j).toString());
+        }
       } else {
         throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
       }
@@ -555,6 +631,30 @@ public double[][] getDoubleValuesMV() {
         for (int j = 0; j < stringArray.length; j++) {
           values[i][j] = Double.parseDouble(stringArray[j]);
         }
+      } else if (storedValue instanceof IntArrayList) {
+        IntArrayList intArrayList = (IntArrayList) storedValue;
+        values[i] = new double[intArrayList.size()];
+        for (int j = 0; j < intArrayList.size(); j++) {
+          values[i][j] = intArrayList.getInt(j);
+        }
+      } else if (storedValue instanceof LongArrayList) {
+        LongArrayList longArrayList = (LongArrayList) storedValue;
+        values[i] = new double[longArrayList.size()];
+        for (int j = 0; j < longArrayList.size(); j++) {
+          values[i][j] = longArrayList.getLong(j);
+        }
+      } else if (storedValue instanceof DoubleArrayList) {
+        DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
+        values[i] = new double[doubleArrayList.size()];
+        for (int j = 0; j < doubleArrayList.size(); j++) {
+          values[i][j] = doubleArrayList.getDouble(j);
+        }
+      } else if (storedValue instanceof ObjectArrayList) {
+        ObjectArrayList list = (ObjectArrayList) storedValue;
+        values[i] = new double[list.size()];
+        for (int j = 0; j < list.size(); j++) {
+          values[i][j] = Double.parseDouble(list.get(j).toString());
+        }
       } else {
         throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
       }
@@ -601,6 +701,30 @@ public String[][] getStringValuesMV() {
         }
       } else if (storedValue instanceof String[]) {
         values[i] = (String[]) storedValue;
+      } else if (storedValue instanceof IntArrayList) {
+        IntArrayList intArrayList = (IntArrayList) storedValue;
+        values[i] = new String[intArrayList.size()];
+        for (int j = 0; j < intArrayList.size(); j++) {
+          values[i][j] = Integer.toString(intArrayList.getInt(j));
+        }
+      } else if (storedValue instanceof LongArrayList) {
+        LongArrayList longArrayList = (LongArrayList) storedValue;
+        values[i] = new String[longArrayList.size()];
+        for (int j = 0; j < longArrayList.size(); j++) {
+          values[i][j] = Long.toString(longArrayList.getLong(j));
+        }
+      } else if (storedValue instanceof DoubleArrayList) {
+        DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
+        values[i] = new String[doubleArrayList.size()];
+        for (int j = 0; j < doubleArrayList.size(); j++) {
+          values[i][j] = Double.toString(doubleArrayList.getDouble(j));
+        }
+      } else if (storedValue instanceof ObjectArrayList) {
+        ObjectArrayList list = (ObjectArrayList) storedValue;
+        values[i] = new String[list.size()];
+        for (int j = 0; j < list.size(); j++) {
+          values[i][j] = list.get(j).toString();
+        }
       } else {
         throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
index e6ddb994d2fa..205f1ae71abf 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/AggregationFunctionFactory.java
@@ -39,8 +39,10 @@
 import org.apache.pinot.core.query.aggregation.function.array.SumArrayLongAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
 import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelCompleteCountAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelEventsFunctionEvalAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMatchStepAggregationFunction;
 import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMaxStepAggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelStepDurationStatsAggregationFunction;
 import org.apache.pinot.segment.spi.AggregationFunctionType;
 import org.apache.pinot.spi.data.FieldSpec.DataType;
 import org.apache.pinot.spi.exception.BadQueryRequestException;
@@ -467,6 +469,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
             return new FunnelMatchStepAggregationFunction(arguments);
           case FUNNELCOMPLETECOUNT:
             return new FunnelCompleteCountAggregationFunction(arguments);
+          case FUNNELSTEPDURATIONSTATS:
+            return new FunnelStepDurationStatsAggregationFunction(arguments);
+          case FUNNELEVENTSFUNCTIONEVAL:
+            return new FunnelEventsFunctionEvalAggregationFunction(arguments);
           case FREQUENTSTRINGSSKETCH:
             return new FrequentStringsSketchAggregationFunction(arguments);
           case FREQUENTLONGSSKETCH:
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java
new file mode 100644
index 000000000000..57eb8d17ee6d
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/FunnelStepEventWithExtraFields.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.ByteArrayOutputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+
+
+public class FunnelStepEventWithExtraFields implements Comparable<FunnelStepEventWithExtraFields> {
+  protected static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();
+  private final FunnelStepEvent _funnelStepEvent;
+  private final List<Object> _extraFields;
+
+  public FunnelStepEventWithExtraFields(FunnelStepEvent funnelStepEvent, List<Object> extraFields) {
+    _funnelStepEvent = funnelStepEvent;
+    _extraFields = extraFields;
+  }
+
+  public FunnelStepEventWithExtraFields(byte[] bytes) {
+    _funnelStepEvent = new FunnelStepEvent(Arrays.copyOf(bytes, FunnelStepEvent.SIZE_IN_BYTES));
+    try {
+      _extraFields = OBJECT_MAPPER.readValue(bytes, 2, bytes.length, new TypeReference<List<Object>>() {
+      });
+    } catch (IOException e) {
+      throw new RuntimeException("Caught exception while converting byte[] to FunnelStepEventWithExtraFields", e);
+    }
+  }
+
+  public FunnelStepEvent getFunnelStepEvent() {
+    return _funnelStepEvent;
+  }
+
+  public List<Object> getExtraFields() {
+    return _extraFields;
+  }
+
+  @Override
+  public String toString() {
+    return "StepEventWithExtraFields{" + "funnelStepEvent=" + _funnelStepEvent + ", extraFields=" + _extraFields + '}';
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    FunnelStepEventWithExtraFields stepEvent = (FunnelStepEventWithExtraFields) o;
+
+    if (!_funnelStepEvent.equals(stepEvent.getFunnelStepEvent())) {
+      return false;
+    }
+    return _extraFields.equals(stepEvent.getExtraFields());
+  }
+
+  @Override
+  public int hashCode() {
+    int result = _funnelStepEvent.hashCode();
+    result = 31 * result + _extraFields.hashCode();
+    return result;
+  }
+
+  @Override
+  public int compareTo(FunnelStepEventWithExtraFields o) {
+    return _funnelStepEvent.compareTo(o.getFunnelStepEvent());
+  }
+
+  public byte[] getBytes() {
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    DataOutputStream dataOutputStream = new DataOutputStream(byteArrayOutputStream);
+    try {
+      dataOutputStream.write(_funnelStepEvent.getBytes());
+      dataOutputStream.write(OBJECT_MAPPER.writeValueAsBytes(_extraFields));
+      dataOutputStream.close();
+    } catch (Exception e) {
+      throw new RuntimeException("Caught exception while converting FunnelStepEvent to byte[]", e);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
index 52d04188490f..13b9b7c1afef 100644
--- a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelBaseAggregationFunction.java
@@ -21,6 +21,7 @@
 import com.google.common.base.Preconditions;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
@@ -44,6 +45,7 @@ public abstract class FunnelBaseAggregationFunction<F extends Comparable>
   protected final FunnelModes _modes = new FunnelModes();
   protected final int _numSteps;
   protected long _maxStepDuration = 0L;
+  protected final Map<String, String> _extraArguments = new HashMap<>();
 
   public FunnelBaseAggregationFunction(List<ExpressionContext> arguments) {
     int numArguments = arguments.size();
@@ -78,7 +80,8 @@ public FunnelBaseAggregationFunction(List<ExpressionContext> arguments) {
             }
             break;
           default:
-            throw new IllegalArgumentException("Unrecognized arguments: " + extraArgument);
+            _extraArguments.put(key, parsedExtraArguments[1]);
+            break;
         }
         continue;
       }
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
new file mode 100644
index 000000000000..4c2c2a76bcb4
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelEventsFunctionEvalAggregationFunction.java
@@ -0,0 +1,502 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel.window;
+
+import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.objects.ObjectArrayList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.stream.Collectors;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.common.BlockValSet;
+import org.apache.pinot.core.query.aggregation.AggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.ObjectAggregationResultHolder;
+import org.apache.pinot.core.query.aggregation.function.AggregationFunction;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWithExtraFields;
+import org.apache.pinot.core.query.aggregation.groupby.GroupByResultHolder;
+import org.apache.pinot.core.query.aggregation.groupby.ObjectGroupByResultHolder;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+
+
+public class FunnelEventsFunctionEvalAggregationFunction
+    implements AggregationFunction<PriorityQueue<FunnelStepEventWithExtraFields>, ObjectArrayList<String>> {
+  protected final ExpressionContext _timestampExpression;
+  protected final long _windowSize;
+  protected final List<ExpressionContext> _stepExpressions;
+  protected final FunnelBaseAggregationFunction.FunnelModes _modes = new FunnelBaseAggregationFunction.FunnelModes();
+  protected final int _numSteps;
+  protected final int _numExtraFields;
+  protected final List<ExpressionContext> _extraExpressions;
+  protected long _maxStepDuration = 0L;
+
+  public FunnelEventsFunctionEvalAggregationFunction(List<ExpressionContext> arguments) {
+    int numArguments = arguments.size();
+    Preconditions.checkArgument(numArguments > 3,
+        "FUNNEL_EVENTS_FUNCTION_EVAL expects >= 4 arguments, got: %s. The function can be used as "
+            + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, "
+            + "[stepExpression, ..], [mode, [mode, ... ]])",
+        numArguments);
+    _timestampExpression = arguments.get(0);
+    _windowSize = arguments.get(1).getLiteral().getLongValue();
+    Preconditions.checkArgument(_windowSize > 0, "Window size must be > 0");
+    _numSteps = arguments.get(2).getLiteral().getIntValue();
+    Preconditions.checkArgument(numArguments >= 3 + _numSteps,
+        "FUNNEL_EVENTS_FUNCTION_EVAL expects >= " + (3 + _numSteps)
+            + " arguments, got: %s. The function can be used as "
+            + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, "
+            + "[stepExpression, ..], [extraArgument/mode, [extraArgument/mode, ... ]])",
+        numArguments);
+    _stepExpressions = arguments.subList(3, 3 + _numSteps);
+    _numExtraFields = arguments.get(3 + _numSteps).getLiteral().getIntValue();
+    Preconditions.checkArgument(numArguments >= 4 + _numSteps + _numExtraFields,
+        "FUNNEL_EVENTS_FUNCTION_EVAL expects >= " + (4 + _numSteps + _numExtraFields)
+            + " arguments, got: %s. The function can be used as "
+            + getType().getName() + "(timestampExpression, windowSize, numberSteps, stepExpression, "
+            + "[stepExpression, ..], [extraArgument/mode, [extraArgument/mode, ... ]])",
+        numArguments);
+    _extraExpressions = arguments.subList(4 + _numSteps, 4 + _numSteps + _numExtraFields);
+
+    for (int i = 4 + _numSteps + _numExtraFields; i < numArguments; i++) {
+      String extraArgument = arguments.get(i).getLiteral().getStringValue().toUpperCase();
+      String[] parsedExtraArguments = extraArgument.split("=");
+      if (parsedExtraArguments.length == 2) {
+        String key = parsedExtraArguments[0].toUpperCase();
+        switch (key) {
+          case FunnelBaseAggregationFunction.FunnelConfigs.MAX_STEP_DURATION:
+            _maxStepDuration = Long.parseLong(parsedExtraArguments[1]);
+            Preconditions.checkArgument(_maxStepDuration > 0, "MaxStepDuration must be > 0");
+            break;
+          case FunnelBaseAggregationFunction.FunnelConfigs.MODE:
+            for (String modeStr : parsedExtraArguments[1].split(",")) {
+              _modes.add(FunnelBaseAggregationFunction.Mode.valueOf(modeStr.trim()));
+            }
+            break;
+          default:
+            throw new IllegalArgumentException("Unrecognized arguments: " + extraArgument);
+        }
+        continue;
+      }
+      try {
+        _modes.add(FunnelBaseAggregationFunction.Mode.valueOf(extraArgument));
+      } catch (Exception e) {
+        throw new RuntimeException("Unrecognized extra argument for funnel function: " + extraArgument, e);
+      }
+    }
+  }
+
+  @Override
+  public String getResultColumnName() {
+    return
+        String.format("%s(%d)(%s,%s,%s)", getType().getName(), _windowSize, _timestampExpression.toString(),
+            _stepExpressions.stream().map(ExpressionContext::toString).collect(Collectors.joining(",")),
+            (_numExtraFields > 0 ? ", " + _extraExpressions.stream().map(ExpressionContext::toString)
+                .collect(Collectors.joining(",")) : "")
+        );
+  }
+
+  @Override
+  public List<ExpressionContext> getInputExpressions() {
+    List<ExpressionContext> inputs = new ArrayList<>(1 + _numSteps + _numExtraFields);
+    inputs.add(_timestampExpression);
+    inputs.addAll(_stepExpressions);
+    inputs.addAll(_extraExpressions);
+    return inputs;
+  }
+
+  @Override
+  public AggregationResultHolder createAggregationResultHolder() {
+    return new ObjectAggregationResultHolder();
+  }
+
+  @Override
+  public GroupByResultHolder createGroupByResultHolder(int initialCapacity, int maxCapacity) {
+    return new ObjectGroupByResultHolder(initialCapacity, maxCapacity);
+  }
+
+  @Override
+  public void aggregate(int length, AggregationResultHolder aggregationResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV();
+    List<int[]> stepBlocks = new ArrayList<>(_numSteps);
+    for (ExpressionContext stepExpression : _stepExpressions) {
+      stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV());
+    }
+    PriorityQueue<FunnelStepEventWithExtraFields> stepEvents = aggregationResultHolder.getResult();
+    if (stepEvents == null) {
+      stepEvents = new PriorityQueue<>();
+      aggregationResultHolder.setValue(stepEvents);
+    }
+    List<Object> extraFieldsBlocks = getExtraFieldsBlocks(blockValSetMap);
+    for (int i = 0; i < length; i++) {
+      boolean stepFound = false;
+      for (int j = 0; j < _numSteps; j++) {
+        if (stepBlocks.get(j)[i] == 1) {
+          List<Object> extraFields = extractExtraFields(extraFieldsBlocks, i);
+          stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], j), extraFields));
+          stepFound = true;
+          break;
+        }
+      }
+      // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1
+      if (_modes.hasKeepAll() && !stepFound) {
+        List<Object> extraFields = extractExtraFields(extraFieldsBlocks, i);
+        stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], -1), extraFields));
+      }
+    }
+  }
+
+  private List<Object> getExtraFieldsBlocks(Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    List<Object> extraFieldsBlocks = new ArrayList<>(_numExtraFields);
+    for (ExpressionContext extraExpression : _extraExpressions) {
+      BlockValSet blockValSet = blockValSetMap.get(extraExpression);
+      switch (blockValSet.getValueType()) {
+        case INT:
+          extraFieldsBlocks.add(blockValSet.getIntValuesSV());
+          break;
+        case LONG:
+        case TIMESTAMP:
+          extraFieldsBlocks.add(blockValSet.getLongValuesSV());
+          break;
+        case FLOAT:
+          extraFieldsBlocks.add(blockValSet.getFloatValuesSV());
+          break;
+        case DOUBLE:
+          extraFieldsBlocks.add(blockValSet.getDoubleValuesSV());
+          break;
+        case STRING:
+          extraFieldsBlocks.add(blockValSet.getStringValuesSV());
+          break;
+        default:
+          throw new IllegalArgumentException("Unsupported data type for extra field: " + extraExpression + " - "
+              + blockValSet.getValueType());
+      }
+    }
+    return extraFieldsBlocks;
+  }
+
+  private List<Object> extractExtraFields(List<Object> extraFieldsBlocks, int i) {
+    List<Object> extraFields = new ArrayList<>(_numExtraFields);
+    for (Object extraFieldsBlock : extraFieldsBlocks) {
+      switch (extraFieldsBlock.getClass().getComponentType().getSimpleName()) {
+        case "int":
+          extraFields.add(((int[]) extraFieldsBlock)[i]);
+          break;
+        case "long":
+          extraFields.add(((long[]) extraFieldsBlock)[i]);
+          break;
+        case "float":
+          extraFields.add(((float[]) extraFieldsBlock)[i]);
+          break;
+        case "double":
+          extraFields.add(((double[]) extraFieldsBlock)[i]);
+          break;
+        case "String":
+          extraFields.add(((String[]) extraFieldsBlock)[i]);
+          break;
+        default:
+          throw new IllegalArgumentException(
+              "Unsupported data type for extra field: " + extraFieldsBlock.getClass().getComponentType()
+                  .getSimpleName());
+      }
+    }
+    return extraFields;
+  }
+
+  @Override
+  public void aggregateGroupBySV(int length, int[] groupKeyArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV();
+    List<int[]> stepBlocks = new ArrayList<>(_numSteps);
+    for (ExpressionContext stepExpression : _stepExpressions) {
+      stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV());
+    }
+    List<Object> extraFieldsBlocks = getExtraFieldsBlocks(blockValSetMap);
+    for (int i = 0; i < length; i++) {
+      int groupKey = groupKeyArray[i];
+      boolean stepFound = false;
+      for (int j = 0; j < _numSteps; j++) {
+        if (stepBlocks.get(j)[i] == 1) {
+          PriorityQueue<FunnelStepEventWithExtraFields> stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey);
+          List<Object> extraFields = extractExtraFields(extraFieldsBlocks, i);
+          stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], j), extraFields));
+          stepFound = true;
+          break;
+        }
+      }
+      // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1
+      if (_modes.hasKeepAll() && !stepFound) {
+        PriorityQueue<FunnelStepEventWithExtraFields> stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey);
+        List<Object> extraFields = extractExtraFields(extraFieldsBlocks, i);
+        stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], -1), extraFields));
+      }
+    }
+  }
+
+  @Override
+  public void aggregateGroupByMV(int length, int[][] groupKeysArray, GroupByResultHolder groupByResultHolder,
+      Map<ExpressionContext, BlockValSet> blockValSetMap) {
+    long[] timestampBlock = blockValSetMap.get(_timestampExpression).getLongValuesSV();
+    List<int[]> stepBlocks = new ArrayList<>(_numSteps);
+    for (ExpressionContext stepExpression : _stepExpressions) {
+      stepBlocks.add(blockValSetMap.get(stepExpression).getIntValuesSV());
+    }
+    List<Object> extraFieldsBlocks = getExtraFieldsBlocks(blockValSetMap);
+    for (int i = 0; i < length; i++) {
+      int[] groupKeys = groupKeysArray[i];
+      boolean stepFound = false;
+      for (int j = 0; j < _numSteps; j++) {
+        if (stepBlocks.get(j)[i] == 1) {
+          for (int groupKey : groupKeys) {
+            PriorityQueue<FunnelStepEventWithExtraFields> stepEvents =
+                getFunnelStepEvents(groupByResultHolder, groupKey);
+            List<Object> extraFields = extractExtraFields(extraFieldsBlocks, i);
+            stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], j), extraFields));
+          }
+          stepFound = true;
+          break;
+        }
+      }
+      // If the mode is KEEP_ALL and no step is found, add a dummy step event with step -1
+      if (_modes.hasKeepAll() && !stepFound) {
+        for (int groupKey : groupKeys) {
+          PriorityQueue<FunnelStepEventWithExtraFields> stepEvents = getFunnelStepEvents(groupByResultHolder, groupKey);
+          List<Object> extraFields = extractExtraFields(extraFieldsBlocks, i);
+          stepEvents.add(new FunnelStepEventWithExtraFields(new FunnelStepEvent(timestampBlock[i], -1), extraFields));
+        }
+      }
+    }
+  }
+
+  private static PriorityQueue<FunnelStepEventWithExtraFields> getFunnelStepEvents(
+      GroupByResultHolder groupByResultHolder,
+      int groupKey) {
+    PriorityQueue<FunnelStepEventWithExtraFields> stepEvents = groupByResultHolder.getResult(groupKey);
+    if (stepEvents == null) {
+      stepEvents = new PriorityQueue<>();
+      groupByResultHolder.setValueForKey(groupKey, stepEvents);
+    }
+    return stepEvents;
+  }
+
+  @Override
+  public PriorityQueue<FunnelStepEventWithExtraFields> extractAggregationResult(
+      AggregationResultHolder aggregationResultHolder) {
+    return aggregationResultHolder.getResult();
+  }
+
+  @Override
+  public PriorityQueue<FunnelStepEventWithExtraFields> extractGroupByResult(GroupByResultHolder groupByResultHolder,
+      int groupKey) {
+    return groupByResultHolder.getResult(groupKey);
+  }
+
+  @Override
+  public PriorityQueue<FunnelStepEventWithExtraFields> merge(
+      PriorityQueue<FunnelStepEventWithExtraFields> intermediateResult1,
+      PriorityQueue<FunnelStepEventWithExtraFields> intermediateResult2) {
+    if (intermediateResult1 == null) {
+      return intermediateResult2;
+    }
+    if (intermediateResult2 == null) {
+      return intermediateResult1;
+    }
+    intermediateResult1.addAll(intermediateResult2);
+    return intermediateResult1;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getIntermediateResultColumnType() {
+    return DataSchema.ColumnDataType.STRING_ARRAY;
+  }
+
+  /**
+   * Fill the sliding window with the events that fall into the window.
+   * Note that the events from stepEvents are dequeued and added to the sliding window.
+   * This method ensure the first event from the sliding window is the first step event.
+   *
+   * @param stepEvents    The priority queue of step events
+   * @param slidingWindow The sliding window with events that fall into the window
+   */
+  protected void fillWindow(PriorityQueue<FunnelStepEventWithExtraFields> stepEvents,
+      ArrayDeque<FunnelStepEventWithExtraFields> slidingWindow) {
+    // Ensure for the sliding window, the first event is the first step
+    while ((!slidingWindow.isEmpty()) && slidingWindow.peek().getFunnelStepEvent().getStep() != 0) {
+      slidingWindow.pollFirst();
+    }
+    if (slidingWindow.isEmpty()) {
+      while (!stepEvents.isEmpty() && stepEvents.peek().getFunnelStepEvent().getStep() != 0) {
+        stepEvents.poll();
+      }
+      if (stepEvents.isEmpty()) {
+        return;
+      }
+      slidingWindow.addLast(stepEvents.poll());
+    }
+    // SlidingWindow is not empty
+    long windowStart = slidingWindow.peek().getFunnelStepEvent().getTimestamp();
+    long windowEnd = windowStart + _windowSize;
+    while (!stepEvents.isEmpty() && (stepEvents.peek().getFunnelStepEvent().getTimestamp() < windowEnd)) {
+      if (_maxStepDuration > 0) {
+        // When maxStepDuration > 0, we need to check if the event_to_add has a timestamp within the max duration
+        // from the last event in the sliding window. If not, we break the loop.
+        if (stepEvents.peek().getFunnelStepEvent().getTimestamp() - slidingWindow.getLast().getFunnelStepEvent()
+            .getTimestamp() > _maxStepDuration) {
+          break;
+        }
+      }
+      slidingWindow.addLast(stepEvents.poll());
+    }
+  }
+
+  @Override
+  public String toExplainString() {
+    //@formatter:off
+    return getType().getName() + "{"
+        + "timestampExpression=" + _timestampExpression
+        + ", windowSize=" + _windowSize
+        + ", stepExpressions=" + _stepExpressions
+        + ", extraExpressions=" + _extraExpressions
+        + '}';
+    //@formatter:on
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FUNNELEVENTSFUNCTIONEVAL;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.STRING_ARRAY;
+  }
+
+  @Override
+  public ObjectArrayList<String> extractFinalResult(PriorityQueue<FunnelStepEventWithExtraFields> stepEvents) {
+    ObjectArrayList<String> finalResults = new ObjectArrayList<>();
+    List<List<Object[]>> matchedFunnelEventsExtraFields = new ArrayList<>(_numExtraFields);
+    if (stepEvents == null || stepEvents.isEmpty()) {
+      return finalResults;
+    }
+    ArrayDeque<FunnelStepEventWithExtraFields> slidingWindow = new ArrayDeque<>();
+    while (!stepEvents.isEmpty()) {
+      fillWindow(stepEvents, slidingWindow);
+      if (slidingWindow.isEmpty()) {
+        break;
+      }
+
+      long windowStart = slidingWindow.peek().getFunnelStepEvent().getTimestamp();
+
+      int maxStep = 0;
+      long previousTimestamp = -1;
+      for (FunnelStepEventWithExtraFields event : slidingWindow) {
+        int currentEventStep = event.getFunnelStepEvent().getStep();
+        // If the same condition holds for the sequence of events, then such repeating event interrupts further
+        // processing.
+        if (_modes.hasStrictDeduplication()) {
+          if (currentEventStep == maxStep - 1) {
+            maxStep = 0;
+          }
+        }
+        // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D
+        // and the max event level is 2.
+        if (_modes.hasStrictOrder()) {
+          if (currentEventStep != maxStep) {
+            maxStep = 0;
+          }
+        }
+        // Apply conditions only to events with strictly increasing timestamps.
+        if (_modes.hasStrictIncrease()) {
+          if (previousTimestamp == event.getFunnelStepEvent().getTimestamp()) {
+            continue;
+          }
+        }
+        previousTimestamp = event.getFunnelStepEvent().getTimestamp();
+        if (maxStep == currentEventStep) {
+          maxStep++;
+        }
+        if (maxStep == _numSteps) {
+          matchedFunnelEventsExtraFields.add(extractFunnelEventsExtraFields(slidingWindow));
+          maxStep = 0;
+          windowStart = event.getFunnelStepEvent().getTimestamp();
+        }
+      }
+      if (!slidingWindow.isEmpty()) {
+        slidingWindow.pollFirst();
+      }
+      // sliding window should pop until current event:
+      while (!slidingWindow.isEmpty() && slidingWindow.peek().getFunnelStepEvent().getTimestamp() < windowStart) {
+        slidingWindow.pollFirst();
+      }
+    }
+
+    evalFunctionOnMatchedFunnelEvents(matchedFunnelEventsExtraFields, finalResults);
+    return finalResults;
+  }
+
+  private void evalFunctionOnMatchedFunnelEvents(List<List<Object[]>> matchedFunnelEventsExtraFields,
+      ObjectArrayList<String> finalResults) {
+    StringBuilder arrayAssignments = new StringBuilder(Integer.toString(matchedFunnelEventsExtraFields.size()));
+    for (List<Object[]> funnelEventsExtraField : matchedFunnelEventsExtraFields) {
+      arrayAssignments.append(", ").append(funnelEventsExtraField.size() * _numExtraFields);
+    }
+    finalResults.add(arrayAssignments.toString());
+    for (List<Object[]> matchedFunnelEventsExtraField : matchedFunnelEventsExtraFields) {
+      for (Object[] extraFields : matchedFunnelEventsExtraField) {
+        for (Object extraField : extraFields) {
+          finalResults.add(extraField.toString());
+        }
+      }
+    }
+  }
+
+  private List<Object[]> extractFunnelEventsExtraFields(ArrayDeque<FunnelStepEventWithExtraFields> slidingWindow) {
+    List<Object[]> results = new ArrayList<>();
+    int step = 0;
+    for (FunnelStepEventWithExtraFields event : slidingWindow) {
+      if (event.getFunnelStepEvent().getStep() == step) {
+        Object[] extraFields = new Object[_numExtraFields];
+        List<Object> extraFieldsList = event.getExtraFields();
+        for (int i = 0; i < _numExtraFields; i++) {
+          extraFields[i] = extraFieldsList.get(i);
+        }
+        results.add(extraFields);
+        step++;
+      }
+    }
+    return results;
+  }
+
+  @Override
+  public ObjectArrayList<String> mergeFinalResult(ObjectArrayList<String> finalResult1,
+      ObjectArrayList<String> finalResult2) {
+    if (finalResult1 == null) {
+      return finalResult2;
+    }
+    if (finalResult2 == null) {
+      return finalResult1;
+    }
+    finalResult1.addAll(finalResult2);
+    return finalResult1;
+  }
+}
diff --git a/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
new file mode 100644
index 000000000000..ef585e0583ea
--- /dev/null
+++ b/pinot-core/src/main/java/org/apache/pinot/core/query/aggregation/function/funnel/window/FunnelStepDurationStatsAggregationFunction.java
@@ -0,0 +1,261 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.core.query.aggregation.function.funnel.window;
+
+import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
+import java.util.ArrayDeque;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.pinot.common.request.context.ExpressionContext;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
+import org.apache.pinot.segment.local.aggregator.AvgValueAggregator;
+import org.apache.pinot.segment.local.aggregator.PercentileEstValueAggregator;
+import org.apache.pinot.segment.local.customobject.AvgPair;
+import org.apache.pinot.segment.local.customobject.QuantileDigest;
+import org.apache.pinot.segment.spi.AggregationFunctionType;
+import org.apache.pinot.spi.utils.CommonConstants;
+
+
+public class FunnelStepDurationStatsAggregationFunction extends FunnelBaseAggregationFunction<DoubleArrayList> {
+
+  private static final AvgValueAggregator AVG_VALUE_AGGREGATOR = new AvgValueAggregator();
+  private static final PercentileEstValueAggregator PERCENTILE_EST_VALUE_AGGREGATOR =
+      new PercentileEstValueAggregator();
+
+  private final List<String> _durationFunctions = new ArrayList<>();
+  private boolean _canSkipNonMatchedFunnel = true;
+
+  public FunnelStepDurationStatsAggregationFunction(List<ExpressionContext> arguments) {
+    super(arguments);
+    if (_extraArguments.get("DURATIONFUNCTIONS") != null) {
+      String[] durationFunctions = _extraArguments.get("DURATIONFUNCTIONS").split(",");
+      for (String durationFunction : durationFunctions) {
+        String functionName = durationFunction.trim().toUpperCase();
+        if (functionName.equals("AVG") || functionName.equals("MEDIAN") || functionName.equals("MIN")
+            || functionName.equals("MAX")) {
+          _durationFunctions.add(functionName);
+        } else if (functionName.equals("COUNT")) {
+          _canSkipNonMatchedFunnel = false;
+          _durationFunctions.add(functionName);
+        } else if (functionName.startsWith("PERCENTILE")) {
+          try {
+            double quantile = Double.parseDouble(functionName.substring("PERCENTILE".length())) / 100.0;
+            if (quantile < 0 || quantile > 1) {
+              throw new IllegalArgumentException("Invalid percentile value: " + quantile);
+            }
+          } catch (NumberFormatException e) {
+            throw new IllegalArgumentException("Invalid percentile function name: " + functionName + ", must be "
+                + "PERCENTILE followed by a double value between 0 and 100");
+          }
+          _durationFunctions.add(functionName);
+        } else {
+          throw new IllegalArgumentException("Unsupported duration function: " + functionName);
+        }
+      }
+    } else {
+      throw new IllegalArgumentException(
+          "Duration functions must be provided for FunnelStepDurationStatsAggregationFunction");
+    }
+  }
+
+  @Override
+  public AggregationFunctionType getType() {
+    return AggregationFunctionType.FUNNELSTEPDURATIONSTATS;
+  }
+
+  @Override
+  public DataSchema.ColumnDataType getFinalResultColumnType() {
+    return DataSchema.ColumnDataType.DOUBLE_ARRAY;
+  }
+
+  @Override
+  public DoubleArrayList extractFinalResult(PriorityQueue<FunnelStepEvent> stepEvents) {
+    if (stepEvents == null || stepEvents.isEmpty()) {
+      return new DoubleArrayList();
+    }
+    Map<Integer, List<Object>> stepValueAggregators = initValueAggregator();
+    boolean hasMatchedFunnel = false;
+    ArrayDeque<FunnelStepEvent> slidingWindow = new ArrayDeque<>();
+    while (!stepEvents.isEmpty()) {
+      fillWindow(stepEvents, slidingWindow);
+      if (slidingWindow.isEmpty()) {
+        break;
+      }
+      int maxSteps = processWindow(slidingWindow);
+      if (maxSteps == _numSteps) {
+        applyStepDurations(stepValueAggregators, slidingWindow);
+        hasMatchedFunnel = true;
+      } else {
+        // Add counts for not completed funnels
+        for (int i = 0; i < maxSteps; i++) {
+          List<Object> objects = stepValueAggregators.get(i);
+          for (Object count : objects) {
+            if (count instanceof AtomicInteger) {
+              ((AtomicInteger) count).set(1);
+            }
+          }
+        }
+      }
+      if (!slidingWindow.isEmpty()) {
+        slidingWindow.pollFirst();
+      }
+    }
+    if (_canSkipNonMatchedFunnel && !hasMatchedFunnel) {
+      return new DoubleArrayList();
+    }
+    return getStepDurationResults(stepValueAggregators, hasMatchedFunnel);
+  }
+
+  private void applyStepDurations(Map<Integer, List<Object>> stepAggregatorValues,
+      ArrayDeque<FunnelStepEvent> slidingWindow) {
+    List<Long> stepTimestamp = new ArrayList<>();
+    for (FunnelStepEvent event : slidingWindow) {
+      int step = event.getStep();
+      if (stepTimestamp.size() <= step) {
+        stepTimestamp.add(event.getTimestamp());
+      }
+    }
+    for (int i = 0; i < stepTimestamp.size() - 1; i++) {
+      long duration = stepTimestamp.get(i + 1) - stepTimestamp.get(i);
+      for (Object stepAggregatorValue : stepAggregatorValues.get(i)) {
+        if (stepAggregatorValue instanceof AtomicInteger) {
+          ((AtomicInteger) stepAggregatorValue).set(1);
+        } else if (stepAggregatorValue instanceof AvgPair) {
+          AVG_VALUE_AGGREGATOR.applyRawValue((AvgPair) stepAggregatorValue, duration);
+        } else if (stepAggregatorValue instanceof QuantileDigest) {
+          PERCENTILE_EST_VALUE_AGGREGATOR.applyRawValue((QuantileDigest) stepAggregatorValue, duration);
+        }
+      }
+    }
+    if (stepAggregatorValues.get(_numSteps - 1) != null) {
+      for (Object stepAggregatorValue : stepAggregatorValues.get(_numSteps - 1)) {
+        if (stepAggregatorValue instanceof AtomicInteger) {
+          ((AtomicInteger) stepAggregatorValue).set(1);
+        }
+      }
+    }
+  }
+
+  private Map<Integer, List<Object>> initValueAggregator() {
+    Map<Integer, List<Object>> stepValueAggregators = new HashMap<>();
+    for (int step = 0; step < _numSteps; step++) {
+      List<Object> valueAggregators = new ArrayList<>();
+      valueAggregators.add(new AtomicInteger(0));
+      valueAggregators.add(new AvgPair());
+      valueAggregators.add(new QuantileDigest(0));
+      stepValueAggregators.put(step, valueAggregators);
+    }
+    return stepValueAggregators;
+  }
+
+  private DoubleArrayList getStepDurationResults(Map<Integer, List<Object>> valueAggregatorResults,
+      boolean hasMatchedFunnel) {
+    DoubleArrayList result = new DoubleArrayList(_durationFunctions.size() * (_numSteps - 1));
+    for (int step = 0; step < _numSteps; step++) {
+      AtomicReference<AvgPair> avgPair = new AtomicReference<>();
+      AtomicReference<QuantileDigest> quantileDigest = new AtomicReference<>();
+      AtomicInteger count = new AtomicInteger();
+      valueAggregatorResults.get(step).forEach(valueAggregator -> {
+        if (valueAggregator instanceof AvgPair) {
+          avgPair.set((AvgPair) valueAggregator);
+        }
+        if (valueAggregator instanceof QuantileDigest) {
+          quantileDigest.set((QuantileDigest) valueAggregator);
+        }
+        if (valueAggregator instanceof AtomicInteger) {
+          count.set(((AtomicInteger) valueAggregator).intValue());
+        }
+      });
+      for (int i = 0; i < _durationFunctions.size(); i++) {
+        String durationFunction = _durationFunctions.get(i);
+        if (durationFunction.equals("COUNT")) {
+          result.add(count.get());
+          continue;
+        }
+        if (!hasMatchedFunnel || step == _numSteps - 1) {
+          result.add(CommonConstants.NullValuePlaceHolder.DOUBLE);
+          continue;
+        }
+        if (durationFunction.equals("AVG")) {
+          result.add(avgPair.get().getSum() / avgPair.get().getCount());
+        } else if (durationFunction.equals("MEDIAN")) {
+          result.add(quantileDigest.get().getQuantile(0.5));
+        } else if (durationFunction.equals("MIN")) {
+          result.add(quantileDigest.get().getQuantile(0));
+        } else if (durationFunction.equals("MAX")) {
+          result.add(quantileDigest.get().getQuantile(1));
+        } else if (durationFunction.startsWith("PERCENTILE")) {
+          double quantile = Double.parseDouble(durationFunction.substring("PERCENTILE".length())) / 100.0;
+          result.add(quantileDigest.get().getQuantile(quantile));
+        }
+      }
+    }
+    return result;
+  }
+
+  protected Integer processWindow(ArrayDeque<FunnelStepEvent> slidingWindow) {
+    int maxStep = 0;
+    long previousTimestamp = -1;
+    for (FunnelStepEvent event : slidingWindow) {
+      int currentEventStep = event.getStep();
+      // If the same condition holds for the sequence of events, then such repeating event interrupts further
+      // processing.
+      if (_modes.hasStrictDeduplication()) {
+        if (currentEventStep == maxStep - 1) {
+          return maxStep;
+        }
+      }
+      // Don't allow interventions of other events. E.g. in the case of A->B->D->C, it stops finding A->B->C at the D
+      // and the max event level is 2.
+      if (_modes.hasStrictOrder()) {
+        if (currentEventStep != maxStep) {
+          return maxStep;
+        }
+      }
+      // Apply conditions only to events with strictly increasing timestamps.
+      if (_modes.hasStrictIncrease()) {
+        if (previousTimestamp == event.getTimestamp()) {
+          continue;
+        }
+      }
+      if (maxStep == currentEventStep) {
+        maxStep++;
+        previousTimestamp = event.getTimestamp();
+      }
+      if (maxStep == _numSteps) {
+        break;
+      }
+    }
+    return maxStep;
+  }
+
+  @Override
+  public DoubleArrayList mergeFinalResult(DoubleArrayList finalResult1, DoubleArrayList finalResult2) {
+    if (finalResult1 == null) {
+      return finalResult2;
+    }
+    return finalResult1;
+  }
+}
diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
index a318e5698c49..0c4cc22a51fc 100644
--- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
+++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/custom/WindowFunnelTest.java
@@ -894,6 +894,157 @@ public void testFunnelCompleteCountGroupByQueriesSkipLeaf(boolean useMultiStageQ
     }
   }
 
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelEventsFunctionEvalGroupByQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+            + "userId, funnelEventsFunctionEval(timestampCol, '1000', 4, "
+            + "url = '/product/search', "
+            + "url = '/cart/add', "
+            + "url = '/checkout/start', "
+            + "url = '/checkout/confirmation', "
+            + "3, timestampCol, userId, url"
+            + ") "
+            + "FROM %s GROUP BY userId ORDER BY userId LIMIT %d", getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    System.out.println("query = " + query);
+    System.out.println("jsonNode = " + jsonNode);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    System.out.println(rows);
+    assertEquals(rows.size(), 40);
+    for (int i = 0; i < rows.size(); i++) {
+      JsonNode row = rows.get(i);
+      System.out.println("row = " + row);
+      //assertEquals(row.size(), 2);
+      //assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          //assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 1:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 2:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 3:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+  @Test(dataProvider = "useBothQueryEngines")
+  public void testFunnelStepDurationStatsGroupByQueries(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("SELECT "
+                + "userId, funnelStepDurationStats(timestampCol, '1000', 4, "
+                + "url = '/product/search', "
+                + "url = '/cart/add', "
+                + "url = '/checkout/start', "
+                + "url = '/checkout/confirmation', "
+                + "'durationFunctions=count,avg,min,median,percentile95,max' "
+                + ") as statsArray "
+                + "FROM %s GROUP BY userId HAVING arrayLengthDouble(statsArray) > 0 ORDER BY userId LIMIT %d",
+            getTableName(), getCountStarResult());
+    JsonNode jsonNode = postQuery(query);
+    System.out.println("query = " + query);
+    System.out.println("jsonNode = " + jsonNode);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    System.out.println(rows);
+    //assertEquals(rows.size(), 40);
+    for (int i = 0; i < rows.size(); i++) {
+      JsonNode row = rows.get(i);
+      System.out.println("row = " + row);
+      //assertEquals(row.size(), 2);
+      //assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          //assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 1:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 2:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 3:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
+
+  @Test(dataProvider = "useV2QueryEngine")
+  public void testFunnelStepDurationStatsGroupByQueries2(boolean useMultiStageQueryEngine)
+      throws Exception {
+    setUseMultiStageQueryEngine(useMultiStageQueryEngine);
+    String query =
+        String.format("WITH durationStats AS (SELECT "
+                + "/*+ aggOptions( is_partitioned_by_group_by_keys = 'true') */ "
+                + "userId, funnelStepDurationStats(timestampCol, '1000', 4, "
+                + "url = '/product/search', "
+                + "url = '/cart/add', "
+                + "url = '/checkout/start', "
+                + "url = '/checkout/confirmation', "
+                + "'durationFunctions=count,avg,median' "
+                + ") as stats "
+                + "FROM %s "
+                + "GROUP BY userId) "
+                + "SELECT "
+                + "sum(arrayElementAtDouble(stats, 1)) AS count_step0, "
+                + "AVG(arrayElementAtDouble(stats, 2)) AS avg_avg_step0_to_step1, "
+                + "AVG(arrayElementAtDouble(stats, 3)) AS avg_median_step0_to_step1, "
+                + "sum(arrayElementAtDouble(stats, 4)) AS count_step1, "
+                + "AVG(arrayElementAtDouble(stats, 5)) AS avg_avg_step1_to_step2, "
+                + "AVG(arrayElementAtDouble(stats, 6)) AS avg_median_step1_to_step2, "
+                + "sum(arrayElementAtDouble(stats, 7)) AS count_step2, "
+                + "AVG(arrayElementAtDouble(stats, 8)) AS avg_avg_step2_to_step3, "
+                + "AVG(arrayElementAtDouble(stats, 9)) AS avg_median_step2_to_step3, "
+                + "sum(arrayElementAtDouble(stats, 10)) AS count_step3 "
+                + "FROM durationStats "
+                + "OPTION(useMultistageEngine=true, numGroupsLimit=2000000, timeoutMs=1800000, "
+                + "serverReturnFinalResult=true, numThreadsForFinalReduce=4)",
+            getTableName());
+    JsonNode jsonNode = postQuery(query);
+    System.out.println("query = " + query);
+    System.out.println("jsonNode = " + jsonNode);
+    JsonNode rows = jsonNode.get("resultTable").get("rows");
+    System.out.println(rows);
+    //assertEquals(rows.size(), 40);
+    for (int i = 0; i < rows.size(); i++) {
+      JsonNode row = rows.get(i);
+      System.out.println("row = " + row);
+      //assertEquals(row.size(), 2);
+      //assertEquals(row.get(0).textValue(), "user" + (i / 10) + (i % 10));
+      switch (i / 10) {
+        case 0:
+          //assertEquals(row.get(1).intValue(), 1);
+          break;
+        case 1:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 2:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        case 3:
+          //assertEquals(row.get(1).intValue(), 0);
+          break;
+        default:
+          throw new IllegalStateException();
+      }
+    }
+  }
+
   @Override
   public String getTableName() {
     return DEFAULT_TABLE_NAME;
diff --git a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
index 74e7a135b45b..04e4038e0fa9 100644
--- a/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
+++ b/pinot-segment-spi/src/main/java/org/apache/pinot/segment/spi/AggregationFunctionType.java
@@ -180,8 +180,13 @@ public enum AggregationFunctionType {
   // funnel aggregate functions
   FUNNELMAXSTEP("funnelMaxStep", ReturnTypes.INTEGER, OperandTypes.VARIADIC, SqlTypeName.OTHER),
   FUNNELCOMPLETECOUNT("funnelCompleteCount", ReturnTypes.INTEGER, OperandTypes.VARIADIC, SqlTypeName.OTHER),
+  FUNNELSTEPDURATIONSTATS("funnelStepDurationStats", new ArrayReturnTypeInference(SqlTypeName.DOUBLE),
+      OperandTypes.VARIADIC, SqlTypeName.OTHER),
   FUNNELMATCHSTEP("funnelMatchStep", new ArrayReturnTypeInference(SqlTypeName.INTEGER), OperandTypes.VARIADIC,
       SqlTypeName.OTHER),
+  FUNNELEVENTSFUNCTIONEVAL("funnelEventsFunctionEval", new ArrayReturnTypeInference(SqlTypeName.VARCHAR),
+      OperandTypes.VARIADIC,
+      SqlTypeName.OTHER),
   FUNNELCOUNT("funnelCount", new ArrayReturnTypeInference(SqlTypeName.BIGINT), OperandTypes.VARIADIC,
       SqlTypeName.OTHER),