Skip to content

Support SASL OAuthBearer Authentication #1750

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 23, 2019
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
Handle empty extensions map, fix documentation
  • Loading branch information
pt2pham committed Mar 22, 2019
commit 7fc9cef1188c86c4d7743f95c37826232d2113b4
2 changes: 1 addition & 1 deletion kafka/conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -707,7 +707,7 @@ def _token_extensions(self):

# Only run if the #extensions() method is implemented by the clients Token Provider class
# Builds up a string separated by \x01 via a dict of key value pairs
if callable(getattr(token_provider, "extensions", None)):
if callable(getattr(token_provider, "extensions", None)) and len(token_provider.extensions()) > 0:
msg = "\x01".join(["{}={}".format(k, v) for k, v in token_provider.extensions().items()])
return "\x01" + msg
else:
Expand Down
18 changes: 11 additions & 7 deletions kafka/oauth/abstract.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,20 +9,14 @@ class AbstractTokenProvider(ABC):
"""
A Token Provider must be used for the SASL OAuthBearer protocol.

The implementation shsould ensure token reuse so that multiple
The implementation should ensure token reuse so that multiple
calls at connect time do not create multiple tokens. The implementation
should also periodically refresh the token in order to guarantee
that each call returns an unexpired token. A timeout error should
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

How does this timeout error work?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's up to the implementer but ideally if the Token Provider pings some OAuth server for a token, if it takes too long it shouldn't go on forever. This was something that I've read/done in other OAuth implementations for different Kafka client libraries.

be returned after a short period of inactivity so that the
broker can log debugging info and retry.

Token Providers MUST be implemented from this ABC.

An optional method that may be implemented if the user chooses is:
#extensions() - Returns a map of key-value pairs that can
be sent with the SASL/OAUTHBEARER initial client request. If
not provided, the values are ignored. This feature is only available
in Kafka >= 2.1.0.
"""

def __init__(self, **config):
Expand All @@ -36,3 +30,13 @@ def token(self):
"""
pass

def extensions(self):
"""
This is an OPTIONAL method that may be implemented.

Returns a map of key-value pairs that can
be sent with the SASL/OAUTHBEARER initial client request. If
not implemented, the values are ignored. This feature is only available
in Kafka >= 2.1.0.
"""
pass