Skip search shards with INDEX_REFRESH_BLOCK #129132
base: main
SearchWithIndexBlocksIT.java (new file)
@@ -0,0 +1,129 @@
/*
 * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
 * or more contributor license agreements. Licensed under the "Elastic License
 * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
 * Public License v 1"; you may not use this file except in compliance with, at
 * your election, the "Elastic License 2.0", the "GNU Affero General Public
 * License v3.0 only", or the "Server Side Public License, v 1".
 */

package org.elasticsearch.search;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.search.ClosePointInTimeRequest;
import org.elasticsearch.action.search.OpenPointInTimeRequest;
import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.TransportClosePointInTimeAction;
import org.elasticsearch.action.search.TransportOpenPointInTimeAction;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.block.ClusterBlocks;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.bytes.BytesReference;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.builder.PointInTimeBuilder;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.test.ESIntegTestCase;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import static org.elasticsearch.cluster.block.ClusterBlocks.EMPTY_CLUSTER_BLOCK;
import static org.elasticsearch.test.ClusterServiceUtils.setState;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;

public class SearchWithIndexBlocksIT extends ESIntegTestCase {

    public void testSearchIndicesWithIndexRefreshBlocks() {
        List<String> indices = createIndices();
        Map<String, Integer> numDocsPerIndex = indexDocuments(indices);
        List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices);

        int expectedHits = 0;
        for (String index : unblockedIndices) {
            expectedHits += numDocsPerIndex.get(index);
        }

        assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), expectedHits);
    }

    public void testOpenPITWithIndexRefreshBlock() {
        List<String> indices = createIndices();
        Map<String, Integer> numDocsPerIndex = indexDocuments(indices);
        List<String> unblockedIndices = addIndexRefreshBlockToSomeIndices(indices);

        int expectedHits = 0;
        for (String index : unblockedIndices) {
            expectedHits += numDocsPerIndex.get(index);
        }

        BytesReference pitId = null;
        try {
            OpenPointInTimeRequest openPITRequest = new OpenPointInTimeRequest(indices.toArray(new String[0])).keepAlive(
                TimeValue.timeValueSeconds(10)
            ).allowPartialSearchResults(true);
            pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openPITRequest).actionGet().getPointInTimeId();
            SearchRequest searchRequest = new SearchRequest().source(
                new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueSeconds(10)))
            );
            assertHitCount(client().search(searchRequest), expectedHits);
        } finally {
            if (pitId != null) {
                client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet();
            }
        }
    }

    private List<String> createIndices() {
        int numIndices = randomIntBetween(1, 3);
        List<String> indices = new ArrayList<>();
        for (int i = 0; i < numIndices; i++) {
            indices.add("test" + i);
            createIndex("test" + i);
        }
        return indices;
    }

    private Map<String, Integer> indexDocuments(List<String> indices) {
        Map<String, Integer> numDocsPerIndex = new HashMap<>();
        List<IndexRequestBuilder> indexRequests = new ArrayList<>();
        for (String index : indices) {
            int numDocs = randomIntBetween(0, 10);
            numDocsPerIndex.put(index, numDocs);
            for (int i = 0; i < numDocs; i++) {
                indexRequests.add(prepareIndex(index).setId(String.valueOf(i)).setSource("field", "value"));
            }
        }
        indexRandom(true, indexRequests);

        return numDocsPerIndex;
    }

    private List<String> addIndexRefreshBlockToSomeIndices(List<String> indices) {
        List<String> unblockedIndices = new ArrayList<>();
        var blocksBuilder = ClusterBlocks.builder().blocks(EMPTY_CLUSTER_BLOCK);
        for (String index : indices) {
            boolean blockIndex = randomBoolean();
            if (blockIndex) {
                blocksBuilder.addIndexBlock(ProjectId.DEFAULT, index, IndexMetadata.INDEX_REFRESH_BLOCK);
            } else {
                unblockedIndices.add(index);
            }
        }

        var dataNodes = clusterService().state().getNodes().getAllNodes();
        for (DiscoveryNode dataNode : dataNodes) {
            ClusterService clusterService = internalCluster().getInstance(ClusterService.class, dataNode.getName());
            ClusterState currentState = clusterService.state();
            ClusterState newState = ClusterState.builder(currentState).blocks(blocksBuilder).build();
            setState(clusterService, newState);
        }

        return unblockedIndices;
    }
}
CanMatchPreFilterSearchPhase.java
@@ -186,6 +186,12 @@ private void runCoordinatorRewritePhase() {
        assert assertSearchCoordinationThread();
        final List<SearchShardIterator> matchedShardLevelRequests = new ArrayList<>();
        for (SearchShardIterator searchShardIterator : shardsIts) {
+            if (searchShardIterator.prefiltered() == false && searchShardIterator.skip()) {
+                // This implies the iterator was skipped due to an index level block,
+                // not a remote can-match run.
+                continue;
+            }
+
            final CanMatchNodeRequest canMatchNodeRequest = new CanMatchNodeRequest(
                request,
                searchShardIterator.getOriginalIndices().indicesOptions(),

Review comment: As far as I understand, this is what actually skips the shards being searched. Why is this done here, in CanMatchPreFilterSearchPhase? My understanding is that we don't always use this phase, e.g. when "shouldPreFilterSearchShards" returns false.

Author reply: Actually, what really skips the shards for search is this code: elasticsearch/server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java, lines 123 to 129 (at 01b6de3). I added this check to avoid running can-match on shards that are skipped.

Reviewer reply: TY

Review comment: This got my attention too. Is it a necessary change for this PR? I was trying to figure out how it ties to the refresh block.
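For readers following the thread above: the referenced AbstractSearchAsyncAction lines are not reproduced in this diff, but the gist of the discussion is that iterators already marked skip are separated out before any per-shard requests are sent, and the new can-match check merely avoids extra work for them. The sketch below illustrates that kind of partitioning with a stand-in type; it is an assumption for illustration only, not the actual Elasticsearch source.

import java.util.ArrayList;
import java.util.List;

class SkipPartitionSketch {

    // Reduced stand-in for SearchShardIterator: only the skip flag matters here.
    record ShardIt(String shardId, boolean skip) {}

    // Roughly the kind of split the linked AbstractSearchAsyncAction code is said to perform:
    // skip-marked iterators never receive a shard-level request; they are only reported as
    // skipped in the final response. (Illustrative assumption, not the real implementation.)
    static List<ShardIt> shardsToQuery(List<ShardIt> all, List<ShardIt> skippedOut) {
        List<ShardIt> toQuery = new ArrayList<>();
        for (ShardIt it : all) {
            if (it.skip()) {
                skippedOut.add(it);   // counted as skipped, no request sent
            } else {
                toQuery.add(it);      // proceeds to can_match / query / fetch
            }
        }
        return toQuery;
    }
}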
SearchShardIterator.java
@@ -41,19 +41,28 @@ public final class SearchShardIterator implements Comparable<SearchShardIterator

    /**
     * Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
-     * this the a given <code>shardId</code>.
+     * for a given <code>shardId</code>.
     *
     * @param clusterAlias the alias of the cluster where the shard is located
     * @param shardId shard id of the group
     * @param shards shards to iterate
     * @param originalIndices the indices that the search request originally related to (before any rewriting happened)
+     * @param skip if true, then this group won't have matches (due to an index level block),
+     *             and it can be safely skipped from the search
     */
-    public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<ShardRouting> shards, OriginalIndices originalIndices) {
-        this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, false);
+    public SearchShardIterator(
+        @Nullable String clusterAlias,
+        ShardId shardId,
+        List<ShardRouting> shards,
+        OriginalIndices originalIndices,
+        boolean skip
+    ) {
+        this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, skip);
    }

Review comment (on the new boolean skip parameter): skip is mutable, did we need to add a new constructor variant to mutate it?

    /**
     * Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards
     * for a given <code>shardId</code>.
     *
     * @param clusterAlias the alias of the cluster where the shard is located
     * @param shardId shard id of the group
@@ -62,7 +71,8 @@ public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List<
     * @param searchContextId the point-in-time specified for this group if exists
     * @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time
     * @param prefiltered if true, then this group already executed the can_match phase
-     * @param skip if true, then this group won't have matches, and it can be safely skipped from the search
+     * @param skip if true, then this group won't have matches (due to can match, or an index level block),
+     *             and it can be safely skipped from the search
     */
    public SearchShardIterator(
        @Nullable String clusterAlias,
@@ -83,7 +93,6 @@ public SearchShardIterator(
        assert searchContextKeepAlive == null || searchContextId != null;
        this.prefiltered = prefiltered;
        this.skip = skip;
-        assert skip == false || prefiltered : "only prefiltered shards are skip-able";
    }

Review comment (on the removed assert): I am trying to remember what prefiltered was all about. I think it was only for bw comp, to support two variants of the search shards API, the new one that supports coordinator rewrite, and the other one that does not. Looking at the code, I wonder if we can remove prefiltered actually (as a follow up?) in main. But my actual question is: why remove this assert?

    /**
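The reviewer's question about the removed assert is easier to see with a concrete construction. The fragment below is an illustration based on the new constructor shown above, not code from the PR; the ShardId values, empty routing list, and OriginalIndices.NONE are placeholders. With the new constructor, an iterator for a refresh-blocked index is marked skip without ever having run can_match, so skip no longer implies prefiltered and the old assertion would trip.

// Illustrative fragment (assumption, not part of the PR): why "skip implies prefiltered"
// no longer holds once index-level blocks can mark an iterator as skipped.
ShardId shardId = new ShardId("test0", "_na_", 0);      // placeholder shard id
SearchShardIterator blocked = new SearchShardIterator(
    null,                    // clusterAlias: local cluster
    shardId,
    List.of(),               // routings are irrelevant for the illustration
    OriginalIndices.NONE,    // placeholder OriginalIndices
    true                     // skip: the index carries an INDEX_REFRESH_BLOCK
);
assert blocked.skip();                    // skipped because of the index-level block...
assert blocked.prefiltered() == false;    // ...but no can_match round ever ran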
TransportSearchAction.java
@@ -148,6 +148,9 @@ public class TransportSearchAction extends HandledTransportAction<SearchRequest,
        Property.NodeScope
    );

+    // Marker to indicate this index's shards should be skipped in a search
+    private static final OriginalIndices SKIPPED_INDICES = new OriginalIndices(Strings.EMPTY_ARRAY, IndicesOptions.strictExpandOpen());
+
    private final ThreadPool threadPool;
    private final ClusterService clusterService;
    private final TransportService transportService;
@@ -234,6 +237,10 @@ private Map<String, OriginalIndices> buildPerIndexOriginalIndices(
        for (String index : indices) {
            if (hasBlocks) {
                blocks.indexBlockedRaiseException(projectId, ClusterBlockLevel.READ, index);
+                if (blocks.hasIndexBlock(projectState.projectId(), index, IndexMetadata.INDEX_REFRESH_BLOCK)) {
+                    res.put(index, SKIPPED_INDICES);
+                    continue;
+                }
            }

            String[] aliases = indexNameExpressionResolver.allIndexAliases(projectState.metadata(), index, indicesAndAliases);

Review comment: I wonder if we are doing the filtering in the right place. It is a bit counterintuitive that we would resolve the shards given the indices and then skip some of them. Can we not filter the indices to start with? Maybe that removes the need to use that SKIPPED_INDICES marker thing too :) Do we need to check for this block only in the search API? By the way, something probably needs to happen in ES|QL too around this (it does not need to be in this PR, but I am raising the need to track that).
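A rough sketch of what the reviewer's "filter the indices to start with" alternative could look like, offered purely as an illustration: the helper name is hypothetical and this is not code from the PR. It reuses only block APIs that already appear in the diff (hasIndexBlock and IndexMetadata.INDEX_REFRESH_BLOCK). One presumable trade-off is that indices dropped this early would no longer surface as skipped shards in the search response, whereas the SKIPPED_INDICES marker keeps them visible as skipped.

// Hypothetical helper (assumption): drop refresh-blocked indices before shard resolution,
// instead of tagging them with the SKIPPED_INDICES marker afterwards.
private static String[] withoutRefreshBlockedIndices(ClusterBlocks blocks, ProjectId projectId, String[] indices) {
    List<String> searchable = new ArrayList<>(indices.length);
    for (String index : indices) {
        if (blocks.hasIndexBlock(projectId, index, IndexMetadata.INDEX_REFRESH_BLOCK) == false) {
            searchable.add(index);    // only indices without a refresh block are resolved to shards
        }
    }
    return searchable.toArray(new String[0]);
}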
@@ -589,7 +596,7 @@ public void onFailure(Exception e) {}
        );
    }

-    static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) {
+    static void adjustSearchType(SearchRequest searchRequest, boolean oneOrZeroShardsToSearch) {
        // if there's a kNN search, always use DFS_QUERY_THEN_FETCH
        if (searchRequest.hasKnnSearch()) {
            searchRequest.searchType(DFS_QUERY_THEN_FETCH);
@@ -604,7 +611,7 @@ static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) {
        }

        // optimize search type for cases where there is only one shard group to search on
-        if (singleShard) {
+        if (oneOrZeroShardsToSearch) {
            // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard
            searchRequest.searchType(QUERY_THEN_FETCH);
        }
@@ -1305,7 +1312,7 @@ private void executeSearch(

        Map<String, Float> concreteIndexBoosts = resolveIndexBoosts(searchRequest, projectState.cluster());

-        adjustSearchType(searchRequest, shardIterators.size() == 1);
+        adjustSearchType(searchRequest, oneOrZeroShardsToSearch(shardIterators));

        final DiscoveryNodes nodes = projectState.cluster().nodes();
        BiFunction<String, String, Transport.Connection> connectionLookup = buildConnectionLookup(
@@ -1338,6 +1345,33 @@ private void executeSearch(
        );
    }

+    /**
+     * Determines if only one (or zero) search shard iterators will be searched.
+     * (At this point, iterators may be marked as skipped due to index level blockers).
+     * We expect skipped iterators to be unlikely, so returning fast after we see more
+     * than one "not skipped" is an intended optimization.
+     *
+     * @param searchShardIterators all the shard iterators derived from indices being searched
+     * @return true if there are no more than one shard iterators, or if there are no more than
+     *         one not marked to skip
+     */
+    private boolean oneOrZeroShardsToSearch(List<SearchShardIterator> searchShardIterators) {
+        if (searchShardIterators.size() <= 1) {
+            return true;
+        }
+
+        int notSkippedCount = 0;
+        for (SearchShardIterator searchShardIterator : searchShardIterators) {
+            if (searchShardIterator.skip() == false) {
+                notSkippedCount++;
+                if (notSkippedCount > 1) {
+                    return false;
+                }
+            }
+        }
+        return true;
+    }
+
    Executor asyncSearchExecutor(final String[] indices) {
        boolean seenSystem = false;
        boolean seenCritical = false;

Review comment (on the javadoc): replace blockers with blocks?
@@ -1890,7 +1924,13 @@ List<SearchShardIterator> getLocalShardsIterator(
            final ShardId shardId = shardRouting.shardId();
            OriginalIndices finalIndices = originalIndices.get(shardId.getIndex().getName());
            assert finalIndices != null;
-            list[i++] = new SearchShardIterator(clusterAlias, shardId, shardRouting.getShardRoutings(), finalIndices);
+            list[i++] = new SearchShardIterator(
+                clusterAlias,
+                shardId,
+                shardRouting.getShardRoutings(),
+                finalIndices,
+                finalIndices == SKIPPED_INDICES
+            );
        }
        // the returned list must support in-place sorting, so this is the most memory efficient we can do here
        return Arrays.asList(list);
Review comment (on the integration test above): This method is not intended to be used in integration tests, as it overrides the current data node cluster state. For testing the INDEX_REFRESH_BLOCK I think it makes sense to only have unit tests in stateful Elasticsearch.
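A minimal sketch of the kind of unit-level check that suggestion could translate into. The test class name, method name, and index names are assumptions and not part of the PR; it only exercises block APIs that the PR's own diff already uses (ClusterBlocks.builder().addIndexBlock(...) and hasIndexBlock(...)), without overriding any live node's cluster state.

package org.elasticsearch.cluster.block;

import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.ProjectId;
import org.elasticsearch.test.ESTestCase;

// Hypothetical unit test (assumption): verifies an INDEX_REFRESH_BLOCK is visible via
// ClusterBlocks, which is the signal the search path checks, without touching a running node.
public class IndexRefreshBlockTests extends ESTestCase {

    public void testRefreshBlockIsReported() {
        ClusterBlocks blocks = ClusterBlocks.builder()
            .addIndexBlock(ProjectId.DEFAULT, "test0", IndexMetadata.INDEX_REFRESH_BLOCK)
            .build();

        // The blocked index reports the refresh block; an unblocked index does not.
        assertTrue(blocks.hasIndexBlock(ProjectId.DEFAULT, "test0", IndexMetadata.INDEX_REFRESH_BLOCK));
        assertFalse(blocks.hasIndexBlock(ProjectId.DEFAULT, "test1", IndexMetadata.INDEX_REFRESH_BLOCK));
    }
}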