Skip to content

Commit 4c9095b

Browse files
author
Gabriel Tincu
committed
Add logic for inferring newer broker versions
- New Fetch / ListOffsets request / response objects - Add new test cases to infer code based on mentioned objects - Add unit test to check inferred version against whatever resides in KAFKA_VERSION - Add new kafka broker versions to integration list - Add more kafka broker versions to travis task list
1 parent f9e0264 commit 4c9095b

File tree

8 files changed

+299
-9
lines changed

8 files changed

+299
-9
lines changed

.travis.yml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,10 @@ env:
1414
- KAFKA_VERSION=0.10.2.2
1515
- KAFKA_VERSION=0.11.0.3
1616
- KAFKA_VERSION=1.1.1
17+
- KAFKA_VERSION=2.0.1
18+
- KAFKA_VERSION=2.1.1
19+
- KAFKA_VERSION=2.2.0
20+
- KAFKA_VERSION=2.3.0
1721
- KAFKA_VERSION=2.4.0
1822

1923
addons:

build_integration.sh

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
#!/bin/bash
22

3-
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1"}
3+
: ${ALL_RELEASES:="0.8.2.2 0.9.0.1 0.10.1.1 0.10.2.2 0.11.0.3 1.0.2 1.1.1 2.0.1 2.1.1 2.2.0 2.2.1 2.3.0 2.4.0"}
44
: ${SCALA_VERSION:=2.11}
55
: ${DIST_BASE_URL:=https://archive.apache.org/dist/kafka/}
66
: ${KAFKA_SRC_GIT:=https://github.com/apache/kafka.git}
@@ -33,7 +33,6 @@ pushd servers
3333
echo "-------------------------------------"
3434
echo "Checking kafka binaries for ${kafka}"
3535
echo
36-
# kafka 0.8.0 is only available w/ scala 2.8.0
3736
if [ "$kafka" == "0.8.0" ]; then
3837
KAFKA_ARTIFACT="kafka_2.8.0-${kafka}.tar.gz"
3938
else

kafka/conn.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,10 @@
2626
from kafka.oauth.abstract import AbstractTokenProvider
2727
from kafka.protocol.admin import SaslHandShakeRequest
2828
from kafka.protocol.commit import OffsetFetchRequest
29+
from kafka.protocol.offset import OffsetRequest
30+
from kafka.protocol.produce import ProduceRequest
2931
from kafka.protocol.metadata import MetadataRequest
32+
from kafka.protocol.fetch import FetchRequest
3033
from kafka.protocol.parser import KafkaProtocol
3134
from kafka.protocol.types import Int32, Int8
3235
from kafka.scram import ScramClient
@@ -1166,10 +1169,20 @@ def _infer_broker_version_from_api_versions(self, api_versions):
11661169
# in reverse order. As soon as we find one that works, return it
11671170
test_cases = [
11681171
# format (<broker version>, <needed struct>)
1172+
((2, 4, 0), ProduceRequest[8]),
1173+
((2, 3, 0), FetchRequest[11]),
1174+
((2, 2, 0), OffsetRequest[5]),
1175+
((2, 1, 0), FetchRequest[10]),
1176+
((2, 0, 0), FetchRequest[8]),
1177+
((1, 1, 0), FetchRequest[7]),
11691178
((1, 0, 0), MetadataRequest[5]),
11701179
((0, 11, 0), MetadataRequest[4]),
11711180
((0, 10, 2), OffsetFetchRequest[2]),
11721181
((0, 10, 1), MetadataRequest[2]),
1182+
# taken from https://github.com/edenhill/librdkafka/blob/master/src/rdkafka_feature.c#L234
1183+
((0, 9, 0), ProduceRequest[1]),
1184+
((0, 8, 2), OffsetFetchRequest[1]),
1185+
((0, 8, 1), OffsetFetchRequest[0]),
11731186
]
11741187

11751188
# Get the best match of test cases

kafka/protocol/admin.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -862,3 +862,4 @@ class CreatePartitionsRequest_v1(Request):
862862
CreatePartitionsResponse = [
863863
CreatePartitionsResponse_v0, CreatePartitionsResponse_v1,
864864
]
865+

kafka/protocol/fetch.py

Lines changed: 180 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,6 +94,72 @@ class FetchResponse_v6(Response):
9494
SCHEMA = FetchResponse_v5.SCHEMA
9595

9696

97+
class FetchResponse_v7(Response):
98+
"""
99+
Add error_code and session_id to response
100+
"""
101+
API_KEY = 1
102+
API_VERSION = 7
103+
SCHEMA = Schema(
104+
('throttle_time_ms', Int32),
105+
('error_code', Int16),
106+
('session_id', Int32),
107+
('topics', Array(
108+
('topics', String('utf-8')),
109+
('partitions', Array(
110+
('partition', Int32),
111+
('error_code', Int16),
112+
('highwater_offset', Int64),
113+
('last_stable_offset', Int64),
114+
('log_start_offset', Int64),
115+
('aborted_transactions', Array(
116+
('producer_id', Int64),
117+
('first_offset', Int64))),
118+
('message_set', Bytes)))))
119+
)
120+
121+
122+
class FetchResponse_v8(Response):
123+
API_KEY = 1
124+
API_VERSION = 8
125+
SCHEMA = FetchResponse_v7.SCHEMA
126+
127+
128+
class FetchResponse_v9(Response):
129+
API_KEY = 1
130+
API_VERSION = 9
131+
SCHEMA = FetchResponse_v7.SCHEMA
132+
133+
134+
class FetchResponse_v10(Response):
135+
API_KEY = 1
136+
API_VERSION = 10
137+
SCHEMA = FetchResponse_v7.SCHEMA
138+
139+
140+
class FetchResponse_v11(Response):
141+
API_KEY = 1
142+
API_VERSION = 11
143+
SCHEMA = Schema(
144+
('throttle_time_ms', Int32),
145+
('error_code', Int16),
146+
('session_id', Int32),
147+
('topics', Array(
148+
('topics', String('utf-8')),
149+
('partitions', Array(
150+
('partition', Int32),
151+
('error_code', Int16),
152+
('highwater_offset', Int64),
153+
('last_stable_offset', Int64),
154+
('log_start_offset', Int64),
155+
('aborted_transactions', Array(
156+
('producer_id', Int64),
157+
('first_offset', Int64))),
158+
('preferred_read_replica', Int32),
159+
('message_set', Bytes)))))
160+
)
161+
162+
97163
class FetchRequest_v0(Request):
98164
API_KEY = 1
99165
API_VERSION = 0
@@ -196,13 +262,125 @@ class FetchRequest_v6(Request):
196262
SCHEMA = FetchRequest_v5.SCHEMA
197263

198264

265+
class FetchRequest_v7(Request):
266+
"""
267+
Add incremental fetch requests
268+
"""
269+
API_KEY = 1
270+
API_VERSION = 7
271+
RESPONSE_TYPE = FetchResponse_v7
272+
SCHEMA = Schema(
273+
('replica_id', Int32),
274+
('max_wait_time', Int32),
275+
('min_bytes', Int32),
276+
('max_bytes', Int32),
277+
('isolation_level', Int8),
278+
('session_id', Int32),
279+
('session_epoch', Int32),
280+
('topics', Array(
281+
('topic', String('utf-8')),
282+
('partitions', Array(
283+
('partition', Int32),
284+
('fetch_offset', Int64),
285+
('log_start_offset', Int64),
286+
('max_bytes', Int32))))),
287+
('forgotten_topics_data', Array(
288+
('topic', String),
289+
('partitions', Array(Int32))
290+
)),
291+
)
292+
293+
294+
class FetchRequest_v8(Request):
295+
"""
296+
bump used to indicate that on quota violation brokers send out responses before throttling.
297+
"""
298+
API_KEY = 1
299+
API_VERSION = 8
300+
RESPONSE_TYPE = FetchResponse_v8
301+
SCHEMA = FetchRequest_v7.SCHEMA
302+
303+
304+
class FetchRequest_v9(Request):
305+
"""
306+
adds the current leader epoch (see KIP-320)
307+
"""
308+
API_KEY = 1
309+
API_VERSION = 9
310+
RESPONSE_TYPE = FetchResponse_v9
311+
SCHEMA = Schema(
312+
('replica_id', Int32),
313+
('max_wait_time', Int32),
314+
('min_bytes', Int32),
315+
('max_bytes', Int32),
316+
('isolation_level', Int8),
317+
('session_id', Int32),
318+
('session_epoch', Int32),
319+
('topics', Array(
320+
('topic', String('utf-8')),
321+
('partitions', Array(
322+
('partition', Int32),
323+
('current_leader_epoch', Int32),
324+
('fetch_offset', Int64),
325+
('log_start_offset', Int64),
326+
('max_bytes', Int32))))),
327+
('forgotten_topics_data', Array(
328+
('topic', String),
329+
('partitions', Array(Int32)),
330+
)),
331+
)
332+
333+
334+
class FetchRequest_v10(Request):
335+
"""
336+
bumped up to indicate ZStandard capability. (see KIP-110)
337+
"""
338+
API_KEY = 1
339+
API_VERSION = 10
340+
RESPONSE_TYPE = FetchResponse_v10
341+
SCHEMA = FetchRequest_v9.SCHEMA
342+
343+
344+
class FetchRequest_v11(Request):
345+
"""
346+
added rack ID to support read from followers (KIP-392)
347+
"""
348+
API_KEY = 1
349+
API_VERSION = 11
350+
RESPONSE_TYPE = FetchResponse_v11
351+
SCHEMA = Schema(
352+
('replica_id', Int32),
353+
('max_wait_time', Int32),
354+
('min_bytes', Int32),
355+
('max_bytes', Int32),
356+
('isolation_level', Int8),
357+
('session_id', Int32),
358+
('session_epoch', Int32),
359+
('topics', Array(
360+
('topic', String('utf-8')),
361+
('partitions', Array(
362+
('partition', Int32),
363+
('current_leader_epoch', Int32),
364+
('fetch_offset', Int64),
365+
('log_start_offset', Int64),
366+
('max_bytes', Int32))))),
367+
('forgotten_topics_data', Array(
368+
('topic', String),
369+
('partitions', Array(Int32))
370+
)),
371+
('rack_id', String('utf-8')),
372+
)
373+
374+
199375
FetchRequest = [
200376
FetchRequest_v0, FetchRequest_v1, FetchRequest_v2,
201377
FetchRequest_v3, FetchRequest_v4, FetchRequest_v5,
202-
FetchRequest_v6
378+
FetchRequest_v6, FetchRequest_v7, FetchRequest_v8,
379+
FetchRequest_v9, FetchRequest_v10, FetchRequest_v11,
203380
]
204381
FetchResponse = [
205382
FetchResponse_v0, FetchResponse_v1, FetchResponse_v2,
206383
FetchResponse_v3, FetchResponse_v4, FetchResponse_v5,
207-
FetchResponse_v6
384+
FetchResponse_v6, FetchResponse_v7, FetchResponse_v8,
385+
FetchResponse_v9, FetchResponse_v10, FetchResponse_v11,
208386
]

kafka/protocol/offset.py

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,43 @@ class OffsetResponse_v2(Response):
5353
)
5454

5555

56+
class OffsetResponse_v3(Response):
57+
"""
58+
on quota violation, brokers send out responses before throttling
59+
"""
60+
API_KEY = 2
61+
API_VERSION = 3
62+
SCHEMA = OffsetResponse_v2.SCHEMA
63+
64+
65+
class OffsetResponse_v4(Response):
66+
"""
67+
Add leader_epoch to response
68+
"""
69+
API_KEY = 2
70+
API_VERSION = 4
71+
SCHEMA = Schema(
72+
('throttle_time_ms', Int32),
73+
('topics', Array(
74+
('topic', String('utf-8')),
75+
('partitions', Array(
76+
('partition', Int32),
77+
('error_code', Int16),
78+
('timestamp', Int64),
79+
('offset', Int64),
80+
('leader_epoch', Int32)))))
81+
)
82+
83+
84+
class OffsetResponse_v5(Response):
85+
"""
86+
adds a new error code, OFFSET_NOT_AVAILABLE
87+
"""
88+
API_KEY = 2
89+
API_VERSION = 5
90+
SCHEMA = OffsetResponse_v4.SCHEMA
91+
92+
5693
class OffsetRequest_v0(Request):
5794
API_KEY = 2
5895
API_VERSION = 0
@@ -105,5 +142,53 @@ class OffsetRequest_v2(Request):
105142
}
106143

107144

108-
OffsetRequest = [OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2]
109-
OffsetResponse = [OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2]
145+
class OffsetRequest_v3(Request):
146+
API_KEY = 2
147+
API_VERSION = 3
148+
RESPONSE_TYPE = OffsetResponse_v3
149+
SCHEMA = OffsetRequest_v2.SCHEMA
150+
DEFAULTS = {
151+
'replica_id': -1
152+
}
153+
154+
155+
class OffsetRequest_v4(Request):
156+
"""
157+
Add current_leader_epoch to request
158+
"""
159+
API_KEY = 2
160+
API_VERSION = 4
161+
RESPONSE_TYPE = OffsetResponse_v4
162+
SCHEMA = Schema(
163+
('replica_id', Int32),
164+
('isolation_level', Int8), # <- added isolation_level
165+
('topics', Array(
166+
('topic', String('utf-8')),
167+
('partitions', Array(
168+
('partition', Int32),
169+
('current_leader_epoch', Int64),
170+
('timestamp', Int64)))))
171+
)
172+
DEFAULTS = {
173+
'replica_id': -1
174+
}
175+
176+
177+
class OffsetRequest_v5(Request):
178+
API_KEY = 2
179+
API_VERSION = 5
180+
RESPONSE_TYPE = OffsetResponse_v5
181+
SCHEMA = OffsetRequest_v4.SCHEMA
182+
DEFAULTS = {
183+
'replica_id': -1
184+
}
185+
186+
187+
OffsetRequest = [
188+
OffsetRequest_v0, OffsetRequest_v1, OffsetRequest_v2,
189+
OffsetRequest_v3, OffsetRequest_v4, OffsetRequest_v5,
190+
]
191+
OffsetResponse = [
192+
OffsetResponse_v0, OffsetResponse_v1, OffsetResponse_v2,
193+
OffsetResponse_v3, OffsetResponse_v4, OffsetResponse_v5,
194+
]

