Skip to content

Commit c67f80b

Browse files
GH-1487: Countdown not active AsyncMProcConsumer
Fixes #1487 * SimpleMessageListenerContainer.AsyncMessageProcessingConsumer countdown when the container is not active * test for AsyncMessageProcessingConsumer countdown when the container is not active * change TestExecutor for the static and final **Cherry-pick to `2.4.x`**
1 parent 0a6a606 commit c67f80b

File tree

2 files changed

+58
-1
lines changed

2 files changed

+58
-1
lines changed

spring-rabbit/src/main/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainer.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
* @author Artem Bilan
7979
* @author Alex Panchenko
8080
* @author Mat Jaggard
81+
* @author Yansong Ren
8182
*
8283
* @since 1.0
8384
*/
@@ -1192,6 +1193,7 @@ private FatalListenerStartupException getStartupException() throws InterruptedEx
11921193
@Override // NOSONAR - complexity - many catch blocks
11931194
public void run() { // NOSONAR - line count
11941195
if (!isActive()) {
1196+
this.start.countDown();
11951197
return;
11961198
}
11971199

spring-rabbit/src/test/java/org/springframework/amqp/rabbit/listener/SimpleMessageListenerContainerTests.java

Lines changed: 56 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2021 the original author or authors.
2+
* Copyright 2002-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -86,6 +86,7 @@
8686
import org.springframework.amqp.utils.test.TestUtils;
8787
import org.springframework.aop.support.AopUtils;
8888
import org.springframework.beans.DirectFieldAccessor;
89+
import org.springframework.core.task.SimpleAsyncTaskExecutor;
8990
import org.springframework.test.util.ReflectionTestUtils;
9091
import org.springframework.transaction.TransactionDefinition;
9192
import org.springframework.transaction.TransactionException;
@@ -106,6 +107,7 @@
106107
* @author Gary Russell
107108
* @author Artem Bilan
108109
* @author Mohammad Hewedy
110+
* @author Yansong Ren
109111
*/
110112
public class SimpleMessageListenerContainerTests {
111113

@@ -731,6 +733,30 @@ void filterMppNoDoubleAck() throws Exception {
731733
verifyNoMoreInteractions(listener);
732734
}
733735

736+
@Test
737+
void testWithConsumerStartWhenNotActive() {
738+
ConnectionFactory connectionFactory = mock(ConnectionFactory.class);
739+
Connection connection = mock(Connection.class);
740+
Channel channel = mock(Channel.class);
741+
given(connectionFactory.createConnection()).willReturn(connection);
742+
given(connection.createChannel(false)).willReturn(channel);
743+
744+
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
745+
// overwrite task execute. shutdown container before task execute.
746+
TestExecutor testExecutor = new TestExecutor(container);
747+
container.setTaskExecutor(testExecutor);
748+
container.start();
749+
750+
// then add queue for trigger container shutdown
751+
container.addQueueNames("bar");
752+
753+
// valid the 'start' countdown is 0. lastTask is AsyncMessageProcessingConsumer
754+
Runnable lastTask = testExecutor.getLastTask();
755+
CountDownLatch start = TestUtils.getPropertyValue(lastTask, "start", CountDownLatch.class);
756+
757+
assertThat(start.getCount()).isEqualTo(0L);
758+
}
759+
734760
private Answer<Object> messageToConsumer(final Channel mockChannel, final SimpleMessageListenerContainer container,
735761
final boolean cancel, final CountDownLatch latch) {
736762
return invocation -> {
@@ -784,4 +810,33 @@ protected void doRollback(DefaultTransactionStatus status) throws TransactionExc
784810

785811
}
786812

813+
@SuppressWarnings("serial")
814+
private static final class TestExecutor extends SimpleAsyncTaskExecutor {
815+
816+
private final SimpleMessageListenerContainer simpleMessageListenerContainer;
817+
818+
private int shutdownCount = 0;
819+
820+
private Runnable lastTask = null;
821+
822+
private TestExecutor(SimpleMessageListenerContainer simpleMessageListenerContainer) {
823+
this.simpleMessageListenerContainer = simpleMessageListenerContainer;
824+
}
825+
826+
public Runnable getLastTask() {
827+
return lastTask;
828+
}
829+
830+
@Override
831+
public void execute(Runnable task) {
832+
// skip the first execution
833+
if (++shutdownCount > 1) {
834+
lastTask = task;
835+
// before execute, shutdown the container for test
836+
this.simpleMessageListenerContainer.shutdown();
837+
}
838+
super.execute(task);
839+
}
840+
}
841+
787842
}

0 commit comments

Comments
 (0)