Conversation

@lucasbru lucasbru commented Sep 5, 2025

This is actually fixing a difference between the old and the new
assignor. Given the assignment ordering, the legacy assignor has a
preference for range-style assignments built in, that is, assigning

C1: 0_0, 1_0
C2: 0_1, 1_1

instead of

C1: 0_0, 0_1
C2: 1_0, 1_1

We add tests to both assignors to check for this behavior, and improve
the new assignor by enforcing the corresponding orderings.

Reviewers: Bill Bejeck [email protected]

@lucasbru lucasbru changed the title from "KAFKA-19661 [5/N]: Prefer range-style assignment" to "KAFKA-19661 [4/N]: Prefer range-style assignment" Sep 5, 2025
@lucasbru lucasbru requested review from bbejeck and Copilot September 5, 2025 10:14

private void assignActive(final LinkedList<TaskId> activeTasks) {

    // Assuming our current assignment pairs same partitions (range-based), we want to sort by partition first.
    activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
Member Author

I added the sorting here. The old assignor did not do this sorting explicitly, but happened to run into the "good case".

The point is this:
Normally, we want to assign active tasks like a range assignor: when we have two subtopologies with two partitions each and two clients, we will assign

Client1: 0_0, 1_0
Client2: 0_1, 1_1

The reason is that, heuristically, if we had the assignment

Client1: 0_0, 0_1
Client2: 1_0, 1_1

and the first subtopology has large state and the second subtopology has small state, then one client gets most of the state.

The sorting here also helps to achieve this kind of range assignment when scaling up. Assume that all tasks are currently assigned to the first member:

Client1: 0_0, 0_1, 1_0, 1_1
Client2: -

Now, we first reassign the previously owned tasks. We want to start with all partition-0 tasks before the partition-1 tasks, until Client1 fills up:

Client1: 0_0, 1_0
Client2: -

Then we fill up Client2 the usual way:

Client1: 0_0, 1_0
Client2: 0_1, 1_1

This is a corner case, but it seems like a useful improvement.
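
To make this concrete, here is a minimal, self-contained sketch of the idea. The simplified Task record, the hard-coded quota of 2, and the class name are assumptions for illustration only; this is not the actual Kafka TaskId or assignor code.

import java.util.Comparator;
import java.util.LinkedList;
import java.util.List;

public class RangeSortSketch {

    // Hypothetical stand-in for the real TaskId: printed as "subtopologyId_partition", e.g. 0_1.
    record Task(int subtopologyId, int partition) {
        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    public static void main(String[] args) {
        // Client1 previously owned all four tasks; Client2 just joined.
        final LinkedList<Task> previouslyOwned = new LinkedList<>(List.of(
            new Task(0, 0), new Task(0, 1), new Task(1, 0), new Task(1, 1)));

        // Sort by partition first, then subtopology, mirroring the sort above.
        previouslyOwned.sort(Comparator.comparing(Task::partition)
            .thenComparing(Task::subtopologyId));

        // With an (assumed) quota of 2 active tasks per client, Client1 keeps the
        // first two tasks in this order (0_0 and 1_0), leaving 0_1 and 1_1 for
        // Client2 -- the range-style split described above.
        final int quota = 2;
        System.out.println("Client1 keeps: " + previouslyOwned.subList(0, quota));
        System.out.println("Client2 gets:  " + previouslyOwned.subList(quota, previouslyOwned.size()));
    }
}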

for (final TaskId task : standbyTasks) {

// Assuming our current assignment is range-based, we want to sort by partition first.
standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
Member Author

We want to assign standby tasks in reverse.

The reason why we want to traverse the standby tasks in reverse order is the example that I added to the unit tests of both the LegacyStickyTaskAssignor and the new StickyTaskAssignor.

Assume we have
Node 1: Active task 0,1, Standby task 2,3
Node 2: Active task 2,3, Standby task 0,1
Node 3: - (new)

Then we don't want to assign active tasks and standby tasks in the same order.
Suppose we assign active tasks in increasing order; we will get:

Node 1: Active task 0,1
Node 2: Active task 2
Node 3: Active task 3

Task 3 is the last active task we assign, and at that point Node 1 and Node 2 have already reached their active-task quotas, so it can only be assigned to Node 3.

Suppose we now assign standby tasks in the same order; we will get this:

Node 1: Active task 0,1, Standby task 2, 3
Node 2: Active task 2, Standby task 0, 1
Node 3: Active task 3

The reason is that we first assign standby tasks 0, 1, and 2, which can all be assigned to the previous member that owned them. Finally, we want to assign standby task 3, but it cannot be placed on Node 3, because Node 3 already owns the active task 3, so we have to assign it to Node 1 or Node 2.

Using reverse order means that, when there are new nodes, they will get the numerically last few active tasks and the numerically first standby tasks, which should avoid this problem.
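
To illustrate this, here is a toy model of the example above. The greedy placement rule, the per-node cap of 3 total tasks, and the class name are assumptions made only for this sketch; it is not the real StickyTaskAssignor logic, but it shows why forward traversal strands standby task 3 while reverse traversal balances out.

import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class StandbyOrderSketch {

    // Fixed active placement from the example: Node1:{0,1}, Node2:{2}, Node3:{3}.
    // Previous standby owners: Node1:{2,3}, Node2:{0,1}.
    static Map<String, List<Integer>> assignStandbys(final List<Integer> traversalOrder) {
        final Map<String, List<Integer>> active = new LinkedHashMap<>();
        active.put("Node1", List.of(0, 1));
        active.put("Node2", List.of(2));
        active.put("Node3", List.of(3));

        final Map<String, List<Integer>> standby = new LinkedHashMap<>();
        active.keySet().forEach(node -> standby.put(node, new ArrayList<>()));

        final Map<Integer, String> previousStandbyOwner = Map.of(
            2, "Node1", 3, "Node1", 0, "Node2", 1, "Node2");

        for (final int task : traversalOrder) {
            // Prefer the previous standby owner; otherwise any node that can host the task.
            final String previous = previousStandbyOwner.get(task);
            final String target = canHost(active, standby, previous, task) ? previous
                : active.keySet().stream()
                    .filter(node -> canHost(active, standby, node, task))
                    .findFirst().orElse(null);
            if (target != null) {
                standby.get(target).add(task);
            } else {
                System.out.println("  standby " + task + " can only go to an already-full node");
            }
        }
        return standby;
    }

    // A node can host a standby if it does not own the same active task and
    // stays within an (assumed) cap of 3 total tasks.
    static boolean canHost(final Map<String, List<Integer>> active, final Map<String, List<Integer>> standby,
                           final String node, final int task) {
        return !active.get(node).contains(task)
            && active.get(node).size() + standby.get(node).size() < 3;
    }

    public static void main(String[] args) {
        System.out.println("Increasing order:");
        System.out.println("  " + assignStandbys(List.of(0, 1, 2, 3)));
        System.out.println("Reverse order:");
        System.out.println("  " + assignStandbys(List.of(3, 2, 1, 0)));
    }
}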

Member

Using reverse order means, when I have new nodes, they will get the numerically last few active tasks, and the numerically first standby tasks,

I was going to ask about this working with the existing HA assignor, but I don't think that it applies anymore for KIP-1071, correct?

and the numerically first standby tasks

If I'm understanding your example correctly, previous ownership will take priority when assigning standbys?

Member Author

I was going to ask about this working with the existing HA assignor, but I don't think that it applies anymore for KIP-1071, correct?

Yes

If I'm understanding your example correctly, previous ownership will take priority when assigning standbys?

Yes

Member

@bbejeck bbejeck left a comment

Thanks for the PR @lucasbru - I've left some comments - overall lgtm


// To achieve an initially range-based assignment, sort by subtopology
activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));

Member

So we do the second sort here by subtopologyId, then partition, to get the range-style assignment that distributes state across sub-topologies.

Member Author

Yes. I should clarify in the comment that this mostly applies to the case where the number of partitions is a multiple of the number of nodes, and in particular to the common case where the number of partitions equals the number of nodes.

We assume we start from a fairly balanced assignment (all processes have roughly equal load). Then the by-load assignment below is effectively a round-robin assignment in most situations:

  • If we start fresh, all processes have 0 load and we will do a complete round-robin assignment
  • If we scale down, all processes will have roughly the same N load and we will do roughly round-robin assignment
  • If we scale up, we will assign all the tasks that we didn't assign above to the new nodes, doing a round-robin assignment among the new nodes (see the sketch below).
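
As a rough illustration of the fresh-start case, here is a small sketch. The simplified Task record, the client names, and the explicit least-loaded loop are assumptions for illustration, not the actual assignor code; it only shows that the subtopology-first sort plus least-loaded placement produces the range-style layout.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FreshAssignmentSketch {

    // Hypothetical stand-in for the real TaskId.
    record Task(int subtopologyId, int partition) {
        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    public static void main(String[] args) {
        final List<Task> tasks = new ArrayList<>(List.of(
            new Task(1, 1), new Task(0, 1), new Task(1, 0), new Task(0, 0)));

        // Sort by subtopology, then partition: 0_0, 0_1, 1_0, 1_1.
        tasks.sort(Comparator.comparing(Task::subtopologyId).thenComparing(Task::partition));

        // Fresh start: both clients have zero load, so "assign to the least-loaded
        // client" degenerates to round-robin over the sorted tasks.
        final Map<String, List<Task>> assignment = new LinkedHashMap<>();
        assignment.put("Client1", new ArrayList<>());
        assignment.put("Client2", new ArrayList<>());

        for (final Task task : tasks) {
            String leastLoaded = null;
            for (final Map.Entry<String, List<Task>> entry : assignment.entrySet()) {
                if (leastLoaded == null || entry.getValue().size() < assignment.get(leastLoaded).size()) {
                    leastLoaded = entry.getKey();
                }
            }
            assignment.get(leastLoaded).add(task);
        }

        // Expected: {Client1=[0_0, 1_0], Client2=[0_1, 1_1]} -- the range-style layout.
        System.out.println(assignment);
    }
}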

assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3);

assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2);
assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3);
Member

Should we also assert the distribution of task ownership, in addition to the owned count?

Member Author

what do you mean by distribution of task ownership?

Member

We're confirming the size, i.e. the number of tasks, rather than which sub-topology the tasks come from, but the test below confirms that already.


assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
assertThat(node3.activeTasks().size() + node3.standbyTasks().size(), lessThanOrEqualTo(3));
Member

same question about membership vs. task count - but I'm not sure if that applies in this case

Member Author

I'm not sure I understand the question

Member

@bbejeck bbejeck Sep 8, 2025

same as my comment above - this is covered by another test

List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
List.of(APP_ID, "", "", "", "ACTIVE:", "0:[1];", "1:[1];"),
List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0];", "1:[0];"));
Member

this is confirming the subtopology_partition task ids right?

Member Author

yes

Member

@bbejeck bbejeck left a comment

Thanks for the PR @lucasbru - LGTM

@lucasbru lucasbru requested a review from Copilot September 8, 2025 14:57
Contributor

Copilot AI left a comment

Pull Request Overview

This PR implements a range-style assignment preference in the Kafka Streams task assignors to maintain consistency between the legacy and new assignor implementations. The change ensures that tasks are assigned in a range-based pattern (e.g., C1: 0_0, 1_0; C2: 0_1, 1_1) rather than grouping all partitions of one subtopology on the same client (e.g., C1: 0_0, 0_1; C2: 1_0, 1_1).

  • Updates the new StickyTaskAssignor to prefer range-style assignments through explicit sorting
  • Adds comprehensive test coverage for range-style assignment behavior in both legacy and new assignors
  • Updates existing test expectations to reflect the new assignment pattern

Reviewed Changes

Copilot reviewed 4 out of 4 changed files in this pull request and generated 3 comments.

File / Description:

  • StickyTaskAssignor.java: Implements range-style assignment sorting for both active and standby tasks
  • StickyTaskAssignorTest.java: Adds new tests to verify range-style assignment behavior
  • LegacyStickyTaskAssignorTest.java: Adds tests to verify existing range-style assignment behavior
  • DescribeStreamsGroupTest.java: Updates test expectations to match the new assignment pattern


@lucasbru lucasbru merged commit 620a01b into apache:trunk Sep 9, 2025
46 of 48 checks passed
@lucasbru lucasbru added the KIP-1071 (PRs related to KIP-1071) label Oct 23, 2025