Skip to content

Client Describegroup returns empty Members #1147

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
sansmoraxz opened this issue Jun 21, 2023 · 10 comments
Closed

Client Describegroup returns empty Members #1147

sansmoraxz opened this issue Jun 21, 2023 · 10 comments
Assignees
Labels

Comments

@sansmoraxz
Copy link

sansmoraxz commented Jun 21, 2023

Describe the bug

Describegroup returns empty Members

Kafka Version

  • What version(s) of Kafka are you testing against? - 2.3.1
  • What version of kafka-go are you using? - v0.4.40

To Reproduce

Resources to reproduce the behavior:

Use the docker-compose.yml available in this repo

And the following code:

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"github.com/segmentio/kafka-go"
	"net"
	"time"
)

func newKafkaClient(kafkaURL string) *kafka.Client {
	return &kafka.Client{
		Addr: kafka.TCP(kafkaURL),
		Transport: &kafka.Transport{
			Dial: (&net.Dialer{
				Timeout: 3 * time.Second,
			}).DialContext,
		},
	}
}

func main() {
	kafkaURL := "localhost:9092"
	topic := "test-writer-0"
	consumerGroup := "some-consumer-group"

	client := newKafkaClient(kafkaURL)

	startProducer(client, topic)
	startConsumer(topic, consumerGroup)

	describeGroups(client, consumerGroup)

}

func startProducer(client *kafka.Client, topic string) {
	fmt.Println("start producing ... !!")
	writer := kafka.Writer{
		Addr:      client.Addr,
		Topic:     topic,
		Transport: client.Transport,
		Balancer:  &kafka.LeastBytes{},
	}
	defer func(writer *kafka.Writer) {
		err := writer.Close()
		if err != nil {
			panic(err.Error())
		}
	}(&writer)
	for i := 0; i < 3; i++ {
		key := fmt.Sprintf("Key-%d", i)
		value := fmt.Sprintf("Value-%d", i)
		err := writer.WriteMessages(context.Background(), kafka.Message{
			Key:   []byte(key),
			Value: []byte(value),
		},
		)
		if err != nil {
			panic(err)
		}
		fmt.Printf("produced %s\n", value)
	}
}

func startConsumer(topic string, groupID string) {
	fmt.Println("start consuming ... !!")
	reader := kafka.NewReader(kafka.ReaderConfig{
		Brokers: []string{"localhost:9092"},
		Topic:   topic,
		GroupID: groupID,
	})
	defer func(reader *kafka.Reader) {
		err := reader.Close()
		if err != nil {
			panic(err.Error())
		}
	}(reader)
	for i := 0; i < 2; i++ {
		m, err := reader.ReadMessage(context.Background())
		if err != nil {
			panic(err)
		}
		fmt.Printf("message at offset %d, key %s and partition %d = %s\n", m.Offset, string(m.Key), m.Partition, string(m.Value))
	}
}

func describeGroups(client *kafka.Client, groupID string) {
	res, _ := client.DescribeGroups(context.Background(), &kafka.DescribeGroupsRequest{
		GroupIDs: []string{groupID},
	})
	output, _ := json.Marshal(res)
	fmt.Printf("groups describe: %s\n", output)
	fmt.Printf("groups describe member: %v\n", res.Groups[0].Members) // bug output here
}

Expected Behavior

Members to return the topics and partions

Observed Behavior

Empty members array

Additional Context

The test cases for the file were not executing as can be checked here:
https://app.circleci.com/pipelines/github/segmentio/kafka-go/1338/workflows/17bcd2cd-208c-483e-9d8a-bee17559be36/jobs/14793/parallel-runs/0/steps/0-105

@hhahn-tw
Copy link

hhahn-tw commented Jul 3, 2023

Hello - I may need some further clarification but this seems to be working as intended. By the time you call describeGroups in your sample code, the startConsumer() function has completed and the reader closed. The describeGroups() response shows GroupState=Empty and therefore has no members to report on. This is mirrored in the output of kafka-consumer-groups.sh --group some-consumer-group --describe.

If I run kafka-consumer-groups.sh while startConsumer() function is running, I do see detail for members, eg

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                                            HOST            CLIENT-ID
some-consumer-group test-writer-0   0          13              19              6               main@C02GG1VBMD6M (github.com/segmentio/kafka-go)-856a25df-5fe7-4f33-884e-831153cc4551 /172.18.0.1     main@C02GG1VBMD6M (github.com/segmentio/kafka-go)
some-consumer-group test-writer-0   1          11              19              8               main@C02GG1VBMD6M (github.com/segmentio/kafka-go)-856a25df-5fe7-4f33-884e-831153cc4551 /172.18.0.1     main@C02GG1VBMD6M (github.com/segmentio/kafka-go)
some-consumer-group test-writer-0   2          8               19              11              main@C02GG1VBMD6M (github.com/segmentio/kafka-go)-856a25df-5fe7-4f33-884e-831153cc4551 /172.18.0.1     main@C02GG1VBMD6M (github.com/segmentio/kafka-go)

versus when it's not running/consuming:

Consumer group 'some-consumer-group' has no active members.

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID     HOST            CLIENT-ID
some-consumer-group test-writer-0   0          13              19              6               -               -               -
some-consumer-group test-writer-0   1          11              19              8               -               -               -
some-consumer-group test-writer-0   2          8               19              11              -               -               -

Is there some nuance I have missed?

@sansmoraxz
Copy link
Author

All I need is the list of topics and partitions that the consumer is subscribed to. Not necessarily the client members for the group.
I guess that's a functionality missing unless it's buried somewhere else.

@hhahn-tw
Copy link

hhahn-tw commented Jul 6, 2023

I see, so the output of kafka-consumer-groups.sh when the consumer group is empty has the information you need, but describeGroups() does not provide it, is that correct? Is the scenario where the consumer group is empty one you regularly encounter?

@sansmoraxz
Copy link
Author

Yes, exactly. The data is missing if there are no active consumers.

I am not sure whether DescribeGroup is the right candidate, but I need a function that can consistently provide metadata about a consumer group irrespective of it's state.

@sansmoraxz
Copy link
Author

For example in sarama with admin client you can pass the second param topicPartitions as nil and get almost the exact functionality I need.
https://github.com/Shopify/sarama/blob/v1.38.1/admin.go#L952

But if I try to do the same with OffsetFetch it doesn't work. (returns an empty array)

@hhahn-tw
Copy link

hhahn-tw commented Jul 6, 2023

Yes, there's this issue filed for exactly that #1115

Do you think issue this can be considered a duplicate of that one?

@sansmoraxz
Copy link
Author

Sure I guess it makes sense to track it there, that one once fixed can cover my usecase.

Closing this.

@hhahn-tw
Copy link

@sansmoraxz - in case you weren't tracking it, the related issue #1115 was closed with a fix. I believe it now behaves as documented and should provide the functionality you needed.

@sansmoraxz
Copy link
Author

sansmoraxz commented Jul 21, 2023

Sure thanks. Might I ask what is the expected release date for the version where it's supported?

@hhahn-tw
Copy link

I'm not sure when the next formal release will occur - looks like they're typically once every 2-3 months - but in the meantime, I've confirmed using go get github.com/segmentio/kafka-go@8ceaf94e9b2d7b082297e4476134b7c241985a91 that the fix worked for my use case

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

No branches or pull requests

2 participants