
Commit 9248b7b

Add internal update_offsets param to consumer poll(); default to new iterator

1 parent: 5a65334


kafka/consumer/group.py

Lines changed: 13 additions & 7 deletions
@@ -303,7 +303,7 @@ class KafkaConsumer(six.Iterator):
         'sasl_kerberos_service_name': 'kafka',
         'sasl_kerberos_domain_name': None,
         'sasl_oauth_token_provider': None,
-        'legacy_iterator': True,  # experimental feature
+        'legacy_iterator': False,  # enable to revert to < 1.4.7 iterator
     }
     DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000
 
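With this change the new iterator becomes the default and `legacy_iterator` is an opt-out. A minimal sketch of reverting to the old behavior, assuming a local broker; the topic name and bootstrap address are placeholders, not part of the commit:

    # Opt back into the pre-1.4.7 iterator via the config flag above.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        'my-topic',                          # placeholder topic
        bootstrap_servers='localhost:9092',  # placeholder broker address
        legacy_iterator=True,
    )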
@@ -598,7 +598,7 @@ def partitions_for_topic(self, topic):
         partitions = cluster.partitions_for_topic(topic)
         return partitions
 
-    def poll(self, timeout_ms=0, max_records=None):
+    def poll(self, timeout_ms=0, max_records=None, update_offsets=True):
         """Fetch data from assigned topics / partitions.
 
         Records are fetched and returned in batches by topic-partition.
@@ -622,6 +622,12 @@ def poll(self, timeout_ms=0, max_records=None):
             dict: Topic to list of records since the last fetch for the
                 subscribed list of topics and partitions.
         """
+        # Note: update_offsets is an internal-use only argument. It is used to
+        # support the python iterator interface, which wraps consumer.poll()
+        # and requires that the partition offsets tracked by the fetcher are
+        # not updated until the iterator returns each record to the user. As
+        # such, the argument is not documented and should not be relied on by
+        # library users, as it may change or be removed in the future.
         assert timeout_ms >= 0, 'Timeout must not be negative'
         if max_records is None:
             max_records = self.config['max_poll_records']
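The note above is the heart of the change: an iterator hands records to the caller one at a time, so advancing the fetcher's positions a whole batch at a time would let a seek() issued mid-iteration be silently overwritten. A simplified illustration of the pattern, assuming `consumer` is a KafkaConsumer; the per-record seek() here only approximates the library's internal position bookkeeping and is not its exact implementation:

    def iterate_once(consumer, timeout_ms=1000):
        # Fetch a batch, but do not let the fetcher advance its positions yet.
        record_map = consumer.poll(timeout_ms=timeout_ms, update_offsets=False)
        for tp, records in record_map.items():
            for record in records:
                # Advance the consumed position one record at a time, so the
                # position always reflects exactly what the caller has seen.
                consumer.seek(tp, record.offset + 1)
                yield record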
@@ -632,7 +638,7 @@ def poll(self, timeout_ms=0, max_records=None):
         start = time.time()
         remaining = timeout_ms
         while True:
-            records = self._poll_once(remaining, max_records)
+            records = self._poll_once(remaining, max_records, update_offsets=update_offsets)
             if records:
                 return records
 
@@ -642,7 +648,7 @@ def poll(self, timeout_ms=0, max_records=None):
             if remaining <= 0:
                 return {}
 
-    def _poll_once(self, timeout_ms, max_records):
+    def _poll_once(self, timeout_ms, max_records, update_offsets=True):
         """Do one round of polling. In addition to checking for new data, this does
         any needed heart-beating, auto-commits, and offset updates.
 
@@ -661,7 +667,7 @@ def _poll_once(self, timeout_ms, max_records):
 
         # If data is available already, e.g. from a previous network client
         # poll() call to commit, then just return it immediately
-        records, partial = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+        records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         if records:
             # Before returning the fetched records, we can send off the
             # next round of fetches and avoid block waiting for their
@@ -681,7 +687,7 @@ def _poll_once(self, timeout_ms, max_records):
         if self._coordinator.need_rejoin():
             return {}
 
-        records, _ = self._fetcher.fetched_records(max_records, update_offsets=bool(self._iterator))
+        records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
         return records
 
     def position(self, partition):
@@ -1089,7 +1095,7 @@ def _update_fetch_positions(self, partitions):
 
     def _message_generator_v2(self):
         timeout_ms = 1000 * (self._consumer_timeout - time.time())
-        record_map = self.poll(timeout_ms=timeout_ms)
+        record_map = self.poll(timeout_ms=timeout_ms, update_offsets=False)
         for tp, records in six.iteritems(record_map):
             # Generators are stateful, and it is possible that the tp / records
             # here may become stale during iteration -- i.e., we seek to a
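_message_generator_v2 backs the consumer's normal `for message in consumer` loop, so after this commit the iterator path polls with update_offsets=False while a direct poll() call keeps the batch-at-a-time behavior. A short usage sketch, assuming a consumer configured with the real `consumer_timeout_ms` option so iteration does not block forever:

    # Batch style: positions advance for the whole returned batch.
    batch = consumer.poll(timeout_ms=500)

    # Iterator style: driven by _message_generator_v2, positions advance
    # record by record as each message is yielded.
    for message in consumer:
        print(message.topic, message.partition, message.offset, message.value)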
