Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -441,6 +441,11 @@ public void cancel(String key) {
if (prevTask != null) prevTask.cancel();
}

@Override
public boolean isScheduled(String key) {
return tasks.containsKey(key);
}

public void cancelAll() {
Iterator<Map.Entry<String, TimerTask>> iterator = tasks.entrySet().iterator();
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,12 @@ interface TimeoutOperation<T, U> {
* @param key The key.
*/
void cancel(String key);

/**
* Check if an operation with the given key is scheduled.
*
* @param key The key.
* @return True if an operation with the key is scheduled, false otherwise.
*/
boolean isScheduled(String key);
}
Original file line number Diff line number Diff line change
Expand Up @@ -2351,6 +2351,57 @@ public void testTimerScheduleIfAbsent() throws InterruptedException {
assertEquals(0, ctx.timer.size());
}

@Test
public void testTimerIsScheduled() throws InterruptedException {
MockTimer timer = new MockTimer();
CoordinatorRuntime<MockCoordinatorShard, String> runtime =
new CoordinatorRuntime.Builder<MockCoordinatorShard, String>()
.withTime(timer.time())
.withTimer(timer)
.withDefaultWriteTimeOut(DEFAULT_WRITE_TIMEOUT)
.withLoader(new MockCoordinatorLoader())
.withEventProcessor(new DirectEventProcessor())
.withPartitionWriter(new MockPartitionWriter())
.withCoordinatorShardBuilderSupplier(new MockCoordinatorShardBuilderSupplier())
.withCoordinatorRuntimeMetrics(mock(CoordinatorRuntimeMetrics.class))
.withCoordinatorMetrics(mock(CoordinatorMetrics.class))
.withSerializer(new StringSerializer())
.withExecutorService(mock(ExecutorService.class))
.build();

runtime.scheduleLoadOperation(TP, 10);

CoordinatorRuntime<MockCoordinatorShard, String>.CoordinatorContext ctx = runtime.contextOrThrow(TP);
assertEquals(0, ctx.timer.size());

assertFalse(ctx.timer.isScheduled("timer-1"));

ctx.timer.schedule("timer-1", 10, TimeUnit.MILLISECONDS, false,
() -> new CoordinatorResult<>(List.of("record1"), null));

assertTrue(ctx.timer.isScheduled("timer-1"));
assertFalse(ctx.timer.isScheduled("timer-2"));
assertEquals(1, ctx.timer.size());

ctx.timer.schedule("timer-2", 20, TimeUnit.MILLISECONDS, false,
() -> new CoordinatorResult<>(List.of("record2"), null));

assertTrue(ctx.timer.isScheduled("timer-1"));
assertTrue(ctx.timer.isScheduled("timer-2"));
assertEquals(2, ctx.timer.size());

ctx.timer.cancel("timer-1");

assertFalse(ctx.timer.isScheduled("timer-1"));
assertTrue(ctx.timer.isScheduled("timer-2"));
assertEquals(1, ctx.timer.size());

timer.advanceClock(21);

assertFalse(ctx.timer.isScheduled("timer-2"));
assertEquals(0, ctx.timer.size());
}

@Test
public void testStateChanges() throws Exception {
MockTimer timer = new MockTimer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,14 @@ public void cancel(String key) {
}
}

/**
* Checks if a timeout with the given key is scheduled.
*/
@Override
public boolean isScheduled(String key) {
return timeoutMap.containsKey(key);
}

/**
* @return True if a timeout with the key exists; false otherwise.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,8 @@ public final class GroupConfig extends AbstractConfig {

public static final String STREAMS_NUM_STANDBY_REPLICAS_CONFIG = "streams.num.standby.replicas";

public static final String STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG = "streams.initial.rebalance.delay.ms";

public final int consumerSessionTimeoutMs;

public final int consumerHeartbeatIntervalMs;
Expand All @@ -93,6 +95,8 @@ public final class GroupConfig extends AbstractConfig {

public final int streamsNumStandbyReplicas;

public final int streamsInitialRebalanceDelayMs;

public final String shareIsolationLevel;

private static final ConfigDef CONFIG = new ConfigDef()
Expand Down Expand Up @@ -155,7 +159,13 @@ public final class GroupConfig extends AbstractConfig {
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC);
GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG,
INT,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT,
atLeast(0),
MEDIUM,
GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);

public GroupConfig(Map<?, ?> props) {
super(CONFIG, props, false);
Expand All @@ -168,6 +178,7 @@ public GroupConfig(Map<?, ?> props) {
this.streamsSessionTimeoutMs = getInt(STREAMS_SESSION_TIMEOUT_MS_CONFIG);
this.streamsHeartbeatIntervalMs = getInt(STREAMS_HEARTBEAT_INTERVAL_MS_CONFIG);
this.streamsNumStandbyReplicas = getInt(STREAMS_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsInitialRebalanceDelayMs = getInt(STREAMS_INITIAL_REBALANCE_DELAY_MS_CONFIG);
this.shareIsolationLevel = getString(SHARE_ISOLATION_LEVEL_CONFIG);
}

Expand Down Expand Up @@ -379,6 +390,13 @@ public int streamsNumStandbyReplicas() {
return streamsNumStandbyReplicas;
}

/**
* The initial rebalance delay for streams groups.
*/
public int streamsInitialRebalanceDelayMs() {
return streamsInitialRebalanceDelayMs;
}

/**
* The share group isolation level.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,10 @@ public class GroupCoordinatorConfig {
public static final int STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT = 2;
public static final String STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC = "The maximum allowed value for the group-level configuration of " + GroupConfig.STREAMS_NUM_STANDBY_REPLICAS_CONFIG;

public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG = "group.streams.initial.rebalance.delay.ms";
public static final int STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT = 3000;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@mjsax Are we good with a default of 3 seconds for delaying the initial rebalance?

public static final String STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC = "The amount of time the group coordinator will wait for more streams clients to join a new group before performing the first rebalance. A longer delay means potentially fewer rebalances.";

public static final String SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_CONFIG = "group.share.initialize.retry.interval.ms";
// Because persister retries with exp backoff 5 times and upper cap of 30 secs.
public static final int SHARE_GROUP_INITIALIZE_RETRY_INTERVAL_MS_DEFAULT = 30_000;
Expand Down Expand Up @@ -352,7 +356,8 @@ public class GroupCoordinatorConfig {
.define(STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_CONFIG, INT, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_HEARTBEAT_INTERVAL_MS_DOC)
.define(STREAMS_GROUP_MAX_SIZE_CONFIG, INT, STREAMS_GROUP_MAX_SIZE_DEFAULT, atLeast(1), MEDIUM, STREAMS_GROUP_MAX_SIZE_DOC)
.define(STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_NUM_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC);
.define(STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG, INT, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_MAX_STANDBY_REPLICAS_DOC)
.define(STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, INT, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DEFAULT, atLeast(0), MEDIUM, STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_DOC);


/**
Expand Down Expand Up @@ -405,6 +410,7 @@ public class GroupCoordinatorConfig {
private final int streamsGroupMaxSize;
private final int streamsGroupNumStandbyReplicas;
private final int streamsGroupMaxStandbyReplicas;
private final int streamsGroupInitialRebalanceDelayMs;

@SuppressWarnings("this-escape")
public GroupCoordinatorConfig(AbstractConfig config) {
Expand Down Expand Up @@ -457,6 +463,7 @@ public GroupCoordinatorConfig(AbstractConfig config) {
this.streamsGroupMaxSize = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_SIZE_CONFIG);
this.streamsGroupNumStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_NUM_STANDBY_REPLICAS_CONFIG);
this.streamsGroupMaxStandbyReplicas = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_MAX_STANDBY_REPLICAS_CONFIG);
this.streamsGroupInitialRebalanceDelayMs = config.getInt(GroupCoordinatorConfig.STREAMS_GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG);

// New group coordinator configs validation.
require(consumerGroupMaxHeartbeatIntervalMs >= consumerGroupMinHeartbeatIntervalMs,
Expand Down Expand Up @@ -961,4 +968,11 @@ public int streamsGroupNumStandbyReplicas() {
public int streamsGroupMaxNumStandbyReplicas() {
return streamsGroupMaxStandbyReplicas;
}

/**
* The initial rebalance delay for streams groups.
*/
public int streamsGroupInitialRebalanceDelayMs() {
return streamsGroupInitialRebalanceDelayMs;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1985,30 +1985,54 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream

// Actually bump the group epoch
int groupEpoch = group.groupEpoch();
boolean isInitialRebalance = (bumpGroupEpoch && groupEpoch == 0);
if (bumpGroupEpoch) {
groupEpoch += 1;
if (groupEpoch == 0) {
groupEpoch += 2;
} else {
groupEpoch += 1;
}
records.add(newStreamsGroupMetadataRecord(groupId, groupEpoch, metadataHash, validatedTopologyEpoch, currentAssignmentConfigs));
log.info("[GroupId {}][MemberId {}] Bumped streams group epoch to {} with metadata hash {} and validated topic epoch {}.", groupId, memberId, groupEpoch, metadataHash, validatedTopologyEpoch);
metrics.record(STREAMS_GROUP_REBALANCES_SENSOR_NAME);
group.setMetadataRefreshDeadline(currentTimeMs + METADATA_REFRESH_INTERVAL_MS, groupEpoch);
}

// Schedule initial rebalance delay for new streams groups to coalesce joins.
int initialDelayMs = streamsGroupInitialRebalanceDelayMs(groupId);
if (isInitialRebalance && initialDelayMs > 0) {
Comment on lines +2001 to +2003
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The isInitialRebalance check on line 1988 is evaluated before bumpGroupEpoch is processed (line 1989-1999), but the condition relies on group.isEmpty() which becomes false after the member is added to the group earlier in the method. This means isInitialRebalance will always be false for the first member join since the member is already added to the group before this check. The timer scheduling on line 2003 may never execute as intended.

Copilot uses AI. Check for mistakes.
timer.scheduleIfAbsent(
streamsInitialRebalanceKey(groupId),
initialDelayMs,
TimeUnit.MILLISECONDS,
false,
() -> fireStreamsInitialRebalance(groupId)
);
}
Comment on lines +2003 to +2011
Copy link

Copilot AI Oct 29, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timer is only scheduled when bumpGroupEpoch is true (due to placement after line 1999), but isInitialRebalance is checked independently. If the first member join doesn't trigger a group epoch bump (which could happen if no metadata changes occur), the initial rebalance timer may not be scheduled. Consider scheduling the timer before the bumpGroupEpoch condition or ensuring it's always scheduled for the first member.

Copilot uses AI. Check for mistakes.

// 4. Update the target assignment if the group epoch is larger than the target assignment epoch or a static member
// replaces an existing static member.
// The delta between the existing and the new target assignment is persisted to the partition.
int targetAssignmentEpoch;
TasksTuple targetAssignment;
if (groupEpoch > group.assignmentEpoch()) {
targetAssignment = updateStreamsTargetAssignment(
group,
groupEpoch,
updatedMember,
updatedConfiguredTopology,
metadataImage,
records,
currentAssignmentConfigs
);
targetAssignmentEpoch = groupEpoch;
boolean initialDelayActive = timer.isScheduled(streamsInitialRebalanceKey(groupId));
if (initialDelayActive && group.assignmentEpoch() == 0) {
// During initial rebalance delay, return empty assignment to first joining members.
targetAssignmentEpoch = groupEpoch;
targetAssignment = TasksTuple.EMPTY;
} else {
targetAssignment = updateStreamsTargetAssignment(
group,
groupEpoch,
updatedMember,
updatedConfiguredTopology,
metadataImage,
records,
currentAssignmentConfigs
);
targetAssignmentEpoch = groupEpoch;
}
} else {
targetAssignmentEpoch = group.assignmentEpoch();
targetAssignment = group.targetAssignment(updatedMember.memberId());
Expand Down Expand Up @@ -3693,7 +3717,7 @@ private StreamsGroupMember maybeReconcile(
List<StreamsGroupHeartbeatRequestData.TaskIds> ownedWarmupTasks,
List<CoordinatorRecord> records
) {
if (member.isReconciledTo(targetAssignmentEpoch)) {
if (member.isReconciledTo(targetAssignmentEpoch) && member.assignedTasks().equals(targetAssignment)) {
return member;
}

Expand Down Expand Up @@ -4003,6 +4027,69 @@ private TasksTuple updateStreamsTargetAssignment(
}
}

/**
* Fires the initial rebalance for a streams group when the delay timer expires.
* Computes and persists target assignment for all members if conditions are met.
*
* @param groupId The group id.
* @return A CoordinatorResult with records to persist the target assignment, or EMPTY_RESULT.
*/
private CoordinatorResult<Void, CoordinatorRecord> fireStreamsInitialRebalance(
String groupId
) {
try {
StreamsGroup group = streamsGroup(groupId);

if (group.groupEpoch() <= group.assignmentEpoch()) {
return EMPTY_RESULT;
}

if (!group.configuredTopology().isPresent()) {
return EMPTY_RESULT;
}

TaskAssignor assignor = streamsGroupAssignor(group.groupId());

try {
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder assignmentResultBuilder =
new org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder(
group.groupId(),
group.groupEpoch(),
assignor,
group.lastAssignmentConfigs()
)
.withMembers(group.members())
.withTopology(group.configuredTopology().get())
.withStaticMembers(group.staticMembers())
.withMetadataImage(metadataImage)
.withTargetAssignment(group.targetAssignment());

long startTimeMs = time.milliseconds();
org.apache.kafka.coordinator.group.streams.TargetAssignmentBuilder.TargetAssignmentResult assignmentResult =
assignmentResultBuilder.build();
long assignorTimeMs = time.milliseconds() - startTimeMs;

if (log.isDebugEnabled()) {
log.debug("[GroupId {}] Initial rebalance: Computed target assignment for epoch {} with '{}' assignor in {}ms: {}.",
group.groupId(), group.groupEpoch(), assignor, assignorTimeMs, assignmentResult.targetAssignment());
} else {
log.info("[GroupId {}] Initial rebalance: Computed target assignment for epoch {} with '{}' assignor in {}ms.",
group.groupId(), group.groupEpoch(), assignor, assignorTimeMs);
}

return new CoordinatorResult<>(assignmentResult.records(), null);
} catch (TaskAssignorException ex) {
String msg = String.format("Failed to compute target assignment for initial rebalance at epoch %d: %s",
group.groupEpoch(), ex.getMessage());
log.error("[GroupId {}] {}.", group.groupId(), msg);
throw new UnknownServerException(msg, ex);
}
} catch (GroupIdNotFoundException ex) {
log.warn("[GroupId {}] Group not found during initial rebalance.", groupId);
return EMPTY_RESULT;
}
}

/**
* Handles leave request from a consumer group member.
* @param groupId The group id from the request.
Expand Down Expand Up @@ -8570,6 +8657,10 @@ private boolean maybeDeleteEmptyStreamsGroup(String groupId, List<CoordinatorRec
// Add tombstones for the previous streams group. The tombstones won't actually be
// replayed because its coordinator result has a non-null appendFuture.
createGroupTombstoneRecords(group, records);
// Cancel any pending initial rebalance timer.
if (timer.isScheduled(streamsInitialRebalanceKey(groupId))) {
timer.cancel(streamsInitialRebalanceKey(groupId));
}
removeGroup(groupId);
return true;
}
Expand Down Expand Up @@ -8659,6 +8750,15 @@ private int streamsGroupHeartbeatIntervalMs(String groupId) {
.orElse(config.streamsGroupHeartbeatIntervalMs());
}

/**
* Get the initial rebalance delay of the provided streams group.
*/
private int streamsGroupInitialRebalanceDelayMs(String groupId) {
Optional<GroupConfig> groupConfig = groupConfigManager.groupConfig(groupId);
return groupConfig.map(GroupConfig::streamsInitialRebalanceDelayMs)
.orElse(config.streamsGroupInitialRebalanceDelayMs());
}

/**
* Get the assignor of the provided streams group.
*/
Expand Down Expand Up @@ -8715,6 +8815,19 @@ static String classicGroupJoinKey(String groupId) {
static String classicGroupSyncKey(String groupId) {
return "sync-" + groupId;
}

/**
* Generate a streams group initial rebalance key for the timer.
*
* Package private for testing.
*
* @param groupId The group id.
*
* @return the initial rebalance key.
*/
static String streamsInitialRebalanceKey(String groupId) {
return "initial-rebalance-timeout-" + groupId;
}

/**
* Generate a consumer group join key for the timer.
Expand Down
Loading
Loading