Skip to content

Commit a207c07

Browse files
authored
Merge branch 'master' into openshift_visit
2 parents 69c6fcd + c241520 commit a207c07

File tree

7 files changed

+36
-35
lines changed

7 files changed

+36
-35
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
* Fix #3083: CertificateException due to PEM being decoded in CertUtils
88
* Fix #3295: Fix wrong kind getting registered in KubernetesDeserializer in SharedInformerFactory
99
* Fix #3318: Informer relist add/update should not always be sync events
10+
* Fix #3328: Allow for generic watches of known types
1011
* Fix #3329: Moved the parsing of resource(String) to the common base client
1112
* Fix #3330: Added usage of the openshift specific handlers for resource(String/HasMetadata) to pickup the right Readiness
1213

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java

Lines changed: 14 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,15 @@
2727
import io.fabric8.kubernetes.client.Watcher;
2828
import io.fabric8.kubernetes.client.WatcherException;
2929
import io.fabric8.kubernetes.client.Watcher.Action;
30+
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
3031
import io.fabric8.kubernetes.client.utils.Serialization;
3132
import io.fabric8.kubernetes.client.utils.Utils;
3233
import okhttp3.OkHttpClient;
3334
import okhttp3.Request;
3435
import org.slf4j.Logger;
3536
import org.slf4j.LoggerFactory;
3637

38+
import java.net.MalformedURLException;
3739
import java.util.List;
3840
import java.util.concurrent.ScheduledFuture;
3941
import java.util.concurrent.TimeUnit;
@@ -46,11 +48,6 @@
4648

