Skip to content

Commit

Permalink
Support FunnelEventsFunctionEvalAggregationFunction and FunnelStepDur…
Browse files Browse the repository at this point in the history
…ationStatsAggregationFunction functions
  • Loading branch information
xiangfu0 committed Jan 30, 2025
1 parent 156be80 commit d47dc56
Show file tree
Hide file tree
Showing 11 changed files with 1,265 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -365,4 +365,29 @@ public static String arrayToString(String[] values, String delimiter, String nul
.map(s -> s == null || s.equals(NullValuePlaceHolder.STRING) ? nullString : s)
.toArray(String[]::new));
}

@ScalarFunction
public static int arrayLengthInt(int[] values) {
return values.length;
}

@ScalarFunction
public static int arrayLengthLong(long[] values) {
return values.length;
}

@ScalarFunction
public static int arrayLengthFloat(float[] values) {
return values.length;
}

@ScalarFunction
public static int arrayLengthDouble(double[] values) {
return values.length;
}

@ScalarFunction
public static int arrayLengthString(String[] values) {
return values.length;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@
import org.apache.pinot.common.CustomObject;
import org.apache.pinot.common.utils.HashUtil;
import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEvent;
import org.apache.pinot.core.query.aggregation.function.funnel.FunnelStepEventWithExtraFields;
import org.apache.pinot.core.query.aggregation.utils.exprminmax.ExprMinMaxObject;
import org.apache.pinot.core.query.utils.idset.IdSet;
import org.apache.pinot.core.query.utils.idset.IdSets;
Expand Down Expand Up @@ -1747,6 +1748,63 @@ public PriorityQueue<FunnelStepEvent> deserialize(ByteBuffer byteBuffer) {
}
};

public static final ObjectSerDe<PriorityQueue<FunnelStepEventWithExtraFields>>
FUNNEL_STEP_EVENT_WITH_EXTRA_FIELDS_ACCUMULATOR_SER_DE =
new ObjectSerDe<PriorityQueue<FunnelStepEventWithExtraFields>>() {

@Override
public byte[] serialize(PriorityQueue<FunnelStepEventWithExtraFields> funnelStepEvents) {
int numEvents = funnelStepEvents.size();
List<byte[]> serializedEvents = new ArrayList<>(numEvents);
long bufferSize = Integer.BYTES; // Start with size for number of events

// First pass: Serialize each event and calculate total buffer size
for (FunnelStepEventWithExtraFields funnelStepEvent : funnelStepEvents) {
byte[] eventBytes = funnelStepEvent.getBytes(); // Costly operation, compute only once
serializedEvents.add(eventBytes); // Store serialized form
bufferSize += Integer.BYTES; // Add size for storing length
bufferSize += eventBytes.length; // Add size of serialized content
}

// Ensure the total buffer size doesn't exceed 2GB
Preconditions.checkState(bufferSize <= Integer.MAX_VALUE, "Buffer size exceeds 2GB");

// Allocate buffer
byte[] bytes = new byte[(int) bufferSize];
ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);

// Second pass: Write data to the buffer
byteBuffer.putInt(numEvents); // Write number of events
for (byte[] eventBytes : serializedEvents) {
byteBuffer.putInt(eventBytes.length); // Write length of each event
byteBuffer.put(eventBytes); // Write event content
}

return bytes;
}

@Override
public PriorityQueue<FunnelStepEventWithExtraFields> deserialize(byte[] bytes) {
return deserialize(ByteBuffer.wrap(bytes));
}

@Override
public PriorityQueue<FunnelStepEventWithExtraFields> deserialize(ByteBuffer byteBuffer) {
int size = byteBuffer.getInt();
if (size == 0) {
return new PriorityQueue<>();
}
PriorityQueue<FunnelStepEventWithExtraFields> funnelStepEvents = new PriorityQueue<>(size);
for (int i = 0; i < size; i++) {
int funnelStepEventWithExtraFieldsByteSize = byteBuffer.getInt();
byte[] bytes = new byte[funnelStepEventWithExtraFieldsByteSize];
byteBuffer.get(bytes);
funnelStepEvents.add(new FunnelStepEventWithExtraFields(bytes));
}
return funnelStepEvents;
}
};

