diff --git a/server/src/internalClusterTest/java/org/elasticsearch/search/SearchWithIndexBlocksIT.java b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchWithIndexBlocksIT.java new file mode 100644 index 0000000000000..2f9cde7c27bef --- /dev/null +++ b/server/src/internalClusterTest/java/org/elasticsearch/search/SearchWithIndexBlocksIT.java @@ -0,0 +1,129 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.search; + +import org.elasticsearch.action.index.IndexRequestBuilder; +import org.elasticsearch.action.search.ClosePointInTimeRequest; +import org.elasticsearch.action.search.OpenPointInTimeRequest; +import org.elasticsearch.action.search.SearchRequest; +import org.elasticsearch.action.search.TransportClosePointInTimeAction; +import org.elasticsearch.action.search.TransportOpenPointInTimeAction; +import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.block.ClusterBlocks; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.bytes.BytesReference; +import org.elasticsearch.core.TimeValue; +import org.elasticsearch.index.query.QueryBuilders; +import org.elasticsearch.search.builder.PointInTimeBuilder; +import org.elasticsearch.search.builder.SearchSourceBuilder; +import org.elasticsearch.test.ESIntegTestCase; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.elasticsearch.cluster.block.ClusterBlocks.EMPTY_CLUSTER_BLOCK; +import static org.elasticsearch.test.ClusterServiceUtils.setState; +import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount; + +public class SearchWithIndexBlocksIT extends ESIntegTestCase { + + public void testSearchIndicesWithIndexRefreshBlocks() { + List indices = createIndices(); + Map numDocsPerIndex = indexDocuments(indices); + List unblockedIndices = addIndexRefreshBlockToSomeIndices(indices); + + int expectedHits = 0; + for (String index : unblockedIndices) { + expectedHits += numDocsPerIndex.get(index); + } + + assertHitCount(prepareSearch().setQuery(QueryBuilders.matchAllQuery()), expectedHits); + } + + public void testOpenPITWithIndexRefreshBlock() { + List indices = createIndices(); + Map numDocsPerIndex = indexDocuments(indices); + List unblockedIndices = addIndexRefreshBlockToSomeIndices(indices); + + int expectedHits = 0; + for (String index : unblockedIndices) { + expectedHits += numDocsPerIndex.get(index); + } + + BytesReference pitId = null; + try { + OpenPointInTimeRequest openPITRequest = new OpenPointInTimeRequest(indices.toArray(new String[0])).keepAlive( + TimeValue.timeValueSeconds(10) + ).allowPartialSearchResults(true); + pitId = client().execute(TransportOpenPointInTimeAction.TYPE, openPITRequest).actionGet().getPointInTimeId(); + SearchRequest searchRequest = new SearchRequest().source( + new SearchSourceBuilder().pointInTimeBuilder(new PointInTimeBuilder(pitId).setKeepAlive(TimeValue.timeValueSeconds(10))) + ); + assertHitCount(client().search(searchRequest), expectedHits); + } finally { + if (pitId != null) { + client().execute(TransportClosePointInTimeAction.TYPE, new ClosePointInTimeRequest(pitId)).actionGet(); + } + } + } + + private List createIndices() { + int numIndices = randomIntBetween(1, 3); + List indices = new ArrayList<>(); + for (int i = 0; i < numIndices; i++) { + indices.add("test" + i); + createIndex("test" + i); + } + return indices; + } + + private Map indexDocuments(List indices) { + Map numDocsPerIndex = new HashMap<>(); + List indexRequests = new ArrayList<>(); + for (String index : indices) { + int numDocs = randomIntBetween(0, 10); + numDocsPerIndex.put(index, numDocs); + for (int i = 0; i < numDocs; i++) { + indexRequests.add(prepareIndex(index).setId(String.valueOf(i)).setSource("field", "value")); + } + } + indexRandom(true, indexRequests); + + return numDocsPerIndex; + } + + private List addIndexRefreshBlockToSomeIndices(List indices) { + List unblockedIndices = new ArrayList<>(); + var blocksBuilder = ClusterBlocks.builder().blocks(EMPTY_CLUSTER_BLOCK); + for (String index : indices) { + boolean blockIndex = randomBoolean(); + if (blockIndex) { + blocksBuilder.addIndexBlock(ProjectId.DEFAULT, index, IndexMetadata.INDEX_REFRESH_BLOCK); + } else { + unblockedIndices.add(index); + } + } + + var dataNodes = clusterService().state().getNodes().getAllNodes(); + for (DiscoveryNode dataNode : dataNodes) { + ClusterService clusterService = internalCluster().getInstance(ClusterService.class, dataNode.getName()); + ClusterState currentState = clusterService.state(); + ClusterState newState = ClusterState.builder(currentState).blocks(blocksBuilder).build(); + setState(clusterService, newState); + } + + return unblockedIndices; + } +} diff --git a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java index e42f8127c5e97..8adb9180e3bae 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java @@ -186,6 +186,12 @@ private void runCoordinatorRewritePhase() { assert assertSearchCoordinationThread(); final List matchedShardLevelRequests = new ArrayList<>(); for (SearchShardIterator searchShardIterator : shardsIts) { + if (searchShardIterator.prefiltered() == false && searchShardIterator.skip()) { + // This implies the iterator was skipped due to an index level block, + // not a remote can-match run. + continue; + } + final CanMatchNodeRequest canMatchNodeRequest = new CanMatchNodeRequest( request, searchShardIterator.getOriginalIndices().indicesOptions(), diff --git a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java index 00ff8f33f5659..45c58c5a64611 100644 --- a/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java +++ b/server/src/main/java/org/elasticsearch/action/search/SearchShardIterator.java @@ -41,19 +41,28 @@ public final class SearchShardIterator implements ComparableshardId. + * for a given shardId. * * @param clusterAlias the alias of the cluster where the shard is located * @param shardId shard id of the group * @param shards shards to iterate * @param originalIndices the indices that the search request originally related to (before any rewriting happened) + * @param skip if true, then this group won't have matches (due to an index level block), + * and it can be safely skipped from the search */ - public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List shards, OriginalIndices originalIndices) { - this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, false); + public SearchShardIterator( + @Nullable String clusterAlias, + ShardId shardId, + List shards, + OriginalIndices originalIndices, + boolean skip + ) { + this(clusterAlias, shardId, shards.stream().map(ShardRouting::currentNodeId).toList(), originalIndices, null, null, false, skip); } /** * Creates a {@link SearchShardIterator} instance that iterates over a subset of the given shards + * for a given shardId. * * @param clusterAlias the alias of the cluster where the shard is located * @param shardId shard id of the group @@ -62,7 +71,8 @@ public SearchShardIterator(@Nullable String clusterAlias, ShardId shardId, List< * @param searchContextId the point-in-time specified for this group if exists * @param searchContextKeepAlive the time interval that data nodes should extend the keep alive of the point-in-time * @param prefiltered if true, then this group already executed the can_match phase - * @param skip if true, then this group won't have matches, and it can be safely skipped from the search + * @param skip if true, then this group won't have matches (due to can match, or an index level block), + * and it can be safely skipped from the search */ public SearchShardIterator( @Nullable String clusterAlias, @@ -83,7 +93,6 @@ public SearchShardIterator( assert searchContextKeepAlive == null || searchContextId != null; this.prefiltered = prefiltered; this.skip = skip; - assert skip == false || prefiltered : "only prefiltered shards are skip-able"; } /** diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 69260bcac105c..d63f6dec9bfc7 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -148,6 +148,9 @@ public class TransportSearchAction extends HandledTransportAction buildPerIndexOriginalIndices( for (String index : indices) { if (hasBlocks) { blocks.indexBlockedRaiseException(projectId, ClusterBlockLevel.READ, index); + if (blocks.hasIndexBlock(projectState.projectId(), index, IndexMetadata.INDEX_REFRESH_BLOCK)) { + res.put(index, SKIPPED_INDICES); + continue; + } } String[] aliases = indexNameExpressionResolver.allIndexAliases(projectState.metadata(), index, indicesAndAliases); @@ -589,7 +596,7 @@ public void onFailure(Exception e) {} ); } - static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) { + static void adjustSearchType(SearchRequest searchRequest, boolean oneOrZeroShardsToSearch) { // if there's a kNN search, always use DFS_QUERY_THEN_FETCH if (searchRequest.hasKnnSearch()) { searchRequest.searchType(DFS_QUERY_THEN_FETCH); @@ -604,7 +611,7 @@ static void adjustSearchType(SearchRequest searchRequest, boolean singleShard) { } // optimize search type for cases where there is only one shard group to search on - if (singleShard) { + if (oneOrZeroShardsToSearch) { // if we only have one group, then we always want Q_T_F, no need for DFS, and no need to do THEN since we hit one shard searchRequest.searchType(QUERY_THEN_FETCH); } @@ -1305,7 +1312,7 @@ private void executeSearch( Map concreteIndexBoosts = resolveIndexBoosts(searchRequest, projectState.cluster()); - adjustSearchType(searchRequest, shardIterators.size() == 1); + adjustSearchType(searchRequest, oneOrZeroShardsToSearch(shardIterators)); final DiscoveryNodes nodes = projectState.cluster().nodes(); BiFunction connectionLookup = buildConnectionLookup( @@ -1338,6 +1345,33 @@ private void executeSearch( ); } + /** + * Determines if only one (or zero) search shard iterators will be searched. + * (At this point, iterators may be marked as skipped due to index level blockers). + * We expect skipped iterators to be unlikely, so returning fast after we see more + * than one "not skipped" is an intended optimization. + * + * @param searchShardIterators all the shard iterators derived from indices being searched + * @return true if there are no more than one shard iterators, or if there are no more than + * one not marked to skip + */ + private boolean oneOrZeroShardsToSearch(List searchShardIterators) { + if (searchShardIterators.size() <= 1) { + return true; + } + + int notSkippedCount = 0; + for (SearchShardIterator searchShardIterator : searchShardIterators) { + if (searchShardIterator.skip() == false) { + notSkippedCount++; + if (notSkippedCount > 1) { + return false; + } + } + } + return true; + } + Executor asyncSearchExecutor(final String[] indices) { boolean seenSystem = false; boolean seenCritical = false; @@ -1890,7 +1924,13 @@ List getLocalShardsIterator( final ShardId shardId = shardRouting.shardId(); OriginalIndices finalIndices = originalIndices.get(shardId.getIndex().getName()); assert finalIndices != null; - list[i++] = new SearchShardIterator(clusterAlias, shardId, shardRouting.getShardRoutings(), finalIndices); + list[i++] = new SearchShardIterator( + clusterAlias, + shardId, + shardRouting.getShardRoutings(), + finalIndices, + finalIndices == SKIPPED_INDICES + ); } // the returned list must support in-place sorting, so this is the most memory efficient we can do here return Arrays.asList(list); diff --git a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java index abe7e893977f4..b4a3cf3e61668 100644 --- a/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/AbstractSearchAsyncActionTests.java @@ -82,7 +82,7 @@ private AbstractSearchAsyncAction createAction( null, request, listener, - Collections.singletonList(new SearchShardIterator(null, new ShardId("index", "_na", 0), Collections.emptyList(), null)), + Collections.singletonList(new SearchShardIterator(null, new ShardId("index", "_na", 0), Collections.emptyList(), null, false)), timeProvider, ClusterState.EMPTY_STATE, null, @@ -153,7 +153,8 @@ public void testBuildShardSearchTransportRequest() { clusterAlias, new ShardId(new Index("name", "foo"), 1), Collections.emptyList(), - new OriginalIndices(new String[] { "name", "name1" }, IndicesOptions.strictExpand()) + new OriginalIndices(new String[] { "name", "name1" }, IndicesOptions.strictExpand()), + false ); ShardSearchRequest shardSearchTransportRequest = action.buildShardSearchRequest(iterator, 10); assertEquals(IndicesOptions.strictExpand(), shardSearchTransportRequest.indicesOptions()); diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java index afd3bee4c4ab8..03d2a69c8b1f7 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchAsyncActionTests.java @@ -640,7 +640,8 @@ public void testSkipUnavailableSearchShards() throws InterruptedException { null, new ShardId(index, 0), Collections.emptyList(), - originalIndices + originalIndices, + false ); // Skip all the shards searchShardIterator.skip(true); @@ -760,7 +761,7 @@ static List getShardsIter( } Collections.shuffle(started, random()); started.addAll(initializing); - list.add(new SearchShardIterator(null, new ShardId(index, i), started, originalIndices)); + list.add(new SearchShardIterator(null, new ShardId(index, i), started, originalIndices, false)); } return list; } diff --git a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java index 79736427f634d..f16347b09d147 100644 --- a/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/SearchShardIteratorTests.java @@ -45,7 +45,13 @@ private static List randomShardRoutings(ShardId shardId, int numRe public void testShardId() { ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomInt()); - SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), OriginalIndices.NONE); + SearchShardIterator searchShardIterator = new SearchShardIterator( + null, + shardId, + Collections.emptyList(), + OriginalIndices.NONE, + false + ); assertSame(shardId, searchShardIterator.shardId()); } @@ -55,7 +61,7 @@ public void testGetOriginalIndices() { new String[] { randomAlphaOfLengthBetween(3, 10) }, IndicesOptions.fromOptions(randomBoolean(), randomBoolean(), randomBoolean(), randomBoolean()) ); - SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), originalIndices); + SearchShardIterator searchShardIterator = new SearchShardIterator(null, shardId, Collections.emptyList(), originalIndices, false); assertSame(originalIndices, searchShardIterator.getOriginalIndices()); } @@ -66,7 +72,8 @@ public void testGetClusterAlias() { clusterAlias, shardId, Collections.emptyList(), - OriginalIndices.NONE + OriginalIndices.NONE, + false ); assertEquals(clusterAlias, searchShardIterator.getClusterAlias()); } @@ -164,7 +171,13 @@ public void testCompareTo() { for (String uuid : uuids) { ShardId shardId = new ShardId(index, uuid, i); shardIterators.add( - new SearchShardIterator(null, shardId, randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices()) + new SearchShardIterator( + null, + shardId, + randomShardRoutings(shardId), + OriginalIndicesTests.randomOriginalIndices(), + false + ) ); for (String cluster : clusters) { shardIterators.add( @@ -172,7 +185,8 @@ public void testCompareTo() { cluster, shardId, randomShardRoutings(shardId), - OriginalIndicesTests.randomOriginalIndices() + OriginalIndicesTests.randomOriginalIndices(), + false ) ); } @@ -217,6 +231,12 @@ public void testCompareToEqualItems() { private static SearchShardIterator randomSearchShardIterator() { String clusterAlias = randomBoolean() ? null : randomAlphaOfLengthBetween(5, 10); ShardId shardId = new ShardId(randomAlphaOfLengthBetween(5, 10), randomAlphaOfLength(10), randomIntBetween(0, Integer.MAX_VALUE)); - return new SearchShardIterator(clusterAlias, shardId, randomShardRoutings(shardId), OriginalIndicesTests.randomOriginalIndices()); + return new SearchShardIterator( + clusterAlias, + shardId, + randomShardRoutings(shardId), + OriginalIndicesTests.randomOriginalIndices(), + false + ); } } diff --git a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java index 4346351c1576c..27381009159d3 100644 --- a/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java +++ b/server/src/test/java/org/elasticsearch/action/search/TransportSearchActionTests.java @@ -41,6 +41,8 @@ import org.elasticsearch.cluster.node.VersionInformation; import org.elasticsearch.cluster.project.TestProjectResolvers; import org.elasticsearch.cluster.routing.IndexRoutingTable; +import org.elasticsearch.cluster.routing.OperationRouting; +import org.elasticsearch.cluster.routing.ShardIterator; import org.elasticsearch.cluster.routing.ShardRouting; import org.elasticsearch.cluster.routing.ShardRoutingState; import org.elasticsearch.cluster.routing.TestShardRouting; @@ -133,6 +135,8 @@ import static org.hamcrest.Matchers.hasSize; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyBoolean; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -154,7 +158,7 @@ private static SearchShardIterator createSearchShardIterator( ) { ShardId shardId = new ShardId(index, id); List shardRoutings = SearchShardIteratorTests.randomShardRoutings(shardId); - return new SearchShardIterator(clusterAlias, shardId, shardRoutings, originalIndices); + return new SearchShardIterator(clusterAlias, shardId, shardRoutings, originalIndices, false); } private static ResolvedIndices createMockResolvedIndices( @@ -1812,4 +1816,104 @@ public void onFailure(Exception ex) { assertTrue(ESTestCase.terminate(threadPool)); } } + + public void testSkippedIteratorsForIndicesWithRefreshBlock() { + final ProjectId projectId = randomProjectIdOrDefault(); + + String normalIndexName = "test-normal"; + String blockedIndexName = "test-blocked"; + final String[] indexNames = { normalIndexName, blockedIndexName }; + final Index normalIndex = new Index(normalIndexName, UUIDs.randomBase64UUID()); + final Index blockedIndex = new Index(blockedIndexName, UUIDs.randomBase64UUID()); + final int numberOfShards = randomIntBetween(1, 3); + final int numberOfReplicas = randomIntBetween(0, 1); + final int totalShards = numberOfShards + numberOfShards * numberOfReplicas; + + ClusterState clusterState = ClusterStateCreationUtils.stateWithAssignedPrimariesAndReplicas( + projectId, + indexNames, + numberOfShards, + numberOfReplicas + ); + ClusterBlocks.Builder blocksBuilder = ClusterBlocks.builder().blocks(clusterState.blocks()); + blocksBuilder.addIndexBlock(projectId, "test-blocked", IndexMetadata.INDEX_REFRESH_BLOCK); + clusterState = ClusterState.builder(clusterState).blocks(blocksBuilder).build(); + List shardIts = new ArrayList<>(); + for (int i = 0; i < totalShards; i++) { + shardIts.add(new ShardIterator(new ShardId(normalIndex, randomInt()), Collections.emptyList())); + shardIts.add(new ShardIterator(new ShardId(blockedIndex, randomInt()), Collections.emptyList())); + } + final OperationRouting operationRouting = mock(OperationRouting.class); + when( + operationRouting.searchShards( + eq(clusterState.projectState(projectId)), + eq(indexNames), + any(), + nullable(String.class), + any(), + any() + ) + ).thenReturn(shardIts); + ClusterService clusterService = mock(ClusterService.class); + when(clusterService.state()).thenReturn(clusterState); + when(clusterService.getSettings()).thenReturn(Settings.EMPTY); + when(clusterService.operationRouting()).thenReturn(operationRouting); + + Settings settings = Settings.builder().put("node.name", TransportSearchAction.class.getSimpleName()).build(); + TransportVersion transportVersion = TransportVersionUtils.getNextVersion(TransportVersions.MINIMUM_CCS_VERSION, true); + ThreadPool threadPool = new ThreadPool(settings, MeterRegistry.NOOP, new DefaultBuiltInExecutorBuilders()); + try { + TransportService transportService = MockTransportService.createNewService( + Settings.EMPTY, + VersionInformation.CURRENT, + transportVersion, + threadPool + ); + NodeClient client = new NodeClient(settings, threadPool, TestProjectResolvers.alwaysThrow()); + SearchService searchService = mock(SearchService.class); + when(searchService.getRewriteContext(any(), any(), any(), anyBoolean())).thenReturn( + new QueryRewriteContext(null, null, null, null, null, null) + ); + + TransportSearchAction transportSearchAction = new TransportSearchAction( + threadPool, + new NoneCircuitBreakerService(), + transportService, + searchService, + null, + new SearchTransportService(transportService, client, null), + null, + clusterService, + new ActionFilters(Collections.emptySet()), + TestProjectResolvers.usingRequestHeader(threadPool.getThreadContext()), + TestIndexNameExpressionResolver.newInstance(threadPool.getThreadContext()), + null, + null, + new SearchResponseMetrics(TelemetryProvider.NOOP.getMeterRegistry()), + client, + new UsageService() + ); + + SearchRequest searchRequest = new SearchRequest(indexNames); + searchRequest.allowPartialSearchResults(true); + List searchShardIts = transportSearchAction.getLocalShardsIterator( + clusterState.projectState(projectId), + searchRequest, + searchRequest.getLocalClusterAlias(), + new HashSet<>(), + indexNames + ); + + assertThat(searchShardIts.size(), equalTo(shardIts.size())); + for (SearchShardIterator searchShardIt : searchShardIts) { + if (searchShardIt.skip()) { + assertThat(searchShardIt.shardId().getIndexName(), equalTo("test-blocked")); + } else { + assertThat(searchShardIt.shardId().getIndexName(), equalTo("test-normal")); + } + } + } finally { + assertTrue(ESTestCase.terminate(threadPool)); + } + } }