Skip to content

support userscramcredentials apis #1168

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 13 commits into from
Jul 28, 2023
Merged
107 changes: 107 additions & 0 deletions alteruserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,107 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/alteruserscramcredentials"
)

// AlterUserScramCredentialsRequest represents a request sent to a kafka broker to
// alter user scram credentials.
type AlterUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of credentials to delete.
Deletions []UserScramCredentialsDeletion

// List of credentials to upsert.
Upsertions []UserScramCredentialsUpsertion
}

type ScramMechanism int8

const (
ScramMechanismUnknown ScramMechanism = iota // 0
ScramMechanismSha256 // 1
ScramMechanismSha512 // 2
)

type UserScramCredentialsDeletion struct {
Name string
Mechanism ScramMechanism
}

type UserScramCredentialsUpsertion struct {
Name string
Mechanism ScramMechanism
Iterations int
Salt []byte
SaltedPassword []byte
}

// AlterUserScramCredentialsResponse represents a response from a kafka broker to an alter user
// credentials request.
type AlterUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// List of altered user scram credentials.
Results []AlterUserScramCredentialsResponseUser
}

type AlterUserScramCredentialsResponseUser struct {
User string
Error error
}

// AlterUserScramCredentials sends user scram credentials alteration request to a kafka broker and returns
// the response.
func (c *Client) AlterUserScramCredentials(ctx context.Context, req *AlterUserScramCredentialsRequest) (*AlterUserScramCredentialsResponse, error) {
deletions := make([]alteruserscramcredentials.RequestUserScramCredentialsDeletion, len(req.Deletions))
upsertions := make([]alteruserscramcredentials.RequestUserScramCredentialsUpsertion, len(req.Upsertions))

for deletionIdx, deletion := range req.Deletions {
deletions[deletionIdx] = alteruserscramcredentials.RequestUserScramCredentialsDeletion{
Name: deletion.Name,
Mechanism: int8(deletion.Mechanism),
}
}

for upsertionIdx, upsertion := range req.Upsertions {
upsertions[upsertionIdx] = alteruserscramcredentials.RequestUserScramCredentialsUpsertion{
Name: upsertion.Name,
Mechanism: int8(upsertion.Mechanism),
Iterations: int32(upsertion.Iterations),
Salt: upsertion.Salt,
SaltedPassword: upsertion.SaltedPassword,
}
}

m, err := c.roundTrip(ctx, req.Addr, &alteruserscramcredentials.Request{
Deletions: deletions,
Upsertions: upsertions,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).AlterUserScramCredentials: %w", err)
}

res := m.(*alteruserscramcredentials.Response)
responseEntries := make([]AlterUserScramCredentialsResponseUser, len(res.Results))

for responseIdx, responseResult := range res.Results {
responseEntries[responseIdx] = AlterUserScramCredentialsResponseUser{
User: responseResult.User,
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
}
}
ret := &AlterUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Results: responseEntries,
}

return ret, nil
}
73 changes: 73 additions & 0 deletions alteruserscramcredentials_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,73 @@
package kafka

import (
"context"
"testing"

ktesting "github.com/segmentio/kafka-go/testing"
)

func TestAlterUserScramCredentials(t *testing.T) {
// https://issues.apache.org/jira/browse/KAFKA-10259
if !ktesting.KafkaIsAtLeast("2.7.0") {
return
}

client, shutdown := newLocalClient()
defer shutdown()

name := makeTopic()

createRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Upsertions: []UserScramCredentialsUpsertion{
{
Name: name,
Mechanism: ScramMechanismSha512,
Iterations: 15000,
Salt: []byte("my-salt"),
SaltedPassword: []byte("my-salted-password"),
},
},
})

if err != nil {
t.Fatal(err)
}

if len(createRes.Results) != 1 {
t.Fatalf("expected 1 createResult; got %d", len(createRes.Results))
}

if createRes.Results[0].User != name {
t.Fatalf("expected createResult with user: %s, got %s", name, createRes.Results[0].User)
}

