diff --git a/kafka/client_async.py b/kafka/client_async.py index fa150dbfb..dd48673e0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -592,6 +592,10 @@ def _poll(self, timeout): # locked section of poll(), there is no additional lock acquisition here processed = set() + # Send pending requests first, before polling for responses + for conn in six.itervalues(self._conns): + conn.send_pending_requests() + start_select = time.time() ready = self._selector.select(timeout) end_select = time.time() @@ -644,8 +648,6 @@ def _poll(self, timeout): conn.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % conn.config['request_timeout_ms'])) - else: - conn.send_pending_requests() if self._sensors: self._sensors.io_time.record((time.time() - end_select) * 1000000000)