diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index b0e236a06..2acd68dea 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -243,7 +243,7 @@ def ensure_coordinator_ready(self): """Block until the coordinator for this group is known (and we have an active connection -- java client uses unsent queue). """ - with self._lock: + with self._client._lock, self._lock: while self.coordinator_unknown(): # Prior to 0.8.2 there was no group coordinator @@ -346,7 +346,7 @@ def _handle_join_failure(self, _): def ensure_active_group(self): """Ensure that the group is active (i.e. joined and synced)""" - with self._lock: + with self._client._lock, self._lock: if self._heartbeat_thread is None: self._start_heartbeat_thread() @@ -764,7 +764,7 @@ def close(self): def maybe_leave_group(self): """Leave the current group and reset local generation/memberId.""" - with self._lock: + with self._client._lock, self._lock: if (not self.coordinator_unknown() and self.state is not MemberState.UNJOINED and self._generation is not Generation.NO_GENERATION): @@ -947,6 +947,15 @@ def run(self): log.debug('Heartbeat thread closed') def _run_once(self): + with self.coordinator._client._lock, self.coordinator._lock: + if self.enabled and self.coordinator.state is MemberState.STABLE: + # TODO: When consumer.wakeup() is implemented, we need to + # disable here to prevent propagating an exception to this + # heartbeat thread + # must get client._lock, or maybe deadlock at heartbeat + # failure callbak in consumer poll + self.coordinator._client.poll(timeout_ms=0) + with self.coordinator._lock: if not self.enabled: log.debug('Heartbeat disabled. Waiting') @@ -962,11 +971,6 @@ def _run_once(self): self.disable() return - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - self.coordinator._client.poll(timeout_ms=0) - if self.coordinator.coordinator_unknown(): future = self.coordinator.lookup_coordinator() if not future.is_done or future.failed():