// NOTE: DO NOT change the order, it has to be the same order as the ObjectType
//@formatter:off
private static final ObjectSerDe[] SER_DES = {
Expand Down Expand Up @@ -1802,6 +1860,7 @@ public PriorityQueue<FunnelStepEvent> deserialize(ByteBuffer byteBuffer) {
DATA_SKETCH_CPC_ACCUMULATOR_SER_DE,
ORDERED_STRING_SET_SER_DE,
FUNNEL_STEP_EVENT_ACCUMULATOR_SER_DE,
FUNNEL_STEP_EVENT_WITH_EXTRA_FIELDS_ACCUMULATOR_SER_DE,
};
//@formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@
package org.apache.pinot.core.common.datablock;

import com.google.common.base.Preconditions;
import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.floats.FloatArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import java.io.IOException;
Expand Down Expand Up @@ -121,16 +125,32 @@ public static RowDataBlock buildFromRows(List<Object[]> rows, DataSchema dataSch
break;
// Multi-value column
case INT_ARRAY:
setColumn(fixedSize, varSize, (int[]) value);
if (value instanceof IntArrayList) {
setColumn(fixedSize, varSize, ((IntArrayList) value).elements());
} else {
setColumn(fixedSize, varSize, (int[]) value);
}
break;
case LONG_ARRAY:
setColumn(fixedSize, varSize, (long[]) value);
if (value instanceof LongArrayList) {
setColumn(fixedSize, varSize, ((LongArrayList) value).elements());
} else {
setColumn(fixedSize, varSize, (long[]) value);
}
break;
case FLOAT_ARRAY:
setColumn(fixedSize, varSize, (float[]) value);
if (value instanceof FloatArrayList) {
setColumn(fixedSize, varSize, ((FloatArrayList) value).elements());
} else {
setColumn(fixedSize, varSize, (float[]) value);
}
break;
case DOUBLE_ARRAY:
setColumn(fixedSize, varSize, (double[]) value);
if (value instanceof DoubleArrayList) {
setColumn(fixedSize, varSize, ((DoubleArrayList) value).elements());
} else {
setColumn(fixedSize, varSize, (double[]) value);
}
break;
case STRING_ARRAY:
setColumn(fixedSize, varSize, (String[]) value, dictionary);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
*/
package org.apache.pinot.core.operator.docvalsets;

import it.unimi.dsi.fastutil.doubles.DoubleArrayList;
import it.unimi.dsi.fastutil.ints.IntArrayList;
import it.unimi.dsi.fastutil.longs.LongArrayList;
import it.unimi.dsi.fastutil.objects.ObjectArrayList;
import java.math.BigDecimal;
import java.util.Arrays;
import java.util.List;
Expand Down Expand Up @@ -435,6 +439,30 @@ public int[][] getIntValuesMV() {
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Integer.parseInt(stringArray[j]);
}
} else if (storedValue instanceof IntArrayList) {
IntArrayList intArrayList = (IntArrayList) storedValue;
values[i] = new int[intArrayList.size()];
for (int j = 0; j < intArrayList.size(); j++) {
values[i][j] = intArrayList.getInt(j);
}
} else if (storedValue instanceof LongArrayList) {
LongArrayList longArrayList = (LongArrayList) storedValue;
values[i] = new int[longArrayList.size()];
for (int j = 0; j < longArrayList.size(); j++) {
values[i][j] = (int) longArrayList.getLong(j);
}
} else if (storedValue instanceof DoubleArrayList) {
DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
values[i] = new int[doubleArrayList.size()];
for (int j = 0; j < doubleArrayList.size(); j++) {
values[i][j] = (int) doubleArrayList.getDouble(j);
}
} else if (storedValue instanceof ObjectArrayList) {
ObjectArrayList list = (ObjectArrayList) storedValue;
values[i] = new int[list.size()];
for (int j = 0; j < list.size(); j++) {
values[i][j] = Integer.parseInt(list.get(j).toString());
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
Expand Down Expand Up @@ -475,6 +503,30 @@ public long[][] getLongValuesMV() {
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Long.parseLong(stringArray[j]);
}
} else if (storedValue instanceof IntArrayList) {
IntArrayList intArrayList = (IntArrayList) storedValue;
values[i] = new long[intArrayList.size()];
for (int j = 0; j < intArrayList.size(); j++) {
values[i][j] = intArrayList.getInt(j);
}
} else if (storedValue instanceof LongArrayList) {
LongArrayList longArrayList = (LongArrayList) storedValue;
values[i] = new long[longArrayList.size()];
for (int j = 0; j < longArrayList.size(); j++) {
values[i][j] = longArrayList.getLong(j);
}
} else if (storedValue instanceof DoubleArrayList) {
DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
values[i] = new long[doubleArrayList.size()];
for (int j = 0; j < doubleArrayList.size(); j++) {
values[i][j] = (long) doubleArrayList.getDouble(j);
}
} else if (storedValue instanceof ObjectArrayList) {
ObjectArrayList list = (ObjectArrayList) storedValue;
values[i] = new long[list.size()];
for (int j = 0; j < list.size(); j++) {
values[i][j] = Long.parseLong(list.get(j).toString());
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
Expand Down Expand Up @@ -515,6 +567,30 @@ public float[][] getFloatValuesMV() {
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Float.parseFloat(stringArray[j]);
}
} else if (storedValue instanceof IntArrayList) {
IntArrayList intArrayList = (IntArrayList) storedValue;
values[i] = new float[intArrayList.size()];
for (int j = 0; j < intArrayList.size(); j++) {
values[i][j] = intArrayList.getInt(j);
}
} else if (storedValue instanceof LongArrayList) {
LongArrayList longArrayList = (LongArrayList) storedValue;
values[i] = new float[longArrayList.size()];
for (int j = 0; j < longArrayList.size(); j++) {
values[i][j] = longArrayList.getLong(j);
}
} else if (storedValue instanceof DoubleArrayList) {
DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
values[i] = new float[doubleArrayList.size()];
for (int j = 0; j < doubleArrayList.size(); j++) {
values[i][j] = (float) doubleArrayList.getDouble(j);
}
} else if (storedValue instanceof ObjectArrayList) {
ObjectArrayList list = (ObjectArrayList) storedValue;
values[i] = new float[list.size()];
for (int j = 0; j < list.size(); j++) {
values[i][j] = Float.parseFloat(list.get(j).toString());
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
Expand Down Expand Up @@ -555,6 +631,30 @@ public double[][] getDoubleValuesMV() {
for (int j = 0; j < stringArray.length; j++) {
values[i][j] = Double.parseDouble(stringArray[j]);
}
} else if (storedValue instanceof IntArrayList) {
IntArrayList intArrayList = (IntArrayList) storedValue;
values[i] = new double[intArrayList.size()];
for (int j = 0; j < intArrayList.size(); j++) {
values[i][j] = intArrayList.getInt(j);
}
} else if (storedValue instanceof LongArrayList) {
LongArrayList longArrayList = (LongArrayList) storedValue;
values[i] = new double[longArrayList.size()];
for (int j = 0; j < longArrayList.size(); j++) {
values[i][j] = longArrayList.getLong(j);
}
} else if (storedValue instanceof DoubleArrayList) {
DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
values[i] = new double[doubleArrayList.size()];
for (int j = 0; j < doubleArrayList.size(); j++) {
values[i][j] = doubleArrayList.getDouble(j);
}
} else if (storedValue instanceof ObjectArrayList) {
ObjectArrayList list = (ObjectArrayList) storedValue;
values[i] = new double[list.size()];
for (int j = 0; j < list.size(); j++) {
values[i][j] = Double.parseDouble(list.get(j).toString());
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
Expand Down Expand Up @@ -601,6 +701,30 @@ public String[][] getStringValuesMV() {
}
} else if (storedValue instanceof String[]) {
values[i] = (String[]) storedValue;
} else if (storedValue instanceof IntArrayList) {
IntArrayList intArrayList = (IntArrayList) storedValue;
values[i] = new String[intArrayList.size()];
for (int j = 0; j < intArrayList.size(); j++) {
values[i][j] = Integer.toString(intArrayList.getInt(j));
}
} else if (storedValue instanceof LongArrayList) {
LongArrayList longArrayList = (LongArrayList) storedValue;
values[i] = new String[longArrayList.size()];
for (int j = 0; j < longArrayList.size(); j++) {
values[i][j] = Long.toString(longArrayList.getLong(j));
}
} else if (storedValue instanceof DoubleArrayList) {
DoubleArrayList doubleArrayList = (DoubleArrayList) storedValue;
values[i] = new String[doubleArrayList.size()];
for (int j = 0; j < doubleArrayList.size(); j++) {
values[i][j] = Double.toString(doubleArrayList.getDouble(j));
}
} else if (storedValue instanceof ObjectArrayList) {
ObjectArrayList list = (ObjectArrayList) storedValue;
values[i] = new String[list.size()];
for (int j = 0; j < list.size(); j++) {
values[i][j] = list.get(j).toString();
}
} else {
throw new IllegalStateException("Unsupported data type: " + storedValue.getClass().getName());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,10 @@
import org.apache.pinot.core.query.aggregation.function.array.SumArrayLongAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.FunnelCountAggregationFunctionFactory;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelCompleteCountAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelEventsFunctionEvalAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMatchStepAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelMaxStepAggregationFunction;
import org.apache.pinot.core.query.aggregation.function.funnel.window.FunnelStepDurationStatsAggregationFunction;
import org.apache.pinot.segment.spi.AggregationFunctionType;
import org.apache.pinot.spi.data.FieldSpec.DataType;
import org.apache.pinot.spi.exception.BadQueryRequestException;
Expand Down Expand Up @@ -467,6 +469,10 @@ public static AggregationFunction getAggregationFunction(FunctionContext functio
return new FunnelMatchStepAggregationFunction(arguments);
case FUNNELCOMPLETECOUNT:
return new FunnelCompleteCountAggregationFunction(arguments);
case FUNNELSTEPDURATIONSTATS:
return new FunnelStepDurationStatsAggregationFunction(arguments);
case FUNNELEVENTSFUNCTIONEVAL:
return new FunnelEventsFunctionEvalAggregationFunction(arguments);
case FREQUENTSTRINGSSKETCH:
return new FrequentStringsSketchAggregationFunction(arguments);
case FREQUENTLONGSSKETCH:
Expand Down
Loading

0 comments on commit d47dc56

Please sign in to comment.