Skip to content

Commit 98c0058

Browse files
authored
Cleanup handling of KAFKA_VERSION env var in tests (#1887)
Now that we are using `pytest`, there is no need for a custom decorator because we can use `pytest.mark.skipif()`. This makes the code significantly simpler. In particular, dropping the custom `@kafka_versions()` decorator is necessary because it uses `func.wraps()` which doesn't play nice with `pytest` fixtures: - pytest-dev/pytest#677 - https://stackoverflow.com/a/19614807/770425 So this is a pre-requisite to migrating some of those tests to using pytest fixtures.
1 parent e49caeb commit 98c0058

10 files changed

+65
-142
lines changed

test/conftest.py

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,8 @@
22

33
import pytest
44

5-
from test.fixtures import KafkaFixture, ZookeeperFixture, random_string, version as kafka_version
6-
7-
8-
@pytest.fixture(scope="module")
9-
def version():
10-
"""Return the Kafka version set in the OS environment"""
11-
return kafka_version()
12-
5+
from test.testutil import env_kafka_version, random_string
6+
from test.fixtures import KafkaFixture, ZookeeperFixture
137

148
@pytest.fixture(scope="module")
159
def zookeeper():
@@ -26,9 +20,9 @@ def kafka_broker(kafka_broker_factory):
2620

2721

2822
@pytest.fixture(scope="module")
29-
def kafka_broker_factory(version, zookeeper):
23+
def kafka_broker_factory(zookeeper):
3024
"""Return a Kafka broker fixture factory"""
31-
assert version, 'KAFKA_VERSION must be specified to run integration tests'
25+
assert env_kafka_version(), 'KAFKA_VERSION must be specified to run integration tests'
3226

3327
_brokers = []
3428
def factory(**broker_params):

test/fixtures.py

Lines changed: 3 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,7 @@
44
import logging
55
import os
66
import os.path
7-
import random
87
import socket
9-
import string
108
import subprocess
119
import time
1210
import uuid
@@ -19,29 +17,12 @@
1917
from kafka.client_async import KafkaClient
2018
from kafka.protocol.admin import CreateTopicsRequest
2119
from kafka.protocol.metadata import MetadataRequest
20+
from test.testutil import env_kafka_version, random_string
2221
from test.service import ExternalService, SpawnedService
2322

2423
log = logging.getLogger(__name__)
2524

2625

27-
def random_string(length):
28-
return "".join(random.choice(string.ascii_letters) for i in range(length))
29-
30-
31-
def version_str_to_tuple(version_str):
32-
"""Transform a version string into a tuple.
33-
34-
Example: '0.8.1.1' --> (0, 8, 1, 1)
35-
"""
36-
return tuple(map(int, version_str.split('.')))
37-
38-
39-
def version():
40-
if 'KAFKA_VERSION' not in os.environ:
41-
return ()
42-
return version_str_to_tuple(os.environ['KAFKA_VERSION'])
43-
44-
4526
def get_open_port():
4627
sock = socket.socket()
4728
sock.bind(("", 0))
@@ -477,7 +458,7 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_
477458
num_partitions == self.partitions and \
478459
replication_factor == self.replicas:
479460
self._send_request(MetadataRequest[0]([topic_name]))
480-
elif version() >= (0, 10, 1, 0):
461+
elif env_kafka_version() >= (0, 10, 1, 0):
481462
request = CreateTopicsRequest[0]([(topic_name, num_partitions,
482463
replication_factor, [], [])], timeout_ms)
483464
result = self._send_request(request, timeout=timeout_ms)
@@ -497,7 +478,7 @@ def _create_topic(self, topic_name, num_partitions, replication_factor, timeout_
497478
'--replication-factor', self.replicas \
498479
if replication_factor is None \
499480
else replication_factor)
500-
if version() >= (0, 10):
481+
if env_kafka_version() >= (0, 10):
501482
args.append('--if-not-exists')
502483
env = self.kafka_run_class_env()
503484
proc = subprocess.Popen(args, env=env, stdout=subprocess.PIPE, stderr=subprocess.PIPE)

test/test_client_integration.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,15 @@
11
import os
22

3+
import pytest
4+
35
from kafka.errors import KafkaTimeoutError
46
from kafka.protocol import create_message
57
from kafka.structs import (
68
FetchRequestPayload, OffsetCommitRequestPayload, OffsetFetchRequestPayload,
79
ProduceRequestPayload)
810

911
from test.fixtures import ZookeeperFixture, KafkaFixture
10-
from test.testutil import KafkaIntegrationTestCase, kafka_versions
12+
from test.testutil import KafkaIntegrationTestCase, env_kafka_version
1113

1214

1315
class TestKafkaClientIntegration(KafkaIntegrationTestCase):
@@ -80,7 +82,7 @@ def test_send_produce_request_maintains_request_response_order(self):
8082
# Offset Tests #
8183
####################
8284

83-
@kafka_versions('>=0.8.1')
85+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
8486
def test_commit_fetch_offsets(self):
8587
req = OffsetCommitRequestPayload(self.topic, 0, 42, 'metadata')
8688
(resp,) = self.client.send_offset_commit_request('group', [req])

test/test_codec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@
1414
lz4_encode_old_kafka, lz4_decode_old_kafka,
1515
)
1616

