Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -821,6 +822,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) {
throw getOnlyForCalciteException("Appendcol");
}

@Override
public LogicalPlan visitAppendPipe(AppendPipe node, AnalysisContext context) {
// AppendPipe is only implemented in the Calcite planner; the legacy v2 engine rejects it.
throw getOnlyForCalciteException("AppendPipe");
}

@Override
public LogicalPlan visitAppend(Append node, AnalysisContext context) {
throw getOnlyForCalciteException("Append");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -138,6 +139,10 @@ public T visitSearch(Search node, C context) {
return visitChildren(node, context);
}

/** Visit an AppendPipe node; the default implementation delegates to the node's children. */
public T visitAppendPipe(AppendPipe node, C context) {
return visitChildren(node, context);
}

/** Visit a Filter node; the default implementation delegates to the node's children. */
public T visitFilter(Filter node, C context) {
return visitChildren(node, context);
}
Expand Down
6 changes: 6 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import org.opensearch.sql.ast.expression.WindowFunction;
import org.opensearch.sql.ast.expression.Xor;
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CountBin;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -563,6 +564,11 @@ public static Trendline trendline(
return new Trendline(sortField, Arrays.asList(computations)).attach(input);
}

/**
 * Builds an {@code AppendPipe} node whose sub-pipeline {@code subquery} is applied to the
 * result of {@code input} (the main search).
 */
public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquery) {
  AppendPipe node = new AppendPipe(subquery);
  return node.attach(input);
}

public static Trendline.TrendlineComputation computation(
Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) {
return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type);
Expand Down
45 changes: 45 additions & 0 deletions core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.ast.tree;

import com.google.common.collect.ImmutableList;
import java.util.List;
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.Setter;
import lombok.ToString;
import org.opensearch.sql.ast.AbstractNodeVisitor;

/**
 * AST node for the {@code appendpipe} command: the result of running {@code subQuery} (the
 * sub-pipeline) over the output of {@code child} (the main search) is appended to that output.
 */
@Getter
@Setter
@ToString
@EqualsAndHashCode(callSuper = false)
public class AppendPipe extends UnresolvedPlan {

// The sub-pipeline to run over the main search results.
private UnresolvedPlan subQuery;

// The preceding (main search) plan; populated by attach().
private UnresolvedPlan child;

public AppendPipe(UnresolvedPlan subQuery) {
this.subQuery = subQuery;
}

@Override
public AppendPipe attach(UnresolvedPlan child) {
this.child = child;
return this;
}

// Returns the attached main-search child, or an empty list before attach() is called.
@Override
public List<UnresolvedPlan> getChild() {
return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child);
}

@Override
public <T, C> T accept(AbstractNodeVisitor<T, C> nodeVisitor, C context) {
return nodeVisitor.visitAppendPipe(this, context);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@
import org.opensearch.sql.ast.tree.Aggregation;
import org.opensearch.sql.ast.tree.Append;
import org.opensearch.sql.ast.tree.AppendCol;
import org.opensearch.sql.ast.tree.AppendPipe;
import org.opensearch.sql.ast.tree.Bin;
import org.opensearch.sql.ast.tree.CloseCursor;
import org.opensearch.sql.ast.tree.Dedupe;
Expand Down Expand Up @@ -239,6 +240,25 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

@Override
public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) {
// Plan the main search first; its RelNode stays on the relBuilder stack until built below.
visitChildren(node, context);
UnresolvedPlan subqueryPlan = node.getSubQuery();
UnresolvedPlan childNode = subqueryPlan;
// Walk down to the leaf of the sub-pipeline (stopping before a Values placeholder, if any)
// so the main search plan can be spliced in as the sub-pipeline's input.
while (childNode.getChild() != null
&& !childNode.getChild().isEmpty()
&& !(childNode.getChild().getFirst() instanceof Values)) {
childNode = (UnresolvedPlan) childNode.getChild().getFirst();
}
// NOTE(review): attach() mutates the sub-pipeline AST in place; planning the same AST
// twice would splice the main plan twice — confirm the AST is single-use.
childNode.attach(node.getChild().getFirst());

// Plan the sub-pipeline (which now re-plans the main search as its input).
subqueryPlan.accept(this, context);

// Pop the sub-pipeline result, then the main result, and union them.
RelNode subPipelineNode = context.relBuilder.build();
RelNode mainNode = context.relBuilder.build();
return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context);
}

