Skip to content

Commit d94fcb2

Browse files
committed
add support for transforming objects in shared informers
Signed-off-by: dddddai <[email protected]>
1 parent 9107904 commit d94fcb2

File tree

5 files changed

+237
-12
lines changed

5 files changed

+237
-12
lines changed

util/src/main/java/io/kubernetes/client/informer/SharedInformer.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,4 +54,19 @@ public interface SharedInformer<ApiType extends KubernetesObject> {
5454
// store. The value returned is not synchronized with access to the underlying store and is not
5555
// thread-safe.
5656
String lastSyncResourceVersion();
57+
58+
/**
59+
* The TransformFunc is called for each object which is about to be stored. This function is
60+
* intended for you to take the opportunity to remove, transform, or normalize fields. One use
61+
* case is to strip unused metadata fields out of objects to save on RAM cost.
62+
*
63+
* <p>Must be set before starting the informer.
64+
*
65+
* <p>Note: Since the object given to the handler may be already shared with other goroutines, it
66+
* is advisable to copy the object being transform before mutating it at all and returning the
67+
* copy to prevent data races.
68+
*
69+
* @param transformFunc the transform function
70+
*/
71+
void setTransform(TransformFunc transformFunc);
5772
}
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.informer;
14+
15+
import io.kubernetes.client.common.KubernetesObject;
16+
import io.kubernetes.client.informer.exception.ObjectTransformException;
17+
18+
/*
19+
* TransformFunc allows for transforming an object before it will be processed
20+
* and put into the controller cache and before the corresponding handlers will
21+
* be called on it.
22+
* TransformFunc (similarly to ResourceEventHandler functions) should be able
23+
* to correctly handle the tombstone of type DeletedFinalStateUnknown
24+
*
25+
* The most common usage pattern is to clean-up some parts of the object to
26+
* reduce component memory usage if a given component doesn't care about them.
27+
* given controller doesn't care for them
28+
*/
29+
public interface TransformFunc {
30+
31+
/**
32+
* @param the original object to be transformed
33+
* @return the transformed object
34+
*/
35+
KubernetesObject transform(KubernetesObject object) throws ObjectTransformException;
36+
}
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
/*
2+
Copyright 2022 The Kubernetes Authors.
3+
Licensed under the Apache License, Version 2.0 (the "License");
4+
you may not use this file except in compliance with the License.
5+
You may obtain a copy of the License at
6+
http://www.apache.org/licenses/LICENSE-2.0
7+
Unless required by applicable law or agreed to in writing, software
8+
distributed under the License is distributed on an "AS IS" BASIS,
9+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
package io.kubernetes.client.informer.exception;
14+
15+
public class ObjectTransformException extends RuntimeException {
16+
public ObjectTransformException(String message) {
17+
super(message);
18+
}
19+
}

util/src/main/java/io/kubernetes/client/informer/impl/DefaultSharedIndexInformer.java

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.kubernetes.client.informer.ListerWatcher;
1818
import io.kubernetes.client.informer.ResourceEventHandler;
1919
import io.kubernetes.client.informer.SharedIndexInformer;
20+
import io.kubernetes.client.informer.TransformFunc;
2021
import io.kubernetes.client.informer.cache.Cache;
2122
import io.kubernetes.client.informer.cache.Controller;
2223
import io.kubernetes.client.informer.cache.DeltaFIFO;
@@ -58,6 +59,8 @@ public class DefaultSharedIndexInformer<
5859

5960
private Thread controllerThread;
6061

62+
private TransformFunc transform;
63+
6164
private volatile boolean started = false;
6265
private volatile boolean stopped = false;
6366

@@ -211,6 +214,14 @@ public String lastSyncResourceVersion() {
211214
return this.controller.lastSyncResourceVersion();
212215
}
213216

217+
@Override
218+
public void setTransform(TransformFunc transformFunc) {
219+
if (started) {
220+
throw new IllegalStateException("cannot set transform func to a running informer");
221+
}
222+
this.transform = transformFunc;
223+
}
224+
214225
@Override
215226
public void run() {
216227
if (started) {
@@ -258,26 +269,28 @@ public void handleDeltas(Deque<MutablePair<DeltaFIFO.DeltaType, KubernetesObject
258269
// from oldest to newest
259270
for (MutablePair<DeltaFIFO.DeltaType, KubernetesObject> delta : deltas) {
260271
DeltaFIFO.DeltaType deltaType = delta.getLeft();
272+
KubernetesObject obj = delta.getRight();
273+
if (transform != null) {
274+
obj = transform.transform(obj);
275+
}
261276
switch (deltaType) {
262277
case Sync:
263278
case Added:
264279
case Updated:
265280
boolean isSync = deltaType == DeltaFIFO.DeltaType.Sync;
266-
Object oldObj = this.indexer.get((ApiType) delta.getRight());
281+
Object oldObj = this.indexer.get((ApiType) obj);
267282
if (oldObj != null) {
268-
this.indexer.update((ApiType) delta.getRight());
283+
this.indexer.update((ApiType) obj);
269284
this.processor.distribute(
270-
new ProcessorListener.UpdateNotification(oldObj, delta.getRight()), isSync);
285+
new ProcessorListener.UpdateNotification(oldObj, obj), isSync);
271286
} else {
272-
this.indexer.add((ApiType) delta.getRight());
273-
this.processor.distribute(
274-
new ProcessorListener.AddNotification(delta.getRight()), isSync);
287+
this.indexer.add((ApiType) obj);
288+
this.processor.distribute(new ProcessorListener.AddNotification(obj), isSync);
275289
}
276290
break;
277291
case Deleted:
278-
this.indexer.delete((ApiType) delta.getRight());
279-
this.processor.distribute(
280-
new ProcessorListener.DeleteNotification(delta.getRight()), false);
292+
this.indexer.delete((ApiType) obj);
293+
this.processor.distribute(new ProcessorListener.DeleteNotification(obj), false);
281294
break;
282295
}
283296
}

util/src/test/java/io/kubernetes/client/informer/impl/DefaultSharedIndexInformerWireMockTest.java

Lines changed: 145 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,25 @@
1212
*/
1313
package io.kubernetes.client.informer.impl;
1414

