We read every piece of feedback, and take your input very seriously.
To see all available qualifiers, see our documentation.
There was an error while loading. Please reload this page.
1 parent 4281e3e commit 47156c3Copy full SHA for 47156c3
kafka/coordinator/base.py
@@ -920,9 +920,18 @@ def disable(self):
920
self.enabled = False
921
922
def close(self):
923
+ if self.closed:
924
+ return
925
self.closed = True
926
with self.coordinator._lock:
927
self.coordinator._lock.notify()
928
+
929
+ # Generally this should not happen - close() is triggered
930
+ # by the coordinator. But in some cases GC may close the coordinator
931
+ # from within the heartbeat thread.
932
+ if threading.current_thread() == self:
933
934
935
if self.is_alive():
936
self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000)
937
0 commit comments