Skip to content

Commit a887fba

Browse files
Make provided TaskExecutor a bean
Add KafkaBackOffManagerConfigurer to RetryTopicConfigurationSupport
1 parent 39d6549 commit a887fba

File tree

5 files changed

+312
-72
lines changed

5 files changed

+312
-72
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicComponentFactory.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,11 @@
1919
import java.time.Clock;
2020

2121
import org.springframework.beans.factory.BeanFactory;
22+
import org.springframework.core.task.TaskExecutor;
2223
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
2324
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
2425
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
26+
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
2527
import org.springframework.kafka.listener.ListenerContainerRegistry;
2628
import org.springframework.kafka.listener.MessageListenerContainer;
2729
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
@@ -38,6 +40,7 @@
3840
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
3941
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
4042
import org.springframework.kafka.retrytopic.SuffixingRetryTopicNamesProviderFactory;
43+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4144

4245
/**
4346
* Provide the component instances that will be used with
@@ -166,6 +169,14 @@ public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRe
166169
return new PartitionPausingBackOffManagerFactory(registry);
167170
}
168171

172+
/**
173+
* Create the {@link TaskExecutor} that will be used in the
174+
* {@link KafkaConsumerTimingAdjuster}.
175+
* @return the task executor.
176+
*/
177+
public TaskExecutor taskExecutor() {
178+
return new ThreadPoolTaskExecutor();
179+
}
169180

