Skip to content

Commit c162ad3

Browse files
author
Julien Ruaux
committed
feat: Added key/cert authentication options
1 parent e428b75 commit c162ad3

File tree

9 files changed

+248
-155
lines changed

9 files changed

+248
-155
lines changed

pom.xml

Lines changed: 32 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,5 @@
11
<?xml version="1.0" encoding="UTF-8"?>
2-
<project
3-
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
4-
xmlns="http://maven.apache.org/POM/4.0.0"
5-
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
2+
<project xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd" xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance">
63
<modelVersion>4.0.0</modelVersion>
74
<groupId>com.redis</groupId>
85
<artifactId>redis-kafka-connect</artifactId>
@@ -16,20 +13,21 @@
1613
<github.repo>redis-kafka-connect</github.repo>
1714
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
1815
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
19-
<java.version>8</java.version>
20-
<kafka.version>3.2.1</kafka.version>
21-
<connect-utils.version>[0.7.166,0.7.2000)</connect-utils.version>
2216
<asciidoctor.maven.plugin.version>2.1.0</asciidoctor.maven.plugin.version>
2317
<asciidoctorj.version>2.5.1</asciidoctorj.version>
24-
<asciidoctorj.pdf.version>2.1.4</asciidoctorj.pdf.version>
18+
<asciidoctorj.pdf.version>2.3.0</asciidoctorj.pdf.version>
2519
<awaitility.version>4.2.0</awaitility.version>
20+
<commons-pool2.version>2.11.1</commons-pool2.version>
21+
<connect-utils.version>[0.7.166,0.7.2000)</connect-utils.version>
2622
<jruby.version>9.2.17.0</jruby.version>
23+
<java.version>8</java.version>
2724
<junit.version>5.8.1</junit.version>
28-
<lettucemod.version>3.0.4</lettucemod.version>
29-
<mockito.version>4.7.0</mockito.version>
25+
<kafka.version>3.2.1</kafka.version>
26+
<lettucemod.version>3.1.5</lettucemod.version>
27+
<mockito.version>4.8.0</mockito.version>
3028
<reactor.version>3.4.22</reactor.version>
3129
<spring-batch.version>4.3.6</spring-batch.version>
32-
<spring-batch-redis.version>2.34.1</spring-batch-redis.version>
30+
<spring-batch-redis.version>3.0.2-SNAPSHOT</spring-batch-redis.version>
3331
<slf4j.version>1.7.36</slf4j.version>
3432
<testcontainers-redis.version>1.6.2</testcontainers-redis.version>
3533
</properties>
@@ -94,6 +92,11 @@
9492
<artifactId>lettucemod</artifactId>
9593
<version>${lettucemod.version}</version>
9694
</dependency>
95+
<dependency>
96+
<groupId>org.apache.commons</groupId>
97+
<artifactId>commons-pool2</artifactId>
98+
<version>${commons-pool2.version}</version>
99+
</dependency>
97100
<dependency>
98101
<groupId>org.slf4j</groupId>
99102
<artifactId>slf4j-api</artifactId>
@@ -201,7 +204,22 @@
201204
<plugin>
202205
<groupId>org.jacoco</groupId>
203206
<artifactId>jacoco-maven-plugin</artifactId>
204-
<version>0.8.7</version>
207+
<version>0.8.8</version>
208+
<executions>
209+
<execution>
210+
<id>prepare-agent</id>
211+
<goals>
212+
<goal>prepare-agent</goal>
213+
</goals>
214+
</execution>
215+
<execution>
216+
<id>report</id>
217+
<phase>test</phase>
218+
<goals>
219+
<goal>report</goal>
220+
</goals>
221+
</execution>
222+
</executions>
205223
</plugin>
206224
<plugin>
207225
<groupId>org.apache.maven.plugins</groupId>
@@ -285,7 +303,7 @@
285303
<supportProviderName>Redis</supportProviderName>
286304
<supportLogo>src/docs/asciidoc/images/redis.png</supportLogo>
287305
<supportSummary>
288-
<![CDATA[Contact us on the <a href="https://forum.redis.com">Redis Forum</a> or create an issue on <a href="https://github.com/redis-field-engineering/redis-kafka-connect">Github</a> where we provide support on a good faith effort basis.]]>
306+
<![CDATA[Contact us on the <a href="https://forum.redis.com">Redis Forum</a> or create an issue on <a href="https://github.com/redis-field-engineering/redis-kafka-connect">Github</a> where we provide support on a good faith effort basis.]]>
289307
</supportSummary>
290308
<supportUrl>${project.issueManagement.url}</supportUrl>
291309
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
@@ -422,4 +440,4 @@
422440
</plugin>
423441
</plugins>
424442
</reporting>
425-
</project>
443+
</project>

