Description
I'm not sure whether this works as designed, but every time I invoke MultiProcessConsumer on the same topic/partition (in a new execution thread), the consumer disregards the consumer offset stored in ZooKeeper and reads all available messages. SimpleConsumer, by contrast, only reads messages that have not been read yet. Shouldn't MultiProcessConsumer behave the same way?
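The expected behavior can be modelled without Kafka at all. The sketch below is a toy in-memory model, not kafka-python code. The functions `resume_from_committed` and `restart_from_earliest` are hypothetical names, and the model assumes a committed offset means "the next offset to read". That assumption matches the log further down, where offset 200 is committed after 200 messages. With 200 messages in the partition and 200 already committed, a consumer that honors the commit reads nothing, while one that ignores it re-reads everything.

```python
from typing import Dict, List

# Toy model of ONE partition (partition 0 of "test_1") for ONE consumer group.
# Not kafka-python: `committed` maps partition -> next offset to read (assumed semantics).


def resume_from_committed(log: List[str], committed: Dict[int, int], partition: int) -> List[str]:
    """Expected behavior: start at the committed offset (earliest if none is stored)."""
    start = committed.get(partition, 0)
    consumed = log[start:]
    committed[partition] = len(log)  # commit the offset after the last message read
    return consumed


def restart_from_earliest(log: List[str], committed: Dict[int, int], partition: int) -> List[str]:
    """Reported behavior: ignore the committed offset and start at offset 0."""
    consumed = log[:]
    committed[partition] = len(log)
    return consumed


messages = ["message-%d" % i for i in range(200)]  # offsets 0..199
print(len(resume_from_committed(messages, {0: 200}, 0)))  # 0
print(len(restart_from_earliest(messages, {0: 200}, 0)))  # 200
```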
Code snippet:
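import logging

from kafka import KafkaClient, MultiProcessConsumer

# Print DEBUG records in the default "LEVEL:logger:message" format seen in the log below
logging.basicConfig(level=logging.DEBUG)

brokers = ["localhost:9092", "localhost:9093", "localhost:9094"]
kafka = KafkaClient(brokers)
consumer = MultiProcessConsumer(kafka, "marcin-group", "test_1", num_procs=2)
# Offsets the consumer starts with for this group, before consuming anything
logging.debug('*** all offsets=%s', consumer.offsets)
consumed_count = 0
for message in consumer:
    print(message)
    consumed_count += 1
logging.debug('*** all offsets=%s ...after consuming all messages, number of messages consumed=%s',
              consumer.offsets, consumed_count)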
----------------------------------------
ZooKeeper info (committed offset for topic test_1, partition 0):
get /kafka/consumers/marcin-group/offsets/test_1/0
200
cZxid = 0x1a2f0
--------
Debug snippets from the log file:
DEBUG:root:*** all offsets={0: 200}
.....
DEBUG:kafka:Commit offset 100 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0
.....
DEBUG:kafka:Commit offset 200 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0
.....
DEBUG:root:*** all offsets={0: 200} ...after consuming all messages, number of messages consumed=200
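The debug log supports the same conclusion with simple arithmetic. The committed offset is already 200 at startup, the last commit is 200, and 200 messages were consumed, so the first offset read was 200 - 200 = 0. That is the earliest message rather than the committed position. The helper below (`implied_start_offset`, a hypothetical name) only parses the quoted log lines to do that subtraction. It assumes a single partition and that each commit records the next offset to read.

```python
import re
from typing import Iterable, Optional

# Debug lines copied from the report above; the elided "....." lines are omitted.
LOG_LINES = [
    "DEBUG:root:*** all offsets={0: 200}",
    "DEBUG:kafka:Commit offset 100 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0",
    "DEBUG:kafka:Commit offset 200 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0",
    "DEBUG:root:*** all offsets={0: 200} ...after consuming all messages, number of messages consumed=200",
]

COMMIT_RE = re.compile(r"Commit offset (\d+) in SimpleConsumer")
COUNT_RE = re.compile(r"number of messages consumed=(\d+)")


def implied_start_offset(lines: Iterable[str]) -> Optional[int]:
    """Estimate the first offset read: last committed offset minus messages consumed.

    Assumes one partition and that a commit stores the next offset to read.
    Returns None if either value is missing from the log.
    """
    last_commit = None
    consumed = None
    for line in lines:
        commit = COMMIT_RE.search(line)
        if commit:
            last_commit = int(commit.group(1))  # keep the most recent commit
        count = COUNT_RE.search(line)
        if count:
            consumed = int(count.group(1))
    if last_commit is None or consumed is None:
        return None
    return last_commit - consumed


print(implied_start_offset(LOG_LINES))  # 0
```

A result of 0 while the stored offset was 200 is exactly the "reads everything again" symptom described in the report.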