Commit d10051b

Added minor fixes for PR review
1 parent e992fbf commit d10051b

6 files changed: 20 additions & 27 deletions

benchmarks/record_batch_compose.py

Lines changed: 7 additions & 5 deletions
@@ -1,19 +1,21 @@
 #!/usr/bin/env python3
 from __future__ import print_function
-import perf
-from kafka.record.memory_records import MemoryRecordsBuilder
-import itertools
-import random
 import hashlib
+import itertools
 import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecordsBuilder


 DEFAULT_BATCH_SIZE = 1600 * 1024
 KEY_SIZE = 6
 VALUE_SIZE = 60
 TIMESTAMP_RANGE = [1505824130000, 1505824140000]

-# With values above v1 record is 100 bytes, so 10_000 bytes for 100 messages
+# With values above v1 record is 100 bytes, so 10 000 bytes for 100 messages
 MESSAGES_PER_BATCH = 100
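The comment's arithmetic is worth spelling out: a v1 legacy record carries 34 bytes of framing (8-byte offset, 4-byte size, 4-byte CRC, magic byte, attributes byte, 8-byte timestamp, and two 4-byte length fields), so with a 6-byte key and a 60-byte value each message is about 100 bytes and a 100-message batch is roughly 10 000 bytes. The sketch below shows the kind of compose loop this benchmark times; the positional MemoryRecordsBuilder arguments mirror the RecordAccumulator call later in this diff, but the append/close/buffer calls are assumptions about the builder API, not the benchmark's actual body.

import os
import random

from kafka.record.memory_records import MemoryRecordsBuilder

# Constants as defined in the file above.
DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]
MESSAGES_PER_BATCH = 100


def compose_one_batch(magic=1):
    # magic byte, codec attrs (0 = no compression), batch size, in the same
    # positional order as the MemoryRecordsBuilder call in record_accumulator.py
    builder = MemoryRecordsBuilder(magic, 0, DEFAULT_BATCH_SIZE)
    for _ in range(MESSAGES_PER_BATCH):
        key = os.urandom(KEY_SIZE)
        value = os.urandom(VALUE_SIZE)
        timestamp = random.randint(*TIMESTAMP_RANGE)
        builder.append(timestamp, key, value)  # assumed signature
    builder.close()
    return builder.buffer()  # ~10 000 bytes of serialized v1 records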

benchmarks/record_batch_read.py

Lines changed: 6 additions & 4 deletions
@@ -1,11 +1,13 @@
 #!/usr/bin/env python
 from __future__ import print_function
-import perf
-from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder
-import itertools
-import random
 import hashlib
+import itertools
 import os
+import random
+
+import perf
+
+from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder


 DEFAULT_BATCH_SIZE = 1600 * 1024

kafka/producer/kafka.py

Lines changed: 1 addition & 1 deletion
@@ -370,7 +370,7 @@ def __init__(self, **configs):
         else:
             checker, compression_attrs = self._COMPRESSORS[ct]
             assert checker(), "Libraries for {} compression codec not found".format(ct)
-            self.config['compression_type'] = compression_attrs
+            self.config['compression_attrs'] = compression_attrs

         message_version = self._max_usable_produce_magic()
         self._accumulator = RecordAccumulator(message_version=message_version, metrics=self._metrics, **self.config)
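This one-line change fixes a key mismatch: the producer resolves the user-facing compression_type string through its _COMPRESSORS table into an availability checker plus the integer codec bits, and that integer now has to be stored under compression_attrs so it reaches RecordAccumulator (and, from there, MemoryRecordsBuilder) under the new name. A sketch of that mapping follows, with the table shape inferred from the tuple unpacking above rather than copied from KafkaProducer.

from kafka.codec import has_gzip, has_snappy, has_lz4
from kafka.record.legacy_records import LegacyRecordBatchBuilder

# Shape inferred from `checker, compression_attrs = self._COMPRESSORS[ct]`;
# the codec constants are the ones the old RecordAccumulator table used.
_COMPRESSORS = {
    'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP),
    'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY),
    'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4),
    None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE),
}

checker, compression_attrs = _COMPRESSORS['gzip']
assert checker(), "Libraries for gzip compression codec not found"
# compression_attrs is now a plain int (gzip is codec 1) and is stored as
# config['compression_attrs'], which **self.config forwards to the accumulator.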

kafka/producer/record_accumulator.py

Lines changed: 3 additions & 15 deletions
@@ -149,7 +149,7 @@ class RecordAccumulator(object):
             will block up to max_block_ms, raising an exception on timeout.
             In the current implementation, this setting is an approximation.
             Default: 33554432 (32MB)
-        compression_type (int): The compression type for all data generated by
+        compression_attrs (int): The compression type for all data generated by
             the producer. Valid values are gzip(1), snappy(2), lz4(3), or
             none(0).
             Compression is of full batches of data, so the efficacy of batching
@@ -168,32 +168,20 @@
     DEFAULT_CONFIG = {
         'buffer_memory': 33554432,
         'batch_size': 16384,
-        'compression_type': None,
+        'compression_attrs': 0,
         'linger_ms': 0,
         'retry_backoff_ms': 100,
         'message_version': 0,
         'metrics': None,
         'metric_group_prefix': 'producer-metrics',
     }

-    _COMPRESSORS = {
-        'gzip': LegacyRecordBatchBuilder.CODEC_GZIP,
-        'snappy': LegacyRecordBatchBuilder.CODEC_SNAPPY,
-        'lz4': LegacyRecordBatchBuilder.CODEC_LZ4,
-        None: LegacyRecordBatchBuilder.CODEC_NONE
-    }
-
     def __init__(self, **configs):
         self.config = copy.copy(self.DEFAULT_CONFIG)
         for key in self.config:
             if key in configs:
                 self.config[key] = configs.pop(key)

-        # Convert compression type to INT presentation. Mostly for unit tests,
-        # as Producer should pass already converted values.
-        ct = self.config["compression_type"]
-        self.config["compression_type"] = self._COMPRESSORS.get(ct, ct)
-
         self._closed = False
         self._flushes_in_progress = AtomicInteger()
         self._appends_in_progress = AtomicInteger()
@@ -269,7 +257,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,

             records = MemoryRecordsBuilder(
                 self.config['message_version'],
-                self.config['compression_type'],
+                self.config['compression_attrs'],
                 self.config['batch_size']
             )
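With the conversion table and the string-to-int coercion removed, the accumulator now expects compression_attrs to already be the integer codec (default 0, meaning no compression) and hands it straight to MemoryRecordsBuilder, as the hunk above shows. A minimal sketch of that hand-off, mirroring the call above with example values:

from kafka.record.memory_records import MemoryRecordsBuilder

# Example values; in the accumulator these come from self.config.
config = {
    'message_version': 1,    # magic byte of the record format
    'compression_attrs': 0,  # already an int: 0=none, 1=gzip, 2=snappy, 3=lz4
    'batch_size': 16384,
}

records = MemoryRecordsBuilder(
    config['message_version'],
    config['compression_attrs'],
    config['batch_size']
)
# A string such as 'gzip' would no longer be converted here; KafkaProducer is
# responsible for that mapping before the value reaches the accumulator.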

kafka/protocol/message.py

Lines changed: 2 additions & 1 deletion
@@ -161,7 +161,8 @@ def encode(cls, items, prepend_size=True):
             if prepend_size:
                 # rewind and return all the bytes
                 items.seek(items.tell() - 4)
-                return items.read(size + 4)
+                size += 4
+            return items.read(size)

         encoded_values = []
         for (offset, message) in items:
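The fast path here receives a buffer positioned just past a 4-byte size prefix: it rewinds over the prefix and has to return prefix plus payload, so the read length is size + 4 either way; the fix simply folds the +4 into size before reading instead of doing it inside the read call. A toy illustration with a plain BytesIO, assuming the same big-endian Int32 prefix the protocol uses:

import io
import struct

payload = b"\x01\x02\x03" * 5  # stand-in for the encoded message-set bytes
buf = io.BytesIO(struct.pack(">i", len(payload)) + payload)

size = struct.unpack(">i", buf.read(4))[0]  # like Int32.decode(items)
buf.seek(buf.tell() - 4)                    # rewind over the size prefix
size += 4                                   # include the prefix itself
assert buf.read(size) == struct.pack(">i", len(payload)) + payload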

kafka/record/abc.py

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ def append(self, offset, timestamp, key, value):
         Arguments:
             offset (int): Relative offset of record, starting from 0
             timestamp (int or None): Timestamp in milliseconds since beginning
-                of the epoch (midnight Jan 1, 1970 (UTC)). If omited, will be
+                of the epoch (midnight Jan 1, 1970 (UTC)). If omitted, will be
                 set to current time.
             key (bytes or None): Key of the record
             value (bytes or None): Value of the record
