Skip to content

Commit 0bc814a

Browse files
authored
Deserialize WatchEvents using the specific object type (#3786)
* Deserialize WatchEvents using the specific object type * fallback * tested * sonar
1 parent bcfd4fd commit 0bc814a

File tree

2 files changed

+57
-17
lines changed

2 files changed

+57
-17
lines changed

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/AbstractWatchManager.java

Lines changed: 39 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,12 @@
1515
*/
1616
package io.fabric8.kubernetes.client.dsl.internal;
1717

18-
import io.fabric8.kubernetes.api.model.HasMetadata;
19-
import io.fabric8.kubernetes.api.model.KubernetesResource;
20-
import io.fabric8.kubernetes.api.model.KubernetesResourceList;
21-
import io.fabric8.kubernetes.api.model.ListOptions;
22-
import io.fabric8.kubernetes.api.model.Status;
23-
import io.fabric8.kubernetes.api.model.WatchEvent;
18+
import com.fasterxml.jackson.annotation.JsonProperty;
19+
import com.fasterxml.jackson.core.JsonProcessingException;
20+
import com.fasterxml.jackson.databind.JsonNode;
21+
import com.fasterxml.jackson.databind.ObjectMapper;
22+
import com.fasterxml.jackson.databind.node.ObjectNode;
23+
import io.fabric8.kubernetes.api.model.*;
2424
import io.fabric8.kubernetes.client.KubernetesClientException;
2525
import io.fabric8.kubernetes.client.Watch;
2626
import io.fabric8.kubernetes.client.Watcher;
@@ -232,9 +232,31 @@ public void close() {
232232
closeRequest();
233233
cancelReconnect();
234234
}
235+
236+
private WatchEvent contextAwareWatchEventDeserializer(String messageSource) {
237+
try {
238+
return Serialization.unmarshal(messageSource, WatchEvent.class);
239+
} catch (Exception ex1) {
240+
try {
241+
JsonNode json = Serialization.jsonMapper().readTree(messageSource);
242+
JsonNode objectJson = null;
243+
if (json instanceof ObjectNode && json.has("object")) {
244+
objectJson = ((ObjectNode) json).remove("object");
245+
}
246+
247+
WatchEvent watchEvent = Serialization.jsonMapper().treeToValue(json, WatchEvent.class);
248+
KubernetesResource object = Serialization.jsonMapper().treeToValue(objectJson, baseOperation.getType());
249+
250+
watchEvent.setObject(object);
251+
return watchEvent;
252+
} catch (JsonProcessingException ex2) {
253+
throw new IllegalArgumentException("Failed to deserialize WatchEvent", ex2);
254+
}
255+
}
256+
}
235257

236258
protected WatchEvent readWatchEvent(String messageSource) {
237-
WatchEvent event = Serialization.unmarshal(messageSource, WatchEvent.class);
259+
WatchEvent event = contextAwareWatchEventDeserializer(messageSource);
238260
KubernetesResource object = null;
239261
if (event != null) {
240262
object = event.getObject();
@@ -261,12 +283,10 @@ protected void onMessage(String message) {
261283
try {
262284
WatchEvent event = readWatchEvent(message);
263285
Object object = event.getObject();
264-
if (object instanceof HasMetadata) {
265-
@SuppressWarnings("unchecked")
266-
T obj = (T) object;
267-
updateResourceVersion(obj.getMetadata().getResourceVersion());
268-
Action action = Action.valueOf(event.getType());
269-
eventReceived(action, obj);
286+
if (object instanceof Status) {
287+
Status status = (Status) object;
288+
289+
onStatus(status);
270290
} else if (object instanceof KubernetesResourceList) {
271291
// Dirty cast - should always be valid though
272292
KubernetesResourceList list = (KubernetesResourceList) object;
@@ -278,10 +298,12 @@ protected void onMessage(String message) {
278298
eventReceived(action, item);
279299
}
280300
}
281-
} else if (object instanceof Status) {
282-
Status status = (Status) object;
283-
284-
onStatus(status);
301+
} else if (object instanceof HasMetadata) {
302+
@SuppressWarnings("unchecked")
303+
T obj = (T) object;
304+
updateResourceVersion(obj.getMetadata().getResourceVersion());
305+
Action action = Action.valueOf(event.getType());
306+
eventReceived(action, obj);
285307
} else {
286308
logger.error("Unknown message received: {}", message);
287309
}

kubernetes-tests/src/test/java/io/fabric8/kubernetes/client/mock/DefaultSharedIndexInformerTest.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@
5454
import io.fabric8.kubernetes.client.server.mock.EnableKubernetesMockClient;
5555
import io.fabric8.kubernetes.client.server.mock.KubernetesMockServer;
5656
import io.fabric8.kubernetes.client.utils.Utils;
57+
import io.fabric8.kubernetes.internal.KubernetesDeserializer;
5758
import org.junit.jupiter.api.AfterEach;
5859
import org.junit.jupiter.api.BeforeEach;
5960
import org.junit.jupiter.api.DisplayName;
@@ -927,6 +928,23 @@ void testGenericKubernetesResourceSharedIndexInformerWithNamespaceConfigured() t
927928
assertEquals(0, foundExistingAnimal.getCount());
928929
}
929930

931+
@Test
932+
void testGenericKubernetesResourceSharedIndexInformerWithAdditionalDeserializers() throws InterruptedException {
933+
// Given
934+
setupMockServerExpectations(Animal.class, "ns1", this::getList, r -> new WatchEvent(getAnimal("red-panda", "Carnivora", r), "ADDED"), null, null);
935+
936+
// When
937+
KubernetesDeserializer.registerCustomKind("jungle.example.com/v1","Animal", CronTab.class);
938+
SharedIndexInformer<GenericKubernetesResource> animalSharedIndexInformer = factory.inNamespace("ns1").sharedIndexInformerForCustomResource(animalContext, 60 * WATCH_EVENT_EMIT_TIME);
939+
CountDownLatch foundExistingAnimal = new CountDownLatch(1);
940+
animalSharedIndexInformer.addEventHandler(new TestResourceHandler<>(foundExistingAnimal, "red-panda"));
941+
factory.startAllRegisteredInformers();
942+
foundExistingAnimal.await(LATCH_AWAIT_PERIOD_IN_SECONDS, TimeUnit.SECONDS);
943+
944+
// Then
945+
assertEquals(0, foundExistingAnimal.getCount());
946+
}
947+
930948
private KubernetesResource getAnimal(String name, String order, String resourceVersion) {
931949
AnimalSpec animalSpec = new AnimalSpec();
932950
animalSpec.setOrder(order);

0 commit comments

Comments
 (0)