
Commit 474aeaa

Merge pull request #331 from vshlapakov/feature-producer-retries
Async producer retries for failed messages
2 parents: 67424a2 + 7d6f3f5

6 files changed: +296, -29 lines

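The diff below adds configurable retry and queue-bounding behavior to the async producer. For orientation, a minimal usage sketch of the new options (broker address, topic, and payloads are placeholders; imports follow the kafka-python API of this era, in which `async` was not yet a reserved word in Python):

    from kafka import KafkaClient, SimpleProducer

    client = KafkaClient('localhost:9092')  # placeholder broker address
    producer = SimpleProducer(
        client,
        async=True,                     # deliver via the background thread
        async_retry_limit=3,            # retry each failed request up to 3 times
        async_retry_backoff_ms=100,     # back off 100 ms before retrying
        async_retry_on_timeouts=False,  # do not retry RequestTimedOutError
        async_queue_maxsize=1000,       # bound the in-memory send queue
        async_queue_put_timeout=0.5)    # block up to 0.5 s when the queue is full
    producer.send_messages(b'my-topic', b'hello', b'world')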

kafka/common.py

Lines changed: 26 additions & 0 deletions
@@ -71,6 +71,11 @@
 KafkaMessage = namedtuple("KafkaMessage",
     ["topic", "partition", "offset", "key", "value"])

+# Define retry policy for async producer
+# Limit value: int >= 0, 0 means no retries
+RetryOptions = namedtuple("RetryOptions",
+    ["limit", "backoff_ms", "retry_on_timeouts"])
+

 #################
 # Exceptions #

@@ -205,6 +210,12 @@ class KafkaConfigurationError(KafkaError):
     pass


+class AsyncProducerQueueFull(KafkaError):
+    def __init__(self, failed_msgs, *args):
+        super(AsyncProducerQueueFull, self).__init__(*args)
+        self.failed_msgs = failed_msgs
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:

@@ -218,3 +229,18 @@ def check_error(response):
     if response.error:
         error_class = kafka_errors.get(response.error, UnknownError)
         raise error_class(response)
+
+
+RETRY_BACKOFF_ERROR_TYPES = (
+    KafkaUnavailableError, LeaderNotAvailableError,
+    ConnectionError, FailedPayloadsError
+)
+
+
+RETRY_REFRESH_ERROR_TYPES = (
+    NotLeaderForPartitionError, UnknownTopicOrPartitionError,
+    LeaderNotAvailableError, ConnectionError
+)
+
+
+RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES
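A standalone sketch of how these definitions are meant to be consumed: timeouts retry only when `retry_on_timeouts` is set, while other errors retry only if their class appears in `RETRY_ERROR_TYPES`. Stub exception classes stand in for the real ones, and `should_retry` is an illustrative helper, not part of the module:

    from collections import namedtuple

    # Mirrors the new definition in kafka/common.py
    RetryOptions = namedtuple("RetryOptions",
        ["limit", "backoff_ms", "retry_on_timeouts"])

    # Stubs standing in for kafka.common's exception hierarchy
    class KafkaError(Exception): pass
    class LeaderNotAvailableError(KafkaError): pass
    class RequestTimedOutError(KafkaError): pass

    RETRY_BACKOFF_ERROR_TYPES = (LeaderNotAvailableError,)
    RETRY_REFRESH_ERROR_TYPES = (LeaderNotAvailableError,)
    RETRY_ERROR_TYPES = RETRY_BACKOFF_ERROR_TYPES + RETRY_REFRESH_ERROR_TYPES

    def should_retry(error_cls, options):
        # Timeouts are opt-in; everything else follows the class lists.
        if error_cls is RequestTimedOutError:
            return options.retry_on_timeouts
        return error_cls in RETRY_ERROR_TYPES

    opts = RetryOptions(limit=3, backoff_ms=100, retry_on_timeouts=False)
    assert should_retry(LeaderNotAvailableError, opts)
    assert not should_retry(RequestTimedOutError, opts)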

kafka/producer/base.py

Lines changed: 98 additions & 22 deletions
@@ -5,18 +5,23 @@
 import time

 try:
-    from queue import Empty, Queue
+    from queue import Empty, Full, Queue
 except ImportError:
-    from Queue import Empty, Queue
+    from Queue import Empty, Full, Queue
 from collections import defaultdict

 from threading import Thread, Event

 import six

 from kafka.common import (
-    ProduceRequest, TopicAndPartition, UnsupportedCodecError
+    ProduceRequest, TopicAndPartition, RetryOptions,
+    kafka_errors, UnsupportedCodecError, FailedPayloadsError,
+    RequestTimedOutError, AsyncProducerQueueFull, UnknownError
 )
+from kafka.common import (
+    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)
+
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 from kafka.util import kafka_bytestring
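`Full` joins the imports because puts on a bounded queue can now fail; the try/except keeps the Python 2 and 3 module names working. A quick illustration of the semantics the producer relies on:

    try:                      # Python 3
        from queue import Full, Queue
    except ImportError:       # Python 2
        from Queue import Full, Queue

    q = Queue(maxsize=1)      # bounded, like Queue(async_queue_maxsize)
    q.put('first')
    try:
        q.put_nowait('second')
    except Full:
        print('queue full, %d item(s) queued' % q.qsize())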
@@ -25,21 +30,33 @@
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20

+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
+ASYNC_QUEUE_PUT_TIMEOUT = 0
+# no retries by default
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 STOP_ASYNC_PRODUCER = -1


 def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, stop_event):
+                   req_acks, ack_timeout, retry_options, stop_event):
     """
     Listen on the queue for a specified number of messages or till
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    stop = False
+    reqs = {}
+    client.reinit()

     while not stop_event.is_set():
         timeout = batch_time
-        count = batch_size
+
+        # it's a simplification: we're comparing message sets and
+        # messages: each set can contain [1..batch_size] messages
+        count = batch_size - len(reqs)
         send_at = time.time() + timeout
         msgset = defaultdict(list)
@@ -48,7 +65,6 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         while count > 0 and timeout >= 0:
             try:
                 topic_partition, msg, key = queue.get(timeout=timeout)
-
             except Empty:
                 break
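The loop being modified here is a bounded drain: collect up to `count` messages but wait no longer than the batch window overall. A self-contained sketch of the pattern (`drain` is a hypothetical name; Python 3 imports for brevity):

    import time
    from queue import Empty, Queue

    def drain(queue, batch_size, batch_time):
        """Collect up to batch_size items or until batch_time elapses."""
        items, send_at = [], time.time() + batch_time
        while len(items) < batch_size:
            timeout = send_at - time.time()
            if timeout < 0:
                break
            try:
                items.append(queue.get(timeout=timeout))
            except Empty:
                break
        return items

    q = Queue()
    for i in range(3):
        q.put(i)
    print(drain(q, batch_size=5, batch_time=0.1))  # [0, 1, 2] after a short wait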
@@ -63,20 +79,60 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             msgset[topic_partition].append((msg, key))

         # Send collected requests upstream
-        reqs = []
         for topic_partition, msg in msgset.items():
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0
+
+        if not reqs:
+            continue
+
+        reqs_to_retry, error_cls = [], None
+        do_backoff, do_refresh = False, False
+
+        def _handle_error(error_cls, reqs, all_retries):
+            if ((error_cls == RequestTimedOutError and
+                 retry_options.retry_on_timeouts) or
+                    error_cls in RETRY_ERROR_TYPES):
+                all_retries += reqs
+            if error_cls in RETRY_BACKOFF_ERROR_TYPES:
+                do_backoff = True
+            if error_cls in RETRY_REFRESH_ERROR_TYPES:
+                do_refresh = True

         try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)
-        except Exception:
-            log.exception("Unable to send message")
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            for i, response in enumerate(reply):
+                if isinstance(response, FailedPayloadsError):
+                    _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
+                elif isinstance(response, ProduceResponse) and response.error:
+                    error_cls = kafka_errors.get(response.error, UnknownError)
+                    _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
+
+        except Exception as ex:
+            error_cls = kafka_errors.get(type(ex), UnknownError)
+            _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+
+        if not reqs_to_retry:
+            reqs = {}
+            continue
+
+        # doing backoff before next retry
+        if do_backoff and retry_options.backoff_ms:
+            log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+            time.sleep(float(retry_options.backoff_ms) / 1000)
+
+        # refresh topic metadata before next retry
+        if do_refresh:
+            client.load_metadata_for_topics()
+
+        reqs = dict((key, count + 1) for (key, count) in reqs.items()
+                    if key in reqs_to_retry and count < retry_options.limit)


 class Producer(object):
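The final `reqs = dict(...)` line is the heart of the retry policy: every request that failed with a retriable error has its attempt count incremented, and anything that succeeded or exhausted `retry_options.limit` is dropped. A miniature of just that step, with requests reduced to strings (the sketch passes state explicitly; in Python 2 a nested function cannot rebind enclosing variables by plain assignment):

    def retry_filter(attempts, failed, limit):
        """attempts maps request -> tries so far; failed is the set of
        requests that hit a retriable error this round. Returns the
        dict for the next round."""
        return dict((req, count + 1) for req, count in attempts.items()
                    if req in failed and count < limit)

    reqs = {'req-a': 0, 'req-b': 2, 'req-c': 0}
    # req-a failed and is retried; req-b failed but exhausted its limit;
    # req-c succeeded and is dropped.
    print(retry_filter(reqs, failed={'req-a', 'req-b'}, limit=2))
    # -> {'req-a': 1}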
@@ -111,12 +167,18 @@ def __init__(self, client, async=False,
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):

         if batch_send:
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
+            assert async_queue_maxsize >= 0
         else:
             batch_send_every_n = 1
             batch_send_every_t = 3600
@@ -135,10 +197,13 @@ def __init__(self, client, async=False,
         self.codec = codec

         if self.async:
-            log.warning("async producer does not guarantee message delivery!")
-            log.warning("Current implementation does not retry Failed messages")
-            log.warning("Use at your own risk! (or help improve with a PR!)")
-            self.queue = Queue()  # Messages are sent through this queue
+            # Messages are sent through this queue
+            self.queue = Queue(async_queue_maxsize)
+            self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
@@ -148,6 +213,7 @@ def __init__(self, client, async=False,
                                        batch_send_every_n,
                                        self.req_acks,
                                        self.ack_timeout,
+                                       async_retry_options,
                                        self.thread_stop_event))

         # Thread will die if main thread exits
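The wiring above is the classic daemonized worker pattern: a queue in, a stop `Event`, and a `Thread` that drains until signalled. A self-contained miniature (Python 3 imports; names are illustrative):

    import time
    from queue import Empty, Queue
    from threading import Event, Thread

    def worker(queue, stop_event):
        # Mirror of _send_upstream's outer loop: run until asked to stop.
        while not stop_event.is_set():
            try:
                item = queue.get(timeout=0.1)
            except Empty:
                continue
            print('sending', item)

    q, stop = Queue(), Event()
    t = Thread(target=worker, args=(q, stop))
    t.daemon = True  # thread dies if the main thread exits
    t.start()
    q.put('message')
    time.sleep(0.3)
    stop.set()
    t.join()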
@@ -199,8 +265,18 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
             raise TypeError("the key must be type bytes")

         if self.async:
-            for m in msg:
-                self.queue.put((TopicAndPartition(topic, partition), m, key))
+            for idx, m in enumerate(msg):
+                try:
+                    item = (TopicAndPartition(topic, partition), m, key)
+                    if self.async_queue_put_timeout == 0:
+                        self.queue.put_nowait(item)
+                    else:
+                        self.queue.put(item, True, self.async_queue_put_timeout)
+                except Full:
+                    raise AsyncProducerQueueFull(
+                        msg[idx:],
+                        'Producer async queue overfilled. '
+                        'Current queue size %d.' % self.queue.qsize())
             resp = []
         else:
             messages = create_message_set([(m, key) for m in msg], self.codec, key)
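Because `AsyncProducerQueueFull` carries the rejected tail of the batch in `failed_msgs`, callers can re-offer exactly the messages that never made it onto the queue. A hedged sketch of that pattern (`send_with_requeue` is hypothetical; producer setup as in the overview above):

    import time

    from kafka.common import AsyncProducerQueueFull

    def send_with_requeue(producer, topic, messages, pause=0.1):
        """Keep offering the unsent tail of a batch to an async producer
        whose bounded queue may reject puts."""
        pending = list(messages)
        while pending:
            try:
                producer.send_messages(topic, *pending)
                return
            except AsyncProducerQueueFull as e:
                # failed_msgs is the suffix of the batch that was rejected
                pending = list(e.failed_msgs)
                time.sleep(pause)  # let the background thread drain the queue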

kafka/producer/keyed.py

Lines changed: 14 additions & 3 deletions
@@ -7,7 +7,8 @@

 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )

 log = logging.getLogger("kafka")

@@ -37,7 +38,12 @@ def __init__(self, client, partitioner=None, async=False,
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         if not partitioner:
             partitioner = HashedPartitioner
         self.partitioner_class = partitioner

@@ -46,7 +52,12 @@ def __init__(self, client, partitioner=None, async=False,
         super(KeyedProducer, self).__init__(client, async, req_acks,
                                             ack_timeout, codec, batch_send,
                                             batch_send_every_n,
-                                            batch_send_every_t)
+                                            batch_send_every_t,
+                                            async_retry_limit,
+                                            async_retry_backoff_ms,
+                                            async_retry_on_timeouts,
+                                            async_queue_maxsize,
+                                            async_queue_put_timeout)

     def _next_partition(self, topic, key):
         if topic not in self.partitioners:
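The keyed producer simply forwards the new keyword arguments to the base class, so the same knobs apply. A usage sketch (placeholders as before; `send` routed messages by key in this era of the API):

    from kafka import KafkaClient, KeyedProducer

    client = KafkaClient('localhost:9092')  # placeholder broker address
    producer = KeyedProducer(
        client,
        async=True,
        async_retry_limit=5,
        async_retry_backoff_ms=200,
        async_queue_maxsize=10000)
    # Messages with the same key consistently hash to the same partition.
    producer.send(b'my-topic', b'user-42', b'event-payload')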

kafka/producer/simple.py

Lines changed: 14 additions & 3 deletions
@@ -10,7 +10,8 @@

 from .base import (
     Producer, BATCH_SEND_DEFAULT_INTERVAL,
-    BATCH_SEND_MSG_COUNT
+    BATCH_SEND_MSG_COUNT, ASYNC_QUEUE_MAXSIZE, ASYNC_QUEUE_PUT_TIMEOUT,
+    ASYNC_RETRY_LIMIT, ASYNC_RETRY_BACKOFF_MS, ASYNC_RETRY_ON_TIMEOUTS
 )

 log = logging.getLogger("kafka")

@@ -45,13 +46,23 @@ def __init__(self, client, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 random_start=True):
+                 random_start=True,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
         self.random_start = random_start
         super(SimpleProducer, self).__init__(client, async, req_acks,
                                              ack_timeout, codec, batch_send,
                                              batch_send_every_n,
-                                             batch_send_every_t)
+                                             batch_send_every_t,
+                                             async_retry_limit,
+                                             async_retry_backoff_ms,
+                                             async_retry_on_timeouts,
+                                             async_queue_maxsize,
+                                             async_queue_put_timeout)

     def _next_partition(self, topic):
         if topic not in self.partition_cycles:
