-
Notifications
You must be signed in to change notification settings - Fork 814
Implementation of ListPartitionReassignments API #1203
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
petedannemann
merged 4 commits into
segmentio:main
from
bgranvea:listpartitionreassignments
Oct 16, 2023
Merged
Changes from all commits
Commits
Show all changes
4 commits
Select commit
Hold shift + click to select a range
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"net" | ||
"time" | ||
|
||
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments" | ||
) | ||
|
||
// ListPartitionReassignmentsRequest is a request to the ListPartitionReassignments API. | ||
type ListPartitionReassignmentsRequest struct { | ||
// Address of the kafka broker to send the request to. | ||
Addr net.Addr | ||
|
||
// Topics we want reassignments for, mapped by their name, or nil to list everything. | ||
Topics map[string]ListPartitionReassignmentsRequestTopic | ||
|
||
// Timeout is the amount of time to wait for the request to complete. | ||
Timeout time.Duration | ||
} | ||
|
||
// ListPartitionReassignmentsRequestTopic contains the requested partitions for a single | ||
// topic. | ||
type ListPartitionReassignmentsRequestTopic struct { | ||
// The partitions to list partition reassignments for. | ||
PartitionIndexes []int | ||
} | ||
|
||
// ListPartitionReassignmentsResponse is a response from the ListPartitionReassignments API. | ||
type ListPartitionReassignmentsResponse struct { | ||
// Error is set to a non-nil value including the code and message if a top-level | ||
// error was encountered. | ||
Error error | ||
|
||
// Topics contains results for each topic, mapped by their name. | ||
Topics map[string]ListPartitionReassignmentsResponseTopic | ||
} | ||
|
||
// ListPartitionReassignmentsResponseTopic contains the detailed result of | ||
// ongoing reassignments for a topic. | ||
type ListPartitionReassignmentsResponseTopic struct { | ||
// Partitions contains result for topic partitions. | ||
Partitions []ListPartitionReassignmentsResponsePartition | ||
} | ||
|
||
// ListPartitionReassignmentsResponsePartition contains the detailed result of | ||
// ongoing reassignments for a single partition. | ||
type ListPartitionReassignmentsResponsePartition struct { | ||
// PartitionIndex contains index of the partition. | ||
PartitionIndex int | ||
|
||
// Replicas contains the current replica set. | ||
Replicas []int | ||
|
||
// AddingReplicas contains the set of replicas we are currently adding. | ||
AddingReplicas []int | ||
|
||
// RemovingReplicas contains the set of replicas we are currently removing. | ||
RemovingReplicas []int | ||
} | ||
|
||
func (c *Client) ListPartitionReassignments( | ||
ctx context.Context, | ||
req *ListPartitionReassignmentsRequest, | ||
) (*ListPartitionReassignmentsResponse, error) { | ||
apiReq := &listpartitionreassignments.Request{ | ||
TimeoutMs: int32(req.Timeout.Milliseconds()), | ||
} | ||
|
||
for topicName, topicReq := range req.Topics { | ||
apiReq.Topics = append( | ||
apiReq.Topics, | ||
listpartitionreassignments.RequestTopic{ | ||
Name: topicName, | ||
PartitionIndexes: intToInt32Array(topicReq.PartitionIndexes), | ||
}, | ||
) | ||
} | ||
|
||
protoResp, err := c.roundTrip( | ||
ctx, | ||
req.Addr, | ||
apiReq, | ||
) | ||
if err != nil { | ||
return nil, err | ||
} | ||
apiResp := protoResp.(*listpartitionreassignments.Response) | ||
|
||
resp := &ListPartitionReassignmentsResponse{ | ||
Error: makeError(apiResp.ErrorCode, apiResp.ErrorMessage), | ||
Topics: make(map[string]ListPartitionReassignmentsResponseTopic), | ||
} | ||
|
||
for _, topicResult := range apiResp.Topics { | ||
respTopic := ListPartitionReassignmentsResponseTopic{} | ||
for _, partitionResult := range topicResult.Partitions { | ||
respTopic.Partitions = append( | ||
respTopic.Partitions, | ||
ListPartitionReassignmentsResponsePartition{ | ||
PartitionIndex: int(partitionResult.PartitionIndex), | ||
Replicas: int32ToIntArray(partitionResult.Replicas), | ||
AddingReplicas: int32ToIntArray(partitionResult.AddingReplicas), | ||
RemovingReplicas: int32ToIntArray(partitionResult.RemovingReplicas), | ||
}, | ||
) | ||
} | ||
resp.Topics[topicResult.Name] = respTopic | ||
} | ||
|
||
return resp, nil | ||
} | ||
|
||
func intToInt32Array(arr []int) []int32 { | ||
if arr == nil { | ||
return nil | ||
} | ||
res := make([]int32, len(arr)) | ||
for i := range arr { | ||
res[i] = int32(arr[i]) | ||
} | ||
return res | ||
} | ||
|
||
func int32ToIntArray(arr []int32) []int { | ||
if arr == nil { | ||
return nil | ||
} | ||
res := make([]int, len(arr)) | ||
for i := range arr { | ||
res[i] = int(arr[i]) | ||
} | ||
return res | ||
} |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,50 @@ | ||
package kafka | ||
|
||
import ( | ||
"context" | ||
"testing" | ||
|
||
ktesting "github.com/segmentio/kafka-go/testing" | ||
) | ||
|
||
func TestClientListPartitionReassignments(t *testing.T) { | ||
if !ktesting.KafkaIsAtLeast("2.4.0") { | ||
return | ||
} | ||
|
||
ctx := context.Background() | ||
client, shutdown := newLocalClient() | ||
defer shutdown() | ||
|
||
topic := makeTopic() | ||
createTopic(t, topic, 2) | ||
defer deleteTopic(t, topic) | ||
|
||
// Can't really get an ongoing partition reassignment with local Kafka, so just do a superficial test here. | ||
resp, err := client.ListPartitionReassignments( | ||
ctx, | ||
&ListPartitionReassignmentsRequest{ | ||
Topics: map[string]ListPartitionReassignmentsRequestTopic{ | ||
topic: {PartitionIndexes: []int{0, 1}}, | ||
}, | ||
}, | ||
) | ||
|
||
if err != nil { | ||
t.Fatal(err) | ||
} | ||
if resp.Error != nil { | ||
t.Error( | ||
"Unexpected error in response", | ||
"expected", nil, | ||
"got", resp.Error, | ||
) | ||
} | ||
if len(resp.Topics) != 0 { | ||
t.Error( | ||
"Unexpected length of topic results", | ||
"expected", 0, | ||
"got", len(resp.Topics), | ||
) | ||
} | ||
} |
70 changes: 70 additions & 0 deletions
70
protocol/listpartitionreassignments/listpartitionreassignments.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
package listpartitionreassignments | ||
|
||
import "github.com/segmentio/kafka-go/protocol" | ||
|
||
func init() { | ||
protocol.Register(&Request{}, &Response{}) | ||
} | ||
|
||
// Detailed API definition: https://kafka.apache.org/protocol#The_Messages_ListPartitionReassignments. | ||
|
||
type Request struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
TimeoutMs int32 `kafka:"min=v0,max=v0"` | ||
Topics []RequestTopic `kafka:"min=v0,max=v0,nullable"` | ||
} | ||
|
||
type RequestTopic struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
Name string `kafka:"min=v0,max=v0"` | ||
PartitionIndexes []int32 `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
func (r *Request) ApiKey() protocol.ApiKey { | ||
return protocol.ListPartitionReassignments | ||
} | ||
|
||
func (r *Request) Broker(cluster protocol.Cluster) (protocol.Broker, error) { | ||
return cluster.Brokers[cluster.Controller], nil | ||
} | ||
|
||
type Response struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
ThrottleTimeMs int32 `kafka:"min=v0,max=v0"` | ||
ErrorCode int16 `kafka:"min=v0,max=v0"` | ||
ErrorMessage string `kafka:"min=v0,max=v0,nullable"` | ||
Topics []ResponseTopic `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
type ResponseTopic struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
Name string `kafka:"min=v0,max=v0"` | ||
Partitions []ResponsePartition `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
type ResponsePartition struct { | ||
// We need at least one tagged field to indicate that this is a "flexible" message | ||
// type. | ||
_ struct{} `kafka:"min=v0,max=v0,tag"` | ||
|
||
PartitionIndex int32 `kafka:"min=v0,max=v0"` | ||
Replicas []int32 `kafka:"min=v0,max=v0"` | ||
AddingReplicas []int32 `kafka:"min=v0,max=v0"` | ||
RemovingReplicas []int32 `kafka:"min=v0,max=v0"` | ||
} | ||
|
||
func (r *Response) ApiKey() protocol.ApiKey { | ||
return protocol.ListPartitionReassignments | ||
} |
41 changes: 41 additions & 0 deletions
41
protocol/listpartitionreassignments/listpartitionreassignments_test.go
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,41 @@ | ||
package listpartitionreassignments_test | ||
|
||
import ( | ||
"testing" | ||
|
||
"github.com/segmentio/kafka-go/protocol/listpartitionreassignments" | ||
"github.com/segmentio/kafka-go/protocol/prototest" | ||
) | ||
|
||
const ( | ||
v0 = 0 | ||
) | ||
|
||
func TestListPartitionReassignmentsRequest(t *testing.T) { | ||
prototest.TestRequest(t, v0, &listpartitionreassignments.Request{ | ||
Topics: []listpartitionreassignments.RequestTopic{ | ||
{ | ||
Name: "topic-1", | ||
PartitionIndexes: []int32{1, 2, 3}, | ||
}, | ||
}, | ||
}) | ||
} | ||
|
||
func TestListPartitionReassignmentsResponse(t *testing.T) { | ||
prototest.TestResponse(t, v0, &listpartitionreassignments.Response{ | ||
Topics: []listpartitionreassignments.ResponseTopic{ | ||
{ | ||
Name: "topic-1", | ||
Partitions: []listpartitionreassignments.ResponsePartition{ | ||
{ | ||
PartitionIndex: 1, | ||
Replicas: []int32{1, 2, 3}, | ||
AddingReplicas: []int32{4, 5, 6}, | ||
RemovingReplicas: []int32{7, 8, 9}, | ||
}, | ||
}, | ||
}, | ||
}, | ||
}) | ||
} |
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.