Skip to content

Commit 8f4461d

Browse files
committed
CTE scan operator and CTE materialization memory control
1 parent 37c9191 commit 8f4461d

File tree

16 files changed

+221
-63
lines changed

16 files changed

+221
-63
lines changed

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBConfig.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -426,6 +426,9 @@ public class IoTDBConfig {
426426
/** The buffer for sort operation */
427427
private long sortBufferSize = 32 * 1024 * 1024L;
428428

429+
/** The buffer for cte scan operation */
430+
private long cteBufferSize = 32 * 1024 * 1024L;
431+
429432
/** Mods cache size limit per fi */
430433
private long modsCacheSizeLimitPerFI = 32 * 1024 * 1024;
431434

@@ -1370,6 +1373,7 @@ private void formulateFolders() {
13701373
extPipeDir = addDataHomeDir(extPipeDir);
13711374
queryDir = addDataHomeDir(queryDir);
13721375
sortTmpDir = addDataHomeDir(sortTmpDir);
1376+
cteTmpDir = addDataHomeDir(cteTmpDir);
13731377
formulateDataDirs(tierDataDirs);
13741378
}
13751379

@@ -4076,6 +4080,14 @@ public long getSortBufferSize() {
40764080
return sortBufferSize;
40774081
}
40784082

4083+
public void setCteBufferSize(long cteBufferSize) {
4084+
this.cteBufferSize = cteBufferSize;
4085+
}
4086+
4087+
public long getCteBufferSize() {
4088+
return cteBufferSize;
4089+
}
4090+
40794091
public void setModsCacheSizeLimitPerFI(long modsCacheSizeLimitPerFI) {
40804092
this.modsCacheSizeLimitPerFI = modsCacheSizeLimitPerFI;
40814093
}
@@ -4092,6 +4104,10 @@ public String getSortTmpDir() {
40924104
return sortTmpDir;
40934105
}
40944106

4107+
public void setCteTmpDir(String cteTmpDir) {
4108+
this.cteTmpDir = cteTmpDir;
4109+
}
4110+
40954111
public String getCteTmpDir() {
40964112
return cteTmpDir;
40974113
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/conf/IoTDBDescriptor.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1030,12 +1030,18 @@ public void loadProperties(TrimProperties properties) throws BadNodeUrlException
10301030
// The buffer for sort operator to calculate
10311031
loadFixedSizeLimitForQuery(properties, "sort_buffer_size_in_bytes", conf::setSortBufferSize);
10321032

1033+
// The buffer for cte scan operator
1034+
loadFixedSizeLimitForQuery(properties, "cte_buffer_size_in_bytes", conf::setCteBufferSize);
1035+
10331036
loadFixedSizeLimitForQuery(
10341037
properties, "mods_cache_size_limit_per_fi_in_bytes", conf::setModsCacheSizeLimitPerFI);
10351038

10361039
// tmp filePath for sort operator
10371040
conf.setSortTmpDir(properties.getProperty("sort_tmp_dir", conf.getSortTmpDir()));
10381041

1042+
// tmp filePath for cte
1043+
conf.setCteTmpDir(properties.getProperty("cte_tmp_dir", conf.getCteTmpDir()));
1044+
10391045
conf.setRateLimiterType(properties.getProperty("rate_limiter_type", conf.getRateLimiterType()));
10401046

10411047
conf.setDataNodeSchemaCacheEvictionPolicy(

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/common/MPPQueryContext.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,9 @@ public class MPPQueryContext {
9696

9797
private boolean userQuery = false;
9898

99+
/** check if there is tmp file to be deleted. */
100+
private boolean mayHaveTmpFile = false;
101+
99102
public MPPQueryContext(QueryId queryId) {
100103
this.queryId = queryId;
101104
this.endPointBlackList = ConcurrentHashMap.newKeySet();
@@ -426,4 +429,12 @@ public boolean isQuery() {
426429
public void setUserQuery(boolean userQuery) {
427430
this.userQuery = userQuery;
428431
}
432+
433+
public void setMayHaveTmpFile(boolean mayHaveTmpFile) {
434+
this.mayHaveTmpFile = mayHaveTmpFile;
435+
}
436+
437+
public boolean mayHaveTmpFile() {
438+
return mayHaveTmpFile;
439+
}
429440
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/CteScanOperator.java

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
package org.apache.iotdb.db.queryengine.execution.operator.source.relational;
2323

2424
import org.apache.iotdb.commons.exception.IoTDBException;
25+
import org.apache.iotdb.db.queryengine.execution.MemoryEstimationHelper;
2526
import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
2627
import org.apache.iotdb.db.queryengine.execution.operator.source.AbstractSourceOperator;
2728
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -30,14 +31,17 @@
3031
import org.apache.iotdb.db.utils.cte.MemoryReader;
3132

3233
import org.apache.tsfile.read.common.block.TsBlock;
34+
import org.apache.tsfile.utils.RamUsageEstimator;
3335
import org.slf4j.Logger;
3436
import org.slf4j.LoggerFactory;
3537

3638
import java.util.ArrayList;
3739
import java.util.List;
3840

3941
public class CteScanOperator extends AbstractSourceOperator {
40-
private static final Logger logger = LoggerFactory.getLogger(CteScanOperator.class);
42+
private static final Logger LOGGER = LoggerFactory.getLogger(CteScanOperator.class);
43+
private static final long INSTANCE_SIZE =
44+
RamUsageEstimator.shallowSizeOfInstance(CteScanOperator.class);
4145

4246
private final CteDataStore dataStore;
4347
private List<CteDataReader> dataReaders;
@@ -82,7 +86,7 @@ public void close() throws Exception {
8286
}
8387
}
8488
} catch (Exception e) {
85-
logger.error("Fail to close fileChannel", e);
89+
LOGGER.error("Fail to close fileChannel", e);
8690
}
8791
}
8892

@@ -93,22 +97,27 @@ public boolean isFinished() throws Exception {
9397

9498
@Override
9599
public long calculateMaxPeekMemory() {
96-
return 0;
100+
if (dataReaders == null || readerIndex >= dataReaders.size()) {
101+
return 0;
102+
}
103+
return dataReaders.get(readerIndex).bytesUsed();
97104
}
98105

99106
@Override
100107
public long calculateMaxReturnSize() {
101-
return 0;
108+
// The returned object is a reference to TsBlock in CteDataReader
109+
return RamUsageEstimator.NUM_BYTES_OBJECT_REF;
102110
}
103111

104112
@Override
105113
public long calculateRetainedSizeAfterCallingNext() {
106-
return 0;
114+
return calculateMaxPeekMemoryWithCounter();
107115
}
108116

109117
@Override
110118
public long ramBytesUsed() {
111-
return 0;
119+
return INSTANCE_SIZE
120+
+ MemoryEstimationHelper.getEstimatedSizeOfAccountableObject(operatorContext);
112121
}
113122

114123
private void prepareReaders() throws IoTDBException {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/execution/QueryExecution.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.apache.iotdb.commons.exception.IoTDBException;
2525
import org.apache.iotdb.commons.exception.IoTDBRuntimeException;
2626
import org.apache.iotdb.commons.service.metric.PerformanceOverviewMetrics;
27+
import org.apache.iotdb.commons.utils.FileUtils;
2728
import org.apache.iotdb.db.conf.IoTDBConfig;
2829
import org.apache.iotdb.db.conf.IoTDBDescriptor;
2930
import org.apache.iotdb.db.exception.query.KilledByOthersException;
@@ -47,6 +48,8 @@
4748
import org.apache.iotdb.db.queryengine.plan.planner.plan.FragmentInstance;
4849
import org.apache.iotdb.db.queryengine.plan.planner.plan.LogicalQueryPlan;
4950
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeUtil;
51+
import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
52+
import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.CteMaterializer;
5053
import org.apache.iotdb.db.queryengine.plan.scheduler.IScheduler;
5154
import org.apache.iotdb.db.utils.SetThreadName;
5255
import org.apache.iotdb.mpp.rpc.thrift.TFragmentInstanceId;
@@ -59,6 +62,7 @@
5962
import org.slf4j.Logger;
6063
import org.slf4j.LoggerFactory;
6164

65+
import java.io.File;
6266
import java.nio.ByteBuffer;
6367
import java.time.format.DateTimeParseException;
6468
import java.util.List;
@@ -349,6 +353,9 @@ private void releaseResource() {
349353
resultHandle.close();
350354
cleanUpResultHandle();
351355
}
356+
if (getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) {
357+
CteMaterializer.cleanUpCTE((Analysis) analysis, context);
358+
}
352359
}
353360

354361
private void cleanUpResultHandle() {
@@ -373,6 +380,15 @@ private void cleanUpResultHandle() {
373380
public void stopAndCleanup(Throwable t) {
374381
stop(t);
375382
releaseResource(t);
383+
384+
try {
385+
// delete cte tmp file if exists
386+
deleteTmpFile();
387+
} catch (Throwable err) {
388+
LOGGER.error(
389+
"Errors occurred while attempting to delete tmp files, potentially leading to resource leakage.",
390+
err);
391+
}
376392
}
377393

378394
/** Release the resources that current QueryExecution hold with a specified exception */
@@ -392,6 +408,9 @@ private void releaseResource(Throwable t) {
392408
}
393409
cleanUpResultHandle();
394410
}
411+
if (getSQLDialect().equals(IClientSession.SqlDialect.TABLE)) {
412+
CteMaterializer.cleanUpCTE((Analysis) analysis, context);
413+
}
395414
}
396415

397416
/**
@@ -712,4 +731,18 @@ public String toString() {
712731
public ScheduledExecutorService getScheduledExecutor() {
713732
return planner.getScheduledExecutorService();
714733
}
734+
735+
private void deleteTmpFile() {
736+
if (context.mayHaveTmpFile()) {
737+
String tmpFilePath =
738+
IoTDBDescriptor.getInstance().getConfig().getCteTmpDir()
739+
+ File.separator
740+
+ context.getQueryId()
741+
+ File.separator;
742+
File tmpFile = new File(tmpFilePath);
743+
if (tmpFile.exists()) {
744+
FileUtils.deleteFileOrDirectory(tmpFile, true);
745+
}
746+
}
747+
}
715748
}

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/PlanNodeType.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -117,6 +117,7 @@
117117
import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertTabletNode;
118118
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationTreeDeviceViewScanNode;
119119
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.AssignUniqueId;
120+
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CteScanNode;
120121
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.EnforceSingleRowNode;
121122
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GapFillNode;
122123
import org.apache.iotdb.db.queryengine.plan.relational.planner.node.GroupNode;
@@ -310,6 +311,7 @@ public enum PlanNodeType {
310311
TABLE_WINDOW_FUNCTION((short) 1032),
311312
TABLE_INTO_NODE((short) 1033),
312313
TABLE_UNION_NODE((short) 1034),
314+
TABLE_CTE_SCAN_NODE((short) 1035),
313315

314316
RELATIONAL_INSERT_TABLET((short) 2000),
315317
RELATIONAL_INSERT_ROW((short) 2001),
@@ -697,6 +699,8 @@ public static PlanNode deserialize(ByteBuffer buffer, short nodeType) {
697699
buffer);
698700
case 1034:
699701
return UnionNode.deserialize(buffer);
702+
case 1035:
703+
return CteScanNode.deserialize(buffer);
700704
case 2000:
701705
return RelationalInsertTabletNode.deserialize(buffer);
702706
case 2001:

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/Analysis.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public class Analysis implements IAnalysis {
122122

123123
private String updateType;
124124

125-
private final Map<Query, CteDataStore> namedQueriesDataStore = new LinkedHashMap<>();
125+
private final Map<String, CteDataStore> cteDataStores = new LinkedHashMap<>();
126126

127127
private final Map<NodeRef<Table>, Query> namedQueries = new LinkedHashMap<>();
128128

@@ -285,12 +285,16 @@ public Map<NodeRef<Table>, Query> getNamedQueries() {
285285
return namedQueries;
286286
}
287287

288-
public void addCteDataStore(Query query, CteDataStore dataStore) {
289-
namedQueriesDataStore.put(query, dataStore);
288+
public void addCteDataStore(String cteName, CteDataStore dataStore) {
289+
cteDataStores.put(cteName, dataStore);
290290
}
291291

292-
public CteDataStore getCTEDataStore(Query query) {
293-
return namedQueriesDataStore.get(query);
292+
public Map<String, CteDataStore> getCteDataStores() {
293+
return cteDataStores;
294+
}
295+
296+
public CteDataStore getCteDataStore(String cteName) {
297+
return cteDataStores.get(cteName);
294298
}
295299

296300
public boolean isAnalyzed(Expression expression) {

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,7 @@ protected RelationPlan visitTable(final Table table, final Void context) {
248248
}
249249

250250
if (namedQuery.isMaterialized()) {
251-
CteDataStore dataStore = analysis.getCTEDataStore(namedQuery);
251+
CteDataStore dataStore = analysis.getCteDataStore(table.getName().toString());
252252
if (dataStore != null) {
253253
List<Symbol> outputSymbols =
254254
analysis.getOutputDescriptor(table).getAllFields().stream()

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/ir/CteMaterializer.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,11 +40,11 @@
4040

4141
import java.io.File;
4242
import java.util.HashSet;
43+
import java.util.Map;
4344
import java.util.Optional;
4445
import java.util.Set;
4546

4647
public class CteMaterializer {
47-
private static final SqlParser relationSqlParser = new SqlParser();
4848

4949
private static final Coordinator coordinator = Coordinator.getInstance();
5050

@@ -57,16 +57,33 @@ public static void materializeCTE(Analysis analysis, MPPQueryContext context) {
5757
.forEach(
5858
(tableRef, query) -> {
5959
if (query.isMaterialized() && !materializedQueries.contains(query)) {
60-
CteDataStore dataStore =
61-
fetchCteQueryResult(tableRef.getNode().getName().toString(), query, context);
60+
if (!context.mayHaveTmpFile()) {
61+
context.setMayHaveTmpFile(true);
62+
}
63+
64+
String cteName = tableRef.getNode().getName().toString();
65+
CteDataStore dataStore = fetchCteQueryResult(cteName, query, context);
6266
if (dataStore != null) {
63-
analysis.addCteDataStore(query, dataStore);
67+
analysis.addCteDataStore(cteName, dataStore);
68+
context.reserveMemoryForFrontEnd(dataStore.getCachedBytes());
6469
materializedQueries.add(query);
6570
}
6671
}
6772
});
6873
}
6974

75+
public static void cleanUpCTE(Analysis analysis, MPPQueryContext context) {
76+
Map<String, CteDataStore> cteDataStores = analysis.getCteDataStores();
77+
cteDataStores
78+
.values()
79+
.forEach(
80+
dataStore -> {
81+
context.releaseMemoryReservedForFrontEnd(dataStore.getCachedBytes());
82+
dataStore.clear();
83+
});
84+
cteDataStores.clear();
85+
}
86+
7087
public static CteDataStore fetchCteQueryResult(
7188
String cteName, Query query, MPPQueryContext context) {
7289
final long queryId = SessionManager.getInstance().requestQueryId();
@@ -91,7 +108,7 @@ public static CteDataStore fetchCteQueryResult(
91108
String folderPath =
92109
IoTDBDescriptor.getInstance().getConfig().getCteTmpDir()
93110
+ File.separator
94-
+ queryId
111+
+ context.getQueryId()
95112
+ File.separator;
96113
CteDataStore cteDataStore =
97114
new CteDataStore(new DiskSpiller(folderPath, folderPath + cteName));

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/CteScanNode.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -119,4 +119,8 @@ protected void serializeAttributes(ByteBuffer byteBuffer) {}
119119

120120
@Override
121121
protected void serializeAttributes(DataOutputStream stream) throws IOException {}
122+
123+
public static CteScanNode deserialize(ByteBuffer byteBuffer) {
124+
return null;
125+
}
122126
}

0 commit comments

Comments
 (0)