Skip to content

Commit 25cfdd4

Browse files
dpkp authored and 88manpreet committed
Timeout idle connections via connections_max_idle_ms (dpkp#1068)
1 parent b42540a commit 25cfdd4

File tree

4 files changed

+135
-6
lines changed

4 files changed

+135
-6
lines changed

kafka/client_async.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class KafkaClient(object):
135135
'bootstrap_servers': 'localhost',
136136
'client_id': 'kafka-python-' + __version__,
137137
'request_timeout_ms': 40000,
138+
'connections_max_idle_ms': 9 * 60 * 1000,
138139
'reconnect_backoff_ms': 50,
139140
'max_in_flight_requests_per_connection': 5,
140141
'receive_buffer_bytes': None,
@@ -194,6 +195,7 @@ def __init__(self, **configs):
194195
self._wake_r.setblocking(False)
195196
self._wake_lock = threading.Lock()
196197
self._selector.register(self._wake_r, selectors.EVENT_READ)
198+
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
197199
self._closed = False
198200
self._sensors = None
199201
if self.config['metrics']:
@@ -291,6 +293,8 @@ def _conn_state_change(self, node_id, conn):
291293
if self._sensors:
292294
self._sensors.connection_created.record()
293295

296+
self._idle_expiry_manager.update(node_id)
297+
294298
if 'bootstrap' in self._conns and node_id != 'bootstrap':
295299
bootstrap = self._conns.pop('bootstrap')
296300
# XXX: make conn.close() require error to cause refresh
@@ -308,7 +312,13 @@ def _conn_state_change(self, node_id, conn):
308312
pass
309313
if self._sensors:
310314
self._sensors.connection_closed.record()
311-
if self._refresh_on_disconnects and not self._closed:
315+
316+
idle_disconnect = False
317+
if self._idle_expiry_manager.is_expired(node_id):
318+
idle_disconnect = True
319+
self._idle_expiry_manager.remove(node_id)
320+
321+
if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
312322
log.warning("Node %s connection failed -- refreshing metadata", node_id)
313323
self.cluster.request_update()
314324

@@ -513,10 +523,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True):
513523
if future and future.is_done:
514524
timeout = 0
515525
else:
526+
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
516527
timeout = min(
517528
timeout_ms,
518529
metadata_timeout_ms,
519530
self._delayed_tasks.next_at() * 1000,
531+
idle_connection_timeout_ms,
520532
self.config['request_timeout_ms'])
521533
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
522534

@@ -571,6 +583,8 @@ def _poll(self, timeout, sleep=True):
571583
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
572584
continue
573585

586+
self._idle_expiry_manager.update(conn.node_id)
587+
574588
# Accumulate as many responses as the connection has pending
575589
while conn.in_flight_requests:
576590
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -592,6 +606,7 @@ def _poll(self, timeout, sleep=True):
592606

593607
if self._sensors:
594608
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
609+
self._maybe_close_oldest_connection()
595610
return responses
596611

597612
def in_flight_request_count(self, node_id=None):
@@ -844,6 +859,14 @@ def _clear_wake_fd(self):
844859
except socket.error:
845860
break
846861

862+
def _maybe_close_oldest_connection(self):
    """Close the oldest tracked connection if it has exceeded the idle limit.

    Asks the idle-expiry manager for an expired connection. When one is
    reported, logs how long ago it was last active and closes it through
    ``self.close(node_id=...)``. Does nothing otherwise.
    """
    expired = self._idle_expiry_manager.poll_expired_connection()
    if not expired:
        return

    node_id, last_active = expired
    # Timestamps are in seconds; the log message reports milliseconds.
    elapsed_ms = (time.time() - last_active) * 1000
    log.info('Closing idle connection %s, last active %d ms ago', node_id, elapsed_ms)
    self.close(node_id=node_id)
869+
847870

848871
class DelayedTaskQueue(object):
849872
# see https://docs.python.org/2/library/heapq.html
@@ -918,6 +941,76 @@ def pop_ready(self):
918941
return ready_tasks
919942

