Skip to content

Commit 4550703

Browse files
author
Tincu Gabriel
committed
core: add support for flexible versions
- Add support for varints - Add support for compact collections (byte array, string, array) - Add support for new request and response headers, supporting flexible versions - Add list of supported flexible versions, plus extraction script - Add List / Alter partition reassignments apis plus admin client support
1 parent 6f932ba commit 4550703

File tree

10 files changed

+445
-9
lines changed

10 files changed

+445
-9
lines changed

.travis.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ env:
1717
- KAFKA_VERSION=1.1.1
1818
- KAFKA_VERSION=2.4.0
1919
- KAFKA_VERSION=2.5.0
20+
- KAFKA_VERSION=2.6.0
2021

2122
addons:
2223
apt:

kafka/admin/client.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020
from kafka.protocol.admin import (
2121
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
2222
ListGroupsRequest, DescribeGroupsRequest, DescribeAclsRequest, CreateAclsRequest, DeleteAclsRequest,
23-
DeleteGroupsRequest
23+
DeleteGroupsRequest, ListPartitionReassignmentsRequest
2424
)
2525
from kafka.protocol.commit import GroupCoordinatorRequest, OffsetFetchRequest
2626
from kafka.protocol.metadata import MetadataRequest
@@ -460,6 +460,28 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=False):
460460
# TODO raise exceptions if errors
461461
return self._send_request_to_controller(request)
462462

