Skip to content

Commit 57ccb4d

Browse files
committed
Merge _find_coordinator_id methods
Previously there were two methods: * `_find_coordinator_id()` * `_find_many_coordinator_ids()` But they do basically the same thing internally. And we need the plural two places, but the singular only one place. So merge them, and change the function signature to take a list of `group_ids` and return a dict of `group_id: coordinator_id`s. As a result of this, the `describe_groups()` command should scale better because the `_find_coordinator_ids()` command issues all the requests async, instead of sequentially blocking as the `described_groups()` used to do.
1 parent 6cfe706 commit 57ccb4d

File tree

1 file changed

+27
-42
lines changed

1 file changed

+27
-42
lines changed

kafka/admin/client.py

Lines changed: 27 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -328,43 +328,27 @@ def _find_coordinator_id_process_response(self, response):
328328
.format(response.API_VERSION))
329329
return response.coordinator_id
330330

331-
def _find_coordinator_id(self, group_id):
332-
"""Find the broker node_id of the coordinator of the given group.
333-
334-
Sends a FindCoordinatorRequest message to the cluster. Will block until
335-
the FindCoordinatorResponse is received. Any errors are immediately
336-
raised.
337-
338-
:param group_id: The consumer group ID. This is typically the group
339-
name as a string.
340-
:return: The node_id of the broker that is the coordinator.
341-
"""
342-
future = self._find_coordinator_id_send_request(group_id)
343-
self._wait_for_futures([future])
344-
response = future.value
345-
return self._find_coordinator_id_process_response(response)
346-
347-
def _find_many_coordinator_ids(self, group_ids):
348-
"""Find the broker node_id of the coordinator for each of the given groups.
331+
def _find_coordinator_ids(self, group_ids):
332+
"""Find the broker node_ids of the coordinators of the given groups.
349333
350334
Sends a FindCoordinatorRequest message to the cluster for each group_id.
351335
Will block until the FindCoordinatorResponse is received for all groups.
352336
Any errors are immediately raised.
353337
354338
:param group_ids: A list of consumer group IDs. This is typically the group
355339
name as a string.
356-
:return: A list of tuples (group_id, node_id) where node_id is the id
357-
of the broker that is the coordinator for the corresponding group.
340+
:return: A dict of {group_id: node_id} where node_id is the id of the
341+
broker that is the coordinator for the corresponding group.
358342
"""
359-
futures = {
343+
groups_futures = {
360344
group_id: self._find_coordinator_id_send_request(group_id)
361345
for group_id in group_ids
362346
}
363-
self._wait_for_futures(list(futures.values()))
364-
groups_coordinators = [
365-
(group_id, self._find_coordinator_id_process_response(f.value))
366-
for group_id, f in futures.items()
367-
]
347+
self._wait_for_futures(groups_futures.values())
348+
groups_coordinators = {
349+
group_id: self._find_coordinator_id_process_response(future.value)
350+
for group_id, future in groups_futures.items()
351+
}
368352
return groups_coordinators
369353

370354
def _send_request_to_node(self, node_id, request):
@@ -1094,18 +1078,19 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None, include
10941078
partition assignments.
10951079
"""
10961080
group_descriptions = []
1097-
futures = []
1098-
for group_id in group_ids:
1099-
if group_coordinator_id is not None:
1100-
this_groups_coordinator_id = group_coordinator_id
1101-
else:
1102-
this_groups_coordinator_id = self._find_coordinator_id(group_id)
1103-
f = self._describe_consumer_groups_send_request(
1081+
1082+
if group_coordinator_id is not None:
1083+
groups_coordinators = {group_id: group_coordinator_id for group_id in group_ids}
1084+
else:
1085+
groups_coordinators = self.groups_coordinators(group_ids)
1086+
1087+
futures = [
1088+
self._describe_consumer_groups_send_request(
11041089
group_id,
1105-
this_groups_coordinator_id,
1090+
coordinator_id,
11061091
include_authorized_operations)
1107-
futures.append(f)
1108-
1092+
for group_id, coordinator_id in groups_coordinators.items()
1093+
]
11091094
self._wait_for_futures(futures)
11101095

11111096
for future in futures:
@@ -1277,7 +1262,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
12771262
explicitly specified.
12781263
"""
12791264
if group_coordinator_id is None:
1280-
group_coordinator_id = self._find_coordinator_id(group_id)
1265+
group_coordinator_id = self._find_coordinator_ids([group_id])[group_id]
12811266
future = self._list_consumer_group_offsets_send_request(
12821267
group_id, group_coordinator_id, partitions)
12831268
self._wait_for_futures([future])
@@ -1305,12 +1290,12 @@ def delete_consumer_groups(self, group_ids, group_coordinator_id=None):
13051290
if group_coordinator_id is not None:
13061291
futures = [self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)]
13071292
else:
1308-
groups_coordinators = defaultdict(list)
1309-
for group_id, group_coordinator_id in self._find_many_coordinator_ids(group_ids):
1310-
groups_coordinators[group_coordinator_id].append(group_id)
1293+
coordinators_groups = defaultdict(list)
1294+
for group_id, coordinator_id in self._find_coordinator_ids(group_ids).items():
1295+
coordinators_groups[coordinator_id].append(group_id)
13111296
futures = [
1312-
self._delete_consumer_groups_send_request(group_ids, group_coordinator_id)
1313-
for group_coordinator_id, group_ids in groups_coordinators.items()
1297+
self._delete_consumer_groups_send_request(group_ids, coordinator_id)
1298+
for coordinator_id, group_ids in coordinators_groups.items()
13141299
]
13151300

13161301
self._wait_for_futures(futures)

0 commit comments

Comments
 (0)