From d7710d1a6ea1e158918d90e56833db53375ff636 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Thu, 12 Mar 2020 16:25:01 +0200 Subject: [PATCH 1/9] Add zstd compression support to the kafka client --- requirements-dev.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements-dev.txt b/requirements-dev.txt index d2830905b..77b8b5134 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,3 +15,4 @@ pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 py==1.8.0 +zstandard==0.13.0 \ No newline at end of file From 7e84854b6556102acd958349331577c3752630a3 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 16 Mar 2020 00:06:47 +0100 Subject: [PATCH 2/9] Add all produce api required changes to a separate branch (basically all but the zstd related stuff) --- kafka/protocol/produce.py | 1 - 1 file changed, 1 deletion(-) diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 9b3f6bf55..40a3bf566 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -61,7 +61,6 @@ class ProduceResponse_v4(Response): API_VERSION = 4 SCHEMA = ProduceResponse_v3.SCHEMA - class ProduceResponse_v5(Response): API_KEY = 0 API_VERSION = 5 From 400f6b99e6233de9774e4e5e49e3763dff0e9e05 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 16 Mar 2020 18:36:34 +0100 Subject: [PATCH 3/9] Newline between class defs, revert requirements file --- kafka/protocol/produce.py | 1 + requirements-dev.txt | 1 - 2 files changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/protocol/produce.py b/kafka/protocol/produce.py index 40a3bf566..9b3f6bf55 100644 --- a/kafka/protocol/produce.py +++ b/kafka/protocol/produce.py @@ -61,6 +61,7 @@ class ProduceResponse_v4(Response): API_VERSION = 4 SCHEMA = ProduceResponse_v3.SCHEMA + class ProduceResponse_v5(Response): API_KEY = 0 API_VERSION = 5 diff --git a/requirements-dev.txt b/requirements-dev.txt index 77b8b5134..d2830905b 100644 --- a/requirements-dev.txt +++ b/requirements-dev.txt @@ -15,4 +15,3 @@ pytest-mock==1.10.0 sphinx-rtd-theme==0.2.4 crc32c==1.7 py==1.8.0 -zstandard==0.13.0 \ No newline at end of file From 3b8c706b084e0e9b88b2ad446dae684d34868946 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 16 Mar 2020 00:14:52 +0100 Subject: [PATCH 4/9] Add zstd relevant code to a separate PR --- kafka/codec.py | 21 +++++++++++++++++++++ kafka/producer/kafka.py | 8 ++++++-- kafka/record/default_records.py | 11 +++++++++-- kafka/record/memory_records.py | 2 +- test/test_codec.py | 11 ++++++++++- test/test_producer.py | 15 +++++++++++++-- tox.ini | 1 + 7 files changed, 61 insertions(+), 8 deletions(-) diff --git a/kafka/codec.py b/kafka/codec.py index aa9fc8291..a63bbdcb9 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -16,6 +16,11 @@ except ImportError: snappy = None +try: + import zstandard as zstd +except ImportError: + zstd = None + try: import lz4.frame as lz4 @@ -58,6 +63,10 @@ def has_snappy(): return snappy is not None +def has_zstd(): + return zstd is not None + + def has_lz4(): if lz4 is not None: return True @@ -299,3 +308,15 @@ def lz4_decode_old_kafka(payload): payload[header_size:] ]) return lz4_decode(munged_payload) + + +def zstd_encode(payload): + if not zstd: + raise NotImplementedError("Zstd codec is not available") + return zstd.ZstdCompressor().compress(payload) + + +def zstd_decode(payload): + if not zstd: + raise NotImplementedError("Zstd codec is not available") + return zstd.ZstdDecompressor().decompress(payload) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 
9509ab940..dba18015a 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -12,7 +12,7 @@ import kafka.errors as Errors from kafka.client_async import KafkaClient, selectors -from kafka.codec import has_gzip, has_snappy, has_lz4 +from kafka.codec import has_gzip, has_snappy, has_lz4, has_zstd from kafka.metrics import MetricConfig, Metrics from kafka.partitioner.default import DefaultPartitioner from kafka.producer.future import FutureRecordMetadata, FutureProduceResult @@ -119,7 +119,7 @@ class KafkaProducer(object): available guarantee. If unset, defaults to acks=1. compression_type (str): The compression type for all data generated by - the producer. Valid values are 'gzip', 'snappy', 'lz4', or None. + the producer. Valid values are 'gzip', 'snappy', 'lz4', 'zstd' or None. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression). Default: None. @@ -339,6 +339,7 @@ class KafkaProducer(object): 'gzip': (has_gzip, LegacyRecordBatchBuilder.CODEC_GZIP), 'snappy': (has_snappy, LegacyRecordBatchBuilder.CODEC_SNAPPY), 'lz4': (has_lz4, LegacyRecordBatchBuilder.CODEC_LZ4), + 'zstd': (has_zstd, DefaultRecordBatchBuilder.CODEC_ZSTD), None: (lambda: True, LegacyRecordBatchBuilder.CODEC_NONE), } @@ -388,6 +389,9 @@ def __init__(self, **configs): if self.config['compression_type'] == 'lz4': assert self.config['api_version'] >= (0, 8, 2), 'LZ4 Requires >= Kafka 0.8.2 Brokers' + if self.config['compression_type'] == 'zstd': + assert self.config['api_version'] >= (2, 1, 0), 'Zstd Requires >= Kafka 2.1.0 Brokers' + # Check compression_type for library support ct = self.config['compression_type'] if ct not in self._COMPRESSORS: diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 07368bba9..917c81cb8 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -62,8 +62,8 @@ ) from kafka.errors import CorruptRecordException, UnsupportedCodecError from kafka.codec import ( - gzip_encode, snappy_encode, lz4_encode, - gzip_decode, snappy_decode, lz4_decode + gzip_encode, snappy_encode, lz4_encode, zstd_encode, + gzip_decode, snappy_decode, lz4_decode, zstd_decode ) import kafka.codec as codecs @@ -97,6 +97,7 @@ class DefaultRecordBase(object): CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 + CODEC_ZSTD = 0x04 TIMESTAMP_TYPE_MASK = 0x08 TRANSACTIONAL_MASK = 0x10 CONTROL_MASK = 0x20 @@ -111,6 +112,8 @@ def _assert_has_codec(self, compression_type): checker, name = codecs.has_snappy, "snappy" elif compression_type == self.CODEC_LZ4: checker, name = codecs.has_lz4, "lz4" + elif compression_type == self.CODEC_ZSTD: + checker, name = codecs.has_zstd, "zstd" if not checker(): raise UnsupportedCodecError( "Libraries for {} compression codec not found".format(name)) @@ -185,6 +188,8 @@ def _maybe_uncompress(self): uncompressed = snappy_decode(data.tobytes()) if compression_type == self.CODEC_LZ4: uncompressed = lz4_decode(data.tobytes()) + if compression_type == self.CODEC_ZSTD: + uncompressed = zstd_decode(data) self._buffer = bytearray(uncompressed) self._pos = 0 self._decompressed = True @@ -517,6 +522,8 @@ def _maybe_compress(self): compressed = snappy_encode(data) elif self._compression_type == self.CODEC_LZ4: compressed = lz4_encode(data) + elif self._compression_type == self.CODEC_ZSTD: + compressed = zstd_encode(data) compressed_size = len(compressed) if len(data) <= compressed_size: # We did not get any benefit from compression, lets send 
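A quick round-trip sketch (not part of the patch; assumes the zstandard package pinned in requirements-dev.txt is installed) of the library calls the new zstd_encode/zstd_decode helpers in kafka/codec.py wrap, and of the frame-without-content-size case that a later commit in this series hardens against:

    # Illustrative only -- not the patch itself. Requires: pip install zstandard
    import zstandard as zstd

    payload = b"record batch bytes"

    # zstd_encode() path: one-shot compression embeds the decompressed size in the
    # frame header, so a plain one-shot decompress() can size its output buffer.
    frame = zstd.ZstdCompressor().compress(payload)
    assert zstd.ZstdDecompressor().decompress(frame) == payload  # zstd_decode() path

    # Frames from other producers may omit the content size (streaming compression
    # does not know it up front). One-shot decompress() then raises ZstdError unless
    # an explicit output bound is given -- the fallback added by the later
    # "harden zstd decompression" commit in this series.
    cobj = zstd.ZstdCompressor().compressobj()
    sizeless_frame = cobj.compress(payload) + cobj.flush()
    try:
        recovered = zstd.ZstdDecompressor().decompress(sizeless_frame)
    except zstd.ZstdError:
        # Bound the output instead; 1024 * 1024 matches the final ZSTD_MAX_OUTPUT_SIZE.
        recovered = zstd.ZstdDecompressor().decompress(
            sizeless_frame, max_output_size=1024 * 1024)
    assert recovered == payload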
diff --git a/kafka/record/memory_records.py b/kafka/record/memory_records.py index a6c4b51f7..fc2ef2d6b 100644 --- a/kafka/record/memory_records.py +++ b/kafka/record/memory_records.py @@ -117,7 +117,7 @@ class MemoryRecordsBuilder(object): def __init__(self, magic, compression_type, batch_size): assert magic in [0, 1, 2], "Not supported magic" - assert compression_type in [0, 1, 2, 3], "Not valid compression type" + assert compression_type in [0, 1, 2, 3, 4], "Not valid compression type" if magic >= 2: self._builder = DefaultRecordBatchBuilder( magic=magic, compression_type=compression_type, diff --git a/test/test_codec.py b/test/test_codec.py index 9eff888fe..e05707451 100644 --- a/test/test_codec.py +++ b/test/test_codec.py @@ -7,11 +7,12 @@ from kafka.vendor.six.moves import range from kafka.codec import ( - has_snappy, has_lz4, + has_snappy, has_lz4, has_zstd, gzip_encode, gzip_decode, snappy_encode, snappy_decode, lz4_encode, lz4_decode, lz4_encode_old_kafka, lz4_decode_old_kafka, + zstd_encode, zstd_decode, ) from test.testutil import random_string @@ -113,3 +114,11 @@ def test_lz4_incremental(): b2 = lz4_decode(lz4_encode(b1)) assert len(b1) == len(b2) assert b1 == b2 + + +@pytest.mark.skipif(not has_zstd(), reason="Zstd not available") +def test_zstd(): + for _ in range(1000): + b1 = random_string(100).encode('utf-8') + b2 = zstd_decode(zstd_encode(b1)) + assert b1 == b2 diff --git a/test/test_producer.py b/test/test_producer.py index 9605adf58..793ad660c 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -23,7 +23,7 @@ def test_buffer_pool(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): if compression == 'lz4': @@ -34,10 +34,15 @@ def test_end_to_end(kafka_broker, compression): elif platform.python_implementation() == 'PyPy': return + if compression == 'zstd' and env_kafka_version() < (2, 1, 0): + return + env_version = env_kafka_version() + api_version = env_version if env_version >= (2, 1, 0) else None connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, max_block_ms=30000, + api_version=api_version, compression_type=compression, value_serializer=str.encode) consumer = KafkaConsumer(bootstrap_servers=connect_str, @@ -81,16 +86,22 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") -@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4']) +@pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): + if compression == 'zstd' and env_kafka_version() < (2, 1, 0): + return connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) + env_version = env_kafka_version() + api_version = env_version if env_version >= (2, 1, 0) else None producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, + api_version=api_version, max_block_ms=30000, compression_type=compression) magic = producer._max_usable_produce_magic() # record headers are supported in 0.11.0 + if env_kafka_version() < (0, 11, 0): headers = None else: diff --git a/tox.ini b/tox.ini index 06403d6ed..596a9f211 100644 --- a/tox.ini +++ b/tox.ini @@ -15,6 +15,7 @@ deps = pytest-mock mock python-snappy + zstandard 
lz4 xxhash crc32c From 217d5b3f0d09cda7a0cf064b4f195959e39204e2 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 16 Mar 2020 00:26:11 +0100 Subject: [PATCH 5/9] Remove extra newline --- test/test_producer.py | 1 - 1 file changed, 1 deletion(-) diff --git a/test/test_producer.py b/test/test_producer.py index 793ad660c..5a470179f 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -101,7 +101,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): magic = producer._max_usable_produce_magic() # record headers are supported in 0.11.0 - if env_kafka_version() < (0, 11, 0): headers = None else: From 14265537374e31827293ec75c7b22f1223465c8a Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Mon, 16 Mar 2020 19:14:01 +0100 Subject: [PATCH 6/9] Update docstring, always force api version to be in sync with actual broker running when building producer --- docs/index.rst | 10 +++++----- test/test_producer.py | 10 +++------- 2 files changed, 8 insertions(+), 12 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index fa6f93c50..242f9eb8d 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -122,11 +122,11 @@ multiprocessing is recommended. Compression *********** -kafka-python supports gzip compression/decompression natively. To produce or -consume lz4 compressed messages, you should install python-lz4 (pip install lz4). -To enable snappy, install python-snappy (also requires snappy library). -See `Installation `_ for more information. - +kafka-python supports multiple compression types. To produce or + - gzip : supported natively + - lz4 : requires `python-lz4 `_ installed + - snappy : requires the `python-snappy `_ package (which requires the snappy C library) + - zstd : requires the `python-zstandard `_ package installed Protocol ******** diff --git a/test/test_producer.py b/test/test_producer.py index 5a470179f..94b2faa9e 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -25,7 +25,6 @@ def test_buffer_pool(): @pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set") @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): - if compression == 'lz4': # LZ4 requires 0.8.2 if env_kafka_version() < (0, 8, 2): @@ -36,13 +35,12 @@ def test_end_to_end(kafka_broker, compression): if compression == 'zstd' and env_kafka_version() < (2, 1, 0): return - env_version = env_kafka_version() - api_version = env_version if env_version >= (2, 1, 0) else None + connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, max_block_ms=30000, - api_version=api_version, + api_version=env_kafka_version(), compression_type=compression, value_serializer=str.encode) consumer = KafkaConsumer(bootstrap_servers=connect_str, @@ -91,11 +89,9 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if compression == 'zstd' and env_kafka_version() < (2, 1, 0): return connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) - env_version = env_kafka_version() - api_version = env_version if env_version >= (2, 1, 0) else None producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, - api_version=api_version, + api_version=env_kafka_version(), max_block_ms=30000, compression_type=compression) magic = producer._max_usable_produce_magic() From 75b810fb150d9644a8d651e6ce8755232f2ba07d Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Tue, 17 Mar 2020 10:44:19 +0100 
Subject: [PATCH 7/9] Update producer test to use pytest.skip instead of return statements Harden zstd decompression for missing frame size information (can happen when the sender is not under our control) --- kafka/codec.py | 6 +++++- test/test_producer.py | 14 +++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kafka/codec.py b/kafka/codec.py index a63bbdcb9..8ca0728f6 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -10,6 +10,7 @@ _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) _XERIAL_V1_FORMAT = 'bccccccBii' +ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3 try: import snappy @@ -319,4 +320,7 @@ def zstd_encode(payload): def zstd_decode(payload): if not zstd: raise NotImplementedError("Zstd codec is not available") - return zstd.ZstdDecompressor().decompress(payload) + try: + return zstd.ZstdDecompressor().decompress(payload) + except zstd.ZstdError: + return zstd.ZstdDecompressor().decompress(payload, max_output_size=ZSTD_MAX_OUTPUT_SIZE) diff --git a/test/test_producer.py b/test/test_producer.py index 94b2faa9e..af8fc26f3 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -26,15 +26,13 @@ def test_buffer_pool(): @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_end_to_end(kafka_broker, compression): if compression == 'lz4': - # LZ4 requires 0.8.2 if env_kafka_version() < (0, 8, 2): - return - # python-lz4 crashes on older versions of pypy + pytest.skip('LZ4 requires 0.8.2') elif platform.python_implementation() == 'PyPy': - return + pytest.skip('python-lz4 crashes on older versions of pypy') if compression == 'zstd' and env_kafka_version() < (2, 1, 0): - return + pytest.skip('zstd requires kafka 2.1.0 or newer') connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, @@ -87,7 +85,7 @@ def test_kafka_producer_gc_cleanup(): @pytest.mark.parametrize("compression", [None, 'gzip', 'snappy', 'lz4', 'zstd']) def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if compression == 'zstd' and env_kafka_version() < (2, 1, 0): - return + pytest.skip('zstd requires 2.1.0 or more') connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, @@ -130,10 +128,8 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): if headers: assert record.serialized_header_size == 22 - # generated timestamp case is skipped for broker 0.9 and below if magic == 0: - return - + pytest.skip('generated timestamp case is skipped for broker 0.9 and below') send_time = time.time() * 1000 future = producer.send( topic, From c9b5ba262b8d97ccf6f7c2a421d6bf40c8572858 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Sat, 21 Mar 2020 00:31:50 +0100 Subject: [PATCH 8/9] Update travis config with native zstd lib package install directive --- .travis.yml | 1 + 1 file changed, 1 insertion(+) diff --git a/.travis.yml b/.travis.yml index b98aa16b1..fe5028881 100644 --- a/.travis.yml +++ b/.travis.yml @@ -21,6 +21,7 @@ addons: apt: packages: - libsnappy-dev + - libzstd-dev - openjdk-8-jdk cache: From 5002d670253806e9a8f2b77702e906464e845d11 Mon Sep 17 00:00:00 2001 From: Gabriel Tincu Date: Sat, 21 Mar 2020 00:54:40 +0100 Subject: [PATCH 9/9] PR change requests Update readme zstd decompress will transform memoryview object to bytes before decompressing, to keep the same behavior as other decompression strategies Decrease fallback max message size due to OOM 
concerns Rewrite the 1MB message limit Test producer constructor makes use of broker version inference again, also add logic for zstd decompression --- docs/index.rst | 3 ++- kafka/codec.py | 2 +- kafka/protocol/message.py | 10 +++++++--- kafka/record/default_records.py | 2 +- test/test_producer.py | 2 -- 5 files changed, 11 insertions(+), 8 deletions(-) diff --git a/docs/index.rst b/docs/index.rst index 242f9eb8d..9c46e3313 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -122,7 +122,8 @@ multiprocessing is recommended. Compression *********** -kafka-python supports multiple compression types. To produce or +kafka-python supports multiple compression types: + - gzip : supported natively - lz4 : requires `python-lz4 `_ installed - snappy : requires the `python-snappy `_ package (which requires the snappy C library) diff --git a/kafka/codec.py b/kafka/codec.py index 8ca0728f6..917400e74 100644 --- a/kafka/codec.py +++ b/kafka/codec.py @@ -10,7 +10,7 @@ _XERIAL_V1_HEADER = (-126, b'S', b'N', b'A', b'P', b'P', b'Y', 0, 1, 1) _XERIAL_V1_FORMAT = 'bccccccBii' -ZSTD_MAX_OUTPUT_SIZE = 1024 ** 3 +ZSTD_MAX_OUTPUT_SIZE = 1024 * 1024 try: import snappy diff --git a/kafka/protocol/message.py b/kafka/protocol/message.py index 31527bf63..4c5c031b8 100644 --- a/kafka/protocol/message.py +++ b/kafka/protocol/message.py @@ -3,8 +3,8 @@ import io import time -from kafka.codec import (has_gzip, has_snappy, has_lz4, - gzip_decode, snappy_decode, +from kafka.codec import (has_gzip, has_snappy, has_lz4, has_zstd, + gzip_decode, snappy_decode, zstd_decode, lz4_decode, lz4_decode_old_kafka) from kafka.protocol.frame import KafkaBytes from kafka.protocol.struct import Struct @@ -35,6 +35,7 @@ class Message(Struct): CODEC_GZIP = 0x01 CODEC_SNAPPY = 0x02 CODEC_LZ4 = 0x03 + CODEC_ZSTD = 0x04 TIMESTAMP_TYPE_MASK = 0x08 HEADER_SIZE = 22 # crc(4), magic(1), attributes(1), timestamp(8), key+value size(4*2) @@ -119,7 +120,7 @@ def is_compressed(self): def decompress(self): codec = self.attributes & self.CODEC_MASK - assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4) + assert codec in (self.CODEC_GZIP, self.CODEC_SNAPPY, self.CODEC_LZ4, self.CODEC_ZSTD) if codec == self.CODEC_GZIP: assert has_gzip(), 'Gzip decompression unsupported' raw_bytes = gzip_decode(self.value) @@ -132,6 +133,9 @@ def decompress(self): raw_bytes = lz4_decode_old_kafka(self.value) else: raw_bytes = lz4_decode(self.value) + elif codec == self.CODEC_ZSTD: + assert has_zstd(), "ZSTD decompression unsupported" + raw_bytes = zstd_decode(self.value) else: raise Exception('This should be impossible') diff --git a/kafka/record/default_records.py b/kafka/record/default_records.py index 917c81cb8..a098c42a9 100644 --- a/kafka/record/default_records.py +++ b/kafka/record/default_records.py @@ -189,7 +189,7 @@ def _maybe_uncompress(self): if compression_type == self.CODEC_LZ4: uncompressed = lz4_decode(data.tobytes()) if compression_type == self.CODEC_ZSTD: - uncompressed = zstd_decode(data) + uncompressed = zstd_decode(data.tobytes()) self._buffer = bytearray(uncompressed) self._pos = 0 self._decompressed = True diff --git a/test/test_producer.py b/test/test_producer.py index af8fc26f3..7263130d1 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -38,7 +38,6 @@ def test_end_to_end(kafka_broker, compression): producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, max_block_ms=30000, - api_version=env_kafka_version(), compression_type=compression, value_serializer=str.encode) consumer = 
KafkaConsumer(bootstrap_servers=connect_str, @@ -89,7 +88,6 @@ def test_kafka_producer_proper_record_metadata(kafka_broker, compression): connect_str = ':'.join([kafka_broker.host, str(kafka_broker.port)]) producer = KafkaProducer(bootstrap_servers=connect_str, retries=5, - api_version=env_kafka_version(), max_block_ms=30000, compression_type=compression) magic = producer._max_usable_produce_magic()
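For context on the new codec id: in the v2 record batch format the low three bits of the batch attributes field select the compression codec, which is why DefaultRecordBase gains CODEC_ZSTD = 0x04 and MemoryRecordsBuilder now accepts compression types 0-4. A simplified sketch of that bookkeeping (constants mirror kafka/record/default_records.py; codec_of is an illustrative helper, not code from the PR):

    CODEC_MASK   = 0x07  # bits 0-2 of the attributes field
    CODEC_NONE   = 0x00
    CODEC_GZIP   = 0x01
    CODEC_SNAPPY = 0x02
    CODEC_LZ4    = 0x03
    CODEC_ZSTD   = 0x04  # added by this PR; only valid for message format v2 / Kafka >= 2.1

    def codec_of(attributes):
        return attributes & CODEC_MASK

    # e.g. attributes 0x14 = transactional bit (0x10) + zstd (0x04)
    assert codec_of(0x14) == CODEC_ZSTD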
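Once the series is applied, enabling zstd from application code is just a matter of passing the new compression_type. A minimal usage sketch, assuming a Kafka >= 2.1.0 broker reachable at localhost:9092 (placeholder address), a topic named my_topic, and the zstandard package installed:

    from kafka import KafkaProducer, KafkaConsumer

    producer = KafkaProducer(
        bootstrap_servers='localhost:9092',
        compression_type='zstd',       # fails at construction if zstandard is missing
        retries=5,
        value_serializer=str.encode,
    )
    for i in range(100):
        producer.send('my_topic', 'message %d' % i)
    producer.flush()
    producer.close()

    # Decompression is transparent on the consuming side.
    consumer = KafkaConsumer(
        'my_topic',
        bootstrap_servers='localhost:9092',
        auto_offset_reset='earliest',
        consumer_timeout_ms=10000,
    )
    print(sum(1 for _ in consumer))
    consumer.close()

Brokers older than 2.1.0 do not understand zstd-compressed produce requests, which is why both the producer constructor and the integration tests above gate on that broker version.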