From d82a7753d0a5811bbaf7f9fd8d31efdab49ad982 Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Thu, 3 Jul 2025 17:34:22 +1000 Subject: [PATCH 1/2] executing shard recovery in project context --- .../main/java/org/elasticsearch/indices/IndicesService.java | 5 ++++- .../indices/cluster/IndicesClusterStateService.java | 4 ++++ .../cluster/AbstractIndicesClusterStateServiceTestCase.java | 2 ++ 3 files changed, 10 insertions(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 8fdc53e6b795f..770221db21406 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -75,6 +75,7 @@ import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; +import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -950,6 +951,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada @Override public void createShard( + final ProjectId projectId, final ShardRouting shardRouting, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, @@ -968,7 +970,7 @@ public void createShard( RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); indexShard.addShardFailureCallback(onShardFailure); - indexShard.startRecovery( + final CheckedRunnable recoveryRunnable = () -> indexShard.startRecovery( recoveryState, recoveryTargetService, postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), @@ -989,6 +991,7 @@ public void createShard( this, clusterStateVersion ); + projectResolver.executeOnProject(projectId, recoveryRunnable); } @Override diff --git a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java index 83be37d553fef..d42e0ad9327d2 100644 --- a/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java +++ b/server/src/main/java/org/elasticsearch/indices/cluster/IndicesClusterStateService.java @@ -26,6 +26,7 @@ import org.elasticsearch.cluster.ClusterStateApplier; import org.elasticsearch.cluster.action.shard.ShardStateAction; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.node.DiscoveryNodes; @@ -781,6 +782,7 @@ private void createShardWhenLockAvailable( try { logger.debug("{} creating shard with primary term [{}], iteration [{}]", shardRouting.shardId(), primaryTerm, iteration); indicesService.createShard( + originalState.metadata().projectFor(shardRouting.index()).id(), shardRouting, recoveryTargetService, new RecoveryListener(shardRouting, primaryTerm), @@ -1330,6 +1332,7 @@ void removeIndex( /** * Creates a shard for the specified shard routing and starts recovery. * + * @param projectId the project for the shard * @param shardRouting the shard routing * @param recoveryTargetService recovery service for the target * @param recoveryListener a callback when recovery changes state (finishes or fails) @@ -1343,6 +1346,7 @@ void removeIndex( * @throws IOException if an I/O exception occurs when creating the shard */ void createShard( + ProjectId projectId, ShardRouting shardRouting, PeerRecoveryTargetService recoveryTargetService, PeerRecoveryTargetService.RecoveryListener recoveryListener, diff --git a/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java b/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java index 03d6ac6342b42..0c1e381f69c4e 100644 --- a/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java +++ b/test/framework/src/main/java/org/elasticsearch/indices/cluster/AbstractIndicesClusterStateServiceTestCase.java @@ -12,6 +12,7 @@ import org.elasticsearch.action.ActionListener; import org.elasticsearch.cluster.ClusterState; import org.elasticsearch.cluster.metadata.IndexMetadata; +import org.elasticsearch.cluster.metadata.ProjectId; import org.elasticsearch.cluster.metadata.ProjectMetadata; import org.elasticsearch.cluster.node.DiscoveryNode; import org.elasticsearch.cluster.routing.IndexShardRoutingTable; @@ -242,6 +243,7 @@ public MockIndexService indexService(Index index) { @Override public void createShard( + final ProjectId projectId, final ShardRouting shardRouting, final PeerRecoveryTargetService recoveryTargetService, final PeerRecoveryTargetService.RecoveryListener recoveryListener, From 70783e8936306322e543216b79099ef89712f2fe Mon Sep 17 00:00:00 2001 From: Yang Wang Date: Tue, 15 Jul 2025 20:19:19 +1000 Subject: [PATCH 2/2] inline --- .../elasticsearch/indices/IndicesService.java | 45 ++++++++++--------- 1 file changed, 23 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/indices/IndicesService.java b/server/src/main/java/org/elasticsearch/indices/IndicesService.java index 770221db21406..528601f201fee 100644 --- a/server/src/main/java/org/elasticsearch/indices/IndicesService.java +++ b/server/src/main/java/org/elasticsearch/indices/IndicesService.java @@ -75,7 +75,6 @@ import org.elasticsearch.core.AbstractRefCounted; import org.elasticsearch.core.CheckedConsumer; import org.elasticsearch.core.CheckedFunction; -import org.elasticsearch.core.CheckedRunnable; import org.elasticsearch.core.IOUtils; import org.elasticsearch.core.Nullable; import org.elasticsearch.core.Releasable; @@ -970,28 +969,30 @@ public void createShard( RecoveryState recoveryState = indexService.createRecoveryState(shardRouting, targetNode, sourceNode); IndexShard indexShard = indexService.createShard(shardRouting, globalCheckpointSyncer, retentionLeaseSyncer); indexShard.addShardFailureCallback(onShardFailure); - final CheckedRunnable recoveryRunnable = () -> indexShard.startRecovery( - recoveryState, - recoveryTargetService, - postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), - repositoriesService, - (mapping, listener) -> { - assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS - : "mapping update consumer only required by local shards recovery"; - AcknowledgedRequest putMappingRequestAcknowledgedRequest = new PutMappingRequest() - // concrete index - no name clash, it uses uuid - .setConcreteIndex(shardRouting.index()) - .source(mapping.source().string(), XContentType.JSON); - client.execute( - TransportAutoPutMappingAction.TYPE, - putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE), - new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null)) - ); - }, - this, - clusterStateVersion + projectResolver.executeOnProject( + projectId, + () -> indexShard.startRecovery( + recoveryState, + recoveryTargetService, + postRecoveryMerger.maybeMergeAfterRecovery(indexService.getMetadata(), shardRouting, recoveryListener), + repositoriesService, + (mapping, listener) -> { + assert recoveryState.getRecoverySource().getType() == RecoverySource.Type.LOCAL_SHARDS + : "mapping update consumer only required by local shards recovery"; + AcknowledgedRequest putMappingRequestAcknowledgedRequest = new PutMappingRequest() + // concrete index - no name clash, it uses uuid + .setConcreteIndex(shardRouting.index()) + .source(mapping.source().string(), XContentType.JSON); + client.execute( + TransportAutoPutMappingAction.TYPE, + putMappingRequestAcknowledgedRequest.ackTimeout(TimeValue.MAX_VALUE).masterNodeTimeout(TimeValue.MAX_VALUE), + new RefCountAwareThreadedActionListener<>(threadPool.generic(), listener.map(ignored -> null)) + ); + }, + this, + clusterStateVersion + ) ); - projectResolver.executeOnProject(projectId, recoveryRunnable); } @Override