Skip to content

Commit cbbe59f

Browse files
committed
just logging resourceeventhandler exceptions
also stopping messages when a watch closes
1 parent 64905e1 commit cbbe59f

File tree

5 files changed

+39
-14
lines changed

5 files changed

+39
-14
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -189,7 +189,13 @@ void eventReceived(Watcher.Action action, HasMetadata resource) {
189189
}
190190
@SuppressWarnings("unchecked")
191191
final T t = (T) resource;
192-
watcher.eventReceived(action, t);
192+
try {
193+
watcher.eventReceived(action, t);
194+
} catch (Exception e) {
195+
// for compatibility, this will just log the exception as was done in previous versions
196+
// a case could be made for this to terminate the watch instead
197+
logger.error("Unhandled exception encountered in watcher event handler", e);
198+
}
193199
}
194200

195201
void updateResourceVersion(final String newResourceVersion) {
@@ -311,7 +317,7 @@ protected void onMessage(String message) {
311317
final String msg = "Couldn't deserialize watch event: " + message;
312318
close(new WatcherException(msg, e, message));
313319
} catch (Exception e) {
314-
final String msg = "Unhandled exception encountered in watcher event handler";
320+
final String msg = "Unexpected exception processing watch event";
315321
close(new WatcherException(msg, e, message));
316322
}
317323
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import java.net.URI;
3434
import java.net.URL;
3535
import java.util.Map;
36+
import java.util.Optional;
3637
import java.util.concurrent.CompletableFuture;
3738
import java.util.concurrent.TimeUnit;
3839

@@ -50,7 +51,7 @@ public class WatchConnectionManager<T extends HasMetadata, L extends KubernetesR
5051
private static final Logger logger = LoggerFactory.getLogger(WatchConnectionManager.class);
5152

5253
protected WatcherWebSocketListener<T> listener;
53-
private CompletableFuture<WebSocket> websocketFuture;
54+
private volatile CompletableFuture<WebSocket> websocketFuture;
5455
private WebSocket websocket;
5556

5657
private volatile boolean ready;
@@ -87,14 +88,14 @@ public WatchConnectionManager(final HttpClient client, final BaseOperation<T, L,
8788
@Override
8889
protected synchronized void closeRequest() {
8990
closeWebSocket(websocket);
90-
if (this.websocketFuture != null) {
91-
this.websocketFuture.whenComplete((w, t) -> {
91+
Optional.ofNullable(this.websocketFuture).ifPresent(theFuture -> {
92+
this.websocketFuture = null;
93+
theFuture.whenComplete((w, t) -> {
9294
if (w != null) {
9395
closeWebSocket(w);
9496
}
9597
});
96-
websocketFuture = null;
97-
}
98+
});
9899
}
99100

100101
synchronized WatcherWebSocketListener<T> getListener() {
@@ -105,6 +106,14 @@ public CompletableFuture<WebSocket> getWebsocketFuture() {
105106
return websocketFuture;
106107
}
107108

109+
@Override
110+
protected void onMessage(String message) {
111+
// for consistency we only want to process the message when we're open
112+
if (this.websocketFuture != null) {
113+
super.onMessage(message);
114+
}
115+
}
116+
108117
@Override
109118
protected void start(URL url, Map<String, String> headers) {
110119
this.listener = new WatcherWebSocketListener<>(this);

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,12 +29,14 @@
2929
import java.net.MalformedURLException;
3030
import java.net.URL;
3131
import java.util.Map;
32+
import java.util.Optional;
3233
import java.util.concurrent.CompletableFuture;
3334
import java.util.concurrent.TimeUnit;
3435

3536
public class WatchHTTPManager<T extends HasMetadata, L extends KubernetesResourceList<T>> extends AbstractWatchManager<T> {
3637
private static final Logger logger = LoggerFactory.getLogger(WatchHTTPManager.class);
3738
private CompletableFuture<HttpResponse<AsyncBody>> call;
39+
private volatile AsyncBody body;
3840

3941
public WatchHTTPManager(final HttpClient client,
4042
final BaseOperation<T, L, ?> baseOperation,
@@ -74,7 +76,7 @@ protected synchronized void start(URL url, Map<String, String> headers) {
7476
scheduleReconnect();
7577
}
7678
if (response != null) {
77-
AsyncBody body = response.body();
79+
body = response.body();
7880
if (!response.isSuccessful()) {
7981
body.cancel();
8082
if (onStatus(OperationSupport.createStatus(response.code(), response.message()))) {
@@ -101,9 +103,9 @@ protected synchronized void start(URL url, Map<String, String> headers) {
101103

102104
@Override
103105
protected synchronized void closeRequest() {
104-
if (call != null) {
105-
call.cancel(true);
106-
call = null;
107-
}
106+
Optional.ofNullable(call).ifPresent(theFuture -> {
107+
theFuture.cancel(true);
108+
});
109+
Optional.ofNullable(body).ifPresent(AsyncBody::cancel);
108110
}
109111
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatcherWebSocketListener.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@ public void onMessage(WebSocket webSocket, String text) {
5757
} finally {
5858
webSocket.request();
5959
}
60-
webSocket.request();
6160
}
6261

6362
@Override

kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/ResourceTest.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -444,7 +444,7 @@ void testWaitUntilCondition() throws InterruptedException {
444444
}
445445

446446
@Test
447-
void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
447+
void testErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
448448
Pod pod1 = new PodBuilder().withNewMetadata()
449449
.withName("pod1")
450450
.withResourceVersion("1")
@@ -470,6 +470,15 @@ void tesErrorEventDuringWaitReturnFromAPIIfMatch() throws InterruptedException {
470470
.open()
471471
.waitFor(500)
472472
.andEmit(new WatchEvent(status, "ERROR"))
473+
.done()
474+
.once();
475+
476+
server.expect()
477+
.get()
478+
.withPath(
479+
"/api/v1/namespaces/test/pods?fieldSelector=metadata.name%3Dpod1&resourceVersion=1&allowWatchBookmarks=true&watch=true")
480+
.andUpgradeToWebSocket()
481+
.open()
473482
.waitFor(500)
474483
.andEmit(new WatchEvent(ready, "MODIFIED"))
475484
.done()

0 commit comments

Comments
 (0)