Skip to content

Commit be92fcd

Browse files
authored
Fix sort properties process of AggregationNode when generate distribution plan
1 parent d16ec65 commit be92fcd

File tree

3 files changed

+150
-9
lines changed

3 files changed

+150
-9
lines changed
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
20+
package org.apache.iotdb.relational.it.query.recent;
21+
22+
import org.apache.iotdb.it.env.EnvFactory;
23+
import org.apache.iotdb.it.framework.IoTDBTestRunner;
24+
import org.apache.iotdb.itbase.category.TableClusterIT;
25+
import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
26+
27+
import org.junit.AfterClass;
28+
import org.junit.BeforeClass;
29+
import org.junit.Test;
30+
import org.junit.experimental.categories.Category;
31+
import org.junit.runner.RunWith;
32+
33+
import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
34+
import static org.apache.iotdb.db.it.utils.TestUtils.tableResultSetEqualTest;
35+
36+
@RunWith(IoTDBTestRunner.class)
37+
@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
38+
public class IoTDBComplexQueryIT {
39+
protected static final String DATABASE_NAME = "test_db";
40+
protected static final String[] createSqls =
41+
new String[] {
42+
"CREATE DATABASE " + DATABASE_NAME,
43+
"USE " + DATABASE_NAME,
44+
"create table employees(department_id STRING TAG,remark STRING ATTRIBUTE,name TEXT FIELD,Gender TEXT FIELD,Status BOOLEAN FIELD,employee_id INT32 FIELD,salary DOUBLE FIELD,date_of_birth DATE FIELD,Contac_info string FIELD)",
45+
"create table departments(department_id STRING TAG,dep_description STRING ATTRIBUTE,dep_name TEXT FIELD,dep_phone TEXT FIELD,dep_status BOOLEAN FIELD,dep_member INT32 FIELD,employee_id INT32 FIELD)",
46+
"insert into employees(time, department_id, remark, name, gender, status, employee_id, salary, date_of_birth, contac_info) values(1, 'D001', 'good', 'Mary','Female', false, 1223, 5500.22, '1988-10-12', '133-1212-1234')",
47+
"insert into employees(time, department_id, remark, name, gender, status, employee_id, salary, date_of_birth, contac_info) values(2, 'D001', 'great', 'John', 'Male', true, 40012, 8822, '1985-06-15', '130-1002-1334')",
48+
"insert into employees(time, department_id, remark, name, gender, status, employee_id, salary, date_of_birth, contac_info) values(3, 'D002', 'excellent', 'Nancy', 'Female', true, 30112, 10002, '1983-08-15', '135-1302-1354')",
49+
"insert into employees(time, department_id, remark, name, gender, status, employee_id, salary, date_of_birth, contac_info) values(4, 'D002', 'good', 'Jack', 'Male', false, 12212, 7000, '1990-03-26', '138-1012-1353')",
50+
"insert into employees(time, department_id, remark, name, gender, status, employee_id, salary, date_of_birth, contac_info) values(5, 'D003', 'great', 'Linda', 'Female', false, 10212, 5600, '1995-06-15', '150-2003-1355')",
51+
"insert into departments(time, department_id, dep_description, dep_name, dep_phone, dep_status, dep_member,employee_id) values(1, 'D001', 'goods','销售部', '010-2271-2120', false, 1223,1223)",
52+
"insert into departments(time, department_id, dep_description, dep_name, dep_phone, dep_status, dep_member,employee_id) values(2, 'D001', 'goods','销售部', '010-2271-2120', false, 102, 40012)",
53+
"insert into departments(time, department_id, dep_description, dep_name, dep_phone, dep_status, dep_member,employee_id) values(3, 'D002', 'service','客服部', '010-2077-2520', true, 220, 30112)",
54+
"insert into departments(time, department_id, dep_description, dep_name, dep_phone, dep_status, dep_member,employee_id) values(4, 'D002', 'service','客服部', '010-2077-2520', true, 2012, 12212)",
55+
"insert into departments(time, department_id, dep_description, dep_name, dep_phone, dep_status, dep_member,employee_id) values(5, 'D003', 'IT','研发部', '010-3272-2310', true, 300, 10212)",
56+
"insert into departments(time, department_id, dep_description, dep_name, dep_phone, dep_status, dep_member,employee_id) values(6, 'D004', 'IT','人事部', '010-3272-2312', true, 300, 10200)",
57+
"FLUSH",
58+
"CLEAR ATTRIBUTE CACHE",
59+
};
60+
61+
@BeforeClass
62+
public static void setUp() throws Exception {
63+
EnvFactory.getEnv().initClusterEnvironment();
64+
prepareTableData(createSqls);
65+
}
66+
67+
@AfterClass
68+
public static void tearDown() throws Exception {
69+
EnvFactory.getEnv().cleanClusterEnvironment();
70+
}
71+
72+
@Test
73+
public void queryTest1() {
74+
// Look for the non-intersecting departments in the two tables
75+
String[] expectedHeader = new String[] {"department_id", "dep_name"};
76+
String[] retArray = new String[] {"D004,人事部,"};
77+
tableResultSetEqualTest(
78+
"select department_id, dep_name from departments where not exists("
79+
+ "select 1 from employees where employees.department_id = departments.department_id)",
80+
expectedHeader,
81+
retArray,
82+
DATABASE_NAME);
83+
}
84+
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java

Lines changed: 37 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -817,14 +817,32 @@ public List<PlanNode> visitInformationSchemaTableScan(
817817

818818
@Override
819819
public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context) {
820+
List<Symbol> preGroupedSymbols = node.getPreGroupedSymbols();
821+
OrderingScheme expectedOrderingSchema;
820822
if (node.isStreamable()) {
821-
OrderingScheme expectedOrderingSchema = constructOrderingSchema(node.getPreGroupedSymbols());
823+
expectedOrderingSchema = constructOrderingSchema(preGroupedSymbols);
822824
context.setExpectedOrderingScheme(expectedOrderingSchema);
825+
} else {
826+
expectedOrderingSchema = null;
827+
context.clearExpectedOrderingScheme();
823828
}
829+
824830
List<PlanNode> childrenNodes = node.getChild().accept(this, context);
825831
OrderingScheme childOrdering = nodeOrderingMap.get(childrenNodes.get(0).getPlanNodeId());
826-
if (childOrdering != null) {
827-
nodeOrderingMap.put(node.getPlanNodeId(), childOrdering);
832+
if (node.isStreamable()) {
833+
// Child has Ordering, we need to check if it is the Ordering we expected
834+
if (childOrdering != null) {
835+
if (prefixMatched(childOrdering, node.getPreGroupedSymbols())) {
836+
nodeOrderingMap.put(node.getPlanNodeId(), expectedOrderingSchema);
837+
} else {
838+
throw new IllegalStateException(
839+
String.format(
840+
"Should never reach here. Child ordering: %s. PreGroupedSymbols: %s",
841+
childOrdering.getOrderBy(), node.getPreGroupedSymbols()));
842+
}
843+
}
844+
// Child has no Ordering, do nothing here because the logical optimizer
845+
// 'TransformAggregationToStreamable' will ensure the grouped property of child
828846
}
829847

830848
if (childrenNodes.size() == 1) {
@@ -861,8 +879,8 @@ public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context
861879
intermediate.getStep(),
862880
intermediate.getHashSymbol(),
863881
intermediate.getGroupIdSymbol());
864-
if (node.isStreamable()) {
865-
nodeOrderingMap.put(planNodeId, childOrdering);
882+
if (node.isStreamable() && childOrdering != null) {
883+
nodeOrderingMap.put(planNodeId, expectedOrderingSchema);
866884
}
867885
return aggregationNode;
868886
})
@@ -873,6 +891,20 @@ public List<PlanNode> visitAggregation(AggregationNode node, PlanContext context
873891
return Collections.singletonList(splitResult.left);
874892
}
875893

894+
private boolean prefixMatched(OrderingScheme childOrdering, List<Symbol> preGroupedSymbols) {
895+
List<Symbol> orderKeys = childOrdering.getOrderBy();
896+
if (orderKeys.size() < preGroupedSymbols.size()) {
897+
return false;
898+
}
899+
900+
for (int i = 0; i < preGroupedSymbols.size(); i++) {
901+
if (!orderKeys.get(i).equals(preGroupedSymbols.get(i))) {
902+
return false;
903+
}
904+
}
905+
return true;
906+
}
907+
876908
@Override
877909
public List<PlanNode> visitAggregationTableScan(
878910
AggregationTableScanNode node, PlanContext context) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/TransformAggregationToStreamable.java

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,17 @@
2323
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanVisitor;
2424
import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
2525
import org.apache.iotdb.db.queryengine.plan.relational.planner.DataOrganizationSpecification;
26+
import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
2627
import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
2728
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
2829
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTableScanNode;
2930
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.DeviceTableScanNode;
31+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FillNode;
3032
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
33+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
3134
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
3235
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableFunctionProcessorNode;
36+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ValueFillNode;
3337

3438
import com.google.common.collect.ImmutableList;
3539
import com.google.common.collect.ImmutableSet;
@@ -109,13 +113,13 @@ public List<Symbol> visitPlan(PlanNode node, GroupContext context) {
109113

110114
@Override
111115
public List<Symbol> visitMergeSort(MergeSortNode node, GroupContext context) {
112-
return node.getChildren().get(0).accept(this, context);
116+
return getMatchedPrefixSymbols(context, node.getOrderingScheme());
113117
}
114118

115-
@Override
116-
public List<Symbol> visitSort(SortNode node, GroupContext context) {
119+
private List<Symbol> getMatchedPrefixSymbols(
120+
GroupContext context, OrderingScheme orderingScheme) {
117121
Set<Symbol> expectedGroupingKeys = context.groupingKeys;
118-
List<Symbol> orderKeys = node.getOrderingScheme().getOrderBy();
122+
List<Symbol> orderKeys = orderingScheme.getOrderBy();
119123
for (int i = 0; i < orderKeys.size(); i++) {
120124
if (!expectedGroupingKeys.contains(orderKeys.get(i))) {
121125
return orderKeys.subList(0, i);
@@ -124,6 +128,27 @@ public List<Symbol> visitSort(SortNode node, GroupContext context) {
124128
return ImmutableList.of();
125129
}
126130

131+
@Override
132+
public List<Symbol> visitProject(ProjectNode node, GroupContext context) {
133+
if (ImmutableSet.copyOf(node.getOutputSymbols()).containsAll(context.groupingKeys)) {
134+
return node.getChild().accept(this, context);
135+
}
136+
return ImmutableList.of();
137+
}
138+
139+
@Override
140+
public List<Symbol> visitFill(FillNode node, GroupContext context) {
141+
if (node instanceof ValueFillNode) {
142+
return ImmutableList.of();
143+
}
144+
return node.getChild().accept(this, context);
145+
}
146+
147+
@Override
148+
public List<Symbol> visitSort(SortNode node, GroupContext context) {
149+
return getMatchedPrefixSymbols(context, node.getOrderingScheme());
150+
}
151+
127152
@Override
128153
public List<Symbol> visitTableFunctionProcessor(
129154
TableFunctionProcessorNode node, GroupContext context) {

0 commit comments

Comments
 (0)