Skip to content

Commit 5d83941

Browse files
authored
Avoid RuntimeError on mutated _completed_fetches deque in consumer fetcher (#2646)
1 parent 5c7cd2f commit 5d83941

File tree

1 file changed

+2
-1
lines changed

1 file changed

+2
-1
lines changed

kafka/consumer/fetcher.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -612,7 +612,8 @@ def _handle_list_offsets_response(self, future, response):
612612
def _fetchable_partitions(self):
613613
fetchable = self._subscriptions.fetchable_partitions()
614614
# do not fetch a partition if we have a pending fetch response to process
615-
discard = {fetch.topic_partition for fetch in self._completed_fetches}
615+
# use copy.copy to avoid runtimeerror on mutation from different thread
616+
discard = {fetch.topic_partition for fetch in self._completed_fetches.copy()}
616617
current = self._next_partition_records
617618
if current:
618619
discard.add(current.topic_partition)

0 commit comments

Comments
 (0)