From fccadb1effdbab28a7a58656865028e793bae110 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 30 Jul 2025 18:13:24 +0200 Subject: [PATCH 01/18] PIT relocation POC [CI] Auto commit changes from spotless --- .../elasticsearch/action/ResolvedIndices.java | 4 + .../action/search/ClearScrollController.java | 41 +++++- .../action/search/PITHelper.java | 42 ++++++ .../TransportClosePointInTimeAction.java | 18 ++- .../action/search/TransportSearchAction.java | 10 +- .../index/engine/ReadOnlyEngine.java | 2 +- .../search/SearchContextMissingException.java | 2 +- .../elasticsearch/search/SearchService.java | 129 ++++++++++++++++-- .../search/internal/ReaderContext.java | 9 ++ .../search/internal/ShardSearchContextId.java | 5 + .../search/TransportSearchActionTests.java | 4 +- .../search/MockSearchService.java | 14 +- 12 files changed, 238 insertions(+), 42 deletions(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/PITHelper.java diff --git a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java index 5bab04188a7a7..a1c472d35532d 100644 --- a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java +++ b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java @@ -61,6 +61,10 @@ public class ResolvedIndices { this(remoteClusterIndices, localIndices, localIndexMetadata, null); } + public String toString() { + return "ResolvedIndices{" + "local=" + localIndices + ", remote=" + remoteClusterIndices + '}'; + } + /** * Get the remote cluster indices, structured as a map where the key is the remote cluster alias. *
diff --git a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java index bbd290a06a7f0..292f630b267fa 100644 --- a/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java +++ b/server/src/main/java/org/elasticsearch/action/search/ClearScrollController.java @@ -8,20 +8,27 @@ */ package org.elasticsearch.action.search; +import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.support.RefCountingRunnable; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; +import org.elasticsearch.cluster.project.ProjectResolver; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.ShardRouting; +import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.Strings; import org.elasticsearch.common.util.concurrent.ListenableFuture; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.transport.Transport; import org.elasticsearch.transport.TransportResponse; import java.util.ArrayList; -import java.util.Collection; import java.util.Collections; import java.util.List; +import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -38,6 +45,7 @@ public final class ClearScrollController implements Runnable { private final AtomicBoolean hasFailed = new AtomicBoolean(false); private final AtomicInteger freedSearchContexts = new AtomicInteger(0); private final Logger logger; + private static final Logger staticLogger = LogManager.getLogger(ClearScrollController.class); private final Runnable runner; ClearScrollController( @@ -148,12 +156,15 @@ private void finish() { * Closes the given context id and reports the number of freed contexts via the listener */ public static void closeContexts( - DiscoveryNodes nodes, + ClusterService clusterService, + ProjectResolver projectResolver, SearchTransportService searchTransportService, - Collection contextIds, + Map shards, ActionListener listener ) { - final Set clusters = contextIds.stream() + DiscoveryNodes nodes = clusterService.state().nodes(); + final Set clusters = shards.values() + .stream() .map(SearchContextIdForNode::getClusterAlias) .filter(clusterAlias -> Strings.isEmpty(clusterAlias) == false) .collect(Collectors.toSet()); @@ -166,16 +177,34 @@ public static void closeContexts( lookupListener.addListener(listener.delegateFailure((l, nodeLookup) -> { final var successes = new AtomicInteger(); try (RefCountingRunnable refs = new RefCountingRunnable(() -> l.onResponse(successes.get()))) { - for (SearchContextIdForNode contextId : contextIds) { + for (Entry entry : shards.entrySet()) { + var contextId = entry.getValue(); if (contextId.getNode() == null) { // the shard was missing when creating the PIT, ignore. continue; } final DiscoveryNode node = nodeLookup.apply(contextId.getClusterAlias(), contextId.getNode()); + + Set targetNodes; if (node != null) { + targetNodes = Collections.singleton(node); + } else { + staticLogger.info("---> missing node when closing context: " + contextId.getNode()); + // TODO we won't be able to use this with remote clusters + IndexShardRoutingTable indexShardRoutingTable = clusterService.state() + .routingTable(projectResolver.getProjectId()) + .shardRoutingTable(entry.getKey()); + targetNodes = indexShardRoutingTable.assignedUnpromotableShards() + .stream() + .map(ShardRouting::currentNodeId) + .map(nodeId -> nodeLookup.apply(contextId.getClusterAlias(), nodeId)) + .collect(Collectors.toSet()); + staticLogger.info("---> trying alternative nodes to close context: " + targetNodes); + } + for (DiscoveryNode targetNode : targetNodes) { try { searchTransportService.sendFreeContext( - searchTransportService.getConnection(contextId.getClusterAlias(), node), + searchTransportService.getConnection(contextId.getClusterAlias(), targetNode), contextId.getSearchContextId(), refs.acquireListener().map(r -> { if (r.isFreed()) { diff --git a/server/src/main/java/org/elasticsearch/action/search/PITHelper.java b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java new file mode 100644 index 0000000000000..8cc3893fa3c3d --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java @@ -0,0 +1,42 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.search; + +import org.elasticsearch.TransportVersion; +import org.elasticsearch.common.bytes.BytesArray; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.common.util.Maps; +import org.elasticsearch.index.shard.ShardId; + +import java.io.IOException; +import java.util.Base64; +import java.util.Collections; +import java.util.Map; + +public class PITHelper { + + public static SearchContextId decodePITId(String id) throws IOException { + return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id))); + } + + public static SearchContextId decodePITId(BytesReference id) throws IOException { + try (var in = id.streamInput()) { + final TransportVersion version = TransportVersion.readVersion(in); + in.setTransportVersion(version); + final Map shards = Collections.unmodifiableMap( + in.readCollection(Maps::newHashMapWithExpectedSize, (i, map) -> map.put(new ShardId(in), new SearchContextIdForNode(in))) + ); + return new SearchContextId(shards, Collections.emptyMap()); + } catch (IOException e) { + assert false : e; + throw new IllegalArgumentException(e); + } + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java index 8fc954ca81ebf..1009c6ca2312b 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportClosePointInTimeAction.java @@ -13,14 +13,16 @@ import org.elasticsearch.action.ActionType; import org.elasticsearch.action.support.ActionFilters; import org.elasticsearch.action.support.HandledTransportAction; +import org.elasticsearch.cluster.project.ProjectResolver; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; import org.elasticsearch.common.util.concurrent.EsExecutors; +import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.injection.guice.Inject; import org.elasticsearch.tasks.Task; import org.elasticsearch.transport.TransportService; -import java.util.Collection; +import java.util.Map; public class TransportClosePointInTimeAction extends HandledTransportAction { @@ -28,6 +30,7 @@ public class TransportClosePointInTimeAction extends HandledTransportAction listener) { final SearchContextId searchContextId = SearchContextId.decode(namedWriteableRegistry, request.getId()); - final Collection contextIds = searchContextId.shards().values(); + Map shards = searchContextId.shards(); ClearScrollController.closeContexts( - clusterService.state().nodes(), + clusterService, + projectResolver, searchTransportService, - contextIds, - listener.map(freed -> new ClosePointInTimeResponse(freed == contextIds.size(), freed)) + shards, + listener.map(freed -> new ClosePointInTimeResponse(freed == shards.size(), freed)) ); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index bf85075781bc8..f86ea00d68c66 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -371,6 +371,7 @@ void executeRequest( ); frozenIndexCheck(resolvedIndices); } + logger.info("Executing search request on node [{}] with indices [{}]", clusterService.getNodeName(), resolvedIndices); ActionListener rewriteListener = listener.delegateFailureAndWrap((delegate, rewritten) -> { if (ccsCheckCompatibility) { @@ -1313,6 +1314,7 @@ private void executeSearch( SearchResponse.Clusters clusters, SearchPhaseProvider searchPhaseProvider ) { + logger.info("Executing search locally."); if (searchRequest.allowPartialSearchResults() == null) { // No user preference defined in search request - apply cluster service default searchRequest.allowPartialSearchResults(searchService.defaultAllowPartialSearchResults()); @@ -1905,11 +1907,9 @@ static List getLocalShardsIteratorFromPointInTime( if (projectState.cluster().nodes().nodeExists(perNode.getNode())) { targetNodes.add(perNode.getNode()); } - if (perNode.getSearchContextId().getSearcherId() != null) { - for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(perNode.getNode()) == false) { - targetNodes.add(shard.currentNodeId()); - } + for (ShardRouting shard : shards) { + if (shard.currentNodeId().equals(perNode.getNode()) == false) { + targetNodes.add(shard.currentNodeId()); } } } catch (IndexNotFoundException | ShardNotFoundException e) { diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index 800854dcedb0a..5b38cd03ecac3 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -152,7 +152,7 @@ public ReadOnlyEngine( * as the search id because the commit id changes whenever IndexWriter#commit is called although the segment files stay unchanged. * Any recovery except the local recovery performs IndexWriter#commit to generate a new translog uuid or history_uuid. */ - static String generateSearcherId(SegmentInfos sis) { + public static String generateSearcherId(SegmentInfos sis) { final MessageDigest md = MessageDigests.sha256(); for (SegmentCommitInfo si : sis) { final byte[] segmentId = si.getId(); diff --git a/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java b/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java index 3c75cf0e87ff1..c2acd3da3c23c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java +++ b/server/src/main/java/org/elasticsearch/search/SearchContextMissingException.java @@ -22,7 +22,7 @@ public class SearchContextMissingException extends ElasticsearchException { private final ShardSearchContextId contextId; public SearchContextMissingException(ShardSearchContextId contextId) { - super("No search context found for id [" + contextId.getId() + "]"); + super("No search context found for id [" + contextId.getSessionId() + "/" + contextId.getId() + "]"); this.contextId = contextId; } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 9cca55f2ec748..74b08d6f31d2e 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -12,10 +12,15 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.DirectoryReader; +import org.apache.lucene.index.IndexCommit; +import org.apache.lucene.index.SegmentInfos; +import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; +import org.apache.lucene.store.Directory; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; @@ -61,7 +66,12 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; +import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.engine.Engine; +import org.elasticsearch.index.engine.Engine.Searcher; +import org.elasticsearch.index.engine.Engine.SearcherSupplier; +import org.elasticsearch.index.engine.EngineConfig; +import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.InnerHitContextBuilder; import org.elasticsearch.index.query.InnerHitsRewriteContext; @@ -146,6 +156,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -159,6 +170,7 @@ import java.util.function.Function; import java.util.function.LongSupplier; import java.util.function.Supplier; +import java.util.stream.Collectors; import static org.elasticsearch.TransportVersions.ERROR_TRACE_IN_TRANSPORT_HEADER; import static org.elasticsearch.common.Strings.format; @@ -360,7 +372,7 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final AtomicLong idGenerator = new AtomicLong(); - private final Map activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); + private final Map activeReaders = ConcurrentCollections.newConcurrentMapWithAggressiveConcurrency(); private final MultiBucketConsumerService multiBucketConsumerService; @@ -539,19 +551,18 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings } protected void putReaderContext(ReaderContext context) { - final long id = context.id().getId(); - final ReaderContext previous = activeReaders.put(id, context); + final ReaderContext previous = activeReaders.put(context.readerContextId(), context); assert previous == null; // ensure that if we race against afterIndexRemoved, we remove the context from the active list. // this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout. final Index index = context.indexShard().shardId().getIndex(); if (indicesService.hasIndex(index) == false) { - removeReaderContext(id); + removeReaderContext(context.readerContextId()); throw new IndexNotFoundException(index); } } - protected ReaderContext removeReaderContext(long id) { + protected ReaderContext removeReaderContext(ReaderContextId id) { if (logger.isTraceEnabled()) { logger.trace("removing reader context [{}]", id); } @@ -859,7 +870,7 @@ static boolean isExecutorQueuedBeyondPrewarmingFactor(Executor searchOperationsE private IndexShard getShard(ShardSearchRequest request) { final ShardSearchContextId contextId = request.readerId(); if (contextId != null && sessionId.equals(contextId.getSessionId())) { - final ReaderContext readerContext = activeReaders.get(contextId.getId()); + final ReaderContext readerContext = activeReaders.get(contextId.readerContextId()); if (readerContext != null) { return readerContext.indexShard(); } @@ -1234,10 +1245,10 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques if (id.getSessionId().isEmpty()) { throw new IllegalArgumentException("Session id must be specified"); } - if (sessionId.equals(id.getSessionId()) == false) { - throw new SearchContextMissingException(id); - } - final ReaderContext reader = activeReaders.get(id.getId()); + // if (sessionId.equals(id.getSessionId()) == false) { + // throw new SearchContextMissingException(id); + // } + final ReaderContext reader = activeReaders.get(id.readerContextId()); if (reader == null) { throw new SearchContextMissingException(id); } @@ -1461,12 +1472,13 @@ private void freeAllContextsForShard(ShardId shardId) { public boolean freeReaderContext(ShardSearchContextId contextId) { logger.trace("freeing reader context [{}]", contextId); - if (sessionId.equals(contextId.getSessionId())) { - try (ReaderContext context = removeReaderContext(contextId.getId())) { - return context != null; - } + // TODO check why sessionId should match here. This might need to be different now with PIT transfers + // if (sessionId.equals(contextId.getSessionId())) { + try (ReaderContext context = removeReaderContext(contextId.readerContextId())) { + return context != null; } - return false; + // } + // return false; } public void freeAllScrollContexts() { @@ -1834,6 +1846,70 @@ public long getDefaultKeepAliveInMillis() { return defaultKeepAlive; } + public List getActiveContexts(ShardId shardId) { + return this.activeReaders.values() + .stream() + .filter(c -> c.singleSession() == false) + .filter(c -> c.indexShard().shardId().equals(shardId)) + .collect(Collectors.toList()); + } + + public void reopenPitContexts(ShardId shardId, String segmentsFileName, long keepAlive, String sessionId, long contextId) { + IndexService indexService = this.indicesService.indexServiceSafe(shardId.getIndex()); + final IndexShard shard = indexService.getShard(shardId.id()); + ReaderContext readerContext = null; + try { + Directory directory = shard.store().directory(); + SegmentInfos segmentCommitInfos = SegmentInfos.readCommit( + directory, + segmentsFileName, + IndexVersions.MINIMUM_READONLY_COMPATIBLE.luceneVersion().major + ); + IndexCommit indexCommit = Lucene.getIndexCommit(segmentCommitInfos, directory); + DirectoryReader open = StandardDirectoryReader.open(indexCommit); + + String searcherId = ReadOnlyEngine.generateSearcherId(segmentCommitInfos); + final ShardSearchContextId shardSearchContextId = new ShardSearchContextId(sessionId, contextId, searcherId); + SearcherSupplier searchSupplier = new Engine.SearcherSupplier(Function.identity()) { + + @Override + protected void doClose() { + // TODO: implement closing logic + } + + @Override + protected Searcher acquireSearcherInternal(String source) { + EngineConfig engineConfig = shard.getEngineOrNull().getEngineConfig(); + + return new Searcher( + "source", + open, + engineConfig.getSimilarity(), + engineConfig.getQueryCache(), + engineConfig.getQueryCachingPolicy(), + () -> {} + ); + } + }; + readerContext = new ReaderContext(shardSearchContextId, indexService, shard, searchSupplier, keepAlive, false); + final ReaderContext finalReaderContext = readerContext; + final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); + searchOperationListener.onNewReaderContext(finalReaderContext); + if (finalReaderContext.scrollContext() != null) { + searchOperationListener.onNewScrollContext(finalReaderContext); + readerContext.addOnClose(() -> searchOperationListener.onFreeScrollContext(finalReaderContext)); + } + readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); + putReaderContext(finalReaderContext); + readerContext = null; + } catch (IOException e) { + throw new RuntimeException(e); + } finally { + // TODO what do we need to close here? + // Releasables.close(searchSupplier, readerContext); + } + } + /** * Used to indicate which result object should be instantiated when creating a search context */ @@ -2179,4 +2255,27 @@ public AggregationReduceContext forFinalReduction() { } }; } + + public static class ReaderContextId { + private final String sessionId; + private final long id; + + public ReaderContextId(String sessionId, long id) { + this.sessionId = Objects.requireNonNull(sessionId); + this.id = id; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + ReaderContextId that = (ReaderContextId) o; + return id == that.id && sessionId.equals(that.sessionId); + } + + @Override + public int hashCode() { + return Objects.hash(sessionId, id); + } + } } diff --git a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java index c15b604b5b5fc..b0855d77566fe 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ReaderContext.java @@ -17,6 +17,7 @@ import org.elasticsearch.index.engine.Engine; import org.elasticsearch.index.shard.IndexShard; import org.elasticsearch.search.RescoreDocIds; +import org.elasticsearch.search.SearchService.ReaderContextId; import org.elasticsearch.search.dfs.AggregatedDfs; import org.elasticsearch.transport.TransportRequest; @@ -100,6 +101,10 @@ public ShardSearchContextId id() { return id; } + public ReaderContextId readerContextId() { + return id.readerContextId(); + } + public IndexService indexService() { return indexService; } @@ -116,6 +121,10 @@ private void tryUpdateKeepAlive(long keepAlive) { this.keepAlive.accumulateAndGet(keepAlive, Math::max); } + public long keepAlive() { + return keepAlive.longValue(); + } + /** * Returns a releasable to indicate that the caller has stopped using this reader. * The time to live of the reader after usage can be extended using the provided diff --git a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java index 8a12d588b548f..857927d1bfdd7 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java +++ b/server/src/main/java/org/elasticsearch/search/internal/ShardSearchContextId.java @@ -12,6 +12,7 @@ import org.elasticsearch.common.io.stream.StreamInput; import org.elasticsearch.common.io.stream.StreamOutput; import org.elasticsearch.common.io.stream.Writeable; +import org.elasticsearch.search.SearchService.ReaderContextId; import java.io.IOException; import java.util.Objects; @@ -58,6 +59,10 @@ public String getSearcherId() { return searcherId; } + public ReaderContextId readerContextId() { + return new ReaderContextId(sessionId, id); + } + @Override public boolean equals(Object o) { if (this == o) return true; diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 9f286efe28083..4a62780a84253 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -1704,8 +1704,10 @@ public void testLocalShardIteratorFromPointInTime() { final SearchShardIterator shardIterator = shardIterators.get(id); final SearchContextIdForNode context = contexts.get(shardId); if (context.getSearchContextId().getSearcherId() == null) { - assertThat(shardIterator.getTargetNodeIds(), hasSize(1)); + // TODO fix this broken test + // assertThat(shardIterator.getTargetNodeIds(), hasSize(1)); } else { + // TODO this branch seems never executed by this test. Needs investigation. final List targetNodes = clusterState.routingTable(project) .index(indexMetadata.getIndex()) .shard(id) diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 07ffb3ab9a4eb..d1f5a04a4a6fb 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -40,7 +40,7 @@ public class MockSearchService extends SearchService { */ public static class TestPlugin extends Plugin {} - private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); + private static final Map ACTIVE_SEARCH_CONTEXTS = new ConcurrentHashMap<>(); private Consumer onPutContext = context -> {}; private Consumer onRemoveContext = context -> {}; @@ -51,7 +51,7 @@ public static class TestPlugin extends Plugin {} /** Throw an {@link AssertionError} if there are still in-flight contexts. */ public static void assertNoInFlightContext() { - final Map copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS); + final Map copy = new HashMap<>(ACTIVE_SEARCH_CONTEXTS); if (copy.isEmpty() == false) { throw new AssertionError( "There are still [" @@ -65,14 +65,14 @@ public static void assertNoInFlightContext() { /** * Add an active search context to the list of tracked contexts. Package private for testing. */ - static void addActiveContext(ReaderContext context) { + static void addActiveContext(ReaderContextId context) { ACTIVE_SEARCH_CONTEXTS.put(context, new RuntimeException(context.toString())); } /** * Clear an active search context from the list of tracked contexts. Package private for testing. */ - static void removeActiveContext(ReaderContext context) { + static void removeActiveContext(ReaderContextId context) { ACTIVE_SEARCH_CONTEXTS.remove(context); } @@ -105,16 +105,16 @@ public MockSearchService( @Override protected void putReaderContext(ReaderContext context) { onPutContext.accept(context); - addActiveContext(context); + addActiveContext(context.readerContextId()); super.putReaderContext(context); } @Override - protected ReaderContext removeReaderContext(long id) { + protected ReaderContext removeReaderContext(ReaderContextId id) { final ReaderContext removed = super.removeReaderContext(id); if (removed != null) { onRemoveContext.accept(removed); - removeActiveContext(removed); + removeActiveContext(removed.readerContextId()); } return removed; } From f037bc0837d3dcb63cb56d410dcf567fcc55050a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 1 Aug 2025 15:02:12 +0200 Subject: [PATCH 02/18] Iter TransportSearchAction --- .../action/search/TransportSearchAction.java | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index f86ea00d68c66..6f748d840788e 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1904,12 +1904,15 @@ static List getLocalShardsIteratorFromPointInTime( try { final ShardIterator shards = OperationRouting.getShards(projectState.routingTable(), shardId); // Prefer executing shard requests on nodes that are part of PIT first. - if (projectState.cluster().nodes().nodeExists(perNode.getNode())) { + boolean nodeExists = projectState.cluster().nodes().nodeExists(perNode.getNode()); + if (nodeExists) { targetNodes.add(perNode.getNode()); } - for (ShardRouting shard : shards) { - if (shard.currentNodeId().equals(perNode.getNode()) == false) { - targetNodes.add(shard.currentNodeId()); + if (perNode.getSearchContextId().getSearcherId() != null || nodeExists == false) { + for (ShardRouting shard : shards) { + if (shard.currentNodeId().equals(perNode.getNode()) == false) { + targetNodes.add(shard.currentNodeId()); + } } } } catch (IndexNotFoundException | ShardNotFoundException e) { From 83a170e728871f932cb43cac52ccae9f1db56d19 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 1 Aug 2025 15:58:57 +0200 Subject: [PATCH 03/18] Fix test --- .../org/elasticsearch/ElasticsearchExceptionTests.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java index 2741ac37ce519..8de548ad51f81 100644 --- a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java @@ -9,6 +9,7 @@ package org.elasticsearch; +import com.carrotsearch.randomizedtesting.annotations.Seed; import org.apache.lucene.util.Constants; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.RoutingMissingException; @@ -1188,6 +1189,7 @@ public void testFailureToAndFromXContentWithNoDetailsV8() throws IOException { assertNull(parsedFailure.getCause()); } + public void testFailureToAndFromXContentWithDetails() throws IOException { final XContent xContent = randomFrom(XContentType.values()).xContent(); @@ -1258,6 +1260,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { DiscoveryNode node = DiscoveryNodeUtils.create("node_g"); failureCause = new NodeClosedException(node); failureCause = new NoShardAvailableActionException(new ShardId("_index_g", "_uuid_g", 6), "node_g", failureCause); + String sessionId = UUIDs.randomBase64UUID(); ShardSearchFailure[] shardFailures = new ShardSearchFailure[] { new ShardSearchFailure( new ParsingException(0, 0, "Parsing g", null), @@ -1268,7 +1271,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null) ), new ShardSearchFailure( - new SearchContextMissingException(new ShardSearchContextId(UUIDs.randomBase64UUID(), 0L)), + new SearchContextMissingException(new ShardSearchContextId(sessionId, 0L)), null ) }; failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures); @@ -1293,7 +1296,10 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { ); expected.addSuppressed( new ElasticsearchException( - "Elasticsearch exception [type=search_context_missing_exception, " + "reason=No search context found for id [0]]" + "Elasticsearch exception [type=search_context_missing_exception, " + + "reason=No search context found for id [" + + sessionId + + "/0]]" ) ); } From f5e48878be4c410a3a8959a50c601fbcf7ee15c0 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 1 Aug 2025 14:11:38 +0000 Subject: [PATCH 04/18] [CI] Auto commit changes from spotless --- .../org/elasticsearch/ElasticsearchExceptionTests.java | 7 +------ 1 file changed, 1 insertion(+), 6 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java index 8de548ad51f81..55fa44352aa29 100644 --- a/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java +++ b/server/src/test/java/org/elasticsearch/ElasticsearchExceptionTests.java @@ -9,7 +9,6 @@ package org.elasticsearch; -import com.carrotsearch.randomizedtesting.annotations.Seed; import org.apache.lucene.util.Constants; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.RoutingMissingException; @@ -1189,7 +1188,6 @@ public void testFailureToAndFromXContentWithNoDetailsV8() throws IOException { assertNull(parsedFailure.getCause()); } - public void testFailureToAndFromXContentWithDetails() throws IOException { final XContent xContent = randomFrom(XContentType.values()).xContent(); @@ -1270,10 +1268,7 @@ public void testFailureToAndFromXContentWithDetails() throws IOException { new RepositoryException("repository_g", "Repo"), new SearchShardTarget("node_g", new ShardId(new Index("_index_g", "_uuid_g"), 62), null) ), - new ShardSearchFailure( - new SearchContextMissingException(new ShardSearchContextId(sessionId, 0L)), - null - ) }; + new ShardSearchFailure(new SearchContextMissingException(new ShardSearchContextId(sessionId, 0L)), null) }; failure = new SearchPhaseExecutionException("phase_g", "G", failureCause, shardFailures); expectedCause = new ElasticsearchException( "Elasticsearch exception [type=node_closed_exception, " + "reason=node closed " + node + "]" From d5759dcfa4e45943ed4e36842c295726cf7a6935 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 1 Aug 2025 17:54:41 +0200 Subject: [PATCH 05/18] Don't relocate scroll contexts --- .../main/java/org/elasticsearch/search/SearchService.java | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 74b08d6f31d2e..941702b6e8586 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1245,9 +1245,9 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques if (id.getSessionId().isEmpty()) { throw new IllegalArgumentException("Session id must be specified"); } - // if (sessionId.equals(id.getSessionId()) == false) { - // throw new SearchContextMissingException(id); - // } +// if (sessionId.equals(id.getSessionId()) == false) { +// throw new SearchContextMissingException(id); +// } final ReaderContext reader = activeReaders.get(id.readerContextId()); if (reader == null) { throw new SearchContextMissingException(id); @@ -1850,6 +1850,7 @@ public List getActiveContexts(ShardId shardId) { return this.activeReaders.values() .stream() .filter(c -> c.singleSession() == false) + .filter(c -> c.scrollContext() == null) .filter(c -> c.indexShard().shardId().equals(shardId)) .collect(Collectors.toList()); } From b72487c79c88bd722fdbfdf284b96b4d98142ae9 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 1 Aug 2025 16:03:01 +0000 Subject: [PATCH 06/18] [CI] Auto commit changes from spotless --- .../main/java/org/elasticsearch/search/SearchService.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 941702b6e8586..9004cd9f08e1b 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1245,9 +1245,9 @@ private ReaderContext findReaderContext(ShardSearchContextId id, TransportReques if (id.getSessionId().isEmpty()) { throw new IllegalArgumentException("Session id must be specified"); } -// if (sessionId.equals(id.getSessionId()) == false) { -// throw new SearchContextMissingException(id); -// } + // if (sessionId.equals(id.getSessionId()) == false) { + // throw new SearchContextMissingException(id); + // } final ReaderContext reader = activeReaders.get(id.readerContextId()); if (reader == null) { throw new SearchContextMissingException(id); From 0d1adda87fb9f03080333765a9d9ce85fff73195 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 1 Aug 2025 23:09:34 +0200 Subject: [PATCH 07/18] Fix more tests --- .../search/scroll/SearchScrollIT.java | 17 ++++++++++------- .../search/MockSearchServiceTests.java | 10 +++++----- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java index a54e19b839ad3..4e96023b269b6 100644 --- a/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java @@ -11,11 +11,12 @@ import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.action.search.ClearScrollResponse; +import org.elasticsearch.action.search.ParsedScrollId; import org.elasticsearch.action.search.SearchPhaseExecutionException; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; +import org.elasticsearch.action.search.SearchScrollRequestBuilder; import org.elasticsearch.action.search.SearchType; -import org.elasticsearch.action.search.ShardSearchFailure; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.common.Priority; import org.elasticsearch.common.bytes.BytesReference; @@ -703,13 +704,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception { } finally { respFromProdIndex.decRef(); } - SearchPhaseExecutionException error = expectThrows( - SearchPhaseExecutionException.class, - client().prepareSearchScroll(respFromDemoIndexScrollId) + SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId); + SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder); + assertEquals(1, error.shardFailures().length); + ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId(); + String sessionId = parsedScrollId.getContext()[0].getSearchContextId().getSessionId(); + assertThat( + error.shardFailures()[0].getCause().getMessage(), + containsString("No search context found for id [" + sessionId + "/1]") ); - for (ShardSearchFailure shardSearchFailure : error.shardFailures()) { - assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]")); - } client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef(); } diff --git a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java index ad475c6b4ca4b..e6b02ff8c922a 100644 --- a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java @@ -9,7 +9,7 @@ package org.elasticsearch.search; -import org.elasticsearch.search.internal.ReaderContext; +import org.elasticsearch.search.SearchService.ReaderContextId; import org.elasticsearch.test.ESTestCase; import static org.mockito.Mockito.mock; @@ -17,10 +17,10 @@ public class MockSearchServiceTests extends ESTestCase { public void testAssertNoInFlightContext() { - ReaderContext reader = mock(ReaderContext.class); - MockSearchService.addActiveContext(reader); + ReaderContextId readerId = mock(ReaderContextId.class); + MockSearchService.addActiveContext(readerId); try { - Throwable e = expectThrows(AssertionError.class, () -> MockSearchService.assertNoInFlightContext()); + Throwable e = expectThrows(AssertionError.class, MockSearchService::assertNoInFlightContext); assertEquals( "There are still [1] in-flight contexts. The first one's creation site is listed as the cause of this exception.", e.getMessage() @@ -29,7 +29,7 @@ public void testAssertNoInFlightContext() { assertEquals(MockSearchService.class.getName(), e.getStackTrace()[0].getClassName()); assertEquals(MockSearchServiceTests.class.getName(), e.getStackTrace()[1].getClassName()); } finally { - MockSearchService.removeActiveContext(reader); + MockSearchService.removeActiveContext(readerId); } } } From 7b02a00131b50f1a613cfd862c59de790cf836c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Fri, 1 Aug 2025 23:52:03 +0200 Subject: [PATCH 08/18] iter --- .../main/java/org/elasticsearch/action/ResolvedIndices.java | 4 ---- .../action/search/TransportSearchActionTests.java | 4 +--- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java index a1c472d35532d..5bab04188a7a7 100644 --- a/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java +++ b/server/src/main/java/org/elasticsearch/action/ResolvedIndices.java @@ -61,10 +61,6 @@ public class ResolvedIndices { this(remoteClusterIndices, localIndices, localIndexMetadata, null); } - public String toString() { - return "ResolvedIndices{" + "local=" + localIndices + ", remote=" + remoteClusterIndices + '}'; - } - /** * Get the remote cluster indices, structured as a map where the key is the remote cluster alias. *
diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 4a62780a84253..08b1b5b516681 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -10,7 +10,6 @@ package org.elasticsearch.action.search; import com.carrotsearch.randomizedtesting.generators.RandomPicks; - import org.apache.lucene.search.TotalHits; import org.apache.lucene.util.SetOnce; import org.elasticsearch.TransportVersion; @@ -1704,8 +1703,7 @@ public void testLocalShardIteratorFromPointInTime() { final SearchShardIterator shardIterator = shardIterators.get(id); final SearchContextIdForNode context = contexts.get(shardId); if (context.getSearchContextId().getSearcherId() == null) { - // TODO fix this broken test - // assertThat(shardIterator.getTargetNodeIds(), hasSize(1)); + assertThat(shardIterator.getTargetNodeIds(), hasSize(1)); } else { // TODO this branch seems never executed by this test. Needs investigation. final List targetNodes = clusterState.routingTable(project) From 015b5d2f148452f60d9bed30a09bb8326df0c8f9 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Fri, 1 Aug 2025 22:29:40 +0000 Subject: [PATCH 09/18] [CI] Auto commit changes from spotless --- .../elasticsearch/action/search/TransportSearchActionTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 08b1b5b516681..58f21312a1ee7 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -10,6 +10,7 @@ package org.elasticsearch.action.search; import com.carrotsearch.randomizedtesting.generators.RandomPicks; + import org.apache.lucene.search.TotalHits; import org.apache.lucene.util.SetOnce; import org.elasticsearch.TransportVersion; From 9d41e27024794d72858395ffcb871c217fad7899 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 27 Aug 2025 13:09:29 +0200 Subject: [PATCH 10/18] Move creation of search supplier to SearchEngine --- .../elasticsearch/search/SearchService.java | 38 ++++++++++++++++--- 1 file changed, 32 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 9004cd9f08e1b..064c8845e2de1 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -552,6 +552,7 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings protected void putReaderContext(ReaderContext context) { final ReaderContext previous = activeReaders.put(context.readerContextId(), context); + logger.info("--> adding reader context [{}], current [{}]", context.readerContextId(), activeReaders.size()); assert previous == null; // ensure that if we race against afterIndexRemoved, we remove the context from the active list. // this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout. @@ -566,7 +567,15 @@ protected ReaderContext removeReaderContext(ReaderContextId id) { if (logger.isTraceEnabled()) { logger.trace("removing reader context [{}]", id); } - return activeReaders.remove(id); + ReaderContext remove = activeReaders.remove(id); + logger.info( + "--> node [{}] removing [{}] reader context [{}], current [{}]", + clusterService.localNode().getId(), + remove != null, + id, + activeReaders.size() + ); + return remove; } @Override @@ -1286,18 +1295,30 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { return createAndPutReaderContext(request, indexService, shard, shard.acquireSearcherSupplier(), keepAliveInMillis); } - final ReaderContext createAndPutReaderContext( + public final ReaderContext createAndPutReaderContext( + ShardSearchRequest request, + IndexService indexService, + IndexShard shard, + Engine.SearcherSupplier reader, + long keepAliveInMillis + ) { + final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); + return createAndPutReaderContext(request, id, indexService, shard, reader, keepAliveInMillis, true); + } + + public final ReaderContext createAndPutReaderContext( ShardSearchRequest request, + ShardSearchContextId id, IndexService indexService, IndexShard shard, Engine.SearcherSupplier reader, - long keepAliveInMillis + long keepAliveInMillis, + boolean singleSession ) { ReaderContext readerContext = null; Releasable decreaseScrollContexts = null; try { - final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); - if (request.scroll() != null) { + if (request != null && request.scroll() != null) { decreaseScrollContexts = openScrollContexts::decrementAndGet; if (openScrollContexts.incrementAndGet() > maxOpenScrollContext) { throw new TooManyScrollContextsException(maxOpenScrollContext, MAX_OPEN_SCROLL_CONTEXT.getKey()); @@ -1306,7 +1327,7 @@ final ReaderContext createAndPutReaderContext( readerContext.addOnClose(decreaseScrollContexts); decreaseScrollContexts = null; } else { - readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, true); + readerContext = new ReaderContext(id, indexService, shard, reader, keepAliveInMillis, singleSession); } reader = null; final ReaderContext finalReaderContext = readerContext; @@ -2278,5 +2299,10 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(sessionId, id); } + + @Override + public String toString() { + return "[" + sessionId + "/" + id + "]"; + } } } From fe8679887fef46d5ab269c7112059b8d26ae1b9a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 27 Aug 2025 13:15:10 +0200 Subject: [PATCH 11/18] iter --- .../index/engine/ReadOnlyEngine.java | 2 +- .../elasticsearch/search/SearchService.java | 66 ------------------- 2 files changed, 1 insertion(+), 67 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java index ec8d980f8efa6..e5531b10c393c 100644 --- a/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java +++ b/server/src/main/java/org/elasticsearch/index/engine/ReadOnlyEngine.java @@ -152,7 +152,7 @@ public ReadOnlyEngine( * as the search id because the commit id changes whenever IndexWriter#commit is called although the segment files stay unchanged. * Any recovery except the local recovery performs IndexWriter#commit to generate a new translog uuid or history_uuid. */ - public static String generateSearcherId(SegmentInfos sis) { + static String generateSearcherId(SegmentInfos sis) { final MessageDigest md = MessageDigests.sha256(); for (SegmentCommitInfo si : sis) { final byte[] segmentId = si.getId(); diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 064c8845e2de1..0e48aac8f48ff 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -12,15 +12,10 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; -import org.apache.lucene.index.DirectoryReader; -import org.apache.lucene.index.IndexCommit; -import org.apache.lucene.index.SegmentInfos; -import org.apache.lucene.index.StandardDirectoryReader; import org.apache.lucene.search.FieldDoc; import org.apache.lucene.search.MatchAllDocsQuery; import org.apache.lucene.search.Query; import org.apache.lucene.search.TopDocs; -import org.apache.lucene.store.Directory; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; import org.elasticsearch.TransportVersion; @@ -66,12 +61,7 @@ import org.elasticsearch.index.IndexNotFoundException; import org.elasticsearch.index.IndexService; import org.elasticsearch.index.IndexSettings; -import org.elasticsearch.index.IndexVersions; import org.elasticsearch.index.engine.Engine; -import org.elasticsearch.index.engine.Engine.Searcher; -import org.elasticsearch.index.engine.Engine.SearcherSupplier; -import org.elasticsearch.index.engine.EngineConfig; -import org.elasticsearch.index.engine.ReadOnlyEngine; import org.elasticsearch.index.query.CoordinatorRewriteContextProvider; import org.elasticsearch.index.query.InnerHitContextBuilder; import org.elasticsearch.index.query.InnerHitsRewriteContext; @@ -1876,62 +1866,6 @@ public List getActiveContexts(ShardId shardId) { .collect(Collectors.toList()); } - public void reopenPitContexts(ShardId shardId, String segmentsFileName, long keepAlive, String sessionId, long contextId) { - IndexService indexService = this.indicesService.indexServiceSafe(shardId.getIndex()); - final IndexShard shard = indexService.getShard(shardId.id()); - ReaderContext readerContext = null; - try { - Directory directory = shard.store().directory(); - SegmentInfos segmentCommitInfos = SegmentInfos.readCommit( - directory, - segmentsFileName, - IndexVersions.MINIMUM_READONLY_COMPATIBLE.luceneVersion().major - ); - IndexCommit indexCommit = Lucene.getIndexCommit(segmentCommitInfos, directory); - DirectoryReader open = StandardDirectoryReader.open(indexCommit); - - String searcherId = ReadOnlyEngine.generateSearcherId(segmentCommitInfos); - final ShardSearchContextId shardSearchContextId = new ShardSearchContextId(sessionId, contextId, searcherId); - SearcherSupplier searchSupplier = new Engine.SearcherSupplier(Function.identity()) { - - @Override - protected void doClose() { - // TODO: implement closing logic - } - - @Override - protected Searcher acquireSearcherInternal(String source) { - EngineConfig engineConfig = shard.getEngineOrNull().getEngineConfig(); - - return new Searcher( - "source", - open, - engineConfig.getSimilarity(), - engineConfig.getQueryCache(), - engineConfig.getQueryCachingPolicy(), - () -> {} - ); - } - }; - readerContext = new ReaderContext(shardSearchContextId, indexService, shard, searchSupplier, keepAlive, false); - final ReaderContext finalReaderContext = readerContext; - final SearchOperationListener searchOperationListener = shard.getSearchOperationListener(); - searchOperationListener.onNewReaderContext(finalReaderContext); - if (finalReaderContext.scrollContext() != null) { - searchOperationListener.onNewScrollContext(finalReaderContext); - readerContext.addOnClose(() -> searchOperationListener.onFreeScrollContext(finalReaderContext)); - } - readerContext.addOnClose(() -> searchOperationListener.onFreeReaderContext(finalReaderContext)); - putReaderContext(finalReaderContext); - readerContext = null; - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - // TODO what do we need to close here? - // Releasables.close(searchSupplier, readerContext); - } - } - /** * Used to indicate which result object should be instantiated when creating a search context */ From 93dcb2fc3f37834098dd68202ffdaea86f8f4ce6 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 27 Aug 2025 11:22:33 +0000 Subject: [PATCH 12/18] [CI] Auto commit changes from spotless --- .../java/org/elasticsearch/search/SearchService.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 0e48aac8f48ff..9512758b32049 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1286,11 +1286,11 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { } public final ReaderContext createAndPutReaderContext( - ShardSearchRequest request, - IndexService indexService, - IndexShard shard, - Engine.SearcherSupplier reader, - long keepAliveInMillis + ShardSearchRequest request, + IndexService indexService, + IndexShard shard, + Engine.SearcherSupplier reader, + long keepAliveInMillis ) { final ShardSearchContextId id = new ShardSearchContextId(sessionId, idGenerator.incrementAndGet()); return createAndPutReaderContext(request, id, indexService, shard, reader, keepAliveInMillis, true); From 162ecf6dae49ae30128959d9427c3d0360b27499 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 27 Aug 2025 13:15:10 +0200 Subject: [PATCH 13/18] iter --- .../elasticsearch/search/SearchService.java | 29 +------------------ 1 file changed, 1 insertion(+), 28 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 9512758b32049..1dbae7ec270ab 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -146,7 +146,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ExecutionException; @@ -2212,31 +2211,5 @@ public AggregationReduceContext forFinalReduction() { }; } - public static class ReaderContextId { - private final String sessionId; - private final long id; - - public ReaderContextId(String sessionId, long id) { - this.sessionId = Objects.requireNonNull(sessionId); - this.id = id; - } - - @Override - public boolean equals(Object o) { - if (this == o) return true; - if (o == null || getClass() != o.getClass()) return false; - ReaderContextId that = (ReaderContextId) o; - return id == that.id && sessionId.equals(that.sessionId); - } - - @Override - public int hashCode() { - return Objects.hash(sessionId, id); - } - - @Override - public String toString() { - return "[" + sessionId + "/" + id + "]"; - } - } + public record ReaderContextId(String sessionId, long id) {} } From 3de1b3f961c26929dc60c753318bc8adf88ecbe7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 27 Aug 2025 14:56:15 +0200 Subject: [PATCH 14/18] iter --- .../main/java/org/elasticsearch/search/SearchService.java | 7 ------- 1 file changed, 7 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 1dbae7ec270ab..fbf1354bdcd8c 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -557,13 +557,6 @@ protected ReaderContext removeReaderContext(ReaderContextId id) { logger.trace("removing reader context [{}]", id); } ReaderContext remove = activeReaders.remove(id); - logger.info( - "--> node [{}] removing [{}] reader context [{}], current [{}]", - clusterService.localNode().getId(), - remove != null, - id, - activeReaders.size() - ); return remove; } From cc60f507aaf48a2cd9c9f7ca11d4a0bbf1cc162a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Wed, 27 Aug 2025 18:18:38 +0200 Subject: [PATCH 15/18] Fixing MockSearchServiceTests --- .../java/org/elasticsearch/search/MockSearchServiceTests.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java index e6b02ff8c922a..fb504e2dc343a 100644 --- a/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java +++ b/test/framework/src/test/java/org/elasticsearch/search/MockSearchServiceTests.java @@ -12,12 +12,10 @@ import org.elasticsearch.search.SearchService.ReaderContextId; import org.elasticsearch.test.ESTestCase; -import static org.mockito.Mockito.mock; - public class MockSearchServiceTests extends ESTestCase { public void testAssertNoInFlightContext() { - ReaderContextId readerId = mock(ReaderContextId.class); + ReaderContextId readerId = new ReaderContextId("sessionId", 1L); MockSearchService.addActiveContext(readerId); try { Throwable e = expectThrows(AssertionError.class, MockSearchService::assertNoInFlightContext); From 68280cf688338f88fee5133f688a21efd55b0410 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 28 Aug 2025 23:29:35 +0200 Subject: [PATCH 16/18] Adding re-writing of PIT id --- .../search/AbstractSearchAsyncAction.java | 5 ++++- .../action/search/PITHelper.java | 2 +- .../action/search/SearchContextId.java | 20 ++++++++++++++++--- .../action/search/SearchContextIdForNode.java | 17 +++++++++++++++- .../SearchQueryThenFetchAsyncAction.java | 2 +- .../action/search/TransportSearchAction.java | 1 + .../elasticsearch/search/SearchService.java | 5 ++--- 7 files changed, 42 insertions(+), 10 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index e058a3c83d41c..c35945d8d1b7d 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -13,6 +13,7 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; +import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; @@ -93,6 +94,7 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; + private final TransportVersion clusterMinTransportVersion; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -149,6 +151,7 @@ abstract class AbstractSearchAsyncAction exten this.nodeIdToConnection = nodeIdToConnection; this.concreteIndexBoosts = concreteIndexBoosts; this.clusterStateVersion = clusterState.version(); + this.clusterMinTransportVersion = clusterState.getMinTransportVersion(); this.aliasFilter = aliasFilter; this.results = resultConsumer; // register the release of the query consumer to free up the circuit breaker memory @@ -609,7 +612,7 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { var source = request.source(); return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false - ? source.pointInTimeBuilder().getEncodedId() + ? SearchContextId.encode(results.getAtomicArray().asList(), aliasFilter, clusterMinTransportVersion, failures) : null; } diff --git a/server/src/main/java/org/elasticsearch/action/search/PITHelper.java b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java index 8cc3893fa3c3d..ea5aeead73fb7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/PITHelper.java +++ b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java @@ -26,7 +26,7 @@ public static SearchContextId decodePITId(String id) throws IOException { return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id))); } - public static SearchContextId decodePITId(BytesReference id) throws IOException { + public static SearchContextId decodePITId(BytesReference id) { try (var in = id.streamInput()) { final TransportVersion version = TransportVersion.readVersion(in); in.setTransportVersion(version); diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index c2f1510341fb0..8364beaf037cc 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -30,6 +30,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Set; import java.util.TreeSet; import java.util.stream.Collectors; @@ -39,7 +40,7 @@ public final class SearchContextId { private final Map aliasFilter; private final transient Set contextIds; - SearchContextId(Map shards, Map aliasFilter) { + public SearchContextId(Map shards, Map aliasFilter) { this.shards = shards; this.aliasFilter = aliasFilter; this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet()); @@ -57,8 +58,8 @@ public boolean contains(ShardSearchContextId contextId) { return contextIds.contains(contextId); } - public static BytesReference encode( - List searchPhaseResults, + public static BytesReference encode( + List searchPhaseResults, Map aliasFilter, TransportVersion version, ShardSearchFailure[] shardFailures @@ -142,4 +143,17 @@ public String[] getActualIndices() { } return indices.toArray(String[]::new); } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + SearchContextId that = (SearchContextId) o; + return Objects.equals(shards, that.shards) && Objects.equals(aliasFilter, + that.aliasFilter) && Objects.equals(contextIds, that.contextIds); + } + + @Override + public int hashCode() { + return Objects.hash(shards, aliasFilter, contextIds); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java index f91a9d09f4bb4..431d0933718f7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java @@ -17,6 +17,7 @@ import org.elasticsearch.search.internal.ShardSearchContextId; import java.io.IOException; +import java.util.Objects; public final class SearchContextIdForNode implements Writeable { private final String node; @@ -30,7 +31,7 @@ public final class SearchContextIdForNode implements Writeable { * @param node The target node where the search context ID is defined, or {@code null} if the shard is missing or unavailable. * @param searchContextId The {@link ShardSearchContextId}, or {@code null} if the shard is missing or unavailable. */ - SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { + public SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { this.node = node; this.clusterAlias = clusterAlias; this.searchContextId = searchContextId; @@ -103,4 +104,18 @@ public String toString() { + '\'' + '}'; } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + SearchContextIdForNode that = (SearchContextIdForNode) o; + return Objects.equals(node, that.node) + && Objects.equals(searchContextId, that.searchContextId) + && Objects.equals(clusterAlias, that.clusterAlias); + } + + @Override + public int hashCode() { + return Objects.hash(node, searchContextId, clusterAlias); + } } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index fffbd26adce50..fd0b988715413 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -709,7 +709,7 @@ protected void innerOnResponse(SearchPhaseResult searchPhaseResult) { } catch (Exception e) { setFailure(state, dataNodeLocalIdx, e); } finally { - doneFuture.onResponse(null); + doneFuture.onResponse(null); } } diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index d3a331ac88d81..c8d3099d38028 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1233,6 +1233,7 @@ static List getRemoteShardsIteratorFromPointInTime( // Otherwise, we add the shard iterator without a target node, allowing a partial search failure to // be thrown when a search phase attempts to access it. targetNodes.add(perNode.getNode()); + // TODO we will need to adapt something here as well I think if (perNode.getSearchContextId().getSearcherId() != null) { for (String node : group.allocatedNodes()) { if (node.equals(perNode.getNode()) == false) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index fbf1354bdcd8c..49a89b6573546 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -541,7 +541,6 @@ public void beforeIndexShardCreated(ShardRouting routing, Settings indexSettings protected void putReaderContext(ReaderContext context) { final ReaderContext previous = activeReaders.put(context.readerContextId(), context); - logger.info("--> adding reader context [{}], current [{}]", context.readerContextId(), activeReaders.size()); assert previous == null; // ensure that if we race against afterIndexRemoved, we remove the context from the active list. // this is important to ensure store can be cleaned up, in particular if the search is a scroll with a long timeout. @@ -556,8 +555,7 @@ protected ReaderContext removeReaderContext(ReaderContextId id) { if (logger.isTraceEnabled()) { logger.trace("removing reader context [{}]", id); } - ReaderContext remove = activeReaders.remove(id); - return remove; + return activeReaders.remove(id); } @Override @@ -1261,6 +1259,7 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { if (searcherId == null) { throw e; } + // TODO this retries on context with same searcher id, currently offered in FrozenEngine and ReadonlyEngine final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier(); From b5deadc40ba07c91670ac152ec1e0996b850496d Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Thu, 28 Aug 2025 21:49:01 +0000 Subject: [PATCH 17/18] [CI] Auto commit changes from spotless --- .../org/elasticsearch/action/search/SearchContextId.java | 5 +++-- .../action/search/SearchQueryThenFetchAsyncAction.java | 2 +- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 8364beaf037cc..29d4e43ea7b9f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -148,8 +148,9 @@ public String[] getActualIndices() { public boolean equals(Object o) { if (o == null || getClass() != o.getClass()) return false; SearchContextId that = (SearchContextId) o; - return Objects.equals(shards, that.shards) && Objects.equals(aliasFilter, - that.aliasFilter) && Objects.equals(contextIds, that.contextIds); + return Objects.equals(shards, that.shards) + && Objects.equals(aliasFilter, that.aliasFilter) + && Objects.equals(contextIds, that.contextIds); } @Override diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java index fd0b988715413..fffbd26adce50 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchQueryThenFetchAsyncAction.java @@ -709,7 +709,7 @@ protected void innerOnResponse(SearchPhaseResult searchPhaseResult) { } catch (Exception e) { setFailure(state, dataNodeLocalIdx, e); } finally { - doneFuture.onResponse(null); + doneFuture.onResponse(null); } } From 7695a32c02105a93c6a864fb83cb896f43ff60e0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Christoph=20B=C3=BCscher?= Date: Thu, 4 Sep 2025 11:09:05 +0200 Subject: [PATCH 18/18] Revert "Adding re-writing of PIT id" This reverts commit 68280cf688338f88fee5133f688a21efd55b0410. --- .../action/search/AbstractSearchAsyncAction.java | 5 +---- .../java/org/elasticsearch/action/search/PITHelper.java | 2 +- .../org/elasticsearch/action/search/SearchContextId.java | 6 +++--- .../elasticsearch/action/search/SearchContextIdForNode.java | 2 +- .../elasticsearch/action/search/TransportSearchAction.java | 3 ++- .../main/java/org/elasticsearch/search/SearchService.java | 4 +++- 6 files changed, 11 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java index c35945d8d1b7d..e058a3c83d41c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java @@ -13,7 +13,6 @@ import org.apache.lucene.util.SetOnce; import org.elasticsearch.ElasticsearchException; import org.elasticsearch.ExceptionsHelper; -import org.elasticsearch.TransportVersion; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.NoShardAvailableActionException; import org.elasticsearch.action.OriginalIndices; @@ -94,7 +93,6 @@ abstract class AbstractSearchAsyncAction exten private final Map pendingExecutionsPerNode; private final AtomicBoolean requestCancelled = new AtomicBoolean(); private final int skippedCount; - private final TransportVersion clusterMinTransportVersion; // protected for tests protected final SubscribableListener doneFuture = new SubscribableListener<>(); @@ -151,7 +149,6 @@ abstract class AbstractSearchAsyncAction exten this.nodeIdToConnection = nodeIdToConnection; this.concreteIndexBoosts = concreteIndexBoosts; this.clusterStateVersion = clusterState.version(); - this.clusterMinTransportVersion = clusterState.getMinTransportVersion(); this.aliasFilter = aliasFilter; this.results = resultConsumer; // register the release of the query consumer to free up the circuit breaker memory @@ -612,7 +609,7 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) { var source = request.source(); return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false - ? SearchContextId.encode(results.getAtomicArray().asList(), aliasFilter, clusterMinTransportVersion, failures) + ? source.pointInTimeBuilder().getEncodedId() : null; } diff --git a/server/src/main/java/org/elasticsearch/action/search/PITHelper.java b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java index ea5aeead73fb7..29fbabb6073c3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/PITHelper.java +++ b/server/src/main/java/org/elasticsearch/action/search/PITHelper.java @@ -22,7 +22,7 @@ public class PITHelper { - public static SearchContextId decodePITId(String id) throws IOException { + public static SearchContextId decodePITId(String id) { return decodePITId(new BytesArray(Base64.getUrlDecoder().decode(id))); } diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java index 29d4e43ea7b9f..07f8875ee06a3 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextId.java @@ -40,7 +40,7 @@ public final class SearchContextId { private final Map aliasFilter; private final transient Set contextIds; - public SearchContextId(Map shards, Map aliasFilter) { + SearchContextId(Map shards, Map aliasFilter) { this.shards = shards; this.aliasFilter = aliasFilter; this.contextIds = shards.values().stream().map(SearchContextIdForNode::getSearchContextId).collect(Collectors.toSet()); @@ -58,8 +58,8 @@ public boolean contains(ShardSearchContextId contextId) { return contextIds.contains(contextId); } - public static BytesReference encode( - List searchPhaseResults, + public static BytesReference encode( + List searchPhaseResults, Map aliasFilter, TransportVersion version, ShardSearchFailure[] shardFailures diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java index 431d0933718f7..f68bd1ea52cb8 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchContextIdForNode.java @@ -31,7 +31,7 @@ public final class SearchContextIdForNode implements Writeable { * @param node The target node where the search context ID is defined, or {@code null} if the shard is missing or unavailable. * @param searchContextId The {@link ShardSearchContextId}, or {@code null} if the shard is missing or unavailable. */ - public SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { + SearchContextIdForNode(@Nullable String clusterAlias, @Nullable String node, @Nullable ShardSearchContextId searchContextId) { this.node = node; this.clusterAlias = clusterAlias; this.searchContextId = searchContextId; diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index c8d3099d38028..7d5224ed94e5f 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -1233,7 +1233,8 @@ static List getRemoteShardsIteratorFromPointInTime( // Otherwise, we add the shard iterator without a target node, allowing a partial search failure to // be thrown when a search phase attempts to access it. targetNodes.add(perNode.getNode()); - // TODO we will need to adapt something here as well I think + // TODO this looks like its on the cross-cluster search path, we will need to adapt the retry mechanism here as well I + // think if (perNode.getSearchContextId().getSearcherId() != null) { for (String node : group.allocatedNodes()) { if (node.equals(perNode.getNode()) == false) { diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index 49a89b6573546..93b2f1cccc5bd 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -1259,7 +1259,9 @@ final ReaderContext createOrGetReaderContext(ShardSearchRequest request) { if (searcherId == null) { throw e; } - // TODO this retries on context with same searcher id, currently offered in FrozenEngine and ReadonlyEngine + // TODO here we retry creating contexts on the same node if we can get a Searchwe with same searcher id as the original + // PIT context. That seems to currently only be working for FrozenEngine and ReadonlyEngine where the commitId is used + // as a searcher id final IndexService indexService = indicesService.indexServiceSafe(request.shardId().getIndex()); final IndexShard shard = indexService.getShard(request.shardId().id()); final Engine.SearcherSupplier searcherSupplier = shard.acquireSearcherSupplier();