|
| 1 | + |
1 | 2 | from __future__ import absolute_import
|
2 | 3 |
|
3 | 4 | from collections import defaultdict
|
@@ -575,20 +576,54 @@ def describe_consumer_groups(self, group_ids):
|
575 | 576 | # TODO this is completely broken, as it needs to send to the group coordinator
|
576 | 577 | # return self._send(request)
|
577 | 578 |
|
578 |
| - def list_consumer_groups(self): |
| 579 | + def list_consumer_groups(self, broker_ids=None): |
579 | 580 | """List all consumer groups known to the cluster.
|
580 | 581 |
|
581 |
| - :return: Appropriate version of ListGroupsResponse class |
| 582 | + This returns a list of Consumer Group tuples. The tuples are |
| 583 | + composed of the consumer group name and the consumer group protocol |
| 584 | + type. |
| 585 | +
|
| 586 | + Only consumer groups that store their offsets in Kafka are returned. |
| 587 | + The protocol type will be an empty string for groups created using |
| 588 | + Kafka < 0.9 APIs because, although they store their offsets in Kafka, |
| 589 | + they don't use Kafka for group coordination. For groups created using |
| 590 | + Kafka >= 0.9, the protocol type will typically be "consumer". |
| 591 | +
|
| 592 | + As soon as any error is encountered, it is immediately raised. |
| 593 | +
|
| 594 | + :param broker_ids: A list of broker node_ids to query for consumer |
| 595 | + groups. If set to None, will query all brokers in the cluster. |
| 596 | + Explicitly specifying broker(s) can be useful for determining which |
| 597 | + consumer groups are coordinated by those broker(s). Default: None |
| 598 | + :return list: List of tuples of Consumer Groups. |
| 599 | + :exception GroupCoordinatorNotAvailableError: The coordinator is not |
| 600 | + available, so cannot process requests. |
| 601 | + :exception GroupLoadInProgressError: The coordinator is loading and |
| 602 | + hence can't process requests. |
582 | 603 | """
|
| 604 | + # While we return a list, internally use a set to prevent duplicates |
| 605 | + # because if a group coordinator fails after being queried, and its |
| 606 | + # consumer groups move to new brokers that haven't yet been queried, |
| 607 | + # then the same group could be returned by multiple brokers. |
| 608 | + consumer_groups = set() |
| 609 | + if broker_ids is None: |
| 610 | + broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()] |
583 | 611 | version = self._matching_api_version(ListGroupsRequest)
|
584 |
| - if version <= 1: |
| 612 | + if version <= 2: |
585 | 613 | request = ListGroupsRequest[version]()
|
| 614 | + for broker_id in broker_ids: |
| 615 | + response = self._send_request_to_node(broker_id, request) |
| 616 | + error_type = Errors.for_code(response.error_code) |
| 617 | + if error_type is not Errors.NoError: |
| 618 | + raise error_type( |
| 619 | + "Request '{}' failed with response '{}'." |
| 620 | + .format(request, response)) |
| 621 | + consumer_groups.update(response.groups) |
586 | 622 | else:
|
587 | 623 | raise NotImplementedError(
|
588 | 624 | "Support for ListGroups v{} has not yet been added to KafkaAdmin."
|
589 | 625 | .format(version))
|
590 |
| - # TODO this is completely broken, as it needs to send to the group coordinator |
591 |
| - # return self._send(request) |
| 626 | + return list(consumer_groups) |
592 | 627 |
|
593 | 628 | def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
|
594 | 629 | partitions=None):
|
|
0 commit comments