diff --git a/kafka/client_async.py b/kafka/client_async.py index d608e6a5e..acdf497bc 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -5,7 +5,9 @@ import functools import logging import random +import socket import threading +import time import weakref # selectors in stdlib as of py3.4 @@ -15,9 +17,6 @@ # vendored backport module from kafka.vendor import selectors34 as selectors -import socket -import time - from kafka.vendor import six from kafka.cluster import ClusterMetadata @@ -611,7 +610,8 @@ def poll(self, timeout_ms=None, future=None): return responses def _poll(self, timeout): - """Returns list of (response, future) tuples""" + # This needs to be locked, but since it is only called from within the + # locked section of poll(), there is no additional lock acquisition here processed = set() start_select = time.time() diff --git a/kafka/conn.py b/kafka/conn.py index 4f324c87d..c5a5f571c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -230,6 +230,9 @@ def __init__(self, host, port, afi, **configs): self.node_id = self.config.pop('node_id') + if self.config['api_version'] is None: + self.config['api_version'] = self.DEFAULT_CONFIG['api_version'] + if self.config['receive_buffer_bytes'] is not None: self.config['socket_options'].append( (socket.SOL_SOCKET, socket.SO_RCVBUF,