From 0294e4539979e58b84d058db3a989c805359d1bc Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Sun, 28 Jul 2019 20:24:45 +0200 Subject: [PATCH 1/5] Fix describe config for multi-broker clusters Currently all describe config requests are sent to "least loaded node". Requests for broker configs must, however, be sent to the specific broker, otherwise an error is returned. Only topic requests can be handled by any node. This changes the logic to send all describe config requests to the specific broker. --- kafka/admin/client.py | 71 +++++++++++++++++++++++++++++++++++-------- 1 file changed, 59 insertions(+), 12 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index df85f442b..f2cd0c339 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -5,6 +5,7 @@ import logging import socket +from . import ConfigResourceType from kafka.vendor import six from kafka.client_async import KafkaClient, selectors @@ -763,28 +764,74 @@ def describe_configs(self, config_resources, include_synonyms=False): supported by all versions. Default: False. :return: Appropriate version of DescribeConfigsResponse class. """ + + # Break up requests by type - a broker config request must be sent to the specific broker. + # All other (currently just topic resources) can be sent to any broker. + broker_resources = [] + topic_resources = [] + + for config_resource in config_resources: + if config_resource.resource_type == ConfigResourceType.BROKER: + broker_resources.append(self._convert_describe_config_resource_request(config_resource)) + else: + topic_resources.append(self._convert_describe_config_resource_request(config_resource)) + + futures = [] version = self._matching_api_version(DescribeConfigsRequest) if version == 0: if include_synonyms: raise IncompatibleBrokerVersion( "include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}." - .format(self.config['api_version'])) - request = DescribeConfigsRequest[version]( - resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources] - ) + .format(self.config['api_version'])) + + if len(broker_resources) > 0: + for broker_resource in broker_resources: + try: + futures.append(self._send_request_to_node( + int(broker_resource[1]), + DescribeConfigsRequest[version](resources=[broker_resource]) + )) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") + + if len(topic_resources) > 0: + futures.append(self._send_request_to_node( + self._client.least_loaded_node(), + DescribeConfigsRequest[version](resources=topic_resources) + )) + elif version == 1: - request = DescribeConfigsRequest[version]( - resources=[self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources], - include_synonyms=include_synonyms - ) + if len(broker_resources) > 0: + for broker_resource in broker_resources: + try: + futures.append(self._send_request_to_node( + int(broker_resource[1]), + DescribeConfigsRequest[version]( + resources=[broker_resource], + include_synonyms=include_synonyms) + )) + except ValueError: + raise ValueError("Broker resource names must be an integer or a string represented integer") + + if len(topic_resources) > 0: + futures.append(self._send_request_to_node( + self._client.least_loaded_node(), + DescribeConfigsRequest[version](resources=topic_resources, include_synonyms=include_synonyms) + )) else: raise NotImplementedError( "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient." - .format(version)) - future = self._send_request_to_node(self._client.least_loaded_node(), request) + .format(version)) + + self._wait_for_futures(futures) + + # Use one of the results as the general response and add all other resources to it + response = copy.copy(futures[0]) + response.resources = [] + + for future in futures: + response.resources.extend(future.value.resources) - self._wait_for_futures([future]) - response = future.value return response @staticmethod From c65a5e9f32ce654bf0a064955d7cfb6ac1a74724 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Wed, 4 Sep 2019 19:49:16 +0200 Subject: [PATCH 2/5] Copy response value rather than future object --- kafka/admin/client.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index f2cd0c339..3230ff24f 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -826,7 +826,7 @@ def describe_configs(self, config_resources, include_synonyms=False): self._wait_for_futures(futures) # Use one of the results as the general response and add all other resources to it - response = copy.copy(futures[0]) + response = copy.copy(futures[0].value) response.resources = [] for future in futures: From cb9e6645645529834ba0b245859d5df914d0cc42 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Tue, 8 Oct 2019 18:11:41 +0200 Subject: [PATCH 3/5] Return list of describe config responses instead of merging them --- kafka/admin/client.py | 13 ++-------- test/test_admin_integration.py | 47 ++++++++++++++++++++++++++++++++-- 2 files changed, 47 insertions(+), 13 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 3230ff24f..ff82fed48 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -820,19 +820,10 @@ def describe_configs(self, config_resources, include_synonyms=False): )) else: raise NotImplementedError( - "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient." - .format(version)) + "Support for DescribeConfigs v{} has not yet been added to KafkaAdminClient.".format(version)) self._wait_for_futures(futures) - - # Use one of the results as the general response and add all other resources to it - response = copy.copy(futures[0].value) - response.resources = [] - - for future in futures: - response.resources.extend(future.value.resources) - - return response + return [f.value for f in futures] @staticmethod def _convert_alter_config_resource_request(config_resource): diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 3efa021a8..453028f67 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -3,9 +3,10 @@ from test.testutil import env_kafka_version from kafka.errors import NoError -from kafka.admin import ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL - +from kafka.admin import ( + ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") def test_create_describe_delete_acls(kafka_admin_client): """Tests that we can add, list and remove ACLs @@ -80,3 +81,45 @@ def test_create_describe_delete_acls(kafka_admin_client): assert error is NoError assert len(acls) == 0 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_broker_resource_returns_configs(simple_client, kafka_admin_client): + """Tests that describe config returns configs for broker + """ + broker_id = simple_client.brokers[0].nodeId + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + + assert len(configs) == 1 + assert configs[0].resources[0][2] == ConfigResourceType.BROKER + assert configs[0].resources[0][3] == str(broker_id) + assert len(configs[0].resources[0][4]) > 1 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_topic_resource_returns_configs(simple_client, kafka_admin_client): + topic = simple_client.topics[0] + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) + + assert len(configs) == 1 + assert configs[0].resources[0][2] == ConfigResourceType.TOPIC + assert configs[0].resources[0][3] == topic + assert len(configs[0].resources[0][4]) > 1 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_mixed_resources_returns_configs(simple_client, kafka_admin_client): + topic = simple_client.topics[0] + broker_id = simple_client.brokers[0].nodeId + configs = kafka_admin_client.describe_configs([ + ConfigResource(ConfigResourceType.TOPIC, topic), + ConfigResource(ConfigResourceType.BROKER, broker_id)]) + + assert len(configs) == 2 + + for config in configs: + assert (config.resources[0][2] == ConfigResourceType.TOPIC + and config.resources[0][3] == topic) or \ + (config.resources[0][2] == ConfigResourceType.BROKER + and config.resources[0][3] == str(broker_id)) + assert len(config.resources[0][4]) > 1 From 43a3644e2d976e12056ae7c9eaf7e73ff5f6a1ee Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Wed, 9 Oct 2019 17:44:33 +0200 Subject: [PATCH 4/5] Extract and validate broker id in isolation Also removes dependency on simple_client in tests --- kafka/admin/client.py | 24 ++++++++++++++---------- test/test_admin_integration.py | 28 ++++++++++++++++++++-------- 2 files changed, 34 insertions(+), 18 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index ff82fed48..bb1e2b5cf 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -787,13 +787,15 @@ def describe_configs(self, config_resources, include_synonyms=False): if len(broker_resources) > 0: for broker_resource in broker_resources: try: - futures.append(self._send_request_to_node( - int(broker_resource[1]), - DescribeConfigsRequest[version](resources=[broker_resource]) - )) + broker_id = int(broker_resource[1]) except ValueError: raise ValueError("Broker resource names must be an integer or a string represented integer") + futures.append(self._send_request_to_node( + broker_id, + DescribeConfigsRequest[version](resources=[broker_resource]) + )) + if len(topic_resources) > 0: futures.append(self._send_request_to_node( self._client.least_loaded_node(), @@ -804,15 +806,17 @@ def describe_configs(self, config_resources, include_synonyms=False): if len(broker_resources) > 0: for broker_resource in broker_resources: try: - futures.append(self._send_request_to_node( - int(broker_resource[1]), - DescribeConfigsRequest[version]( - resources=[broker_resource], - include_synonyms=include_synonyms) - )) + broker_id = int(broker_resource[1]) except ValueError: raise ValueError("Broker resource names must be an integer or a string represented integer") + futures.append(self._send_request_to_node( + broker_id, + DescribeConfigsRequest[version]( + resources=[broker_resource], + include_synonyms=include_synonyms) + )) + if len(topic_resources) > 0: futures.append(self._send_request_to_node( self._client.least_loaded_node(), diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index 453028f67..a81b039ea 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -6,7 +6,7 @@ from kafka.admin import ( ACLFilter, ACLOperation, ACLPermissionType, ResourcePattern, ResourceType, ACL, ConfigResource, ConfigResourceType) - + @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="ACL features require broker >=0.11") def test_create_describe_delete_acls(kafka_admin_client): """Tests that we can add, list and remove ACLs @@ -84,10 +84,10 @@ def test_create_describe_delete_acls(kafka_admin_client): @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") -def test_describe_configs_broker_resource_returns_configs(simple_client, kafka_admin_client): +def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): """Tests that describe config returns configs for broker """ - broker_id = simple_client.brokers[0].nodeId + broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) assert len(configs) == 1 @@ -97,8 +97,9 @@ def test_describe_configs_broker_resource_returns_configs(simple_client, kafka_a @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") -def test_describe_configs_topic_resource_returns_configs(simple_client, kafka_admin_client): - topic = simple_client.topics[0] +def test_describe_configs_topic_resource_returns_configs(kafka_admin_client, topic): + """Tests that describe config returns configs for topic + """ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) assert len(configs) == 1 @@ -108,9 +109,10 @@ def test_describe_configs_topic_resource_returns_configs(simple_client, kafka_ad @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") -def test_describe_configs_mixed_resources_returns_configs(simple_client, kafka_admin_client): - topic = simple_client.topics[0] - broker_id = simple_client.brokers[0].nodeId +def test_describe_configs_mixed_resources_returns_configs(kafka_admin_client, topic): + """Tests that describe config returns configs for mixed resource types (topic + broker) + """ + broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId configs = kafka_admin_client.describe_configs([ ConfigResource(ConfigResourceType.TOPIC, topic), ConfigResource(ConfigResourceType.BROKER, broker_id)]) @@ -123,3 +125,13 @@ def test_describe_configs_mixed_resources_returns_configs(simple_client, kafka_a (config.resources[0][2] == ConfigResourceType.BROKER and config.resources[0][3] == str(broker_id)) assert len(config.resources[0][4]) > 1 + + +@pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") +def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): + """Tests that describe config raises exception on non-integer broker id + """ + broker_id = "str" + + with pytest.raises(ValueError): + configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) From 88afc9edec5b6891a413beb85171f9f486f589d2 Mon Sep 17 00:00:00 2001 From: Jeppe Andersen Date: Wed, 9 Oct 2019 20:11:54 +0200 Subject: [PATCH 5/5] Fix fixture order causing topic not to be available --- test/test_admin_integration.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index a81b039ea..0b041b27d 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -97,7 +97,7 @@ def test_describe_configs_broker_resource_returns_configs(kafka_admin_client): @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") -def test_describe_configs_topic_resource_returns_configs(kafka_admin_client, topic): +def test_describe_configs_topic_resource_returns_configs(topic, kafka_admin_client): """Tests that describe config returns configs for topic """ configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.TOPIC, topic)]) @@ -109,7 +109,7 @@ def test_describe_configs_topic_resource_returns_configs(kafka_admin_client, top @pytest.mark.skipif(env_kafka_version() < (0, 11), reason="Describe config features require broker >=0.11") -def test_describe_configs_mixed_resources_returns_configs(kafka_admin_client, topic): +def test_describe_configs_mixed_resources_returns_configs(topic, kafka_admin_client): """Tests that describe config returns configs for mixed resource types (topic + broker) """ broker_id = kafka_admin_client._client.cluster._brokers[0].nodeId