---
.../java/org/apache/kafka/clients/producer/KafkaProducer.java | 3 ++-
1 file changed, 2 insertions(+), 1 deletion(-)
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
index 648de3ab4b90a..64930b36e27e4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
@@ -1154,7 +1154,8 @@ private void ensureValidRecordSize(int size) {
/**
* Invoking this method makes all buffered records immediately available to send (even if linger.ms
is
* greater than 0) and blocks on the completion of the requests associated with these records. The post-condition
- * of flush()
is that any previously sent record will have completed (e.g. Future.isDone() == true
).
+ * of flush()
is that any previously sent record will have completed (e.g. Future.isDone() == true
+ * and callbacks passed to {@link #send(ProducerRecord,Callback)} have been called).
* A request is considered completed when it is successfully acknowledged
* according to the acks
configuration you have specified or else it results in an error.
*
From 7e46087570d70f42b85fa6f0e23d3b2441591eb4 Mon Sep 17 00:00:00 2001
From: Ken Huang
Date: Fri, 24 Jan 2025 00:33:05 +0800
Subject: [PATCH 31/44] MINOR: rename `resendBrokerRegistrationUnlessZkMode` to
`resendBrokerRegistration` (#18645)
Reviewers: Chia-Ping Tsai
---
.../main/scala/kafka/server/BrokerLifecycleManager.scala | 6 +++---
core/src/main/scala/kafka/server/BrokerServer.scala | 2 +-
.../unit/kafka/server/BrokerLifecycleManagerTest.scala | 2 +-
3 files changed, 5 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
index bd01311feddc4..3079af0a307bd 100644
--- a/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
+++ b/core/src/main/scala/kafka/server/BrokerLifecycleManager.scala
@@ -260,11 +260,11 @@ class BrokerLifecycleManager(
new OfflineDirBrokerFailureEvent(directory))
}
- def resendBrokerRegistrationUnlessZkMode(): Unit = {
- eventQueue.append(new ResendBrokerRegistrationUnlessZkModeEvent())
+ def resendBrokerRegistration(): Unit = {
+ eventQueue.append(new ResendBrokerRegistrationEvent())
}
- private class ResendBrokerRegistrationUnlessZkModeEvent extends EventQueue.Event {
+ private class ResendBrokerRegistrationEvent extends EventQueue.Event {
override def run(): Unit = {
registered = false
scheduleNextCommunicationImmediately()
diff --git a/core/src/main/scala/kafka/server/BrokerServer.scala b/core/src/main/scala/kafka/server/BrokerServer.scala
index ace134773ae2b..3c735be0004c4 100644
--- a/core/src/main/scala/kafka/server/BrokerServer.scala
+++ b/core/src/main/scala/kafka/server/BrokerServer.scala
@@ -534,7 +534,7 @@ class BrokerServer(
})
metadataPublishers.add(brokerMetadataPublisher)
brokerRegistrationTracker = new BrokerRegistrationTracker(config.brokerId,
- () => lifecycleManager.resendBrokerRegistrationUnlessZkMode())
+ () => lifecycleManager.resendBrokerRegistration())
metadataPublishers.add(brokerRegistrationTracker)
diff --git a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
index a7204ffe884c3..5b621671ad6e6 100644
--- a/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/server/BrokerLifecycleManagerTest.scala
@@ -288,7 +288,7 @@ class BrokerLifecycleManagerTest {
assertEquals(1000L, manager.brokerEpoch)
// Trigger JBOD MV update
- manager.resendBrokerRegistrationUnlessZkMode()
+ manager.resendBrokerRegistration()
// Accept new registration, response sets epoch to 1200
nextRegistrationRequest(1200L)
From aea699bdef01b169b73f14c6f1d58df456056e16 Mon Sep 17 00:00:00 2001
From: Lucas Brutschy
Date: Thu, 23 Jan 2025 17:35:03 +0100
Subject: [PATCH 32/44] KAFKA-18324: Add CurrentAssignmentBuilder (#18476)
Implements the current assignment builder, analogous to the current assignment builder of consumer groups. The main difference is the underlying assigned resource, and slightly different logic around process IDs: We make sure to move a task only to a new client, once the task is not owned anymore by any client with the same process ID (sharing the same state directory) - in any role (active, standby or warm-up).
Compared to the feature branch, the main difference is that I refactored the separate treatment of active, standby and warm-up tasks into a compound datatype called TaskTuple (which is used in place of the more specific Assignment class). This also has effects on StreamsGroupMember.
Reviewers: Bruno Cadonna , Bill Bejeck
---
.../streams/CurrentAssignmentBuilder.java | 451 ++++++++++
.../group/streams/StreamsGroupMember.java | 163 +---
.../{Assignment.java => TasksTuple.java} | 73 +-
.../streams/assignor/GroupAssignment.java | 2 +-
.../streams/CurrentAssignmentBuilderTest.java | 825 ++++++++++++++++++
.../group/streams/StreamsGroupMemberTest.java | 132 +--
.../group/streams/TaskAssignmentTestUtil.java | 26 +-
...ssignmentTest.java => TasksTupleTest.java} | 95 +-
8 files changed, 1530 insertions(+), 237 deletions(-)
create mode 100644 group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
rename group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/{Assignment.java => TasksTuple.java} (50%)
create mode 100644 group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilderTest.java
rename group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/{AssignmentTest.java => TasksTupleTest.java} (57%)
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
new file mode 100644
index 0000000000000..3c9ba064a40cd
--- /dev/null
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/CurrentAssignmentBuilder.java
@@ -0,0 +1,451 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.coordinator.group.streams;
+
+import org.apache.kafka.common.errors.FencedMemberEpochException;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.function.BiPredicate;
+
+/**
+ * The CurrentAssignmentBuilder class encapsulates the reconciliation engine of the streams group protocol. Given the current state of a
+ * member and a desired or target assignment state, the state machine takes the necessary steps to converge them.
+ */
+public class CurrentAssignmentBuilder {
+
+ /**
+ * The streams group member which is reconciled.
+ */
+ private final StreamsGroupMember member;
+
+ /**
+ * The target assignment epoch.
+ */
+ private int targetAssignmentEpoch;
+
+ /**
+ * The target assignment.
+ */
+ private TasksTuple targetAssignment;
+
+ /**
+ * A function which returns the current process ID of an active task or null if the active task
+ * is not assigned. The current process ID is the process ID of the current owner.
+ */
+ private BiFunction currentActiveTaskProcessId;
+
+ /**
+ * A function which returns the current process IDs of a standby task or null if the standby
+ * task is not assigned. The current process IDs are the process IDs of all current owners.
+ */
+ private BiFunction> currentStandbyTaskProcessIds;
+
+ /**
+ * A function which returns the current process IDs of a warmup task or null if the warmup task
+ * is not assigned. The current process IDs are the process IDs of all current owners.
+ */
+ private BiFunction> currentWarmupTaskProcessIds;
+
+ /**
+ * The tasks owned by the member. This may be provided by the member in the StreamsGroupHeartbeat request.
+ */
+ private Optional ownedTasks = Optional.empty();
+
+ /**
+ * Constructs the CurrentAssignmentBuilder based on the current state of the provided streams group member.
+ *
+ * @param member The streams group member that must be reconciled.
+ */
+ public CurrentAssignmentBuilder(StreamsGroupMember member) {
+ this.member = Objects.requireNonNull(member);
+ }
+
+ /**
+ * Sets the target assignment epoch and the target assignment that the streams group member must be reconciled to.
+ *
+ * @param targetAssignmentEpoch The target assignment epoch.
+ * @param targetAssignment The target assignment.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withTargetAssignment(int targetAssignmentEpoch,
+ TasksTuple targetAssignment) {
+ this.targetAssignmentEpoch = targetAssignmentEpoch;
+ this.targetAssignment = Objects.requireNonNull(targetAssignment);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current process ID of an active task. This is
+ * used by the state machine to determine if an active task is free or still used by another
+ * member, and if there is still a task on a specific process that is not yet revoked.
+ *
+ * @param currentActiveTaskProcessId A BiFunction which gets the process ID of a subtopology ID /
+ * partition ID pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withCurrentActiveTaskProcessId(BiFunction currentActiveTaskProcessId) {
+ this.currentActiveTaskProcessId = Objects.requireNonNull(currentActiveTaskProcessId);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current process IDs of a standby task. This is
+ * used by the state machine to determine if there is still a task on a specific process that is
+ * not yet revoked.
+ *
+ * @param currentStandbyTaskProcessIds A BiFunction which gets the process IDs of a subtopology
+ * ID / partition ID pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withCurrentStandbyTaskProcessIds(
+ BiFunction> currentStandbyTaskProcessIds
+ ) {
+ this.currentStandbyTaskProcessIds = Objects.requireNonNull(currentStandbyTaskProcessIds);
+ return this;
+ }
+
+ /**
+ * Sets a BiFunction which allows to retrieve the current process IDs of a warmup task. This is
+ * used by the state machine to determine if there is still a task on a specific process that is
+ * not yet revoked.
+ *
+ * @param currentWarmupTaskProcessIds A BiFunction which gets the process IDs of a subtopology ID
+ * / partition ID pair.
+ * @return This object.
+ */
+ public CurrentAssignmentBuilder withCurrentWarmupTaskProcessIds(BiFunction> currentWarmupTaskProcessIds) {
+ this.currentWarmupTaskProcessIds = Objects.requireNonNull(currentWarmupTaskProcessIds);
+ return this;
+ }
+
+ /**
+ * Sets the tasks currently owned by the member. This comes directly from the last StreamsGroupHeartbeat request. This is used to
+ * determine if the member has revoked the necessary tasks. Passing null into this function means that the member did not provide
+ * its owned tasks in this heartbeat.
+ *
+ * @param ownedAssignment A collection of active, standby and warm-up tasks
+ * @return This object.
+ */
+ protected CurrentAssignmentBuilder withOwnedAssignment(TasksTuple ownedAssignment) {
+ this.ownedTasks = Optional.ofNullable(ownedAssignment);
+ return this;
+ }
+
+ /**
+ * Builds the next state for the member or keep the current one if it is not possible to move forward with the current state.
+ *
+ * @return A new StreamsGroupMember or the current one.
+ */
+ public StreamsGroupMember build() {
+ switch (member.state()) {
+ case STABLE:
+ // When the member is in the STABLE state, we verify if a newer
+ // epoch (or target assignment) is available. If it is, we can
+ // reconcile the member towards it. Otherwise, we return.
+ if (member.memberEpoch() != targetAssignmentEpoch) {
+ return computeNextAssignment(
+ member.memberEpoch(),
+ member.assignedTasks()
+ );
+ } else {
+ return member;
+ }
+
+ case UNREVOKED_TASKS:
+ // When the member is in the UNREVOKED_TASKS state, we wait
+ // until the member has revoked the necessary tasks. They are
+ // considered revoked when they are not anymore reported in the
+ // owned tasks set in the StreamsGroupHeartbeat API.
+
+ // If the member provides its owned tasks, we verify if it still
+ // owns any of the revoked tasks. If it did not provide it's
+ // owned tasks, or we still own some of the revoked tasks, we
+ // cannot progress.
+ if (
+ ownedTasks.isEmpty() || ownedTasks.get().containsAny(member.tasksPendingRevocation())
+ ) {
+ return member;
+ }
+
+ // When the member has revoked all the pending tasks, it can
+ // transition to the next epoch (current + 1) and we can reconcile
+ // its state towards the latest target assignment.
+ return computeNextAssignment(
+ member.memberEpoch() + 1,
+ member.assignedTasks()
+ );
+
+ case UNRELEASED_TASKS:
+ // When the member is in the UNRELEASED_TASKS, we reconcile the
+ // member towards the latest target assignment. This will assign any
+ // of the unreleased tasks when they become available.
+ return computeNextAssignment(
+ member.memberEpoch(),
+ member.assignedTasks()
+ );
+
+ case UNKNOWN:
+ // We could only end up in this state if a new state is added in the
+ // future and the group coordinator is downgraded. In this case, the
+ // best option is to fence the member to force it to rejoin the group
+ // without any tasks and to reconcile it again from scratch.
+ if ((ownedTasks.isEmpty() || !ownedTasks.get().isEmpty())) {
+ throw new FencedMemberEpochException(
+ "The streams group member is in a unknown state. "
+ + "The member must abandon all its tasks and rejoin.");
+ }
+
+ return computeNextAssignment(
+ targetAssignmentEpoch,
+ member.assignedTasks()
+ );
+ }
+
+ return member;
+ }
+
+ /**
+ * Takes the current currentAssignment and the targetAssignment, and generates three
+ * collections:
+ *
+ * - the resultAssignedTasks: the tasks that are assigned in both the current and target
+ * assignments.
+ * - the resultTasksPendingRevocation: the tasks that are assigned in the current
+ * assignment but not in the target assignment.
+ * - the resultTasksPendingAssignment: the tasks that are assigned in the target assignment but
+ * not in the current assignment, and can be assigned currently (i.e., they are not owned by
+ * another member, as defined by the `isUnreleasedTask` predicate).
+ */
+ private boolean computeAssignmentDifference(Map> currentAssignment,
+ Map> targetAssignment,
+ Map> resultAssignedTasks,
+ Map> resultTasksPendingRevocation,
+ Map> resultTasksPendingAssignment,
+ BiPredicate isUnreleasedTask) {
+ boolean hasUnreleasedTasks = false;
+
+ Set allSubtopologyIds = new HashSet<>(targetAssignment.keySet());
+ allSubtopologyIds.addAll(currentAssignment.keySet());
+
+ for (String subtopologyId : allSubtopologyIds) {
+ hasUnreleasedTasks |= computeAssignmentDifferenceForOneSubtopology(
+ subtopologyId,
+ currentAssignment.getOrDefault(subtopologyId, Collections.emptySet()),
+ targetAssignment.getOrDefault(subtopologyId, Collections.emptySet()),
+ resultAssignedTasks,
+ resultTasksPendingRevocation,
+ resultTasksPendingAssignment,
+ isUnreleasedTask
+ );
+ }
+ return hasUnreleasedTasks;
+ }
+
+ private static boolean computeAssignmentDifferenceForOneSubtopology(final String subtopologyId,
+ final Set currentTasksForThisSubtopology,
+ final Set targetTasksForThisSubtopology,
+ final Map> resultAssignedTasks,
+ final Map> resultTasksPendingRevocation,
+ final Map> resultTasksPendingAssignment,
+ final BiPredicate isUnreleasedTask) {
+ // Result Assigned Tasks = Current Tasks ∩ Target Tasks
+ // i.e. we remove all tasks from the current assignment that are not in the target
+ // assignment
+ Set resultAssignedTasksForThisSubtopology = new HashSet<>(currentTasksForThisSubtopology);
+ resultAssignedTasksForThisSubtopology.retainAll(targetTasksForThisSubtopology);
+
+ // Result Tasks Pending Revocation = Current Tasks - Result Assigned Tasks
+ // i.e. we will ask the member to revoke all tasks in its current assignment that
+ // are not in the target assignment
+ Set resultTasksPendingRevocationForThisSubtopology = new HashSet<>(currentTasksForThisSubtopology);
+ resultTasksPendingRevocationForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
+
+ // Result Tasks Pending Assignment = Target Tasks - Result Assigned Tasks - Unreleased Tasks
+ // i.e. we will ask the member to assign all tasks in its target assignment,
+ // except those that are already assigned, and those that are unreleased
+ Set resultTasksPendingAssignmentForThisSubtopology = new HashSet<>(targetTasksForThisSubtopology);
+ resultTasksPendingAssignmentForThisSubtopology.removeAll(resultAssignedTasksForThisSubtopology);
+ boolean hasUnreleasedTasks = resultTasksPendingAssignmentForThisSubtopology.removeIf(taskId ->
+ isUnreleasedTask.test(subtopologyId, taskId)
+ );
+
+ if (!resultAssignedTasksForThisSubtopology.isEmpty()) {
+ resultAssignedTasks.put(subtopologyId, resultAssignedTasksForThisSubtopology);
+ }
+
+ if (!resultTasksPendingRevocationForThisSubtopology.isEmpty()) {
+ resultTasksPendingRevocation.put(subtopologyId, resultTasksPendingRevocationForThisSubtopology);
+ }
+
+ if (!resultTasksPendingAssignmentForThisSubtopology.isEmpty()) {
+ resultTasksPendingAssignment.put(subtopologyId, resultTasksPendingAssignmentForThisSubtopology);
+ }
+
+ return hasUnreleasedTasks;
+ }
+
+ /**
+ * Computes the next assignment.
+ *
+ * @param memberEpoch The epoch of the member to use. This may be different from
+ * the epoch in {@link CurrentAssignmentBuilder#member}.
+ * @param memberAssignedTasks The assigned tasks of the member to use.
+ * @return A new StreamsGroupMember.
+ */
+ private StreamsGroupMember computeNextAssignment(int memberEpoch,
+ TasksTuple memberAssignedTasks) {
+ Map> newActiveAssignedTasks = new HashMap<>();
+ Map> newActiveTasksPendingRevocation = new HashMap<>();
+ Map> newActiveTasksPendingAssignment = new HashMap<>();
+ Map> newStandbyAssignedTasks = new HashMap<>();
+ Map> newStandbyTasksPendingRevocation = new HashMap<>();
+ Map> newStandbyTasksPendingAssignment = new HashMap<>();
+ Map> newWarmupAssignedTasks = new HashMap<>();
+ Map> newWarmupTasksPendingRevocation = new HashMap<>();
+ Map> newWarmupTasksPendingAssignment = new HashMap<>();
+
+ boolean hasUnreleasedActiveTasks = computeAssignmentDifference(
+ memberAssignedTasks.activeTasks(),
+ targetAssignment.activeTasks(),
+ newActiveAssignedTasks,
+ newActiveTasksPendingRevocation,
+ newActiveTasksPendingAssignment,
+ (subtopologyId, partitionId) ->
+ currentActiveTaskProcessId.apply(subtopologyId, partitionId) != null ||
+ currentStandbyTaskProcessIds.apply(subtopologyId, partitionId)
+ .contains(member.processId()) ||
+ currentWarmupTaskProcessIds.apply(subtopologyId, partitionId)
+ .contains(member.processId())
+ );
+
+ boolean hasUnreleasedStandbyTasks = computeAssignmentDifference(
+ memberAssignedTasks.standbyTasks(),
+ targetAssignment.standbyTasks(),
+ newStandbyAssignedTasks,
+ newStandbyTasksPendingRevocation,
+ newStandbyTasksPendingAssignment,
+ (subtopologyId, partitionId) ->
+ Objects.equals(currentActiveTaskProcessId.apply(subtopologyId, partitionId),
+ member.processId()) ||
+ currentStandbyTaskProcessIds.apply(subtopologyId, partitionId)
+ .contains(member.processId()) ||
+ currentWarmupTaskProcessIds.apply(subtopologyId, partitionId)
+ .contains(member.processId())
+ );
+
+ boolean hasUnreleasedWarmupTasks = computeAssignmentDifference(
+ memberAssignedTasks.warmupTasks(),
+ targetAssignment.warmupTasks(),
+ newWarmupAssignedTasks,
+ newWarmupTasksPendingRevocation,
+ newWarmupTasksPendingAssignment,
+ (subtopologyId, partitionId) ->
+ Objects.equals(currentActiveTaskProcessId.apply(subtopologyId, partitionId),
+ member.processId()) ||
+ currentStandbyTaskProcessIds.apply(subtopologyId, partitionId)
+ .contains(member.processId()) ||
+ currentWarmupTaskProcessIds.apply(subtopologyId, partitionId)
+ .contains(member.processId())
+ );
+
+ return buildNewMember(
+ memberEpoch,
+ new TasksTuple(
+ newActiveTasksPendingRevocation,
+ newStandbyTasksPendingRevocation,
+ newWarmupTasksPendingRevocation
+ ),
+ new TasksTuple(
+ newActiveAssignedTasks,
+ newStandbyAssignedTasks,
+ newWarmupAssignedTasks
+ ),
+ new TasksTuple(
+ newActiveTasksPendingAssignment,
+ newStandbyTasksPendingAssignment,
+ newWarmupTasksPendingAssignment
+ ),
+ hasUnreleasedActiveTasks || hasUnreleasedStandbyTasks || hasUnreleasedWarmupTasks
+ );
+ }
+
+ private StreamsGroupMember buildNewMember(final int memberEpoch,
+ final TasksTuple newTasksPendingRevocation,
+ final TasksTuple newAssignedTasks,
+ final TasksTuple newTasksPendingAssignment,
+ final boolean hasUnreleasedTasks) {
+
+ final boolean hasTasksToBeRevoked =
+ (!newTasksPendingRevocation.isEmpty())
+ && (ownedTasks.isEmpty() || ownedTasks.get().containsAny(newTasksPendingRevocation));
+
+ if (hasTasksToBeRevoked) {
+ // If there are tasks to be revoked, the member remains in its current
+ // epoch and requests the revocation of those tasks. It transitions to
+ // the UNREVOKED_TASKS state to wait until the client acknowledges the
+ // revocation of the tasks.
+ return new StreamsGroupMember.Builder(member)
+ .setState(MemberState.UNREVOKED_TASKS)
+ .updateMemberEpoch(memberEpoch)
+ .setAssignedTasks(newAssignedTasks)
+ .setTasksPendingRevocation(newTasksPendingRevocation)
+ .build();
+ } else if (!newTasksPendingAssignment.isEmpty()) {
+ // If there are tasks to be assigned, the member transitions to the
+ // target epoch and requests the assignment of those tasks. Note that
+ // the tasks are directly added to the assigned tasks set. The
+ // member transitions to the STABLE state or to the UNRELEASED_TASKS
+ // state depending on whether there are unreleased tasks or not.
+ MemberState newState =
+ hasUnreleasedTasks
+ ? MemberState.UNRELEASED_TASKS
+ : MemberState.STABLE;
+ return new StreamsGroupMember.Builder(member)
+ .setState(newState)
+ .updateMemberEpoch(targetAssignmentEpoch)
+ .setAssignedTasks(newAssignedTasks.merge(newTasksPendingAssignment))
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+ } else if (hasUnreleasedTasks) {
+ // If there are no tasks to be revoked nor to be assigned but some
+ // tasks are not available yet, the member transitions to the target
+ // epoch, to the UNRELEASED_TASKS state and waits.
+ return new StreamsGroupMember.Builder(member)
+ .setState(MemberState.UNRELEASED_TASKS)
+ .updateMemberEpoch(targetAssignmentEpoch)
+ .setAssignedTasks(newAssignedTasks)
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+ } else {
+ // Otherwise, the member transitions to the target epoch and to the
+ // STABLE state.
+ return new StreamsGroupMember.Builder(member)
+ .setState(MemberState.STABLE)
+ .updateMemberEpoch(targetAssignmentEpoch)
+ .setAssignedTasks(newAssignedTasks)
+ .setTasksPendingRevocation(TasksTuple.EMPTY)
+ .build();
+ }
+ }
+}
diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
index e23df3f5701c4..612e72fabddaa 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/StreamsGroupMember.java
@@ -49,18 +49,8 @@
* @param userEndpoint The user endpoint exposed for Interactive Queries by the Streams client that
* contains the member.
* @param clientTags Tags of the client of the member used for rack-aware assignment.
- * @param assignedActiveTasks Active tasks assigned to the member.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
- * @param assignedStandbyTasks Standby tasks assigned to the member.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
- * @param assignedWarmupTasks Warm-up tasks assigned to the member.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
- * @param activeTasksPendingRevocation Active tasks assigned to the member pending revocation.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
- * @param standbyTasksPendingRevocation Standby tasks assigned to the member pending revocation.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
- * @param warmupTasksPendingRevocation Warm-up tasks assigned to the member pending revocation.
- * The key of the map is the subtopology ID and the value is the set of partition IDs.
+ * @param assignedTasks Tasks assigned to the member.
+ * @param tasksPendingRevocation Tasks owned by the member pending revocation.
*/
@SuppressWarnings("checkstyle:JavaNCSS")
public record StreamsGroupMember(String memberId,
@@ -76,22 +66,12 @@ public record StreamsGroupMember(String memberId,
String processId,
Optional userEndpoint,
Map clientTags,
- Map> assignedActiveTasks,
- Map> assignedStandbyTasks,
- Map> assignedWarmupTasks,
- Map> activeTasksPendingRevocation,
- Map> standbyTasksPendingRevocation,
- Map> warmupTasksPendingRevocation) {
+ TasksTuple assignedTasks,
+ TasksTuple tasksPendingRevocation) {
public StreamsGroupMember {
Objects.requireNonNull(memberId, "memberId cannot be null");
clientTags = clientTags != null ? Collections.unmodifiableMap(clientTags) : null;
- assignedActiveTasks = assignedActiveTasks != null ? Collections.unmodifiableMap(assignedActiveTasks) : null;
- assignedStandbyTasks = assignedStandbyTasks != null ? Collections.unmodifiableMap(assignedStandbyTasks) : null;
- assignedWarmupTasks = assignedWarmupTasks != null ? Collections.unmodifiableMap(assignedWarmupTasks) : null;
- activeTasksPendingRevocation = activeTasksPendingRevocation != null ? Collections.unmodifiableMap(activeTasksPendingRevocation) : null;
- standbyTasksPendingRevocation = standbyTasksPendingRevocation != null ? Collections.unmodifiableMap(standbyTasksPendingRevocation) : null;
- warmupTasksPendingRevocation = warmupTasksPendingRevocation != null ? Collections.unmodifiableMap(warmupTasksPendingRevocation) : null;
}
/**
@@ -114,12 +94,8 @@ public static class Builder {
private String processId = null;
private Optional userEndpoint = null;
private Map clientTags = null;
- private Map> assignedActiveTasks = null;
- private Map> assignedStandbyTasks = null;
- private Map> assignedWarmupTasks = null;
- private Map> activeTasksPendingRevocation = null;
- private Map> standbyTasksPendingRevocation = null;
- private Map> warmupTasksPendingRevocation = null;
+ private TasksTuple assignedTasks = null;
+ private TasksTuple tasksPendingRevocation = null;
public Builder(String memberId) {
this.memberId = Objects.requireNonNull(memberId, "memberId cannot be null");
@@ -141,12 +117,8 @@ public Builder(StreamsGroupMember member) {
this.userEndpoint = member.userEndpoint;
this.clientTags = member.clientTags;
this.state = member.state;
- this.assignedActiveTasks = member.assignedActiveTasks;
- this.assignedStandbyTasks = member.assignedStandbyTasks;
- this.assignedWarmupTasks = member.assignedWarmupTasks;
- this.activeTasksPendingRevocation = member.activeTasksPendingRevocation;
- this.standbyTasksPendingRevocation = member.standbyTasksPendingRevocation;
- this.warmupTasksPendingRevocation = member.warmupTasksPendingRevocation;
+ this.assignedTasks = member.assignedTasks;
+ this.tasksPendingRevocation = member.tasksPendingRevocation;
}
public Builder updateMemberEpoch(int memberEpoch) {
@@ -251,50 +223,13 @@ public Builder maybeUpdateClientTags(Optional