Skip to content

Commit 94595b8

Browse files
committed
Error if connections_max_idle_ms not larger than request_timeout_ms
`connections_max_idle_ms` must always be larger than `request_timeout_ms` to avoid potentially unexpected behavior. Fix #1680.
1 parent 70ea4c1 commit 94595b8

File tree

2 files changed

+10
-3
lines changed

2 files changed

+10
-3
lines changed

kafka/consumer/group.py

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -323,9 +323,12 @@ def __init__(self, *topics, **configs):
323323

324324
request_timeout_ms = self.config['request_timeout_ms']
325325
fetch_max_wait_ms = self.config['fetch_max_wait_ms']
326-
if request_timeout_ms <= fetch_max_wait_ms:
327-
raise KafkaConfigurationError("Request timeout (%s) must be larger than fetch-max-wait-ms (%s)" %
328-
(request_timeout_ms, fetch_max_wait_ms))
326+
if connections_max_idle_ms <= request_timeout_ms <= fetch_max_wait_ms:
327+
raise KafkaConfigurationError(
328+
"connections_max_idle_ms ({}) must be larger than "
329+
"request_timeout_ms ({}) which must be larger than "
330+
"fetch_max_wait_ms ({})."
331+
.format(connections_max_idle_ms, request_timeout_ms, fetch_max_wait_ms))
329332

330333
metrics_tags = {'client-id': self.config['client_id']}
331334
metric_config = MetricConfig(samples=self.config['metrics_num_samples'],

test/test_consumer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,10 @@ def test_fetch_max_wait_larger_than_request_timeout_raises(self):
2121
with pytest.raises(KafkaConfigurationError):
2222
KafkaConsumer(bootstrap_servers='localhost:9092', fetch_max_wait_ms=41000, request_timeout_ms=40000)
2323

24+
def test_connections_max_idle_ms_smaller_than_request_timeout_raises(self):
25+
with pytest.raises(KafkaConfigurationError):
26+
KafkaConsumer(bootstrap_servers='localhost:9092', connections_max_idle_ms=39000, request_timeout_ms=40000)
27+
2428
def test_subscription_copy(self):
2529
consumer = KafkaConsumer('foo', api_version=(0, 10))
2630
sub = consumer.subscription()

0 commit comments

Comments
 (0)