Skip to content

Commit fccadb1

Browse files
committed
PIT relocation POC
[CI] Auto commit changes from spotless
1 parent a2eeea4 commit fccadb1

File tree

12 files changed

+238
-42
lines changed

12 files changed

+238
-42
lines changed

server/src/main/java/org/elasticsearch/action/ResolvedIndices.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,10 @@ public class ResolvedIndices {
6161
this(remoteClusterIndices, localIndices, localIndexMetadata, null);
6262
}
6363

64+
public String toString() {
65+
return "ResolvedIndices{" + "local=" + localIndices + ", remote=" + remoteClusterIndices + '}';
66+
}
67+
6468
/**
6569
* Get the remote cluster indices, structured as a map where the key is the remote cluster alias.
6670
* <br/>

server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java

Lines changed: 35 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -8,20 +8,27 @@
88
*/
99
package org.elasticsearch.action.search;
1010

11+
import org.apache.logging.log4j.LogManager;
1112
import org.apache.logging.log4j.Logger;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.support.RefCountingRunnable;
1415
import org.elasticsearch.cluster.node.DiscoveryNode;
1516
import org.elasticsearch.cluster.node.DiscoveryNodes;
17+
import org.elasticsearch.cluster.project.ProjectResolver;
18+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
19+
import org.elasticsearch.cluster.routing.ShardRouting;
20+
import org.elasticsearch.cluster.service.ClusterService;
1621
import org.elasticsearch.common.Strings;
1722
import org.elasticsearch.common.util.concurrent.ListenableFuture;
23+
import org.elasticsearch.index.shard.ShardId;
1824
import org.elasticsearch.transport.Transport;
1925
import org.elasticsearch.transport.TransportResponse;
2026

