Skip to content

Commit 93ee7ac

Browse files
garyrussellartembilan
authored andcommitted
GH-2297: Add KLERegistry.alwaysStartAfterRefresh
Resolves #2297 Add an option to apply `autoStartup` semantics to late registrations. **cherry-pick to 2.9.x, 2.8.x**
1 parent be7fdb4 commit 93ee7ac

File tree

3 files changed

+47
-2
lines changed

3 files changed

+47
-2
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1953,6 +1953,10 @@ A collection of managed containers can be obtained by calling the registry's `ge
19531953
Version 2.2.5 added a convenience method `getAllListenerContainers()`, which returns a collection of all containers, including those managed by the registry and those declared as beans.
19541954
The collection returned will include any prototype beans that have been initialized, but it will not initialize any lazy bean declarations.
19551955

1956+
IMPORTANT: Endpoints registered after the application context has been refreshed will start immediately, regardless of their `autoStartup` property, to comply with the `SmartLifecycle` contract, where `autoStartup` is only considered during application context initialization.
1957+
An example of late registration is a bean with a `@KafkaListener` in prototype scope where an instance is created after the context is initialized.
1958+
Starting with version 2.8.7, you can set the registry's `alwaysStartAfterRefresh` property to `false` and then the container's `autoStartup` property will define whether or not the container is started.
1959+
19561960
[[kafka-validation]]
19571961
===== `@KafkaListener` `@Payload` Validation
19581962

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 17 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,
8383

8484
private boolean contextRefreshed;
8585

86+
private boolean alwaysStartAfterRefresh = true;
87+
8688
private volatile boolean running;
8789

8890
@Override
@@ -107,6 +109,20 @@ public MessageListenerContainer getListenerContainer(String id) {
107109
return this.listenerContainers.get(id);
108110
}
109111

112+
/**
113+
* By default, containers registered for endpoints after the context is refreshed
114+
* are immediately started, regardless of their autoStartup property, to comply with
115+
* the {@link SmartLifecycle} contract, where autoStartup is only considered during
116+
* context initialization. Set to false to apply the autoStartup property, even for
117+
* late endpoint binding. If this is called after the context is refreshed, it will
118+
* apply to any endpoints registered after that call.
119+
* @param alwaysStartAfterRefresh false to apply the property.
120+
* @since 2.8.7
121+
*/
122+
public void setAlwaysStartAfterRefresh(boolean alwaysStartAfterRefresh) {
123+
this.alwaysStartAfterRefresh = alwaysStartAfterRefresh;
124+
}
125+
110126
/**
111127
* Return the ids of the managed {@link MessageListenerContainer} instance(s).
112128
* @return the ids.
@@ -327,7 +343,7 @@ public void onApplicationEvent(ContextRefreshedEvent event) {
327343
* @see MessageListenerContainer#isAutoStartup()
328344
*/
329345
private void startIfNecessary(MessageListenerContainer listenerContainer) {
330-
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
346+
if ((this.contextRefreshed && this.alwaysStartAfterRefresh) || listenerContainer.isAutoStartup()) {
331347
listenerContainer.start();
332348
}
333349
}

spring-kafka/src/test/java/org/springframework/kafka/annotation/EnableKafkaIntegrationTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,9 +74,12 @@
7474
import org.springframework.beans.factory.annotation.Autowired;
7575
import org.springframework.beans.factory.config.BeanDefinition;
7676
import org.springframework.beans.factory.config.BeanPostProcessor;
77+
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
78+
import org.springframework.context.ApplicationContext;
7779
import org.springframework.context.annotation.Bean;
7880
import org.springframework.context.annotation.Configuration;
7981
import org.springframework.context.annotation.Role;
82+
import org.springframework.context.annotation.Scope;
8083
import org.springframework.context.event.EventListener;
8184
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
8285
import org.springframework.core.MethodParameter;
@@ -182,7 +185,7 @@
182185
"annotated25", "annotated25reply1", "annotated25reply2", "annotated26", "annotated27", "annotated28",
183186
"annotated29", "annotated30", "annotated30reply", "annotated31", "annotated32", "annotated33",
184187
"annotated34", "annotated35", "annotated36", "annotated37", "foo", "manualStart", "seekOnIdle",
185-
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41" })
188+
"annotated38", "annotated38reply", "annotated39", "annotated40", "annotated41", "annotated42" })
186189
@TestPropertySource(properties = "spel.props=fetch.min.bytes=420000,max.poll.records=10")
187190
public class EnableKafkaIntegrationTests {
188191

@@ -989,6 +992,14 @@ public void testContentConversion() throws InterruptedException {
989992
assertThat(this.listener.contentFoo).isEqualTo(new Foo("bar"));
990993
}
991994

995+
@Test
996+
void proto(@Autowired ApplicationContext context) {
997+
this.registry.setAlwaysStartAfterRefresh(false);
998+
context.getBean(ProtoListener.class);
999+
assertThat(this.registry.getListenerContainer("proto").isRunning()).isFalse();
1000+
this.registry.setAlwaysStartAfterRefresh(true);
1001+
}
1002+
9921003
@Configuration
9931004
@EnableKafka
9941005
@EnableTransactionManagement(proxyTargetClass = true)
@@ -1713,6 +1724,20 @@ String barInfo() {
17131724
return "info for the bar listener";
17141725
}
17151726

1727+
@Bean
1728+
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
1729+
ProtoListener proto() {
1730+
return new ProtoListener();
1731+
}
1732+
1733+
}
1734+
1735+
static class ProtoListener {
1736+
1737+
@KafkaListener(id = "proto", topics = "annotated-42", autoStartup = "false")
1738+
public void listen(String in) {
1739+
}
1740+
17161741
}
17171742

17181743
@Component

0 commit comments

Comments
 (0)