Skip to content

Commit 1193783

Browse files
committed
Maintain shadow bootstrap cluster metadata
1 parent ee4a53e commit 1193783

File tree

2 files changed

+20
-34
lines changed

2 files changed

+20
-34
lines changed

kafka/client_async.py

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -245,11 +245,7 @@ def _can_bootstrap(self):
245245

246246
def _can_connect(self, node_id):
247247
if node_id not in self._conns:
248-
# cluster.broker_metadata() is stateful when called w/ 'bootstrap'
249-
# (it cycles through all of the bootstrap servers)
250-
# so we short-circuit here and assume that we should always have
251-
# some bootstrap_servers config to power bootstrap broker_metadata
252-
if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
248+
if self.cluster.broker_metadata(node_id):
253249
return True
254250
return False
255251
conn = self._conns[node_id]
@@ -266,7 +262,7 @@ def _conn_state_change(self, node_id, conn):
266262
except KeyError:
267263
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
268264

269-
if node_id == 'bootstrap':
265+
if self.cluster.is_bootstrap(node_id):
270266
self._last_bootstrap = time.time()
271267

272268
elif conn.connected():
@@ -284,12 +280,13 @@ def _conn_state_change(self, node_id, conn):
284280

285281
self._idle_expiry_manager.update(node_id)
286282

287-
if node_id == 'bootstrap':
283+
if self.cluster.is_bootstrap(node_id):
288284
self._bootstrap_fails = 0
289285

290-
elif 'bootstrap' in self._conns:
291-
bootstrap = self._conns.pop('bootstrap')
292-
bootstrap.close()
286+
else:
287+
for node_id in list(self._conns.keys()):
288+
if self.cluster.is_bootstrap(node_id):
289+
self._conns.pop(node_id).close()
293290

294291
# Connection failures imply that our metadata is stale, so let's refresh
295292
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -308,7 +305,7 @@ def _conn_state_change(self, node_id, conn):
308305
idle_disconnect = True
309306
self._idle_expiry_manager.remove(node_id)
310307

311-
if node_id == 'bootstrap':
308+
if self.cluster.is_bootstrap(node_id):
312309
self._bootstrap_fails += 1
313310

314311
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -331,10 +328,6 @@ def _should_recycle_connection(self, conn):
331328
if not conn.disconnected():
332329
return False
333330

334-
# Always recycled disconnected bootstraps
335-
elif conn.node_id == 'bootstrap':
336-
return True
337-
338331
# Otherwise, only recycle when broker metadata has changed
339332
broker = self.cluster.broker_metadata(conn.node_id)
340333
if broker is None:
@@ -355,10 +348,6 @@ def _maybe_connect(self, node_id):
355348
conn = self._conns.get(node_id)
356349

357350
if conn is None:
358-
# Note that when bootstrapping, each call to broker_metadata may
359-
# return a different host/port. So we need to be careful to only
360-
# call when necessary to avoid skipping some possible bootstrap
361-
# source.
362351
broker = self.cluster.broker_metadata(node_id)
363352
assert broker, 'Broker id %s not in current metadata' % (node_id,)
364353

@@ -697,7 +686,7 @@ def least_loaded_node(self):
697686
in-flight-requests. If no such node is found, a node will be chosen
698687
randomly from disconnected nodes that are not "blacked out" (i.e.,
699688
are not subject to a reconnect backoff). If no node metadata has been
700-
obtained, will return 'bootstrap' (subject to exponential backoff).
689+
obtained, will return a bootstrap node (subject to exponential backoff).
701690
702691
Returns:
703692
node_id or None if no suitable node was found
@@ -724,10 +713,6 @@ def least_loaded_node(self):
724713
if found is not None:
725714
return found
726715

727-
elif not nodes and self._can_bootstrap():
728-
self._last_bootstrap = time.time()
729-
return 'bootstrap'
730-
731716
return None
732717

733718
def set_topics(self, topics):
@@ -785,7 +770,7 @@ def _maybe_refresh_metadata(self):
785770

786771
if self._can_send_request(node_id):
787772
topics = list(self._topics)
788-
if not topics and node_id == 'bootstrap':
773+
if not topics and self.cluster.is_bootstrap(node_id):
789774
topics = list(self.config['bootstrap_topics_filter'])
790775

791776
if self.cluster.need_all_topic_metadata or not topics:

kafka/cluster.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -70,18 +70,22 @@ def _generate_bootstrap_brokers(self):
7070
# collect_hosts does not perform DNS, so we should be fine to re-use
7171
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
7272

73-
while True:
74-
for host, port, afi in bootstrap_hosts:
75-
for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
76-
yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
73+
brokers = {}
74+
for i, (host, port, _) in enumerate(bootstrap_hosts):
75+
node_id = 'bootstrap-%s' % i
76+
brokers[node_id] = BrokerMetadata(node_id, host, port, None)
77+
return brokers
78+
79+
def is_bootstrap(self, node_id):
80+
return node_id in self._bootstrap_brokers
7781

7882
def brokers(self):
7983
"""Get all BrokerMetadata
8084
8185
Returns:
8286
set: {BrokerMetadata, ...}
8387
"""
84-
return set(self._brokers.values())
88+
return set(self._brokers.values()) or set(self._bootstrap_brokers.values())
8589

8690
def broker_metadata(self, broker_id):
8791
"""Get BrokerMetadata
@@ -92,10 +96,7 @@ def broker_metadata(self, broker_id):
9296
Returns:
9397
BrokerMetadata or None if not found
9498
"""
95-
if broker_id == 'bootstrap':
96-
return next(self._bootstrap_brokers)
97-
98-
return self._brokers.get(broker_id)
99+
return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
99100

100101
def partitions_for_topic(self, topic):
101102
"""Return set of all partitions for topic (whether available or not)

0 commit comments

Comments
 (0)