Description
In what version(s) of Spring AMQP are you seeing this issue?
3.1.5
Describe the bug
When use Kotlin suspend (or reactive) RabbitListener
with consumerBatch
and AcknowledgeMode.MANUAL
error occurs
Operator called default onErrorDropped java.lang.NullPointerException: Cannot invoke \"org.springframework.amqp.core.Message.getMessageProperties()\" because \"request\" is null\n\tat org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.basicAck(AbstractAdaptableMessageListener.java:444)
To Reproduce
Below fragments using Kotlin
Configure SimpleRabbitListenerContainerFactory
with settings:
val factory = SimpleRabbitListenerContainerFactory()
// ...
factory.setConsumerBatchEnabled(true)
factory.setDeBatchingEnabled(true)
factory.setBatchSize(100)
factory.setPrefetchCount(100)
factory.setAcknowledgeMode(AcknowledgeMode.MANUAL)
Define rabbit listener like this:
@RabbitListener(queues = ["some-queue"], )
suspend fun receiveSensorMessage(messages: List<String>): Unit {
service.handleMessages(messages)
}
And after recieve messages and handled its you will get an error:
Operator called default onErrorDropped java.lang.NullPointerException: Cannot invoke \"org.springframework.amqp.core.Message.getMessageProperties()\" because \"request\" is null\n\tat org.springframework.amqp.rabbit.listener.adapter.AbstractAdaptableMessageListener.basicAck(AbstractAdaptableMessageListener.java:444)
After some research source code of spring-amqp
I have explanation why it is happens:
When we enable batch then used BatchMessagingMessageListenerAdapter
It is invokes invokeHandlerAndProcessResult method with amqpMessage
= null
and after invokeHandler we get result as MonoOnAssembly
therefore we also call handleResult method where we basicAck will be called with request
as null (because above we pass it as amqpMessage
= null
).
Expected behavior
No error occurs while using suspend RabbitListener
with consumerBatch
and AcknowledgeMode.MANUAL