Skip to content

Commit 7da8eb1

Browse files
committed
Stop using broker-errors for client-side problems
`UnsupportedVersionError` is intended to indicate a server-side error: https://github.com/dpkp/kafka-python/blob/ba7372e44ffa1ee49fb4d5efbd67534393e944db/kafka/errors.py#L375-L378 So we should not be raising it for client-side errors. I realize that semantically this seems like the appropriate error to raise. However, this is confusing when debugging... for a real-life example, see Parsely/pykafka#697. So I strongly feel that server-side errors should be kept separate from client-side errors, even if all the client is doing is proactively protecting against hitting a situation where the broker would return this error.
1 parent 21d68c9 commit 7da8eb1

File tree

3 files changed

+44
-38
lines changed

3 files changed

+44
-38
lines changed

kafka/admin/kafka.py

Lines changed: 39 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@
55
import socket
66
from kafka.client_async import KafkaClient, selectors
77
from kafka.errors import (
8-
KafkaConfigurationError, UnsupportedVersionError, NodeNotReadyError, NotControllerError, KafkaConnectionError)
8+
IncompatibleBrokerVersion, KafkaConfigurationError, KafkaConnectionError,
9+
NodeNotReadyError, NotControllerError)
910
from kafka.metrics import MetricConfig, Metrics
1011
from kafka.protocol.admin import (
1112
CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest,
@@ -25,9 +26,11 @@ class KafkaAdmin(object):
2526
nicer, more pythonic objects. Unfortunately, this will likely break
2627
those interfaces.
2728
28-
The KafkaAdmin class will negotiate for the latest version of each message protocol format supported
29-
by both the kafka-python client library and the kafka broker. Usage of optional fields from protocol
30-
versions that are not supported by the broker will result in UnsupportedVersionError exceptions.
29+
The KafkaAdmin class will negotiate for the latest version of each message
30+
protocol format supported by both the kafka-python client library and the
31+
kafka broker. Usage of optional fields from protocol versions that are not
32+
supported by the broker will result in IncompatibleBrokerVersion exceptions.
33+
3134
3235
Use of this class requires a minimum broker version >= 0.10.0.0.
3336
@@ -223,8 +226,8 @@ def _matching_api_version(self, operation):
223226
if version < self._client.get_api_versions()[operation[0].API_KEY][0]:
224227
# max library version is less than min broker version. Not sure any brokers
225228
# actually set a min version greater than 0 right now, tho. But maybe in the future?
226-
raise UnsupportedVersionError(
227-
"Could not find matching protocol version for {}"
229+
raise IncompatibleBrokerVersion(
230+
"No version of the '{}' kafka protocol is supported by both the client and broker."
228231
.format(operation.__name__))
229232
return version
230233

@@ -246,9 +249,9 @@ def _refresh_controller_id(self):
246249
self._controller_id = response.controller_id
247250
version = self._client.check_version(self._controller_id)
248251
if version < (0, 10, 0):
249-
raise UnsupportedVersionError(
250-
"Kafka Admin interface not supported for cluster controller version {} < 0.10.0.0"
251-
.format(version))
252+
raise IncompatibleBrokerVersion(
253+
"The controller appears to be running Kafka {}. KafkaAdmin requires brokers >= 0.10.0.0."
254+
.format(version))
252255

253256
def _send_request_to_node(self, node, request):
254257
"""Send a kafka protocol message to a specific broker. Will block until the message result is received.
@@ -311,9 +314,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
311314
timeout_ms = self._validate_timeout(timeout_ms)
312315
if version == 0:
313316
if validate_only:
314-
raise UnsupportedVersionError(
315-
"validate_only not supported on cluster version {}"
316-
.format(self.config['api_version']))
317+
raise IncompatibleBrokerVersion(
318+
"validate_only requires CreateTopicsRequest >= v1, which is not supported by Kafka {}."
319+
.format(self.config['api_version']))
317320
request = CreateTopicsRequest[version](
318321
create_topic_requests = [self._convert_new_topic_request(new_topic) for new_topic in new_topics],
319322
timeout = timeout_ms
@@ -326,10 +329,9 @@ def create_topics(self, new_topics, timeout_ms=None, validate_only=None):
326329
validate_only = validate_only
327330
)
328331
else:
329-
raise UnsupportedVersionError(
330-
"missing implementation of CreateTopics for library supported version {}"
331-
.format(version)
332-
)
332+
raise NotImplementedError(
333+
"Support for CreateTopics v{} has not yet been added to KafkaAdmin."
334+
.format(version))
333335
return self._send(request)
334336

335337
def delete_topics(self, topics, timeout_ms=None):
@@ -347,9 +349,9 @@ def delete_topics(self, topics, timeout_ms=None):
347349
timeout = timeout_ms
348350
)
349351
else:
350-
raise UnsupportedVersionError(
351-
"missing implementation of DeleteTopics for library supported version {}"
352-
.format(version))
352+
raise NotImplementedError(
353+
"Support for DeleteTopics v{} has not yet been added to KafkaAdmin."
354+
.format(version))
353355
return self._send(request)
354356

355357
# list topics functionality is in ClusterMetadata
@@ -386,9 +388,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
386388
version = self._matching_api_version(DescribeConfigsRequest)
387389
if version == 0:
388390
if include_synonyms:
389-
raise UnsupportedVersionError(
390-
"include_synonyms not supported on cluster version {}"
391-
.format(self.config['api_version']))
391+
raise IncompatibleBrokerVersion(
392+
"include_synonyms requires DescribeConfigsRequest >= v1, which is not supported by Kafka {}."
393+
.format(self.config['api_version']))
392394
request = DescribeConfigsRequest[version](
393395
resources = [self._convert_describe_config_resource_request(config_resource) for config_resource in config_resources]
394396
)
@@ -399,9 +401,9 @@ def describe_configs(self, config_resources, include_synonyms=None):
399401
include_synonyms = include_synonyms
400402
)
401403
else:
402-
raise UnsupportedVersionError(
403-
"missing implementation of DescribeConfigs for library supported version {}"
404-
.format(version))
404+
raise NotImplementedError(
405+
"Support for DescribeConfigs v{} has not yet been added to KafkaAdmin."
406+
.format(version))
405407
return self._send(request)
406408

407409
@staticmethod
@@ -426,9 +428,9 @@ def alter_configs(self, config_resources):
426428
resources = [self._convert_alter_config_resource_request(config_resource) for config_resource in config_resources]
427429
)
428430
else:
429-
raise UnsupportedVersionError(
430-
"missing implementation of AlterConfigs for library supported version {}"
431-
.format(version))
431+
raise NotImplementedError(
432+
"Support for AlterConfigs v{} has not yet been added to KafkaAdmin."
433+
.format(version))
432434
return self._send(request)
433435

434436
# alter replica logs dir protocol not implemented
@@ -463,9 +465,9 @@ def create_partitions(self, topic_partitions, timeout_ms=None, validate_only=Non
463465
validate_only = validate_only
464466
)
465467
else:
466-
raise UnsupportedVersionError(
467-
"missing implementation of CreatePartitions for library supported version {}"
468-
.format(version))
468+
raise NotImplementedError(
469+
"Support for CreatePartitions v{} has not yet been added to KafkaAdmin."
470+
.format(version))
469471
return self._send(request)
470472

471473
# delete records protocol not implemented
@@ -490,9 +492,9 @@ def describe_consumer_groups(self, group_ids):
490492
groups = group_ids
491493
)
492494
else:
493-
raise UnsupportedVersionError(
494-
"missing implementation of DescribeGroups for library supported version {}"
495-
.format(version))
495+
raise NotImplementedError(
496+
"Support for DescribeGroups v{} has not yet been added to KafkaAdmin."
497+
.format(version))
496498
return self._send(request)
497499

498500
def list_consumer_groups(self):
@@ -504,9 +506,9 @@ def list_consumer_groups(self):
504506
if version <= 1:
505507
request = ListGroupsRequest[version]()
506508
else:
507-
raise UnsupportedVersionError(
508-
"missing implementation of ListGroups for library supported version {}"
509-
.format(version))
509+
raise NotImplementedError(
510+
"Support for ListGroups v{} has not yet been added to KafkaAdmin."
511+
.format(version))
510512
return self._send(request)
511513

512514
# delete groups protocol not implemented

kafka/conn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ def get_api_versions(self):
881881
.format(version))
882882
# _api_versions is set as a side effect of check_versions() on a cluster
883883
# that supports 0.10.0 or later
884-
return self._api_versions;
884+
return self._api_versions
885885

886886
def _infer_broker_version_from_api_versions(self, api_versions):
887887
# The logic here is to check the list of supported request versions

kafka/errors.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ class UnrecognizedBrokerVersion(KafkaError):
6262
pass
6363

6464

65+
class IncompatibleBrokerVersion(KafkaError):
66+
pass
67+
68+
6569
class CommitFailedError(KafkaError):
6670
def __init__(self, *args, **kwargs):
6771
super(CommitFailedError, self).__init__(

0 commit comments

Comments
 (0)