From 6052da41d568c5a946d46206fc91e8f221bbbf86 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 15:14:48 +0530 Subject: [PATCH 1/5] Modified describegroup test --- .../admin/test_describe_operations.py | 37 ++++++++++++++----- 1 file changed, 28 insertions(+), 9 deletions(-) diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 3bfbfd88a..6f907c7fe 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -19,7 +19,7 @@ from confluent_kafka.admin import (AclBinding, AclBindingFilter, ResourceType, ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError -from confluent_kafka import ConsumerGroupState, TopicCollection +from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType from tests.common import TestUtils @@ -30,12 +30,16 @@ def verify_commit_result(err, _): assert err is not None -def consume_messages(sasl_cluster, group_id, topic, num_messages=None): +def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages=None): conf = {'group.id': group_id, - 'session.timeout.ms': 6000, + 'group.protocol': group_protocol, 'enable.auto.commit': False, 'on_commit': verify_commit_result, 'auto.offset.reset': 'earliest'} + + if group_protocol == 'classic' : + conf['session.timeout.ms'] = 6000 + consumer = sasl_cluster.consumer(conf) consumer.subscribe([topic]) read_messages = 0 @@ -164,7 +168,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Consume some messages for the group group = 'test-group' - consume_messages(cluster, group, topic, 2) + consume_messages(cluster, group, 'classic', topic, 2) # Verify Describe Consumer Groups desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -177,10 +181,29 @@ def verify_describe_groups(cluster, admin_client, topic): assert group == desc.group_id assert desc.is_simple_consumer_group is False assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == ConsumerGroupType.CLASSIC # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) + consumer_group = 'test-group-consumer' + + consume_messages(cluster, consumer_group, 'consumer', topic, 2) + + desc = verify_provided_describe_for_authorized_operations(admin_client, + admin_client.describe_consumer_groups, + AclOperation.READ, + AclOperation.DELETE, + ResourceType.GROUP, + consumer_group, + [consumer_group]) + assert consumer_group == desc.group_id + assert desc.is_simple_consumer_group is False + assert desc.state == ConsumerGroupState.EMPTY + assert desc.type == ConsumerGroupType.CONSUMER + + # Delete group + perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10) def verify_describe_cluster(admin_client): desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -217,11 +240,7 @@ def test_describe_operations(sasl_cluster): verify_describe_topics(admin_client, our_topic) # Verify Authorized Operations in Describe Groups - # Skip this test if using group protocol `consumer` - # as there is new RPC for describe_groups() in - # group protocol `consumer` case. - if not TestUtils.use_group_protocol_consumer(): - verify_describe_groups(sasl_cluster, admin_client, our_topic) + verify_describe_groups(sasl_cluster, admin_client, our_topic) # Delete Topic perform_admin_operation_sync(admin_client.delete_topics, [our_topic], operation_timeout=0, request_timeout=10) From a63cac44c64d085a0db1e201bf9ca98a06afd472 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 15:24:22 +0530 Subject: [PATCH 2/5] style fix --- tests/integration/admin/test_describe_operations.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 6f907c7fe..4f7778d7f 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -21,8 +21,6 @@ from confluent_kafka.error import ConsumeError from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType -from tests.common import TestUtils - topic_prefix = "test-topic" @@ -37,7 +35,7 @@ def consume_messages(sasl_cluster, group_id, group_protocol, topic, num_messages 'on_commit': verify_commit_result, 'auto.offset.reset': 'earliest'} - if group_protocol == 'classic' : + if group_protocol == 'classic': conf['session.timeout.ms'] = 6000 consumer = sasl_cluster.consumer(conf) @@ -186,7 +184,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) - consumer_group = 'test-group-consumer' + consumer_group = 'test-group-consumer' consume_messages(cluster, consumer_group, 'consumer', topic, 2) @@ -205,6 +203,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10) + def verify_describe_cluster(admin_client): desc = verify_provided_describe_for_authorized_operations(admin_client, admin_client.describe_cluster, From e45e9632d0f6993ca54ea66a68a38c0fc4bb0c42 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 26 May 2025 19:25:11 +0530 Subject: [PATCH 3/5] style fix and updated condition in update_conf_group_protocol --- tests/common/__init__.py | 3 ++- tests/integration/admin/test_describe_operations.py | 2 +- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 3d9ec5c7a..5c5c201a9 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -56,7 +56,8 @@ def use_group_protocol_consumer(): @staticmethod def update_conf_group_protocol(conf=None): if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): - conf['group.protocol'] = 'consumer' + if 'group.protocol' not in conf: + conf['group.protocol'] = 'consumer' @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 4f7778d7f..6d53df059 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -184,7 +184,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) - consumer_group = 'test-group-consumer' + consumer_group = 'test-group-consumer' consume_messages(cluster, consumer_group, 'consumer', topic, 2) From f370a0b8e20f7c0e9c47ea32d89a9793379152e5 Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 30 Jun 2025 15:20:04 +0530 Subject: [PATCH 4/5] tests changed --- tests/common/__init__.py | 3 +-- .../admin/test_describe_operations.py | 26 ++++--------------- 2 files changed, 6 insertions(+), 23 deletions(-) diff --git a/tests/common/__init__.py b/tests/common/__init__.py index 5c5c201a9..3d9ec5c7a 100644 --- a/tests/common/__init__.py +++ b/tests/common/__init__.py @@ -56,8 +56,7 @@ def use_group_protocol_consumer(): @staticmethod def update_conf_group_protocol(conf=None): if conf is not None and 'group.id' in conf and TestUtils.use_group_protocol_consumer(): - if 'group.protocol' not in conf: - conf['group.protocol'] = 'consumer' + conf['group.protocol'] = 'consumer' @staticmethod def remove_forbidden_conf_group_protocol_consumer(conf): diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 6d53df059..60efbecfd 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -20,6 +20,7 @@ ResourcePatternType, AclOperation, AclPermissionType) from confluent_kafka.error import ConsumeError from confluent_kafka import ConsumerGroupState, TopicCollection, ConsumerGroupType +from tests.common import TestUtils topic_prefix = "test-topic" @@ -166,7 +167,9 @@ def verify_describe_groups(cluster, admin_client, topic): # Consume some messages for the group group = 'test-group' - consume_messages(cluster, group, 'classic', topic, 2) + group_type = ConsumerGroupType.CLASSIC if TestUtils.use_group_protocol_consumer() else ConsumerGroupType.CONSUMER + group_type_str = 'classic' if group_type == ConsumerGroupType.CLASSIC else 'consumer' + consume_messages(cluster, group, group_type_str, topic, 2) # Verify Describe Consumer Groups desc = verify_provided_describe_for_authorized_operations(admin_client, @@ -179,30 +182,11 @@ def verify_describe_groups(cluster, admin_client, topic): assert group == desc.group_id assert desc.is_simple_consumer_group is False assert desc.state == ConsumerGroupState.EMPTY - assert desc.type == ConsumerGroupType.CLASSIC + assert desc.type == group_type # Delete group perform_admin_operation_sync(admin_client.delete_consumer_groups, [group], request_timeout=10) - consumer_group = 'test-group-consumer' - - consume_messages(cluster, consumer_group, 'consumer', topic, 2) - - desc = verify_provided_describe_for_authorized_operations(admin_client, - admin_client.describe_consumer_groups, - AclOperation.READ, - AclOperation.DELETE, - ResourceType.GROUP, - consumer_group, - [consumer_group]) - assert consumer_group == desc.group_id - assert desc.is_simple_consumer_group is False - assert desc.state == ConsumerGroupState.EMPTY - assert desc.type == ConsumerGroupType.CONSUMER - - # Delete group - perform_admin_operation_sync(admin_client.delete_consumer_groups, [consumer_group], request_timeout=10) - def verify_describe_cluster(admin_client): desc = verify_provided_describe_for_authorized_operations(admin_client, From 830b12ce1e96a6142bc9cdb01cc9ef9ef5d2d03e Mon Sep 17 00:00:00 2001 From: PratRanj07 Date: Mon, 30 Jun 2025 16:07:45 +0530 Subject: [PATCH 5/5] typo --- tests/integration/admin/test_describe_operations.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/integration/admin/test_describe_operations.py b/tests/integration/admin/test_describe_operations.py index 60efbecfd..d7eb43921 100644 --- a/tests/integration/admin/test_describe_operations.py +++ b/tests/integration/admin/test_describe_operations.py @@ -167,7 +167,7 @@ def verify_describe_groups(cluster, admin_client, topic): # Consume some messages for the group group = 'test-group' - group_type = ConsumerGroupType.CLASSIC if TestUtils.use_group_protocol_consumer() else ConsumerGroupType.CONSUMER + group_type = ConsumerGroupType.CONSUMER if TestUtils.use_group_protocol_consumer() else ConsumerGroupType.CLASSIC group_type_str = 'classic' if group_type == ConsumerGroupType.CLASSIC else 'consumer' consume_messages(cluster, group, group_type_str, topic, 2)