Skip to content

Commit 4e339a7

Browse files
author
Dana Powers
committed
Use MurmurHash2 for key partition hashing
1 parent 8e3cd1c commit 4e339a7

File tree

2 files changed

+98
-3
lines changed

2 files changed

+98
-3
lines changed

kafka/partitioner/__init__.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
from .roundrobin import RoundRobinPartitioner
2-
from .hashed import HashedPartitioner
2+
from .hashed import HashedPartitioner, Murmur2Partitioner, LegacyPartitioner
33

44
__all__ = [
5-
'RoundRobinPartitioner', 'HashedPartitioner'
5+
'RoundRobinPartitioner', 'HashedPartitioner', 'Murmur2Partitioner',
6+
'LegacyPartitioner'
67
]

kafka/partitioner/hashed.py

Lines changed: 95 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,25 @@
11
from .base import Partitioner
22

3-
class HashedPartitioner(Partitioner):
3+
4+
class Murmur2Partitioner(Partitioner):
45
"""
6+
Implements a partitioner which selects the target partition based on
7+
the hash of the key. Attempts to apply the same hashing
8+
function as mainline java client.
9+
"""
10+
def partition(self, key, partitions=None):
11+
if not partitions:
12+
partitions = self.partitions
13+
14+
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/clients/producer/internals/Partitioner.java#L69
15+
idx = (murmur2(key) & 0x7fffffff) % len(partitions)
16+
17+
return partitions[idx]
18+
19+
20+
class LegacyPartitioner(Partitioner):
21+
"""DEPRECATED -- See Issue 374
22+
523
Implements a partitioner which selects the target partition based on
624
the hash of the key
725
"""
@@ -12,3 +30,79 @@ def partition(self, key, partitions=None):
1230
idx = hash(key) % size
1331

1432
return partitions[idx]
33+
34+
35+
# Default will change to Murmur2 in 0.10 release
36+
HashedPartitioner = LegacyPartitioner
37+
38+
39+
# https://github.com/apache/kafka/blob/0.8.2/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L244
40+
def murmur2(key):
41+
"""Pure-python Murmur2 implementation.
42+
43+
Based on java client, see org.apache.kafka.common.utils.Utils.murmur2
44+
45+
Args:
46+
key: if not a bytearray, converted via bytearray(str(key))
47+
48+
Returns: MurmurHash2 of key bytearray
49+
"""
50+
51+
# Convert key to a bytearray
52+
if not isinstance(key, bytearray):
53+
data = bytearray(str(key))
54+
55+
length = len(data)
56+
seed = 0x9747b28c
57+
# 'm' and 'r' are mixing constants generated offline.
58+
# They're not really 'magic', they just happen to work well.
59+
m = 0x5bd1e995
60+
r = 24
61+
62+
# Initialize the hash to a random value
63+
h = seed ^ length
64+
length4 = length / 4
65+
66+
for i in range(length4):
67+
i4 = i * 4
68+
k = ((data[i4 + 0] & 0xff) +
69+
((data[i4 + 1] & 0xff) << 8) +
70+
((data[i4 + 2] & 0xff) << 16) +
71+
((data[i4 + 3] & 0xff) << 24))
72+
k &= 0xffffffff
73+
k *= m
74+
k &= 0xffffffff
75+
k ^= (k % 0x100000000) >> r # k ^= k >>> r
76+
k &= 0xffffffff
77+
k *= m
78+
k &= 0xffffffff
79+
80+
h *= m
81+
h &= 0xffffffff
82+
h ^= k
83+
h &= 0xffffffff
84+
85+
# Handle the last few bytes of the input array
86+
extra_bytes = length % 4
87+
if extra_bytes == 3:
88+
h ^= (data[(length & ~3) + 2] & 0xff) << 16
89+
h &= 0xffffffff
90+
91+
if extra_bytes == 2:
92+
h ^= (data[(length & ~3) + 1] & 0xff) << 8
93+
h &= 0xffffffff
94+
95+
if extra_bytes == 1:
96+
h ^= (data[length & ~3] & 0xff)
97+
h &= 0xffffffff
98+
h *= m
99+
h &= 0xffffffff
100+
101+
h ^= (h % 0x100000000) >> 13 # h >>> 13;
102+
h &= 0xffffffff
103+
h *= m
104+
h &= 0xffffffff
105+
h ^= (h % 0x100000000) >> 15 # h >>> 15;
106+
h &= 0xffffffff
107+
108+
return h

0 commit comments

Comments
 (0)