Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
9214000
2 changes: 1 addition & 1 deletion server/src/main/resources/transport/upper_bounds/9.3.csv
Original file line number Diff line number Diff line change
@@ -1 +1 @@
batched_response_might_include_reduction_failure,9213000
eql_project_routing,9214000
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
*/
package org.elasticsearch.xpack.eql.action;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.TransportVersions;
import org.elasticsearch.action.ActionRequestValidationException;
import org.elasticsearch.action.IndicesRequest;
import org.elasticsearch.action.LegacyActionRequest;
import org.elasticsearch.action.ResolvedIndexExpressions;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.io.stream.StreamInput;
Expand Down Expand Up @@ -44,6 +46,8 @@

public class EqlSearchRequest extends LegacyActionRequest implements IndicesRequest.Replaceable, ToXContent {

private static final TransportVersion EQL_PROJECT_ROUTING = TransportVersion.fromName("eql_project_routing");

public static final long MIN_KEEP_ALIVE = TimeValue.timeValueMinutes(1).millis();
public static final TimeValue DEFAULT_KEEP_ALIVE = TimeValue.timeValueDays(5);
public static final IndicesOptions DEFAULT_INDICES_OPTIONS = IndicesOptions.fromOptions(true, true, true, false);
Expand All @@ -65,6 +69,8 @@ public class EqlSearchRequest extends LegacyActionRequest implements IndicesRequ
private int maxSamplesPerKey = RequestDefaults.MAX_SAMPLES_PER_KEY;
private Boolean allowPartialSearchResults;
private Boolean allowPartialSequenceResults;
private String projectRouting;
private ResolvedIndexExpressions resolvedIndexExpressions;

// Async settings
private TimeValue waitForCompletionTimeout = null;
Expand Down Expand Up @@ -140,6 +146,19 @@ public EqlSearchRequest(StreamInput in) throws IOException {
allowPartialSearchResults = false;
allowPartialSequenceResults = false;
}
if (in.getTransportVersion().supports(EQL_PROJECT_ROUTING)) {
projectRouting = in.readOptionalString();
resolvedIndexExpressions = in.readOptionalWriteable(ResolvedIndexExpressions::new);
}
}

public String projectRouting() {
return projectRouting;
}

public EqlSearchRequest projectRouting(String projectRouting) {
this.projectRouting = projectRouting;
return this;
}

@Override
Expand Down Expand Up @@ -297,6 +316,21 @@ public EqlSearchRequest indices(String... indices) {
return this;
}

@Override
public boolean allowsCrossProject() {
return true;
}

@Override
public void setResolvedIndexExpressions(ResolvedIndexExpressions expressions) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's still not clear if we really need resolved expressions, probably we can avoid this.

this.resolvedIndexExpressions = expressions;
}

@Override
public ResolvedIndexExpressions getResolvedIndexExpressions() {
return resolvedIndexExpressions;
}

