Skip to content

Commit 9b94752

Browse files
committed
fixed unit tests and added coverage report
1 parent 765f03d commit 9b94752

11 files changed

+149
-148
lines changed

pom.xml

Lines changed: 16 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -58,27 +58,14 @@
5858
<dependency>
5959
<groupId>org.projectlombok</groupId>
6060
<artifactId>lombok</artifactId>
61-
<version>1.18.18</version>
61+
<version>1.18.20</version>
6262
<scope>provided</scope>
6363
</dependency>
6464

65-
<dependency>
66-
<groupId>org.slf4j</groupId>
67-
<artifactId>slf4j-api</artifactId>
68-
<version>1.7.30</version>
69-
</dependency>
70-
7165
<dependency>
7266
<groupId>com.redislabs</groupId>
7367
<artifactId>spring-batch-redis</artifactId>
74-
<version>2.10.2</version>
75-
</dependency>
76-
77-
<dependency>
78-
<groupId>org.slf4j</groupId>
79-
<artifactId>slf4j-nop</artifactId>
80-
<version>1.7.30</version>
81-
<scope>test</scope>
68+
<version>2.12.3</version>
8269
</dependency>
8370

8471
<dependency>
@@ -89,30 +76,35 @@
8976
</dependency>
9077

9178
<dependency>
92-
<groupId>org.testcontainers</groupId>
93-
<artifactId>testcontainers</artifactId>
94-
<version>1.15.2</version>
79+
<groupId>com.redislabs</groupId>
80+
<artifactId>testcontainers-redis</artifactId>
81+
<version>1.1.3</version>
9582
<scope>test</scope>
9683
</dependency>
9784

9885
<dependency>
9986
<groupId>org.testcontainers</groupId>
10087
<artifactId>junit-jupiter</artifactId>
101-
<version>1.15.2</version>
88+
<version>1.15.3</version>
10289
<scope>test</scope>
10390
</dependency>
10491

10592
<dependency>
10693
<groupId>io.projectreactor</groupId>
10794
<artifactId>reactor-test</artifactId>
108-
<version>3.4.2</version>
95+
<version>3.4.6</version>
10996
<scope>test</scope>
11097
</dependency>
11198

11299
</dependencies>
113100

114101
<build>
115102
<plugins>
103+
<plugin>
104+
<groupId>org.codehaus.mojo</groupId>
105+
<artifactId>versions-maven-plugin</artifactId>
106+
<version>2.8.1</version>
107+
</plugin>
116108
<plugin>
117109
<groupId>org.apache.maven.plugins</groupId>
118110
<artifactId>maven-checkstyle-plugin</artifactId>
@@ -124,20 +116,22 @@
124116
<plugin>
125117
<groupId>io.confluent</groupId>
126118
<artifactId>kafka-connect-maven-plugin</artifactId>
127-
<version>0.11.3</version>
119+
<version>0.12.0</version>
128120
<executions>
129121
<execution>
130122
<id>hub</id>
131123
<goals>
132124
<goal>kafka-connect</goal>
133125
</goals>
134126
<configuration>
127+
<sourceUrl>https://github.com/RedisLabs-Field-Engineering/redis-enterprise-kafka</sourceUrl>
135128
<ownerName>Redis Labs</ownerName>
136129
<ownerUsername>redislabs</ownerUsername>
130+
<ownerUrl>https://github.com/RedisLabs-Field-Engineering/redis-enterprise-kafka</ownerUrl>
137131
<dockerNamespace>redislabs</dockerNamespace>
138132
<dockerName>redis-enterprise-kafka-docker</dockerName>
139133
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
140-
<documentationUrl>https://redislabs.com/redis-enterprise-kafka</documentationUrl>
134+
<documentationUrl>https://developer.redislabs.com/kafka</documentationUrl>
141135
<componentTypes>
142136
<componentType>sink</componentType>
143137
<componentType>source</componentType>

src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSinkConnector.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,18 @@
1+
/*
2+
* Copyright © 2021 Redis Labs
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package com.redislabs.kafkaconnect;
217

318
import com.redislabs.kafkaconnect.sink.RedisEnterpriseSinkConfig;

src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSourceConnector.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Copyright © 2021 Redis Labs
33
* Licensed under the Apache License, Version 2.0 (the "License");
44
* you may not use this file except in compliance with the License.
@@ -12,6 +12,8 @@
1212
*/
1313
package com.redislabs.kafkaconnect;
1414

