Skip to content

Commit 00c6b86

Browse files
committed
Merge pull request #393 from dpkp/simple_consumer_leader_change
Simple consumer leader change
2 parents f1dc01e + b235ce8 commit 00c6b86

File tree

2 files changed

+78
-2
lines changed

2 files changed

+78
-2
lines changed

kafka/consumer/simple.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -344,9 +344,12 @@ def _fetch(self):
344344

345345
try:
346346
check_error(resp)
347-
except (UnknownTopicOrPartitionError, NotLeaderForPartitionError):
347+
except UnknownTopicOrPartitionError:
348348
self.client.reset_topic_metadata(resp.topic)
349349
raise
350+
except NotLeaderForPartitionError:
351+
self.client.reset_topic_metadata(resp.topic)
352+
continue
350353
except OffsetOutOfRangeError:
351354
log.warning("OffsetOutOfRangeError for %s - %d. "
352355
"Resetting partition offset...",

test/test_consumer.py

Lines changed: 74 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,12 @@
33
from . import unittest
44

55
from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
6-
from kafka.common import KafkaConfigurationError
6+
from kafka.common import (
7+
KafkaConfigurationError, FetchResponse,
8+
FailedPayloadsError, OffsetAndMessage,
9+
NotLeaderForPartitionError, UnknownTopicOrPartitionError
10+
)
11+
712

813
class TestKafkaConsumer(unittest.TestCase):
914
def test_non_integer_partitions(self):
@@ -14,6 +19,7 @@ def test_broker_list_required(self):
1419
with self.assertRaises(KafkaConfigurationError):
1520
KafkaConsumer()
1621

22+
1723
class TestMultiProcessConsumer(unittest.TestCase):
1824
def test_partition_list(self):
1925
client = MagicMock()
@@ -22,3 +28,70 @@ def test_partition_list(self):
2228
consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
2329
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
2430
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member
31+
32+
def test_simple_consumer_failed_payloads(self):
33+
client = MagicMock()
34+
consumer = SimpleConsumer(client, group=None,
35+
topic='topic', partitions=[0, 1],
36+
auto_commit=False)
37+
38+
def failed_payloads(payload):
39+
return FailedPayloadsError(payload)
40+
41+
client.send_fetch_request.side_effect = self.fail_requests_factory(failed_payloads)
42+
43+
# This should not raise an exception
44+
consumer.get_messages(5)
45+
46+
def test_simple_consumer_leader_change(self):
47+
client = MagicMock()
48+
consumer = SimpleConsumer(client, group=None,
49+
topic='topic', partitions=[0, 1],
50+
auto_commit=False)
51+
52+
# Mock so that only the first request gets a valid response
53+
def not_leader(request):
54+
return FetchResponse(request.topic, request.partition,
55+
NotLeaderForPartitionError.errno, -1, ())
56+
57+
client.send_fetch_request.side_effect = self.fail_requests_factory(not_leader)
58+
59+
# This should not raise an exception
60+
consumer.get_messages(20)
61+
62+
# client should have updated metadata
63+
self.assertGreaterEqual(client.reset_topic_metadata.call_count, 1)
64+
self.assertGreaterEqual(client.load_metadata_for_topics.call_count, 1)
65+
66+
def test_simple_consumer_unknown_topic_partition(self):
67+
client = MagicMock()
68+
consumer = SimpleConsumer(client, group=None,
69+
topic='topic', partitions=[0, 1],
70+
auto_commit=False)
71+
72+
# Mock so that only the first request gets a valid response
73+
def unknown_topic_partition(request):
74+
return FetchResponse(request.topic, request.partition,
75+
UnknownTopicOrPartitionError.errno, -1, ())
76+
77+
client.send_fetch_request.side_effect = self.fail_requests_factory(unknown_topic_partition)
78+
79+
# This should not raise an exception
80+
with self.assertRaises(UnknownTopicOrPartitionError):
81+
consumer.get_messages(20)
82+
83+
@staticmethod
84+
def fail_requests_factory(error_factory):
85+
# Mock so that only the first request gets a valid response
86+
def fail_requests(payloads, **kwargs):
87+
responses = [
88+
FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0,
89+
(OffsetAndMessage(
90+
payloads[0].offset + i,
91+
"msg %d" % (payloads[0].offset + i))
92+
for i in range(10))),
93+
]
94+
for failure in payloads[1:]:
95+
responses.append(error_factory(failure))
96+
return responses
97+
return fail_requests

0 commit comments

Comments
 (0)