Skip to content

Commit 359116e

Browse files
author
Tincu Gabriel
committed
core: add support for flexible versions
Varint code lifted from the Java Kafka repo, with the caveat that we need to specify a byte size by bitmasking due to how Python integers handle the sign bit. Serialization tests lifted from the Java repo as well.
- Add support for varints
- Add support for compact collections (byte array, string, array)
- Add support for new request and response headers, supporting flexible versions
- Add flexible versions extraction script
- Add List / Alter partition reassignments APIs
1 parent 6f932ba commit 359116e

File tree

8 files changed

+391
-14
lines changed

8 files changed

+391
-14
lines changed

kafka/protocol/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,6 @@
4343
40: 'ExpireDelegationToken',
4444
41: 'DescribeDelegationToken',
4545
42: 'DeleteGroups',
46+
45: 'AlterPartitionReassignments',
47+
46: 'ListPartitionReassignments',
4648
}

kafka/protocol/admin.py

Lines changed: 90 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.protocol.api import Request, Response
4-
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String
4+
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, CompactString, CompactArray, TaggedFields
55

66

77
class ApiVersionResponse_v0(Response):
@@ -923,3 +923,92 @@ class DeleteGroupsRequest_v1(Request):
923923
DeleteGroupsResponse = [
924924
DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
925925
]
926+
927+
928+
class AlterPartitionReassignmentsResponse_v0(Response):
    """Version 0 response of the AlterPartitionReassignments API (key 45).

    All strings and arrays use the compact encodings, and every nesting
    level of the schema is terminated by a ``tags`` TaggedFields entry.
    """
    API_KEY = 45
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        # Top-level error for the whole call.
        ("error_code", Int16),
        ("error_message", CompactString("utf-8")),
        ("responses", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                # Error reported for this individual partition.
                ("error_code", Int16),
                ("error_message", CompactString("utf-8")),
                ("tags", TaggedFields),
            )),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
947+
948+
949+
class AlterPartitionReassignmentsRequest_v0(Request):
    """Version 0 request of the AlterPartitionReassignments API (key 45).

    Marked as a flexible version; the schema uses compact arrays/strings
    and ends every nesting level with a ``tags`` TaggedFields entry.
    """
    FLEXIBLE_VERSION = True
    API_KEY = 45
    API_VERSION = 0
    RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
    SCHEMA = Schema(
        ("timeout_ms", Int32),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                # Replica ids for this partition, as a compact array of Int32.
                ("replicas", CompactArray(Int32)),
                ("tags", TaggedFields),
            )),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
967+
968+
969+
# Lists of the available versions of this API; presumably indexed by
# API version like the other *Request / *Response lists in this module.
AlterPartitionReassignmentsRequest = [
    AlterPartitionReassignmentsRequest_v0,
]

AlterPartitionReassignmentsResponse = [
    AlterPartitionReassignmentsResponse_v0,
]
972+
973+
974+
class ListPartitionReassignmentsResponse_v0(Response):
    """Version 0 response of the ListPartitionReassignments API (key 46).

    All strings and arrays use the compact encodings, and every nesting
    level of the schema is terminated by a ``tags`` TaggedFields entry.
    """
    API_KEY = 46
    API_VERSION = 0
    SCHEMA = Schema(
        ("throttle_time_ms", Int32),
        ("error_code", Int16),
        ("error_message", CompactString("utf-8")),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            ("partitions", CompactArray(
                ("partition_index", Int32),
                # Three Int32 id lists describing the partition's replicas.
                ("replicas", CompactArray(Int32)),
                ("adding_replicas", CompactArray(Int32)),
                ("removing_replicas", CompactArray(Int32)),
                ("tags", TaggedFields),
            )),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
994+
995+
996+
class ListPartitionReassignmentsRequest_v0(Request):
    """Version 0 request of the ListPartitionReassignments API (key 46).

    Marked as a flexible version; the schema uses compact arrays/strings
    and ends every nesting level with a ``tags`` TaggedFields entry.
    """
    FLEXIBLE_VERSION = True
    API_KEY = 46
    API_VERSION = 0
    RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0
    SCHEMA = Schema(
        ("timeout_ms", Int32),
        ("topics", CompactArray(
            ("name", CompactString("utf-8")),
            # Despite the singular field name this is an array of Int32.
            ("partition_index", CompactArray(Int32)),
            ("tags", TaggedFields),
        )),
        ("tags", TaggedFields),
    )
1010+
1011+
1012+
# Lists of the available versions of this API; presumably indexed by
# API version like the other *Request / *Response lists in this module.
ListPartitionReassignmentsRequest = [
    ListPartitionReassignmentsRequest_v0,
]

ListPartitionReassignmentsResponse = [
    ListPartitionReassignmentsResponse_v0,
]

kafka/protocol/api.py

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import abc
44

55
from kafka.protocol.struct import Struct
6-
from kafka.protocol.types import Int16, Int32, String, Schema, Array
6+
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
77

88

99
class RequestHeader(Struct):
@@ -20,9 +20,41 @@ def __init__(self, request, correlation_id=0, client_id='kafka-python'):
2020
)
2121

