diff --git a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java index f701756559..f20e95c038 100644 --- a/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java +++ b/core/src/main/java/org/opensearch/sql/analysis/Analyzer.java @@ -60,6 +60,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CloseCursor; @@ -833,6 +834,11 @@ public LogicalPlan visitAppendCol(AppendCol node, AnalysisContext context) { throw getOnlyForCalciteException("Appendcol"); } + @Override + public LogicalPlan visitAppendPipe(AppendPipe node, AnalysisContext context) { + throw getOnlyForCalciteException("AppendPipe"); + } + @Override public LogicalPlan visitAppend(Append node, AnalysisContext context) { throw getOnlyForCalciteException("Append"); diff --git a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java index 320723fd57..2daa4b4a2f 100644 --- a/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/ast/AbstractNodeVisitor.java @@ -48,6 +48,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CloseCursor; @@ -140,6 +141,10 @@ public T visitSearch(Search node, C context) { return visitChildren(node, context); } + public T visitAppendPipe(AppendPipe node, C context) { + return visitChildren(node, context); + } + public T visitFilter(Filter node, C context) { return visitChildren(node, context); } diff --git a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java index 67cc893c5b..4e87502e73 100644 --- a/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java +++ b/core/src/main/java/org/opensearch/sql/ast/dsl/AstDSL.java @@ -49,6 +49,7 @@ import org.opensearch.sql.ast.expression.WindowFunction; import org.opensearch.sql.ast.expression.Xor; import org.opensearch.sql.ast.tree.Aggregation; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; @@ -563,6 +564,11 @@ public static Trendline trendline( return new Trendline(sortField, Arrays.asList(computations)).attach(input); } + public static AppendPipe appendPipe(UnresolvedPlan input, UnresolvedPlan subquery) { + + return new AppendPipe(subquery).attach(input); + } + public static Trendline.TrendlineComputation computation( Integer numDataPoints, Field dataField, String alias, Trendline.TrendlineType type) { return new Trendline.TrendlineComputation(numDataPoints, dataField, alias, type); diff --git a/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java new file mode 100644 index 0000000000..0ea1cb9b45 --- /dev/null +++ b/core/src/main/java/org/opensearch/sql/ast/tree/AppendPipe.java @@ -0,0 +1,45 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ast.tree; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import org.opensearch.sql.ast.AbstractNodeVisitor; + +@Getter +@Setter +@ToString +@EqualsAndHashCode(callSuper = false) +public class AppendPipe extends UnresolvedPlan { + + private UnresolvedPlan subQuery; + + private UnresolvedPlan child; + + public AppendPipe(UnresolvedPlan subQuery) { + this.subQuery = subQuery; + } + + @Override + public AppendPipe attach(UnresolvedPlan child) { + this.child = child; + return this; + } + + @Override + public List getChild() { + return this.child == null ? ImmutableList.of() : ImmutableList.of(this.child); + } + + @Override + public T accept(AbstractNodeVisitor nodeVisitor, C context) { + return nodeVisitor.visitAppendPipe(this, context); + } +} diff --git a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java index 9408695261..415d47a0fd 100644 --- a/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java +++ b/core/src/main/java/org/opensearch/sql/calcite/CalciteRelNodeVisitor.java @@ -105,6 +105,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CloseCursor; @@ -246,6 +247,28 @@ public RelNode visitFilter(Filter node, CalcitePlanContext context) { return context.relBuilder.peek(); } + @Override + public RelNode visitAppendPipe(AppendPipe node, CalcitePlanContext context) { + visitChildren(node, context); + UnresolvedPlan subqueryPlan = node.getSubQuery(); + UnresolvedPlan childNode = subqueryPlan; + while (childNode.getChild() != null + && !childNode.getChild().isEmpty() + && !(childNode.getChild().getFirst() instanceof Values)) { + if (childNode.getChild().size() > 1) { + throw new RuntimeException("AppendPipe doesn't support multiply children subquery."); + } + childNode = (UnresolvedPlan) childNode.getChild().getFirst(); + } + childNode.attach(node.getChild().getFirst()); + + subqueryPlan.accept(this, context); + + RelNode subPipelineNode = context.relBuilder.build(); + RelNode mainNode = context.relBuilder.build(); + return mergeTableAndResolveColumnConflict(mainNode, subPipelineNode, context); + } + @Override public RelNode visitRegex(Regex node, CalcitePlanContext context) { visitChildren(node, context); @@ -2121,9 +2144,13 @@ public RelNode visitAppend(Append node, CalcitePlanContext context) { // 3. Merge two query schemas using shared logic RelNode subsearchNode = context.relBuilder.build(); RelNode mainNode = context.relBuilder.build(); + return mergeTableAndResolveColumnConflict(mainNode, subsearchNode, context); + } + private RelNode mergeTableAndResolveColumnConflict( + RelNode mainNode, RelNode subqueryNode, CalcitePlanContext context) { // Use shared schema merging logic that handles type conflicts via field renaming - List nodesToMerge = Arrays.asList(mainNode, subsearchNode); + List nodesToMerge = Arrays.asList(mainNode, subqueryNode); List projectedNodes = SchemaUnifier.buildUnifiedSchemaWithConflictResolution(nodesToMerge, context); diff --git a/docs/user/ppl/cmd/appendpipe.rst b/docs/user/ppl/cmd/appendpipe.rst new file mode 100644 index 0000000000..43c4dd1e84 --- /dev/null +++ b/docs/user/ppl/cmd/appendpipe.rst @@ -0,0 +1,72 @@ +========= +appendpipe +========= + +.. rubric:: Table of contents + +.. contents:: + :local: + :depth: 2 + + +Description +============ +| Using ``appendpipe`` command to appends the result of the subpipeline to the search results. Unlike a subsearch, the subpipeline is not run first.The subpipeline is run when the search reaches the appendpipe command. +The command aligns columns with the same field names and types. For different column fields between the main search and sub-search, NULL values are filled in the respective rows. + +Version +======= +3.3.0 + +Syntax +============ +appendpipe [] + +* subpipeline: mandatory. A list of commands that are applied to the search results from the commands that occur in the search before the ``appendpipe`` command. + +Example 1: Append rows from a total count to existing search result +==================================================================================== + +This example appends rows from "total by gender" to "sum by gender, state" with merged column of same field name and type. + +PPL query:: + + os> source=accounts | stats sum(age) as part by gender, state | sort -part | head 5 | appendpipe [ stats sum(part) as total by gender ]; + fetched rows / total rows = 6/6 + +------+--------+-------+-------+ + | part | gender | state | total | + |------+--------+-------+-------| + | 36 | M | TN | null | + | 33 | M | MD | null | + | 32 | M | IL | null | + | 28 | F | VA | null | + | null | F | null | 28 | + | null | M | null | 101 | + +------+--------+-------+-------+ + + + +Example 2: Append rows with merged column names +=============================================================== + +This example appends rows from "count by gender" to "sum by gender, state". + +PPL query:: + + os> source=accounts | stats sum(age) as total by gender, state | sort -total | head 5 | appendpipe [ stats sum(total) as total by gender ]; + fetched rows / total rows = 6/6 + +----------+--------+-------+ + | total | gender | state | + |----------+--------+-------| + | 36 | M | TN | + | 33 | M | MD | + | 32 | M | IL | + | 28 | F | VA | + | 28 | F | null | + | 101 | M | null | + +----------+--------+-------+ + +Limitations +=========== + +* **Schema Compatibility**: Same as command ``append``, when fields with the same name exist between the main search and sub-search but have incompatible types, the query will fail with an error. To avoid type conflicts, ensure that fields with the same name have the same data type, or use different field names (e.g., by renaming with ``eval`` or using ``fields`` to select non-conflicting columns). diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java index 994e583eaa..841469115d 100644 --- a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalciteExplainIT.java @@ -894,6 +894,18 @@ public void testExplainAppendCommand() throws IOException { TEST_INDEX_BANK))); } + @Test + public void testExplainAppendPipeCommand() throws IOException { + String expected = loadExpectedPlan("explain_appendpipe_command.json"); + assertJsonEqualsIgnoreId( + expected, + explainQueryToString( + String.format( + Locale.ROOT, + "source=%s | appendpipe [ stats count(balance) as cnt by gender ]", + TEST_INDEX_BANK))); + } + @Test public void testMvjoinExplain() throws IOException { String query = diff --git a/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java new file mode 100644 index 0000000000..d25d3ca80d --- /dev/null +++ b/integ-test/src/test/java/org/opensearch/sql/calcite/remote/CalcitePPLAppendPipeCommandIT.java @@ -0,0 +1,90 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.calcite.remote; + +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_ACCOUNT; +import static org.opensearch.sql.legacy.TestsConstants.TEST_INDEX_BANK; +import static org.opensearch.sql.util.MatcherUtils.rows; +import static org.opensearch.sql.util.MatcherUtils.schema; +import static org.opensearch.sql.util.MatcherUtils.verifyDataRows; +import static org.opensearch.sql.util.MatcherUtils.verifySchemaInOrder; + +import java.io.IOException; +import java.util.Locale; +import org.json.JSONObject; +import org.junit.Test; +import org.opensearch.sql.ppl.PPLIntegTestCase; + +public class CalcitePPLAppendPipeCommandIT extends PPLIntegTestCase { + @Override + public void init() throws Exception { + super.init(); + enableCalcite(); + loadIndex(Index.ACCOUNT); + loadIndex(Index.BANK); + } + + @Test + public void testAppendPipe() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum_age_by_gender by gender | appendpipe [ " + + " sort -sum_age_by_gender ] |" + + " head 5", + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder(actual, schema("sum_age_by_gender", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(15224, "M"), rows(14947, "F")); + } + + @Test + public void testAppendDifferentIndex() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | append [ source=%s | stats" + + " sum(age) as bank_sum_age ]", + TEST_INDEX_ACCOUNT, + TEST_INDEX_BANK)); + verifySchemaInOrder( + actual, + schema("sum", "bigint"), + schema("gender", "string"), + schema("bank_sum_age", "bigint")); + verifyDataRows(actual, rows(14947, "F", null), rows(15224, "M", null), rows(null, null, 238)); + } + + @Test + public void testAppendpipeWithMergedColumn() throws IOException { + JSONObject actual = + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender |" + + " appendpipe [ stats sum(sum) as sum ] | head 5", + TEST_INDEX_ACCOUNT, + TEST_INDEX_ACCOUNT)); + verifySchemaInOrder(actual, schema("sum", "bigint"), schema("gender", "string")); + verifyDataRows(actual, rows(14947, "F"), rows(15224, "M"), rows(30171, null)); + } + + @Test + public void testAppendpipeWithConflictTypeColumn() throws IOException { + Exception exception = + assertThrows( + Exception.class, + () -> + executeQuery( + String.format( + Locale.ROOT, + "source=%s | stats sum(age) as sum by gender | appendpipe [ eval sum =" + + " cast(sum as double) ] | head 5", + TEST_INDEX_ACCOUNT))); + assertTrue(exception.getMessage().contains("due to incompatible types")); + } +} diff --git a/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json b/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json new file mode 100644 index 0000000000..6ec42972a1 --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite/explain_appendpipe_command.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..12=[{inputs}], expr#13=[null:BIGINT], proj#0..13=[{exprs}])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[PROJECT->[account_number, firstname, address, birthdate, gender, city, lastname, balance, employer, state, age, email, male], LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":10000,\"timeout\":\"1m\",\"_source\":{\"includes\":[\"account_number\",\"firstname\",\"address\",\"birthdate\",\"gender\",\"city\",\"lastname\",\"balance\",\"employer\",\"state\",\"age\",\"email\",\"male\"],\"excludes\":[]}}, requestedTotalSize=10000, pageSize=null, startFrom=0)])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]], PushDownContext=[[AGGREGATION->rel#:LogicalAggregate.NONE.[](input=RelSubset#,group={0},cnt=COUNT($1)), LIMIT->10000], OpenSearchRequestBuilder(sourceBuilder={\"from\":0,\"size\":0,\"timeout\":\"1m\",\"aggregations\":{\"composite_buckets\":{\"composite\":{\"size\":1000,\"sources\":[{\"gender\":{\"terms\":{\"field\":\"gender.keyword\",\"missing_bucket\":true,\"missing_order\":\"first\",\"order\":\"asc\"}}}]},\"aggregations\":{\"cnt\":{\"value_count\":{\"field\":\"balance\"}}}}}}, requestedTotalSize=2147483647, pageSize=null, startFrom=0)])\n" + } +} \ No newline at end of file diff --git a/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json new file mode 100644 index 0000000000..2b111e119d --- /dev/null +++ b/integ-test/src/test/resources/expectedOutput/calcite_no_pushdown/explain_appendpipe_command.json @@ -0,0 +1,6 @@ +{ + "calcite": { + "logical":"LogicalSystemLimit(fetch=[10000], type=[QUERY_SIZE_LIMIT])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], cnt=[$19])\n LogicalUnion(all=[true])\n LogicalProject(account_number=[$0], firstname=[$1], address=[$2], birthdate=[$3], gender=[$4], city=[$5], lastname=[$6], balance=[$7], employer=[$8], state=[$9], age=[$10], email=[$11], male=[$12], _id=[$13], _index=[$14], _score=[$15], _maxscore=[$16], _sort=[$17], _routing=[$18], cnt=[null:BIGINT])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n LogicalProject(account_number=[null:BIGINT], firstname=[null:VARCHAR], address=[null:VARCHAR], birthdate=[null:EXPR_TIMESTAMP VARCHAR], gender=[$0], city=[null:VARCHAR], lastname=[null:VARCHAR], balance=[null:BIGINT], employer=[null:VARCHAR], state=[null:VARCHAR], age=[null:INTEGER], email=[null:VARCHAR], male=[null:BOOLEAN], _id=[null:VARCHAR], _index=[null:VARCHAR], _score=[null:REAL], _maxscore=[null:REAL], _sort=[null:BIGINT], _routing=[null:VARCHAR], cnt=[$1])\n LogicalAggregate(group=[{0}], cnt=[COUNT($1)])\n LogicalProject(gender=[$4], balance=[$7])\n CalciteLogicalIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n", + "physical":"EnumerableLimit(fetch=[10000])\n EnumerableUnion(all=[true])\n EnumerableCalc(expr#0..18=[{inputs}], expr#19=[null:BIGINT], proj#0..12=[{exprs}], cnt=[$t19])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n EnumerableCalc(expr#0..1=[{inputs}], expr#2=[null:BIGINT], expr#3=[null:VARCHAR], expr#4=[null:EXPR_TIMESTAMP VARCHAR], expr#5=[null:INTEGER], expr#6=[null:BOOLEAN], account_number=[$t2], firstname=[$t3], address=[$t3], birthdate=[$t4], gender=[$t0], city=[$t3], lastname=[$t3], balance=[$t2], employer=[$t3], state=[$t3], age=[$t5], email=[$t3], male=[$t6], cnt=[$t1])\n EnumerableAggregate(group=[{4}], cnt=[COUNT($7)])\n CalciteEnumerableIndexScan(table=[[OpenSearch, opensearch-sql_test_index_bank]])\n" + } +} \ No newline at end of file diff --git a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 index b0650c2442..7d893810c2 100644 --- a/ppl/src/main/antlr/OpenSearchPPLLexer.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLLexer.g4 @@ -62,6 +62,7 @@ BUFFER_LIMIT: 'BUFFER_LIMIT'; LABEL: 'LABEL'; SHOW_NUMBERED_TOKEN: 'SHOW_NUMBERED_TOKEN'; AGGREGATION: 'AGGREGATION'; +APPENDPIPE: 'APPENDPIPE'; //Native JOIN KEYWORDS JOIN: 'JOIN'; diff --git a/ppl/src/main/antlr/OpenSearchPPLParser.g4 b/ppl/src/main/antlr/OpenSearchPPLParser.g4 index 3857c8557b..59756f84f1 100644 --- a/ppl/src/main/antlr/OpenSearchPPLParser.g4 +++ b/ppl/src/main/antlr/OpenSearchPPLParser.g4 @@ -19,6 +19,10 @@ pplStatement | queryStatement ; +subPipeline + : PIPE? commands (PIPE commands)* + ; + queryStatement : (PIPE)? pplCommands (PIPE commands)* ; @@ -80,6 +84,7 @@ commands | chartCommand | timechartCommand | rexCommand + | appendPipeCommand | replaceCommand ; @@ -120,6 +125,7 @@ commandName | APPEND | MULTISEARCH | REX + | APPENDPIPE | REPLACE ; @@ -220,6 +226,10 @@ statsCommand : STATS statsArgs statsAggTerm (COMMA statsAggTerm)* (statsByClause)? (dedupSplitArg)? ; +appendPipeCommand + : APPENDPIPE LT_SQR_PRTHS subPipeline RT_SQR_PRTHS + ; + statsArgs : (partitionsArg | allnumArg | delimArg | bucketNullableArg)* ; diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java index 09e9b4c77e..4566dc30a2 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/parser/AstBuilder.java @@ -73,6 +73,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CountBin; import org.opensearch.sql.ast.tree.Dedupe; @@ -165,6 +166,16 @@ public UnresolvedPlan visitQueryStatement(OpenSearchPPLParser.QueryStatementCont .reduce(pplCommand, (r, e) -> e.attach(e instanceof Join ? projectExceptMeta(r) : r)); } + @Override + public UnresolvedPlan visitSubPipeline(OpenSearchPPLParser.SubPipelineContext ctx) { + List cmds = ctx.commands(); + if (cmds.isEmpty()) { + throw new IllegalArgumentException("appendpipe [] is empty"); + } + UnresolvedPlan seed = visit(cmds.getFirst()); + return cmds.stream().skip(1).map(this::visit).reduce(seed, (left, op) -> op.attach(left)); + } + @Override public UnresolvedPlan visitSubSearch(OpenSearchPPLParser.SubSearchContext ctx) { UnresolvedPlan searchCommand = visit(ctx.searchCommand()); @@ -236,6 +247,12 @@ public UnresolvedPlan visitWhereCommand(WhereCommandContext ctx) { return new Filter(internalVisitExpression(ctx.logicalExpression())); } + @Override + public UnresolvedPlan visitAppendPipeCommand(OpenSearchPPLParser.AppendPipeCommandContext ctx) { + UnresolvedPlan plan = visit(ctx.subPipeline()); + return new AppendPipe(plan); + } + @Override public UnresolvedPlan visitJoinCommand(OpenSearchPPLParser.JoinCommandContext ctx) { // a sql-like syntax if join criteria existed diff --git a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java index 0971924295..b3b91d11b5 100644 --- a/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java +++ b/ppl/src/main/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizer.java @@ -55,6 +55,7 @@ import org.opensearch.sql.ast.tree.Aggregation; import org.opensearch.sql.ast.tree.Append; import org.opensearch.sql.ast.tree.AppendCol; +import org.opensearch.sql.ast.tree.AppendPipe; import org.opensearch.sql.ast.tree.Bin; import org.opensearch.sql.ast.tree.Chart; import org.opensearch.sql.ast.tree.CountBin; @@ -711,6 +712,19 @@ private String visitExpression(UnresolvedExpression expression) { return expressionAnalyzer.analyze(expression, null); } + @Override + public String visitAppendPipe(AppendPipe node, String context) { + Values emptyValue = new Values(null); + UnresolvedPlan childNode = node.getSubQuery(); + while (childNode != null && !childNode.getChild().isEmpty()) { + childNode = (UnresolvedPlan) childNode.getChild().get(0); + } + childNode.attach(emptyValue); + String child = node.getChild().get(0).accept(this, context); + String subPipeline = anonymizeData(node.getSubQuery()); + return StringUtils.format("%s | appendpipe [%s]", child, subPipeline); + } + @Override public String visitFillNull(FillNull node, String context) { String child = node.getChild().get(0).accept(this, context); diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java new file mode 100644 index 0000000000..faf944da4a --- /dev/null +++ b/ppl/src/test/java/org/opensearch/sql/ppl/calcite/CalcitePPLAppendPipeTest.java @@ -0,0 +1,62 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.ppl.calcite; + +import org.apache.calcite.rel.RelNode; +import org.apache.calcite.test.CalciteAssert; +import org.junit.Test; + +public class CalcitePPLAppendPipeTest extends CalcitePPLAbstractTest { + public CalcitePPLAppendPipeTest() { + super(CalciteAssert.SchemaSpec.SCOTT_WITH_TEMPORAL); + } + + @Test + public void testAppendPipe() { + String ppl = "source=EMP | appendpipe [ where DEPTNO = 20 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalFilter(condition=[=($7, 20)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 19); // 14 original table rows + 5 filtered subquery rows + + String expectedSparkSql = + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT *\n" + + "FROM `scott`.`EMP`\n" + + "WHERE `DEPTNO` = 20"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } + + @Test + public void testAppendPipeWithMergedColumns() { + String ppl = + "source=EMP | fields DEPTNO | appendpipe [ fields DEPTNO | eval DEPTNO_PLUS =" + + " DEPTNO + 10 ]"; + RelNode root = getRelNode(ppl); + String expectedLogical = + "LogicalUnion(all=[true])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[null:INTEGER])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n" + + " LogicalProject(DEPTNO=[$7], DEPTNO_PLUS=[+($7, 10)])\n" + + " LogicalTableScan(table=[[scott, EMP]])\n"; + verifyLogical(root, expectedLogical); + verifyResultCount(root, 28); + + String expectedSparkSql = + "SELECT `DEPTNO`, CAST(NULL AS INTEGER) `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`\n" + + "UNION ALL\n" + + "SELECT `DEPTNO`, `DEPTNO` + 10 `DEPTNO_PLUS`\n" + + "FROM `scott`.`EMP`"; + verifyPPLToSparkSQL(root, expectedSparkSql); + } +} diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java index f1464e3106..8cc207b656 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/parser/AstBuilderTest.java @@ -13,6 +13,7 @@ import static org.opensearch.sql.ast.dsl.AstDSL.agg; import static org.opensearch.sql.ast.dsl.AstDSL.aggregate; import static org.opensearch.sql.ast.dsl.AstDSL.alias; +import static org.opensearch.sql.ast.dsl.AstDSL.appendPipe; import static org.opensearch.sql.ast.dsl.AstDSL.argument; import static org.opensearch.sql.ast.dsl.AstDSL.booleanLiteral; import static org.opensearch.sql.ast.dsl.AstDSL.compare; @@ -1003,6 +1004,20 @@ public void testFillNullValueWithFields() { fillNull(relation("t"), intLiteral(0), true, field("a"), field("b"), field("c"))); } + @Test + public void testAppendPipe() { + assertEqual( + "source=t | appendpipe [ stats COUNT() ]", + appendPipe( + relation("t"), + agg( + null, + exprList(alias("COUNT()", aggregate("count", AstDSL.allFields()))), + emptyList(), + emptyList(), + defaultStatsArgs()))); + } + public void testTrendline() { assertEqual( "source=t | trendline sma(5, test_field) as test_field_alias sma(1, test_field_2) as" diff --git a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java index ec87000b5b..be8cffffb5 100644 --- a/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java +++ b/ppl/src/test/java/org/opensearch/sql/ppl/utils/PPLQueryDataAnonymizerTest.java @@ -753,6 +753,19 @@ public void testRegex() { anonymize("source=t | regex email='.*@domain.com' | fields email")); } + @Test + public void testAppendPipe() { + assertEquals( + "source=table | appendpipe [ | stats count()]", + anonymize("source=t | appendpipe [stats count()]")); + assertEquals( + "source=table | appendpipe [ | where identifier = ***]", + anonymize("source=t | appendpipe [where fieldname=='pattern']")); + assertEquals( + "source=table | appendpipe [ | sort identifier]", + anonymize("source=t | appendpipe [sort fieldname]")); + } + @Test public void testRexCommand() { when(settings.getSettingValue(Key.PPL_REX_MAX_MATCH_LIMIT)).thenReturn(10);