Skip to content

Commit 5dd5f37

Browse files
Remove KafkaBackOffManagerConfigurationSupport
1 parent 9eb9ec4 commit 5dd5f37

File tree

6 files changed

+54
-125
lines changed

6 files changed

+54
-125
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableRetryTopic.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424

2525
import org.springframework.context.annotation.Configuration;
2626
import org.springframework.context.annotation.Import;
27-
import org.springframework.kafka.config.KafkaBackOffManagerConfigurationSupport;
2827
import org.springframework.kafka.config.RetryTopicConfigurationSupport;
2928

3029
/**
@@ -82,6 +81,6 @@
8281
@Retention(RetentionPolicy.RUNTIME)
8382
@Target(ElementType.TYPE)
8483
@Documented
85-
@Import({RetryTopicConfigurationSupport.class, KafkaBackOffManagerConfigurationSupport.class})
84+
@Import(RetryTopicConfigurationSupport.class)
8685
public @interface EnableRetryTopic {
8786
}

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaBackOffManagerConfigurationSupport.java

Lines changed: 0 additions & 104 deletions
This file was deleted.

spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicConfigurationSupport.java

Lines changed: 32 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import java.util.stream.Stream;
2222

2323
import org.springframework.beans.factory.BeanFactory;
24+
import org.springframework.beans.factory.DisposableBean;
2425
import org.springframework.beans.factory.annotation.Qualifier;
2526
import org.springframework.context.ApplicationContext;
2627
import org.springframework.context.annotation.Bean;
@@ -30,6 +31,9 @@
3031
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
3132
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
3233
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
34+
import org.springframework.kafka.listener.ListenerContainerRegistry;
35+
import org.springframework.kafka.listener.MessageListenerContainer;
36+
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
3337
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
3438
import org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver;
3539
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
@@ -57,10 +61,12 @@
5761
* @author Tomaz Fernandes
5862
* @since 2.9
5963
*/
60-
public class RetryTopicConfigurationSupport {
64+
public class RetryTopicConfigurationSupport implements DisposableBean {
6165

6266
private final RetryTopicComponentFactory componentFactory = createComponentFactory();
6367

68+
private DisposableBean disposableBackOffManagerFactory;
69+
6470
/**
6571
* Return a global {@link RetryTopicConfigurer} for configuring retry topics
6672
* for {@link KafkaListenerEndpoint} instances with a corresponding
@@ -252,6 +258,24 @@ protected Consumer<DestinationTopicResolver> customizeDestinationTopicResolver()
252258
};
253259
}
254260

261+
/**
262+
* Provides the {@link KafkaConsumerBackoffManager} instance.
263+
* Override this method to provide a customized implementation.
264+
* A {@link PartitionPausingBackOffManagerFactory} can be used for that purpose,
265+
* or a different implementation can be provided.
266+
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
267+
* {@link MessageListenerContainer} to be backed off.
268+
* @return the instance.
269+
*/
270+
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
271+
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
272+
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
273+
ListenerContainerRegistry registry) {
274+
PartitionPausingBackOffManagerFactory factory = new PartitionPausingBackOffManagerFactory(registry);
275+
this.disposableBackOffManagerFactory = factory;
276+
return factory.create();
277+
}
278+
255279
/**
256280
* Override this method if you want to provide a subclass
257281
* of {@link RetryTopicComponentFactory} with different
@@ -268,6 +292,13 @@ RetryTopicBootstrapper retryTopicBootstrapper(ApplicationContext context) {
268292
return new RetryTopicBootstrapper(context, context.getAutowireCapableBeanFactory());
269293
}
270294

295+
@Override
296+
public void destroy() throws Exception {
297+
if (this.disposableBackOffManagerFactory != null) {
298+
this.disposableBackOffManagerFactory.destroy();
299+
}
300+
}
301+
271302
public static class BlockingRetriesConfigurer {
272303

273304
private BackOff backOff;

spring-kafka/src/main/java/org/springframework/kafka/listener/AbstractKafkaBackOffManagerFactory.java

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,6 @@
1818

1919
import org.springframework.context.ApplicationContext;
2020
import org.springframework.context.ApplicationContextAware;
21-
import org.springframework.context.ApplicationListener;
22-
import org.springframework.context.ConfigurableApplicationContext;
2321
import org.springframework.kafka.config.KafkaListenerConfigUtils;
2422
import org.springframework.util.Assert;
2523

@@ -88,10 +86,6 @@ protected <T> T getBean(String beanName, Class<T> beanClass) {
8886
return this.applicationContext.getBean(beanName, beanClass);
8987
}
9088

91-
protected void addApplicationListener(ApplicationListener<?> applicationListener) {
92-
((ConfigurableApplicationContext) this.applicationContext).addApplicationListener(applicationListener);
93-
}
94-
9589
@Override
9690
public void setApplicationContext(ApplicationContext applicationContext) {
9791
this.applicationContext = applicationContext;

spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2021 the original author or authors.
2+
* Copyright 2018-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -18,8 +18,7 @@
1818

1919
import java.time.Clock;
2020

21-
import org.springframework.context.ApplicationListener;
22-
import org.springframework.context.event.ContextClosedEvent;
21+
import org.springframework.beans.factory.DisposableBean;
2322
import org.springframework.core.task.TaskExecutor;
2423
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
2524
import org.springframework.util.Assert;
@@ -32,7 +31,8 @@
3231
* @author Tomaz Fernandes
3332
* @since 2.7
3433
*/
35-
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {
34+
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory
35+
implements DisposableBean {
3636

3737
private boolean timingAdjustmentEnabled = true;
3838

@@ -133,9 +133,7 @@ public final void setTaskExecutor(TaskExecutor taskExecutor) {
133133

134134
@Override
135135
protected KafkaConsumerBackoffManager doCreateManager(ListenerContainerRegistry registry) {
136-
PartitionPausingBackoffManager kafkaConsumerBackoffManager = getKafkaConsumerBackoffManager(registry);
137-
super.addApplicationListener(kafkaConsumerBackoffManager);
138-
return kafkaConsumerBackoffManager;
136+
return getKafkaConsumerBackoffManager(registry);
139137
}
140138

141139
protected final Clock getDefaultClock() {
@@ -161,7 +159,15 @@ private TaskExecutor getOrCreateTimingAdjustmentThreadExecutor() {
161159
}
162160
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
163161
executor.initialize();
164-
super.addApplicationListener((ApplicationListener<ContextClosedEvent>) event -> executor.shutdown());
162+
this.taskExecutor = executor;
165163
return executor;
166164
}
165+
166+
@Override
167+
public void destroy() throws Exception {
168+
if (this.taskExecutor != null
169+
&& ThreadPoolTaskExecutor.class.isAssignableFrom(this.taskExecutor.getClass())) {
170+
((ThreadPoolTaskExecutor) this.taskExecutor).shutdown();
171+
}
172+
}
167173
}

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicBootstrapper.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
3333
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
3434
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
35+
import org.springframework.kafka.listener.PartitionPausingBackoffManager;
3536
import org.springframework.retry.backoff.ThreadWaitSleeper;
3637

3738
/**
@@ -110,9 +111,11 @@ private void registerSingletons() {
110111
}
111112

112113
private void addApplicationListeners() {
113-
((ConfigurableApplicationContext) this.applicationContext)
114-
.addApplicationListener(this.applicationContext.getBean(
115-
RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME, DefaultDestinationTopicResolver.class));
114+
ConfigurableApplicationContext context = (ConfigurableApplicationContext) this.applicationContext;
115+
context.addApplicationListener(this.applicationContext.getBean(
116+
RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME, DefaultDestinationTopicResolver.class));
117+
context.addApplicationListener(this.applicationContext.getBean(
118+
RetryTopicInternalBeanNames.KAFKA_CONSUMER_BACKOFF_MANAGER, PartitionPausingBackoffManager.class));
116119
}
117120

118121
private KafkaConsumerBackoffManager createKafkaConsumerBackoffManager() {

0 commit comments

Comments
 (0)