15-
import static com.github.tomakehurst.wiremock.client.WireMock.*;
15+
import static com.github.tomakehurst.wiremock.client.WireMock.aResponse;
16+
import static com.github.tomakehurst.wiremock.client.WireMock.equalTo;
17+
import static com.github.tomakehurst.wiremock.client.WireMock.get;
18+
import static com.github.tomakehurst.wiremock.client.WireMock.getRequestedFor;
19+
import static com.github.tomakehurst.wiremock.client.WireMock.moreThan;
20+
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
21+
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
22+
import static com.github.tomakehurst.wiremock.client.WireMock.verify;
1623
import static com.github.tomakehurst.wiremock.core.WireMockConfiguration.options;
17-
import static org.junit.Assert.*;
24+
import static org.junit.Assert.assertEquals;
25+
import static org.junit.Assert.assertFalse;
26+
import static org.junit.Assert.assertTrue;
1827

1928
import com.github.tomakehurst.wiremock.junit.WireMockRule;
2029
import io.kubernetes.client.informer.EventType;
2130
import io.kubernetes.client.informer.ResourceEventHandler;
2231
import io.kubernetes.client.informer.SharedIndexInformer;
2332
import io.kubernetes.client.informer.SharedInformerFactory;
33+
import io.kubernetes.client.informer.exception.ObjectTransformException;
2434
import io.kubernetes.client.openapi.ApiClient;
2535
import io.kubernetes.client.openapi.ApiException;
2636
import io.kubernetes.client.openapi.JSON;
@@ -35,6 +45,7 @@
3545
import io.kubernetes.client.util.Watch;
3646
import java.io.IOException;
3747
import java.util.Arrays;
48+
import java.util.Collections;
3849
import java.util.concurrent.atomic.AtomicBoolean;
3950
import org.junit.Before;
4051
import org.junit.Rule;
@@ -162,6 +173,130 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
162173
String startRV = "1000";
163174
String endRV = "1001";
164175

