Skip to content

Add retry and queue size on async producer #283

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,4 @@ env
servers/*/kafka-bin
.coverage
.noseids
.idea
60 changes: 44 additions & 16 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,9 @@
import time

try:
from queue import Empty
from queue import Empty, Full
except ImportError:
from Queue import Empty
from Queue import Empty, Full
from collections import defaultdict
from multiprocessing import Queue, Process

Expand All @@ -22,11 +22,17 @@
BATCH_SEND_DEFAULT_INTERVAL = 20
BATCH_SEND_MSG_COUNT = 20

BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES = 0
BATCH_SEND_QUEUE_MAX_WAIT = -1

BATCH_SEND_MAX_RETRY = 3
BATCH_SEND_RETRY_BACKOFF_MS = 300

STOP_ASYNC_PRODUCER = -1


def _send_upstream(queue, client, codec, batch_time, batch_size,
req_acks, ack_timeout):
req_acks, ack_timeout, batch_send_max_retry, batch_send_retry_backoff_ms):
"""
Listen on the queue for a specified number of messages or till
a specified timeout and send them upstream to the brokers in one
Expand Down Expand Up @@ -73,12 +79,16 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
messages)
reqs.append(req)

try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
except Exception:
log.exception("Unable to send message")
for i in range(batch_send_max_retry):
try:
client.send_produce_request(reqs,
acks=req_acks,
timeout=ack_timeout)
except Exception:
log.exception("Unable to send message - retry {0}".format(i))
time.sleep(float(batch_send_retry_backoff_ms) / float(1000))
continue
break
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this break be in the try?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Whether the break is inside or outside the try, the behaviour is the same:
we continue if an exception is raised, and break if no exception occurs.



class Producer(object):
Expand All @@ -99,11 +109,17 @@ class Producer(object):
batch_send - If True, messages are send in batches
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
batch_send_queue_buffering_max_messages - If set, maximum number of messages
allowed on the async queue
batch_send_queue_max_wait - If set, wait to put messages in the async queue
until free space or this timeout
batch_send_max_retry - Number of retry for async send, default: 3
batch_send_retry_backoff_ms - sleep between retry, default: 300ms
"""

ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed
ACK_NOT_REQUIRED = 0 # No ack is required
ACK_AFTER_LOCAL_WRITE = 1 # Send response after it is written to log
ACK_AFTER_CLUSTER_COMMIT = -1 # Send response after data is committed

DEFAULT_ACK_TIMEOUT = 1000

Expand All @@ -113,7 +129,11 @@ def __init__(self, client, async=False,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES,
batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT,
batch_send_max_retry=BATCH_SEND_MAX_RETRY,
batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS):

if batch_send:
async = True
Expand All @@ -127,6 +147,7 @@ def __init__(self, client, async=False,
self.async = async
self.req_acks = req_acks
self.ack_timeout = ack_timeout
self.batch_send_queue_max_wait = batch_send_queue_max_wait

if codec is None:
codec = CODEC_NONE
Expand All @@ -139,15 +160,17 @@ def __init__(self, client, async=False,
log.warning("async producer does not guarantee message delivery!")
log.warning("Current implementation does not retry Failed messages")
log.warning("Use at your own risk! (or help improve with a PR!)")
self.queue = Queue() # Messages are sent through this queue
self.queue = Queue(maxsize=batch_send_queue_buffering_max_messages) # Messages are sent through this queue
self.proc = Process(target=_send_upstream,
args=(self.queue,
self.client.copy(),
self.codec,
batch_send_every_t,
batch_send_every_n,
self.req_acks,
self.ack_timeout))
self.ack_timeout,
batch_send_max_retry,
batch_send_retry_backoff_ms))

# Process will die if main thread exits
self.proc.daemon = True
Expand Down Expand Up @@ -188,7 +211,12 @@ def _send_messages(self, topic, partition, *msg, **kwargs):

if self.async:
for m in msg:
self.queue.put((TopicAndPartition(topic, partition), m, key))
try:
self.queue.put((TopicAndPartition(topic, partition), m, key), block=True,
timeout=self.batch_send_queue_max_wait)
except Full:
log.exception('Queue full, failed to put message "{0}" in the async queue after {1} ms'.format(
m, self.batch_send_queue_max_wait))
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The timeout is in seconds, not milliseconds.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In fact, the timeout for message insertion into the async queue is in seconds.
Only the retry backoff is in milliseconds.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are you sure this is doing what you think it is then?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Here is the expected behaviour:
If batch_send_queue_buffering_max_messages and batch_send_queue_max_wait are not set then the queue is an unlimited sized queue and put in the queue never wait or fail for Full exception.

If only batch_send_queue_max_wait is set, it won't change anything as the queue is unlimited in size.

If only batch_send_queue_buffering_max_messages is set to a value, then the queue is limited in size. Putting a message will raise a Full exception if the queue is full, since the default value of batch_send_queue_max_wait causes no wait even though the block parameter is set to True.

If both parameters are set, the queue is limited in size. Putting a message will wait up to the time set in batch_send_queue_max_wait (in seconds). If the queue is still full after this time, a Full exception is raised; otherwise the put succeeds.

Let me know if you think there is an issue in the code, or if you think this implementation is not appropriate.

resp = []
else:
messages = create_message_set(msg, self.codec, key)
Expand Down
26 changes: 21 additions & 5 deletions kafka/producer/keyed.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
from kafka.partitioner import HashedPartitioner
from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
BATCH_SEND_MSG_COUNT, BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES,
BATCH_SEND_QUEUE_MAX_WAIT, BATCH_SEND_MAX_RETRY, BATCH_SEND_RETRY_BACKOFF_MS
)

log = logging.getLogger("kafka")
Expand All @@ -26,14 +27,25 @@ class KeyedProducer(Producer):
batch_send - If True, messages are send in batches
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
batch_send_queue_buffering_max_messages - If set, maximum number of messages
allowed on the async queue
batch_send_queue_max_wait - If set, wait to put messages in the async queue
until free space or this timeout
batch_send_max_retry - Number of retry for async send, default: 3
batch_send_retry_backoff_ms - sleep between retry, default: 300ms
"""

def __init__(self, client, partitioner=None, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES,
batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT,
batch_send_max_retry=BATCH_SEND_MAX_RETRY,
batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS):
if not partitioner:
partitioner = HashedPartitioner
self.partitioner_class = partitioner
Expand All @@ -42,7 +54,11 @@ def __init__(self, client, partitioner=None, async=False,
super(KeyedProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
batch_send_queue_buffering_max_messages,
batch_send_queue_max_wait,
batch_send_max_retry,
batch_send_retry_backoff_ms)

def _next_partition(self, topic, key):
if topic not in self.partitioners:
Expand All @@ -54,9 +70,9 @@ def _next_partition(self, topic, key):
partitioner = self.partitioners[topic]
return partitioner.partition(key, self.client.get_partition_ids_for_topic(topic))

def send_messages(self, topic, key, *msg):
    """Publish one or more messages to the partition selected for ``key``.

    Arguments:
        topic: topic name to publish to.
        key: partitioning key; routed through the topic's partitioner
            to pick the target partition.
        *msg: one or more message payloads.

    Returns:
        The result of the underlying ``_send_messages`` call (produce
        responses, or an empty list in async mode).
    """
    # Resolve the partition once, then delegate to the shared base-class path.
    partition = self._next_partition(topic, key)
    return self._send_messages(topic, partition, *msg, key=key)

def send(self, topic, key, msg):
partition = self._next_partition(topic, key)
Expand Down
22 changes: 19 additions & 3 deletions kafka/producer/simple.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

from .base import (
Producer, BATCH_SEND_DEFAULT_INTERVAL,
BATCH_SEND_MSG_COUNT
BATCH_SEND_MSG_COUNT, BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES,
BATCH_SEND_QUEUE_MAX_WAIT, BATCH_SEND_MAX_RETRY, BATCH_SEND_RETRY_BACKOFF_MS
)

log = logging.getLogger("kafka")
Expand All @@ -30,25 +31,40 @@ class SimpleProducer(Producer):
batch_send - If True, messages are send in batches
batch_send_every_n - If set, messages are send in batches of this size
batch_send_every_t - If set, messages are send after this timeout
batch_send_queue_buffering_max_messages - If set, maximum number of messages
allowed on the async queue
batch_send_queue_max_wait - If set, wait to put messages in the async queue
until free space or this timeout
batch_send_max_retry - Number of retry for async send, default: 3
batch_send_retry_backoff_ms - sleep between retry, default: 300ms
random_start - If true, randomize the initial partition which the
the first message block will be published to, otherwise
if false, the first message block will always publish
to partition 0 before cycling through each partition
"""

def __init__(self, client, async=False,
req_acks=Producer.ACK_AFTER_LOCAL_WRITE,
ack_timeout=Producer.DEFAULT_ACK_TIMEOUT,
codec=None,
batch_send=False,
batch_send_every_n=BATCH_SEND_MSG_COUNT,
batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
batch_send_queue_buffering_max_messages=BATCH_SEND_QUEUE_BUFFERING_MAX_MESSAGES,
batch_send_queue_max_wait=BATCH_SEND_QUEUE_MAX_WAIT,
batch_send_max_retry=BATCH_SEND_MAX_RETRY,
batch_send_retry_backoff_ms=BATCH_SEND_RETRY_BACKOFF_MS,
random_start=False):
self.partition_cycles = {}
self.random_start = random_start
super(SimpleProducer, self).__init__(client, async, req_acks,
ack_timeout, codec, batch_send,
batch_send_every_n,
batch_send_every_t)
batch_send_every_t,
batch_send_queue_buffering_max_messages,
batch_send_queue_max_wait,
batch_send_max_retry,
batch_send_retry_backoff_ms)

def _next_partition(self, topic):
if topic not in self.partition_cycles:
Expand All @@ -60,7 +76,7 @@ def _next_partition(self, topic):
# Randomize the initial partition that is returned
if self.random_start:
num_partitions = len(self.client.get_partition_ids_for_topic(topic))
for _ in xrange(random.randint(0, num_partitions-1)):
for _ in xrange(random.randint(0, num_partitions - 1)):
next(self.partition_cycles[topic])

return next(self.partition_cycles[topic])
Expand Down