diff --git a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java index a3dafed5e4c38..8db4bd4d898b0 100644 --- a/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java +++ b/core/src/test/java/kafka/server/integration/EligibleLeaderReplicasIntegrationTest.java @@ -45,6 +45,7 @@ import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.common.serialization.StringSerializer; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.storage.internals.checkpoint.CleanShutdownFileHandler; import org.junit.jupiter.api.AfterEach; @@ -79,6 +80,12 @@ public class EligibleLeaderReplicasIntegrationTest extends KafkaServerTestHarnes private String bootstrapServer; private String testTopicName; private Admin adminClient; + + @Override + public MetadataVersion metadataVersion() { + return MetadataVersion.IBP_4_0_IV1; + } + @Override public Seq generateConfigs() { List brokerConfigs = new ArrayList<>(); diff --git a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java index 39979cc891644..4c1c73f64ac79 100644 --- a/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java +++ b/metadata/src/test/java/org/apache/kafka/controller/ConfigurationControlManagerTest.java @@ -31,6 +31,7 @@ import org.apache.kafka.metadata.RecordTestUtils; import org.apache.kafka.server.common.ApiMessageAndVersion; import org.apache.kafka.server.common.EligibleLeaderReplicasVersion; +import org.apache.kafka.server.common.MetadataVersion; import org.apache.kafka.server.config.ConfigSynonym; import org.apache.kafka.server.policy.AlterConfigPolicy; import org.apache.kafka.server.policy.AlterConfigPolicy.RequestMetadata; @@ -64,7 +65,9 @@ import static org.apache.kafka.common.metadata.MetadataRecordType.CONFIG_RECORD; import static org.apache.kafka.server.config.ConfigSynonym.HOURS_TO_MILLISECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; @Timeout(value = 40) @@ -499,4 +502,36 @@ public void testRejectMinIsrChangeWhenElrEnabled(boolean removal) { assertEquals(Errors.NONE, result.response().error()); } } + + @ParameterizedTest + @ValueSource(booleans = {false, true}) + public void testElrUpgrade(boolean isMetadataVersionElrEnabled) { + FeatureControlManager featureManager = new FeatureControlManager.Builder(). + setQuorumFeatures(new QuorumFeatures(0, + QuorumFeatures.defaultSupportedFeatureMap(true), + Collections.emptyList())). + setMetadataVersion(isMetadataVersionElrEnabled ? MetadataVersion.IBP_4_0_IV1 : MetadataVersion.IBP_4_0_IV0). + build(); + ConfigurationControlManager manager = new ConfigurationControlManager.Builder(). + setStaticConfig(Map.of(TopicConfig.MIN_IN_SYNC_REPLICAS_CONFIG, "2")). + setFeatureControl(featureManager). + setKafkaConfigSchema(SCHEMA). + build(); + assertFalse(featureManager.isElrFeatureEnabled()); + ControllerResult result = manager.updateFeatures( + Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME, + EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), + Collections.singletonMap(EligibleLeaderReplicasVersion.FEATURE_NAME, + FeatureUpdate.UpgradeType.UPGRADE), + false); + assertNotNull(result.response()); + if (isMetadataVersionElrEnabled) { + assertEquals(Errors.NONE, result.response().error()); + RecordTestUtils.replayAll(manager, result.records()); + RecordTestUtils.replayAll(featureManager, result.records()); + assertTrue(featureManager.isElrFeatureEnabled()); + } else { + assertEquals(Errors.INVALID_UPDATE_VERSION, result.response().error()); + } + } } diff --git a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java index d6ee95c8ccf4f..0706e4e738fdd 100644 --- a/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java +++ b/metadata/src/test/java/org/apache/kafka/metadata/storage/FormatterTest.java @@ -40,6 +40,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.Timeout; import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.junit.jupiter.params.provider.ValueSource; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -55,9 +57,11 @@ import java.util.Optional; import java.util.OptionalInt; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALT; import static org.apache.kafka.metadata.storage.ScramParserTest.TEST_SALTED_PASSWORD; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotEquals; @@ -343,9 +347,6 @@ public void testFeatureFlag(short version) throws Exception { setName(MetadataVersion.FEATURE_NAME). setFeatureLevel(MetadataVersion.latestProduction().featureLevel()), (short) 0)); - expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). - setName(EligibleLeaderReplicasVersion.FEATURE_NAME). - setFeatureLevel(EligibleLeaderReplicasVersion.ELRV_1.featureLevel()), (short) 0)); expected.add(new ApiMessageAndVersion(new FeatureLevelRecord(). setName(GroupVersion.FEATURE_NAME). setFeatureLevel(GroupVersion.GV_1.featureLevel()), (short) 0)); @@ -457,6 +458,34 @@ public void testFormatWithInitialVotersFailsWithOlderMetadataVersion() throws Ex } } + private static Stream elrTestMetadataVersions() { + return Stream.of( + MetadataVersion.IBP_3_9_IV0, + MetadataVersion.IBP_4_0_IV0, + MetadataVersion.IBP_4_0_IV1 // ELR minimal MV + ).map(Arguments::of); + } + + @ParameterizedTest + @MethodSource("elrTestMetadataVersions") + public void testFormatElrEnabledWithMetadataVersions(MetadataVersion metadataVersion) throws Exception { + try (TestEnv testEnv = new TestEnv(2)) { + FormatterContext formatter1 = testEnv.newFormatter(); + formatter1.formatter.setReleaseVersion(metadataVersion); + formatter1.formatter.setFeatureLevel(EligibleLeaderReplicasVersion.FEATURE_NAME, (short) 1); + formatter1.formatter.setInitialControllers(DynamicVoters. + parse("1@localhost:8020:4znU-ou9Taa06bmEJxsjnw")); + if (metadataVersion.isAtLeast(MetadataVersion.IBP_4_0_IV1)) { + assertDoesNotThrow(() -> formatter1.formatter.run()); + } else { + assertEquals("eligible.leader.replicas.version could not be set to 1 because it depends on " + + "metadata.version level 23", + assertThrows(IllegalArgumentException.class, + () -> formatter1.formatter.run()).getMessage()); + } + } + } + @ParameterizedTest @ValueSource(booleans = {false, true}) public void testFormatWithNoInitialControllers(boolean specifyKRaftVersion) throws Exception { diff --git a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java index b9049b636f9bf..850cb28db5b14 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/EligibleLeaderReplicasVersion.java @@ -25,7 +25,7 @@ public enum EligibleLeaderReplicasVersion implements FeatureVersion { ELRV_0(0, MetadataVersion.MINIMUM_VERSION, Collections.emptyMap()), // Version 1 enables the ELR (KIP-966). - ELRV_1(1, MetadataVersion.IBP_4_0_IV1, Collections.emptyMap()); + ELRV_1(1, MetadataVersion.IBP_4_1_IV0, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV1.featureLevel())); public static final String FEATURE_NAME = "eligible.leader.replicas.version"; diff --git a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java index 9207d08e3e78d..cbbdc15cd1bf6 100644 --- a/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java +++ b/server-common/src/main/java/org/apache/kafka/server/common/MetadataVersion.java @@ -94,7 +94,7 @@ public enum MetadataVersion { // Bootstrap metadata version for version 1 of the GroupVersion feature (KIP-848). IBP_4_0_IV0(22, "4.0", "IV0", false), - // Add ELR related supports (KIP-966). + // Add ELR related metadata records (KIP-966). Note, ELR is for preview only in 4.0. // PartitionRecord and PartitionChangeRecord are updated. // ClearElrRecord is added. IBP_4_0_IV1(23, "4.0", "IV1", true), @@ -111,7 +111,7 @@ public enum MetadataVersion { // Please move this comment when updating the LATEST_PRODUCTION constant. // - // Not used by anything yet. + // Enables ELR by default for new clusters (KIP-966). IBP_4_1_IV0(26, "4.1", "IV0", false); // NOTES when adding a new version: diff --git a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java index 1e50a8b58c496..a7a9ee3d311bc 100644 --- a/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java +++ b/server-common/src/test/java/org/apache/kafka/server/common/FeatureTest.java @@ -25,6 +25,8 @@ import java.util.Map; import static org.apache.kafka.server.common.Feature.validateDefaultValueAndLatestProductionValue; +import static org.apache.kafka.server.common.Feature.validateVersion; +import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertThrows; import static org.junit.jupiter.api.Assertions.assertTrue; @@ -291,4 +293,14 @@ public void testValidateWithMVDependencyAheadOfBootstrapMV() { assertEquals("Feature UNIT_TEST_VERSION_7 has default FeatureVersion UT_FV7_0 when MV=" + MetadataVersion.MINIMUM_VERSION + " with MV dependency 3.7-IV0 that is behind its bootstrap MV " + MetadataVersion.MINIMUM_VERSION + ".", exception.getMessage()); } + + @Test + public void testValidateEligibleLeaderReplicasVersion() { + assertThrows(IllegalArgumentException.class, () -> + validateVersion(EligibleLeaderReplicasVersion.ELRV_1, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV0.featureLevel())), + "ELR requires MV to be at least 4.0IV1."); + assertDoesNotThrow(() -> + validateVersion(EligibleLeaderReplicasVersion.ELRV_1, Map.of(MetadataVersion.FEATURE_NAME, MetadataVersion.IBP_4_0_IV1.featureLevel())), + "ELR requires MV to be at least 4.0IV1."); + } }