src/main/java/com/redis/kafka/connect/common/RedisConfig.java

Lines changed: 105 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -15,34 +15,35 @@
1515
*/
1616
package com.redis.kafka.connect.common;
1717

18+
import java.io.File;
1819
import java.time.Duration;
1920
import java.util.Map;
20-
import java.util.Optional;
2121

22+
import org.apache.commons.pool2.impl.GenericObjectPool;
2223
import org.apache.kafka.common.config.AbstractConfig;
2324
import org.apache.kafka.common.config.ConfigDef;
2425
import org.apache.kafka.common.config.types.Password;
25-
import org.slf4j.Logger;
26-
import org.slf4j.LoggerFactory;
2726

2827
import com.github.jcustenborder.kafka.connect.utils.config.ConfigKeyBuilder;
2928
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
30-
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
3129
import com.google.common.base.Strings;
3230
import com.google.common.net.HostAndPort;
33-
import com.redis.lettucemod.RedisModulesClient;
34-
import com.redis.lettucemod.cluster.RedisModulesClusterClient;
31+
import com.redis.lettucemod.util.ClientBuilder;
32+
import com.redis.lettucemod.util.RedisURIBuilder;
33+
import com.redis.spring.batch.common.ConnectionPoolBuilder;
3534

3635
import io.lettuce.core.AbstractRedisClient;
3736
import io.lettuce.core.RedisURI;
37+
import io.lettuce.core.SslVerifyMode;
38+
import io.lettuce.core.api.StatefulConnection;
39+
import io.lettuce.core.codec.RedisCodec;
40+
import io.lettuce.core.codec.StringCodec;
3841

