
Registering stream consumers automatically with annotations #2528


Open
NerminKarapandzic opened this issue Mar 16, 2023 · 10 comments
Labels: status: on-hold (We cannot start working on this issue yet), status: pending-design-work (Needs design work before any code can be developed), type: enhancement (A general enhancement)

Comments

@NerminKarapandzic

Basically what spring for kafka does, we would have an annotation like @RedisStreamListener to which we could provide the stream name and maybe some other options, you get the idea.

Are there maybe some limitations which make this not possible?
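To make the idea concrete, here is a minimal sketch of what such an annotation and its discovery step could look like. It is only an illustration: `@RedisStreamListener`, its `stream` and `group` attributes, and the `OrderHandlers` class are hypothetical names, not part of Spring Data Redis, and the sketch uses plain JDK reflection instead of a Spring `BeanPostProcessor`.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

class StreamListenerSketch {

	// Collects "stream key -> handler method" for every annotated method declared on the class.
	static Map<String, Method> discover(Class<?> type) {
		Map<String, Method> handlers = new LinkedHashMap<>();
		for (Method method : type.getDeclaredMethods()) {
			RedisStreamListener listener = method.getAnnotation(RedisStreamListener.class);
			if (listener != null) {
				handlers.put(listener.stream(), method);
			}
		}
		return handlers;
	}

	public static void main(String[] args) {
		// prints "orders -> onOrder"
		discover(OrderHandlers.class)
			.forEach((stream, method) -> System.out.println(stream + " -> " + method.getName()));
	}
}

// Hypothetical annotation: neither the name nor the attributes exist in Spring Data Redis.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface RedisStreamListener {
	String stream();            // stream key to read from
	String group() default "";  // optional consumer group
}

// Hypothetical user code showing how a listener method might be declared.
class OrderHandlers {

	@RedisStreamListener(stream = "orders", group = "billing")
	public void onOrder(Map<String, String> fields) { }

	public void notAListener() { }
}
```

A real implementation would still have to turn each discovered method into a subscription on a listener container; the sketch stops at discovery.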

@spring-projects-issues spring-projects-issues added the status: waiting-for-triage An issue we've not yet triaged label Mar 16, 2023
@jxblum
Contributor

jxblum commented Mar 20, 2023

Hi @NerminKarapandzic - Thank you for your interest and for submitting a request on this topic.

I don't think this is the first time users have requested functionality similar to what you are asking for here. See Issue #1004.

While I think this would be a valuable and convenient addition to Spring Data Redis, it also requires more deliberate and careful consideration. We would be happy to collaborate on something if you were willing to make a contribution. Otherwise, we will need to take this into consideration based on other priorities (deliverables) and interest.

However, in the meantime, if I may, I think you could achieve something like this already if you were to combine Spring Data Redis with Spring Integration and possibly Spring Cloud Stream, for instance.

Spring Integration already contains nice support for inbound/outbound Redis channel adapters (see doc). In addition, this could possibly be combined with Spring Cloud Stream [Applications]; see here (under "Available Applications").

I understand that this alternative approach would necessarily involve multiple dependencies, and that it would be more convenient to have something in Spring Data Redis specifically. But there is a reason these other libraries (like Spring Integration and Spring Cloud Stream [Applications]) exist, so we must be careful not to duplicate effort, which could be confusing to users as well.

Food for thought.

@gregturn gregturn added status: pending-design-work Needs design work before any code can be developed type: enhancement A general enhancement and removed status: waiting-for-triage An issue we've not yet triaged labels Mar 20, 2023
@NerminKarapandzic
Author

Thank you for the answer @jxblum, I would be happy to contribute. What would be the next steps?

@jxblum
Contributor

jxblum commented Mar 20, 2023

Hi @NerminKarapandzic - We try to make it as easy as possible for our awesome community, including you, to contribute to any Spring project, including Spring Data Redis.

We have documented the Spring (Data) contribution guidelines here. Basically, you will need to sign the CLA and submit a Pull Request (PR) with the changes that implement your idea and solution.

The PR will be picked up by the team for review and feedback.

Having said this, we had a Spring Data team triage call this morning and we also feel like it would be valuable to get feedback from the Spring Integration team and possibly the Spring Cloud Stream team to coordinate our efforts there as well (again, to avoid duplicate efforts and overlapping functionality that could cause confusion for our other users). We can coordinate and collect feedback from the Spring Integration/Spring Cloud Stream teams internally and I can summarize in the comments here.

In terms of priorities, I am currently in the middle of a few other tickets that need my attention. I will have those wrapped up in a week or two (I hope, barring any interruptions).

Looking forward to collaborating with you.

@NerminKarapandzic
Author

Great, @jxblum. I will start looking into it right away and work on an implementation, but please mention me here when you have feedback from your meetings.

@jxblum
Contributor

jxblum commented Mar 20, 2023

Thank you @NerminKarapandzic.

My advice would be to maybe start with just a simple prototype at first, a test and maybe an example, that would answer the question, "What would this look and feel like to users?", or "How would they use it (for different use cases, in different scenarios/contexts/etc?)"

This might seem obvious to us, but what about someone new to Spring, or specifically to Spring Data Redis? I know you mentioned Spring Kafka, but with a Spring/Redis spin on it. RedisStreamListener is not bad, but we can maybe play around here more as well. Include support for filtering, maybe conversion or transformation of elements arriving from the stream, or events coming from some source, etc.
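As a rough illustration of the filtering and conversion idea, the sketch below wraps a typed handler so that it only sees records that pass a filter, after conversion. Everything here is hypothetical (the `pipeline` helper, the record shape as a `Map<String, String>`, the field names); it uses only JDK functional interfaces and says nothing about how Spring Data Redis would actually expose this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

class ListenerPipelineSketch {

	// Hypothetical helper: the handler receives a record only if the filter accepts it,
	// and only after the converter has turned the raw fields into the handler's type.
	static <T> Consumer<Map<String, String>> pipeline(
			Predicate<Map<String, String>> filter,
			Function<Map<String, String>, T> converter,
			Consumer<T> handler) {
		return record -> {
			if (filter.test(record)) {
				handler.accept(converter.apply(record));
			}
		};
	}

	public static void main(String[] args) {
		List<Integer> amounts = new ArrayList<>();
		Predicate<Map<String, String>> onlyOrders = record -> "order".equals(record.get("type"));
		Function<Map<String, String>, Integer> toAmount = record -> Integer.parseInt(record.get("amount"));
		Consumer<Integer> collect = amounts::add;

		Consumer<Map<String, String>> listener = pipeline(onlyOrders, toAmount, collect);
		listener.accept(Map.of("type", "order", "amount", "42"));
		listener.accept(Map.of("type", "heartbeat")); // filtered out, never converted

		System.out.println(amounts); // prints [42]
	}
}
```

In an annotation-based design, the filter and converter would presumably be referenced from annotation attributes or resolved from the handler's parameter type, which is exactly the kind of design question raised here.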

I would not try to solve this entire problem by yourself since @mp911de was also thinking of tying this feature together with Redis pub/sub (doc), and Spring Data Redis's RedisMessageListenerContainer (Javadoc), and maybe a few other areas as well.

Again, we will definitely want to get some feedback from the Spring Integration and Spring Cloud Stream teams.

Thanks again.

@jxblum jxblum self-assigned this Mar 20, 2023
@NerminKarapandzic
Author

Hey @jxblum, I came up with a basic version and a test to prove it. I would like to get some feedback and ideas before I continue working on it.
Don't mind the commit messages and coding style yet.
Here's where you can check the current work: main...NerminKarapandzic:spring-data-redis:issue/DATAREDIS-2528

@jxblum
Contributor

jxblum commented Mar 28, 2023

Thank you so much @NerminKarapandzic! Really appreciate this. I will have a look in a few days. I am currently involved in a few other activities that need my immediate attention. Will get back to you soon.

@NerminKarapandzic
Author

Hey @jxblum , just wondering if you had a chance to look at this?

@jxblum
Contributor

jxblum commented Apr 26, 2023

Hi @NerminKarapandzic - Sorry for the delayed response. No, I have not had a chance yet, sorry. This ticket is not deemed high priority at the moment, and it needs better cross-project coordination (with the Spring Integration and Spring Cloud Stream teams) for sure.

This will take a bit of time. Currently, the entire team is preparing for a critical GA release coming up in (mid) May (see May in Spring Release Calendar).

I promise to get back to you on this.

@Juungmini0601

Juungmini0601 commented Jun 13, 2025

I've been following this issue and found myself needing this feature for one of my applications.
I created a simple prototype implementation that worked well in my use case.

I wanted to share my approach in case it helps someone more experienced to take it further.

https://github.com/Juungmini0601/jungmini-redis-messaging

Prototype

@RedisListener

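import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisListener {
	// Exact channel names to subscribe to.
	String[] channels() default {};

	// Glob-style channel patterns to subscribe to.
	String[] patterns() default {};
}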

RedisListenerProcessor

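import java.lang.reflect.Method;
import java.lang.reflect.Parameter;
import java.nio.charset.StandardCharsets;

import com.fasterxml.jackson.databind.ObjectMapper;
// Logger and LoggerFactory are assumed to be SLF4J; the original snippet does not show its imports.
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeansException;
import org.springframework.beans.factory.config.BeanPostProcessor;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.data.redis.connection.MessageListener;
import org.springframework.data.redis.listener.ChannelTopic;
import org.springframework.data.redis.listener.PatternTopic;
import org.springframework.data.redis.listener.RedisMessageListenerContainer;
import org.springframework.stereotype.Component;

@Component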
public class RedisListenerProcessor implements BeanPostProcessor, ApplicationContextAware {

	private static final Logger log = LoggerFactory.getLogger(RedisListenerProcessor.class);
	private final ObjectMapper objectMapper;
	private RedisMessageListenerContainer listenerContainer;

	public RedisListenerProcessor(ObjectMapper objectMapper) {
		this.objectMapper = objectMapper;
	}

	@Override
	public Object postProcessAfterInitialization(Object bean, String beanName) throws BeansException {
		Method[] methods = bean.getClass().getDeclaredMethods();

		for (Method method : methods) {
			if (method.isAnnotationPresent(RedisListener.class)) {
				RedisListener annotation = method.getAnnotation(RedisListener.class);
				registerListener(bean, method, annotation);
			}
		}
		return bean;
	}

	@Override
	public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
		this.listenerContainer = applicationContext.getBean(RedisMessageListenerContainer.class);
	}

	private void registerListener(Object bean, Method method, RedisListener annotation) {
		validateMethodSignature(method);

		MessageListener listener = (message, pattern) -> {
			try {
				// Decode explicitly as UTF-8 instead of relying on the platform default charset.
				String messageBody = new String(message.getBody(), StandardCharsets.UTF_8);
				String channel = new String(message.getChannel(), StandardCharsets.UTF_8);
				Parameter[] parameters = method.getParameters();

				if (parameters.length == 0) {
					method.invoke(bean);
					return;
				}

				// Pass String payloads through unchanged; deserialize anything else from JSON.
				// This also covers the two-parameter case, which previously always went through Jackson.
				Class<?> parameterType = parameters[0].getType();
				Object payload = parameterType.equals(String.class)
					? messageBody
					: objectMapper.readValue(messageBody, parameterType);

				if (parameters.length == 1) {
					method.invoke(bean, payload);
				} else {
					method.invoke(bean, payload, channel);
				}
			} catch (Exception e) {
				throw new RuntimeException(e);
			}
		};

		for (String channel : annotation.channels()) {
			listenerContainer.addMessageListener(listener, new ChannelTopic(channel));
		}

		for (String pattern : annotation.patterns()) {
			listenerContainer.addMessageListener(listener, new PatternTopic(pattern));
		}
	}

	private void validateMethodSignature(Method method) {
		Parameter[] parameters = method.getParameters();
		String methodName = method.getDeclaringClass().getSimpleName() + "." + method.getName();

		if (parameters.length > 2) {
			throw new IllegalArgumentException(
				String.format("@RedisListener method '%s' can have at most 2 parameters", methodName));
		}

		// Zero or one parameter needs no further checks.
		if (parameters.length < 2) {
			return;
		}

		Class<?> secondParamType = parameters[1].getType();
		if (!secondParamType.equals(String.class)) {
			throw new IllegalArgumentException(
				String.format("@RedisListener method '%s' second parameter must be String (channel), but was %s",
					methodName, secondParamType.getSimpleName()));
		}
	}
}
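For readers wondering how listener methods would be declared against this prototype, here is a small sketch of the method shapes it accepts. It is an illustration only: `ChatHandlers` and `ChatMessage` are made-up names, the annotation is a local stand-in copied from the prototype so the example compiles without Spring, and `isSupported` merely mirrors the rule in `validateMethodSignature` rather than calling it.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.TreeMap;

class RedisListenerUsageSketch {

	// Mirrors the prototype's signature rule: at most two parameters,
	// and a second parameter (the channel) must be a String.
	static boolean isSupported(Method method) {
		Class<?>[] types = method.getParameterTypes();
		return types.length < 2 || (types.length == 2 && types[1] == String.class);
	}

	// Maps each annotated method name to whether its signature would be accepted.
	static Map<String, Boolean> check(Class<?> type) {
		Map<String, Boolean> result = new TreeMap<>();
		for (Method method : type.getDeclaredMethods()) {
			if (method.isAnnotationPresent(RedisListener.class)) {
				result.put(method.getName(), isSupported(method));
			}
		}
		return result;
	}

	public static void main(String[] args) {
		// prints {onChat=true, onPing=true, onRaw=true, unsupported=false}
		System.out.println(check(ChatHandlers.class));
	}
}

// Local stand-in for the prototype's annotation, so the sketch has no Spring dependency.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface RedisListener {
	String[] channels() default {};
	String[] patterns() default {};
}

// Hypothetical payload type that the prototype would deserialize from JSON.
class ChatMessage {
	public String text;
}

// Hypothetical listener bean showing the supported and unsupported shapes.
class ChatHandlers {

	@RedisListener(channels = "chat.ping")
	public void onPing() { }                                     // no arguments: plain trigger

	@RedisListener(channels = "chat.raw")
	public void onRaw(String body) { }                           // raw message body

	@RedisListener(patterns = "chat.*")
	public void onChat(ChatMessage message, String channel) { }  // JSON payload plus channel name

	@RedisListener(channels = "chat.bad")
	public void unsupported(String body, int count) { }          // rejected: second parameter is not String
}
```

In the prototype itself, the rejected shape would cause an `IllegalArgumentException` while the bean is being post-processed, so a bad signature fails at startup rather than on the first message.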
