Thanks to your awesome project, my consumer was up and running in minutes.
However, I needed to re-consume the entire queue, so I reset the consumer group's offsets and then started my consumer, but it consumed only the latest messages from the queue.
This is my config:
import io.vertx.kafka.client.consumer.KafkaConsumer;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.util.HashMap;
import java.util.Map;
...
@Override
public void start() {
    Map<String, String> config = new HashMap<>();
    config.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaUri);
    config.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    config.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConsumerGroup);
    config.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    // Takes effect only when the group has no valid committed offset.
    config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

    // Enable SASL/SCRAM only when credentials are provided.
    if (!kafkaUsername.isEmpty() && !kafkaPassword.isEmpty()) {
        config.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SASL_PLAINTEXT");
        config.put(SaslConfigs.SASL_MECHANISM, "SCRAM-SHA-256");
        config.put(SaslConfigs.SASL_JAAS_CONFIG, String.format(
            "org.apache.kafka.common.security.scram.ScramLoginModule required username=\"%s\" password=\"%s\";",
            kafkaUsername,
            kafkaPassword
        ));
    }

    kafkaConsumer = KafkaConsumer.create(vertx, config);
}
I tried disabling auto commit and changing the consumer group, but nothing worked.
What am I missing or doing wrong?
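For context, here is a minimal sketch of how a consumer's starting position is normally chosen, assuming standard Kafka consumer semantics: `auto.offset.reset` is consulted only when the group has no committed offset for a partition, or when that offset is out of range. The class `StartOffsetSketch` and the method `resolveStart` are hypothetical names for illustration, not part of the Kafka or Vert.x API, and the exception type is a stand-in for what the real client throws.

```java
import java.util.OptionalLong;

public class StartOffsetSketch {

    // Hypothetical helper: a simplified model, not real client code.
    static long resolveStart(OptionalLong committed, String autoOffsetReset,
                             long logStart, long logEnd) {
        // A committed offset inside the partition's valid range always wins.
        if (committed.isPresent()
                && committed.getAsLong() >= logStart
                && committed.getAsLong() <= logEnd) {
            return committed.getAsLong();
        }
        // Only without a usable committed offset does auto.offset.reset apply.
        switch (autoOffsetReset) {
            case "earliest":
                return logStart;
            case "latest":
                return logEnd;
            default:
                // Stand-in for the error raised when the policy is "none".
                throw new IllegalStateException(
                    "no usable offset and reset policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        // Group already committed 950: "earliest" is ignored.
        System.out.println(resolveStart(OptionalLong.of(950), "earliest", 0, 1000));
        // Fresh group with nothing committed: "earliest" starts at the log start.
        System.out.println(resolveStart(OptionalLong.empty(), "earliest", 0, 1000));
        // Fresh group with "latest": only new messages are seen.
        System.out.println(resolveStart(OptionalLong.empty(), "latest", 0, 1000));
    }
}
```

Under this model, a group that still has committed offsets near the end of the log would resume from there regardless of `earliest`, which would be consistent with seeing only the last messages if the reset did not actually take effect.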