Commit f32009c

Merge pull request #388 from dpkp/331_fixups

async producer fixups

2 parents 87bea90 + 1d5f4b1

File tree

6 files changed (+180, -110 lines)


docs/usage.rst

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ SimpleProducer
     # Notes:
     # * If the producer dies before the messages are sent, there will be losses
     # * Call producer.stop() to send the messages and cleanup
-    producer = SimpleProducer(kafka, batch_send=True,
+    producer = SimpleProducer(kafka, async=True,
                               batch_send_every_n=20,
                               batch_send_every_t=60)
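
For context, here is how the updated docs example is used end to end. This is a minimal sketch, assuming a local broker; the host, port, and topic name are illustrative and not part of the diff:

    from kafka import KafkaClient, SimpleProducer

    kafka = KafkaClient('localhost:9092')  # illustrative broker address

    # Batching now rides on the async producer: a background thread flushes
    # the queue every 20 messages or every 60 seconds, whichever comes first.
    producer = SimpleProducer(kafka, async=True,
                              batch_send_every_n=20,
                              batch_send_every_t=60)

    producer.send_messages(b'my-topic', b'some message')

    # stop() signals the background thread to flush; with this PR's new
    # defaults it keeps retrying unsent messages for up to 30 seconds.
    producer.stop()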

kafka/producer/base.py

Lines changed: 173 additions & 86 deletions
@@ -17,10 +17,9 @@
 from kafka.common import (
     ProduceRequest, ProduceResponse, TopicAndPartition, RetryOptions,
     kafka_errors, UnsupportedCodecError, FailedPayloadsError,
-    RequestTimedOutError, AsyncProducerQueueFull, UnknownError
+    RequestTimedOutError, AsyncProducerQueueFull, UnknownError,
+    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES
 )
-from kafka.common import (
-    RETRY_ERROR_TYPES, RETRY_BACKOFF_ERROR_TYPES, RETRY_REFRESH_ERROR_TYPES)

 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
 from kafka.util import kafka_bytestring
@@ -33,33 +32,75 @@
 # unlimited
 ASYNC_QUEUE_MAXSIZE = 0
 ASYNC_QUEUE_PUT_TIMEOUT = 0
-# no retries by default
-ASYNC_RETRY_LIMIT = 0
-ASYNC_RETRY_BACKOFF_MS = 0
-ASYNC_RETRY_ON_TIMEOUTS = False
+# unlimited retries by default
+ASYNC_RETRY_LIMIT = None
+ASYNC_RETRY_BACKOFF_MS = 100
+ASYNC_RETRY_ON_TIMEOUTS = True
+ASYNC_LOG_MESSAGES_ON_ERROR = True

 STOP_ASYNC_PRODUCER = -1
+ASYNC_STOP_TIMEOUT_SECS = 30


 def _send_upstream(queue, client, codec, batch_time, batch_size,
-                   req_acks, ack_timeout, retry_options, stop_event):
-    """
-    Listen on the queue for a specified number of messages or till
-    a specified timeout and send them upstream to the brokers in one
-    request
+                   req_acks, ack_timeout, retry_options, stop_event,
+                   log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+                   stop_timeout=ASYNC_STOP_TIMEOUT_SECS):
+    """Private method to manage producing messages asynchronously
+
+    Listens on the queue for a specified number of messages or until
+    a specified timeout and then sends messages to the brokers in grouped
+    requests (one per broker).
+
+    Messages placed on the queue should be tuples that conform to this format:
+        ((topic, partition), message, key)
+
+    Currently does not mark messages with task_done. Do not attempt to join()!
+
+    Arguments:
+        queue (threading.Queue): the queue from which to get messages
+        client (KafkaClient): instance to use for communicating with brokers
+        codec (kafka.protocol.ALL_CODECS): compression codec to use
+        batch_time (int): interval in seconds to send message batches
+        batch_size (int): count of messages that will trigger an immediate send
+        req_acks: required acks to use with ProduceRequests. see server protocol
+        ack_timeout: timeout to wait for required acks. see server protocol
+        retry_options (RetryOptions): settings for retry limits, backoff etc
+        stop_event (threading.Event): event to monitor for shutdown signal.
+            when this event is 'set', the producer will stop sending messages.
+        log_messages_on_error (bool, optional): log stringified message-contents
+            on any produce error, otherwise only log a hash() of the contents,
+            defaults to True.
+        stop_timeout (int or float, optional): number of seconds to continue
+            retrying messages after stop_event is set, defaults to 30.
     """
-    reqs = {}
+    request_tries = {}
     client.reinit()
+    stop_at = None

-    while not stop_event.is_set():
-        timeout = batch_time
+    while not (stop_event.is_set() and queue.empty() and not request_tries):
+
+        # Handle stop_timeout
+        if stop_event.is_set():
+            if not stop_at:
+                stop_at = stop_timeout + time.time()
+            if time.time() > stop_at:
+                log.debug('Async producer stopping due to stop_timeout')
+                break

-        # it's a simplification: we're comparing message sets and
-        # messages: each set can contain [1..batch_size] messages
-        count = batch_size - len(reqs)
+        timeout = batch_time
+        count = batch_size
         send_at = time.time() + timeout
         msgset = defaultdict(list)

+        # Merging messages will require a bit more work to manage correctly
+        # for now, dont look for new batches if we have old ones to retry
+        if request_tries:
+            count = 0
+            log.debug('Skipping new batch collection to handle retries')
+        else:
+            log.debug('Batching size: {0}, timeout: {1}'.format(count, timeout))
+
         # Keep fetching till we gather enough messages or a
         # timeout is reached
         while count > 0 and timeout >= 0:
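
The docstring above pins down the queue contract: each item is ((topic, partition), message, key). A rough sketch of how a send path would enqueue one item under that contract (simplified and hypothetical, not the actual send_messages implementation; queue, topic, partition, msg, and key are illustrative names):

    from Queue import Full  # Python 2 stdlib, as used by this codebase

    item = (TopicAndPartition(topic, partition), msg, key)
    try:
        # A bounded queue (async_queue_maxsize > 0) raises Full when it
        # cannot accept the item; the producer surfaces that as
        # AsyncProducerQueueFull rather than blocking forever.
        queue.put(item, block=False)
    except Full:
        raise AsyncProducerQueueFull('async producer queue is full')
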
@@ -84,104 +125,151 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
                                  tuple(messages))
-            reqs[req] = 0
+            request_tries[req] = 0

-        if not reqs:
+        if not request_tries:
             continue

         reqs_to_retry, error_cls = [], None
-        do_backoff, do_refresh = False, False
-
-        def _handle_error(error_cls, reqs, all_retries):
-            if ((error_cls == RequestTimedOutError and
-                 retry_options.retry_on_timeouts) or
-                    error_cls in RETRY_ERROR_TYPES):
-                all_retries += reqs
-            if error_cls in RETRY_BACKOFF_ERROR_TYPES:
-                do_backoff = True
-            if error_cls in RETRY_REFRESH_ERROR_TYPES:
-                do_refresh = True
-
-        try:
-            reply = client.send_produce_request(reqs.keys(),
-                                                acks=req_acks,
-                                                timeout=ack_timeout,
-                                                fail_on_error=False)
-            for i, response in enumerate(reply):
-                if isinstance(response, FailedPayloadsError):
-                    _handle_error(FailedPayloadsError, response.failed_payloads, reqs_to_retry)
-                elif isinstance(response, ProduceResponse) and response.error:
-                    error_cls = kafka_errors.get(response.error, UnknownError)
-                    _handle_error(error_cls, [reqs.keys()[i]], reqs_to_retry)
-
-        except Exception as ex:
-            error_cls = kafka_errors.get(type(ex), UnknownError)
-            _handle_error(error_cls, reqs.keys(), reqs_to_retry)
+        retry_state = {
+            'do_backoff': False,
+            'do_refresh': False
+        }
+
+        def _handle_error(error_cls, request):
+            if issubclass(error_cls, RETRY_ERROR_TYPES) or (retry_options.retry_on_timeouts and issubclass(error_cls, RequestTimedOutError)):
+                reqs_to_retry.append(request)
+            if issubclass(error_cls, RETRY_BACKOFF_ERROR_TYPES):
+                retry_state['do_backoff'] |= True
+            if issubclass(error_cls, RETRY_REFRESH_ERROR_TYPES):
+                retry_state['do_refresh'] |= True
+
+        reply = client.send_produce_request(request_tries.keys(),
+                                            acks=req_acks,
+                                            timeout=ack_timeout,
+                                            fail_on_error=False)
+        for i, response in enumerate(reply):
+            error_cls = None
+            if isinstance(response, FailedPayloadsError):
+                error_cls = response.__class__
+                orig_req = response.payload
+
+            elif isinstance(response, ProduceResponse) and response.error:
+                error_cls = kafka_errors.get(response.error, UnknownError)
+                orig_req = request_tries.keys()[i]
+
+            if error_cls:
+                _handle_error(error_cls, orig_req)
+                log.error('Error sending ProduceRequest to %s:%d with msgs %s',
+                          orig_req.topic, orig_req.partition,
+                          orig_req.messages if log_messages_on_error
+                          else hash(orig_req.messages))

         if not reqs_to_retry:
-            reqs = {}
+            request_tries = {}
             continue

         # doing backoff before next retry
-        if do_backoff and retry_options.backoff_ms:
-            log.info("Doing backoff for %s(ms)." % retry_options.backoff_ms)
+        if retry_state['do_backoff'] and retry_options.backoff_ms:
+            log.warn('Async producer backoff for %s(ms) before retrying', retry_options.backoff_ms)
             time.sleep(float(retry_options.backoff_ms) / 1000)

         # refresh topic metadata before next retry
-        if do_refresh:
+        if retry_state['do_refresh']:
+            log.warn('Async producer forcing metadata refresh metadata before retrying')
             client.load_metadata_for_topics()

-        reqs = dict((key, count + 1) for (key, count) in reqs.items()
-                    if key in reqs_to_retry and count < retry_options.limit)
+        # Apply retry limit, dropping messages that are over
+        request_tries = dict(
+            (key, count + 1)
+            for (key, count) in request_tries.items()
+            if key in reqs_to_retry
+            and (retry_options.limit is None
+                 or (count < retry_options.limit))
+        )
+
+        # Log messages we are going to retry
+        for orig_req in request_tries.keys():
+            log.info('Retrying ProduceRequest to %s:%d with msgs %s',
+                     orig_req.topic, orig_req.partition,
+                     orig_req.messages if log_messages_on_error
+                     else hash(orig_req.messages))
+
+    if request_tries or not queue.empty():
+        log.error('Stopped producer with {0} unsent messages'
+                  .format(len(request_tries) + queue.qsize()))


 class Producer(object):
     """
     Base class to be used by producers

     Arguments:
-        client: The Kafka client instance to use
-        async: If set to true, the messages are sent asynchronously via another
-            thread (process). We will not wait for a response to these
-            WARNING!!! current implementation of async producer does not
-            guarantee message delivery. Use at your own risk! Or help us
-            improve with a PR!
-        req_acks: A value indicating the acknowledgements that the server must
-            receive before responding to the request
-        ack_timeout: Value (in milliseconds) indicating a timeout for waiting
-            for an acknowledgement
-        batch_send: If True, messages are send in batches
-        batch_send_every_n: If set, messages are send in batches of this size
-        batch_send_every_t: If set, messages are send after this timeout
+        client (KafkaClient): instance to use for broker communications.
+        codec (kafka.protocol.ALL_CODECS): compression codec to use.
+        req_acks (int, optional): A value indicating the acknowledgements that
+            the server must receive before responding to the request,
+            defaults to 1 (local ack).
+        ack_timeout (int, optional): millisecond timeout to wait for the
+            configured req_acks, defaults to 1000.
+        async (bool, optional): send message using a background thread,
+            defaults to False.
+        batch_send_every_n (int, optional): If async is True, messages are
+            sent in batches of this size, defaults to 20.
+        batch_send_every_t (int or float, optional): If async is True,
+            messages are sent immediately after this timeout in seconds, even
+            if there are fewer than batch_send_every_n, defaults to 20.
+        async_retry_limit (int, optional): number of retries for failed messages
+            or None for unlimited, defaults to None / unlimited.
+        async_retry_backoff_ms (int, optional): milliseconds to backoff on
+            failed messages, defaults to 100.
+        async_retry_on_timeouts (bool, optional): whether to retry on
+            RequestTimedOutError, defaults to True.
+        async_queue_maxsize (int, optional): limit to the size of the
+            internal message queue in number of messages (not size), defaults
+            to 0 (no limit).
+        async_queue_put_timeout (int or float, optional): timeout seconds
+            for queue.put in send_messages for async producers -- will only
+            apply if async_queue_maxsize > 0 and the queue is Full,
+            defaults to 0 (fail immediately on full queue).
+        async_log_messages_on_error (bool, optional): set to False and the
+            async producer will only log hash() contents on failed produce
+            requests, defaults to True (log full messages). Hash logging
+            will not allow you to identify the specific message that failed,
+            but it will allow you to match failures with retries.
+        async_stop_timeout (int or float, optional): seconds to continue
+            attempting to send queued messages after producer.stop(),
+            defaults to 30.
+
+    Deprecated Arguments:
+        batch_send (bool, optional): If True, messages are sent by a background
+            thread in batches, defaults to False. Deprecated, use 'async'
     """
-
     ACK_NOT_REQUIRED = 0            # No ack is required
     ACK_AFTER_LOCAL_WRITE = 1       # Send response after it is written to log
     ACK_AFTER_CLUSTER_COMMIT = -1   # Send response after data is committed
-
     DEFAULT_ACK_TIMEOUT = 1000

-    def __init__(self, client, async=False,
+    def __init__(self, client,
                  req_acks=ACK_AFTER_LOCAL_WRITE,
                  ack_timeout=DEFAULT_ACK_TIMEOUT,
                  codec=None,
-                 batch_send=False,
+                 async=False,
+                 batch_send=False,  # deprecated, use async
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
                  async_retry_limit=ASYNC_RETRY_LIMIT,
                  async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
                  async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
-                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
+                 async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT,
+                 async_log_messages_on_error=ASYNC_LOG_MESSAGES_ON_ERROR,
+                 async_stop_timeout=ASYNC_STOP_TIMEOUT_SECS):

-        if batch_send:
-            async = True
+        if async:
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
             assert async_queue_maxsize >= 0
-        else:
-            batch_send_every_n = 1
-            batch_send_every_t = 3600

         self.client = client
         self.async = async
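
One subtlety in the rewrite above: the old code assigned do_backoff = True inside the nested _handle_error, but in Python 2 an assignment inside a nested function binds a new local (there is no nonlocal), so the enclosing flags were never actually updated. Packing the flags into a mutable retry_state dict sidesteps that. A minimal illustration of the pitfall, with illustrative names:

    def outer():
        flag = False
        state = {'flag': False}

        def inner():
            # flag = True would rebind a new local here; outer's flag
            # would silently stay False (the old _handle_error bug).
            state['flag'] = True  # mutation is visible to the enclosing scope

        inner()
        return flag, state['flag']  # (False, True)
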
@@ -205,16 +293,15 @@ def __init__(self, client, async=False,
                 backoff_ms=async_retry_backoff_ms,
                 retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
-            self.thread = Thread(target=_send_upstream,
-                                 args=(self.queue,
-                                       self.client.copy(),
-                                       self.codec,
-                                       batch_send_every_t,
-                                       batch_send_every_n,
-                                       self.req_acks,
-                                       self.ack_timeout,
-                                       async_retry_options,
-                                       self.thread_stop_event))
+            self.thread = Thread(
+                target=_send_upstream,
+                args=(self.queue, self.client.copy(), self.codec,
+                      batch_send_every_t, batch_send_every_n,
+                      self.req_acks, self.ack_timeout,
+                      async_retry_options, self.thread_stop_event),
+                kwargs={'log_messages_on_error': async_log_messages_on_error,
+                        'stop_timeout': async_stop_timeout}
+            )

             # Thread will die if main thread exits
             self.thread.daemon = True
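
Because __init__ reordered its positional parameters (async now follows codec), positional callers break, which is why the super() calls in keyed.py and simple.py below had to be fixed up to match. Downstream code is safer passing the new options by keyword; a minimal sketch, assuming an existing KafkaClient instance named client:

    producer = Producer(
        client,
        async=True,
        async_retry_limit=5,                # the new default, None, retries forever
        async_retry_backoff_ms=250,
        async_log_messages_on_error=False,  # log hash() of payloads, not contents
        async_stop_timeout=10,              # drain for at most 10s after stop()
    )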

kafka/producer/keyed.py

Lines changed: 2 additions & 2 deletions
@@ -49,8 +49,8 @@ def __init__(self, client, partitioner=None, async=False,
         self.partitioner_class = partitioner
         self.partitioners = {}

-        super(KeyedProducer, self).__init__(client, async, req_acks,
-                                            ack_timeout, codec, batch_send,
+        super(KeyedProducer, self).__init__(client, req_acks, ack_timeout,
+                                            codec, async, batch_send,
                                             batch_send_every_n,
                                             batch_send_every_t,
                                             async_retry_limit,

kafka/producer/simple.py

Lines changed: 2 additions & 2 deletions
@@ -54,8 +54,8 @@ def __init__(self, client, async=False,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):
         self.partition_cycles = {}
         self.random_start = random_start
-        super(SimpleProducer, self).__init__(client, async, req_acks,
-                                             ack_timeout, codec, batch_send,
+        super(SimpleProducer, self).__init__(client, req_acks, ack_timeout,
+                                             codec, async, batch_send,
                                              batch_send_every_n,
                                              batch_send_every_t,
                                              async_retry_limit,

test/test_producer.py

Lines changed: 0 additions & 17 deletions
@@ -56,23 +56,6 @@ def partitions(topic):
         producer.send_messages(topic, b'hi')
         assert client.send_produce_request.called

-    @patch('kafka.producer.base._send_upstream')
-    def test_producer_async_queue_overfilled_batch_send(self, mock):
-        queue_size = 2
-        producer = Producer(MagicMock(), batch_send=True,
-                            async_queue_maxsize=queue_size)
-
-        topic = b'test-topic'
-        partition = 0
-        message = b'test-message'
-
-        with self.assertRaises(AsyncProducerQueueFull):
-            message_list = [message] * (queue_size + 1)
-            producer.send_messages(topic, partition, *message_list)
-        self.assertEqual(producer.queue.qsize(), queue_size)
-        for _ in xrange(producer.queue.qsize()):
-            producer.queue.get()
-
     @patch('kafka.producer.base._send_upstream')
     def test_producer_async_queue_overfilled(self, mock):
         queue_size = 2
