MultiProcessConsumer disregards topic offsets between subsequent runs #173

Closed
@macmarcin-zz

Description

I'm not sure whether this works as designed, but every time I invoke MultiProcessConsumer on the same topic/partition (new execution thread), the consumer disregards the consumer offset stored in ZooKeeper and reads all the messages that are available. SimpleConsumer only reads messages that have not been read yet. Shouldn't MultiProcessConsumer behave the same way?

Code snippet:


import logging

from kafka import KafkaClient, MultiProcessConsumer

brokers = ["localhost:9092", "localhost:9093", "localhost:9094"]
kafka = KafkaClient(brokers)
consumer = MultiProcessConsumer(kafka, "marcin-group", "test_1", num_procs=2)

# Offsets as reported by the consumer before reading anything
logging.debug('*** all offsets=%s' % consumer.offsets)

consumed_count = 0
for message in consumer:
    print(message)
    consumed_count += 1

# Offsets and message count after the iterator is exhausted
logging.debug('*** all offsets=%s ...after consuming all messages, number of messages consumed=%s' % (consumer.offsets, consumed_count))
----------------------------------------
ZK info:
get /kafka/consumers/marcin-group/offsets/test_1/0
200
cZxid = 0x1a2f0
--------
Debug snippets from the log file:
DEBUG:root:*** all offsets={0: 200}
.....
DEBUG:kafka:Commit offset 100 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0
.....
DEBUG:kafka:Commit offset 200 in SimpleConsumer: group=marcin-group, topic=test_1, partition=0
.....
DEBUG:root:*** all offsets={0: 200} ...after consuming all messages, number of messages consumed=200
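
To make the expected behavior concrete, here is a minimal sketch of the difference between a consumer that resumes from the committed offset and one that ignores it. It is a toy in-memory model, not kafka-python code: `consume`, `honor_committed`, and the list standing in for the partition are all hypothetical names made up for illustration.

```python
# Toy in-memory model of one partition and a committed group offset.
# Nothing here is kafka-python API; all names are hypothetical.

def consume(partition_log, committed_offset, honor_committed=True):
    """Simulate one consumer run.

    Returns the messages read and the offset that would be committed
    afterwards (the end of the log).
    """
    # A consumer that honors the commit starts where the group left off;
    # one that ignores it starts again from the beginning.
    start = committed_offset if honor_committed else 0
    messages = partition_log[start:]
    return messages, len(partition_log)


# 200 messages in the partition, matching the numbers in the log above
log = ["msg-%d" % i for i in range(200)]

# First run: nothing committed yet, so everything is read
first_run, committed = consume(log, committed_offset=0)

# Second run, expected behavior: resume at offset 200, read nothing new
second_run, committed = consume(log, committed_offset=committed)

# Second run, reported behavior: committed offset ignored, all 200 re-read
ignoring, _ = consume(log, committed_offset=committed, honor_committed=False)

print(len(first_run), len(second_run), len(ignoring))
```

In the run reported above, the stored offset was already 200 and 200 messages were still consumed, which corresponds to the `honor_committed=False` case in this sketch.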
