Skip to content

Commit 321a0b0

Browse files
authored
Fix CompletableFuture hangs when RetryStrategy/MetricsCollector raise errors (#6125)
* Fix CompletableFuture hangs when RetryStrategy/MetricsCollector raise errors * add changelog * Fix failing test * Remove useless exception from test method signature
1 parent 752d8ad commit 321a0b0

File tree

5 files changed

+258
-2
lines changed

5 files changed

+258
-2
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "bugfix",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Fix CompletableFuture hanging when RetryStrategy/MetricsCollector raise errors"
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncApiCallMetricCollectionStage.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -57,6 +57,9 @@ public CompletableFuture<OutputT> execute(SdkHttpFullRequest input, RequestExecu
5757
} else {
5858
future.complete(r);
5959
}
60+
}).exceptionally(t -> {
61+
future.completeExceptionally(t);
62+
return null;
6063
});
6164

6265
return CompletableFutureUtils.forwardExceptionTo(future, executeFuture);

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/AsyncRetryableStage.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,11 @@ private RetryingExecutor(SdkHttpFullRequest request, RequestExecutionContext con
7575

7676
public CompletableFuture<Response<OutputT>> execute() {
7777
CompletableFuture<Response<OutputT>> future = new CompletableFuture<>();
78-
attemptFirstExecute(future);
78+
try {
79+
attemptFirstExecute(future);
80+
} catch (Throwable t) {
81+
future.completeExceptionally(t);
82+
}
7983
return future;
8084
}
8185

@@ -149,7 +153,11 @@ public void maybeAttemptExecute(CompletableFuture<Response<OutputT>> future) {
149153

150154
private void maybeRetryExecute(CompletableFuture<Response<OutputT>> future, Exception exception) {
151155
retryableStageHelper.setLastException(exception);
152-
maybeAttemptExecute(future);
156+
try {
157+
maybeAttemptExecute(future);
158+
} catch (Throwable t) {
159+
future.completeExceptionally(t);
160+
}
153161
}
154162
}
155163
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.client;
17+
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.ArgumentMatchers.eq;
21+
import static org.mockito.Mockito.doThrow;
22+
import static org.mockito.Mockito.when;
23+
import static software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils.noOpResponseHandler;
24+
import static utils.HttpTestUtils.testAsyncClientBuilder;
25+
26+
import java.time.Duration;
27+
import java.util.Collections;
28+
import java.util.concurrent.CompletableFuture;
29+
import java.util.concurrent.TimeUnit;
30+
import org.junit.Test;
31+
import org.junit.runner.RunWith;
32+
import org.mockito.Mock;
33+
import org.mockito.junit.MockitoJUnitRunner;
34+
import org.mockito.stubbing.Answer;
35+
import software.amazon.awssdk.core.Response;
36+
import software.amazon.awssdk.core.SdkResponse;
37+
import software.amazon.awssdk.core.async.EmptyPublisher;
38+
import software.amazon.awssdk.core.http.ExecutionContext;
39+
import software.amazon.awssdk.core.http.NoopTestRequest;
40+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
41+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain;
42+
import software.amazon.awssdk.core.interceptor.InterceptorContext;
43+
import software.amazon.awssdk.core.internal.http.AmazonAsyncHttpClient;
44+
import software.amazon.awssdk.core.metrics.CoreMetric;
45+
import software.amazon.awssdk.core.protocol.VoidSdkResponse;
46+
import software.amazon.awssdk.http.SdkHttpFullRequest;
47+
import software.amazon.awssdk.http.SdkHttpFullResponse;
48+
import software.amazon.awssdk.http.SdkHttpResponse;
49+
import software.amazon.awssdk.http.async.AsyncExecuteRequest;
50+
import software.amazon.awssdk.http.async.SdkAsyncHttpClient;
51+
import software.amazon.awssdk.http.async.SdkAsyncHttpResponseHandler;
52+
import software.amazon.awssdk.metrics.MetricCollector;
53+
import software.amazon.awssdk.retries.DefaultRetryStrategy;
54+
import utils.ValidSdkObjects;
55+
56+
/**
57+
* Tests to verify that exceptions thrown by the MetricCollector are reported through the returned future.
58+
* {@link java.util.concurrent.CompletableFuture}.
59+
*
60+
* @see AsyncClientHandlerExceptionTest
61+
*/
62+
@RunWith(MockitoJUnitRunner.class)
63+
public class AsyncClientMetricCollectorExceptionTest {
64+
65+
public static final String MESSAGE = "test exception";
66+
67+
@Mock
68+
private MetricCollector metricCollector;
69+
70+
@Mock
71+
private SdkAsyncHttpClient asyncHttpClient;
72+
73+
@Test
74+
public void exceptionInReportMetricReportedInFuture() {
75+
when(metricCollector.createChild(any())).thenReturn(metricCollector);
76+
Exception exception = new RuntimeException(MESSAGE);
77+
doThrow(exception).when(metricCollector).reportMetric(eq(CoreMetric.API_CALL_DURATION), any(Duration.class));
78+
79+
CompletableFuture<SdkResponse> responseFuture = makeRequest();
80+
81+
assertThatThrownBy(() -> responseFuture.get(1, TimeUnit.SECONDS)).hasRootCause(exception);
82+
}
83+
84+
private CompletableFuture<SdkResponse> makeRequest() {
85+
when(asyncHttpClient.execute(any(AsyncExecuteRequest.class))).thenAnswer((Answer<CompletableFuture<Void>>) invocationOnMock -> {
86+
SdkAsyncHttpResponseHandler handler = invocationOnMock.getArgument(0, AsyncExecuteRequest.class).responseHandler();
87+
handler.onHeaders(SdkHttpFullResponse.builder()
88+
.statusCode(200)
89+
.build());
90+
handler.onStream(new EmptyPublisher<>());
91+
return CompletableFuture.completedFuture(null);
92+
});
93+
94+
AmazonAsyncHttpClient asyncClient = testAsyncClientBuilder()
95+
.retryStrategy(DefaultRetryStrategy.doNotRetry())
96+
.asyncHttpClient(asyncHttpClient)
97+
.build();
98+
99+
SdkHttpFullRequest httpFullRequest = ValidSdkObjects.sdkHttpFullRequest().build();
100+
NoopTestRequest sdkRequest = NoopTestRequest.builder().build();
101+
InterceptorContext interceptorContext = InterceptorContext
102+
.builder()
103+
.request(sdkRequest)
104+
.httpRequest(httpFullRequest)
105+
.build();
106+
107+
Response<SdkResponse> response =
108+
Response.<SdkResponse>builder()
109+
.isSuccess(true)
110+
.response(VoidSdkResponse.builder().build())
111+
.httpResponse(SdkHttpResponse.builder().statusCode(200).build())
112+
.build();
113+
114+
return asyncClient
115+
.requestExecutionBuilder()
116+
.originalRequest(sdkRequest)
117+
.request(httpFullRequest)
118+
.executionContext(
119+
ExecutionContext
120+
.builder()
121+
.executionAttributes(new ExecutionAttributes())
122+
.interceptorContext(interceptorContext)
123+
.metricCollector(metricCollector)
124+
.interceptorChain(new ExecutionInterceptorChain(Collections.emptyList()))
125+
.build()
126+
)
127+
.execute(noOpResponseHandler(response));
128+
}
129+
}
Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.client;
17+
18+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
19+
import static org.mockito.ArgumentMatchers.any;
20+
import static org.mockito.Mockito.when;
21+
import static software.amazon.awssdk.core.internal.util.AsyncResponseHandlerTestUtils.noOpResponseHandler;
22+
import static utils.HttpTestUtils.testAsyncClientBuilder;
23+
24+
import java.time.Duration;
25+
import java.util.Collections;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.TimeUnit;
28+
import org.junit.Test;
29+
import org.junit.runner.RunWith;
30+
import org.mockito.Mock;
31+
import org.mockito.junit.MockitoJUnitRunner;
32+
import software.amazon.awssdk.core.SdkResponse;
33+
import software.amazon.awssdk.core.http.ExecutionContext;
34+
import software.amazon.awssdk.core.http.NoopTestRequest;
35+
import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
36+
import software.amazon.awssdk.core.interceptor.ExecutionInterceptorChain;
37+
import software.amazon.awssdk.core.interceptor.InterceptorContext;
38+
import software.amazon.awssdk.core.internal.http.AmazonAsyncHttpClient;
39+
import software.amazon.awssdk.http.SdkHttpFullRequest;
40+
import software.amazon.awssdk.metrics.MetricCollector;
41+
import software.amazon.awssdk.retries.api.AcquireInitialTokenResponse;
42+
import software.amazon.awssdk.retries.api.RetryStrategy;
43+
import software.amazon.awssdk.retries.api.RetryToken;
44+
import utils.ValidSdkObjects;
45+
46+
/**
47+
* Tests to verify that exceptions thrown by the RetryStrategy are reported through the returned future.
48+
* {@link java.util.concurrent.CompletableFuture}.
49+
*
50+
* @see AsyncClientHandlerExceptionTest
51+
*/
52+
@RunWith(MockitoJUnitRunner.class)
53+
public class AsyncClientRetryStrategyExceptionTest {
54+
55+
public static final String MESSAGE = "test exception";
56+
57+
@Mock
58+
private RetryStrategy retryStrategy;
59+
60+
@Test
61+
public void exceptionInInitialTokenReportedInFuture() {
62+
Exception exception = new RuntimeException(MESSAGE);
63+
when(retryStrategy.acquireInitialToken(any())).thenThrow(exception);
64+
65+
CompletableFuture<SdkResponse> responseFuture = makeRequest();
66+
67+
assertThatThrownBy(() -> responseFuture.get(1, TimeUnit.SECONDS)).hasRootCause(exception);
68+
}
69+
70+
@Test
71+
public void exceptionInRefreshTokenReportedInFuture() {
72+
when(retryStrategy.acquireInitialToken(any())).thenReturn(
73+
AcquireInitialTokenResponse.create(new RetryToken() {
74+
}, Duration.ZERO)
75+
);
76+
Exception exception = new RuntimeException(MESSAGE);
77+
when(retryStrategy.refreshRetryToken(any())).thenThrow(exception);
78+
79+
CompletableFuture<SdkResponse> responseFuture = makeRequest();
80+
81+
assertThatThrownBy(() -> responseFuture.get(1, TimeUnit.SECONDS)).hasRootCause(exception);
82+
}
83+
84+
private CompletableFuture<SdkResponse> makeRequest() {
85+
AmazonAsyncHttpClient asyncClient = testAsyncClientBuilder().retryStrategy(retryStrategy).build();
86+
87+
SdkHttpFullRequest httpFullRequest = ValidSdkObjects.sdkHttpFullRequest().build();
88+
NoopTestRequest sdkRequest = NoopTestRequest.builder().build();
89+
InterceptorContext interceptorContext = InterceptorContext
90+
.builder()
91+
.request(sdkRequest)
92+
.httpRequest(httpFullRequest)
93+
.build();
94+
95+
return asyncClient
96+
.requestExecutionBuilder()
97+
.originalRequest(sdkRequest)
98+
.request(httpFullRequest)
99+
.executionContext(
100+
ExecutionContext
101+
.builder()
102+
.executionAttributes(new ExecutionAttributes())
103+
.interceptorContext(interceptorContext)
104+
.metricCollector(MetricCollector.create("test"))
105+
.interceptorChain(new ExecutionInterceptorChain(Collections.emptyList()))
106+
.build()
107+
)
108+
.execute(noOpResponseHandler());
109+
}
110+
}

0 commit comments

Comments
 (0)