Skip to content

Fix describe config for multi-broker clusters #1869

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 56 additions & 14 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import logging
import socket

from . import ConfigResourceType
from kafka.vendor import six

from kafka.client_async import KafkaClient, selectors
Expand Down Expand Up @@ -763,29 +764,70 @@ def describe_configs(self, config_resources, include_synonyms=False):
supported by all versions. Default: False.
:return: Appropriate version of DescribeConfigsResponse class.
"""

# Break up requests by type - a broker config request must be sent to the specific broker.
# All other (currently just topic resources) can be sent to any broker.
broker_resources = []
topic_resources = []

for config_resource in config_resources:
if config_resource.resource_type == ConfigResourceType.BROKER:
broker_resources.append(self._convert_describe_config_resource_request(config_resource))
else:
topic_resources.append(self._convert_describe_config_resource_request(config_resource))

futures = []
version = self._matching_api_version(DescribeConfigsRequest)
if version == 0:
if include_synonyms:
raise IncompatibleBrokerVersion(
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
.format(self.config['api_version']))
request = DescribeConfigsRequest[version](
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
)
.format(self.config['api_version']))

if len(broker_resources) > 0:
for broker_resource in broker_resources:
try:
broker_id = int(broker_resource[1])
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")

futures.append(self._send_request_to_node(
broker_id,
DescribeConfigsRequest[version](resources=[broker_resource])
))

if len(topic_resources) > 0:
futures.append(self._send_request_to_node(
self._client.least_loaded_node(),
DescribeConfigsRequest[version](resources=topic_resources)
))

elif version == 1:
request = DescribeConfigsRequest[version](
resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources],
include_synonyms=include_synonyms
)
if len(broker_resources) > 0:
for broker_resource in broker_resources:
try:
broker_id = int(broker_resource[1])
except ValueError:
raise ValueError("Broker resource names must be an integer or a string represented integer")

futures.append(self._send_request_to_node(
broker_id,
DescribeConfigsRequest[version](
resources=[broker_resource],
include_synonyms=include_synonyms)
))

if len(topic_resources) > 0:
futures.append(self._send_request_to_node(
self._client.least_loaded_node(),
DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms)
))
else:
raise NotImplementedError(
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient."
.format(version))
future = self._send_request_to_node(self._client.least_loaded_node(), request)
"Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version))

self._wait_for_futures([future])
response = future.value
return response
self._wait_for_futures(futures)
return [f.value for f in futures]

@staticmethod
def _convert_alter_config_resource_request(config_resource):
Expand Down
57 changes: 56 additions & 1 deletion test/test_admin_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@
from test.testutil import env_kafka_version

from kafka.errors import NoError
from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL
from kafka.admin import (
ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType)


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11")
Expand Down Expand Up @@ -80,3 +81,57 @@ def test_create_describe_delete_acls(kafka_admin_client):

assert error is NoError
assert len(acls) == 0


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
def test_describe_configs_broker_resource_returns_configs(kafka_admin_client):
"""Tests that describe config returns configs for broker
"""
broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])

assert len(configs) == 1
assert configs[0].resources[0][2] == ConfigResourceType.BROKER
assert configs[0].resources[0][3] == str(broker_id)
assert len(configs[0].resources[0][4]) > 1


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client):
"""Tests that describe config returns configs for topic
"""
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)])

assert len(configs) == 1
assert configs[0].resources[0][2] == ConfigResourceType.TOPIC
assert configs[0].resources[0][3] == topic
assert len(configs[0].resources[0][4]) > 1


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client):
"""Tests that describe config returns configs for mixed resource types (topic + broker)
"""
broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId
configs = kafka_admin_client.describe_configs([
ConfigResource(ConfigResourceType.TOPIC, topic),
ConfigResource(ConfigResourceType.BROKER, broker_id)])

assert len(configs) == 2

for config in configs:
assert (config.resources[0][2] == ConfigResourceType.TOPIC
and config.resources[0][3] == topic) or \
(config.resources[0][2] == ConfigResourceType.BROKER
and config.resources[0][3] == str(broker_id))
assert len(config.resources[0][4]) > 1


@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11")
def test_describe_configs_invalid_broker_id_raises(kafka_admin_client):
"""Tests that describe config raises exception on non-integer broker id
"""
broker_id = "str"

with pytest.raises(ValueError):
configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)])