diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 5f227c94472..b976224148b 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -1372,4 +1372,18 @@ public void testNestedAggregationsExplain() throws IOException { + " timestamp, value_range, category", TEST_INDEX_TIME_DATA))); } + + @Test + public void testTopKThenSortExplain() throws IOException { + enabledOnlyWhenPushdownIsEnabled(); + String expected = loadExpectedPlan("explain_top_k_then_sort_push.yaml"); + assertYamlEqualsIgnoreId( + expected, + explainQueryYaml( + "source=opensearch-sql_test_index_account" + + "| sort balance" + + "| head 5 " + + "| sort age " + + "| fields age")); + } } diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java index 981b418140e..3e01ba9f9d7 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteSortCommandIT.java @@ -24,14 +24,6 @@ public void init() throws Exception { enableCalcite(); } - // TODO: Move this test to SortCommandIT once head-then-sort is fixed in v2. - @Test - public void testHeadThenSort() throws IOException { - JSONObject result = - executeQuery(String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK)); - verifyOrder(result, rows(32), rows(36)); - } - @Test public void testPushdownSortPlusExpression() throws IOException { String ppl = diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java index 33ec09765d9..6d8f6e17046 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/ExplainIT.java @@ -265,14 +265,8 @@ public void testSortThenLimitExplain() throws IOException { + "| fields age")); } - /** - * Push down LIMIT only Sort should NOT be pushed down since DSL process limit before sort when - * they coexist - */ @Test public void testLimitThenSortExplain() throws IOException { - // TODO: Fix the expected output in expectedOutput/ppl/explain_limit_then_sort_push.json (v2) - // limit-then-sort should not be pushed down. String expected = loadExpectedPlan("explain_limit_then_sort_push.yaml"); assertYamlEqualsIgnoreId( expected, diff --git a/integ-test/src/test/java/org/opensearch/sql/ppl/SortCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/ppl/SortCommandIT.java index be2cae9c843..495a7d1673f 100644 --- a/integ-test/src/test/java/org/opensearch/sql/ppl/SortCommandIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/ppl/SortCommandIT.java @@ -253,4 +253,16 @@ public void testSortWithAscMultipleFields() throws IOException { rows(36, 20), rows(39, 25)); } + + @Test + public void testHeadThenSort() throws IOException { + JSONObject result = + executeQuery(String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK)); + if (isPushdownDisabled()) { + // Pushdown is disabled, it will retrieve the first 2 docs since there's only 1 shard. + verifyOrder(result, rows(32), rows(36)); + } else { + verifyOrder(result, rows(28), rows(32)); + } + } } diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml index 0dbd15655b0..12259a8e5ae 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_join_with_fields.yaml @@ -12,11 +12,15 @@ calcite: EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13]) EnumerableLimit(fetch=[10000]) EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], SORT->[{ + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], LIMIT->10000, SORT->[{ "account_number" : { "order" : "asc", "missing" : "_last" } - }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$0], dir0=[ASC]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)]) \ No newline at end of file + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000, SORT->[{ + "account_number" : { + "order" : "asc", + "missing" : "_last" + } + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=50000, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_then_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_then_sort_push.yaml index 1fdacd0c05b..7ab0399e85f 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_then_sort_push.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_limit_then_sort_push.yaml @@ -6,6 +6,9 @@ calcite: LogicalSort(fetch=[5]) CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) physical: | - EnumerableLimit(fetch=[10000]) - EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{ + "age" : { + "order" : "asc", + "missing" : "_first" + } + }], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml index 13a5dfe1d7b..7c5f95fba77 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_merge_join_sort_push.yaml @@ -17,5 +17,9 @@ calcite: "missing" : "_last" } }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - EnumerableSort(sort0=[$0], dir0=[ASC]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)]) \ No newline at end of file + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000, SORT->[{ + "account_number" : { + "order" : "asc", + "missing" : "_last" + } + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=50000, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml index 0cacee911dd..9534086be86 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_scalar_correlated_subquery_in_select.yaml @@ -13,10 +13,10 @@ calcite: EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t3)], expr#5=[0:BIGINT], expr#6=[CASE($t4, $t5, $t3)], id=[$t1], name=[$t0], count_dept=[$t6]) EnumerableLimit(fetch=[10000]) EnumerableMergeJoin(condition=[=($1, $2)], joinType=[left]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], SORT->[{ + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], LIMIT->10000, SORT->[{ "id" : { "order" : "asc", "missing" : "_last" } - }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) \ No newline at end of file + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_top_k_then_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_top_k_then_sort_push.yaml new file mode 100644 index 00000000000..ec71a9d1130 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_top_k_then_sort_push.yaml @@ -0,0 +1,17 @@ +calcite: + logical: | + LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT]) + LogicalProject(age=[$8]) + LogicalSort(sort0=[$8], dir0=[ASC-nulls-first]) + LogicalSort(sort0=[$3], dir0=[ASC-nulls-first], fetch=[5]) + CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]]) + physical: | + EnumerableLimit(fetch=[10000]) + EnumerableCalc(expr#0..1=[{inputs}], age=[$t1]) + EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, age], SORT->[{ + "balance" : { + "order" : "asc", + "missing" : "_first" + } + }], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["balance","age"],"excludes":[]},"sort":[{"balance":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml b/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml index 2003143e673..6a3752840b6 100644 --- a/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_trendline_sort_push.yaml @@ -11,5 +11,9 @@ calcite: EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9]) EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])]) EnumerableCalc(expr#0=[{inputs}], expr#1=[IS NOT NULL($t0)], age=[$t0], $condition=[$t1]) - EnumerableSort(sort0=[$0], dir0=[ASC]) - CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)]) + CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{ + "age" : { + "order" : "asc", + "missing" : "_last" + } + }]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)]) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchSortIndexScanRule.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchSortIndexScanRule.java index b519c50a1ec..47d0b73a935 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchSortIndexScanRule.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/planner/physical/OpenSearchSortIndexScanRule.java @@ -48,11 +48,12 @@ public interface Config extends RelRule.Config { .oneInput( b1 -> b1.operand(AbstractCalciteIndexScan.class) - // Skip the rule if a limit has already been pushed down - // because pushing down a sort after a limit will be treated - // as sort-then-limit by OpenSearch DSL. + // Skip the rule if Top-K(i.e. sort + limit) has already been + // pushed down. Otherwise, + // Continue to push down sort although limit has already been + // pushed down since we don't promise collation with only limit. .predicate( - Predicate.not(AbstractCalciteIndexScan::isLimitPushed) + Predicate.not(AbstractCalciteIndexScan::isTopKPushed) .and( Predicate.not( AbstractCalciteIndexScan diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java index c3f5d2fe4e4..4c15cb8005d 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/AbstractCalciteIndexScan.java @@ -392,4 +392,8 @@ public boolean isLimitPushed() { public boolean isMetricsOrderPushed() { return this.getPushDownContext().isMetricOrderPushed(); } + + public boolean isTopKPushed() { + return this.getPushDownContext().isTopKPushed(); + } } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java index dd36c2090b9..93e1798ba47 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/context/PushDownContext.java @@ -29,6 +29,8 @@ public class PushDownContext extends AbstractCollection { private boolean isLimitPushed = false; private boolean isProjectPushed = false; private boolean isMetricOrderPushed = false; + private boolean isSortPushed = false; + private boolean isTopKPushed = false; public PushDownContext(OpenSearchIndex osIndex) { this.osIndex = osIndex; @@ -97,10 +99,16 @@ public boolean add(PushDownOperation operation) { } if (operation.type() == PushDownType.LIMIT) { isLimitPushed = true; + if (isSortPushed || isMetricOrderPushed) { + isTopKPushed = true; + } } if (operation.type() == PushDownType.PROJECT) { isProjectPushed = true; } + if (operation.type() == PushDownType.SORT) { + isSortPushed = true; + } if (operation.type() == PushDownType.SORT_AGG_METRICS) { isMetricOrderPushed = true; }