2222

23+
class RequestHeaderV2(Struct):
    """Request header that carries a trailing tagged-field buffer.

    Built from a request object (its API_KEY / API_VERSION are copied
    into the header) plus a correlation id, a client id and an optional
    mapping of tagged fields.
    """
    # Flexible response / request headers end in field buffer
    SCHEMA = Schema(
        ('api_key', Int16),
        ('api_version', Int16),
        ('correlation_id', Int32),
        ('client_id', String('utf-8')),
        ('_tag_buffer', TaggedFields),
    )

    def __init__(self, request, correlation_id=0, client_id='kafka-python', tags=None):
        """Populate the header fields from ``request`` and the arguments.

        A falsy ``tags`` (None or an empty mapping) is replaced by a new
        empty dict.
        """
        super(RequestHeaderV2, self).__init__(
            request.API_KEY,
            request.API_VERSION,
            correlation_id,
            client_id,
            tags if tags else {},
        )
37+
38+
39+
class ResponseHeader(Struct):
    """Response header holding only the Int32 correlation id."""
    SCHEMA = Schema(
        ('correlation_id', Int32),
    )
43+
44+
45+
class ResponseHeaderV2(Struct):
    """Response header: Int32 correlation id followed by tagged fields."""
    # Flexible response / request headers end in field buffer
    SCHEMA = Schema(
        ('correlation_id', Int32),
        ('tags', TaggedFields),
    )
51+
52+
2353
class Request(Struct):
2454
__metaclass__ = abc.ABCMeta
2555

56+
FLEXIBLE_VERSION = False
57+
2658
@abc.abstractproperty
2759
def API_KEY(self):
2860
"""Integer identifier for api request"""
@@ -50,6 +82,16 @@ def expect_response(self):
5082
def to_object(self):
5183
return _to_object(self.SCHEMA, self)
5284

85+
def build_request_header(self, correlation_id, client_id):
    """Return the header object to send in front of this request.

    Uses RequestHeaderV2 when FLEXIBLE_VERSION is truthy, otherwise the
    plain RequestHeader.
    """
    header_cls = RequestHeaderV2 if self.FLEXIBLE_VERSION else RequestHeader
    return header_cls(self, correlation_id=correlation_id, client_id=client_id)
89+
90+
def parse_response_header(self, read_buffer):
    """Decode and return the response header found in ``read_buffer``.

    Uses ResponseHeaderV2 when FLEXIBLE_VERSION is truthy, otherwise the
    plain ResponseHeader.
    """
    header_cls = ResponseHeaderV2 if self.FLEXIBLE_VERSION else ResponseHeader
    return header_cls.decode(read_buffer)
94+
5395

5496
class Response(Struct):
5597
__metaclass__ = abc.ABCMeta

kafka/protocol/parser.py

Lines changed: 10 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,9 @@
44
import logging
55

