Skip to content

Commit 0c3f2b4

Browse files
authored
[Backport 2.17] [Remote State] Upload incremental cluster state on master re-election (#15145) (#15792)
* [Remote State] Upload incremental cluster state on master re-election (#15145) Signed-off-by: Shivansh Arora <[email protected]> (cherry picked from commit cbdcbb7)
1 parent fc1bf2c commit 0c3f2b4

File tree

12 files changed

+734
-126
lines changed

12 files changed

+734
-126
lines changed

release-notes/opensearch.release-notes-2.17.0.md

+1
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
- MultiTermQueries in keyword fields now default to `indexed` approach and gated behind cluster setting ([#15637](https://github.com/opensearch-project/OpenSearch/pull/15637))
5454
- Adding WithFieldName interface for QueryBuilders with fieldName ([#15705](https://github.com/opensearch-project/OpenSearch/pull/15705))
5555
- Static RemotePublication setting added, removed experimental feature flag ([#15478](https://github.com/opensearch-project/OpenSearch/pull/15478))
56+
- [Remote Publication] Upload incremental cluster state on master re-election ([#15145](https://github.com/opensearch-project/OpenSearch/pull/15145))
5657

5758
### Dependencies
5859
- Bump `netty` from 4.1.111.Final to 4.1.112.Final ([#15081](https://github.com/opensearch-project/OpenSearch/pull/15081))

server/src/internalClusterTest/java/org/opensearch/gateway/remote/RemoteStatePublicationIT.java

+60-2
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,12 @@
1313
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
1414
import org.opensearch.action.admin.cluster.state.ClusterStateResponse;
1515
import org.opensearch.client.Client;
16+
import org.opensearch.cluster.coordination.CoordinationState;
17+
import org.opensearch.cluster.coordination.PersistedStateRegistry;
1618
import org.opensearch.common.blobstore.BlobPath;
1719
import org.opensearch.common.settings.Settings;
1820
import org.opensearch.discovery.DiscoveryStats;
21+
import org.opensearch.gateway.GatewayMetaState;
1922
import org.opensearch.gateway.remote.ClusterMetadataManifest.UploadedIndexMetadata;
2023
import org.opensearch.gateway.remote.model.RemoteClusterMetadataManifest;
2124
import org.opensearch.gateway.remote.model.RemoteRoutingTableBlobStore;
@@ -35,6 +38,7 @@
3538
import java.util.Base64;
3639
import java.util.Locale;
3740
import java.util.Map;
41+
import java.util.concurrent.ExecutionException;
3842
import java.util.function.Function;
3943
import java.util.stream.Collectors;
4044

@@ -62,15 +66,15 @@ public class RemoteStatePublicationIT extends RemoteStoreBaseIntegTestCase {
6266
private static final String REMOTE_STATE_PREFIX = "!";
6367
private static final String REMOTE_ROUTING_PREFIX = "_";
6468
private boolean isRemoteStateEnabled = true;
65-
private String isRemotePublicationEnabled = "true";
69+
private boolean isRemotePublicationEnabled = true;
6670
private boolean hasRemoteStateCharPrefix;
6771
private boolean hasRemoteRoutingCharPrefix;
6872

6973
@Before
7074
public void setup() {
7175
asyncUploadMockFsRepo = false;
7276
isRemoteStateEnabled = true;
73-
isRemotePublicationEnabled = "true";
77+
isRemotePublicationEnabled = true;
7478
hasRemoteStateCharPrefix = randomBoolean();
7579
hasRemoteRoutingCharPrefix = randomBoolean();
7680
}
@@ -100,6 +104,7 @@ protected Settings nodeSettings(int nodeOrdinal) {
100104
RemoteClusterStateService.REMOTE_CLUSTER_STATE_CHECKSUM_VALIDATION_MODE_SETTING.getKey(),
101105
RemoteClusterStateService.RemoteClusterStateValidationMode.FAILURE
102106
)
107+
.put(REMOTE_PUBLICATION_SETTING_KEY, isRemotePublicationEnabled)
103108
.put(
104109
RemoteClusterStateService.CLUSTER_REMOTE_STORE_STATE_PATH_PREFIX.getKey(),
105110
hasRemoteStateCharPrefix ? REMOTE_STATE_PREFIX : ""
@@ -220,6 +225,59 @@ public void testRemotePublicationDownloadStats() {
220225

221226
}
222227

228+
public void testMasterReElectionUsesIncrementalUpload() throws IOException {
229+
prepareCluster(3, 2, INDEX_NAME, 1, 1);
230+
PersistedStateRegistry persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
231+
GatewayMetaState.RemotePersistedState remotePersistedState = (GatewayMetaState.RemotePersistedState) persistedStateRegistry
232+
.getPersistedState(PersistedStateRegistry.PersistedStateType.REMOTE);
233+
ClusterMetadataManifest manifest = remotePersistedState.getLastAcceptedManifest();
234+
// force elected master to step down
235+
internalCluster().stopCurrentClusterManagerNode();
236+
ensureStableCluster(4);
237+
238+
persistedStateRegistry = internalCluster().getClusterManagerNodeInstance(PersistedStateRegistry.class);
239+
CoordinationState.PersistedState persistedStateAfterElection = persistedStateRegistry.getPersistedState(
240+
PersistedStateRegistry.PersistedStateType.REMOTE
241+
);
242+
ClusterMetadataManifest manifestAfterElection = persistedStateAfterElection.getLastAcceptedManifest();
243+
244+
// coordination metadata is updated, it will be unequal
245+
assertNotEquals(manifest.getCoordinationMetadata(), manifestAfterElection.getCoordinationMetadata());
246+
// all other attributes are not uploaded again and will be pointing to same files in manifest after new master is elected
247+
assertEquals(manifest.getClusterUUID(), manifestAfterElection.getClusterUUID());
248+
assertEquals(manifest.getIndices(), manifestAfterElection.getIndices());
249+
assertEquals(manifest.getSettingsMetadata(), manifestAfterElection.getSettingsMetadata());
250+
assertEquals(manifest.getTemplatesMetadata(), manifestAfterElection.getTemplatesMetadata());
251+
assertEquals(manifest.getCustomMetadataMap(), manifestAfterElection.getCustomMetadataMap());
252+
assertEquals(manifest.getRoutingTableVersion(), manifest.getRoutingTableVersion());
253+
assertEquals(manifest.getIndicesRouting(), manifestAfterElection.getIndicesRouting());
254+
}
255+
256+
public void testVotingConfigAreCommitted() throws ExecutionException, InterruptedException {
257+
prepareCluster(3, 2, INDEX_NAME, 1, 2);
258+
ensureStableCluster(5);
259+
ensureGreen(INDEX_NAME);
260+
// add two new nodes to the cluster, to update the voting config
261+
internalCluster().startClusterManagerOnlyNodes(2, Settings.EMPTY);
262+
ensureStableCluster(7);
263+
264+
internalCluster().getInstances(PersistedStateRegistry.class).forEach(persistedStateRegistry -> {
265+
CoordinationState.PersistedState localState = persistedStateRegistry.getPersistedState(
266+
PersistedStateRegistry.PersistedStateType.LOCAL
267+
);
268+
CoordinationState.PersistedState remoteState = persistedStateRegistry.getPersistedState(
269+
PersistedStateRegistry.PersistedStateType.REMOTE
270+
);
271+
if (remoteState != null) {
272+
assertEquals(
273+
localState.getLastAcceptedState().getLastCommittedConfiguration(),
274+
remoteState.getLastAcceptedState().getLastCommittedConfiguration()
275+
);
276+
assertEquals(5, remoteState.getLastAcceptedState().getLastCommittedConfiguration().getNodeIds().size());
277+
}
278+
});
279+
}
280+
223281
private void assertDataNodeDownloadStats(NodesStatsResponse nodesStatsResponse) {
224282
// assert cluster state stats for data node
225283
DiscoveryStats dataNodeDiscoveryStats = nodesStatsResponse.getNodes().get(0).getDiscoveryStats();

server/src/internalClusterTest/java/org/opensearch/remotestore/BaseRemoteStoreRestoreIT.java

-25
Original file line numberDiff line numberDiff line change
@@ -76,29 +76,4 @@ protected void verifyRestoredData(Map<String, Long> indexStats, String indexName
7676
protected void verifyRestoredData(Map<String, Long> indexStats, String indexName) throws Exception {
7777
verifyRestoredData(indexStats, indexName, true);
7878
}
79-
80-
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
81-
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
82-
}
83-
84-
public void prepareCluster(
85-
int numClusterManagerNodes,
86-
int numDataOnlyNodes,
87-
String indices,
88-
int replicaCount,
89-
int shardCount,
90-
Settings settings
91-
) {
92-
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
93-
for (String index : indices.split(",")) {
94-
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
95-
ensureYellowAndNoInitializingShards(index);
96-
ensureGreen(index);
97-
}
98-
}
99-
100-
public void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
101-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
102-
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
103-
}
10479
}

server/src/internalClusterTest/java/org/opensearch/remotestore/RemoteStoreBaseIntegTestCase.java

+8-9
Original file line numberDiff line numberDiff line change
@@ -351,13 +351,7 @@ protected void restore(boolean restoreAllShards, String... indices) {
351351
}
352352

353353
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, String indices, int replicaCount, int shardCount) {
354-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes);
355-
internalCluster().startDataOnlyNodes(numDataOnlyNodes);
356-
for (String index : indices.split(",")) {
357-
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
358-
ensureYellowAndNoInitializingShards(index);
359-
ensureGreen(index);
360-
}
354+
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, indices, replicaCount, shardCount, Settings.EMPTY);
361355
}
362356

363357
protected void prepareCluster(
@@ -368,11 +362,16 @@ protected void prepareCluster(
368362
int shardCount,
369363
Settings settings
370364
) {
371-
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
372-
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
365+
prepareCluster(numClusterManagerNodes, numDataOnlyNodes, settings);
373366
for (String index : indices.split(",")) {
374367
createIndex(index, remoteStoreIndexSettings(replicaCount, shardCount));
368+
ensureYellowAndNoInitializingShards(index);
375369
ensureGreen(index);
376370
}
377371
}
372+
373+
protected void prepareCluster(int numClusterManagerNodes, int numDataOnlyNodes, Settings settings) {
374+
internalCluster().startClusterManagerOnlyNodes(numClusterManagerNodes, settings);
375+
internalCluster().startDataOnlyNodes(numDataOnlyNodes, settings);
376+
}
378377
}

server/src/main/java/org/opensearch/cluster/coordination/CoordinationState.java

+64-8
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import org.opensearch.cluster.node.DiscoveryNode;
4141
import org.opensearch.common.settings.Settings;
4242
import org.opensearch.common.util.io.IOUtils;
43+
import org.opensearch.gateway.remote.ClusterMetadataManifest;
4344

4445
import java.io.Closeable;
4546
import java.io.IOException;
@@ -104,6 +105,7 @@ public CoordinationState(
104105
.getLastAcceptedConfiguration();
105106
this.publishVotes = new VoteCollection();
106107
this.isRemoteStateEnabled = isRemoteStoreClusterStateEnabled(settings);
108+
// ToDo: revisit this check while making the setting dynamic
107109
this.isRemotePublicationEnabled = isRemoteStateEnabled
108110
&& REMOTE_PUBLICATION_SETTING.get(settings)
109111
&& localNode.isRemoteStatePublicationEnabled();
@@ -459,6 +461,9 @@ public PublishResponse handlePublishRequest(PublishRequest publishRequest) {
459461
clusterState.term()
460462
);
461463
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).setLastAcceptedState(clusterState);
464+
if (shouldUpdateRemotePersistedState(publishRequest)) {
465+
updateRemotePersistedStateOnPublishRequest(publishRequest);
466+
}
462467
assert getLastAcceptedState() == clusterState;
463468

464469
return new PublishResponse(clusterState.term(), clusterState.version());
@@ -571,6 +576,9 @@ public void handleCommit(ApplyCommitRequest applyCommit) {
571576
);
572577

573578
persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL).markLastAcceptedStateAsCommitted();
579+
if (shouldCommitRemotePersistedState()) {
580+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).markLastAcceptedStateAsCommitted();
581+
}
574582
assert getLastCommittedConfiguration().equals(getLastAcceptedConfiguration());
575583
}
576584

@@ -616,6 +624,33 @@ public void close() throws IOException {
616624
IOUtils.close(persistedStateRegistry);
617625
}
618626

627+
private boolean shouldUpdateRemotePersistedState(PublishRequest publishRequest) {
628+
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
629+
&& publishRequest.getAcceptedState().getNodes().isLocalNodeElectedClusterManager() == false;
630+
}
631+
632+
private void updateRemotePersistedStateOnPublishRequest(PublishRequest publishRequest) {
633+
if (publishRequest instanceof RemoteStatePublishRequest) {
634+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(publishRequest.getAcceptedState());
635+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE)
636+
.setLastAcceptedManifest(((RemoteStatePublishRequest) publishRequest).getAcceptedManifest());
637+
} else {
638+
// We will end up here if PublishRequest was sent not using Remote Store even with remote persisted state on this node
639+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedState(null);
640+
persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).setLastAcceptedManifest(null);
641+
}
642+
}
643+
644+
private boolean shouldCommitRemotePersistedState() {
645+
return persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE) != null
646+
&& persistedStateRegistry.getPersistedState(PersistedStateType.LOCAL)
647+
.getLastAcceptedState()
648+
.getNodes()
649+
.isLocalNodeElectedClusterManager() == false
650+
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedState() != null
651+
&& persistedStateRegistry.getPersistedState(PersistedStateType.REMOTE).getLastAcceptedManifest() != null;
652+
}
653+
619654
/**
620655
* Pluggable persistence layer for {@link CoordinationState}.
621656
*
@@ -653,6 +688,22 @@ public interface PersistedState extends Closeable {
653688
*/
654689
PersistedStateStats getStats();
655690

691+
/**
692+
* Returns the last accepted {@link ClusterMetadataManifest}.
693+
*
694+
* @return The last accepted {@link ClusterMetadataManifest}, or null if no manifest
695+
* has been accepted yet.
696+
*/
697+
default ClusterMetadataManifest getLastAcceptedManifest() {
698+
// return null by default, this method needs to be overridden wherever required
699+
return null;
700+
}
701+
702+
/**
703+
* Sets the last accepted {@link ClusterMetadataManifest}.
704+
*/
705+
default void setLastAcceptedManifest(ClusterMetadataManifest manifest) {}
706+
656707
/**
657708
* Marks the last accepted cluster state as committed.
658709
* After a successful call to this method, {@link #getLastAcceptedState()} should return the last cluster state that was set,
@@ -661,14 +712,7 @@ public interface PersistedState extends Closeable {
661712
*/
662713
default void markLastAcceptedStateAsCommitted() {
663714
final ClusterState lastAcceptedState = getLastAcceptedState();
664-
Metadata.Builder metadataBuilder = null;
665-
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
666-
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
667-
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
668-
.build();
669-
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
670-
metadataBuilder.coordinationMetadata(coordinationMetadata);
671-
}
715+
Metadata.Builder metadataBuilder = commitVotingConfiguration(lastAcceptedState);
672716
// if we receive a commit from a Zen1 cluster-manager that has not recovered its state yet,
673717
// the cluster uuid might not been known yet.
674718
assert lastAcceptedState.metadata().clusterUUID().equals(Metadata.UNKNOWN_CLUSTER_UUID) == false
@@ -693,6 +737,18 @@ default void markLastAcceptedStateAsCommitted() {
693737
}
694738
}
695739

740+
default Metadata.Builder commitVotingConfiguration(ClusterState lastAcceptedState) {
741+
Metadata.Builder metadataBuilder = null;
742+
if (lastAcceptedState.getLastAcceptedConfiguration().equals(lastAcceptedState.getLastCommittedConfiguration()) == false) {
743+
final CoordinationMetadata coordinationMetadata = CoordinationMetadata.builder(lastAcceptedState.coordinationMetadata())
744+
.lastCommittedConfiguration(lastAcceptedState.getLastAcceptedConfiguration())
745+
.build();
746+
metadataBuilder = Metadata.builder(lastAcceptedState.metadata());
747+
metadataBuilder.coordinationMetadata(coordinationMetadata);
748+
}
749+
return metadataBuilder;
750+
}
751+
696752
default void close() throws IOException {}
697753
}
698754

0 commit comments

Comments
 (0)