-
Notifications
You must be signed in to change notification settings - Fork 813
support userscramcredentials apis #1168
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from all commits
Commits
Show all changes
13 commits
Select commit
Hold shift + click to select a range
b1e64b1
userscramcredentials protocols
petedannemann b5c2c5d
alteruserscramcredentials working
petedannemann 8ed61fb
describeuserscramcredentials working
petedannemann 8da13e8
gofmt -s -w alteruserscramcredentials_test.go
petedannemann 663543e
fix typo
petedannemann a79d9e1
add tests for deletion
petedannemann 209bfe4
gofmt
petedannemann 54fc57c
improve test
petedannemann 72d25e2
separate alteruserscramcredentials_test and describeuserscramcredenti…
petedannemann 7eac2f3
add protocol tests
petedannemann 736ebcc
remove unused v1 constant
petedannemann 09698f0
change iterations from int32 to int
petedannemann d10c841
keep errors with results
petedannemann File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,107 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/alteruserscramcredentials" | ||
) | ||
|
||
// AlterUserScramCredentialsRequest represents a request sent to a kafka broker to | ||
// alter user scram credentials. | ||
type AlterUserScramCredentialsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// List of credentials to delete. | ||
Deletions []UserScramCredentialsDeletion | ||
|
||
// List of credentials to upsert. | ||
Upsertions []UserScramCredentialsUpsertion | ||
} | ||
|
||
type ScramMechanism int8 | ||
|
||
const ( | ||
ScramMechanismUnknown ScramMechanism = iota // 0 | ||
ScramMechanismSha256 // 1 | ||
ScramMechanismSha512 // 2 | ||
) | ||
|
||
type UserScramCredentialsDeletion struct { | ||
Name string | ||
Mechanism ScramMechanism | ||
} | ||
|
||
type UserScramCredentialsUpsertion struct { | ||
Name string | ||
Mechanism ScramMechanism | ||
Iterations int | ||
Salt []byte | ||
SaltedPassword []byte | ||
} | ||
|
||
// AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user | ||
// credentials request. | ||
type AlterUserScramCredentialsResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// List of altered user scram credentials. | ||
Results []AlterUserScramCredentialsResponseUser | ||
} | ||
|
||
type AlterUserScramCredentialsResponseUser struct { | ||
User string | ||
Error error | ||
} | ||
|
||
// AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns | ||
// the response. | ||
func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) { | ||
deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions)) | ||
upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions)) | ||
|
||
for deletionIdx, deletion := range req.Deletions { | ||
deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{ | ||
Name: deletion.Name, | ||
Mechanism: int8(deletion.Mechanism), | ||
} | ||
} | ||
|
||
for upsertionIdx, upsertion := range req.Upsertions { | ||
upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{ | ||
Name: upsertion.Name, | ||
Mechanism: int8(upsertion.Mechanism), | ||
Iterations: int32(upsertion.Iterations), | ||
Salt: upsertion.Salt, | ||
SaltedPassword: upsertion.SaltedPassword, | ||
} | ||
} | ||
|
||
m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{ | ||
Deletions: deletions, | ||
Upsertions: upsertions, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err) | ||
} | ||
|
||
res := m.(*alteruserscramcredentials.Response) | ||
responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results)) | ||
|
||
for responseIdx, responseResult := range res.Results { | ||
responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{ | ||
User: responseResult.User, | ||
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage), | ||
} | ||
} | ||
ret := &AlterUserScramCredentialsResponse{ | ||
Throttle: makeDuration(res.ThrottleTimeMs), | ||
Results: responseEntries, | ||
} | ||
|
||
return ret, nil | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
) | ||
|
||
func TestAlterUserScramCredentials(t *testing.T) { | ||
// https://issues.apache.org/jira/browse/KAFKA-10259 | ||
if !ktesting.KafkaIsAtLeast("2.7.0") { | ||
return | ||
} | ||
|
||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
name := makeTopic() | ||
|
||
createRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{ | ||
Upsertions: []UserScramCredentialsUpsertion{ | ||
{ | ||
Name: name, | ||
Mechanism: ScramMechanismSha512, | ||
Iterations: 15000, | ||
Salt: []byte("my-salt"), | ||
SaltedPassword: []byte("my-salted-password"), | ||
}, | ||
}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if len(createRes.Results) != 1 { | ||
t.Fatalf("expected 1 createResult; got %d", len(createRes.Results)) | ||
} | ||
|
||
if createRes.Results[0].User != name { | ||
t.Fatalf("expected createResult with user: %s, got %s", name, createRes.Results[0].User) | ||
} | ||
|
||
if createRes.Results[0].Error != nil { | ||
t.Fatalf("didn't expect an error in createResult, got %v", createRes.Results[0].Error) | ||
} | ||
|
||
deleteRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{ | ||
Deletions: []UserScramCredentialsDeletion{ | ||
{ | ||
Name: name, | ||
Mechanism: ScramMechanismSha512, | ||
}, | ||
}, | ||
}) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
|
||
if len(deleteRes.Results) != 1 { | ||
t.Fatalf("expected 1 deleteResult; got %d", len(deleteRes.Results)) | ||
} | ||
|
||
if deleteRes.Results[0].User != name { | ||
t.Fatalf("expected deleteResult with user: %s, got %s", name, deleteRes.Results[0].User) | ||
} | ||
|
||
if deleteRes.Results[0].Error != nil { | ||
t.Fatalf("didn't expect an error in deleteResult, got %v", deleteRes.Results[0].Error) | ||
} | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,97 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"fmt" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/describeuserscramcredentials" | ||
) | ||
|
||
// DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to | ||
// describe user scram credentials. | ||
type DescribeUserScramCredentialsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// List of Scram users to describe | ||
Users []UserScramCredentialsUser | ||
} | ||
|
||
type UserScramCredentialsUser struct { | ||
Name string | ||
} | ||
|
||
// DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user | ||
// credentials request. | ||
type DescribeUserScramCredentialsResponse struct { | ||
// The amount of time that the broker throttled the request. | ||
Throttle time.Duration | ||
|
||
// Top level error that occurred while attempting to describe | ||
// the user scram credentials. | ||
// | ||
// The errors contain the kafka error code. Programs may use the standard | ||
// errors.Is function to test the error against kafka error codes. | ||
Error error | ||
|
||
// List of described user scram credentials. | ||
Results []DescribeUserScramCredentialsResponseResult | ||
} | ||
|
||
type DescribeUserScramCredentialsResponseResult struct { | ||
User string | ||
CredentialInfos []DescribeUserScramCredentialsCredentialInfo | ||
Error error | ||
} | ||
|
||
type DescribeUserScramCredentialsCredentialInfo struct { | ||
Mechanism ScramMechanism | ||
Iterations int | ||
} | ||
|
||
// DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns | ||
// the response. | ||
func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) { | ||
users := make([]describeuserscramcredentials.RequestUser, len(req.Users)) | ||
|
||
for userIdx, user := range req.Users { | ||
users[userIdx] = describeuserscramcredentials.RequestUser{ | ||
Name: user.Name, | ||
} | ||
} | ||
|
||
m, err := c.roundTrip(ctx, req.Addr, &describeuserscramcredentials.Request{ | ||
Users: users, | ||
}) | ||
if err != nil { | ||
return nil, fmt.Errorf("kafka.(*Client).DescribeUserScramCredentials: %w", err) | ||
} | ||
|
||
res := m.(*describeuserscramcredentials.Response) | ||
responseResults := make([]DescribeUserScramCredentialsResponseResult, len(res.Results)) | ||
|
||
for responseIdx, responseResult := range res.Results { | ||
credentialInfos := make([]DescribeUserScramCredentialsCredentialInfo, len(responseResult.CredentialInfos)) | ||
|
||
for credentialInfoIdx, credentialInfo := range responseResult.CredentialInfos { | ||
credentialInfos[credentialInfoIdx] = DescribeUserScramCredentialsCredentialInfo{ | ||
Mechanism: ScramMechanism(credentialInfo.Mechanism), | ||
Iterations: int(credentialInfo.Iterations), | ||
} | ||
} | ||
responseResults[responseIdx] = DescribeUserScramCredentialsResponseResult{ | ||
User: responseResult.User, | ||
CredentialInfos: credentialInfos, | ||
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage), | ||
} | ||
} | ||
ret := &DescribeUserScramCredentialsResponse{ | ||
Throttle: makeDuration(res.ThrottleTimeMs), | ||
Error: makeError(res.ErrorCode, res.ErrorMessage), | ||
Results: responseResults, | ||
} | ||
|
||
return ret, nil | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.