diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 8fdc53e6b795f..528601f201fee 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -950,6 +950,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada @Override public void createShard( + final ProjectId projectId, final ShardRouting shardRouting, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, @@ -968,26 +969,29 @@ public void createShard( RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); indexShard.addShardFailureCallback(onShardFailure); - indexShard.startRecovery( - recoveryState, - recoveryTargetService, - postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), - repositoriesService, - (mapping, listener) -> { - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS - : "mapping update consumer only required by local shards recovery"; - AcknowledgedRequest putMappingRequestAcknowledgedRequest = new PutMappingRequest() - // concrete index - no name clash, it uses uuid - .setConcreteIndex(shardRouting.index()) - .source(mapping.source().string(), XContentType.JSON); - client.execute( - TransportAutoPutMappingAction.TYPE, - putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE), - new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null)) - ); - }, - this, - clusterStateVersion + projectResolver.executeOnProject( + projectId, + () -> indexShard.startRecovery( + recoveryState, + recoveryTargetService, + postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), + repositoriesService, + (mapping, listener) -> { + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS + : "mapping update consumer only required by local shards recovery"; + AcknowledgedRequest putMappingRequestAcknowledgedRequest = new PutMappingRequest() + // concrete index - no name clash, it uses uuid + .setConcreteIndex(shardRouting.index()) + .source(mapping.source().string(), XContentType.JSON); + client.execute( + TransportAutoPutMappingAction.TYPE, + putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE), + new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null)) + ); + }, + this, + clusterStateVersion + ) ); } diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 9862ff9d30338..95c462072ae5a 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -781,6 +782,7 @@ private void createShardWhenLockAvailable( try { logger.debug("{} creating shard with primary term [{}], iteration [{}]", shardRouting.shardId(), primaryTerm, iteration); indicesService.createShard( + originalState.metadata().projectFor(shardRouting.index()).id(), shardRouting, recoveryTargetService, new RecoveryListener(shardRouting, primaryTerm), @@ -1330,6 +1332,7 @@ void removeIndex( /** * Creates a shard for the specified shard routing and starts recovery. * + * @param projectId the project for the shard * @param shardRouting the shard routing * @param recoveryTargetService recovery service for the target * @param recoveryListener a callback when recovery changes state (finishes or fails) @@ -1343,6 +1346,7 @@ void removeIndex( * @throws IOException if an I/O exception occurs when creating the shard */ void createShard( + ProjectId projectId, ShardRouting shardRouting, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, diff --git a/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 03d6ac6342b42..0c1e381f69c4e 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -242,6 +243,7 @@ public MockIndexService indexService(Index index) { @Override public void createShard( + final ProjectId projectId, final ShardRouting shardRouting, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener,