Skip to content

Commit aba0171

Browse files
committed
spring-projectsGH-2239: Replace PartitionPausingBackOffManager
New back of manager (and factory) that uses a task scheduler to resume the paused partitions. Revert change to deprecated PartitionPausingBackoffManager. Log resume.
1 parent 2fcb082 commit aba0171

12 files changed

+286
-45
lines changed

spring-kafka/src/main/java/org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -75,6 +75,8 @@ public class KafkaListenerEndpointRegistry implements ListenerContainerRegistry,
7575

7676
protected final LogAccessor logger = new LogAccessor(LogFactory.getLog(getClass())); //NOSONAR
7777

78+
private final Map<String, MessageListenerContainer> unregisteredContainers = new ConcurrentHashMap<>();
79+
7880
private final Map<String, MessageListenerContainer> listenerContainers = new ConcurrentHashMap<>();
7981

8082
private int phase = AbstractMessageListenerContainer.DEFAULT_PHASE;
@@ -109,6 +111,17 @@ public MessageListenerContainer getListenerContainer(String id) {
109111
return this.listenerContainers.get(id);
110112
}
111113

114+
@Override
115+
@Nullable
116+
public MessageListenerContainer getUnregisteredListenerContainer(String id) {
117+
MessageListenerContainer container = this.unregisteredContainers.get(id);
118+
if (container == null) {
119+
refreshContextContainers();
120+
return this.unregisteredContainers.get(id);
121+
}
122+
return null;
123+
}
124+
112125
/**
113126
* By default, containers registered for endpoints after the context is refreshed
114127
* are immediately started, regardless of their autoStartup property, to comply with
@@ -156,10 +169,17 @@ public Collection<MessageListenerContainer> getListenerContainers() {
156169
public Collection<MessageListenerContainer> getAllListenerContainers() {
157170
List<MessageListenerContainer> containers = new ArrayList<>();
158171
containers.addAll(getListenerContainers());
159-
containers.addAll(this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values());
172+
refreshContextContainers();
173+
containers.addAll(this.unregisteredContainers.values());
160174
return containers;
161175
}
162176

177+
private void refreshContextContainers() {
178+
this.unregisteredContainers.clear();
179+
this.applicationContext.getBeansOfType(MessageListenerContainer.class, true, false).values()
180+
.forEach(container -> this.unregisteredContainers.put(container.getListenerId(), container));
181+
}
182+
163183
/**
164184
* Create a message listener container for the given {@link KafkaListenerEndpoint}.
165185
* <p>This create the necessary infrastructure to honor that endpoint

spring-kafka/src/main/java/org/springframework/kafka/listener/BackOffHandler.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,18 @@
1616

1717
package org.springframework.kafka.listener;
1818

19+
import org.apache.kafka.common.TopicPartition;
20+
1921
import org.springframework.lang.Nullable;
2022

2123
/**
2224
* Handler for the provided back off time, listener container and exception.
25+
* Also supports back off for individual partitions.
2326
*
24-
* @author Jan Marincek
25-
* @since 2.9
27+
* @author Jan Marincek
28+
* @author Gary Russell
29+
* @since 2.9
2630
*/
27-
@FunctionalInterface
2831
public interface BackOffHandler {
2932

3033
/**
@@ -35,4 +38,16 @@ public interface BackOffHandler {
3538
*/
3639
void onNextBackOff(@Nullable MessageListenerContainer container, Exception exception, long nextBackOff);
3740

41+
/**
42+
* Perform the next back off for a partition.
43+
* @param container the container.
44+
* @param partition the partition.
45+
* @param nextBackOff the next back off.
46+
*/
47+
default void onNextBackOff(@Nullable MessageListenerContainer container, TopicPartition partition,
48+
long nextBackOff) {
49+
50+
throw new UnsupportedOperationException();
51+
}
52+
3853
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Copyright 2018-2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
import org.apache.commons.logging.LogFactory;
20+
import org.apache.kafka.common.TopicPartition;
21+
22+
import org.springframework.core.log.LogAccessor;
23+
import org.springframework.util.Assert;
24+
25+
/**
26+
*
27+
* A manager that backs off consumption for a given topic if the timestamp provided is not
28+
* due. Use with {@link DefaultErrorHandler} to guarantee that the message is read
29+
* again after partition consumption is resumed (or seek it manually by other means).
30+
* Note that when a record backs off the partition consumption gets paused for
31+
* approximately that amount of time, so you must have a fixed backoff value per partition.
32+
*
33+
* @author Tomaz Fernandes
34+
* @author Gary Russell
35+
* @since 2.9
36+
* @see DefaultErrorHandler
37+
*/
38+
public class ContainerPartitionPausingBackOffManager implements KafkaConsumerBackoffManager {
39+
40+
private static final LogAccessor LOGGER = new LogAccessor(LogFactory.getLog(KafkaConsumerBackoffManager.class));
41+
42+
private final ListenerContainerRegistry listenerContainerRegistry;
43+
44+
private final BackOffHandler backOffHandler;
45+
46+
/**
47+
* Construct an instance with the provided registry and back off handler.
48+
* @param listenerContainerRegistry
49+
* @param backOffHandler
50+
*/
51+
public ContainerPartitionPausingBackOffManager(ListenerContainerRegistry listenerContainerRegistry,
52+
BackOffHandler backOffHandler) {
53+
54+
this.listenerContainerRegistry = listenerContainerRegistry;
55+
this.backOffHandler = backOffHandler;
56+
}
57+
58+
/**
59+
* Backs off if the current time is before the dueTimestamp provided
60+
* in the {@link Context} object.
61+
* @param context the back off context for this execution.
62+
*/
63+
@Override
64+
public void backOffIfNecessary(Context context) {
65+
long backoffTime = context.getDueTimestamp() - System.currentTimeMillis();
66+
LOGGER.debug(() -> "Back off time: " + backoffTime + " Context: " + context);
67+
if (backoffTime > 0) {
68+
pauseConsumptionAndThrow(context, backoffTime);
69+
}
70+
}
71+
72+
private void pauseConsumptionAndThrow(Context context, Long backOffTime) throws KafkaBackoffException {
73+
TopicPartition topicPartition = context.getTopicPartition();
74+
getListenerContainerFromContext(context).pausePartition(topicPartition);
75+
this.backOffHandler.onNextBackOff(getListenerContainerFromContext(context), topicPartition, backOffTime);
76+
throw new KafkaBackoffException(String.format("Partition %s from topic %s is not ready for consumption, " +
77+
"backing off for approx. %s millis.", context.getTopicPartition().partition(),
78+
context.getTopicPartition().topic(), backOffTime),
79+
topicPartition, context.getListenerId(), context.getDueTimestamp());
80+
}
81+
82+
private MessageListenerContainer getListenerContainerFromContext(Context context) {
83+
MessageListenerContainer container = this.listenerContainerRegistry.getListenerContainer(context.getListenerId()); // NOSONAR
84+
if (container == null) {
85+
container = this.listenerContainerRegistry.getUnregisteredListenerContainer(context.getListenerId());
86+
}
87+
Assert.notNull(container, () -> "No container found with id: " + context.getListenerId());
88+
return container;
89+
}
90+
91+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright 2022 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.kafka.listener;
18+
19+
/**
20+
* A factory for {@link ContainerPartitionPausingBackoffManager}.
21+
*
22+
* @author Gary Russell
23+
* @since 2.9
24+
*
25+
*/
26+
public class ContainerPartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {
27+
28+
private BackOffHandler backOffHandler;
29+
30+
/**
31+
* Construct an instance with the provided properties.
32+
* @param listenerContainerRegistry the registry.
33+
* @param backOffHandler the back off handler.
34+
*/
35+
public ContainerPartitionPausingBackOffManagerFactory(ListenerContainerRegistry listenerContainerRegistry) {
36+
37+
super(listenerContainerRegistry);
38+
}
39+
40+
@Override
41+
protected KafkaConsumerBackoffManager doCreateManager(ListenerContainerRegistry registry) {
42+
return new ContainerPartitionPausingBackOffManager(getListenerContainerRegistry(), this.backOffHandler);
43+
}
44+
45+
public void setBackOffHandler(BackOffHandler backOffHandler) {
46+
this.backOffHandler = backOffHandler;
47+
}
48+
49+
}

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerPausingBackOffHandler.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@
1818

1919
import java.time.Duration;
2020

21+
import org.apache.kafka.common.TopicPartition;
22+
2123
import org.springframework.lang.Nullable;
2224

2325
/**
@@ -51,4 +53,9 @@ public void onNextBackOff(@Nullable MessageListenerContainer container, Exceptio
5153
}
5254
}
5355

56+
@Override
57+
public void onNextBackOff(MessageListenerContainer container, TopicPartition partition, long nextBackOff) {
58+
this.pauser.pausePartition(container, partition, Duration.ofMillis(nextBackOff));
59+
}
60+
5461
}

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -318,6 +318,15 @@ public void resume() {
318318
}
319319
}
320320

321+
@Override
322+
public void resumePartition(TopicPartition topicPartition) {
323+
super.resumePartition(topicPartition);
324+
KafkaMessageListenerContainer<K, V>.ListenerConsumer consumer = this.listenerConsumer;
325+
if (consumer != null) {
326+
this.listenerConsumer.wakeIfNecessary();
327+
}
328+
}
329+
321330
@Override
322331
public Map<String, Map<MetricName, ? extends Metric>> metrics() {
323332
ListenerConsumer listenerConsumerForMetrics = this.listenerConsumer;

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerPauseService.java

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.Optional;
2323

2424
import org.apache.commons.logging.LogFactory;
25+
import org.apache.kafka.common.TopicPartition;
2526

2627
import org.springframework.core.log.LogAccessor;
2728
import org.springframework.lang.Nullable;
@@ -81,13 +82,37 @@ public void pause(MessageListenerContainer messageListenerContainer, Duration pa
8182
}
8283
else {
8384
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
84-
LOGGER.debug(() -> "Pausing container " + messageListenerContainer + "resume scheduled for "
85+
LOGGER.debug(() -> "Pausing container " + messageListenerContainer + ", resume scheduled for "
8586
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
8687
messageListenerContainer.pause();
87-
this.scheduler.schedule(() -> resume(messageListenerContainer), resumeAt);
88+
this.scheduler.schedule(() -> {
89+
LOGGER.debug(() -> "Pausing container " + messageListenerContainer);
90+
resume(messageListenerContainer);
91+
}, resumeAt);
8892
}
8993
}
9094

95+
/**
96+
* Pause consumption from a given partition for the duration.
97+
* @param messageListenerContainer the container.
98+
* @param partition the partition.
99+
* @param pauseDuration the duration.
100+
*/
101+
public void pausePartition(MessageListenerContainer messageListenerContainer, TopicPartition partition,
102+
Duration pauseDuration) {
103+
104+
Instant resumeAt = Instant.now().plusMillis(pauseDuration.toMillis());
105+
LOGGER.debug(() -> "Pausing container: " + messageListenerContainer + " partition: " + partition
106+
+ ", resume scheduled for "
107+
+ resumeAt.atZone(ZoneId.systemDefault()).toLocalDateTime());
108+
messageListenerContainer.pausePartition(partition);
109+
this.scheduler.schedule(() -> {
110+
LOGGER.debug(() -> "Resuming container: " + messageListenerContainer + " partition: " + partition);
111+
messageListenerContainer.resumePartition(partition);
112+
}, resumeAt);
113+
114+
}
115+
91116
/**
92117
* Resume the listener container by given id.
93118
* @param listenerId the id of the listener

spring-kafka/src/main/java/org/springframework/kafka/listener/ListenerContainerRegistry.java

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2021 the original author or authors.
2+
* Copyright 2021-2022 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -41,6 +41,17 @@ public interface ListenerContainerRegistry {
4141
@Nullable
4242
MessageListenerContainer getListenerContainer(String id);
4343

44+
/**
45+
* Return the {@link MessageListenerContainer} with the specified id or {@code null}
46+
* if no such container exists. Returns containers that are not registered with the
47+
* registry, but exist in the application context.
48+
* @param id the id of the container
49+
* @return the container or {@code null} if no container with that id exists
50+
* @see #getListenerContainerIds()
51+
*/
52+
@Nullable
53+
MessageListenerContainer getUnregisteredListenerContainer(String id);
54+
4455
/**
4556
* Return the ids of the managed {@link MessageListenerContainer} instance(s).
4657
* @return the ids.

spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackOffManagerFactory.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,12 @@
2929
* IMPORTANT: Since 2.9 this class doesn't create a {@link ThreadPoolTaskExecutor}
3030
* by default. In order for the factory to create a {@link KafkaConsumerTimingAdjuster},
3131
* such thread executor must be provided.
32+
* @deprecated in favor of {@link ContainerPartitionPausingBackOffManager}.
3233
*
3334
* @author Tomaz Fernandes
3435
* @since 2.7
3536
*/
37+
@Deprecated
3638
public class PartitionPausingBackOffManagerFactory extends AbstractKafkaBackOffManagerFactory {
3739

3840
private boolean timingAdjustmentEnabled = true;

spring-kafka/src/main/java/org/springframework/kafka/listener/PartitionPausingBackoffManager.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,12 +39,14 @@
3939
*
4040
* Note that when a record backs off the partition consumption gets paused for
4141
* approximately that amount of time, so you must have a fixed backoff value per partition.
42+
* @deprecated in favor of {@link ContainerPartitionPausingBackOffManager}.
4243
*
4344
* @author Tomaz Fernandes
4445
* @author Gary Russell
4546
* @since 2.7
4647
* @see DefaultErrorHandler
4748
*/
49+
@Deprecated
4850
public class PartitionPausingBackoffManager implements KafkaConsumerBackoffManager,
4951
ApplicationListener<ListenerContainerPartitionIdleEvent> {
5052

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicComponentFactory.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121
import org.springframework.beans.factory.BeanFactory;
2222
import org.springframework.kafka.config.KafkaListenerContainerFactory;
2323
import org.springframework.kafka.config.KafkaListenerEndpoint;
24+
import org.springframework.kafka.listener.ContainerPartitionPausingBackOffManagerFactory;
2425
import org.springframework.kafka.listener.DeadLetterPublishingRecoverer;
2526
import org.springframework.kafka.listener.KafkaBackOffManagerFactory;
2627
import org.springframework.kafka.listener.KafkaConsumerBackoffManager;
2728
import org.springframework.kafka.listener.ListenerContainerRegistry;
2829
import org.springframework.kafka.listener.MessageListenerContainer;
29-
import org.springframework.kafka.listener.PartitionPausingBackOffManagerFactory;
3030
import org.springframework.kafka.listener.adapter.KafkaBackoffAwareMessageListenerAdapter;
3131

3232
/**
@@ -153,7 +153,7 @@ public RetryTopicNamesProviderFactory retryTopicNamesProviderFactory() {
153153
* @return the instance.
154154
*/
155155
public KafkaBackOffManagerFactory kafkaBackOffManagerFactory(ListenerContainerRegistry registry) {
156-
return new PartitionPausingBackOffManagerFactory(registry);
156+
return new ContainerPartitionPausingBackOffManagerFactory(registry);
157157
}
158158

159159
/**

0 commit comments

Comments
 (0)