66
import kafka.errors as Errors
7-
from kafka.protocol.api import RequestHeader
87
from kafka.protocol.commit import GroupCoordinatorResponse
98
from kafka.protocol.frame import KafkaBytes
10-
from kafka.protocol.types import Int32
9+
from kafka.protocol.types import Int32, TaggedFields
1110
from kafka.version import __version__
1211

1312
log = logging.getLogger(__name__)
@@ -59,9 +58,7 @@ def send_request(self, request, correlation_id=None):
5958
log.debug('Sending request %s', request)
6059
if correlation_id is None:
6160
correlation_id = self._next_correlation_id()
62-
header = RequestHeader(request,
63-
correlation_id=correlation_id,
64-
client_id=self._client_id)
61+
header = request.build_request_header(correlation_id=correlation_id, client_id=self._client_id)
6562
message = b''.join([header.encode(), request.encode()])
6663
size = Int32.encode(len(message))
6764
data = size + message
@@ -135,16 +132,14 @@ def receive_bytes(self, data):
135132
return responses
136133

137134
def _process_response(self, read_buffer):
138-
recv_correlation_id = Int32.decode(read_buffer)
139-
log.debug('Received correlation id: %d', recv_correlation_id)
140-
141135
if not self.in_flight_requests:
142136
raise Errors.CorrelationIdError(
143137
'No in-flight-request found for server response'
144-
' with correlation ID %d'
145-
% (recv_correlation_id,))
146-
138+
)
147139
(correlation_id, request) = self.in_flight_requests.popleft()
140+
response_header = request.parse_response_header(read_buffer)
141+
recv_correlation_id = response_header.correlation_id
142+
log.debug('Received correlation id: %d', recv_correlation_id)
148143

149144
# 0.8.2 quirk
150145
if (recv_correlation_id == 0 and
@@ -162,6 +157,9 @@ def _process_response(self, read_buffer):
162157
'Correlation IDs do not match: sent %d, recv %d'
163158
% (correlation_id, recv_correlation_id))
164159

160+
# Flexible response / request headers end in field buffer
161+
if request.FLEXIBLE_VERSION:
162+
_ = TaggedFields.decode(read_buffer)
165163
# decode response
166164
log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
167165
try:
@@ -181,3 +179,4 @@ def _reset_buffer(self):
181179
self._receiving = False
182180
self._header.seek(0)
183181
self._rbuffer = None
182+

kafka/protocol/types.py

Lines changed: 153 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -196,3 +196,156 @@ def repr(self, list_of_items):
196196
if list_of_items is None:
197197
return 'NULL'
198198
return '[' + ', '.join([self.array_of.repr(item) for item in list_of_items]) + ']'
199+
200+
201+
class UnsignedVarInt32(AbstractType):
    """Unsigned 32-bit integer stored as a variable-length byte sequence.

    Each byte carries 7 payload bits, least-significant group first; the
    high bit (0x80) of a byte signals that another byte follows.
    """

    @classmethod
    def decode(cls, data):
        """Read one varint from ``data`` (an object with ``read(n)``).

        Raises ValueError when more than four continuation bytes are seen.
        """
        result = 0
        shift = 0
        byte = struct.unpack('B', data.read(1))[0]
        while byte & 0x80:
            result |= (byte & 0x7f) << shift
            shift += 7
            if shift > 28:
                raise ValueError('Invalid value {}'.format(result))
            byte = struct.unpack('B', data.read(1))[0]
        # Final byte: no continuation bit, contributes its bits as-is.
        result |= byte << shift
        return result

    @classmethod
    def encode(cls, value):
        """Return the varint bytes for ``value`` truncated to 32 bits."""
        remaining = value & 0xffffffff
        chunks = []
        while remaining > 0x7f:
            chunks.append(struct.pack('B', (remaining & 0x7f) | 0x80))
            remaining >>= 7
        chunks.append(struct.pack('B', remaining))
        return b''.join(chunks)
