Skip to content

Do not reset fetch positions if offset commit fetch times out #2629

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 21, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 6 additions & 1 deletion kafka/consumer/fetcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -178,6 +178,9 @@ def reset_offsets_if_needed(self):
Arguments:
partitions ([TopicPartition]): the partitions that need offsets reset

+Returns:
+bool: True if any partitions need reset; otherwise False (no reset pending)
+
Raises:
NoOffsetForPartitionError: if no offset reset strategy is defined
KafkaTimeoutError if timeout_ms provided
Expand All @@ -189,7 +192,8 @@ def reset_offsets_if_needed(self):

partitions = self._subscriptions.partitions_needing_reset()
if not partitions:
-return
+return False
+log.debug('Resetting offsets for %s', partitions)

offset_resets = dict()
for tp in partitions:
Expand All @@ -198,6 +202,7 @@ def reset_offsets_if_needed(self):
offset_resets[tp] = ts

self._reset_offsets_async(offset_resets)
+return True

def offsets_by_times(self, timestamps, timeout_ms=None):
"""Fetch offset for each partition passed in ``timestamps`` map.
Expand Down
21 changes: 9 additions & 12 deletions kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -1124,7 +1124,7 @@ def _update_fetch_positions(self, timeout_ms=None):
partitions (List[TopicPartition]): The partitions that need
updating fetch positions.

-Returns True if fetch positions updated, False if timeout
+Returns True if fetch positions updated, False if timeout or async reset is pending

Raises:
NoOffsetForPartitionError: If no offset is stored for a given
Expand All @@ -1135,15 +1135,13 @@ def _update_fetch_positions(self, timeout_ms=None):

if (self.config['api_version'] >= (0, 8, 1) and
self.config['group_id'] is not None):
-try:
-# If there are any partitions which do not have a valid position and are not
-# awaiting reset, then we need to fetch committed offsets. We will only do a
-# coordinator lookup if there are partitions which have missing positions, so
-# a consumer with manually assigned partitions can avoid a coordinator dependence
-# by always ensuring that assigned partitions have an initial position.
-self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms)
-except KafkaTimeoutError:
-pass
+# If there are any partitions which do not have a valid position and are not
+# awaiting reset, then we need to fetch committed offsets. We will only do a
+# coordinator lookup if there are partitions which have missing positions, so
+# a consumer with manually assigned partitions can avoid a coordinator dependence
+# by always ensuring that assigned partitions have an initial position.
+if not self._coordinator.refresh_committed_offsets_if_needed(timeout_ms=timeout_ms):
+return False

# If there are partitions still needing a position and a reset policy is defined,
# request reset using the default policy. If no reset strategy is defined and there
Expand All @@ -1152,8 +1150,7 @@ def _update_fetch_positions(self, timeout_ms=None):

# Finally send an asynchronous request to lookup and update the positions of any
# partitions which are awaiting reset.
-self._fetcher.reset_offsets_if_needed()
-return False
+return not self._fetcher.reset_offsets_if_needed()

def _message_generator_v2(self):
timeout_ms = 1000 * max(0, self._consumer_timeout - time.time())
Expand Down
3 changes: 2 additions & 1 deletion kafka/coordinator/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -427,7 +427,8 @@ def fetch_committed_offsets(self, partitions, timeout_ms=None):
future_key = frozenset(partitions)
timer = Timer(timeout_ms)
while True:
-self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms)
+if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms):
+timer.maybe_raise()

# contact coordinator to fetch committed offsets
if future_key in self._offset_fetch_futures:
Expand Down
Loading