920943

944+
# OrderedDict requires python2.7+
945+
try:
946+
from collections import OrderedDict
947+
except ImportError:
948+
# If we don't have OrderedDict, we'll fall back to dict with O(n) priority reads
949+
OrderedDict = dict
950+
951+
952+
class IdleConnectionManager(object):
    """Track last-activity timestamps per connection and report idle ones.

    Connection ids are stored in ``lru_connections`` mapped to the
    ``time.time()`` value of their most recent ``update()``. When an
    ``OrderedDict`` is available, entries are re-inserted on every update so
    the first entry is always the least recently used one.

    Arguments:
        connections_max_idle_ms: idle limit in milliseconds. A value that is
            not greater than zero disables idle expiry (the limit becomes
            infinite).
    """

    def __init__(self, connections_max_idle_ms):
        if connections_max_idle_ms > 0:
            # Divide by a float so the limit is not truncated to whole
            # seconds when integer division is in effect (Python 2 without
            # ``from __future__ import division``).
            self.connections_max_idle = connections_max_idle_ms / 1000.0
        else:
            self.connections_max_idle = float('inf')
        self.next_idle_close_check_time = None
        self.update_next_idle_close_check_time(time.time())
        self.lru_connections = OrderedDict()

    def update(self, conn_id):
        """Record activity on ``conn_id`` at the current time."""
        # Remove before re-inserting so insertion order reflects last update.
        self.lru_connections.pop(conn_id, None)
        self.lru_connections[conn_id] = time.time()

    def remove(self, conn_id):
        """Stop tracking ``conn_id``; a no-op if it is not tracked."""
        self.lru_connections.pop(conn_id, None)

    def is_expired(self, conn_id):
        """Return whether ``conn_id`` has been idle for at least the limit.

        Returns None (falsy) when ``conn_id`` is not tracked.
        """
        if conn_id not in self.lru_connections:
            return None
        return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle

    def next_check_ms(self):
        """Return milliseconds until the next idle check is due.

        Returns ``float('inf')`` when nothing is tracked or when idle expiry
        is disabled, 0 when a check is already due, and otherwise the
        remaining time truncated to an int.
        """
        if not self.lru_connections:
            return float('inf')
        # With an infinite idle limit the check time is infinite as well;
        # int(float('inf')) raises OverflowError, so report it directly.
        if self.next_idle_close_check_time == float('inf'):
            return float('inf')
        now = time.time()
        if self.next_idle_close_check_time <= now:
            return 0
        return int((self.next_idle_close_check_time - now) * 1000)

    def update_next_idle_close_check_time(self, ts):
        """Schedule the next idle check one idle-limit after ``ts`` (seconds)."""
        self.next_idle_close_check_time = ts + self.connections_max_idle

    def poll_expired_connection(self):
        """Return ``(conn_id, last_active_ts)`` for the oldest expired connection.

        Returns None when the next check is not yet due, when nothing is
        tracked, or when the oldest connection is still within the limit.
        The next check time is rescheduled relative to the oldest
        connection's timestamp whenever a check is actually performed.
        """
        now = time.time()
        if now < self.next_idle_close_check_time:
            return None

        if not self.lru_connections:
            return None

        if OrderedDict is dict:
            # No ordering guarantee in the fallback: scan for the smallest
            # timestamp (first one wins on ties).
            oldest_conn_id, oldest_ts = min(
                self.lru_connections.items(), key=lambda item: item[1])
        else:
            # Insertion order equals last-update order, so the first entry
            # is the oldest.
            oldest_conn_id, oldest_ts = next(iter(self.lru_connections.items()))

        self.update_next_idle_close_check_time(oldest_ts)

        if now >= oldest_ts + self.connections_max_idle:
            return (oldest_conn_id, oldest_ts)
        return None
1012+
1013+
9211014
class KafkaClientMetrics(object):
9221015
def __init__(self, metrics, metric_group_prefix, conns):
9231016
self.metrics = metrics