226+
227+
228+
class VarInt32(AbstractType):
    """Signed 32-bit integer: zig-zag mapped, then stored as a varint.

    Zig-zag maps 0, -1, 1, -2, ... to 0, 1, 2, 3, ... so that values of
    small magnitude, positive or negative, take few bytes.
    """

    @classmethod
    def decode(cls, data):
        """Read one varint from ``data`` and undo the zig-zag mapping."""
        value = UnsignedVarInt32.decode(data)
        return (value >> 1) ^ -(value & 1)

    @classmethod
    def encode(cls, value):
        """Return the zig-zag varint bytes for ``value``.

        ``value`` is interpreted as a 32-bit two's-complement integer, so
        both -1 and 0xffffffff encode as the signed value -1.
        """
        # Normalise to a *signed* 32-bit Python int. The zig-zag formula
        # needs an arithmetic right shift (sign bit replicated), which
        # Python only performs on negative ints; masking alone would turn
        # ``>> 31`` into a logical shift and corrupt every negative value.
        value &= 0xffffffff
        if value & 0x80000000:
            value -= 0x100000000
        # UnsignedVarInt32.encode truncates the result to 32 bits.
        return UnsignedVarInt32.encode((value << 1) ^ (value >> 31))
239+
240+
241+
class VarInt64(AbstractType):
    """Signed 64-bit integer: zig-zag mapped, then stored as a varint.

    Each byte carries 7 payload bits, least-significant group first; the
    high bit (0x80) of a byte signals that another byte follows.
    """

    @classmethod
    def decode(cls, data):
        """Read one zig-zag varint from ``data`` (object with ``read(n)``).

        Raises ValueError when more than nine continuation bytes are seen.
        """
        value, i = 0, 0
        while True:
            # read() returns a bytes object; unpack it to an int before
            # doing any bit arithmetic on it.
            b, = struct.unpack('B', data.read(1))
            if not (b & 0x80):
                break
            value |= (b & 0x7f) << i
            i += 7
            if i > 63:
                raise ValueError('Invalid value {}'.format(value))
        value |= b << i
        # Python ints are unbounded: drop anything beyond 64 bits so the
        # result matches a fixed-width 64-bit decode.
        value &= 0xffffffffffffffff
        return (value >> 1) ^ -(value & 1)

    @classmethod
    def encode(cls, value):
        """Return the zig-zag varint bytes for ``value``.

        ``value`` is interpreted as a 64-bit two's-complement integer.
        """
        # Normalise to a *signed* 64-bit Python int so that ``>> 63`` is an
        # arithmetic shift (all ones for negatives), as zig-zag requires.
        value &= 0xffffffffffffffff
        if value & 0x8000000000000000:
            value -= 0x10000000000000000
        v = ((value << 1) ^ (value >> 63)) & 0xffffffffffffffff
        chunks = []
        while v > 0x7f:
            # Emit the low 7 bits of the *remaining* zig-zag value.
            chunks.append(struct.pack('B', (v & 0x7f) | 0x80))
            v >>= 7
        chunks.append(struct.pack('B', v))
        return b''.join(chunks)
268+
269+
270+
class CompactString(String):
    """String whose length prefix is an unsigned varint holding length + 1.

    A prefix of 0 stands for None.
    """

    def decode(self, data):
        """Read a compact string from ``data``; returns None for a 0 prefix.

        Raises ValueError if fewer bytes are available than announced.
        """
        prefix = UnsignedVarInt32.decode(data)
        if prefix < 1:
            return None
        size = prefix - 1
        raw = data.read(size)
        if len(raw) == size:
            return raw.decode(self.encoding)
        raise ValueError('Buffer underrun decoding string')

    def encode(self, value):
        """Return the prefix plus ``str(value)`` encoded with self.encoding."""
        if value is not None:
            payload = str(value).encode(self.encoding)
            return UnsignedVarInt32.encode(len(payload) + 1) + payload
        return UnsignedVarInt32.encode(0)
