Description
Hi,
I would like to know whether there are any limits when the number of async producers is larger than the physical number of cores and each producer runs in its own process. We have encountered a situation where, in such a setup, not all of the messages were sent to the Kafka brokers.
I have the following use case:
I have created a simple API using the Python Falcon framework.
The API accepts only GET requests (the source is a mobile tracking service), extracts the URL parameters from the request and sends them as a JSON message to Kafka using an async producer.
In order to scale the API to handle thousands of requests, I am using gunicorn.
I have tried gunicorn with 5 workers (while I have only 4 cores), which means each worker is a separate process that also creates its own Kafka async producer.
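For reference, the workers are started with something along these lines (the module name "collector" and the bind address are placeholders; the relevant part is the worker count):

gunicorn --workers 5 --bind 0.0.0.0:8000 collector:application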
In this configuration we were not able to get all the messages back with a simple Kafka consumer (CLI).
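The consumer side is just the stock console consumer, run roughly like this (the zookeeper address is a placeholder; "ios" is one of the topics the producer writes to):

kafka-console-consumer.sh --zookeeper zk1:2181 --topic ios --from-beginning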
We then reduced the number of workers to 2 and everything was fine.
What could be the reason for not all of the messages being sent?
My code:
import falcon
from kafka import SimpleProducer, KafkaClient
import signal
import sys
import json
import os
import logging

logger = logging.getLogger('kafka')
lgr_frmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s\n')
# one log file per worker process
main_fh = logging.FileHandler("/tmp/kafka_{pid}.log".format(pid=str(os.getpid())), mode='a')
main_fh.setFormatter(lgr_frmt)
logger.setLevel(logging.DEBUG)
logger.addHandler(main_fh)


class CollectorApi(object):
    def __init__(self):
        # handle the case where the app (or worker) is terminated by SIGINT or SIGTERM
        signal.signal(signal.SIGINT, self.signal_term_handler)
        signal.signal(signal.SIGTERM, self.signal_term_handler)
        # supported os types
        self.supported_os_types = ('ios',)
        # template for kafka topics as a function of os_type
        self.kafka_topic = "{os_type}"
        # create an async kafka producer (one per worker process)
        self.kafka = KafkaClient(['kafka1:9092', 'kafka2:9092'])
        self.producer = SimpleProducer(self.kafka, batch_send_every_n=50, batch_send_every_t=60, async=True)

    def signal_term_handler(self, signal, frame):
        """
        Stop the producer cleanly when the worker is terminated.
        """
        self.producer.stop()
        sys.exit()

    def on_get(self, req, resp):
        # msg = req.relative_uri
        # parse the event params and get the os type from the event
        event_params = req.params
        try:
            event_os_type = event_params['os']
        except KeyError:
            # if there is no os key in the params, set os=unknown
            event_os_type = "unknown"
        else:
            # if the os type is not in the supported os types list, set os=unknown
            if event_os_type not in self.supported_os_types:
                event_os_type = "unknown"
        # send only the params to kafka, as a json message
        self.producer.send_messages(self.kafka_topic.format(os_type=event_os_type), json.dumps(event_params))


api = application = falcon.API()
collector = CollectorApi()
api.add_route('/', collector)
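In case it helps to reproduce the setup outside of gunicorn, a minimal standalone sketch of the same producer usage would look like this (broker addresses, topic name and payload are placeholders matching the code above):

from kafka import SimpleProducer, KafkaClient
import json

# same client/producer settings as in the API, but in a single process
kafka = KafkaClient(['kafka1:9092', 'kafka2:9092'])
producer = SimpleProducer(kafka, batch_send_every_n=50, batch_send_every_t=60, async=True)

# send a known number of messages so the count can be compared on the consumer side
for i in range(1000):
    producer.send_messages('ios', json.dumps({'os': 'ios', 'seq': i}))

# stop the async producer (and its background sender) before exiting
producer.stop()
kafka.close()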