@@ -473,7 +473,7 @@ def delete_topics(self, topics, timeout_ms=None):
473
473
return response
474
474
475
475
476
- def _get_cluster_metadata (self , topics = None , auto_topic_creation = False ):
476
+ def _get_cluster_metadata (self , topics = None , auto_topic_creation = False , use_controller = False ):
477
477
"""
478
478
topics == None means "get all topics"
479
479
"""
@@ -492,10 +492,13 @@ def _get_cluster_metadata(self, topics=None, auto_topic_creation=False):
492
492
allow_auto_topic_creation = auto_topic_creation
493
493
)
494
494
495
- future = self ._send_request_to_node (
496
- self ._client .least_loaded_node (),
497
- request
498
- )
495
+ if use_controller :
496
+ future = self ._send_request_to_controller (request )
497
+ else :
498
+ future = self ._send_request_to_node (
499
+ self ._client .least_loaded_node (),
500
+ request
501
+ )
499
502
self ._wait_for_futures ([future ])
500
503
return future .value
501
504
@@ -505,7 +508,7 @@ def list_topics(self):
505
508
return [t ['topic' ] for t in obj ['topics' ]]
506
509
507
510
def describe_topics (self , topics = None ):
508
- metadata = self ._get_cluster_metadata (topics = topics )
511
+ metadata = self ._get_cluster_metadata (topics = topics , use_controller = True )
509
512
obj = metadata .to_object ()
510
513
return obj ['topics' ]
511
514
0 commit comments