Skip to content

Draft: PIT context relocation on shard relocation #132251

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@

import org.elasticsearch.ExceptionsHelper;
import org.elasticsearch.action.search.ClearScrollResponse;
import org.elasticsearch.action.search.ParsedScrollId;
import org.elasticsearch.action.search.SearchPhaseExecutionException;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.search.ShardSearchFailure;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.Priority;
import org.elasticsearch.common.bytes.BytesReference;
Expand Down Expand Up @@ -703,13 +704,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
} finally {
respFromProdIndex.decRef();
}
SearchPhaseExecutionException error = expectThrows(
SearchPhaseExecutionException.class,
client().prepareSearchScroll(respFromDemoIndexScrollId)
SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId);
SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder);
assertEquals(1, error.shardFailures().length);
ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId();
String sessionId = parsedScrollId.getContext()[0].getSearchContextId().getSessionId();
assertThat(
error.shardFailures()[0].getCause().getMessage(),
containsString("No search context found for id [" + sessionId + "/1]")
);
for (ShardSearchFailure shardSearchFailure : error.shardFailures()) {
assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]"));
}
client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,20 +8,27 @@
*/
package org.elasticsearch.action.search;

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.elasticsearch.action.ActionListener;
import org.elasticsearch.action.support.RefCountingRunnable;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.ShardRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.Strings;
import org.elasticsearch.common.util.concurrent.ListenableFuture;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.transport.Transport;
import org.elasticsearch.transport.TransportResponse;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
Expand All @@ -38,6 +45,7 @@ public final class ClearScrollController implements Runnable {
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
private final AtomicInteger freedSearchContexts = new AtomicInteger(0);
private final Logger logger;
private static final Logger staticLogger = LogManager.getLogger(ClearScrollController.class);
private final Runnable runner;

ClearScrollController(
Expand Down Expand Up @@ -148,12 +156,15 @@ private void finish() {
* Closes the given context id and reports the number of freed contexts via the listener
*/
public static void closeContexts(
DiscoveryNodes nodes,
ClusterService clusterService,
ProjectResolver projectResolver,
SearchTransportService searchTransportService,
Collection<SearchContextIdForNode> contextIds,
Map<ShardId, SearchContextIdForNode> shards,
ActionListener<Integer> listener
) {
final Set<String> clusters = contextIds.stream()
DiscoveryNodes nodes = clusterService.state().nodes();
final Set<String> clusters = shards.values()
.stream()
.map(SearchContextIdForNode::getClusterAlias)
.filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
.collect(Collectors.toSet());
Expand All @@ -166,16 +177,34 @@ public static void closeContexts(
lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> {
final var successes = new AtomicInteger();
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
for (SearchContextIdForNode contextId : contextIds) {
for (Entry<ShardId, SearchContextIdForNode> entry : shards.entrySet()) {
var contextId = entry.getValue();
if (contextId.getNode() == null) {
// the shard was missing when creating the PIT, ignore.
continue;
}
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());

Set<DiscoveryNode> targetNodes;
if (node != null) {
targetNodes = Collections.singleton(node);
} else {
staticLogger.info("---> missing node when closing context: " + contextId.getNode());
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

note: We need to close the contexts after moving them when the "old" PIT is used, so if the originally encoded node is gone we try all remaining ones that currently hold that shard here (regardless of whether that node also holds a PIT context).

// TODO we won't be able to use this with remote clusters
IndexShardRoutingTable indexShardRoutingTable = clusterService.state()
.routingTable(projectResolver.getProjectId())
.shardRoutingTable(entry.getKey());
targetNodes = indexShardRoutingTable.assignedUnpromotableShards()
.stream()
.map(ShardRouting::currentNodeId)
.map(nodeId -> nodeLookup.apply(contextId.getClusterAlias(), nodeId))
.collect(Collectors.toSet());
staticLogger.info("---> trying alternative nodes to close context: " + targetNodes);
}
for (DiscoveryNode targetNode : targetNodes) {
try {
searchTransportService.sendFreeContext(
searchTransportService.getConnection(contextId.getClusterAlias(), node),
searchTransportService.getConnection(contextId.getClusterAlias(), targetNode),
contextId.getSearchContextId(),
refs.acquireListener().map(r -> {
if (r.isFreed()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the "Elastic License
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
* Public License v 1"; you may not use this file except in compliance with, at
* your election, the "Elastic License 2.0", the "GNU Affero General Public
* License v3.0 only", or the "Server Side Public License, v 1".
*/

package org.elasticsearch.action.search;

import org.elasticsearch.TransportVersion;
import org.elasticsearch.common.bytes.BytesArray;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.common.util.Maps;
import org.elasticsearch.index.shard.ShardId;

import java.io.IOException;
import java.util.Base64;
import java.util.Collections;
import java.util.Map;

public class PITHelper {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for debugging atm


public static SearchContextId decodePITId(String id) throws IOException {
return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id)));
}

public static SearchContextId decodePITId(BytesReference id) throws IOException {
try (var in = id.streamInput()) {
final TransportVersion version = TransportVersion.readVersion(in);
in.setTransportVersion(version);
final Map<ShardId, SearchContextIdForNode> shards = Collections.unmodifiableMap(
in.readCollection(Maps::newHashMapWithExpectedSize, (i, map) -> map.put(new ShardId(in), new SearchContextIdForNode(in)))
);
return new SearchContextId(shards, Collections.emptyMap());
} catch (IOException e) {
assert false : e;
throw new IllegalArgumentException(e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,45 +13,51 @@
import org.elasticsearch.action.ActionType;
import org.elasticsearch.action.support.ActionFilters;
import org.elasticsearch.action.support.HandledTransportAction;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.injection.guice.Inject;
import org.elasticsearch.tasks.Task;
import org.elasticsearch.transport.TransportService;

import java.util.Collection;
import java.util.Map;

public class TransportClosePointInTimeAction extends HandledTransportAction<ClosePointInTimeRequest, ClosePointInTimeResponse> {

public static final ActionType<ClosePointInTimeResponse> TYPE = new ActionType<>("indices:data/read/close_point_in_time");
private final ClusterService clusterService;
private final SearchTransportService searchTransportService;
private final NamedWriteableRegistry namedWriteableRegistry;
private final ProjectResolver projectResolver;

@Inject
public TransportClosePointInTimeAction(
TransportService transportService,
ClusterService clusterService,
ActionFilters actionFilters,
SearchTransportService searchTransportService,
NamedWriteableRegistry namedWriteableRegistry
NamedWriteableRegistry namedWriteableRegistry,
ProjectResolver projectResolver
) {
super(TYPE.name(), transportService, actionFilters, ClosePointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
this.clusterService = clusterService;
this.searchTransportService = searchTransportService;
this.namedWriteableRegistry = namedWriteableRegistry;
this.projectResolver = projectResolver;
}

@Override
protected void doExecute(Task task, ClosePointInTimeRequest request, ActionListener<ClosePointInTimeResponse> listener) {
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId());
final Collection<SearchContextIdForNode> contextIds = searchContextId.shards().values();
Map<ShardId, SearchContextIdForNode> shards = searchContextId.shards();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above changes in ClearScrollController. We also need to pass in the shardIds now so we can retry if the original node is gone.

ClearScrollController.closeContexts(
clusterService.state().nodes(),
clusterService,
projectResolver,
searchTransportService,
contextIds,
listener.map(freed -> new ClosePointInTimeResponse(freed == contextIds.size(), freed))
shards,
listener.map(freed -> new ClosePointInTimeResponse(freed == shards.size(), freed))
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -371,6 +371,7 @@ void executeRequest(
);
frozenIndexCheck(resolvedIndices);
}
logger.info("Executing search request on node [{}] with indices [{}]", clusterService.getNodeName(), resolvedIndices);

ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> {
if (ccsCheckCompatibility) {
Expand Down Expand Up @@ -1316,6 +1317,7 @@ private void executeSearch(
SearchResponse.Clusters clusters,
SearchPhaseProvider searchPhaseProvider
) {
logger.info("Executing search locally.");
if (searchRequest.allowPartialSearchResults() == null) {
// No user preference defined in search request - apply cluster service default
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
Expand Down Expand Up @@ -1905,10 +1907,11 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
try {
final ShardIterator shards = OperationRouting.getShards(projectState.routingTable(), shardId);
// Prefer executing shard requests on nodes that are part of PIT first.
if (projectState.cluster().nodes().nodeExists(perNode.getNode())) {
boolean nodeExists = projectState.cluster().nodes().nodeExists(perNode.getNode());
if (nodeExists) {
targetNodes.add(perNode.getNode());
}
if (perNode.getSearchContextId().getSearcherId() != null) {
if (perNode.getSearchContextId().getSearcherId() != null || nodeExists == false) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is where we re-try other shards when the original PIT node is gone now. Trying every node with a shard copy might be too much in the long run, but without a cluster-wide service that keeps track of where the PIT contexts are this might be unavoidable. Needs follow-up in terms of altering the PIT id once we have found the new node where the PIT context lives.

for (ShardRouting shard : shards) {
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
targetNodes.add(shard.currentNodeId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ public ReadOnlyEngine(
* as the search id because the commit id changes whenever IndexWriter#commit is called although the segment files stay unchanged.
* Any recovery except the local recovery performs IndexWriter#commit to generate a new translog uuid or history_uuid.
*/
static String generateSearcherId(SegmentInfos sis) {
public static String generateSearcherId(SegmentInfos sis) {
final MessageDigest md = MessageDigests.sha256();
for (SegmentCommitInfo si : sis) {
final byte[] segmentId = si.getId();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ public class SearchContextMissingException extends ElasticsearchException {
private final ShardSearchContextId contextId;

public SearchContextMissingException(ShardSearchContextId contextId) {
    // Include the session id so a missing context can be traced back to the node
    // (reader session) it was created on, not just its numeric id.
    super("No search context found for id [" + contextId.getSessionId() + "/" + contextId.getId() + "]");
    this.contextId = contextId;
}

Expand Down
Loading