Skip to content

Commit 430e0ab

Browse files
committed
Adds tests
#15
1 parent d75498e commit 430e0ab

File tree

3 files changed

+154
-7
lines changed

3 files changed

+154
-7
lines changed

src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.redis.kafka.connect.source;
22

3+
import java.time.Clock;
34
import java.time.Duration;
4-
import java.time.Instant;
55
import java.util.HashMap;
66
import java.util.List;
77
import java.util.Map;
@@ -32,7 +32,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
3232
private static final Schema STRING_VALUE_SCHEMA = Schema.STRING_SCHEMA;
3333
private static final String HASH_VALUE_SCHEMA_NAME = "com.redis.kafka.connect.HashEventValue";
3434
private static final Schema HASH_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
35-
.name(HASH_VALUE_SCHEMA_NAME);
35+
.name(HASH_VALUE_SCHEMA_NAME).build();
3636

3737
private final int batchSize;
3838
private final String topic;
@@ -41,6 +41,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
4141
private AbstractRedisClient client;
4242
private GenericObjectPool<StatefulConnection<String, String>> pool;
4343
private StatefulRedisPubSubConnection<String, String> pubSubConnection;
44+
Clock clock = Clock.systemDefaultZone();
4445

4546
public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeout) {
4647
super(sourceConfig);
@@ -99,7 +100,7 @@ protected SourceRecord convert(DataStructure<String> input) {
99100
Map<String, ?> sourcePartition = new HashMap<>();
100101
Map<String, ?> sourceOffset = new HashMap<>();
101102
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input),
102-
input.getValue(), Instant.now().toEpochMilli());
103+
input.getValue(), clock.instant().toEpochMilli());
103104
}
104105

105106
private Schema schema(DataStructure<String> input) {

src/main/java/com/redis/kafka/connect/source/StreamSourceRecordReader.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
package com.redis.kafka.connect.source;
22

3+
import java.time.Clock;
34
import java.time.Duration;
4-
import java.time.Instant;
55
import java.util.Collections;
66
import java.util.HashMap;
77
import java.util.List;
@@ -33,14 +33,15 @@ public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamM
3333
private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
3434
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
3535
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
36-
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
37-
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
36+
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
37+
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
3838
private final String topic;
3939
private final String consumer;
4040

4141
private StreamItemReader<String, String> reader;
4242
private AbstractRedisClient client;
4343
private GenericObjectPool<StatefulConnection<String, String>> pool;
44+
Clock clock = Clock.systemDefaultZone();
4445

4546
public StreamSourceRecordReader(RedisSourceConfig sourceConfig, int taskId) {
4647
super(sourceConfig);
@@ -92,7 +93,7 @@ protected SourceRecord convert(StreamMessage<String, String> message) {
9293
Struct value = new Struct(VALUE_SCHEMA).put(FIELD_ID, message.getId()).put(FIELD_BODY, message.getBody())
9394
.put(FIELD_STREAM, message.getStream());
9495
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, key, VALUE_SCHEMA, value,
95-
Instant.now().toEpochMilli());
96+
clock.instant().toEpochMilli());
9697
}
9798

9899
}
src/test/java/com/redis/kafka/connect/source/StreamSourceRecordReaderTest.java

