From 8984141aefecedd260485047760c70fdc171c74f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Tue, 9 Jun 2015 23:18:03 -0700 Subject: [PATCH 1/2] Log response error type in async producer --- kafka/producer/base.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 49090bd7f..3c826cdb9 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -166,8 +166,9 @@ def _handle_error(error_cls, request): if error_cls: _handle_error(error_cls, orig_req) - log.error('Error sending ProduceRequest (#%d of %d) to %s:%d ' - 'with msgs %s', i + 1, len(requests), + log.error('%s sending ProduceRequest (#%d of %d) ' + 'to %s:%d with msgs %s', + error_cls.__name__, (i + 1), len(requests), orig_req.topic, orig_req.partition, orig_req.messages if log_messages_on_error else hash(orig_req.messages)) From 942c693c23996a3430c82fff8714fecb821a2b9d Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Wed, 10 Jun 2015 12:08:02 -0700 Subject: [PATCH 2/2] Add error type to SimpleConsumer error logging --- kafka/consumer/simple.py | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/kafka/consumer/simple.py b/kafka/consumer/simple.py index e4233ff6f..aa49fca5f 100644 --- a/kafka/consumer/simple.py +++ b/kafka/consumer/simple.py @@ -344,23 +344,26 @@ def _fetch(self): try: check_error(resp) except UnknownTopicOrPartitionError: + log.error('UnknownTopicOrPartitionError for %s:%d', + resp.topic, resp.partition) self.client.reset_topic_metadata(resp.topic) raise except NotLeaderForPartitionError: + log.error('NotLeaderForPartitionError for %s:%d', + resp.topic, resp.partition) self.client.reset_topic_metadata(resp.topic) continue except OffsetOutOfRangeError: - log.warning("OffsetOutOfRangeError for %s - %d. " - "Resetting partition offset...", + log.warning('OffsetOutOfRangeError for %s:%d. ' + 'Resetting partition offset...', resp.topic, resp.partition) self.reset_partition_offset(resp.partition) # Retry this partition retry_partitions[resp.partition] = partitions[resp.partition] continue except FailedPayloadsError as e: - log.warning("Failed payloads of %s" - "Resetting partition offset...", - e.payload) + log.warning('FailedPayloadsError for %s:%d', + e.payload.topic, e.payload.partition) # Retry this partition retry_partitions[e.payload.partition] = partitions[e.payload.partition] continue