kafka/conn.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,6 +174,8 @@ def __init__(self, host, port, afi, **configs):
174174
if key in configs:
175175
self.config[key] = configs[key]
176176

177+
self.node_id = self.config.pop('node_id')
178+
177179
if self.config['receive_buffer_bytes'] is not None:
178180
self.config['socket_options'].append(
179181
(socket.SOL_SOCKET, socket.SO_RCVBUF,
@@ -211,7 +213,7 @@ def __init__(self, host, port, afi, **configs):
211213
if self.config['metrics']:
212214
self._sensors = BrokerConnectionMetrics(self.config['metrics'],
213215
self.config['metric_group_prefix'],
214-
self.config['node_id'])
216+
self.node_id)
215217

216218
def connect(self):
217219
"""Attempt to connect and return ConnectionState"""
@@ -900,7 +902,7 @@ def connect():
900902

901903
def __repr__(self):
902904
return "<BrokerConnection node_id=%s host=%s/%s port=%d>" % (
903-
self.config['node_id'], self.hostname, self.host, self.port)
905+
self.node_id, self.hostname, self.host, self.port)
904906

905907

906908
class BrokerConnectionMetrics(object):

kafka/producer/kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -265,7 +265,7 @@ class KafkaProducer(object):
265265
'linger_ms': 0,
266266
'partitioner': DefaultPartitioner(),
267267
'buffer_memory': 33554432,
268-
'connections_max_idle_ms': 600000, # not implemented yet
268+
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
269269
'max_block_ms': 60000,
270270
'max_request_size': 1048576,
271271
'metadata_max_age_ms': 300000,

test/test_client_async.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import absolute_import, division
2+
13
# selectors in stdlib as of py3.4
24
try:
35
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@
1012

1113
import pytest
1214

13-
from kafka.client_async import KafkaClient
15+
from kafka.client_async import KafkaClient, IdleConnectionManager
1416
from kafka.conn import ConnectionStates
1517
import kafka.errors as Errors
1618
from kafka.future import Future
@@ -295,7 +297,10 @@ def client(mocker):
295297
mocker.patch.object(KafkaClient, '_bootstrap')
296298
_poll = mocker.patch.object(KafkaClient, '_poll')
297299

298-
cli = KafkaClient(request_timeout_ms=9999999, reconnect_backoff_ms=2222, api_version=(0, 9))
300+
cli = KafkaClient(request_timeout_ms=9999999,
301+
reconnect_backoff_ms=2222,
302+
connections_max_idle_ms=float('inf'),
303+
api_version=(0, 9))
299304

300305
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
301306
tasks.return_value = 9999999
@@ -371,3 +376,32 @@ def test_schedule():
371376

372377
def test_unschedule():
373378
pass
379+
380+
381+
def test_idle_connection_manager(mocker):
    """Walk an IdleConnectionManager through its lifecycle on a frozen clock.

    ``time.time`` is patched so the test controls the current time exactly:
    the manager is created with a 100 ms idle limit at t=0, one connection
    is registered, and the clock is advanced to 90 ms and then 100 ms.
    """
    clock = mocker.patch.object(time, 'time')
    clock.return_value = 0

    manager = IdleConnectionManager(100)
    # Nothing tracked yet, so there is nothing to wait for.
    assert manager.next_check_ms() == float('inf')

    manager.update('foo')
    assert not manager.is_expired('foo')
    assert manager.poll_expired_connection() is None
    assert manager.next_check_ms() == 100

    # 90 ms in: still inside the idle limit.
    clock.return_value = 90 / 1000
    assert not manager.is_expired('foo')
    assert manager.poll_expired_connection() is None
    assert manager.next_check_ms() == 10

    # 100 ms in: the limit has been reached.
    clock.return_value = 100 / 1000
    assert manager.is_expired('foo')
    assert manager.next_check_ms() == 0

    expired_id, expired_ts = manager.poll_expired_connection()
    assert expired_id == 'foo'
    assert expired_ts == 0

    manager.remove('foo')
    assert manager.next_check_ms() == float('inf')

0 commit comments

Comments
 (0)