Skip to content

Commit b1b8eb3

Browse files
committed
Add support for connections_max_idle_ms
1 parent 9acbc6f commit b1b8eb3

File tree

3 files changed

+131
-4
lines changed

3 files changed

+131
-4
lines changed

kafka/client_async.py

Lines changed: 94 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,6 +135,7 @@ class KafkaClient(object):
135135
'bootstrap_servers': 'localhost',
136136
'client_id': 'kafka-python-' + __version__,
137137
'request_timeout_ms': 40000,
138+
'connections_max_idle_ms': 9 * 60 * 1000,
138139
'reconnect_backoff_ms': 50,
139140
'max_in_flight_requests_per_connection': 5,
140141
'receive_buffer_bytes': None,
@@ -195,6 +196,7 @@ def __init__(self, **configs):
195196
self._wake_r.setblocking(False)
196197
self._wake_lock = threading.Lock()
197198
self._selector.register(self._wake_r, selectors.EVENT_READ)
199+
self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms'])
198200
self._closed = False
199201
self._sensors = None
200202
if self.config['metrics']:
@@ -292,6 +294,8 @@ def _conn_state_change(self, node_id, conn):
292294
if self._sensors:
293295
self._sensors.connection_created.record()
294296

297+
self._idle_expiry_manager.update(node_id)
298+
295299
if 'bootstrap' in self._conns and node_id != 'bootstrap':
296300
bootstrap = self._conns.pop('bootstrap')
297301
# XXX: make conn.close() require error to cause refresh
@@ -309,7 +313,13 @@ def _conn_state_change(self, node_id, conn):
309313
pass
310314
if self._sensors:
311315
self._sensors.connection_closed.record()
312-
if self._refresh_on_disconnects and not self._closed:
316+
317+
idle_disconnect = False
318+
if self._idle_expiry_manager.is_expired(node_id):
319+
idle_disconnect = True
320+
self._idle_expiry_manager.remove(node_id)
321+
322+
if self._refresh_on_disconnects and not self._closed and not idle_disconnect:
313323
log.warning("Node %s connection failed -- refreshing metadata", node_id)
314324
self.cluster.request_update()
315325

@@ -515,10 +525,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
515525
if future and future.is_done:
516526
timeout = 0
517527
else:
528+
idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms()
518529
timeout = min(
519530
timeout_ms,
520531
metadata_timeout_ms,
521532
self._delayed_tasks.next_at() * 1000,
533+
idle_connection_timeout_ms,
522534
self.config['request_timeout_ms'])
523535
timeout = max(0, timeout / 1000.0) # avoid negative timeouts
524536

@@ -573,6 +585,8 @@ def _poll(self, timeout, sleep=True):
573585
conn.close(Errors.ConnectionError('Socket EVENT_READ without in-flight-requests'))
574586
continue
575587

588+
self._idle_expiry_manager.update(conn.node_id)
589+
576590
# Accumulate as many responses as the connection has pending
577591
while conn.in_flight_requests:
578592
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
@@ -602,6 +616,7 @@ def _poll(self, timeout, sleep=True):
602616

603617
if self._sensors:
604618
self._sensors.io_time.record((time.time() - end_select) * 1000000000)
619+
self._maybe_close_oldest_connection()
605620
return responses
606621

607622
def in_flight_request_count(self, node_id=None):
@@ -840,6 +855,14 @@ def _clear_wake_fd(self):
840855
except:
841856
break
842857

858+
def _maybe_close_oldest_connection(self):
    """Close at most one connection that has exceeded the idle limit.

    Asks the idle-expiry manager for an expired connection. When one is
    reported, logs how long ago it was last active and closes it via
    ``self.close(node_id=...)``. Does nothing when no connection is
    reported as expired.
    """
    expired = self._idle_expiry_manager.poll_expired_connection()
    if not expired:
        return

    node_id, last_active = expired
    # Timestamps are in seconds; report the idle period in milliseconds.
    elapsed_ms = (time.time() - last_active) * 1000
    log.info('Closing idle connection %s, last active %d ms ago', node_id, elapsed_ms)
    self.close(node_id=node_id)
865+
843866

844867
class DelayedTaskQueue(object):
845868
# see https://docs.python.org/2/library/heapq.html
@@ -914,6 +937,76 @@ def pop_ready(self):
914937
return ready_tasks
915938

916939

