Skip to content

Add support for transforming objects in shared informers #2148

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -54,4 +54,19 @@ public interface SharedInformer<ApiType extends KubernetesObject> {
// store. The value returned is not synchronized with access to the underlying store and is not
// thread-safe.
String lastSyncResourceVersion();

/**
* The TransformFunc is called for each object which is about to be stored. This function is
* intended for you to take the opportunity to remove, transform, or normalize fields. One use
* case is to strip unused metadata fields out of objects to save on RAM cost.
*
* <p>Must be set before starting the informer.
*
* <p>Note: Since the object given to the handler may be already shared with other goroutines, it
* is advisable to copy the object being transform before mutating it at all and returning the
* copy to prevent data races.
*
* @param transformFunc the transform function
*/
void setTransform(TransformFunc transformFunc);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.informer;

import io.kubernetes.client.common.KubernetesObject;
import io.kubernetes.client.informer.exception.ObjectTransformException;

/*
* TransformFunc allows for transforming an object before it will be processed
* and put into the controller cache and before the corresponding handlers will
* be called on it.
* TransformFunc (similarly to ResourceEventHandler functions) should be able
* to correctly handle the tombstone of type DeletedFinalStateUnknown
*
* The most common usage pattern is to clean-up some parts of the object to
* reduce component memory usage if a given component doesn't care about them.
* given controller doesn't care for them
*/
public interface TransformFunc {

/**
* @param the original object to be transformed
* @return the transformed object
*/
KubernetesObject transform(KubernetesObject object) throws ObjectTransformException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
/*
Copyright 2022 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package io.kubernetes.client.informer.exception;

public class ObjectTransformException extends RuntimeException {
public ObjectTransformException(String message) {
super(message);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import io.kubernetes.client.informer.ListerWatcher;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.TransformFunc;
import io.kubernetes.client.informer.cache.Cache;
import io.kubernetes.client.informer.cache.Controller;
import io.kubernetes.client.informer.cache.DeltaFIFO;
Expand Down Expand Up @@ -58,6 +59,8 @@ public class DefaultSharedIndexInformer<

private Thread controllerThread;

private TransformFunc transform;

private volatile boolean started = false;
private volatile boolean stopped = false;

Expand Down Expand Up @@ -211,6 +214,14 @@ public String lastSyncResourceVersion() {
return this.controller.lastSyncResourceVersion();
}

@Override
public void setTransform(TransformFunc transformFunc) {
if (started) {
throw new IllegalStateException("cannot set transform func to a running informer");
}
this.transform = transformFunc;
}

@Override
public void run() {
if (started) {
Expand Down Expand Up @@ -258,26 +269,28 @@ public void handleDeltas(Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject
// from oldest to newest
for (MutablePair<DeltaFIFO.DeltaType, KubernetesObject> delta : deltas) {
DeltaFIFO.DeltaType deltaType = delta.getLeft();
KubernetesObject obj = delta.getRight();
if (transform != null) {
obj = transform.transform(obj);
}
switch (deltaType) {
case Sync:
case Added:
case Updated:
boolean isSync = deltaType == DeltaFIFO.DeltaType.Sync;
Object oldObj = this.indexer.get((ApiType) delta.getRight());
Object oldObj = this.indexer.get((ApiType) obj);
if (oldObj != null) {
this.indexer.update((ApiType) delta.getRight());
this.indexer.update((ApiType) obj);
this.processor.distribute(
new ProcessorListener.UpdateNotification(oldObj, delta.getRight()), isSync);
new ProcessorListener.UpdateNotification(oldObj, obj), isSync);
} else {
this.indexer.add((ApiType) delta.getRight());
this.processor.distribute(
new ProcessorListener.AddNotification(delta.getRight()), isSync);
this.indexer.add((ApiType) obj);
this.processor.distribute(new ProcessorListener.AddNotification(obj), isSync);
}
break;
case Deleted:
this.indexer.delete((ApiType) delta.getRight());
this.processor.distribute(
new ProcessorListener.DeleteNotification(delta.getRight()), false);
this.indexer.delete((ApiType) obj);
this.processor.distribute(new ProcessorListener.DeleteNotification(obj), false);
break;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,25 @@
*/
package io.kubernetes.client.informer.impl;

import static com.github.tomakehurst.wiremock.client.WireMock.*;
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
import static com.github.tomakehurst.wiremock.client.WireMock.get;
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
import static com.github.tomakehurst.wiremock.client.WireMock.moreThan;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
import static org.junit.Assert.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.github.tomakehurst.wiremock.junit.WireMockRule;
import io.kubernetes.client.informer.EventType;
import io.kubernetes.client.informer.ResourceEventHandler;
import io.kubernetes.client.informer.SharedIndexInformer;
import io.kubernetes.client.informer.SharedInformerFactory;
import io.kubernetes.client.informer.exception.ObjectTransformException;
import io.kubernetes.client.openapi.ApiClient;
import io.kubernetes.client.openapi.ApiException;
import io.kubernetes.client.openapi.JSON;
Expand All @@ -35,6 +45,7 @@
import io.kubernetes.client.util.Watch;
import java.io.IOException;
import java.util.Arrays;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicBoolean;
import org.junit.Before;
import org.junit.Rule;
Expand Down Expand Up @@ -162,6 +173,130 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
String startRV = "1000";
String endRV = "1001";

V1PodList podList =
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());

stubFor(
get(urlPathEqualTo("/api/v1/pods"))
.withQueryParam("watch", equalTo("false"))
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody(new JSON().serialize(podList))));

