Skip to content

Commit 6c1ee55

Browse files
committed
Do not use split shards for search and refresh if they are not ready
1 parent 75060a9 commit 6c1ee55

File tree

6 files changed

+97
-28
lines changed

6 files changed

+97
-28
lines changed

server/src/main/java/org/elasticsearch/action/support/replication/TransportBroadcastReplicationAction.java

Lines changed: 14 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,12 +22,13 @@
2222
import org.elasticsearch.action.support.broadcast.BroadcastShardOperationFailedException;
2323
import org.elasticsearch.client.internal.node.NodeClient;
2424
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.ProjectState;
2526
import org.elasticsearch.cluster.metadata.IndexMetadata;
2627
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
2728
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2829
import org.elasticsearch.cluster.project.ProjectResolver;
29-
import org.elasticsearch.cluster.routing.IndexRoutingTable;
30-
import org.elasticsearch.cluster.routing.RoutingTable;
30+
import org.elasticsearch.cluster.routing.IndexShardRoutingTable;
31+
import org.elasticsearch.cluster.routing.OperationRouting;
3132
import org.elasticsearch.cluster.service.ClusterService;
3233
import org.elasticsearch.common.io.stream.Writeable;
3334
import org.elasticsearch.common.util.concurrent.EsExecutors;
@@ -39,6 +40,7 @@
3940

