From fc586ac104e72b0b70a43eae767135ca72d4634e Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Wed, 22 Oct 2025 23:54:38 -0400 Subject: [PATCH 01/11] config setup --- .../kafka/coordinator/group/GroupConfig.java | 20 ++++++++++++++++++- .../group/GroupCoordinatorConfig.java | 16 ++++++++++++++- 2 files changed, 34 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java index 565d492507be6..bdb0cbb9f1ce8 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupConfig.java @@ -75,6 +75,8 @@ public final class GroupConfig extends AbstractConfig { public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "streams.num.standby.replicas"; + public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = "streams.initial.rebalance.delay.ms"; + public final int consumerSessionTimeoutMs; public final int consumerHeartbeatIntervalMs; @@ -93,6 +95,8 @@ public final class GroupConfig extends AbstractConfig { public final int streamsNumStandbyReplicas; + public final int streamsInitialRebalanceDelayMs; + public final String shareIsolationLevel; private static final ConfigDef CONFIG = new ConfigDef() @@ -155,7 +159,13 @@ public final class GroupConfig extends AbstractConfig { GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, - GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC); + GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC) + .define(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, + INT, + GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, + atLeast(0), + MEDIUM, + GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC); public GroupConfig(Map props) { super(CONFIG, props, false); @@ -168,6 +178,7 @@ public GroupConfig(Map props) { this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG); this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG); this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG); + this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG); this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG); } @@ -379,6 +390,13 @@ public int streamsNumStandbyReplicas() { return streamsNumStandbyReplicas; } + /** + * The initial rebalance delay for streams groups. + */ + public int streamsInitialRebalanceDelayMs() { + return streamsInitialRebalanceDelayMs; + } + /** * The share group isolation level. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java index e8a2f49663955..a2a99707dc710 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfig.java @@ -290,6 +290,10 @@ public class GroupCoordinatorConfig { public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2; public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG; + public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.streams.initial.rebalance.delay.ms"; + public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000; + public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more streams clients to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances."; + public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG = "group.share.initialize.retry.interval.ms"; // Because persister retries with exp backoff 5 times and upper cap of 30 secs. public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 30_000; @@ -352,7 +356,8 @@ public class GroupCoordinatorConfig { .define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC) .define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC) .define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC) - .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC); + .define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC) + .define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC); /** @@ -405,6 +410,7 @@ public class GroupCoordinatorConfig { private final int streamsGroupMaxSize; private final int streamsGroupNumStandbyReplicas; private final int streamsGroupMaxStandbyReplicas; + private final int streamsGroupInitialRebalanceDelayMs; @SuppressWarnings("this-escape") public GroupCoordinatorConfig(AbstractConfig config) { @@ -457,6 +463,7 @@ public GroupCoordinatorConfig(AbstractConfig config) { this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG); this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG); this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG); + this.streamsGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG); // New group coordinator configs validation. require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs, @@ -961,4 +968,11 @@ public int streamsGroupNumStandbyReplicas() { public int streamsGroupMaxNumStandbyReplicas() { return streamsGroupMaxStandbyReplicas; } + + /** + * The initial rebalance delay for streams groups. + */ + public int streamsGroupInitialRebalanceDelayMs() { + return streamsGroupInitialRebalanceDelayMs; + } } From 33f966714db277eb68016f0482049d411b8c5f4b Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Thu, 23 Oct 2025 22:07:07 -0400 Subject: [PATCH 02/11] add rebalance delay logic --- .../common/runtime/CoordinatorRuntime.java | 5 ++ .../common/runtime/CoordinatorTimer.java | 8 ++ .../common/runtime/MockCoordinatorTimer.java | 8 ++ .../group/GroupMetadataManager.java | 84 ++++++++++++++++--- 4 files changed, 94 insertions(+), 11 deletions(-) diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java index 965f8074f8014..e5b80651b817d 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntime.java @@ -441,6 +441,11 @@ public void cancel(String key) { if (prevTask != null) prevTask.cancel(); } + @Override + public boolean isScheduled(String key) { + return tasks.containsKey(key); + } + public void cancelAll() { Iterator> iterator = tasks.entrySet().iterator(); while (iterator.hasNext()) { diff --git a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java index d10e38a7d828b..2f288df3026d0 100644 --- a/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java +++ b/coordinator-common/src/main/java/org/apache/kafka/coordinator/common/runtime/CoordinatorTimer.java @@ -84,4 +84,12 @@ interface TimeoutOperation { * @param key The key. */ void cancel(String key); + + /** + * Check if an operation with the given key is scheduled. + * + * @param key The key. + * @return True if an operation with the key is scheduled, false otherwise. + */ + boolean isScheduled(String key); } diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java index 78e14ac576b39..69e3954a0a63a 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/MockCoordinatorTimer.java @@ -110,6 +110,14 @@ public void cancel(String key) { } } + /** + * Checks if a timeout with the given key is scheduled. + */ + @Override + public boolean isScheduled(String key) { + return timeoutMap.containsKey(key); + } + /** * @return True if a timeout with the key exists; false otherwise. */ diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 66f755148e9de..6fc675f1f4208 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1985,30 +1985,54 @@ private CoordinatorResult stream // Actually bump the group epoch int groupEpoch = group.groupEpoch(); + boolean isInitialRebalance = group.isEmpty(); if (bumpGroupEpoch) { - groupEpoch += 1; + if (isInitialRebalance) { + groupEpoch = 2; + } else { + groupEpoch += 1; + } records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs)); log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch); metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME); group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch); } + // Schedule initial rebalance delay for new streams groups to coalesce joins. + int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId); + if (isInitialRebalance & initialDelayMs > 0) { + timer.scheduleIfAbsent( + streamsInitialRebalanceKey(groupId), + initialDelayMs, + TimeUnit.MILLISECONDS, + false, + () -> fireStreamsInitialRebalance(groupId) + ); + } + // 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member // replaces an existing static member. // The delta between the existing and the new target assignment is persisted to the partition. int targetAssignmentEpoch; TasksTuple targetAssignment; if (groupEpoch > group.assignmentEpoch()) { - targetAssignment = updateStreamsTargetAssignment( - group, - groupEpoch, - updatedMember, - updatedConfiguredTopology, - metadataImage, - records, - currentAssignmentConfigs - ); - targetAssignmentEpoch = groupEpoch; + boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId)); + if (initialDelayActive && group.assignmentEpoch() == 0) { + // During initial rebalance delay, return empty assignment to first joining members. + targetAssignmentEpoch = group.assignmentEpoch(); + targetAssignment = TasksTuple.EMPTY; + } else { + targetAssignment = updateStreamsTargetAssignment( + group, + groupEpoch, + updatedMember, + updatedConfiguredTopology, + metadataImage, + records, + currentAssignmentConfigs + ); + targetAssignmentEpoch = groupEpoch; + } } else { targetAssignmentEpoch = group.assignmentEpoch(); targetAssignment = group.targetAssignment(updatedMember.memberId()); @@ -8570,6 +8594,10 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List groupConfig = groupConfigManager.groupConfig(groupId); + return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs) + .orElse(config.streamsGroupInitialRebalanceDelayMs()); + } + /** * Get the assignor of the provided streams group. */ @@ -8716,6 +8753,31 @@ static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } + /** + * Callback when the initial rebalance delay timer expires. + * This is a no-op as the actual assignment computation happens on the next heartbeat. + * + * @param groupId The group id. + * + * @return An empty result. + */ + private CoordinatorResult fireStreamsInitialRebalance(String groupId) { + return EMPTY_RESULT; + } + + /** + * Generate a streams group initial rebalance key for the timer. + * + * Package private for testing. + * + * @param groupId The group id. + * + * @return the initial rebalance key. + */ + static String streamsInitialRebalanceKey(String groupId) { + return "initial-rebalance-timeout-" + groupId; + } + /** * Generate a consumer group join key for the timer. * From e57d4a18c2063918f855e05f668ac6c395a2ce5e Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Fri, 24 Oct 2025 16:48:12 -0400 Subject: [PATCH 03/11] fix --- .../coordinator/group/GroupMetadataManager.java | 17 +++-------------- 1 file changed, 3 insertions(+), 14 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 6fc675f1f4208..73b3bdadc0b3e 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1985,10 +1985,10 @@ private CoordinatorResult stream // Actually bump the group epoch int groupEpoch = group.groupEpoch(); - boolean isInitialRebalance = group.isEmpty(); + boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0); if (bumpGroupEpoch) { if (isInitialRebalance) { - groupEpoch = 2; + groupEpoch += 2; } else { groupEpoch += 1; } @@ -2006,7 +2006,7 @@ private CoordinatorResult stream initialDelayMs, TimeUnit.MILLISECONDS, false, - () -> fireStreamsInitialRebalance(groupId) + () -> EMPTY_RESULT ); } @@ -8753,17 +8753,6 @@ static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } - /** - * Callback when the initial rebalance delay timer expires. - * This is a no-op as the actual assignment computation happens on the next heartbeat. - * - * @param groupId The group id. - * - * @return An empty result. - */ - private CoordinatorResult fireStreamsInitialRebalance(String groupId) { - return EMPTY_RESULT; - } /** * Generate a streams group initial rebalance key for the timer. From 164143f4b6a2dd24379c6a638ed2e05166cab1db Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Mon, 27 Oct 2025 14:21:55 -0400 Subject: [PATCH 04/11] fire initial rebalance --- .../group/GroupMetadataManager.java | 70 ++++++++++++++++++- 1 file changed, 68 insertions(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 73b3bdadc0b3e..4ad2f19439f9a 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2000,13 +2000,13 @@ private CoordinatorResult stream // Schedule initial rebalance delay for new streams groups to coalesce joins. int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId); - if (isInitialRebalance & initialDelayMs > 0) { + if (isInitialRebalance && initialDelayMs > 0) { timer.scheduleIfAbsent( streamsInitialRebalanceKey(groupId), initialDelayMs, TimeUnit.MILLISECONDS, false, - () -> EMPTY_RESULT + () -> fireStreamsInitialRebalance(groupId) ); } @@ -4027,6 +4027,72 @@ private TasksTuple updateStreamsTargetAssignment( } } + /** + * Fires the initial rebalance for a streams group when the delay timer expires. + * Computes and persists target assignment for all members if conditions are met. + * + * @param groupId The group id. + * @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT. + */ + private CoordinatorResult fireStreamsInitialRebalance( + String groupId + ) { + try { + StreamsGroup group = streamsGroup(groupId); + + if (group.groupEpoch() <= group.assignmentEpoch()) { + return EMPTY_RESULT; + } + + if (!group.configuredTopology().isPresent()) { + return EMPTY_RESULT; + } + + List records = new ArrayList<>(); + TaskAssignor assignor = streamsGroupAssignor(group.groupId()); + + try { + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder assignmentResultBuilder = + new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder( + group.groupId(), + group.groupEpoch(), + assignor, + group.lastAssignmentConfigs() + ) + .withMembers(group.members()) + .withTopology(group.configuredTopology().get()) + .withStaticMembers(group.staticMembers()) + .withMetadataImage(metadataImage) + .withTargetAssignment(group.targetAssignment()); + + long startTimeMs = time.milliseconds(); + org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult = + assignmentResultBuilder.build(); + long assignorTimeMs = time.milliseconds() - startTimeMs; + + if (log.isDebugEnabled()) { + log.debug("[GroupId {}] Initial rebalance: Computed target assignment for epoch {} with '{}' assignor in {}ms: {}.", + group.groupId(), group.groupEpoch(), assignor, assignorTimeMs, assignmentResult.targetAssignment()); + } else { + log.info("[GroupId {}] Initial rebalance: Computed target assignment for epoch {} with '{}' assignor in {}ms.", + group.groupId(), group.groupEpoch(), assignor, assignorTimeMs); + } + + records.addAll(assignmentResult.records()); + + return new CoordinatorResult<>(records, null); + } catch (TaskAssignorException ex) { + String msg = String.format("Failed to compute target assignment for initial rebalance at epoch %d: %s", + group.groupEpoch(), ex.getMessage()); + log.error("[GroupId {}] {}.", group.groupId(), msg); + throw new UnknownServerException(msg, ex); + } + } catch (GroupIdNotFoundException ex) { + log.warn("[GroupId {}] Group not found during initial rebalance.", groupId); + return EMPTY_RESULT; + } + } + /** * Handles leave request from a consumer group member. * @param groupId The group id from the request. From e2410bc18c38d25193adf377872af0f4fd5af69b Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Mon, 27 Oct 2025 15:26:29 -0400 Subject: [PATCH 05/11] add test for groupcoordinator config --- .../group/GroupMetadataManager.java | 5 +--- .../group/GroupCoordinatorConfigTest.java | 23 +++++++++++++++++++ 2 files changed, 24 insertions(+), 4 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 4ad2f19439f9a..29adbbf6ede07 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -4048,7 +4048,6 @@ private CoordinatorResult fireStreamsInitialRebalance( return EMPTY_RESULT; } - List records = new ArrayList<>(); TaskAssignor assignor = streamsGroupAssignor(group.groupId()); try { @@ -4078,9 +4077,7 @@ private CoordinatorResult fireStreamsInitialRebalance( group.groupId(), group.groupEpoch(), assignor, assignorTimeMs); } - records.addAll(assignmentResult.records()); - - return new CoordinatorResult<>(records, null); + return new CoordinatorResult<>(assignmentResult.records(), null); } catch (TaskAssignorException ex) { String msg = String.format("Failed to compute target assignment for initial rebalance at epoch %d: %s", group.groupEpoch(), ex.getMessage()); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 491df993e0999..91795064f4dd3 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -198,6 +198,7 @@ public void testConfigs() { configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, 111); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, 222); configs.put(GroupCoordinatorConfig.CONSUMER_GROUP_REGEX_REFRESH_INTERVAL_MS_CONFIG, 15 * 60 * 1000); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 5000); GroupCoordinatorConfig config = createConfig(configs); @@ -227,6 +228,7 @@ public void testConfigs() { assertEquals(111, config.consumerGroupMinHeartbeatIntervalMs()); assertEquals(222, config.consumerGroupMaxHeartbeatIntervalMs()); assertEquals(15 * 60 * 1000, config.consumerGroupRegexRefreshIntervalMs()); + assertEquals(5000, config.streamsGroupInitialRebalanceDelayMs()); } @Test @@ -323,6 +325,11 @@ public void testInvalidConfigs() { configs.put(GroupCoordinatorConfig.STREAMS_GROUP_SESSION_TIMEOUT_MS_CONFIG, 50000); assertEquals("group.streams.heartbeat.interval.ms must be less than group.streams.session.timeout.ms", assertThrows(IllegalArgumentException.class, () -> createConfig(configs)).getMessage()); + + configs.clear(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, -1); + assertEquals("Invalid value -1 for configuration group.streams.initial.rebalance.delay.ms: Value must be at least 0", + assertThrows(ConfigException.class, () -> createConfig(configs)).getMessage()); } public static GroupCoordinatorConfig createGroupCoordinatorConfig( @@ -359,6 +366,22 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( return createConfig(configs); } + @Test + public void testStreamsGroupInitialRebalanceDelayDefaultValue() { + GroupCoordinatorConfig config = createConfig(configs); + assertEquals(3000, config.streamsGroupInitialRebalanceDelayMs()); + assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, + config.streamsGroupInitialRebalanceDelayMs()); + } + + @Test + public void testStreamsGroupInitialRebalanceDelayCustomValue() { + Map configs = new HashMap<>(); + configs.put(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 7000); + GroupCoordinatorConfig config = createConfig(configs); + assertEquals(7000, config.streamsGroupInitialRebalanceDelayMs()); + } + public static GroupCoordinatorConfig createConfig(Map configs) { return new GroupCoordinatorConfig(new AbstractConfig( GroupCoordinatorConfig.CONFIG_DEF, From 3c163b72c6b2893e535a3e5a2d375cf1737eef29 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Mon, 27 Oct 2025 15:50:41 -0400 Subject: [PATCH 06/11] test for groupconfig and groupcoordinatorconfig --- .../coordinator/group/GroupConfigTest.java | 5 + .../group/GroupCoordinatorConfigTest.java | 1 + .../group/GroupMetadataManagerTest.java | 164 ++++++++++++++++++ 3 files changed, 170 insertions(+) diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java index 77014de5bf18a..a68379268182b 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupConfigTest.java @@ -68,6 +68,8 @@ public void testFromPropsInvalid() { assertPropertyInvalid(name, "not_a_number", "1.0"); } else if (GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG.equals(name)) { assertPropertyInvalid(name, "not_a_number", "1.0"); + } else if (GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG.equals(name)) { + assertPropertyInvalid(name, "not_a_number", "-1", "1.0"); } else { assertPropertyInvalid(name, "not_a_number", "-0.1"); } @@ -237,6 +239,7 @@ public void testFromPropsWithDefaultValue() { defaultValue.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "10"); defaultValue.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "2000"); defaultValue.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1"); + defaultValue.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000"); Properties props = new Properties(); props.put(GroupConfig.CONSUMER_SESSION_TIMEOUT_MS_CONFIG, "20"); @@ -252,6 +255,7 @@ public void testFromPropsWithDefaultValue() { assertEquals(10, config.getInt(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG)); assertEquals(2000, config.getInt(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG)); assertEquals(1, config.getInt(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG)); + assertEquals(3000, config.getInt(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG)); } @Test @@ -274,6 +278,7 @@ private Properties createValidGroupConfig() { props.put(GroupConfig.STREAMS_SESSION_TIMEOUT_MS_CONFIG, "50000"); props.put(GroupConfig.STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG, "6000"); props.put(GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG, "1"); + props.put(GroupConfig.STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG, "3000"); return props; } diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java index 91795064f4dd3..c447aec537495 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupCoordinatorConfigTest.java @@ -368,6 +368,7 @@ public static GroupCoordinatorConfig createGroupCoordinatorConfig( @Test public void testStreamsGroupInitialRebalanceDelayDefaultValue() { + Map configs = new HashMap<>(); GroupCoordinatorConfig config = createConfig(configs); assertEquals(3000, config.streamsGroupInitialRebalanceDelayMs()); assertEquals(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 02fc987e10182..680d8c1c27ef8 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -24173,4 +24173,168 @@ private Map getDefaultAssignmentConfigs() { // Use the same default value as GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT return Map.of("num.standby.replicas", String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)); } + + @Test + public void testStreamsInitialDelaySchedulesTimerOnFirstJoin() { + String groupId = "test-group"; + String memberId = Uuid.randomUuid().toString(); + String topicName = "test-topic"; + Uuid topicId = Uuid.randomUuid(); + String subtopologyId = "subtopology-1"; + + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopologyId).setSourceTopics(List.of(topicName)) + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(topicId, topicName, 3) + .buildCoordinatorMetadataImage()) + .build(); + + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + String timerKey = GroupMetadataManager.streamsInitialRebalanceKey(groupId); + + assertTrue(context.timer.isScheduled(timerKey)); + assertNotNull(context.timer.timeout(timerKey)); + assertEquals(2, group.groupEpoch()); + assertEquals(0, group.assignmentEpoch()); + } + + @Test + public void testStreamsEmptyAssignmentDuringInitialDelay() { + String groupId = "test-group"; + String memberId = Uuid.randomUuid().toString(); + String topicName = "test-topic"; + Uuid topicId = Uuid.randomUuid(); + String subtopologyId = "subtopology-1"; + + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopologyId).setSourceTopics(List.of(topicName)) + )); + + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withMetadataImage(new MetadataImageBuilder() + .addTopic(topicId, topicName, 3) + .buildCoordinatorMetadataImage()) + .build(); + + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + CoordinatorResult secondHeartbeat = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(2) + .setRebalanceTimeoutMs(5000) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + + assertEquals(TasksTuple.EMPTY, group.targetAssignment(memberId)); + assertEquals(0, group.assignmentEpoch()); + assertEquals(List.of(), secondHeartbeat.response().data().activeTasks()); + assertEquals(List.of(), secondHeartbeat.response().data().standbyTasks()); + } + + @Test + public void testStreamsNoDelayForNonInitialRebalances() { + String groupId = "test-group"; + String memberId1 = Uuid.randomUuid().toString(); + String memberId2 = Uuid.randomUuid().toString(); + String topicName = "test-topic"; + Uuid topicId = Uuid.randomUuid(); + String subtopologyId = "subtopology-1"; + + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopologyId).setSourceTopics(List.of(topicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(topicId, topicName, 3) + .buildCoordinatorMetadataImage()) + .build(); + + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + context.sleep(3000); + + assignor.prepareGroupAssignment(Map.of( + memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopologyId, 0, 1)), + memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, + TaskAssignmentTestUtil.mkTasks(subtopologyId, 2)) + )); + + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId1) + .setMemberEpoch(2) + .setRebalanceTimeoutMs(5000) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); + assertTrue(group.assignmentEpoch() > 0); + + String timerKey = GroupMetadataManager.streamsInitialRebalanceKey(groupId); + assertFalse(context.timer.isScheduled(timerKey)); + + context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId2) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(5000) + .setTopology(topology) + .setProcessId(DEFAULT_PROCESS_ID) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertFalse(context.timer.isScheduled(timerKey)); + } } From b0814e2f05d487594a02269a043bac3651c56ffa Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Tue, 28 Oct 2025 14:22:25 -0400 Subject: [PATCH 07/11] add test for coordinator timer --- .../runtime/CoordinatorRuntimeTest.java | 51 +++++++++++++++++++ 1 file changed, 51 insertions(+) diff --git a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java index dfbbdf048bc20..c9951ed579ddf 100644 --- a/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java +++ b/coordinator-common/src/test/java/org/apache/kafka/coordinator/common/runtime/CoordinatorRuntimeTest.java @@ -2351,6 +2351,57 @@ public void testTimerScheduleIfAbsent() throws InterruptedException { assertEquals(0, ctx.timer.size()); } + @Test + public void testTimerIsScheduled() throws InterruptedException { + MockTimer timer = new MockTimer(); + CoordinatorRuntime runtime = + new CoordinatorRuntime.Builder() + .withTime(timer.time()) + .withTimer(timer) + .withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT) + .withLoader(new MockCoordinatorLoader()) + .withEventProcessor(new DirectEventProcessor()) + .withPartitionWriter(new MockPartitionWriter()) + .withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier()) + .withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class)) + .withCoordinatorMetrics(mock(CoordinatorMetrics.class)) + .withSerializer(new StringSerializer()) + .withExecutorService(mock(ExecutorService.class)) + .build(); + + runtime.scheduleLoadOperation(TP, 10); + + CoordinatorRuntime.CoordinatorContext ctx = runtime.contextOrThrow(TP); + assertEquals(0, ctx.timer.size()); + + assertFalse(ctx.timer.isScheduled("timer-1")); + + ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false, + () -> new CoordinatorResult<>(List.of("record1"), null)); + + assertTrue(ctx.timer.isScheduled("timer-1")); + assertFalse(ctx.timer.isScheduled("timer-2")); + assertEquals(1, ctx.timer.size()); + + ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, false, + () -> new CoordinatorResult<>(List.of("record2"), null)); + + assertTrue(ctx.timer.isScheduled("timer-1")); + assertTrue(ctx.timer.isScheduled("timer-2")); + assertEquals(2, ctx.timer.size()); + + ctx.timer.cancel("timer-1"); + + assertFalse(ctx.timer.isScheduled("timer-1")); + assertTrue(ctx.timer.isScheduled("timer-2")); + assertEquals(1, ctx.timer.size()); + + timer.advanceClock(21); + + assertFalse(ctx.timer.isScheduled("timer-2")); + assertEquals(0, ctx.timer.size()); + } + @Test public void testStateChanges() throws Exception { MockTimer timer = new MockTimer(); From f402c0599d74bc70e61f82628b234795ccafe41e Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Wed, 29 Oct 2025 19:43:23 -0400 Subject: [PATCH 08/11] update existing tests --- .../group/GroupMetadataManager.java | 6 +- .../group/streams/StreamsGroupMember.java | 1 + .../group/GroupMetadataManagerTest.java | 258 ++++-------------- 3 files changed, 58 insertions(+), 207 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index 29adbbf6ede07..e8f24a28c3c3b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -1985,9 +1985,9 @@ private CoordinatorResult stream // Actually bump the group epoch int groupEpoch = group.groupEpoch(); - boolean isInitialRebalance = (group.isEmpty() && groupEpoch == 0); + boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0); if (bumpGroupEpoch) { - if (isInitialRebalance) { + if (groupEpoch == 0) { groupEpoch += 2; } else { groupEpoch += 1; @@ -2019,7 +2019,7 @@ private CoordinatorResult stream boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId)); if (initialDelayActive && group.assignmentEpoch() == 0) { // During initial rebalance delay, return empty assignment to first joining members. - targetAssignmentEpoch = group.assignmentEpoch(); + targetAssignmentEpoch = groupEpoch; targetAssignment = TasksTuple.EMPTY; } else { targetAssignment = updateStreamsTargetAssignment( diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java index c440deaa1a966..461d843c1b158 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java @@ -288,6 +288,7 @@ public static Builder withDefaults(String memberId) { .setClientTags(Collections.emptyMap()) .setState(MemberState.STABLE) .setMemberEpoch(0) + .setPreviousMemberEpoch(0) .setAssignedTasks(TasksTuple.EMPTY) .setTasksPendingRevocation(TasksTuple.EMPTY) .setUserEndpoint(null); diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 680d8c1c27ef8..a8aa82fed6988 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -16233,6 +16233,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -16258,7 +16259,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -16275,7 +16276,7 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16288,13 +16289,13 @@ barTopicName, computeTopicHash(barTopicName, metadataImage) List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5), TaskAssignmentTestUtil.mkTasks(subtopology2, 0, 1, 2) )), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -16320,6 +16321,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -16344,7 +16346,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -16358,7 +16360,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16373,7 +16375,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { List actualDescribedGroups = context.groupMetadataManager.streamsGroupDescribe(List.of(groupId), context.lastCommittedOffset); StreamsGroupDescribeResponseData.DescribedGroup expectedDescribedGroup = new StreamsGroupDescribeResponseData.DescribedGroup() .setGroupId(groupId) - .setAssignmentEpoch(1) + .setAssignmentEpoch(2) .setTopology( new StreamsGroupDescribeResponseData.Topology() .setEpoch(0) @@ -16388,7 +16390,7 @@ public void testJoinEmptyStreamsGroupAndDescribe() { TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1, 2, 3, 4, 5))) )) .setGroupState(StreamsGroupState.STABLE.toString()) - .setGroupEpoch(1); + .setGroupEpoch(2); assertEquals(1, actualDescribedGroups.size()); assertEquals(expectedDescribedGroup, actualDescribedGroups.get(0)); } @@ -16415,6 +16417,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member joins the streams group. @@ -16437,7 +16440,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -16451,7 +16454,7 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16461,11 +16464,11 @@ public void testStreamsGroupMemberJoiningWithMissingSourceTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) )), -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -16496,6 +16499,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member joins the streams group. @@ -16523,7 +16527,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -16537,7 +16541,7 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16547,11 +16551,11 @@ public void testStreamsGroupMemberJoiningWithMissingInternalTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage) )), -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -16584,6 +16588,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member joins the streams group. @@ -16606,7 +16611,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -16620,7 +16625,7 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { StreamsGroupMember expectedMember = streamsGroupMemberBuilderWithDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setClientId(DEFAULT_CLIENT_ID) .setClientHost(DEFAULT_CLIENT_ADDRESS.toString()) @@ -16630,12 +16635,12 @@ public void testStreamsGroupMemberJoiningWithIncorrectlyPartitionedTopic() { List expectedRecords = List.of( StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(groupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(groupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 1, computeGroupHash(Map.of( + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, computeGroupHash(Map.of( fooTopicName, computeTopicHash(fooTopicName, metadataImage), barTopicName, computeTopicHash(barTopicName, metadataImage) )), -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(groupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(groupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(groupId, expectedMember) ); @@ -17330,6 +17335,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Prepare new assignment for the group. @@ -17353,7 +17359,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -17374,7 +17380,7 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setEndpointInformationEpoch(-1), result.response().data() @@ -17940,6 +17946,7 @@ public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { .addTopic(fooTopicId, fooTopicName, 6) .addTopic(barTopicId, barTopicName, 3) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Member 1 joins the streams group. The request fails because the @@ -17955,7 +17962,7 @@ public void testStreamsTaskAssignorExceptionOnRegularHeartbeat() { .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of()))); - assertEquals("Failed to compute a new target assignment for epoch 1: Assignment failed.", e.getMessage()); + assertEquals("Failed to compute a new target assignment for epoch 2: Assignment failed.", e.getMessage()); } @Test @@ -18198,6 +18205,7 @@ public void testStreamsSessionTimeoutLifecycle() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 6) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18216,7 +18224,7 @@ public void testStreamsSessionTimeoutLifecycle() { .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); // Verify that there is a session time. context.assertSessionTimeout(groupId, memberId, 45000); @@ -18233,7 +18241,7 @@ public void testStreamsSessionTimeoutLifecycle() { .setGroupId(groupId) .setMemberId(memberId) .setMemberEpoch(result.response().data().memberEpoch())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); // Verify that there is a session time. context.assertSessionTimeout(groupId, memberId, 45000); @@ -18278,6 +18286,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18296,7 +18305,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); // Verify that there is a session time. context.assertSessionTimeout(groupId, memberId, 45000); @@ -18313,7 +18322,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 2, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) ) ) )), @@ -18343,6 +18352,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 3) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment(Map.of(memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, @@ -18365,7 +18375,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18406,7 +18416,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(2) + .setMemberEpoch(3) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -18426,13 +18436,13 @@ public void testStreamsRebalanceTimeoutLifecycle() { new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setRebalanceTimeoutMs(12000)); assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18459,7 +18469,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setActiveTasks(List.of(new StreamsGroupHeartbeatRequestData.TaskIds() .setSubtopologyId(subtopology1) .setPartitions(List.of(0, 1)))) @@ -18469,7 +18479,7 @@ public void testStreamsRebalanceTimeoutLifecycle() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(2) + .setMemberEpoch(3) .setHeartbeatIntervalMs(5000) .setEndpointInformationEpoch(0), result.response().data() @@ -18506,6 +18516,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) .withMetadataImage(metadataImage) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment( @@ -18527,7 +18538,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18568,7 +18579,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId2) - .setMemberEpoch(2) + .setMemberEpoch(3) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -18744,6 +18755,7 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { .withMetadataImage(new MetadataImageBuilder() .addTopic(fooTopicId, fooTopicName, 2) .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); // Prepare new assignment for the group. @@ -18769,7 +18781,7 @@ public void testStreamsGroupEndpointInformationOnlyWhenEpochGreater() { assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18969,6 +18981,7 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { MockTaskAssignor assignor = new MockTaskAssignor("sticky"); GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() .withStreamsGroupTaskAssignors(List.of(assignor)) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); ClassicGroup classicGroup = new ClassicGroup( new LogContext(), @@ -18991,7 +19004,7 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { StreamsGroupMember expectedMember = StreamsGroupMember.Builder.withDefaults(memberId) .setState(org.apache.kafka.coordinator.group.streams.MemberState.STABLE) - .setMemberEpoch(1) + .setMemberEpoch(2) .setPreviousMemberEpoch(0) .setRebalanceTimeoutMs(5000) .setClientId(DEFAULT_CLIENT_ID) @@ -19008,9 +19021,9 @@ public void testStreamsGroupHeartbeatWithEmptyClassicGroup() { GroupCoordinatorRecordHelpers.newGroupMetadataTombstoneRecord(classicGroupId), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberRecord(classicGroupId, expectedMember), StreamsCoordinatorRecordHelpers.newStreamsGroupTopologyRecord(classicGroupId, topology), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 1, 0, -1, Map.of("num.standby.replicas", "0")), + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(classicGroupId, 2, 0, -1, Map.of("num.standby.replicas", "0")), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentRecord(classicGroupId, memberId, TasksTuple.EMPTY), - StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 1), + StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentEpochRecord(classicGroupId, 2), StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentRecord(classicGroupId, expectedMember) ), result.records() @@ -19308,6 +19321,7 @@ public void testStreamsGroupDynamicConfigs() { .addTopic(fooTopicId, fooTopicName, 6) .addRacks() .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 0) .build(); assignor.prepareGroupAssignment( @@ -19326,7 +19340,7 @@ public void testStreamsGroupDynamicConfigs() { .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of())); - assertEquals(1, result.response().data().memberEpoch()); + assertEquals(2, result.response().data().memberEpoch()); assertEquals(Map.of("num.standby.replicas", "0"), assignor.lastPassedAssignmentConfigs()); // Verify heartbeat interval @@ -24173,168 +24187,4 @@ private Map getDefaultAssignmentConfigs() { // Use the same default value as GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT return Map.of("num.standby.replicas", String.valueOf(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT)); } - - @Test - public void testStreamsInitialDelaySchedulesTimerOnFirstJoin() { - String groupId = "test-group"; - String memberId = Uuid.randomUuid().toString(); - String topicName = "test-topic"; - Uuid topicId = Uuid.randomUuid(); - String subtopologyId = "subtopology-1"; - - Topology topology = new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopologyId).setSourceTopics(List.of(topicName)) - )); - - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(topicId, topicName, 3) - .buildCoordinatorMetadataImage()) - .build(); - - context.streamsGroupHeartbeat( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5000) - .setTopology(topology) - .setProcessId(DEFAULT_PROCESS_ID) - .setActiveTasks(List.of()) - .setStandbyTasks(List.of()) - .setWarmupTasks(List.of())); - - StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); - String timerKey = GroupMetadataManager.streamsInitialRebalanceKey(groupId); - - assertTrue(context.timer.isScheduled(timerKey)); - assertNotNull(context.timer.timeout(timerKey)); - assertEquals(2, group.groupEpoch()); - assertEquals(0, group.assignmentEpoch()); - } - - @Test - public void testStreamsEmptyAssignmentDuringInitialDelay() { - String groupId = "test-group"; - String memberId = Uuid.randomUuid().toString(); - String topicName = "test-topic"; - Uuid topicId = Uuid.randomUuid(); - String subtopologyId = "subtopology-1"; - - Topology topology = new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopologyId).setSourceTopics(List.of(topicName)) - )); - - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withMetadataImage(new MetadataImageBuilder() - .addTopic(topicId, topicName, 3) - .buildCoordinatorMetadataImage()) - .build(); - - context.streamsGroupHeartbeat( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5000) - .setTopology(topology) - .setProcessId(DEFAULT_PROCESS_ID) - .setActiveTasks(List.of()) - .setStandbyTasks(List.of()) - .setWarmupTasks(List.of())); - - CoordinatorResult secondHeartbeat = context.streamsGroupHeartbeat( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId) - .setMemberEpoch(2) - .setRebalanceTimeoutMs(5000) - .setTopology(topology) - .setProcessId(DEFAULT_PROCESS_ID) - .setActiveTasks(List.of()) - .setStandbyTasks(List.of()) - .setWarmupTasks(List.of())); - - StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); - - assertEquals(TasksTuple.EMPTY, group.targetAssignment(memberId)); - assertEquals(0, group.assignmentEpoch()); - assertEquals(List.of(), secondHeartbeat.response().data().activeTasks()); - assertEquals(List.of(), secondHeartbeat.response().data().standbyTasks()); - } - - @Test - public void testStreamsNoDelayForNonInitialRebalances() { - String groupId = "test-group"; - String memberId1 = Uuid.randomUuid().toString(); - String memberId2 = Uuid.randomUuid().toString(); - String topicName = "test-topic"; - Uuid topicId = Uuid.randomUuid(); - String subtopologyId = "subtopology-1"; - - Topology topology = new Topology().setSubtopologies(List.of( - new Subtopology().setSubtopologyId(subtopologyId).setSourceTopics(List.of(topicName)) - )); - - MockTaskAssignor assignor = new MockTaskAssignor("sticky"); - GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() - .withStreamsGroupTaskAssignors(List.of(assignor)) - .withMetadataImage(new MetadataImageBuilder() - .addTopic(topicId, topicName, 3) - .buildCoordinatorMetadataImage()) - .build(); - - context.streamsGroupHeartbeat( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5000) - .setTopology(topology) - .setProcessId(DEFAULT_PROCESS_ID) - .setActiveTasks(List.of()) - .setStandbyTasks(List.of()) - .setWarmupTasks(List.of())); - - context.sleep(3000); - - assignor.prepareGroupAssignment(Map.of( - memberId1, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, - TaskAssignmentTestUtil.mkTasks(subtopologyId, 0, 1)), - memberId2, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, - TaskAssignmentTestUtil.mkTasks(subtopologyId, 2)) - )); - - context.streamsGroupHeartbeat( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId1) - .setMemberEpoch(2) - .setRebalanceTimeoutMs(5000) - .setTopology(topology) - .setProcessId(DEFAULT_PROCESS_ID) - .setActiveTasks(List.of()) - .setStandbyTasks(List.of()) - .setWarmupTasks(List.of())); - - StreamsGroup group = context.groupMetadataManager.streamsGroup(groupId); - assertTrue(group.assignmentEpoch() > 0); - - String timerKey = GroupMetadataManager.streamsInitialRebalanceKey(groupId); - assertFalse(context.timer.isScheduled(timerKey)); - - context.streamsGroupHeartbeat( - new StreamsGroupHeartbeatRequestData() - .setGroupId(groupId) - .setMemberId(memberId2) - .setMemberEpoch(0) - .setRebalanceTimeoutMs(5000) - .setTopology(topology) - .setProcessId(DEFAULT_PROCESS_ID) - .setActiveTasks(List.of()) - .setStandbyTasks(List.of()) - .setWarmupTasks(List.of())); - - assertFalse(context.timer.isScheduled(timerKey)); - } } From 6ea4ee5b70c29f07a2f18786f6bf8da2806427d1 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Thu, 30 Oct 2025 14:00:14 -0400 Subject: [PATCH 09/11] test for rebalance delay logic --- .../group/GroupMetadataManager.java | 2 +- .../streams/CurrentAssignmentBuilder.java | 7 +- .../group/GroupMetadataManagerTest.java | 81 ++++++++++++++++++- 3 files changed, 84 insertions(+), 6 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index e8f24a28c3c3b..b50c24134aca2 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -3717,7 +3717,7 @@ private StreamsGroupMember maybeReconcile( List ownedWarmupTasks, List records ) { - if (member.isReconciledTo(targetAssignmentEpoch)) { + if (member.isReconciledTo(targetAssignmentEpoch) && member.assignedTasks().equals(targetAssignment)) { return member; } diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java index 2330497a7be39..6ab8f9d014d26 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java @@ -161,8 +161,11 @@ public StreamsGroupMember build() { case STABLE: // When the member is in the STABLE state, we verify if a newer // epoch (or target assignment) is available. If it is, we can - // reconcile the member towards it. Otherwise, we return. - if (member.memberEpoch() != targetAssignmentEpoch) { + // reconcile the member towards it. If the epoch is the same but + // the target assignment has changed (e.g., after initial rebalance + // delay fires), we must still reconcile to propagate the new tasks. + if (member.memberEpoch() != targetAssignmentEpoch + || !member.assignedTasks().equals(targetAssignment)) { return computeNextAssignment( member.memberEpoch(), member.assignedTasks() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index a8aa82fed6988..2c219e8350bc9 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -17387,6 +17387,81 @@ public void testStreamsGroupHeartbeatPartialResponseWhenNothingChanges() { ); } + @Test + public void testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer() { + String groupId = "fooup"; + String memberId = Uuid.randomUuid().toString(); + String subtopology1 = "subtopology1"; + String fooTopicName = "foo"; + Uuid fooTopicId = Uuid.randomUuid(); + Topology topology = new Topology().setSubtopologies(List.of( + new Subtopology().setSubtopologyId(subtopology1).setSourceTopics(List.of(fooTopicName)) + )); + + MockTaskAssignor assignor = new MockTaskAssignor("sticky"); + GroupMetadataManagerTestContext context = new GroupMetadataManagerTestContext.Builder() + .withStreamsGroupTaskAssignors(List.of(assignor)) + .withMetadataImage(new MetadataImageBuilder() + .addTopic(fooTopicId, fooTopicName, 2) + .buildCoordinatorMetadataImage()) + .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000) + .build(); + + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)))); + + CoordinatorResult result; + + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(0) + .setRebalanceTimeoutMs(1500) + .setTopology(topology) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()) + .setPartitionsByUserEndpoint(null) + .setEndpointInformationEpoch(-1), + result.response().data() + ); + + context.sleep(10000); + + result = context.streamsGroupHeartbeat( + new StreamsGroupHeartbeatRequestData() + .setGroupId(groupId) + .setMemberId(memberId) + .setMemberEpoch(2) + .setActiveTasks(List.of()) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of())); + + assertResponseEquals( + new StreamsGroupHeartbeatResponseData() + .setMemberId(memberId) + .setMemberEpoch(2) + .setHeartbeatIntervalMs(5000) + .setActiveTasks(List.of( + new StreamsGroupHeartbeatResponseData.TaskIds() + .setSubtopologyId(subtopology1) + .setPartitions(List.of(0, 1)))) + .setStandbyTasks(List.of()) + .setWarmupTasks(List.of()), + result.response().data() + ); + } + @Test public void testStreamsReconciliationProcess() { String groupId = "fooup"; @@ -18599,12 +18674,12 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId1) - .setMemberEpoch(1)); + .setMemberEpoch(2)); assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId1) - .setMemberEpoch(1) + .setMemberEpoch(2) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of( new StreamsGroupHeartbeatResponseData.TaskIds() @@ -18628,7 +18703,7 @@ fooTopicName, computeTopicHash(fooTopicName, metadataImage) StreamsCoordinatorRecordHelpers.newStreamsGroupCurrentAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupTargetAssignmentTombstoneRecord(groupId, memberId1), StreamsCoordinatorRecordHelpers.newStreamsGroupMemberTombstoneRecord(groupId, memberId1), - StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 3, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) + StreamsCoordinatorRecordHelpers.newStreamsGroupMetadataRecord(groupId, 4, groupMetadataHash, 0, Map.of("num.standby.replicas", "0")) ) ) )), From 6867fca0c268e5eed1d7653ca8fc010f10e699a3 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Thu, 30 Oct 2025 15:06:07 -0400 Subject: [PATCH 10/11] remove empty line --- .../apache/kafka/coordinator/group/GroupMetadataManager.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index b50c24134aca2..f8668a5727c75 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -8815,8 +8815,7 @@ static String classicGroupJoinKey(String groupId) { static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } - - + /** * Generate a streams group initial rebalance key for the timer. * From c94fed0ae77da27c3d459746d1df72d6e9003b86 Mon Sep 17 00:00:00 2001 From: Jinhe Zhang Date: Fri, 31 Oct 2025 11:14:37 -0400 Subject: [PATCH 11/11] fix targetassignment epoch bug --- .../kafka/coordinator/group/GroupMetadataManager.java | 4 ++-- .../group/streams/CurrentAssignmentBuilder.java | 3 +-- .../coordinator/group/GroupMetadataManagerTest.java | 10 +++++----- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java index f8668a5727c75..18a1fa26f7e0b 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/GroupMetadataManager.java @@ -2019,7 +2019,7 @@ private CoordinatorResult stream boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId)); if (initialDelayActive && group.assignmentEpoch() == 0) { // During initial rebalance delay, return empty assignment to first joining members. - targetAssignmentEpoch = groupEpoch; + targetAssignmentEpoch = groupEpoch - 1; targetAssignment = TasksTuple.EMPTY; } else { targetAssignment = updateStreamsTargetAssignment( @@ -8815,7 +8815,7 @@ static String classicGroupJoinKey(String groupId) { static String classicGroupSyncKey(String groupId) { return "sync-" + groupId; } - + /** * Generate a streams group initial rebalance key for the timer. * diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java index 6ab8f9d014d26..c974409d6cbe3 100644 --- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java +++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java @@ -164,8 +164,7 @@ public StreamsGroupMember build() { // reconcile the member towards it. If the epoch is the same but // the target assignment has changed (e.g., after initial rebalance // delay fires), we must still reconcile to propagate the new tasks. - if (member.memberEpoch() != targetAssignmentEpoch - || !member.assignedTasks().equals(targetAssignment)) { + if (member.memberEpoch() != targetAssignmentEpoch) { return computeNextAssignment( member.memberEpoch(), member.assignedTasks() diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java index 2c219e8350bc9..13f7b9ed71083 100644 --- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java +++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/GroupMetadataManagerTest.java @@ -17407,9 +17407,6 @@ public void testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer( .withConfig(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, 1000) .build(); - assignor.prepareGroupAssignment( - Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)))); - CoordinatorResult result; result = context.streamsGroupHeartbeat( @@ -17426,7 +17423,7 @@ public void testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer( assertResponseEquals( new StreamsGroupHeartbeatResponseData() .setMemberId(memberId) - .setMemberEpoch(2) + .setMemberEpoch(1) .setHeartbeatIntervalMs(5000) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) @@ -17436,13 +17433,16 @@ public void testStreamsInitialRebalanceDelay_EmptyDuringDelay_AssignsAfterTimer( result.response().data() ); + assignor.prepareGroupAssignment( + Map.of(memberId, TaskAssignmentTestUtil.mkTasksTuple(TaskRole.ACTIVE, TaskAssignmentTestUtil.mkTasks(subtopology1, 0, 1)))); + context.sleep(10000); result = context.streamsGroupHeartbeat( new StreamsGroupHeartbeatRequestData() .setGroupId(groupId) .setMemberId(memberId) - .setMemberEpoch(2) + .setMemberEpoch(1) .setActiveTasks(List.of()) .setStandbyTasks(List.of()) .setWarmupTasks(List.of()));