Skip to content

Commit 209515b

Browse files
authored
Implement methods to convert a Struct object to a pythonic object (#1951)
Implement methods to convert a Struct object to a pythonic object
1 parent 3d98741 commit 209515b

File tree

3 files changed

+273
-1
lines changed

3 files changed

+273
-1
lines changed

kafka/protocol/api.py

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
import abc
44

55
from kafka.protocol.struct import Struct
6-
from kafka.protocol.types import Int16, Int32, String, Schema
6+
from kafka.protocol.types import Int16, Int32, String, Schema, Array
77

88

99
class RequestHeader(Struct):
@@ -47,6 +47,9 @@ def expect_response(self):
4747
"""Override this method if an api request does not always generate a response"""
4848
return True
4949

50+
def to_object(self):
51+
return _to_object(self.SCHEMA, self)
52+
5053

5154
class Response(Struct):
5255
__metaclass__ = abc.ABCMeta
@@ -65,3 +68,30 @@ def API_VERSION(self):
6568
def SCHEMA(self):
6669
"""An instance of Schema() representing the response structure"""
6770
pass
71+
72+
def to_object(self):
73+
return _to_object(self.SCHEMA, self)
74+
75+
76+
def _to_object(schema, data):
77+
obj = {}
78+
for idx, (name, _type) in enumerate(zip(schema.names, schema.fields)):
79+
if isinstance(data, Struct):
80+
val = data.get_item(name)
81+
else:
82+
val = data[idx]
83+
84+
if isinstance(_type, Schema):
85+
obj[name] = _to_object(_type, val)
86+
elif isinstance(_type, Array):
87+
if isinstance(_type.array_of, (Array, Schema)):
88+
obj[name] = [
89+
_to_object(_type.array_of, x)
90+
for x in val
91+
]
92+
else:
93+
obj[name] = val
94+
else:
95+
obj[name] = val
96+
97+
return obj

kafka/protocol/struct.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ def __init__(self, *args, **kwargs):
3030
# causes instances to "leak" to garbage
3131
self.encode = WeakMethod(self._encode_self)
3232

33+
3334
@classmethod
3435
def encode(cls, item): # pylint: disable=E0202
3536
bits = []
@@ -48,6 +49,11 @@ def decode(cls, data):
4849
data = BytesIO(data)
4950
return cls(*[field.decode(data) for field in cls.SCHEMA.fields])
5051

52+
def get_item(self, name):
53+
if name not in self.SCHEMA.names:
54+
raise KeyError("%s is not in the schema" % name)
55+
return self.__dict__[name]
56+
5157
def __repr__(self):
5258
key_vals = []
5359
for name, field in zip(self.SCHEMA.names, self.SCHEMA.fields):

test/test_object_conversion.py

Lines changed: 236 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,236 @@
1+
from kafka.protocol.admin import Request
2+
from kafka.protocol.admin import Response
3+
from kafka.protocol.types import Schema
4+
from kafka.protocol.types import Array
5+
from kafka.protocol.types import Int16
6+
from kafka.protocol.types import String
7+
8+
import pytest
9+
10+
@pytest.mark.parametrize('superclass', (Request, Response))
11+
class TestObjectConversion:
12+
def test_get_item(self, superclass):
13+
class TestClass(superclass):
14+
API_KEY = 0
15+
API_VERSION = 0
16+
RESPONSE_TYPE = None # To satisfy the Request ABC
17+
SCHEMA = Schema(
18+
('myobject', Int16))
19+
20+
tc = TestClass(myobject=0)
21+
assert tc.get_item('myobject') == 0
22+
with pytest.raises(KeyError):
23+
tc.get_item('does-not-exist')
24+
25+
def test_with_empty_schema(self, superclass):
26+
class TestClass(superclass):
27+
API_KEY = 0
28+
API_VERSION = 0
29+
RESPONSE_TYPE = None # To satisfy the Request ABC
30+
SCHEMA = Schema()
31+
32+
tc = TestClass()
33+
tc.encode()
34+
assert tc.to_object() == {}
35+
36+
def test_with_basic_schema(self, superclass):
37+
class TestClass(superclass):
38+
API_KEY = 0
39+
API_VERSION = 0
40+
RESPONSE_TYPE = None # To satisfy the Request ABC
41+
SCHEMA = Schema(
42+
('myobject', Int16))
43+
44+
tc = TestClass(myobject=0)
45+
tc.encode()
46+
assert tc.to_object() == {'myobject': 0}
47+
48+
def test_with_basic_array_schema(self, superclass):
49+
class TestClass(superclass):
50+
API_KEY = 0
51+
API_VERSION = 0
52+
RESPONSE_TYPE = None # To satisfy the Request ABC
53+
SCHEMA = Schema(
54+
('myarray', Array(Int16)))
55+
56+
tc = TestClass(myarray=[1,2,3])
57+
tc.encode()
58+
assert tc.to_object()['myarray'] == [1, 2, 3]
59+
60+
def test_with_complex_array_schema(self, superclass):
61+
class TestClass(superclass):
62+
API_KEY = 0
63+
API_VERSION = 0
64+
RESPONSE_TYPE = None # To satisfy the Request ABC
65+
SCHEMA = Schema(
66+
('myarray', Array(
67+
('subobject', Int16),
68+
('othersubobject', String('utf-8')))))
69+
70+
tc = TestClass(
71+
myarray=[[10, 'hello']]
72+
)
73+
tc.encode()
74+
obj = tc.to_object()
75+
assert len(obj['myarray']) == 1
76+
assert obj['myarray'][0]['subobject'] == 10
77+
assert obj['myarray'][0]['othersubobject'] == 'hello'
78+
79+
def test_with_array_and_other(self, superclass):
80+
class TestClass(superclass):
81+
API_KEY = 0
82+
API_VERSION = 0
83+
RESPONSE_TYPE = None # To satisfy the Request ABC
84+
SCHEMA = Schema(
85+
('myarray', Array(
86+
('subobject', Int16),
87+
('othersubobject', String('utf-8')))),
88+
('notarray', Int16))
89+
90+
tc = TestClass(
91+
myarray=[[10, 'hello']],
92+
notarray=42
93+
)
94+
95+
obj = tc.to_object()
96+
assert len(obj['myarray']) == 1
97+
assert obj['myarray'][0]['subobject'] == 10
98+
assert obj['myarray'][0]['othersubobject'] == 'hello'
99+
assert obj['notarray'] == 42
100+
101+
def test_with_nested_array(self, superclass):
102+
class TestClass(superclass):
103+
API_KEY = 0
104+
API_VERSION = 0
105+
RESPONSE_TYPE = None # To satisfy the Request ABC
106+
SCHEMA = Schema(
107+
('myarray', Array(
108+
('subarray', Array(Int16)),
109+
('otherobject', Int16))))
110+
111+
tc = TestClass(
112+
myarray=[
113+
[[1, 2], 2],
114+
[[2, 3], 4],
115+
]
116+
)
117+
print(tc.encode())
118+
119+
120+
obj = tc.to_object()
121+
assert len(obj['myarray']) == 2
122+
assert obj['myarray'][0]['subarray'] == [1, 2]
123+
assert obj['myarray'][0]['otherobject'] == 2
124+
assert obj['myarray'][1]['subarray'] == [2, 3]
125+
assert obj['myarray'][1]['otherobject'] == 4
126+
127+
def test_with_complex_nested_array(self, superclass):
128+
class TestClass(superclass):
129+
API_KEY = 0
130+
API_VERSION = 0
131+
RESPONSE_TYPE = None # To satisfy the Request ABC
132+
SCHEMA = Schema(
133+
('myarray', Array(
134+
('subarray', Array(
135+
('innertest', String('utf-8')),
136+
('otherinnertest', String('utf-8')))),
137+
('othersubarray', Array(Int16)))),
138+
('notarray', String('utf-8')))
139+
140+
tc = TestClass(
141+
myarray=[
142+
[[['hello', 'hello'], ['hello again', 'hello again']], [0]],
143+
[[['hello', 'hello again']], [1]],
144+
],
145+
notarray='notarray'
146+
)
147+
tc.encode()
148+
149+
obj = tc.to_object()
150+
151+
assert obj['notarray'] == 'notarray'
152+
myarray = obj['myarray']
153+
assert len(myarray) == 2
154+
155+
assert myarray[0]['othersubarray'] == [0]
156+
assert len(myarray[0]['subarray']) == 2
157+
assert myarray[0]['subarray'][0]['innertest'] == 'hello'
158+
assert myarray[0]['subarray'][0]['otherinnertest'] == 'hello'
159+
assert myarray[0]['subarray'][1]['innertest'] == 'hello again'
160+
assert myarray[0]['subarray'][1]['otherinnertest'] == 'hello again'
161+
162+
assert myarray[1]['othersubarray'] == [1]
163+
assert len(myarray[1]['subarray']) == 1
164+
assert myarray[1]['subarray'][0]['innertest'] == 'hello'
165+
assert myarray[1]['subarray'][0]['otherinnertest'] == 'hello again'
166+
167+
def test_with_metadata_response():
168+
from kafka.protocol.metadata import MetadataResponse_v5
169+
tc = MetadataResponse_v5(
170+
throttle_time_ms=0,
171+
brokers=[
172+
[0, 'testhost0', 9092, 'testrack0'],
173+
[1, 'testhost1', 9092, 'testrack1'],
174+
],
175+
cluster_id='abcd',
176+
controller_id=0,
177+
topics=[
178+
[0, 'testtopic1', False, [
179+
[0, 0, 0, [0, 1], [0, 1], []],
180+
[0, 1, 1, [1, 0], [1, 0], []],
181+
],
182+
], [0, 'other-test-topic', True, [
183+
[0, 0, 0, [0, 1], [0, 1], []],
184+
]
185+
]]
186+
)
187+
tc.encode() # Make sure this object encodes successfully
188+
189+
190+
obj = tc.to_object()
191+
192+
assert obj['throttle_time_ms'] == 0
193+
194+
assert len(obj['brokers']) == 2
195+
assert obj['brokers'][0]['node_id'] == 0
196+
assert obj['brokers'][0]['host'] == 'testhost0'
197+
assert obj['brokers'][0]['port'] == 9092
198+
assert obj['brokers'][0]['rack'] == 'testrack0'
199+
assert obj['brokers'][1]['node_id'] == 1
200+
assert obj['brokers'][1]['host'] == 'testhost1'
201+
assert obj['brokers'][1]['port'] == 9092
202+
assert obj['brokers'][1]['rack'] == 'testrack1'
203+
204+
assert obj['cluster_id'] == 'abcd'
205+
assert obj['controller_id'] == 0
206+
207+
assert len(obj['topics']) == 2
208+
assert obj['topics'][0]['error_code'] == 0
209+
assert obj['topics'][0]['topic'] == 'testtopic1'
210+
assert obj['topics'][0]['is_internal'] == False
211+
assert len(obj['topics'][0]['partitions']) == 2
212+
assert obj['topics'][0]['partitions'][0]['error_code'] == 0
213+
assert obj['topics'][0]['partitions'][0]['partition'] == 0
214+
assert obj['topics'][0]['partitions'][0]['leader'] == 0
215+
assert obj['topics'][0]['partitions'][0]['replicas'] == [0, 1]
216+
assert obj['topics'][0]['partitions'][0]['isr'] == [0, 1]
217+
assert obj['topics'][0]['partitions'][0]['offline_replicas'] == []
218+
assert obj['topics'][0]['partitions'][1]['error_code'] == 0
219+
assert obj['topics'][0]['partitions'][1]['partition'] == 1
220+
assert obj['topics'][0]['partitions'][1]['leader'] == 1
221+
assert obj['topics'][0]['partitions'][1]['replicas'] == [1, 0]
222+
assert obj['topics'][0]['partitions'][1]['isr'] == [1, 0]
223+
assert obj['topics'][0]['partitions'][1]['offline_replicas'] == []
224+
225+
assert obj['topics'][1]['error_code'] == 0
226+
assert obj['topics'][1]['topic'] == 'other-test-topic'
227+
assert obj['topics'][1]['is_internal'] == True
228+
assert len(obj['topics'][1]['partitions']) == 1
229+
assert obj['topics'][1]['partitions'][0]['error_code'] == 0
230+
assert obj['topics'][1]['partitions'][0]['partition'] == 0
231+
assert obj['topics'][1]['partitions'][0]['leader'] == 0
232+
assert obj['topics'][1]['partitions'][0]['replicas'] == [0, 1]
233+
assert obj['topics'][1]['partitions'][0]['isr'] == [0, 1]
234+
assert obj['topics'][1]['partitions'][0]['offline_replicas'] == []
235+
236+
tc.encode()

0 commit comments

Comments
 (0)