Hi, I bumped into the following edge case, which results in the fetcher getting stuck indefinitely inside `fetcher._retrieve_offsets()`:

- Consumer is running in a stable group, subscribed to multiple topics
- A topic that is part of the subscription is deleted (no commits for these TPs)
- A fetch request gets `error_code=3` (`UNKNOWN_TOPIC_OR_PARTITION`) and a `MetadataRequest` is made
- The subscription state is updated after the `MetadataResponse`
- A rebalance is triggered due to the subscription change
- Revoke, group leader election, sync assignment, and assignment take place
- During that time, another topic that was part of the subscription is deleted (no commits for these TPs either)
- `KafkaConsumer._coordinator.poll()` did not trigger a rebalance
- `KafkaConsumer._update_fetch_positions(self._subscription.missing_fetch_positions())` causes the client to look up reset positions for the assignment (which still holds a topic that no longer exists due to the deletion)
- `Errors.UnknownTopicOrPartitionError` is raised from `fetcher._handle_offset_response()` since the partition does not exist
- A metadata refresh takes place and the subscription is updated, but we are caught in an infinite loop inside `fetcher._retrieve_offsets()`: the `timestamps` are never updated, the timeout is infinite, and we keep hitting the early exit in `fetcher._send_offset_requests()` that forces a metadata update over and over again (a rough reproduction sketch follows this list)
The early exit in `fetcher._send_offset_requests()` that keeps firing:

```python
timestamps_by_node = collections.defaultdict(dict)
for partition, timestamp in six.iteritems(timestamps):
    node_id = self._client.cluster.leader_for_partition(partition)
    if node_id is None:
        self._client.add_topic(partition.topic)
        log.debug("Partition %s is unknown for fetching offset,"
                  " wait for metadata refresh", partition)
        return Future().failure(Errors.StaleMetadata(partition))
    ...
```
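The loop shape that turns that early exit into a livelock looks roughly like this (a simplified illustration of the retry behaviour in `_retrieve_offsets()`, not the actual implementation; the point is only that the same `timestamps` dict is retried forever once the timeout is infinite):

```python
# Simplified illustration (not the real kafka-python code) of how
# _retrieve_offsets() keeps retrying with an unchanged `timestamps` dict.
def retrieve_offsets(fetcher, timestamps, timeout_ms=float('inf')):
    while True:  # with an infinite timeout there is no way out
        future = fetcher._send_offset_requests(timestamps)
        fetcher._client.poll(future=future)

        if future.succeeded():
            return future.value
        if not future.retriable():
            raise future.exception

        # StaleMetadata -> wait for a metadata refresh...
        refresh = fetcher._client.cluster.request_update()
        fetcher._client.poll(future=refresh)
        # ...but `timestamps` still contains the deleted topic's partitions,
        # so the next _send_offset_requests() call fails the same way.
```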
If you can point me toward what you think would be a good way to tackle this, I'll work on a PR.
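One direction I've been toying with, purely as a sketch (it assumes the subscription state's current assignment is the right source of truth after the refresh, which may not hold): prune entries from `timestamps` whose partitions are no longer assigned before retrying, so the loop can make progress or bail out.

```python
import logging

log = logging.getLogger(__name__)

def prune_unassigned_partitions(timestamps, assigned_partitions):
    """Drop offset-lookup entries for partitions that are no longer assigned.

    `timestamps` is the {TopicPartition: timestamp} dict that
    _retrieve_offsets() keeps retrying with; `assigned_partitions` would be
    taken from the subscription state after the metadata refresh.
    """
    stale = [tp for tp in timestamps if tp not in assigned_partitions]
    for tp in stale:
        log.warning("Skipping offset lookup for %s: partition is no longer assigned", tp)
        del timestamps[tp]
    return timestamps
```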
Cheers.