diff --git a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java index 7b24388d7cf58..aeb44696e5134 100644 --- a/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -22,12 +22,13 @@ import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.elasticsearch.client.internal.node.NodeClient; import org.elasticsearch.cluster.ClusterState; +import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.project.ProjectResolver; -import org.elasticsearch.cluster.routing.IndexRoutingTable; -import org.elasticsearch.cluster.routing.RoutingTable; +import org.elasticsearch.cluster.routing.IndexShardRoutingTable; +import org.elasticsearch.cluster.routing.OperationRouting; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.io.stream.Writeable; import org.elasticsearch.common.util.concurrent.EsExecutors; @@ -39,6 +40,7 @@ import java.util.ArrayList; import java.util.Arrays; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -98,8 +100,9 @@ public void accept(ActionListener listener) { assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice"; final ClusterState clusterState = clusterService.state(); - final ProjectMetadata project = projectResolver.getProjectMetadata(clusterState); - final List shards = shards(request, project, clusterState.routingTable(project.id())); + final ProjectState projectState = projectResolver.getProjectState(clusterState); + final ProjectMetadata project = projectState.metadata(); + final List shards = shards(request, projectState); final Map indexMetadataByName = project.indices(); try (var refs = new RefCountingRunnable(() -> finish(listener))) { @@ -185,17 +188,17 @@ protected void shardExecute(Task task, Request request, ShardId shardId, ActionL /** * @return all shard ids the request should run on */ - protected List shards(Request request, ProjectMetadata project, RoutingTable indexRoutingTables) { + protected List shards(Request request, ProjectState projectState) { assert Transports.assertNotTransportThread("may hit all the shards"); List shardIds = new ArrayList<>(); - String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request); + + OperationRouting operationRouting = clusterService.operationRouting(); + + String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request); for (String index : concreteIndices) { - IndexMetadata indexMetadata = project.indices().get(index); - if (indexMetadata != null) { - final IndexRoutingTable indexRoutingTable = indexRoutingTables.indicesRouting().get(index); - for (int i = 0; i < indexRoutingTable.size(); i++) { - shardIds.add(indexRoutingTable.shard(i).shardId()); - } + Iterator iterator = operationRouting.allWritableShards(projectState, index); + while (iterator.hasNext()) { + shardIds.add(iterator.next().shardId()); } } return shardIds; diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java index 050181802af8d..1e34128ac4f8d 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java @@ -159,11 +159,18 @@ public void checkIndexSplitAllowed() {} * @param shardId shardId to which the current document is routed based on hashing * @return Updated shardId */ - protected final int rerouteIfResharding(int shardId) { + protected final int rerouteWritesIfResharding(int shardId) { + return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF); + } + + protected final int rerouteSearchIfResharding(int shardId) { + return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.SPLIT); + } + + private int rerouteFromSplitTargetShard(int shardId, IndexReshardingState.Split.TargetShardState minimumRequiredState) { + assert indexReshardingMetadata == null || indexReshardingMetadata.isSplit() : "Index resharding state is not a split"; if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) { - assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split"; - if (indexReshardingMetadata.getSplit() - .targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) { + if (indexReshardingMetadata.getSplit().targetStateAtLeast(shardId, minimumRequiredState) == false) { return indexReshardingMetadata.getSplit().sourceShard(shardId); } } @@ -217,21 +224,21 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy } checkRoutingRequired(id, routing); int shardId = shardId(id, routing); - return rerouteIfResharding(shardId); + return rerouteWritesIfResharding(shardId); } @Override public int updateShard(String id, @Nullable String routing) { checkRoutingRequired(id, routing); int shardId = shardId(id, routing); - return rerouteIfResharding(shardId); + return rerouteWritesIfResharding(shardId); } @Override public int deleteShard(String id, @Nullable String routing) { checkRoutingRequired(id, routing); int shardId = shardId(id, routing); - return rerouteIfResharding(shardId); + return rerouteWritesIfResharding(shardId); } @Override @@ -262,7 +269,7 @@ protected int shardId(String id, @Nullable String routing) { @Override public void collectSearchShards(String routing, IntConsumer consumer) { - consumer.accept(hashToShardId(effectiveRoutingToHash(routing))); + consumer.accept(rerouteSearchIfResharding(hashToShardId(effectiveRoutingToHash(routing)))); } } @@ -290,7 +297,7 @@ protected int shardId(String id, @Nullable String routing) { public void collectSearchShards(String routing, IntConsumer consumer) { int hash = effectiveRoutingToHash(routing); for (int i = 0; i < routingPartitionSize; i++) { - consumer.accept(hashToShardId(hash + i)); + consumer.accept(rerouteSearchIfResharding(hashToShardId(hash + i))); } } } @@ -339,7 +346,7 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy checkNoRouting(routing); hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty); int shardId = hashToShardId(hash); - return (rerouteIfResharding(shardId)); + return (rerouteWritesIfResharding(shardId)); } public String createId(XContentType sourceType, BytesReference source, byte[] suffix) { @@ -480,14 +487,14 @@ public int updateShard(String id, @Nullable String routing) { public int deleteShard(String id, @Nullable String routing) { checkNoRouting(routing); int shardId = idToHash(id); - return (rerouteIfResharding(shardId)); + return (rerouteWritesIfResharding(shardId)); } @Override public int getShard(String id, @Nullable String routing) { checkNoRouting(routing); int shardId = idToHash(id); - return (rerouteIfResharding(shardId)); + return (rerouteWritesIfResharding(shardId)); } private void checkNoRouting(@Nullable String routing) { diff --git a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java index be4e4bdc94878..39fc76de9f629 100644 --- a/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java +++ b/server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java @@ -11,6 +11,8 @@ import org.elasticsearch.cluster.ProjectState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNodes; import org.elasticsearch.common.Strings; @@ -26,6 +28,7 @@ import java.util.Arrays; import java.util.Collections; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -112,6 +115,10 @@ public List searchShards( return res; } + public Iterator allWritableShards(ProjectState projectState, String index) { + return allWriteAddressableShards(projectState, index); + } + public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId) { final IndexShardRoutingTable shard = routingTable.shardRoutingTable(shardId); return shard.activeInitializingShardsRandomIt(); @@ -125,7 +132,7 @@ private static Set computeTargetedShards( // we use set here and not list since we might get duplicates final Set set = new HashSet<>(); if (routing == null || routing.isEmpty()) { - collectTargetShardsNoRouting(projectState.routingTable(), concreteIndices, set); + collectTargetShardsNoRouting(projectState, concreteIndices, set); } else { collectTargetShardsWithRouting(projectState, concreteIndices, routing, set); } @@ -147,20 +154,64 @@ private static void collectTargetShardsWithRouting( indexRouting.collectSearchShards(r, s -> set.add(RoutingTable.shardRoutingTable(indexRoutingTable, s))); } } else { - for (int i = 0; i < indexRoutingTable.size(); i++) { - set.add(indexRoutingTable.shard(i)); + Iterator iterator = allSearchAddressableShards(projectState, index); + while (iterator.hasNext()) { + set.add(iterator.next()); } } } } - private static void collectTargetShardsNoRouting(RoutingTable routingTable, String[] concreteIndices, Set set) { + private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set set) { for (String index : concreteIndices) { - final IndexRoutingTable indexRoutingTable = indexRoutingTable(routingTable, index); - for (int i = 0; i < indexRoutingTable.size(); i++) { - set.add(indexRoutingTable.shard(i)); + Iterator iterator = allSearchAddressableShards(projectState, index); + while (iterator.hasNext()) { + set.add(iterator.next()); + } + } + } + + /** + * Returns an iterator of shards that can possibly serve searches. A shard may not be addressable during processes like resharding. + * This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned. + */ + private static Iterator allSearchAddressableShards(ProjectState projectState, String index) { + return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT); + } + + /** + * Returns an iterator of shards that can possibly serve writes. A shard may not be addressable during processes like resharding. + * This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned. + */ + private static Iterator allWriteAddressableShards(ProjectState projectState, String index) { + return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF); + } + + /** + * Filters shards based on their state in resharding metadata. If resharing metadata is not present returns all shards. + */ + private static Iterator allShardsExceptSplitTargetsInStateBefore( + ProjectState projectState, + String index, + IndexReshardingState.Split.TargetShardState targetShardState + ) { + final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index); + final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index); + if (indexMetadata.getReshardingMetadata() == null) { + return indexRoutingTable.allShards().iterator(); + } + + final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata(); + assert indexReshardingMetadata.isSplit(); + final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit(); + + var shards = new ArrayList(); + for (int i = 0; i < indexRoutingTable.size(); i++) { + if (splitState.isTargetShard(i) == false || splitState.targetStateAtLeast(i, targetShardState)) { + shards.add(indexRoutingTable.shard(i)); } } + return shards.iterator(); } private ShardIterator preferenceActiveShardIterator( diff --git a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java index c1567d72d059a..15b66ed32dbad 100644 --- a/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java +++ b/server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java @@ -248,8 +248,7 @@ public void testShardsList() throws InterruptedException, ExecutionException { logger.debug("--> using initial state:\n{}", clusterService.state()); List shards = broadcastReplicationAction.shards( new DummyBroadcastRequest().indices(shardId.getIndexName()), - clusterState.metadata().getProject(projectId), - clusterState.routingTable(projectId) + clusterState.projectState(projectId) ); assertThat(shards.size(), equalTo(1)); assertThat(shards.get(0), equalTo(shardId)); diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java index 1db6192eee80e..570a4ac1a7c3b 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java @@ -14,6 +14,8 @@ import org.elasticsearch.action.RoutingMissingException; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; import org.elasticsearch.common.bytes.BytesReference; import org.elasticsearch.core.Nullable; import org.elasticsearch.index.IndexMode; @@ -30,6 +32,7 @@ import org.elasticsearch.xcontent.support.MapXContentParser; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; @@ -38,6 +41,7 @@ import java.util.Set; import java.util.TreeMap; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Function; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -687,6 +691,191 @@ public void testRoutingPathLogsdb() throws IOException { assertEquals(expectedShard, routing.getShard(req.id(), null)); } + public void testCollectSearchShardsUnpartitionedWithResharding() throws IOException { + int shards = 1; + int newShardCount = 2; + var initialRouting = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), 2, 1)) + .numberOfShards(newShardCount) + .numberOfReplicas(1) + .build() + ); + + var shardToRouting = new HashMap(); + do { + var routing = randomAlphaOfLength(5); + var shard = initialRouting.indexShard("dummy", routing, null, null); + if (shardToRouting.containsKey(shard) == false) { + shardToRouting.put(shard, routing); + } + } while (shardToRouting.size() < newShardCount); + + var initialReshardingRouting = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), newShardCount, 1)) + .numberOfShards(newShardCount) + .numberOfReplicas(1) + .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(shards, 2)) + .build() + ); + + for (var shardAndRouting : shardToRouting.entrySet()) { + var collectedShards = new ArrayList<>(); + initialReshardingRouting.collectSearchShards(shardAndRouting.getValue(), collectedShards::add); + assertEquals(1, collectedShards.size()); + // Rerouting is in effect due to resharding metadata having a shard in CLONE state. + assertEquals(0, collectedShards.get(0)); + } + + var reshardingRoutingWithShardInHandoff = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), newShardCount, 1)) + .numberOfShards(newShardCount) + .numberOfReplicas(1) + .reshardingMetadata( + IndexReshardingMetadata.newSplitByMultiple(shards, 2) + .transitionSplitTargetToNewState(new ShardId("test", "na", 1), IndexReshardingState.Split.TargetShardState.HANDOFF) + ) + .build() + ); + + for (var shardAndRouting : shardToRouting.entrySet()) { + var collectedShards = new ArrayList<>(); + reshardingRoutingWithShardInHandoff.collectSearchShards(shardAndRouting.getValue(), collectedShards::add); + assertEquals(1, collectedShards.size()); + // Rerouting is in effect due to resharding metadata having a shard in CLONE state. + assertEquals(0, collectedShards.get(0)); + } + + var reshardingRoutingWithShardInSplit = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), newShardCount, 1)) + .numberOfShards(newShardCount) + .numberOfReplicas(1) + .reshardingMetadata( + IndexReshardingMetadata.newSplitByMultiple(shards, 2) + .transitionSplitTargetToNewState(new ShardId("test", "na", 1), IndexReshardingState.Split.TargetShardState.HANDOFF) + .transitionSplitTargetToNewState(new ShardId("test", "na", 1), IndexReshardingState.Split.TargetShardState.SPLIT) + ) + .build() + ); + + for (var shardAndRouting : shardToRouting.entrySet()) { + var collectedShards = new ArrayList<>(); + reshardingRoutingWithShardInSplit.collectSearchShards(shardAndRouting.getValue(), collectedShards::add); + assertEquals(1, collectedShards.size()); + // Rerouting is no longer in effect since resharding metadata has SPLIT state for this shard + assertEquals(shardAndRouting.getKey(), collectedShards.get(0)); + } + } + + public void testCollectSearchShardsPartitionedWithResharding() throws IOException { + int preReshardShards = 4; + int postReshardShards = 8; + var initialRouting = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), 8, 1)) + .numberOfShards(postReshardShards) + .numberOfReplicas(1) + .routingPartitionSize(2) + .build() + ); + + var shardToRouting = new TreeMap(); + do { + var routing = randomAlphaOfLength(5); + var shard = initialRouting.indexShard("dummy", routing, null, null); + if (shardToRouting.containsKey(shard) == false) { + shardToRouting.put(shard, routing); + } + } while (shardToRouting.size() < postReshardShards); + + var initialReshardingRouting = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), postReshardShards, 1)) + .numberOfShards(postReshardShards) + .numberOfReplicas(1) + .routingPartitionSize(2) + .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2)) + .build() + ); + + // Rerouting is in effect due to presence of resharding metadata. + // We won't see shard 4 and above (there is a corresponding logic in index operation routing). + Function adjustForResharding = i -> i < preReshardShards + ? i + : IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2).getSplit().sourceShard(i); + + for (var shardAndRouting : shardToRouting.entrySet()) { + var collectedShards = new ArrayList(); + initialReshardingRouting.collectSearchShards(shardAndRouting.getValue(), collectedShards::add); + assertEquals(2, collectedShards.size()); + + var expected = new ArrayList(); + expected.add(adjustForResharding.apply(shardAndRouting.getKey())); + expected.add(adjustForResharding.apply(shardAndRouting.getKey() + 1)); + + assertEquals(expected, collectedShards); + } + + var reshardingRoutingWithShardInHandoff = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), postReshardShards, 1)) + .numberOfShards(postReshardShards) + .numberOfReplicas(1) + .routingPartitionSize(2) + .reshardingMetadata( + IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2) + .transitionSplitTargetToNewState(new ShardId("test", "na", 4), IndexReshardingState.Split.TargetShardState.HANDOFF) + ) + .build() + ); + + // Rerouting is in effect due to presence of resharding metadata. + // We won't see shard 4 and above (there is a corresponding logic in index operation routing). + for (var shardAndRouting : shardToRouting.entrySet()) { + var collectedShards = new ArrayList(); + reshardingRoutingWithShardInHandoff.collectSearchShards(shardAndRouting.getValue(), collectedShards::add); + assertEquals(2, collectedShards.size()); + + var expected = new ArrayList(); + expected.add(adjustForResharding.apply(shardAndRouting.getKey())); + expected.add(adjustForResharding.apply(shardAndRouting.getKey() + 1)); + + assertEquals(expected, collectedShards); + } + + var reshardingRoutingWithShardInSplit = IndexRouting.fromIndexMetadata( + IndexMetadata.builder("test") + .settings(indexSettings(IndexVersion.current(), postReshardShards, 1)) + .numberOfShards(postReshardShards) + .numberOfReplicas(1) + .routingPartitionSize(2) + .reshardingMetadata( + IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2) + .transitionSplitTargetToNewState(new ShardId("test", "na", 4), IndexReshardingState.Split.TargetShardState.HANDOFF) + .transitionSplitTargetToNewState(new ShardId("test", "na", 4), IndexReshardingState.Split.TargetShardState.SPLIT) + ) + .build() + ); + + // Shard 4 can now be included in routing too based on the resharding metadata, adjust the rule. + adjustForResharding = i -> i < 5 ? i : IndexReshardingMetadata.newSplitByMultiple(preReshardShards, 2).getSplit().sourceShard(i); + + for (var shardAndRouting : shardToRouting.entrySet()) { + var collectedShards = new ArrayList(); + reshardingRoutingWithShardInSplit.collectSearchShards(shardAndRouting.getValue(), collectedShards::add); + assertEquals(2, collectedShards.size()); + + var expected = new ArrayList(); + expected.add(adjustForResharding.apply(shardAndRouting.getKey())); + expected.add(adjustForResharding.apply(shardAndRouting.getKey() + 1)); + + assertEquals(expected, collectedShards); + } + } + /** * Extract a shardId from an {@link IndexRouting} that extracts routingusing a randomly * chosen method. All of the random methods should return the diff --git a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java index 2f90a72ce12cb..679fd3f0e9fb2 100644 --- a/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java +++ b/server/src/test/java/org/elasticsearch/cluster/routing/OperationRoutingTests.java @@ -9,9 +9,15 @@ package org.elasticsearch.cluster.routing; import org.elasticsearch.action.support.replication.ClusterStateCreationUtils; +import org.elasticsearch.cluster.ClusterName; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.ProjectState; +import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingMetadata; +import org.elasticsearch.cluster.metadata.IndexReshardingState; +import org.elasticsearch.cluster.metadata.Metadata; import org.elasticsearch.cluster.metadata.ProjectId; +import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.Settings; @@ -19,6 +25,7 @@ import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.TimeValue; import org.elasticsearch.index.Index; +import org.elasticsearch.index.IndexVersion; import org.elasticsearch.index.shard.ShardId; import org.elasticsearch.node.ResponseCollectorService; import org.elasticsearch.test.ClusterServiceUtils; @@ -468,4 +475,174 @@ public void testARSOutstandingRequestTracking() throws Exception { terminate(threadPool); } + public void testOperationRoutingWithResharding() throws IOException { + var threadPool = new TestThreadPool("testOperationRoutingWithResharding"); + var clusterService = ClusterServiceUtils.createClusterService(threadPool); + + final ProjectId projectId = randomProjectIdOrDefault(); + final String indexName = "test"; + final int shardCount = 1; + final int newShardCount = randomIntBetween(2, 5); + + var indexMetadata = IndexMetadata.builder(indexName) + .settings(indexSettings(IndexVersion.current(), newShardCount, 1)) + .numberOfShards(newShardCount) + .numberOfReplicas(1) + .reshardingMetadata(IndexReshardingMetadata.newSplitByMultiple(shardCount, newShardCount)) + .build(); + + ClusterState.Builder initialStateBuilder = ClusterState.builder(new ClusterName("test")); + initialStateBuilder.metadata( + Metadata.builder().put(ProjectMetadata.builder(projectId).put(indexMetadata, false)).generateClusterUuidIfNeeded() + ); + IndexRoutingTable.Builder indexRoutingTableBuilder = IndexRoutingTable.builder(indexMetadata.getIndex()); + for (int i = 0; i < newShardCount; i++) { + final ShardId shardId = new ShardId(indexName, "_na_", i); + IndexShardRoutingTable.Builder indexShardRoutingBuilder = IndexShardRoutingTable.builder(shardId); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, i, "node0", null, true, ShardRoutingState.STARTED) + ); + indexShardRoutingBuilder.addShard( + TestShardRouting.newShardRouting(indexName, i, "node1", null, false, ShardRoutingState.STARTED) + ); + indexRoutingTableBuilder.addIndexShard(indexShardRoutingBuilder); + } + initialStateBuilder.routingTable( + GlobalRoutingTable.builder().put(projectId, RoutingTable.builder().add(indexRoutingTableBuilder.build())).build() + ); + + var initialState = initialStateBuilder.build(); + ClusterServiceUtils.setState(clusterService, initialState); + + var initialSearchShards = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); + assertEquals(shardCount, initialSearchShards.size()); + assertEquals(0, initialSearchShards.get(0).shardId().id()); + + // We are testing a case when there is routing configuration but not for the index in question. + // Actual routing behavior is tested in IndexRoutingTests. + var initialSearchShardsWithRouting = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); + assertEquals(shardCount, initialSearchShardsWithRouting.size()); + assertEquals(0, initialSearchShardsWithRouting.get(0).shardId().id()); + + var initialWriteableShards = clusterService.operationRouting() + .allWritableShards(clusterService.state().projectState(projectId), indexName); + assertEquals(0, initialWriteableShards.next().shardId().id()); + assertFalse(initialWriteableShards.hasNext()); + + final Index index = clusterService.state().metadata().getProject(projectId).index(indexName).getIndex(); + + var shardChangingSplitTargetState = randomIntBetween(1, newShardCount - 1); + + var currentIndexMetadata = clusterService.state().projectState(projectId).metadata().index(indexName); + var updatedReshardingMetadataOneShardInHandoff = IndexMetadata.builder(currentIndexMetadata) + .reshardingMetadata( + currentIndexMetadata.getReshardingMetadata() + .transitionSplitTargetToNewState( + new ShardId(index, shardChangingSplitTargetState), + IndexReshardingState.Split.TargetShardState.HANDOFF + ) + ) + .build(); + var newState = ClusterState.builder(initialState) + .putProjectMetadata( + ProjectMetadata.builder(initialState.projectState(projectId).metadata()) + .put(updatedReshardingMetadataOneShardInHandoff, true) + .build() + ); + ClusterServiceUtils.setState(clusterService, newState); + + var searchShardsWithOneShardHandoff = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); + assertEquals(shardCount, searchShardsWithOneShardHandoff.size()); + assertEquals(0, searchShardsWithOneShardHandoff.get(0).shardId().id()); + + var searchShardsWithOneShardHandoffAndRouting = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); + assertEquals(shardCount, searchShardsWithOneShardHandoffAndRouting.size()); + assertEquals(0, searchShardsWithOneShardHandoffAndRouting.get(0).shardId().id()); + + var writeableShardsWithOneShardHandoff = clusterService.operationRouting() + .allWritableShards(clusterService.state().projectState(projectId), indexName); + assertEquals(0, writeableShardsWithOneShardHandoff.next().shardId().id()); + assertEquals(shardChangingSplitTargetState, writeableShardsWithOneShardHandoff.next().shardId().id()); + assertFalse(writeableShardsWithOneShardHandoff.hasNext()); + + currentIndexMetadata = clusterService.state().projectState(projectId).metadata().index(indexName); + var updatedReshardingMetadataOneShardInSplit = IndexMetadata.builder(currentIndexMetadata) + .reshardingMetadata( + currentIndexMetadata.getReshardingMetadata() + .transitionSplitTargetToNewState( + new ShardId(index, shardChangingSplitTargetState), + IndexReshardingState.Split.TargetShardState.SPLIT + ) + ) + .build(); + newState = ClusterState.builder(initialState) + .putProjectMetadata( + ProjectMetadata.builder(initialState.projectState(projectId).metadata()) + .put(updatedReshardingMetadataOneShardInSplit, true) + .build() + ); + ClusterServiceUtils.setState(clusterService, newState); + + var searchShardsWithOneShardSplit = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); + assertEquals(shardCount + 1, searchShardsWithOneShardSplit.size()); + assertEquals(0, searchShardsWithOneShardSplit.get(0).shardId().id()); + assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardSplit.get(1).shardId().id()); + + var searchShardsWithOneShardSplitAndRouting = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); + assertEquals(shardCount + 1, searchShardsWithOneShardSplitAndRouting.size()); + assertEquals(0, searchShardsWithOneShardSplitAndRouting.get(0).shardId().id()); + assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardSplitAndRouting.get(1).shardId().id()); + + var writeableShardsWithOneShardSplit = clusterService.operationRouting() + .allWritableShards(clusterService.state().projectState(projectId), indexName); + assertEquals(0, writeableShardsWithOneShardSplit.next().shardId().id()); + assertEquals(shardChangingSplitTargetState, writeableShardsWithOneShardSplit.next().shardId().id()); + assertFalse(writeableShardsWithOneShardSplit.hasNext()); + + currentIndexMetadata = clusterService.state().projectState(projectId).metadata().index(indexName); + var updatedReshardingMetadataOneShardInDone = IndexMetadata.builder(currentIndexMetadata) + .reshardingMetadata( + currentIndexMetadata.getReshardingMetadata() + .transitionSplitTargetToNewState( + new ShardId(index, shardChangingSplitTargetState), + IndexReshardingState.Split.TargetShardState.DONE + ) + ) + .build(); + newState = ClusterState.builder(initialState) + .putProjectMetadata( + ProjectMetadata.builder(initialState.projectState(projectId).metadata()) + .put(updatedReshardingMetadataOneShardInDone, true) + .build() + ); + ClusterServiceUtils.setState(clusterService, newState); + + var searchShardsWithOneShardDone = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, null, null); + assertEquals(shardCount + 1, searchShardsWithOneShardDone.size()); + assertEquals(0, searchShardsWithOneShardDone.get(0).shardId().id()); + assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardDone.get(1).shardId().id()); + + var searchShardsWithOneShardDoneAndRouting = clusterService.operationRouting() + .searchShards(clusterService.state().projectState(projectId), new String[] { indexName }, Map.of("other", Set.of("1")), null); + assertEquals(shardCount + 1, searchShardsWithOneShardDoneAndRouting.size()); + assertEquals(0, searchShardsWithOneShardDoneAndRouting.get(0).shardId().id()); + assertEquals(shardChangingSplitTargetState, searchShardsWithOneShardDoneAndRouting.get(1).shardId().id()); + + var writeableShardsWithOneShardDone = clusterService.operationRouting() + .allWritableShards(clusterService.state().projectState(projectId), indexName); + assertEquals(0, writeableShardsWithOneShardDone.next().shardId().id()); + assertEquals(shardChangingSplitTargetState, writeableShardsWithOneShardDone.next().shardId().id()); + assertFalse(writeableShardsWithOneShardDone.hasNext()); + + IOUtils.close(clusterService); + terminate(threadPool); + } + }