Skip to content

Merge _find_coordinator_id methods #2127

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 17, 2020
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 27 additions & 42 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,43 +328,27 @@ def _find_coordinator_id_process_response(self, response):
.format(response.API_VERSION))
return response.coordinator_id

def _find_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator of the given group.

Sends a FindCoordinatorRequest message to the cluster. Will block until
the FindCoordinatorResponse is received. Any errors are immediately
raised.

:param group_id: The consumer group ID. This is typically the group
name as a string.
:return: The node_id of the broker that is the coordinator.
"""
future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])
response = future.value
return self._find_coordinator_id_process_response(response)

def _find_many_coordinator_ids(self, group_ids):
"""Find the broker node_id of the coordinator for each of the given groups.
def _find_coordinator_ids(self, group_ids):
"""Find the broker node_ids of the coordinators of the given groups.

Sends a FindCoordinatorRequest message to the cluster for each group_id.
Will block until the FindCoordinatorResponse is received for all groups.
Any errors are immediately raised.

:param group_ids: A list of consumer group IDs. This is typically the group
name as a string.
:return: A list of tuples (group_id, node_id) where node_id is the id
of the broker that is the coordinator for the corresponding group.
:return: A dict of {group_id: node_id} where node_id is the id of the
broker that is the coordinator for the corresponding group.
"""
futures = {
groups_futures = {
group_id: self._find_coordinator_id_send_request(group_id)
for group_id in group_ids
}
self._wait_for_futures(list(futures.values()))
groups_coordinators = [
(group_id, self._find_coordinator_id_process_response(f.value))
for group_id, f in futures.items()
]
self._wait_for_futures(groups_futures.values())
groups_coordinators = {
group_id: self._find_coordinator_id_process_response(future.value)
for group_id, future in groups_futures.items()
}
return groups_coordinators

def _send_request_to_node(self, node_id, request):
Expand Down Expand Up @@ -1094,18 +1078,19 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
partition assignments.
"""
group_descriptions = []
futures = []
for group_id in group_ids:
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_coordinator_id(group_id)
f = self._describe_consumer_groups_send_request(

if group_coordinator_id is not None:
groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids}
else:
groups_coordinators = self._find_coordinator_ids(group_ids)

futures = [
self._describe_consumer_groups_send_request(
group_id,
this_groups_coordinator_id,
coordinator_id,
include_authorized_operations)
futures.append(f)

for group_id, coordinator_id in groups_coordinators.items()
]
self._wait_for_futures(futures)

for future in futures:
Expand Down Expand Up @@ -1277,7 +1262,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
explicitly specified.
"""
if group_coordinator_id is None:
group_coordinator_id = self._find_coordinator_id(group_id)
group_coordinator_id = self._find_coordinator_ids([group_id])[group_id]
future = self._list_consumer_group_offsets_send_request(
group_id, group_coordinator_id, partitions)
self._wait_for_futures([future])
Expand Down Expand Up @@ -1305,12 +1290,12 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
if group_coordinator_id is not None:
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
else:
groups_coordinators = defaultdict(list)
for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
groups_coordinators[group_coordinator_id].append(group_id)
coordinators_groups = defaultdict(list)
for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items():
coordinators_groups[coordinator_id].append(group_id)
futures = [
self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
for group_coordinator_id, group_ids in groups_coordinators.items()
self._delete_consumer_groups_send_request(group_ids, coordinator_id)
for coordinator_id, group_ids in coordinators_groups.items()
]

self._wait_for_futures(futures)
Expand Down