3942
public class RedisConfig extends AbstractConfig {
4043

41-
private static final Logger log = LoggerFactory.getLogger(RedisConfig.class);
42-
43-
public static final String CLIENT_MODE_CONFIG = "redis.client.mode";
44-
private static final ClientMode CLIENT_MODE_DEFAULT = ClientMode.standalone;
45-
private static final String CLIENT_MODE_DOC = "Redis standalone or cluster mode";
44+
public static final String CLUSTER_CONFIG = "redis.cluster";
45+
private static final boolean CLUSTER_DEFAULT = false;
46+
private static final String CLUSTER_DOC = "Connect to a Redis Cluster database";
4647

4748
public static final String HOST_CONFIG = "redis.host";
4849
private static final HostAndPort HOST_DEFAULT = HostAndPort.fromParts("localhost", RedisURI.DEFAULT_REDIS_PORT);
@@ -52,14 +53,6 @@ public class RedisConfig extends AbstractConfig {
5253
private static final String URI_DEFAULT = "";
5354
private static final String URI_DOC = "URI of the Redis database to connect to, e.g. redis://redis-12000.redis.com:12000. For secure connections use rediss URI scheme, e.g. rediss://...";
5455

55-
public static final String INSECURE_CONFIG = "redis.insecure";
56-
private static final boolean INSECURE_DEFAULT = false;
57-
private static final String INSECURE_DOC = "Allow insecure connections (e.g. invalid certificates) to Redis when using SSL";
58-
59-
public static final String TLS_CONFIG = "redis.tls";
60-
private static final boolean TLS_DEFAULT = false;
61-
private static final String TLS_DOC = "Establish a secure TLS connection";
62-
6356
public static final String USERNAME_CONFIG = "redis.username";
6457
private static final String USERNAME_DEFAULT = "";
6558
private static final String USERNAME_DOC = "Username to use to connect to Redis";
@@ -72,51 +65,36 @@ public class RedisConfig extends AbstractConfig {
7265
private static final long TIMEOUT_DEFAULT = RedisURI.DEFAULT_TIMEOUT;
7366
private static final String TIMEOUT_DOC = "Redis command timeout in seconds";
7467

75-
public RedisConfig(ConfigDef config, Map<?, ?> originals) {
76-
super(config, originals);
77-
}
68+
public static final String POOL_MAX_CONFIG = "redis.pool";
69+
private static final int POOL_MAX_DEFAULT = ConnectionPoolBuilder.DEFAULT_MAX_TOTAL;
70+
private static final String POOL_MAX_DOC = "Max pool connections";
7871

79-
@SuppressWarnings("deprecation")
80-
public RedisURI getRedisURI() {
81-
RedisURI uri = uri();
82-
if (Boolean.TRUE.equals(getBoolean(INSECURE_CONFIG))) {
83-
uri.setVerifyPeer(false);
84-
}
85-
if (Boolean.TRUE.equals(getBoolean(TLS_CONFIG))) {
86-
uri.setSsl(true);
87-
}
88-
username().ifPresent(uri::setUsername);
89-
password().ifPresent(uri::setPassword);
90-
Long timeout = getLong(TIMEOUT_CONFIG);
91-
if (timeout != null) {
92-
uri.setTimeout(Duration.ofSeconds(timeout));
93-
}
94-
return uri;
95-
}
72+
public static final String TLS_CONFIG = "redis.tls";
73+
private static final boolean TLS_DEFAULT = false;
74+
private static final String TLS_DOC = "Establish a secure TLS connection";
9675

97-
private Optional<String> password() {
98-
Password password = getPassword(PASSWORD_CONFIG);
99-
if (password == null || Strings.isNullOrEmpty(password.value())) {
100-
return Optional.empty();
101-
}
102-
return Optional.of(password.value());
103-
}
76+
public static final String INSECURE_CONFIG = "redis.insecure";
77+
private static final boolean INSECURE_DEFAULT = false;
78+
private static final String INSECURE_DOC = "Allow insecure connections (e.g. invalid certificates) to Redis when using SSL";
10479

105-
private Optional<String> username() {
106-
String username = getString(USERNAME_CONFIG);
107-
if (Strings.isNullOrEmpty(username)) {
108-
return Optional.empty();
109-
}
110-
return Optional.of(username);
111-
}
80+
public static final String KEY_CONFIG = "redis.key.file";
81+
public static final String KEY_DEFAULT = "";
82+
private static final String KEY_DOC = "PKCS#8 private key file to authenticate with (PEM format)";
11283

113-
private RedisURI uri() {
114-
String uriString = getString(URI_CONFIG);
115-
if (Strings.isNullOrEmpty(uriString)) {
116-
HostAndPort hostAndPort = ConfigUtils.hostAndPort(this, HOST_CONFIG, 6379);
117-
return RedisURI.create(hostAndPort.getHost(), hostAndPort.getPort());
118-
}
119-
return RedisURI.create(ConfigUtils.uri(this, URI_CONFIG));
84+
public static final String KEY_CERT_CONFIG = "redis.key.cert";
85+
public static final String KEY_CERT_DEFAULT = "";
86+
private static final String KEY_CERT_DOC = "X.509 certificate chain file to authenticate with (PEM format)";
87+
88+
public static final String KEY_PASSWORD_CONFIG = "redis.key.password";
89+
private static final String KEY_PASSWORD_DEFAULT = "";
90+
private static final String KEY_PASSWORD_DOC = "Password of the private key file. Leave empty if key file is not password-protected";
91+
92+
public static final String CACERT_CONFIG = "redis.cacert";
93+
public static final String CACERT_DEFAULT = "";
94+
private static final String CACERT_DOC = "X.509 CA certificate file to verify with";
95+
96+
public RedisConfig(ConfigDef config, Map<?, ?> originals) {
97+
super(config, originals);
12098
}
12199

122100
public static class RedisConfigDef extends ConfigDef {
@@ -131,9 +109,8 @@ public RedisConfigDef(ConfigDef base) {
131109
}
132110

133111
private void defineConfigs() {
134-
define(ConfigKeyBuilder.of(CLIENT_MODE_CONFIG, ConfigDef.Type.STRING).documentation(CLIENT_MODE_DOC)
135-
.defaultValue(CLIENT_MODE_DEFAULT.name()).validator(Validators.validEnum(ClientMode.class))
136-
.importance(ConfigDef.Importance.MEDIUM).build());
112+
define(ConfigKeyBuilder.of(CLUSTER_CONFIG, ConfigDef.Type.BOOLEAN).documentation(CLUSTER_DOC)
113+
.defaultValue(CLUSTER_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
137114
define(ConfigKeyBuilder.of(HOST_CONFIG, ConfigDef.Type.STRING).documentation(HOST_DOC)
138115
.defaultValue(HOST_DEFAULT.toString()).importance(ConfigDef.Importance.HIGH).build());
139116
define(ConfigKeyBuilder.of(URI_CONFIG, ConfigDef.Type.STRING).documentation(URI_DOC)
@@ -148,25 +125,79 @@ private void defineConfigs() {
148125
.defaultValue(USERNAME_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
149126
define(ConfigKeyBuilder.of(TIMEOUT_CONFIG, ConfigDef.Type.LONG).documentation(TIMEOUT_DOC)
150127
.defaultValue(TIMEOUT_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
151-
128+
define(ConfigKeyBuilder.of(POOL_MAX_CONFIG, ConfigDef.Type.INT).documentation(POOL_MAX_DOC)
129+
.defaultValue(POOL_MAX_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
130+
define(ConfigKeyBuilder.of(KEY_CONFIG, ConfigDef.Type.STRING).documentation(KEY_DOC)
131+
.defaultValue(KEY_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
132+
define(ConfigKeyBuilder.of(KEY_CERT_CONFIG, ConfigDef.Type.STRING).documentation(KEY_CERT_DOC)
133+
.defaultValue(KEY_CERT_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
134+
define(ConfigKeyBuilder.of(KEY_PASSWORD_CONFIG, ConfigDef.Type.PASSWORD).documentation(KEY_PASSWORD_DOC)
135+
.defaultValue(KEY_PASSWORD_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
136+
define(ConfigKeyBuilder.of(CACERT_CONFIG, ConfigDef.Type.STRING).documentation(CACERT_DOC)
137+
.defaultValue(CACERT_DEFAULT).importance(ConfigDef.Importance.MEDIUM).build());
152138
}
153139

154140
}
155141

156-
public enum ClientMode {
142+
public RedisURI uri() {
143+
RedisURIBuilder builder = RedisURIBuilder.create();
144+
String uri = getString(URI_CONFIG);
145+
if (Strings.isNullOrEmpty(uri)) {
146+
HostAndPort hostAndPort = ConfigUtils.hostAndPort(this, HOST_CONFIG, 6379);
147+
builder.host(hostAndPort.getHost());
148+
builder.port(hostAndPort.getPort());
149+
} else {
150+
builder.uriString(uri);
151+
}
152+
if (Boolean.TRUE.equals(getBoolean(INSECURE_CONFIG))) {
153+
builder.sslVerifyMode(SslVerifyMode.NONE);
154+
}
155+
builder.ssl(getBoolean(TLS_CONFIG));
156+
String username = getString(USERNAME_CONFIG);
157+
if (!Strings.isNullOrEmpty(username)) {
158+
builder.username(username);
159+
}
160+
Password password = getPassword(PASSWORD_CONFIG);
161+
if (password != null && !Strings.isNullOrEmpty(password.value())) {
162+
builder.password(password.value().toCharArray());
163+
}
164+
Long timeout = getLong(TIMEOUT_CONFIG);
165+
if (timeout != null) {
166+
builder.timeout(Duration.ofSeconds(timeout));
167+
}
168+
return builder.build();
169+
}
157170

158-
standalone, cluster
171+
public AbstractRedisClient client(RedisURI uri) {
172+
ClientBuilder builder = ClientBuilder.create(uri);
173+
builder.cluster(getBoolean(CLUSTER_CONFIG));
174+
String keyFile = getString(KEY_CONFIG);
175+
if (!Strings.isNullOrEmpty(keyFile)) {
176+
builder.key(new File(keyFile));
177+
builder.keyCert(new File(getString(KEY_CERT_CONFIG)));
178+
Password password = getPassword(KEY_PASSWORD_CONFIG);
179+
if (password != null && !Strings.isNullOrEmpty(password.value())) {
180+
builder.keyPassword(password.value().toCharArray());
181+
}
182+
}
183+
String cacert = getString(CACERT_CONFIG);
184+
if (!Strings.isNullOrEmpty(cacert)) {
185+
builder.trustManager(new File(cacert));
186+
}
187+
return builder.build();
188+
}
159189

190+
public AbstractRedisClient client() {
191+
return client(uri());
160192
}
161193

162-
public AbstractRedisClient redisClient() {
163-
RedisURI uri = getRedisURI();
164-
ClientMode clientMode = ConfigUtils.getEnum(ClientMode.class, this, CLIENT_MODE_CONFIG);
165-
if (clientMode == ClientMode.cluster) {
166-
log.info("Connecting to Redis cluster with {}", uri);
167-
return RedisModulesClusterClient.create(uri);
168-
}
169-
return RedisModulesClient.create(uri);
194+
public GenericObjectPool<StatefulConnection<String, String>> pool(AbstractRedisClient client) {
195+
return pool(client, StringCodec.UTF8);
196+
}
197+
198+
public <K, V> GenericObjectPool<StatefulConnection<K, V>> pool(AbstractRedisClient client, RedisCodec<K, V> codec) {
199+
return ConnectionPoolBuilder.create(client).maxTotal(getInt(POOL_MAX_CONFIG)).build(codec);
200+
170201
}
171202

172203
}

src/main/java/com/redis/kafka/connect/sink/RedisSinkConfig.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,12 @@
1616
package com.redis.kafka.connect.sink;
1717

1818
import java.nio.charset.Charset;
19+
import java.time.Duration;
1920
import java.util.Arrays;
2021
import java.util.HashSet;
2122
import java.util.Map;
2223
import java.util.Objects;
24+
import java.util.Optional;
2325
import java.util.Set;
2426

2527
import org.apache.kafka.common.config.ConfigDef;
@@ -29,6 +31,8 @@
2931
import com.github.jcustenborder.kafka.connect.utils.config.ConfigUtils;
3032
import com.github.jcustenborder.kafka.connect.utils.config.validators.Validators;
3133
import com.redis.kafka.connect.common.RedisConfig;
34+
import com.redis.spring.batch.writer.WaitForReplication;
35+
import com.redis.spring.batch.writer.WriterOptions;
3236

3337
public class RedisSinkConfig extends RedisConfig {
3438

@@ -219,4 +223,15 @@ public boolean equals(Object obj) {
219223
&& waitTimeout == other.waitTimeout;
220224
}
221225

226+
public WriterOptions writerOptions() {
227+
return WriterOptions.builder().multiExec(isMultiexec()).waitForReplication(waitForReplication()).build();
228+
}
229+
230+
private Optional<WaitForReplication> waitForReplication() {
231+
if (waitReplicas > 0) {
232+
return Optional.of(WaitForReplication.of(waitReplicas, Duration.ofMillis(waitTimeout)));
233+
}
234+
return Optional.empty();
235+
}
236+
222237
}

0 commit comments

Comments
 (0)