
Consumer can block indefinitely when resetting offsets for a deleted topic #1780

Closed
@royantman

Description

Hi, I bumped into the following edge case, which leaves the fetcher stuck indefinitely
inside fetcher._retrieve_offsets():

  • The consumer is running in a stable group, subscribed to multiple topics
  • A topic that is part of the subscription is deleted (no commits exist for its TPs)
  • A fetch request gets error_code=3 (UNKNOWN_TOPIC_OR_PARTITION) and a MetadataRequest is made
  • The subscription state is updated after the MetadataResponse
  • A rebalance is triggered by the subscription change
  • Revocation, group leader election, assignment sync, and assignment take place
  • During that time, another topic that was part of the subscription is deleted (no commits exist for its TPs either)
  • KafkaConsumer._coordinator.poll() does not trigger a rebalance
  • KafkaConsumer._update_fetch_positions(self._subscription.missing_fetch_positions()) makes the client fetch reset positions for the assignment, which still holds the topic that was just deleted
  • Errors.UnknownTopicOrPartitionError is raised from fetcher._handle_offset_response() because the partition does not exist
  • A metadata refresh takes place and the subscription is updated, but we are now caught in an infinite loop inside fetcher._retrieve_offsets(): the timestamps are never updated (they still contain the deleted topic's partitions), the timeout is infinite, and fetcher._send_offset_requests() now exits early, forcing a metadata update over and over again:
        timestamps_by_node = collections.defaultdict(dict)
        for partition, timestamp in six.iteritems(timestamps):
            node_id = self._client.cluster.leader_for_partition(partition)
            if node_id is None:
                self._client.add_topic(partition.topic)
                log.debug("Partition %s is unknown for fetching offset,"
                          " wait for metadata refresh", partition)
                return Future().failure(Errors.StaleMetadata(partition))
...
...
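To make the loop easier to reason about, here is a minimal, self-contained model of it. This is a sketch under stated assumptions, not kafka-python code: `retrieve_offsets`, `send_offset_requests`, `subscribed_topics`, `demo_cap`, and `prune_unsubscribed` are hypothetical stand-ins, partitions are plain `(topic, partition)` tuples, and the timeout counts attempts rather than milliseconds. The `prune_unsubscribed` flag illustrates one possible direction (dropping partitions whose topic left the subscription after the metadata refresh); it is an assumption, not a confirmed fix.

```python
import math


class StaleMetadata(Exception):
    """Hypothetical stand-in for kafka.errors.StaleMetadata."""


def send_offset_requests(timestamps, leader_for):
    """Model of the early exit quoted above: one partition without a known
    leader fails the whole batch, so no offsets are returned at all."""
    for partition in timestamps:
        if leader_for(partition) is None:
            raise StaleMetadata(partition)
    # Pretend every successful lookup resolves to offset 0.
    return {partition: 0 for partition in timestamps}


def retrieve_offsets(timestamps, leader_for, subscribed_topics,
                     timeout=math.inf, demo_cap=1000, prune_unsubscribed=False):
    """Simplified model of the retry loop in fetcher._retrieve_offsets().

    `timeout` counts attempts instead of milliseconds, and `demo_cap` is a
    hypothetical safety valve so this demo terminates. Returns a tuple of
    (offsets or None, attempts made).
    """
    remaining = dict(timestamps)
    attempts = 0
    while attempts < min(timeout, demo_cap):
        attempts += 1
        if prune_unsubscribed:
            # Hypothetical mitigation: forget partitions whose topic was
            # dropped from the subscription by the last metadata refresh.
            topics = subscribed_topics()
            remaining = {tp: ts for tp, ts in remaining.items() if tp[0] in topics}
        try:
            return send_offset_requests(remaining, leader_for), attempts
        except StaleMetadata:
            # The real loop waits for a metadata refresh and retries; the
            # deleted partition never gets a leader, so with an infinite
            # timeout nothing ever breaks out.
            continue
    return None, attempts


# Partitions are modelled as (topic, partition) tuples.
live, deleted = ("live-topic", 0), ("deleted-topic", 0)
leaders = {live: 1}              # after the refresh, the deleted topic has no leader
subscription = {"live-topic"}    # the refreshed subscription no longer lists it

print(*retrieve_offsets({live: -1, deleted: -1}, leaders.get, lambda: subscription))
# None 1000  (only the demo cap stops it)
print(*retrieve_offsets({live: -1, deleted: -1}, leaders.get,
                        lambda: subscription, prune_unsubscribed=True))
# {('live-topic', 0): 0} 1
```

Without pruning, the only exit is the artificial cap; with an infinite timeout, that matches the hang described above. Pruning lets the healthy partition resolve on the first attempt, so the consumer could move on to the pending rebalance.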

If you can point me toward what you think would be a good way to tackle this, I'll work on a PR.

Cheers.
