Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
import org.elasticsearch.client.internal.node.NodeClient;
import org.elasticsearch.cluster.ClusterState;
import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.project.ProjectResolver;
import org.elasticsearch.cluster.routing.IndexRoutingTable;
import org.elasticsearch.cluster.routing.RoutingTable;
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
import org.elasticsearch.cluster.routing.OperationRouting;
import org.elasticsearch.cluster.service.ClusterService;
import org.elasticsearch.common.io.stream.Writeable;
import org.elasticsearch.common.util.concurrent.EsExecutors;
Expand All @@ -39,6 +40,7 @@

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
Expand Down Expand Up @@ -98,8 +100,9 @@ public void accept(ActionListener<Response> listener) {
assert totalShardCopyCount == 0 && successShardCopyCount == 0 && allFailures.isEmpty() : "shouldn't call this twice";

final ClusterState clusterState = clusterService.state();
final ProjectMetadata project = projectResolver.getProjectMetadata(clusterState);
final List<ShardId> shards = shards(request, project, clusterState.routingTable(project.id()));
final ProjectState projectState = projectResolver.getProjectState(clusterState);
final ProjectMetadata project = projectState.metadata();
final List<ShardId> shards = shards(request, projectState);
final Map<String, IndexMetadata> indexMetadataByName = project.indices();

try (var refs = new RefCountingRunnable(() -> finish(listener))) {
Expand Down Expand Up @@ -185,17 +188,17 @@ protected void shardExecute(Task task, Request request, ShardId shardId, ActionL
/**
* @return all shard ids the request should run on
*/
protected List<ShardId> shards(Request request, ProjectMetadata project, RoutingTable indexRoutingTables) {
protected List<ShardId> shards(Request request, ProjectState projectState) {
assert Transports.assertNotTransportThread("may hit all the shards");
List<ShardId> shardIds = new ArrayList<>();
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);

OperationRouting operationRouting = clusterService.operationRouting();

String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request);
for (String index : concreteIndices) {
IndexMetadata indexMetadata = project.indices().get(index);
if (indexMetadata != null) {
final IndexRoutingTable indexRoutingTable = indexRoutingTables.indicesRouting().get(index);
for (int i = 0; i < indexRoutingTable.size(); i++) {
shardIds.add(indexRoutingTable.shard(i).shardId());
}
Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This handles refresh and flush APIs.

while (iterator.hasNext()) {
shardIds.add(iterator.next().shardId());
}
}
return shardIds;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,11 +159,18 @@ public void checkIndexSplitAllowed() {}
* @param shardId shardId to which the current document is routed based on hashing
* @return Updated shardId
*/
protected final int rerouteIfResharding(int shardId) {
protected final int rerouteWritesIfResharding(int shardId) {
return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF);
}

protected final int rerouteSearchIfResharding(int shardId) {
return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.SPLIT);
}

private int rerouteFromSplitTargetShard(int shardId, IndexReshardingState.Split.TargetShardState minimumRequiredState) {
assert indexReshardingMetadata == null || indexReshardingMetadata.isSplit() : "Index resharding state is not a split";
if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) {
assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split";
if (indexReshardingMetadata.getSplit()
.targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) {
if (indexReshardingMetadata.getSplit().targetStateAtLeast(shardId, minimumRequiredState) == false) {
return indexReshardingMetadata.getSplit().sourceShard(shardId);
}
}
Expand Down Expand Up @@ -217,21 +224,21 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
}
checkRoutingRequired(id, routing);
int shardId = shardId(id, routing);
return rerouteIfResharding(shardId);
return rerouteWritesIfResharding(shardId);
}

@Override
public int updateShard(String id, @Nullable String routing) {
checkRoutingRequired(id, routing);
int shardId = shardId(id, routing);
return rerouteIfResharding(shardId);
return rerouteWritesIfResharding(shardId);
}

@Override
public int deleteShard(String id, @Nullable String routing) {
checkRoutingRequired(id, routing);
int shardId = shardId(id, routing);
return rerouteIfResharding(shardId);
return rerouteWritesIfResharding(shardId);
}

@Override
Expand Down Expand Up @@ -262,7 +269,7 @@ protected int shardId(String id, @Nullable String routing) {

@Override
public void collectSearchShards(String routing, IntConsumer consumer) {
consumer.accept(hashToShardId(effectiveRoutingToHash(routing)));
consumer.accept(rerouteSearchIfResharding(hashToShardId(effectiveRoutingToHash(routing))));
}
}

Expand Down Expand Up @@ -290,7 +297,7 @@ protected int shardId(String id, @Nullable String routing) {
public void collectSearchShards(String routing, IntConsumer consumer) {
int hash = effectiveRoutingToHash(routing);
for (int i = 0; i < routingPartitionSize; i++) {
consumer.accept(hashToShardId(hash + i));
consumer.accept(rerouteSearchIfResharding(hashToShardId(hash + i)));
}
}
}
Expand Down Expand Up @@ -339,7 +346,7 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
checkNoRouting(routing);
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
int shardId = hashToShardId(hash);
return (rerouteIfResharding(shardId));
return (rerouteWritesIfResharding(shardId));
}

public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
Expand Down Expand Up @@ -480,14 +487,14 @@ public int updateShard(String id, @Nullable String routing) {
public int deleteShard(String id, @Nullable String routing) {
checkNoRouting(routing);
int shardId = idToHash(id);
return (rerouteIfResharding(shardId));
return (rerouteWritesIfResharding(shardId));
}

@Override
public int getShard(String id, @Nullable String routing) {
checkNoRouting(routing);
int shardId = idToHash(id);
return (rerouteIfResharding(shardId));
return (rerouteWritesIfResharding(shardId));
}

private void checkNoRouting(@Nullable String routing) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@

import org.elasticsearch.cluster.ProjectState;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
import org.elasticsearch.cluster.metadata.IndexReshardingState;
import org.elasticsearch.cluster.metadata.ProjectMetadata;
import org.elasticsearch.cluster.node.DiscoveryNodes;
import org.elasticsearch.common.Strings;
Expand All @@ -26,6 +28,7 @@
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -112,6 +115,10 @@ public List<ShardIterator> searchShards(
return res;
}

public Iterator<IndexShardRoutingTable> allWritableShards(ProjectState projectState, String index) {
return allWriteAddressableShards(projectState, index);
}

public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId) {
final IndexShardRoutingTable shard = routingTable.shardRoutingTable(shardId);
return shard.activeInitializingShardsRandomIt();
Expand All @@ -125,7 +132,7 @@ private static Set<IndexShardRoutingTable> computeTargetedShards(
// we use set here and not list since we might get duplicates
final Set<IndexShardRoutingTable> set = new HashSet<>();
if (routing == null || routing.isEmpty()) {
collectTargetShardsNoRouting(projectState.routingTable(), concreteIndices, set);
collectTargetShardsNoRouting(projectState, concreteIndices, set);
} else {
collectTargetShardsWithRouting(projectState, concreteIndices, routing, set);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

how is this branch resolving target shards to source shards before split?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is the routing passed in here based on what we got from IndexRouting ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is handled inside IndexRouting#collectSearchShards, yes.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, but what about this codepath in collectTargetShardsWithRouting. When is this called and why does this not have to check for resharding ?

else {
                for (int i = 0; i < indexRoutingTable.size(); i++) {
                    set.add(indexRoutingTable.shard(i));
        }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it should, thanks for catching that.

}
Expand All @@ -147,20 +154,64 @@ private static void collectTargetShardsWithRouting(
indexRouting.collectSearchShards(r, s -> set.add(RoutingTable.shardRoutingTable(indexRoutingTable, s)));
}
} else {
for (int i = 0; i < indexRoutingTable.size(); i++) {
set.add(indexRoutingTable.shard(i));
Iterator<IndexShardRoutingTable> iterator = allSearchAddressableShards(projectState, index);
while (iterator.hasNext()) {
set.add(iterator.next());
}
}
}
}

private static void collectTargetShardsNoRouting(RoutingTable routingTable, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
for (String index : concreteIndices) {
final IndexRoutingTable indexRoutingTable = indexRoutingTable(routingTable, index);
for (int i = 0; i < indexRoutingTable.size(); i++) {
set.add(indexRoutingTable.shard(i));
Iterator<IndexShardRoutingTable> iterator = allSearchAddressableShards(projectState, index);
while (iterator.hasNext()) {
set.add(iterator.next());
}
}
}

/**
* Returns an iterator of shards that can possibly serve searches. A shard may not be addressable during processes like resharding.
* This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned.
*/
private static Iterator<IndexShardRoutingTable> allSearchAddressableShards(ProjectState projectState, String index) {
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT);
}

/**
* Returns an iterator of shards that can possibly serve writes. A shard may not be addressable during processes like resharding.
* This logic is not related to shard state or a recovery process. A shard returned here may f.e. be unassigned.
*/
private static Iterator<IndexShardRoutingTable> allWriteAddressableShards(ProjectState projectState, String index) {
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF);
}

/**
* Filters shards based on their state in resharding metadata. If resharing metadata is not present returns all shards.
*/
private static Iterator<IndexShardRoutingTable> allShardsExceptSplitTargetsInStateBefore(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a comment here so the reader knows it is associated with resharding and not to be confused with the split API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that class level documentation on IndexReshardingMetadata can be fairly easily discovered from this code and it explains what it is.

ProjectState projectState,
String index,
IndexReshardingState.Split.TargetShardState targetShardState
) {
final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index);
final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index);
if (indexMetadata.getReshardingMetadata() == null) {
return indexRoutingTable.allShards().iterator();
}

final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata();
assert indexReshardingMetadata.isSplit();
final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit();

var shards = new ArrayList<IndexShardRoutingTable>();
for (int i = 0; i < indexRoutingTable.size(); i++) {
if (splitState.isTargetShard(i) == false || splitState.targetStateAtLeast(i, targetShardState)) {
shards.add(indexRoutingTable.shard(i));
}
}
return shards.iterator();
}

private ShardIterator preferenceActiveShardIterator(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,8 +248,7 @@ public void testShardsList() throws InterruptedException, ExecutionException {
logger.debug("--> using initial state:\n{}", clusterService.state());
List<ShardId> shards = broadcastReplicationAction.shards(
new DummyBroadcastRequest().indices(shardId.getIndexName()),
clusterState.metadata().getProject(projectId),
clusterState.routingTable(projectId)
clusterState.projectState(projectId)
);
assertThat(shards.size(), equalTo(1));
assertThat(shards.get(0), equalTo(shardId));
Expand Down
Loading