Skip to content

Commit e36fab7

Browse files
Add documentation
Review javadocs
1 parent a887fba commit e36fab7

File tree

6 files changed

+152
-122
lines changed

6 files changed

+152
-122
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 89 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,11 @@ NOTE: You can have separate `ListenerContainerFactory` instances for the main an
4848

4949
==== Configuration
5050

51+
IMPORTANT: Since 2.9, the `@EnableRetryTopic` annotation should be used in a `@Configuration` annotated class.
52+
This enables the feature to bootstrap properly and gives access to injecting some of the feature's components to be looked up at runtime.
53+
Also, to configure the components and global features, the `RetryTopicConfigurationSupport` class should be extended and imported in a `@Configuration` class.
54+
For more details refer to <<retry-topic-global-settings>>.
55+
5156
===== Using the `@RetryableTopic` annotation
5257

5358
To configure the retry topic and dlt for a `@KafkaListener` annotated method, you just have to add the `@RetryableTopic` annotation to it and Spring for Apache Kafka will bootstrap all the necessary topics and consumers with the default configurations.
@@ -151,6 +156,42 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
151156
IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
152157
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.
153158

159+
[[retry-topic-global-settings]]
160+
===== Configuring Global Settings and Features
161+
162+
Since 2.9, the previous bean-based configuration approach has been deprecated.
163+
Instead, the `RetryTopicConfigurationSupport` class should be extended, the proper methods overridden, and imported in a `@Configuration` annotated class.
164+
An example follows:
165+
166+
====
167+
[source, java]
168+
----
169+
170+
@EnableKafka
171+
@Import(MyRetryTopicConfiguration.class)
172+
@Configuration
173+
public class KafkaConfiguration {
174+
}
175+
176+
public class MyRetryTopicConfiguration extends RetryTopicConfigurationSupport {
177+
178+
@Override
179+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
180+
blockingRetries
181+
.retryOn(MyBlockingRetriesException.class, MyOtherBlockingRetriesException.class)
182+
.backOff(new FixedBackOff(3000, 3));
183+
}
184+
185+
@Override
186+
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
187+
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
188+
}
189+
}
190+
----
191+
====
192+
193+
NOTE: When using this approach, the `@EnableRetryTopic` annotation is not necessary.
194+
154195
==== Features
155196

156197
Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans.
@@ -315,24 +356,22 @@ NOTE: The default behavior is retrying on all exceptions and not traversing caus
315356

316357
Since 2.8.3 there's a global list of fatal exceptions which will cause the record to be sent to the DLT without any retries.
317358
See <<default-eh>> for the default list of fatal exceptions.
318-
You can add or remove exceptions to and from this list with:
359+
You can add or remove exceptions to and from this list by overriding the `configureNonBlockingRetries` method in a class that extends `RetryTopicConfigurationSupport`.
360+
See <<retry-topic-global-settings>> for more information.
319361

