Skip to content

Commit 9fd0811

Browse files
committed
Merge pull request #356 from dpkp/always_fetch_offsets
fetch commit offsets in base consumer unless group is None
2 parents 6fc6856 + 1d252bf commit 9fd0811

File tree

5 files changed

+105
-14
lines changed

5 files changed

+105
-14
lines changed

kafka/consumer/base.py

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import kafka.common
88
from kafka.common import (
99
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
10-
UnknownTopicOrPartitionError
10+
UnknownTopicOrPartitionError, check_error
1111
)
1212

1313
from kafka.util import ReentrantTimer
@@ -68,29 +68,43 @@ def __init__(self, client, group, topic, partitions=None, auto_commit=True,
6868
self.commit)
6969
self.commit_timer.start()
7070

71-
if auto_commit:
71+
# Set initial offsets
72+
if self.group is not None:
7273
self.fetch_last_known_offsets(partitions)
7374
else:
7475
for partition in partitions:
7576
self.offsets[partition] = 0
7677

78+
7779
def fetch_last_known_offsets(self, partitions=None):
80+
if self.group is None:
81+
raise ValueError('KafkaClient.group must not be None')
82+
7883
if not partitions:
7984
partitions = self.client.get_partition_ids_for_topic(self.topic)
8085

81-
def get_or_init_offset(resp):
86+
responses = self.client.send_offset_fetch_request(
87+
self.group,
88+
[OffsetFetchRequest(self.topic, p) for p in partitions],
89+
fail_on_error=False
90+
)
91+
92+
for resp in responses:
8293
try:
83-
kafka.common.check_error(resp)
84-
return resp.offset
94+
check_error(resp)
95+
# API spec says server won't set an error here
96+
# but 0.8.1.1 does actually...
8597
except UnknownTopicOrPartitionError:
86-
return 0
98+
pass
8799

88-
for partition in partitions:
89-
req = OffsetFetchRequest(self.topic, partition)
90-
(resp,) = self.client.send_offset_fetch_request(self.group, [req],
91-
fail_on_error=False)
92-
self.offsets[partition] = get_or_init_offset(resp)
93-
self.fetch_offsets = self.offsets.copy()
100+
# -1 offset signals no commit is currently stored
101+
if resp.offset == -1:
102+
self.offsets[resp.partition] = 0
103+
104+
# Otherwise we committed the stored offset
105+
# and need to fetch the next one
106+
else:
107+
self.offsets[resp.partition] = resp.offset
94108

