[To dev/1.3] Improve DeviceViewIntoOperator's return style to pipeline #16980
Codecov Report

Coverage Diff

| Metric | dev/1.3 | #16980 | +/- |
|---|---|---|---|
| Coverage | 41.24% | 41.25% | |
| Complexity | 198 | 198 | |
| Files | 3594 | 3594 | |
| Lines | 236153 | 236195 | +42 |
| Branches | 28526 | 28542 | +16 |
| Hits | 97408 | 97434 | +26 |
| Misses | 138745 | 138761 | +16 |
Pull request overview
This pull request introduces support for partial result returning in the DeviceViewIntoOperator and refactors the SELECT INTO statement processing. The key changes include adding a new abstract method for returning partial results and removing upfront schema validation in favor of runtime validation during data insertion.
Key changes:
- Added `tryToReturnPartialResult()` method to `AbstractIntoOperator` and its subclasses to enable streaming of intermediate results
- Removed schema fetching and type validation from the analysis phase, deferring error detection to insertion time
- Refactored method signatures by removing redundant `statusCode` parameter from `setRedirectInfo()`
Reviewed changes
Copilot reviewed 11 out of 11 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| AbstractIntoOperator.java | Added abstract method tryToReturnPartialResult() and modified next() to call it when full results aren't ready |
| DeviceViewIntoOperator.java | Implemented partial result handling, modified device switching logic, and overrode constructInsertMultiTabletsStatement() |
| IntoOperator.java | Provided default null implementation of tryToReturnPartialResult() to preserve existing behavior |
| SelectIntoUtils.java | Removed schema validation and type compatibility checking logic from bindTypeForSourceTargetPathPairList() |
| AnalyzeVisitor.java | Removed schema fetching and validation calls for both device view and regular INTO operations |
| IntoPathDescriptor.java | Removed ISchemaTree parameter from bindType() method |
| DeviceViewIntoPathDescriptor.java | Removed ISchemaTree parameter from bindType() method |
| IPlanner.java | Removed redundant statusCode parameter from setRedirectInfo() method |
| TreeModelPlanner.java | Updated setRedirectInfo() to get status code from TSStatus object directly |
| QueryExecution.java | Updated call to setRedirectInfo() removing statusCode parameter |
| IoTDBSelectIntoIT.java | Updated test expectations to reflect runtime type checking errors instead of semantic validation errors |
.../java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java

    if (writtenCountInCurrentGenerator >= 0) {
      return writtenCountInCurrentGenerator;
    }
    continue;

Copilot
AI
Jan 6, 2026
The continue statement at line 264 is redundant. If the return statement at line 262 is not reached, the loop proceeds to the next iteration anyway. The redundant statement reduces code clarity.
Suggested change: remove `continue;`
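
The point about the trailing `continue` can be illustrated with a minimal sketch. The class and method names below are hypothetical and not taken from the PR, and the sketch assumes `continue` is the last statement in the loop body, which the excerpt above does not fully show.

```java
// Hypothetical illustration; not code from DeviceViewIntoOperator.
class ContinueDemo {
  // Variant with a trailing continue as the last statement of the loop body.
  static int firstNonNegativeWithContinue(int[] counts) {
    for (int count : counts) {
      if (count >= 0) {
        return count;
      }
      continue; // no-op: the loop would advance to the next element anyway
    }
    return -1; // sentinel used by this sketch when nothing matches
  }

  // Same logic without the trailing continue.
  static int firstNonNegative(int[] counts) {
    for (int count : counts) {
      if (count >= 0) {
        return count;
      }
    }
    return -1;
  }
}
```

Both methods return the same value for any input, which is why the statement can be dropped without changing behavior under that assumption.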
.../java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java

    if (!Objects.equals(device, currentDevice)) {
      final InsertMultiTabletsStatement insertMultiTabletsStatement =
          constructInsertMultiTabletsStatement(false);
      updateResultTsBlock();

Copilot
AI
Jan 6, 2026
There's a potential issue where updateResultTsBlock() is called at line 111 without checking if resultTsBlockBuilder is already full. If the builder becomes full during the update, subsequent device switches or updates may fail to add data properly.
Additionally, when the builder becomes full and returns a partial result via tryToReturnPartialResult() (which resets the builder), the code should ensure that the current device's result information is still properly recorded. Currently, when switching devices at lines 108-111, updateResultTsBlock() is called for the previous device, but if this causes the builder to become full, the builder might be reset in tryToReturnPartialResult() before the update is complete.
Consider adding a check before calling updateResultTsBlock() or ensuring that partial result handling happens at appropriate boundaries.
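
One way to read this suggestion is to check capacity before every append, so that a full batch is set aside rather than overwritten. The sketch below is only an illustration under that assumption: `BoundedResultBuffer` and its methods are hypothetical stand-ins and do not model IoTDB's `TsBlockBuilder` or the operator's actual control flow.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a result builder with a fixed capacity.
class BoundedResultBuffer {
  private final int capacity;
  private List<String> current = new ArrayList<>();
  private final Deque<List<String>> ready = new ArrayDeque<>();

  BoundedResultBuffer(int capacity) {
    this.capacity = capacity;
  }

  // Check fullness before appending: a full batch is moved to the ready
  // queue first, so the new row always lands in a batch with free space.
  void append(String row) {
    if (current.size() >= capacity) {
      ready.add(current);
      current = new ArrayList<>();
    }
    current.add(row);
  }

  // Returns a completed batch, or null when none is available yet.
  List<String> pollReady() {
    return ready.poll();
  }

  // Returns whatever is left in the current batch and starts a new one.
  List<String> flushRemaining() {
    List<String> out = current;
    current = new ArrayList<>();
    return out;
  }
}
```

With a capacity of 2, appending three rows leaves one completed batch of two rows in the ready queue and the third row in the current batch, so no row is lost at the boundary.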
integration-test/src/test/java/org/apache/iotdb/db/it/selectinto/IoTDBSelectIntoIT.java
...core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/analyze/AnalyzeVisitor.java

    if (insertTabletStatementGenerators == null
        || (needCheck && !existFullStatement(insertTabletStatementGenerators))) {
      return null;
    }

    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
    try {
      if (child.hasNextWithTimer()
          && batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()) {
        return null;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IntoProcessException(e.getMessage());
    } catch (Exception e) {
      throw new IntoProcessException(e.getMessage());
    }

Copilot
AI
Jan 6, 2026
The logic in this method has a potential issue with the ordering of checks. The method checks child.hasNextWithTimer() at line 229, which is a potentially blocking I/O operation that queries whether more data is available. This check happens inside the method that's called during processTsBlock, which means:
- It performs I/O checks even when `needCheck` is true and the statement is already full (line 223)
- The `child.hasNextWithTimer()` call could throw InterruptedException or other exceptions, which are caught and wrapped in IntoProcessException
- This introduces a dependency on the child operator's state within what should be a simple statement construction method

A cleaner design would be to:
- Check the `needCheck` and `existFullStatement` conditions first
- Only perform the `child.hasNextWithTimer()` check if those pass
- Consider whether this I/O operation belongs in this method at all, or if the decision to flush should be made at a higher level

Additionally, the broad exception catch at line 236 (catch (Exception e)) masks potential programming errors and makes debugging difficult. Consider catching only expected exception types.
Suggested change, original:

    if (insertTabletStatementGenerators == null
        || (needCheck && !existFullStatement(insertTabletStatementGenerators))) {
      return null;
    }
    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();
    try {
      if (child.hasNextWithTimer()
          && batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()) {
        return null;
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IntoProcessException(e.getMessage());
    } catch (Exception e) {
      throw new IntoProcessException(e.getMessage());
    }

Suggested change, replacement:

    if (insertTabletStatementGenerators == null) {
      return null;
    }
    boolean hasFullStatement = existFullStatement(insertTabletStatementGenerators);
    if (needCheck) {
      // When needCheck is true, we only proceed if there already exists a full statement.
      if (!hasFullStatement) {
        return null;
      }
    } else {
      // When needCheck is false, we may delay flushing to accumulate more rows
      // if the batch is not yet at the configured row limit and the child has more data.
      try {
        if (batchedRowCount < CONFIG.getSelectIntoInsertTabletPlanRowLimit()
            && child.hasNextWithTimer()) {
          return null;
        }
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IntoProcessException(e.getMessage(), e);
      } catch (Exception e) {
        throw new IntoProcessException(e.getMessage(), e);
      }
    }
    List<InsertTabletStatement> insertTabletStatementList = new ArrayList<>();

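
Two details of the replacement can be shown in isolation: the cheap row-count comparison placed before the child query (so `&&` short-circuits and the child is not queried once the limit is reached), and the original exception passed along as the cause. The following is a minimal sketch under assumptions: `FlushDecision` and `SketchIntoProcessException` are hypothetical names, and a `Callable<Boolean>` stands in for `child.hasNextWithTimer()`.

```java
import java.util.concurrent.Callable;

// Hypothetical stand-in for the project's IntoProcessException.
class SketchIntoProcessException extends RuntimeException {
  SketchIntoProcessException(String message, Throwable cause) {
    super(message, cause);
  }
}

class FlushDecision {
  // Returns true when flushing should be postponed to accumulate more rows.
  static boolean shouldDelayFlush(
      int batchedRowCount, int rowLimit, Callable<Boolean> childHasNext) {
    try {
      // Cheap comparison first: && short-circuits, so the potentially
      // blocking child query only runs while the batch is under the limit.
      return batchedRowCount < rowLimit && childHasNext.call();
    } catch (InterruptedException e) {
      // Restore the interrupt flag and keep the original exception as cause.
      Thread.currentThread().interrupt();
      throw new SketchIntoProcessException(e.getMessage(), e);
    } catch (Exception e) {
      throw new SketchIntoProcessException(e.getMessage(), e);
    }
  }
}
```

Keeping the cause means the stack trace of the underlying failure survives the wrapping, which addresses part of the debugging concern raised in the comment; it does not narrow the broad `catch (Exception e)` itself.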
.../java/org/apache/iotdb/db/queryengine/execution/operator/process/DeviceViewIntoOperator.java
This pull request introduces support for partial result returning in the `AbstractIntoOperator` and its subclasses. The changes enable operators to return intermediate results when their internal buffers are full, improving efficiency and responsiveness in streaming scenarios. The most important changes are:

Core logic changes:
- Added `tryToReturnPartialResult()` to `AbstractIntoOperator`, and updated the `next()` method to use it when a full result is not ready. This allows subclasses to provide partial results when possible.

Subclass implementations:
- Implemented `tryToReturnPartialResult()` in `DeviceViewIntoOperator` to return a built `TsBlock` if the internal `resultTsBlockBuilder` is full, and reset the builder afterwards.
- Added an implementation of `tryToReturnPartialResult()` in `IntoOperator` that always returns `null`, preserving existing behavior where partial results are not supported.
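
The pattern described above can be sketched roughly as follows. This is a simplified illustration, not the PR's code: all class names are hypothetical, a `List<String>` stands in for `TsBlock`, and the assumption that `next()` first tries the full result and then falls back to the partial one is inferred from the description.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for AbstractIntoOperator.
abstract class SketchIntoOperator {
  // Returns the final result when it is ready, otherwise null.
  protected abstract List<String> tryToReturnResult();

  // Returns an intermediate result when one is available, otherwise null.
  protected abstract List<String> tryToReturnPartialResult();

  List<String> next() {
    List<String> full = tryToReturnResult();
    return full != null ? full : tryToReturnPartialResult();
  }
}

// Returns a partial result once its buffer is full, then resets the buffer.
class BufferedSketchOperator extends SketchIntoOperator {
  private final int capacity;
  private final List<String> buffer = new ArrayList<>();
  private boolean finished;

  BufferedSketchOperator(int capacity) {
    this.capacity = capacity;
  }

  void add(String row) {
    buffer.add(row);
  }

  void finish() {
    finished = true;
  }

  @Override
  protected List<String> tryToReturnResult() {
    return finished ? drain() : null;
  }

  @Override
  protected List<String> tryToReturnPartialResult() {
    return buffer.size() >= capacity ? drain() : null;
  }

  // Copies the buffered rows out and clears the buffer for the next batch.
  private List<String> drain() {
    List<String> out = new ArrayList<>(buffer);
    buffer.clear();
    return out;
  }
}

// Never returns partial results, mirroring the described IntoOperator default.
class NoPartialSketchOperator extends BufferedSketchOperator {
  NoPartialSketchOperator() {
    super(Integer.MAX_VALUE);
  }

  @Override
  protected List<String> tryToReturnPartialResult() {
    return null;
  }
}
```

In this sketch a caller that sees `null` from `next()` simply calls again later; the buffered variant hands back rows as soon as the buffer fills, while the no-partial variant only returns once `finish()` has been called.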