170181
/**
171182
* Return the {@link Clock} instance that will be used for all

spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicConfigurationSupport.java

Lines changed: 167 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package org.springframework.kafka.config;
1818

19+
import java.time.Clock;
1920
import java.util.ArrayList;
2021
import java.util.List;
2122
import java.util.function.Consumer;
@@ -26,15 +27,20 @@
2627
import org.springframework.context.ApplicationContext;
2728
import org.springframework.context.annotation.Bean;
2829
import org.springframework.context.annotation.Configuration;
30+
import org.springframework.core.task.TaskExecutor;
2931
import org.springframework.kafka.annotation.EnableRetryTopic;
3032
import org.springframework.kafka.listener.CommonErrorHandler;
3133
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
3234
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
35+
import org.springframework.kafka.listener.DefaultErrorHandler;
3336
import org.springframework.kafka.listener.ExceptionClassifier;
37+
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
3438
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
39+
import org.springframework.kafka.listener.KafkaConsumerTimingAdjuster;
3540
import org.springframework.kafka.listener.ListenerContainerRegistry;
3641
import org.springframework.kafka.listener.MessageListenerContainer;
3742
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
43+
import org.springframework.kafka.listener.WakingKafkaConsumerTimingAdjuster;
3844
import org.springframework.kafka.retrytopic.DeadLetterPublishingRecovererFactory;
3945
import org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver;
4046
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
@@ -46,8 +52,10 @@
4652
import org.springframework.kafka.retrytopic.RetryTopicConfigurer;
4753
import org.springframework.kafka.retrytopic.RetryTopicInternalBeanNames;
4854
import org.springframework.kafka.retrytopic.RetryTopicNamesProviderFactory;
55+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
4956
import org.springframework.util.Assert;
5057
import org.springframework.util.backoff.BackOff;
58+
import org.springframework.util.backoff.FixedBackOff;
5159

5260
/**
5361
* This is the main class providing the configuration behind the non-blocking,
@@ -66,6 +74,9 @@ public class RetryTopicConfigurationSupport {
6674

6775
private final RetryTopicComponentFactory componentFactory = createComponentFactory();
6876

77+
private static final String BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME = "backOffManagerThreadExecutor";
78+
private static final int NOT_SET = -1;
79+
6980
/**
7081
* Return a global {@link RetryTopicConfigurer} for configuring retry topics
7182
* for {@link KafkaListenerEndpoint} instances with a corresponding
@@ -253,24 +264,79 @@ protected Consumer<DestinationTopicResolver> customizeDestinationTopicResolver()
253264
}
254265

255266
/**
256-
* Provides the {@link KafkaConsumerBackoffManager} instance.
257-
* Override this method to provide a customized implementation.
258-
* A {@link PartitionPausingBackOffManagerFactory} can be used for that purpose,
259-
* or a different implementation can be provided.
267+
* Create the {@link KafkaConsumerBackoffManager} instance that will be used to
268+
* back off partitions.
269+
* To configure it, override the {@link #configureKafkaBackOffManager} method.
270+
* To provide a different implementation, either override this method, or
271+
* override the {@link RetryTopicComponentFactory#kafkaBackOffManagerFactory} method
272+
* and return a different {@link KafkaBackOffManagerFactory}.
260273
* @param registry the {@link ListenerContainerRegistry} to be used to fetch the
261274
* {@link MessageListenerContainer} to be backed off.
275+
* @param taskExecutor the {@link TaskExecutor} to be used with the
276+
* {@link KafkaConsumerTimingAdjuster}.
262277
* @return the instance.
263278
*/
264279
@Bean(name = KafkaListenerConfigUtils.KAFKA_CONSUMER_BACK_OFF_MANAGER_BEAN_NAME)
265280
public KafkaConsumerBackoffManager kafkaConsumerBackoffManager(
266281
@Qualifier(KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME)
267-
ListenerContainerRegistry registry) {
268-
return this.componentFactory.kafkaBackOffManagerFactory(registry).create();
282+
ListenerContainerRegistry registry,
283+
@Qualifier(BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME) TaskExecutor taskExecutor) {
284+
285+
KafkaBackOffManagerFactory backOffManagerFactory =
286+
this.componentFactory.kafkaBackOffManagerFactory(registry);
287+
if (backOffManagerFactory instanceof PartitionPausingBackOffManagerFactory) {
288+
configurePartitionPausingFactory(taskExecutor,
289+
(PartitionPausingBackOffManagerFactory) backOffManagerFactory);
290+
}
291+
return backOffManagerFactory.create();
292+
}
293+
294+
private void configurePartitionPausingFactory(TaskExecutor taskExecutor,
295+
PartitionPausingBackOffManagerFactory factory) {
296+
KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer();
297+
configureKafkaBackOffManager(configurer);
298+
factory.setTimingAdjustmentEnabled(configurer.timingAdjustmentEnabled);
299+
if (configurer.timingAdjustmentEnabled) {
300+
factory.setTaskExecutor(taskExecutor);
301+
if (configurer.maxThreadPoolSize != NOT_SET) {
302+
Assert.isInstanceOf(ThreadPoolTaskExecutor.class, taskExecutor,
303+
() -> "TaskExecutor must be an instance of ThreadPoolTaskExecutor to set maxThreadPoolSize");
304+
((ThreadPoolTaskExecutor) taskExecutor).setMaxPoolSize(configurer.maxThreadPoolSize);
305+
}
306+
}
307+
if (configurer.clock != null) {
308+
factory.setClock(configurer.clock);
309+
}
310+
}
311+
312+
/**
313+
* Create the {@link TaskExecutor} instance that will be used with
314+
* the {@link WakingKafkaConsumerTimingAdjuster} if
315+
* A {@link PartitionPausingBackOffManagerFactory} can be used for that purpose,
316+
* or a different implementation can be provided.
317+
* {@link MessageListenerContainer} to be backed off.
318+
* @return the instance.
319+
*/
320+
@Bean(name = BACK_OFF_MANAGER_THREAD_EXECUTOR_BEAN_NAME)
321+
public TaskExecutor backoffManagerTaskExecutor() {
322+
KafkaBackOffManagerConfigurer configurer = new KafkaBackOffManagerConfigurer();
323+
configureKafkaBackOffManager(configurer);
324+
return configurer.timingAdjustmentEnabled
325+
? this.componentFactory.taskExecutor()
326+
: task -> {
327+
};
328+
}
329+
330+
/**
331+
* Override this method to configure the {@link KafkaConsumerBackoffManager}.
332+
* @param backOffManagerConfigurer a {@link KafkaBackOffManagerConfigurer}.
333+
*/
334+
protected void configureKafkaBackOffManager(KafkaBackOffManagerConfigurer backOffManagerConfigurer) {
269335
}
270336

271337
/**
272-
* Override this method if you want to provide a subclass
273-
* of {@link RetryTopicComponentFactory} with different
338+
* Override this method to provide a subclass of
339+
* {@link RetryTopicComponentFactory} with different
274340
* component implementations or subclasses.
275341
* @return the instance.
276342
*/
@@ -284,42 +350,132 @@ RetryTopicBootstrapper retryTopicBootstrapper(ApplicationContext context) {
284350
return new RetryTopicBootstrapper(context, context.getAutowireCapableBeanFactory());
285351
}
286352

353+
/**
354+
* Configure blocking retries to be used along non-blocking.
355+
*/
287356
public static class BlockingRetriesConfigurer {
288357

289358
private BackOff backOff;
290359

291360
private Class<? extends Exception>[] retryableExceptions;
292361

362+
/**
363+
* Set the exceptions that should be retried by the blocking retry mechanism.
364+
* @param exceptions the exceptions.
365+
* @return the configurer.
366+
* @see DefaultErrorHandler
367+
*/
293368
@SuppressWarnings("varargs")
294369
@SafeVarargs
295370
public final BlockingRetriesConfigurer retryOn(Class<? extends Exception>... exceptions) {
296371
this.retryableExceptions = exceptions;
297372
return this;
298373
}
299374

375+
/**
376+
* Set the {@link BackOff} that should be used with the blocking retry mechanism.
377+
* By default, a {@link FixedBackOff} with 0 delay and 9 retry attempts
378+
* is configured. Note that this only has any effect for exceptions specified
379+
* with the {@link #retryOn} method - by default blocking retries are disabled.
380+
* @param backoff the {@link BackOff} instance.
381+
* @return the configurer.
382+
* @see DefaultErrorHandler
383+
*/
300384
public BlockingRetriesConfigurer backOff(BackOff backoff) {
301385
this.backOff = backoff;
302386
return this;
303387
}
304388
}
305389