17-
from test.fixtures import random_string
17+
from test.testutil import random_string
1818

1919

2020
def test_gzip():

test/test_consumer_group.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -11,15 +11,15 @@
1111
from kafka.coordinator.base import MemberState
1212
from kafka.structs import TopicPartition
1313

14-
from test.fixtures import random_string, version
14+
from test.testutil import env_kafka_version, random_string
1515

1616

1717
def get_connect_str(kafka_broker):
1818
return kafka_broker.host + ':' + str(kafka_broker.port)
1919

2020

21-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
22-
def test_consumer(kafka_broker, topic, version):
21+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
22+
def test_consumer(kafka_broker, topic):
2323
# The `topic` fixture is included because
2424
# 0.8.2 brokers need a topic to function well
2525
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
@@ -29,17 +29,16 @@ def test_consumer(kafka_broker, topic, version):
2929
assert consumer._client._conns[node_id].state is ConnectionStates.CONNECTED
3030
consumer.close()
3131

32-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
33-
def test_consumer_topics(kafka_broker, topic, version):
32+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
33+
def test_consumer_topics(kafka_broker, topic):
3434
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
3535
# Necessary to drive the IO
3636
consumer.poll(500)
3737
assert topic in consumer.topics()
3838
assert len(consumer.partitions_for_topic(topic)) > 0
3939
consumer.close()
4040

41-
@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
42-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
41+
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
4342
def test_group(kafka_broker, topic):
4443
num_partitions = 4
4544
connect_str = get_connect_str(kafka_broker)
@@ -129,7 +128,7 @@ def consumer_thread(i):
129128
threads[c] = None
130129

131130

132-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
131+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
133132
def test_paused(kafka_broker, topic):
134133
consumer = KafkaConsumer(bootstrap_servers=get_connect_str(kafka_broker))
135134
topics = [TopicPartition(topic, 1)]
@@ -148,8 +147,7 @@ def test_paused(kafka_broker, topic):
148147
consumer.close()
149148

150149

