Skip to content

Commit bd21713

Browse files
authored
More / updated debug logging for coordinator / consumer (#2630)
1 parent 3463f59 commit bd21713

File tree

2 files changed

+10
-2
lines changed

2 files changed

+10
-2
lines changed

kafka/consumer/group.py

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -699,20 +699,21 @@ def _poll_once(self, timer, max_records, update_offsets=True):
699699
dict: Map of topic to list of records (may be empty).
700700
"""
701701
if not self._coordinator.poll(timeout_ms=timer.timeout_ms):
702+
log.debug('poll: timeout during coordinator.poll(); returning early')
702703
return {}
703704

704705
has_all_fetch_positions = self._update_fetch_positions(timeout_ms=timer.timeout_ms)
705706

706707
# If data is available already, e.g. from a previous network client
707708
# poll() call to commit, then just return it immediately
708709
records, partial = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)
709-
log.debug('Fetched records: %s, %s', records, partial)
710+
log.debug('poll: fetched records: %s, %s', records, partial)
710711
# Before returning the fetched records, we can send off the
711712
# next round of fetches and avoid block waiting for their
712713
# responses to enable pipelining while the user is handling the
713714
# fetched records.
714715
if not partial:
715-
log.debug("Sending fetches")
716+
log.debug("poll: Sending fetches")
716717
futures = self._fetcher.send_fetches()
717718
if len(futures):
718719
self._client.poll(timeout_ms=0)
@@ -724,12 +725,14 @@ def _poll_once(self, timer, max_records, update_offsets=True):
724725
# since the offset lookup may be backing off after a failure
725726
poll_timeout_ms = min(timer.timeout_ms, self._coordinator.time_to_next_poll() * 1000)
726727
if not has_all_fetch_positions:
728+
log.debug('poll: do not have all fetch positions...')
727729
poll_timeout_ms = min(poll_timeout_ms, self.config['retry_backoff_ms'])
728730

729731
self._client.poll(timeout_ms=poll_timeout_ms)
730732
# after the long poll, we should check whether the group needs to rebalance
731733
# prior to returning data so that the group can stabilize faster
732734
if self._coordinator.need_rejoin():
735+
log.debug('poll: coordinator needs rejoin; returning early')
733736
return {}
734737

735738
records, _ = self._fetcher.fetched_records(max_records, update_offsets=update_offsets)

kafka/coordinator/consumer.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -274,6 +274,7 @@ def poll(self, timeout_ms=None):
274274
try:
275275
self._invoke_completed_offset_commit_callbacks()
276276
if not self.ensure_coordinator_ready(timeout_ms=timer.timeout_ms):
277+
log.debug('coordinator.poll: timeout in ensure_coordinator_ready; returning early')
277278
return False
278279

279280
if self.config['api_version'] >= (0, 9) and self._subscription.partitions_auto_assigned():
@@ -293,9 +294,11 @@ def poll(self, timeout_ms=None):
293294
metadata_update = self._client.cluster.request_update()
294295
self._client.poll(future=metadata_update, timeout_ms=timer.timeout_ms)
295296
if not metadata_update.is_done:
297+
log.debug('coordinator.poll: timeout updating metadata; returning early')
296298
return False
297299

298300
if not self.ensure_active_group(timeout_ms=timer.timeout_ms):
301+
log.debug('coordinator.poll: timeout in ensure_active_group; returning early')
299302
return False
300303

301304
self.poll_heartbeat()
@@ -723,6 +726,7 @@ def _send_offset_commit_request(self, offsets):
723726
return future
724727

725728
def _handle_offset_commit_response(self, offsets, future, send_time, response):
729+
log.debug("Received OffsetCommitResponse: %s", response)
726730
# TODO look at adding request_latency_ms to response (like java kafka)
727731
if self._consumer_sensors:
728732
self._consumer_sensors.commit_latency.record((time.time() - send_time) * 1000)
@@ -849,6 +853,7 @@ def _send_offset_fetch_request(self, partitions):
849853
return future
850854

851855
def _handle_offset_fetch_response(self, future, response):
856+
log.debug("Received OffsetFetchResponse: %s", response)
852857
if response.API_VERSION >= 2 and response.error_code != Errors.NoError.errno:
853858
error_type = Errors.for_code(response.error_code)
854859
log.debug("Offset fetch failed: %s", error_type.__name__)

0 commit comments

Comments
 (0)