number of producers and number of cores #452

Closed
@imazor

Description

Hi,

I would like to know whether there are any limits when the number of async producers is larger than the physical number of cores and each producer runs in its own process.

We have encountered cases where, in this configuration, not all of the messages were delivered to the Kafka brokers.


I have the following use case:
I have created a simple API using the Python Falcon framework.
The API accepts only GET requests (the source is a mobile tracking service), extracts the URL parameters from each request, and sends them as a JSON message to Kafka using an async producer.
In order to scale the API to handle thousands of requests, I am using gunicorn.
I tried gunicorn with 5 workers on a machine with only 4 cores; each worker is a separate process, and each worker creates its own Kafka async producer.
In this configuration, a simple Kafka consumer (CLI) did not receive all of the messages.
We then reduced the number of workers to 2 and the problem went away.
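As an illustrative sketch of the setup above (the hook and import path are assumptions, not taken from this issue): a `gunicorn.conf.py` could cap the worker count at the core count and stop the producer when a worker shuts down, using gunicorn's documented `worker_exit` server hook:

```python
# gunicorn.conf.py -- sketch only. `worker_exit` is one of gunicorn's server
# hooks; `app.collector` is a hypothetical import path for the module below.
import multiprocessing

workers = multiprocessing.cpu_count()   # e.g. 4 on this machine, instead of 5

def worker_exit(server, worker):
    # flush the async producer's buffer when a worker shuts down
    from app import collector
    collector.producer.stop()
```

This does not by itself explain the loss, but it removes the workers-vs-cores mismatch from the picture and adds an explicit flush on worker exit.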

What could be the reason that not all of the messages are sent?
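One plausible mechanism, sketched with the standard library only (class and method names here are hypothetical, not kafka-python's): an async producer like `SimpleProducer(..., batch_send_every_n=50, async=True)` only *enqueues* messages; a background thread ships them in batches. If the process dies before a graceful stop, the final partial batch is still in memory and is simply lost.

```python
import queue
import threading

# Hypothetical stand-in for an async batching producer: send() only enqueues,
# a background thread "delivers" messages in batches of `batch_n`.
class AsyncBatchProducer:
    def __init__(self, batch_n=50):
        self.batch_n = batch_n
        self.queue = queue.Queue()
        self.sent = []                      # stands in for "reached the broker"
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def send(self, msg):
        # like send_messages(): nothing hits the broker yet
        self.queue.put(msg)

    def _run(self):
        batch = []
        while not (self._stop.is_set() and self.queue.empty()):
            try:
                batch.append(self.queue.get(timeout=0.05))
            except queue.Empty:
                continue
            if len(batch) >= self.batch_n:
                self.sent.extend(batch)     # flush a full batch
                batch = []
        self.sent.extend(batch)             # final flush on graceful stop

    def stop(self):
        # graceful shutdown: drain the queue and flush the last partial batch
        self._stop.set()
        self._thread.join()

p = AsyncBatchProducer(batch_n=50)
for i in range(120):
    p.send(i)

# If the process were killed right here (e.g. a hard kill of a gunicorn
# worker), up to batch_n - 1 buffered messages would never be delivered.
p.stop()
assert p.sent == list(range(120))           # graceful stop() delivers everything
```

The more workers there are, the more in-flight partial batches exist at any moment, so anything that kills workers abruptly loses proportionally more messages.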

My code:


import falcon
from kafka import SimpleProducer, KafkaClient
import signal
import sys
import json
import os
import logging

logger = logging.getLogger('kafka')
lgr_frmt = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s\n')
main_fh = logging.FileHandler("/tmp/kafka_{pid}.log".format(pid=str(os.getpid())), mode='a')
main_fh.setFormatter(lgr_frmt)
logger.setLevel(logging.DEBUG)
logger.addHandler(main_fh)


class CollectorApi():

    def __init__(self):
        # handle the case where the app (or a worker) is terminated by SIGINT or SIGTERM
        signal.signal(signal.SIGINT, self.signal_term_handler)
        signal.signal(signal.SIGTERM, self.signal_term_handler)
        # supported os types
        self.supported_os_types = ('ios',)

    # template for kafka topics as a function of os_type
    kafka_topic = "{os_type}"
    # create the async kafka producer (class-level, shared by all instances)
    kafka = KafkaClient('kafka1:9092', 'kafka2:9092')
    producer = SimpleProducer(kafka, batch_send_every_n=50, batch_send_every_t=60, async=True)

    def signal_term_handler(self, signum, frame):
        """
        Stop the async producer (flushing its internal queue) and exit.
        """
        self.producer.stop()
        sys.exit()

    def on_get(self, req, resp):
        # parse the event params and get the os type from the event
        event_params = req.params
        try:
            event_os_type = event_params['os']
        except KeyError:
            # if there is no 'os' key in the params, set os=unknown
            event_os_type = "unknown"
        else:
            # if the os type is not in the supported list, set os=unknown
            if event_os_type not in self.supported_os_types:
                event_os_type = "unknown"

        # send only the params to kafka, as a JSON message
        self.producer.send_messages(self.kafka_topic.format(os_type=event_os_type), json.dumps(event_params))


api = application = falcon.API()
collector = CollectorApi()
api.add_route('/', collector)
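One caveat about the shutdown path above: the `signal_term_handler` flush only runs for catchable signals. A stdlib-only demonstration (the buffer names are made up; in the real app the handler body is `self.producer.stop()`):

```python
import os
import signal

pending = ["msg1", "msg2"]   # stands in for the producer's internal batch
delivered = []

def flush_on_term(signum, frame):
    # in the real app this is where producer.stop() flushes the queue
    delivered.extend(pending)
    del pending[:]

signal.signal(signal.SIGTERM, flush_on_term)
os.kill(os.getpid(), signal.SIGTERM)   # deliver SIGTERM to ourselves

# the handler ran, so the "batch" was flushed before the process exits
assert delivered == ["msg1", "msg2"]
# SIGKILL, by contrast, cannot be registered at all --
# signal.signal(signal.SIGKILL, ...) raises OSError -- so a hard kill
# (e.g. gunicorn force-killing a worker after its graceful timeout)
# gives the worker no chance to flush.
```

So if any of the 5 workers was ever killed with SIGKILL rather than SIGTERM, its buffered messages would vanish without a trace in the logs.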