95109
def commit(self, partitions=None):
96110
"""

kafka/consumer/multiprocess.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -93,6 +93,8 @@ class MultiProcessConsumer(Consumer):
9393
Arguments:
9494
client: a connected KafkaClient
9595
group: a name for this consumer, used for offset storage and must be unique
96+
If you are connecting to a server that does not support offset
97+
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
9698
topic: the topic to consume
9799
98100
Keyword Arguments:

kafka/consumer/simple.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,8 @@ class SimpleConsumer(Consumer):
7373
Arguments:
7474
client: a connected KafkaClient
7575
group: a name for this consumer, used for offset storage and must be unique
76+
If you are connecting to a server that does not support offset
77+
commit/fetch (any prior to 0.8.1.1), then you *must* set this to None
7678
topic: the topic to consume
7779
7880
Keyword Arguments:

test/test_consumer_integration.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ def assert_message_count(self, messages, num_messages):
5353
def consumer(self, **kwargs):
5454
if os.environ['KAFKA_VERSION'] == "0.8.0":
5555
# Kafka 0.8.0 simply doesn't support offset requests, so hard code it being off
56+
kwargs['group'] = None
5657
kwargs['auto_commit'] = False
5758
else:
5859
kwargs.setdefault('auto_commit', True)
@@ -127,6 +128,23 @@ def test_simple_consumer_no_reset(self):
127128
with self.assertRaises(OffsetOutOfRangeError):
128129
consumer.get_message()
129130

131+
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
132+
def test_simple_consumer_load_initial_offsets(self):
133+
self.send_messages(0, range(0, 100))
134+
self.send_messages(1, range(100, 200))
135+
136+
# Create 1st consumer and change offsets
137+
consumer = self.consumer()
138+
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
139+
consumer.offsets.update({0:51, 1:101})
140+
# Update counter after manual offsets update
141+
consumer.count_since_commit += 1
142+
consumer.commit()
143+
144+
# Create 2nd consumer and check initial offsets
145+
consumer = self.consumer(auto_commit=False)
146+
self.assertEqual(consumer.offsets, {0: 51, 1: 101})
147+
130148
@kafka_versions("all")
131149
def test_simple_consumer__seek(self):
132150
self.send_messages(0, range(0, 100))
@@ -243,7 +261,9 @@ def test_multi_proc_pending(self):
243261
self.send_messages(0, range(0, 10))
244262
self.send_messages(1, range(10, 20))
245263

246-
consumer = MultiProcessConsumer(self.client, "group1", self.topic,
264+
# set group to None and auto_commit to False to avoid interactions w/
265+
# offset commit/fetch apis
266+
consumer = MultiProcessConsumer(self.client, None, self.topic,
247267
auto_commit=False, iter_timeout=0)
248268

249269
self.assertEqual(consumer.pending(), 20)
@@ -252,6 +272,24 @@ def test_multi_proc_pending(self):
252272

253273
consumer.stop()
254274

275+
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
276+
def test_multi_process_consumer_load_initial_offsets(self):
277+
self.send_messages(0, range(0, 10))
278+
self.send_messages(1, range(10, 20))
279+
280+
# Create 1st consumer and change offsets
281+
consumer = self.consumer()
282+
self.assertEqual(consumer.offsets, {0: 0, 1: 0})
283+
consumer.offsets.update({0:5, 1:15})
284+
# Update counter after manual offsets update
285+
consumer.count_since_commit += 1
286+
consumer.commit()
287+
288+
# Create 2nd consumer and check initial offsets
289+
consumer = self.consumer(consumer = MultiProcessConsumer,
290+
auto_commit=False)
291+
self.assertEqual(consumer.offsets, {0: 5, 1: 15})
292+
255293
@kafka_versions("all")
256294
def test_large_messages(self):
257295
# Produce 10 "normal" size messages
@@ -327,6 +365,41 @@ def test_offset_behavior__resuming_behavior(self):
327365
consumer1.stop()
328366
consumer2.stop()
329367

368+
@kafka_versions("0.8.1", "0.8.1.1", "0.8.2.0")
369+
def test_multi_process_offset_behavior__resuming_behavior(self):
370+
self.send_messages(0, range(0, 100))
371+
self.send_messages(1, range(100, 200))
372+
373+
# Start a consumer
374+
consumer1 = self.consumer(
375+
consumer=MultiProcessConsumer,
376+
auto_commit_every_t = None,
377+
auto_commit_every_n = 20,
378+
)
379+
380+
# Grab the first 195 messages
381+
output_msgs1 = []
382+
idx = 0
383+
for message in consumer1:
384+
output_msgs1.append(message.message.value)
385+
idx += 1
386+
if idx >= 195:
387+
break
388+
self.assert_message_count(output_msgs1, 195)
389+
390+
# The total offset across both partitions should be at 180
391+
consumer2 = self.consumer(
392+
consumer=MultiProcessConsumer,
393+
auto_commit_every_t = None,
394+
auto_commit_every_n = 20,
395+
)
396+
397+
# 181-200
398+
self.assert_message_count([ message for message in consumer2 ], 20)
399+
400+
consumer1.stop()
401+
consumer2.stop()
402+
330403
# TODO: Make this a unit test -- should not require integration
331404
@kafka_versions("all")
332405
def test_fetch_buffer_size(self):

test/test_failover_integration.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ def assert_message_count(self, topic, check_count, timeout=10, partitions=None):
183183

184184
client = KafkaClient(hosts)
185185
group = random_string(10)
186-
consumer = SimpleConsumer(client, group, topic,
186+
consumer = SimpleConsumer(client, None, topic,
187187
partitions=partitions,
188188
auto_commit=False,
189189
iter_timeout=timeout)

0 commit comments

Comments
 (0)