Skip to content

Commit 8bddfb5

Browse files
committed
fix fabric8io#4365: adding a test for the termination exception
1 parent 0742e94 commit 8bddfb5

File tree

3 files changed

+70
-4
lines changed

3 files changed

+70
-4
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/impl/cache/Reflector.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,8 +60,20 @@ public Reflector(ListerWatcher<T, L> listerWatcher, SyncableStore<T> store) {
6060
}
6161

6262
public CompletableFuture<Void> start() {
63+
return start(false); // start without reconnecting
64+
}
65+
66+
public CompletableFuture<Void> start(boolean reconnect) {
6367
this.running = true;
64-
return listSyncAndWatch(false);
68+
CompletableFuture<Void> result = listSyncAndWatch(reconnect);
69+
if (!reconnect) {
70+
result.whenComplete((v, t) -> {
71+
if (t != null) {
72+
stopFuture.completeExceptionally(t);
73+
}
74+
});
75+
}
76+
return result;
6577
}
6678

6779
public void stop() {
@@ -130,7 +142,7 @@ public CompletableFuture<Void> listSyncAndWatch(boolean reconnect) {
130142
return theFuture;
131143
}
132144

133-
private void reconnect() {
145+
protected void reconnect() {
134146
if (!running) {
135147
return;
136148
}

kubernetes-client/src/test/java/io/fabric8/kubernetes/client/informers/impl/cache/ReflectorTest.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ void testStateFlags() {
4141
PodList list = new PodListBuilder().withNewMetadata().withResourceVersion("1").endMetadata().build();
4242
Mockito.when(mock.submitList(Mockito.any())).thenReturn(CompletableFuture.completedFuture(list));
4343

44-
Reflector<Pod, PodList> reflector = new Reflector<>(mock, Mockito.mock(SyncableStore.class));
44+
Reflector<Pod, PodList> reflector = new Reflector<Pod, PodList>(mock, Mockito.mock(SyncableStore.class)) {
45+
@Override
46+
protected void reconnect() {
47+
// do nothing
48+
}
49+
};
4550

4651
assertFalse(reflector.isWatching());
4752
assertFalse(reflector.isRunning());
@@ -51,7 +56,7 @@ void testStateFlags() {
5156
.thenThrow(new KubernetesClientException("error"))
5257
.thenReturn(CompletableFuture.completedFuture(Mockito.mock(Watch.class)));
5358

54-
CompletableFuture<Void> future = reflector.start();
59+
CompletableFuture<Void> future = reflector.start(true);
5560

5661
assertThrows(CompletionException.class, future::join);
5762

kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import io.fabric8.kubernetes.api.model.Pod;
2727
import io.fabric8.kubernetes.api.model.PodBuilder;
2828
import io.fabric8.kubernetes.api.model.PodListBuilder;
29+
import io.fabric8.kubernetes.api.model.Service;
2930
import io.fabric8.kubernetes.api.model.Status;
3031
import io.fabric8.kubernetes.api.model.StatusBuilder;
3132
import io.fabric8.kubernetes.api.model.WatchEvent;
@@ -36,7 +37,9 @@
3637
import io.fabric8.kubernetes.api.model.rbac.ClusterRoleBindingBuilder;
3738
import io.fabric8.kubernetes.client.CustomResourceList;
3839
import io.fabric8.kubernetes.client.KubernetesClient;
40+
import io.fabric8.kubernetes.client.KubernetesClientException;
3941
import io.fabric8.kubernetes.client.Watcher;
42+
import io.fabric8.kubernetes.client.WatcherException;
4043
import io.fabric8.kubernetes.client.dsl.base.CustomResourceDefinitionContext;
4144
import io.fabric8.kubernetes.client.dsl.base.ResourceDefinitionContext;
4245
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
@@ -65,6 +68,7 @@
6568
import java.util.concurrent.ExecutionException;
6669
import java.util.concurrent.Future;
6770
import java.util.concurrent.TimeUnit;
71+
import java.util.concurrent.TimeoutException;
6872
import java.util.function.BiFunction;
6973
import java.util.function.Function;
7074

@@ -897,6 +901,51 @@ public void onDelete(Pod oldObj, boolean deletedFinalStateUnknown) {
897901
assertFalse(podInformer.isRunning());
898902
}
899903

904+
@Test
905+
void testTerminalException() throws InterruptedException, TimeoutException {
906+
// should be an initial 404
907+
SharedIndexInformer<Pod> informer = client.pods().runnableInformer(0);
908+
try {
909+
informer.run();
910+
} catch (Exception e) {
911+
}
912+
try {
913+
informer.stopped().get(10, TimeUnit.SECONDS);
914+
} catch (ExecutionException e) {
915+
assertTrue(e.getCause() instanceof KubernetesClientException);
916+
}
917+
918+
String startResourceVersion = "1000";
919+
920+
// initial list
921+
server.expect().withPath("/api/v1/pods")
922+
.andReturn(200, new PodListBuilder().withNewMetadata().withResourceVersion(startResourceVersion).endMetadata()
923+
.withItems(Collections.emptyList()).build())
924+
.once();
925+
926+
// initial watch - terminates with an exception
927+
server.expect().withPath("/api/v1/pods?resourceVersion=" + startResourceVersion + "&allowWatchBookmarks=true&watch=true")
928+
.andUpgradeToWebSocket()
929+
.open()
930+
.waitFor(WATCH_EVENT_EMIT_TIME)
931+
.andEmit(new WatchEvent(new Service(), "ADDED")) // not a pod
932+
.waitFor(OUTDATED_WATCH_EVENT_EMIT_TIME)
933+
.andEmit(outdatedEvent)
934+
.done().always();
935+
936+
// When
937+
informer = client.pods().inAnyNamespace().runnableInformer(0);
938+
try {
939+
informer.run();
940+
} catch (Exception e) {
941+
}
942+
try {
943+
informer.stopped().get(10, TimeUnit.SECONDS);
944+
} catch (ExecutionException e) {
945+
assertTrue(e.getCause() instanceof WatcherException);
946+
}
947+
}
948+
900949
@Test
901950
void testRunAfterStop() {
902951
// Given

0 commit comments

Comments
 (0)