We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent b090b21 commit 12325c0Copy full SHA for 12325c0
kafka/consumer/fetcher.py
@@ -817,8 +817,9 @@ def _parse_fetched_data(self, completed_fetch):
817
position)
818
unpacked = list(self._unpack_message_set(tp, records))
819
parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
820
- last_offset = unpacked[-1].offset
821
- self._sensors.records_fetch_lag.record(highwater - last_offset)
+ if unpacked:
+ last_offset = unpacked[-1].offset
822
+ self._sensors.records_fetch_lag.record(highwater - last_offset)
823
num_bytes = records.valid_bytes()
824
records_count = len(unpacked)
825
elif records.size_in_bytes() > 0:
0 commit comments