Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.calcite.plan.RelOptTable;
import org.apache.calcite.plan.ViewExpanders;
import org.apache.calcite.rel.RelNode;
import org.apache.calcite.rel.core.Aggregate;
import org.apache.calcite.rel.core.JoinRelType;
Expand Down Expand Up @@ -75,8 +73,6 @@
import org.opensearch.sql.ast.dsl.AstDSL;
import org.opensearch.sql.ast.expression.AggregateFunction;
import org.opensearch.sql.ast.expression.Alias;
import org.opensearch.sql.ast.expression.AllFields;
import org.opensearch.sql.ast.expression.AllFieldsExcludeMeta;
import org.opensearch.sql.ast.expression.Argument;
import org.opensearch.sql.ast.expression.Argument.ArgumentMap;
import org.opensearch.sql.ast.expression.Field;
Expand Down Expand Up @@ -133,14 +129,14 @@
import org.opensearch.sql.ast.tree.UnresolvedPlan;
import org.opensearch.sql.ast.tree.Values;
import org.opensearch.sql.ast.tree.Window;
import org.opensearch.sql.calcite.handlers.ProjectHandler;
import org.opensearch.sql.calcite.handlers.ProjectionUtils;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit;
import org.opensearch.sql.calcite.plan.LogicalSystemLimit.SystemLimitType;
import org.opensearch.sql.calcite.plan.OpenSearchConstants;
import org.opensearch.sql.calcite.utils.BinUtils;
import org.opensearch.sql.calcite.utils.JoinAndLookupUtils;
import org.opensearch.sql.calcite.utils.PlanUtils;
import org.opensearch.sql.calcite.utils.UserDefinedFunctionUtils;
import org.opensearch.sql.calcite.utils.WildcardUtils;
import org.opensearch.sql.common.patterns.PatternUtils;
import org.opensearch.sql.common.utils.StringUtils;
import org.opensearch.sql.datasource.DataSourceService;
Expand All @@ -157,11 +153,13 @@ public class CalciteRelNodeVisitor extends AbstractNodeVisitor<RelNode, CalciteP
private final CalciteRexNodeVisitor rexVisitor;
private final CalciteAggCallVisitor aggVisitor;
private final DataSourceService dataSourceService;
private final ProjectHandler projectHandler;

public CalciteRelNodeVisitor(DataSourceService dataSourceService) {
this.rexVisitor = new CalciteRexNodeVisitor(this);
this.aggVisitor = new CalciteAggCallVisitor(rexVisitor);
this.dataSourceService = dataSourceService;
this.projectHandler = new ProjectHandler(rexVisitor);
}