151-
@pytest.mark.skipif(version() < (0, 9), reason='Unsupported Kafka Version')
152-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
150+
@pytest.mark.skipif(env_kafka_version() < (0, 9), reason='Unsupported Kafka Version')
153151
def test_heartbeat_thread(kafka_broker, topic):
154152
group_id = 'test-group-' + random_string(6)
155153
consumer = KafkaConsumer(topic,

test/test_consumer_integration.py

Lines changed: 21 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,18 @@
11
import logging
22
import os
33
import time
4-
from mock import patch
5-
import pytest
6-
import kafka.codec
74

5+
from mock import patch
86
import pytest
9-
from kafka.vendor.six.moves import range
107
from kafka.vendor import six
8+
from kafka.vendor.six.moves import range
119

1210
from . import unittest
1311
from kafka import (
1412
KafkaConsumer, MultiProcessConsumer, SimpleConsumer, create_message,
1513
create_gzip_message, KafkaProducer
1614
)
15+
import kafka.codec
1716
from kafka.consumer.base import MAX_FETCH_BUFFER_SIZE_BYTES
1817
from kafka.errors import (
1918
ConsumerFetchSizeTooSmall, OffsetOutOfRangeError, UnsupportedVersionError,
@@ -23,11 +22,11 @@
2322
ProduceRequestPayload, TopicPartition, OffsetAndTimestamp
2423
)
2524

26-
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string, version
27-
from test.testutil import KafkaIntegrationTestCase, kafka_versions, Timer
25+
from test.fixtures import ZookeeperFixture, KafkaFixture
26+
from test.testutil import KafkaIntegrationTestCase, Timer, env_kafka_version, random_string
2827

2928

30-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
29+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
3130
def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
3231
"""Test KafkaConsumer"""
3332
kafka_consumer = kafka_consumer_factory(auto_offset_reset='earliest')
@@ -54,7 +53,7 @@ def test_kafka_consumer(kafka_producer, topic, kafka_consumer_factory):
5453
kafka_consumer.close()
5554

5655

57-
@pytest.mark.skipif(not version(), reason="No KAFKA_VERSION set")
56+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
5857
def test_kafka_consumer_unsupported_encoding(
5958
topic, kafka_producer_factory, kafka_consumer_factory):
6059
# Send a compressed message
@@ -211,7 +210,7 @@ def test_simple_consumer_no_reset(self):
211210
with self.assertRaises(OffsetOutOfRangeError):
212211
consumer.get_message()
213212

214-
@kafka_versions('>=0.8.1')
213+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
215214
def test_simple_consumer_load_initial_offsets(self):
216215
self.send_messages(0, range(0, 100))
217216
self.send_messages(1, range(100, 200))
@@ -388,7 +387,7 @@ def test_multi_proc_pending(self):
388387
consumer.stop()
389388

390389
@unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
391-
@kafka_versions('>=0.8.1')
390+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
392391
def test_multi_process_consumer_load_initial_offsets(self):
393392
self.send_messages(0, range(0, 10))
394393
self.send_messages(1, range(10, 20))
@@ -459,7 +458,7 @@ def test_huge_messages(self):
459458

460459
big_consumer.stop()
461460

462-
@kafka_versions('>=0.8.1')
461+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
463462
def test_offset_behavior__resuming_behavior(self):
464463
self.send_messages(0, range(0, 100))
465464
self.send_messages(1, range(100, 200))
@@ -491,7 +490,7 @@ def test_offset_behavior__resuming_behavior(self):
491490
consumer2.stop()
492491

493492
@unittest.skip('MultiProcessConsumer deprecated and these tests are flaky')
494-
@kafka_versions('>=0.8.1')
493+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
495494
def test_multi_process_offset_behavior__resuming_behavior(self):
496495
self.send_messages(0, range(0, 100))
497496
self.send_messages(1, range(100, 200))
@@ -548,6 +547,7 @@ def test_fetch_buffer_size(self):
548547
messages = [ message for message in consumer ]
549548
self.assertEqual(len(messages), 2)
550549

550+
@pytest.mark.skipif(not env_kafka_version(), reason="No KAFKA_VERSION set")
551551
def test_kafka_consumer__blocking(self):
552552
TIMEOUT_MS = 500
553553
consumer = self.kafka_consumer(auto_offset_reset='earliest',
@@ -586,7 +586,7 @@ def test_kafka_consumer__blocking(self):
586586
self.assertGreaterEqual(t.interval, TIMEOUT_MS / 1000.0 )
587587
consumer.close()
588588

589-
@kafka_versions('>=0.8.1')
589+
@pytest.mark.skipif(env_kafka_version() < (0, 8, 1), reason="Requires KAFKA_VERSION >= 0.8.1")
590590
def test_kafka_consumer__offset_commit_resume(self):
591591
GROUP_ID = random_string(10)
592592

@@ -605,7 +605,7 @@ def test_kafka_consumer__offset_commit_resume(self):
605605
output_msgs1 = []
606606
for _ in range(180):
607607
m = next(consumer1)
608-
output_msgs1.append(m)
608+
output_msgs1.append((m.key, m.value))
609609
self.assert_message_count(output_msgs1, 180)
610610
consumer1.close()
611611

@@ -621,12 +621,12 @@ def test_kafka_consumer__offset_commit_resume(self):
621621
output_msgs2 = []
622622
for _ in range(20):
623623
m = next(consumer2)
624-
output_msgs2.append(m)
624+
output_msgs2.append((m.key, m.value))
625625
self.assert_message_count(output_msgs2, 20)
626626
self.assertEqual(len(set(output_msgs1) | set(output_msgs2)), 200)
627627
consumer2.close()
628628

629-
@kafka_versions('>=0.10.1')
629+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
630630
def test_kafka_consumer_max_bytes_simple(self):
631631
self.send_messages(0, range(100, 200))
632632
self.send_messages(1, range(200, 300))
@@ -647,7 +647,7 @@ def test_kafka_consumer_max_bytes_simple(self):
647647
TopicPartition(self.topic, 0), TopicPartition(self.topic, 1)]))
648648
consumer.close()
649649

