11
11
12
12
from kafka .common import ProduceRequest , TopicAndPartition
13
13
from kafka .partitioner import HashedPartitioner
14
- from kafka .protocol import create_message
14
+ from kafka .protocol import (
15
+ CODEC_NONE , CODEC_GZIP , CODEC_SNAPPY , ALL_CODECS ,
16
+ create_message , create_gzip_message , create_snappy_message ,
17
+ )
15
18
16
19
log = logging .getLogger ("kafka" )
17
20
21
24
STOP_ASYNC_PRODUCER = - 1
22
25
23
26
24
- def _send_upstream (queue , client , batch_time , batch_size ,
27
+ def _send_upstream (queue , client , codec , batch_time , batch_size ,
25
28
req_acks , ack_timeout ):
26
29
"""
27
30
Listen on the queue for a specified number of messages or till
@@ -62,7 +65,14 @@ def _send_upstream(queue, client, batch_time, batch_size,
62
65
63
66
# Send collected requests upstream
64
67
reqs = []
65
- for topic_partition , messages in msgset .items ():
68
+ for topic_partition , msg in msgset .items ():
69
+ if codec == CODEC_GZIP :
70
+ messages = [create_gzip_message (msg )]
71
+ elif codec == CODEC_SNAPPY :
72
+ messages = [create_snappy_message (msg )]
73
+ else :
74
+ messages = [create_message (m ) for m in msg ]
75
+
66
76
req = ProduceRequest (topic_partition .topic ,
67
77
topic_partition .partition ,
68
78
messages )
@@ -102,6 +112,7 @@ class Producer(object):
102
112
def __init__ (self , client , async = False ,
103
113
req_acks = ACK_AFTER_LOCAL_WRITE ,
104
114
ack_timeout = DEFAULT_ACK_TIMEOUT ,
115
+ codec = None ,
105
116
batch_send = False ,
106
117
batch_send_every_n = BATCH_SEND_MSG_COUNT ,
107
118
batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
@@ -119,11 +130,17 @@ def __init__(self, client, async=False,
119
130
self .req_acks = req_acks
120
131
self .ack_timeout = ack_timeout
121
132
133
+ if codec is None :
134
+ codec = CODEC_NONE
135
+ assert codec in ALL_CODECS
136
+ self .codec = codec
137
+
122
138
if self .async :
123
139
self .queue = Queue () # Messages are sent through this queue
124
140
self .proc = Process (target = _send_upstream ,
125
141
args = (self .queue ,
126
142
self .client .copy (),
143
+ self .codec ,
127
144
batch_send_every_t ,
128
145
batch_send_every_n ,
129
146
self .req_acks ,
@@ -139,11 +156,16 @@ def send_messages(self, topic, partition, *msg):
139
156
"""
140
157
if self .async :
141
158
for m in msg :
142
- self .queue .put ((TopicAndPartition (topic , partition ),
143
- create_message (m )))
159
+ self .queue .put ((TopicAndPartition (topic , partition ), m ))
144
160
resp = []
145
161
else :
146
- messages = [create_message (m ) for m in msg ]
162
+ if self .codec == CODEC_GZIP :
163
+ messages = [create_gzip_message (msg )]
164
+ elif self .codec == CODEC_SNAPPY :
165
+ messages = [create_snappy_message (msg )]
166
+ else :
167
+ messages = [create_message (m ) for m in msg ]
168
+
147
169
req = ProduceRequest (topic , partition , messages )
148
170
try :
149
171
resp = self .client .send_produce_request ([req ], acks = self .req_acks ,
@@ -168,7 +190,7 @@ def stop(self, timeout=1):
168
190
169
191
class SimpleProducer (Producer ):
170
192
"""
171
- A simple, round-robbin producer. Each message goes to exactly one partition
193
+ A simple, round-robin producer. Each message goes to exactly one partition
172
194
173
195
Params:
174
196
client - The Kafka client instance to use
@@ -189,14 +211,15 @@ class SimpleProducer(Producer):
189
211
def __init__ (self , client , async = False ,
190
212
req_acks = Producer .ACK_AFTER_LOCAL_WRITE ,
191
213
ack_timeout = Producer .DEFAULT_ACK_TIMEOUT ,
214
+ codec = None ,
192
215
batch_send = False ,
193
216
batch_send_every_n = BATCH_SEND_MSG_COUNT ,
194
217
batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ,
195
218
random_start = False ):
196
219
self .partition_cycles = {}
197
220
self .random_start = random_start
198
221
super (SimpleProducer , self ).__init__ (client , async , req_acks ,
199
- ack_timeout , batch_send ,
222
+ ack_timeout , codec , batch_send ,
200
223
batch_send_every_n ,
201
224
batch_send_every_t )
202
225
@@ -241,6 +264,7 @@ class KeyedProducer(Producer):
241
264
def __init__ (self , client , partitioner = None , async = False ,
242
265
req_acks = Producer .ACK_AFTER_LOCAL_WRITE ,
243
266
ack_timeout = Producer .DEFAULT_ACK_TIMEOUT ,
267
+ codec = None ,
244
268
batch_send = False ,
245
269
batch_send_every_n = BATCH_SEND_MSG_COUNT ,
246
270
batch_send_every_t = BATCH_SEND_DEFAULT_INTERVAL ):
@@ -250,7 +274,7 @@ def __init__(self, client, partitioner=None, async=False,
250
274
self .partitioners = {}
251
275
252
276
super (KeyedProducer , self ).__init__ (client , async , req_acks ,
253
- ack_timeout , batch_send ,
277
+ ack_timeout , codec , batch_send ,
254
278
batch_send_every_n ,
255
279
batch_send_every_t )
256
280
0 commit comments