public QueryBuilder filter() {
return this.filter;
}
Expand Down Expand Up @@ -495,6 +529,10 @@ public void writeTo(StreamOutput out) throws IOException {
out.writeOptionalBoolean(allowPartialSearchResults);
out.writeOptionalBoolean(allowPartialSequenceResults);
}
if (out.getTransportVersion().supports(EQL_PROJECT_ROUTING)) {
out.writeOptionalString(projectRouting);
out.writeOptionalWriteable(resolvedIndexExpressions);
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,18 +53,21 @@ public List<Route> routes() {

@Override
protected RestChannelConsumer prepareRequest(RestRequest request, NodeClient client) throws IOException {
if (settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false)) {
// accept but drop project_routing param until fully supported
request.param("project_routing");
}

final boolean crossProjectEnabled = settings != null && settings.getAsBoolean("serverless.cross_project.enabled", false);
EqlSearchRequest eqlRequest;
String indices;
try (XContentParser parser = request.contentOrSourceParamParser()) {
eqlRequest = EqlSearchRequest.fromXContent(parser);
indices = request.param("index");
eqlRequest.indices(Strings.splitStringByCommaToArray(indices));
eqlRequest.indicesOptions(IndicesOptions.fromRequest(request, eqlRequest.indicesOptions()));
IndicesOptions indicesOptions = IndicesOptions.fromRequest(request, eqlRequest.indicesOptions());
if (crossProjectEnabled) {
indicesOptions = IndicesOptions.builder(indicesOptions)
.crossProjectModeOptions(new IndicesOptions.CrossProjectModeOptions(true))
.build();
eqlRequest.projectRouting(request.param("project_routing"));
}
eqlRequest.indicesOptions(indicesOptions);
if (request.hasParam("wait_for_completion_timeout")) {
eqlRequest.waitForCompletionTimeout(
request.paramAsTime("wait_for_completion_timeout", eqlRequest.waitForCompletionTimeout())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ public static void operation(
request.indicesOptions()
);
Set<String> clusterAliases = remoteClusterRegistry.clusterAliases(request.indices(), false);
if (canMinimizeRountrips(request, clusterAliases)) {
if (canMinimizeRountrips(request, clusterAliases, transportService.getRemoteClusterService().crossProjectEnabled())) {
String clusterAlias = clusterAliases.iterator().next();
String[] remoteIndices = new String[request.indices().length];
for (int i = 0; i < request.indices().length; i++) {
Expand Down Expand Up @@ -239,6 +239,7 @@ public static void operation(
request.allowPartialSequenceResults() == null
? defaultAllowPartialSequenceResults(clusterService)
: request.allowPartialSequenceResults(),
request.projectRouting(),
clientId,
new TaskId(nodeId, task.getId()),
task
Expand Down Expand Up @@ -288,7 +289,10 @@ private static boolean requestIsAsync(EqlSearchRequest request) {
}

// can the request be proxied to the remote cluster?
private static boolean canMinimizeRountrips(EqlSearchRequest request, Set<String> clusterAliases) {
private static boolean canMinimizeRountrips(EqlSearchRequest request, Set<String> clusterAliases, boolean crossProjectEnabled) {
if (crossProjectEnabled) {
return false;
}
// Has minimizing the round trips been (explicitly) disabled?
if (request.ccsMinimizeRoundtrips() == false) {
return false;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public class EqlConfiguration extends org.elasticsearch.xpack.ql.session.Configu
private final int maxSamplesPerKey;
private final boolean allowPartialSearchResults;
private final boolean allowPartialSequenceResults;
private final String projectRouting;
private final boolean crossProjectEnabled;

@Nullable
private final QueryBuilder filter;
Expand All @@ -54,9 +56,52 @@ public EqlConfiguration(
int maxSamplesPerKey,
boolean allowPartialSearchResults,
boolean allowPartialSequenceResults,
String projectRouting,
String clientId,
TaskId taskId,
EqlSearchTask task
) {
this(
indices,
zi,
username,
clusterName,
filter,
runtimeMappings,
fetchFields,
requestTimeout,
indicesOptions,
fetchSize,
maxSamplesPerKey,
allowPartialSearchResults,
allowPartialSequenceResults,
projectRouting,
clientId,
taskId,
task,
false
);
}

public EqlConfiguration(
String[] indices,
ZoneId zi,
String username,
String clusterName,
QueryBuilder filter,
Map<String, Object> runtimeMappings,
List<FieldAndFormat> fetchFields,
TimeValue requestTimeout,
IndicesOptions indicesOptions,
int fetchSize,
int maxSamplesPerKey,
boolean allowPartialSearchResults,
boolean allowPartialSequenceResults,
String projectRouting,
String clientId,
TaskId taskId,
EqlSearchTask task,
boolean crossProjectEnabled
) {
super(zi, username, clusterName);

Expand All @@ -73,6 +118,16 @@ public EqlConfiguration(
this.maxSamplesPerKey = maxSamplesPerKey;
this.allowPartialSearchResults = allowPartialSearchResults;
this.allowPartialSequenceResults = allowPartialSequenceResults;
this.projectRouting = projectRouting;
this.crossProjectEnabled = crossProjectEnabled;
}

public boolean crossProjectEnabled() {
return crossProjectEnabled;
}

public String projectRouting() {
return projectRouting;
}

public String[] indices() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@
package org.elasticsearch.xpack.eql.session;

import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.IndicesOptions;
import org.elasticsearch.client.internal.Client;
import org.elasticsearch.client.internal.ParentTaskAssigningClient;
import org.elasticsearch.common.breaker.CircuitBreaker;
import org.elasticsearch.search.crossproject.CrossProjectIndexResolutionValidator;
import org.elasticsearch.tasks.TaskCancelledException;
import org.elasticsearch.xpack.eql.analysis.Analyzer;
import org.elasticsearch.xpack.eql.analysis.AnalyzerContext;
Expand Down Expand Up @@ -122,10 +124,15 @@ private <T> void preAnalyze(LogicalPlan parsed, ActionListener<LogicalPlan> list
return;
}
Set<String> fieldNames = fieldNames(parsed);
IndicesOptions indicesOptions = configuration.indicesOptions();
if (configuration.crossProjectEnabled()) {
indicesOptions = CrossProjectIndexResolutionValidator.indicesOptionsForCrossProjectFanout(indicesOptions);
}
// TODO pass configuration.projectRouting();
indexResolver.resolveAsMergedMapping(
indexWildcard,
fieldNames,
configuration.indicesOptions(),
indicesOptions,
configuration.runtimeMappings(),
map(listener, r -> preAnalyzer.preAnalyze(parsed, r))
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ private EqlTestUtils() {}
1,
false,
true,
null,
"",
new TaskId("test", 123),
null
Expand All @@ -73,6 +74,7 @@ public static EqlConfiguration randomConfiguration() {
randomIntBetween(1, 1000),
randomBoolean(),
randomBoolean(),
null,
randomAlphaOfLength(16),
new TaskId(randomAlphaOfLength(10), randomNonNegativeLong()),
randomTask()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ public void testQueryFilterUsedInPitAndSearches() {
1,
randomBoolean(),
randomBoolean(),
null,
"",
new TaskId("test", 123),
new EqlSearchTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,6 +352,7 @@ private QueryClient buildQueryClient(ESMockClient esClient, CircuitBreaker eqlCi
1,
randomBoolean(),
randomBoolean(),
null,
"",
new TaskId("test", 123),
new EqlSearchTask(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ public void testHandlingPitFailure() {
1,
randomBoolean(),
randomBoolean(),
null,
"",
new TaskId("test", 123),
new EqlSearchTask(
Expand Down