diff --git a/kafka/admin/client.py b/kafka/admin/client.py index d02a68a19..39f7e1af7 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -133,6 +133,8 @@ class KafkaAdminClient(object): Default: None sasl_kerberos_service_name (str): Service name to include in GSSAPI sasl mechanism handshake. Default: 'kafka' + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -166,6 +168,7 @@ class KafkaAdminClient(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', + 'sasl_oauth_token_provider': None, # metrics configs 'metric_reporters': [], diff --git a/kafka/client_async.py b/kafka/client_async.py index fdf5454f6..b2527a361 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -146,6 +146,8 @@ class KafkaClient(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -182,7 +184,8 @@ class KafkaClient(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } def __init__(self, **configs): diff --git a/kafka/conn.py b/kafka/conn.py index e857d0ac5..7a1eb2332 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -25,6 +25,7 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate +from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import SaslHandShakeRequest from kafka.protocol.commit import OffsetFetchRequest from kafka.protocol.metadata import MetadataRequest @@ -179,6 +180,8 @@ class BrokerConnection(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None """ DEFAULT_CONFIG = { @@ -210,10 +213,11 @@ class BrokerConnection(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL') - SASL_MECHANISMS = ('PLAIN', 'GSSAPI') + SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER') def __init__(self, host, port, afi, **configs): self.host = host @@ -257,7 +261,10 @@ def __init__(self, host, port, afi, **configs): if self.config['sasl_mechanism'] == 'GSSAPI': assert gssapi is not None, 'GSSAPI lib not available' assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl' - + if self.config['sasl_mechanism'] == 'OAUTHBEARER': + token_provider = self.config['sasl_oauth_token_provider'] + assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl' + assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()' # This is not a general lock / this class is not generally thread-safe yet # However, to avoid pushing responsibility for maintaining # per-connection locks to the upstream client, we will use this lock to @@ -536,6 +543,8 @@ def _handle_sasl_handshake_response(self, future, response): return self._try_authenticate_plain(future) elif self.config['sasl_mechanism'] == 'GSSAPI': return self._try_authenticate_gssapi(future) + elif self.config['sasl_mechanism'] == 'OAUTHBEARER': + return self._try_authenticate_oauth(future) else: return future.failure( Errors.UnsupportedSaslMechanismError( @@ -659,6 +668,51 @@ def _try_authenticate_gssapi(self, future): log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name) return future.success(True) + def _try_authenticate_oauth(self, future): + data = b'' + + msg = bytes(self._build_oauth_client_request().encode("utf-8")) + size = Int32.encode(len(msg)) + try: + # Send SASL OAuthBearer request with OAuth token + self._send_bytes_blocking(size + msg) + + # The server will send a zero sized message (that is Int32(0)) on success. + # The connection is closed on failure + data = self._recv_bytes_blocking(4) + + except ConnectionError as e: + log.exception("%s: Error receiving reply from server", self) + error = Errors.KafkaConnectionError("%s: %s" % (self, e)) + self.close(error=error) + return future.failure(error) + + if data != b'\x00\x00\x00\x00': + error = Errors.AuthenticationFailedError('Unrecognized response during authentication') + return future.failure(error) + + log.info('%s: Authenticated via OAuth', self) + return future.success(True) + + def _build_oauth_client_request(self): + token_provider = self.config['sasl_oauth_token_provider'] + return "n,,\x01auth=Bearer {}{}\x01\x01".format(token_provider.token(), self._token_extensions()) + + def _token_extensions(self): + """ + Return a string representation of the OPTIONAL key-value pairs that can be sent with an OAUTHBEARER + initial request. + """ + token_provider = self.config['sasl_oauth_token_provider'] + + # Only run if the #extensions() method is implemented by the clients Token Provider class + # Builds up a string separated by \x01 via a dict of key value pairs + if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0: + msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()]) + return "\x01" + msg + else: + return "" + def blacked_out(self): """ Return true if we are disconnected from the given node and can't diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index f52189188..f9ddabc7e 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -235,6 +235,8 @@ class KafkaConsumer(six.Iterator): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None Note: Configuration parameters are described in more detail at @@ -293,7 +295,8 @@ class KafkaConsumer(six.Iterator): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000 diff --git a/kafka/oauth/__init__.py b/kafka/oauth/__init__.py new file mode 100644 index 000000000..8c8349564 --- /dev/null +++ b/kafka/oauth/__init__.py @@ -0,0 +1,3 @@ +from __future__ import absolute_import + +from kafka.oauth.abstract import AbstractTokenProvider diff --git a/kafka/oauth/abstract.py b/kafka/oauth/abstract.py new file mode 100644 index 000000000..8d89ff51d --- /dev/null +++ b/kafka/oauth/abstract.py @@ -0,0 +1,42 @@ +from __future__ import absolute_import + +import abc + +# This statement is compatible with both Python 2.7 & 3+ +ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) + +class AbstractTokenProvider(ABC): + """ + A Token Provider must be used for the SASL OAuthBearer protocol. + + The implementation should ensure token reuse so that multiple + calls at connect time do not create multiple tokens. The implementation + should also periodically refresh the token in order to guarantee + that each call returns an unexpired token. A timeout error should + be returned after a short period of inactivity so that the + broker can log debugging info and retry. + + Token Providers MUST implement the token() method + """ + + def __init__(self, **config): + pass + + @abc.abstractmethod + def token(self): + """ + Returns a (str) ID/Access Token to be sent to the Kafka + client. + """ + pass + + def extensions(self): + """ + This is an OPTIONAL method that may be implemented. + + Returns a map of key-value pairs that can + be sent with the SASL/OAUTHBEARER initial client request. If + not implemented, the values are ignored. This feature is only available + in Kafka >= 2.1.0. + """ + return {} diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index ccdd91ad4..2c76c59d4 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -272,6 +272,8 @@ class KafkaProducer(object): sasl mechanism handshake. Default: 'kafka' sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI sasl mechanism handshake. Default: one of bootstrap servers + sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider + instance. (See kafka.oauth.abstract). Default: None Note: Configuration parameters are described in more detail at @@ -322,7 +324,8 @@ class KafkaProducer(object): 'sasl_plain_username': None, 'sasl_plain_password': None, 'sasl_kerberos_service_name': 'kafka', - 'sasl_kerberos_domain_name': None + 'sasl_kerberos_domain_name': None, + 'sasl_oauth_token_provider': None } _COMPRESSORS = {