Skip to content

Commit df8f4ff

Browse files
pecallejachandlernine
authored andcommitted
Fix initialization order in KafkaClient (dpkp#2119)
Fix initialization order in KafkaClient
1 parent c159f38 commit df8f4ff

File tree

1 file changed

+6
-3
lines changed

1 file changed

+6
-3
lines changed

kafka/client_async.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -200,18 +200,22 @@ def __init__(self, **configs):
200200
if key in configs:
201201
self.config[key] = configs[key]
202202

203+
# these properties need to be set on top of the initialization pipeline
204+
# because they are used when __del__ method is called
205+
self._closed = False
206+
self._wake_r, self._wake_w = socket.socketpair()
207+
self._selector = self.config['selector']()
208+
203209
self.cluster = ClusterMetadata(**self.config)
204210
self._topics = set() # empty set will fetch all topic metadata
205211
self._metadata_refresh_in_progress = False
206-
self._selector = self.config['selector']()
207212
self._conns = Dict() # object to support weakrefs
208213
self._api_versions = None
209214
self._connecting = set()
210215
self._sending = set()
211216
self._refresh_on_disconnects = True
212217
self._last_bootstrap = 0
213218
self._bootstrap_fails = 0
214-
self._wake_r, self._wake_w = socket.socketpair()
215219
self._wake_r.setblocking(False)
216220
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
217221
self._wake_lock = threading.Lock()
@@ -225,7 +229,6 @@ def __init__(self, **configs):
225229

226230
self._selector.register(self._wake_r, selectors.EVENT_READ)
227231
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
228-
self._closed = False
229232
self._sensors = None
230233
if self.config['metrics']:
231234
self._sensors = KafkaClientMetrics(self.config['metrics'],

0 commit comments

Comments
 (0)