Skip to content

Commit 1c0e894

Browse files
flaneur2020dpkp
authored andcommitted
set socket timeout for the wake_w (#1577)
1 parent 0a2ccba commit 1c0e894

File tree

2 files changed

+6
-0
lines changed

2 files changed

+6
-0
lines changed

kafka/client_async.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,7 @@ class KafkaClient(object):
154154
'bootstrap_topics_filter': set(),
155155
'client_id': 'kafka-python-' + __version__,
156156
'request_timeout_ms': 30000,
157+
'wakeup_timeout_ms': 3000,
157158
'connections_max_idle_ms': 9 * 60 * 1000,
158159
'reconnect_backoff_ms': 50,
159160
'reconnect_backoff_max_ms': 1000,
@@ -203,6 +204,7 @@ def __init__(self, **configs):
203204
self._bootstrap_fails = 0
204205
self._wake_r, self._wake_w = socket.socketpair()
205206
self._wake_r.setblocking(False)
207+
self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0)
206208
self._wake_lock = threading.Lock()
207209

208210
self._lock = threading.RLock()
@@ -871,6 +873,9 @@ def wakeup(self):
871873
with self._wake_lock:
872874
try:
873875
self._wake_w.sendall(b'x')
876+
except socket.timeout:
877+
log.warning('Timeout to send to wakeup socket!')
878+
raise Errors.KafkaTimeoutError()
874879
except socket.error:
875880
log.warning('Unable to send to wakeup socket!')
876881

kafka/producer/kafka.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -368,6 +368,7 @@ def __init__(self, **configs):
368368
self._metrics = Metrics(metric_config, reporters)
369369

370370
client = KafkaClient(metrics=self._metrics, metric_group_prefix='producer',
371+
wakeup_timeout_ms=self.config['max_block_ms'],
371372
**self.config)
372373

373374
# Get auto-discovered version from client if necessary

0 commit comments

Comments
 (0)