Description
I've reduced this to as simple a reproduction as possible, so bear with me a bit, and if anything doesn't make sense, just ask...
Using the latest version of kafka-python, 1.4.7.
In earlier versions of Python, if you call `logging.config.dictConfig` and then later call it again to re-configure logging, the `close` method of the previously-configured logging handlers was not called to clean up the abandoned handlers. This was a bug in Python that was fixed in Python 3.7. Now, when `dictConfig` is called a second time, the previous handlers are all cleaned up by having their `close` method called. If a Kafka producer is used in a logging handler and its `close` method is called within the handler's `close` method, it causes a deadlock that prevents program execution from continuing.
First, here's a simple use-case that works on Python 2.7, 3.5, and 3.7:
```python
import sys

from kafka.producer import KafkaProducer


def replicate():
    sys.stdout.write('Creating producer...\n')
    sys.stdout.flush()
    producer = KafkaProducer(
        acks=1,
        bootstrap_servers=['kafka:9092'],
        client_id='analytics_producer',
        compression_type='gzip',
        linger_ms=100,
        max_block_ms=1000,
        request_timeout_ms=3000,
    )
    sys.stdout.write('Closing producer...\n')
    sys.stdout.flush()
    producer.close(timeout=2)
```
```
>>> replicate()
Creating producer...
Closing producer...
>>> replicate()
Creating producer...
Closing producer...
>>> replicate()
Creating producer...
Closing producer...
```
Each call to `replicate` completes in < 1 second. As you can see, this working use case does not involve logging. But once you involve logging as below, things break:
```python
import logging
import logging.config
import sys

from kafka.producer import KafkaProducer


class KafkaAnalyticsLoggingHandler(logging.Handler):
    def __init__(self):
        super(KafkaAnalyticsLoggingHandler, self).__init__()
        sys.stdout.write('Creating producer...\n')
        sys.stdout.flush()
        self._producer = KafkaProducer(
            acks=1,
            bootstrap_servers=['kafka:9092'],
            client_id='analytics_producer',
            compression_type='gzip',
            linger_ms=100,
            max_block_ms=1000,
            request_timeout_ms=3000,
        )

    def emit(self, record):
        return

    def close(self):
        sys.stdout.write('Closing producer...\n')
        sys.stdout.flush()
        self._producer.close(timeout=2)
        super(KafkaAnalyticsLoggingHandler, self).close()


def replicate():
    config = {
        'version': 1,
        'handlers': {
            'analytics': {
                'level': 'INFO',
                'class': '__main__.KafkaAnalyticsLoggingHandler',
            },
        },
        'loggers': {
            'analytics': {'handlers': ['analytics'], 'level': 'INFO'},
        },
        'disable_existing_loggers': False,
    }
    logging.config.dictConfig(config)
    logging.config.dictConfig(config)
```
```
>>> replicate()
Creating producer...
Closing producer...
^CTraceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "<stdin>", line 4, in replicate
  File "/usr/local/lib/python3.7/logging/config.py", line 799, in dictConfig
    dictConfigClass(config).configure()
  File "/usr/local/lib/python3.7/logging/config.py", line 535, in configure
    _clearExistingHandlers()
  File "/usr/local/lib/python3.7/logging/config.py", line 272, in _clearExistingHandlers
    logging.shutdown(logging._handlerList[:])
  File "/usr/local/lib/python3.7/logging/__init__.py", line 2034, in shutdown
    h.close()
  File "<stdin>", line 20, in close
  File "/usr/local/lib/python3.7/site-packages/kafka/producer/kafka.py", line 493, in close
    self._sender.join()
  File "/usr/local/lib/python3.7/threading.py", line 1044, in join
    self._wait_for_tstate_lock()
  File "/usr/local/lib/python3.7/threading.py", line 1060, in _wait_for_tstate_lock
    elif lock.acquire(block, timeout):
KeyboardInterrupt
```
In this case, `Closing producer...` appears and then it freezes: no prompt, no CPU usage, just total deadlock. Only Ctrl+C can get you out of it, and then you see the stack trace. This only happens in Python 3.7, because that's the version in which the logging config bug was fixed. In Python 2.7 and 3.5 there is no deadlock, and you see the output below. Note that the producer is never closed, because `close` is never called on the handler:
```
>>> replicate()
Creating producer...
Creating producer...
>>> replicate()
Creating producer...
Creating producer...
>>> replicate()
Creating producer...
Creating producer...
```
I think the first, working use case demonstrates that this isn't a timing issue: it's possible to close a producer rapidly after instantiating it without errors or deadlocks. The problem only occurs when the producer's `close` method is called from within a logging handler's `close` method during logging reconfiguration. Critically, it does not happen when logging is shut down at the end of the application! Demonstration (you need a fresh Python shell, with the handler class above defined, to attempt this):
```
>>> config = {'version': 1, 'handlers': {'analytics': {'level': 'INFO', 'class': '__main__.KafkaAnalyticsLoggingHandler'}}, 'loggers': {'analytics': {'handlers': ['analytics'], 'level': 'INFO'}}, 'disable_existing_loggers': False}
>>> logging.config.dictConfig(config)
Creating producer...
>>> exit()
Closing producer...
nwilliams $
```
I'm certain this has something to do with Python logging's locks interfering with the thread locks of the Kafka sender somehow, but I don't begin to understand how. 🤔
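If that hypothesis is right, the shape of the deadlock would be roughly this toy sketch (pure `threading`, no Kafka and no logging; all names are hypothetical stand-ins): one thread holds a lock while joining a worker thread that needs that same lock before it can exit:

```python
import threading
import time

# Hypothetical stand-ins: 'logging_lock' plays the role of a lock
# internal to the logging machinery, and 'sender_loop' plays the role
# of the producer's background sender thread, which logs on every
# iteration and so needs that lock before it can observe the close
# request and exit.
logging_lock = threading.Lock()
close_requested = threading.Event()


def sender_loop():
    while True:
        with logging_lock:  # the sender logs here in the real code
            if close_requested.is_set():
                return
        time.sleep(0.01)


sender = threading.Thread(target=sender_loop)
sender.start()
time.sleep(0.1)

# dictConfig would hold the logging lock while it calls handler.close(),
# which calls producer.close(), which joins the sender thread:
with logging_lock:
    close_requested.set()
    sender.join(timeout=1)  # a real join() with no timeout never returns
    print('sender alive while lock held:', sender.is_alive())  # -> True

sender.join()  # once the lock is released, the sender exits normally
```

In the real traceback above, `dictConfig` would be the lock holder and the producer's `_sender` thread the worker, but again, this is only my guess at the mechanism.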