if createRes.Results[0].Error != nil {
t.Fatalf("didn't expect an error in createResult, got %v", createRes.Results[0].Error)
}

deleteRes, err := client.AlterUserScramCredentials(context.Background(), &AlterUserScramCredentialsRequest{
Deletions: []UserScramCredentialsDeletion{
{
Name: name,
Mechanism: ScramMechanismSha512,
},
},
})

if err != nil {
t.Fatal(err)
}

if len(deleteRes.Results) != 1 {
t.Fatalf("expected 1 deleteResult; got %d", len(deleteRes.Results))
}

if deleteRes.Results[0].User != name {
t.Fatalf("expected deleteResult with user: %s, got %s", name, deleteRes.Results[0].User)
}

if deleteRes.Results[0].Error != nil {
t.Fatalf("didn't expect an error in deleteResult, got %v", deleteRes.Results[0].Error)
}
}
97 changes: 97 additions & 0 deletions describeuserscramcredentials.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
package kafka

import (
"context"
"fmt"
"net"
"time"

"github.com/segmentio/kafka-go/protocol/describeuserscramcredentials"
)

// DescribeUserScramCredentialsRequest represents a request sent to a kafka broker to
// describe user scram credentials.
type DescribeUserScramCredentialsRequest struct {
// Address of the kafka broker to send the request to.
Addr net.Addr

// List of Scram users to describe
Users []UserScramCredentialsUser
}

type UserScramCredentialsUser struct {
Name string
}

// DescribeUserScramCredentialsResponse represents a response from a kafka broker to a describe user
// credentials request.
type DescribeUserScramCredentialsResponse struct {
// The amount of time that the broker throttled the request.
Throttle time.Duration

// Top level error that occurred while attempting to describe
// the user scram credentials.
//
// The errors contain the kafka error code. Programs may use the standard
// errors.Is function to test the error against kafka error codes.
Error error

// List of described user scram credentials.
Results []DescribeUserScramCredentialsResponseResult
}

type DescribeUserScramCredentialsResponseResult struct {
User string
CredentialInfos []DescribeUserScramCredentialsCredentialInfo
Error error
}

type DescribeUserScramCredentialsCredentialInfo struct {
Mechanism ScramMechanism
Iterations int
}

// DescribeUserScramCredentials sends a user scram credentials describe request to a kafka broker and returns
// the response.
func (c *Client) DescribeUserScramCredentials(ctx context.Context, req *DescribeUserScramCredentialsRequest) (*DescribeUserScramCredentialsResponse, error) {
users := make([]describeuserscramcredentials.RequestUser, len(req.Users))

for userIdx, user := range req.Users {
users[userIdx] = describeuserscramcredentials.RequestUser{
Name: user.Name,
}
}

m, err := c.roundTrip(ctx, req.Addr, &describeuserscramcredentials.Request{
Users: users,
})
if err != nil {
return nil, fmt.Errorf("kafka.(*Client).DescribeUserScramCredentials: %w", err)
}

res := m.(*describeuserscramcredentials.Response)
responseResults := make([]DescribeUserScramCredentialsResponseResult, len(res.Results))

for responseIdx, responseResult := range res.Results {
credentialInfos := make([]DescribeUserScramCredentialsCredentialInfo, len(responseResult.CredentialInfos))

for credentialInfoIdx, credentialInfo := range responseResult.CredentialInfos {
credentialInfos[credentialInfoIdx] = DescribeUserScramCredentialsCredentialInfo{
Mechanism: ScramMechanism(credentialInfo.Mechanism),
Iterations: int(credentialInfo.Iterations),
}
}
responseResults[responseIdx] = DescribeUserScramCredentialsResponseResult{
User: responseResult.User,
CredentialInfos: credentialInfos,
Error: makeError(responseResult.ErrorCode, responseResult.ErrorMessage),
}
}
ret := &DescribeUserScramCredentialsResponse{
Throttle: makeDuration(res.ThrottleTimeMs),
Error: makeError(res.ErrorCode, res.ErrorMessage),
Results: responseResults,
}

return ret, nil
}
Loading