2127
import java.util.ArrayList;
22-
import java.util.Collection;
2328
import java.util.Collections;
2429
import java.util.List;
30+
import java.util.Map;
31+
import java.util.Map.Entry;
2532
import java.util.Set;
2633
import java.util.concurrent.atomic.AtomicBoolean;
2734
import java.util.concurrent.atomic.AtomicInteger;
@@ -38,6 +45,7 @@ public final class ClearScrollController implements Runnable {
3845
private final AtomicBoolean hasFailed = new AtomicBoolean(false);
3946
private final AtomicInteger freedSearchContexts = new AtomicInteger(0);
4047
private final Logger logger;
48+
private static final Logger staticLogger = LogManager.getLogger(ClearScrollController.class);
4149
private final Runnable runner;
4250

4351
ClearScrollController(
@@ -148,12 +156,15 @@ private void finish() {
148156
* Closes the given context id and reports the number of freed contexts via the listener
149157
*/
150158
public static void closeContexts(
151-
DiscoveryNodes nodes,
159+
ClusterService clusterService,
160+
ProjectResolver projectResolver,
152161
SearchTransportService searchTransportService,
153-
Collection<SearchContextIdForNode> contextIds,
162+
Map<ShardId, SearchContextIdForNode> shards,
154163
ActionListener<Integer> listener
155164
) {
156-
final Set<String> clusters = contextIds.stream()
165+
DiscoveryNodes nodes = clusterService.state().nodes();
166+
final Set<String> clusters = shards.values()
167+
.stream()
157168
.map(SearchContextIdForNode::getClusterAlias)
158169
.filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
159170
.collect(Collectors.toSet());
@@ -166,16 +177,34 @@ public static void closeContexts(
166177
lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> {
167178
final var successes = new AtomicInteger();
168179
try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
169-
for (SearchContextIdForNode contextId : contextIds) {
180+
for (Entry<ShardId, SearchContextIdForNode> entry : shards.entrySet()) {
181+
var contextId = entry.getValue();
170182
if (contextId.getNode() == null) {
171183
// the shard was missing when creating the PIT, ignore.
172184
continue;
173185
}
174186
final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
187+
188+
Set<DiscoveryNode> targetNodes;
175189
if (node != null) {
190+
targetNodes = Collections.singleton(node);
191+
} else {
192+
staticLogger.info("---> missing node when closing context: " + contextId.getNode());
193+
// TODO we won't be able to use this with remote clusters
194+
IndexShardRoutingTable indexShardRoutingTable = clusterService.state()
195+
.routingTable(projectResolver.getProjectId())
196+
.shardRoutingTable(entry.getKey());
197+
targetNodes = indexShardRoutingTable.assignedUnpromotableShards()
198+
.stream()
199+
.map(ShardRouting::currentNodeId)
200+
.map(nodeId -> nodeLookup.apply(contextId.getClusterAlias(), nodeId))
201+
.collect(Collectors.toSet());
202+
staticLogger.info("---> trying alternative nodes to close context: " + targetNodes);
203+
}
204+
for (DiscoveryNode targetNode : targetNodes) {
176205
try {
177206
searchTransportService.sendFreeContext(
178-
searchTransportService.getConnection(contextId.getClusterAlias(), node),
207+
searchTransportService.getConnection(contextId.getClusterAlias(), targetNode),
179208
contextId.getSearchContextId(),
180209
refs.acquireListener().map(r -> {
181210
if (r.isFreed()) {
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the "Elastic License
4+
* 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
5+
* Public License v 1"; you may not use this file except in compliance with, at
6+
* your election, the "Elastic License 2.0", the "GNU Affero General Public
7+
* License v3.0 only", or the "Server Side Public License, v 1".
8+
*/
9+
10+
package org.elasticsearch.action.search;
11+
12+
import org.elasticsearch.TransportVersion;
13+
import org.elasticsearch.common.bytes.BytesArray;
14+
import org.elasticsearch.common.bytes.BytesReference;
15+
import org.elasticsearch.common.util.Maps;
16+
import org.elasticsearch.index.shard.ShardId;
17+
18+
import java.io.IOException;
19+
import java.util.Base64;
20+
import java.util.Collections;
21+
import java.util.Map;
22+
23+
public class PITHelper {
24+
25+
public static SearchContextId decodePITId(String id) throws IOException {
26+
return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id)));
27+
}
28+
29+
public static SearchContextId decodePITId(BytesReference id) throws IOException {
30+
try (var in = id.streamInput()) {
31+
final TransportVersion version = TransportVersion.readVersion(in);
32+
in.setTransportVersion(version);
33+
final Map<ShardId, SearchContextIdForNode> shards = Collections.unmodifiableMap(
34+
in.readCollection(Maps::newHashMapWithExpectedSize, (i, map) -> map.put(new ShardId(in), new SearchContextIdForNode(in)))
35+
);
36+
return new SearchContextId(shards, Collections.emptyMap());
37+
} catch (IOException e) {
38+
assert false : e;
39+
throw new IllegalArgumentException(e);
40+
}
41+
}
42+
}

server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,45 +13,51 @@
1313
import org.elasticsearch.action.ActionType;
1414
import org.elasticsearch.action.support.ActionFilters;
1515
import org.elasticsearch.action.support.HandledTransportAction;
16+
import org.elasticsearch.cluster.project.ProjectResolver;
1617
import org.elasticsearch.cluster.service.ClusterService;
1718
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
1819
import org.elasticsearch.common.util.concurrent.EsExecutors;
20+
import org.elasticsearch.index.shard.ShardId;
1921
import org.elasticsearch.injection.guice.Inject;
2022
import org.elasticsearch.tasks.Task;
2123
import org.elasticsearch.transport.TransportService;
2224

23-
import java.util.Collection;
25+
import java.util.Map;
2426

2527
public class TransportClosePointInTimeAction extends HandledTransportAction<ClosePointInTimeRequest, ClosePointInTimeResponse> {
2628

2729
public static final ActionType<ClosePointInTimeResponse> TYPE = new ActionType<>("indices:data/read/close_point_in_time");
2830
private final ClusterService clusterService;
2931
private final SearchTransportService searchTransportService;
3032
private final NamedWriteableRegistry namedWriteableRegistry;
33+
private final ProjectResolver projectResolver;
3134

3235
@Inject
3336
public TransportClosePointInTimeAction(
3437
TransportService transportService,
3538
ClusterService clusterService,
3639
ActionFilters actionFilters,
3740
SearchTransportService searchTransportService,
38-
NamedWriteableRegistry namedWriteableRegistry
41+
NamedWriteableRegistry namedWriteableRegistry,
42+
ProjectResolver projectResolver
3943
) {
4044
super(TYPE.name(), transportService, actionFilters, ClosePointInTimeRequest::new, EsExecutors.DIRECT_EXECUTOR_SERVICE);
4145
this.clusterService = clusterService;
4246
this.searchTransportService = searchTransportService;
4347
this.namedWriteableRegistry = namedWriteableRegistry;
48+
this.projectResolver = projectResolver;
4449
}
4550

4651
@Override
4752
protected void doExecute(Task task, ClosePointInTimeRequest request, ActionListener<ClosePointInTimeResponse> listener) {
4853
final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId());
49-
final Collection<SearchContextIdForNode> contextIds = searchContextId.shards().values();
54+
Map<ShardId, SearchContextIdForNode> shards = searchContextId.shards();
5055
ClearScrollController.closeContexts(
51-
clusterService.state().nodes(),
56+
clusterService,
57+
projectResolver,
5258
searchTransportService,
53-
contextIds,
54-
listener.map(freed -> new ClosePointInTimeResponse(freed == contextIds.size(), freed))
59+
shards,
60+
listener.map(freed -> new ClosePointInTimeResponse(freed == shards.size(), freed))
5561
);
5662
}
5763
}

server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -371,6 +371,7 @@ void executeRequest(
371371
);
372372
frozenIndexCheck(resolvedIndices);
373373
}
374+
logger.info("Executing search request on node [{}] with indices [{}]", clusterService.getNodeName(), resolvedIndices);
374375

375376
ActionListener<SearchRequest> rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> {
376377
if (ccsCheckCompatibility) {
@@ -1313,6 +1314,7 @@ private void executeSearch(
13131314
SearchResponse.Clusters clusters,
13141315
SearchPhaseProvider searchPhaseProvider
13151316
) {
1317+
logger.info("Executing search locally.");
13161318
if (searchRequest.allowPartialSearchResults() == null) {
13171319
// No user preference defined in search request - apply cluster service default
13181320
searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults());
@@ -1905,11 +1907,9 @@ static List<SearchShardIterator> getLocalShardsIteratorFromPointInTime(
19051907
if (projectState.cluster().nodes().nodeExists(perNode.getNode())) {
19061908
targetNodes.add(perNode.getNode());
19071909
}
1908-
if (perNode.getSearchContextId().getSearcherId() != null) {
1909-
for (ShardRouting shard : shards) {
1910-
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
1911-
targetNodes.add(shard.currentNodeId());
1912-
}
1910+
for (ShardRouting shard : shards) {
1911+
if (shard.currentNodeId().equals(perNode.getNode()) == false) {
1912+
targetNodes.add(shard.currentNodeId());
19131913
}
19141914
}
19151915
} catch (IndexNotFoundException | ShardNotFoundException e) {

server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ public ReadOnlyEngine(
152152
* as the search id because the commit id changes whenever IndexWriter#commit is called although the segment files stay unchanged.
153153
* Any recovery except the local recovery performs IndexWriter#commit to generate a new translog uuid or history_uuid.
154154
*/
155-
static String generateSearcherId(SegmentInfos sis) {
155+
public static String generateSearcherId(SegmentInfos sis) {
156156
final MessageDigest md = MessageDigests.sha256();
157157
for (SegmentCommitInfo si : sis) {
158158
final byte[] segmentId = si.getId();

server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@ public class SearchContextMissingException extends ElasticsearchException {
2222
private final ShardSearchContextId contextId;
2323

2424
public SearchContextMissingException(ShardSearchContextId contextId) {
25-
super("No search context found for id [" + contextId.getId() + "]");
25+
super("No search context found for id [" + contextId.getSessionId() + "/" + contextId.getId() + "]");
2626
this.contextId = contextId;
2727
}
2828

0 commit comments

Comments
 (0)