
Commit 4c65bfb

Improve async producer code: logic and style fixes

- send_produce_request with fail_on_error=False, so only the failed requests are retried
- retry counters kept in an internal dict with namedtuple keys
- metadata refreshed on refresh errors regardless of the retry options
- infinite retries (retry_options.limit=None) removed as an over-feature
- separate producer init args for the retry options (limit, backoff, on_timeouts)
- AsyncProducerQueueFull now carries the list of failed messages
- producer tests improved thanks to @rogaha and @toli

1 parent 5d6916f · commit 4c65bfb

File tree: 5 files changed, +75 −76 lines
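Taken together, the changes below replace the single `async_retry_options` constructor argument with three plain keyword arguments and make the queue-full error carry its messages. A minimal usage sketch against the library's API as of this commit (Python 2 era, so `async` is a valid keyword argument; the broker address and topic are placeholders):

```python
from kafka import KafkaClient, SimpleProducer
from kafka.common import AsyncProducerQueueFull

client = KafkaClient("localhost:9092")  # placeholder broker address
producer = SimpleProducer(
    client,
    async=True,
    async_retry_limit=3,            # int >= 0; 0 disables retries, None is gone
    async_retry_backoff_ms=100,     # pause between retry rounds
    async_retry_on_timeouts=False)  # do not retry on RequestTimedOutError

try:
    producer.send_messages(b'my-topic', b'msg 1', b'msg 2')
except AsyncProducerQueueFull as e:
    # The exception now carries the messages that never made it into the
    # async queue, so the caller can re-send or log exactly those.
    print('%d messages were not enqueued' % len(e.failed_msgs))
```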

kafka/common.py

Lines changed: 6 additions & 11 deletions

```diff
@@ -14,15 +14,8 @@
                              ["brokers", "topics"])

 # https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-ProduceAPI
-_ProduceRequest = namedtuple("ProduceRequest",
-                             ["topic", "partition", "messages", "retries"])
-
-
-class ProduceRequest(_ProduceRequest):
-    def __new__(cls, topic, partition, messages, retries=0):
-        return super(ProduceRequest, cls).__new__(
-            cls, topic, partition, messages, retries)
-
+ProduceRequest = namedtuple("ProduceRequest",
+                            ["topic", "partition", "messages"])

 ProduceResponse = namedtuple("ProduceResponse",
                              ["topic", "partition", "error", "offset"])
@@ -79,7 +72,7 @@ def __new__(cls, topic, partition, messages, retries=0):
     ["topic", "partition", "offset", "key", "value"])

 # Define retry policy for async producer
-# Limit corner values: None - infinite retries, 0 - no retries
+# Limit value: int >= 0, 0 means no retries
 RetryOptions = namedtuple("RetryOptions",
     ["limit", "backoff_ms", "retry_on_timeouts"])

@@ -218,7 +211,9 @@ class KafkaConfigurationError(KafkaError):


 class AsyncProducerQueueFull(KafkaError):
-    pass
+    def __init__(self, failed_msgs, *args):
+        super(AsyncProducerQueueFull, self).__init__(*args)
+        self.failed_msgs = failed_msgs


 def _iter_broker_errors():
```
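Since `ProduceRequest` is now a plain namedtuple (hashable as long as `messages` is a tuple) and the exception exposes `failed_msgs`, the new pieces can be exercised in isolation. A small sketch, assuming this version of kafka-python is installed:

```python
from kafka.common import (ProduceRequest, RetryOptions,
                          AsyncProducerQueueFull)

# Hashable because messages is a tuple -- this is what lets the async
# thread use requests as dict keys for its retry counters.
req = ProduceRequest(topic="test", partition=0, messages=(b"m1",))
counters = {req: 0}

# limit must be an int >= 0; the None-means-infinite case was removed.
opts = RetryOptions(limit=3, backoff_ms=100, retry_on_timeouts=False)

# The queue-full error carries the messages that were not enqueued.
err = AsyncProducerQueueFull([b"m2", b"m3"], "queue overfilled")
assert err.failed_msgs == [b"m2", b"m3"]
```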

kafka/producer/base.py

Lines changed: 33 additions & 24 deletions

```diff
@@ -34,8 +34,10 @@
 ASYNC_QUEUE_MAXSIZE = 0
 ASYNC_QUEUE_PUT_TIMEOUT = 0
 # no retries by default
-ASYNC_RETRY_OPTIONS = RetryOptions(
-    limit=0, backoff_ms=0, retry_on_timeouts=False)
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 STOP_ASYNC_PRODUCER = -1


@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    reqs = []
+    reqs = {}
     client.reinit()

     while not stop_event.is_set():
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0

         if not reqs:
             continue

         reqs_to_retry, error_type = [], None
-        try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)

-        except FailedPayloadsError as ex:
-            error_type = FailedPayloadsError
-            reqs_to_retry = ex.failed_payloads
+        try:
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            reqs_to_retry = [req for broker_responses in reply
+                             for response in broker_responses
+                             for req in response.failed_payloads
+                             if isinstance(response, FailedPayloadsError)]
+            if reqs_to_retry:
+                error_type = FailedPayloadsError

         except RequestTimedOutError:
             error_type = RequestTimedOutError
             if retry_options.retry_on_timeouts:
-                reqs_to_retry = reqs
+                reqs_to_retry = reqs.keys()

         except Exception as ex:
             error_type = type(ex)
             if type(ex) in RETRY_ERROR_TYPES:
-                reqs_to_retry = reqs
-
-        finally:
-            reqs = []
+                reqs_to_retry = reqs.keys()

-        if not reqs_to_retry or retry_options.limit == 0:
+        if not reqs_to_retry:
+            reqs = {}
             continue

         # doing backoff before next retry
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()

-        reqs = [req._replace(retries=req.retries+1)
-                for req in reqs_to_retry
-                if not retry_options.limit or
-                (retry_options.limit and req.retries < retry_options.limit)]
+        reqs = {key: count + 1 for key, count in reqs.items()
+                if key in reqs_to_retry and count < retry_options.limit}


 class Producer(object):
@@ -161,7 +163,9 @@ def __init__(self, client, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):

@@ -191,6 +195,10 @@ def __init__(self, client, async=False,
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
             self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -252,7 +260,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
             raise TypeError("the key must be type bytes")

         if self.async:
-            for m in msg:
+            for idx, m in enumerate(msg):
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
                     if self.async_queue_put_timeout == 0:
@@ -261,6 +269,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
                         self.queue.put(item, True, self.async_queue_put_timeout)
                 except Full:
                     raise AsyncProducerQueueFull(
+                        msg[idx:],
                         'Producer async queue overfilled. '
                         'Current queue size %d.' % self.queue.qsize())
         resp = []
```
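The heart of the change is the bookkeeping in `_send_upstream`: with `fail_on_error=False` the client returns error objects instead of raising, failures are collected from the nested reply, and the request-to-counter dict is rebuilt, dropping anything past the limit. A standalone sketch of both steps, with a stub error class and tuple "requests" standing in for the real types (note the sketch places the `isinstance` filter before the `failed_payloads` access, so successful responses are skipped safely):

```python
class FailedPayloadsError(Exception):  # stub standing in for kafka.common's
    def __init__(self, failed_payloads):
        self.failed_payloads = failed_payloads

# reply is a list of per-broker response lists; each element is either a
# ProduceResponse or an error object, since fail_on_error=False makes the
# client report failures instead of raising them.
reply = [[FailedPayloadsError([("topic", 0)])], []]
reqs_to_retry = [req for broker_responses in reply
                 for response in broker_responses
                 if isinstance(response, FailedPayloadsError)
                 for req in response.failed_payloads]

# Rebuild the retry counters: keep only requests that failed and have not
# hit the limit yet, bumping each survivor's count by one.
reqs = {("topic", 0): 0, ("topic", 1): 2}
retry_limit = 2
reqs = {key: count + 1 for key, count in reqs.items()
        if key in reqs_to_retry and count < retry_limit}
assert reqs == {("topic", 0): 1}  # ("topic", 1) exhausted its retries
```

Keeping the counters outside the request (rather than in a `retries` field) is what allows `ProduceRequest` to stay a plain, hashable namedtuple.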

kafka/producer/keyed.py

Lines changed: 8 additions & 4 deletions

```diff
@@ -7,8 +7,8 @@

 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
-    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )

 log = logging.getLogger("kafka")
@@ -39,7 +39,9 @@ def __init__(self, client, partitioner=None, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         if not partitioner:
@@ -51,7 +53,9 @@ def __init__(self, client, partitioner=None, async=False,
             ack_timeout, codec, batch_send,
             batch_send_every_n,
             batch_send_every_t,
-            async_retry_options,
+            async_retry_limit,
+            async_retry_backoff_ms,
+            async_retry_on_timeouts,
             async_queue_maxsize,
             async_queue_put_timeout)
```

kafka/producer/simple.py

Lines changed: 8 additions & 4 deletions

```diff
@@ -10,8 +10,8 @@

 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT, ASYNC_RETRY_OPTIONS,
-    ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )

 log = logging.getLogger("kafka")
@@ -47,7 +47,9 @@ def __init__(self, client, async=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  random_start=True,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
@@ -56,7 +58,9 @@ def __init__(self, client, async=False,
             ack_timeout, codec, batch_send,
             batch_send_every_n,
             batch_send_every_t,
-            async_retry_options,
+            async_retry_limit,
+            async_retry_backoff_ms,
+            async_retry_on_timeouts,
             async_queue_maxsize,
             async_queue_put_timeout)
```
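Both subclasses simply accept the three split-out retry keywords and forward them positionally to `Producer.__init__` in the order shown above. For example (same era of the API, broker address is a placeholder):

```python
from kafka import KafkaClient, KeyedProducer

client = KafkaClient("localhost:9092")  # placeholder broker address

# KeyedProducer and SimpleProducer expose the same three keywords and
# pass them straight through to the Producer base class.
producer = KeyedProducer(client,
                         async=True,
                         async_retry_limit=3,
                         async_retry_backoff_ms=100,
                         async_retry_on_timeouts=True)
```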

test/test_producer.py

Lines changed: 20 additions & 33 deletions

```diff
@@ -17,6 +17,10 @@
     from queue import Empty, Queue
 except ImportError:
     from Queue import Empty, Queue
+try:
+    xrange
+except NameError:
+    xrange = range


 class TestKafkaProducer(unittest.TestCase):
@@ -52,7 +56,8 @@ def partitions(topic):
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called

-    def test_producer_async_queue_overfilled_batch_send(self):
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled_batch_send(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), batch_send=True,
                             async_queue_maxsize=queue_size)
@@ -64,8 +69,12 @@ def test_producer_async_queue_overfilled_batch_send(self):
         with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()

-    def test_producer_async_queue_overfilled(self):
+    @patch('kafka.producer.base._send_upstream')
+    def test_producer_async_queue_overfilled(self, mock):
         queue_size = 2
         producer = Producer(MagicMock(), async=True,
                             async_queue_maxsize=queue_size)
@@ -77,7 +86,9 @@ def test_producer_async_queue_overfilled(self):
         with self.assertRaises(AsyncProducerQueueFull):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
-
+        self.assertEqual(producer.queue.qsize(), queue_size)
+        for _ in xrange(producer.queue.qsize()):
+            producer.queue.get()


 class TestKafkaProducerSendUpstream(unittest.TestCase):
@@ -121,7 +132,6 @@ def test_wo_retries(self):
         # 3 batches of 3 msgs each + 1 batch of 1 message
         self.assertEqual(self.client.send_produce_request.call_count, 4)

-
     def test_first_send_failed(self):

         # lets create a queue and add 10 messages for 10 different partitions
@@ -133,7 +143,8 @@ def test_first_send_failed(self):
         def send_side_effect(reqs, *args, **kwargs):
             if self.client.is_first_time:
                 self.client.is_first_time = False
-                raise FailedPayloadsError(reqs)
+                return [[FailedPayloadsError(reqs)]]
+            return []

         self.client.send_produce_request.side_effect = send_side_effect

@@ -154,7 +165,7 @@ def test_with_limited_retries(self):
             self.queue.put((TopicAndPartition("test", i), "msg %i" % i, "key %i" % i))

         def send_side_effect(reqs, *args, **kwargs):
-            raise FailedPayloadsError(reqs)
+            return [[FailedPayloadsError(reqs)]]

         self.client.send_produce_request.side_effect = send_side_effect

@@ -168,30 +179,6 @@ def send_side_effect(reqs, *args, **kwargs):
         # 3 retries of the batches above = 4 + 3 * 4 = 16, all failed
         self.assertEqual(self.client.send_produce_request.call_count, 16)

-    def test_with_unlimited_retries(self):
-
-        # lets create a queue and add 10 messages for 10 different partitions
-        # to show how retries should work ideally
-        for i in range(10):
-            self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
-
-        def send_side_effect(reqs, *args, **kwargs):
-            raise FailedPayloadsError(reqs)
-
-        self.client.send_produce_request.side_effect = send_side_effect
-
-        self._run_process(None)
-
-        # the queue should have 7 elements
-        # 3 batches of 1 msg each were retried all this time
-        self.assertEqual(self.queue.empty(), False)
-        try:
-            for i in range(7):
-                self.queue.get(timeout=0.01)
-        except Empty:
-            self.fail("Should be 7 elems in the queue")
-        self.assertEqual(self.queue.empty(), True)
-
-        # 1s / 50ms of backoff = 20 times max
-        calls = self.client.send_produce_request.call_count
-        self.assertTrue(calls > 10 & calls <= 20)
+    def tearDown(self):
+        for _ in xrange(self.queue.qsize()):
+            self.queue.get()
```
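The updated tests mirror the new client contract: with `fail_on_error=False`, `send_produce_request` returns error objects inside per-broker lists rather than raising. A minimal illustration of that mock shape, with a stub error class so it runs standalone:

```python
try:
    from unittest.mock import MagicMock  # Python 3
except ImportError:
    from mock import MagicMock           # Python 2 backport, as in the tests

class FailedPayloadsError(Exception):    # stub for kafka.common's class
    def __init__(self, failed_payloads):
        self.failed_payloads = failed_payloads

client = MagicMock()

def send_side_effect(reqs, *args, **kwargs):
    # One inner list per broker; failed requests come back wrapped in a
    # FailedPayloadsError instance instead of being raised.
    return [[FailedPayloadsError(reqs)]]

client.send_produce_request.side_effect = send_side_effect
reply = client.send_produce_request([("test", 0)])
assert reply[0][0].failed_payloads == [("test", 0)]
```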
