Skip to content

Fixes redis-field-engineering/redis-kafka-connect#15 #16

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Oct 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.redis.kafka.connect.source;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -32,7 +32,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
private static final Schema STRING_VALUE_SCHEMA = Schema.STRING_SCHEMA;
private static final String HASH_VALUE_SCHEMA_NAME = "com.redis.kafka.connect.HashEventValue";
private static final Schema HASH_VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)
.name(HASH_VALUE_SCHEMA_NAME);
.name(HASH_VALUE_SCHEMA_NAME).build();

private final int batchSize;
private final String topic;
Expand All @@ -41,6 +41,7 @@ public class KeySourceRecordReader extends AbstractSourceRecordReader<DataStruct
private AbstractRedisClient client;
private GenericObjectPool<StatefulConnection<String, String>> pool;
private StatefulRedisPubSubConnection<String, String> pubSubConnection;
Clock clock = Clock.systemDefaultZone();

public KeySourceRecordReader(RedisSourceConfig sourceConfig, Duration idleTimeout) {
super(sourceConfig);
Expand Down Expand Up @@ -99,7 +100,7 @@ protected SourceRecord convert(DataStructure<String> input) {
Map<String, ?> sourcePartition = new HashMap<>();
Map<String, ?> sourceOffset = new HashMap<>();
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, input.getKey(), schema(input),
input.getValue(), Instant.now().getEpochSecond());
input.getValue(), clock.instant().toEpochMilli());
}

private Schema schema(DataStructure<String> input) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package com.redis.kafka.connect.source;

import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
Expand Down Expand Up @@ -33,14 +33,15 @@ public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamM
private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA))
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();
private final String topic;
private final String consumer;

private StreamItemReader<String, String> reader;
private AbstractRedisClient client;
private GenericObjectPool<StatefulConnection<String, String>> pool;
Clock clock = Clock.systemDefaultZone();

public StreamSourceRecordReader(RedisSourceConfig sourceConfig, int taskId) {
super(sourceConfig);
Expand Down Expand Up @@ -92,7 +93,7 @@ protected SourceRecord convert(StreamMessage<String, String> message) {
Struct value = new Struct(VALUE_SCHEMA).put(FIELD_ID, message.getId()).put(FIELD_BODY, message.getBody())
.put(FIELD_STREAM, message.getStream());
return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, key, VALUE_SCHEMA, value,
Instant.now().getEpochSecond());
clock.instant().toEpochMilli());
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
package com.redis.kafka.connect.source;

import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;

import java.time.Clock;
import java.time.Instant;
import java.time.ZoneId;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Stream;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;

import io.lettuce.core.StreamMessage;

class StreamSourceRecordReaderTest {

public static final String OFFSET_FIELD = "offset";
public static final String FIELD_ID = "id";
public static final String FIELD_BODY = "body";
public static final String FIELD_STREAM = "stream";
private static final String VALUE_SCHEMA_NAME = "com.redis.kafka.connect.stream.Value";
private static final long NOW = System.currentTimeMillis();
private static final Clock CLOCK = Clock.fixed(Instant.ofEpochMilli(NOW), ZoneId.systemDefault());

// This is published, so if it's changed, it may impact users.
private static final Schema PUBLISHED_SCHEMA =
SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA)
.field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).build())
.field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME).build();

@ParameterizedTest
@ArgumentsSource(ConvertArgs.class)
void testConvertStreamMessageOfStringString(ConvertArgs args) {
final StreamSourceRecordReader r = new StreamSourceRecordReader(args.config, 0);
r.clock = CLOCK;

final SourceRecord got = r.convert(args.message);

assertThat(got, equalTo(args.want));
}

static class ConvertArgs implements ArgumentsProvider, Arguments {
@Override
public Stream<? extends Arguments> provideArguments(ExtensionContext context) throws Exception {
return Stream.of(
new ConvertArgs(
new RedisSourceConfig(mapOf("redis.stream.name", "stream1")),
new StreamMessage<>("stream1", "1-0", mapOf()),
new SourceRecord(
mapOf(),
mapOf("offset", "1-0"),
"stream1",
null,
Schema.STRING_SCHEMA,
"1-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "1-0")
.put(FIELD_STREAM, "stream1")
.put(FIELD_BODY, mapOf()),
NOW)),

new ConvertArgs(
new RedisSourceConfig(mapOf("redis.stream.name", "stream2")),
new StreamMessage<>("stream2", "2-0", mapOf("key2", "value2")),
new SourceRecord(
mapOf(),
mapOf("offset", "2-0"),
"stream2",
null,
Schema.STRING_SCHEMA,
"2-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "2-0")
.put(FIELD_STREAM, "stream2")
.put(FIELD_BODY, mapOf("key2", "value2")),
NOW)),

new ConvertArgs(
new RedisSourceConfig(
mapOf(
"redis.stream.name",
"stream3",
"topic",
"topic3")),
new StreamMessage<>("stream3", "3-0", mapOf("key3", "value3")),
new SourceRecord(
mapOf(),
mapOf("offset", "3-0"),
"topic3",
null,
Schema.STRING_SCHEMA,
"3-0",
PUBLISHED_SCHEMA,
new Struct(PUBLISHED_SCHEMA)
.put(FIELD_ID, "3-0")
.put(FIELD_STREAM, "stream3")
.put(FIELD_BODY, mapOf("key3", "value3")),
NOW))
);
}

RedisSourceConfig config;
StreamMessage<String, String> message;
SourceRecord want;

ConvertArgs() {
}

ConvertArgs(RedisSourceConfig config, StreamMessage<String, String> message, SourceRecord want) {
this.config = config;
this.message = message;
this.want = want;
}

@Override
public Object[] get() {
return new Object[] { this };
}
}

static Map<String, String> mapOf(String... args) {
final HashMap<String, String> ret = new HashMap<>();
int i = 0;
for (; i < args.length; i+=2) {
ret.put(args[i], args[i+1]);
}
if (i != args.length) {
throw new IllegalArgumentException("Expects an even number of arguments");
}
return ret;
}

}