Skip to content

Commit 3a135fe

Browse files
committed
removed custom exception
1 parent 74e37d3 commit 3a135fe

File tree

3 files changed

+5
-52
lines changed

3 files changed

+5
-52
lines changed

src/main/java/com/redislabs/kafkaconnect/common/RedisEnterpriseConnectorConfigException.java

Lines changed: 0 additions & 46 deletions
This file was deleted.

src/main/java/com/redislabs/kafkaconnect/sink/RedisEnterpriseSinkTask.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020
import com.github.jcustenborder.kafka.connect.utils.data.TopicPartitionCounter;
2121
import com.github.jcustenborder.kafka.connect.utils.jackson.ObjectMapperFactory;
2222
import com.redislabs.kafkaconnect.RedisEnterpriseSinkConnector;
23-
import com.redislabs.kafkaconnect.common.RedisEnterpriseConnectorConfigException;
2423
import io.lettuce.core.KeyValue;
2524
import io.lettuce.core.RedisClient;
2625
import io.lettuce.core.api.StatefulRedisConnection;
2726
import lombok.extern.slf4j.Slf4j;
2827
import org.apache.kafka.common.TopicPartition;
28+
import org.apache.kafka.common.config.ConfigException;
2929
import org.apache.kafka.connect.data.Field;
3030
import org.apache.kafka.connect.data.Struct;
3131
import org.apache.kafka.connect.errors.ConnectException;
@@ -123,7 +123,7 @@ private OperationItemWriter.RedisOperation<SinkRecord> operation() {
123123
case ZSET:
124124
return new Zadd<>(this::collectionKey, this::key, new ConstantPredicate<>(false), new NullValuePredicate<>(this::score), this::score);
125125
default:
126-
throw new RedisEnterpriseConnectorConfigException("Data structure not supported: " + config.getType());
126+
throw new ConfigException(RedisEnterpriseSinkConfig.TYPE, config.getType());
127127
}
128128
}
129129

src/test/java/com/redislabs/kafkaconnect/RedisEnterpriseSinkTaskIT.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -200,13 +200,12 @@ public void putZset(RedisServer redis) {
200200
List<SinkRecord> records = new ArrayList<>(count);
201201
for (int i = 0; i < count; i++) {
202202
String value = "zsetmember:" + i;
203-
Double score = (double) i;
204-
expected.add(ScoredValue.just(score, value));
205-
records.add(SinkRecordHelper.write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, value), new SchemaAndValue(Schema.FLOAT64_SCHEMA, score)));
203+
expected.add(ScoredValue.just(i, value));
204+
records.add(SinkRecordHelper.write(topic, new SchemaAndValue(Schema.STRING_SCHEMA, value), new SchemaAndValue(Schema.FLOAT64_SCHEMA, i)));
206205
}
207206
put(topic, RedisEnterpriseSinkConfig.DataType.ZSET, redis, records);
208207
List<ScoredValue<String>> actual = syncSortedSet(redis).zrangeWithScores(topic, 0, -1);
209-
Collections.sort(expected, Comparator.comparing(ScoredValue::getScore));
208+
expected.sort(Comparator.comparing(ScoredValue::getScore));
210209
assertEquals(expected, actual);
211210
}
212211

0 commit comments

Comments
 (0)