Registering stream consumers automatically with annotations #2528
Comments
Hi @NerminKarapandzic - Thank you for your interest and for submitting a request on this topic. I don't think this is the first time users have requested functionality similar to what you are asking for here; see Issue #1004. While I think this would be a valuable and convenient addition to Spring Data Redis, it also requires more deliberate and careful consideration. We would be happy to collaborate on something if you were willing to make a contribution. Otherwise, we will need to take this into consideration based on other priorities (deliverables) and interest. However, in the meantime, if I may: I think you could achieve something like this already if you were to combine Spring Data Redis with Spring Integration and possibly Spring Cloud Stream, for instance. Spring Integration already contains nice support for inbound/outbound channel Redis adapters (see doc). In addition, this might be combined with Spring Cloud Stream [Applications]; see here (under "Available Applications"). I understand that this alternate approach would necessarily involve multiple dependencies, and that it would be more convenient to have something in Spring Data Redis specifically. But there is a reason these other libraries (like Spring Integration and Spring Cloud Stream [Applications]) exist, so we must be careful not to duplicate effort, which could be confusing to users as well. Food for thought.
Thank you for the answer, @jxblum. I would be happy to contribute. What would the next steps be?
Hi @NerminKarapandzic - We try to make it as easy as possible for our awesome community, including you, to contribute to any Spring project, including Spring Data Redis. We have documented the Spring (Data) contribution guidelines here. Basically, you will need to sign the CLA and submit a Pull Request (PR) with the changes that implement your idea and solution. The PR will be picked up by the team for review and feedback. Having said this, we had a Spring Data team triage call this morning, and we also feel it would be valuable to get feedback from the Spring Integration team and possibly the Spring Cloud Stream team to coordinate our efforts there as well (again, to avoid duplicate efforts and overlapping functionality that could cause confusion for our other users). We can coordinate and collect feedback from the Spring Integration/Spring Cloud Stream teams internally, and I can summarize it in the comments here. In terms of priorities, I am currently in the middle of a few other tickets that need my attention. I will have those wrapped up in a week or two (I hope, barring any interruptions). Looking forward to collaborating with you.
Great, @jxblum. I will start looking into it and working on an implementation, but please do mention me here when you have feedback from your meetings.
Thank you @NerminKarapandzic. My advice would be to start with just a simple prototype, a test and maybe an example, that would answer the questions "What would this look and feel like to users?" and "How would they use it (for different use cases, in different scenarios/contexts, etc.)?" This might seem obvious to us, but what about someone new to Spring, and specifically to Spring Data Redis? I know you mentioned Spring Kafka, but with a Spring/Redis spin on it. I would not try to solve this entire problem by yourself since @mp911de was also thinking of tying this feature together with Redis pub/sub (doc), and Spring Data Redis's Again, we will definitely want to get some feedback from the Spring Integration and Spring Cloud Stream teams. Thanks again.
Hey @jxblum, I came up with a basic version and a test to prove it. I would like to get some feedback and ideas before I continue working on it.
Thank you so much @NerminKarapandzic! I really appreciate this. I will have a look in a few days. I am currently involved in a few other activities that need my immediate attention. I will get back to you soon.
Hey @jxblum, just wondering if you had a chance to look at this?
Hi @NerminKarapandzic - Sorry for the delayed response. No, I have not. This ticket is not deemed high priority at the moment, and it certainly needs better cross-project coordination (with the Spring Integration and Spring Cloud Stream teams). This will take a bit of time. Currently, the entire team is preparing for a critical GA release coming up in (mid) May (see May in the Spring Release Calendar). I promise to get back to you on this.
I've been following this issue and found myself needing this feature for one of my applications. I wanted to share my approach in case it helps someone more experienced take it further. https://github.com/Juungmini0601/jungmini-redis-messaging

Prototype

@RedisListener

```java
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

// Marks a method as a Redis pub/sub listener for the given channels and/or patterns.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisListener {
    String[] channels() default {};
    String[] patterns() default {};
}
```

RedisListenerProcessor

```java
import java.io.IOException;
import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.nio.charset.StandardCharsets;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

import com.fasterxml.jackson.databind.ObjectMapper;

@Component
public class RedisListenerProcessor implements BeanPostProcessor, ApplicationContextAware {

    private static final Logger log = LoggerFactory.getLogger(RedisListenerProcessor.class);

    private final ObjectMapper objectMapper;
    private RedisMessageListenerContainer listenerContainer;

    public RedisListenerProcessor(ObjectMapper objectMapper) {
        this.objectMapper = objectMapper;
    }

    // Scans every initialized bean for @RedisListener methods and registers them.
    @Override
    public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
        for (Method method : bean.getClass().getDeclaredMethods()) {
            RedisListener annotation = method.getAnnotation(RedisListener.class);
            if (annotation != null) {
                registerListener(bean, method, annotation);
            }
        }
        return bean;
    }

    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        this.listenerContainer = applicationContext.getBean(RedisMessageListenerContainer.class);
    }

    private void registerListener(Object bean, Method method, RedisListener annotation) {
        validateMethodSignature(method);
        Class<?>[] parameterTypes = method.getParameterTypes();

        MessageListener listener = (message, pattern) -> {
            try {
                // Decode with an explicit charset instead of the platform default.
                String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
                String channel = new String(message.getChannel(), StandardCharsets.UTF_8);

                if (parameterTypes.length == 0) {
                    method.invoke(bean);
                } else if (parameterTypes.length == 1) {
                    method.invoke(bean, convertPayload(messageBody, parameterTypes[0]));
                } else {
                    // Two parameters: payload first, then the channel name.
                    method.invoke(bean, convertPayload(messageBody, parameterTypes[0]), channel);
                }
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        };

        for (String channel : annotation.channels()) {
            listenerContainer.addMessageListener(listener, new ChannelTopic(channel));
        }
        for (String pattern : annotation.patterns()) {
            listenerContainer.addMessageListener(listener, new PatternTopic(pattern));
        }
        log.debug("Registered @RedisListener method {}", method);
    }

    // String payloads are passed through unchanged; anything else is parsed as JSON.
    // (The first draft always parsed the payload as JSON in the two-parameter case,
    // which fails for plain-text String payloads.)
    private Object convertPayload(String messageBody, Class<?> parameterType) throws IOException {
        if (parameterType.equals(String.class)) {
            return messageBody;
        }
        return objectMapper.readValue(messageBody, parameterType);
    }

    private void validateMethodSignature(Method method) {
        Parameter[] parameters = method.getParameters();
        String methodName = method.getDeclaringClass().getSimpleName() + "." + method.getName();

        if (parameters.length > 2) {
            throw new IllegalArgumentException(
                    String.format("@RedisListener method '%s' can have at most 2 parameters", methodName));
        }
        if (parameters.length < 2) {
            return;
        }

        Class<?> secondParamType = parameters[1].getType();
        if (!secondParamType.equals(String.class)) {
            throw new IllegalArgumentException(
                    String.format("@RedisListener method '%s' second parameter must be String (channel), but was %s",
                            methodName, secondParamType.getSimpleName()));
        }
    }
}
```
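To make the argument-binding rules of the prototype above easier to see in isolation (no parameters, one payload parameter, or payload plus channel), here is a minimal, framework-free sketch. It makes several assumptions: `ListenerArguments`, `SampleHandlers`, and `ListenerArgumentsDemo` are hypothetical names used only for illustration, and the `convert` method is a trivial stand-in for Jackson that handles only `String` and `int` payloads.

```java
import java.lang.reflect.Method;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper (not part of Spring Data Redis): reproduces the prototype's
// argument-binding rules without Redis, Spring, or Jackson.
final class ListenerArguments {

    private ListenerArguments() {
    }

    // Builds the invocation arguments from the raw message body and channel.
    static Object[] resolve(Method method, byte[] body, byte[] channel) {
        String payload = new String(body, StandardCharsets.UTF_8);
        Class<?>[] types = method.getParameterTypes();
        switch (types.length) {
            case 0:
                return new Object[0];
            case 1:
                return new Object[] { convert(payload, types[0]) };
            case 2:
                return new Object[] { convert(payload, types[0]),
                        new String(channel, StandardCharsets.UTF_8) };
            default:
                throw new IllegalArgumentException("a listener method can have at most 2 parameters");
        }
    }

    // Stand-in for ObjectMapper.readValue: only String and int payloads are supported here.
    private static Object convert(String payload, Class<?> type) {
        if (type == String.class) {
            return payload;
        }
        if (type == int.class || type == Integer.class) {
            return Integer.valueOf(payload.trim());
        }
        throw new IllegalArgumentException("unsupported payload type: " + type.getName());
    }
}

// Sample listener methods covering the three supported signatures.
class SampleHandlers {
    final List<String> calls = new ArrayList<>();

    public void onPing() { calls.add("ping"); }
    public void onText(String text) { calls.add("text:" + text); }
    public void onCount(int count, String channel) { calls.add("count:" + count + "@" + channel); }
}

class ListenerArgumentsDemo {

    // Invokes each sample handler once and returns the recorded calls.
    static List<String> run() {
        SampleHandlers handlers = new SampleHandlers();
        byte[] channel = "orders".getBytes(StandardCharsets.UTF_8);
        try {
            dispatch(handlers, SampleHandlers.class.getMethod("onPing"), "ignored", channel);
            dispatch(handlers, SampleHandlers.class.getMethod("onText", String.class), "hello", channel);
            dispatch(handlers, SampleHandlers.class.getMethod("onCount", int.class, String.class), "42", channel);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException(e);
        }
        return handlers.calls;
    }

    private static void dispatch(Object bean, Method method, String body, byte[] channel)
            throws ReflectiveOperationException {
        byte[] raw = body.getBytes(StandardCharsets.UTF_8);
        method.invoke(bean, ListenerArguments.resolve(method, raw, channel));
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [ping, text:hello, count:42@orders]
    }
}
```

Keeping the binding logic separate from the Redis listener registration like this would also make it testable without a running Redis instance.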
Basically what Spring for Apache Kafka does: we would have an annotation like @RedisStreamListener, to which we could provide the stream name and maybe some other options. You get the idea.
Are there any limitations that would make this impossible?
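As a rough illustration of the idea, the following sketch shows what such an annotation and its discovery step might look like. Everything here is an assumption for discussion purposes: `@RedisStreamListener` and its `stream`/`group` attributes do not exist in Spring Data Redis, and `StreamListenerEndpoint`, `StreamListenerScanner`, and `OrderEvents` are hypothetical names. The sketch only collects annotated methods; a real implementation would presumably hand them to something like Spring Data Redis's `StreamMessageListenerContainer`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Hypothetical annotation: neither the name nor the attributes exist in Spring Data Redis.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface RedisStreamListener {
    String stream();
    String group() default "";
}

// One discovered listener method together with the stream it wants to consume.
final class StreamListenerEndpoint {
    final Object bean;
    final Method method;
    final String stream;
    final String group;

    StreamListenerEndpoint(Object bean, Method method, RedisStreamListener annotation) {
        this.bean = bean;
        this.method = method;
        this.stream = annotation.stream();
        this.group = annotation.group();
    }
}

final class StreamListenerScanner {

    private StreamListenerScanner() {
    }

    // Collects every method on the bean that carries @RedisStreamListener.
    static List<StreamListenerEndpoint> scan(Object bean) {
        List<StreamListenerEndpoint> endpoints = new ArrayList<>();
        for (Method method : bean.getClass().getDeclaredMethods()) {
            RedisStreamListener annotation = method.getAnnotation(RedisStreamListener.class);
            if (annotation != null) {
                endpoints.add(new StreamListenerEndpoint(bean, method, annotation));
            }
        }
        // Reflection does not guarantee method order, so sort for a stable result.
        endpoints.sort(Comparator.comparing((StreamListenerEndpoint e) -> e.stream));
        return endpoints;
    }
}

// Example of how a user-facing class might look.
class OrderEvents {

    @RedisStreamListener(stream = "orders", group = "billing")
    public void onOrder(Map<String, String> fields) {
    }

    @RedisStreamListener(stream = "payments")
    public void onPayment(Map<String, String> fields) {
    }

    public void notAListener() {
    }
}

class StreamListenerDemo {
    public static void main(String[] args) {
        for (StreamListenerEndpoint endpoint : StreamListenerScanner.scan(new OrderEvents())) {
            System.out.println(endpoint.stream + " -> " + endpoint.method.getName());
        }
    }
}
```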