-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Add retry and queue size on async producer #283
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -8,3 +8,4 @@ env | |
servers/*/kafka-bin | ||
.coverage | ||
.noseids | ||
.idea |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,9 +4,9 @@ | |
import time | ||
|
||
try: | ||
from queue import Empty | ||
from queue import Empty, Full | ||
except ImportError: | ||
from Queue import Empty | ||
from Queue import Empty, Full | ||
from collections import defaultdict | ||
from multiprocessing import Queue, Process | ||
|
||
|
@@ -22,11 +22,17 @@ | |
BATCH_SEND_DEFAULT_INTERVAL = 20 | ||
BATCH_SEND_MSG_COUNT = 20 | ||
|
||
BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES = 0 | ||
BATCH_SEND_QUEUE_MAX_WAIT = -1 | ||
|
||
BATCH_SEND_MAX_RETRY = 3 | ||
BATCH_SEND_RETRY_BACKOFF_MS = 300 | ||
|
||
STOP_ASYNC_PRODUCER = -1 | ||
|
||
|
||
def _send_upstream(queue, client, codec, batch_time, batch_size, | ||
req_acks, ack_timeout): | ||
req_acks, ack_timeout, batch_send_max_retry, batch_send_retry_backoff_ms): | ||
""" | ||
Listen on the queue for a specified number of messages or till | ||
a specified timeout and send them upstream to the brokers in one | ||
|
@@ -73,12 +79,16 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, | |
messages) | ||
reqs.append(req) | ||
|
||
try: | ||
client.send_produce_request(reqs, | ||
acks=req_acks, | ||
timeout=ack_timeout) | ||
except Exception: | ||
log.exception("Unable to send message") | ||
for i in range(batch_send_max_retry): | ||
try: | ||
client.send_produce_request(reqs, | ||
acks=req_acks, | ||
timeout=ack_timeout) | ||
except Exception: | ||
log.exception("Unable to send message - retry {0}".format(i)) | ||
time.sleep(float(batch_send_retry_backoff_ms) / float(1000)) | ||
continue | ||
break | ||
|
||
|
||
class Producer(object): | ||
|
@@ -99,11 +109,17 @@ class Producer(object): | |
batch_send - If True, messages are send in batches | ||
batch_send_every_n - If set, messages are send in batches of this size | ||
batch_send_every_t - If set, messages are send after this timeout | ||
batch_send_queue_buffering_max_messages - If set, maximum number of messages | ||
allowed on the async queue | ||
batch_send_queue_max_wait - If set, wait to put messages in the async queue | ||
until free space or this timeout | ||
batch_send_max_retry - Number of retry for async send, default: 3 | ||
batch_send_retry_backoff_ms - sleep between retry, default: 300ms | ||
""" | ||
|
||
ACK_NOT_REQUIRED = 0 # No ack is required | ||
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log | ||
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed | ||
ACK_NOT_REQUIRED = 0 # No ack is required | ||
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log | ||
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed | ||
|
||
DEFAULT_ACK_TIMEOUT = 1000 | ||
|
||
|
@@ -113,7 +129,11 @@ def __init__(self, client, async=False, | |
codec=None, | ||
batch_send=False, | ||
batch_send_every_n=BATCH_SEND_MSG_COUNT, | ||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL): | ||
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL, | ||
batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES, | ||
batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT, | ||
batch_send_max_retry=BATCH_SEND_MAX_RETRY, | ||
batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS): | ||
|
||
if batch_send: | ||
async = True | ||
|
@@ -127,6 +147,7 @@ def __init__(self, client, async=False, | |
self.async = async | ||
self.req_acks = req_acks | ||
self.ack_timeout = ack_timeout | ||
self.batch_send_queue_max_wait = batch_send_queue_max_wait | ||
|
||
if codec is None: | ||
codec = CODEC_NONE | ||
|
@@ -139,15 +160,17 @@ def __init__(self, client, async=False, | |
log.warning("async producer does not guarantee message delivery!") | ||
log.warning("Current implementation does not retry Failed messages") | ||
log.warning("Use at your own risk! (or help improve with a PR!)") | ||
self.queue = Queue() # Messages are sent through this queue | ||
self.queue = Queue(maxsize=batch_send_queue_buffering_max_messages) # Messages are sent through this queue | ||
self.proc = Process(target=_send_upstream, | ||
args=(self.queue, | ||
self.client.copy(), | ||
self.codec, | ||
batch_send_every_t, | ||
batch_send_every_n, | ||
self.req_acks, | ||
self.ack_timeout)) | ||
self.ack_timeout, | ||
batch_send_max_retry, | ||
batch_send_retry_backoff_ms)) | ||
|
||
# Process will die if main thread exits | ||
self.proc.daemon = True | ||
|
@@ -188,7 +211,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs): | |
|
||
if self.async: | ||
for m in msg: | ||
self.queue.put((TopicAndPartition(topic, partition), m, key)) | ||
try: | ||
self.queue.put((TopicAndPartition(topic, partition), m, key), block=True, | ||
timeout=self.batch_send_queue_max_wait) | ||
except Full: | ||
log.exception('Queue full, failed to put message "{0}" in the async queue after {1} ms'.format( | ||
m, self.batch_send_queue_max_wait)) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. The timeout is in seconds, not milliseconds. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In fact the timeout on the message insertion in the async queue is in seconds. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Are you sure this is doing what you think it is then? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Here are the behaviour expected: If only batch_send_queue_max_wait is set, it won't change anything as the queue is unlimited in size. If only batch_send_queue_buffering_max_messages is set to a value then the queue is limited in size. Put of messages will raise a Full exception if the queue is full as the default value of batch_send_queue_max_wait will induce no wait despite the parameter block set to True. If both parameter are set the queue will be limited in size. Put of messages will wait the time set in batch_send_queue_max_wait (in second). If the queue is still full after this time an exception Full will be raised otherwise the put will succeed. Let me know if you think that there is an issue on the code or if you think that this implementation is not appropriate. |
||
resp = [] | ||
else: | ||
messages = create_message_set(msg, self.codec, key) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this break be in the try?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the try or out the try do the same.
We continue if we face an exception or we break if no exception.