176+
V1PodList podList =
177+
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());
178+
179+
stubFor(
180+
get(urlPathEqualTo("/api/v1/pods"))
181+
.withQueryParam("watch", equalTo("false"))
182+
.willReturn(
183+
aResponse()
184+
.withStatus(200)
185+
.withHeader("Content-Type", "application/json")
186+
.withBody(new JSON().serialize(podList))));
187+
188+
Watch.Response<V1Pod> watchResponse =
189+
new Watch.Response<>(
190+
EventType.ADDED.name(),
191+
new V1Pod()
192+
.metadata(
193+
new V1ObjectMeta()
194+
.namespace(namespace)
195+
.name(podName)
196+
.resourceVersion(endRV)
197+
.labels(Collections.singletonMap("foo", "bar"))
198+
.annotations(Collections.singletonMap("foo", "bar"))));
199+
200+
stubFor(
201+
get(urlPathEqualTo("/api/v1/pods"))
202+
.withQueryParam("watch", equalTo("true"))
203+
.willReturn(
204+
aResponse()
205+
.withStatus(200)
206+
.withHeader("Content-Type", "application/json")
207+
.withBody(new JSON().serialize(watchResponse))));
208+
209+
SharedInformerFactory factory = new SharedInformerFactory();
210+
SharedIndexInformer<V1Pod> podInformer =
211+
factory.sharedIndexInformerFor(
212+
(CallGeneratorParams params) -> {
213+
try {
214+
return coreV1Api.listPodForAllNamespacesCall(
215+
null,
216+
null,
217+
null,
218+
null,
219+
null,
220+
null,
221+
params.resourceVersion,
222+
null,
223+
params.timeoutSeconds,
224+
params.watch,
225+
null);
226+
} catch (ApiException e) {
227+
throw new RuntimeException(e);
228+
}
229+
},
230+
V1Pod.class,
231+
V1PodList.class);
232+
233+
podInformer.setTransform(
234+
(obj) -> {
235+
// deepcopy
236+
String json = new JSON().serialize(obj);
237+
V1Pod pod = new JSON().deserialize(json, V1Pod.class);
238+
// remove pod annotations
239+
pod.getMetadata().setAnnotations(null);
240+
return pod;
241+
});
242+
243+
AtomicBoolean foundExistingPod = new AtomicBoolean(false);
244+
AtomicBoolean transformed = new AtomicBoolean(false);
245+
AtomicBoolean setTransformAfterStarted = new AtomicBoolean(false);
246+
podInformer.addEventHandler(
247+
new ResourceEventHandler<V1Pod>() {
248+
@Override
249+
public void onAdd(V1Pod obj) {
250+
if (podName.equals(obj.getMetadata().getName())
251+
&& namespace.equals(obj.getMetadata().getNamespace())) {
252+
foundExistingPod.set(true);
253+
}
254+
V1ObjectMeta metadata = obj.getMetadata();
255+
// check if the object was transformed
256+
if (metadata.getLabels().get("foo").equals("bar")
257+
&& metadata.getAnnotations() == null) {
258+
transformed.set(true);
259+
}
260+
}
261+
262+
@Override
263+
public void onUpdate(V1Pod oldObj, V1Pod newObj) {}
264+
265+
@Override
266+
public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
267+
});
268+
factory.startAllRegisteredInformers();
269+
Thread.sleep(1000);
270+
271+
// can not set transform func if the informer has started
272+
try {
273+
podInformer.setTransform((obj) -> new V1Pod());
274+
setTransformAfterStarted.set(true);
275+
} catch (IllegalStateException e) {
276+
}
277+
278+
assertTrue(foundExistingPod.get());
279+
assertTrue(transformed.get());
280+
assertFalse(setTransformAfterStarted.get());
281+
assertEquals(endRV, podInformer.lastSyncResourceVersion());
282+
283+
verify(
284+
1,
285+
getRequestedFor(urlPathEqualTo("/api/v1/pods")).withQueryParam("watch", equalTo("false")));
286+
verify(
287+
moreThan(1),
288+
getRequestedFor(urlPathEqualTo("/api/v1/pods")).withQueryParam("watch", equalTo("true")));
289+
factory.stopAllRegisteredInformers();
290+
}
291+
292+
@Test
293+
public void testAllNamespacedPodInformerTransformFailure() throws InterruptedException {
294+
295+
CoreV1Api coreV1Api = new CoreV1Api(client);
296+
297+
String startRV = "1000";
298+
String endRV = "1001";
299+
165300
V1PodList podList =
166301
new V1PodList().metadata(new V1ListMeta().resourceVersion(startRV)).items(Arrays.asList());
167302

@@ -213,6 +348,12 @@ public void testAllNamespacedPodInformerNormalBehavior() throws InterruptedExcep
213348
},
214349
V1Pod.class,
215350
V1PodList.class);
351+
352+
podInformer.setTransform(
353+
(obj) -> {
354+
throw new ObjectTransformException("test transform failure");
355+
});
356+
216357
AtomicBoolean foundExistingPod = new AtomicBoolean(false);
217358
podInformer.addEventHandler(
218359
new ResourceEventHandler<V1Pod>() {
@@ -233,7 +374,8 @@ public void onDelete(V1Pod obj, boolean deletedFinalStateUnknown) {}
233374
factory.startAllRegisteredInformers();
234375
Thread.sleep(1000);
235376

236-
assertEquals(true, foundExistingPod.get());
377+
// cannot find the pod due to transform failure
378+
assertFalse(foundExistingPod.get());
237379
assertEquals(endRV, podInformer.lastSyncResourceVersion());
238380

239381
verify(

0 commit comments

Comments
 (0)