@Override
public RelNode visitRegex(Regex node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down Expand Up @@ -1755,9 +1775,13 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) {
// 3. Merge two query schemas using shared logic
RelNode subsearchNode = context.relBuilder.build();
RelNode mainNode = context.relBuilder.build();
return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context);
}

private RelNode mergeTableAndResolveColumnConflict(
RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) {
// Use shared schema merging logic that handles type conflicts via field renaming
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subsearchNode);
List<RelNode> nodesToMerge = Arrays.asList(mainNode, subqueryNode);
List<RelNode> projectedNodes =
SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context);

Expand Down
72 changes: 72 additions & 0 deletions docs/user/ppl/cmd/appendpipe.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
==========
appendpipe
==========

.. rubric:: Table of contents

.. contents::
:local:
:depth: 2


Description
============
| The ``appendpipe`` command appends the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first: it runs when the search reaches the ``appendpipe`` command.
The command aligns columns that have the same field name and type. For columns that exist in only one of the main search and the sub-search, NULL values are filled into the respective rows.

Version
=======
3.3.0

Syntax
============
appendpipe [<subpipeline>]

* subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command.

Example 1: Append rows from a total count to existing search result
====================================================================================

This example appends rows from "total by gender" to "sum by gender, state" with merged column of same field name and type.

PPL query::

os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ];
fetched rows / total rows = 6/6
+------+--------+-------+-------+
| part | gender | state | total |
|------+--------+-------+-------|
| 36 | M | TN | null |
| 33 | M | MD | null |
| 32 | M | IL | null |
| 28 | F | VA | null |
| null | F | null | 28 |
| null | M | null | 101 |
+------+--------+-------+-------+



Example 2: Append rows with merged column names
===============================================================

This example appends rows from "sum(total) by gender" to "sum by gender, state"; the ``total`` column is merged because it has the same name and type in both searches.

PPL query::

os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ];
fetched rows / total rows = 6/6
+----------+--------+-------+
| total | gender | state |
|----------+--------+-------|
| 36 | M | TN |
| 33 | M | MD |
| 32 | M | IL |
| 28 | F | VA |
| 28 | F | null |
| 101 | M | null |
+----------+--------+-------+

Limitations
===========

* **Schema Compatibility**: Same as command ``append``, when fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns).
Original file line number Diff line number Diff line change
Expand Up @@ -817,6 +817,18 @@ public void testExplainAppendCommand() throws IOException {
TEST_INDEX_BANK)));
}

@Test
public void testExplainAppendPipeCommand() throws IOException {
  // Verify the explain output of an appendpipe query against the stored expected plan.
  String query =
      String.format(
          Locale.ROOT,
          "source=%s | appendpipe [ stats count(balance) as cnt by gender ]",
          TEST_INDEX_BANK);
  assertJsonEqualsIgnoreId(
      loadExpectedPlan("explain_appendpipe_command.json"), explainQueryToString(query));
}

@Test
public void testMvjoinExplain() throws IOException {
String query =
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.sql.calcite.remote;

import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT;
import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK;
import static org.opensearch.sql.util.MatcherUtils.rows;
import static org.opensearch.sql.util.MatcherUtils.schema;
import static org.opensearch.sql.util.MatcherUtils.verifyDataRows;
import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder;

import java.io.IOException;
import java.util.Locale;
import org.json.JSONObject;
import org.junit.Test;
import org.opensearch.sql.ppl.PPLIntegTestCase;

public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase {
@Override
public void init() throws Exception {
super.init();
// appendpipe is only implemented in the Calcite planner, so Calcite must be enabled.
enableCalcite();
loadIndex(Index.ACCOUNT);
loadIndex(Index.BANK);
}

@Test
public void testAppendPipe() throws IOException {
  // The sub-pipeline only re-sorts the main result, so the appended rows duplicate it.
  String query =
      String.format(
          Locale.ROOT,
          "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ "
              + " sort -sum_age_by_gender ] |"
              + " head 5",
          TEST_INDEX_ACCOUNT);
  JSONObject actual = executeQuery(query);
  verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string"));
  verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F"));
}

// NOTE(review): this test exercises the `append` command (with its own source), not
// `appendpipe` — confirm it belongs in this AppendPipe IT class or was included intentionally.
@Test
public void testAppendDifferentIndex() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Locale.ROOT,
"source=%s | stats sum(age) as sum by gender | append [ source=%s | stats"
+ " sum(age) as bank_sum_age ]",
TEST_INDEX_ACCOUNT,
TEST_INDEX_BANK));
verifySchemaInOrder(
actual,
schema("sum", "bigint"),
schema("gender", "string"),
schema("bank_sum_age", "bigint"));
verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238));
}

