-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Use futures to parallelize calls to _send_request_to_node() #1807
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -249,7 +249,11 @@ def _refresh_controller_id(self): | |
version = self._matching_api_version(MetadataRequest) | ||
if 1 <= version <= 6: | ||
request = MetadataRequest[version]() | ||
response = self._send_request_to_node(self._client.least_loaded_node(), request) | ||
future = self._send_request_to_node(self._client.least_loaded_node(), request) | ||
|
||
self.wait_for_futures([future]) | ||
|
||
response = future.value | ||
controller_id = response.controller_id | ||
# verify the controller is new enough to support our requests | ||
controller_version = self._client.check_version(controller_id) | ||
|
@@ -281,7 +285,11 @@ def _find_group_coordinator_id(self, group_id): | |
# When I experimented with this, GroupCoordinatorResponse_v1 didn't | ||
# match GroupCoordinatorResponse_v0 and I couldn't figure out why. | ||
gc_request = GroupCoordinatorRequest[0](group_id) | ||
gc_response = self._send_request_to_node(self._client.least_loaded_node(), gc_request) | ||
future = self._send_request_to_node(self._client.least_loaded_node(), gc_request) | ||
|
||
self.wait_for_futures([future]) | ||
|
||
gc_response = future.value | ||
# use the extra error checking in add_group_coordinator() rather than | ||
# immediately returning the group coordinator. | ||
success = self._client.cluster.add_group_coordinator(group_id, gc_response) | ||
|
@@ -315,12 +323,8 @@ def _send_request_to_node(self, node_id, request): | |
# poll until the connection to broker is ready, otherwise send() | ||
# will fail with NodeNotReadyError | ||
self._client.poll() | ||
future = self._client.send(node_id, request) | ||
self._client.poll(future=future) | ||
if future.succeeded(): | ||
return future.value | ||
else: | ||
raise future.exception # pylint: disable-msg=raising-bad-type | ||
return self._client.send(node_id, request) | ||
|
||
|
||
def _send_request_to_controller(self, request): | ||
"""Send a Kafka protocol message to the cluster controller. | ||
|
@@ -333,7 +337,11 @@ def _send_request_to_controller(self, request): | |
tries = 2 # in case our cached self._controller_id is outdated | ||
while tries: | ||
tries -= 1 | ||
response = self._send_request_to_node(self._controller_id, request) | ||
future = self._send_request_to_node(self._controller_id, request) | ||
|
||
self.wait_for_futures([future]) | ||
|
||
response = future.value | ||
# In Java, the error fieldname is inconsistent: | ||
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors | ||
# - DeleteTopicsResponse uses topic_error_codes | ||
|
@@ -490,7 +498,11 @@ def describe_configs(self, config_resources, include_synonyms=False): | |
raise NotImplementedError( | ||
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient." | ||
.format(version)) | ||
return self._send_request_to_node(self._client.least_loaded_node(), request) | ||
future = self._send_request_to_node(self._client.least_loaded_node(), request) | ||
|
||
self.wait_for_futures([future]) | ||
|
||
return future.value | ||
|
||
@staticmethod | ||
def _convert_alter_config_resource_request(config_resource): | ||
|
@@ -529,7 +541,11 @@ def alter_configs(self, config_resources): | |
# // a single request that may be sent to any broker. | ||
# | ||
# So this is currently broken as it always sends to the least_loaded_node() | ||
return self._send_request_to_node(self._client.least_loaded_node(), request) | ||
future = self._send_request_to_node(self._client.least_loaded_node(), request) | ||
|
||
self.wait_for_futures([future]) | ||
|
||
return future.value | ||
|
||
# alter replica logs dir protocol not yet implemented | ||
# Note: have to lookup the broker with the replica assignment and send the request to that broker | ||
|
@@ -605,42 +621,49 @@ def describe_consumer_groups(self, group_ids, group_coordinator_id=None): | |
partition assignments. | ||
""" | ||
group_descriptions = [] | ||
futures = [] | ||
version = self._matching_api_version(DescribeGroupsRequest) | ||
for group_id in group_ids: | ||
if group_coordinator_id is not None: | ||
this_groups_coordinator_id = group_coordinator_id | ||
else: | ||
this_groups_coordinator_id = self._find_group_coordinator_id(group_id) | ||
if version <= 1: | ||
if group_coordinator_id is not None: | ||
this_groups_coordinator_id = group_coordinator_id | ||
else: | ||
this_groups_coordinator_id = self._find_group_coordinator_id(group_id) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Is there a reason you moved this under the version check? This seems like generic code that's currently needed regardless of the version. Side note: There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Figured it could be a minor optimization. We don't actually use this_groups_coordinator_id unless version <= 1 evaluates true, so why initialize it or call _find_group_coordinator_id() if we don't have to? I'll peek at _find_group_coordinator_id() when I have a spare minute and see if I can optimize it. Agreed, something for another PR, but shouldn't be too bad. |
||
# Note: KAFKA-6788 A potential optimization is to group the | ||
# request per coordinator and send one request with a list of | ||
# all consumer groups. Java still hasn't implemented this | ||
# because the error checking is hard to get right when some | ||
# groups error and others don't. | ||
request = DescribeGroupsRequest[version](groups=(group_id,)) | ||
response = self._send_request_to_node(this_groups_coordinator_id, request) | ||
assert len(response.groups) == 1 | ||
# TODO need to implement converting the response tuple into | ||
# a more accessible interface like a namedtuple and then stop | ||
# hardcoding tuple indices here. Several Java examples, | ||
# including KafkaAdminClient.java | ||
group_description = response.groups[0] | ||
error_code = group_description[0] | ||
error_type = Errors.for_code(error_code) | ||
# Java has the note: KAFKA-6789, we can retry based on the error code | ||
if error_type is not Errors.NoError: | ||
raise error_type( | ||
"Request '{}' failed with response '{}'." | ||
.format(request, response)) | ||
# TODO Java checks the group protocol type, and if consumer | ||
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes | ||
# the members' partition assignments... that hasn't yet been | ||
# implemented here so just return the raw struct results | ||
group_descriptions.append(group_description) | ||
futures.append(self._send_request_to_node(this_groups_coordinator_id, request)) | ||
else: | ||
raise NotImplementedError( | ||
"Support for DescribeGroups v{} has not yet been added to KafkaAdminClient." | ||
.format(version)) | ||
|
||
self.wait_for_futures(futures) | ||
|
||
for future in futures: | ||
response = future.value | ||
assert len(response.groups) == 1 | ||
# TODO need to implement converting the response tuple into | ||
# a more accessible interface like a namedtuple and then stop | ||
# hardcoding tuple indices here. Several Java examples, | ||
# including KafkaAdminClient.java | ||
group_description = response.groups[0] | ||
error_code = group_description[0] | ||
error_type = Errors.for_code(error_code) | ||
# Java has the note: KAFKA-6789, we can retry based on the error code | ||
if error_type is not Errors.NoError: | ||
raise error_type( | ||
"Request '{}' failed with response '{}'." | ||
.format(request, response)) | ||
# TODO Java checks the group protocol type, and if consumer | ||
# (ConsumerProtocol.PROTOCOL_TYPE) or empty string, it decodes | ||
# the members' partition assignments... that hasn't yet been | ||
# implemented here so just return the raw struct results | ||
group_descriptions.append(group_description) | ||
|
||
return group_descriptions | ||
|
||
def list_consumer_groups(self, broker_ids=None): | ||
|
@@ -673,13 +696,19 @@ def list_consumer_groups(self, broker_ids=None): | |
# consumer groups move to new brokers that haven't yet been queried, | ||
# then the same group could be returned by multiple brokers. | ||
consumer_groups = set() | ||
futures = [] | ||
if broker_ids is None: | ||
broker_ids = [broker.nodeId for broker in self._client.cluster.brokers()] | ||
version = self._matching_api_version(ListGroupsRequest) | ||
if version <= 2: | ||
request = ListGroupsRequest[version]() | ||
for broker_id in broker_ids: | ||
response = self._send_request_to_node(broker_id, request) | ||
futures.append(self._send_request_to_node(broker_id, request)) | ||
|
||
self.wait_for_futures(futures) | ||
|
||
for future in futures: | ||
response = future.value | ||
error_type = Errors.for_code(response.error_code) | ||
if error_type is not Errors.NoError: | ||
raise error_type( | ||
|
@@ -738,7 +767,12 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, | |
topics_partitions_dict[topic].add(partition) | ||
topics_partitions = list(six.iteritems(topics_partitions_dict)) | ||
request = OffsetFetchRequest[version](group_id, topics_partitions) | ||
response = self._send_request_to_node(group_coordinator_id, request) | ||
future = self._send_request_to_node(group_coordinator_id, request) | ||
|
||
self.wait_for_futures([future]) | ||
|
||
response = future.value | ||
|
||
jeffwidman marked this conversation as resolved.
Show resolved
Hide resolved
|
||
if version > 1: # OffsetFetchResponse_v1 lacks a top-level error_code | ||
error_type = Errors.for_code(response.error_code) | ||
if error_type is not Errors.NoError: | ||
|
@@ -764,3 +798,13 @@ def list_consumer_group_offsets(self, group_id, group_coordinator_id=None, | |
|
||
# delete groups protocol not yet implemented | ||
# Note: send the request to the group's coordinator. | ||
|
||
def wait_for_futures(self, futures):
    """Poll the client until every future in ``futures`` has succeeded.

    Each pass walks the futures in order, calling
    ``self._client.poll(future=...)`` on each one and checking it
    immediately afterwards, so the first failed future encountered
    aborts the wait.

    Arguments:
        futures: An iterable of future objects exposing ``succeeded()``,
            ``failed()`` and ``exception``. It is iterated more than
            once, so it must be re-iterable (e.g. a list).

    Returns:
        True once all futures report success (immediately for an empty
        collection).

    Raises:
        The ``exception`` attribute of the first future found to have
        failed after being polled.
    """
    # NOTE(review): presumably poll(future=...) blocks until that future
    # resolves, so one pass normally suffices -- confirm against the client.
    def _everything_succeeded():
        return all(pending.succeeded() for pending in futures)

    while not _everything_succeeded():
        for pending in futures:
            self._client.poll(future=pending)
            if not pending.failed():
                continue
            raise pending.exception  # pylint: disable-msg=raising-bad-type

    return True
jeffwidman marked this conversation as resolved.
Show resolved
Hide resolved
|
Uh oh!
There was an error while loading. Please reload this page.