Skip to content

Break FindGroupCoordinator into request/response methods #1871

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 48 additions & 32 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,49 @@ def _refresh_controller_id(self):
"Kafka Admin interface cannot determine the controller using MetadataRequest_v{}."
.format(version))

def _find_group_coordinator_id(self, group_id):
def _find_coordinator_id_send_request(self, group_id):
"""Send a FindCoordinatorRequest to a broker.

:param group_id: The consumer group ID. This is typically the group
name as a string.
:return: A message future
"""
# TODO add support for dynamically picking version of
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
# When I experimented with this, the coordinator value returned in
# GroupCoordinatorResponse_v1 didn't match the value returned by
# GroupCoordinatorResponse_v0 and I couldn't figure out why.
version = 0
# version = self._matching_api_version(GroupCoordinatorRequest)
if version <= 0:
request = GroupCoordinatorRequest[version](group_id)
else:
raise NotImplementedError(
"Support for GroupCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
.format(version))
return self._send_request_to_node(self._client.least_loaded_node(), request)

def _find_coordinator_id_process_response(self, response):
"""Process a FindCoordinatorResponse.

:param response: a FindCoordinatorResponse.
:return: The node_id of the broker that is the coordinator.
"""
if response.API_VERSION <= 0:
error_type = Errors.for_code(response.error_code)
if error_type is not Errors.NoError:
# Note: When error_type.retriable, Java will retry... see
# KafkaAdminClient's handleFindCoordinatorError method
raise error_type(
"FindCoordinatorRequest failed with response '{}'."
.format(response))
else:
raise NotImplementedError(
"Support for FindCoordinatorRequest_v{} has not yet been added to KafkaAdminClient."
.format(response.API_VERSION))
return response.coordinator_id

def _find_coordinator_id(self, group_id):
"""Find the broker node_id of the coordinator of the given group.

Sends a FindCoordinatorRequest message to the cluster. Will block until
Expand All @@ -283,35 +325,10 @@ def _find_group_coordinator_id(self, group_id):
:return: The node_id of the broker that is the coordinator.
"""
# Note: Java may change how this is implemented in KAFKA-6791.
#
# TODO add support for dynamically picking version of
# GroupCoordinatorRequest which was renamed to FindCoordinatorRequest.
# When I experimented with this, GroupCoordinatorResponse_v1 didn't
# match GroupCoordinatorResponse_v0 and I couldn't figure out why.
gc_request = GroupCoordinatorRequest[0](group_id)
future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)

future = self._find_coordinator_id_send_request(group_id)
self._wait_for_futures([future])

gc_response = future.value
# use the extra error checking in add_group_coordinator() rather than
# immediately returning the group coordinator.
success = self._client.cluster.add_group_coordinator(group_id, gc_response)
if not success:
error_type = Errors.for_code(gc_response.error_code)
assert error_type is not Errors.NoError
# Note: When error_type.retriable, Java will retry... see
# KafkaAdminClient's handleFindCoordinatorError method
raise error_type(
"Could not identify group coordinator for group_id '{}' from response '{}'."
.format(group_id, gc_response))
group_coordinator = self._client.cluster.coordinator_for_group(group_id)
# will be None if the coordinator was never populated, which should never happen here
assert group_coordinator is not None
# will be -1 if add_group_coordinator() failed... but by this point the
# error should have been raised.
assert group_coordinator != -1
return group_coordinator
response = future.value
return self._find_coordinator_id_process_response(response)

def _send_request_to_node(self, node_id, request):
"""Send a Kafka protocol message to a specific broker.
Expand All @@ -329,7 +346,6 @@ def _send_request_to_node(self, node_id, request):
self._client.poll()
return self._client.send(node_id, request)


def _send_request_to_controller(self, request):
"""Send a Kafka protocol message to the cluster controller.

Expand Down Expand Up @@ -678,7 +694,7 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
if group_coordinator_id is not None:
this_groups_coordinator_id = group_coordinator_id
else:
this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
this_groups_coordinator_id = self._find_coordinator_id(group_id)
f = self._describe_consumer_groups_send_request(group_id, this_groups_coordinator_id)
futures.append(f)

Expand Down Expand Up @@ -853,7 +869,7 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
explicitly specified.
"""
if group_coordinator_id is None:
group_coordinator_id = self._find_group_coordinator_id(group_id)
group_coordinator_id = self._find_coordinator_id(group_id)
future = self._list_consumer_group_offsets_send_request(
group_id, group_coordinator_id, partitions)
self._wait_for_futures([future])
Expand Down