@@ -135,6 +135,7 @@ class KafkaClient(object):
135
135
'bootstrap_servers' : 'localhost' ,
136
136
'client_id' : 'kafka-python-' + __version__ ,
137
137
'request_timeout_ms' : 40000 ,
138
+ 'connections_max_idle_ms' : 9 * 60 * 1000 ,
138
139
'reconnect_backoff_ms' : 50 ,
139
140
'max_in_flight_requests_per_connection' : 5 ,
140
141
'receive_buffer_bytes' : None ,
@@ -195,6 +196,7 @@ def __init__(self, **configs):
195
196
self ._wake_r .setblocking (False )
196
197
self ._wake_lock = threading .Lock ()
197
198
self ._selector .register (self ._wake_r , selectors .EVENT_READ )
199
+ self ._idle_expiry_manager = IdleConnectionManager (self .config ['connections_max_idle_ms' ])
198
200
self ._closed = False
199
201
self ._sensors = None
200
202
if self .config ['metrics' ]:
@@ -292,6 +294,8 @@ def _conn_state_change(self, node_id, conn):
292
294
if self ._sensors :
293
295
self ._sensors .connection_created .record ()
294
296
297
+ self ._idle_expiry_manager .update (node_id )
298
+
295
299
if 'bootstrap' in self ._conns and node_id != 'bootstrap' :
296
300
bootstrap = self ._conns .pop ('bootstrap' )
297
301
# XXX: make conn.close() require error to cause refresh
@@ -309,7 +313,13 @@ def _conn_state_change(self, node_id, conn):
309
313
pass
310
314
if self ._sensors :
311
315
self ._sensors .connection_closed .record ()
312
- if self ._refresh_on_disconnects and not self ._closed :
316
+
317
+ idle_disconnect = False
318
+ if self ._idle_expiry_manager .is_expired (node_id ):
319
+ idle_disconnect = True
320
+ self ._idle_expiry_manager .remove (node_id )
321
+
322
+ if self ._refresh_on_disconnects and not self ._closed and not idle_disconnect :
313
323
log .warning ("Node %s connection failed -- refreshing metadata" , node_id )
314
324
self .cluster .request_update ()
315
325
@@ -515,10 +525,12 @@ def poll(self, timeout_ms=None, future=None, sleep=True, delayed_tasks=True):
515
525
if future and future .is_done :
516
526
timeout = 0
517
527
else :
528
+ idle_connection_timeout_ms = self ._idle_expiry_manager .next_check_ms ()
518
529
timeout = min (
519
530
timeout_ms ,
520
531
metadata_timeout_ms ,
521
532
self ._delayed_tasks .next_at () * 1000 ,
533
+ idle_connection_timeout_ms ,
522
534
self .config ['request_timeout_ms' ])
523
535
timeout = max (0 , timeout / 1000.0 ) # avoid negative timeouts
524
536
@@ -573,6 +585,8 @@ def _poll(self, timeout, sleep=True):
573
585
conn .close (Errors .ConnectionError ('Socket EVENT_READ without in-flight-requests' ))
574
586
continue
575
587
588
+ self ._idle_expiry_manager .update (conn .node_id )
589
+
576
590
# Accumulate as many responses as the connection has pending
577
591
while conn .in_flight_requests :
578
592
response = conn .recv () # Note: conn.recv runs callbacks / errbacks
@@ -602,6 +616,7 @@ def _poll(self, timeout, sleep=True):
602
616
603
617
if self ._sensors :
604
618
self ._sensors .io_time .record ((time .time () - end_select ) * 1000000000 )
619
+ self ._maybe_close_oldest_connection ()
605
620
return responses
606
621
607
622
def in_flight_request_count (self , node_id = None ):
@@ -840,6 +855,14 @@ def _clear_wake_fd(self):
840
855
except :
841
856
break
842
857
858
+ def _maybe_close_oldest_connection (self ):
859
+ expired_connection = self ._idle_expiry_manager .poll_expired_connection ()
860
+ if expired_connection :
861
+ conn_id , ts = expired_connection
862
+ idle_ms = (time .time () - ts ) * 1000
863
+ log .info ('Closing idle connection %s, last active %d ms ago' , conn_id , idle_ms )
864
+ self .close (node_id = conn_id )
865
+
843
866
844
867
class DelayedTaskQueue (object ):
845
868
# see https://docs.python.org/2/library/heapq.html
@@ -914,6 +937,76 @@ def pop_ready(self):
914
937
return ready_tasks
915
938
916
939
940
+ # OrderedDict requires python2.7+
941
+ try :
942
+ from collections import OrderedDict
943
+ except ImportError :
944
+ # If we dont have OrderedDict, we'll fallback to dict with O(n) priority reads
945
+ OrderedDict = dict
946
+
947
+
948
+ class IdleConnectionManager (object ):
949
+ def __init__ (self , connections_max_idle_ms ):
950
+ if connections_max_idle_ms > 0 :
951
+ self .connections_max_idle = connections_max_idle_ms / 1000
952
+ else :
953
+ self .connections_max_idle = float ('inf' )
954
+ self .next_idle_close_check_time = None
955
+ self .update_next_idle_close_check_time (time .time ())
956
+ self .lru_connections = OrderedDict ()
957
+
958
+ def update (self , conn_id ):
959
+ # order should reflect last-update
960
+ if conn_id in self .lru_connections :
961
+ del self .lru_connections [conn_id ]
962
+ self .lru_connections [conn_id ] = time .time ()
963
+
964
+ def remove (self , conn_id ):
965
+ if conn_id in self .lru_connections :
966
+ del self .lru_connections [conn_id ]
967
+
968
+ def is_expired (self , conn_id ):
969
+ if conn_id not in self .lru_connections :
970
+ return None
971
+ return time .time () >= self .lru_connections [conn_id ] + self .connections_max_idle
972
+
973
+ def next_check_ms (self ):
974
+ now = time .time ()
975
+ if not self .lru_connections :
976
+ return float ('inf' )
977
+ elif self .next_idle_close_check_time <= now :
978
+ return 0
979
+ else :
980
+ return int ((self .next_idle_close_check_time - now ) * 1000 )
981
+
982
+ def update_next_idle_close_check_time (self , ts ):
983
+ self .next_idle_close_check_time = ts + self .connections_max_idle
984
+
985
+ def poll_expired_connection (self ):
986
+ if time .time () < self .next_idle_close_check_time :
987
+ return None
988
+
989
+ if not len (self .lru_connections ):
990
+ return None
991
+
992
+ oldest_conn_id = None
993
+ oldest_ts = None
994
+ if OrderedDict is dict :
995
+ for conn_id , ts in self .lru_connections .items ():
996
+ if oldest_conn_id is None or ts < oldest_ts :
997
+ oldest_conn_id = conn_id
998
+ oldest_ts = ts
999
+ else :
1000
+ (oldest_conn_id , oldest_ts ) = self .lru_connections .items ()[0 ]
1001
+
1002
+ self .update_next_idle_close_check_time (oldest_ts )
1003
+
1004
+ if time .time () >= oldest_ts + self .connections_max_idle :
1005
+ return (oldest_conn_id , oldest_ts )
1006
+ else :
1007
+ return None
1008
+
1009
+
917
1010
class KafkaClientMetrics (object ):
918
1011
def __init__ (self , metrics , metric_group_prefix , conns ):
919
1012
self .metrics = metrics
0 commit comments