650-
@kafka_versions('>=0.10.1')
650+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
651651
def test_kafka_consumer_max_bytes_one_msg(self):
652652
# We send to only 1 partition so we don't have parallel requests to 2
653653
# nodes for data.
@@ -673,7 +673,7 @@ def test_kafka_consumer_max_bytes_one_msg(self):
673673
self.assertEqual(len(fetched_msgs), 10)
674674
consumer.close()
675675

676-
@kafka_versions('>=0.10.1')
676+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
677677
def test_kafka_consumer_offsets_for_time(self):
678678
late_time = int(time.time()) * 1000
679679
middle_time = late_time - 1000
@@ -727,7 +727,7 @@ def test_kafka_consumer_offsets_for_time(self):
727727
})
728728
consumer.close()
729729

730-
@kafka_versions('>=0.10.1')
730+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
731731
def test_kafka_consumer_offsets_search_many_partitions(self):
732732
tp0 = TopicPartition(self.topic, 0)
733733
tp1 = TopicPartition(self.topic, 1)
@@ -766,15 +766,15 @@ def test_kafka_consumer_offsets_search_many_partitions(self):
766766
})
767767
consumer.close()
768768

769-
@kafka_versions('<0.10.1')
769+
@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1")
770770
def test_kafka_consumer_offsets_for_time_old(self):
771771
consumer = self.kafka_consumer()
772772
tp = TopicPartition(self.topic, 0)
773773

774774
with self.assertRaises(UnsupportedVersionError):
775775
consumer.offsets_for_times({tp: int(time.time())})
776776

777-
@kafka_versions('>=0.10.1')
777+
@pytest.mark.skipif(env_kafka_version() < (0, 10, 1), reason="Requires KAFKA_VERSION >= 0.10.1")
778778
def test_kafka_consumer_offsets_for_times_errors(self):
779779
consumer = self.kafka_consumer(fetch_max_wait_ms=200,
780780
request_timeout_ms=500)

test/test_failover_integration.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,8 +9,8 @@
99
from kafka.producer.base import Producer
1010
from kafka.structs import TopicPartition
1111

12-
from test.fixtures import ZookeeperFixture, KafkaFixture, random_string
13-
from test.testutil import KafkaIntegrationTestCase
12+
from test.fixtures import ZookeeperFixture, KafkaFixture
13+
from test.testutil import KafkaIntegrationTestCase, random_string
1414

1515

1616
log = logging.getLogger(__name__)

0 commit comments

Comments
 (0)