Skip to content

Commit

Permalink
Introducing MSE result holder config to minimize rehashing for high c…
Browse files Browse the repository at this point in the history
…ardinality group by (#14981)
  • Loading branch information
shauryachats authored Feb 6, 2025
1 parent db3f606 commit 465c811
Show file tree
Hide file tree
Showing 13 changed files with 80 additions and 24 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,12 @@ public static boolean optimizeMaxInitialResultHolderCapacityEnabled(Map<String,
return Boolean.parseBoolean(queryOptions.get(QueryOptionKey.OPTIMIZE_MAX_INITIAL_RESULT_HOLDER_CAPACITY));
}

@Nullable
public static Integer getMSEMaxInitialResultHolderCapacity(Map<String, String> queryOptions) {
String maxInitialCapacity = queryOptions.get(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
return checkedParseIntPositive(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY, maxInitialCapacity);
}

@Nullable
public static Integer getMinInitialIndexedTableCapacity(Map<String, String> queryOptions) {
String minInitialIndexedTableCapacity = queryOptions.get(QueryOptionKey.MIN_INITIAL_INDEXED_TABLE_CAPACITY);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public static class AggregateOptions {
public static final String GROUP_TRIM_SIZE = "group_trim_size";

public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "max_initial_result_holder_capacity";
public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "mse_max_initial_result_holder_capacity";
}

public static class WindowHintOptions {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,8 @@ public class QueryRunner {
private Integer _maxInitialResultHolderCapacity;
@Nullable
private Integer _minInitialIndexedTableCapacity;
@Nullable
private Integer _mseMaxInitialResultHolderCapacity;

// Join overflow settings
@Nullable
Expand Down Expand Up @@ -158,6 +160,12 @@ public void init(PinotConfiguration config, InstanceDataManager instanceDataMana
_minInitialIndexedTableCapacity =
minInitialIndexedTableCapacityStr != null ? Integer.parseInt(minInitialIndexedTableCapacityStr) : null;


String mseMaxInitialGroupHolderCapacity =
config.getProperty(CommonConstants.Server.CONFIG_OF_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
_mseMaxInitialResultHolderCapacity =
mseMaxInitialGroupHolderCapacity != null ? Integer.parseInt(mseMaxInitialGroupHolderCapacity) : null;

String maxRowsInJoinStr = config.getProperty(CommonConstants.MultiStageQueryRunner.KEY_OF_MAX_ROWS_IN_JOIN);
_maxRowsInJoin = maxRowsInJoinStr != null ? Integer.parseInt(maxRowsInJoinStr) : null;

Expand Down Expand Up @@ -377,6 +385,15 @@ private Map<String, String> consolidateMetadata(Map<String, String> customProper
Integer.toString(minInitialIndexedTableCapacity));
}

Integer mseMaxInitialResultHolderCapacity = QueryOptionsUtils.getMSEMaxInitialResultHolderCapacity(opChainMetadata);
if (mseMaxInitialResultHolderCapacity == null) {
mseMaxInitialResultHolderCapacity = _mseMaxInitialResultHolderCapacity;
}
if (mseMaxInitialResultHolderCapacity != null) {
opChainMetadata.put(QueryOptionKey.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY,
Integer.toString(mseMaxInitialResultHolderCapacity));
}

Integer maxRowsInJoin = QueryOptionsUtils.getMaxRowsInJoin(opChainMetadata);
if (maxRowsInJoin == null) {
maxRowsInJoin = _maxRowsInJoin;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,9 @@ public MultistageGroupByExecutor(
_aggType = aggType;
_leafReturnFinalResult = leafReturnFinalResult;
_resultSchema = resultSchema;
int maxInitialResultHolderCapacity = getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);

int maxInitialResultHolderCapacity = getResolvedMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);

_numGroupsLimit = getNumGroupsLimit(opChainMetadata, nodeHint);

// By default, we compute all groups for SQL compliant results. However, we allow overriding this behavior via
Expand All @@ -109,7 +111,7 @@ public MultistageGroupByExecutor(

_groupIdGenerator =
GroupIdGeneratorFactory.getGroupIdGenerator(_resultSchema.getStoredColumnDataTypes(), groupKeyIds.length,
_numGroupsLimit);
_numGroupsLimit, maxInitialResultHolderCapacity);
}

private int getNumGroupsLimit(Map<String, String> opChainMetadata, @Nullable PlanNode.NodeHint nodeHint) {
Expand All @@ -126,6 +128,13 @@ private int getNumGroupsLimit(Map<String, String> opChainMetadata, @Nullable Pla
return numGroupsLimit != null ? numGroupsLimit : InstancePlanMakerImplV2.DEFAULT_NUM_GROUPS_LIMIT;
}

private int getResolvedMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata,
@Nullable PlanNode.NodeHint nodeHint) {
Integer mseMaxInitialResultHolderCapacity = getMSEMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
return (mseMaxInitialResultHolderCapacity != null) ? mseMaxInitialResultHolderCapacity
: getMaxInitialResultHolderCapacity(opChainMetadata, nodeHint);
}

private int getMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata,
@Nullable PlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Expand All @@ -143,6 +152,22 @@ private int getMaxInitialResultHolderCapacity(Map<String, String> opChainMetadat
: InstancePlanMakerImplV2.DEFAULT_MAX_INITIAL_RESULT_HOLDER_CAPACITY;
}

private Integer getMSEMaxInitialResultHolderCapacity(Map<String, String> opChainMetadata,
@Nullable PlanNode.NodeHint nodeHint) {
if (nodeHint != null) {
Map<String, String> aggregateOptions = nodeHint.getHintOptions().get(PinotHintOptions.AGGREGATE_HINT_OPTIONS);
if (aggregateOptions != null) {
String maxInitialMSEResultHolderCapacityStr =
aggregateOptions.get(PinotHintOptions.AggregateOptions.MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY);
if (maxInitialMSEResultHolderCapacityStr != null) {
return Integer.parseInt(maxInitialMSEResultHolderCapacityStr);
}
}
}
// Don't return default value since null value means we need to fallback to MaxInitialResultHolderCapacity
return QueryOptionsUtils.getMSEMaxInitialResultHolderCapacity(opChainMetadata);
}

public int getNumGroupsLimit() {
return _numGroupsLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,24 +25,26 @@ public class GroupIdGeneratorFactory {
private GroupIdGeneratorFactory() {
}

public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) {
public static GroupIdGenerator getGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns,
int numGroupsLimit, int maxInitialResultHolderCapacity) {
int initialCapacity = Math.min(maxInitialResultHolderCapacity, numGroupsLimit);
if (numKeyColumns == 1) {
switch (keyTypes[0]) {
case INT:
return new OneIntKeyGroupIdGenerator(numGroupsLimit);
return new OneIntKeyGroupIdGenerator(numGroupsLimit, initialCapacity);
case LONG:
return new OneLongKeyGroupIdGenerator(numGroupsLimit);
return new OneLongKeyGroupIdGenerator(numGroupsLimit, initialCapacity);
case FLOAT:
return new OneFloatKeyGroupIdGenerator(numGroupsLimit);
return new OneFloatKeyGroupIdGenerator(numGroupsLimit, initialCapacity);
case DOUBLE:
return new OneDoubleKeyGroupIdGenerator(numGroupsLimit);
return new OneDoubleKeyGroupIdGenerator(numGroupsLimit, initialCapacity);
default:
return new OneObjectKeyGroupIdGenerator(numGroupsLimit);
return new OneObjectKeyGroupIdGenerator(numGroupsLimit, initialCapacity);
}
} else if (numKeyColumns == 2) {
return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1], numGroupsLimit);
return new TwoKeysGroupIdGenerator(keyTypes[0], keyTypes[1], numGroupsLimit, initialCapacity);
} else {
return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns, numGroupsLimit);
return new MultiKeysGroupIdGenerator(keyTypes, numKeyColumns, numGroupsLimit, initialCapacity);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,9 @@ public class MultiKeysGroupIdGenerator implements GroupIdGenerator {
private final ValueToIdMap[] _keyToIdMaps;
private final int _numGroupsLimit;

public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns, int numGroupsLimit) {
_groupIdMap = new Object2IntOpenHashMap<>();
public MultiKeysGroupIdGenerator(ColumnDataType[] keyTypes, int numKeyColumns,
int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Object2IntOpenHashMap<>(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_keyToIdMaps = new ValueToIdMap[numKeyColumns];
for (int i = 0; i < numKeyColumns; i++) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class OneDoubleKeyGroupIdGenerator implements GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;

public OneDoubleKeyGroupIdGenerator(int numGroupsLimit) {
_groupIdMap = new Double2IntOpenHashMap();
public OneDoubleKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Double2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ public class OneFloatKeyGroupIdGenerator implements GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;

public OneFloatKeyGroupIdGenerator(int numGroupsLimit) {
_groupIdMap = new Float2IntOpenHashMap();
public OneFloatKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Float2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class OneIntKeyGroupIdGenerator implements GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;

public OneIntKeyGroupIdGenerator(int numGroupsLimit) {
_groupIdMap = new Int2IntOpenHashMap();
public OneIntKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Int2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ public class OneLongKeyGroupIdGenerator implements GroupIdGenerator {
private int _numGroups = 0;
private int _nullGroupId = INVALID_ID;

public OneLongKeyGroupIdGenerator(int numGroupsLimit) {
_groupIdMap = new Long2IntOpenHashMap();
public OneLongKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Long2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,8 @@ public class OneObjectKeyGroupIdGenerator implements GroupIdGenerator {
private final Object2IntOpenHashMap<Object> _groupIdMap;
private final int _numGroupsLimit;

public OneObjectKeyGroupIdGenerator(int numGroupsLimit) {
_groupIdMap = new Object2IntOpenHashMap<>();
public OneObjectKeyGroupIdGenerator(int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Object2IntOpenHashMap<>(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_numGroupsLimit = numGroupsLimit;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,9 @@ public class TwoKeysGroupIdGenerator implements GroupIdGenerator {
private final ValueToIdMap _secondKeyToIdMap;
private final int _numGroupsLimit;

public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType, ColumnDataType secondKeyType, int numGroupsLimit) {
_groupIdMap = new Long2IntOpenHashMap();
public TwoKeysGroupIdGenerator(ColumnDataType firstKeyType,
ColumnDataType secondKeyType, int numGroupsLimit, int initialCapacity) {
_groupIdMap = new Long2IntOpenHashMap(initialCapacity);
_groupIdMap.defaultReturnValue(INVALID_ID);
_firstKeyToIdMap = ValueToIdMapFactory.get(firstKeyType.toDataType());
_secondKeyToIdMap = ValueToIdMapFactory.get(secondKeyType.toDataType());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -509,6 +509,7 @@ public static class QueryOptionKey {
public static final String NUM_GROUPS_LIMIT = "numGroupsLimit";
public static final String MAX_INITIAL_RESULT_HOLDER_CAPACITY = "maxInitialResultHolderCapacity";
public static final String MIN_INITIAL_INDEXED_TABLE_CAPACITY = "minInitialIndexedTableCapacity";
public static final String MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY = "mseMaxInitialResultHolderCapacity";
public static final String GROUP_TRIM_THRESHOLD = "groupTrimThreshold";
public static final String STAGE_PARALLELISM = "stageParallelism";

Expand Down Expand Up @@ -764,6 +765,8 @@ public static class Server {
"pinot.server.query.executor.group.trim.size";
public static final String CONFIG_OF_QUERY_EXECUTOR_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"pinot.server.query.executor.max.init.group.holder.capacity";
public static final String CONFIG_OF_MSE_MAX_INITIAL_RESULT_HOLDER_CAPACITY =
"pinot.server.mse.max.init.group.holder.capacity";
public static final String CONFIG_OF_QUERY_EXECUTOR_MIN_INITIAL_INDEXED_TABLE_CAPACITY =
"pinot.server.query.executor.min.init.indexed.table.capacity";

Expand Down

0 comments on commit 465c811

Please sign in to comment.