Skip to content

BC Break in 1.2.0 #705

Closed
Closed
@leric

Description

@leric

I'm using KafkaConsumer read messages from kafka 0.9, message is compressed using snappy, it works on 1.1.1, breaks on 1.2.0

consumer = KafkaConsumer(topic, bootstrap_servers='172.17.12.12:9092', group_id="log_dump", auto_offset_reset='earliest')
for rec in consumer:
    data, ts = format_topic(rec)
......
Traceback (most recent call last):
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/usr/lib64/python2.7/multiprocessing/process.py", line 114, in run
    self._target(*self._args, **self._kwargs)
  File "logdump.py", line 24, in handle_topic
    for rec in consumer:
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/six.py", line 558, in next
    return type(self).__next__(self)
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 850, in __next__
    return next(self._iterator)
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/consumer/group.py", line 790, in _message_generator
    self._client.poll(timeout_ms=poll_ms, sleep=True)
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/client_async.py", line 436, in poll
    responses.extend(self._poll(timeout, sleep=sleep))
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/client_async.py", line 479, in _poll
    response = conn.recv() # Note: conn.recv runs callbacks / errbacks
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/conn.py", line 477, in recv
    response = self._process_response(self._rbuffer)
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/conn.py", line 510, in _process_response
    response = ifr.response_type.decode(read_buffer)
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/struct.py", line 39, in decode
    return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 155, in decode
    return [self.array_of.decode(data) for _ in range(length)]
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 118, in decode
    return tuple([field.decode(data) for field in self.fields])
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 155, in decode
    return [self.array_of.decode(data) for _ in range(length)]
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 118, in decode
    return tuple([field.decode(data) for field in self.fields])
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 50, in decode
    return _unpack('>i', data.read(4))
  File "/home/ec2-user/py27/local/lib/python2.7/site-packages/kafka/protocol/types.py", line 20, in _unpack
    raise ValueError(error)
ValueError: <class 'struct.error'>

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions