Skip to content

Commit ee47bbb

Browse files
committed
Added missing _send_upstream mock patch and converted the if statements to asserts
1 parent a69a9fc commit ee47bbb

File tree

2 files changed

+7
-10
lines changed

2 files changed

+7
-10
lines changed

kafka/producer/base.py

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,15 +121,9 @@ def __init__(self, client, async=False,
121121

122122
if batch_send:
123123
async = True
124-
if batch_send_every_n <= 0:
125-
log.exception('Batch send message count lower than zero.')
126-
raise ValueError
127-
if batch_send_every_t <= 0:
128-
log.exception('Batch send message interval lower than zero.')
129-
raise ValueError
130-
if maxsize < 0:
131-
log.exception('Queue size upper bound lower than zero.')
132-
raise ValueError
124+
assert batch_send_every_n > 0
125+
assert batch_send_every_t > 0
126+
assert maxsize >= 0
133127
else:
134128
batch_send_every_n = 1
135129
batch_send_every_t = 3600

test/test_producer.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,8 @@ def test_producer_async_queue_overfilled_batch_send(self, mock):
6161
for _ in xrange(producer.queue.qsize()):
6262
producer.queue.get()
6363

64-
def test_producer_async_queue_overfilled(self):
64+
@patch('kafka.producer.base._send_upstream')
65+
def test_producer_async_queue_overfilled(self, mock):
6566
queue_size = 2
6667
producer = Producer(MagicMock(), async=True, maxsize=queue_size)
6768

@@ -73,6 +74,8 @@ def test_producer_async_queue_overfilled(self):
7374
message_list = [message] * (queue_size + 1)
7475
producer.send_messages(topic, partition, *message_list)
7576
self.assertEqual(producer.queue.qsize(), queue_size)
77+
for _ in xrange(producer.queue.qsize()):
78+
producer.queue.get()
7679

7780
def test_producer_async_queue_normal(self):
7881
queue_size = 4

0 commit comments

Comments
 (0)