Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -2900,7 +2900,6 @@ project(':streams:integration-tests') {
testImplementation libs.mockitoCore
testImplementation testLog4j2Libs
testImplementation project(':streams:test-utils')
testImplementation project(':test-common:test-common-util')

testRuntimeOnly runtimeTestLibs
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,7 @@
import org.apache.kafka.coordinator.group.streams.StreamsGroupMember;
import org.apache.kafka.coordinator.group.streams.StreamsTopology;
import org.apache.kafka.coordinator.group.streams.TasksTuple;
import org.apache.kafka.coordinator.group.streams.TasksTupleWithEpochs;
import org.apache.kafka.coordinator.group.streams.assignor.StickyTaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignor;
import org.apache.kafka.coordinator.group.streams.assignor.TaskAssignorException;
Expand Down Expand Up @@ -1450,6 +1451,8 @@ private static boolean isSubset(
* it owns any other tasks.
*
* @param ownedTasks The tasks provided by the streams group member in the request.
* If this is null, it indicates that we do not know which
* tasks are owned by the member, so we return false.
* @param assignedTasks The tasks that the member should have.
*
* @return A boolean indicating whether the owned partitions are a subset or not.
Expand All @@ -1471,6 +1474,32 @@ private static boolean areOwnedTasksContainedInAssignedTasks(
return true;
}

/**
 * Verifies that every task the member reports as owned is present in the given
 * epoch-annotated assignment.
 *
 * @param ownedTasks              The tasks reported by the streams group member in the request.
 *                                A null value means the owned tasks are unknown, in which
 *                                case false is returned.
 * @param assignedTasksWithEpochs The tasks the member should have, keyed by subtopology id and
 *                                then by partition. Only the keys are consulted; the mapped
 *                                values are not inspected.
 * @return true if every owned task appears in the assignment, false otherwise.
 */
private static boolean areOwnedTasksContainedInAssignedTasksWithEpochs(
    List<StreamsGroupHeartbeatRequestData.TaskIds> ownedTasks,
    Map<String, Map<Integer, Integer>> assignedTasksWithEpochs
) {
    if (ownedTasks == null) {
        return false;
    }

    for (StreamsGroupHeartbeatRequestData.TaskIds owned : ownedTasks) {
        Map<Integer, Integer> assignedPartitions = assignedTasksWithEpochs.get(owned.subtopologyId());
        // A subtopology that is absent from the assignment, or any owned partition missing
        // from it, means the member claims a task that is not in the assignment.
        if (assignedPartitions == null || !assignedPartitions.keySet().containsAll(owned.partitions())) {
            return false;
        }
    }

    return true;
}

/**
* Checks whether the consumer group can accept a new member or not based on the
* max group size defined.
Expand Down Expand Up @@ -1670,7 +1699,7 @@ private static void throwIfStreamsGroupMemberEpochIsInvalid(
// If the member comes with the previous epoch and has a subset of the current assignment partitions,
// we accept it because the response with the bumped epoch may have been lost.
if (receivedMemberEpoch != member.previousMemberEpoch()
|| !areOwnedTasksContainedInAssignedTasks(ownedActiveTasks, member.assignedTasks().activeTasks())
|| !areOwnedTasksContainedInAssignedTasksWithEpochs(ownedActiveTasks, member.assignedTasks().activeTasksWithEpochs())
|| !areOwnedTasksContainedInAssignedTasks(ownedStandbyTasks, member.assignedTasks().standbyTasks())
|| !areOwnedTasksContainedInAssignedTasks(ownedWarmupTasks, member.assignedTasks().warmupTasks())) {
throw new FencedMemberEpochException("The streams group member has a smaller member "
Expand Down Expand Up @@ -2044,7 +2073,7 @@ private CoordinatorResult<StreamsGroupHeartbeatResult, CoordinatorRecord> stream
// 1. The member is joining.
// 2. The member's assignment has been updated.
if (memberEpoch == 0 || hasAssignedTasksChanged(member, updatedMember)) {
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().activeTasks()));
response.setActiveTasks(createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(updatedMember.assignedTasks().activeTasksWithEpochs()));
response.setStandbyTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().standbyTasks()));
response.setWarmupTasks(createStreamsGroupHeartbeatResponseTaskIds(updatedMember.assignedTasks().warmupTasks()));
if (memberEpoch != 0 || !updatedMember.assignedTasks().isEmpty()) {
Expand Down Expand Up @@ -2157,6 +2186,16 @@ private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartb
.collect(Collectors.toList());
}

/**
 * Builds the heartbeat response task ids from an epoch-annotated task map. For each
 * subtopology only the partition keys are emitted, in ascending order; the mapped
 * values are not included in the response.
 *
 * @param taskIdsWithEpochs The tasks keyed by subtopology id and then by partition.
 * @return One response TaskIds entry per subtopology in the input map.
 */
private List<StreamsGroupHeartbeatResponseData.TaskIds> createStreamsGroupHeartbeatResponseTaskIdsFromEpochs(
    final Map<String, Map<Integer, Integer>> taskIdsWithEpochs
) {
    List<StreamsGroupHeartbeatResponseData.TaskIds> responseTaskIds = new ArrayList<>();
    for (Map.Entry<String, Map<Integer, Integer>> subtopology : taskIdsWithEpochs.entrySet()) {
        List<Integer> sortedPartitions = subtopology.getValue().keySet().stream().sorted().toList();
        responseTaskIds.add(new StreamsGroupHeartbeatResponseData.TaskIds()
            .setSubtopologyId(subtopology.getKey())
            .setPartitions(sortedPartitions));
    }
    return responseTaskIds;
}

private List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> maybeBuildEndpointToPartitions(StreamsGroup group,
StreamsGroupMember updatedMember) {
List<StreamsGroupHeartbeatResponseData.EndpointToPartitions> endpointToPartitionsList = new ArrayList<>();
Expand Down Expand Up @@ -5710,8 +5749,8 @@ public void replay(
StreamsGroupMember newMember = new StreamsGroupMember.Builder(oldMember)
.setMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setPreviousMemberEpoch(LEAVE_GROUP_MEMBER_EPOCH)
.setAssignedTasks(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setAssignedTasks(TasksTupleWithEpochs.EMPTY)
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
.build();
streamsGroup.updateMember(newMember);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,101 @@ private static boolean computeAssignmentDifferenceForOneSubtopology(final String
return hasUnreleasedTasks;
}

/**
 * Compares the current assignment with the target assignment, one subtopology at a
 * time, and fills three result collections:
 *
 * - resultAssignedTasks: tasks present in both the current and the target assignment.
 * - resultTasksPendingRevocation: tasks present in the current assignment but not in
 *   the target assignment.
 * - resultTasksPendingAssignment: tasks present in the target assignment but not in the
 *   current assignment, excluding those for which `isUnreleasedTask` returns true.
 *
 * Epoch handling:
 * - Tasks placed in resultAssignedTasks and resultTasksPendingRevocation keep the epoch
 *   they carry in currentAssignment.
 * - Tasks placed in resultTasksPendingAssignment are given targetAssignmentEpoch.
 *
 * @return true if at least one target task was withheld because `isUnreleasedTask`
 *         reported it as unreleased, false otherwise.
 */
private boolean computeAssignmentDifferenceWithEpoch(Map<String, Map<Integer, Integer>> currentAssignment,
                                                     Map<String, Set<Integer>> targetAssignment,
                                                     int targetAssignmentEpoch,
                                                     Map<String, Map<Integer, Integer>> resultAssignedTasks,
                                                     Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
                                                     Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
                                                     BiPredicate<String, Integer> isUnreleasedTask) {
    // Visit every subtopology that appears on either side of the comparison.
    Set<String> subtopologyIds = new HashSet<>(targetAssignment.keySet());
    subtopologyIds.addAll(currentAssignment.keySet());

    boolean anyUnreleased = false;
    for (String id : subtopologyIds) {
        // Evaluate the per-subtopology diff unconditionally so that the result maps are
        // filled for every subtopology, even after an unreleased task has been seen.
        boolean unreleasedHere = computeAssignmentDifferenceForOneSubtopologyWithEpoch(
            id,
            currentAssignment.getOrDefault(id, Map.of()),
            targetAssignment.getOrDefault(id, Set.of()),
            targetAssignmentEpoch,
            resultAssignedTasks,
            resultTasksPendingRevocation,
            resultTasksPendingAssignment,
            isUnreleasedTask
        );
        anyUnreleased = anyUnreleased || unreleasedHere;
    }
    return anyUnreleased;
}

/**
 * Computes the assignment difference for a single subtopology and records any non-empty
 * outcome in the three result maps under {@code subtopologyId}.
 *
 * @param subtopologyId                  The subtopology being processed.
 * @param currentTasksForThisSubtopology The current tasks, as partition to epoch.
 * @param targetTasksForThisSubtopology  The partitions of the target tasks.
 * @param targetAssignmentEpoch          The epoch given to tasks that become pending assignment.
 * @param resultAssignedTasks            Receives the tasks that are in both current and target,
 *                                       with their current epoch.
 * @param resultTasksPendingRevocation   Receives the tasks that are in current but not in target,
 *                                       with their current epoch.
 * @param resultTasksPendingAssignment   Receives the tasks that are in target but not in current
 *                                       and are not reported as unreleased.
 * @param isUnreleasedTask               Tests whether a (subtopology, partition) task must be
 *                                       withheld from assignment.
 * @return true if at least one target task was withheld because it is unreleased.
 */
private static boolean computeAssignmentDifferenceForOneSubtopologyWithEpoch(final String subtopologyId,
                                                                             final Map<Integer, Integer> currentTasksForThisSubtopology,
                                                                             final Set<Integer> targetTasksForThisSubtopology,
                                                                             final int targetAssignmentEpoch,
                                                                             final Map<String, Map<Integer, Integer>> resultAssignedTasks,
                                                                             final Map<String, Map<Integer, Integer>> resultTasksPendingRevocation,
                                                                             final Map<String, Map<Integer, Integer>> resultTasksPendingAssignment,
                                                                             final BiPredicate<String, Integer> isUnreleasedTask) {
    // Split the current tasks in a single pass:
    //   kept    = Current Tasks ∩ Target Tasks
    //   revoked = Current Tasks - Target Tasks
    // Both keep the epoch recorded in the current assignment.
    final Map<Integer, Integer> keptTasks = new HashMap<>();
    final Map<Integer, Integer> revokedTasks = new HashMap<>();
    currentTasksForThisSubtopology.forEach((taskId, epoch) -> {
        if (targetTasksForThisSubtopology.contains(taskId)) {
            keptTasks.put(taskId, epoch);
        } else {
            revokedTasks.put(taskId, epoch);
        }
    });

    // Pending assignment = Target Tasks - kept - Unreleased Tasks.
    // Every target task that is not already kept is tested against the predicate; the
    // ones that pass the test are withheld and only flagged through the return value.
    final Map<Integer, Integer> newlyAssignableTasks = new HashMap<>();
    boolean hasUnreleasedTasks = false;
    for (Integer taskId : targetTasksForThisSubtopology) {
        if (keptTasks.containsKey(taskId)) {
            continue;
        }
        if (isUnreleasedTask.test(subtopologyId, taskId)) {
            hasUnreleasedTasks = true;
        } else {
            newlyAssignableTasks.put(taskId, targetAssignmentEpoch);
        }
    }

    // Only non-empty outcomes are published, so the result maps never hold empty entries
    // written by this method.
    if (!keptTasks.isEmpty()) {
        resultAssignedTasks.put(subtopologyId, keptTasks);
    }
    if (!revokedTasks.isEmpty()) {
        resultTasksPendingRevocation.put(subtopologyId, revokedTasks);
    }
    if (!newlyAssignableTasks.isEmpty()) {
        resultTasksPendingAssignment.put(subtopologyId, newlyAssignableTasks);
    }

    return hasUnreleasedTasks;
}

/**
* Computes the next assignment.
*
Expand All @@ -313,20 +408,21 @@ private static boolean computeAssignmentDifferenceForOneSubtopology(final String
* @return A new StreamsGroupMember.
*/
private StreamsGroupMember computeNextAssignment(int memberEpoch,
TasksTuple memberAssignedTasks) {
Map<String, Set<Integer>> newActiveAssignedTasks = new HashMap<>();
Map<String, Set<Integer>> newActiveTasksPendingRevocation = new HashMap<>();
Map<String, Set<Integer>> newActiveTasksPendingAssignment = new HashMap<>();
TasksTupleWithEpochs memberAssignedTasks) {
Map<String, Map<Integer, Integer>> newActiveAssignedTasks = new HashMap<>();
Map<String, Map<Integer, Integer>> newActiveTasksPendingRevocation = new HashMap<>();
Map<String, Map<Integer, Integer>> newActiveTasksPendingAssignment = new HashMap<>();
Map<String, Set<Integer>> newStandbyAssignedTasks = new HashMap<>();
Map<String, Set<Integer>> newStandbyTasksPendingRevocation = new HashMap<>();
Map<String, Set<Integer>> newStandbyTasksPendingAssignment = new HashMap<>();
Map<String, Set<Integer>> newWarmupAssignedTasks = new HashMap<>();
Map<String, Set<Integer>> newWarmupTasksPendingRevocation = new HashMap<>();
Map<String, Set<Integer>> newWarmupTasksPendingAssignment = new HashMap<>();

boolean hasUnreleasedActiveTasks = computeAssignmentDifference(
memberAssignedTasks.activeTasks(),
boolean hasUnreleasedActiveTasks = computeAssignmentDifferenceWithEpoch(
memberAssignedTasks.activeTasksWithEpochs(),
targetAssignment.activeTasks(),
targetAssignmentEpoch,
newActiveAssignedTasks,
newActiveTasksPendingRevocation,
newActiveTasksPendingAssignment,
Expand Down Expand Up @@ -370,17 +466,17 @@ private StreamsGroupMember computeNextAssignment(int memberEpoch,

return buildNewMember(
memberEpoch,
new TasksTuple(
new TasksTupleWithEpochs(
newActiveTasksPendingRevocation,
newStandbyTasksPendingRevocation,
newWarmupTasksPendingRevocation
),
new TasksTuple(
new TasksTupleWithEpochs(
newActiveAssignedTasks,
newStandbyAssignedTasks,
newWarmupAssignedTasks
),
new TasksTuple(
new TasksTupleWithEpochs(
newActiveTasksPendingAssignment,
newStandbyTasksPendingAssignment,
newWarmupTasksPendingAssignment
Expand All @@ -390,9 +486,9 @@ private StreamsGroupMember computeNextAssignment(int memberEpoch,
}

private StreamsGroupMember buildNewMember(final int memberEpoch,
final TasksTuple newTasksPendingRevocation,
final TasksTuple newAssignedTasks,
final TasksTuple newTasksPendingAssignment,
final TasksTupleWithEpochs newTasksPendingRevocation,
final TasksTupleWithEpochs newAssignedTasks,
final TasksTupleWithEpochs newTasksPendingAssignment,
final boolean hasUnreleasedTasks) {

final boolean hasTasksToBeRevoked =
Expand Down Expand Up @@ -424,7 +520,7 @@ private StreamsGroupMember buildNewMember(final int memberEpoch,
.setState(newState)
.updateMemberEpoch(targetAssignmentEpoch)
.setAssignedTasks(newAssignedTasks.merge(newTasksPendingAssignment))
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
.build();
} else if (hasUnreleasedTasks) {
// If there are no tasks to be revoked nor to be assigned but some
Expand All @@ -434,7 +530,7 @@ private StreamsGroupMember buildNewMember(final int memberEpoch,
.setState(MemberState.UNRELEASED_TASKS)
.updateMemberEpoch(targetAssignmentEpoch)
.setAssignedTasks(newAssignedTasks)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
.build();
} else {
// Otherwise, the member transitions to the target epoch and to the
Expand All @@ -443,7 +539,7 @@ private StreamsGroupMember buildNewMember(final int memberEpoch,
.setState(MemberState.STABLE)
.updateMemberEpoch(targetAssignmentEpoch)
.setAssignedTasks(newAssignedTasks)
.setTasksPendingRevocation(TasksTuple.EMPTY)
.setTasksPendingRevocation(TasksTupleWithEpochs.EMPTY)
.build();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,10 +266,10 @@ public static CoordinatorRecord newStreamsGroupCurrentAssignmentRecord(
.setMemberEpoch(member.memberEpoch())
.setPreviousMemberEpoch(member.previousMemberEpoch())
.setState(member.state().value())
.setActiveTasks(toTaskIds(member.assignedTasks().activeTasks()))
.setActiveTasks(toTaskIdsWithEpochs(member.assignedTasks().activeTasksWithEpochs()))
.setStandbyTasks(toTaskIds(member.assignedTasks().standbyTasks()))
.setWarmupTasks(toTaskIds(member.assignedTasks().warmupTasks()))
.setActiveTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().activeTasks()))
.setActiveTasksPendingRevocation(toTaskIdsWithEpochs(member.tasksPendingRevocation().activeTasksWithEpochs()))
.setStandbyTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().standbyTasks()))
.setWarmupTasksPendingRevocation(toTaskIds(member.tasksPendingRevocation().warmupTasks())),
(short) 0
Expand Down Expand Up @@ -311,6 +311,33 @@ private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIds(
return taskIds;
}

/**
 * Converts an epoch-annotated task map into record TaskIds entries. Each entry carries
 * the partitions of one subtopology in ascending order together with a parallel list
 * of epochs; the returned list is ordered by subtopology id.
 *
 * @param tasksWithEpochs The tasks keyed by subtopology id and then by partition, mapped
 *                        to the epoch of each task.
 * @return The TaskIds entries, one per subtopology in the input map.
 */
private static List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> toTaskIdsWithEpochs(
    Map<String, Map<Integer, Integer>> tasksWithEpochs
) {
    List<StreamsGroupCurrentMemberAssignmentValue.TaskIds> result = new ArrayList<>(tasksWithEpochs.size());
    for (Map.Entry<String, Map<Integer, Integer>> subtopology : tasksWithEpochs.entrySet()) {
        Map<Integer, Integer> epochByPartition = subtopology.getValue();

        // Ascending partition order gives a consistent ordering within each entry.
        List<Integer> partitions = new ArrayList<>(epochByPartition.keySet());
        partitions.sort(Comparator.naturalOrder());

        // epochs.get(i) is the epoch of partitions.get(i).
        List<Integer> epochs = new ArrayList<>(partitions.size());
        for (Integer partition : partitions) {
            epochs.add(epochByPartition.get(partition));
        }

        result.add(new StreamsGroupCurrentMemberAssignmentValue.TaskIds()
            .setSubtopologyId(subtopology.getKey())
            .setPartitions(partitions)
            .setAssignmentEpochs(epochs));
    }
    result.sort(Comparator.comparing(StreamsGroupCurrentMemberAssignmentValue.TaskIds::subtopologyId));
    return result;
}

/**
* Creates a StreamsTopology record.
*
Expand Down
Loading
Loading