Skip to content

Commit

Permalink
MINOR: Move the ELR default version to 4.1 (#18954)
Browse files Browse the repository at this point in the history
- ELR is enabled (ELRV_1) by default if the cluster is created with its bootstrap metadata version >= IBP_4_1_IV0.
- ELRV_1 can be manually enabled iff the metadata version is >= IBP_4_0_IV1.

Reviewers: Ismael Juma <[email protected]>, Colin P. McCabe <[email protected]>, David Jacot <[email protected]>
  • Loading branch information
CalvinConfluent authored Feb 21, 2025
1 parent 7da1a6c commit 8f13e7c
Show file tree
Hide file tree
Showing 6 changed files with 89 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler;

import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -79,6 +80,12 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes
private String bootstrapServer;
private String testTopicName;
private Admin adminClient;

@Override
public MetadataVersion metadataVersion() {
return MetadataVersion.IBP_4_0_IV1;
}

@Override
public Seq<KafkaConfig> generateConfigs() {
List<Properties> brokerConfigs = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.kafka.metadata.RecordTestUtils;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.EligibleLeaderReplicasVersion;
import org.apache.kafka.server.common.MetadataVersion;
import org.apache.kafka.server.config.ConfigSynonym;
import org.apache.kafka.server.policy.AlterConfigPolicy;
import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata;
Expand Down Expand Up @@ -64,7 +65,9 @@
import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD;
import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;


@Timeout(value = 40)
Expand Down Expand Up @@ -499,4 +502,36 @@ public void testRejectMinIsrChangeWhenElrEnabled(boolean removal) {
assertEquals(Errors.NONE, result.response().error());
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testElrUpgrade(boolean isMetadataVersionElrEnabled) {
FeatureControlManager featureManager = new FeatureControlManager.Builder().
setQuorumFeatures(new QuorumFeatures(0,
QuorumFeatures.defaultSupportedFeatureMap(true),
Collections.emptyList())).
setMetadataVersion(isMetadataVersionElrEnabled ? MetadataVersion.IBP_4_0_IV1 : MetadataVersion.IBP_4_0_IV0).
build();
ConfigurationControlManager manager = new ConfigurationControlManager.Builder().
setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")).
setFeatureControl(featureManager).
setKafkaConfigSchema(SCHEMA).
build();
assertFalse(featureManager.isElrFeatureEnabled());
ControllerResult<ApiError> result = manager.updateFeatures(
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
EligibleLeaderReplicasVersion.ELRV_1.featureLevel()),
Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME,
FeatureUpdate.UpgradeType.UPGRADE),
false);
assertNotNull(result.response());
if (isMetadataVersionElrEnabled) {
assertEquals(Errors.NONE, result.response().error());
RecordTestUtils.replayAll(manager, result.records());
RecordTestUtils.replayAll(featureManager, result.records());
assertTrue(featureManager.isElrFeatureEnabled());
} else {
assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().error());
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.Timeout;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.ValueSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -55,9 +57,11 @@
import java.util.Optional;
import java.util.OptionalInt;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT;
import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
Expand Down Expand Up @@ -343,9 +347,6 @@ public void testFeatureFlag(short version) throws Exception {
setName(MetadataVersion.FEATURE_NAME).
setFeatureLevel(MetadataVersion.latestProduction().featureLevel()),
(short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(EligibleLeaderReplicasVersion.FEATURE_NAME).
setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0));
expected.add(new ApiMessageAndVersion(new FeatureLevelRecord().
setName(GroupVersion.FEATURE_NAME).
setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0));
Expand Down Expand Up @@ -457,6 +458,34 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex
}
}

private static Stream<Arguments> elrTestMetadataVersions() {
return Stream.of(
MetadataVersion.IBP_3_9_IV0,
MetadataVersion.IBP_4_0_IV0,
MetadataVersion.IBP_4_0_IV1 // ELR minimal MV
).map(Arguments::of);
}

@ParameterizedTest
@MethodSource("elrTestMetadataVersions")
public void testFormatElrEnabledWithMetadataVersions(MetadataVersion metadataVersion) throws Exception {
try (TestEnv testEnv = new TestEnv(2)) {
FormatterContext formatter1 = testEnv.newFormatter();
formatter1.formatter.setReleaseVersion(metadataVersion);
formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1);
formatter1.formatter.setInitialControllers(DynamicVoters.
parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw"));
if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) {
assertDoesNotThrow(() -> formatter1.formatter.run());
} else {
assertEquals("eligible.leader.replicas.version could not be set to 1 because it depends on " +
"metadata.version level 23",
assertThrows(IllegalArgumentException.class,
() -> formatter1.formatter.run()).getMessage());
}
}
}

@ParameterizedTest
@ValueSource(booleans = {false, true})
public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion {
ELRV_0(0, MetadataVersion.MINIMUM_VERSION, Collections.emptyMap()),

// Version 1 enables the ELR (KIP-966).
ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap());
ELRV_1(1, MetadataVersion.IBP_4_1_IV0, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV1.featureLevel()));

public static final String FEATURE_NAME = "eligible.leader.replicas.version";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public enum MetadataVersion {
// Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848).
IBP_4_0_IV0(22, "4.0", "IV0", false),

// Add ELR related supports (KIP-966).
// Add ELR related metadata records (KIP-966). Note, ELR is for preview only in 4.0.
// PartitionRecord and PartitionChangeRecord are updated.
// ClearElrRecord is added.
IBP_4_0_IV1(23, "4.0", "IV1", true),
Expand All @@ -111,7 +111,7 @@ public enum MetadataVersion {
// Please move this comment when updating the LATEST_PRODUCTION constant.
//

// Not used by anything yet.
// Enables ELR by default for new clusters (KIP-966).
IBP_4_1_IV0(26, "4.1", "IV0", false);

// NOTES when adding a new version:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.util.Map;

import static org.apache.kafka.server.common.Feature.validateDefaultValueAndLatestProductionValue;
import static org.apache.kafka.server.common.Feature.validateVersion;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -291,4 +293,14 @@ public void testValidateWithMVDependencyAheadOfBootstrapMV() {
assertEquals("Feature UNIT_TEST_VERSION_7 has default FeatureVersion UT_FV7_0 when MV=" + MetadataVersion.MINIMUM_VERSION
+ " with MV dependency 3.7-IV0 that is behind its bootstrap MV " + MetadataVersion.MINIMUM_VERSION + ".", exception.getMessage());
}

@Test
public void testValidateEligibleLeaderReplicasVersion() {
assertThrows(IllegalArgumentException.class, () ->
validateVersion(EligibleLeaderReplicasVersion.ELRV_1, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV0.featureLevel())),
"ELR requires MV to be at least 4.0IV1.");
assertDoesNotThrow(() ->
validateVersion(EligibleLeaderReplicasVersion.ELRV_1, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV1.featureLevel())),
"ELR requires MV to be at least 4.0IV1.");
}
}

0 comments on commit 8f13e7c

Please sign in to comment.