@@ -562,23 +562,60 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
562
562
# describe delegation_token protocol not yet implemented
563
563
# Note: send the request to the least_loaded_node()
564
564
565
- def describe_consumer_groups (self , group_ids ):
565
+ def describe_consumer_groups (self , group_ids , group_coordinator_id = None ):
566
566
"""Describe a set of consumer groups.
567
567
568
- :param group_ids: A list of consumer group id names
569
- :return: Appropriate version of DescribeGroupsResponse class
568
+ Any errors are immediately raised.
569
+
570
+ :param group_ids: A list of consumer group IDs. These are typically the
571
+ group names as strings.
572
+ :param group_coordinator_id: The node_id of the groups' coordinator
573
+ broker. If set to None, it will query the cluster for each group to
574
+ find that group's coordinator. Explicitly specifying this can be
575
+ useful for avoiding extra network round trips if you already know
576
+ the group coordinator. This is only useful when all the group_ids
577
+ have the same coordinator, otherwise it will error. Default: None.
578
+ :return: A list of group descriptions. For now the group descriptions
579
+ are the raw results from the DescribeGroupsResponse. Long-term, we
580
+ plan to change this to return namedtuples as well as decoding the
581
+ partition assignments.
570
582
"""
583
+ group_descriptions = []
571
584
version = self ._matching_api_version (DescribeGroupsRequest )
572
- if version <= 1 :
573
- request = DescribeGroupsRequest [version ](
574
- groups = group_ids
575
- )
576
- else :
577
- raise NotImplementedError (
578
- "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
579
- .format (version ))
580
- # TODO this is completely broken, as it needs to send to the group coordinator
581
- # return self._send(request)
585
+ for group_id in group_ids :
586
+ if group_coordinator_id is None :
587
+ this_groups_coordinator_id = self ._find_group_coordinator_id (group_id )
588
+ if version <= 1 :
589
+ # Note: KAFKA-6788 A potential optimization is to group the
590
+ # request per coordinator and send one request with a list of
591
+ # all consumer groups. Java still hasn't implemented this
592
+ # because the error checking is hard to get right when some
593
+ # groups error and others don't.
594
+ request = DescribeGroupsRequest [version ](groups = (group_id ,))
595
+ response = self ._send_request_to_node (this_groups_coordinator_id , request )
596
+ assert len (response .groups ) == 1
597
+ # TODO need to implement converting the response tuple into
598
+ # a more accessible interface like a namedtuple and then stop
599
+ # hardcoding tuple indices here. Several Java examples,
600
+ # including KafkaAdminClient.java
601
+ group_description = response .groups [0 ]
602
+ error_code = group_description [0 ]
603
+ error_type = Errors .for_code (error_code )
604
+ # Java has the note: KAFKA-6789, we can retry based on the error code
605
+ if error_type is not Errors .NoError :
606
+ raise error_type (
607
+ "Request '{}' failed with response '{}'."
608
+ .format (request , response ))
609
+ # TODO Java checks the group protocol type, and if consumer
610
+ # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
611
+ # the members' partition assignments... that hasn't yet been
612
+ # implemented here so just return the raw struct results
613
+ group_descriptions .append (group_description )
614
+ else :
615
+ raise NotImplementedError (
616
+ "Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
617
+ .format (version ))
618
+ return group_descriptions
582
619
583
620
def list_consumer_groups (self ):
584
621
"""List all consumer groups known to the cluster.
0 commit comments