Skip to content

Support SASL OAuthBearer Authentication #1750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 23, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions kafka/admin/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,8 @@ class KafkaAdminClient(object):
Default: None
sasl_kerberos_service_name (str): Service name to include in GSSAPI
sasl mechanism handshake. Default: 'kafka'
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None

"""
DEFAULT_CONFIG = {
Expand Down Expand Up @@ -166,6 +168,7 @@ class KafkaAdminClient(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_oauth_token_provider': None,

# metrics configs
'metric_reporters': [],
Expand Down
5 changes: 4 additions & 1 deletion kafka/client_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,8 @@ class KafkaClient(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -182,7 +184,8 @@ class KafkaClient(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}

def __init__(self, **configs):
Expand Down
60 changes: 57 additions & 3 deletions kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import kafka.errors as Errors
from kafka.future import Future
from kafka.metrics.stats import Avg, Count, Max, Rate
from kafka.oauth.abstract import AbstractTokenProvider
from kafka.protocol.admin import SaslHandShakeRequest
from kafka.protocol.commit import OffsetFetchRequest
from kafka.protocol.metadata import MetadataRequest
Expand Down Expand Up @@ -179,6 +180,8 @@ class BrokerConnection(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None
"""

DEFAULT_CONFIG = {
Expand Down Expand Up @@ -210,10 +213,11 @@ class BrokerConnection(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}
SECURITY_PROTOCOLS = ('PLAINTEXT', 'SSL', 'SASL_PLAINTEXT', 'SASL_SSL')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI')
SASL_MECHANISMS = ('PLAIN', 'GSSAPI', 'OAUTHBEARER')

def __init__(self, host, port, afi, **configs):
self.host = host
Expand Down Expand Up @@ -257,7 +261,10 @@ def __init__(self, host, port, afi, **configs):
if self.config['sasl_mechanism'] == 'GSSAPI':
assert gssapi is not None, 'GSSAPI lib not available'
assert self.config['sasl_kerberos_service_name'] is not None, 'sasl_kerberos_service_name required for GSSAPI sasl'

if self.config['sasl_mechanism'] == 'OAUTHBEARER':
token_provider = self.config['sasl_oauth_token_provider']
assert token_provider is not None, 'sasl_oauth_token_provider required for OAUTHBEARER sasl'
assert callable(getattr(token_provider, "token", None)), 'sasl_oauth_token_provider must implement method #token()'
# This is not a general lock / this class is not generally thread-safe yet
# However, to avoid pushing responsibility for maintaining
# per-connection locks to the upstream client, we will use this lock to
Expand Down Expand Up @@ -536,6 +543,8 @@ def _handle_sasl_handshake_response(self, future, response):
return self._try_authenticate_plain(future)
elif self.config['sasl_mechanism'] == 'GSSAPI':
return self._try_authenticate_gssapi(future)
elif self.config['sasl_mechanism'] == 'OAUTHBEARER':
return self._try_authenticate_oauth(future)
else:
return future.failure(
Errors.UnsupportedSaslMechanismError(
Expand Down Expand Up @@ -659,6 +668,51 @@ def _try_authenticate_gssapi(self, future):
log.info('%s: Authenticated as %s via GSSAPI', self, gssapi_name)
return future.success(True)

def _try_authenticate_oauth(self, future):
    """
    Perform the blocking SASL/OAUTHBEARER exchange on this connection.

    The client request produced by _build_oauth_client_request() is
    UTF-8 encoded, prefixed with its Int32-encoded length and sent to
    the server; a 4-byte reply is then read back.

    Arguments:
        future: object resolved with the authentication outcome via its
            success() / failure() methods.

    Returns:
        Whatever future.success(True) or future.failure(error) returns.
    """
    payload = self._build_oauth_client_request().encode("utf-8")
    frame = Int32.encode(len(payload)) + payload

    try:
        # Send SASL OAuthBearer request with OAuth token
        self._send_bytes_blocking(frame)

        # The server will send a zero sized message (that is Int32(0)) on success.
        # The connection is closed on failure
        reply = self._recv_bytes_blocking(4)
    except ConnectionError as exc:
        log.exception("%s: Error receiving reply from server", self)
        conn_error = Errors.KafkaConnectionError("%s: %s" % (self, exc))
        # A transport-level failure leaves the socket unusable: close it
        # before reporting the error to the caller.
        self.close(error=conn_error)
        return future.failure(conn_error)

    if reply == b'\x00\x00\x00\x00':
        log.info('%s: Authenticated via OAuth', self)
        return future.success(True)

    # Anything other than four zero bytes is treated as a failed authentication.
    return future.failure(
        Errors.AuthenticationFailedError('Unrecognized response during authentication'))

def _build_oauth_client_request(self):
    """
    Build the text of the OAUTHBEARER initial client request.

    The configured token provider's token() value and the output of
    _token_extensions() are substituted, in that order, into the
    request template.

    Returns:
        str: the unencoded request; the caller is responsible for
            encoding it to bytes.
    """
    provider = self.config['sasl_oauth_token_provider']
    # Fetch the token before the extensions, matching the order in which
    # the template consumes them.
    bearer_token = provider.token()
    extension_suffix = self._token_extensions()
    template = "n,,\x01auth=Bearer {}{}\x01\x01"
    return template.format(bearer_token, extension_suffix)
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is string format() going to guarantee correct encoding here? I would suggest operating on and returning bytes instead, e.g. b''.join([b'n,,\x01auth=Bearer ', token, extensions, b'\x01\x01'])

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I convert to bytes and encode the output of this function in _try_authenticate_oauth so I believe the encoding will be correct.


def _token_extensions(self):
    """
    Return a string representation of the OPTIONAL key-value pairs that can
    be sent with an OAUTHBEARER initial request.

    Returns:
        str: every pair rendered as "key=value", each one preceded by a
            0x01 separator character, in the iteration order of the
            mapping returned by the provider. An empty string is
            returned when the token provider has no callable
            extensions() attribute or when extensions() returns
            nothing (an empty mapping or None).
    """
    token_provider = self.config['sasl_oauth_token_provider']

    # extensions() is optional on the client's Token Provider class.
    get_extensions = getattr(token_provider, "extensions", None)
    if not callable(get_extensions):
        return ""

    # Call the provider exactly once so that the emptiness check and the
    # rendered pairs are guaranteed to come from the same result.
    extensions = get_extensions()
    if not extensions:
        return ""

    # Builds up a string separated by \x01 via a dict of key value pairs
    pairs = ["{}={}".format(key, value) for key, value in extensions.items()]
    return "\x01" + "\x01".join(pairs)

def blacked_out(self):
"""
Return true if we are disconnected from the given node and can't
Expand Down
5 changes: 4 additions & 1 deletion kafka/consumer/group.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,6 +235,8 @@ class KafkaConsumer(six.Iterator):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None

Note:
Configuration parameters are described in more detail at
Expand Down Expand Up @@ -293,7 +295,8 @@ class KafkaConsumer(six.Iterator):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}
DEFAULT_SESSION_TIMEOUT_MS_0_9 = 30000

Expand Down
3 changes: 3 additions & 0 deletions kafka/oauth/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
from __future__ import absolute_import

from kafka.oauth.abstract import AbstractTokenProvider
42 changes: 42 additions & 0 deletions kafka/oauth/abstract.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
from __future__ import absolute_import

import abc

# Create a base class named 'ABC' by calling the ABCMeta metaclass directly,
# so no version-specific metaclass syntax is needed.
# This statement is compatible with both Python 2.7 & 3+
# The empty __slots__ means this base class declares no instance attributes.
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()})
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

interesting -- haven't seen this before


class AbstractTokenProvider(ABC):
"""
A Token Provider must be used for the SASL OAuthBearer protocol.

The implementation should ensure token reuse so that multiple
calls at connect time do not create multiple tokens. The implementation
should also periodically refresh the token in order to guarantee
that each call returns an unexpired token. A timeout error should
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this timeout error work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's up to the implementer, but ideally, if the Token Provider requests a token from an OAuth server and that request takes too long, it should time out rather than wait forever. This is something I've read about and done in OAuth implementations for other Kafka client libraries.

be returned after a short period of inactivity so that the
broker can log debugging info and retry.

Token Providers MUST implement the token() method
"""

def __init__(self, **config):
    """Accept arbitrary keyword configuration; this base implementation ignores it."""

@abc.abstractmethod
def token(self):
    """
    Return the (str) ID/Access Token that the Kafka client sends
    to the broker during authentication.

    This method is abstract: concrete Token Providers must override it.
    The body defined here does nothing and returns None.
    """

def extensions(self):
    """
    OPTIONAL hook that a Token Provider may override.

    Returns a map of key-value pairs to be sent with the
    SASL/OAUTHBEARER initial client request. This default
    implementation returns a new empty map on every call, meaning
    no extensions are sent. This feature is only available
    in Kafka >= 2.1.0.
    """
    return dict()
5 changes: 4 additions & 1 deletion kafka/producer/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ class KafkaProducer(object):
sasl mechanism handshake. Default: 'kafka'
sasl_kerberos_domain_name (str): kerberos domain name to use in GSSAPI
sasl mechanism handshake. Default: one of bootstrap servers
sasl_oauth_token_provider (AbstractTokenProvider): OAuthBearer token provider
instance. (See kafka.oauth.abstract). Default: None

Note:
Configuration parameters are described in more detail at
Expand Down Expand Up @@ -322,7 +324,8 @@ class KafkaProducer(object):
'sasl_plain_username': None,
'sasl_plain_password': None,
'sasl_kerberos_service_name': 'kafka',
'sasl_kerberos_domain_name': None
'sasl_kerberos_domain_name': None,
'sasl_oauth_token_provider': None
}

_COMPRESSORS = {
Expand Down