diff --git a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
index 7ef5a382584a4..fc29f93b883ad 100644
--- a/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
+++ b/group-coordinator/src/main/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignor.java
@@ -25,6 +25,7 @@
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.Map;
 import java.util.Optional;
 import java.util.PriorityQueue;
@@ -53,8 +54,7 @@ public GroupAssignment assign(final GroupSpec groupSpec, final TopologyDescriber
     }

     private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescriber topologyDescriber) {
-        //active
-        final Set<TaskId> activeTasks = taskIds(topologyDescriber, true);
+        final LinkedList<TaskId> activeTasks = taskIds(topologyDescriber, true);
         assignActive(activeTasks);

         //standby
@@ -62,15 +62,15 @@ private GroupAssignment doAssign(final GroupSpec groupSpec, final TopologyDescri
             groupSpec.assignmentConfigs().isEmpty() ? 0 : Integer.parseInt(groupSpec.assignmentConfigs().get("num.standby.replicas"));

         if (numStandbyReplicas > 0) {
-            final Set<TaskId> statefulTasks = taskIds(topologyDescriber, false);
+            final LinkedList<TaskId> statefulTasks = taskIds(topologyDescriber, false);
             assignStandby(statefulTasks, numStandbyReplicas);
         }

         return buildGroupAssignment(groupSpec.members().keySet());
     }

-    private Set<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
-        final Set<TaskId> ret = new HashSet<>();
+    private LinkedList<TaskId> taskIds(final TopologyDescriber topologyDescriber, final boolean isActive) {
+        final LinkedList<TaskId> ret = new LinkedList<>();
         for (final String subtopology : topologyDescriber.subtopologies()) {
             if (isActive || topologyDescriber.isStateful(subtopology)) {
                 final int numberOfPartitions = topologyDescriber.maxNumInputPartitions(subtopology);
@@ -166,7 +166,10 @@ private Map<String, Set<Integer>> toCompactedTaskIds(final Set<TaskId> taskIds)
         return ret;
     }

-    private void assignActive(final Set<TaskId> activeTasks) {
+    private void assignActive(final LinkedList<TaskId> activeTasks) {
+
+        // Assuming our current assignment pairs same partitions (range-based), we want to sort by partition first
+        activeTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));

         // 1. re-assigning existing active tasks to clients that previously had the same active tasks
         for (final Iterator<TaskId> it = activeTasks.iterator(); it.hasNext();) {
@@ -193,6 +196,9 @@ private void assignActive(final Set<TaskId> activeTasks) {
             }
         }

+        // To achieve an initially range-based assignment, sort by subtopology
+        activeTasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
+
         // 3. assign any remaining unassigned tasks
         final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
         processByLoad.addAll(localState.processIdToState.values());
@@ -296,9 +302,13 @@ private boolean hasUnfulfilledQuota(final Member member) {
         return localState.processIdToState.get(member.processId).memberToTaskCounts().get(member.memberId) < localState.tasksPerMember;
     }

-    private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyReplicas) {
+    private void assignStandby(final LinkedList<TaskId> standbyTasks, int numStandbyReplicas) {
         final ArrayList<StandbyToAssign> toLeastLoaded = new ArrayList<>(standbyTasks.size() * numStandbyReplicas);
-        for (final TaskId task : standbyTasks) {
+
+        // Assuming our current assignment is range-based, we want to sort by partition first.
+        standbyTasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId).reversed());
+
+        for (TaskId task : standbyTasks) {
             for (int i = 0; i < numStandbyReplicas; i++) {

                 // prev active task
@@ -329,6 +339,10 @@ private void assignStandby(final Set<TaskId> standbyTasks, final int numStandbyR
             }
         }

+        // To achieve a range-based assignment, sort by subtopology
+        toLeastLoaded.sort(Comparator.<StandbyToAssign, String>comparing(x -> x.taskId.subtopologyId())
+            .thenComparing(x -> x.taskId.partition()).reversed());
+
         final PriorityQueue<ProcessState> processByLoad = new PriorityQueue<>(Comparator.comparingDouble(ProcessState::load));
         processByLoad.addAll(localState.processIdToState.values());
         for (final StandbyToAssign toAssign : toLeastLoaded) {
diff --git a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
index b4fa9c4db9924..1e9d4115cb2bd 100644
--- a/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
+++ b/group-coordinator/src/test/java/org/apache/kafka/coordinator/group/streams/assignor/StickyTaskAssignorTest.java
@@ -1091,6 +1091,148 @@ public void shouldHandleEdgeCaseWithMoreStandbyReplicasThanAvailableClients() {
         assertEquals(numTasks, allStandbyTasks.size());
     }

+    @Test
+    public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments() {
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))));
+
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2",
+            mkMap(mkEntry("test-subtopology", Sets.newSet(2, 3))),
+            mkMap(mkEntry("test-subtopology", Sets.newSet(0, 1))));
+
+        // Node 3 joins as new client
+        final AssignmentMemberSpec memberSpec3 = createAssignmentMemberSpec("process3");
+
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2), mkEntry("member3", memberSpec3));
+
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, "1"))),
+            new TopologyDescriberImpl(4, true, List.of("test-subtopology"))
+        );
+
+        // Verify all active tasks are assigned
+        final Set<Integer> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member1"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member2"));
+        allAssignedActiveTasks.addAll(getAllActiveTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedActiveTasks);
+
+        // Verify all standby tasks are assigned
+        final Set<Integer> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member1"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member2"));
+        allAssignedStandbyTasks.addAll(getAllStandbyTaskIds(result, "member3"));
+        assertEquals(Sets.newSet(0, 1, 2, 3), allAssignedStandbyTasks);
+
+        // Verify each member has 1-2 active tasks and at most 3 tasks total
+        assertTrue(getAllActiveTaskIds(result, "member1").size() >= 1 && getAllActiveTaskIds(result, "member1").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member1").size() + getAllStandbyTaskIds(result, "member1").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member2").size() >= 1 && getAllActiveTaskIds(result, "member2").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member2").size() + getAllStandbyTaskIds(result, "member2").size() <= 3);
+
+        assertTrue(getAllActiveTaskIds(result, "member3").size() >= 1 && getAllActiveTaskIds(result, "member3").size() <= 2);
+        assertTrue(getAllActiveTaskIds(result, "member3").size() + getAllStandbyTaskIds(result, "member3").size() <= 3);
+    }
+
+    @Test
+    public void shouldRangeAssignTasksWhenScalingUp() {
+        // Two clients, the second one is new
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1",
+            Map.of("test-subtopology1", Set.of(0, 1), "test-subtopology2", Set.of(0, 1)),
+            Map.of());
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+        // Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+            new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
+        );
+
+        // Each client should get one active task from each subtopology
+        final MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());
+
+        final MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());
+
+        // Verify all tasks are assigned exactly once
+        final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+        allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+        allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+
+        final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+        allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+        allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+        // Each client should get one standby task from each subtopology
+        assertNotNull(testMember1);
+        assertEquals(1, testMember1.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember1.standbyTasks().get("test-subtopology2").size());
+
+        assertNotNull(testMember2);
+        assertEquals(1, testMember2.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
+    }
+
+    @Test
+    public void shouldRangeAssignTasksWhenStartingEmpty() {
+        // Two clients starting empty (no previous tasks)
+        final AssignmentMemberSpec memberSpec1 = createAssignmentMemberSpec("process1");
+        final AssignmentMemberSpec memberSpec2 = createAssignmentMemberSpec("process2");
+        final Map<String, AssignmentMemberSpec> members = mkMap(
+            mkEntry("member1", memberSpec1), mkEntry("member2", memberSpec2));
+
+        // Two subtopologies with 2 tasks each (4 tasks total) with standby replicas enabled
+        final GroupAssignment result = assignor.assign(
+            new GroupSpecImpl(members, mkMap(mkEntry(NUM_STANDBY_REPLICAS_CONFIG, String.valueOf(1)))),
+            new TopologyDescriberImpl(2, true, Arrays.asList("test-subtopology1", "test-subtopology2"))
+        );
+
+        // Each client should get one active task from each subtopology
+        final MemberAssignment testMember1 = result.members().get("member1");
+        assertNotNull(testMember1);
+        assertEquals(1, testMember1.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember1.activeTasks().get("test-subtopology2").size());
+
+        final MemberAssignment testMember2 = result.members().get("member2");
+        assertNotNull(testMember2);
+        assertEquals(1, testMember2.activeTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember2.activeTasks().get("test-subtopology2").size());
+
+        // Verify all tasks are assigned exactly once
+        final Set<Integer> allSubtopology1Tasks = new HashSet<>();
+        allSubtopology1Tasks.addAll(testMember1.activeTasks().get("test-subtopology1"));
+        allSubtopology1Tasks.addAll(testMember2.activeTasks().get("test-subtopology1"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology1Tasks);
+
+        final Set<Integer> allSubtopology2Tasks = new HashSet<>();
+        allSubtopology2Tasks.addAll(testMember1.activeTasks().get("test-subtopology2"));
+        allSubtopology2Tasks.addAll(testMember2.activeTasks().get("test-subtopology2"));
+        assertEquals(Sets.newSet(0, 1), allSubtopology2Tasks);
+
+        // Each client should get one standby task from each subtopology
+        assertNotNull(testMember1);
+        assertEquals(1, testMember1.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember1.standbyTasks().get("test-subtopology2").size());
+
+        assertNotNull(testMember2);
+        assertEquals(1, testMember2.standbyTasks().get("test-subtopology1").size());
+        assertEquals(1, testMember2.standbyTasks().get("test-subtopology2").size());
+    }
+
     private int getAllActiveTaskCount(GroupAssignment result, String... memberIds) {
         int size = 0;
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
index 3103e72cd52d1..64830546f0f14 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/assignment/LegacyStickyTaskAssignorTest.java
@@ -1263,6 +1263,119 @@ public void shouldRemainOriginalAssignmentWithoutTrafficCostForMinCostStrategy(f
         }
     }

+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+    })
+    public void shouldReassignTasksWhenNewNodeJoinsWithExistingActiveAndStandbyAssignments(final String rackAwareStrategy) {
+        setUp(rackAwareStrategy);
+
+        // Initial setup: Node 1 has active tasks 0,1 and standby tasks 2,3
+        // Node 2 has active tasks 2,3 and standby tasks 0,1
+        final ClientState node1 = createClientWithPreviousActiveTasks(PID_1, 1, TASK_0_0, TASK_0_1);
+        node1.addPreviousStandbyTasks(Set.of(TASK_0_2, TASK_0_3));
+
+        final ClientState node2 = createClientWithPreviousActiveTasks(PID_2, 1, TASK_0_2, TASK_0_3);
+        node2.addPreviousStandbyTasks(Set.of(TASK_0_0, TASK_0_1));
+
+        // Node 3 joins as new client
+        final ClientState node3 = createClient(PID_3, 1);
+
+        final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3);
+        assertThat(probingRebalanceNeeded, is(false));
+
+        // Verify all active tasks are assigned
+        final Set<TaskId> allAssignedActiveTasks = new HashSet<>();
+        allAssignedActiveTasks.addAll(node1.activeTasks());
+        allAssignedActiveTasks.addAll(node2.activeTasks());
+        allAssignedActiveTasks.addAll(node3.activeTasks());
+        assertThat(allAssignedActiveTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
+
+        // Verify all standby tasks are assigned
+        final Set<TaskId> allAssignedStandbyTasks = new HashSet<>();
+        allAssignedStandbyTasks.addAll(node1.standbyTasks());
+        allAssignedStandbyTasks.addAll(node2.standbyTasks());
+        allAssignedStandbyTasks.addAll(node3.standbyTasks());
+        assertThat(allAssignedStandbyTasks, equalTo(Set.of(TASK_0_0, TASK_0_1, TASK_0_2, TASK_0_3)));
+
+        // Verify each client has 1-2 active tasks and at most 3 tasks total
+        assertThat(node1.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node1.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node1.activeTasks().size() + node1.standbyTasks().size(), lessThanOrEqualTo(3));
+
+        assertThat(node2.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node2.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node2.activeTasks().size() + node2.standbyTasks().size(), lessThanOrEqualTo(3));
+
+        assertThat(node3.activeTasks().size(), greaterThanOrEqualTo(1));
+        assertThat(node3.activeTasks().size(), lessThanOrEqualTo(2));
+        assertThat(node3.activeTasks().size() + node3.standbyTasks().size(), lessThanOrEqualTo(3));
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_NONE,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_MIN_TRAFFIC,
+        StreamsConfig.RACK_AWARE_ASSIGNMENT_STRATEGY_BALANCE_SUBTOPOLOGY
+    })
+    public void shouldRangeAssignTasksWhenStartingEmpty(final String rackAwareStrategy) {
+        setUp(rackAwareStrategy);
+
+        // Two clients with capacity 1 each, starting empty (no previous tasks)
+        createClient(PID_1, 1);
+        createClient(PID_2, 1);
+
+        // Two subtopologies with 2 tasks each (4 tasks total)
+        final boolean probingRebalanceNeeded = assign(1, rackAwareStrategy, TASK_0_0, TASK_0_1, TASK_1_0, TASK_1_1);
+        assertThat(probingRebalanceNeeded, is(false));
+
+        // Each client should get one active task from each subtopology
+        final ClientState client1 = clients.get(PID_1);
+        final ClientState client2 = clients.get(PID_2);
+
+        // Check that each client has one active task from subtopology 0
+        final long client1Subtopology0ActiveCount = client1.activeTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        final long client2Subtopology0ActiveCount = client2.activeTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        assertThat(client1Subtopology0ActiveCount, equalTo(1L));
+        assertThat(client2Subtopology0ActiveCount, equalTo(1L));
+
+        // Check that each client has one active task from subtopology 1
+        final long client1Subtopology1ActiveCount = client1.activeTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        final long client2Subtopology1ActiveCount = client2.activeTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        assertThat(client1Subtopology1ActiveCount, equalTo(1L));
+        assertThat(client2Subtopology1ActiveCount, equalTo(1L));
+
+        // Check that each client has one standby task from subtopology 0
+        final long client1Subtopology0StandbyCount = client1.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        final long client2Subtopology0StandbyCount = client2.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 0)
+            .count();
+        assertThat(client1Subtopology0StandbyCount, equalTo(1L));
+        assertThat(client2Subtopology0StandbyCount, equalTo(1L));
+
+        // Check that each client has one standby task from subtopology 1
+        final long client1Subtopology1StandbyCount = client1.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        final long client2Subtopology1StandbyCount = client2.standbyTasks().stream()
+            .filter(task -> task.subtopology() == 1)
+            .count();
+        assertThat(client1Subtopology1StandbyCount, equalTo(1L));
+        assertThat(client2Subtopology1StandbyCount, equalTo(1L));
+    }
+
     private boolean assign(final String rackAwareStrategy, final TaskId... tasks) {
         return assign(0, rackAwareStrategy, tasks);
     }
diff --git a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
index 9c4d40167488a..6d7ea57b8ac6c 100644
--- a/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
+++ b/tools/src/test/java/org/apache/kafka/tools/streams/DescribeStreamsGroupTest.java
@@ -180,8 +180,8 @@ public void testDescribeStreamsGroupWithStateAndVerboseOptions() throws Exceptio
     public void testDescribeStreamsGroupWithMembersOption() throws Exception {
         final List<String> expectedHeader = List.of("GROUP", "MEMBER", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
         final Set<List<String>> expectedRows = Set.of(
-            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "", "", "", "ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "", "", "", "ACTIVE:", "0:[0];", "1:[0];"));
         // The member and process names as well as client-id are not deterministic, so we don't care about them.
         final List<Integer> dontCares = List.of(1, 2, 3);
@@ -193,8 +193,8 @@ public void testDescribeStreamsGroupWithMembersOption() throws Exception {
     public void testDescribeStreamsGroupWithMembersAndVerboseOptions() throws Exception {
         final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");
         final Set<List<String>> expectedRows = Set.of(
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
         // The member and process names as well as client-id are not deterministic, so we don't care about them.
         final List<Integer> dontCares = List.of(3, 6, 7);
@@ -212,8 +212,8 @@ public void testDescribeMultipleStreamsGroupWithMembersAndVerboseOptions() throw
         final List<String> expectedHeader = List.of("GROUP", "TARGET-ASSIGNMENT-EPOCH", "TOPOLOGY-EPOCH", "MEMBER", "MEMBER-PROTOCOL", "MEMBER-EPOCH", "PROCESS", "CLIENT-ID", "ASSIGNMENTS");

         final Set<List<String>> expectedRows1 = Set.of(
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0,1];", "TARGET-ACTIVE:", "0:[0,1];"),
-            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0,1];", "TARGET-ACTIVE:", "1:[0,1];"));
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[1];", "1:[1];", "TARGET-ACTIVE:", "0:[1];", "1:[1];"),
+            List.of(APP_ID, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0];", "1:[0];", "TARGET-ACTIVE:", "0:[0];", "1:[0];"));
         final Set<List<String>> expectedRows2 = Set.of(
             List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "1:[0];", "TARGET-ACTIVE:", "1:[0];"),
             List.of(APP_ID_2, "3", "0", "", "streams", "3", "", "", "ACTIVE:", "0:[0];", "TARGET-ACTIVE:", "0:[0];"));
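
Reviewer note on the two sort passes (commentary, not part of the patch): the partition-major order lets the stickiness pass visit tasks in the groups a previous range-based assignment would have co-located, while the subtopology-major order makes the fallback pass hand out contiguous ranges of each subtopology to the least-loaded clients. The sketch below is a minimal, self-contained illustration of those two comparator orders only; the `TaskId` record and the class name here are simplified stand-ins for the coordinator's real types, not the actual implementation.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class RangeSortSketch {

    // Simplified stand-in for the coordinator's TaskId; only the two fields the sorts use.
    record TaskId(String subtopologyId, int partition) { }

    public static void main(String[] args) {
        final List<TaskId> tasks = new ArrayList<>(List.of(
            new TaskId("sub-0", 0), new TaskId("sub-0", 1),
            new TaskId("sub-1", 0), new TaskId("sub-1", 1)));

        // Order used before the stickiness pass: partition-major, so tasks that a
        // range-based previous assignment placed on the same client are adjacent.
        tasks.sort(Comparator.comparing(TaskId::partition).thenComparing(TaskId::subtopologyId));
        System.out.println(tasks);
        // abbreviated: (sub-0,0), (sub-1,0), (sub-0,1), (sub-1,1)

        // Order used before the fallback pass: subtopology-major, so offering each
        // remaining task to the least-loaded client yields a range-based layout.
        tasks.sort(Comparator.comparing(TaskId::subtopologyId).thenComparing(TaskId::partition));
        System.out.println(tasks);
        // abbreviated: (sub-0,0), (sub-0,1), (sub-1,0), (sub-1,1)
    }
}
```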