Use futures to parallelize calls to _send_request_to_node() #1807

Merged: 4 commits, May 21, 2019. Changes shown are from 2 commits.
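In brief, from the diff below: _send_request_to_node() previously polled the client until the request completed and returned the response, raising on failure. It now returns the future from KafkaClient.send(), and each call site either waits on a single future or collects a batch and blocks once in the new wait_for_futures() helper, letting independent requests run concurrently. A minimal before/after sketch (all names are from the diff):

```python
# Before this PR: one blocking round-trip per call.
response = self._send_request_to_node(node_id, request)

# After: the send returns immediately; the caller decides when to block.
future = self._send_request_to_node(node_id, request)
self.wait_for_futures([future])  # polls until resolved; raises on failure
response = future.value
```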
116 changes: 80 additions & 36 deletions in kafka/admin/client.py
@@ -249,7 +249,11 @@ def _refresh_controller_id(self):
         version = self._matching_api_version(MetadataRequest)
         if 1 <= version <= 6:
             request = MetadataRequest[version]()
-            response = self._send_request_to_node(self._client.least_loaded_node(), request)
+            future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+            self.wait_for_futures([future])
+
+            response = future.value
             controller_id = response.controller_id
             # verify the controller is new enough to support our requests
             controller_version = self._client.check_version(controller_id)
@@ -281,7 +285,11 @@ def _find_group_coordinator_id(self, group_id):
         # When I experimented with this, GroupCoordinatorResponse_v1 didn't
         # match GroupCoordinatorResponse_v0 and I couldn't figure out why.
         gc_request = GroupCoordinatorRequest[0](group_id)
-        gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+        future = self._send_request_to_node(self._client.least_loaded_node(), gc_request)
+
+        self.wait_for_futures([future])
+
+        gc_response = future.value
         # use the extra error checking in add_group_coordinator() rather than
         # immediately returning the group coordinator.
         success = self._client.cluster.add_group_coordinator(group_id, gc_response)
@@ -315,12 +323,8 @@ def _send_request_to_node(self, node_id, request):
             # poll until the connection to broker is ready, otherwise send()
             # will fail with NodeNotReadyError
             self._client.poll()
-        future = self._client.send(node_id, request)
-        self._client.poll(future=future)
-        if future.succeeded():
-            return future.value
-        else:
-            raise future.exception  # pylint: disable-msg=raising-bad-type
+        return self._client.send(node_id, request)
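Because _send_request_to_node() no longer blocks, a caller can put several requests in flight before waiting on any of them. A hedged, illustrative sketch (assumes kafka-python with this patch applied and a broker at localhost:9092; the argument-less MetadataRequest[1]() mirrors its use in _refresh_controller_id() above):

```python
from kafka import KafkaAdminClient
from kafka.protocol.metadata import MetadataRequest

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
node_ids = [b.nodeId for b in admin._client.cluster.brokers()]

# Fire one metadata request per broker; nothing blocks yet.
futures = [admin._send_request_to_node(n, MetadataRequest[1]()) for n in node_ids]

admin.wait_for_futures(futures)  # block once for the whole batch
responses = [f.value for f in futures]  # all resolved, else an exception was raised
```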
@@ -333,7 +337,11 @@ def _send_request_to_controller(self, request):
         tries = 2  # in case our cached self._controller_id is outdated
         while tries:
             tries -= 1
-            response = self._send_request_to_node(self._controller_id, request)
+            future = self._send_request_to_node(self._controller_id, request)
+
+            self.wait_for_futures([future])
+
+            response = future.value
             # In Java, the error fieldname is inconsistent:
             # - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
             # - DeleteTopicsResponse uses topic_error_codes
@@ -490,7 +498,11 @@ def describe_configs(self, config_resources, include_synonyms=False):
             raise NotImplementedError(
                 "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
                 .format(version))
-        return self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+        self.wait_for_futures([future])
+
+        return future.value
 
     @staticmethod
     def _convert_alter_config_resource_request(config_resource):
@@ -529,7 +541,11 @@ def alter_configs(self, config_resources):
         # // a single request that may be sent to any broker.
         #
         # So this is currently broken as it always sends to the least_loaded_node()
-        return self._send_request_to_node(self._client.least_loaded_node(), request)
+        future = self._send_request_to_node(self._client.least_loaded_node(), request)
+
+        self.wait_for_futures([future])
+
+        return future.value
 
     # alter replica logs dir protocol not yet implemented
     # Note: have to lookup the broker with the replica assignment and send the request to that broker
@@ -605,42 +621,49 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None):
             partition assignments.
         """
         group_descriptions = []
+        futures = []
         version = self._matching_api_version(DescribeGroupsRequest)
         for group_id in group_ids:
-            if group_coordinator_id is not None:
-                this_groups_coordinator_id = group_coordinator_id
-            else:
-                this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
             if version <= 1:
+                if group_coordinator_id is not None:
+                    this_groups_coordinator_id = group_coordinator_id
+                else:
+                    this_groups_coordinator_id = self._find_group_coordinator_id(group_id)
Review comment (Contributor), on the lines above:

Is there a reason you moved this under the version check? This seems like generic code that's needed regardless of the version.

Side note: _find_group_coordinator_id() is a blocking call, so this could be further optimized by firing off a bunch of futures for all the groups and then handling the responses, but that would require breaking _find_group_coordinator_id() into two methods, one to generate the request and one to parse the response... that optimization is probably best handled in a different PR. Especially because it wouldn't even be used when fetching offsets for all consumer groups, since the group coordinator is explicitly passed in already. Example: https://github.com/DataDog/integrations-core/pull/2730/files#diff-deed983f73a04522ac621d7f6d0c8403R244

Reply (Contributor, Author):

Figured it could be a minor optimization. We don't actually use this_groups_coordinator_id unless version <= 1 evaluates true, so why initialize it or call _find_group_coordinator_id() if we don't have to?

I'll peek at _find_group_coordinator_id() when I have a spare minute and see if I can optimize it. Agreed, something for another PR, but shouldn't be too bad.

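For reference, a sketch of the two-method split suggested in the thread above (hypothetical method names, not part of this PR): one half fires the request and returns the future, the other validates a resolved response, so coordinator lookups could be batched the same way the DescribeGroups calls are below.

```python
# Hypothetical methods on KafkaAdminClient; GroupCoordinatorRequest and Errors
# are the imports kafka/admin/client.py already uses.

def _find_coordinator_id_send_request(self, group_id):
    """Fire the lookup and return the future instead of blocking on it."""
    gc_request = GroupCoordinatorRequest[0](group_id)
    return self._send_request_to_node(self._client.least_loaded_node(), gc_request)

def _find_coordinator_id_process_response(self, response):
    """Validate a resolved GroupCoordinatorResponse and return the node id."""
    error_type = Errors.for_code(response.error_code)
    if error_type is not Errors.NoError:
        raise error_type("FindCoordinator failed: {}".format(response))
    return response.coordinator_id
```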
                 # Note: KAFKA-6788 A potential optimization is to group the
                 # request per coordinator and send one request with a list of
                 # all consumer groups. Java still hasn't implemented this
                 # because the error checking is hard to get right when some
                 # groups error and others don't.
                 request = DescribeGroupsRequest[version](groups=(group_id,))
-                response = self._send_request_to_node(this_groups_coordinator_id, request)
-                assert len(response.groups) == 1
-                # TODO need to implement converting the response tuple into
-                # a more accessible interface like a namedtuple and then stop
-                # hardcoding tuple indices here. Several Java examples,
-                # including KafkaAdminClient.java
-                group_description = response.groups[0]
-                error_code = group_description[0]
-                error_type = Errors.for_code(error_code)
-                # Java has the note: KAFKA-6789, we can retry based on the error code
-                if error_type is not Errors.NoError:
-                    raise error_type(
-                        "Request '{}' failed with response '{}'."
-                        .format(request, response))
-                # TODO Java checks the group protocol type, and if consumer
-                # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
-                # the members' partition assignments... that hasn't yet been
-                # implemented here so just return the raw struct results
-                group_descriptions.append(group_description)
+                futures.append(self._send_request_to_node(this_groups_coordinator_id, request))
             else:
                 raise NotImplementedError(
                     "Support for DescribeGroups v{} has not yet been added to KafkaAdminClient."
                     .format(version))
+
+        self.wait_for_futures(futures)
+
+        for future in futures:
+            response = future.value
+            assert len(response.groups) == 1
+            # TODO need to implement converting the response tuple into
+            # a more accessible interface like a namedtuple and then stop
+            # hardcoding tuple indices here. Several Java examples,
+            # including KafkaAdminClient.java
+            group_description = response.groups[0]
+            error_code = group_description[0]
+            error_type = Errors.for_code(error_code)
+            # Java has the note: KAFKA-6789, we can retry based on the error code
+            if error_type is not Errors.NoError:
+                raise error_type(
+                    "Request '{}' failed with response '{}'."
+                    .format(request, response))
+            # TODO Java checks the group protocol type, and if consumer
+            # (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes
+            # the members' partition assignments... that hasn't yet been
+            # implemented here so just return the raw struct results
+            group_descriptions.append(group_description)
 
         return group_descriptions
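A hedged usage sketch tying into the review thread above: passing group_coordinator_id explicitly skips the blocking _find_group_coordinator_id() lookup inside the loop (assumes a broker at localhost:9092 and an existing group named "my-group"):

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
coordinator = admin._find_group_coordinator_id("my-group")  # one blocking lookup
descriptions = admin.describe_consumer_groups(
    ["my-group"], group_coordinator_id=coordinator)
print(descriptions[0])  # raw DescribeGroups group tuple (see the TODO in the diff)
```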
@@ -673,13 +696,19 @@ def list_consumer_groups(self, broker_ids=None):
         # consumer groups move to new brokers that haven't yet been queried,
         # then the same group could be returned by multiple brokers.
         consumer_groups = set()
+        futures = []
         if broker_ids is None:
             broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()]
         version = self._matching_api_version(ListGroupsRequest)
         if version <= 2:
             request = ListGroupsRequest[version]()
             for broker_id in broker_ids:
-                response = self._send_request_to_node(broker_id, request)
+                futures.append(self._send_request_to_node(broker_id, request))
+
+            self.wait_for_futures(futures)
+
+            for future in futures:
+                response = future.value
                 error_type = Errors.for_code(response.error_code)
                 if error_type is not Errors.NoError:
                     raise error_type(
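With the loop above split into a send phase and a collect phase, the ListGroups round-trips to all brokers overlap, so the call costs roughly one slowest-broker round-trip instead of the sum of all of them. A hedged public-API sketch (assumes a broker at localhost:9092):

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
groups = admin.list_consumer_groups()  # every broker is queried concurrently
print(groups)  # e.g. [('my-group', 'consumer')]: (group_id, protocol_type) pairs
```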
@@ -738,7 +767,12 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
                 topics_partitions_dict[topic].add(partition)
             topics_partitions = list(six.iteritems(topics_partitions_dict))
         request = OffsetFetchRequest[version](group_id, topics_partitions)
-        response = self._send_request_to_node(group_coordinator_id, request)
+        future = self._send_request_to_node(group_coordinator_id, request)
+
+        self.wait_for_futures([future])
+
+        response = future.value
+
         if version > 1:  # OffsetFetchResponse_v1 lacks a top-level error_code
             error_type = Errors.for_code(response.error_code)
             if error_type is not Errors.NoError:
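A hedged sketch of the offsets path (assumes a broker at localhost:9092 and committed offsets for the group); only one coordinator is involved, so this call gains a single future rather than a batch:

```python
from kafka import KafkaAdminClient

admin = KafkaAdminClient(bootstrap_servers="localhost:9092")
offsets = admin.list_consumer_group_offsets("my-group")
for tp, om in offsets.items():
    print(tp.topic, tp.partition, om.offset)  # committed offset per partition
```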
@@ -764,3 +798,13 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None,
 
     # delete groups protocol not yet implemented
     # Note: send the request to the group's coordinator.
+
+    def wait_for_futures(self, futures):
+        while not all(future.succeeded() for future in futures):
+            for future in futures:
+                self._client.poll(future=future)
+
+                if future.failed():
+                    raise future.exception  # pylint: disable-msg=raising-bad-type
+
+        return True
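wait_for_futures() drives KafkaClient.poll(future=...) until every future resolves, and it raises the first stored exception it encounters, so callers never read .value from a failed future. A minimal sketch of the kafka.future.Future contract it relies on (assuming the Future API of kafka-python 1.4.x; illustrative only):

```python
from kafka.future import Future

ok = Future()
ok.success(42)  # resolve with a value
assert ok.succeeded() and ok.value == 42

bad = Future()
bad.failure(Exception("boom"))  # resolve with an error instead
assert bad.failed()
print(bad.exception)  # the exception wait_for_futures() would raise
```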