Skip to content

Commit 0cec44a

Browse files
committed
Merge pull request #396 from dpkp/sync_producer_fail_on_error_kwarg
Sync producer fail on error kwarg
2 parents f6be283 + 99bcf07 commit 0cec44a

File tree

2 files changed

+30
-4
lines changed

2 files changed

+30
-4
lines changed

kafka/producer/base.py

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
STOP_ASYNC_PRODUCER = -1
4242
ASYNC_STOP_TIMEOUT_SECS = 30
4343

44+
SYNC_FAIL_ON_ERROR_DEFAULT = True
45+
4446

4547
def _send_upstream(queue, client, codec, batch_time, batch_size,
4648
req_acks, ack_timeout, retry_options, stop_event,
@@ -216,6 +218,9 @@ class Producer(object):
216218
defaults to 1 (local ack).
217219
ack_timeout (int, optional): millisecond timeout to wait for the
218220
configured req_acks, defaults to 1000.
221+
sync_fail_on_error (bool, optional): whether sync producer should
222+
raise exceptions (True), or just return errors (False),
223+
defaults to True.
219224
async (bool, optional): send message using a background thread,
220225
defaults to False.
221226
batch_send_every_n (int, optional): If async is True, messages are
@@ -258,6 +263,7 @@ def __init__(self, client,
258263
req_acks=ACK_AFTER_LOCAL_WRITE,
259264
ack_timeout=DEFAULT_ACK_TIMEOUT,
260265
codec=None,
266+
sync_fail_on_error=SYNC_FAIL_ON_ERROR_DEFAULT,
261267
async=False,
262268
batch_send=False, # deprecated, use async
263269
batch_send_every_n=BATCH_SEND_MSG_COUNT,
@@ -316,6 +322,8 @@ def cleanup(obj):
316322
obj.stop()
317323
self._cleanup_func = cleanup
318324
atexit.register(cleanup, self)
325+
else:
326+
self.sync_fail_on_error = sync_fail_on_error
319327

320328
def send_messages(self, topic, partition, *msg):
321329
"""
@@ -373,8 +381,10 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
373381
messages = create_message_set([(m, key) for m in msg], self.codec, key)
374382
req = ProduceRequest(topic, partition, messages)
375383
try:
376-
resp = self.client.send_produce_request([req], acks=self.req_acks,
377-
timeout=self.ack_timeout)
384+
resp = self.client.send_produce_request(
385+
[req], acks=self.req_acks, timeout=self.ack_timeout,
386+
fail_on_error=self.sync_fail_on_error
387+
)
378388
except Exception:
379389
log.exception("Unable to send messages")
380390
raise

test/test_producer.py

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
from mock import MagicMock, patch
88
from . import unittest
99

10+
from kafka import KafkaClient, SimpleProducer
1011
from kafka.common import (
1112
AsyncProducerQueueFull, FailedPayloadsError, NotLeaderForPartitionError,
1213
ProduceResponse, RetryOptions, TopicAndPartition
@@ -44,8 +45,6 @@ def test_producer_message_types(self):
4445
producer.send_messages(topic, partition, m)
4546

4647
def test_topic_message_types(self):
47-
from kafka.producer.simple import SimpleProducer
48-
4948
client = MagicMock()
5049

5150
def partitions(topic):
@@ -75,6 +74,23 @@ def test_producer_async_queue_overfilled(self, mock):
7574
for _ in xrange(producer.queue.qsize()):
7675
producer.queue.get()
7776

77+
def test_producer_sync_fail_on_error(self):
78+
error = FailedPayloadsError('failure')
79+
with patch.object(KafkaClient, 'load_metadata_for_topics'):
80+
with patch.object(KafkaClient, 'get_partition_ids_for_topic', return_value=[0, 1]):
81+
with patch.object(KafkaClient, '_send_broker_aware_request', return_value = [error]):
82+
83+
client = KafkaClient(MagicMock())
84+
producer = SimpleProducer(client, async=False, sync_fail_on_error=False)
85+
86+
# This should not raise
87+
(response,) = producer.send_messages('foobar', b'test message')
88+
self.assertEqual(response, error)
89+
90+
producer = SimpleProducer(client, async=False, sync_fail_on_error=True)
91+
with self.assertRaises(FailedPayloadsError):
92+
producer.send_messages('foobar', b'test message')
93+
7894

7995
class TestKafkaProducerSendUpstream(unittest.TestCase):
8096

0 commit comments

Comments
 (0)