Refactor MessageSet and Message into LegacyRecordBatch #1252

Merged 7 commits on Oct 14, 2017

4 changes: 4 additions & 0 deletions benchmarks/README
@@ -0,0 +1,4 @@
The `record_batch_*` benchmarks in this directory are written using the
``perf`` library, created by Victor Stinner. For more information on how to
get reliable benchmark results, please consult
http://perf.readthedocs.io/en/latest/run_benchmark.html.
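
For orientation, here is a minimal sketch (not part of this PR) of the perf
time-benchmark pattern that both scripts in this directory follow; the
benchmark name and the workload are illustrative placeholders only:

#!/usr/bin/env python
import perf


def bench(loops):
    # perf passes in the number of loop iterations; the function returns the
    # elapsed time spent running them.
    t0 = perf.perf_counter()
    for _ in range(loops):
        sum(range(1000))  # placeholder workload
    return perf.perf_counter() - t0


runner = perf.Runner()
runner.bench_time_func('example_bench', bench)

Each script can be run directly and accepts perf's standard command-line
options for controlling warmups, processes, and output.
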
75 changes: 75 additions & 0 deletions benchmarks/record_batch_compose.py
@@ -0,0 +1,75 @@
#!/usr/bin/env python3
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecordsBuilder


DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

# With the values above a v1 record is 100 bytes, so 100 messages come to
# roughly 10,000 bytes per batch.
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    buffer = bytearray(length)
    for i in range(length):
        buffer[i] = random.randint(0, 255)
    return bytes(buffer)


def prepare():
    return iter(itertools.cycle([
        (random_bytes(KEY_SIZE),
         random_bytes(VALUE_SIZE),
         random.randint(*TIMESTAMP_RANGE)
         )
        for _ in range(int(MESSAGES_PER_BATCH * 1.94))
    ]))


def finalize(results):
    # Consume the results (hash them and write the digest to /dev/null) so
    # that PyPy cannot optimize the main benchmark code away.
    hash_val = hashlib.md5()
    for buf in results:
        hash_val.update(buf)
    print(hash_val, file=open(os.devnull, "w"))


def func(loops, magic):
    # The JIT can optimize out the whole function if the result is the same
    # on each iteration, so we need some randomized input data.
    precomputed_samples = prepare()
    results = []

    # Main benchmark code.
    t0 = perf.perf_counter()
    for _ in range(loops):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _ in range(MESSAGES_PER_BATCH):
            key, value, timestamp = next(precomputed_samples)
            size = batch.append(timestamp=timestamp, key=key, value=value)
            assert size
        batch.close()
        results.append(batch.buffer())

    res = perf.perf_counter() - t0

    finalize(results)

    return res


runner = perf.Runner()
runner.bench_time_func('batch_append_v0', func, 0)
runner.bench_time_func('batch_append_v1', func, 1)
80 changes: 80 additions & 0 deletions benchmarks/record_batch_read.py
@@ -0,0 +1,80 @@
#!/usr/bin/env python
from __future__ import print_function
import hashlib
import itertools
import os
import random

import perf

from kafka.record.memory_records import MemoryRecords, MemoryRecordsBuilder


DEFAULT_BATCH_SIZE = 1600 * 1024
KEY_SIZE = 6
VALUE_SIZE = 60
TIMESTAMP_RANGE = [1505824130000, 1505824140000]

BATCH_SAMPLES = 5
MESSAGES_PER_BATCH = 100


def random_bytes(length):
    buffer = bytearray(length)
    for i in range(length):
        buffer[i] = random.randint(0, 255)
    return bytes(buffer)


def prepare(magic):
    samples = []
    for _ in range(BATCH_SAMPLES):
        batch = MemoryRecordsBuilder(
            magic, batch_size=DEFAULT_BATCH_SIZE, compression_type=0)
        for _ in range(MESSAGES_PER_BATCH):
            size = batch.append(
                random.randint(*TIMESTAMP_RANGE),
                random_bytes(KEY_SIZE),
                random_bytes(VALUE_SIZE))
            assert size
        batch.close()
        samples.append(bytes(batch.buffer()))

    return iter(itertools.cycle(samples))


def finalize(results):
    # Consume the read values (hash them and write the digest to /dev/null)
    # so that PyPy cannot optimize the benchmark code above away.
    hash_val = hashlib.md5()
    for buf in results:
        hash_val.update(buf)
    print(hash_val, file=open(os.devnull, "w"))


def func(loops, magic):
    # The JIT can optimize out the whole function if the result is the same
    # on each iteration, so we need some randomized input data.
    precomputed_samples = prepare(magic)
    results = []

    # Main benchmark code.
    batch_data = next(precomputed_samples)
    t0 = perf.perf_counter()
    for _ in range(loops):
        records = MemoryRecords(batch_data)
        while records.has_next():
            batch = records.next_batch()
            batch.validate_crc()
            for record in batch:
                results.append(record.value)

    res = perf.perf_counter() - t0
    finalize(results)

    return res


runner = perf.Runner()
runner.bench_time_func('batch_read_v0', func, 0)
runner.bench_time_func('batch_read_v1', func, 1)
106 changes: 27 additions & 79 deletions kafka/consumer/fetcher.py
@@ -13,10 +13,10 @@
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.protocol.fetch import FetchRequest
from kafka.protocol.message import PartialMessage
from kafka.protocol.offset import (
    OffsetRequest, OffsetResetStrategy, UNKNOWN_OFFSET
)
from kafka.record import MemoryRecords
from kafka.serializer import Deserializer
from kafka.structs import TopicPartition, OffsetAndTimestamp

@@ -295,7 +295,7 @@ def fetched_records(self, max_records=None):

        Raises:
            OffsetOutOfRangeError: if no subscription offset_reset_strategy
            InvalidMessageError: if message crc validation fails (check_crcs
            CorruptRecordException: if message crc validation fails (check_crcs
                must be set to True)
            RecordTooLargeError: if a message is larger than the currently
                configured max_partition_fetch_bytes
@@ -440,57 +440,25 @@ def _message_generator(self):

            self._next_partition_records = None

    def _unpack_message_set(self, tp, messages):
    def _unpack_message_set(self, tp, records):
        try:
            for offset, size, msg in messages:
                if self.config['check_crcs'] and not msg.validate_crc():
                    raise Errors.InvalidMessageError(msg)

                if not msg.is_compressed():
                    yield self._parse_record(tp, offset, msg.timestamp, msg)

                else:
                    # If relative offset is used, we need to decompress the entire message first
                    # to compute the absolute offset.
                    inner_mset = msg.decompress()

                    # There should only ever be a single layer of compression
                    if inner_mset[0][-1].is_compressed():
                        log.warning('MessageSet at %s offset %d appears '
                                    ' double-compressed. This should not'
                                    ' happen -- check your producers!',
                                    tp, offset)
                        if self.config['skip_double_compressed_messages']:
                            log.warning('Skipping double-compressed message at'
                                        ' %s %d', tp, offset)
                            continue

                    if msg.magic > 0:
                        last_offset, _, _ = inner_mset[-1]
                        absolute_base_offset = offset - last_offset
                    else:
                        absolute_base_offset = -1

                    for inner_offset, inner_size, inner_msg in inner_mset:
                        if msg.magic > 0:
                            # When magic value is greater than 0, the timestamp
                            # of a compressed message depends on the
                            # typestamp type of the wrapper message:

                            if msg.timestamp_type == 0: # CREATE_TIME (0)
                                inner_timestamp = inner_msg.timestamp

                            elif msg.timestamp_type == 1: # LOG_APPEND_TIME (1)
                                inner_timestamp = msg.timestamp

                            else:
                                raise ValueError('Unknown timestamp type: {0}'.format(msg.timestamp_type))
                        else:
                            inner_timestamp = msg.timestamp

                        if absolute_base_offset >= 0:
                            inner_offset += absolute_base_offset
                        yield self._parse_record(tp, inner_offset, inner_timestamp, inner_msg)
            batch = records.next_batch()
            while batch is not None:
                for record in batch:
                    key_size = len(record.key) if record.key is not None else -1
                    value_size = len(record.value) if record.value is not None else -1
                    key = self._deserialize(
                        self.config['key_deserializer'],
                        tp.topic, record.key)
                    value = self._deserialize(
                        self.config['value_deserializer'],
                        tp.topic, record.value)
                    yield ConsumerRecord(
                        tp.topic, tp.partition, record.offset, record.timestamp,
                        record.timestamp_type, key, value, record.checksum,
                        key_size, value_size)

                batch = records.next_batch()

        # If unpacking raises StopIteration, it is erroneously
        # caught by the generator. We want all exceptions to be raised
@@ -499,21 +467,6 @@ def _unpack_message_set(self, tp, messages):
            log.exception('StopIteration raised unpacking messageset: %s', e)
            raise Exception('StopIteration raised unpacking messageset')

        # If unpacking raises AssertionError, it means decompression unsupported
        # See Issue 1033
        except AssertionError as e:
            log.exception('AssertionError raised unpacking messageset: %s', e)
            raise

    def _parse_record(self, tp, offset, timestamp, msg):
        key = self._deserialize(self.config['key_deserializer'], tp.topic, msg.key)
        value = self._deserialize(self.config['value_deserializer'], tp.topic, msg.value)
        return ConsumerRecord(tp.topic, tp.partition, offset,
                              timestamp, msg.timestamp_type,
                              key, value, msg.crc,
                              len(msg.key) if msg.key is not None else -1,
                              len(msg.value) if msg.value is not None else -1)

    def __iter__(self):  # pylint: disable=non-iterator-returned
        return self

@@ -775,15 +728,13 @@ def _handle_fetch_response(self, request, send_time, response):

    def _parse_fetched_data(self, completed_fetch):
        tp = completed_fetch.topic_partition
        partition = completed_fetch.partition_data
        fetch_offset = completed_fetch.fetched_offset
        num_bytes = 0
        records_count = 0
        parsed_records = None

        error_code, highwater = completed_fetch.partition_data[:2]
        error_type = Errors.for_code(error_code)
        messages = completed_fetch.partition_data[-1]

        try:
            if not self._subscriptions.is_fetchable(tp):
@@ -807,21 +758,18 @@ def _parse_fetched_data(self, completed_fetch):
                          position)
                return None

            partial = None
            if messages and isinstance(messages[-1][-1], PartialMessage):
                partial = messages.pop()

            if messages:
            records = MemoryRecords(completed_fetch.partition_data[-1])
            if records.has_next():
                log.debug("Adding fetched record for partition %s with"
                          " offset %d to buffered record list", tp,
                          position)
                unpacked = list(self._unpack_message_set(tp, messages))
                unpacked = list(self._unpack_message_set(tp, records))
                parsed_records = self.PartitionRecords(fetch_offset, tp, unpacked)
                last_offset, _, _ = messages[-1]
                last_offset = unpacked[-1].offset
                self._sensors.records_fetch_lag.record(highwater - last_offset)
                num_bytes = sum(msg[1] for msg in messages)
                records_count = len(messages)
            elif partial:
                num_bytes = records.valid_bytes()
                records_count = len(unpacked)
            elif records.size_in_bytes() > 0:
                # we did not read a single message from a non-empty
                # buffer because that message's size is larger than
                # fetch size, in this case record this exception
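
To make the new read path concrete, here is a minimal standalone sketch (not
part of the diff) of the MemoryRecords consumption pattern that the rewritten
_unpack_message_set relies on. The raw_bytes argument stands in for the
partition data of a fetch response, and the sketch assumes validate_crc()
returns a boolean, as the legacy message API did:

from kafka.errors import CorruptRecordException
from kafka.record import MemoryRecords


def iter_fetched_records(raw_bytes, check_crcs=True):
    # Walk every batch in the raw fetch bytes and yield the records it holds.
    records = MemoryRecords(raw_bytes)
    while records.has_next():
        batch = records.next_batch()
        if check_crcs and not batch.validate_crc():
            raise CorruptRecordException('CRC check failed for fetched batch')
        for record in batch:
            yield record.offset, record.timestamp, record.key, record.value
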
7 changes: 5 additions & 2 deletions kafka/errors.py
@@ -101,12 +101,15 @@ class OffsetOutOfRangeError(BrokerResponseError):
                   ' maintained by the server for the given topic/partition.')


class InvalidMessageError(BrokerResponseError):
class CorruptRecordException(BrokerResponseError):
    errno = 2
    message = 'INVALID_MESSAGE'
    message = 'CORRUPT_MESSAGE'
    description = ('This message has failed its CRC checksum, exceeds the'
                   ' valid size, or is otherwise corrupt.')

# Backward compatibility
InvalidMessageError = CorruptRecordException


class UnknownTopicOrPartitionError(BrokerResponseError):
    errno = 3
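
A quick usage sketch (hypothetical, not part of the diff) showing why the
alias keeps existing error handling working:

from kafka.errors import CorruptRecordException, InvalidMessageError

# The old name is now just another reference to the new class.
assert InvalidMessageError is CorruptRecordException

try:
    raise CorruptRecordException('crc mismatch')
except InvalidMessageError:
    pass  # handlers written against the old name still catch the renamed error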