Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-18839: Drop EAGER rebalancing support in Kafka Streams #18988

Open
wants to merge 10 commits into
base: trunk
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -794,13 +794,11 @@ public class StreamsConfig extends AbstractConfig {
/** {@code upgrade.from} */
@SuppressWarnings("WeakerAccess")
public static final String UPGRADE_FROM_CONFIG = "upgrade.from";
private static final String UPGRADE_FROM_DOC = "Allows upgrading in a backward compatible way. " +
"This is needed when upgrading from [0.10.0, 1.1] to 2.0+, or when upgrading from [2.0, 2.3] to 2.4+. " +
"When upgrading from 3.3 to a newer version it is not required to specify this config. Default is `null`. " +
"Accepted values are \"" + UPGRADE_FROM_0100 + "\", \"" + UPGRADE_FROM_0101 + "\", \"" +
UPGRADE_FROM_0102 + "\", \"" + UPGRADE_FROM_0110 + "\", \"" + UPGRADE_FROM_10 + "\", \"" +
UPGRADE_FROM_11 + "\", \"" + UPGRADE_FROM_20 + "\", \"" + UPGRADE_FROM_21 + "\", \"" +
UPGRADE_FROM_22 + "\", \"" + UPGRADE_FROM_23 + "\", \"" + UPGRADE_FROM_24 + "\", \"" +
private static final String UPGRADE_FROM_DOC = "Allows live upgrading (and downgrading in some cases -- see upgrade guide) in a backward compatible way. Default is `null`. " +
"Please refer to the Kafka Streams upgrade guide for instructions on how and when to use this config. " +
"Note that when upgrading from 3.5 to a newer version it is never required to specify this config, " +
"while upgrading live directly to 4.0+ from 2.3 or below is no longer supported even with this config. " +
"Accepted values are \"" + UPGRADE_FROM_24 + "\", \"" +
UPGRADE_FROM_25 + "\", \"" + UPGRADE_FROM_26 + "\", \"" + UPGRADE_FROM_27 + "\", \"" +
UPGRADE_FROM_28 + "\", \"" + UPGRADE_FROM_30 + "\", \"" + UPGRADE_FROM_31 + "\", \"" +
UPGRADE_FROM_32 + "\", \"" + UPGRADE_FROM_33 + "\", \"" + UPGRADE_FROM_34 + "\", \"" +
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@
import java.util.function.Supplier;
import java.util.stream.Collectors;

import static java.util.Collections.singletonList;
import static java.util.Collections.unmodifiableSet;
import static java.util.Map.Entry.comparingByKey;
import static org.apache.kafka.common.utils.Utils.filterMap;
Expand Down Expand Up @@ -216,11 +217,13 @@ public interface UserTaskAssignmentListener {
private Queue<StreamsException> nonFatalExceptionsToHandle;
private Time time;

// since live upgrades from 2.3 (or earlier) to 4.0 or above are no longer supported, we can always
// start with the latest supported metadata version since version probing will take
// care of downgrading it if/when necessary
protected int usedSubscriptionMetadataVersion = LATEST_SUPPORTED_VERSION;

private InternalTopicManager internalTopicManager;
private CopartitionedTopicsEnforcer copartitionedTopicsEnforcer;
private RebalanceProtocol rebalanceProtocol;
private AssignmentListener assignmentListener;

private Supplier<Optional<org.apache.kafka.streams.processor.assignment.TaskAssignor>>
Expand All @@ -242,7 +245,6 @@ public void configure(final Map<String, ?> configs) {

logPrefix = assignorConfiguration.logPrefix();
log = new LogContext(logPrefix).logger(getClass());
usedSubscriptionMetadataVersion = assignorConfiguration.configuredMetadataVersion(usedSubscriptionMetadataVersion);

final ReferenceContainer referenceContainer = assignorConfiguration.referenceContainer();
mainConsumerSupplier = () -> Objects.requireNonNull(referenceContainer.mainConsumer, "Main consumer was not specified");
Expand All @@ -258,7 +260,6 @@ public void configure(final Map<String, ?> configs) {
userEndPoint = assignorConfiguration.userEndPoint();
internalTopicManager = assignorConfiguration.internalTopicManager();
copartitionedTopicsEnforcer = assignorConfiguration.copartitionedTopicsEnforcer();
rebalanceProtocol = assignorConfiguration.rebalanceProtocol();
customTaskAssignorSupplier = assignorConfiguration::customTaskAssignor;
legacyTaskAssignorSupplier = assignorConfiguration::taskAssignor;
assignmentListener = assignorConfiguration.assignmentListener();
Expand All @@ -273,12 +274,7 @@ public String name() {

@Override
public List<RebalanceProtocol> supportedProtocols() {
final List<RebalanceProtocol> supportedProtocols = new ArrayList<>();
supportedProtocols.add(RebalanceProtocol.EAGER);
if (rebalanceProtocol == RebalanceProtocol.COOPERATIVE) {
supportedProtocols.add(rebalanceProtocol);
}
return supportedProtocols;
return singletonList(RebalanceProtocol.COOPERATIVE);
}

@Override
Expand Down Expand Up @@ -1669,10 +1665,6 @@ void setInternalTopicManager(final InternalTopicManager internalTopicManager) {
this.internalTopicManager = internalTopicManager;
}

RebalanceProtocol rebalanceProtocol() {
return rebalanceProtocol;
}

protected String userEndPoint() {
return userEndPoint;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
package org.apache.kafka.streams.processor.internals.assignment;

import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.RebalanceProtocol;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.config.ConfigException;
import org.apache.kafka.common.utils.LogContext;
Expand All @@ -37,7 +36,6 @@
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
import static org.apache.kafka.streams.StreamsConfig.InternalConfig.INTERNAL_TASK_ASSIGNOR_CLASS;
import static org.apache.kafka.streams.processor.internals.assignment.StreamsAssignmentProtocolVersions.LATEST_SUPPORTED_VERSION;

public final class AssignorConfiguration {
private final String internalTaskAssignorClass;
Expand All @@ -61,6 +59,8 @@ public AssignorConfiguration(final Map<String, ?> configs) {
final LogContext logContext = new LogContext(logPrefix);
log = logContext.logger(getClass());

validateUpgradeFrom();

{
final Object o = configs.get(InternalConfig.REFERENCE_CONTAINER_PARTITION_ASSIGNOR);
if (o == null) {
Expand Down Expand Up @@ -94,7 +94,9 @@ public ReferenceContainer referenceContainer() {
return referenceContainer;
}

public RebalanceProtocol rebalanceProtocol() {
// cooperative rebalancing was introduced in 2.4 and the old protocol (eager rebalancing) was removed
// in 4.0, meaning live upgrades from 2.3 or below to 4.0+ are no longer possible without a bridge release
public void validateUpgradeFrom() {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
Expand All @@ -108,106 +110,20 @@ public RebalanceProtocol rebalanceProtocol() {
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// ATTENTION: The following log messages is used for verification in system test
// streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
// If you change it, please do also change the system test accordingly and
// verify whether the test passes.
log.info("Eager rebalancing protocol is enabled now for upgrade from {}.x", upgradeFrom);
log.warn("The eager rebalancing protocol is deprecated and will stop being supported in a future release." +
" Please be prepared to remove the 'upgrade.from' config soon.");
return RebalanceProtocol.EAGER;
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
case UPGRADE_FROM_35:
case UPGRADE_FROM_36:
case UPGRADE_FROM_37:
case UPGRADE_FROM_38:
case UPGRADE_FROM_39:
// we need to add new version when new "upgrade.from" values become available
final String errMsg = String.format(
"The eager rebalancing protocol is no longer supported in 4.0 which means live upgrades from 2.3 or below are not possible."
+ " Please see the Streams upgrade guide for the bridge releases and recommended upgrade path. Got upgrade.from='%s'", upgradeFrom);
log.error(errMsg);
throw new ConfigException(errMsg);

// This config is for explicitly sending FK response to a requested partition
// and should not affect the rebalance protocol
break;
default:
throw new IllegalArgumentException("Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom);
}
}
// ATTENTION: The following log messages is used for verification in system test
// streams/streams_cooperative_rebalance_upgrade_test.py::StreamsCooperativeRebalanceUpgradeTest.test_upgrade_to_cooperative_rebalance
// If you change it, please do also change the system test accordingly and
// verify whether the test passes.
log.info("Cooperative rebalancing protocol is enabled now");
return RebalanceProtocol.COOPERATIVE;
}

public String logPrefix() {
return logPrefix;
}

public int configuredMetadataVersion(final int priorVersion) {
final String upgradeFrom = streamsConfig.getString(StreamsConfig.UPGRADE_FROM_CONFIG);
if (upgradeFrom != null) {
switch (UpgradeFromValues.fromString(upgradeFrom)) {
case UPGRADE_FROM_0100:
log.info(
"Downgrading metadata.version from {} to 1 for upgrade from 0.10.0.x.",
LATEST_SUPPORTED_VERSION
);
return 1;
case UPGRADE_FROM_0101:
case UPGRADE_FROM_0102:
case UPGRADE_FROM_0110:
case UPGRADE_FROM_10:
case UPGRADE_FROM_11:
log.info(
"Downgrading metadata.version from {} to 2 for upgrade from {}.x.",
LATEST_SUPPORTED_VERSION,
upgradeFrom
);
return 2;
case UPGRADE_FROM_20:
case UPGRADE_FROM_21:
case UPGRADE_FROM_22:
case UPGRADE_FROM_23:
// These configs are for cooperative rebalancing and should not affect the metadata version
break;
case UPGRADE_FROM_24:
case UPGRADE_FROM_25:
case UPGRADE_FROM_26:
case UPGRADE_FROM_27:
case UPGRADE_FROM_28:
case UPGRADE_FROM_30:
case UPGRADE_FROM_31:
case UPGRADE_FROM_32:
case UPGRADE_FROM_33:
case UPGRADE_FROM_34:
case UPGRADE_FROM_35:
case UPGRADE_FROM_36:
case UPGRADE_FROM_37:
case UPGRADE_FROM_38:
case UPGRADE_FROM_39:
// we need to add new version when new "upgrade.from" values become available

// This config is for explicitly sending FK response to a requested partition
// and should not affect the metadata version
break;
default:
throw new IllegalArgumentException(
"Unknown configuration value for parameter 'upgrade.from': " + upgradeFrom
);
}
}
return priorVersion;
}

public String userEndPoint() {
final String configuredUserEndpoint = streamsConfig.getString(StreamsConfig.APPLICATION_SERVER_CONFIG);
if (configuredUserEndpoint != null && !configuredUserEndpoint.isEmpty()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
Expand Down Expand Up @@ -371,23 +370,11 @@ private void setUp(final Map<String, Object> parameterizedConfig, final boolean

@ParameterizedTest
@MethodSource("parameter")
public void shouldUseEagerRebalancingProtocol(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
createDefaultMockTaskManager();
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig);

assertEquals(1, partitionAssignor.supportedProtocols().size());
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.EAGER));
assertFalse(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldUseCooperativeRebalancingProtocol(final Map<String, Object> parameterizedConfig) {
public void shouldSupportOnlyCooperativeRebalancingProtocol(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
configureDefault(parameterizedConfig);

assertEquals(2, partitionAssignor.supportedProtocols().size());
assertEquals(1, partitionAssignor.supportedProtocols().size());
assertTrue(partitionAssignor.supportedProtocols().contains(RebalanceProtocol.COOPERATIVE));
}

Expand Down Expand Up @@ -586,7 +573,7 @@ public void shouldInterleaveTasksByGroupIdDuringNewAssignment(final Map<String,

@ParameterizedTest
@MethodSource("parameter")
public void testEagerSubscription(final Map<String, Object> parameterizedConfig) {
public void shouldThrowOnEagerSubscription(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
builder.addSource(null, "source1", null, null, null, "topic1");
builder.addSource(null, "source2", null, null, null, "topic2");
Expand All @@ -600,17 +587,10 @@ public void testEagerSubscription(final Map<String, Object> parameterizedConfig)
);

createMockTaskManager(prevTasks, standbyTasks);
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig);
assertThat(partitionAssignor.rebalanceProtocol(), equalTo(RebalanceProtocol.EAGER));

final Set<String> topics = Set.of("topic1", "topic2");
final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));

Collections.sort(subscription.topics());
assertEquals(asList("topic1", "topic2"), subscription.topics());

final SubscriptionInfo info = getInfo(PID_1, prevTasks, standbyTasks, uniqueField);
assertEquals(info, SubscriptionInfo.decode(subscription.userData()));
assertThrows(
ConfigException.class,
() -> configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_23), parameterizedConfig)
);
}

@ParameterizedTest
Expand Down Expand Up @@ -2135,64 +2115,6 @@ private void shouldReturnLowestAssignmentVersionForDifferentSubscriptionVersions
assertThat(AssignmentInfo.decode(assignment.get("consumer2").userData()).version(), equalTo(smallestVersion));
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion1(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
createDefaultMockTaskManager();
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, StreamsConfig.UPGRADE_FROM_0100), parameterizedConfig);

final Set<String> topics = Set.of("topic1");
final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));

assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(1));
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For0101(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0101, parameterizedConfig);
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For0102(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0102, parameterizedConfig);
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For0110(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_0110, parameterizedConfig);
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For10(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_10, parameterizedConfig);
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldDownGradeSubscriptionToVersion2For11(final Map<String, Object> parameterizedConfig) {
setUp(parameterizedConfig, false);
shouldDownGradeSubscriptionToVersion2(StreamsConfig.UPGRADE_FROM_11, parameterizedConfig);
}

private void shouldDownGradeSubscriptionToVersion2(final Object upgradeFromValue, final Map<String, Object> parameterizedConfig) {
createDefaultMockTaskManager();
configurePartitionAssignorWith(Collections.singletonMap(StreamsConfig.UPGRADE_FROM_CONFIG, upgradeFromValue), parameterizedConfig);

final Set<String> topics = Set.of("topic1");
final Subscription subscription = new Subscription(new ArrayList<>(topics), partitionAssignor.subscriptionUserData(topics));

assertThat(SubscriptionInfo.decode(subscription.userData()).version(), equalTo(2));
}

@ParameterizedTest
@MethodSource("parameter")
public void shouldReturnInterleavedAssignmentWithUnrevokedPartitionsRemovedWhenNewConsumerJoins(final Map<String, Object> parameterizedConfig) {
Expand Down
Loading