@@ -509,22 +509,60 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
509
509
510
510
# describe delegation_token protocol not implemented
511
511
512
- def describe_consumer_groups (self , group_ids ):
512
+ def describe_consumer_groups (self , group_ids , group_coordinator_id = None ):
513
513
"""Describe a set of consumer groups.
514
514
515
- :param group_ids: A list of consumer group id names
516
- :return: Appropriate version of DescribeGroupsResponse class
515
+ Any errors are immediately raised.
516
+
517
+ :param group_ids: A list of consumer group IDs. These are typically the
518
+ group names as strings.
519
+ :param group_coordinator_id: The node_id of the groups' coordinator
520
+ broker. If set to None, it will query the cluster for each group to
521
+ find that group's coordinator. Explicitly specifying this can be
522
+ useful for avoiding extra network round trips if you already know
523
+ the group coordinator. This is only useful when all the group_ids
524
+ have the same coordinator, otherwise it will error. Default: None.
525
+ :return: A list of group descriptions. For now the group descriptions
526
+ are the raw results from the DescribeGroupsResponse. Long-term, we
527
+ plan to change this to return namedtuples as well as decoding the
528
+ partition assignments.
517
529
"""
530
+ group_descriptions = []
518
531
version = self ._matching_api_version (DescribeGroupsRequest )
519
- if version <= 1 :
520
- request = DescribeGroupsRequest [version ](
521
- groups = group_ids
522
- )
523
- else :
524
- raise NotImplementedError (
525
- "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
526
- .format (version ))
527
- return self ._send (request )
532
+ for group_id in group_ids :
533
+ if group_coordinator_id is None :
534
+ this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
535
+ if version <= 1 :
536
+ # Note: KAFKA-6788 A potential optimization is to group the
537
+ # request per coordinator and send one request with a list of
538
+ # all consumer groups. Java still hasn't implemented this
539
+ # because the error checking is hard to get right when some
540
+ # groups error and others don't.
541
+ request = DescribeGroupsRequest [version ](groups = (group_id ,))
542
+ response = self ._send_request_to_node (this_groups_coordinator_id , request )
543
+ assert len (response .groups ) == 1
544
+ # TODO need to implement converting the response tuple into
545
+ # a more accessible interface like a namedtuple and then stop
546
+ # hardcoding tuple indices here. Several Java examples,
547
+ # including KafkaAdminClient.java
548
+ group_description = response .groups [0 ]
549
+ error_code = group_description [0 ]
550
+ error_type = Errors .for_code (error_code )
551
+ # Java has the note: KAFKA-6789, we can retry based on the error code
552
+ if error_type is not Errors .NoError :
553
+ raise error_type (
554
+ "Request '{}' failed with response '{}'."
555
+ .format (request , response ))
556
+ # TODO Java checks the group protocol type, and if consumer
557
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
558
+ # the members' partition assignments... that hasn't yet been
559
+ # implemented here so just return the raw struct results
560
+ group_descriptions .append (group_description )
561
+ else :
562
+ raise NotImplementedError (
563
+ "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
564
+ .format (version ))
565
+ return group_descriptions
528
566
529
567
def list_consumer_groups (self ):
530
568
"""List all consumer groups known to the cluster.
0 commit comments