BrokerConnection: deadlock caused by unhandled socket recv exception, exists in latest master branch #1817

Closed
@ventfang

Description

kafka-python: 1.4.6

Here is my code snippet:

while True:
    # poll() returns the fetched records; pass that result to the handler
    msgs = self.consumer.poll(timeout_ms)
    handle_messages(msgs)

Traceback (the second one was printed after I pressed Ctrl-C on the hung process):

 2019-05-25 19:51:09,205 kafka_clent.py[line:56] ERROR Traceback (most recent call last):
  File "kafka_clent.py", line 48, in __main
    self.__poll_once(handle_messages, timeout_ms)
  File "kafka_clent.py", line 67, in __poll_once
    msgs = self.consumer.poll(timeout_ms)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 617, in poll
    records = self._poll_once(remaining, max_records)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 637, in _poll_once
    self._coordinator.poll()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 263, in poll
    self.ensure_coordinator_ready()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/base.py", line 259, in ensure_coordinator_ready
    self._client.poll(future=future)
  File "./opt/venv/lib/python3.7/site-packages/kafka/client_async.py", line 593, in poll
    self._poll(timeout)
  File "./opt/venv/lib/python3.7/site-packages/kafka/client_async.py", line 649, in _poll
    self._pending_completion.extend(conn.recv())
  File "./opt/venv/lib/python3.7/site-packages/kafka/conn.py", line 905, in recv
    responses = self._recv()
  File "./opt/venv/lib/python3.7/site-packages/kafka/conn.py", line 942, in _recv
    data = self._sock.recv(self.config['sock_chunk_bytes'])
TimeoutError: [Errno 60] Operation timed out

^C2019-05-25 19:52:19,183 kafka_clent.py[line:51] INFO Traceback (most recent call last):
  File "kafka_clent.py", line 48, in __main
    self.__poll_once(handle_messages, timeout_ms)
  File "kafka_clent.py", line 67, in __poll_once
    msgs = self.consumer.poll(timeout_ms)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 617, in poll
    records = self._poll_once(remaining, max_records)
  File "./opt/venv/lib/python3.7/site-packages/kafka/consumer/group.py", line 637, in _poll_once
    self._coordinator.poll()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/consumer.py", line 263, in poll
    self.ensure_coordinator_ready()
  File "./opt/venv/lib/python3.7/site-packages/kafka/coordinator/base.py", line 246, in ensure_coordinator_ready
    with self._client._lock, self._lock:
KeyboardInterrupt
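
The first traceback shows a TimeoutError escaping from _recv in conn.py. One thing that may be relevant, offered as an assumption and not a confirmed diagnosis of kafka-python's handler: in Python 3, TimeoutError sits next to ConnectionError under OSError, so an except clause written only for ConnectionError would not catch it. The sketch below uses only the standard library; classify_recv_error is a hypothetical helper for illustration and is not part of kafka-python.

```python
import errno


def classify_recv_error(exc):
    """Hypothetical helper: name the broadest of two handlers that would catch exc."""
    if isinstance(exc, ConnectionError):
        return "ConnectionError"
    if isinstance(exc, OSError):
        return "OSError"
    return "unhandled"


# OSError maps errno values to subclasses, which is how a socket timeout
# surfaces as TimeoutError and a reset as ConnectionResetError.
timeout = OSError(errno.ETIMEDOUT, "Operation timed out")
reset = OSError(errno.ECONNRESET, "Connection reset by peer")

print(type(timeout).__name__)                     # TimeoutError
print(issubclass(TimeoutError, ConnectionError))  # False
print(classify_recv_error(timeout))               # OSError
print(classify_recv_error(reset))                 # ConnectionError
```

If that is what happens here, a handler that also covers TimeoutError (or OSError in general) around the socket recv call would let the connection be closed cleanly instead of the exception propagating out of poll().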
