Skip to content

Commit 6fc6856

Browse files
committed
Merge pull request #355 from dpkp/correlation_id_modulo
correlation_id modulo
2 parents d05fccb + 1313388 commit 6fc6856

File tree

2 files changed

+15
-7
lines changed

2 files changed

+15
-7
lines changed

kafka/client.py

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import collections
33
import copy
44
import functools
5-
import itertools
65
import logging
76
import time
87
import kafka.common
@@ -23,17 +22,18 @@
2322
class KafkaClient(object):
2423

2524
CLIENT_ID = b"kafka-python"
26-
ID_GEN = itertools.count()
2725

2826
# NOTE: The timeout given to the client should always be greater than the
2927
# one passed to SimpleConsumer.get_message(), otherwise you can get a
3028
# socket timeout.
3129
def __init__(self, hosts, client_id=CLIENT_ID,
32-
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS):
30+
timeout=DEFAULT_SOCKET_TIMEOUT_SECONDS,
31+
correlation_id=0):
3332
# We need one connection to bootstrap
3433
self.client_id = kafka_bytestring(client_id)
3534
self.timeout = timeout
3635
self.hosts = collect_hosts(hosts)
36+
self.correlation_id = correlation_id
3737

3838
# create connections only when we need them
3939
self.conns = {}
@@ -98,10 +98,10 @@ def _get_leader_for_partition(self, topic, partition):
9898
return self.brokers[meta.leader]
9999

100100
def _next_id(self):
101-
"""
102-
Generate a new correlation id
103-
"""
104-
return next(KafkaClient.ID_GEN)
101+
"""Generate a new correlation id"""
102+
# modulo to keep w/i int32
103+
self.correlation_id = (self.correlation_id + 1) % 2**31
104+
return self.correlation_id
105105

106106
def _send_broker_unaware_request(self, payloads, encoder_fn, decoder_fn):
107107
"""

test/test_client.py

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -401,3 +401,11 @@ def _timeout(*args, **kwargs):
401401
with self.assertRaises(ConnectionError):
402402
KafkaConnection("nowhere", 1234, 1.0)
403403
self.assertGreaterEqual(t.interval, 1.0)
404+
405+
def test_correlation_rollover(self):
406+
with patch.object(KafkaClient, 'load_metadata_for_topics'):
407+
big_num = 2**31 - 3
408+
client = KafkaClient(hosts=[], correlation_id=big_num)
409+
self.assertEqual(big_num + 1, client._next_id())
410+
self.assertEqual(big_num + 2, client._next_id())
411+
self.assertEqual(0, client._next_id())

0 commit comments

Comments
 (0)