
Should consumer.poll() first verify the consumer isn't closed? #1219

Closed
@jeffwidman


Is it expected behavior that, depending on the choice of selector, you can successfully poll for messages after closing the consumer?

If you call consumer.close() followed by consumer.poll(), you'll generally (but not always) get an exception. The exception is raised by the underlying selector implementation: EpollSelector and SelectSelector both raise an error about the file descriptor being closed.

However, I noticed that PollSelector does not raise any error, and in a surprising twist, I can continue to consume messages by calling consumer.poll() after calling consumer.close(). I observed this behavior within a Docker container while running some simple tests (we need to pin the selector due to a gevent issue at my day job):

from kafka import KafkaConsumer, KafkaProducer
from kafka.client_async import selectors

bootstrap_server = '172.18.0.6'  # docker IP

kp = KafkaProducer(bootstrap_servers=[bootstrap_server])

# values are sent as raw bytes because no value_serializer is configured
kp.send('test_topic', b'auto create the topic')
kp.flush()

kc = KafkaConsumer('test_topic',
                   group_id='testing-kp-selectors',
                   bootstrap_servers=[bootstrap_server],
                   enable_auto_commit=False,
                   selector=selectors.PollSelector)

# force the connection to kafka
p0 = kc.poll(timeout_ms=1000)

# start the test
kp.send('test_topic', b't1')
kp.flush()

# confirm poll successful; poll() returns {TopicPartition: [ConsumerRecord, ...]}
p1 = kc.poll(timeout_ms=1000)

assert list(p1.values())[0][0].value == b't1'

kc.close()

# send message after consumer closed
kp.send('test_topic', b't2')
kp.flush()

# continue consuming from closed consumer
p2 = kc.poll(timeout_ms=1000)
assert list(p2.values())[0][0].value == b't2'

print("Finished")
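The selector-level difference can be seen without Kafka at all. The sketch below assumes CPython 3's standard-library `selectors` module (which kafka-python uses on Python 3) and only illustrates one plausible contributor, not the full repro above: as far as I can tell, `EpollSelector.close()` closes its epoll object, while `PollSelector` has no `close()` override, so its underlying `select.poll` object keeps working after `close()`. The helper name `select_after_close` is hypothetical.

```python
import selectors
import socket


def select_after_close(selector_cls):
    """Register one end of a socket pair, close the selector, then call select() again.

    Returns whatever select() returned, or the exception it raised.
    """
    sel = selector_cls()
    a, b = socket.socketpair()
    try:
        sel.register(a, selectors.EVENT_READ)
        b.send(b"x")  # make `a` readable so there is an event to report
        sel.close()
        try:
            return sel.select(timeout=0)
        except Exception as exc:  # capture the error instead of propagating it
            return exc
    finally:
        a.close()
        b.close()


# Not every selector class exists on every platform, so guard each one.
if hasattr(selectors, "PollSelector"):
    print("PollSelector:", select_after_close(selectors.PollSelector))
if hasattr(selectors, "EpollSelector"):
    print("EpollSelector:", select_after_close(selectors.EpollSelector))
```

On Linux, the PollSelector call quietly returns an empty list instead of complaining that the selector is closed, whereas the EpollSelector call surfaces a `ValueError`.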

Should we change this behavior?

On the one hand, it seems like a natural safeguard for the client to raise a straightforward, human-readable error if the consumer has been closed, rather than an indirect error about an unavailable file descriptor or, in the case of PollSelector, continuing to return records.
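Roughly, such a safeguard could be a closed flag that `close()` sets and `poll()` checks first. This is a hypothetical stub, not kafka-python's implementation: `GuardedConsumer`, its `_closed` attribute, and the `IllegalStateError` stand-in are all made-up names for illustration, and the "records" are fake byte strings.

```python
class IllegalStateError(Exception):
    """Hypothetical stand-in for whatever error type a real client would raise."""


class GuardedConsumer:
    """Hypothetical consumer stub showing a closed-state check at the top of poll()."""

    def __init__(self):
        self._closed = False
        self._pending = [b"t1", b"t2"]  # fake records standing in for fetched messages

    def close(self):
        # A real client would also close its connections and selector here.
        self._closed = True

    def poll(self, timeout_ms=0):
        # Fail fast with a clear message instead of relying on selector side effects.
        if self._closed:
            raise IllegalStateError("consumer is closed")
        records, self._pending = self._pending, []
        return records


c = GuardedConsumer()
print(c.poll())  # the two fake records
c.close()
try:
    c.poll()
except IllegalStateError as exc:
    print("poll() after close():", exc)
```

The error then no longer depends on which selector happens to be configured.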

On the other hand, this check would run on every single poll() call, and I'm loath to waste the world's compute energy on something that is only hit in edge cases.
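To put a rough number on that cost, here is a hypothetical micro-benchmark comparing a stub `poll()` with and without the flag check. The `Stub` class is illustrative only, and the absolute figures will vary by machine and interpreter. For scale, the repro above calls `poll(timeout_ms=1000)`, which can block for up to a second.

```python
import timeit


class Stub:
    """Hypothetical stub; only the presence of the closed-flag check differs."""

    def __init__(self):
        self._closed = False

    def poll_unchecked(self):
        return {}

    def poll_checked(self):
        if self._closed:  # one attribute lookup plus one branch
            raise RuntimeError("consumer is closed")
        return {}


stub = Stub()
N = 200_000
unchecked = timeit.timeit(stub.poll_unchecked, number=N)
checked = timeit.timeit(stub.poll_checked, number=N)

# Extra cost per call, in nanoseconds (may even come out slightly negative due to noise).
overhead_ns = (checked - unchecked) / N * 1e9
print(f"extra cost per poll(): ~{overhead_ns:.0f} ns")
```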

Thoughts?
