Skip to content

Commit 84e37e0

Browse files
ulrikjohanssonjeffwidman
authored andcommitted
convert test_admin_integration to pytest (#1923)
1 parent f1cda98 commit 84e37e0

File tree

3 files changed

+90
-103
lines changed

3 files changed

+90
-103
lines changed

test/conftest.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,25 @@ def factory(**kafka_producer_params):
105105
if _producer[0]:
106106
_producer[0].close()
107107

108+
@pytest.fixture
109+
def kafka_admin_client(kafka_admin_client_factory):
110+
"""Return a KafkaAdminClient fixture"""
111+
yield kafka_admin_client_factory()
112+
113+
@pytest.fixture
114+
def kafka_admin_client_factory(kafka_broker):
115+
"""Return a KafkaAdminClient factory fixture"""
116+
_admin_client = [None]
117+
118+
def factory(**kafka_admin_client_params):
119+
params = {} if kafka_admin_client_params is None else kafka_admin_client_params.copy()
120+
_admin_client[0] = next(kafka_broker.get_admin_clients(cnt=1, **params))
121+
return _admin_client[0]
122+
123+
yield factory
124+
125+
if _admin_client[0]:
126+
_admin_client[0].close()
108127

109128
@pytest.fixture
110129
def topic(kafka_broker, request):

test/fixtures.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
from kafka.vendor.six.moves import urllib, range
1414
from kafka.vendor.six.moves.urllib.parse import urlparse # pylint: disable=E0611,F0401
1515

16-
from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient
16+
from kafka import errors, KafkaConsumer, KafkaProducer, SimpleClient, KafkaAdminClient
1717
from kafka.client_async import KafkaClient
1818
from kafka.protocol.admin import CreateTopicsRequest
1919
from kafka.protocol.metadata import MetadataRequest
@@ -500,6 +500,14 @@ def get_clients(self, cnt=1, client_id=None):
500500
return tuple(KafkaClient(client_id='%s_%s' % (client_id, random_string(4)),
501501
bootstrap_servers=self.bootstrap_server()) for x in range(cnt))
502502

503+
def get_admin_clients(self, cnt=1, **params):
504+
params.setdefault('client_id', 'admin_client')
505+
params['bootstrap_servers'] = self.bootstrap_server()
506+
client_id = params['client_id']
507+
for x in range(cnt):
508+
params['client_id'] = '%s_%s' % (client_id, random_string(4))
509+
yield KafkaAdminClient(**params)
510+
503511
def get_consumers(self, cnt, topics, **params):
504512
params.setdefault('client_id', 'consumer')
505513
params.setdefault('heartbeat_interval_ms', 500)

test/test_admin_integration.py

Lines changed: 62 additions & 102 deletions
Original file line numberDiff line numberDiff line change
@@ -1,122 +1,82 @@
11
import pytest
2-
import os
32

4-
from test.fixtures import ZookeeperFixture, KafkaFixture
5-
from test.testutil import KafkaIntegrationTestCase, env_kafka_version, current_offset
3+
from test.testutil import env_kafka_version
64

75
from kafka.errors import NoError
8-
from kafka.admin import KafkaAdminClient, ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
6+
from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
97

10-
# This test suite passes for me locally, but fails on travis
11-
# Needs investigation
12-
DISABLED = True
138

14-
# TODO: Convert to pytest / fixtures
15-
# Note that ACL features require broker 0.11, but other admin apis may work on
16-
# earlier broker versions
17-
class TestAdminClientIntegration(KafkaIntegrationTestCase):
18-
@classmethod
19-
def setUpClass(cls): # noqa
20-
if env_kafka_version() < (0, 11) or DISABLED:
21-
return
9+
@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
10+
def test_create_describe_delete_acls(kafka_admin_client):
11+
"""Tests that we can add, list and remove ACLs
12+
"""
2213

23-
cls.zk = ZookeeperFixture.instance()
24-
cls.server = KafkaFixture.instance(0, cls.zk)
25-
26-
@classmethod
27-
def tearDownClass(cls): # noqa
28-
if env_kafka_version() < (0, 11) or DISABLED:
29-
return
30-
31-
cls.server.close()
32-
cls.zk.close()
33-
34-
def setUp(self):
35-
if env_kafka_version() < (0, 11) or DISABLED:
36-
self.skipTest('Admin ACL Integration test requires KAFKA_VERSION >= 0.11')
37-
super(TestAdminClientIntegration, self).setUp()
38-
39-
def tearDown(self):
40-
if env_kafka_version() < (0, 11) or DISABLED:
41-
return
42-
super(TestAdminClientIntegration, self).tearDown()
43-
44-
def test_create_describe_delete_acls(self):
45-
"""Tests that we can add, list and remove ACLs
46-
"""
47-
48-
# Setup
49-
brokers = '%s:%d' % (self.server.host, self.server.port)
50-
admin_client = KafkaAdminClient(
51-
bootstrap_servers=brokers
14+
# Check that we don't have any ACLs in the cluster
15+
acls, error = kafka_admin_client.describe_acls(
16+
ACLFilter(
17+
principal=None,
18+
host="*",
19+
operation=ACLOperation.ANY,
20+
permission_type=ACLPermissionType.ANY,
21+
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
5222
)
53-
54-
# Check that we don't have any ACLs in the cluster
55-
acls, error = admin_client.describe_acls(
23+
)
24+
25+
assert error is NoError
26+
assert len(acls) == 0
27+
28+
# Try to add an ACL
29+
acl = ACL(
30+
principal="User:test",
31+
host="*",
32+
operation=ACLOperation.READ,
33+
permission_type=ACLPermissionType.ALLOW,
34+
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
35+
)
36+
result = kafka_admin_client.create_acls([acl])
37+
38+
assert len(result["failed"]) == 0
39+
assert len(result["succeeded"]) == 1
40+
41+
# Check that we can list the ACL we created
42+
acl_filter = ACLFilter(
43+
principal=None,
44+
host="*",
45+
operation=ACLOperation.ANY,
46+
permission_type=ACLPermissionType.ANY,
47+
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
48+
)
49+
acls, error = kafka_admin_client.describe_acls(acl_filter)
50+
51+
assert error is NoError
52+
assert len(acls) == 1
53+
54+
# Remove the ACL
55+
delete_results = kafka_admin_client.delete_acls(
56+
[
5657
ACLFilter(
57-
principal=None,
58+
principal="User:test",
5859
host="*",
59-
operation=ACLOperation.ANY,
60-
permission_type=ACLPermissionType.ANY,
60+
operation=ACLOperation.READ,
61+
permission_type=ACLPermissionType.ALLOW,
6162
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
6263
)
63-
)
64+
]
65+
)
6466

65-
self.assertIs(error, NoError)
66-
self.assertEqual(0, len(acls))
67+
assert len(delete_results) == 1
68+
assert len(delete_results[0][1]) == 1 # Check number of affected ACLs
6769

68-
# Try to add an ACL
69-
acl = ACL(
70-
principal="User:test",
71-
host="*",
72-
operation=ACLOperation.READ,
73-
permission_type=ACLPermissionType.ALLOW,
74-
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
75-
)
76-
result = admin_client.create_acls([acl])
77-
78-
self.assertFalse(len(result["failed"]))
79-
self.assertEqual(len(result["succeeded"]), 1)
80-
81-
# Check that we can list the ACL we created
82-
acl_filter = ACLFilter(
83-
principal=None,
70+
# Make sure the ACL does not exist in the cluster anymore
71+
acls, error = kafka_admin_client.describe_acls(
72+
ACLFilter(
73+
principal="*",
8474
host="*",
8575
operation=ACLOperation.ANY,
8676
permission_type=ACLPermissionType.ANY,
8777
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
8878
)
89-
acls, error = admin_client.describe_acls(acl_filter)
90-
91-
self.assertIs(error, NoError)
92-
self.assertEqual(1, len(acls))
93-
94-
# Remove the ACL
95-
delete_results = admin_client.delete_acls(
96-
[
97-
ACLFilter(
98-
principal="User:test",
99-
host="*",
100-
operation=ACLOperation.READ,
101-
permission_type=ACLPermissionType.ALLOW,
102-
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
103-
)
104-
]
105-
)
79+
)
10680

107-
self.assertEqual(1, len(delete_results))
108-
self.assertEqual(1, len(delete_results[0][1])) # Check number of affected ACLs
109-
110-
111-
# Make sure the ACL does not exist in the cluster anymore
112-
acls, error = admin_client.describe_acls(
113-
ACLFilter(
114-
principal="*",
115-
host="*",
116-
operation=ACLOperation.ANY,
117-
permission_type=ACLPermissionType.ANY,
118-
resource_pattern=ResourcePattern(ResourceType.TOPIC, "topic")
119-
)
120-
)
121-
self.assertIs(error, NoError)
122-
self.assertEqual(0, len(acls))
81+
assert error is NoError
82+
assert len(acls) == 0

0 commit comments

Comments
 (0)