Skip to content

Commit 3b2f047

Browse files
committed
Fix list_consumer_groups() to query all brokers
Previously, this only queried the controller. In actuality, the Kafka protocol requires that the client query all brokers in order to get the full list of consumer groups. Note: The Java code (as best I can tell) doesn't allow limiting this to specific brokers. And on the surface, this makes sense... you typically don't care about specific brokers. However, the inverse is true... consumer groups care about knowing their group coordinator so they don't have to repeatedly query to find it. In fact, a Kafka broker will only return the groups that it's a coordinator for. While this is an implementation detail that is not guaranteed by the upstream broker code, and technically should not be relied upon, I think it very unlikely to change. So monitoring scripts that fetch the offsets or describe the consumers groups of all groups in the cluster can simply issue one call per broker to identify all the coordinators, rather than having to issue one call per consumer group. For an ad-hoc script this doesn't matter, but for a monitoring script that runs every couple of minutes, this can be a big deal. I know in the situations where I will use this, this matters more to me than the risk of the interface unexpectedly breaking.
1 parent d67157c commit 3b2f047

File tree

1 file changed

+41
-4
lines changed

1 file changed

+41
-4
lines changed

kafka/admin/kafka.py

Lines changed: 41 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
1+
12
from __future__ import absolute_import
23

34
import copy
45
import logging
56
import socket
67
from kafka.client_async import KafkaClient, selectors
8+
import kafka.errors as Errors
79
from kafka.errors import (
810
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
911
NodeNotReadyError, NotControllerError)
@@ -487,18 +489,53 @@ def describe_consumer_groups(self, group_ids):
487489
.format(version))
488490
return self._send(request)
489491

490-
def list_consumer_groups(self):
492+
def list_consumer_groups(self, broker_ids=None):
491493
"""List all consumer groups known to the cluster.
492494
493-
:return: Appropriate version of ListGroupsResponse class
495+
This returns a list of Consumer Group tuples. The tuples are
496+
composed of the consumer group name and the consumer group protocol
497+
type.
498+
499+
Only consumer groups that store their offsets in Kafka are returned.
500+
The protocol type will be an empty string for groups created using
501+
Kafka < 0.9 APIs because, although they store their offsets in Kafka,
502+
they don't use Kafka for group coordination. For groups created using
503+
Kafka >= 0.9, the protocol type will typically be "consumer".
504+
505+
As soon as any error is encountered, it is immediately raised.
506+
507+
:param broker_ids: A list of broker node_ids to query for consumer
508+
groups. If set to None, will query all brokers in the cluster.
509+
Explicitly specifying broker(s) can be useful for determining which
510+
consumer groups are coordinated by those broker(s). Default: None
511+
:return list: List of tuples of Consumer Groups.
512+
:exception GroupCoordinatorNotAvailableError: The coordinator is not
513+
available, so cannot process requests.
514+
:exception GroupLoadInProgressError: The coordinator is loading and
515+
hence can't process requests.
494516
"""
517+
# While we return a list, internally use a set to prevent duplicates
518+
# because if a group coordinator fails after being queried, and its
519+
# consumer groups move to new brokers that haven't yet been queried,
520+
# then the same group could be returned by multiple brokers.
521+
consumer_groups = set()
522+
if broker_ids is None:
523+
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
495524
version = self._matching_api_version(ListGroupsRequest)
496-
if version <= 1:
525+
if version <= 2:
497526
request = ListGroupsRequest[version]()
527+
for broker_id in broker_ids:
528+
response = self._send_request_to_node(broker_id, request)
529+
error_type = Errors.for_code(response.error_code)
530+
if error_type is not Errors.NoError:
531+
raise error_type(
532+
"Request '{}' failed with response '{}'."
533+
.format(request, response))
534+
consumer_groups.update(response.groups)
498535
else:
499536
raise NotImplementedError(
500537
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
501538
.format(version))
502-
return self._send(request)
539+
return list(consumer_groups)
503540

504541
# delete groups protocol not implemented

0 commit comments

Comments
 (0)