Skip to content

Commit 2eb41f7

Browse files
committed
Merge branch 'master' into better-api-for-reschedule
2 parents 34e1830 + 0494505 commit 2eb41f7

File tree

9 files changed

+382
-1
lines changed

9 files changed

+382
-1
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEvent.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public String toString() {
5050
+ " }";
5151
}
5252

53-
private static class UIDMatchingPredicate implements Predicate<CustomResource> {
53+
public static class UIDMatchingPredicate implements Predicate<CustomResource> {
5454
private final String uid;
5555

5656
public UIDMatchingPredicate(String uid) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import java.util.Optional;
4+
import java.util.function.Predicate;
5+
6+
import io.fabric8.kubernetes.client.CustomResource;
7+
import io.javaoperatorsdk.operator.processing.event.DefaultEvent;
8+
import io.javaoperatorsdk.operator.processing.event.EventSource;
9+
10+
public class InformerEvent<T> extends DefaultEvent {
11+
12+
private Action action;
13+
private T resource;
14+
private T oldResource;
15+
16+
public InformerEvent(String relatedCustomResourceUid, EventSource eventSource, Action action,
17+
T resource,
18+
T oldResource) {
19+
this(new UIDMatchingPredicate(relatedCustomResourceUid), eventSource, action, resource,
20+
oldResource);
21+
22+
}
23+
24+
public InformerEvent(Predicate<CustomResource> customResourcesSelector, EventSource eventSource,
25+
Action action,
26+
T resource, T oldResource) {
27+
super(customResourcesSelector, eventSource);
28+
this.action = action;
29+
this.resource = resource;
30+
this.oldResource = oldResource;
31+
}
32+
33+
public T getResource() {
34+
return resource;
35+
}
36+
37+
public Optional<T> getOldResource() {
38+
return Optional.ofNullable(oldResource);
39+
}
40+
41+
public Action getAction() {
42+
return action;
43+
}
44+
45+
public enum Action {
46+
ADD, UPDATE, DELETE
47+
}
48+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,117 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import java.io.IOException;
4+
import java.util.Objects;
5+
import java.util.Set;
6+
import java.util.function.Function;
7+
8+
import io.fabric8.kubernetes.api.model.HasMetadata;
9+
import io.fabric8.kubernetes.client.KubernetesClient;
10+
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
11+
import io.fabric8.kubernetes.client.informers.SharedInformer;
12+
import io.fabric8.kubernetes.client.informers.cache.Cache;
13+
import io.fabric8.kubernetes.client.informers.cache.Store;
14+
import io.javaoperatorsdk.operator.processing.event.AbstractEventSource;
15+
16+
public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource {
17+
18+
private final SharedInformer<T> sharedInformer;
19+
private final Function<T, Set<String>> resourceToUIDs;
20+
private final Function<HasMetadata, T> associatedWith;
21+
private final boolean skipUpdateEventPropagationIfNoChange;
22+
23+
public InformerEventSource(SharedInformer<T> sharedInformer,
24+
Function<T, Set<String>> resourceToUIDs) {
25+
this(sharedInformer, resourceToUIDs, null, true);
26+
}
27+
28+
public InformerEventSource(KubernetesClient client, Class<T> type,
29+
Function<T, Set<String>> resourceToUIDs) {
30+
this(client, type, resourceToUIDs, false);
31+
}
32+
33+
InformerEventSource(KubernetesClient client, Class<T> type,
34+
Function<T, Set<String>> resourceToUIDs,
35+
boolean skipUpdateEventPropagationIfNoChange) {
36+
this(client.informers().sharedIndexInformerFor(type, 0), resourceToUIDs, null,
37+
skipUpdateEventPropagationIfNoChange);
38+
}
39+
40+
public InformerEventSource(SharedInformer<T> sharedInformer,
41+
Function<T, Set<String>> resourceToUIDs,
42+
Function<HasMetadata, T> associatedWith,
43+
boolean skipUpdateEventPropagationIfNoChange) {
44+
this.sharedInformer = sharedInformer;
45+
this.resourceToUIDs = resourceToUIDs;
46+
this.skipUpdateEventPropagationIfNoChange = skipUpdateEventPropagationIfNoChange;
47+
48+
this.associatedWith = Objects.requireNonNullElseGet(associatedWith, () -> cr -> {
49+
final var metadata = cr.getMetadata();
50+
return getStore().getByKey(Cache.namespaceKeyFunc(metadata.getNamespace(),
51+
metadata.getName()));
52+
});
53+
54+
sharedInformer.addEventHandler(new ResourceEventHandler<>() {
55+
@Override
56+
public void onAdd(T t) {
57+
propagateEvent(InformerEvent.Action.ADD, t, null);
58+
}
59+
60+
@Override
61+
public void onUpdate(T oldObject, T newObject) {
62+
if (InformerEventSource.this.skipUpdateEventPropagationIfNoChange &&
63+
oldObject.getMetadata().getResourceVersion()
64+
.equals(newObject.getMetadata().getResourceVersion())) {
65+
return;
66+
}
67+
propagateEvent(InformerEvent.Action.UPDATE, newObject, oldObject);
68+
}
69+
70+
@Override
71+
public void onDelete(T t, boolean b) {
72+
propagateEvent(InformerEvent.Action.DELETE, t, null);
73+
}
74+
});
75+
}
76+
77+
private void propagateEvent(InformerEvent.Action action, T object, T oldObject) {
78+
var uids = resourceToUIDs.apply(object);
79+
if (uids.isEmpty()) {
80+
return;
81+
}
82+
uids.forEach(uid -> {
83+
InformerEvent event = new InformerEvent(uid, this, action, object, oldObject);
84+
this.eventHandler.handleEvent(event);
85+
});
86+
}
87+
88+
@Override
89+
public void start() {
90+
sharedInformer.run();
91+
}
92+
93+
@Override
94+
public void close() throws IOException {
95+
sharedInformer.close();
96+
}
97+
98+
public Store<T> getStore() {
99+
return sharedInformer.getStore();
100+
}
101+
102+
/**
103+
* Retrieves the informed resource associated with the specified primary resource as defined by
104+
* the function provided when this InformerEventSource was created
105+
*
106+
* @param resource the primary resource we want to retrieve the associated resource for
107+
* @return the informed resource associated with the specified primary resource
108+
*/
109+
public T getAssociated(HasMetadata resource) {
110+
return associatedWith.apply(resource);
111+
}
112+
113+
114+
public SharedInformer<T> getSharedInformer() {
115+
return sharedInformer;
116+
}
117+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
package io.javaoperatorsdk.operator.processing.event.internal;
2+
3+
import java.util.Collections;
4+
import java.util.Set;
5+
import java.util.function.Function;
6+
7+
import io.fabric8.kubernetes.api.model.HasMetadata;
8+
9+
public class Mappers {
10+
public static <T extends HasMetadata> Function<T, Set<String>> fromAnnotation(
11+
String annotationKey) {
12+
return fromMetadata(annotationKey, false);
13+
}
14+
15+
public static <T extends HasMetadata> Function<T, Set<String>> fromLabel(
16+
String labelKey) {
17+
return fromMetadata(labelKey, true);
18+
}
19+
20+
private static <T extends HasMetadata> Function<T, Set<String>> fromMetadata(
21+
String key, boolean isLabel) {
22+
return resource -> {
23+
final var metadata = resource.getMetadata();
24+
if (metadata == null) {
25+
return Collections.emptySet();
26+
} else {
27+
final var map = isLabel ? metadata.getLabels() : metadata.getAnnotations();
28+
return map != null ? Set.of(map.get(key)) : Collections.emptySet();
29+
}
30+
};
31+
}
32+
}

operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/OperatorExtension.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -131,6 +131,10 @@ public <T extends HasMetadata> T create(Class<T> type, T resource) {
131131
return kubernetesClient.resources(type).inNamespace(namespace).create(resource);
132132
}
133133

134+
public <T extends HasMetadata> T replace(Class<T> type, T resource) {
135+
return kubernetesClient.resources(type).inNamespace(namespace).replace(resource);
136+
}
137+
134138
@SuppressWarnings("unchecked")
135139
protected void before(ExtensionContext context) {
136140
namespace = context.getRequiredTestClass().getSimpleName();
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package io.javaoperatorsdk.operator;
2+
3+
import java.util.HashMap;
4+
import java.util.concurrent.TimeUnit;
5+
6+
import org.junit.jupiter.api.Test;
7+
import org.junit.jupiter.api.extension.RegisterExtension;
8+
9+
import io.fabric8.kubernetes.api.model.ConfigMap;
10+
import io.fabric8.kubernetes.api.model.ObjectMeta;
11+
import io.javaoperatorsdk.operator.config.runtime.DefaultConfigurationService;
12+
import io.javaoperatorsdk.operator.junit.OperatorExtension;
13+
import io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResource;
14+
import io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResourceController;
15+
16+
import static io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResourceController.RELATED_RESOURCE_UID;
17+
import static io.javaoperatorsdk.operator.sample.informereventsource.InformerEventSourceTestCustomResourceController.TARGET_CONFIG_MAP_KEY;
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.awaitility.Awaitility.await;
20+
21+
public class InformerEventSourceIT {
22+
23+
public static final String RESOURCE_NAME = "informertestcr";
24+
public static final String INITIAL_STATUS_MESSAGE = "Initial Status";
25+
public static final String UPDATE_STATUS_MESSAGE = "Updated Status";
26+
27+
@RegisterExtension
28+
OperatorExtension operator =
29+
OperatorExtension.builder()
30+
.withConfigurationService(DefaultConfigurationService.instance())
31+
.withController(new InformerEventSourceTestCustomResourceController())
32+
.build();
33+
34+
@Test
35+
public void testUsingInformerToWatchChangesOfConfigMap() {
36+
var customResource = initialCustomResource();
37+
customResource = operator.create(InformerEventSourceTestCustomResource.class, customResource);
38+
ConfigMap configMap =
39+
operator.create(ConfigMap.class, relatedConfigMap(customResource.getMetadata().getUid()));
40+
waitForCRStatusValue(INITIAL_STATUS_MESSAGE);
41+
42+
configMap.getData().put(TARGET_CONFIG_MAP_KEY, UPDATE_STATUS_MESSAGE);
43+
operator.replace(ConfigMap.class, configMap);
44+
45+
waitForCRStatusValue(UPDATE_STATUS_MESSAGE);
46+
}
47+
48+
private ConfigMap relatedConfigMap(String relatedResourceAnnotation) {
49+
ConfigMap configMap = new ConfigMap();
50+
51+
ObjectMeta objectMeta = new ObjectMeta();
52+
objectMeta.setName(RESOURCE_NAME);
53+
objectMeta.setAnnotations(new HashMap<>());
54+
objectMeta.getAnnotations().put(RELATED_RESOURCE_UID, relatedResourceAnnotation);
55+
configMap.setMetadata(objectMeta);
56+
57+
configMap.setData(new HashMap<>());
58+
configMap.getData().put(TARGET_CONFIG_MAP_KEY, INITIAL_STATUS_MESSAGE);
59+
return configMap;
60+
}
61+
62+
private InformerEventSourceTestCustomResource initialCustomResource() {
63+
var customResource = new InformerEventSourceTestCustomResource();
64+
ObjectMeta objectMeta = new ObjectMeta();
65+
objectMeta.setName(RESOURCE_NAME);
66+
customResource.setMetadata(objectMeta);
67+
return customResource;
68+
}
69+
70+
private void waitForCRStatusValue(String value) {
71+
await().atMost(10, TimeUnit.SECONDS).untilAsserted(() -> {
72+
var cr =
73+
operator.getNamedResource(InformerEventSourceTestCustomResource.class, RESOURCE_NAME);
74+
assertThat(cr.getStatus()).isNotNull();
75+
assertThat(cr.getStatus().getConfigMapValue()).isEqualTo(value);
76+
});
77+
}
78+
79+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
package io.javaoperatorsdk.operator.sample.informereventsource;
2+
3+
import io.fabric8.kubernetes.api.model.Namespaced;
4+
import io.fabric8.kubernetes.client.CustomResource;
5+
import io.fabric8.kubernetes.model.annotation.Group;
6+
import io.fabric8.kubernetes.model.annotation.Kind;
7+
import io.fabric8.kubernetes.model.annotation.ShortNames;
8+
import io.fabric8.kubernetes.model.annotation.Version;
9+
10+
@Group("sample.javaoperatorsdk")
11+
@Version("v1")
12+
@Kind("Informereventsourcesample")
13+
@ShortNames("ies")
14+
public class InformerEventSourceTestCustomResource extends
15+
CustomResource<Void, InformerEventSourceTestCustomResourceStatus>
16+
implements Namespaced {
17+
18+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
package io.javaoperatorsdk.operator.sample.informereventsource;
2+
3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
5+
6+
import io.fabric8.kubernetes.api.model.ConfigMap;
7+
import io.fabric8.kubernetes.client.KubernetesClient;
8+
import io.javaoperatorsdk.operator.api.Context;
9+
import io.javaoperatorsdk.operator.api.Controller;
10+
import io.javaoperatorsdk.operator.api.ResourceController;
11+
import io.javaoperatorsdk.operator.api.UpdateControl;
12+
import io.javaoperatorsdk.operator.junit.KubernetesClientAware;
13+
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
14+
import io.javaoperatorsdk.operator.processing.event.internal.InformerEventSource;
15+
import io.javaoperatorsdk.operator.processing.event.internal.Mappers;
16+
17+
import static io.javaoperatorsdk.operator.api.Controller.NO_FINALIZER;
18+
19+
/**
20+
* Copies the config map value from spec into status. The main purpose is to test and demonstrate
21+
* sample usage of InformerEventSource
22+
*/
23+
@Controller(finalizerName = NO_FINALIZER)
24+
public class InformerEventSourceTestCustomResourceController implements
25+
ResourceController<InformerEventSourceTestCustomResource>, KubernetesClientAware {
26+
27+
private static final Logger LOGGER =
28+
LoggerFactory.getLogger(InformerEventSourceTestCustomResourceController.class);
29+
30+
public static final String RELATED_RESOURCE_UID = "relatedResourceUID";
31+
public static final String TARGET_CONFIG_MAP_KEY = "targetStatus";
32+
33+
private KubernetesClient kubernetesClient;
34+
private InformerEventSource<ConfigMap> eventSource;
35+
36+
@Override
37+
public void init(EventSourceManager eventSourceManager) {
38+
eventSource = new InformerEventSource<>(kubernetesClient, ConfigMap.class,
39+
Mappers.fromAnnotation(RELATED_RESOURCE_UID));
40+
eventSourceManager.registerEventSource("configmap", eventSource);
41+
}
42+
43+
@Override
44+
public UpdateControl<InformerEventSourceTestCustomResource> createOrUpdateResource(
45+
InformerEventSourceTestCustomResource resource,
46+
Context<InformerEventSourceTestCustomResource> context) {
47+
48+
// Reading the config map from the informer not from the API
49+
// name of the config map same as custom resource for sake of simplicity
50+
ConfigMap configMap = eventSource.getAssociated(resource);
51+
52+
String targetStatus = configMap.getData().get(TARGET_CONFIG_MAP_KEY);
53+
LOGGER.debug("Setting target status for CR: {}", targetStatus);
54+
resource.setStatus(new InformerEventSourceTestCustomResourceStatus());
55+
resource.getStatus().setConfigMapValue(targetStatus);
56+
return UpdateControl.updateStatusSubResource(resource);
57+
}
58+
59+
@Override
60+
public KubernetesClient getKubernetesClient() {
61+
return kubernetesClient;
62+
}
63+
64+
@Override
65+
public void setKubernetesClient(KubernetesClient kubernetesClient) {
66+
this.kubernetesClient = kubernetesClient;
67+
}
68+
}

0 commit comments

Comments
 (0)