Skip to content

Commit bc8965a

Browse files
committed
Merge pull request #1 from scrapinghub/feature-producer-retries
Async producer: added retries for other error types
2 parents 3a389cc + fd6c076 commit bc8965a

File tree

1 file changed

+24
-15
lines changed

1 file changed

+24
-15
lines changed

kafka/producer/base.py

Lines changed: 24 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -86,30 +86,39 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
8686
client.send_produce_request(reqs,
8787
acks=req_acks,
8888
timeout=ack_timeout)
89+
8990
except FailedPayloadsError as ex:
90-
failed_reqs = ex.args[0]
91-
log.exception("Failed payloads count %s" % len(failed_reqs))
92-
93-
# if no limit, retry all failed messages until success
94-
if retries_limit is None:
95-
reqs_to_retry = failed_reqs
96-
# makes sense to check failed reqs only if we have a limit > 0
97-
elif retries_limit > 0:
98-
for req in failed_reqs:
99-
if retries_limit and req.retries < retries_limit:
100-
updated_req = req._replace(retries=req.retries+1)
101-
reqs_to_retry.append(updated_req)
91+
log.warning("Async producer send warning: failed payloads.")
92+
reqs_to_retry = filter_by_retries(ex.args[0], retries_limit)
93+
10294
except Exception as ex:
103-
log.exception("Unable to send message: %s" % type(ex))
104-
finally:
105-
reqs = []
95+
log.error("Async producer send exception: %s" % type(ex))
96+
reqs_to_retry = filter_by_retries(reqs, retries_limit)
10697

98+
reqs = []
10799
if reqs_to_retry and retry_backoff:
108100
reqs = reqs_to_retry
109101
log.warning("%s requests will be retried next call." % len(reqs))
110102
time.sleep(float(retry_backoff) / 1000)
111103

112104

105+
def filter_by_retries(failed_reqs, retries_limit):
106+
""" Get requests to retry using retries limit """
107+
108+
# if no limit, retry all failed messages until success
109+
if retries_limit is None:
110+
return failed_reqs
111+
112+
# makes sense to check failed reqs only if we have a limit > 0
113+
reqs_to_retry = []
114+
if retries_limit > 0:
115+
for req in failed_reqs:
116+
if req.retries < retries_limit:
117+
updated_req = req._replace(retries=req.retries+1)
118+
reqs_to_retry.append(updated_req)
119+
return reqs_to_retry
120+
121+
113122
class Producer(object):
114123
"""
115124
Base class to be used by producers

0 commit comments

Comments
 (0)