Skip to content

KAFKA-8962: Use least_loaded_node() for describe_topics() #2000

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,16 +376,11 @@ def _send_request_to_controller(self, request):
# In Java, the error field name is inconsistent:
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors
# - DeleteTopicsResponse uses topic_error_codes
# - MetadataResponse uses topics[].error_code
topic_error_tuples = []
if hasattr(response, 'topic_errors'):
topic_error_tuples.extend(response.topic_errors)
elif hasattr(response, 'topic_error_codes'):
topic_error_tuples.extend(response.topic_error_codes)
elif hasattr(response, 'topics'):
for topic in response.topics:
if hasattr(topic, 'topic') and hasattr(topic, 'error_code'):
topic_error_tuples.append((topic.topic, topic.error_code))
# So this is a little brittle in that it assumes all responses have
# one of these attributes and that they always unpack into
# (topic, error_code) tuples.
topic_error_tuples = (response.topic_errors if hasattr(response, 'topic_errors')
else response.topic_error_codes)
# Also small py2/py3 compatibility -- py3 can ignore extra values
# during unpack via: for x, y, *rest in list_of_values. py2 cannot.
# So for now we have to map across the list and explicitly drop any
Expand Down Expand Up @@ -478,7 +473,7 @@ def delete_topics(self, topics, timeout_ms=None):
return response


def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_controller=False):
def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
"""
topics == None means "get all topics"
"""
Expand All @@ -497,9 +492,6 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False, use_cont
allow_auto_topic_creation=auto_topic_creation
)

if use_controller:
return self._send_request_to_controller(request)

future = self._send_request_to_node(
self._client.least_loaded_node(),
request
Expand All @@ -513,7 +505,7 @@ def list_topics(self):
return [t['topic'] for t in obj['topics']]

def describe_topics(self, topics=None):
metadata = self._get_cluster_metadata(topics=topics, use_controller=True)
metadata = self._get_cluster_metadata(topics=topics)
obj = metadata.to_object()
return obj['topics']

Expand Down