Watch.Response<V1Pod> watchResponse =
new Watch.Response<>(
EventType.ADDED.name(),
new V1Pod()
.metadata(
new V1ObjectMeta()
.namespace(namespace)
.name(podName)
.resourceVersion(endRV)
.labels(Collections.singletonMap("foo", "bar"))
.annotations(Collections.singletonMap("foo", "bar"))));

stubFor(
get(urlPathEqualTo("/api/v1/pods"))
.withQueryParam("watch", equalTo("true"))
.willReturn(
aResponse()
.withStatus(200)
.withHeader("Content-Type", "application/json")
.withBody(new JSON().serialize(watchResponse))));

SharedInformerFactory factory = new SharedInformerFactory();
SharedIndexInformer<V1Pod> podInformer =
factory.sharedIndexInformerFor(
(CallGeneratorParams params) -> {
try {
return coreV1Api.listPodForAllNamespacesCall(
null,
null,
null,
null,
null,
null,
params.resourceVersion,
null,
params.timeoutSeconds,
params.watch,
null);
} catch (ApiException e) {
throw new RuntimeException(e);
}
},
V1Pod.class,
V1PodList.class);

podInformer.setTransform(
(obj) -> {
// deepcopy
String json = new JSON().serialize(obj);
V1Pod pod = new JSON().deserialize(json, V1Pod.class);
// remove pod annotations
pod.getMetadata().setAnnotations(null);
return pod;
});

AtomicBoolean foundExistingPod = new AtomicBoolean(false);
AtomicBoolean transformed = new AtomicBoolean(false);
AtomicBoolean setTransformAfterStarted = new AtomicBoolean(false);
podInformer.addEventHandler(
new ResourceEventHandler<V1Pod>() {
@Override
public void onAdd(V1Pod obj) {
if (podName.equals(obj.getMetadata().getName())
&& namespace.equals(obj.getMetadata().getNamespace())) {
foundExistingPod.set(true);
}
V1ObjectMeta metadata = obj.getMetadata();
// check if the object was transformed
if (metadata.getLabels().get("foo").equals("bar")
&& metadata.getAnnotations() == null) {
transformed.set(true);
}
}

@Override
public void onUpdate(V1Pod oldObj, V1Pod newObj) {}

@Override
public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
});
factory.startAllRegisteredInformers();
Thread.sleep(1000);

// can not set transform func if the informer has started
try {
podInformer.setTransform((obj) -> new V1Pod());
setTransformAfterStarted.set(true);
} catch (IllegalStateException e) {
}

assertTrue(foundExistingPod.get());
assertTrue(transformed.get());
assertFalse(setTransformAfterStarted.get());
assertEquals(endRV, podInformer.lastSyncResourceVersion());

verify(
1,
getRequestedFor(urlPathEqualTo("/api/v1/pods")).withQueryParam("watch", equalTo("false")));
verify(
moreThan(1),
getRequestedFor(urlPathEqualTo("/api/v1/pods")).withQueryParam("watch", equalTo("true")));
factory.stopAllRegisteredInformers();
}

@Test
public void testAllNamespacedPodInformerTransformFailure() throws InterruptedException {

CoreV1Api coreV1Api = new CoreV1Api(client);

String startRV = "1000";
String endRV = "1001";

V1PodList podList =
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());

Expand Down Expand Up @@ -213,6 +348,12 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
},
V1Pod.class,
V1PodList.class);

podInformer.setTransform(
(obj) -> {
throw new ObjectTransformException("test transform failure");
});

AtomicBoolean foundExistingPod = new AtomicBoolean(false);
podInformer.addEventHandler(
new ResourceEventHandler<V1Pod>() {
Expand All @@ -233,7 +374,8 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
factory.startAllRegisteredInformers();
Thread.sleep(1000);

assertEquals(true, foundExistingPod.get());
// cannot find the pod due to transform failure
assertFalse(foundExistingPod.get());
assertEquals(endRV, podInformer.lastSyncResourceVersion());

verify(
Expand Down