463+
def list_partition_reassignments(self, topics_partitions, timeout_ms=None):
464+
"""List ongoing partition reassignments in the cluster.
465+
466+
:param topics_partitions: A list of TopicPartition objects.
467+
:param timeout_ms: Milliseconds to wait for new topics to be created
468+
before the broker returns.
469+
:return: Appropriate version of ListPartitionReassignment class.
470+
"""
471+
version = self._matching_api_version(ListPartitionReassignmentsRequest)
472+
timeout_ms = self._validate_timeout(timeout_ms)
473+
if version != 0:
474+
raise NotImplementedError(
475+
"Support for ListPartitionReassignments v{} has not yet been added to KafkaAdminClient."
476+
.format(version))
477+
payload = defaultdict(set)
478+
for tp in topics_partitions:
479+
payload[tp.topic].add(tp.partition)
480+
request = ListPartitionReassignmentsRequest[version](timeout_ms, [(k, v, {}) for k, v in payload.items()], {})
481+
future = self._send_request_to_node(self._controller_id, request)
482+
self._wait_for_futures([future])
483+
return future.value
484+
463485
def delete_topics(self, topics, timeout_ms=None):
464486
"""Delete topics from the cluster.
465487

kafka/protocol/__init__.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,4 +43,52 @@
4343
40: 'ExpireDelegationToken',
4444
41: 'DescribeDelegationToken',
4545
42: 'DeleteGroups',
46+
45: 'AlterPartitionReassignments',
47+
46: 'ListPartitionReassignments',
48+
}
49+
50+
# generated using the util function
51+
FLEXIBLE_VERSIONS = {
52+
1: 12,
53+
3: 9,
54+
4: 4,
55+
5: 2,
56+
6: 6,
57+
7: 3,
58+
8: 8,
59+
9: 6,
60+
10: 3,
61+
11: 6,
62+
12: 4,
63+
13: 4,
64+
14: 4,
65+
15: 5,
66+
16: 3,
67+
18: 3,
68+
19: 5,
69+
20: 4,
70+
21: 2,
71+
22: 2,
72+
28: 3,
73+
29: 2,
74+
30: 2,
75+
31: 2,
76+
35: 2,
77+
36: 2,
78+
37: 2,
79+
38: 2,
80+
39: 2,
81+
40: 2,
82+
41: 2,
83+
42: 2,
84+
43: 2,
85+
44: 1,
86+
45: 0,
87+
46: 0,
88+
50: 0,
89+
51: 0,
90+
52: 0,
91+
55: 0,
92+
56: 0,
93+
57: 0
4694
}

kafka/protocol/admin.py

Lines changed: 88 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.protocol.api import Request, Response
4-
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String
4+
from kafka.protocol.types import Array, Boolean, Bytes, Int8, Int16, Int32, Int64, Schema, String, CompactString, CompactArray, TaggedFields
55

66

77
class ApiVersionResponse_v0(Response):
@@ -923,3 +923,90 @@ class DeleteGroupsRequest_v1(Request):
923923
DeleteGroupsResponse = [
924924
DeleteGroupsResponse_v0, DeleteGroupsResponse_v1
925925
]
926+
927+
928+
class AlterPartitionReassignmentsResponse_v0(Response):
929+
API_KEY = 45
930+
API_VERSION = 0
931+
SCHEMA = Schema(
932+
("throttle_time_ms", Int32),
933+
("error_code", Int16),
934+
("error_message", CompactString("utf-8")),
935+
("responses", CompactArray(
936+
("name", CompactString("utf-8")),
937+
("partitions", CompactArray(
938+
("partition_index", Int32),
939+
("error_code", Int16),
940+
("error_message", CompactString("utf-8")),
941+
("_tagged_fields", TaggedFields)
942+
)),
943+
("_tagged_fields", TaggedFields)
944+
)),
945+
("_tagged_fields", TaggedFields)
946+
)
947+
948+
949+
class AlterPartitionReassignmentsRequest_v0(Request):
950+
API_KEY = 45
951+
API_VERSION = 0
952+
RESPONSE_TYPE = AlterPartitionReassignmentsResponse_v0
953+
SCHEMA = Schema(
954+
("timeout_ms", Int32),
955+
("topics", CompactArray(
956+
("name", CompactString("utf-8")),
957+
("partitions", CompactArray(
958+
("partition_index", Int32),
959+
("replicas", CompactArray(Int32)),
960+
("_tagged_fields", TaggedFields)
961+
)),
962+
("_tagged_fields", TaggedFields)
963+
)),
964+
("_tagged_fields", TaggedFields)
965+
)
966+
967+
968+
AlterPartitionReassignmentsRequest = [AlterPartitionReassignmentsRequest_v0]
969+
970+
AlterPartitionReassignmentsResponse = [AlterPartitionReassignmentsResponse_v0]
971+
972+
973+
class ListPartitionReassignmentsResponse_v0(Response):
974+
API_KEY = 46
975+
API_VERSION = 0
976+
SCHEMA = Schema(
977+
("throttle_time_ms", Int32),
978+
("error_code", Int16),
979+
("error_message", CompactString("utf-8")),
980+
("topics", CompactArray(
981+
("name", CompactString("utf-8")),
982+
("partitions", CompactArray(
983+
("partition_index", Int32),
984+
("replicas", CompactArray(Int32)),
985+
("adding_replicas", CompactArray(Int32)),
986+
("removing_replicas", CompactArray(Int32)),
987+
("_tagged_fields", TaggedFields)
988+
)),
989+
("_tagged_fields", TaggedFields)
990+
)),
991+
("_tagged_fields", TaggedFields)
992+
)
993+
994+
995+
class ListPartitionReassignmentsRequest_v0(Request):
996+
API_KEY = 46
997+
API_VERSION = 0
998+
RESPONSE_TYPE = ListPartitionReassignmentsResponse_v0
999+
SCHEMA = Schema(
1000+
("timeout_ms", Int32),
1001+
("topics", CompactArray(
1002+
("name", CompactString("utf-8")),
1003+
("partition_index", CompactArray(Int32)),
1004+
("_tagged_fields", TaggedFields)
1005+
)),
1006+
("_tagged_fields", TaggedFields)
1007+
)
1008+
1009+
1010+
ListPartitionReassignmentsRequest = [ListPartitionReassignmentsRequest_v0]
1011+
1012+
ListPartitionReassignmentsResponse = [ListPartitionReassignmentsResponse_v0]

kafka/protocol/api.py

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import abc
44

55
from kafka.protocol.struct import Struct
6-
from kafka.protocol.types import Int16, Int32, String, Schema, Array
6+
from kafka.protocol.types import Int16, Int32, String, Schema, Array, TaggedFields
77

88

99
class RequestHeader(Struct):
@@ -20,6 +20,22 @@ def __init__(self, request, correlation_id=0, client_id='kafka-python'):
2020
)
2121

2222

23+
class RequestHeaderV2(Struct):
24+
# Flexible response / request headers end in field buffer
25+
SCHEMA = Schema(
26+
('api_key', Int16),
27+
('api_version', Int16),
28+
('correlation_id', Int32),
29+
('client_id', String('utf-8')),
30+
('_tag_buffer', TaggedFields)
31+
)
32+
33+
def __init__(self, request, correlation_id=0, client_id='kafka-python'):
34+
super(RequestHeaderV2, self).__init__(
35+
request.API_KEY, request.API_VERSION, correlation_id, client_id, {}
36+
)
37+
38+
2339
class Request(Struct):
2440
__metaclass__ = abc.ABCMeta
2541

kafka/protocol/parser.py

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,10 +4,11 @@
44
import logging
55

66
import kafka.errors as Errors
7-
from kafka.protocol.api import RequestHeader
7+
from kafka.protocol import FLEXIBLE_VERSIONS
8+
from kafka.protocol.api import RequestHeader, RequestHeaderV2
89
from kafka.protocol.commit import GroupCoordinatorResponse
910
from kafka.protocol.frame import KafkaBytes
10-
from kafka.protocol.types import Int32
11+
from kafka.protocol.types import Int32, TaggedFields
1112
from kafka.version import __version__
1213

1314
log = logging.getLogger(__name__)
@@ -59,9 +60,14 @@ def send_request(self, request, correlation_id=None):
5960
log.debug('Sending request %s', request)
6061
if correlation_id is None:
6162
correlation_id = self._next_correlation_id()
62-
header = RequestHeader(request,
63-
correlation_id=correlation_id,
64-
client_id=self._client_id)
63+
if not self.is_flexible_version_request(request):
64+
header = RequestHeader(request,
65+
correlation_id=correlation_id,
66+
client_id=self._client_id)
67+
else:
68+
header = RequestHeaderV2(request,
69+
correlation_id=correlation_id,
70+
client_id=self._client_id)
6571
message = b''.join([header.encode(), request.encode()])
6672
size = Int32.encode(len(message))
6773
data = size + message
@@ -162,6 +168,9 @@ def _process_response(self, read_buffer):
162168
'Correlation IDs do not match: sent %d, recv %d'
163169
% (correlation_id, recv_correlation_id))
164170

171+
# Flexible response / request headers end in field buffer
172+
if self.is_flexible_version_request(request):
173+
_ = TaggedFields.decode(read_buffer)
165174
# decode response
166175
log.debug('Processing response %s', request.RESPONSE_TYPE.__name__)
167176
try:
@@ -181,3 +190,11 @@ def _reset_buffer(self):
181190
self._receiving = False
182191
self._header.seek(0)
183192
self._rbuffer = None
193+
194+
@staticmethod
195+
def is_flexible_version_request(request):
196+
flexible_version = FLEXIBLE_VERSIONS.get(request.API_KEY)
197+
request_version = request.API_VERSION
198+
if flexible_version is None or flexible_version > request_version:
199+
return False
200+
return True

0 commit comments

Comments
 (0)