@@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.Map;
import java.util.Optional;
import java.util.PriorityQueue;
@@ -53,24 +54,23 @@ public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber
}

private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
-        //active
-        final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
assignActive(activeTasks);

//standby
final int numStandbyReplicas =
groupSpec.assignmentConfigs().isEmpty() ? 0
: Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));
if (numStandbyReplicas > 0) {
-            final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
assignStandby(statefulTasks, numStandbyReplicas);
}

return buildGroupAssignment(groupSpec.members().keySet());
}

-    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
-        final Set<TaskId> ret = new HashSet<>();
+    private LinkedList<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
+        final LinkedList<TaskId> ret = new LinkedList<>();
for (final String subtopology : topologyDescriber.subtopologies()) {
if (isActive || topologyDescriber.isStateful(subtopology)) {
final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
@@ -166,7 +166,10 @@ private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds)
return ret;
}

-    private void assignActive(final Set<TaskId> activeTasks) {
+    private void assignActive(final LinkedList<TaskId> activeTasks) {
+
+        // Assuming our current assignment pairs same partitions (range-based), we want to sort by partition first
+        activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
Member Author:
I added the sorting here. The old assignor did not sort explicitly, but often happened to hit the "good case" by chance.

The point is this: normally, we want to assign active tasks like a range assignor. When we have two subtopologies with two partitions each and two clients, we will assign

Client1: 0_0, 1_0
Client2: 0_1, 1_1

The reason is that, heuristically, if we had the assignment

Client1: 0_0, 0_1
Client2: 1_0, 1_1

and the first subtopology has large state and the second subtopology has small state, then one client gets most of the state.

The sorting here helps to achieve this kind of range assignment also when scaling up. Assume that all tasks are currently assigned to the first member:

Client1: 0_0, 0_1, 1_0, 1_1
Client2: -

Now we first re-assign the previously owned tasks; we want to start with all partition-0 tasks (0_0, 1_0) before the partition-1 tasks, until Client1 fills up:

Client1: 0_0, 1_0
Client2:

Then we fill up Client2 the usual way:

Client1: 0_0, 1_0
Client2: 0_1, 1_1

This is a corner case, but it seems like a useful improvement.
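To make the two sort orders in this method concrete, here is a minimal, runnable sketch. The `Task` record is a hypothetical stand-in for the real `TaskId` (same `subtopologyId`/`partition` accessors), used for illustration only:

```java
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;

public class SortOrderDemo {

    // Hypothetical stand-in for the real TaskId, for illustration only.
    record Task(String subtopologyId, int partition) {
        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    public static void main(String[] args) {
        final LinkedList<Task> tasks = new LinkedList<>(List.of(
            new Task("0", 0), new Task("0", 1), new Task("1", 0), new Task("1", 1)));

        // Sort used before the sticky pass: partition first, then subtopology.
        tasks.sort(Comparator.comparing(Task::partition).thenComparing(Task::subtopologyId));
        System.out.println(tasks); // [0_0, 1_0, 0_1, 1_1] -> Client1 keeps 0_0 and 1_0 when scaling up

        // Sort used before assigning the remaining tasks: subtopology first, then partition.
        tasks.sort(Comparator.comparing(Task::subtopologyId).thenComparing(Task::partition));
        System.out.println(tasks); // [0_0, 0_1, 1_0, 1_1] -> range-style spread across clients
    }
}
```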


// 1. re-assigning existing active tasks to clients that previously had the same active tasks
for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
@@ -193,6 +196,9 @@ private void assignActive(final Set<TaskId> activeTasks) {
}
}

+        // To achieve an initially range-based assignment, sort by subtopology
+        activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
+
Member:
So we do the second sort here, by subtopologyId and then partition, to get a range assignment that distributes state evenly across sub-topologies.

Member Author:
Yes. I should clarify in the comment that this mostly applies to the case where the number of partitions is a multiple of the number of nodes, and in particular the common case where number of partitions = number of nodes.

We assume we start from a fairly balanced assignment (all processes have roughly equal load). Then the by-load assignment below is essentially a round-robin assignment in most situations (see the sketch after this list):

  • If we start fresh, all processes have 0 load and we will do a complete round-robin assignment
  • If we scale down, all processes will have roughly the same N load and we will do roughly round-robin assignment
  • If we scale up, we will assign all the tasks that we didn't assign above to the new nodes. We will do a round-robin assignment among the new nodes.
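A minimal sketch of that by-load step, assuming equal starting loads are what make it degenerate to round-robin; the `Process` class here is a simplified stand-in for the real `ProcessState` (whose load also accounts for capacity):

```java
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

public class LeastLoadedDemo {

    // Hypothetical stand-in for the real ProcessState.
    static final class Process {
        final String id;
        int taskCount;
        Process(final String id) { this.id = id; }
        double load() { return taskCount; } // the real load also divides by capacity
    }

    public static void main(String[] args) {
        final PriorityQueue<Process> processByLoad =
            new PriorityQueue<>(Comparator.comparingDouble(Process::load));
        processByLoad.addAll(List.of(new Process("p1"), new Process("p2"), new Process("p3")));

        for (final String task : List.of("0_0", "0_1", "1_0", "1_1", "2_0", "2_1")) {
            final Process leastLoaded = processByLoad.poll(); // process with minimum load
            leastLoaded.taskCount++;
            System.out.println(task + " -> " + leastLoaded.id);
            processByLoad.add(leastLoaded); // re-insert with its updated load
        }
        // Every process ends up with exactly 2 of the 6 tasks; with equal starting
        // loads the picks cycle through the processes, i.e. round-robin.
    }
}
```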

// 3. assign any remaining unassigned tasks
final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
processByLoad.addAll(localState.processIdToState.values());
@@ -296,9 +302,13 @@ private boolean hasUnfulfilledQuota(final Member member) {
return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
}

-    private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
        final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by partition first.
+        standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
Member Author:
We want to assign standby tasks in reverse.

The reason why we want to traverse standby tasks in reverse is shown in the example that I added to the unit tests of both the LegacyStickyTaskAssignor and the new StickyTaskAssignor.

Assume we have
Node 1: Active task 0,1, Standby task 2,3
Node 2: Active task 2,3, Standby task 0,1
Node 3: - (new)

Then we don't want to assign active tasks and standby tasks in the same order.
Suppose we assign active tasks in increasing order; we will get:

Node 1: Active task 0,1
Node 2: Active task 2
Node 3: Active task 3

Task 3 is the last task we assign, and at that point the remaining quota for active tasks is 1, so it can only go to Node 3.

Suppose we now assign standby tasks in the same order; we will get:

Node 1: Active task 0,1, Standby task 2, 3
Node 2: Active task 2, Standby task 0, 1
Node 3: Active task 3

The reason is that we first assign standby tasks 0, 1, 2, each of which can go to the member that previously owned it. Finally, we want to assign standby task 3, but it cannot be placed on Node 3 (which owns active task 3), so we have to assign it to Node 1 or Node 2. Using reverse order means that when we have new nodes, they get the numerically last few active tasks and the numerically first standby tasks, which should avoid this problem.
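A small sketch of the reversed traversal, reusing a hypothetical `Task` record in place of the real `TaskId`: with four partitions of a single subtopology, the standbys are visited 3, 2, 1, 0, so the numerically first tasks are placed last and can fall through to the new node:

```java
import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;

public class ReverseOrderDemo {

    // Hypothetical stand-in for the real TaskId, for illustration only.
    record Task(String subtopologyId, int partition) { }

    public static void main(String[] args) {
        final LinkedList<Task> standbyTasks = new LinkedList<>(List.of(
            new Task("0", 0), new Task("0", 1), new Task("0", 2), new Task("0", 3)));

        // Same comparator as for active tasks, but reversed.
        standbyTasks.sort(Comparator.comparing(Task::partition)
            .thenComparing(Task::subtopologyId).reversed());

        // Prints 0_3, 0_2, 0_1, 0_0: standby 3 is handled first (while its previous
        // owner still has room), and standbys 1 and 0 are left for the new node.
        standbyTasks.forEach(t -> System.out.println(t.subtopologyId() + "_" + t.partition()));
    }
}
```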

Member:
Using reverse order means, when I have new nodes, they will get the numerically last few active tasks, and the numerically first standby tasks,

I was going to ask about this working with the existing HA assignor, but I don't think that it applies anymore for KIP-1071, correct?

and the numerically first standby tasks

If I'm understanding your example correctly, previous ownership will take priority when assigning standbys?

Member Author:
I was going to ask about this working with the existing HA assignor, but I don't think that it applies anymore for KIP-1071, correct?

Yes

If I'm understanding your example correctly, previous ownership will take priority when assigning standbys?

Yes
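To illustrate the "previous ownership takes priority" point, here is a self-contained sketch of the preference cascade described in this thread: previous active owner first, then previous standby owner, then fall through to the least-loaded pass. The `Node` record and method names are hypothetical, not the real assignor's API:

```java
import java.util.List;
import java.util.Optional;
import java.util.Set;

public class StandbyPreferenceDemo {

    // Hypothetical view of a node's current and previous tasks.
    record Node(String id, Set<Integer> activeTasks, Set<Integer> prevActive, Set<Integer> prevStandby) { }

    // A standby must not land next to its own active task; previous owners are preferred.
    static Optional<Node> placeStandby(final int task, final List<Node> nodes) {
        return nodes.stream()
            .filter(n -> !n.activeTasks().contains(task) && n.prevActive().contains(task))
            .findFirst()
            .or(() -> nodes.stream()
                .filter(n -> !n.activeTasks().contains(task) && n.prevStandby().contains(task))
                .findFirst());
        // An empty result falls through to the least-loaded assignment pass.
    }

    public static void main(String[] args) {
        final Node node1 = new Node("node1", Set.of(0, 1), Set.of(0, 1), Set.of(2, 3));
        final Node node3 = new Node("node3", Set.of(3), Set.of(), Set.of());
        // Standby 3 cannot go to node3 (it runs active task 3), so it sticks to node1:
        System.out.println(placeStandby(3, List.of(node1, node3)).map(Node::id)); // Optional[node1]
    }
}
```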


+        for (TaskId task : standbyTasks) {
for (int i = 0; i < numStandbyReplicas; i++) {

// prev active task
@@ -329,6 +339,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyR
}
}

+        // To achieve a range-based assignment, sort by subtopology
+        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId.subtopologyId())
+            .thenComparing(x -> x.taskId.partition()).reversed());
+
final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
processByLoad.addAll(localState.processIdToState.values());
for (final StandbyToAssign toAssign : toLeastLoaded) {
@@ -1091,6 +1091,148 @@ public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
assertEquals(numTasks, allStandbyTasks.size());
}

@Test
public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
// Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
// Node 2 has active tasks 2,3 and standby tasks 0,1
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));

final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));

// Node 3 joins as new client
final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");

final Map<String, AssignmentMemberSpec> members = mkMap(
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));

final GroupAssignment result = assignor.assign(
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
);

// Verify all active tasks are assigned
final Set<Integer> allAssignedActiveTasks = new HashSet<>();
allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);

// Verify all standby tasks are assigned
final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member1"));
allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member2"));
allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member3"));
assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);

// Verify each member has 1-2 active tasks and at most 3 tasks total
assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && getAllActiveTaskIds(result, "member1").size() <= 2);
assertTrue(getAllActiveTaskIds(result, "member1").size() + getAllStandbyTaskIds(result, "member1").size() <= 3);

assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && getAllActiveTaskIds(result, "member2").size() <= 2);
assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3);

assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2);
assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3);
Member:
Should we also assert the distribution of task ownership, in addition to the owned count?

Member Author:
What do you mean by distribution of task ownership?

Member:
We're only confirming the sizes, i.e. the number of tasks rather than which sub-topology they come from, but the test below confirms that already.

}

@Test
public void shouldRangeAssignTasksWhenScalingUp() {
// Two clients, the second one is new
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2", Set.of(0, 1)),
Map.of());
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
final Map<String, AssignmentMemberSpec> members = mkMap(
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));

// Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
final GroupAssignment result = assignor.assign(
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
);

// Each client should get one task from each subtopology
final MemberAssignment testMember1 = result.members().get("member1");
assertNotNull(testMember1);
assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());

final MemberAssignment testMember2 = result.members().get("member2");
assertNotNull(testMember2);
assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());

// Verify all tasks are assigned exactly once
final Set<Integer> allSubtopology1Tasks = new HashSet<>();
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);

final Set<Integer> allSubtopology2Tasks = new HashSet<>();
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);

// Each client should get one standby task from each subtopology
assertNotNull(testMember1);
assertEquals(1, testMember1.standbyTasks().get("test-subtopology1").size());
assertEquals(1, testMember1.standbyTasks().get("test-subtopology2").size());

assertNotNull(testMember2);
assertEquals(1, testMember2.standbyTasks().get("test-subtopology1").size());
assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
}

@Test
public void shouldRangeAssignTasksWhenStartingEmpty() {
// Two clients starting empty (no previous tasks)
final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
final Map<String, AssignmentMemberSpec> members = mkMap(
mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));

// Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
final GroupAssignment result = assignor.assign(
new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
);

// Each client should get one task from each subtopology
final MemberAssignment testMember1 = result.members().get("member1");
assertNotNull(testMember1);
assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());

final MemberAssignment testMember2 = result.members().get("member2");
assertNotNull(testMember2);
assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());

// Verify all tasks are assigned exactly once
final Set<Integer> allSubtopology1Tasks = new HashSet<>();
allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);

final Set<Integer> allSubtopology2Tasks = new HashSet<>();
allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);

// Each client should get one standby task from each subtopology
assertNotNull(testMember1);
assertEquals(1, testMember1.standbyTasks().get("test-subtopology1").size());
assertEquals(1, testMember1.standbyTasks().get("test-subtopology2").size());

assertNotNull(testMember2);
assertEquals(1, testMember2.standbyTasks().get("test-subtopology1").size());
assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
}


private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
int size = 0;
@@ -1263,6 +1263,119 @@ public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy(f
}
}

@ParameterizedTest
@ValueSource(strings = {
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
})
public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final String rackAwareStrategy) {
setUp(rackAwareStrategy);

// Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
// Node 2 has active tasks 2,3 and standby tasks 0,1
final ClientState node1 = createClientWithPreviousActiveTasks(PID_1, 1, TASK_0_0, TASK_0_1);
node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));

final ClientState node2 = createClientWithPreviousActiveTasks(PID_2, 1, TASK_0_2, TASK_0_3);
node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));

// Node 3 joins as new client
final ClientState node3 = createClient(PID_3, 1);

final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
assertThat(probingRebalanceNeeded, is(false));

// Verify all active tasks are assigned
final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
allAssignedActiveTasks.addAll(node1.activeTasks());
allAssignedActiveTasks.addAll(node2.activeTasks());
allAssignedActiveTasks.addAll(node3.activeTasks());
assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));

// Verify all standby tasks are assigned
final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
allAssignedStandbyTasks.addAll(node1.standbyTasks());
allAssignedStandbyTasks.addAll(node2.standbyTasks());
allAssignedStandbyTasks.addAll(node3.standbyTasks());
assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));

// Verify each client has 1-2 active tasks and at most 3 tasks total
assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
assertThat(node1.activeTasks().size() + node1.standbyTasks().size(), lessThanOrEqualTo(3));

assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
assertThat(node2.activeTasks().size() + node2.standbyTasks().size(), lessThanOrEqualTo(3));

assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
assertThat(node3.activeTasks().size() + node3.standbyTasks().size(), lessThanOrEqualTo(3));
Member:
same question about membership vs. task count - but I'm not sure if that applies in this case

Member Author:
I'm not sure I understand the question

Member (@bbejeck, Sep 8, 2025):
same as my comment above - this is covered by another test

}

@ParameterizedTest
@ValueSource(strings = {
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
})
public void shouldRangeAssignTasksWhenStartingEmpty(final String rackAwareStrategy) {
setUp(rackAwareStrategy);

// Two clients with capacity 1 each, starting empty (no previous tasks)
createClient(PID_1, 1);
createClient(PID_2, 1);

// Two subtopologies with 2 tasks each (4 tasks total)
final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1);
assertThat(probingRebalanceNeeded, is(false));

// Each client should get one active task from each subtopology
final ClientState client1 = clients.get(PID_1);
final ClientState client2 = clients.get(PID_2);

// Check that each client has one active task from subtopology 0
final long client1Subtopology0ActiveCount = client1.activeTasks().stream()
.filter(task -> task.subtopology() == 0)
.count();
final long client2Subtopology0ActiveCount = client2.activeTasks().stream()
.filter(task -> task.subtopology() == 0)
.count();
assertThat(client1Subtopology0ActiveCount, equalTo(1L));
assertThat(client2Subtopology0ActiveCount, equalTo(1L));

// Check that each client has one active task from subtopology 1
final long client1Subtopology1ActiveCount = client1.activeTasks().stream()
.filter(task -> task.subtopology() == 1)
.count();
final long client2Subtopology1ActiveCount = client2.activeTasks().stream()
.filter(task -> task.subtopology() == 1)
.count();
assertThat(client1Subtopology1ActiveCount, equalTo(1L));
assertThat(client2Subtopology1ActiveCount, equalTo(1L));

// Check that each client has one standby task from subtopology 0
final long client1Subtopology0StandbyCount = client1.standbyTasks().stream()
.filter(task -> task.subtopology() == 0)
.count();
final long client2Subtopology0StandbyCount = client2.standbyTasks().stream()
.filter(task -> task.subtopology() == 0)
.count();
assertThat(client1Subtopology0StandbyCount, equalTo(1L));
assertThat(client2Subtopology0StandbyCount, equalTo(1L));

// Check that each client has one standby task from subtopology 1
final long client1Subtopology1StandbyCount = client1.standbyTasks().stream()
.filter(task -> task.subtopology() == 1)
.count();
final long client2Subtopology1StandbyCount = client2.standbyTasks().stream()
.filter(task -> task.subtopology() == 1)
.count();
assertThat(client1Subtopology1StandbyCount, equalTo(1L));
assertThat(client2Subtopology1StandbyCount, equalTo(1L));
}

private boolean assign(final String rackAwareStrategy, final TaskId... tasks) {
return assign(0, rackAwareStrategy, tasks);
}