diff --git a/src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java b/src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java
index ebae27c..e6447d8 100644
--- a/src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java
+++ b/src/main/java/com/redis/kafka/connect/source/KeySourceRecordReader.java
@@ -1,7 +1,7 @@
 package com.redis.kafka.connect.source;
 
+import java.time.Clock;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -32,7 +32,8 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStructure<String>> {
     private GenericObjectPool<StatefulRedisConnection<String, String>> pool;
     private StatefulRedisPubSubConnection<String, String> pubSubConnection;
+    Clock clock = Clock.systemDefaultZone();
 
     public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeout) {
         super(sourceConfig);
@@ -99,7 +100,7 @@ protected SourceRecord convert(DataStructure<String> input) {
         Map sourcePartition = new HashMap<>();
         Map sourceOffset = new HashMap<>();
         return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input),
-                input.getValue(), Instant.now().getEpochSecond());
+                input.getValue(), clock.instant().toEpochMilli());
     }
 
     private Schema schema(DataStructure<String> input) {
diff --git a/src/main/java/com/redis/kafka/connect/source/StreamSourceRecordReader.java b/src/main/java/com/redis/kafka/connect/source/StreamSourceRecordReader.java
index 777589c..548a59a 100644
--- a/src/main/java/com/redis/kafka/connect/source/StreamSourceRecordReader.java
+++ b/src/main/java/com/redis/kafka/connect/source/StreamSourceRecordReader.java
@@ -1,7 +1,7 @@
 package com.redis.kafka.connect.source;
 
+import java.time.Clock;
 import java.time.Duration;
-import java.time.Instant;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
@@ -33,14 +33,15 @@ public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamMessage<String, String>> {
     private StreamItemReader<String, String> reader;
     private AbstractRedisClient client;
     private GenericObjectPool<StatefulRedisConnection<String, String>> pool;
+    Clock clock = Clock.systemDefaultZone();
 
     public StreamSourceRecordReader(RedisSourceConfig sourceConfig, int taskId) {
         super(sourceConfig);
@@ -92,7 +93,7 @@ protected SourceRecord convert(StreamMessage<String, String> message) {
         Struct value = new Struct(VALUE_SCHEMA).put(FIELD_ID, message.getId()).put(FIELD_BODY, message.getBody())
                 .put(FIELD_STREAM, message.getStream());
         return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, key, VALUE_SCHEMA, value,
-                Instant.now().getEpochSecond());
+                clock.instant().toEpochMilli());
     }
 
 }
diff --git a/src/test/unit/java/com/redis/kafka/connect/source/StreamSourceRecordReaderTest.java b/src/test/unit/java/com/redis/kafka/connect/source/StreamSourceRecordReaderTest.java
new file mode 100644
index 0000000..62f2ee2
--- /dev/null
+++ b/src/test/unit/java/com/redis/kafka/connect/source/StreamSourceRecordReaderTest.java
@@ -0,0 +1,145 @@
+package com.redis.kafka.connect.source;
+
+import static org.hamcrest.CoreMatchers.equalTo;
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import java.time.Clock;
+import java.time.Instant;
+import java.time.ZoneId;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Stream;
+
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.jupiter.api.extension.ExtensionContext;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.ArgumentsProvider;
+import org.junit.jupiter.params.provider.ArgumentsSource;
+
+import io.lettuce.core.StreamMessage;
+
+class StreamSourceRecordReaderTest {
+
+    public static final String OFFSET_FIELD = "offset";
+    public static final String FIELD_ID = "id";
+    public static final String FIELD_BODY = "body";
+    public static final String FIELD_STREAM = "stream";
+    private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
+    private static final long NOW = System.currentTimeMillis();
+    private static final Clock CLOCK = Clock.fixed(Instant.ofEpochMilli(NOW), ZoneId.systemDefault());
+
+    // This is published, so if it's changed, it may impact users.
+    private static final Schema PUBLISHED_SCHEMA =
+            SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
+                    .field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
+                    .field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
+
+    @ParameterizedTest
+    @ArgumentsSource(ConvertArgs.class)
+    void testConvertStreamMessageOfStringString(ConvertArgs args) {
+        final StreamSourceRecordReader r = new StreamSourceRecordReader(args.config, 0);
+        r.clock = CLOCK;
+
+        final SourceRecord got = r.convert(args.message);
+
+        assertThat(got, equalTo(args.want));
+    }
+
+    static class ConvertArgs implements ArgumentsProvider, Arguments {
+        @Override
+        public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
+            return Stream.of(
+                    new ConvertArgs(
+                            new RedisSourceConfig(mapOf("redis.stream.name", "stream1")),
+                            new StreamMessage<>("stream1", "1-0", mapOf()),
+                            new SourceRecord(
+                                    mapOf(),
+                                    mapOf("offset", "1-0"),
+                                    "stream1",
+                                    null,
+                                    Schema.STRING_SCHEMA,
+                                    "1-0",
+                                    PUBLISHED_SCHEMA,
+                                    new Struct(PUBLISHED_SCHEMA)
+                                            .put(FIELD_ID, "1-0")
+                                            .put(FIELD_STREAM, "stream1")
+                                            .put(FIELD_BODY, mapOf()),
+                                    NOW)),
+
+                    new ConvertArgs(
+                            new RedisSourceConfig(mapOf("redis.stream.name", "stream2")),
+                            new StreamMessage<>("stream2", "2-0", mapOf("key2", "value2")),
+                            new SourceRecord(
+                                    mapOf(),
+                                    mapOf("offset", "2-0"),
+                                    "stream2",
+                                    null,
+                                    Schema.STRING_SCHEMA,
+                                    "2-0",
+                                    PUBLISHED_SCHEMA,
+                                    new Struct(PUBLISHED_SCHEMA)
+                                            .put(FIELD_ID, "2-0")
+                                            .put(FIELD_STREAM, "stream2")
+                                            .put(FIELD_BODY, mapOf("key2", "value2")),
+                                    NOW)),
+
+                    new ConvertArgs(
+                            new RedisSourceConfig(
+                                    mapOf(
+                                            "redis.stream.name",
+                                            "stream3",
+                                            "topic",
+                                            "topic3")),
+                            new StreamMessage<>("stream3", "3-0", mapOf("key3", "value3")),
+                            new SourceRecord(
+                                    mapOf(),
+                                    mapOf("offset", "3-0"),
+                                    "topic3",
+                                    null,
+                                    Schema.STRING_SCHEMA,
+                                    "3-0",
+                                    PUBLISHED_SCHEMA,
+                                    new Struct(PUBLISHED_SCHEMA)
+                                            .put(FIELD_ID, "3-0")
+                                            .put(FIELD_STREAM, "stream3")
+                                            .put(FIELD_BODY, mapOf("key3", "value3")),
+                                    NOW))
+            );
+        }
+
+        RedisSourceConfig config;
+        StreamMessage<String, String> message;
+        SourceRecord want;
+
+        ConvertArgs() {
+        }
+
+        ConvertArgs(RedisSourceConfig config, StreamMessage<String, String> message, SourceRecord want) {
+            this.config = config;
+            this.message = message;
+            this.want = want;
+        }
+
+        @Override
+        public Object[] get() {
+            return new Object[] { this };
+        }
+    }
+
+    static Map<String, String> mapOf(String... args) {
+        final HashMap<String, String> ret = new HashMap<>();
+        int i = 0;
+        for (; i + 1 < args.length; i += 2) {
+            ret.put(args[i], args[i + 1]);
+        }
+        if (i != args.length) {
+            throw new IllegalArgumentException("Expects an even number of arguments");
+        }
+        return ret;
+    }
+
+}
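The change above follows a common testability pattern: the reader takes its timestamps from an injectable java.time.Clock (defaulting to Clock.systemDefaultZone()), so production records carry epoch-millisecond timestamps while tests can pin the clock with Clock.fixed. Below is a minimal, self-contained sketch of that pattern; the TimestampSource and TimestampSourceDemo classes are illustrative stand-ins, not part of this PR or of the connector.

```java
import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;

// Illustrative only: a tiny stand-in for a reader that stamps records with epoch milliseconds.
class TimestampSource {
    // Package-visible so a test in the same package can swap in a fixed clock,
    // mirroring the `Clock clock = Clock.systemDefaultZone();` field added in this PR.
    Clock clock = Clock.systemDefaultZone();

    long recordTimestamp() {
        // Epoch milliseconds, the unit SourceRecord expects for its timestamp argument.
        return clock.instant().toEpochMilli();
    }
}

class TimestampSourceDemo {
    public static void main(String[] args) {
        TimestampSource source = new TimestampSource();
        long fixedMillis = 1_600_000_000_000L; // arbitrary fixed point in time
        source.clock = Clock.fixed(Instant.ofEpochMilli(fixedMillis), ZoneId.systemDefault());
        // With a fixed clock the produced timestamp is deterministic.
        System.out.println(source.recordTimestamp() == fixedMillis); // prints: true
    }
}
```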