Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,10 @@ public static IndexResolution invalid(String invalid) {
return new IndexResolution(null, invalid, Set.of(), Map.of());
}

public static IndexResolution empty(String indexPattern) {
return valid(new EsIndex(indexPattern, Map.of(), Map.of()));
}

public static IndexResolution notFound(String name) {
Objects.requireNonNull(name, "name must not be null");
return invalid("Unknown index [" + name + "]");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
package org.elasticsearch.xpack.esql.session;

import org.elasticsearch.ElasticsearchSecurityException;
import org.elasticsearch.ElasticsearchStatusException;
import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.fieldcaps.FieldCapabilitiesFailure;
Expand Down Expand Up @@ -310,53 +309,36 @@ static void updateExecutionInfoAtEndOfPlanning(EsqlExecutionInfo execInfo) {
}

/**
* Checks the index expression for the presence of remote clusters.
* If found, it will ensure that the caller has a valid Enterprise (or Trial) license on the querying cluster
* as well as initialize the corresponding cluster state in execution info.
* @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search.
* This inits the cross cluster state in executionInfo using `indicesGrouper`
* when original to resolved index mapping is not available in field caps response
*/
public static void initCrossClusterState(
public static void initCrossClusterInfo(
IndicesExpressionGrouper indicesGrouper,
XPackLicenseState licenseState,
Set<IndexPattern> indexPatterns,
IndexPattern indexPattern,
EsqlExecutionInfo executionInfo
) throws ElasticsearchStatusException {
if (indexPatterns.isEmpty()) {
return;
}
) {
try {
// TODO it is not safe to concat multiple index patterns in case any of them contains exclusion.
// This is going to be resolved in #136804
String[] indexExpressions = indexPatterns.stream()
.map(indexPattern -> Strings.splitStringByCommaToArray(indexPattern.indexPattern()))
.reduce((a, b) -> {
String[] combined = new String[a.length + b.length];
System.arraycopy(a, 0, combined, 0, a.length);
System.arraycopy(b, 0, combined, a.length, b.length);
return combined;
})
.get();
var groupedIndices = indicesGrouper.groupIndices(IndicesOptions.DEFAULT, indexExpressions, false);
var groupedIndices = indicesGrouper.groupIndices(
IndicesOptions.DEFAULT,
Strings.splitStringByCommaToArray(indexPattern.indexPattern()),
false
);

executionInfo.clusterInfoInitializing(true);
// initialize the cluster entries in EsqlExecutionInfo before throwing the invalid license error
// so that the CCS telemetry handler can recognize that this error is CCS-related
try {
for (var entry : groupedIndices.entrySet()) {
final String clusterAlias = entry.getKey();
final String indexExpr = Strings.arrayToCommaDelimitedString(entry.getValue().indices());
groupedIndices.forEach((clusterAlias, indexGroup) -> {
executionInfo.swapCluster(clusterAlias, (k, v) -> {
assert v == null : "No cluster for " + clusterAlias + " should have been added to ExecutionInfo yet";
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not longer the case. We could have 2 distinct views targeting the same remote. In such case, their expressions are going to be concatenated.

return new EsqlExecutionInfo.Cluster(clusterAlias, indexExpr, executionInfo.shouldSkipOnFailure(clusterAlias));
var thisPattern = Strings.arrayToCommaDelimitedString(indexGroup.indices());
var clusterPattern = v == null ? thisPattern : v.getIndexExpression() + "," + thisPattern;
return new EsqlExecutionInfo.Cluster(clusterAlias, clusterPattern, executionInfo.shouldSkipOnFailure(clusterAlias));
});
}
});
} finally {
executionInfo.clusterInfoInitializing(false);
}

if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
}
} catch (NoSuchRemoteClusterException e) {
if (EsqlLicenseChecker.isCcsAllowed(licenseState)) {
throw e;
Expand All @@ -366,6 +348,16 @@ public static void initCrossClusterState(
}
}

/**
* Ensures that the caller has a valid Enterprise (or Trial) license to perform CCS search
* @throws org.elasticsearch.ElasticsearchStatusException if the license is not valid (or present) for ES|QL CCS search.
*/
public static void checkLicense(EsqlExecutionInfo executionInfo, XPackLicenseState licenseState) {
if (executionInfo.isCrossClusterSearch() && EsqlLicenseChecker.isCcsAllowed(licenseState) == false) {
throw EsqlLicenseChecker.invalidLicenseForCcsException(licenseState);
}
}

/**
* Mark cluster with a final status (success or failure).
* Most metrics are set to 0 if not set yet, except for "took" which is set to the total time taken so far.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -544,19 +544,10 @@ private void resolveIndicesAndAnalyze(
PreAnalysisResult result,
ActionListener<Versioned<LogicalPlan>> logicalPlanListener
) {
EsqlCCSUtils.initCrossClusterState(
indicesExpressionGrouper,
verifier.licenseState(),
preAnalysis.indexes().keySet(),
executionInfo
);

SubscribableListener.<PreAnalysisResult>newForked(
// The main index pattern dictates on which nodes the query can be executed, so we use the minimum transport version from this
// field
// caps request.
l -> preAnalyzeMainIndices(preAnalysis.indexes().entrySet().iterator(), preAnalysis, executionInfo, result, requestFilter, l)
).andThenApply(r -> {
EsqlCCSUtils.checkLicense(executionInfo, verifier.licenseState());
if (r.indexResolution.isEmpty() == false // Rule out ROW case with no FROM clauses
&& executionInfo.isCrossClusterSearch()
&& executionInfo.getRunningClusterAliases().findAny().isEmpty()) {
Expand Down Expand Up @@ -844,38 +835,36 @@ private void preAnalyzeMainIndices(
ThreadPool.Names.SEARCH_COORDINATION,
ThreadPool.Names.SYSTEM_READ
);
// TODO: This is not yet index specific, but that will not matter as soon as #136804 is dealt with
if (executionInfo.clusterAliases().isEmpty()) {
// return empty resolution if the expression is pure CCS and resolved no remote clusters (like no-such-cluster*:index)
listener.onResponse(
result.withIndices(indexPattern, IndexResolution.valid(new EsIndex(indexPattern.indexPattern(), Map.of(), Map.of())))
);
} else {
indexResolver.resolveAsMergedMappingAndRetrieveMinimumVersion(
indexPattern.indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (indexMode) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null
? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter)
: indexModeFilter;
}
default -> requestFilter;
},
indexMode == IndexMode.TIME_SERIES,
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());
l.onResponse(
result.withIndices(indexPattern, indexResolution.inner())
.withMinimumTransportVersion(indexResolution.minimumVersion())
);
})
);
}
indexResolver.resolve(
indexPattern.indexPattern(),
result.fieldNames,
// Maybe if no indices are returned, retry without index mode and provide a clearer error message.
switch (indexMode) {
case IndexMode.TIME_SERIES -> {
var indexModeFilter = new TermQueryBuilder(IndexModeFieldMapper.NAME, IndexMode.TIME_SERIES.getName());
yield requestFilter != null ? new BoolQueryBuilder().filter(requestFilter).filter(indexModeFilter) : indexModeFilter;
}
default -> requestFilter;
},
indexMode == IndexMode.TIME_SERIES,
preAnalysis.useAggregateMetricDoubleWhenNotSupported(),
preAnalysis.useDenseVectorWhenNotSupported(),
listener.delegateFailureAndWrap((l, indexResolution) -> {
EsqlCCSUtils.initCrossClusterInfo(indicesExpressionGrouper, verifier.licenseState(), indexPattern, executionInfo);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here the state will be populated from field caps resolvedTo data structure if that is available.
We will keep existing approach for responses from older clusters that do not have this information available.

EsqlCCSUtils.updateExecutionInfoWithUnavailableClusters(executionInfo, indexResolution.inner().failures());

// today empty index resolution is not allowed unless it is caused by selecting from empty sets of remotes
if (executionInfo.clusterAliases().isEmpty() && indexResolution.inner().isValid() == false) {
indexResolution = new Versioned<>(IndexResolution.empty(indexPattern.indexPattern()), indexResolution.minimumVersion());
}

// The main index pattern dictates on which nodes the query can be executed,
// so we use the minimum transport version from this field caps request.
l.onResponse(
result.withIndices(indexPattern, indexResolution.inner()).withMinimumTransportVersion(indexResolution.minimumVersion())
);
})
);
}

private void analyzeWithRetry(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ public void resolveAsMergedMapping(
(l, versionedResolution) -> l.onResponse(versionedResolution.inner())
);

resolveAsMergedMappingAndRetrieveMinimumVersion(
resolve(
indexWildcard,
fieldNames,
requestFilter,
Expand All @@ -114,7 +114,7 @@ public void resolveAsMergedMapping(
* Resolves a pattern to one (potentially compound meaning that spawns multiple indices) mapping. Also retrieves the minimum transport
* version available in the cluster (and remotes).
*/
public void resolveAsMergedMappingAndRetrieveMinimumVersion(
public void resolve(
String indexWildcard,
Set<String> fieldNames,
QueryBuilder requestFilter,
Expand Down
Loading