Skip to content

Commit 19a9a45

Browse files
garyrussellartembilan
authored andcommitted
GH-2280: Add ContainerProperties.pauseImmediate
Resolves #2280
1 parent 6f44846 commit 19a9a45

File tree

4 files changed

+287
-16
lines changed

4 files changed

+287
-16
lines changed

spring-kafka-docs/src/main/asciidoc/kafka.adoc

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2580,6 +2580,10 @@ See `monitorInterval`.
25802580
Deprecated.
25812581
Replaced by `KafkaUtils.setConsumerRecordFormatter`.
25822582

2583+
|[[pauseImmediate]]<<pauseImmediate,`pauseImmediate`>>
2584+
|`false`
2585+
|When the container is paused, stop processing after the current record instead of after processing all the records from the previous poll; the remaining records are retained in memory and will be passed to the listener when the container is resumed.
2586+
25832587
|[[pollTimeout]]<<pollTimeout,`pollTimeout`>>
25842588
|5000
25852589
|The timeout passed into `Consumer.poll()`.
@@ -3789,6 +3793,10 @@ However, the consumers might not have actually paused yet.
37893793

37903794
In addition (also since 2.1.5), `ConsumerPausedEvent` and `ConsumerResumedEvent` instances are published with the container as the `source` property and the `TopicPartition` instances involved in the `partitions` property.
37913795

3796+
Starting with version 2.9, a new container property `pauseImmediate`, when set to true, causes the pause to take effect after the current record is processed.
3797+
By default, the pause takes effect when all of the records from the previous poll have been processed.
3798+
See <<pauseImmediate>>.
3799+
37923800
The following simple Spring Boot application demonstrates by using the container registry to get a reference to a `@KafkaListener` method's container and pausing or resuming its consumers as well as receiving the corresponding events:
37933801

37943802
====

