diff --git a/kafka/conn.py b/kafka/conn.py index 99466d90f..94d59f7c9 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -107,6 +107,10 @@ class BrokerConnection(object): server-side log entries that correspond to this client. Also submitted to GroupCoordinator for logging with respect to consumer group administration. Default: 'kafka-python-{version}' + connecting_delay_ms (int): Time to wait before sending data when a + connection is in state CONNECTING. Small but non-zero timeout is + recommended to avoid busy loop as well as to avoid unnecessary + delays while connecting. Default: 100 reconnect_backoff_ms (int): The amount of time in milliseconds to wait before attempting to reconnect to a given host. Default: 50. @@ -192,6 +196,7 @@ class BrokerConnection(object): DEFAULT_CONFIG = { 'client_id': 'kafka-python-' + __version__, + 'connecting_delay_ms': 100, 'node_id': 0, 'request_timeout_ms': 30000, 'reconnect_backoff_ms': 50, @@ -769,15 +774,16 @@ def connection_delay(self): """ Return the number of milliseconds to wait, based on the connection state, before attempting to send data. When disconnected, this respects - the reconnect backoff time. When connecting, returns 0 to allow - non-blocking connect to finish. When connected, returns a very large - number to handle slow/stalled connections. + the reconnect backoff time. When connecting, returns value defined by + connecting_delay_ms config value -- typically something less than a + second -- to allow non-blocking connect to finish. When connected, + returns a very large number to handle slow/stalled connections. """ time_waited = time.time() - (self.last_attempt or 0) if self.state is ConnectionStates.DISCONNECTED: return max(self._reconnect_backoff - time_waited, 0) * 1000 elif self.connecting(): - return 0 + return self.config['connecting_delay_ms'] else: return float('inf') diff --git a/test/test_conn.py b/test/test_conn.py index 6412cb6a6..7abfec5c2 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -85,7 +85,7 @@ def test_connection_delay(conn): conn.last_attempt = 1000 assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == 0 + assert conn.connection_delay() == 100 conn.state = ConnectionStates.CONNECTED assert conn.connection_delay() == float('inf')