Skip to content

Commit ce702ef

Browse files
committed
Merge pull request #392 from reAsOn2010/fix-uncaught-FailedPayloadsError
try to fix uncaught FailedPayloadsError in consumer
2 parents b1aad92 + 945fc04 commit ce702ef

File tree

3 files changed

+18
-1
lines changed

3 files changed

+18
-1
lines changed

kafka/common.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -226,6 +226,8 @@ def _iter_broker_errors():
226226

227227

228228
def check_error(response):
229+
if isinstance(response, Exception):
230+
raise response
229231
if response.error:
230232
error_class = kafka_errors.get(response.error, UnknownError)
231233
raise error_class(response)

kafka/consumer/simple.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
FetchRequest, OffsetRequest,
2020
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
2121
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
22-
OffsetOutOfRangeError, check_error
22+
OffsetOutOfRangeError, FailedPayloadsError, check_error
2323
)
2424
from .base import (
2525
Consumer,
@@ -355,6 +355,13 @@ def _fetch(self):
355355
# Retry this partition
356356
retry_partitions[resp.partition] = partitions[resp.partition]
357357
continue
358+
except FailedPayloadsError as e:
359+
log.warning("Failed payloads of %s"
360+
"Resetting partition offset...",
361+
e.payload)
362+
# Retry this partition
363+
retry_partitions[e.payload.partition] = partitions[e.payload.partition]
364+
continue
358365

359366
partition = resp.partition
360367
buffer_size = partitions[partition]

test/test_failover_integration.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,14 @@ def test_switch_leader_keyed_producer(self):
169169
msg = random_string(10).encode('utf-8')
170170
producer.send_messages(topic, key, msg)
171171

172+
@kafka_versions("all")
173+
def test_switch_leader_simple_consumer(self):
174+
producer = Producer(self.client, async=False)
175+
consumer = SimpleConsumer(self.client, None, self.topic, partitions=None, auto_commit=False, iter_timeout=10)
176+
self._send_random_messages(producer, self.topic, 0, 2)
177+
consumer.get_messages()
178+
self._kill_leader(self.topic, 0)
179+
consumer.get_messages()
172180

173181
def _send_random_messages(self, producer, topic, partition, n):
174182
for j in range(n):

0 commit comments

Comments
 (0)