Lines changed: 145 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,145 @@
1+
package com.redis.kafka.connect.source;
2+
3+
import static org.hamcrest.CoreMatchers.equalTo;
4+
import static org.hamcrest.MatcherAssert.assertThat;
5+
6+
import java.time.Clock;
7+
import java.time.Instant;
8+
import java.time.ZoneId;
9+
import java.util.HashMap;
10+
import java.util.Map;
11+
import java.util.stream.Stream;
12+
13+
import org.apache.kafka.connect.data.Schema;
14+
import org.apache.kafka.connect.data.SchemaBuilder;
15+
import org.apache.kafka.connect.data.Struct;
16+
import org.apache.kafka.connect.source.SourceRecord;
17+
import org.junit.jupiter.api.extension.ExtensionContext;
18+
import org.junit.jupiter.params.ParameterizedTest;
19+
import org.junit.jupiter.params.provider.Arguments;
20+
import org.junit.jupiter.params.provider.ArgumentsProvider;
21+
import org.junit.jupiter.params.provider.ArgumentsSource;
22+
23+
import io.lettuce.core.StreamMessage;
24+
25+
class StreamSourceRecordReaderTest {
26+
27+
public static final String OFFSET_FIELD = "offset";
28+
public static final String FIELD_ID = "id";
29+
public static final String FIELD_BODY = "body";
30+
public static final String FIELD_STREAM = "stream";
31+
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
32+
private static final long NOW = System.currentTimeMillis();
33+
private static final Clock CLOCK = Clock.fixed(Instant.ofEpochMilli(NOW), ZoneId.systemDefault());
34+
35+
// This is published, so if it's changed, it may impact users.
36+
private static final Schema PUBLISHED_SCHEMA =
37+
SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
38+
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
39+
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
40+
41+
@ParameterizedTest
42+
@ArgumentsSource(ConvertArgs.class)
43+
void testConvertStreamMessageOfStringString(ConvertArgs args) {
44+
final StreamSourceRecordReader r = new StreamSourceRecordReader(args.config, 0);
45+
r.clock = CLOCK;
46+
47+
final SourceRecord got = r.convert(args.message);
48+
49+
assertThat(got, equalTo(args.want));
50+
}
51+
52+
static class ConvertArgs implements ArgumentsProvider, Arguments {
53+
@Override
54+
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
55+
return Stream.of(
56+
new ConvertArgs(
57+
new RedisSourceConfig(mapOf("redis.stream.name", "stream1")),
58+
new StreamMessage<>("stream1", "1-0", mapOf()),
59+
new SourceRecord(
60+
mapOf(),
61+
mapOf("offset", "1-0"),
62+
"stream1",
63+
null,
64+
Schema.STRING_SCHEMA,
65+
"1-0",
66+
PUBLISHED_SCHEMA,
67+
new Struct(PUBLISHED_SCHEMA)
68+
.put(FIELD_ID, "1-0")
69+
.put(FIELD_STREAM, "stream1")
70+
.put(FIELD_BODY, mapOf()),
71+
NOW)),
72+
73+
new ConvertArgs(
74+
new RedisSourceConfig(mapOf("redis.stream.name", "stream2")),
75+
new StreamMessage<>("stream2", "2-0", mapOf("key2", "value2")),
76+
new SourceRecord(
77+
mapOf(),
78+
mapOf("offset", "2-0"),
79+
"stream2",
80+
null,
81+
Schema.STRING_SCHEMA,
82+
"2-0",
83+
PUBLISHED_SCHEMA,
84+
new Struct(PUBLISHED_SCHEMA)
85+
.put(FIELD_ID, "2-0")
86+
.put(FIELD_STREAM, "stream2")
87+
.put(FIELD_BODY, mapOf("key2", "value2")),
88+
NOW)),
89+
90+
new ConvertArgs(
91+
new RedisSourceConfig(
92+
mapOf(
93+
"redis.stream.name",
94+
"stream3",
95+
"topic",
96+
"topic3")),
97+
new StreamMessage<>("stream3", "3-0", mapOf("key3", "value3")),
98+
new SourceRecord(
99+
mapOf(),
100+
mapOf("offset", "3-0"),
101+
"topic3",
102+
null,
103+
Schema.STRING_SCHEMA,
104+
"3-0",
105+
PUBLISHED_SCHEMA,
106+
new Struct(PUBLISHED_SCHEMA)
107+
.put(FIELD_ID, "3-0")
108+
.put(FIELD_STREAM, "stream3")
109+
.put(FIELD_BODY, mapOf("key3", "value3")),
110+
NOW))
111+
);
112+
}
113+
114+
RedisSourceConfig config;
115+
StreamMessage<String, String> message;
116+
SourceRecord want;
117+
118+
ConvertArgs() {
119+
}
120+
121+
ConvertArgs(RedisSourceConfig config, StreamMessage<String, String> message, SourceRecord want) {
122+
this.config = config;
123+
this.message = message;
124+
this.want = want;
125+
}
126+
127+
@Override
128+
public Object[] get() {
129+
return new Object[] { this };
130+
}
131+
}
132+
133+
static Map<String, String> mapOf(String... args) {
134+
final HashMap<String, String> ret = new HashMap<>();
135+
int i = 0;
136+
for (; i < args.length; i+=2) {
137+
ret.put(args[i], args[i+1]);
138+
}
139+
if (i != args.length) {
140+
throw new IllegalArgumentException("Expects an even number of arguments");
141+
}
142+
return ret;
143+
}
144+
145+
}

0 commit comments

Comments
 (0)