|
3 | 3 | from test.testutil import env_kafka_version
|
4 | 4 |
|
5 | 5 | from kafka.errors import NoError
|
6 |
| -from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL |
7 |
| - |
| 6 | +from kafka.admin import ( |
| 7 | + ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) |
8 | 8 |
|
| 9 | + |
9 | 10 | @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
|
10 | 11 | def test_create_describe_delete_acls(kafka_admin_client):
|
11 | 12 | """Tests that we can add, list and remove ACLs
|
@@ -80,3 +81,45 @@ def test_create_describe_delete_acls(kafka_admin_client):
|
80 | 81 |
|
81 | 82 | assert error is NoError
|
82 | 83 | assert len(acls) == 0
|
| 84 | + |
| 85 | + |
| 86 | +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") |
| 87 | +def test_describe_configs_broker_resource_returns_configs(simple_client, kafka_admin_client): |
| 88 | + """Tests that describe config returns configs for broker |
| 89 | + """ |
| 90 | + broker_id = simple_client.brokers[0].nodeId |
| 91 | + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) |
| 92 | + |
| 93 | + assert len(configs) == 1 |
| 94 | + assert configs[0].resources[0][2] == ConfigResourceType.BROKER |
| 95 | + assert configs[0].resources[0][3] == str(broker_id) |
| 96 | + assert len(configs[0].resources[0][4]) > 1 |
| 97 | + |
| 98 | + |
| 99 | +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") |
| 100 | +def test_describe_configs_topic_resource_returns_configs(simple_client, kafka_admin_client): |
| 101 | + topic = simple_client.topics[0] |
| 102 | + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) |
| 103 | + |
| 104 | + assert len(configs) == 1 |
| 105 | + assert configs[0].resources[0][2] == ConfigResourceType.TOPIC |
| 106 | + assert configs[0].resources[0][3] == topic |
| 107 | + assert len(configs[0].resources[0][4]) > 1 |
| 108 | + |
| 109 | + |
| 110 | +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") |
| 111 | +def test_describe_configs_mixed_resources_returns_configs(simple_client, kafka_admin_client): |
| 112 | + topic = simple_client.topics[0] |
| 113 | + broker_id = simple_client.brokers[0].nodeId |
| 114 | + configs = kafka_admin_client.describe_configs([ |
| 115 | + ConfigResource(ConfigResourceType.TOPIC, topic), |
| 116 | + ConfigResource(ConfigResourceType.BROKER, broker_id)]) |
| 117 | + |
| 118 | + assert len(configs) == 2 |
| 119 | + |
| 120 | + for config in configs: |
| 121 | + assert (config.resources[0][2] == ConfigResourceType.TOPIC |
| 122 | + and config.resources[0][3] == topic) or \ |
| 123 | + (config.resources[0][2] == ConfigResourceType.BROKER |
| 124 | + and config.resources[0][3] == str(broker_id)) |
| 125 | + assert len(config.resources[0][4]) > 1 |
0 commit comments