@Test
public void testAppendpipeWithMergedColumn() throws IOException {
JSONObject actual =
executeQuery(
String.format(
Locale.ROOT,
"source=%s | stats sum(age) as sum by gender |"
+ " appendpipe [ stats sum(sum) as sum ] | head 5",
TEST_INDEX_ACCOUNT,
TEST_INDEX_ACCOUNT));
verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string"));
verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null));
Comment on lines +63 to +73
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found a case that's slightly different from expectation: source=opensearch-sql_test_index_account | stats count() by state, span(age, 10) as span | appendpipe [ stats count() by span | eval state='nomadland'] | sort span"

The result is not merged although both state in the main- and sub-query are of string types:

{
  "schema": [
    {
      "name": "count()",
      "type": "bigint"
    },
    {
      "name": "span",
      "type": "bigint"
    },
    {
      "name": "state",
      "type": "string"
    },
    {
      "name": "state0",
      "type": "string"
    }
  ]
}

This may originates from the fact that the state in the main query is nullable, while the type of state in the subquery is not. IMO, they can be merged, but I'm also not quite sure whether this should be a concern.

}

@Test
public void testAppendpipeWithConflictTypeColumn() throws IOException {
// Casting `sum` to double in the sub-pipeline conflicts with the bigint `sum` from the
// main search, so schema unification is expected to fail with a type error.
Exception exception =
assertThrows(
Exception.class,
() ->
executeQuery(
String.format(
Locale.ROOT,
"source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum ="
+ " cast(sum as double) ] | head 5",
TEST_INDEX_ACCOUNT)));
assertTrue(exception.getMessage().contains("due to incompatible types"));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n"
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
{
"calcite": {
"logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n",
"physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n"
}
}
1 change: 1 addition & 0 deletions ppl/src/main/antlr/OpenSearchPPLLexer.g4
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ BUFFER_LIMIT: 'BUFFER_LIMIT';
LABEL: 'LABEL';
SHOW_NUMBERED_TOKEN: 'SHOW_NUMBERED_TOKEN';
AGGREGATION: 'AGGREGATION';
APPENDPIPE: 'APPENDPIPE';

//Native JOIN KEYWORDS
JOIN: 'JOIN';
Expand Down
10 changes: 10 additions & 0 deletions ppl/src/main/antlr/OpenSearchPPLParser.g4
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,10 @@ pplStatement
| queryStatement
;

// A command pipeline without a leading search command, e.g. the body of appendpipe [ ... ].
subPipeline
: PIPE? commands (PIPE commands)*
;

queryStatement
: (PIPE)? pplCommands (PIPE commands)*
;
Expand Down Expand Up @@ -78,6 +82,7 @@ commands
| regexCommand
| timechartCommand
| rexCommand
| appendPipeCommand
| replaceCommand
;

Expand Down Expand Up @@ -117,6 +122,7 @@ commandName
| APPEND
| MULTISEARCH
| REX
| APPENDPIPE
| REPLACE
;

Expand Down Expand Up @@ -217,6 +223,10 @@ statsCommand
: STATS statsArgs statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (dedupSplitArg)?
;

// appendpipe [ <subpipeline> ]: run the sub-pipeline over the current results and append them.
appendPipeCommand
: APPENDPIPE LT_SQR_PRTHS subPipeline RT_SQR_PRTHS
;

statsArgs
: (partitionsArg | allnumArg | delimArg | bucketNullableArg)*
;
Expand Down
Loading
Loading