940+
# OrderedDict requires python2.7+
941+
try:
942+
from collections import OrderedDict
943+
except ImportError:
944+
# If we don't have OrderedDict, we fall back to a plain dict with O(n) priority reads
945+
OrderedDict = dict
946+
947+
948+
class IdleConnectionManager(object):
    """Track last-activity timestamps of connections to find idle ones.

    Connections are kept in ``lru_connections``, mapping connection id to
    the ``time.time()`` value of the most recent ``update()``. When
    ``OrderedDict`` is available the mapping is ordered from least to most
    recently updated; otherwise the oldest entry is found by a linear scan.

    Arguments:
        connections_max_idle_ms: idle limit in milliseconds. A value that
            is not greater than zero disables expiry (the limit becomes
            infinite).
    """

    def __init__(self, connections_max_idle_ms):
        if connections_max_idle_ms > 0:
            # Divide by a float so sub-second limits are not truncated to 0
            # where `/` performs integer division (python2 without
            # `from __future__ import division`).
            self.connections_max_idle = connections_max_idle_ms / 1000.0
        else:
            self.connections_max_idle = float('inf')
        self.next_idle_close_check_time = None
        self.update_next_idle_close_check_time(time.time())
        self.lru_connections = OrderedDict()

    def update(self, conn_id):
        """Record activity on conn_id now, making it the newest entry."""
        # order should reflect last-update: delete first so that
        # re-insertion moves the key to the end of the ordering
        if conn_id in self.lru_connections:
            del self.lru_connections[conn_id]
        self.lru_connections[conn_id] = time.time()

    def remove(self, conn_id):
        """Stop tracking conn_id; a no-op if it is not tracked."""
        if conn_id in self.lru_connections:
            del self.lru_connections[conn_id]

    def is_expired(self, conn_id):
        """Return True if conn_id has been idle for at least the limit.

        Returns None (falsy) when conn_id is not tracked.
        """
        if conn_id not in self.lru_connections:
            return None
        return time.time() >= self.lru_connections[conn_id] + self.connections_max_idle

    def next_check_ms(self):
        """Return milliseconds until the next idle check is due.

        Returns float('inf') when nothing is tracked or when expiry is
        disabled, 0 when a check is already due, and otherwise a
        non-negative int.
        """
        if not self.lru_connections:
            return float('inf')
        # With an infinite idle limit the next check time is also infinite;
        # int(float('inf')) raises OverflowError, so answer before converting.
        if self.next_idle_close_check_time == float('inf'):
            return float('inf')
        now = time.time()
        if self.next_idle_close_check_time <= now:
            return 0
        return int((self.next_idle_close_check_time - now) * 1000)

    def update_next_idle_close_check_time(self, ts):
        """Schedule the next idle check one idle-limit after timestamp ts."""
        self.next_idle_close_check_time = ts + self.connections_max_idle

    def poll_expired_connection(self):
        """Return (conn_id, last_active_ts) for the oldest expired connection.

        Returns None when the next check is not yet due, when nothing is
        tracked, or when the oldest connection has not reached the idle
        limit. The returned connection is NOT removed from tracking; the
        caller is expected to call ``remove()``.
        """
        if time.time() < self.next_idle_close_check_time:
            return None

        if not self.lru_connections:
            return None

        if OrderedDict is dict:
            # Unordered fallback: O(n) scan for the smallest timestamp
            # (the first entry wins on ties).
            oldest_conn_id, oldest_ts = min(
                self.lru_connections.items(), key=lambda item: item[1])
        else:
            # Insertion order == update order, so the first item is oldest
            oldest_conn_id, oldest_ts = next(iter(self.lru_connections.items()))

        # No other connection can expire before the oldest one does
        self.update_next_idle_close_check_time(oldest_ts)

        if time.time() >= oldest_ts + self.connections_max_idle:
            return (oldest_conn_id, oldest_ts)
        return None
1008+
1009+
9171010
class KafkaClientMetrics(object):
9181011
def __init__(self, metrics, metric_group_prefix, conns):
9191012
self.metrics = metrics

kafka/producer/kafka.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,7 +266,7 @@ class KafkaProducer(object):
266266
'linger_ms': 0,
267267
'partitioner': DefaultPartitioner(),
268268
'buffer_memory': 33554432,
269-
'connections_max_idle_ms': 600000, # not implemented yet
269+
'connections_max_idle_ms': 9 * 60 * 1000, # not implemented yet
270270
'max_block_ms': 60000,
271271
'max_request_size': 1048576,
272272
'metadata_max_age_ms': 300000,

test/test_client_async.py

Lines changed: 36 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from __future__ import absolute_import, division
2+
13
# selectors in stdlib as of py3.4
24
try:
35
import selectors # pylint: disable=import-error
@@ -10,7 +12,7 @@
1012

1113
import pytest
1214

13-
from kafka.client_async import KafkaClient
15+
from kafka.client_async import KafkaClient, IdleConnectionManager
1416
from kafka.conn import ConnectionStates
1517
import kafka.errors as Errors
1618
from kafka.future import Future
@@ -319,7 +321,10 @@ def client(mocker):
319321
mocker.patch.object(KafkaClient, '_bootstrap')
320322
_poll = mocker.patch.object(KafkaClient, '_poll')
321323

322-
cli = KafkaClient(request_timeout_ms=9999999, retry_backoff_ms=2222, api_version=(0, 9))
324+
cli = KafkaClient(request_timeout_ms=9999999,
325+
retry_backoff_ms=2222,
326+
connections_max_idle_ms=float('inf'),
327+
api_version=(0, 9))
323328

324329
tasks = mocker.patch.object(cli._delayed_tasks, 'next_at')
325330
tasks.return_value = 9999999
@@ -384,3 +389,32 @@ def test_schedule():
384389

385390
def test_unschedule():
386391
pass
392+
393+
394+
def test_idle_connection_manager(mocker):
    """Step an IdleConnectionManager through its lifecycle on a mocked clock."""
    clock = mocker.patch.object(time, 'time')
    clock.return_value = 0

    # Nothing tracked yet, so there is nothing to check
    manager = IdleConnectionManager(100)
    assert manager.next_check_ms() == float('inf')

    # Freshly updated connection: full idle period remaining
    manager.update('foo')
    assert not manager.is_expired('foo')
    assert manager.poll_expired_connection() is None
    assert manager.next_check_ms() == 100

    # 90ms later: still alive, 10ms remaining
    clock.return_value = 90 / 1000
    assert not manager.is_expired('foo')
    assert manager.poll_expired_connection() is None
    assert manager.next_check_ms() == 10

    # Exactly at the idle limit: expired and a check is due
    clock.return_value = 100 / 1000
    assert manager.is_expired('foo')
    assert manager.next_check_ms() == 0

    expired_id, expired_ts = manager.poll_expired_connection()
    assert expired_id == 'foo'
    assert expired_ts == 0

    # Once removed, nothing is left to check
    manager.remove('foo')
    assert manager.next_check_ms() == float('inf')

0 commit comments

Comments
 (0)