Consumer more exception handling #404

Merged (5 commits) on Jun 10, 2015
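
The upshot for callers: SimpleConsumer.commit() now reports failure through its boolean return value instead of letting the broker error propagate, and reset_partition_offset() returns the updated offset or None. A minimal caller-side sketch, assuming a broker on localhost:9092 and placeholder group/topic names (none of these details are part of this PR):

```python
import logging

from kafka import KafkaClient, SimpleConsumer

log = logging.getLogger(__name__)
logging.basicConfig(level=logging.INFO)

# Placeholder connection details; any reachable broker and topic will do.
client = KafkaClient('localhost:9092')
consumer = SimpleConsumer(client, group='my-group', topic='my-topic',
                          auto_commit=False)

for message in consumer.get_messages(count=10):
    log.info('got message at offset %d', message.offset)

# commit() now returns False (and logs the underlying KafkaError) on failure.
if not consumer.commit():
    log.warning('offset commit failed; offsets remain uncommitted')

# reset_partition_offset() returns the new offset, or None if the
# OffsetRequest could not be completed.
if consumer.reset_partition_offset(0) is None:
    log.warning('could not reset offset for partition 0')
```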
30 changes: 18 additions & 12 deletions kafka/consumer/base.py
@@ -8,7 +8,7 @@
import kafka.common
from kafka.common import (
OffsetRequest, OffsetCommitRequest, OffsetFetchRequest,
UnknownTopicOrPartitionError, check_error
UnknownTopicOrPartitionError, check_error, KafkaError
)

from kafka.util import kafka_bytestring, ReentrantTimer
@@ -114,12 +114,13 @@ def fetch_last_known_offsets(self, partitions=None):
self.offsets[resp.partition] = resp.offset

def commit(self, partitions=None):
"""
Commit offsets for this consumer
"""Commit stored offsets to Kafka via OffsetCommitRequest (v0)

Keyword Arguments:
partitions (list): list of partitions to commit, default is to commit
all of them

Returns: True on success, False on failure
"""

# short circuit if nothing happened. This check is kept outside
@@ -135,22 +136,27 @@ def commit(self, partitions=None):

reqs = []
if partitions is None: # commit all partitions
partitions = self.offsets.keys()
partitions = list(self.offsets.keys())

log.info('Committing new offsets for %s, partitions %s',
self.topic, partitions)
for partition in partitions:
offset = self.offsets[partition]
log.debug("Commit offset %d in SimpleConsumer: "
"group=%s, topic=%s, partition=%s" %
(offset, self.group, self.topic, partition))
log.debug('Commit offset %d in SimpleConsumer: '
'group=%s, topic=%s, partition=%s',
offset, self.group, self.topic, partition)

reqs.append(OffsetCommitRequest(self.topic, partition,
offset, None))

resps = self.client.send_offset_commit_request(self.group, reqs)
for resp in resps:
kafka.common.check_error(resp)

self.count_since_commit = 0
try:
self.client.send_offset_commit_request(self.group, reqs)
except KafkaError as e:
log.error('%s saving offsets: %s', e.__class__.__name__, e)
return False
else:
self.count_since_commit = 0
return True

def _auto_commit(self):
"""
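Besides the try/except, the commit path also switches log.debug from eager %-interpolation to passing the arguments through to the logging call, so the string is only built if a handler actually emits the record. A small standalone illustration of the difference (the values are made up):

```python
import logging

log = logging.getLogger('kafka.consumer')
logging.basicConfig(level=logging.INFO)   # DEBUG deliberately disabled

offset, group, topic, partition = 42, 'my-group', 'my-topic', 0

# Old style: the message is formatted here, even though DEBUG is disabled.
log.debug("Commit offset %d in SimpleConsumer: "
          "group=%s, topic=%s, partition=%s" %
          (offset, group, topic, partition))

# New style (what this PR uses): formatting is deferred to the logging
# framework and skipped entirely when the level is disabled.
log.debug('Commit offset %d in SimpleConsumer: '
          'group=%s, topic=%s, partition=%s',
          offset, group, topic, partition)
```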
24 changes: 19 additions & 5 deletions kafka/consumer/simple.py
@@ -27,7 +27,7 @@
NO_MESSAGES_WAIT_TIME_SECONDS
)
from ..common import (
FetchRequest, OffsetRequest,
FetchRequest, KafkaError, OffsetRequest,
ConsumerFetchSizeTooSmall, ConsumerNoMoreData,
UnknownTopicOrPartitionError, NotLeaderForPartitionError,
OffsetOutOfRangeError, FailedPayloadsError, check_error
@@ -144,6 +144,13 @@ def __repr__(self):
(self.group, self.topic, str(self.offsets.keys()))

def reset_partition_offset(self, partition):
"""Update offsets using auto_offset_reset policy (smallest|largest)

Arguments:
partition (int): the partition for which offsets should be updated

Returns: Updated offset on success, None on failure
"""
LATEST = -1
EARLIEST = -2
if self.auto_offset_reset == 'largest':
@@ -163,10 +170,17 @@ def reset_partition_offset(self, partition):
raise

# send_offset_request
(resp, ) = self.client.send_offset_request(reqs)
check_error(resp)
self.offsets[partition] = resp.offsets[0]
self.fetch_offsets[partition] = resp.offsets[0]
log.info('Resetting topic-partition offset to %s for %s:%d',
self.auto_offset_reset, self.topic, partition)
try:
(resp, ) = self.client.send_offset_request(reqs)
except KafkaError as e:
log.error('%s sending offset request for %s:%d',
e.__class__.__name__, self.topic, partition)
else:
self.offsets[partition] = resp.offsets[0]
self.fetch_offsets[partition] = resp.offsets[0]
return resp.offsets[0]

def provide_partition_info(self):
"""
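For context, auto_offset_reset maps to the special OffsetRequest timestamps: 'largest' requests the latest offset (-1) and 'smallest' the earliest (-2). Since reset_partition_offset() now swallows the KafkaError and returns None, a caller that needs stronger guarantees has to handle that case itself. A hypothetical retry wrapper (the retry policy and its knobs are illustrative, not part of this PR):

```python
import time

def reset_with_retry(consumer, partition, attempts=3, backoff=1.0):
    # `consumer` is any SimpleConsumer; attempts/backoff are made-up knobs.
    for _ in range(attempts):
        offset = consumer.reset_partition_offset(partition)
        if offset is not None:
            # offsets and fetch_offsets were already updated by the consumer
            return offset
        time.sleep(backoff)  # give a transient broker failure time to clear
    return None  # caller decides whether to raise or keep going
```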
44 changes: 42 additions & 2 deletions test/test_consumer.py
@@ -4,7 +4,7 @@

from kafka import SimpleConsumer, KafkaConsumer, MultiProcessConsumer
from kafka.common import (
KafkaConfigurationError, FetchResponse,
KafkaConfigurationError, FetchResponse, OffsetFetchResponse,
FailedPayloadsError, OffsetAndMessage,
NotLeaderForPartitionError, UnknownTopicOrPartitionError
)
@@ -25,10 +25,11 @@ def test_partition_list(self):
client = MagicMock()
partitions = (0,)
with patch.object(MultiProcessConsumer, 'fetch_last_known_offsets') as fetch_last_known_offsets:
consumer = MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
MultiProcessConsumer(client, 'testing-group', 'testing-topic', partitions=partitions)
self.assertEqual(fetch_last_known_offsets.call_args[0], (partitions,) )
self.assertEqual(client.get_partition_ids_for_topic.call_count, 0) # pylint: disable=no-member

class TestSimpleConsumer(unittest.TestCase):
def test_simple_consumer_failed_payloads(self):
client = MagicMock()
consumer = SimpleConsumer(client, group=None,
@@ -80,6 +81,45 @@ def unknown_topic_partition(request):
with self.assertRaises(UnknownTopicOrPartitionError):
consumer.get_messages(20)

def test_simple_consumer_commit_does_not_raise(self):
client = MagicMock()
client.get_partition_ids_for_topic.return_value = [0, 1]

def mock_offset_fetch_request(group, payloads, **kwargs):
return [OffsetFetchResponse(p.topic, p.partition, 0, b'', 0) for p in payloads]

client.send_offset_fetch_request.side_effect = mock_offset_fetch_request

def mock_offset_commit_request(group, payloads, **kwargs):
raise FailedPayloadsError(payloads[0])

client.send_offset_commit_request.side_effect = mock_offset_commit_request

consumer = SimpleConsumer(client, group='foobar',
topic='topic', partitions=[0, 1],
auto_commit=False)

# Mock internal commit check
consumer.count_since_commit = 10

# This should not raise an exception
self.assertFalse(consumer.commit(partitions=[0, 1]))

def test_simple_consumer_reset_partition_offset(self):
client = MagicMock()

def mock_offset_request(payloads, **kwargs):
raise FailedPayloadsError(payloads[0])

client.send_offset_request.side_effect = mock_offset_request

consumer = SimpleConsumer(client, group='foobar',
topic='topic', partitions=[0, 1],
auto_commit=False)

# This should not raise an exception
self.assertEqual(consumer.reset_partition_offset(0), None)

@staticmethod
def fail_requests_factory(error_factory):
# Mock so that only the first request gets a valid response
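The body of fail_requests_factory is collapsed in this view. Judging by its comment, it builds a side_effect in which only the first payload gets a valid FetchResponse and every remaining payload is mapped through error_factory. A hypothetical sketch of that pattern, not necessarily the repository's exact code (FetchResponse and OffsetAndMessage are the namedtuples imported from kafka.common at the top of this test file):

```python
# In the test file this lives as a @staticmethod on TestSimpleConsumer.
def fail_requests_factory(error_factory):
    def fail_requests(payloads, **kwargs):
        # Only the first payload gets a valid response; the rest are turned
        # into errors, e.g. error_factory = lambda req: FailedPayloadsError(req).
        responses = [
            FetchResponse(payloads[0].topic, payloads[0].partition, 0, 0,
                          (OffsetAndMessage(payloads[0].offset + i, 'msg %d' % i)
                           for i in range(10))),
        ]
        for failure in payloads[1:]:
            responses.append(error_factory(failure))
        return responses
    return fail_requests
```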