320362
====
321363
[source, java]
322364
----
323-
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
324-
public DefaultDestinationTopicResolver topicResolver(ApplicationContext applicationContext,
325-
@Qualifier(RetryTopicInternalBeanNames
326-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
327-
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
328-
ddtr.addNotRetryableExceptions(MyFatalException.class);
329-
ddtr.removeNotRetryableException(ConversionException.class);
330-
return ddtr;
365+
366+
@Override
367+
protected void manageNonBlockingRetriesFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
368+
nonBlockingFatalExceptions.add(MyNonBlockingException.class);
331369
}
370+
332371
----
333372
====
334373

335-
NOTE: To disable fatal exceptions' classification, clear the default list using the `setClassifications` method in `DefaultDestinationTopicResolver`.
374+
NOTE: To disable fatal exceptions' classification, just clear the provided list.
336375

337376

338377
===== Include and Exclude Topics
@@ -419,19 +458,18 @@ This is to avoid creation of excessively large messages (due to the stack trace
419458

420459
See <<dlpr-headers>> for more information.
421460

422-
To reconfigure the framework to use different settings for these properties, replace the standard `DeadLetterPublishingRecovererFactory` bean by adding a `recovererCustomizer`:
461+
To reconfigure the framework to use different settings for these properties, configure a `DeadLetterPublishingRecoverer` customizer by overriding the `configureCustomizers` method in a class that extends `RetryTopicConfigurationSupport`.
462+
See <<retry-topic-global-settings>> for more details.
423463

424464
====
425465
[source, java]
426466
----
427-
@Bean(RetryTopicInternalBeanNames.DEAD_LETTER_PUBLISHING_RECOVERER_FACTORY_BEAN_NAME)
428-
DeadLetterPublishingRecovererFactory factory(DestinationTopicResolver resolver) {
429-
DeadLetterPublishingRecovererFactory factory = new DeadLetterPublishingRecovererFactory(resolver);
430-
factory.setDeadLetterPublishingRecovererCustomizer(dlpr -> {
431-
dlpr.appendOriginalHeaders(true);
467+
@Override
468+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
469+
customizersConfigurer.customizeDeadLetterPublishingRecoverer(dlpr -> {
470+
dlpr.setAppendOriginalHeaders(true);
432471
dlpr.setStripPreviousExceptionHeaders(false);
433472
});
434-
return factory;
435473
}
436474
----
437475
====
@@ -444,32 +482,22 @@ Starting with version 2.8.4, if you wish to add custom headers (in addition to t
444482
Starting in 2.8.4 you can configure the framework to use both blocking and non-blocking retries in conjunction.
445483
For example, you can have a set of exceptions that would likely trigger errors on the next records as well, such as `DatabaseAccessException`, so you can retry the same record a few times before sending it to the retry topic, or straight to the DLT.
446484

447-
To configure blocking retries you just need to add the exceptions you want to retry through the `addRetryableExceptions` method in the `ListenerContainerFactoryConfigurer` bean as follows.
448-
The default policy is `FixedBackOff`, with nine retries and no delay between them.
449-
Optionally, you can provide your own back off policy.
485+
To configure blocking retries, override the `configureBlockingRetries` method in a class that extends `RetryTopicConfigurationSupport` and add the exceptions you want to retry, along with the `BackOff` to be used.
486+
Then `@Import` this subclass in a `@Configuration` class.
487+
The default `BackOff` is a `FixedBackOff` with no delay and 9 attempts.
488+
See <<retry-topic-global-settings>> for more information.
450489

451490
====
452491
[source, java]
453492
----
454-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
455-
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
456-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
457-
@Qualifier(RetryTopicInternalBeanNames
458-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
459-
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
460-
lcfc.setBlockingRetryableExceptions(MyBlockingRetryException.class, MyOtherBlockingRetryException.class);
461-
lcfc.setBlockingRetriesBackOff(new FixedBackOff(500, 5)); // Optional
462-
return lcfc;
463-
}
464-
----
465-
====
466493
467-
If you need to further tune the exception classification, you can set your own `Map` of classifications through the `ListenerContainerFactoryConfigurer.setErrorHandlerCustomizer()` method, such as:
494+
@Override
495+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
496+
blockingRetries
497+
.retryOn(MyBlockingRetryException.class, MyOtherBlockingRetryException.class)
498+
.backOff(new FixedBackOff(3000, 5));
499+
}
468500
469-
====
470-
[source, java]
471-
----
472-
lcfc.setErrorHandlerCustomizer(ceh -> ((DefaultErrorHandler) ceh).setClassifications(myClassificationsMap, myDefaultValue));
473501
----
474502
====
475503

@@ -480,23 +508,16 @@ Here's an example with both configurations working together:
480508
====
481509
[source, java]
482510
----
483-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
484-
public ListenerContainerFactoryConfigurer lcfc(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
485-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
486-
@Qualifier(RetryTopicInternalBeanNames
487-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
488-
ListenerContainerFactoryConfigurer lcfc = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
489-
lcfc.setBlockingRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class);
490-
return lcfc;
511+
@Override
512+
protected void configureBlockingRetries(BlockingRetriesConfigurer blockingRetries) {
513+
blockingRetries
514+
.retryOn(ShouldRetryOnlyBlockingException.class, ShouldRetryViaBothException.class)
515+
.backOff(new FixedBackOff(50, 3));
491516
}
492517
493-
@Bean(name = RetryTopicInternalBeanNames.DESTINATION_TOPIC_CONTAINER_NAME)
494-
public DefaultDestinationTopicResolver ddtr(ApplicationContext applicationContext,
495-
@Qualifier(RetryTopicInternalBeanNames
496-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
497-
DefaultDestinationTopicResolver ddtr = new DefaultDestinationTopicResolver(clock, applicationContext);
498-
ddtr.addNotRetryableExceptions(ShouldRetryOnlyBlockingException.class, ShouldSkipBothRetriesException.class);
499-
return ddtr;
518+
@Override
519+
protected void manageNonBlockingFatalExceptions(List<Class<? extends Throwable>> nonBlockingFatalExceptions) {
520+
nonBlockingFatalExceptions.add(ShouldSkipBothRetriesException.class);
500521
}
501522
502523
----
@@ -590,9 +611,14 @@ More complex naming strategies can be accomplished by registering a bean that im
590611
====
591612
[source, java]
592613
----
593-
@Bean
594-
public RetryTopicNamesProviderFactory myRetryNamingProviderFactory() {
595-
return new CustomRetryTopicNamesProviderFactory();
614+
@Override
615+
protected RetryTopicComponentFactory createComponentFactory() {
616+
return new RetryTopicComponentFactory() {
617+
@Override
618+
public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
619+
return new CustomRetryTopicNamesProviderFactory();
620+
}
621+
};
596622
}
597623
----
598624
====
@@ -811,21 +837,14 @@ public RetryTopicConfiguration myOtherRetryTopic(KafkaTemplate<Integer, MyPojo>
811837

812838
IMPORTANT: Since 2.8.3 you can use the same factory for retryable and non-retryable topics.
813839

814-
If you need to revert the factory configuration behavior to prior 2.8.3, you can replace the standard `RetryTopicConfigurer` bean and set `useLegacyFactoryConfigurer` to `true`, such as:
840+
If you need to revert the factory configuration behavior to prior 2.8.3, you can override the `configureRetryTopicConfigurer` method of a class that extends `RetryTopicConfigurationSupport` as explained in <<retry-topic-global-settings>> and set `useLegacyFactoryConfigurer` to `true`, such as:
815841

816842
====
817843
[source, java]
818844
----
819-
820-
@Bean(name = RetryTopicInternalBeanNames.RETRY_TOPIC_CONFIGURER)
821-
public RetryTopicConfigurer retryTopicConfigurer(DestinationTopicProcessor destinationTopicProcessor,
822-
ListenerContainerFactoryResolver containerFactoryResolver,
823-
ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer,
824-
BeanFactory beanFactory,
825-
RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
826-
RetryTopicConfigurer retryTopicConfigurer = new RetryTopicConfigurer(destinationTopicProcessor, containerFactoryResolver, listenerContainerFactoryConfigurer, beanFactory, retryTopicNamesProviderFactory);
827-
retryTopicConfigurer.useLegacyFactoryConfigurer(true);
828-
return retryTopicConfigurer;
845+
@Override
846+
protected Consumer<RetryTopicConfigurer> configureRetryTopicConfigurer() {
847+
return rtc -> rtc.useLegacyFactoryConfigurer(true);
829848
}
830849
----
831850
====
@@ -840,14 +859,10 @@ For example, to change the logging level to WARN you might add:
840859
====
841860
[source, java]
842861
----
843-
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
844-
public ListenerContainerFactoryConfigurer listenerContainer(KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
845-
DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
846-
@Qualifier(RetryTopicInternalBeanNames
847-
.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
848-
ListenerContainerFactoryConfigurer configurer = new ListenerContainerFactoryConfigurer(kafkaConsumerBackoffManager, deadLetterPublishingRecovererFactory, clock);
849-
configurer.setErrorHandlerCustomizer(commonErrorHandler -> ((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN));
850-
return configurer;
862+
@Override
863+
protected void configureCustomizers(CustomizersConfigurer customizersConfigurer) {
864+
customizersConfigurer.customizeErrorHandler(commonErrorHandler ->
865+
((DefaultErrorHandler) commonErrorHandler).setLogLevel(KafkaException.Level.WARN))
851866
}
852867
----
853868
====

spring-kafka/src/main/java/org/springframework/kafka/annotation/EnableRetryTopic.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,16 +34,21 @@
3434
* &#064;EnableRetryTopic
3535
* &#064;Configuration
3636
* public class AppConfig {
37-
*
38-
* &#064;Bean
39-
* public RetryTopicConfiguration myRetryTopicConfiguration(KafkaTemplate kafkaTemplate) {
40-
* return RetryTopicConfigurationBuilder
41-
* .newInstance()
42-
* .maxAttempts(4)
43-
* .create(kafkaTemplate);
44-
* }
45-
* // other &#064;Bean definitions
4637
* }
38+
*
39+
* &#064;Component
40+
* public class MyListener {
41+
*
42+
* &#064;RetryableTopic(fixedDelayTopicStrategy = FixedDelayStrategy.SINGLE_TOPIC, backoff = @Backoff(4000))
43+
* &#064;KafkaListener(topics = "myTopic")
44+
* public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
45+
* logger.info("Message {} received in topic {} ", message, receivedTopic);
46+
* }
47+
*
48+
* &#064;DltHandler
49+
* public void dltHandler(Object message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) {
50+
* logger.info("Message {} received in dlt handler at topic {} ", message, receivedTopic);
51+
* }
4752
* </pre>
4853
*
4954
* To configure the feature's components, extend the {@link RetryTopicConfigurationSupport}
@@ -72,7 +77,7 @@
7277
* protected void configureNonBlockingRetries(NonBlockingRetriesConfigurer nonBlockingRetries) {
7378
* nonBlockingRetries
7479
* .addToFatalExceptions(ShouldSkipBothRetriesException.class);
75-
* }
80+
* } *
7681
* </pre>
7782
*
7883
* @author Tomaz Fernandes

spring-kafka/src/main/java/org/springframework/kafka/config/RetryTopicComponentFactory.java

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import org.springframework.kafka.retrytopic.DefaultDestinationTopicProcessor;
3333
import org.springframework.kafka.retrytopic.DefaultDestinationTopicResolver;
3434
import org.springframework.kafka.retrytopic.DestinationTopic;
35+
import org.springframework.kafka.retrytopic.DestinationTopicContainer;
3536
import org.springframework.kafka.retrytopic.DestinationTopicProcessor;
3637
import org.springframework.kafka.retrytopic.DestinationTopicResolver;
3738
import org.springframework.kafka.retrytopic.ListenerContainerFactoryConfigurer;
@@ -59,10 +60,11 @@ public class RetryTopicComponentFactory {
5960
/**
6061
* Create the {@link RetryTopicConfigurer} that will serve as an entry point
6162
* for configuring non-blocking topic-based delayed retries for a given
62-
* {@link KafkaListenerEndpoint} by processing the appropriate
63+
* {@link KafkaListenerEndpoint}, by processing the appropriate
6364
* {@link RetryTopicConfiguration}.
6465
* @param destinationTopicProcessor the {@link DestinationTopicProcessor} that will be
65-
* used to process the {@link DestinationTopic} instances.
66+
* used to process the {@link DestinationTopic} instances and register them in a
67+
* {@link DestinationTopicContainer}.
6668
* @param listenerContainerFactoryConfigurer the
6769
* {@link ListenerContainerFactoryConfigurer} that will be used to configure the
6870
* {@link KafkaListenerContainerFactory} instances for the non-blocking delayed
@@ -96,20 +98,18 @@ public DestinationTopicProcessor destinationTopicProcessor(DestinationTopicResol
9698

9799
/**
98100
* Create the instance of {@link DestinationTopicResolver} that will be used to store
99-
* the {@link DestinationTopic} instances
100-
* and resolve which a given record should be forwarded to.
101-
*
101+
* the {@link DestinationTopic} instance and resolve which a given record should be
102+
* forwarded to.
102103
* @return the instance.
103104
*/
104105
public DestinationTopicResolver destinationTopicResolver() {
105106
return new DefaultDestinationTopicResolver(this.internalRetryTopicClock);
106107
}
107108

108109
/**
109-
* Create a {@link DeadLetterPublishingRecovererFactory} that will be used to create
110-
* the {@link DeadLetterPublishingRecoverer} that will forward the records to a given
110+
* Create the {@link DeadLetterPublishingRecovererFactory} that will be used to create
111+
* the {@link DeadLetterPublishingRecoverer} to forward the records to a given
111112
* {@link DestinationTopic}.
112-
*
113113
* @param destinationTopicResolver the {@link DestinationTopicResolver} instance to
114114
* resolve the destinations.
115115
* @return the instance.
@@ -122,7 +122,6 @@ public DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory
122122
/**
123123
* Create the {@link ListenerContainerFactoryResolver} that will be used to resolve
124124
* the appropriate {@link KafkaListenerContainerFactory} for a given topic.
125-
*
126125
* @param beanFactory the {@link BeanFactory} that will be used to retrieve the
127126
* {@link KafkaListenerContainerFactory} instance if necessary.
128127
* @return the instance.

0 commit comments

Comments
 (0)