test/test_conn.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import socket
66

77
import mock
8+
import os
89
import pytest
910

1011
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts

test/test_consumer_integration.py

Lines changed: 12 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,14 +6,23 @@
66
from kafka.vendor.six.moves import range
77

88
import kafka.codec
9-
from kafka.errors import (
10-
KafkaTimeoutError, UnsupportedCodecError, UnsupportedVersionError
11-
)
9+
from kafka.errors import UnsupportedCodecError, UnsupportedVersionError
1210
from kafka.structs import TopicPartition, OffsetAndTimestamp
1311

1412
from test.testutil import Timer, assert_message_count, env_kafka_version, random_string
1513

1614

15+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
16+
def test_kafka_version_infer(kafka_consumer_factory):
17+
consumer = kafka_consumer_factory()
18+
actual_ver_major_minor = env_kafka_version()[:2]
19+
client = consumer._client
20+
conn = list(client._conns.values())[0]
21+
inferred_ver_major_minor = conn.check_version()[:2]
22+
assert actual_ver_major_minor == inferred_ver_major_minor, \
23+
"Was expecting inferred broker version to be %s but was %s" % (actual_ver_major_minor, inferred_ver_major_minor)
24+
25+
1726
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
1827
def test_kafka_consumer(kafka_consumer_factory, send_messages):
1928
"""Test KafkaConsumer"""

0 commit comments

Comments
 (0)