Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1393,4 +1393,18 @@ public void testNestedAggregationsExplain() throws IOException {
+ " timestamp, value_range, category",
TEST_INDEX_TIME_DATA)));
}

@Test
public void testTopKThenSortExplain() throws IOException {
enabledOnlyWhenPushdownIsEnabled();
String expected = loadExpectedPlan("explain_top_k_then_sort_push.yaml");
assertYamlEqualsIgnoreId(
expected,
explainQueryYaml(
"source=opensearch-sql_test_index_account"
+ "| sort balance"
+ "| head 5 "
+ "| sort age "
+ "| fields age"));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,6 @@ public void init() throws Exception {
enableCalcite();
}

// TODO: Move this test to SortCommandIT once head-then-sort is fixed in v2.
@Test
public void testHeadThenSort() throws IOException {
JSONObject result =
executeQuery(String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK));
verifyOrder(result, rows(32), rows(36));
}

@Test
public void testPushdownSortPlusExpression() throws IOException {
String ppl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,14 +302,8 @@ public void testSortThenLimitExplain() throws IOException {
+ "| fields age"));
}

/**
* Push down LIMIT only Sort should NOT be pushed down since DSL process limit before sort when
* they coexist
*/
@Test
public void testLimitThenSortExplain() throws IOException {
// TODO: Fix the expected output in expectedOutput/ppl/explain_limit_then_sort_push.json (v2)
// limit-then-sort should not be pushed down.
String expected = loadExpectedPlan("explain_limit_then_sort_push.yaml");
assertYamlEqualsIgnoreId(
expected,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -254,4 +254,16 @@ public void testSortWithAscMultipleFields() throws IOException {
rows(36, 20),
rows(39, 25));
}

@Test
public void testHeadThenSort() throws IOException {
JSONObject result =
executeQuery(String.format("source=%s | head 2 | sort age | fields age", TEST_INDEX_BANK));
if (isPushdownDisabled()) {
// Pushdown is disabled, it will retrieve the first 2 docs since there's only 1 shard.
verifyOrder(result, rows(32), rows(36));
} else {
verifyOrder(result, rows(28), rows(32));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,15 @@ calcite:
EnumerableCalc(expr#0..13=[{inputs}], account_number=[$t1], firstname=[$t2], address=[$t3], birthdate=[$t4], gender=[$t5], city=[$t6], lastname=[$t7], balance=[$t8], employer=[$t9], state=[$t10], age=[$t11], email=[$t12], male=[$t13])
EnumerableLimit(fetch=[10000])
EnumerableMergeJoin(condition=[=($0, $1)], joinType=[left])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], SORT->[{
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number], LIMIT->10000, SORT->[{
"account_number" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$0], dir0=[ASC])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["account_number"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000, SORT->[{
"account_number" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ calcite:
LogicalSort(fetch=[5])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableSort(sort0=[$0], dir0=[ASC-nulls-first])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{
"age" : {
"order" : "asc",
"missing" : "_first"
}
}], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,9 @@ calcite:
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
EnumerableSort(sort0=[$0], dir0=[ASC])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]}}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->50000, SORT->[{
"account_number" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":50000,"timeout":"1m","_source":{"includes":["account_number","firstname","address","birthdate","gender","city","lastname","balance","employer","state","age","email","male"],"excludes":[]},"sort":[{"account_number":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=50000, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -13,10 +13,10 @@ calcite:
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[IS NULL($t3)], expr#5=[0:BIGINT], expr#6=[CASE($t4, $t5, $t3)], id=[$t1], name=[$t0], count_dept=[$t6])
EnumerableLimit(fetch=[10000])
EnumerableMergeJoin(condition=[=($1, $2)], joinType=[left])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], SORT->[{
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_worker]], PushDownContext=[[PROJECT->[name, id], LIMIT->10000, SORT->[{
"id" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":10000,"timeout":"1m","_source":{"includes":["name","id"],"excludes":[]},"sort":[{"id":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=10000, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_work_information]], PushDownContext=[[PROJECT->[name, uid], FILTER->AND(IS NOT NULL($1), IS NOT NULL($0)), AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},count(name)=COUNT($1)), SORT->[0]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":0,"timeout":"1m","query":{"bool":{"must":[{"exists":{"field":"uid","boost":1.0}},{"exists":{"field":"name","boost":1.0}}],"adjust_pure_negative":true,"boost":1.0}},"_source":{"includes":["name","uid"],"excludes":[]},"aggregations":{"composite_buckets":{"composite":{"size":1000,"sources":[{"uid":{"terms":{"field":"uid","missing_bucket":true,"missing_order":"last","order":"asc"}}}]},"aggregations":{"count(name)":{"value_count":{"field":"name"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
calcite:
logical: |
LogicalSystemLimit(sort0=[$0], dir0=[ASC-nulls-first], fetch=[10000], type=[QUERY_SIZE_LIMIT])
LogicalProject(age=[$8])
LogicalSort(sort0=[$8], dir0=[ASC-nulls-first])
LogicalSort(sort0=[$3], dir0=[ASC-nulls-first], fetch=[5])
CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]])
physical: |
EnumerableLimit(fetch=[10000])
EnumerableCalc(expr#0..1=[{inputs}], age=[$t1])
EnumerableSort(sort0=[$1], dir0=[ASC-nulls-first])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[balance, age], SORT->[{
"balance" : {
"order" : "asc",
"missing" : "_first"
}
}], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["balance","age"],"excludes":[]},"sort":[{"balance":{"order":"asc","missing":"_first"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -11,5 +11,9 @@ calcite:
EnumerableCalc(expr#0..3=[{inputs}], expr#4=[1], expr#5=[>($t1, $t4)], expr#6=[CAST($t3):DOUBLE NOT NULL], expr#7=[/($t2, $t6)], expr#8=[null:NULL], expr#9=[CASE($t5, $t7, $t8)], ageTrend=[$t9])
EnumerableWindow(window#0=[window(rows between $1 PRECEDING and CURRENT ROW aggs [COUNT(), $SUM0($0), COUNT($0)])])
EnumerableCalc(expr#0=[{inputs}], expr#1=[IS NOT NULL($t0)], age=[$t0], $condition=[$t1])
EnumerableSort(sort0=[$0], dir0=[ASC])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]}}, requestedTotalSize=5, pageSize=null, startFrom=0)])
CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_account]], PushDownContext=[[PROJECT->[age], LIMIT->5, SORT->[{
"age" : {
"order" : "asc",
"missing" : "_last"
}
}]], OpenSearchRequestBuilder(sourceBuilder={"from":0,"size":5,"timeout":"1m","_source":{"includes":["age"],"excludes":[]},"sort":[{"age":{"order":"asc","missing":"_last"}}]}, requestedTotalSize=5, pageSize=null, startFrom=0)])
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,12 @@ public interface Config extends RelRule.Config {
.oneInput(
b1 ->
b1.operand(AbstractCalciteIndexScan.class)
// Skip the rule if a limit has already been pushed down
// because pushing down a sort after a limit will be treated
// as sort-then-limit by OpenSearch DSL.
// Skip the rule if Top-K(i.e. sort + limit) has already been
// pushed down. Otherwise,
// Continue to push down sort although limit has already been
// pushed down since we don't promise collation with only limit.
.predicate(
Predicate.not(AbstractCalciteIndexScan::isLimitPushed)
Predicate.not(AbstractCalciteIndexScan::isTopKPushed)
.and(
Predicate.not(
AbstractCalciteIndexScan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -421,4 +421,8 @@ public boolean isLimitPushed() {
public boolean isMetricsOrderPushed() {
return this.getPushDownContext().isMetricOrderPushed();
}

public boolean isTopKPushed() {
return this.getPushDownContext().isTopKPushed();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public class PushDownContext extends AbstractCollection<PushDownOperation> {
private boolean isLimitPushed = false;
private boolean isProjectPushed = false;
private boolean isMetricOrderPushed = false;
private boolean isSortPushed = false;
private boolean isTopKPushed = false;

public PushDownContext(OpenSearchIndex osIndex) {
this.osIndex = osIndex;
Expand Down Expand Up @@ -97,10 +99,16 @@ public boolean add(PushDownOperation operation) {
}
if (operation.type() == PushDownType.LIMIT) {
isLimitPushed = true;
if (isSortPushed || isMetricOrderPushed) {
isTopKPushed = true;
}
}
if (operation.type() == PushDownType.PROJECT) {
isProjectPushed = true;
}
if (operation.type() == PushDownType.SORT) {
isSortPushed = true;
}
if (operation.type() == PushDownType.SORT_AGG_METRICS) {
isMetricOrderPushed = true;
}
Expand Down
Loading