-
Notifications
You must be signed in to change notification settings - Fork 1.4k
Support SASL OAuthBearer Authentication #1750
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
2582245
971aa1b
7fc9cef
acfcd86
f925328
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,3 @@ | ||
from __future__ import absolute_import | ||
|
||
from kafka.oauth.abstract import AbstractTokenProvider |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
from __future__ import absolute_import | ||
|
||
import abc | ||
|
||
# This statement is compatible with both Python 2.7 & 3+ | ||
ABC = abc.ABCMeta('ABC', (object,), {'__slots__': ()}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. interesting -- haven't seen this before |
||
|
||
class AbstractTokenProvider(ABC): | ||
""" | ||
A Token Provider must be used for the SASL OAuthBearer protocol. | ||
|
||
The implementation should ensure token reuse so that multiple | ||
calls at connect time do not create multiple tokens. The implementation | ||
should also periodically refresh the token in order to guarantee | ||
that each call returns an unexpired token. A timeout error should | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. How does this timeout error work? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It's up to the implementer but ideally if the Token Provider pings some OAuth server for a token, if it takes too long it shouldn't go on forever. This was something that I've read/done in other OAuth implementations for different Kafka client libraries. |
||
be returned after a short period of inactivity so that the | ||
broker can log debugging info and retry. | ||
|
||
Token Providers MUST implement the token() method | ||
""" | ||
|
||
def __init__(self, **config): | ||
pass | ||
|
||
@abc.abstractmethod | ||
def token(self): | ||
""" | ||
Returns a (str) ID/Access Token to be sent to the Kafka | ||
client. | ||
""" | ||
pass | ||
|
||
def extensions(self): | ||
""" | ||
This is an OPTIONAL method that may be implemented. | ||
|
||
Returns a map of key-value pairs that can | ||
be sent with the SASL/OAUTHBEARER initial client request. If | ||
not implemented, the values are ignored. This feature is only available | ||
in Kafka >= 2.1.0. | ||
""" | ||
return {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is string
format()
going to guarantee correct encoding here? I might suggest operating on and returningbytes
instead. E.g.,b''.join([b'n,,\x01auth=Bearer ', token, extensions, b'\x01\x01'])
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I convert to bytes and encode the output of this function in
_try_authenticate_oauth
so I believe the encoding will be correct.