
Commit 76cd097

[source] Changed value schema to struct. Resolves #9
1 parent bb87fc1 commit 76cd097

4 files changed: 49 additions & 32 deletions

docker/run.sh

Lines changed: 2 additions & 2 deletions
@@ -117,8 +117,8 @@ docker-compose exec redis /usr/local/bin/redis-cli xlen pageviews
 
 sleep 2
 echo -e "\nAdding messages to Redis stream 'mystream':"
-docker-compose exec redis /usr/local/bin/redis-cli "xadd" "mystream" "*" "field1" "message1: value1" "field2" "message1: value2"
-docker-compose exec redis /usr/local/bin/redis-cli "xadd" "mystream" "*" "field1" "message2: value1" "field2" "message2: value2"
+docker-compose exec redis /usr/local/bin/redis-cli "xadd" "mystream" "*" "field1" "value11" "field2" "value21"
+docker-compose exec redis /usr/local/bin/redis-cli "xadd" "mystream" "*" "field1" "value12" "field2" "value22"
 
 echo -e '''
 

src/docs/asciidoc/source.adoc

Lines changed: 15 additions & 10 deletions
@@ -48,18 +48,23 @@ Use configuration property `tasks.max` to have the change stream handled by mult
 The {name} reads messages from a stream and publishes to a Kafka topic. Reading is done through a consumer group so that <<multiple-tasks,multiple instances>> of the connector configured via the `tasks.max` can consume messages in a round-robin fashion.
 
 
-==== Schema
+==== Stream Message Schema
 
-The schema for the records produced by the source connector is the following:
+===== Key Schema
 
-[source,json]
-----
-{
-  "type": "map",
-  "values": "string",
-  "connect.name": "com.redislabs.kafka.connect.StreamEventValue"
-}
-----
+Keys are of type String and contain the stream message id.
+
+===== Value Schema
+
+The value schema defines the following fields:
+
+[options="header"]
+|====
+|Name  |Schema       |Description
+|id    |STRING       |Stream message ID
+|stream|STRING       |Stream key
+|body  |Map of STRING|Stream message body
+|====
 
 ==== Configuration
 
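For orientation (not part of this commit), here is a minimal sketch of how a downstream sink task or transform might unpack the new struct value, assuming only the standard Kafka Connect `Struct` API; the class name and the printing are illustrative:

[source,java]
----
import java.util.Map;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.sink.SinkRecord;

public final class StreamValueExample {

    // Field names ("id", "stream", "body") come from the value schema table above.
    @SuppressWarnings("unchecked")
    static void printStreamValue(SinkRecord record) {
        Struct value = (Struct) record.value();
        String id = value.getString("id");         // stream message ID
        String stream = value.getString("stream"); // stream key
        Map<String, String> body = (Map<String, String>) value.get("body");
        System.out.printf("stream=%s id=%s body=%s%n", stream, id, body);
    }
}
----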

src/main/java/com/redislabs/kafka/connect/source/StreamSourceRecordReader.java

Lines changed: 9 additions & 3 deletions
@@ -5,6 +5,7 @@
 import io.lettuce.core.XReadArgs;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.springframework.batch.item.ExecutionContext;
 import org.springframework.batch.item.redis.StreamItemReader;
@@ -19,9 +20,12 @@
 public class StreamSourceRecordReader extends AbstractSourceRecordReader<StreamMessage<String, String>> {
 
     public static final String OFFSET_FIELD = "offset";
+    public static final String FIELD_ID = "id";
+    public static final String FIELD_BODY = "body";
+    public static final String FIELD_STREAM = "stream";
     private static final Schema KEY_SCHEMA = Schema.STRING_SCHEMA;
-    private static final String VALUE_SCHEMA_NAME = "com.redislabs.kafka.connect.StreamEventValue";
-    private static final Schema VALUE_SCHEMA = SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
+    private static final String VALUE_SCHEMA_NAME = "com.redislabs.kafka.connect.stream.Value";
+    private static final Schema VALUE_SCHEMA = SchemaBuilder.struct().field(FIELD_ID, Schema.STRING_SCHEMA).field(FIELD_BODY, SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.STRING_SCHEMA)).field(FIELD_STREAM, Schema.STRING_SCHEMA).name(VALUE_SCHEMA_NAME);
     private final String topic;
     private final String consumer;
 
@@ -56,7 +60,9 @@ protected void doClose() {
     protected SourceRecord convert(StreamMessage<String, String> message) {
         Map<String, ?> sourcePartition = new HashMap<>();
         Map<String, ?> sourceOffset = Collections.singletonMap(OFFSET_FIELD, message.getId());
-        return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, message.getId(), VALUE_SCHEMA, message.getBody(), Instant.now().getEpochSecond());
+        String key = message.getId();
+        Struct value = new Struct(VALUE_SCHEMA).put(FIELD_ID, message.getId()).put(FIELD_BODY, message.getBody()).put(FIELD_STREAM, message.getStream());
+        return new SourceRecord(sourcePartition, sourceOffset, topic, null, KEY_SCHEMA, key, VALUE_SCHEMA, value, Instant.now().getEpochSecond());
     }
 
 }
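For illustration (also not part of the diff): assuming the stock JsonConverter with `value.converter.schemas.enable=false`, a record produced for the first XADD in run.sh above would now serialize roughly as below; the message ID shown is invented. Before this commit the value was only the flat body map.

[source,json]
----
{
  "id": "1526919030474-55",
  "stream": "mystream",
  "body": {
    "field1": "value11",
    "field2": "value21"
  }
}
----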

src/test/integration/java/com/redislabs/kafka/connect/RedisEnterpriseSourceTaskIT.java

Lines changed: 23 additions & 17 deletions
@@ -3,6 +3,7 @@
 import com.redislabs.kafka.connect.source.RedisEnterpriseSourceConfig;
 import com.redislabs.kafka.connect.source.RedisEnterpriseSourceTask;
 import com.redislabs.testcontainers.RedisServer;
+import org.apache.kafka.connect.data.Struct;
 import org.apache.kafka.connect.source.SourceRecord;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
@@ -13,8 +14,6 @@
 import java.util.List;
 import java.util.Map;
 
-import static org.junit.jupiter.api.Assertions.assertEquals;
-
 public class RedisEnterpriseSourceTaskIT extends AbstractRedisEnterpriseIT {
 
     private RedisEnterpriseSourceTask task;
@@ -40,23 +39,30 @@ public void teardown() {
     @MethodSource("redisServers")
     public void pollStream(RedisServer redis) throws InterruptedException {
         final String stream = "stream1";
-        startTask(redis, RedisEnterpriseSourceConfig.TOPIC, RedisEnterpriseSourceConfig.TOKEN_STREAM, RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.STREAM.name(), RedisEnterpriseSourceConfig.STREAM_NAME, stream);
+        final String topicPrefix = "testprefix-";
+        startTask(redis, RedisEnterpriseSourceConfig.TOPIC, topicPrefix + RedisEnterpriseSourceConfig.TOKEN_STREAM, RedisEnterpriseSourceConfig.READER_TYPE, RedisEnterpriseSourceConfig.ReaderType.STREAM.name(), RedisEnterpriseSourceConfig.STREAM_NAME, stream);
         String field1 = "field1";
         String value1 = "value1";
         String field2 = "field2";
         String value2 = "value2";
-        syncStream(redis).xadd(stream, map(field1, value1, field2, value2));
-        syncStream(redis).xadd(stream, map(field1, value1, field2, value2));
-        syncStream(redis).xadd(stream, map(field1, value1, field2, value2));
+        final Map<String, String> body = map(field1, value1, field2, value2);
+        final String id1 = syncStream(redis).xadd(stream, body);
+        final String id2 = syncStream(redis).xadd(stream, body);
+        final String id3 = syncStream(redis).xadd(stream, body);
         Thread.sleep(100);
         List<SourceRecord> sourceRecords = task.poll();
-        assertEquals(3, sourceRecords.size());
-        for (SourceRecord record : sourceRecords) {
-            Assertions.assertEquals(stream, record.topic());
-            Map<String, String> map = (Map<String, String>) record.value();
-            Assertions.assertEquals(value1, map.get(field1));
-            Assertions.assertEquals(value2, map.get(field2));
-        }
+        Assertions.assertEquals(3, sourceRecords.size());
+        assertEquals(id1, body, stream, topicPrefix + stream, sourceRecords.get(0));
+        assertEquals(id2, body, stream, topicPrefix + stream, sourceRecords.get(1));
+        assertEquals(id3, body, stream, topicPrefix + stream, sourceRecords.get(2));
+    }
+
+    private void assertEquals(String expectedId, Map<String, String> expectedBody, String expectedStream, String expectedTopic, SourceRecord record) {
+        Struct struct = (Struct) record.value();
+        Assertions.assertEquals(expectedId, struct.get("id"));
+        Assertions.assertEquals(expectedBody, struct.get("body"));
+        Assertions.assertEquals(expectedStream, struct.get("stream"));
+        Assertions.assertEquals(expectedTopic, record.topic());
     }
 
     @ParameterizedTest
@@ -73,12 +79,12 @@ public void pollKeys(RedisServer redis) throws InterruptedException {
         syncHash(redis).hset(hashKey, hashValue);
         Thread.sleep(100);
         List<SourceRecord> sourceRecords = task.poll();
-        assertEquals(2, sourceRecords.size());
+        Assertions.assertEquals(2, sourceRecords.size());
         for (SourceRecord record : sourceRecords) {
-            assertEquals(topic, record.topic());
+            Assertions.assertEquals(topic, record.topic());
         }
-        assertEquals(stringValue, sourceRecords.get(0).value());
-        assertEquals(hashValue, sourceRecords.get(1).value());
+        Assertions.assertEquals(stringValue, sourceRecords.get(0).value());
+        Assertions.assertEquals(hashValue, sourceRecords.get(1).value());
     }
 
 }
