From 1d79a9df7a4f67e2a1651b6f27a9552645c73018 Mon Sep 17 00:00:00 2001 From: Bolek Ziobrowski <26925920+bziobrowski@users.noreply.github.com> Date: Tue, 11 Feb 2025 06:37:05 +0100 Subject: [PATCH] Add broker config with default value for is_enable_group_trim hint (#14990) PR adds pinot.broker.enable.group.trim broker configuration setting which allows enabling is_enable_group_trim hint for all queries, which in turn enables V1-style trimming in leaf nodes and v2-style trimming in intermediate nodes. This default value is overridden with value passed in hint. --- .../MultiStageBrokerRequestHandler.java | 4 + .../tests/ExplainIntegrationTestTrait.java | 13 + ...roupByEnableTrimOptionIntegrationTest.java | 231 ++++++++++++++++++ .../tests/GroupByOptionsIntegrationTest.java | 51 +++- .../PinotAggregateExchangeNodeInsertRule.java | 36 ++- .../apache/pinot/query/QueryEnvironment.java | 7 +- .../pinot/query/context/PlannerContext.java | 5 +- .../pinot/spi/utils/CommonConstants.java | 5 + 8 files changed, 340 insertions(+), 12 deletions(-) create mode 100644 pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java index 642ea7039766..46b8a8d64be8 100644 --- a/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java +++ b/pinot-broker/src/main/java/org/apache/pinot/broker/requesthandler/MultiStageBrokerRequestHandler.java @@ -156,10 +156,13 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO queryTimeoutMs = timeoutMsFromQueryOption != null ? timeoutMsFromQueryOption : _brokerTimeoutMs; queryTimer = new Timer(queryTimeoutMs); database = DatabaseUtils.extractDatabaseFromQueryRequest(queryOptions, httpHeaders); + boolean inferPartitionHint = _config.getProperty(CommonConstants.Broker.CONFIG_OF_INFER_PARTITION_HINT, CommonConstants.Broker.DEFAULT_INFER_PARTITION_HINT); boolean defaultUseSpool = _config.getProperty(CommonConstants.Broker.CONFIG_OF_SPOOLS, CommonConstants.Broker.DEFAULT_OF_SPOOLS); + boolean defaultEnableGroupTrim = _config.getProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_GROUP_TRIM, + CommonConstants.Broker.DEFAULT_BROKER_ENABLE_GROUP_TRIM); queryEnvironment = new QueryEnvironment(QueryEnvironment.configBuilder() .database(database) @@ -167,6 +170,7 @@ protected BrokerResponse handleRequest(long requestId, String query, SqlNodeAndO .workerManager(_workerManager) .defaultInferPartitionHint(inferPartitionHint) .defaultUseSpools(defaultUseSpool) + .defaultEnableGroupTrim(defaultEnableGroupTrim) .build()); switch (sqlNodeAndOptions.getSqlNode().getKind()) { diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java index cbe0ffd09fbe..a2c93c6e6eb0 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/ExplainIntegrationTestTrait.java @@ -102,6 +102,19 @@ default void explain(@Language("sql") String query, String expected) { } } + default void explainAskingServers(@Language("sql") String query, String expected) { + try { + JsonNode jsonNode = postQuery("set explainAskingServers=true; explain plan for " + query); + JsonNode plan = jsonNode.get("resultTable").get("rows").get(0).get(1); + + Assert.assertEquals(plan.asText(), expected); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + throw new RuntimeException(e); + } + } + default void explainVerbose(@Language("sql") String query, String expected) { try { JsonNode jsonNode = postQuery("set explainPlanVerbose=true; explain plan for " + query); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java new file mode 100644 index 000000000000..6b3bfd0b5b7d --- /dev/null +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByEnableTrimOptionIntegrationTest.java @@ -0,0 +1,231 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.integration.tests; + +import com.fasterxml.jackson.databind.JsonNode; +import java.io.File; +import java.time.Duration; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import org.apache.commons.io.FileUtils; +import org.apache.pinot.spi.config.table.TableConfig; +import org.apache.pinot.spi.config.table.TableType; +import org.apache.pinot.spi.data.FieldSpec; +import org.apache.pinot.spi.data.Schema; +import org.apache.pinot.spi.env.PinotConfiguration; +import org.apache.pinot.spi.utils.CommonConstants; +import org.apache.pinot.spi.utils.builder.TableConfigBuilder; +import org.apache.pinot.util.TestUtils; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import static org.apache.pinot.integration.tests.ClusterIntegrationTestUtils.getBrokerQueryApiUrl; + + +// similar to GroupByOptionsIntegrationTest but this test verifies that default enable group trim option works even +// if hint is not set in the query +public class GroupByEnableTrimOptionIntegrationTest extends BaseClusterIntegrationTestSet { + + static final int FILES_NO = 4; + static final int RECORDS_NO = 20; + static final String I_COL = "i"; + static final String J_COL = "j"; + static final int SERVERS_NO = 2; + + @BeforeClass + public void setUp() + throws Exception { + TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); + + startZk(); + startController(); + startServers(SERVERS_NO); + startBroker(); + + Schema schema = new Schema.SchemaBuilder().setSchemaName(DEFAULT_SCHEMA_NAME) + .addSingleValueDimension(I_COL, FieldSpec.DataType.INT) + .addSingleValueDimension(J_COL, FieldSpec.DataType.LONG) + .build(); + addSchema(schema); + TableConfig tableConfig = createOfflineTableConfig(); + addTableConfig(tableConfig); + + List avroFiles = GroupByOptionsIntegrationTest.createAvroFile(_tempDir); + ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); + uploadSegments(DEFAULT_TABLE_NAME, _tarDir); + + // Wait for all documents loaded + TestUtils.waitForCondition(() -> getCurrentCountStarResult(DEFAULT_TABLE_NAME) == FILES_NO * RECORDS_NO, 100L, + 60_000, + "Failed to load documents", true, Duration.ofMillis(60_000 / 10)); + + setUseMultiStageQueryEngine(true); + + Map> map = getTableServersToSegmentsMap(getTableName(), TableType.OFFLINE); + + // make sure segments are split between multiple servers + Assert.assertEquals(map.size(), SERVERS_NO); + } + + @Override + protected void overrideBrokerConf(PinotConfiguration brokerConf) { + super.overrideBrokerConf(brokerConf); + + brokerConf.setProperty(CommonConstants.Broker.CONFIG_OF_ENABLE_GROUP_TRIM, "true"); + } + + @Override + protected void overrideServerConf(PinotConfiguration serverConf) { + super.overrideServerConf(serverConf); + serverConf.setProperty(CommonConstants.Server.CONFIG_OF_QUERY_EXECUTOR_GROUP_TRIM_SIZE, "3"); + } + + @Test + public void testOrderByKeysIsPushedToFinalAggregationStageWhenGroupTrimIsEnabledByDefault() + throws Exception { + final String trimEnabledPlan = "Execution Plan\n" + + "LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], offset=[0], fetch=[3])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[3])\n" + // 'collations' below is the important bit + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL], collations=[[0, " + + "1]], limit=[3])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[80])\n"; + + assertResultAndPlan( + // group_trim_size should sort and limit v2 aggregate output if order by and limit is propagated + " ", + " select i, j, count(*) as cnt " + + " from " + getTableName() + + " group by i, j " + + " order by i asc, j asc " + + " limit 3", + "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"cnt\"[\"LONG\"]\n" + + "0,\t0,\t2\n" + + "0,\t1,\t2\n" + + "0,\t2,\t2", + trimEnabledPlan); + + assertResultAndPlan( + " ", + " select /*+ aggOptions(bogus_hint='false') */ i, j, count(*) as cnt " + + " from " + getTableName() + + " group by i, j " + + " order by i asc, j asc " + + " limit 3", + "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"cnt\"[\"LONG\"]\n" + + "0,\t0,\t2\n" + + "0,\t1,\t2\n" + + "0,\t2,\t2", + trimEnabledPlan); + } + + @Test + public void testOrderByKeysIsNotPushedToFinalAggregationStageWhenGroupTrimIsDisabledInHint() + throws Exception { + assertResultAndPlan( + " ", + " select /*+ aggOptions(is_enable_group_trim='false') */ i, j, count(*) as cnt " + + " from " + getTableName() + + " group by i, j " + + " order by i asc, j asc " + + " limit 3", + "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"cnt\"[\"LONG\"]\n" + + "0,\t0,\t2\n" + + "0,\t1,\t2\n" + + "0,\t2,\t2", + "Execution Plan\n" + + "LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], offset=[0], fetch=[3])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[ASC], fetch=[3])\n" + // lack of 'collations' below is the important bit + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[80])\n"); + } + + protected TableConfig createOfflineTableConfig() { + return new TableConfigBuilder(TableType.OFFLINE) + .setTableName(getTableName()) + .setNumReplicas(getNumReplicas()) + .setBrokerTenant(getBrokerTenant()) + .build(); + } + + // for debug only + protected Properties getPinotConnectionProperties() { + Properties properties = new Properties(); + properties.put("timeoutMs", "3600000"); + properties.put("brokerReadTimeoutMs", "3600000"); + properties.put("brokerConnectTimeoutMs", "3600000"); + properties.putAll(getExtraQueryProperties()); + return properties; + } + + public void assertResultAndPlan(String option, String query, String expectedResult, String expectedPlan) + throws Exception { + String sql = option + //disable timeout in debug + + "set timeoutMs=3600000; set brokerReadTimeoutMs=3600000; set brokerConnectTimeoutMs=3600000; " + + query; + + JsonNode result = postV2Query(sql); + JsonNode plan = postV2Query(option + " set explainAskingServers=true; explain plan for " + query); + + Assert.assertEquals(GroupByOptionsIntegrationTest.toResultStr(result), expectedResult); + Assert.assertEquals(GroupByOptionsIntegrationTest.toExplainStr(plan), expectedPlan); + } + + private JsonNode postV2Query(String query) + throws Exception { + return postQuery(query, getBrokerQueryApiUrl(getBrokerBaseApiUrl(), true), null, + getExtraQueryProperties()); + } + + @AfterClass + public void tearDown() + throws Exception { + dropOfflineTable(DEFAULT_TABLE_NAME); + + stopServer(); + stopBroker(); + stopController(); + stopZk(); + + FileUtils.deleteDirectory(_tempDir); + } +} diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java index 03af87b0602f..fc970d5ca0d5 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/GroupByOptionsIntegrationTest.java @@ -74,7 +74,7 @@ public void setUp() TableConfig tableConfig = createOfflineTableConfig(); addTableConfig(tableConfig); - List avroFiles = createAvroFile(); + List avroFiles = createAvroFile(_tempDir); ClusterIntegrationTestUtils.buildSegmentsFromAvro(avroFiles, tableConfig, schema, 0, _segmentDir, _tarDir); uploadSegments(DEFAULT_TABLE_NAME, _tarDir); @@ -99,7 +99,7 @@ protected TableConfig createOfflineTableConfig() { .build(); } - private List createAvroFile() + public static List createAvroFile(File tempDir) throws IOException { // create avro schema @@ -112,7 +112,7 @@ private List createAvroFile() List files = new ArrayList<>(); for (int file = 0; file < FILES_NO; file++) { - File avroFile = new File(_tempDir, "data_" + file + ".avro"); + File avroFile = new File(tempDir, "data_" + file + ".avro"); try (DataFileWriter fileWriter = new DataFileWriter<>(new GenericDatumWriter<>(avroSchema))) { fileWriter.create(avroSchema, avroFile); @@ -128,6 +128,47 @@ private List createAvroFile() return files; } + @Test + public void testOrderByKeysIsNotPushedToFinalAggregationWhenGroupTrimHintIsDisabled() + throws Exception { + String trimDisabledPlan = "Execution Plan\n" + + "LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC], offset=[0], fetch=[1])\n" + + " PinotLogicalSortExchange(distribution=[hash], collation=[[0, 1 DESC]], isSortOnSender=[false], " + + "isSortOnReceiver=[true])\n" + + " LogicalSort(sort0=[$0], sort1=[$1], dir0=[ASC], dir1=[DESC], fetch=[1])\n" + + " PinotLogicalAggregate(group=[{0, 1}], agg#0=[COUNT($2)], aggType=[FINAL])\n" + + " PinotLogicalExchange(distribution=[hash[0, 1]])\n" + + " LeafStageCombineOperator(table=[mytable])\n" + + " StreamingInstanceResponse\n" + + " CombineGroupBy\n" + + " GroupBy(groupKeys=[[i, j]], aggregations=[[count(*)]])\n" + + " Project(columns=[[i, j]])\n" + + " DocIdSet(maxDocs=[40000])\n" + + " FilterMatchEntireSegment(numDocs=[80])\n"; + + assertResultAndPlan( + "", + " select /*+ aggOptions(is_enable_group_trim='false') */ i, j, count(*) as cnt " + + " from " + getTableName() + + " group by i, j " + + " order by i, j desc " + + " limit 1", + "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"cnt\"[\"LONG\"]\n" + + "0,\t9,\t2", + trimDisabledPlan); + + assertResultAndPlan( + "", + " select i, j, count(*) as cnt " + + " from " + getTableName() + + " group by i, j " + + " order by i, j desc " + + " limit 1", + "\"i\"[\"INT\"],\t\"j\"[\"LONG\"],\t\"cnt\"[\"LONG\"]\n" + + "0,\t9,\t2", + trimDisabledPlan); + } + @Test public void testOrderByKeysIsPushedToFinalAggregationStageWithoutGroupTrimSize() throws Exception { @@ -511,7 +552,7 @@ private JsonNode postV2Query(String query) getExtraQueryProperties()); } - private static @NotNull String toResultStr(JsonNode mainNode) { + static @NotNull String toResultStr(JsonNode mainNode) { if (mainNode == null) { return "null"; } @@ -522,7 +563,7 @@ private JsonNode postV2Query(String query) return toString(node); } - private static @NotNull String toExplainStr(JsonNode mainNode) { + static @NotNull String toExplainStr(JsonNode mainNode) { if (mainNode == null) { return "null"; } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java index 84b2a274aa27..063aea58c60a 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/calcite/rel/rules/PinotAggregateExchangeNodeInsertRule.java @@ -20,9 +20,11 @@ import com.google.common.collect.ImmutableList; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import javax.annotation.Nullable; +import org.apache.calcite.plan.Context; import org.apache.calcite.plan.RelOptRule; import org.apache.calcite.plan.RelOptRuleCall; import org.apache.calcite.rel.RelCollation; @@ -62,8 +64,10 @@ import org.apache.pinot.calcite.rel.logical.PinotLogicalExchange; import org.apache.pinot.calcite.rel.logical.PinotLogicalSortExchange; import org.apache.pinot.common.function.sql.PinotSqlAggFunction; +import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.planner.plannode.AggregateNode.AggType; import org.apache.pinot.segment.spi.AggregationFunctionType; +import org.apache.pinot.spi.utils.CommonConstants; /** @@ -126,9 +130,11 @@ public void onMatch(RelOptRuleCall call) { } Map hintOptions = PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS); - if (hintOptions == null || !Boolean.parseBoolean( - hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM))) { + + if (!isGroupTrimmingEnabled(call, hintOptions)) { return; + } else if (hintOptions == null) { + hintOptions = Collections.emptyMap(); } Sort sortRel = call.rel(0); @@ -175,11 +181,14 @@ public void onMatch(RelOptRuleCall call) { if (aggRel.getGroupSet().isEmpty()) { return; } + Map hintOptions = PinotHintStrategyTable.getHintOptions(aggRel.getHints(), PinotHintOptions.AGGREGATE_HINT_OPTIONS); - if (hintOptions == null || !Boolean.parseBoolean( - hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM))) { + + if (!isGroupTrimmingEnabled(call, hintOptions)) { return; + } else if (hintOptions == null) { + hintOptions = Collections.emptyMap(); } Sort sortRel = call.rel(0); @@ -466,4 +475,23 @@ private static List findImmediateProjects(RelNode relNode) { } return null; } + + private static boolean isGroupTrimmingEnabled(RelOptRuleCall call, Map hintOptions) { + if (hintOptions != null) { + String option = hintOptions.get(PinotHintOptions.AggregateOptions.IS_ENABLE_GROUP_TRIM); + if (option != null) { + return Boolean.parseBoolean(option); + } + } + + Context genericContext = call.getPlanner().getContext(); + if (genericContext != null) { + QueryEnvironment.Config context = genericContext.unwrap(QueryEnvironment.Config.class); + if (context != null) { + return context.defaultEnableGroupTrim(); + } + } + + return CommonConstants.Broker.DEFAULT_BROKER_ENABLE_GROUP_TRIM; + } } diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java index 0ff0b7740236..1516dfd0099b 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/QueryEnvironment.java @@ -139,7 +139,7 @@ private PlannerContext getPlannerContext(SqlNodeAndOptions sqlNodeAndOptions) { WorkerManager workerManager = getWorkerManager(sqlNodeAndOptions); HepProgram traitProgram = getTraitProgram(workerManager); return new PlannerContext(_config, _catalogReader, _typeFactory, _optProgram, traitProgram, - sqlNodeAndOptions.getOptions()); + sqlNodeAndOptions.getOptions(), _envConfig); } public Set getResolvedTables() { @@ -510,6 +510,11 @@ default boolean defaultUseSpools() { return CommonConstants.Broker.DEFAULT_OF_SPOOLS; } + @Value.Default + default boolean defaultEnableGroupTrim() { + return CommonConstants.Broker.DEFAULT_BROKER_ENABLE_GROUP_TRIM; + } + /** * Returns the worker manager. * diff --git a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java index 4505e16da3d8..c797f5a6fe8c 100644 --- a/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java +++ b/pinot-query-planner/src/main/java/org/apache/pinot/query/context/PlannerContext.java @@ -29,6 +29,7 @@ import org.apache.calcite.rel.type.RelDataTypeFactory; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.tools.FrameworkConfig; +import org.apache.pinot.query.QueryEnvironment; import org.apache.pinot.query.planner.logical.LogicalPlanner; import org.apache.pinot.query.validate.Validator; @@ -50,11 +51,11 @@ public class PlannerContext implements AutoCloseable { private final Map _options; public PlannerContext(FrameworkConfig config, Prepare.CatalogReader catalogReader, RelDataTypeFactory typeFactory, - HepProgram optProgram, HepProgram traitProgram, Map options) { + HepProgram optProgram, HepProgram traitProgram, Map options, QueryEnvironment.Config envConfig) { _planner = new PlannerImpl(config); _validator = new Validator(config.getOperatorTable(), catalogReader, typeFactory); _relOptPlanner = new LogicalPlanner(optProgram, Contexts.EMPTY_CONTEXT, config.getTraitDefs()); - _relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.EMPTY_CONTEXT, + _relTraitPlanner = new LogicalPlanner(traitProgram, Contexts.of(envConfig), Collections.singletonList(RelDistributionTraitDef.INSTANCE)); _options = options; } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index bdf1cbed0f55..8ab3bd3e1117 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -349,6 +349,11 @@ public static class Broker { "pinot.broker.min.init.indexed.table.capacity"; public static final int DEFAULT_BROKER_MIN_INITIAL_INDEXED_TABLE_CAPACITY = 128; + // this config allows enabling is_enable_group_trim hint for all queries and thus enabling V1-style trimming in + // leaf nodes and v2-style trimming in intermediate nodes + public static final String CONFIG_OF_ENABLE_GROUP_TRIM = "pinot.broker.enable.group.trim"; + public static final boolean DEFAULT_BROKER_ENABLE_GROUP_TRIM = false; + // Configure the request handler type used by broker to handler inbound query request. // NOTE: the request handler type refers to the communication between Broker and Server. public static final String BROKER_REQUEST_HANDLER_TYPE = "pinot.broker.request.handler.type";