diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 852157811..6d926992c 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -43,6 +43,9 @@ def __eq__(self, other): self.member_id == other.member_id and self.protocol == other.protocol) + def __str__(self): + return "<Generation %s (member_id: %s, protocol: %s)>" % (self.generation_id, self.member_id, self.protocol) + Generation.NO_GENERATION = Generation(DEFAULT_GENERATION_ID, UNKNOWN_MEMBER_ID, None) @@ -398,17 +401,16 @@ def _handle_join_success(self, member_assignment_bytes): # will be invoked even if the consumer is woken up before # finishing the rebalance with self._lock: - log.info("Successfully joined group %s with generation %s", - self.group_id, self._generation.generation_id) self.state = MemberState.STABLE if self._heartbeat_thread: self._heartbeat_thread.enable() - def _handle_join_failure(self, _): + def _handle_join_failure(self, exception): # we handle failures below after the request finishes. # if the join completes after having been woken up, # the exception is ignored and we will rejoin with self._lock: + log.info("Failed to join group %s: %s", self.group_id, exception) self.state = MemberState.UNJOINED def ensure_active_group(self, timeout_ms=None): @@ -566,10 +568,9 @@ def _failed_request(self, node_id, request, future, error): future.failure(error) def _handle_join_group_response(self, future, send_time, response): + log.debug("Received JoinGroup response: %s", response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("Received successful JoinGroup response for group %s: %s", - self.group_id, response) if self._sensors: self._sensors.join_latency.record((time.time() - send_time) * 1000) with self._lock: @@ -583,6 +584,7 @@ def _handle_join_group_response(self, future, send_time, response): response.member_id, response.group_protocol) + log.info("Successfully joined group %s %s", self.group_id, self._generation) if response.leader_id == 
response.member_id: log.info("Elected group leader -- performing partition" " assignments using %s", self._generation.protocol) @@ -591,24 +593,24 @@ def _handle_join_group_response(self, future, send_time, response): self._on_join_follower().chain(future) elif error_type is Errors.CoordinatorLoadInProgressError: - log.debug("Attempt to join group %s rejected since coordinator %s" - " is loading the group.", self.group_id, self.coordinator_id) + log.info("Attempt to join group %s rejected since coordinator %s" + " is loading the group.", self.group_id, self.coordinator_id) # backoff and retry future.failure(error_type(response)) elif error_type is Errors.UnknownMemberIdError: # reset the member id and retry immediately error = error_type(self._generation.member_id) self.reset_generation() - log.debug("Attempt to join group %s failed due to unknown member id", - self.group_id) + log.info("Attempt to join group %s failed due to unknown member id", + self.group_id) future.failure(error) elif error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): # re-discover the coordinator and retry with backoff self.coordinator_dead(error_type()) - log.debug("Attempt to join group %s failed due to obsolete " - "coordinator information: %s", self.group_id, - error_type.__name__) + log.info("Attempt to join group %s failed due to obsolete " + "coordinator information: %s", self.group_id, + error_type.__name__) future.failure(error_type()) elif error_type in (Errors.InconsistentGroupProtocolError, Errors.InvalidSessionTimeoutError, @@ -619,12 +621,21 @@ def _handle_join_group_response(self, future, send_time, response): self.group_id, error) future.failure(error) elif error_type is Errors.GroupAuthorizationFailedError: + log.error("Attempt to join group %s failed due to group authorization error", + self.group_id) future.failure(error_type(self.group_id)) elif error_type is Errors.MemberIdRequiredError: # Broker requires a concrete member id to be allowed to 
join the group. Update member id # and send another join group request in next cycle. + log.info("Received member id %s for group %s; will retry join-group", + response.member_id, self.group_id) self.reset_generation(response.member_id) future.failure(error_type()) + elif error_type is Errors.RebalanceInProgressError: + log.info("Attempt to join group %s failed due to RebalanceInProgressError," + " which could indicate a replication timeout on the broker. Will retry.", + self.group_id) + future.failure(error_type()) else: # unexpected error, throw the exception error = error_type() @@ -693,6 +704,7 @@ def _send_sync_group_request(self, request): return future def _handle_sync_group_response(self, future, send_time, response): + log.debug("Received SyncGroup response: %s", response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: if self._sensors: @@ -739,13 +751,13 @@ def _send_group_coordinator_request(self): e = Errors.NodeNotReadyError(node_id) return Future().failure(e) - log.debug("Sending group coordinator request for group %s to broker %s", - self.group_id, node_id) version = self._client.api_version(FindCoordinatorRequest, max_version=2) if version == 0: request = FindCoordinatorRequest[version](self.group_id) else: request = FindCoordinatorRequest[version](self.group_id, 0) + log.debug("Sending group coordinator request for group %s to broker %s: %s", + self.group_id, node_id, request) future = Future() _f = self._client.send(node_id, request) _f.add_callback(self._handle_group_coordinator_response, future) @@ -865,6 +877,7 @@ def maybe_leave_group(self, timeout_ms=None): log.info('Leaving consumer group (%s).', self.group_id) version = self._client.api_version(LeaveGroupRequest, max_version=2) request = LeaveGroupRequest[version](self.group_id, self._generation.member_id) + log.debug('Sending LeaveGroupRequest to %s: %s', self.coordinator_id, request) future = self._client.send(self.coordinator_id, request) 
future.add_callback(self._handle_leave_group_response) future.add_errback(log.error, "LeaveGroup request failed: %s") @@ -873,10 +886,11 @@ def maybe_leave_group(self, timeout_ms=None): self.reset_generation() def _handle_leave_group_response(self, response): + log.debug("Received LeaveGroupResponse: %s", response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - log.debug("LeaveGroup request for group %s returned successfully", - self.group_id) + log.info("LeaveGroup request for group %s returned successfully", + self.group_id) else: log.error("LeaveGroup request for group %s failed with error: %s", self.group_id, error_type()) @@ -896,7 +910,7 @@ def _send_heartbeat_request(self): request = HeartbeatRequest[version](self.group_id, self._generation.generation_id, self._generation.member_id) - heartbeat_log.debug("Heartbeat: %s[%s] %s", request.group, request.generation_id, request.member_id) # pylint: disable-msg=no-member + heartbeat_log.debug("Sending HeartbeatRequest to %s: %s", self.coordinator_id, request) future = Future() _f = self._client.send(self.coordinator_id, request) _f.add_callback(self._handle_heartbeat_response, future, time.time()) @@ -907,10 +921,10 @@ def _send_heartbeat_request(self): def _handle_heartbeat_response(self, future, send_time, response): if self._sensors: self._sensors.heartbeat_latency.record((time.time() - send_time) * 1000) + heartbeat_log.debug("Received heartbeat response for group %s: %s", + self.group_id, response) error_type = Errors.for_code(response.error_code) if error_type is Errors.NoError: - heartbeat_log.debug("Received successful heartbeat response for group %s", - self.group_id) future.success(None) elif error_type in (Errors.CoordinatorNotAvailableError, Errors.NotCoordinatorError): @@ -1099,7 +1113,13 @@ def _run_once(self): # the poll timeout has expired, which means that the # foreground thread has stalled in between calls to # poll(), so we explicitly leave the group. 
- heartbeat_log.warning('Heartbeat poll expired, leaving group') + heartbeat_log.warning( + "Consumer poll timeout has expired. This means the time between subsequent calls to poll()" + " was longer than the configured max_poll_interval_ms, which typically implies that" + " the poll loop is spending too much time processing messages. You can address this" + " either by increasing max_poll_interval_ms or by reducing the maximum size of batches" + " returned in poll() with max_poll_records." + ) self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): @@ -1110,10 +1130,12 @@ def _run_once(self): self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) else: + heartbeat_log.debug('Sending heartbeat for group %s %s', self.coordinator.group_id, self.coordinator._generation) self.coordinator.heartbeat.sent_heartbeat() future = self.coordinator._send_heartbeat_request() future.add_callback(self._handle_heartbeat_success) future.add_errback(self._handle_heartbeat_failure) + finally: self.coordinator._lock.release() try: @@ -1124,6 +1146,7 @@ def _run_once(self): def _handle_heartbeat_success(self, result): with self.coordinator._lock: + heartbeat_log.debug('Heartbeat success') self.coordinator.heartbeat.received_heartbeat() def _handle_heartbeat_failure(self, exception): @@ -1134,8 +1157,10 @@ def _handle_heartbeat_failure(self, exception): # member in the group for as long as the duration of the # rebalance timeout. If we stop sending heartbeats, however, # then the session timeout may expire before we can rejoin. + heartbeat_log.debug('Treating RebalanceInProgressError as successful heartbeat') self.coordinator.heartbeat.received_heartbeat() else: + heartbeat_log.debug('Heartbeat failure: %s', exception) self.coordinator.heartbeat.fail_heartbeat() # wake up the thread if it's sleeping to reschedule the heartbeat self.coordinator._lock.notify()