
Deadlock in Python 3.7 when KafkaProducer closed as part of logging reconfiguration #1931

Closed
@nickwilliams-eventbrite

Description

I've reduced this to as simple a reproduction as possible, so bear with me a bit, and if anything doesn't make sense, just ask...

Using the latest version of kafka-python, 1.4.7.

In earlier versions of Python, if you call logging.config.dictConfig and then later call it again to re-configure logging, the close method of the previously-configured logging handlers was not called to clean up the abandoned handlers. This was a bug in Python that was fixed in Python 3.7. Now, when dictConfig is called a second time, the previous handlers are all cleaned up by having their close method called. If a Kafka producer is used in a logging handler and its close method is called within the handler's close method, it causes a deadlock that prevents program execution from continuing.
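
To see that behavior change in isolation, here's a minimal, Kafka-free sketch (NoisyHandler is a throwaway class of my own, not part of any library; run it as a script so that '__main__.NoisyHandler' resolves):

import logging
import logging.config


class NoisyHandler(logging.Handler):
    def emit(self, record):
        return

    def close(self):
        print('close() called on the abandoned handler')
        super(NoisyHandler, self).close()


config = {'version': 1, 'handlers': {'noisy': {'class': '__main__.NoisyHandler'}}, 'loggers': {'noisy': {'handlers': ['noisy']}}}

logging.config.dictConfig(config)
logging.config.dictConfig(config)  # Python 3.7+ prints the message here; 2.7/3.5 stay silent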

First, here's a simple use case that works on Python 2.7, 3.5, and 3.7:

import sys

from kafka.producer import KafkaProducer


def replicate():
    sys.stdout.write('Creating producer...\n')
    sys.stdout.flush()
    producer = KafkaProducer(
        acks=1,
        bootstrap_servers=['kafka:9092'],
        client_id='analytics_producer',
        compression_type='gzip',
        linger_ms=100,
        max_block_ms=1000,
        request_timeout_ms=3000,
    )
    sys.stdout.write('Closing producer...\n')
    sys.stdout.flush()
    producer.close(timeout=2)

>>> replicate()
Creating producer...
Closing producer...
>>> replicate()
Creating producer...
Closing producer...
>>> replicate()
Creating producer...
Closing producer...

Each call to replicate completes in < 1 second. As you can see, this working use case does not involve logging. But once you involve logging as below, things break:

import logging
import logging.config
import sys
from kafka.producer import KafkaProducer


class KafkaAnalyticsLoggingHandler(logging.Handler):
    def __init__(self):
        super(KafkaAnalyticsLoggingHandler, self).__init__()
        sys.stdout.write('Creating producer...\n')
        sys.stdout.flush()
        self._producer = KafkaProducer(
            acks=1,
            bootstrap_servers=['kafka:9092'],
            client_id='analytics_producer',
            compression_type='gzip',
            linger_ms=100,
            max_block_ms=1000,
            request_timeout_ms=3000,
        )

    def emit(self, record):
        return

    def close(self):
        sys.stdout.write('Closing producer...\n')
        sys.stdout.flush()
        self._producer.close(timeout=2)
        super(KafkaAnalyticsLoggingHandler, self).close()


def replicate():
    config = {'version': 1, 'handlers': {'analytics': {'level': 'INFO', 'class': '__main__.KafkaAnalyticsLoggingHandler'}}, 'loggers': {'analytics': {'handlers': ['analytics'], 'level': 'INFO'}}, 'disable_existing_loggers': False}
    logging.config.dictConfig(config)
    logging.config.dictConfig(config)

>>> replicate()
Creating producer...
Closing producer...
^CTraceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in replicate
  File "/usr/local/lib/python3.7/logging/config.py", line 799, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.7/logging/config.py", line 535, in configure
    _clearExistingHandlers()
  File "/usr/local/lib/python3.7/logging/config.py", line 272, in _clearExistingHandlers
    logging.shutdown(logging._handlerList[:])
  File "/usr/local/lib/python3.7/logging/__init__.py", line 2034, in shutdown
    h.close()
  File "<stdin>", line 20, in close
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 493, in close
    self._sender.join()
  File "/usr/local/lib/python3.7/threading.py", line 1044, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.7/threading.py", line 1060, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt

In this case, Closing producer... appears and then everything freezes: no prompt, no CPU usage, just total deadlock. Only Ctrl+C can get you out of it, and then you see the stack trace above. Note where it's stuck: the main thread is blocked in self._sender.join() (kafka/producer/kafka.py line 493), waiting forever for the producer's background sender thread to exit.

This only happens in Python 3.7, because that's when the logging config bug was fixed. In Python 2.7 and 3.5, there are no deadlocks and you see the output below. Note that it never closes the producer, because close is never called on the handler:

>>> replicate()
Creating producer...
Creating producer...
>>> replicate()
Creating producer...
Creating producer...
>>> replicate()
Creating producer...
Creating producer...

I think the first, working use case demonstrates that this isn't a timing issue: it's possible to create a producer and close it immediately without errors or deadlocks. The problem occurs only when the producer's close method is called from within a logging handler's close method during logging reconfiguration. Critically, it does not happen when logging is shut down at the end of the application! Demonstration (you need a fresh Python shell to attempt this):

>>> config = {'version': 1, 'handlers': {'analytics': {'level': 'INFO', 'class': '__main__.KafkaAnalyticsLoggingHandler'}}, 'loggers': {'analytics': {'handlers': ['analytics'], 'level': 'INFO'}}, 'disable_existing_loggers': False}
>>> logging.config.dictConfig(config)
Creating producer...
>>> exit()
Closing producer...
nwilliams $

I'm certain this has something to do with Python logging's locks interfering with the thread locks of the Kafka sender somehow, but I don't begin to understand how. 🤔
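
If that's what's happening, the shape would be something like this Kafka-free sketch. This is my guess at the pattern, not traced through the actual locks; a plain threading.Lock stands in for whatever lock logging holds. The main thread holds a lock while joining a worker thread that can't finish until it acquires that same lock:

import threading

lock = threading.Lock()  # stands in for a logging-internal lock


def worker():
    # Like the sender thread would if it tried to log during shutdown,
    # the worker needs the lock before it can finish.
    with lock:
        pass


t = threading.Thread(target=worker)
with lock:  # the main thread holds the lock (as dictConfig might)...
    t.start()
    t.join(timeout=2)  # ...while joining the worker, which can never finish
    print('worker still alive after join:', t.is_alive())  # prints True

In the real traceback the join has no timeout, which would explain why it hangs forever instead of timing out.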
