19
19
from confluent_kafka .admin import (AclBinding , AclBindingFilter , ResourceType ,
20
20
ResourcePatternType , AclOperation , AclPermissionType )
21
21
from confluent_kafka .error import ConsumeError
22
- from confluent_kafka import ConsumerGroupState , TopicCollection
22
+ from confluent_kafka import ConsumerGroupState , TopicCollection , ConsumerGroupType
23
23
24
24
from tests .common import TestUtils
25
25
@@ -30,12 +30,16 @@ def verify_commit_result(err, _):
30
30
assert err is not None
31
31
32
32
33
- def consume_messages (sasl_cluster , group_id , topic , num_messages = None ):
33
+ def consume_messages (sasl_cluster , group_id , group_protocol , topic , num_messages = None ):
34
34
conf = {'group.id' : group_id ,
35
- 'session.timeout.ms ' : 6000 ,
35
+ 'group.protocol ' : group_protocol ,
36
36
'enable.auto.commit' : False ,
37
37
'on_commit' : verify_commit_result ,
38
38
'auto.offset.reset' : 'earliest' }
39
+
40
+ if group_protocol == 'classic' :
41
+ conf ['session.timeout.ms' ] = 6000
42
+
39
43
consumer = sasl_cluster .consumer (conf )
40
44
consumer .subscribe ([topic ])
41
45
read_messages = 0
@@ -164,7 +168,7 @@ def verify_describe_groups(cluster, admin_client, topic):
164
168
165
169
# Consume some messages for the group
166
170
group = 'test-group'
167
- consume_messages (cluster , group , topic , 2 )
171
+ consume_messages (cluster , group , 'classic' , topic , 2 )
168
172
169
173
# Verify Describe Consumer Groups
170
174
desc = verify_provided_describe_for_authorized_operations (admin_client ,
@@ -177,10 +181,29 @@ def verify_describe_groups(cluster, admin_client, topic):
177
181
assert group == desc .group_id
178
182
assert desc .is_simple_consumer_group is False
179
183
assert desc .state == ConsumerGroupState .EMPTY
184
+ assert desc .type == ConsumerGroupType .CLASSIC
180
185
181
186
# Delete group
182
187
perform_admin_operation_sync (admin_client .delete_consumer_groups , [group ], request_timeout = 10 )
183
188
189
+ consumer_group = 'test-group-consumer'
190
+
191
+ consume_messages (cluster , consumer_group , 'consumer' , topic , 2 )
192
+
193
+ desc = verify_provided_describe_for_authorized_operations (admin_client ,
194
+ admin_client .describe_consumer_groups ,
195
+ AclOperation .READ ,
196
+ AclOperation .DELETE ,
197
+ ResourceType .GROUP ,
198
+ consumer_group ,
199
+ [consumer_group ])
200
+ assert consumer_group == desc .group_id
201
+ assert desc .is_simple_consumer_group is False
202
+ assert desc .state == ConsumerGroupState .EMPTY
203
+ assert desc .type == ConsumerGroupType .CONSUMER
204
+
205
+ # Delete group
206
+ perform_admin_operation_sync (admin_client .delete_consumer_groups , [consumer_group ], request_timeout = 10 )
184
207
185
208
def verify_describe_cluster (admin_client ):
186
209
desc = verify_provided_describe_for_authorized_operations (admin_client ,
@@ -217,11 +240,7 @@ def test_describe_operations(sasl_cluster):
217
240
verify_describe_topics (admin_client , our_topic )
218
241
219
242
# Verify Authorized Operations in Describe Groups
220
- # Skip this test if using group protocol `consumer`
221
- # as there is new RPC for describe_groups() in
222
- # group protocol `consumer` case.
223
- if not TestUtils .use_group_protocol_consumer ():
224
- verify_describe_groups (sasl_cluster , admin_client , our_topic )
243
+ verify_describe_groups (sasl_cluster , admin_client , our_topic )
225
244
226
245
# Delete Topic
227
246
perform_admin_operation_sync (admin_client .delete_topics , [our_topic ], operation_timeout = 0 , request_timeout = 10 )
0 commit comments