From 3b677d289c0822383eae02704de0338d463d25fa Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Wed, 30 Apr 2025 13:38:25 +0300 Subject: [PATCH 01/12] Kafka 4.0 protocol fields updated --- conn.go | 6 ++--- conn_test.go | 11 ++++---- consumergroup.go | 12 ++++----- consumergroup_test.go | 12 ++++----- createtopics.go | 62 ++++++++++++++++++++++++++++--------------- createtopics_test.go | 8 +++--- deletetopics.go | 34 ++++++++++++++---------- deletetopics_test.go | 4 +-- joingroup.go | 6 ++--- reader_test.go | 2 +- 10 files changed, 91 insertions(+), 66 deletions(-) diff --git a/conn.go b/conn.go index 2b51afbd5..eafde7563 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV0{ + _, err := c.deleteTopics(deleteTopicsRequestV1{ Topics: topics, }) return err @@ -368,12 +368,12 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponseV1, error) { +func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV1, error) { var response joinGroupResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroup, v1, id, request) + return c.writeRequest(joinGroup, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { diff --git a/conn_test.go b/conn_test.go index bdce327e0..bd94a752b 100644 --- a/conn_test.go +++ b/conn_test.go @@ -13,8 +13,9 @@ import ( "testing" "time" - ktesting "github.com/segmentio/kafka-go/testing" "golang.org/x/net/nettest" + + ktesting "github.com/segmentio/kafka-go/testing" ) type timeout struct{} @@ -682,7 +683,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, join := func() (joinGroup joinGroupResponseV1) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV1{ + joinGroup, err = conn.joinGroup(joinGroupRequestV2{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -770,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV1{}) + _, err := conn.joinGroup(joinGroupRequestV2{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -780,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequestV2{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -792,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequestV2{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index f4bb382cb..5fa5bd42b 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroup(joinGroupRequestV2) (joinGroupResponseV1, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,7 +588,7 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV1(memberID) + request, err := cg.makeJoinGroupRequestV2(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { - request := joinGroupRequestV1{ +func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupRequestV2, error) { + request := joinGroupRequestV2{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequestV2{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), diff --git a/consumergroup_test.go b/consumergroup_test.go index 0d3e290a9..1c7a065f9 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -15,7 +15,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV1) (joinGroupResponseV1, error) + joinGroupFunc func(joinGroupRequestV2) (joinGroupResponseV1, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,7 +38,7 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponseV1, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { if c.joinGroupFunc == nil { return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified") } @@ -419,7 +419,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. @@ -449,7 +449,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{ ErrorCode: int16(InvalidTopic), }, nil @@ -472,7 +472,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{ GenerationID: 12345, GroupProtocol: "foo", @@ -498,7 +498,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponseV1, error) { + mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { return joinGroupResponseV1{ GenerationID: 12345, GroupProtocol: "range", diff --git a/createtopics.go b/createtopics.go index 8ad9ebf44..1bf61472f 100644 --- a/createtopics.go +++ b/createtopics.go @@ -262,7 +262,7 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV0 struct { +type createTopicsRequestV2 struct { // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. Topics []createTopicsRequestV0Topic @@ -270,77 +270,95 @@ type createTopicsRequestV0 struct { // Timeout ms to wait for a topic to be completely created on the // controller node. Values <= 0 will trigger topic creation and return immediately Timeout int32 + + // If true, check that the topics can be created as specified, but don't create anything. + // Internal use only for Kafka 4.0 support. + ValidateOnly bool } -func (t createTopicsRequestV0) size() int32 { +func (t createTopicsRequestV2) size() int32 { return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + - sizeofInt32(t.Timeout) + sizeofInt32(t.Timeout) + 1 } -func (t createTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t createTopicsRequestV2) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) wb.writeInt32(t.Timeout) + wb.writeBool(t.ValidateOnly) } -type createTopicsResponseV0TopicError struct { +type createTopicsResponseV1TopicError struct { // Topic name Topic string // ErrorCode holds response error code ErrorCode int16 + + // ErrorMessage holds responce error message string + ErrorMessage string } -func (t createTopicsResponseV0TopicError) size() int32 { +func (t createTopicsResponseV1TopicError) size() int32 { return sizeofString(t.Topic) + - sizeofInt16(t.ErrorCode) + sizeofInt16(t.ErrorCode) + + sizeofString(t.ErrorMessage) } -func (t createTopicsResponseV0TopicError) writeTo(wb *writeBuffer) { +func (t createTopicsResponseV1TopicError) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeInt16(t.ErrorCode) + wb.writeString(t.ErrorMessage) } -func (t *createTopicsResponseV0TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseV1TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV0 struct { - TopicErrors []createTopicsResponseV0TopicError +type createTopicsResponseV1 struct { + ThrottleTime int32 + TopicErrors []createTopicsResponseV1TopicError } -func (t createTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponseV1) size() int32 { + return sizeofInt32(t.ThrottleTime) + sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) } -func (t createTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t createTopicsResponseV1) writeTo(wb *writeBuffer) { + wb.writeInt32(t.ThrottleTime) wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) } -func (t *createTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV0TopicError - if fnRemain, fnErr = (&topic).readFrom(r, size); err != nil { + var topic createTopicsResponseV1TopicError + if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponseV0, error) { - var response createTopicsResponseV0 +func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponseV1, error) { + var response createTopicsResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { @@ -349,7 +367,7 @@ func (c *Conn) createTopics(request createTopicsRequestV0) (createTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopics, v0, id, request) + return c.writeRequest(createTopics, v2, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -383,7 +401,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV0{ + _, err := c.createTopics(createTopicsRequestV2{ Topics: requestV0Topics, }) return err diff --git a/createtopics_test.go b/createtopics_test.go index 38819c382..78bba5bc1 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -160,9 +160,9 @@ func TestClientCreateTopics(t *testing.T) { } } -func TestCreateTopicsResponseV0(t *testing.T) { - item := createTopicsResponseV0{ - TopicErrors: []createTopicsResponseV0TopicError{ +func TestCreateTopicsResponseV1(t *testing.T) { + item := createTopicsResponseV1{ + TopicErrors: []createTopicsResponseV1TopicError{ { Topic: "topic", ErrorCode: 2, @@ -174,7 +174,7 @@ func TestCreateTopicsResponseV0(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found createTopicsResponseV0 + var found createTopicsResponseV1 remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Error(err) diff --git a/deletetopics.go b/deletetopics.go index d758d9fd6..e3c3be198 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV0 struct { +type deleteTopicsRequestV1 struct { // Topics holds the topic names Topics []string @@ -77,41 +77,47 @@ type deleteTopicsRequestV0 struct { Timeout int32 } -func (t deleteTopicsRequestV0) size() int32 { +func (t deleteTopicsRequestV1) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequestV1) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } -type deleteTopicsResponseV0 struct { +type deleteTopicsResponseV1 struct { + ThrottleTime int32 // TopicErrorCodes holds per topic error codes TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode } -func (t deleteTopicsResponseV0) size() int32 { - return sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) +func (t deleteTopicsResponseV1) size() int32 { + return sizeofInt32(t.ThrottleTime) + + sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) } -func (t *deleteTopicsResponseV0) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item deleteTopicsResponseV0TopicErrorCode - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.TopicErrorCodes = append(t.TopicErrorCodes, item) return } - if remain, err = readArrayWith(r, size, fn); err != nil { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } + if remain, err = readArrayWith(r, remain, fn); err != nil { return } return } -func (t deleteTopicsResponseV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsResponseV1) writeTo(wb *writeBuffer) { + wb.writeInt32(t.ThrottleTime) wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) }) } @@ -146,8 +152,8 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponseV0, error) { - var response deleteTopicsResponseV0 +func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponseV1, error) { + var response deleteTopicsResponseV1 err := c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { @@ -155,7 +161,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopics, v0, id, request) + return c.writeRequest(deleteTopics, v1, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -164,7 +170,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse }, ) if err != nil { - return deleteTopicsResponseV0{}, err + return deleteTopicsResponseV1{}, err } for _, c := range response.TopicErrorCodes { if c.ErrorCode != 0 { diff --git a/deletetopics_test.go b/deletetopics_test.go index 3caffe840..98c6cd00d 100644 --- a/deletetopics_test.go +++ b/deletetopics_test.go @@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) { } func TestDeleteTopicsResponseV1(t *testing.T) { - item := deleteTopicsResponseV0{ + item := deleteTopicsResponseV1{ TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{ { Topic: "a", @@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found deleteTopicsResponseV0 + var found deleteTopicsResponseV1 remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Fatal(err) diff --git a/joingroup.go b/joingroup.go index 30823a69a..d76a27491 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV1 struct { +type joinGroupRequestV2 struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV1 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV1) size() int32 { +func (t joinGroupRequestV2) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { +func (t joinGroupRequestV2) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) diff --git a/reader_test.go b/reader_test.go index 64d45190f..0dc593d8c 100644 --- a/reader_test.go +++ b/reader_test.go @@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) { conn.SetDeadline(time.Now().Add(10 * time.Second)) - _, err = conn.createTopics(createTopicsRequestV0{ + _, err = conn.createTopics(createTopicsRequestV2{ Topics: []createTopicsRequestV0Topic{ { Topic: topic, From e2e2dfa7f16b8a593d626373fae6ff848897af94 Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Wed, 30 Apr 2025 15:25:47 +0300 Subject: [PATCH 02/12] dynamic versions support --- conn.go | 19 +++++---- conn_test.go | 10 ++--- consumergroup.go | 16 +++---- consumergroup_test.go | 29 ++++++------- createtopics.go | 98 +++++++++++++++++++++++++++++-------------- createtopics_test.go | 6 +-- deletetopics.go | 52 +++++++++++++++-------- deletetopics_test.go | 4 +- joingroup.go | 35 ++++++++++++---- joingroup_test.go | 6 ++- reader_test.go | 2 +- 11 files changed, 177 insertions(+), 100 deletions(-) diff --git a/conn.go b/conn.go index eafde7563..777c3fb07 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV1{ + _, err := c.deleteTopics(deleteTopicsRequestV0{ Topics: topics, }) return err @@ -368,12 +368,17 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV1, error) { - var response joinGroupResponseV1 +func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponse, error) { + version, err := c.negotiateVersion(joinGroup, v1, v2) + if err != nil { + return joinGroupResponse{}, err + } - err := c.writeOperation( + response := joinGroupResponse{v: version} + + err = c.writeOperation( func(deadline time.Time, id int32) error { - return c.writeRequest(joinGroup, v2, id, request) + return c.writeRequest(joinGroup, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -382,10 +387,10 @@ func (c *Conn) joinGroup(request joinGroupRequestV2) (joinGroupResponseV1, error }, ) if err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } if response.ErrorCode != 0 { - return joinGroupResponseV1{}, Error(response.ErrorCode) + return joinGroupResponse{}, Error(response.ErrorCode) } return response, nil diff --git a/conn_test.go b/conn_test.go index bd94a752b..0ec09c114 100644 --- a/conn_test.go +++ b/conn_test.go @@ -680,10 +680,10 @@ func waitForCoordinator(t *testing.T, conn *Conn, groupID string) { func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, memberID string, stop func()) { waitForCoordinator(t, conn, groupID) - join := func() (joinGroup joinGroupResponseV1) { + join := func() (joinGroup joinGroupResponse) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV2{ + joinGroup, err = conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -771,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV2{}) + _, err := conn.joinGroup(joinGroupRequestV1{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -781,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV2{ + _, err := conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -793,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV2{ + _, err := conn.joinGroup(joinGroupRequestV1{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index 5fa5bd42b..41cf03531 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV2) (joinGroupResponseV1, error) + joinGroup(joinGroupRequestV1) (joinGroupResponse, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,11 +588,11 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { - return joinGroupResponseV1{}, err + return joinGroupResponse{}, err } return t.conn.joinGroup(req) } @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV2(memberID) + request, err := cg.makeJoinGroupRequestV1(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupRequestV2, error) { - request := joinGroupRequestV2{ +func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { + request := joinGroupRequestV1{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV2{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), @@ -1007,7 +1007,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV2(memberID string) (joinGroupReque // assignTopicPartitions uses the selected GroupBalancer to assign members to // their various partitions. -func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponseV1) (GroupMemberAssignments, error) { +func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroupResponse) (GroupMemberAssignments, error) { cg.withLogger(func(l Logger) { l.Printf("selected as leader for group, %s\n", cg.config.ID) }) diff --git a/consumergroup_test.go b/consumergroup_test.go index 1c7a065f9..bbfecb42f 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -15,7 +15,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV2) (joinGroupResponseV1, error) + joinGroupFunc func(joinGroupRequestV1) (joinGroupResponse, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,9 +38,9 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV2) (joinGroupResponseV1, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { if c.joinGroupFunc == nil { - return joinGroupResponseV1{}, errors.New("no joinGroup behavior specified") + return joinGroupResponse{}, errors.New("no joinGroup behavior specified") } return c.joinGroupFunc(req) } @@ -140,8 +140,9 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } - newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponseV1 { - resp := joinGroupResponseV1{ + newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponse { + resp := joinGroupResponse{ + v: v1, GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), } @@ -158,7 +159,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { } testCases := map[string]struct { - Members joinGroupResponseV1 + Members joinGroupResponse Assignments GroupMemberAssignments }{ "nil": { @@ -419,8 +420,8 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{}, errors.New("join group failed") + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. }, @@ -449,8 +450,8 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{ ErrorCode: int16(InvalidTopic), }, nil } @@ -472,8 +473,8 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "foo", LeaderID: "abc", @@ -498,8 +499,8 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV2) (joinGroupResponseV1, error) { - return joinGroupResponseV1{ + mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "range", LeaderID: "abc", diff --git a/createtopics.go b/createtopics.go index 1bf61472f..708a314d7 100644 --- a/createtopics.go +++ b/createtopics.go @@ -262,7 +262,9 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsRequestV2 struct { +type createTopicsRequest struct { + v apiVersion + // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. Topics []createTopicsRequestV0Topic @@ -276,18 +278,26 @@ type createTopicsRequestV2 struct { ValidateOnly bool } -func (t createTopicsRequestV2) size() int32 { - return sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + - sizeofInt32(t.Timeout) + 1 +func (t createTopicsRequest) size() int32 { + sz := sizeofArray(len(t.Topics), func(i int) int32 { return t.Topics[i].size() }) + + sizeofInt32(t.Timeout) + if t.v >= v1 { + sz += 1 + } + return sz } -func (t createTopicsRequestV2) writeTo(wb *writeBuffer) { +func (t createTopicsRequest) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Topics), func(i int) { t.Topics[i].writeTo(wb) }) wb.writeInt32(t.Timeout) - wb.writeBool(t.ValidateOnly) + if t.v >= v1 { + wb.writeBool(t.ValidateOnly) + } } -type createTopicsResponseV1TopicError struct { +type createTopicsResponseTopicError struct { + v apiVersion + // Topic name Topic string @@ -298,57 +308,75 @@ type createTopicsResponseV1TopicError struct { ErrorMessage string } -func (t createTopicsResponseV1TopicError) size() int32 { - return sizeofString(t.Topic) + - sizeofInt16(t.ErrorCode) + - sizeofString(t.ErrorMessage) +func (t createTopicsResponseTopicError) size() int32 { + sz := sizeofString(t.Topic) + + sizeofInt16(t.ErrorCode) + if t.v >= v1 { + sz += sizeofString(t.ErrorMessage) + } + return sz } -func (t createTopicsResponseV1TopicError) writeTo(wb *writeBuffer) { +func (t createTopicsResponseTopicError) writeTo(wb *writeBuffer) { wb.writeString(t.Topic) wb.writeInt16(t.ErrorCode) - wb.writeString(t.ErrorMessage) + if t.v >= v1 { + wb.writeString(t.ErrorMessage) + } } -func (t *createTopicsResponseV1TopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.Topic); err != nil { return } if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } - if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { - return + if t.v >= v1 { + if remain, err = readString(r, remain, &t.ErrorMessage); err != nil { + return + } } return } // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics -type createTopicsResponseV1 struct { - ThrottleTime int32 - TopicErrors []createTopicsResponseV1TopicError +type createTopicsResponse struct { + v apiVersion + + ThrottleTime int32 // v1+ + TopicErrors []createTopicsResponseTopicError } -func (t createTopicsResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTime) + sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) +func (t createTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) + if t.v >= v1 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t createTopicsResponseV1) writeTo(wb *writeBuffer) { - wb.writeInt32(t.ThrottleTime) +func (t createTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v1 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) } -func (t *createTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var topic createTopicsResponseV1TopicError + topic := createTopicsResponseTopicError{v: t.v} if fnRemain, fnErr = (&topic).readFrom(r, size); fnErr != nil { return } t.TopicErrors = append(t.TopicErrors, topic) return } - if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { - return + remain = size + if t.v >= v1 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } } if remain, err = readArrayWith(r, remain, fn); err != nil { return @@ -357,17 +385,23 @@ func (t *createTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int return } -func (c *Conn) createTopics(request createTopicsRequestV2) (createTopicsResponseV1, error) { - var response createTopicsResponseV1 +func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { + version, err := c.negotiateVersion(createTopics, v0, v2) + if err != nil { + return createTopicsResponse{}, err + } + + request.v = version + response := createTopicsResponse{v: version} - err := c.writeOperation( + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(createTopics, v2, id, request) + return c.writeRequest(createTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -401,7 +435,7 @@ func (c *Conn) CreateTopics(topics ...TopicConfig) error { t.toCreateTopicsRequestV0Topic()) } - _, err := c.createTopics(createTopicsRequestV2{ + _, err := c.createTopics(createTopicsRequest{ Topics: requestV0Topics, }) return err diff --git a/createtopics_test.go b/createtopics_test.go index 78bba5bc1..b3d080247 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -161,8 +161,8 @@ func TestClientCreateTopics(t *testing.T) { } func TestCreateTopicsResponseV1(t *testing.T) { - item := createTopicsResponseV1{ - TopicErrors: []createTopicsResponseV1TopicError{ + item := createTopicsResponse{ + TopicErrors: []createTopicsResponseTopicError{ { Topic: "topic", ErrorCode: 2, @@ -174,7 +174,7 @@ func TestCreateTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found createTopicsResponseV1 + var found createTopicsResponse remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Error(err) diff --git a/deletetopics.go b/deletetopics.go index e3c3be198..a3674b4d7 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV1 struct { +type deleteTopicsRequestV0 struct { // Topics holds the topic names Topics []string @@ -77,28 +77,33 @@ type deleteTopicsRequestV1 struct { Timeout int32 } -func (t deleteTopicsRequestV1) size() int32 { +func (t deleteTopicsRequestV0) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV1) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } -type deleteTopicsResponseV1 struct { +type deleteTopicsResponse struct { + v apiVersion + ThrottleTime int32 // TopicErrorCodes holds per topic error codes TopicErrorCodes []deleteTopicsResponseV0TopicErrorCode } -func (t deleteTopicsResponseV1) size() int32 { - return sizeofInt32(t.ThrottleTime) + - sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) +func (t deleteTopicsResponse) size() int32 { + sz := sizeofArray(len(t.TopicErrorCodes), func(i int) int32 { return t.TopicErrorCodes[i].size() }) + if t.v >= v1 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *deleteTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item deleteTopicsResponseV0TopicErrorCode if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { @@ -107,8 +112,11 @@ func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int t.TopicErrorCodes = append(t.TopicErrorCodes, item) return } - if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { - return + remain = size + if t.v >= v1 { + if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { + return + } } if remain, err = readArrayWith(r, remain, fn); err != nil { return @@ -116,8 +124,10 @@ func (t *deleteTopicsResponseV1) readFrom(r *bufio.Reader, size int) (remain int return } -func (t deleteTopicsResponseV1) writeTo(wb *writeBuffer) { - wb.writeInt32(t.ThrottleTime) +func (t deleteTopicsResponse) writeTo(wb *writeBuffer) { + if t.v >= v1 { + wb.writeInt32(t.ThrottleTime) + } wb.writeArray(len(t.TopicErrorCodes), func(i int) { t.TopicErrorCodes[i].writeTo(wb) }) } @@ -152,16 +162,24 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponseV1, error) { - var response deleteTopicsResponseV1 - err := c.writeOperation( +func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse, error) { + version, err := c.negotiateVersion(deleteTopics, v0, v1) + if err != nil { + return deleteTopicsResponse{}, err + } + + response := deleteTopicsResponse{ + v: version, + } + + err = c.writeOperation( func(deadline time.Time, id int32) error { if request.Timeout == 0 { now := time.Now() deadline = adjustDeadlineForRTT(deadline, now, defaultRTT) request.Timeout = milliseconds(deadlineToTimeout(deadline, now)) } - return c.writeRequest(deleteTopics, v1, id, request) + return c.writeRequest(deleteTopics, version, id, request) }, func(deadline time.Time, size int) error { return expectZeroSize(func() (remain int, err error) { @@ -170,7 +188,7 @@ func (c *Conn) deleteTopics(request deleteTopicsRequestV1) (deleteTopicsResponse }, ) if err != nil { - return deleteTopicsResponseV1{}, err + return deleteTopicsResponse{}, err } for _, c := range response.TopicErrorCodes { if c.ErrorCode != 0 { diff --git a/deletetopics_test.go b/deletetopics_test.go index 98c6cd00d..4dc681831 100644 --- a/deletetopics_test.go +++ b/deletetopics_test.go @@ -29,7 +29,7 @@ func TestClientDeleteTopics(t *testing.T) { } func TestDeleteTopicsResponseV1(t *testing.T) { - item := deleteTopicsResponseV1{ + item := deleteTopicsResponse{ TopicErrorCodes: []deleteTopicsResponseV0TopicErrorCode{ { Topic: "a", @@ -42,7 +42,7 @@ func TestDeleteTopicsResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found deleteTopicsResponseV1 + var found deleteTopicsResponse remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Fatal(err) diff --git a/joingroup.go b/joingroup.go index d76a27491..c5a09eb52 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV2 struct { +type joinGroupRequestV1 struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV2 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV2) size() int32 { +func (t joinGroupRequestV1) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV2) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV2) writeTo(wb *writeBuffer) { +func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) @@ -308,7 +308,11 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain return } -type joinGroupResponseV1 struct { +type joinGroupResponse struct { + v apiVersion + + ThrottleTime int32 + // ErrorCode holds response error code ErrorCode int16 @@ -326,16 +330,23 @@ type joinGroupResponseV1 struct { Members []joinGroupResponseMemberV1 } -func (t joinGroupResponseV1) size() int32 { - return sizeofInt16(t.ErrorCode) + +func (t joinGroupResponse) size() int32 { + sz := sizeofInt16(t.ErrorCode) + sizeofInt32(t.GenerationID) + sizeofString(t.GroupProtocol) + sizeofString(t.LeaderID) + sizeofString(t.MemberID) + sizeofArray(len(t.MemberID), func(i int) int32 { return t.Members[i].size() }) + if t.v >= v2 { + sz += sizeofInt32(t.ThrottleTime) + } + return sz } -func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponse) writeTo(wb *writeBuffer) { + if t.v >= v2 { + wb.writeInt32(t.ThrottleTime) + } wb.writeInt16(t.ErrorCode) wb.writeInt32(t.GenerationID) wb.writeString(t.GroupProtocol) @@ -344,8 +355,14 @@ func (t joinGroupResponseV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.Members), func(i int) { t.Members[i].writeTo(wb) }) } -func (t *joinGroupResponseV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { - if remain, err = readInt16(r, size, &t.ErrorCode); err != nil { +func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err error) { + remain = size + if t.v >= v2 { + if remain, err = readInt32(r, remain, &t.ThrottleTime); err != nil { + return + } + } + if remain, err = readInt16(r, remain, &t.ErrorCode); err != nil { return } if remain, err = readInt32(r, remain, &t.GenerationID); err != nil { diff --git a/joingroup_test.go b/joingroup_test.go index 926f5b4a6..f98f97f7a 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -218,7 +218,9 @@ func TestMemberMetadata(t *testing.T) { } func TestJoinGroupResponseV1(t *testing.T) { - item := joinGroupResponseV1{ + const version = v1 + item := joinGroupResponse{ + v: version, ErrorCode: 2, GenerationID: 3, GroupProtocol: "a", @@ -236,7 +238,7 @@ func TestJoinGroupResponseV1(t *testing.T) { w := &writeBuffer{w: b} item.writeTo(w) - var found joinGroupResponseV1 + found := joinGroupResponse{v: version} remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) if err != nil { t.Error(err) diff --git a/reader_test.go b/reader_test.go index 0dc593d8c..83d240cb0 100644 --- a/reader_test.go +++ b/reader_test.go @@ -301,7 +301,7 @@ func createTopic(t *testing.T, topic string, partitions int) { conn.SetDeadline(time.Now().Add(10 * time.Second)) - _, err = conn.createTopics(createTopicsRequestV2{ + _, err = conn.createTopics(createTopicsRequest{ Topics: []createTopicsRequestV0Topic{ { Topic: topic, From 5a30e00e7cb5c9a545ea93db3474b01678b6162b Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Thu, 1 May 2025 01:07:51 +0300 Subject: [PATCH 03/12] review fixes --- .circleci/config.yml | 47 ++++++++++++++++ alterpartitionreassignments_test.go | 3 + conn.go | 4 +- conn_test.go | 8 +-- consumergroup.go | 14 ++--- consumergroup_test.go | 86 +++++++++++++++-------------- createtopics.go | 12 ++-- createtopics_test.go | 53 ++++++++++-------- deletetopics.go | 10 ++-- joingroup.go | 20 +++---- joingroup_test.go | 64 ++++++++++----------- listgroups.go | 2 +- offsetfetch.go | 2 +- 13 files changed, 194 insertions(+), 131 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 96bcf0280..eb2ccbf8f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -278,6 +278,52 @@ jobs: entrypoint: *entrypoint steps: *steps + kafka-400: + working_directory: *working_directory + environment: + KAFKA_VERSION: "4.0.0" + + # Need to skip nettest to avoid these kinds of errors: + # --- FAIL: TestConn/nettest (17.56s) + # --- FAIL: TestConn/nettest/PingPong (7.40s) + # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # conntest.go:118: mismatching value: got 77, want 78 + # conntest.go:118: mismatching value: got 78, want 79 + # ... + # + # TODO: Figure out why these are happening and fix them (they don't appear to be new). + KAFKA_SKIP_NETTEST: "1" + docker: + - image: circleci/golang + - image: bitnami/kafka:4.0.0 + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT + KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + entrypoint: + - "/bin/bash" + - "-c" + - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; exec /entrypoint.sh /run.sh + steps: *steps + workflows: version: 2 run: @@ -295,3 +341,4 @@ workflows: - kafka-260 - kafka-270 - kafka-281 + - kafka-400 diff --git a/alterpartitionreassignments_test.go b/alterpartitionreassignments_test.go index 7bbce8fff..48974c7c5 100644 --- a/alterpartitionreassignments_test.go +++ b/alterpartitionreassignments_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "testing" + "time" ktesting "github.com/segmentio/kafka-go/testing" ) @@ -35,6 +36,7 @@ func TestClientAlterPartitionReassignments(t *testing.T) { BrokerIDs: []int{1}, }, }, + Timeout: 5 * time.Second, }, ) @@ -96,6 +98,7 @@ func TestClientAlterPartitionReassignmentsMultiTopics(t *testing.T) { BrokerIDs: []int{1}, }, }, + Timeout: 5 * time.Second, }, ) diff --git a/conn.go b/conn.go index 777c3fb07..9f9f25903 100644 --- a/conn.go +++ b/conn.go @@ -306,7 +306,7 @@ func (c *Conn) Brokers() ([]Broker, error) { // DeleteTopics deletes the specified topics. func (c *Conn) DeleteTopics(topics ...string) error { - _, err := c.deleteTopics(deleteTopicsRequestV0{ + _, err := c.deleteTopics(deleteTopicsRequest{ Topics: topics, }) return err @@ -368,7 +368,7 @@ func (c *Conn) heartbeat(request heartbeatRequestV0) (heartbeatResponseV0, error // joinGroup attempts to join a consumer group // // See http://kafka.apache.org/protocol.html#The_Messages_JoinGroup -func (c *Conn) joinGroup(request joinGroupRequestV1) (joinGroupResponse, error) { +func (c *Conn) joinGroup(request joinGroupRequest) (joinGroupResponse, error) { version, err := c.negotiateVersion(joinGroup, v1, v2) if err != nil { return joinGroupResponse{}, err diff --git a/conn_test.go b/conn_test.go index 0ec09c114..ef5ce3071 100644 --- a/conn_test.go +++ b/conn_test.go @@ -683,7 +683,7 @@ func createGroup(t *testing.T, conn *Conn, groupID string) (generationID int32, join := func() (joinGroup joinGroupResponse) { var err error for attempt := 0; attempt < 10; attempt++ { - joinGroup, err = conn.joinGroup(joinGroupRequestV1{ + joinGroup, err = conn.joinGroup(joinGroupRequest{ GroupID: groupID, SessionTimeout: int32(time.Minute / time.Millisecond), RebalanceTimeout: int32(time.Second / time.Millisecond), @@ -771,7 +771,7 @@ func testConnFindCoordinator(t *testing.T, conn *Conn) { } func testConnJoinGroupInvalidGroupID(t *testing.T, conn *Conn) { - _, err := conn.joinGroup(joinGroupRequestV1{}) + _, err := conn.joinGroup(joinGroupRequest{}) if !errors.Is(err, InvalidGroupId) && !errors.Is(err, NotCoordinatorForGroup) { t.Fatalf("expected %v or %v; got %v", InvalidGroupId, NotCoordinatorForGroup, err) } @@ -781,7 +781,7 @@ func testConnJoinGroupInvalidSessionTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequest{ GroupID: groupID, }) if !errors.Is(err, InvalidSessionTimeout) && !errors.Is(err, NotCoordinatorForGroup) { @@ -793,7 +793,7 @@ func testConnJoinGroupInvalidRefreshTimeout(t *testing.T, conn *Conn) { groupID := makeGroupID() waitForCoordinator(t, conn, groupID) - _, err := conn.joinGroup(joinGroupRequestV1{ + _, err := conn.joinGroup(joinGroupRequest{ GroupID: groupID, SessionTimeout: int32(3 * time.Second / time.Millisecond), }) diff --git a/consumergroup.go b/consumergroup.go index 41cf03531..b32f90162 100644 --- a/consumergroup.go +++ b/consumergroup.go @@ -555,7 +555,7 @@ func (g *Generation) partitionWatcher(interval time.Duration, topic string) { type coordinator interface { io.Closer findCoordinator(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroup(joinGroupRequestV1) (joinGroupResponse, error) + joinGroup(joinGroupRequest) (joinGroupResponse, error) syncGroup(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroup(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeat(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -588,7 +588,7 @@ func (t *timeoutCoordinator) findCoordinator(req findCoordinatorRequestV0) (find return t.conn.findCoordinator(req) } -func (t *timeoutCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { +func (t *timeoutCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { // in the case of join group, the consumer group coordinator may wait up // to rebalance timeout in order to wait for all members to join. if err := t.conn.SetDeadline(time.Now().Add(t.timeout + t.rebalanceTimeout)); err != nil { @@ -932,7 +932,7 @@ func (cg *ConsumerGroup) coordinator() (coordinator, error) { // * InvalidSessionTimeout: // * GroupAuthorizationFailed: func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, int32, GroupMemberAssignments, error) { - request, err := cg.makeJoinGroupRequestV1(memberID) + request, err := cg.makeJoinGroupRequest(memberID) if err != nil { return "", 0, nil, err } @@ -978,8 +978,8 @@ func (cg *ConsumerGroup) joinGroup(conn coordinator, memberID string) (string, i // makeJoinGroupRequestV1 handles the logic of constructing a joinGroup // request. -func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupRequestV1, error) { - request := joinGroupRequestV1{ +func (cg *ConsumerGroup) makeJoinGroupRequest(memberID string) (joinGroupRequest, error) { + request := joinGroupRequest{ GroupID: cg.config.ID, MemberID: memberID, SessionTimeout: int32(cg.config.SessionTimeout / time.Millisecond), @@ -990,7 +990,7 @@ func (cg *ConsumerGroup) makeJoinGroupRequestV1(memberID string) (joinGroupReque for _, balancer := range cg.config.GroupBalancers { userData, err := balancer.UserData() if err != nil { - return joinGroupRequestV1{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) + return joinGroupRequest{}, fmt.Errorf("unable to construct protocol metadata for member, %v: %w", balancer.ProtocolName(), err) } request.GroupProtocols = append(request.GroupProtocols, joinGroupRequestGroupProtocolV1{ ProtocolName: balancer.ProtocolName(), @@ -1050,7 +1050,7 @@ func (cg *ConsumerGroup) assignTopicPartitions(conn coordinator, group joinGroup } // makeMemberProtocolMetadata maps encoded member metadata ([]byte) into []GroupMember. -func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMemberV1) ([]GroupMember, error) { +func (cg *ConsumerGroup) makeMemberProtocolMetadata(in []joinGroupResponseMember) ([]GroupMember, error) { members := make([]GroupMember, 0, len(in)) for _, item := range in { metadata := groupMetadata{} diff --git a/consumergroup_test.go b/consumergroup_test.go index bbfecb42f..dbbe4ec47 100644 --- a/consumergroup_test.go +++ b/consumergroup_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "reflect" + "strconv" "strings" "sync" "testing" @@ -15,7 +16,7 @@ var _ coordinator = mockCoordinator{} type mockCoordinator struct { closeFunc func() error findCoordinatorFunc func(findCoordinatorRequestV0) (findCoordinatorResponseV0, error) - joinGroupFunc func(joinGroupRequestV1) (joinGroupResponse, error) + joinGroupFunc func(joinGroupRequest) (joinGroupResponse, error) syncGroupFunc func(syncGroupRequestV0) (syncGroupResponseV0, error) leaveGroupFunc func(leaveGroupRequestV0) (leaveGroupResponseV0, error) heartbeatFunc func(heartbeatRequestV0) (heartbeatResponseV0, error) @@ -38,7 +39,7 @@ func (c mockCoordinator) findCoordinator(req findCoordinatorRequestV0) (findCoor return c.findCoordinatorFunc(req) } -func (c mockCoordinator) joinGroup(req joinGroupRequestV1) (joinGroupResponse, error) { +func (c mockCoordinator) joinGroup(req joinGroupRequest) (joinGroupResponse, error) { if c.joinGroupFunc == nil { return joinGroupResponse{}, errors.New("no joinGroup behavior specified") } @@ -140,34 +141,36 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } - newJoinGroupResponseV1 := func(topicsByMemberID map[string][]string) joinGroupResponse { - resp := joinGroupResponse{ - v: v1, - GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), - } + newJoinGroupResponse := func(topicsByMemberID map[string][]string) func(v apiVersion) joinGroupResponse { + return func(v apiVersion) joinGroupResponse { + resp := joinGroupResponse{ + v: v, + GroupProtocol: RoundRobinGroupBalancer{}.ProtocolName(), + } - for memberID, topics := range topicsByMemberID { - resp.Members = append(resp.Members, joinGroupResponseMemberV1{ - MemberID: memberID, - MemberMetadata: groupMetadata{ - Topics: topics, - }.bytes(), - }) - } + for memberID, topics := range topicsByMemberID { + resp.Members = append(resp.Members, joinGroupResponseMember{ + MemberID: memberID, + MemberMetadata: groupMetadata{ + Topics: topics, + }.bytes(), + }) + } - return resp + return resp + } } testCases := map[string]struct { - Members joinGroupResponse + MembersFunc func(v apiVersion) joinGroupResponse Assignments GroupMemberAssignments }{ "nil": { - Members: newJoinGroupResponseV1(nil), + MembersFunc: newJoinGroupResponse(nil), Assignments: GroupMemberAssignments{}, }, "one member, one topic": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, }), Assignments: GroupMemberAssignments{ @@ -177,7 +180,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "one member, two topics": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1", "topic-2"}, }), Assignments: GroupMemberAssignments{ @@ -188,7 +191,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, one topic": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-1"}, }), @@ -202,7 +205,7 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, }, "two members, two unshared topics": { - Members: newJoinGroupResponseV1(map[string][]string{ + MembersFunc: newJoinGroupResponse(map[string][]string{ "member-1": {"topic-1"}, "member-2": {"topic-2"}, }), @@ -217,21 +220,24 @@ func TestReaderAssignTopicPartitions(t *testing.T) { }, } + supportedVersions := []apiVersion{v1, v2} // joinGroup versions for label, tc := range testCases { - t.Run(label, func(t *testing.T) { - cg := ConsumerGroup{} - cg.config.GroupBalancers = []GroupBalancer{ - RangeGroupBalancer{}, - RoundRobinGroupBalancer{}, - } - assignments, err := cg.assignTopicPartitions(conn, tc.Members) - if err != nil { - t.Fatalf("bad err: %v", err) - } - if !reflect.DeepEqual(tc.Assignments, assignments) { - t.Errorf("expected %v; got %v", tc.Assignments, assignments) - } - }) + for _, v := range supportedVersions { + t.Run(label+"_v"+strconv.Itoa(int(v)), func(t *testing.T) { + cg := ConsumerGroup{} + cg.config.GroupBalancers = []GroupBalancer{ + RangeGroupBalancer{}, + RoundRobinGroupBalancer{}, + } + assignments, err := cg.assignTopicPartitions(conn, tc.MembersFunc(v)) + if err != nil { + t.Fatalf("bad err: %v", err) + } + if !reflect.DeepEqual(tc.Assignments, assignments) { + t.Errorf("expected %v; got %v", tc.Assignments, assignments) + } + }) + } } } @@ -420,7 +426,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{}, errors.New("join group failed") } // NOTE : no stub for leaving the group b/c the member never joined. @@ -450,7 +456,7 @@ func TestConsumerGroupErrors(t *testing.T) { }, }, nil } - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{ ErrorCode: int16(InvalidTopic), }, nil @@ -473,7 +479,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to join group (leader, unsupported protocol)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "foo", @@ -499,7 +505,7 @@ func TestConsumerGroupErrors(t *testing.T) { { scenario: "fails to sync group (general error)", prepare: func(mc *mockCoordinator) { - mc.joinGroupFunc = func(joinGroupRequestV1) (joinGroupResponse, error) { + mc.joinGroupFunc = func(joinGroupRequest) (joinGroupResponse, error) { return joinGroupResponse{ GenerationID: 12345, GroupProtocol: "range", diff --git a/createtopics.go b/createtopics.go index 708a314d7..3c02415bc 100644 --- a/createtopics.go +++ b/createtopics.go @@ -263,7 +263,7 @@ func (t createTopicsRequestV0Topic) writeTo(wb *writeBuffer) { // See http://kafka.apache.org/protocol.html#The_Messages_CreateTopics type createTopicsRequest struct { - v apiVersion + v apiVersion // v0, v1, v2 // Topics contains n array of single topic creation requests. Can not // have multiple entries for the same topic. @@ -344,20 +344,20 @@ func (t *createTopicsResponseTopicError) readFrom(r *bufio.Reader, size int) (re type createTopicsResponse struct { v apiVersion - ThrottleTime int32 // v1+ + ThrottleTime int32 // v2+ TopicErrors []createTopicsResponseTopicError } func (t createTopicsResponse) size() int32 { sz := sizeofArray(len(t.TopicErrors), func(i int) int32 { return t.TopicErrors[i].size() }) - if t.v >= v1 { + if t.v >= v2 { sz += sizeofInt32(t.ThrottleTime) } return sz } func (t createTopicsResponse) writeTo(wb *writeBuffer) { - if t.v >= v1 { + if t.v >= v2 { wb.writeInt32(t.ThrottleTime) } wb.writeArray(len(t.TopicErrors), func(i int) { t.TopicErrors[i].writeTo(wb) }) @@ -373,7 +373,7 @@ func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, return } remain = size - if t.v >= v1 { + if t.v >= v2 { if remain, err = readInt32(r, size, &t.ThrottleTime); err != nil { return } @@ -386,7 +386,7 @@ func (t *createTopicsResponse) readFrom(r *bufio.Reader, size int) (remain int, } func (c *Conn) createTopics(request createTopicsRequest) (createTopicsResponse, error) { - version, err := c.negotiateVersion(createTopics, v0, v2) + version, err := c.negotiateVersion(createTopics, v0, v1, v2) if err != nil { return createTopicsResponse{}, err } diff --git a/createtopics_test.go b/createtopics_test.go index b3d080247..119d17094 100644 --- a/createtopics_test.go +++ b/createtopics_test.go @@ -160,32 +160,37 @@ func TestClientCreateTopics(t *testing.T) { } } -func TestCreateTopicsResponseV1(t *testing.T) { - item := createTopicsResponse{ - TopicErrors: []createTopicsResponseTopicError{ - { - Topic: "topic", - ErrorCode: 2, +func TestCreateTopicsResponse(t *testing.T) { + supportedVersions := []apiVersion{v0, v1, v2} + for _, v := range supportedVersions { + item := createTopicsResponse{ + v: v, + TopicErrors: []createTopicsResponseTopicError{ + { + v: v, + Topic: "topic", + ErrorCode: 2, + }, }, - }, - } + } - b := bytes.NewBuffer(nil) - w := &writeBuffer{w: b} - item.writeTo(w) + b := bytes.NewBuffer(nil) + w := &writeBuffer{w: b} + item.writeTo(w) - var found createTopicsResponse - remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() + found := createTopicsResponse{v: v} + remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } } } diff --git a/deletetopics.go b/deletetopics.go index a3674b4d7..ff73d553b 100644 --- a/deletetopics.go +++ b/deletetopics.go @@ -67,7 +67,7 @@ func (c *Client) DeleteTopics(ctx context.Context, req *DeleteTopicsRequest) (*D } // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -type deleteTopicsRequestV0 struct { +type deleteTopicsRequest struct { // Topics holds the topic names Topics []string @@ -77,18 +77,18 @@ type deleteTopicsRequestV0 struct { Timeout int32 } -func (t deleteTopicsRequestV0) size() int32 { +func (t deleteTopicsRequest) size() int32 { return sizeofStringArray(t.Topics) + sizeofInt32(t.Timeout) } -func (t deleteTopicsRequestV0) writeTo(wb *writeBuffer) { +func (t deleteTopicsRequest) writeTo(wb *writeBuffer) { wb.writeStringArray(t.Topics) wb.writeInt32(t.Timeout) } type deleteTopicsResponse struct { - v apiVersion + v apiVersion // v0, v1 ThrottleTime int32 // TopicErrorCodes holds per topic error codes @@ -162,7 +162,7 @@ func (t deleteTopicsResponseV0TopicErrorCode) writeTo(wb *writeBuffer) { // deleteTopics deletes the specified topics. // // See http://kafka.apache.org/protocol.html#The_Messages_DeleteTopics -func (c *Conn) deleteTopics(request deleteTopicsRequestV0) (deleteTopicsResponse, error) { +func (c *Conn) deleteTopics(request deleteTopicsRequest) (deleteTopicsResponse, error) { version, err := c.negotiateVersion(deleteTopics, v0, v1) if err != nil { return deleteTopicsResponse{}, err diff --git a/joingroup.go b/joingroup.go index c5a09eb52..f3d90a937 100644 --- a/joingroup.go +++ b/joingroup.go @@ -241,7 +241,7 @@ func (t joinGroupRequestGroupProtocolV1) writeTo(wb *writeBuffer) { wb.writeBytes(t.ProtocolMetadata) } -type joinGroupRequestV1 struct { +type joinGroupRequest struct { // GroupID holds the unique group identifier GroupID string @@ -264,7 +264,7 @@ type joinGroupRequestV1 struct { GroupProtocols []joinGroupRequestGroupProtocolV1 } -func (t joinGroupRequestV1) size() int32 { +func (t joinGroupRequest) size() int32 { return sizeofString(t.GroupID) + sizeofInt32(t.SessionTimeout) + sizeofInt32(t.RebalanceTimeout) + @@ -273,7 +273,7 @@ func (t joinGroupRequestV1) size() int32 { sizeofArray(len(t.GroupProtocols), func(i int) int32 { return t.GroupProtocols[i].size() }) } -func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { +func (t joinGroupRequest) writeTo(wb *writeBuffer) { wb.writeString(t.GroupID) wb.writeInt32(t.SessionTimeout) wb.writeInt32(t.RebalanceTimeout) @@ -282,23 +282,23 @@ func (t joinGroupRequestV1) writeTo(wb *writeBuffer) { wb.writeArray(len(t.GroupProtocols), func(i int) { t.GroupProtocols[i].writeTo(wb) }) } -type joinGroupResponseMemberV1 struct { +type joinGroupResponseMember struct { // MemberID assigned by the group coordinator MemberID string MemberMetadata []byte } -func (t joinGroupResponseMemberV1) size() int32 { +func (t joinGroupResponseMember) size() int32 { return sizeofString(t.MemberID) + sizeofBytes(t.MemberMetadata) } -func (t joinGroupResponseMemberV1) writeTo(wb *writeBuffer) { +func (t joinGroupResponseMember) writeTo(wb *writeBuffer) { wb.writeString(t.MemberID) wb.writeBytes(t.MemberMetadata) } -func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain int, err error) { +func (t *joinGroupResponseMember) readFrom(r *bufio.Reader, size int) (remain int, err error) { if remain, err = readString(r, size, &t.MemberID); err != nil { return } @@ -309,7 +309,7 @@ func (t *joinGroupResponseMemberV1) readFrom(r *bufio.Reader, size int) (remain } type joinGroupResponse struct { - v apiVersion + v apiVersion // v1, v2 ThrottleTime int32 @@ -327,7 +327,7 @@ type joinGroupResponse struct { // MemberID assigned by the group coordinator MemberID string - Members []joinGroupResponseMemberV1 + Members []joinGroupResponseMember } func (t joinGroupResponse) size() int32 { @@ -379,7 +379,7 @@ func (t *joinGroupResponse) readFrom(r *bufio.Reader, size int) (remain int, err } fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { - var item joinGroupResponseMemberV1 + var item joinGroupResponseMember if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } diff --git a/joingroup_test.go b/joingroup_test.go index f98f97f7a..73922d6a0 100644 --- a/joingroup_test.go +++ b/joingroup_test.go @@ -217,39 +217,41 @@ func TestMemberMetadata(t *testing.T) { } } -func TestJoinGroupResponseV1(t *testing.T) { - const version = v1 - item := joinGroupResponse{ - v: version, - ErrorCode: 2, - GenerationID: 3, - GroupProtocol: "a", - LeaderID: "b", - MemberID: "c", - Members: []joinGroupResponseMemberV1{ - { - MemberID: "d", - MemberMetadata: []byte("blah"), +func TestJoinGroupResponse(t *testing.T) { + supportedVersions := []apiVersion{v1, v2} + for _, v := range supportedVersions { + item := joinGroupResponse{ + v: v, + ErrorCode: 2, + GenerationID: 3, + GroupProtocol: "a", + LeaderID: "b", + MemberID: "c", + Members: []joinGroupResponseMember{ + { + MemberID: "d", + MemberMetadata: []byte("blah"), + }, }, - }, - } + } - b := bytes.NewBuffer(nil) - w := &writeBuffer{w: b} - item.writeTo(w) + b := bytes.NewBuffer(nil) + w := &writeBuffer{w: b} + item.writeTo(w) - found := joinGroupResponse{v: version} - remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) - if err != nil { - t.Error(err) - t.FailNow() - } - if remain != 0 { - t.Errorf("expected 0 remain, got %v", remain) - t.FailNow() - } - if !reflect.DeepEqual(item, found) { - t.Error("expected item and found to be the same") - t.FailNow() + found := joinGroupResponse{v: v} + remain, err := (&found).readFrom(bufio.NewReader(b), b.Len()) + if err != nil { + t.Error(err) + t.FailNow() + } + if remain != 0 { + t.Errorf("expected 0 remain, got %v", remain) + t.FailNow() + } + if !reflect.DeepEqual(item, found) { + t.Error("expected item and found to be the same") + t.FailNow() + } } } diff --git a/listgroups.go b/listgroups.go index 229de9352..5034b5440 100644 --- a/listgroups.go +++ b/listgroups.go @@ -125,7 +125,7 @@ func (t *listGroupsResponseV1) readFrom(r *bufio.Reader, size int) (remain int, fn := func(withReader *bufio.Reader, withSize int) (fnRemain int, fnErr error) { var item listGroupsResponseGroupV1 - if fnRemain, fnErr = (&item).readFrom(withReader, withSize); err != nil { + if fnRemain, fnErr = (&item).readFrom(withReader, withSize); fnErr != nil { return } t.Groups = append(t.Groups, item) diff --git a/offsetfetch.go b/offsetfetch.go index b85bc5c83..ce80213f8 100644 --- a/offsetfetch.go +++ b/offsetfetch.go @@ -229,7 +229,7 @@ func (t *offsetFetchResponseV1Response) readFrom(r *bufio.Reader, size int) (rem fn := func(r *bufio.Reader, size int) (fnRemain int, fnErr error) { item := offsetFetchResponseV1PartitionResponse{} - if fnRemain, fnErr = (&item).readFrom(r, size); err != nil { + if fnRemain, fnErr = (&item).readFrom(r, size); fnErr != nil { return } t.PartitionResponses = append(t.PartitionResponses, item) From 992667750002d81287ad990f7c12bd149573e033 Mon Sep 17 00:00:00 2001 From: maxwolf8852 Date: Thu, 1 May 2025 20:47:00 +0300 Subject: [PATCH 04/12] fix scram sasl deployment for kafka 4.0 --- .circleci/config.yml | 11 +++++++---- sasl/sasl_test.go | 13 +++++++++---- 2 files changed, 16 insertions(+), 8 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index eb2ccbf8f..651b999b1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -318,10 +318,13 @@ jobs: KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' - entrypoint: - - "/bin/bash" - - "-c" - - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; exec /entrypoint.sh /run.sh + KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain + KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain + KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret + KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_INTER_BROKER_USER: adminscram512 + KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 steps: *steps workflows: diff --git a/sasl/sasl_test.go b/sasl/sasl_test.go index a4101391a..57ff8b7cf 100644 --- a/sasl/sasl_test.go +++ b/sasl/sasl_test.go @@ -18,6 +18,11 @@ const ( ) func TestSASL(t *testing.T) { + scramUsers := map[scram.Algorithm]string{scram.SHA256: "adminscram", scram.SHA512: "adminscram"} + // kafka 4.0.0 test environment supports only different users for different scram algorithms. + if ktesting.KafkaIsAtLeast("4.0.0") { + scramUsers = map[scram.Algorithm]string{scram.SHA256: "adminscram256", scram.SHA512: "adminscram512"} + } tests := []struct { valid func() sasl.Mechanism invalid func() sasl.Mechanism @@ -39,22 +44,22 @@ func TestSASL(t *testing.T) { }, { valid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "admin-secret-256") + mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "admin-secret-256") return mech }, invalid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA256, "adminscram", "badpassword") + mech, _ := scram.Mechanism(scram.SHA256, scramUsers[scram.SHA256], "badpassword") return mech }, minKafka: "0.10.2.0", }, { valid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "admin-secret-512") + mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "admin-secret-512") return mech }, invalid: func() sasl.Mechanism { - mech, _ := scram.Mechanism(scram.SHA512, "adminscram", "badpassword") + mech, _ := scram.Mechanism(scram.SHA512, scramUsers[scram.SHA512], "badpassword") return mech }, minKafka: "0.10.2.0", From 514d401d74006edce3c23d1972aa2c4db50223a1 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 10:49:28 -0400 Subject: [PATCH 05/12] fix indentation --- .circleci/config.yml | 66 ++++++++++++++++++++++---------------------- 1 file changed, 33 insertions(+), 33 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index c490ac1c4..4b6d25df3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -194,39 +194,39 @@ jobs: # TODO: Figure out why these are happening and fix them (they don't appear to be new). KAFKA_SKIP_NETTEST: "1" docker: - - image: circleci/golang - - image: bitnami/kafka:4.0.0 - ports: - - 9092:9092 - - 9093:9093 - environment: - KAFKA_CFG_NODE_ID: 1 - KAFKA_CFG_BROKER_ID: 1 - KAFKA_CFG_PROCESS_ROLES: broker,controller - KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT - KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN - KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 - ALLOW_PLAINTEXT_LISTENER: yes - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' - KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' - KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' - KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain - KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain - KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret - KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN - KAFKA_INTER_BROKER_USER: adminscram512 - KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 - entrypoint: *entrypoint - steps: *steps + - image: circleci/golang + - image: bitnami/kafka:4.0.0 + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT + KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain + KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain + KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret + KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_INTER_BROKER_USER: adminscram512 + KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + entrypoint: *entrypoint + steps: *steps workflows: version: 2 From 71ca58e4eed19f1c582e724874b2b0e49c61a9dc Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 12:27:45 -0400 Subject: [PATCH 06/12] fix some tests --- .circleci/config.yml | 1 - writer_test.go | 4 ++-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 4b6d25df3..51165261c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -225,7 +225,6 @@ jobs: KAFKA_INTER_BROKER_USER: adminscram512 KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 - entrypoint: *entrypoint steps: *steps workflows: diff --git a/writer_test.go b/writer_test.go index 6f894ecd3..bd64b668b 100644 --- a/writer_test.go +++ b/writer_test.go @@ -856,7 +856,7 @@ func testWriterAutoCreateTopic(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = w.WriteMessages(ctx, msg) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, UnknownTopicOrPartition) { time.Sleep(time.Millisecond * 250) continue } @@ -924,7 +924,7 @@ func testWriterSasl(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) defer cancel() err = w.WriteMessages(ctx, msg) - if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) { + if errors.Is(err, LeaderNotAvailable) || errors.Is(err, context.DeadlineExceeded) || errors.Is(err, UnknownTopicOrPartition) { time.Sleep(time.Millisecond * 250) continue } From 6e23b977d39f4ce7815ffcb36416f6905267c25e Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 12:27:57 -0400 Subject: [PATCH 07/12] add docker_compose_versions --- .../docker-compose-370.yml | 39 +++++++++++++++++++ .../docker-compose-400.yml | 36 +++++++++++++++++ 2 files changed, 75 insertions(+) create mode 100644 docker_compose_versions/docker-compose-370.yml create mode 100644 docker_compose_versions/docker-compose-400.yml diff --git a/docker_compose_versions/docker-compose-370.yml b/docker_compose_versions/docker-compose-370.yml new file mode 100644 index 000000000..dffb0e448 --- /dev/null +++ b/docker_compose_versions/docker-compose-370.yml @@ -0,0 +1,39 @@ +# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list. +version: '3' +services: + zookeeper: + container_name: zookeeper + hostname: zookeeper + image: bitnami/zookeeper:latest + ports: + - 2181:2181 + environment: + ALLOW_ANONYMOUS_LOGIN: yes + kafka: + container_name: kafka + image: bitnami/kafka:3.7.0 + restart: on-failure:3 + links: + - zookeeper + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_ADVERTISED_PORT: '9092' + KAFKA_CFG_ZOOKEEPER_CONNECT: zookeeper:2181 + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_LISTENERS: 'PLAINTEXT://:9092,SASL_PLAINTEXT://:9093' + KAFKA_CFG_ADVERTISED_LISTENERS: 'PLAINTEXT://localhost:9092,SASL_PLAINTEXT://localhost:9093' + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'kafka.security.authorizer.AclAuthorizer' + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + ALLOW_PLAINTEXT_LISTENER: yes + entrypoint: + - "/bin/bash" + - "-c" + - echo -e 'KafkaServer {\norg.apache.kafka.common.security.scram.ScramLoginModule required\n username="adminscram"\n password="admin-secret";\n org.apache.kafka.common.security.plain.PlainLoginModule required\n username="adminplain"\n password="admin-secret"\n user_adminplain="admin-secret";\n };' > /opt/bitnami/kafka/config/kafka_jaas.conf; /opt/bitnami/kafka/bin/kafka-configs.sh --zookeeper zookeeper:2181 --alter --add-config "SCRAM-SHA-256=[password=admin-secret-256],SCRAM-SHA-512=[password=admin-secret-512]" --entity-type users --entity-name adminscram; exec /entrypoint.sh /run.sh diff --git a/docker_compose_versions/docker-compose-400.yml b/docker_compose_versions/docker-compose-400.yml new file mode 100644 index 000000000..cb53e9864 --- /dev/null +++ b/docker_compose_versions/docker-compose-400.yml @@ -0,0 +1,36 @@ +# See https://hub.docker.com/r/bitnami/kafka/tags for the complete list. +version: '3' +services: + kafka: + container_name: kafka + image: bitnami/kafka:4.0.0 + restart: on-failure:3 + ports: + - 9092:9092 + - 9093:9093 + environment: + KAFKA_CFG_NODE_ID: 1 + KAFKA_CFG_BROKER_ID: 1 + KAFKA_CFG_PROCESS_ROLES: broker,controller + KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT + KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 + KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 + KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN + KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + ALLOW_PLAINTEXT_LISTENER: yes + KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain + KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain + KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret + KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + KAFKA_INTER_BROKER_USER: adminscram512 + KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 \ No newline at end of file From b8ad0b5e32da45cc60ee86a4cfc5d9d35981cbb0 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 12:39:56 -0400 Subject: [PATCH 08/12] add KAFKA_HEAP_OPTS to stop OOM errors --- .circleci/config.yml | 2 ++ docker_compose_versions/docker-compose-400.yml | 3 ++- 2 files changed, 4 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 51165261c..fbe6cc40f 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -214,6 +214,8 @@ jobs: ALLOW_PLAINTEXT_LISTENER: yes KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + # Kafka 4.0 runs out of memory with the default heap size + KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' diff --git a/docker_compose_versions/docker-compose-400.yml b/docker_compose_versions/docker-compose-400.yml index cb53e9864..4cbab2e4f 100644 --- a/docker_compose_versions/docker-compose-400.yml +++ b/docker_compose_versions/docker-compose-400.yml @@ -33,4 +33,5 @@ services: KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN KAFKA_INTER_BROKER_USER: adminscram512 KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 \ No newline at end of file + KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' \ No newline at end of file From aca41de1de046a3d93eec3b59e9a59f18bb64fac Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 12:51:11 -0400 Subject: [PATCH 09/12] try running kafka 400 last to see if that avoids OOM errors --- .circleci/config.yml | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index fbe6cc40f..0a0eea4ad 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -238,4 +238,10 @@ workflows: - kafka-270 - kafka-281 - kafka-370 - - kafka-400 + - kafka-400: + requires: + - kafka-010 + - kafka-270 + - kafka-281 + - kafka-370 + - lint From caa41bb95363945602b51c862d2f692c3d8907d2 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 13:17:41 -0400 Subject: [PATCH 10/12] try adding G1GC to reduce out of memory errors from kafka --- .circleci/config.yml | 1 + docker_compose_versions/docker-compose-400.yml | 3 ++- 2 files changed, 3 insertions(+), 1 deletion(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 0a0eea4ad..e1d4816c1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -216,6 +216,7 @@ jobs: KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" # Kafka 4.0 runs out of memory with the default heap size KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' + KAFKA_JVM_OPTS: '-XX:+UseG1GC' KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' diff --git a/docker_compose_versions/docker-compose-400.yml b/docker_compose_versions/docker-compose-400.yml index 4cbab2e4f..00fa43eaa 100644 --- a/docker_compose_versions/docker-compose-400.yml +++ b/docker_compose_versions/docker-compose-400.yml @@ -34,4 +34,5 @@ services: KAFKA_INTER_BROKER_USER: adminscram512 KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 - KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' \ No newline at end of file + KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' + KAFKA_JVM_OPTS: '-XX:+UseG1GC' \ No newline at end of file From 1477d19a0aa72b6ccd63ffcb49669601773adda4 Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Tue, 6 May 2025 14:29:23 -0400 Subject: [PATCH 11/12] disable kafka 4.0.0 for now --- .circleci/config.yml | 108 ++++++++++++++++++++----------------------- 1 file changed, 51 insertions(+), 57 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index e1d4816c1..64217146c 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -178,57 +178,57 @@ jobs: entrypoint: *entrypoint steps: *steps - kafka-400: - working_directory: *working_directory - environment: - KAFKA_VERSION: "4.0.0" + # NOTE: this fails quite often due to Java heap errors from Kafka. + # Once we switch to Github actions and can use larger instances, we can + # set the heap size and enable these + # kafka-400: + # working_directory: *working_directory + # environment: + # KAFKA_VERSION: "4.0.0" - # Need to skip nettest to avoid these kinds of errors: - # --- FAIL: TestConn/nettest (17.56s) - # --- FAIL: TestConn/nettest/PingPong (7.40s) - # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request - # conntest.go:118: mismatching value: got 77, want 78 - # conntest.go:118: mismatching value: got 78, want 79 - # ... - # - # TODO: Figure out why these are happening and fix them (they don't appear to be new). - KAFKA_SKIP_NETTEST: "1" - docker: - - image: circleci/golang - - image: bitnami/kafka:4.0.0 - ports: - - 9092:9092 - - 9093:9093 - environment: - KAFKA_CFG_NODE_ID: 1 - KAFKA_CFG_BROKER_ID: 1 - KAFKA_CFG_PROCESS_ROLES: broker,controller - KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' - KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT - KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 - KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 - KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN - KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 - ALLOW_PLAINTEXT_LISTENER: yes - KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' - KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" - # Kafka 4.0 runs out of memory with the default heap size - KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' - KAFKA_JVM_OPTS: '-XX:+UseG1GC' - KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' - KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' - KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' - KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' - KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain - KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain - KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret - KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN - KAFKA_INTER_BROKER_USER: adminscram512 - KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 - KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 - steps: *steps + # # Need to skip nettest to avoid these kinds of errors: + # # --- FAIL: TestConn/nettest (17.56s) + # # --- FAIL: TestConn/nettest/PingPong (7.40s) + # # conntest.go:112: unexpected Read error: [7] Request Timed Out: the request exceeded the user-specified time limit in the request + # # conntest.go:118: mismatching value: got 77, want 78 + # # conntest.go:118: mismatching value: got 78, want 79 + # # ... + # # + # # TODO: Figure out why these are happening and fix them (they don't appear to be new). + # KAFKA_SKIP_NETTEST: "1" + # docker: + # - image: circleci/golang + # - image: bitnami/kafka:4.0.0 + # ports: + # - 9092:9092 + # - 9093:9093 + # environment: + # KAFKA_CFG_NODE_ID: 1 + # KAFKA_CFG_BROKER_ID: 1 + # KAFKA_CFG_PROCESS_ROLES: broker,controller + # KAFKA_CFG_ADVERTISED_HOST_NAME: 'localhost' + # KAFKA_CFG_CONTROLLER_LISTENER_NAMES: CONTROLLER + # KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP: CONTROLLER:PLAINTEXT,PLAIN:PLAINTEXT,SASL:SASL_PLAINTEXT + # KAFKA_CFG_LISTENERS: CONTROLLER://:9094,PLAIN://:9092,SASL://:9093 + # KAFKA_CFG_ADVERTISED_LISTENERS: PLAIN://localhost:9092,SASL://localhost:9093 + # KAFKA_CFG_INTER_BROKER_LISTENER_NAME: PLAIN + # KAFKA_CFG_SASL_ENABLED_MECHANISMS: 'PLAIN,SCRAM-SHA-256,SCRAM-SHA-512' + # KAFKA_CFG_CONTROLLER_QUORUM_VOTERS: 1@localhost:9094 + # ALLOW_PLAINTEXT_LISTENER: yes + # KAFKA_CFG_ALLOW_EVERYONE_IF_NO_ACL_FOUND: 'true' + # KAFKA_OPTS: "-Djava.security.auth.login.config=/opt/bitnami/kafka/config/kafka_jaas.conf" + # KAFKA_CFG_AUTO_CREATE_TOPICS_ENABLE: 'true' + # KAFKA_CFG_DELETE_TOPIC_ENABLE: 'true' + # KAFKA_CFG_MESSAGE_MAX_BYTES: '200000000' + # KAFKA_CFG_AUTHORIZER_CLASS_NAME: 'org.apache.kafka.metadata.authorizer.StandardAuthorizer' + # KAFKA_CFG_SUPER_USERS: User:adminscram256;User:adminscram512;User:adminplain + # KAFKA_CLIENT_USERS: adminscram256,adminscram512,adminplain + # KAFKA_CLIENT_PASSWORDS: admin-secret-256,admin-secret-512,admin-secret + # KAFKA_CLIENT_SASL_MECHANISMS: SCRAM-SHA-256,SCRAM-SHA-512,PLAIN + # KAFKA_INTER_BROKER_USER: adminscram512 + # KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 + # KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + # steps: *steps workflows: version: 2 @@ -239,10 +239,4 @@ workflows: - kafka-270 - kafka-281 - kafka-370 - - kafka-400: - requires: - - kafka-010 - - kafka-270 - - kafka-281 - - kafka-370 - - lint + #- kafka-400 From 16b6b1bb1dcc5c551195af678333b554acad609a Mon Sep 17 00:00:00 2001 From: Peter Dannemann Date: Mon, 12 May 2025 11:14:28 -0400 Subject: [PATCH 12/12] fix typo and add documentation on Kafka 4.0 issues --- .circleci/config.yml | 4 ++-- alterclientquotas_test.go | 6 ++++++ createtopics.go | 2 +- docker_compose_versions/docker-compose-400.yml | 2 ++ 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 64217146c..a32a0d322 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -179,8 +179,8 @@ jobs: steps: *steps # NOTE: this fails quite often due to Java heap errors from Kafka. - # Once we switch to Github actions and can use larger instances, we can - # set the heap size and enable these + # Once we figure out how to fix that, we can re-enable this. + # https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900 # kafka-400: # working_directory: *working_directory # environment: diff --git a/alterclientquotas_test.go b/alterclientquotas_test.go index d61c745e3..23568ad83 100644 --- a/alterclientquotas_test.go +++ b/alterclientquotas_test.go @@ -3,6 +3,7 @@ package kafka import ( "context" "testing" + "time" ktesting "github.com/segmentio/kafka-go/testing" "github.com/stretchr/testify/assert" @@ -65,6 +66,11 @@ func TestClientAlterClientQuotas(t *testing.T) { assert.Equal(t, expectedAlterResp, *alterResp) + // kraft mode is slow + if ktesting.KafkaIsAtLeast("3.7.0") { + time.Sleep(3 * time.Second) + } + describeResp, err := client.DescribeClientQuotas(context.Background(), &DescribeClientQuotasRequest{ Components: []DescribeClientQuotasRequestComponent{ { diff --git a/createtopics.go b/createtopics.go index 40f180f58..9c75d7aaa 100644 --- a/createtopics.go +++ b/createtopics.go @@ -304,7 +304,7 @@ type createTopicsResponseTopicError struct { // ErrorCode holds response error code ErrorCode int16 - // ErrorMessage holds responce error message string + // ErrorMessage holds response error message string ErrorMessage string } diff --git a/docker_compose_versions/docker-compose-400.yml b/docker_compose_versions/docker-compose-400.yml index 00fa43eaa..b563d5e39 100644 --- a/docker_compose_versions/docker-compose-400.yml +++ b/docker_compose_versions/docker-compose-400.yml @@ -34,5 +34,7 @@ services: KAFKA_INTER_BROKER_USER: adminscram512 KAFKA_INTER_BROKER_PASSWORD: admin-secret-512 KAFKA_CFG_SASL_MECHANISM_INTER_BROKER_PROTOCOL: SCRAM-SHA-512 + # Note you will need to increase this to at least 4GB of memory for the tests to pass + # https://github.com/segmentio/kafka-go/issues/1360#issuecomment-2858935900 KAFKA_HEAP_OPTS: '-Xmx1000m -Xms1000m' KAFKA_JVM_OPTS: '-XX:+UseG1GC' \ No newline at end of file