Commit a4d5f90

Refactored source and sink connectors to use spring-batch-redis for reading and writing to Redis
1 parent 83bf138 commit a4d5f90

6 files changed: +136 additions, -111 deletions

pom.xml

Lines changed: 16 additions & 10 deletions
@@ -56,9 +56,22 @@
     <dependencies>

         <dependency>
-            <groupId>io.lettuce</groupId>
-            <artifactId>lettuce-core</artifactId>
-            <version>6.0.3.RELEASE</version>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.18</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+            <version>1.7.30</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.redislabs</groupId>
+            <artifactId>spring-batch-redis</artifactId>
+            <version>2.9.6-SNAPSHOT</version>
             <exclusions>
                 <exclusion>
                     <groupId>io.netty</groupId>
@@ -67,13 +80,6 @@
             </exclusions>
         </dependency>

-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <version>1.18.18</version>
-            <scope>provided</scope>
-        </dependency>
-
         <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-nop</artifactId>
src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSinkConnector.java

Lines changed: 1 addition & 1 deletion
@@ -11,7 +11,7 @@
 import java.util.List;
 import java.util.Map;

-public class RedisEnterpriseSinkConnector extends SinkConnector {
+public class RedisEnterpriseSinkConnector extends SinkConnector {

     private Map<String, String> props;

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkConfig.java

Lines changed: 17 additions & 11 deletions
@@ -27,18 +27,21 @@ public class RedisEnterpriseSinkConfig extends AbstractConfig {
     public static final ConfigDef CONFIG_DEF = new RedisEnterpriseSinkConfigDef();

     public static final String STREAM_NAME_FORMAT = "stream.name.format";
-    private static final String STREAM_NAME_FORMAT_DEFAULT = "${topic}";
-    private static final String STREAM_NAME_FORMAT_DOC =
-            "A format string for the destination stream name, which may contain '${topic}' as a "
-                    + "placeholder for the originating topic name.\n"
-                    + "For example, ``kafka_${topic}`` for the topic 'orders' will map to the stream name "
-                    + "'kafka_orders'.";
-    private static final String STREAM_NAME_FORMAT_DISPLAY = "Stream Name Format";
+    public static final String STREAM_NAME_FORMAT_DEFAULT = "${topic}";
+    public static final String STREAM_NAME_FORMAT_DOC = "A format string for the destination stream name, which may contain '${topic}' as a " + "placeholder for the originating topic name.\n" + "For example, ``kafka_${topic}`` for the topic 'orders' will map to the stream name " + "'kafka_orders'.";
+    public static final String STREAM_NAME_FORMAT_DISPLAY = "Stream Name Format";
+
+    public static final String TRANSACTIONAL = "transactional";
+    public static final String TRANSACTIONAL_DEFAULT = "false";
+    public static final String TRANSACTIONAL_DOC = "Whether to execute Redis commands in multi/exec transactions.";
+    public static final String TRANSACTIONAL_DISPLAY = "Use Transactions";

     @Getter
     private final String redisUri;
     @Getter
     private final String streamNameFormat;
+    @Getter
+    private final Boolean transactional;

     public RedisEnterpriseSinkConfig(final Map<?, ?> originals) {
         this(originals, true);
@@ -48,16 +51,19 @@ private RedisEnterpriseSinkConfig(final Map<?, ?> originals, final boolean valid
         super(CONFIG_DEF, originals, false);
         redisUri = getString(RedisEnterpriseSourceConfig.REDIS_URI);
         streamNameFormat = getString(STREAM_NAME_FORMAT).trim();
+        transactional = getBoolean(TRANSACTIONAL);
     }

-    private static class RedisEnterpriseSinkConfigDef extends ConfigDef {
+
+    public static class RedisEnterpriseSinkConfigDef extends ConfigDef {

         public RedisEnterpriseSinkConfigDef() {
             String group = "Redis Enterprise";
-            define(RedisEnterpriseSourceConfig.REDIS_URI, Type.STRING, RedisEnterpriseSourceConfig.REDIS_URI_DEFAULT, Importance.HIGH, RedisEnterpriseSourceConfig.REDIS_URI_DOC, group, 0, Width.MEDIUM, RedisEnterpriseSourceConfig.REDIS_URI_DISPLAY);
-            define(STREAM_NAME_FORMAT, Type.STRING, STREAM_NAME_FORMAT_DEFAULT, Importance.MEDIUM, STREAM_NAME_FORMAT_DOC, group, 1, Width.MEDIUM, STREAM_NAME_FORMAT_DISPLAY);
+            int order = 0;
+            define(RedisEnterpriseSourceConfig.REDIS_URI, Type.STRING, RedisEnterpriseSourceConfig.REDIS_URI_DEFAULT, Importance.HIGH, RedisEnterpriseSourceConfig.REDIS_URI_DOC, group, ++order, Width.MEDIUM, RedisEnterpriseSourceConfig.REDIS_URI_DISPLAY);
+            define(STREAM_NAME_FORMAT, Type.STRING, STREAM_NAME_FORMAT_DEFAULT, Importance.MEDIUM, STREAM_NAME_FORMAT_DOC, group, ++order, Width.MEDIUM, STREAM_NAME_FORMAT_DISPLAY);
+            define(TRANSACTIONAL, Type.BOOLEAN, TRANSACTIONAL_DEFAULT, Importance.MEDIUM, TRANSACTIONAL_DOC, group, ++order, Width.SHORT, TRANSACTIONAL_DISPLAY);
         }

     }
-
 }
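
As a quick sanity check of the new option, a minimal sketch (hypothetical values) that relies only on the public constants, constructor, and Lombok getters visible in this diff:

import java.util.HashMap;
import java.util.Map;

import com.redislabs.kafkaconnect.sink.RedisEnterpriseSinkConfig;

public class SinkConfigExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Topic 'orders' maps to the stream 'kafka_orders'
        props.put(RedisEnterpriseSinkConfig.STREAM_NAME_FORMAT, "kafka_${topic}");
        // Wrap each batch of XADD commands in MULTI/EXEC
        props.put(RedisEnterpriseSinkConfig.TRANSACTIONAL, "true");

        RedisEnterpriseSinkConfig config = new RedisEnterpriseSinkConfig(props);
        System.out.println(config.getStreamNameFormat()); // kafka_${topic}
        System.out.println(config.getTransactional());    // true
    }
}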

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkTask.java

Lines changed: 58 additions & 40 deletions
@@ -2,30 +2,37 @@

 import com.redislabs.kafkaconnect.RedisEnterpriseSinkConnector;
 import io.lettuce.core.RedisClient;
-import io.lettuce.core.RedisFuture;
+import io.lettuce.core.XAddArgs;
 import io.lettuce.core.api.StatefulRedisConnection;
-import io.lettuce.core.api.async.RedisAsyncCommands;
+import io.lettuce.core.support.ConnectionPoolSupport;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.pool2.impl.GenericObjectPool;
+import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
 import org.apache.kafka.connect.data.Field;
 import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.errors.ConnectException;
 import org.apache.kafka.connect.sink.SinkRecord;
 import org.apache.kafka.connect.sink.SinkTask;
+import org.springframework.batch.item.ExecutionContext;
+import org.springframework.batch.item.ItemStreamSupport;
+import org.springframework.batch.item.ItemWriter;
+import org.springframework.batch.item.redis.RedisOperationItemWriter;
+import org.springframework.batch.item.redis.RedisTransactionItemWriter;
+import org.springframework.batch.item.redis.support.RedisOperation;
+import org.springframework.batch.item.redis.support.RedisOperationBuilder;

+import java.util.ArrayList;
 import java.util.Collection;
-import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.Map;
-import java.util.concurrent.TimeUnit;

 @Slf4j
 public class RedisEnterpriseSinkTask extends SinkTask {

     private RedisClient client;
-    private StatefulRedisConnection<String, String> connection;
-    private RedisAsyncCommands<String, String> commands;
     private RedisEnterpriseSinkConfig sinkConfig;
-    private long timeout;
+    private ItemWriter<SinkRecord> writer;
+    private GenericObjectPool<StatefulRedisConnection<String, String>> pool;

     @Override
     public String version() {
@@ -34,44 +41,28 @@ public String version() {

     @Override
     public void start(final Map<String, String> props) {
-        this.sinkConfig = new RedisEnterpriseSinkConfig(props);
-        this.client = RedisClient.create(sinkConfig.getRedisUri());
-        this.connection = client.connect();
-        this.commands = connection.async();
-        this.commands.setAutoFlushCommands(false);
-        this.timeout = connection.getTimeout().toMillis();
+        sinkConfig = new RedisEnterpriseSinkConfig(props);
+        client = RedisClient.create(sinkConfig.getRedisUri());
+        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = new GenericObjectPoolConfig<>();
+        poolConfig.setMaxTotal(1);
+        pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
+        writer = writer(pool, Boolean.TRUE.equals(sinkConfig.getTransactional()));
+        if (writer instanceof ItemStreamSupport) {
+            ((ItemStreamSupport) writer).open(new ExecutionContext());
+        }
     }

-    @Override
-    public void stop() {
-        commands = null;
-        if (connection != null) {
-            connection.close();
-        }
-        if (client != null) {
-            client.shutdown();
+    private ItemWriter<SinkRecord> writer(GenericObjectPool<StatefulRedisConnection<String, String>> pool, boolean transactional) {
+        XAddArgs args = new XAddArgs();
+        RedisOperation<String, String, SinkRecord> xadd = RedisOperationBuilder.<String, String, SinkRecord>xadd().keyConverter(this::key).argsConverter(i -> args).bodyConverter(this::body).build();
+        if (Boolean.TRUE.equals(transactional)) {
+            return new RedisTransactionItemWriter<>(pool, xadd);
         }
+        return new RedisOperationItemWriter<>(pool, xadd);
     }

-    @Override
-    public void put(final Collection<SinkRecord> records) {
-        if (records.isEmpty()) {
-            return;
-        }
-        Map<SinkRecord, RedisFuture<?>> futures = new HashMap<>();
-        for (SinkRecord record : records) {
-            String stream = sinkConfig.getStreamNameFormat().replace("${topic}", record.topic());
-            futures.put(record, commands.xadd(stream, body(record)));
-        }
-        commands.flushCommands();
-        for (Map.Entry<SinkRecord, RedisFuture<?>> entry : futures.entrySet()) {
-            try {
-                entry.getValue().get(timeout, TimeUnit.MILLISECONDS);
-            } catch (Exception e) {
-                log.warn("Could not write record at offset {}", entry.getKey().kafkaOffset());
-            }
-        }
-        log.info("Wrote {} messages to Redis", records.size());
+    private String key(SinkRecord record) {
+        return sinkConfig.getStreamNameFormat().replace("${topic}", record.topic());
     }

     private Map<String, String> body(SinkRecord record) {
@@ -97,4 +88,31 @@ private Map<String, String> body(SinkRecord record) {
         throw new ConnectException("Unsupported source value type: " + record.valueSchema().type().name());
     }

+    @Override
+    public void stop() {
+        if (writer != null && writer instanceof ItemStreamSupport) {
+            ((ItemStreamSupport) writer).close();
+        }
+        if (pool != null) {
+            pool.close();
+        }
+        if (client != null) {
+            client.shutdown();
+        }
+    }
+
+    @Override
+    public void put(final Collection<SinkRecord> records) {
+        if (records.isEmpty()) {
+            return;
+        }
+        try {
+            writer.write(new ArrayList<>(records));
+            log.info("Wrote {} records", records.size());
+        } catch (Exception e) {
+            log.warn("Could not write {} records", records.size(), e);
+        }
+    }
+
 }
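
Pulled out of the task for clarity, the new write path looks roughly like this. A standalone sketch using only the spring-batch-redis and Lettuce calls that appear in the diff above; the URI, stream key, and body values are placeholders:

import io.lettuce.core.RedisClient;
import io.lettuce.core.XAddArgs;
import io.lettuce.core.api.StatefulRedisConnection;
import io.lettuce.core.support.ConnectionPoolSupport;
import org.apache.commons.pool2.impl.GenericObjectPool;
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
import org.springframework.batch.item.ExecutionContext;
import org.springframework.batch.item.ItemStreamSupport;
import org.springframework.batch.item.ItemWriter;
import org.springframework.batch.item.redis.RedisOperationItemWriter;
import org.springframework.batch.item.redis.support.RedisOperation;
import org.springframework.batch.item.redis.support.RedisOperationBuilder;

import java.util.Collections;
import java.util.Map;

public class XaddWriterSketch {

    public static void main(String[] args) throws Exception {
        // Placeholder URI; point this at a real Redis instance
        RedisClient client = RedisClient.create("redis://localhost:6379");
        GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = new GenericObjectPoolConfig<>();
        poolConfig.setMaxTotal(1);
        GenericObjectPool<StatefulRedisConnection<String, String>> pool =
                ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);

        // One XADD per item: converters derive the stream key, args, and body
        XAddArgs xaddArgs = new XAddArgs();
        RedisOperation<String, String, Map<String, String>> xadd =
                RedisOperationBuilder.<String, String, Map<String, String>>xadd()
                        .keyConverter(body -> "kafka_orders")
                        .argsConverter(body -> xaddArgs)
                        .bodyConverter(body -> body)
                        .build();
        ItemWriter<Map<String, String>> writer = new RedisOperationItemWriter<>(pool, xadd);

        // Mirror the task's lifecycle: open, write, close
        if (writer instanceof ItemStreamSupport) {
            ((ItemStreamSupport) writer).open(new ExecutionContext());
        }
        writer.write(Collections.singletonList(Collections.singletonMap("field", "value")));
        if (writer instanceof ItemStreamSupport) {
            ((ItemStreamSupport) writer).close();
        }
        pool.close();
        client.shutdown();
    }
}

Swapping RedisOperationItemWriter for RedisTransactionItemWriter, as the writer() method above does when transactional is set, wraps each batch in MULTI/EXEC.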

src/main/java/com/redislabs/kafkaconnect/source/RedisEnterpriseSourceConfig.java

Lines changed: 21 additions & 22 deletions
@@ -34,28 +34,28 @@ public class RedisEnterpriseSourceConfig extends AbstractConfig {
     public static final String REDIS_URI_DOC = "URI of the Redis Enterprise database to connect to, e.g. redis://redis-12000.redislabs.com:12000";

     public static final String STREAM_NAME = "stream";
-    private static final String STREAM_NAME_DISPLAY = "Stream name";
-    private static final String STREAM_NAME_DOC = "Name of the Redis stream to read from";
+    public static final String STREAM_NAME_DISPLAY = "Stream name";
+    public static final String STREAM_NAME_DOC = "Name of the Redis stream to read from";

     public static final String STREAM_OFFSET = "stream.offset";
-    private static final String STREAM_OFFSET_DEFAULT = "0-0";
-    private static final String STREAM_OFFSET_DISPLAY = "Stream offset";
-    private static final String STREAM_OFFSET_DOC = "Stream offset to start reading from";
+    public static final String STREAM_OFFSET_DEFAULT = "0-0";
+    public static final String STREAM_OFFSET_DISPLAY = "Stream offset";
+    public static final String STREAM_OFFSET_DOC = "Stream offset to start reading from";

     public static final String STREAM_COUNT = "stream.count";
-    private static final int STREAM_COUNT_DEFAULT = 50;
-    private static final String STREAM_COUNT_DISPLAY = "The maximum batch size";
-    private static final String STREAM_COUNT_DOC = "Maximum number of stream messages to include in a single read when polling for new data (XREAD [COUNT count]). This setting can be used to limit the amount of data buffered internally in the connector.";
+    public static final long STREAM_COUNT_DEFAULT = 50;
+    public static final String STREAM_COUNT_DISPLAY = "The maximum batch size";
+    public static final String STREAM_COUNT_DOC = "Maximum number of stream messages to include in a single read when polling for new data (XREAD [COUNT count]). This setting can be used to limit the amount of data buffered internally in the connector.";

     public static final String STREAM_BLOCK = "stream.block";
-    private static final long STREAM_BLOCK_DEFAULT = 100;
-    private static final String STREAM_BLOCK_DISPLAY = "Max poll duration";
-    private static final String STREAM_BLOCK_DOC = "The max amount of time in milliseconds to wait while polling for stream messages (XREAD [BLOCK milliseconds])";
+    public static final long STREAM_BLOCK_DEFAULT = 100;
+    public static final String STREAM_BLOCK_DISPLAY = "Max poll duration";
+    public static final String STREAM_BLOCK_DOC = "The max amount of time in milliseconds to wait while polling for stream messages (XREAD [BLOCK milliseconds])";

     public static final String TOPIC_NAME_FORMAT = "topic.name.format";
-    private static final String TOPIC_NAME_FORMAT_DEFAULT = "${stream}";
-    private static final String TOPIC_NAME_FORMAT_DOC = "A format string for the destination topic name, which may contain '${stream}' as a " + "placeholder for the originating topic name.\n" + "For example, ``redis_${stream}`` for the stream 'orders' will map to the topic name " + "'redis_orders'.";
-    private static final String TOPIC_NAME_FORMAT_DISPLAY = "Topic Name Format";
+    public static final String TOPIC_NAME_FORMAT_DEFAULT = "${stream}";
+    public static final String TOPIC_NAME_FORMAT_DOC = "A format string for the destination topic name, which may contain '${stream}' as a " + "placeholder for the originating topic name.\n" + "For example, ``redis_${stream}`` for the stream 'orders' will map to the topic name " + "'redis_orders'.";
+    public static final String TOPIC_NAME_FORMAT_DISPLAY = "Topic Name Format";


     @Getter
@@ -74,8 +74,8 @@ public String getStreamOffset() {
         return getString(STREAM_OFFSET);
     }

-    public int getStreamCount() {
-        return getInt(STREAM_COUNT);
+    public long getStreamCount() {
+        return getLong(STREAM_COUNT);
     }

     public long getStreamBlock() {
@@ -86,26 +86,26 @@ public String getTopicNameFormat() {
         return getString(TOPIC_NAME_FORMAT);
     }

-    private void validateStream() {
+    public void validateStream() {
         String stream = getString(STREAM_NAME);
         if (stream == null || stream.isEmpty()) {
             throw new RedisEnterpriseConfigException(STREAM_NAME, stream, String.format("Missing stream configuration: '%s'", STREAM_NAME));
         }
     }

-    private static class RedisEnterpriseSourceConfigDef extends ConfigDef {
+    public static class RedisEnterpriseSourceConfigDef extends ConfigDef {

        public RedisEnterpriseSourceConfigDef() {
            String group = "Redis Enterprise";
            int index = 0;
            define(REDIS_URI, Type.STRING, REDIS_URI_DEFAULT, Importance.HIGH, REDIS_URI_DOC, group, ++index, Width.MEDIUM, REDIS_URI_DISPLAY);
            define(STREAM_NAME, Type.STRING, null, Importance.HIGH, STREAM_NAME_DOC, group, ++index, Width.SHORT, STREAM_NAME_DISPLAY);
            define(STREAM_OFFSET, Type.STRING, STREAM_OFFSET_DEFAULT, Importance.MEDIUM, STREAM_OFFSET_DOC, group, ++index, Width.SHORT, STREAM_OFFSET_DISPLAY);
-           define(STREAM_COUNT, Type.INT, STREAM_COUNT_DEFAULT, ConfigDef.Range.atLeast(1), Importance.LOW, STREAM_COUNT_DOC, group, ++index, Width.MEDIUM, STREAM_COUNT_DISPLAY);
-           define(STREAM_BLOCK, Type.LONG, STREAM_BLOCK_DEFAULT, ConfigDef.Range.atLeast(1), Importance.LOW, STREAM_BLOCK_DOC, group, ++index, Width.MEDIUM, STREAM_BLOCK_DISPLAY);
+           define(STREAM_COUNT, Type.LONG, STREAM_COUNT_DEFAULT, Range.atLeast(1), Importance.LOW, STREAM_COUNT_DOC, group, ++index, Width.MEDIUM, STREAM_COUNT_DISPLAY);
+           define(STREAM_BLOCK, Type.LONG, STREAM_BLOCK_DEFAULT, Range.atLeast(1), Importance.LOW, STREAM_BLOCK_DOC, group, ++index, Width.MEDIUM, STREAM_BLOCK_DISPLAY);
            group = "Connector";
            index = 0;
-           define(TOPIC_NAME_FORMAT, ConfigDef.Type.STRING, TOPIC_NAME_FORMAT_DEFAULT, Importance.MEDIUM, TOPIC_NAME_FORMAT_DOC, group, ++index, ConfigDef.Width.LONG, TOPIC_NAME_FORMAT_DISPLAY);
+           define(TOPIC_NAME_FORMAT, Type.STRING, TOPIC_NAME_FORMAT_DEFAULT, Importance.MEDIUM, TOPIC_NAME_FORMAT_DOC, group, ++index, Width.LONG, TOPIC_NAME_FORMAT_DISPLAY);
        }

        @Override
@@ -124,5 +124,4 @@ public Map<String, ConfigValue> validateAll(final Map<String, String> props) {
            return results;
        }
    }
-
 }
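
A companion sketch for the source side, exercising the config keys and getters shown in the diff (values are hypothetical, and the Map-based constructor is an assumption made by analogy with RedisEnterpriseSinkConfig):

import java.util.HashMap;
import java.util.Map;

import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceConfig;

public class SourceConfigExample {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        // Read the Redis stream 'orders' from the beginning
        props.put(RedisEnterpriseSourceConfig.STREAM_NAME, "orders");
        props.put(RedisEnterpriseSourceConfig.STREAM_OFFSET, "0-0");
        // XREAD COUNT 100 BLOCK 200
        props.put(RedisEnterpriseSourceConfig.STREAM_COUNT, "100");
        props.put(RedisEnterpriseSourceConfig.STREAM_BLOCK, "200");
        // Stream 'orders' maps to the topic 'redis_orders'
        props.put(RedisEnterpriseSourceConfig.TOPIC_NAME_FORMAT, "redis_${stream}");

        // Assumed constructor shape, mirroring the sink config
        RedisEnterpriseSourceConfig config = new RedisEnterpriseSourceConfig(props);
        config.validateStream();                          // now public
        System.out.println(config.getStreamCount());      // 100 (now a long)
        System.out.println(config.getTopicNameFormat());  // redis_${stream}
    }
}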

0 commit comments