4749
public abstract class AbstractWatchManager<T extends HasMetadata> implements Watch {
4850

49-
@FunctionalInterface
50-
interface RequestBuilder {
51-
Request build(final String resourceVersion);
52-
}
53-
5451
private static final Logger logger = LoggerFactory.getLogger(AbstractWatchManager.class);
5552

5653
final Watcher<T> watcher;
@@ -62,21 +59,21 @@ interface RequestBuilder {
6259
final AtomicInteger currentReconnectAttempt;
6360
private ScheduledFuture<?> reconnectAttempt;
6461

65-
private final RequestBuilder requestBuilder;
62+
private final BaseOperationRequestBuilder requestBuilder;
6663
protected final OkHttpClient client;
6764

6865
private final AtomicBoolean reconnectPending = new AtomicBoolean(false);
6966

7067
AbstractWatchManager(
71-
Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, RequestBuilder requestBuilder, Supplier<OkHttpClient> clientSupplier
72-
) {
68+
Watcher<T> watcher, BaseOperation<T, ?, ?> baseOperation, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent, Supplier<OkHttpClient> clientSupplier
69+
) throws MalformedURLException {
7370
this.watcher = watcher;
7471
this.reconnectLimit = reconnectLimit;
7572
this.retryIntervalCalculator = new ExponentialBackoffIntervalCalculator(reconnectInterval, maxIntervalExponent);
7673
this.resourceVersion = new AtomicReference<>(listOptions.getResourceVersion());
7774
this.currentReconnectAttempt = new AtomicInteger(0);
7875
this.forceClosed = new AtomicBoolean();
79-
this.requestBuilder = requestBuilder;
76+
this.requestBuilder = new BaseOperationRequestBuilder<>(baseOperation, listOptions);
8077
this.client = clientSupplier.get();
8178

8279
runWatch();
@@ -172,8 +169,13 @@ boolean isForceClosed() {
172169
return forceClosed.get();
173170
}
174171

175-
void eventReceived(Watcher.Action action, T resource) {
176-
watcher.eventReceived(action, resource);
172+
void eventReceived(Watcher.Action action, HasMetadata resource) {
173+
// the WatchEvent deserialization is not specifically typed
174+
// modify the type here if needed
175+
if (resource != null && !requestBuilder.getBaseOperation().getType().isAssignableFrom(resource.getClass())) {
176+
resource = (HasMetadata) Serialization.jsonMapper().convertValue(resource, requestBuilder.getBaseOperation().getType());
177+
}
178+
watcher.eventReceived(action, (T)resource);
177179
}
178180

179181
void updateResourceVersion(final String newResourceVersion) {
@@ -238,7 +240,7 @@ protected void onMessage(String message) {
238240
List<HasMetadata> items = list.getItems();
239241
if (items != null) {
240242
for (HasMetadata item : items) {
241-
eventReceived(action, (T) item);
243+
eventReceived(action, item);
242244
}
243245
}
244246
} else if (object instanceof Status) {

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/BaseOperationRequestBuilder.java

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@
3030
import okhttp3.HttpUrl;
3131
import okhttp3.Request;
3232

33-
class BaseOperationRequestBuilder<T extends HasMetadata, L extends KubernetesResourceList<T>> implements AbstractWatchManager.RequestBuilder {
33+
class BaseOperationRequestBuilder<T extends HasMetadata, L extends KubernetesResourceList<T>> {
3434
private final URL requestUrl;
3535
private final BaseOperation<T, L, ?> baseOperation;
3636
private final ListOptions listOptions;
@@ -40,8 +40,11 @@ public BaseOperationRequestBuilder(BaseOperation<T, L, ?> baseOperation, ListOpt
4040
this.requestUrl = baseOperation.getNamespacedUrl();
4141
this.listOptions = listOptions;
4242
}
43-
44-
@Override
43+
44+
public BaseOperation<T, L, ?> getBaseOperation() {
45+
return baseOperation;
46+
}
47+
4548
public Request build(final String resourceVersion) {
4649
HttpUrl.Builder httpUrlBuilder = HttpUrl.get(requestUrl).newBuilder();
4750

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchConnectionManager.java

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,9 +60,7 @@ static void closeBody(Response response) {
6060
}
6161

6262
public WatchConnectionManager(final OkHttpClient client, final BaseOperation<T, L, ?> baseOperation, final ListOptions listOptions, final Watcher<T> watcher, final int reconnectInterval, final int reconnectLimit, long websocketTimeout, int maxIntervalExponent) throws MalformedURLException {
63-
super(
64-
watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent,
65-
new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> client.newBuilder()
63+
super(watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> client.newBuilder()
6664
.readTimeout(websocketTimeout, TimeUnit.MILLISECONDS)
6765
.build());
6866
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/WatchHTTPManager.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -56,8 +56,7 @@ public WatchHTTPManager(final OkHttpClient client,
5656
throws MalformedURLException {
5757

5858
super(
59-
watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent,
60-
new BaseOperationRequestBuilder<>(baseOperation, listOptions), () -> {
59+
watcher, baseOperation, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> {
6160
final OkHttpClient clonedClient = client.newBuilder()
6261
.connectTimeout(connectTimeout, TimeUnit.MILLISECONDS)
6362
.readTimeout(0, TimeUnit.MILLISECONDS)

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/informers/cache/Reflector.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -128,11 +128,6 @@ public void eventReceived(Action action, T resource) {
128128
if (resource == null) {
129129
throw new KubernetesClientException("Unrecognized resource");
130130
}
131-
// the WatchEvent deserialization is not specifically typed
132-
// modify the type here if needed
133-
if (!apiTypeClass.isAssignableFrom(resource.getClass())) {
134-
resource = Serialization.jsonMapper().convertValue(resource, apiTypeClass);
135-
}
136131
if (log.isDebugEnabled()) {
137132
log.debug("Event received {} {}# resourceVersion {}", action.name(), resource.getKind(), resource.getMetadata().getResourceVersion());
138133
}

kubernetes-client/src/test/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManagerTest.java

Lines changed: 13 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -19,13 +19,16 @@
1919
import io.fabric8.kubernetes.api.model.ListOptions;
2020
import io.fabric8.kubernetes.client.Watcher;
2121
import io.fabric8.kubernetes.client.WatcherException;
22+
import io.fabric8.kubernetes.client.dsl.base.BaseOperation;
2223
import io.fabric8.kubernetes.client.utils.Utils;
2324
import okhttp3.Request;
2425
import okhttp3.WebSocket;
2526
import org.junit.jupiter.api.DisplayName;
2627
import org.junit.jupiter.api.Test;
2728
import org.mockito.MockedStatic;
29+
import org.mockito.Mockito;
2830

31+
import java.net.MalformedURLException;
2932
import java.util.concurrent.ScheduledFuture;
3033
import java.util.concurrent.atomic.AtomicInteger;
3134

@@ -43,7 +46,7 @@ class AbstractWatchManagerTest {
4346

4447
@Test
4548
@DisplayName("closeEvent, is idempotent, multiple calls only close watcher once")
46-
void closeEventIsIdempotent() {
49+
void closeEventIsIdempotent() throws MalformedURLException {
4750
// Given
4851
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
4952
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
@@ -57,7 +60,7 @@ void closeEventIsIdempotent() {
5760

5861
@Test
5962
@DisplayName("closeEvent, with Exception, is idempotent, multiple calls only close watcher once")
60-
void closeEventWithExceptionIsIdempotent() {
63+
void closeEventWithExceptionIsIdempotent() throws MalformedURLException {
6164
// Given
6265
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
6366
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
@@ -82,7 +85,7 @@ void closeWebSocket() {
8285

8386
@Test
8487
@DisplayName("nextReconnectInterval, returns exponential interval values up to the provided limit")
85-
void nextReconnectInterval() {
88+
void nextReconnectInterval() throws MalformedURLException {
8689
// Given
8790
final WatchManager<HasMetadata> awm = new WatchManager<>(
8891
null, mock(ListOptions.class), 0, 10, 5);
@@ -98,7 +101,7 @@ void nextReconnectInterval() {
98101

99102
@Test
100103
@DisplayName("cancelReconnect, with null attempt, should do nothing")
101-
void cancelReconnectNullAttempt() {
104+
void cancelReconnectNullAttempt() throws MalformedURLException {
102105
// Given
103106
final ScheduledFuture<?> sf = spy(ScheduledFuture.class);
104107
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
@@ -111,7 +114,7 @@ void cancelReconnectNullAttempt() {
111114

112115
@Test
113116
@DisplayName("cancelReconnect, with non-null attempt, should cancel")
114-
void cancelReconnectNonNullAttempt() {
117+
void cancelReconnectNonNullAttempt() throws MalformedURLException {
115118
// Given
116119
final ScheduledFuture<?> sf = mock(ScheduledFuture.class);
117120
final MockedStatic<Utils> utils = mockStatic(Utils.class);
@@ -127,7 +130,7 @@ void cancelReconnectNonNullAttempt() {
127130

128131
@Test
129132
@DisplayName("isClosed, after close invocation, should return true")
130-
void isForceClosedWhenClosed() {
133+
void isForceClosedWhenClosed() throws MalformedURLException {
131134
// Given
132135
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
133136
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
@@ -139,7 +142,7 @@ void isForceClosedWhenClosed() {
139142

140143
@Test
141144
@DisplayName("close, after close invocation, should return true")
142-
void closeWithNonNullRunnerShouldCancelRunner() {
145+
void closeWithNonNullRunnerShouldCancelRunner() throws MalformedURLException {
143146
// Given
144147
final WatcherAdapter<HasMetadata> watcher = new WatcherAdapter<>();
145148
final WatchManager<HasMetadata> awm = withDefaultWatchManager(watcher);
@@ -149,7 +152,7 @@ void closeWithNonNullRunnerShouldCancelRunner() {
149152
assertThat(awm.closeCount.get()).isEqualTo(1);
150153
}
151154

152-
private static <T extends HasMetadata> WatchManager<T> withDefaultWatchManager(Watcher<T> watcher) {
155+
private static <T extends HasMetadata> WatchManager<T> withDefaultWatchManager(Watcher<T> watcher) throws MalformedURLException {
153156
return new WatchManager<>(
154157
watcher, mock(ListOptions.class, RETURNS_DEEP_STUBS), 1, 0, 0);
155158
}
@@ -175,8 +178,8 @@ private static final class WatchManager<T extends HasMetadata> extends AbstractW
175178

176179
private final AtomicInteger closeCount = new AtomicInteger(0);
177180

178-
public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) {
179-
super(watcher, listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, resourceVersion -> null, ()->null);
181+
public WatchManager(Watcher<T> watcher, ListOptions listOptions, int reconnectLimit, int reconnectInterval, int maxIntervalExponent) throws MalformedURLException {
182+
super(watcher, Mockito.mock(BaseOperation.class), listOptions, reconnectLimit, reconnectInterval, maxIntervalExponent, () -> null);
180183
}
181184

182185
@Override

0 commit comments

Comments
 (0)