390+
/**
391+
* Configure the {@link KafkaConsumerBackoffManager} instance.
392+
*/
393+
public static class KafkaBackOffManagerConfigurer {
394+
395+
boolean timingAdjustmentEnabled = true;
396+
397+
private int maxThreadPoolSize = NOT_SET;
398+
399+
private Clock clock;
400+
401+
/**
402+
* Disable timing adjustment for the delays. With this option records won't be
403+
* processed exactly at the proper time. It's guaranteed however that records
404+
* won't be processed before their due time.
405+
* @return the configurer.
406+
* @see WakingKafkaConsumerTimingAdjuster
407+
*/
408+
public KafkaBackOffManagerConfigurer disableTimingAdjustment() {
409+
this.timingAdjustmentEnabled = false;
410+
return this;
411+
}
412+
413+
/**
414+
* Set the maximum thread pool size to be used by the
415+
* {@link WakingKafkaConsumerTimingAdjuster}. This
416+
* {@link KafkaConsumerTimingAdjuster} implementation spawns threads that will
417+
* sleep for a calculated time, and after that will
418+
* {@link org.apache.kafka.clients.consumer.Consumer#wakeup()} the consumer, in
419+
* order to improve delay precision.
420+
* @param maxThreadPoolSize the maximum thread pool size.
421+
* @return the configurer.
422+
*/
423+
public KafkaBackOffManagerConfigurer setMaxThreadPoolSize(int maxThreadPoolSize) {
424+
this.maxThreadPoolSize = maxThreadPoolSize;
425+
return this;
426+
}
427+
428+
/**
429+
* Set the {@link Clock} instance to be used with the
430+
* {@link KafkaConsumerBackoffManager}.
431+
* @param clock the clock instance.
432+
* @return the configurer.
433+
*/
434+
public KafkaBackOffManagerConfigurer setClock(Clock clock) {
435+
this.clock = clock;
436+
return this;
437+
}
438+
}
439+
440+
/**
441+
* Configure customizers for components instantiated by the retry topics feature.
442+
*/
306443
public static class CustomizersConfigurer {
307444

308445
private Consumer<CommonErrorHandler> errorHandlerCustomizer;
309446
private Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer;
310447
private Consumer<DeadLetterPublishingRecoverer> deadLetterPublishingRecovererCustomizer;
311448

312-
protected CustomizersConfigurer customizeErrorHandler(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
449+
/**
450+
* Customize the {@link CommonErrorHandler} instances that will be used for the
451+
* feature.
452+
* @param errorHandlerCustomizer the customizer.
453+
* @return the configurer.
454+
* @see DefaultErrorHandler
455+
*/
456+
public CustomizersConfigurer customizeErrorHandler(Consumer<CommonErrorHandler> errorHandlerCustomizer) {
313457
this.errorHandlerCustomizer = errorHandlerCustomizer;
314458
return this;
315459
}
316460

317-
protected CustomizersConfigurer customizeListenerContainer(Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer) {
461+
/**
462+
* Customize the {@link ConcurrentMessageListenerContainer} instances created
463+
* for the retry and DLT consumers.
464+
* @param listenerContainerCustomizer the customizer.
465+
* @return the configurer.
466+
*/
467+
public CustomizersConfigurer customizeListenerContainer(Consumer<ConcurrentMessageListenerContainer<?, ?>> listenerContainerCustomizer) {
318468
this.listenerContainerCustomizer = listenerContainerCustomizer;
319469
return this;
320470
}
321471

322-
protected CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer<DeadLetterPublishingRecoverer> dlprCustomizer) {
472+
/**
473+
* Customize the {@link DeadLetterPublishingRecoverer} that will be used to
474+
* forward the records to the retry topics and DLT.
475+
* @param dlprCustomizer the customizer.
476+
* @return the configurer.
477+
*/
478+
public CustomizersConfigurer customizeDeadLetterPublishingRecoverer(Consumer<DeadLetterPublishingRecoverer> dlprCustomizer) {
323479
this.deadLetterPublishingRecovererCustomizer = dlprCustomizer;
324480
return this;
325481
}

0 commit comments

Comments
 (0)