From 0829390a86e3358b0616dd85b1107506dc276976 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 6 Sep 2024 21:06:22 +0800 Subject: [PATCH 1/4] add GroupCoordinatorConfig in ConsumerGroupCommandTestUtils --- .../tools/consumer/group/ConsumerGroupCommandTestUtils.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index e3674bd247df5..4a79937b68bc2 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -42,6 +42,7 @@ import static kafka.test.annotation.Type.CO_KRAFT; import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; @@ -85,6 +86,7 @@ static List forKRaftGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Collections.singleton(CO_KRAFT)) @@ -97,6 +99,7 @@ static List forZkGroupCoordinator() { Map serverProperties = new HashMap<>(); serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); + serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Collections.singleton(ZK)) From 17bf18dfadbe73a5a02b1846381a133de60e28bb Mon Sep 17 00:00:00 2001 From: m1a2st Date: Fri, 6 Sep 2024 21:35:30 +0800 Subject: [PATCH 2/4] add MAX_POLL_INTERVAL_MS_CONFIG in DescribeConsumerGroupTest --- .../kafka/tools/consumer/group/DescribeConsumerGroupTest.java | 1 + 1 file changed, 1 insertion(+) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 32f86dab6b09d..2d41a9ad61ddc 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -987,6 +987,7 @@ private Map composeConfigs(String groupId, String groupProtocol, configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); + configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000"); configs.putAll(customConfigs); return configs; From 8b18308d6eb2107817deae4bc41216e035be70d2 Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 9 Sep 2024 21:22:05 +0800 Subject: [PATCH 3/4] add new config for broker --- .../tools/consumer/group/ConsumerGroupCommandTestUtils.java | 6 ++++++ .../tools/consumer/group/DescribeConsumerGroupTest.java | 2 +- 2 files changed, 7 insertions(+), 1 deletion(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java index 4a79937b68bc2..e45d50b6823ef 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/ConsumerGroupCommandTestUtils.java @@ -42,6 +42,8 @@ import static kafka.test.annotation.Type.CO_KRAFT; import static kafka.test.annotation.Type.ZK; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG; +import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_PARTITIONS_CONFIG; import static org.apache.kafka.coordinator.group.GroupCoordinatorConfig.OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG; @@ -87,6 +89,8 @@ static List forKRaftGroupCoordinator() { serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000"); + serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "500"); + serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "500"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Collections.singleton(CO_KRAFT)) @@ -100,6 +104,8 @@ static List forZkGroupCoordinator() { serverProperties.put(OFFSETS_TOPIC_PARTITIONS_CONFIG, "1"); serverProperties.put(OFFSETS_TOPIC_REPLICATION_FACTOR_CONFIG, "1"); serverProperties.put(GROUP_INITIAL_REBALANCE_DELAY_MS_CONFIG, "1000"); + serverProperties.put(CONSUMER_GROUP_HEARTBEAT_INTERVAL_MS_CONFIG, "500"); + serverProperties.put(CONSUMER_GROUP_MIN_HEARTBEAT_INTERVAL_MS_CONFIG, "500"); return Collections.singletonList(ClusterConfig.defaultBuilder() .setTypes(Collections.singleton(ZK)) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 2d41a9ad61ddc..818d57771654d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -987,7 +987,7 @@ private Map composeConfigs(String groupId, String groupProtocol, configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); - configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "1000"); + configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "500"); configs.putAll(customConfigs); return configs; From 4268f6cbdd6bffd1b1604758ad903b409da283fb Mon Sep 17 00:00:00 2001 From: m1a2st Date: Mon, 9 Sep 2024 22:35:52 +0800 Subject: [PATCH 4/4] remove MAX_POLL_INTERVAL_MS_CONFIG in consumer --- .../kafka/tools/consumer/group/DescribeConsumerGroupTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java index 818d57771654d..32f86dab6b09d 100644 --- a/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java +++ b/tools/src/test/java/org/apache/kafka/tools/consumer/group/DescribeConsumerGroupTest.java @@ -987,7 +987,6 @@ private Map composeConfigs(String groupId, String groupProtocol, configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName()); configs.put(ConsumerConfig.GROUP_PROTOCOL_CONFIG, groupProtocol); configs.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG, RangeAssignor.class.getName()); - configs.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "500"); configs.putAll(customConfigs); return configs;