diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 6f1d1ee31..852157811 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -309,7 +309,7 @@ def _reset_find_coordinator_future(self, result): self._find_coordinator_future = None def lookup_coordinator(self): - with self._lock: + with self._client._lock, self._lock: if self._find_coordinator_future is not None: return self._find_coordinator_future @@ -883,6 +883,7 @@ def _handle_leave_group_response(self, response): def _send_heartbeat_request(self): """Send a heartbeat request""" + # Note: acquire both client + coordinator lock before calling if self.coordinator_unknown(): e = Errors.CoordinatorNotAvailableError(self.coordinator_id) return Future().failure(e) @@ -1054,7 +1055,9 @@ def run(self): heartbeat_log.debug('Heartbeat thread closed') def _run_once(self): - with self.coordinator._client._lock, self.coordinator._lock: + self.coordinator._client._lock.acquire() + self.coordinator._lock.acquire() + try: if self.enabled and self.coordinator.state is MemberState.STABLE: # TODO: When consumer.wakeup() is implemented, we need to # disable here to prevent propagating an exception to this @@ -1063,27 +1066,26 @@ def _run_once(self): # failure callback in consumer poll self.coordinator._client.poll(timeout_ms=0) - with self.coordinator._lock: if not self.enabled: heartbeat_log.debug('Heartbeat disabled. Waiting') + self.coordinator._client._lock.release() self.coordinator._lock.wait() heartbeat_log.debug('Heartbeat re-enabled.') - return - if self.coordinator.state is not MemberState.STABLE: + elif self.coordinator.state is not MemberState.STABLE: # the group is not stable (perhaps because we left the # group or because the coordinator kicked us out), so # disable heartbeats and wait for the main thread to rejoin. heartbeat_log.debug('Group state is not stable, disabling heartbeats') self.disable() - return - if self.coordinator.coordinator_unknown(): + elif self.coordinator.coordinator_unknown(): future = self.coordinator.lookup_coordinator() if not future.is_done or future.failed(): # the immediate future check ensures that we backoff # properly in the case that no brokers are available # to connect to (and the future is automatically failed). + self.coordinator._client._lock.release() self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) elif self.coordinator.heartbeat.session_timeout_expired(): @@ -1098,17 +1100,13 @@ def _run_once(self): # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. heartbeat_log.warning('Heartbeat poll expired, leaving group') - ### XXX - # maybe_leave_group acquires client + coordinator lock; - # if we hold coordinator lock before calling, we risk deadlock - # release() is safe here because this is the last code in the current context - self.coordinator._lock.release() self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): # poll again after waiting for the retry backoff in case # the heartbeat failed or the coordinator disconnected heartbeat_log.log(0, 'Not ready to heartbeat, waiting') + self.coordinator._client._lock.release() self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) else: @@ -1116,6 +1114,13 @@ def _run_once(self): future = self.coordinator._send_heartbeat_request() future.add_callback(self._handle_heartbeat_success) future.add_errback(self._handle_heartbeat_failure) + finally: + self.coordinator._lock.release() + try: + # Possibly released in block above to allow coordinator lock wait() + self.coordinator._client._lock.release() + except RuntimeError: + pass def _handle_heartbeat_success(self, result): with self.coordinator._lock: diff --git a/test/integration/test_consumer_group.py b/test/integration/test_consumer_group.py index b2908c757..eed570074 100644 --- a/test/integration/test_consumer_group.py +++ b/test/integration/test_consumer_group.py @@ -125,6 +125,20 @@ def consumer_thread(i): for partition in range(num_partitions)]) logging.info('Assignment looks good!') + logging.info('Verifying heartbeats') + while True: + for c in range(num_consumers): + heartbeat = consumers[c]._coordinator.heartbeat + last_hb = time.time() - 0.5 + if (heartbeat.heartbeat_failed or + heartbeat.last_receive < last_hb or + heartbeat.last_reset > last_hb): + time.sleep(0.1) + continue + else: + break + logging.info('Heartbeats look good') + finally: logging.info('Shutting down %s consumers', num_consumers) for c in range(num_consumers): @@ -163,18 +177,28 @@ def test_heartbeat_thread(kafka_broker, topic): heartbeat_interval_ms=500) # poll until we have joined group / have assignment + start = time.time() while not consumer.assignment(): consumer.poll(timeout_ms=100) assert consumer._coordinator.state is MemberState.STABLE last_poll = consumer._coordinator.heartbeat.last_poll - last_beat = consumer._coordinator.heartbeat.last_send + + # wait until we receive first heartbeat + while consumer._coordinator.heartbeat.last_receive < start: + time.sleep(0.1) + + last_send = consumer._coordinator.heartbeat.last_send + last_recv = consumer._coordinator.heartbeat.last_receive + assert last_poll > start + assert last_send > start + assert last_recv > start timeout = time.time() + 30 while True: if time.time() > timeout: raise RuntimeError('timeout waiting for heartbeat') - if consumer._coordinator.heartbeat.last_send > last_beat: + if consumer._coordinator.heartbeat.last_receive > last_recv: break time.sleep(0.5)