285+
286+
287+
class TaggedFields(AbstractType):
    """Mapping of integer tag -> raw bytes, as used by flexible versions.

    Wire layout (all integers are unsigned varints)::

        <number of fields> { <tag> <size of value> <value bytes> } ...

    Tags must appear in strictly increasing order.
    """

    @classmethod
    def decode(cls, data):
        """Read a tagged-field section from ``data`` into a dict.

        Returns a dict mapping each tag to its undecoded value bytes (an
        empty dict when the section has no fields).

        Raises ValueError on a repeated / out-of-order tag, or when a
        value is shorter than its announced size.
        """
        num_fields = UnsignedVarInt32.decode(data)
        ret = {}
        prev_tag = -1
        for _ in range(num_fields):
            tag = UnsignedVarInt32.decode(data)
            if tag <= prev_tag:
                raise ValueError('Invalid or out-of-order tag {}'.format(tag))
            prev_tag = tag
            size = UnsignedVarInt32.decode(data)
            val = data.read(size)
            if len(val) != size:
                raise ValueError(
                    'Buffer underrun decoding tagged field {}'.format(tag))
            ret[tag] = val
        return ret

    @classmethod
    def encode(cls, value):
        """Serialise a ``{tag: bytes}`` mapping; the inverse of ``decode``.

        A falsy ``value`` (None or an empty mapping) encodes as a section
        with zero fields. Keys must be non-negative ints and values must
        be bytes; this is checked with assertions.
        """
        if not value:
            return UnsignedVarInt32.encode(0)
        for k, v in value.items():
            # do we allow for other data types ?? It could get complicated really fast
            assert isinstance(v, bytes), 'Value {} is not a byte array'.format(v)
            # Tag 0 is a valid tag (decode accepts it), so only negatives
            # are rejected.
            assert isinstance(k, int) and k >= 0, \
                'Key {} is not a non-negative integer'.format(k)
        parts = [UnsignedVarInt32.encode(len(value))]
        # Emit in ascending tag order: decode rejects out-of-order tags.
        for k in sorted(value):
            v = value[k]
            parts.append(UnsignedVarInt32.encode(k))
            # Each value is preceded by its byte length, which decode reads.
            parts.append(UnsignedVarInt32.encode(len(v)))
            parts.append(v)
        return b''.join(parts)
315+
316+
317+
class CompactBytes(AbstractType):
    """Byte string whose length prefix is an unsigned varint of length + 1.

    A prefix of 0 stands for None.
    """

    @classmethod
    def decode(cls, data):
        """Read compact bytes from ``data``; returns None for a 0 prefix.

        Raises ValueError if fewer bytes are available than announced.
        """
        prefix = UnsignedVarInt32.decode(data)
        if prefix < 1:
            return None
        size = prefix - 1
        payload = data.read(size)
        if len(payload) == size:
            return payload
        raise ValueError('Buffer underrun decoding Bytes')

    @classmethod
    def encode(cls, value):
        """Return the varint prefix followed by ``value`` (0 prefix for None)."""
        if value is not None:
            return UnsignedVarInt32.encode(len(value) + 1) + value
        return UnsignedVarInt32.encode(0)
334+
335+
336+
class CompactArray(Array):
    """Array whose element count is an unsigned varint holding count + 1.

    A prefix of 0 stands for None; elements are encoded and decoded with
    ``self.array_of``.
    """

    def encode(self, items):
        """Return the count prefix followed by every encoded item."""
        if items is None:
            return UnsignedVarInt32.encode(0)
        parts = [UnsignedVarInt32.encode(len(items) + 1)]
        parts.extend(self.array_of.encode(entry) for entry in items)
        return b''.join(parts)

    def decode(self, data):
        """Read the array from ``data``; returns None for a 0 prefix."""
        prefix = UnsignedVarInt32.decode(data)
        if prefix == 0:
            return None
        decoded = []
        for _ in range(prefix - 1):
            decoded.append(self.array_of.decode(data))
        return decoded
351+

0 commit comments

Comments
 (0)