Closed
Description
I'm using KafkaConsumer to read messages from Kafka 0.9. The messages are compressed using snappy. This works on 1.1.1 but breaks on 1.2.0.
consumer = KafkaConsumer(topic, bootstrap_servers='172.17.12.12:9092', group_id="log_dump", auto_offset_reset='earliest')
for rec in consumer:
data, ts = format_topic(rec)
......
Traceback (most recent call last):
File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
self.run()
File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
self._target(*self._args, **self._kwargs)
File "logdump.py", line 24, in handle_topic
for rec in consumer:
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/six.py", line 558, in next
return type(self).__next__(self)
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 850, in __next__
return next(self._iterator)
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 790, in _message_generator
self._client.poll(timeout_ms=poll_ms, sleep=True)
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/client_async.py", line 436, in poll
responses.extend(self._poll(timeout, sleep=sleep))
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/client_async.py", line 479, in _poll
response = conn.recv() # Note: conn.recv runs callbacks / errbacks
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/conn.py", line 477, in recv
response = self._process_response(self._rbuffer)
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/conn.py", line 510, in _process_response
response = ifr.response_type.decode(read_buffer)
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/struct.py", line 39, in decode
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 155, in decode
return [self.array_of.decode(data) for _ in range(length)]
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 118, in decode
return tuple([field.decode(data) for field in self.fields])
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 155, in decode
return [self.array_of.decode(data) for _ in range(length)]
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 118, in decode
return tuple([field.decode(data) for field in self.fields])
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 50, in decode
return _unpack('>i', data.read(4))
File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 20, in _unpack
raise ValueError(error)
ValueError: <class 'struct.error'>
Metadata
Metadata
Assignees
Labels
No labels