diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index f2eaefc6c..f3832c531 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -920,9 +920,18 @@ def disable(self): self.enabled = False def close(self): + if self.closed: + return self.closed = True with self.coordinator._lock: self.coordinator._lock.notify() + + # Generally this should not happen - close() is triggered + # by the coordinator. But in some cases GC may close the coordinator + # from within the heartbeat thread. + if threading.current_thread() == self: + return + if self.is_alive(): self.join(self.coordinator.config['heartbeat_interval_ms'] / 1000) if self.is_alive():