Skip to content

Commit 1bc98bd

Browse files
committed
event source: use a predicate to select the custom resource for which a reconcile should be triggered operator-framework#430
1 parent c17663b commit 1bc98bd

File tree

5 files changed

+89
-2
lines changed

5 files changed

+89
-2
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
import com.fasterxml.jackson.core.JsonProcessingException;
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import io.fabric8.kubernetes.client.CustomResource;
9+
import java.util.List;
910
import java.util.Optional;
11+
import java.util.Set;
1012
import java.util.concurrent.ConcurrentHashMap;
1113
import java.util.concurrent.ConcurrentMap;
1214
import java.util.concurrent.locks.Lock;
1315
import java.util.concurrent.locks.ReentrantLock;
1416
import java.util.function.Predicate;
17+
import java.util.stream.Collectors;
1518
import org.slf4j.Logger;
1619
import org.slf4j.LoggerFactory;
1720

@@ -64,6 +67,30 @@ public Optional<CustomResource> getLatestResource(String uuid) {
6467
return Optional.ofNullable(resources.get(uuid)).map(this::clone);
6568
}
6669

70+
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
71+
try {
72+
lock.lock();
73+
return resources.values().stream()
74+
.filter(selector)
75+
.map(this::clone)
76+
.collect(Collectors.toList());
77+
} finally {
78+
lock.unlock();
79+
}
80+
}
81+
82+
public Set<String> getLatestResourcesUids(Predicate<CustomResource> selector) {
83+
try {
84+
lock.lock();
85+
return resources.values().stream()
86+
.filter(selector)
87+
.map(r -> r.getMetadata().getUid())
88+
.collect(Collectors.toSet());
89+
} finally {
90+
lock.unlock();
91+
}
92+
}
93+
6794
private CustomResource clone(CustomResource customResource) {
6895
try {
6996
return objectMapper.readValue(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
1919
import java.util.HashMap;
2020
import java.util.HashSet;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Optional;
2324
import java.util.Set;
@@ -109,12 +110,26 @@ public void handleEvent(Event event) {
109110
lock.lock();
110111
log.debug("Received event: {}", event);
111112
eventBuffer.addEvent(event);
112-
executeBufferedEvents(event.getRelatedCustomResourceUid());
113+
114+
if (event.getRelatedCustomResourceUid() != null) {
115+
executeBufferedEvents(event.getRelatedCustomResourceUid());
116+
} else if (event.getCustomResourcesSelector() != null) {
117+
for (String uid :
118+
eventSourceManager.getLatestResourceUids(event.getCustomResourcesSelector())) {
119+
executeBufferedEvents(uid);
120+
}
121+
}
113122
} finally {
114123
lock.unlock();
115124
}
116125
}
117126

127+
private void executeBufferedEvents(List<String> customResourceUids) {
128+
for (String customResourceUid : customResourceUids) {
129+
executeBufferedEvents(customResourceUid);
130+
}
131+
}
132+
118133
private void executeBufferedEvents(String customResourceUid) {
119134
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid);
120135
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
6+
@SuppressWarnings("rawtypes")
37
public abstract class AbstractEvent implements Event {
48

59
private final String relatedCustomResourceUid;
6-
10+
private final Predicate<CustomResource> customResourcesSelector;
711
private final EventSource eventSource;
812

913
public AbstractEvent(String relatedCustomResourceUid, EventSource eventSource) {
1014
this.relatedCustomResourceUid = relatedCustomResourceUid;
15+
this.customResourcesSelector = null;
16+
this.eventSource = eventSource;
17+
}
18+
19+
public AbstractEvent(Predicate<CustomResource> customResourcesSelector, EventSource eventSource) {
20+
this.relatedCustomResourceUid = null;
21+
this.customResourcesSelector = customResourcesSelector;
1122
this.eventSource = eventSource;
1223
}
1324

@@ -16,6 +27,10 @@ public String getRelatedCustomResourceUid() {
1627
return relatedCustomResourceUid;
1728
}
1829

30+
public Predicate<CustomResource> getCustomResourcesSelector() {
31+
return customResourcesSelector;
32+
}
33+
1934
@Override
2035
public EventSource getEventSource() {
2136
return eventSource;
@@ -27,6 +42,8 @@ public String toString() {
2742
+ this.getClass().getName()
2843
+ ", relatedCustomResourceUid="
2944
+ relatedCustomResourceUid
45+
+ ", customResourcesSelector="
46+
+ customResourcesSelector
3047
+ ", eventSource="
3148
+ eventSource
3249
+ " }";

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1313
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
1414
import java.util.Collections;
15+
import java.util.List;
1516
import java.util.Map;
1617
import java.util.Objects;
1718
import java.util.Optional;
19+
import java.util.Set;
1820
import java.util.concurrent.ConcurrentHashMap;
1921
import java.util.concurrent.locks.ReentrantLock;
2022
import java.util.function.Predicate;
@@ -163,6 +165,16 @@ public Optional<CustomResource> getLatestResource(String customResourceUid) {
163165
return getCache().getLatestResource(customResourceUid);
164166
}
165167

168+
// todo: remove
169+
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
170+
return getCache().getLatestResources(selector);
171+
}
172+
173+
// todo: remove
174+
public Set<String> getLatestResourceUids(Predicate<CustomResource> selector) {
175+
return getCache().getLatestResourcesUids(selector);
176+
}
177+
166178
// todo: remove
167179
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) {
168180
getCache().cacheResource(resource, predicate);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
36
public interface Event {
47

8+
/**
9+
* @return the UID of the the {@link CustomResource} for which a reconcile loop should be
10+
* triggered.
11+
* @deprecated use {@link #getCustomResourcesSelector()}
12+
*/
13+
@Deprecated
514
String getRelatedCustomResourceUid();
615

16+
/**
17+
* The selector used to determine the {@link CustomResource} for which a reconcile loop should be
18+
* triggered.
19+
*/
20+
Predicate<CustomResource> getCustomResourcesSelector();
21+
22+
/** @return the {@link EventSource} that has generated the event. */
723
EventSource getEventSource();
824
}

0 commit comments

Comments
 (0)