4041
import java.util.ArrayList;
4142
import java.util.Arrays;
43+
import java.util.Iterator;
4244
import java.util.List;
4345
import java.util.Map;
4446
import java.util.concurrent.Executor;
@@ -99,7 +101,8 @@ public void accept(ActionListener<Response> listener) {
99101

100102
final ClusterState clusterState = clusterService.state();
101103
final ProjectMetadata project = projectResolver.getProjectMetadata(clusterState);
102-
final List<ShardId> shards = shards(request, project, clusterState.routingTable(project.id()));
104+
final ProjectState projectState = projectResolver.getProjectState(clusterState);
105+
final List<ShardId> shards = shards(request, projectState);
103106
final Map<String, IndexMetadata> indexMetadataByName = project.indices();
104107

105108
try (var refs = new RefCountingRunnable(() -> finish(listener))) {
@@ -185,17 +188,17 @@ protected void shardExecute(Task task, Request request, ShardId shardId, ActionL
185188
/**
186189
* @return all shard ids the request should run on
187190
*/
188-
protected List<ShardId> shards(Request request, ProjectMetadata project, RoutingTable indexRoutingTables) {
191+
protected List<ShardId> shards(Request request, ProjectState projectState) {
189192
assert Transports.assertNotTransportThread("may hit all the shards");
190193
List<ShardId> shardIds = new ArrayList<>();
191-
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(project, request);
194+
195+
OperationRouting operationRouting = clusterService.operationRouting();
196+
197+
String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(projectState.metadata(), request);
192198
for (String index : concreteIndices) {
193-
IndexMetadata indexMetadata = project.indices().get(index);
194-
if (indexMetadata != null) {
195-
final IndexRoutingTable indexRoutingTable = indexRoutingTables.indicesRouting().get(index);
196-
for (int i = 0; i < indexRoutingTable.size(); i++) {
197-
shardIds.add(indexRoutingTable.shard(i).shardId());
198-
}
199+
Iterator<IndexShardRoutingTable> iterator = operationRouting.allWritableShards(projectState, index);
200+
while (iterator.hasNext()) {
201+
shardIds.add(iterator.next().shardId());
199202
}
200203
}
201204
return shardIds;

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -159,11 +159,18 @@ public void checkIndexSplitAllowed() {}
159159
* @param shardId shardId to which the current document is routed based on hashing
160160
* @return Updated shardId
161161
*/
162-
protected final int rerouteIfResharding(int shardId) {
162+
protected final int rerouteWritesIfResharding(int shardId) {
163+
return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF);
164+
}
165+
166+
protected final int rerouteReadsIfResharding(int shardId) {
167+
return rerouteFromSplitTargetShard(shardId, IndexReshardingState.Split.TargetShardState.SPLIT);
168+
}
169+
170+
private int rerouteFromSplitTargetShard(int shardId, IndexReshardingState.Split.TargetShardState minimumRequiredState) {
171+
assert indexReshardingMetadata == null || indexReshardingMetadata.isSplit() : "Index resharding state is not a split";
163172
if (indexReshardingMetadata != null && indexReshardingMetadata.getSplit().isTargetShard(shardId)) {
164-
assert indexReshardingMetadata.isSplit() : "Index resharding state is not a split";
165-
if (indexReshardingMetadata.getSplit()
166-
.targetStateAtLeast(shardId, IndexReshardingState.Split.TargetShardState.HANDOFF) == false) {
173+
if (indexReshardingMetadata.getSplit().targetStateAtLeast(shardId, minimumRequiredState) == false) {
167174
return indexReshardingMetadata.getSplit().sourceShard(shardId);
168175
}
169176
}
@@ -217,21 +224,21 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
217224
}
218225
checkRoutingRequired(id, routing);
219226
int shardId = shardId(id, routing);
220-
return rerouteIfResharding(shardId);
227+
return rerouteWritesIfResharding(shardId);
221228
}
222229

223230
@Override
224231
public int updateShard(String id, @Nullable String routing) {
225232
checkRoutingRequired(id, routing);
226233
int shardId = shardId(id, routing);
227-
return rerouteIfResharding(shardId);
234+
return rerouteWritesIfResharding(shardId);
228235
}
229236

230237
@Override
231238
public int deleteShard(String id, @Nullable String routing) {
232239
checkRoutingRequired(id, routing);
233240
int shardId = shardId(id, routing);
234-
return rerouteIfResharding(shardId);
241+
return rerouteWritesIfResharding(shardId);
235242
}
236243

237244
@Override
@@ -339,7 +346,7 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
339346
checkNoRouting(routing);
340347
hash = hashSource(sourceType, source).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
341348
int shardId = hashToShardId(hash);
342-
return (rerouteIfResharding(shardId));
349+
return (rerouteWritesIfResharding(shardId));
343350
}
344351

345352
public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
@@ -480,14 +487,14 @@ public int updateShard(String id, @Nullable String routing) {
480487
public int deleteShard(String id, @Nullable String routing) {
481488
checkNoRouting(routing);
482489
int shardId = idToHash(id);
483-
return (rerouteIfResharding(shardId));
490+
return (rerouteWritesIfResharding(shardId));
484491
}
485492

486493
@Override
487494
public int getShard(String id, @Nullable String routing) {
488495
checkNoRouting(routing);
489496
int shardId = idToHash(id);
490-
return (rerouteIfResharding(shardId));
497+
return (rerouteWritesIfResharding(shardId));
491498
}
492499

493500
private void checkNoRouting(@Nullable String routing) {

server/src/main/java/org/elasticsearch/cluster/routing/OperationRouting.java

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111

1212
import org.elasticsearch.cluster.ProjectState;
1313
import org.elasticsearch.cluster.metadata.IndexMetadata;
14+
import org.elasticsearch.cluster.metadata.IndexReshardingMetadata;
15+
import org.elasticsearch.cluster.metadata.IndexReshardingState;
1416
import org.elasticsearch.cluster.metadata.ProjectMetadata;
1517
import org.elasticsearch.cluster.node.DiscoveryNodes;
1618
import org.elasticsearch.common.Strings;
@@ -26,6 +28,7 @@
2628
import java.util.Arrays;
2729
import java.util.Collections;
2830
import java.util.HashSet;
31+
import java.util.Iterator;
2932
import java.util.List;
3033
import java.util.Map;
3134
import java.util.Set;
@@ -112,6 +115,10 @@ public List<ShardIterator> searchShards(
112115
return res;
113116
}
114117

118+
public Iterator<IndexShardRoutingTable> allWritableShards(ProjectState projectState, String index) {
119+
return allShardsReadyForRefresh(projectState, index);
120+
}
121+
115122
public static ShardIterator getShards(RoutingTable routingTable, ShardId shardId) {
116123
final IndexShardRoutingTable shard = routingTable.shardRoutingTable(shardId);
117124
return shard.activeInitializingShardsRandomIt();
@@ -125,7 +132,7 @@ private static Set<IndexShardRoutingTable> computeTargetedShards(
125132
// we use set here and not list since we might get duplicates
126133
final Set<IndexShardRoutingTable> set = new HashSet<>();
127134
if (routing == null || routing.isEmpty()) {
128-
collectTargetShardsNoRouting(projectState.routingTable(), concreteIndices, set);
135+
collectTargetShardsNoRouting(projectState, concreteIndices, set);
129136
} else {
130137
collectTargetShardsWithRouting(projectState, concreteIndices, routing, set);
131138
}
@@ -154,13 +161,53 @@ private static void collectTargetShardsWithRouting(
154161
}
155162
}
156163

157-
private static void collectTargetShardsNoRouting(RoutingTable routingTable, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
164+
private static void collectTargetShardsNoRouting(ProjectState projectState, String[] concreteIndices, Set<IndexShardRoutingTable> set) {
158165
for (String index : concreteIndices) {
159-
final IndexRoutingTable indexRoutingTable = indexRoutingTable(routingTable, index);
160-
for (int i = 0; i < indexRoutingTable.size(); i++) {
161-
set.add(indexRoutingTable.shard(i));
166+
Iterator<IndexShardRoutingTable> iterator = allShardsReadyForSearch(projectState, index);
167+
while (iterator.hasNext()) {
168+
set.add(iterator.next());
169+
}
170+
}
171+
}
172+
173+
/**
174+
* Returns an iterator of shards of the index that are ready to execute search requests.
175+
* A shard may not be ready to execute these operations during processes like resharding.
176+
*/
177+
private static Iterator<IndexShardRoutingTable> allShardsReadyForSearch(ProjectState projectState, String index) {
178+
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.SPLIT);
179+
}
180+
181+
/**
182+
* Returns an iterator of shards of the index that are ready to execute refresh requests.
183+
* A shard may not be ready to execute these operations during processes like resharding.
184+
*/
185+
private static Iterator<IndexShardRoutingTable> allShardsReadyForRefresh(ProjectState projectState, String index) {
186+
return allShardsExceptSplitTargetsInStateBefore(projectState, index, IndexReshardingState.Split.TargetShardState.HANDOFF);
187+
}
188+
189+
private static Iterator<IndexShardRoutingTable> allShardsExceptSplitTargetsInStateBefore(
190+
ProjectState projectState,
191+
String index,
192+
IndexReshardingState.Split.TargetShardState targetShardState
193+
) {
194+
final IndexRoutingTable indexRoutingTable = indexRoutingTable(projectState.routingTable(), index);
195+
final IndexMetadata indexMetadata = indexMetadata(projectState.metadata(), index);
196+
if (indexMetadata.getReshardingMetadata() == null) {
197+
return indexRoutingTable.allShards().iterator();
198+
}
199+
200+
final IndexReshardingMetadata indexReshardingMetadata = indexMetadata.getReshardingMetadata();
201+
assert indexReshardingMetadata.isSplit();
202+
final IndexReshardingState.Split splitState = indexReshardingMetadata.getSplit();
203+
204+
var shards = new ArrayList<IndexShardRoutingTable>();
205+
for (int i = 0; i < indexRoutingTable.size(); i++) {
206+
if (splitState.isTargetShard(i) == false || splitState.targetStateAtLeast(i, targetShardState)) {
207+
shards.add(indexRoutingTable.shard(i));
162208
}
163209
}
210+
return shards.iterator();
164211
}
165212

166213
private ShardIterator preferenceActiveShardIterator(

server/src/main/java/org/elasticsearch/index/shard/ShardSplittingQuery.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,8 @@
88
*/
99
package org.elasticsearch.index.shard;
1010

11+
import org.apache.logging.log4j.LogManager;
12+
import org.apache.logging.log4j.Logger;
1113
import org.apache.lucene.index.FieldInfo;
1214
import org.apache.lucene.index.LeafReader;
1315
import org.apache.lucene.index.LeafReaderContext;
@@ -32,6 +34,7 @@
3234
import org.apache.lucene.util.BitSetIterator;
3335
import org.apache.lucene.util.BytesRef;
3436
import org.apache.lucene.util.FixedBitSet;
37+
import org.elasticsearch.ElasticsearchException;
3538
import org.elasticsearch.cluster.metadata.IndexMetadata;
3639
import org.elasticsearch.cluster.routing.IndexRouting;
3740
import org.elasticsearch.common.lucene.search.Queries;
@@ -53,6 +56,8 @@
5356
* as deleted. See {@link org.apache.lucene.index.IndexWriter#deleteDocuments(Query...)}
5457
*/
5558
public final class ShardSplittingQuery extends Query {
59+
private static final Logger logger = LogManager.getLogger(ShardSplittingQuery.class);
60+
5661
private final IndexMetadata indexMetadata;
5762
private final IndexRouting indexRouting;
5863
private final int shardId;
@@ -77,6 +82,9 @@ public String toString() {
7782
public ScorerSupplier scorerSupplier(LeafReaderContext context) throws IOException {
7883
LeafReader leafReader = context.reader();
7984
FixedBitSet bitSet = new FixedBitSet(leafReader.maxDoc());
85+
logger.info("ShardSplittingQuery " + shardId + " - maxDoc is " + leafReader.maxDoc());
86+
logger.info("ShardSplittingQuery " + shardId + " - leafReader is " + leafReader);
87+
logger.error("ShardSplittingQuery", new ElasticsearchException("ShardSplittingQuery"));
8088
Terms terms = leafReader.terms(RoutingFieldMapper.NAME);
8189
Predicate<BytesRef> includeInShard = ref -> {
8290
// TODO IndexRouting should build the query somehow

server/src/main/java/org/elasticsearch/search/DefaultSearchContext.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.index.query.SearchExecutionContext;
5252
import org.elasticsearch.index.search.NestedHelper;
5353
import org.elasticsearch.index.shard.IndexShard;
54+
import org.elasticsearch.index.shard.ShardSplittingQuery;
5455
import org.elasticsearch.search.aggregations.SearchContextAggregations;
5556
import org.elasticsearch.search.aggregations.support.AggregationContext;
5657
import org.elasticsearch.search.builder.SearchSourceBuilder;
@@ -471,6 +472,10 @@ public Query buildFilteredQuery(Query query) {
471472
}
472473
}
473474

475+
var q = Queries.not(
476+
new ShardSplittingQuery(indexService.getMetadata(), shardTarget.getShardId().getId(), nestedLookup != NestedLookup.EMPTY)
477+
);
478+
474479
if (filters.isEmpty()) {
475480
return query;
476481
} else {

server/src/test/java/org/elasticsearch/action/support/replication/BroadcastReplicationTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -248,8 +248,7 @@ public void testShardsList() throws InterruptedException, ExecutionException {
248248
logger.debug("--> using initial state:\n{}", clusterService.state());
249249
List<ShardId> shards = broadcastReplicationAction.shards(
250250
new DummyBroadcastRequest().indices(shardId.getIndexName()),
251-
clusterState.metadata().getProject(projectId),
252-
clusterState.routingTable(projectId)
251+
clusterState.projectState(projectId)
253252
);
254253
assertThat(shards.size(), equalTo(1));
255254
assertThat(shards.get(0), equalTo(shardId));

0 commit comments

Comments
 (0)