Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1372,4 +1372,18 @@ public void testNestedAggregationsExplain() throws IOException {
+ " timestamp, value_range, category",
TEST_INDEX_TIME_DATA)));
}

/**
 * Explains a Top-K query (sort on balance, then head 5) that is followed by a second sort on
 * age, and compares the resulting plan with {@code explain_top_k_then_sort_push.yaml}.
 *
 * <p>Only meaningful when pushdown is enabled, hence the guard call at the top.
 */
@Test
public void testTopKThenSortExplain() throws IOException {
  enabledOnlyWhenPushdownIsEnabled();
  // The pieces are concatenated without extra separators on purpose; keep them as-is.
  final String ppl =
      "source=opensearch-sql_test_index_account"
          + "| sort balance"
          + "| head 5 "
          + "| sort age "
          + "| fields age";
  final String expectedPlan = loadExpectedPlan("explain_top_k_then_sort_push.yaml");
  final String actualPlan = explainQueryYaml(ppl);
  assertYamlEqualsIgnoreId(expectedPlan, actualPlan);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ public void init() throws Exception {
enableCalcite();
}

// TODO: Move this test to SortCommandIT once head-then-sort is fixed in v2.
/**
 * Runs {@code head 2} followed by {@code sort age} against the bank index and checks that the
 * two returned ages come back in the order 32, 36.
 */
@Test
public void testHeadThenSort() throws IOException {
  final String ppl =
      String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK);
  final JSONObject response = executeQuery(ppl);
  verifyOrder(response, rows(32), rows(36));
}

@Test
public void testPushdownSortPlusExpression() throws IOException {
String ppl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,14 +265,8 @@ public void testSortThenLimitExplain() throws IOException {
+ "| fields age"));
}

/**
* Push down LIMIT only. Sort should NOT be pushed down, since the DSL processes limit before
* sort when they coexist.
*/
@Test
public void testLimitThenSortExplain() throws IOException {
// TODO: Fix the expected output in expectedOutput/ppl/explain_limit_then_sort_push.json (v2)
// limit-then-sort should not be pushed down.
String expected = loadExpectedPlan("explain_limit_then_sort_push.yaml");
assertYamlEqualsIgnoreId(
expected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -253,4 +253,16 @@ public void testSortWithAscMultipleFields() throws IOException {
rows(36, 20),
rows(39, 25));
}

@Test
public void testHeadThenSort() throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nit] It would be better to also check the NoPushDownIT case. We may need separate branches for these two test cases.

JSONObject result =
executeQuery(String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK));
if (isPushdownDisabled()) {
// Pushdown is disabled, it will retrieve the first 2 docs since there's only 1 shard.
verifyOrder(result, rows(32), rows(36));
} else {
verifyOrder(result, rows(28), rows(32));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ calcite:
EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13])
EnumerableLimit(fetch=[10000])
EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], SORT->[{
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], LIMIT->10000, SORT->[{
"account_number" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$0], dir0=[ASC])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000, SORT->[{
"account_number" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ calcite:
LogicalSort(fetch=[5])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{
"age" : {
"order" : "asc",
"missing" : "_first"
}
}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ calcite:
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$0], dir0=[ASC])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000, SORT->[{
"account_number" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ calcite:
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t3)], expr#5=[0:BIGINT], expr#6=[CASE($t4, $t5, $t3)], id=[$t1], name=[$t0], count_dept=[$t6])
EnumerableLimit(fetch=[10000])
EnumerableMergeJoin(condition=[=($1, $2)], joinType=[left])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], SORT->[{
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], LIMIT->10000, SORT->[{
"id" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
calcite:
logical: |
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(age=[$8])
LogicalSort(sort0=[$8], dir0=[ASC-nulls-first])
LogicalSort(sort0=[$3], dir0=[ASC-nulls-first], fetch=[5])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..1=[{inputs}], age=[$t1])
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, age], SORT->[{
"balance" : {
"order" : "asc",
"missing" : "_first"
}
}], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["balance","age"],"excludes":[]},"sort":[{"balance":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ calcite:
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9])
EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])])
EnumerableCalc(expr#0=[{inputs}], expr#1=[IS NOT NULL($t0)], age=[$t0], $condition=[$t1])
EnumerableSort(sort0=[$0], dir0=[ASC])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{
"age" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ public interface Config extends RelRule.Config {
.oneInput(
b1 ->
b1.operand(AbstractCalciteIndexScan.class)
// Skip the rule if a limit has already been pushed down
// because pushing down a sort after a limit will be treated
// as sort-then-limit by OpenSearch DSL.
// Skip the rule if Top-K (i.e. sort + limit) has already been
// pushed down. Otherwise, continue to push down the sort even
// if a limit has already been pushed down, since a limit alone
// does not guarantee any collation.
.predicate(
Predicate.not(AbstractCalciteIndexScan::isLimitPushed)
Predicate.not(AbstractCalciteIndexScan::isTopKPushed)
Copy link
Member

@LantaoJin LantaoJin Oct 27, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a test case for the PPL below?

source = t | head 100 | stats count() as cnt | sort cnt

The sort cnt must not be pushed down through head 100.

The sort cannot be pushed down for the following PPL either.

source = t | head 100 | eval rand = rand() | sort rand

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any sort expression evaluated after a limit cannot be pushed down through that limit.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Neither case can push down the sort, and that is unrelated to whether there is a limit in the PPL query. The reasons they are not pushed down are:

  1. We cannot push down metric sort into agg unless the agg is nullable=false
  2. We don't support script push down for sort currently.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could you add these cases in explain IT:

  1. This is not a case of pushing down a sort on agg metrics, because there is no bucket.
  2. Got it, but it would be better to add an explain IT.

.and(
Predicate.not(
AbstractCalciteIndexScan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -392,4 +392,8 @@ public boolean isLimitPushed() {
/**
 * Reports whether this scan's push-down context has recorded a metric-order push-down.
 *
 * @return the value of {@code isMetricOrderPushed()} on the scan's push-down context
 */
public boolean isMetricsOrderPushed() {
  return getPushDownContext().isMetricOrderPushed();
}

/**
 * Reports whether this scan's push-down context has recorded a Top-K (sort + limit)
 * push-down.
 *
 * @return the value of {@code isTopKPushed()} on the scan's push-down context
 */
public boolean isTopKPushed() {
  return getPushDownContext().isTopKPushed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class PushDownContext extends AbstractCollection<PushDownOperation> {
private boolean isLimitPushed = false;
private boolean isProjectPushed = false;
private boolean isMetricOrderPushed = false;
private boolean isSortPushed = false;
private boolean isTopKPushed = false;

public PushDownContext(OpenSearchIndex osIndex) {
this.osIndex = osIndex;
Expand Down Expand Up @@ -97,10 +99,16 @@ public boolean add(PushDownOperation operation) {
}
if (operation.type() == PushDownType.LIMIT) {
isLimitPushed = true;
if (isSortPushed || isMetricOrderPushed) {
isTopKPushed = true;
}
}
if (operation.type() == PushDownType.PROJECT) {
isProjectPushed = true;
}
if (operation.type() == PushDownType.SORT) {
isSortPushed = true;
}
if (operation.type() == PushDownType.SORT_AGG_METRICS) {
isMetricOrderPushed = true;
}
Expand Down
Loading