Commit 58fe751

Improve PIT context relocation (#135231)
This change improves point-in-time (PIT) retries in cases where we lose a PIT context due to, for example, node shutdowns or restarts. We already retry opening PIT contexts for read-only and frozen indices (via FrozenEngine / ReadOnlyEngine). However, we currently re-create those contexts on every subsequent search and don't store the new context in a similar way as when opening a new PIT. This enhancement avoids that extra work by making the following changes:

* add a secondary data structure alongside the map of ReaderContexts in SearchService; this secondary map holds mappings from PIT ids (ShardSearchContextId) to their new location in the map of "activeReaders"
* update the PIT id we return with every search request when the results received on the coordinating node show that the nodes involved in answering the PIT request have changed

The latter is possible because the original PIT design already provided hooks for changing the PIT id between search requests, and we already document that PIT ids can change between requests. This new behaviour is currently still guarded by a feature flag ("pit_relocation_feature") for testing, with some additional changes planned for Serverless.
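To make the first change above concrete (the SearchService changes themselves are not among the files shown in this view), here is a minimal, hypothetical sketch of such a secondary lookup. The class, field, and method names below are illustrative assumptions rather than the commit's actual code; only ReaderContext and ShardSearchContextId are existing Elasticsearch types.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.elasticsearch.search.internal.ReaderContext;
import org.elasticsearch.search.internal.ShardSearchContextId;

// Hypothetical sketch only; not the SearchService implementation added by this commit.
class PitRelocationSketch {
    // primary registry of open reader contexts, keyed by their numeric context id
    private final Map<Long, ReaderContext> activeReaders = new ConcurrentHashMap<>();
    // secondary map: original PIT context id -> id of the re-created ("relocated") context
    private final Map<ShardSearchContextId, ShardSearchContextId> relocatedPitIds = new ConcurrentHashMap<>();

    // record that the context behind an old PIT id now lives under a new id after a retry
    void registerRelocatedContext(ShardSearchContextId oldId, ShardSearchContextId newId) {
        relocatedPitIds.put(oldId, newId);
    }

    // look up a context, following the relocation mapping if the original id is gone
    ReaderContext findReaderContext(ShardSearchContextId id) {
        ReaderContext context = activeReaders.get(id.getId());
        if (context == null) {
            ShardSearchContextId relocated = relocatedPitIds.get(id);
            if (relocated != null) {
                context = activeReaders.get(relocated.getId());
            }
        }
        return context;
    }
}

The second change corresponds to the maybeReEncodeNodeIds logic added to AbstractSearchAsyncAction below, which rebuilds the encoded PIT id (and frees the stale contexts) whenever a shard's PIT context turns out to live on a different node than the one recorded in the original id.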
1 parent c92eef8 commit 58fe751

File tree

18 files changed: +888 −139 lines changed

docs/changelog/135231.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 135231
+summary: Improve retrying PIT contexts for read-only indices
+area: Search
+type: enhancement
+issues: []

server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java

Lines changed: 11 additions & 7 deletions
@@ -11,11 +11,12 @@
 
 import org.elasticsearch.ExceptionsHelper;
 import org.elasticsearch.action.search.ClearScrollResponse;
+import org.elasticsearch.action.search.ParsedScrollId;
 import org.elasticsearch.action.search.SearchPhaseExecutionException;
 import org.elasticsearch.action.search.SearchRequestBuilder;
 import org.elasticsearch.action.search.SearchResponse;
+import org.elasticsearch.action.search.SearchScrollRequestBuilder;
 import org.elasticsearch.action.search.SearchType;
-import org.elasticsearch.action.search.ShardSearchFailure;
 import org.elasticsearch.cluster.metadata.IndexMetadata;
 import org.elasticsearch.common.Priority;
 import org.elasticsearch.common.bytes.BytesReference;
@@ -28,6 +29,7 @@
 import org.elasticsearch.index.query.RangeQueryBuilder;
 import org.elasticsearch.rest.RestStatus;
 import org.elasticsearch.search.SearchHit;
+import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.search.sort.FieldSortBuilder;
 import org.elasticsearch.search.sort.SortOrder;
 import org.elasticsearch.test.ESIntegTestCase;
@@ -703,13 +705,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
         } finally {
             respFromProdIndex.decRef();
         }
-        SearchPhaseExecutionException error = expectThrows(
-            SearchPhaseExecutionException.class,
-            client().prepareSearchScroll(respFromDemoIndexScrollId)
+        SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId);
+        SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder);
+        assertEquals(1, error.shardFailures().length);
+        ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId();
+        ShardSearchContextId shardSearchContextId = parsedScrollId.getContext()[0].getSearchContextId();
+        assertThat(
+            error.shardFailures()[0].getCause().getMessage(),
+            containsString("No search context found for id [" + shardSearchContextId + "]")
         );
-        for (ShardSearchFailure shardSearchFailure : error.shardFailures()) {
-            assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]"));
-        }
         client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef();
     }
 
server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 93 additions & 4 deletions
@@ -13,6 +13,7 @@
 import org.apache.lucene.util.SetOnce;
 import org.elasticsearch.ElasticsearchException;
 import org.elasticsearch.ExceptionsHelper;
+import org.elasticsearch.TransportVersion;
 import org.elasticsearch.action.ActionListener;
 import org.elasticsearch.action.NoShardAvailableActionException;
 import org.elasticsearch.action.OriginalIndices;
@@ -21,6 +22,7 @@
 import org.elasticsearch.action.support.SubscribableListener;
 import org.elasticsearch.action.support.TransportActions;
 import org.elasticsearch.cluster.ClusterState;
+import org.elasticsearch.cluster.node.DiscoveryNodes;
 import org.elasticsearch.common.bytes.BytesReference;
 import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
 import org.elasticsearch.common.util.Maps;
@@ -30,8 +32,10 @@
 import org.elasticsearch.rest.action.search.SearchResponseMetrics;
 import org.elasticsearch.search.SearchContextMissingException;
 import org.elasticsearch.search.SearchPhaseResult;
+import org.elasticsearch.search.SearchService;
 import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.builder.PointInTimeBuilder;
+import org.elasticsearch.search.builder.SearchSourceBuilder;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.SearchContext;
 import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -40,6 +44,8 @@
 
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
@@ -53,6 +59,7 @@
 import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
+import static org.elasticsearch.action.search.TransportClosePointInTimeAction.closeContexts;
 import static org.elasticsearch.core.Strings.format;
 
 /**
@@ -94,11 +101,13 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
     private final Map<String, PendingExecutions> pendingExecutionsPerNode;
     private final AtomicBoolean requestCancelled = new AtomicBoolean();
     private final int skippedCount;
+    private final TransportVersion mintransportVersion;
     protected final SearchResponseMetrics searchResponseMetrics;
     protected long phaseStartTimeInNanos;
 
     // protected for tests
     protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
+    private final Supplier<DiscoveryNodes> discoveryNodes;
 
     AbstractSearchAsyncAction(
         String name,
@@ -153,6 +162,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
        this.nodeIdToConnection = nodeIdToConnection;
         this.concreteIndexBoosts = concreteIndexBoosts;
         this.clusterStateVersion = clusterState.version();
+        this.mintransportVersion = clusterState.getMinTransportVersion();
+        this.discoveryNodes = clusterState::nodes;
         this.aliasFilter = aliasFilter;
         this.results = resultConsumer;
         // register the release of the query consumer to free up the circuit breaker memory
@@ -422,6 +433,7 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar
             onShardGroupFailure(shardIndex, shard, e);
         }
         if (lastShard == false) {
+            logger.debug("Retrying shard [{}] with target [{}]", shard.getShardId(), nextShard);
             performPhaseOnShard(shardIndex, shardIt, nextShard);
         } else {
             // count down outstanding shards, we're done with this shard as there's no more copies to try
@@ -613,10 +625,87 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
     }
 
     protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
-        var source = request.source();
-        return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
-            ? source.pointInTimeBuilder().getEncodedId()
-            : null;
+        SearchSourceBuilder source = request.source();
+        // only (re-)build a search context id if we are running a long-lived point-in-time request
+        if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) {
+            if (SearchService.PIT_RELOCATION_FEATURE_FLAG.isEnabled()) {
+                // we want to change node ids in the PIT id if any shards and its PIT context have moved
+                return maybeReEncodeNodeIds(
+                    source.pointInTimeBuilder(),
+                    results.getAtomicArray().asList(),
+                    namedWriteableRegistry,
+                    mintransportVersion,
+                    searchTransportService,
+                    discoveryNodes.get(),
+                    logger
+                );
+            } else {
+                return source.pointInTimeBuilder().getEncodedId();
+            }
+        } else {
+            return null;
+        }
+    }
+
+    static <Result extends SearchPhaseResult> BytesReference maybeReEncodeNodeIds(
+        PointInTimeBuilder originalPit,
+        List<Result> results,
+        NamedWriteableRegistry namedWriteableRegistry,
+        TransportVersion mintransportVersion,
+        SearchTransportService searchTransportService,
+        DiscoveryNodes nodes,
+        Logger logger
+    ) {
+        SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry);
+        // only create the following two collections if we detect an id change
+        Map<ShardId, SearchContextIdForNode> updatedShardMap = null;
+        Collection<SearchContextIdForNode> contextsToClose = null;
+        logger.debug("checking search result shards to detect PIT node changes");
+        for (Result result : results) {
+            SearchShardTarget searchShardTarget = result.getSearchShardTarget();
+            ShardId shardId = searchShardTarget.getShardId();
+            SearchContextIdForNode originalShard = original.shards().get(shardId);
+            if (originalShard != null && originalShard.getSearchContextId() != null && originalShard.getSearchContextId().isRetryable()) {
+                // check if the node is different, if so we need to re-encode the PIT
+                String originalNode = originalShard.getNode();
+                if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) {
+                    // the target node for this shard entry in the PIT has changed, we need to update it
+                    if (updatedShardMap == null) {
+                        // initialize the map with entries from old map to keep ids for shards that have not responded in this results
+                        updatedShardMap = new HashMap<>(original.shards());
+                        contextsToClose = new ArrayList<>();
+                    }
+                    SearchContextIdForNode updatedId = new SearchContextIdForNode(
+                        searchShardTarget.getClusterAlias(),
+                        searchShardTarget.getNodeId(),
+                        result.getContextId()
+                    );
+
+                    logger.debug("changing node for PIT shard id from [{}] to [{}]", originalShard, updatedId);
+                    updatedShardMap.put(shardId, updatedId);
+                    contextsToClose.add(original.shards().get(shardId));
+
+                }
+            }
+        }
+        if (updatedShardMap != null) {
+            // we free all old contexts that have moved, just in case we have re-tried them elsewhere
+            // but they still exist in the old location
+            closeContexts(nodes, searchTransportService, contextsToClose, new ActionListener<Integer>() {
+                @Override
+                public void onResponse(Integer integer) {
+                    // ignore
+                }

+                @Override
+                public void onFailure(Exception e) {
+                    logger.trace("Failure while freeing old point in time contexts", e);
+                }
+            });
+            return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, ShardSearchFailure.EMPTY_ARRAY);
+        } else {
+            return originalPit.getEncodedId();
+        }
     }
 
     /**

server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java

Lines changed: 0 additions & 55 deletions
@@ -13,20 +13,14 @@
 import org.elasticsearch.action.support.RefCountingRunnable;
 import org.elasticsearch.cluster.node.DiscoveryNode;
 import org.elasticsearch.cluster.node.DiscoveryNodes;
-import org.elasticsearch.common.Strings;
-import org.elasticsearch.common.util.concurrent.ListenableFuture;
 import org.elasticsearch.transport.Transport;
 import org.elasticsearch.transport.TransportResponse;
 
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
-import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.function.BiFunction;
-import java.util.stream.Collectors;
 
 import static org.elasticsearch.action.search.TransportSearchHelper.parseScrollId;
 
@@ -143,53 +137,4 @@ private void onFailedFreedContext(Throwable e, DiscoveryNode node) {
     private void finish() {
         listener.onResponse(new ClearScrollResponse(hasFailed.get() == false, freedSearchContexts.get()));
     }
-
-    /**
-     * Closes the given context id and reports the number of freed contexts via the listener
-     */
-    public static void closeContexts(
-        DiscoveryNodes nodes,
-        SearchTransportService searchTransportService,
-        Collection<SearchContextIdForNode> contextIds,
-        ActionListener<Integer> listener
-    ) {
-        final Set<String> clusters = contextIds.stream()
-            .map(SearchContextIdForNode::getClusterAlias)
-            .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false)
-            .collect(Collectors.toSet());
-        final ListenableFuture<BiFunction<String, String, DiscoveryNode>> lookupListener = new ListenableFuture<>();
-        if (clusters.isEmpty()) {
-            lookupListener.onResponse((cluster, nodeId) -> nodes.get(nodeId));
-        } else {
-            searchTransportService.getRemoteClusterService().collectNodes(clusters, lookupListener);
-        }
-        lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> {
-            final var successes = new AtomicInteger();
-            try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) {
-                for (SearchContextIdForNode contextId : contextIds) {
-                    if (contextId.getNode() == null) {
-                        // the shard was missing when creating the PIT, ignore.
-                        continue;
-                    }
-                    final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode());
-                    if (node != null) {
-                        try {
-                            searchTransportService.sendFreeContext(
-                                searchTransportService.getConnection(contextId.getClusterAlias(), node),
-                                contextId.getSearchContextId(),
-                                refs.acquireListener().map(r -> {
-                                    if (r.isFreed()) {
-                                        successes.incrementAndGet();
-                                    }
-                                    return null;
-                                })
-                            );
-                        } catch (Exception e) {
-                            // ignored
-                        }
-                    }
-                }
-            }
-        }));
-    }
 }

server/src/main/java/org/elasticsearch/action/search/SearchContextId.java

Lines changed: 45 additions & 6 deletions
@@ -21,7 +21,6 @@
 import org.elasticsearch.common.util.Maps;
 import org.elasticsearch.index.shard.ShardId;
 import org.elasticsearch.search.SearchPhaseResult;
-import org.elasticsearch.search.SearchShardTarget;
 import org.elasticsearch.search.internal.AliasFilter;
 import org.elasticsearch.search.internal.ShardSearchContextId;
 import org.elasticsearch.transport.RemoteClusterAware;
@@ -30,6 +29,7 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
@@ -62,6 +62,26 @@ public static BytesReference encode(
         Map<String, AliasFilter> aliasFilter,
         TransportVersion version,
         ShardSearchFailure[] shardFailures
+    ) {
+        Map<ShardId, SearchContextIdForNode> shards = searchPhaseResults.stream()
+            .collect(
+                Collectors.toMap(
+                    r -> r.getSearchShardTarget().getShardId(),
+                    r -> new SearchContextIdForNode(
+                        r.getSearchShardTarget().getClusterAlias(),
+                        r.getSearchShardTarget().getNodeId(),
+                        r.getContextId()
+                    )
+                )
+            );
+        return encode(shards, aliasFilter, version, shardFailures);
+    }
+
+    static BytesReference encode(
+        Map<ShardId, SearchContextIdForNode> shards,
+        Map<String, AliasFilter> aliasFilter,
+        TransportVersion version,
+        ShardSearchFailure[] shardFailures
     ) {
         assert shardFailures.length == 0 || version.onOrAfter(TransportVersions.V_8_16_0)
             : "[allow_partial_search_results] cannot be enabled on a cluster that has not been fully upgraded to version ["
@@ -71,12 +91,12 @@ public static BytesReference encode(
             out.setTransportVersion(version);
             TransportVersion.writeVersion(version, out);
             boolean allowNullContextId = out.getTransportVersion().onOrAfter(TransportVersions.V_8_16_0);
-            int shardSize = searchPhaseResults.size() + (allowNullContextId ? shardFailures.length : 0);
+            int shardSize = shards.size() + (allowNullContextId ? shardFailures.length : 0);
             out.writeVInt(shardSize);
-            for (var searchResult : searchPhaseResults) {
-                final SearchShardTarget target = searchResult.getSearchShardTarget();
-                target.getShardId().writeTo(out);
-                new SearchContextIdForNode(target.getClusterAlias(), target.getNodeId(), searchResult.getContextId()).writeTo(out);
+            for (ShardId shardId : shards.keySet()) {
+                shardId.writeTo(out);
+                SearchContextIdForNode searchContextIdForNode = shards.get(shardId);
+                searchContextIdForNode.writeTo(out);
             }
             if (allowNullContextId) {
                 for (var failure : shardFailures) {
@@ -142,4 +162,23 @@ public String[] getActualIndices() {
         }
         return indices.toArray(String[]::new);
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || getClass() != o.getClass()) return false;
+        SearchContextId that = (SearchContextId) o;
+        return Objects.equals(shards, that.shards)
+            && Objects.equals(aliasFilter, that.aliasFilter)
+            && Objects.equals(contextIds, that.contextIds);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(shards, aliasFilter, contextIds);
+    }
+
+    @Override
+    public String toString() {
+        return "SearchContextId{" + "shards=" + shards + ", aliasFilter=" + aliasFilter + '}';
+    }
 }
