20
20
ResourcePatternType , AclOperation , AclPermissionType )
21
21
from confluent_kafka .error import ConsumeError
22
22
from confluent_kafka import ConsumerGroupState , TopicCollection , ConsumerGroupType
23
+ from tests .common import TestUtils
23
24
24
25
topic_prefix = "test-topic"
25
26
@@ -166,7 +167,9 @@ def verify_describe_groups(cluster, admin_client, topic):
166
167
167
168
# Consume some messages for the group
168
169
group = 'test-group'
169
- consume_messages (cluster , group , 'classic' , topic , 2 )
170
+ group_type = ConsumerGroupType .CLASSIC if TestUtils .use_group_protocol_consumer () else ConsumerGroupType .CONSUMER
171
+ group_type_str = 'classic' if group_type == ConsumerGroupType .CLASSIC else 'consumer'
172
+ consume_messages (cluster , group , group_type_str , topic , 2 )
170
173
171
174
# Verify Describe Consumer Groups
172
175
desc = verify_provided_describe_for_authorized_operations (admin_client ,
@@ -179,30 +182,11 @@ def verify_describe_groups(cluster, admin_client, topic):
179
182
assert group == desc .group_id
180
183
assert desc .is_simple_consumer_group is False
181
184
assert desc .state == ConsumerGroupState .EMPTY
182
- assert desc .type == ConsumerGroupType . CLASSIC
185
+ assert desc .type == group_type
183
186
184
187
# Delete group
185
188
perform_admin_operation_sync (admin_client .delete_consumer_groups , [group ], request_timeout = 10 )
186
189
187
- consumer_group = 'test-group-consumer'
188
-
189
- consume_messages (cluster , consumer_group , 'consumer' , topic , 2 )
190
-
191
- desc = verify_provided_describe_for_authorized_operations (admin_client ,
192
- admin_client .describe_consumer_groups ,
193
- AclOperation .READ ,
194
- AclOperation .DELETE ,
195
- ResourceType .GROUP ,
196
- consumer_group ,
197
- [consumer_group ])
198
- assert consumer_group == desc .group_id
199
- assert desc .is_simple_consumer_group is False
200
- assert desc .state == ConsumerGroupState .EMPTY
201
- assert desc .type == ConsumerGroupType .CONSUMER
202
-
203
- # Delete group
204
- perform_admin_operation_sync (admin_client .delete_consumer_groups , [consumer_group ], request_timeout = 10 )
205
-
206
190
207
191
def verify_describe_cluster (admin_client ):
208
192
desc = verify_provided_describe_for_authorized_operations (admin_client ,
0 commit comments