15+
import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceConfig;
16+
import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceTask;
1517
import org.apache.kafka.common.config.ConfigDef;
1618
import org.apache.kafka.common.utils.AppInfoParser;
1719
import org.apache.kafka.connect.connector.Task;

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Copyright © 2021 Redis Labs
33
* <p>
44
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -15,7 +15,7 @@
1515
*/
1616
package com.redislabs.kafkaconnect.sink;
1717

18-
import com.redislabs.kafkaconnect.RedisEnterpriseSourceConfig;
18+
import com.redislabs.kafkaconnect.source.RedisEnterpriseSourceConfig;
1919
import lombok.Getter;
2020
import org.apache.kafka.common.config.AbstractConfig;
2121
import org.apache.kafka.common.config.ConfigDef;
@@ -26,8 +26,10 @@ public class RedisEnterpriseSinkConfig extends AbstractConfig {
2626

2727
public static final ConfigDef CONFIG_DEF = new RedisEnterpriseSinkConfigDef();
2828

29+
public static final String TOKEN_TOPIC = "${topic}";
30+
2931
public static final String STREAM_NAME = "redis.stream.name";
30-
public static final String STREAM_NAME_DEFAULT = "${topic}";
32+
public static final String STREAM_NAME_DEFAULT = TOKEN_TOPIC;
3133
public static final String STREAM_NAME_DOC = "A format string for the destination stream name, which may contain '${topic}' as a " + "placeholder for the originating topic name.\n" + "For example, ``kafka_${topic}`` for the topic 'orders' will map to the stream name " + "'kafka_orders'.";
3234
public static final String STREAM_NAME_DISPLAY = "Stream Name Format";
3335

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkTask.java

Lines changed: 24 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,32 @@
1+
/*
2+
* Copyright © 2021 Redis Labs
3+
* <p>
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
* <p>
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
* <p>
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
116
package com.redislabs.kafkaconnect.sink;
217

318
import com.redislabs.kafkaconnect.RedisEnterpriseSinkConnector;
419
import io.lettuce.core.RedisClient;
520
import io.lettuce.core.XAddArgs;
6-
import io.lettuce.core.api.StatefulRedisConnection;
7-
import io.lettuce.core.support.ConnectionPoolSupport;
821
import lombok.extern.slf4j.Slf4j;
9-
import org.apache.commons.pool2.impl.GenericObjectPool;
10-
import org.apache.commons.pool2.impl.GenericObjectPoolConfig;
1122
import org.apache.kafka.connect.data.Field;
1223
import org.apache.kafka.connect.data.Struct;
1324
import org.apache.kafka.connect.errors.ConnectException;
1425
import org.apache.kafka.connect.sink.SinkRecord;
1526
import org.apache.kafka.connect.sink.SinkTask;
1627
import org.springframework.batch.item.ExecutionContext;
17-
import org.springframework.batch.item.ItemStreamSupport;
18-
import org.springframework.batch.item.ItemWriter;
19-
import org.springframework.batch.item.redis.RedisOperationItemWriter;
20-
import org.springframework.batch.item.redis.RedisTransactionItemWriter;
21-
import org.springframework.batch.item.redis.support.RedisOperation;
22-
import org.springframework.batch.item.redis.support.RedisOperationBuilder;
28+
import org.springframework.batch.item.redis.OperationItemWriter;
29+
import org.springframework.batch.item.redis.RedisOperation;
2330

2431
import java.util.ArrayList;
2532
import java.util.Collection;
@@ -31,8 +38,7 @@ public class RedisEnterpriseSinkTask extends SinkTask {
3138

3239
private RedisClient client;
3340
private RedisEnterpriseSinkConfig sinkConfig;
34-
private ItemWriter<SinkRecord> writer;
35-
private GenericObjectPool<StatefulRedisConnection<String, String>> pool;
41+
private OperationItemWriter<String, String, SinkRecord> writer;
3642

3743
@Override
3844
public String version() {
@@ -43,26 +49,13 @@ public String version() {
4349
public void start(final Map<String, String> props) {
4450
sinkConfig = new RedisEnterpriseSinkConfig(props);
4551
client = RedisClient.create(sinkConfig.getRedisUri());
46-
GenericObjectPoolConfig<StatefulRedisConnection<String, String>> poolConfig = new GenericObjectPoolConfig<>();
47-
poolConfig.setMaxTotal(1);
48-
pool = ConnectionPoolSupport.createGenericObjectPool(client::connect, poolConfig);
49-
writer = writer(pool, Boolean.TRUE.equals(sinkConfig.isMultiexec()));
50-
if (writer instanceof ItemStreamSupport) {
51-
((ItemStreamSupport) writer).open(new ExecutionContext());
52-
}
53-
}
54-
55-
private ItemWriter<SinkRecord> writer(GenericObjectPool<StatefulRedisConnection<String, String>> pool, boolean transactional) {
5652
XAddArgs args = new XAddArgs();
57-
RedisOperation<String, String, SinkRecord> xadd = RedisOperationBuilder.<String, String, SinkRecord>xadd().keyConverter(this::key).argsConverter(i -> args).bodyConverter(this::body).build();
58-
if (Boolean.TRUE.equals(transactional)) {
59-
return new RedisTransactionItemWriter<>(pool, xadd);
60-
}
61-
return new RedisOperationItemWriter<>(pool, xadd);
53+
writer = OperationItemWriter.operation(RedisOperation.<SinkRecord>xadd().key(this::key).args(i -> args).body(this::body).build()).client(client).transactional(Boolean.TRUE.equals(sinkConfig.isMultiexec())).build();
54+
writer.open(new ExecutionContext());
6255
}
6356

6457
private String key(SinkRecord record) {
65-
return sinkConfig.getStreamNameFormat().replace("${topic}", record.topic());
58+
return sinkConfig.getStreamNameFormat().replace(RedisEnterpriseSinkConfig.TOKEN_TOPIC, record.topic());
6659
}
6760

6861
private Map<String, String> body(SinkRecord record) {
@@ -90,14 +83,12 @@ private Map<String, String> body(SinkRecord record) {
9083

9184
@Override
9285
public void stop() {
93-
if (writer != null && writer instanceof ItemStreamSupport) {
94-
((ItemStreamSupport) writer).close();
95-
}
96-
if (pool != null) {
97-
pool.close();
86+
if (writer != null) {
87+
writer.close();
9888
}
9989
if (client != null) {
10090
client.shutdown();
91+
client.getResources().shutdown();
10192
}
10293
}
10394

src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSourceConfig.java renamed to src/main/java/com/redislabs/kafkaconnect/source/RedisEnterpriseSourceConfig.java

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Copyright © 2021 Redis Labs
33
* <p>
44
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,7 +13,7 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.redislabs.kafkaconnect;
16+
package com.redislabs.kafkaconnect.source;
1717

1818
import com.redislabs.kafkaconnect.common.RedisEnterpriseConfigException;
1919
import lombok.Getter;
@@ -28,6 +28,8 @@ public class RedisEnterpriseSourceConfig extends AbstractConfig {
2828

2929
public static final ConfigDef CONFIG_DEF = new RedisEnterpriseSourceConfigDef();
3030

31+
public static final String TOKEN_STREAM = "${stream}";
32+
3133
public static final String REDIS_URI = "redis.uri";
3234
public static final String REDIS_URI_DEFAULT = "redis://localhost:6379";
3335
public static final String REDIS_URI_DISPLAY = "Connection URI";
@@ -53,7 +55,7 @@ public class RedisEnterpriseSourceConfig extends AbstractConfig {
5355
public static final String STREAM_BLOCK_DOC = "The max amount of time in milliseconds to wait while polling for stream messages (XREAD [BLOCK milliseconds])";
5456

5557
public static final String TOPIC_NAME_FORMAT = "topic.name.format";
56-
public static final String TOPIC_NAME_FORMAT_DEFAULT = "${stream}";
58+
public static final String TOPIC_NAME_FORMAT_DEFAULT = TOKEN_STREAM;
5759
public static final String TOPIC_NAME_FORMAT_DOC = "A format string for the destination topic name, which may contain '${stream}' as a " + "placeholder for the originating topic name.\n" + "For example, ``redis_${stream}`` for the stream 'orders' will map to the topic name " + "'redis_orders'.";
5860
public static final String TOPIC_NAME_FORMAT_DISPLAY = "Topic Name Format";
5961

src/main/java/com/redislabs/kafkaconnect/RedisEnterpriseSourceTask.java renamed to src/main/java/com/redislabs/kafkaconnect/source/RedisEnterpriseSourceTask.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
/**
1+
/*
22
* Copyright © 2021 Redis Labs
33
* <p>
44
* Licensed under the Apache License, Version 2.0 (the "License");
@@ -13,21 +13,22 @@
1313
* See the License for the specific language governing permissions and
1414
* limitations under the License.
1515
*/
16-
package com.redislabs.kafkaconnect;
16+
package com.redislabs.kafkaconnect.source;
1717

18+
import com.redislabs.kafkaconnect.RedisEnterpriseSourceConnector;
1819
import io.lettuce.core.RedisClient;
1920
import io.lettuce.core.StreamMessage;
2021
import io.lettuce.core.XReadArgs;
21-
import io.lettuce.core.api.StatefulRedisConnection;
2222
import lombok.extern.slf4j.Slf4j;
2323
import org.apache.kafka.connect.data.Schema;
2424
import org.apache.kafka.connect.data.SchemaBuilder;
2525
import org.apache.kafka.connect.errors.ConnectException;
2626
import org.apache.kafka.connect.source.SourceRecord;
2727
import org.apache.kafka.connect.source.SourceTask;
2828
import org.springframework.batch.item.ExecutionContext;
29-
import org.springframework.batch.item.redis.RedisStreamItemReader;
29+
import org.springframework.batch.item.redis.StreamItemReader;
3030

31+
import java.time.Duration;
3132
import java.time.Instant;
3233
import java.util.ArrayList;
3334
import java.util.Collections;
@@ -45,9 +46,8 @@ public class RedisEnterpriseSourceTask extends SourceTask {
4546
private static final Schema VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
4647

4748
private RedisClient client;
48-
private StatefulRedisConnection<String, String> connection;
4949
private RedisEnterpriseSourceConfig sourceConfig;
50-
private RedisStreamItemReader<String, String> reader;
50+
private StreamItemReader<String, String> reader;
5151
private Map<String, String> offsetKey;
5252

5353
@Override
@@ -59,7 +59,6 @@ public String version() {
5959
public void start(Map<String, String> props) {
6060
this.sourceConfig = new RedisEnterpriseSourceConfig(props);
6161
this.client = RedisClient.create(sourceConfig.getRedisUri());
62-
this.connection = client.connect();
6362
this.offsetKey = Collections.singletonMap(STREAM_FIELD, sourceConfig.getStreamName());
6463
String offset = sourceConfig.getStreamOffset();
6564
if (context != null) {
@@ -76,7 +75,8 @@ public void start(Map<String, String> props) {
7675
}
7776
}
7877
}
79-
this.reader = RedisStreamItemReader.builder(connection).block(sourceConfig.getStreamBlock()).count(sourceConfig.getStreamCount()).offset(XReadArgs.StreamOffset.from(sourceConfig.getStreamName(), offset)).build();
78+
XReadArgs.StreamOffset<String> streamOffset = XReadArgs.StreamOffset.from(sourceConfig.getStreamName(), offset);
79+
this.reader = StreamItemReader.client(client).offset(streamOffset).block(Duration.ofMillis(sourceConfig.getStreamBlock())).count(sourceConfig.getStreamCount()).build();
8080
this.reader.open(new ExecutionContext());
8181
}
8282

@@ -85,21 +85,19 @@ public void stop() {
8585
if (reader != null) {
8686
reader.close();
8787
}
88-
if (connection != null) {
89-
connection.close();
90-
}
9188
if (client != null) {
9289
client.shutdown();
90+
client.getResources().shutdown();
9391
}
9492
}
9593

9694
@Override
9795
public List<SourceRecord> poll() {
98-
log.debug("Reading from stream {} at offset {}", reader.getOffset().getName(), reader.getOffset().getOffset());
96+
log.debug("Reading from offset {}", reader.getOffset().getOffset());
9997
List<SourceRecord> records = new ArrayList<>();
10098
for (StreamMessage<String, String> message : reader.readMessages()) {
10199
Map<String, String> offsetValue = Collections.singletonMap(OFFSET_FIELD, message.getId());
102-
String topic = sourceConfig.getTopicNameFormat().replace("${stream}", message.getStream());
100+
String topic = sourceConfig.getTopicNameFormat().replace(RedisEnterpriseSourceConfig.TOKEN_STREAM, message.getStream());
103101
records.add(new SourceRecord(offsetKey, offsetValue, topic, null, KEY_SCHEMA, message.getId(), VALUE_SCHEMA, message.getBody(), Instant.now().getEpochSecond()));
104102
}
105103
if (records.isEmpty()) {

0 commit comments

Comments
 (0)