spring-kafka/src/main/java/org/springframework/kafka/listener/ContainerProperties.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,8 @@ public EOSMode getMode() {
326326

327327
private boolean asyncAcks;
328328

329+
private boolean pauseImmediate;
330+
329331
/**
330332
* Create properties for a container that will subscribe to the specified topics.
331333
* @param topics the topics.
@@ -934,6 +936,27 @@ public void setAsyncAcks(boolean asyncAcks) {
934936
this.asyncAcks = asyncAcks;
935937
}
936938

939+
/**
940+
* When pausing the container with a record listener, whether the pause takes effect
941+
* immediately, when the current record has been processed, or after all records from
942+
* the previous poll have been processed. Default false.
943+
* @return whether to pause immediately.
944+
* @since 2.9
945+
*/
946+
public boolean isPauseImmediate() {
947+
return this.pauseImmediate;
948+
}
949+
950+
/**
951+
* Set to true to pause the container after the current record has been processed, rather
952+
* than after all the records from the previous poll have been processed.
953+
* @param pauseImmediate true to pause immediately.
954+
* @since 2.9
955+
*/
956+
public void setPauseImmediate(boolean pauseImmediate) {
957+
this.pauseImmediate = pauseImmediate;
958+
}
959+
937960
private void adviseListenerIfNeeded() {
938961
if (!CollectionUtils.isEmpty(this.adviceChain)) {
939962
if (AopUtils.isAopProxy(this.messageListener)) {

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -723,6 +723,8 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
723723

724724
private final Set<TopicPartition> pausedForNack = new HashSet<>();
725725

726+
private final boolean pauseImmediate = this.containerProperties.isPauseImmediate();
727+
726728
private Map<TopicPartition, OffsetMetadata> definedPartitions;
727729

728730
private int count;
@@ -761,7 +763,7 @@ private final class ListenerConsumer implements SchedulingAwareRunnable, Consume
761763

762764
private boolean receivedSome;
763765

764-
private ConsumerRecords<K, V> pendingRecordsAfterError;
766+
private ConsumerRecords<K, V> remainingRecords;
765767

766768
private boolean pauseForPending;
767769

@@ -1362,7 +1364,7 @@ protected void pollAndInvoke() {
13621364
debugRecords(records);
13631365

13641366
invokeIfHaveRecords(records);
1365-
if (this.pendingRecordsAfterError == null) {
1367+
if (this.remainingRecords == null) {
13661368
resumeConsumerIfNeccessary();
13671369
if (!this.consumerPaused) {
13681370
resumePartitionsIfNecessary();
@@ -1376,9 +1378,9 @@ private void doProcessCommits() {
13761378
processCommits();
13771379
}
13781380
catch (CommitFailedException cfe) {
1379-
if (this.pendingRecordsAfterError != null && !this.isBatchListener) {
1380-
ConsumerRecords<K, V> pending = this.pendingRecordsAfterError;
1381-
this.pendingRecordsAfterError = null;
1381+
if (this.remainingRecords != null && !this.isBatchListener) {
1382+
ConsumerRecords<K, V> pending = this.remainingRecords;
1383+
this.remainingRecords = null;
13821384
List<ConsumerRecord<?, ?>> records = new ArrayList<>();
13831385
Iterator<ConsumerRecord<K, V>> iterator = pending.iterator();
13841386
while (iterator.hasNext()) {
@@ -1544,19 +1546,19 @@ private ConsumerRecords<K, V> doPoll() {
15441546
}
15451547
else {
15461548
records = pollConsumer();
1547-
if (this.pendingRecordsAfterError != null) {
1549+
if (this.remainingRecords != null) {
15481550
int howManyRecords = records.count();
15491551
if (howManyRecords > 0) {
15501552
this.logger.error(() -> String.format("Poll returned %d record(s) while consumer was paused "
15511553
+ "after an error; emergency stop invoked to avoid message loss", howManyRecords));
15521554
KafkaMessageListenerContainer.this.emergencyStop.run();
15531555
}
1554-
TopicPartition firstPart = this.pendingRecordsAfterError.partitions().iterator().next();
1556+
TopicPartition firstPart = this.remainingRecords.partitions().iterator().next();
15551557
boolean isPaused = isPaused() || isPartitionPauseRequested(firstPart);
15561558
this.logger.debug(() -> "First pending after error: " + firstPart + "; paused: " + isPaused);
15571559
if (!isPaused) {
1558-
records = this.pendingRecordsAfterError;
1559-
this.pendingRecordsAfterError = null;
1560+
records = this.remainingRecords;
1561+
this.remainingRecords = null;
15601562
}
15611563
}
15621564
captureOffsets(records);
@@ -2221,8 +2223,8 @@ private RuntimeException doInvokeBatchListener(final ConsumerRecords<K, V> recor
22212223
private void commitOffsetsIfNeeded(final ConsumerRecords<K, V> records) {
22222224
if ((!this.autoCommit && this.commonErrorHandler.isAckAfterHandle())
22232225
|| this.producer != null) {
2224-
if (this.pendingRecordsAfterError != null) {
2225-
ConsumerRecord<K, V> firstUncommitted = this.pendingRecordsAfterError.iterator().next();
2226+
if (this.remainingRecords != null) {
2227+
ConsumerRecord<K, V> firstUncommitted = this.remainingRecords.iterator().next();
22262228
Iterator<ConsumerRecord<K, V>> it = records.iterator();
22272229
while (it.hasNext()) {
22282230
ConsumerRecord<K, V> next = it.next();
@@ -2388,7 +2390,7 @@ private void invokeBatchErrorHandler(final ConsumerRecords<K, V> records,
23882390
records, this.consumer, KafkaMessageListenerContainer.this.thisOrParentContainer,
23892391
() -> invokeBatchOnMessageWithRecordsOrList(records, list));
23902392
if (!afterHandling.isEmpty()) {
2391-
this.pendingRecordsAfterError = afterHandling;
2393+
this.remainingRecords = afterHandling;
23922394
this.pauseForPending = true;
23932395
}
23942396
}
@@ -2445,7 +2447,9 @@ private void invokeRecordListenerInTx(final ConsumerRecords<K, V> records) {
24452447
handleNack(records, record);
24462448
break;
24472449
}
2448-
2450+
if (checkImmediatePause(iterator)) {
2451+
break;
2452+
}
24492453
}
24502454
}
24512455

@@ -2529,7 +2533,26 @@ private void doInvokeWithRecords(final ConsumerRecords<K, V> records) {
25292533
handleNack(records, record);
25302534
break;
25312535
}
2536+
if (checkImmediatePause(iterator)) {
2537+
break;
2538+
}
2539+
}
2540+
}
2541+
2542+
private boolean checkImmediatePause(Iterator<ConsumerRecord<K, V>> iterator) {
2543+
if (isPaused() && this.pauseImmediate) {
2544+
Map<TopicPartition, List<ConsumerRecord<K, V>>> remaining = new HashMap<>();
2545+
while (iterator.hasNext()) {
2546+
ConsumerRecord<K, V> next = iterator.next();
2547+
remaining.computeIfAbsent(new TopicPartition(next.topic(), next.partition()),
2548+
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
2549+
}
2550+
if (remaining.size() > 0) {
2551+
this.remainingRecords = new ConsumerRecords<>(remaining);
2552+
return true;
2553+
}
25322554
}
2555+
return false;
25332556
}
25342557

25352558
@Nullable
@@ -2676,8 +2699,8 @@ private void commitOffsetsIfNeeded(final ConsumerRecord<K, V> record) {
26762699
if (this.isManualAck) {
26772700
this.commitRecovered = true;
26782701
}
2679-
if (this.pendingRecordsAfterError == null
2680-
|| !record.equals(this.pendingRecordsAfterError.iterator().next())) {
2702+
if (this.remainingRecords == null
2703+
|| !record.equals(this.remainingRecords.iterator().next())) {
26812704
ackCurrent(record);
26822705
}
26832706
if (this.isManualAck) {
@@ -2795,7 +2818,7 @@ private void invokeErrorHandler(final ConsumerRecord<K, V> record,
27952818
tp -> new ArrayList<ConsumerRecord<K, V>>()).add(next);
27962819
}
27972820
if (records.size() > 0) {
2798-
this.pendingRecordsAfterError = new ConsumerRecords<>(records);
2821+
this.remainingRecords = new ConsumerRecords<>(records);
27992822
this.pauseForPending = true;
28002823
}
28012824
}

0 commit comments

Comments
 (0)