File tree Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Expand file tree Collapse file tree 1 file changed +6
-3
lines changed Original file line number Diff line number Diff line change 6
6
from queue import Full
7
7
except ImportError :
8
8
from Queue import Full
9
- from mock import MagicMock
9
+ from mock import MagicMock , patch
10
10
from . import unittest
11
-
11
+ from six . moves import xrange
12
12
from kafka .producer .base import Producer
13
13
14
14
@@ -45,7 +45,8 @@ def partitions(topic):
45
45
producer .send_messages (topic , b'hi' )
46
46
assert client .send_produce_request .called
47
47
48
- def test_producer_async_queue_overfilled_batch_send (self ):
48
+ @patch ('kafka.producer.base._send_upstream' )
49
+ def test_producer_async_queue_overfilled_batch_send (self , mock ):
49
50
queue_size = 2
50
51
producer = Producer (MagicMock (), batch_send = True , maxsize = queue_size )
51
52
@@ -57,6 +58,8 @@ def test_producer_async_queue_overfilled_batch_send(self):
57
58
message_list = [message ] * (queue_size + 1 )
58
59
producer .send_messages (topic , partition , * message_list )
59
60
self .assertEqual (producer .queue .qsize (), queue_size )
61
+ for _ in xrange (producer .queue .qsize ()):
62
+ producer .queue .get ()
60
63
61
64
def test_producer_async_queue_overfilled (self ):
62
65
queue_size = 2
You can’t perform that action at this time.
0 commit comments