diff --git a/kafka/admin/kafka.py b/kafka/admin/kafka.py index 224a660be..05f28730f 100644 --- a/kafka/admin/kafka.py +++ b/kafka/admin/kafka.py @@ -575,20 +575,54 @@ def describe_consumer_groups(self, group_ids): # TODO this is completely broken, as it needs to send to the group coordinator # return self._send(request) - def list_consumer_groups(self): + def list_consumer_groups(self, broker_ids=None): """List all consumer groups known to the cluster. - :return: Appropriate version of ListGroupsResponse class + This returns a list of Consumer Group tuples. The tuples are + composed of the consumer group name and the consumer group protocol + type. + + Only consumer groups that store their offsets in Kafka are returned. + The protocol type will be an empty string for groups created using + Kafka < 0.9 APIs because, although they store their offsets in Kafka, + they don't use Kafka for group coordination. For groups created using + Kafka >= 0.9, the protocol type will typically be "consumer". + + As soon as any error is encountered, it is immediately raised. + + :param broker_ids: A list of broker node_ids to query for consumer + groups. If set to None, will query all brokers in the cluster. + Explicitly specifying broker(s) can be useful for determining which + consumer groups are coordinated by those broker(s). Default: None + :return list: List of tuples of Consumer Groups. + :exception GroupCoordinatorNotAvailableError: The coordinator is not + available, so cannot process requests. + :exception GroupLoadInProgressError: The coordinator is loading and + hence can't process requests. """ + # While we return a list, internally use a set to prevent duplicates + # because if a group coordinator fails after being queried, and its + # consumer groups move to new brokers that haven't yet been queried, + # then the same group could be returned by multiple brokers. + consumer_groups = set() + if broker_ids is None: + broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()] version = self._matching_api_version(ListGroupsRequest) - if version <= 1: + if version <= 2: request = ListGroupsRequest[version]() + for broker_id in broker_ids: + response = self._send_request_to_node(broker_id, request) + error_type = Errors.for_code(response.error_code) + if error_type is not Errors.NoError: + raise error_type( + "Request '{}' failed with response '{}'." + .format(request, response)) + consumer_groups.update(response.groups) else: raise NotImplementedError( "Support for ListGroups v{} has not yet been added to KafkaAdmin." .format(version)) - # TODO this is completely broken, as it needs to send to the group coordinator - # return self._send(request) + return list(consumer_groups) def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, partitions=None):