
Commit bef4f06

Support produce with Kafka record headers
1 parent 5ed5b4a commit bef4f06

File tree

4 files changed (+36 -18 lines):

kafka/producer/future.py
kafka/producer/kafka.py
kafka/producer/record_accumulator.py
test/test_producer.py


kafka/producer/future.py

Lines changed: 5 additions & 5 deletions

@@ -29,11 +29,11 @@ def wait(self, timeout=None):
 
 
 class FutureRecordMetadata(Future):
-    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size):
+    def __init__(self, produce_future, relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size):
         super(FutureRecordMetadata, self).__init__()
         self._produce_future = produce_future
         # packing args as a tuple is a minor speed optimization
-        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size)
+        self.args = (relative_offset, timestamp_ms, checksum, serialized_key_size, serialized_value_size, serialized_header_size)
         produce_future.add_callback(self._produce_success)
         produce_future.add_errback(self.failure)
 
@@ -42,7 +42,7 @@ def _produce_success(self, offset_and_timestamp):
 
         # Unpacking from args tuple is minor speed optimization
         (relative_offset, timestamp_ms, checksum,
-         serialized_key_size, serialized_value_size) = self.args
+         serialized_key_size, serialized_value_size, serialized_header_size) = self.args
 
         # None is when Broker does not support the API (<0.10) and
         # -1 is when the broker is configured for CREATE_TIME timestamps
@@ -53,7 +53,7 @@ def _produce_success(self, offset_and_timestamp):
         tp = self._produce_future.topic_partition
         metadata = RecordMetadata(tp[0], tp[1], tp, offset, timestamp_ms,
                                   checksum, serialized_key_size,
-                                  serialized_value_size)
+                                  serialized_value_size, serialized_header_size)
         self.success(metadata)
 
     def get(self, timeout=None):
@@ -68,4 +68,4 @@ def get(self, timeout=None):
 
 RecordMetadata = collections.namedtuple(
     'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
-                       'checksum', 'serialized_key_size', 'serialized_value_size'])
+                       'checksum', 'serialized_key_size', 'serialized_value_size', 'serialized_header_size'])
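
For context, a minimal sketch of the extended namedtuple in isolation; the field list is taken from the diff, while the sample values below are hypothetical, with -1 being the same "not present" sentinel the diff uses for key and value sizes:

import collections

# Same field list as the RecordMetadata namedtuple defined above.
RecordMetadata = collections.namedtuple(
    'RecordMetadata', ['topic', 'partition', 'topic_partition', 'offset', 'timestamp',
                       'checksum', 'serialized_key_size', 'serialized_value_size',
                       'serialized_header_size'])

# Hypothetical values for a record carrying 22 bytes of headers.
md = RecordMetadata('my-topic', 0, ('my-topic', 0), 42, 1510000000000,
                    None, 10, 12, 22)
assert md.serialized_header_size == 22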

kafka/producer/kafka.py

Lines changed: 13 additions & 5 deletions

@@ -509,7 +509,7 @@ def _estimate_size_in_bytes(self, key, value, headers=[]):
             return LegacyRecordBatchBuilder.estimate_size_in_bytes(
                 magic, self.config['compression_type'], key, value)
 
-    def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
+    def send(self, topic, value=None, key=None, headers=None, partition=None, timestamp_ms=None):
         """Publish a message to a topic.
 
         Arguments:
@@ -530,6 +530,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
                 partition (but if key is None, partition is chosen randomly).
                 Must be type bytes, or be serializable to bytes via configured
                 key_serializer.
+            headers (optional): a list of header key value pairs. List items
+                are tuples of str key and bytes value.
             timestamp_ms (int, optional): epoch milliseconds (from Jan 1 1970 UTC)
                 to use as the message timestamp. Defaults to current time.
 
@@ -559,13 +561,18 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
             partition = self._partition(topic, partition, key, value,
                                         key_bytes, value_bytes)
 
-            message_size = self._estimate_size_in_bytes(key_bytes, value_bytes)
+            if headers is None:
+                headers = []
+            assert type(headers) == list
+            assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
+
+            message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
             self._ensure_valid_record_size(message_size)
 
             tp = TopicPartition(topic, partition)
-            log.debug("Sending (key=%r value=%r) to %s", key, value, tp)
+            log.debug("Sending (key=%r value=%r headers=%r) to %s", key, value, headers, tp)
             result = self._accumulator.append(tp, timestamp_ms,
-                                              key_bytes, value_bytes,
+                                              key_bytes, value_bytes, headers,
                                               self.config['max_block_ms'],
                                               estimated_size=message_size)
             future, batch_is_full, new_batch_created = result
@@ -584,7 +591,8 @@ def send(self, topic, value=None, key=None, partition=None, timestamp_ms=None):
                 FutureProduceResult(TopicPartition(topic, partition)),
                 -1, None, None,
                 len(key_bytes) if key_bytes is not None else -1,
-                len(value_bytes) if value_bytes is not None else -1
+                len(value_bytes) if value_bytes is not None else -1,
+                sum(len(h_key.encode("utf-8")) + len(h_value) for h_key, h_value in headers) if headers else -1,
             ).failure(e)
 
     def flush(self, timeout=None):
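
Taken together, this change lets callers attach headers at the send() call site. A minimal usage sketch of the new signature; the broker address and topic name are hypothetical, and a 0.11+ broker (message format v2) is assumed so the headers are actually transmitted:

from kafka import KafkaProducer

# Hypothetical broker address. Headers must be a list of (str, bytes)
# tuples, matching the assertions added in send() above.
producer = KafkaProducer(bootstrap_servers='localhost:9092')
future = producer.send(
    'my-topic',
    value=b'Simple value',
    key=b'Simple key',
    headers=[('Header Key', b'Header Value')],
)
metadata = future.get(timeout=10)
print(metadata.serialized_header_size)
producer.flush()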

kafka/producer/record_accumulator.py

Lines changed: 9 additions & 7 deletions

@@ -55,8 +55,8 @@ def __init__(self, tp, records, buffer):
     def record_count(self):
         return self.records.next_offset()
 
-    def try_append(self, timestamp_ms, key, value):
-        metadata = self.records.append(timestamp_ms, key, value)
+    def try_append(self, timestamp_ms, key, value, headers):
+        metadata = self.records.append(timestamp_ms, key, value, headers)
         if metadata is None:
             return None
 
@@ -65,7 +65,8 @@ def try_append(self, timestamp_ms, key, value):
         future = FutureRecordMetadata(self.produce_future, metadata.offset,
                                       metadata.timestamp, metadata.crc,
                                       len(key) if key is not None else -1,
-                                      len(value) if value is not None else -1)
+                                      len(value) if value is not None else -1,
+                                      sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers) if headers else -1)
         return future
 
     def done(self, base_offset=None, timestamp_ms=None, exception=None):
@@ -196,7 +197,7 @@ def __init__(self, **configs):
         self.muted = set()
         self._drain_index = 0
 
-    def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
+    def append(self, tp, timestamp_ms, key, value, headers, max_time_to_block_ms,
                estimated_size=0):
         """Add a record to the accumulator, return the append result.
 
@@ -209,6 +210,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
             timestamp_ms (int): The timestamp of the record (epoch ms)
             key (bytes): The key for the record
            value (bytes): The value for the record
+            headers (List[Tuple[str, bytes]]): The header fields for the record
             max_time_to_block_ms (int): The maximum time in milliseconds to
                 block for buffer memory to be available
 
@@ -231,7 +233,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
            dq = self._batches[tp]
            if dq:
                last = dq[-1]
-                future = last.try_append(timestamp_ms, key, value)
+                future = last.try_append(timestamp_ms, key, value, headers)
                if future is not None:
                    batch_is_full = len(dq) > 1 or last.records.is_full()
                    return future, batch_is_full, False
@@ -246,7 +248,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
 
            if dq:
                last = dq[-1]
-                future = last.try_append(timestamp_ms, key, value)
+                future = last.try_append(timestamp_ms, key, value, headers)
                if future is not None:
                    # Somebody else found us a batch, return the one we
                    # waited for! Hopefully this doesn't happen often...
@@ -261,7 +263,7 @@ def append(self, tp, timestamp_ms, key, value, max_time_to_block_ms,
            )
 
            batch = ProducerBatch(tp, records, buf)
-            future = batch.try_append(timestamp_ms, key, value)
+            future = batch.try_append(timestamp_ms, key, value, headers)
            if not future:
                raise Exception()
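
Both try_append above and the send() error path compute the header size with the same inline expression. As a standalone sketch of that logic, a hypothetical helper (the name headers_size is not in the diff) that mirrors it:

def headers_size(headers):
    # Mirrors the inline expression above: UTF-8 byte length of each header
    # key plus raw byte length of each header value, or the -1 sentinel when
    # the record carries no headers.
    if not headers:
        return -1
    return sum(len(h_key.encode("utf-8")) + len(h_val) for h_key, h_val in headers)

assert headers_size(None) == -1
assert headers_size([]) == -1
assert headers_size([("a", b"bc")]) == 3  # 1 key byte + 2 value bytes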

test/test_producer.py

Lines changed: 9 additions & 1 deletion

@@ -91,10 +91,16 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
                              compression_type=compression)
     magic = producer._max_usable_produce_magic()
 
+    # record headers are supported in 0.11.0
+    if version() < (0, 11, 0):
+        headers = None
+    else:
+        headers = [("Header Key", b"Header Value")]
+
     topic = random_string(5)
     future = producer.send(
         topic,
-        value=b"Simple value", key=b"Simple key", timestamp_ms=9999999,
+        value=b"Simple value", key=b"Simple key", headers=headers, timestamp_ms=9999999,
         partition=0)
     record = future.get(timeout=5)
     assert record is not None
@@ -116,6 +122,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression):
 
     assert record.serialized_key_size == 10
     assert record.serialized_value_size == 12
+    if headers:
+        assert record.serialized_header_size == 22
 
     # generated timestamp case is skipped for broker 0.9 and below
     if magic == 0:
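
The expected value 22 in the new assertion follows directly from the single header used in the test; a quick arithmetic check:

# "Header Key" encodes to 10 UTF-8 bytes; b"Header Value" is 12 raw bytes.
assert len("Header Key".encode("utf-8")) == 10
assert len(b"Header Value") == 12
# Hence serialized_header_size == 10 + 12 == 22 for that record.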