public RelNode analyze(UnresolvedPlan unresolved, CalcitePlanContext context) {
Expand Down Expand Up @@ -189,17 +187,6 @@ public RelNode visitRelation(Relation node, CalcitePlanContext context) {
return context.relBuilder.peek();
}

// This is a tool method to add an existed RelOptTable to builder stack, not used for now
private RelBuilder scan(RelOptTable tableSchema, CalcitePlanContext context) {
final RelNode scan =
context
.relBuilder
.getScanFactory()
.createScan(ViewExpanders.simpleContext(context.relBuilder.getCluster()), tableSchema);
context.relBuilder.push(scan);
return context.relBuilder;
}

@Override
public RelNode visitSearch(Search node, CalcitePlanContext context) {
// Visit the Relation child to get the scan
Expand Down Expand Up @@ -343,191 +330,10 @@ private boolean containsSubqueryExpression(Node expr) {
@Override
public RelNode visitProject(Project node, CalcitePlanContext context) {
visitChildren(node, context);

if (isSingleAllFieldsProject(node)) {
return handleAllFieldsProject(node, context);
}

List<String> currentFields = context.relBuilder.peek().getRowType().getFieldNames();
List<RexNode> expandedFields =
expandProjectFields(node.getProjectList(), currentFields, context);

if (node.isExcluded()) {
validateExclusion(expandedFields, currentFields);
context.relBuilder.projectExcept(expandedFields);
} else {
if (!context.isResolvingSubquery()) {
context.setProjectVisited(true);
}
context.relBuilder.project(expandedFields);
}
return context.relBuilder.peek();
}

private boolean isSingleAllFieldsProject(Project node) {
return node.getProjectList().size() == 1
&& node.getProjectList().getFirst() instanceof AllFields;
}

private RelNode handleAllFieldsProject(Project node, CalcitePlanContext context) {
if (node.isExcluded()) {
throw new IllegalArgumentException(
"Invalid field exclusion: operation would exclude all fields from the result set");
}
AllFields allFields = (AllFields) node.getProjectList().getFirst();
tryToRemoveNestedFields(context);
tryToRemoveMetaFields(context, allFields instanceof AllFieldsExcludeMeta);
projectHandler.handleProject(node, context);
return context.relBuilder.peek();
}

private List<RexNode> expandProjectFields(
List<UnresolvedExpression> projectList,
List<String> currentFields,
CalcitePlanContext context) {
List<RexNode> expandedFields = new ArrayList<>();
Set<String> addedFields = new HashSet<>();

for (UnresolvedExpression expr : projectList) {
switch (expr) {
case Field field -> {
String fieldName = field.getField().toString();
if (WildcardUtils.containsWildcard(fieldName)) {
List<String> matchingFields =
WildcardUtils.expandWildcardPattern(fieldName, currentFields).stream()
.filter(f -> !isMetadataField(f))
.filter(addedFields::add)
.toList();
if (matchingFields.isEmpty()) {
continue;
}
matchingFields.forEach(f -> expandedFields.add(context.relBuilder.field(f)));
} else if (addedFields.add(fieldName)) {
expandedFields.add(rexVisitor.analyze(field, context));
}
}
case AllFields ignored -> {
currentFields.stream()
.filter(field -> !isMetadataField(field))
.filter(addedFields::add)
.forEach(field -> expandedFields.add(context.relBuilder.field(field)));
}
default -> throw new IllegalStateException(
"Unexpected expression type in project list: " + expr.getClass().getSimpleName());
}
}

if (expandedFields.isEmpty()) {
validateWildcardPatterns(projectList, currentFields);
}

return expandedFields;
}

private void validateExclusion(List<RexNode> fieldsToExclude, List<String> currentFields) {
Set<String> nonMetaFields =
currentFields.stream().filter(field -> !isMetadataField(field)).collect(Collectors.toSet());

if (fieldsToExclude.size() >= nonMetaFields.size()) {
throw new IllegalArgumentException(
"Invalid field exclusion: operation would exclude all fields from the result set");
}
}

private void validateWildcardPatterns(
List<UnresolvedExpression> projectList, List<String> currentFields) {
String firstWildcardPattern =
projectList.stream()
.filter(
expr ->
expr instanceof Field field
&& WildcardUtils.containsWildcard(field.getField().toString()))
.map(expr -> ((Field) expr).getField().toString())
.findFirst()
.orElse(null);

if (firstWildcardPattern != null) {
throw new IllegalArgumentException(
String.format("wildcard pattern [%s] matches no fields", firstWildcardPattern));
}
}

private boolean isMetadataField(String fieldName) {
return OpenSearchConstants.METADATAFIELD_TYPE_MAP.containsKey(fieldName);
}

/** See logic in {@link org.opensearch.sql.analysis.symbol.SymbolTable#lookupAllFields} */
private static void tryToRemoveNestedFields(CalcitePlanContext context) {
Set<String> allFields = new HashSet<>(context.relBuilder.peek().getRowType().getFieldNames());
List<RexNode> duplicatedNestedFields =
allFields.stream()
.filter(
field -> {
int lastDot = field.lastIndexOf(".");
return -1 != lastDot && allFields.contains(field.substring(0, lastDot));
})
.map(field -> (RexNode) context.relBuilder.field(field))
.toList();
if (!duplicatedNestedFields.isEmpty()) {
// This is a workaround to avoid the bug in Calcite:
// In {@link RelBuilder#project_(Iterable, Iterable, Iterable, boolean, Iterable)},
// the check `RexUtil.isIdentity(nodeList, inputRowType)` will pass when the input
// and the output nodeList refer to the same fields, even if the field name list
// is different. As a result, renaming operation will not be applied. This makes
// the logical plan for the flatten command incorrect, where the operation is
// equivalent to renaming the flattened sub-fields. E.g. emp.name -> name.
forceProjectExcept(context.relBuilder, duplicatedNestedFields);
}
}

/**
* Project except with force.
*
* <p>This method is copied from {@link RelBuilder#projectExcept(Iterable)} and modified with the
* force flag in project set to true. It is subject to future changes in Calcite.
*
* @param relBuilder RelBuilder
* @param expressions Expressions to exclude from the project
*/
private static void forceProjectExcept(RelBuilder relBuilder, Iterable<RexNode> expressions) {
List<RexNode> allExpressions = new ArrayList<>(relBuilder.fields());
Set<RexNode> excludeExpressions = new HashSet<>();
for (RexNode excludeExp : expressions) {
if (!excludeExpressions.add(excludeExp)) {
throw new IllegalArgumentException(
"Input list contains duplicates. Expression " + excludeExp + " exists multiple times.");
}
if (!allExpressions.remove(excludeExp)) {
throw new IllegalArgumentException("Expression " + excludeExp.toString() + " not found.");
}
}
relBuilder.project(allExpressions, ImmutableList.of(), true);
}

/**
* Try to remove metadata fields in two cases:
*
* <p>1. It's explicitly specified excluding by force, usually for join or subquery.
*
* <p>2. There is no other project ever visited in the main query
*
* @param context CalcitePlanContext
* @param excludeByForce whether exclude metadata fields by force
*/
private static void tryToRemoveMetaFields(CalcitePlanContext context, boolean excludeByForce) {
if (excludeByForce || !context.isProjectVisited()) {
List<String> originalFields = context.relBuilder.peek().getRowType().getFieldNames();
List<RexNode> metaFieldsRef =
originalFields.stream()
.filter(OpenSearchConstants.METADATAFIELD_TYPE_MAP::containsKey)
.map(metaField -> (RexNode) context.relBuilder.field(metaField))
.toList();
// Remove metadata fields if there is and ensure there are other fields.
if (!metaFieldsRef.isEmpty() && metaFieldsRef.size() != originalFields.size()) {
context.relBuilder.projectExcept(metaFieldsRef);
}
}
}

@Override
public RelNode visitRename(Rename node, CalcitePlanContext context) {
visitChildren(node, context);
Expand Down Expand Up @@ -2594,7 +2400,7 @@ private void buildExpandRelNode(

if (alias != null) {
// Sub-nested fields cannot be removed after renaming the nested field.
tryToRemoveNestedFields(context);
ProjectionUtils.tryToRemoveNestedFields(context);
RexInputRef expandedField = context.relBuilder.field(arrayFieldName);
List<String> names = new ArrayList<>(context.relBuilder.peek().getRowType().getFieldNames());
names.set(expandedField.getIndex(), alias);
Expand Down
Loading
Loading