Skip to content

Commit af2dd48

Browse files
authored
Maintain shadow cluster metadata for bootstrapping (#1753)
1 parent 0bc7518 commit af2dd48

File tree

2 files changed

+21
-35
lines changed

2 files changed

+21
-35
lines changed

kafka/client_async.py

Lines changed: 10 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -251,11 +251,7 @@ def _can_bootstrap(self):
251251

252252
def _can_connect(self, node_id):
253253
if node_id not in self._conns:
254-
# cluster.broker_metadata() is stateful when called w/ 'bootstrap'
255-
# (it cycles through all of the bootstrap servers)
256-
# so we short-circuit here and assume that we should always have
257-
# some bootstrap_servers config to power bootstrap broker_metadata
258-
if node_id == 'bootstrap' or self.cluster.broker_metadata(node_id):
254+
if self.cluster.broker_metadata(node_id):
259255
return True
260256
return False
261257
conn = self._conns[node_id]
@@ -272,7 +268,7 @@ def _conn_state_change(self, node_id, conn):
272268
except KeyError:
273269
self._selector.modify(conn._sock, selectors.EVENT_WRITE)
274270

275-
if node_id == 'bootstrap':
271+
if self.cluster.is_bootstrap(node_id):
276272
self._last_bootstrap = time.time()
277273

278274
elif conn.connected():
@@ -290,12 +286,13 @@ def _conn_state_change(self, node_id, conn):
290286

291287
self._idle_expiry_manager.update(node_id)
292288

293-
if node_id == 'bootstrap':
289+
if self.cluster.is_bootstrap(node_id):
294290
self._bootstrap_fails = 0
295291

296-
elif 'bootstrap' in self._conns:
297-
bootstrap = self._conns.pop('bootstrap')
298-
bootstrap.close()
292+
else:
293+
for node_id in list(self._conns.keys()):
294+
if self.cluster.is_bootstrap(node_id):
295+
self._conns.pop(node_id).close()
299296

300297
# Connection failures imply that our metadata is stale, so let's refresh
301298
elif conn.state is ConnectionStates.DISCONNECTING:
@@ -314,7 +311,7 @@ def _conn_state_change(self, node_id, conn):
314311
idle_disconnect = True
315312
self._idle_expiry_manager.remove(node_id)
316313

317-
if node_id == 'bootstrap':
314+
if self.cluster.is_bootstrap(node_id):
318315
self._bootstrap_fails += 1
319316

320317
elif self._refresh_on_disconnects and not self._closed and not idle_disconnect:
@@ -337,10 +334,6 @@ def _should_recycle_connection(self, conn):
337334
if not conn.disconnected():
338335
return False
339336

340-
# Always recycled disconnected bootstraps
341-
elif conn.node_id == 'bootstrap':
342-
return True
343-
344337
# Otherwise, only recycle when broker metadata has changed
345338
broker = self.cluster.broker_metadata(conn.node_id)
346339
if broker is None:
@@ -361,10 +354,6 @@ def _maybe_connect(self, node_id):
361354
conn = self._conns.get(node_id)
362355

363356
if conn is None:
364-
# Note that when bootstrapping, each call to broker_metadata may
365-
# return a different host/port. So we need to be careful to only
366-
# call when necessary to avoid skipping some possible bootstrap
367-
# source.
368357
broker = self.cluster.broker_metadata(node_id)
369358
assert broker, 'Broker id %s not in current metadata' % (node_id,)
370359

@@ -703,7 +692,7 @@ def least_loaded_node(self):
703692
in-flight-requests. If no such node is found, a node will be chosen
704693
randomly from disconnected nodes that are not "blacked out" (i.e.,
705694
are not subject to a reconnect backoff). If no node metadata has been
706-
obtained, will return 'bootstrap' (subject to exponential backoff).
695+
obtained, will return a bootstrap node (subject to exponential backoff).
707696
708697
Returns:
709698
node_id or None if no suitable node was found
@@ -730,10 +719,6 @@ def least_loaded_node(self):
730719
if found is not None:
731720
return found
732721

733-
elif not nodes and self._can_bootstrap():
734-
self._last_bootstrap = time.time()
735-
return 'bootstrap'
736-
737722
return None
738723

739724
def set_topics(self, topics):
@@ -791,7 +776,7 @@ def _maybe_refresh_metadata(self):
791776

792777
if self._can_send_request(node_id):
793778
topics = list(self._topics)
794-
if not topics and node_id == 'bootstrap':
779+
if not topics and self.cluster.is_bootstrap(node_id):
795780
topics = list(self.config['bootstrap_topics_filter'])
796781

797782
if self.cluster.need_all_topic_metadata or not topics:

kafka/cluster.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ class ClusterMetadata(object):
4040
DEFAULT_CONFIG = {
4141
'retry_backoff_ms': 100,
4242
'metadata_max_age_ms': 300000,
43-
'bootstrap_servers': 'localhost',
43+
'bootstrap_servers': [],
4444
}
4545

4646
def __init__(self, **configs):
@@ -70,18 +70,22 @@ def _generate_bootstrap_brokers(self):
7070
# collect_hosts does not perform DNS, so we should be fine to re-use
7171
bootstrap_hosts = collect_hosts(self.config['bootstrap_servers'])
7272

73-
while True:
74-
for host, port, afi in bootstrap_hosts:
75-
for _, __, ___, ____, sockaddr in dns_lookup(host, port, afi):
76-
yield BrokerMetadata('bootstrap', sockaddr[0], sockaddr[1], None)
73+
brokers = {}
74+
for i, (host, port, _) in enumerate(bootstrap_hosts):
75+
node_id = 'bootstrap-%s' % i
76+
brokers[node_id] = BrokerMetadata(node_id, host, port, None)
77+
return brokers
78+
79+
def is_bootstrap(self, node_id):
80+
return node_id in self._bootstrap_brokers
7781

7882
def brokers(self):
7983
"""Get all BrokerMetadata
8084
8185
Returns:
8286
set: {BrokerMetadata, ...}
8387
"""
84-
return set(self._brokers.values())
88+
return set(self._brokers.values()) or set(self._bootstrap_brokers.values())
8589

8690
def broker_metadata(self, broker_id):
8791
"""Get BrokerMetadata
@@ -92,10 +96,7 @@ def broker_metadata(self, broker_id):
9296
Returns:
9397
BrokerMetadata or None if not found
9498
"""
95-
if broker_id == 'bootstrap':
96-
return next(self._bootstrap_brokers)
97-
98-
return self._brokers.get(broker_id)
99+
return self._brokers.get(broker_id) or self._bootstrap_brokers.get(broker_id)
99100

100101
def partitions_for_topic(self, topic):
101102
"""Return set of all partitions for topic (whether available or not)

0 commit comments

Comments
 (0)