Skip to content

Fix list_consumer_groups() to query all brokers #1635

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 18, 2018
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 39 additions & 5 deletions kafka/admin/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -575,20 +575,54 @@ def describe_consumer_groups(self, group_ids):
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)

def list_consumer_groups(self):
def list_consumer_groups(self, broker_ids=None):
"""List all consumer groups known to the cluster.

:return: Appropriate version of ListGroupsResponse class
This returns a list of Consumer Group tuples. The tuples are
composed of the consumer group name and the consumer group protocol
type.

Only consumer groups that store their offsets in Kafka are returned.
The protocol type will be an empty string for groups created using
Kafka < 0.9 APIs because, although they store their offsets in Kafka,
they don't use Kafka for group coordination. For groups created using
Kafka >= 0.9, the protocol type will typically be "consumer".

As soon as any error is encountered, it is immediately raised.

:param broker_ids: A list of broker node_ids to query for consumer
groups. If set to None, will query all brokers in the cluster.
Explicitly specifying broker(s) can be useful for determining which
consumer groups are coordinated by those broker(s). Default: None
:return list: List of tuples of Consumer Groups.
:exception GroupCoordinatorNotAvailableError: The coordinator is not
available, so cannot process requests.
:exception GroupLoadInProgressError: The coordinator is loading and
hence can't process requests.
"""
# While we return a list, internally use a set to prevent duplicates
# because if a group coordinator fails after being queried, and its
# consumer groups move to new brokers that haven't yet been queried,
# then the same group could be returned by multiple brokers.
consumer_groups = set()
if broker_ids is None:
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
version = self._matching_api_version(ListGroupsRequest)
if version <= 1:
if version <= 2:
request = ListGroupsRequest[version]()
for broker_id in broker_ids:
response = self._send_request_to_node(broker_id, request)
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
raise error_type(
"Request '{}' failed with response '{}'."
.format(request, response))
consumer_groups.update(response.groups)
else:
raise NotImplementedError(
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
.format(version))
# TODO this is completely broken, as it needs to send to the group coordinator
# return self._send(request)
return list(consumer_groups)

def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
partitions=None):
Expand Down