@@ -196,6 +196,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
196
196
for topic , partitions in response .topics :
197
197
for partition_info in partitions :
198
198
global_error = None
199
+ log_start_offset = None
199
200
if response .API_VERSION < 2 :
200
201
partition , error_code , offset = partition_info
201
202
ts = None
@@ -208,7 +209,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
208
209
tp = TopicPartition (topic , partition )
209
210
error = Errors .for_code (error_code )
210
211
batch = batches_by_partition [tp ]
211
- self ._complete_batch (batch , error , offset , ts , global_error )
212
+ self ._complete_batch (batch , error , offset , ts , log_start_offset , global_error )
212
213
213
214
if response .API_VERSION > 0 :
214
215
self ._sensors .record_throttle_time (response .throttle_time_ms , node = node_id )
@@ -227,6 +228,7 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
227
228
base_offset (int): The base offset assigned to the records if successful
228
229
timestamp_ms (int, optional): The timestamp returned by the broker for this batch
229
230
log_start_offset (int): The start offset of the log at the time this produce response was created
231
+ global_error (Exception): The summarising error message
230
232
"""
231
233
# Standardize no-error to None
232
234
if error is Errors .NoError :
0 commit comments