Skip to content

Commit 7cc13f6

Browse files
committed
feat: use a predicate to select the custom resource for which a reconcile should be triggered #430
1 parent c17663b commit 7cc13f6

File tree

9 files changed

+240
-4
lines changed

9 files changed

+240
-4
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,15 @@
66
import com.fasterxml.jackson.core.JsonProcessingException;
77
import com.fasterxml.jackson.databind.ObjectMapper;
88
import io.fabric8.kubernetes.client.CustomResource;
9+
import java.util.List;
910
import java.util.Optional;
11+
import java.util.Set;
1012
import java.util.concurrent.ConcurrentHashMap;
1113
import java.util.concurrent.ConcurrentMap;
1214
import java.util.concurrent.locks.Lock;
1315
import java.util.concurrent.locks.ReentrantLock;
1416
import java.util.function.Predicate;
17+
import java.util.stream.Collectors;
1518
import org.slf4j.Logger;
1619
import org.slf4j.LoggerFactory;
1720

@@ -64,6 +67,30 @@ public Optional<CustomResource> getLatestResource(String uuid) {
6467
return Optional.ofNullable(resources.get(uuid)).map(this::clone);
6568
}
6669

70+
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
71+
try {
72+
lock.lock();
73+
return resources.values().stream()
74+
.filter(selector)
75+
.map(this::clone)
76+
.collect(Collectors.toList());
77+
} finally {
78+
lock.unlock();
79+
}
80+
}
81+
82+
public Set<String> getLatestResourcesUids(Predicate<CustomResource> selector) {
83+
try {
84+
lock.lock();
85+
return resources.values().stream()
86+
.filter(selector)
87+
.map(r -> r.getMetadata().getUid())
88+
.collect(Collectors.toSet());
89+
} finally {
90+
lock.unlock();
91+
}
92+
}
93+
6794
private CustomResource clone(CustomResource customResource) {
6895
try {
6996
return objectMapper.readValue(

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
1919
import java.util.HashMap;
2020
import java.util.HashSet;
21+
import java.util.List;
2122
import java.util.Map;
2223
import java.util.Optional;
2324
import java.util.Set;
@@ -108,13 +109,28 @@ public void handleEvent(Event event) {
108109
try {
109110
lock.lock();
110111
log.debug("Received event: {}", event);
111-
eventBuffer.addEvent(event);
112-
executeBufferedEvents(event.getRelatedCustomResourceUid());
112+
113+
if (event.getRelatedCustomResourceUid() != null) {
114+
eventBuffer.addEvent(event.getRelatedCustomResourceUid(), event);
115+
executeBufferedEvents(event.getRelatedCustomResourceUid());
116+
} else if (event.getCustomResourcesSelector() != null) {
117+
for (String uid :
118+
eventSourceManager.getLatestResourceUids(event.getCustomResourcesSelector())) {
119+
eventBuffer.addEvent(uid, event);
120+
executeBufferedEvents(uid);
121+
}
122+
}
113123
} finally {
114124
lock.unlock();
115125
}
116126
}
117127

128+
private void executeBufferedEvents(List<String> customResourceUids) {
129+
for (String customResourceUid : customResourceUids) {
130+
executeBufferedEvents(customResourceUid);
131+
}
132+
}
133+
118134
private void executeBufferedEvents(String customResourceUid) {
119135
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid);
120136
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,22 @@
66
import java.util.HashMap;
77
import java.util.List;
88
import java.util.Map;
9+
import java.util.Objects;
910

1011
class EventBuffer {
1112

1213
private final Map<String, List<Event>> events = new HashMap<>();
1314

15+
/** @deprecated use {@link #addEvent(String, Event)} */
16+
@Deprecated
1417
public void addEvent(Event event) {
15-
String uid = event.getRelatedCustomResourceUid();
18+
addEvent(event.getRelatedCustomResourceUid(), event);
19+
}
20+
21+
public void addEvent(String uid, Event event) {
22+
Objects.requireNonNull(uid, "uid");
23+
Objects.requireNonNull(event, "event");
24+
1625
List<Event> crEvents = events.computeIfAbsent(uid, (id) -> new ArrayList<>(1));
1726
crEvents.add(event);
1827
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,13 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
6+
@SuppressWarnings("rawtypes")
37
public abstract class AbstractEvent implements Event {
48

59
private final String relatedCustomResourceUid;
6-
10+
private final Predicate<CustomResource> customResourcesSelector;
711
private final EventSource eventSource;
812

913
public AbstractEvent(String relatedCustomResourceUid, EventSource eventSource) {
1014
this.relatedCustomResourceUid = relatedCustomResourceUid;
15+
this.customResourcesSelector = null;
16+
this.eventSource = eventSource;
17+
}
18+
19+
public AbstractEvent(Predicate<CustomResource> customResourcesSelector, EventSource eventSource) {
20+
this.relatedCustomResourceUid = null;
21+
this.customResourcesSelector = customResourcesSelector;
1122
this.eventSource = eventSource;
1223
}
1324

@@ -16,6 +27,10 @@ public String getRelatedCustomResourceUid() {
1627
return relatedCustomResourceUid;
1728
}
1829

30+
public Predicate<CustomResource> getCustomResourcesSelector() {
31+
return customResourcesSelector;
32+
}
33+
1934
@Override
2035
public EventSource getEventSource() {
2136
return eventSource;
@@ -27,6 +42,8 @@ public String toString() {
2742
+ this.getClass().getName()
2843
+ ", relatedCustomResourceUid="
2944
+ relatedCustomResourceUid
45+
+ ", customResourcesSelector="
46+
+ customResourcesSelector
3047
+ ", eventSource="
3148
+ eventSource
3249
+ " }";
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package io.javaoperatorsdk.operator.processing.event;
2+
3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
6+
@SuppressWarnings("rawtypes")
7+
public class DefaultEvent extends AbstractEvent {
8+
9+
public DefaultEvent(String relatedCustomResourceUid, EventSource eventSource) {
10+
super(relatedCustomResourceUid, eventSource);
11+
}
12+
13+
public DefaultEvent(Predicate<CustomResource> customResourcesSelector, EventSource eventSource) {
14+
super(customResourcesSelector, eventSource);
15+
}
16+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@
1212
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
1313
import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource;
1414
import java.util.Collections;
15+
import java.util.List;
1516
import java.util.Map;
1617
import java.util.Objects;
1718
import java.util.Optional;
19+
import java.util.Set;
1820
import java.util.concurrent.ConcurrentHashMap;
1921
import java.util.concurrent.locks.ReentrantLock;
2022
import java.util.function.Predicate;
@@ -163,6 +165,16 @@ public Optional<CustomResource> getLatestResource(String customResourceUid) {
163165
return getCache().getLatestResource(customResourceUid);
164166
}
165167

168+
// todo: remove
169+
public List<CustomResource> getLatestResources(Predicate<CustomResource> selector) {
170+
return getCache().getLatestResources(selector);
171+
}
172+
173+
// todo: remove
174+
public Set<String> getLatestResourceUids(Predicate<CustomResource> selector) {
175+
return getCache().getLatestResourcesUids(selector);
176+
}
177+
166178
// todo: remove
167179
public void cacheResource(CustomResource resource, Predicate<CustomResource> predicate) {
168180
getCache().cacheResource(resource, predicate);
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,24 @@
11
package io.javaoperatorsdk.operator.processing.event;
22

3+
import io.fabric8.kubernetes.client.CustomResource;
4+
import java.util.function.Predicate;
5+
36
public interface Event {
47

8+
/**
9+
* @return the UID of the the {@link CustomResource} for which a reconcile loop should be
10+
* triggered.
11+
* @deprecated use {@link #getCustomResourcesSelector()}
12+
*/
13+
@Deprecated
514
String getRelatedCustomResourceUid();
615

16+
/**
17+
* The selector used to determine the {@link CustomResource} for which a reconcile loop should be
18+
* triggered.
19+
*/
20+
Predicate<CustomResource> getCustomResourcesSelector();
21+
22+
/** @return the {@link EventSource} that has generated the event. */
723
EventSource getEventSource();
824
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,120 @@
1+
package io.javaoperatorsdk.operator.processing;
2+
3+
import static io.javaoperatorsdk.operator.TestUtils.testCustomResource;
4+
import static org.assertj.core.api.Assertions.assertThat;
5+
import static org.mockito.Mockito.any;
6+
import static org.mockito.Mockito.doAnswer;
7+
import static org.mockito.Mockito.doCallRealMethod;
8+
import static org.mockito.Mockito.mock;
9+
import static org.mockito.Mockito.timeout;
10+
import static org.mockito.Mockito.verify;
11+
import static org.mockito.Mockito.when;
12+
13+
import io.fabric8.kubernetes.client.Watcher;
14+
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
15+
import io.javaoperatorsdk.operator.processing.event.DefaultEvent;
16+
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
17+
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent;
18+
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;
19+
import java.util.Objects;
20+
import java.util.UUID;
21+
import org.junit.jupiter.api.BeforeEach;
22+
import org.junit.jupiter.api.Test;
23+
import org.mockito.ArgumentCaptor;
24+
25+
class CustomResourceSelectorTest {
26+
public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250;
27+
public static final int SEPARATE_EXECUTION_TIMEOUT = 450;
28+
29+
private final EventDispatcher eventDispatcherMock = mock(EventDispatcher.class);
30+
private final CustomResourceCache customResourceCache = new CustomResourceCache();
31+
32+
private final DefaultEventSourceManager defaultEventSourceManagerMock =
33+
mock(DefaultEventSourceManager.class);
34+
35+
private final DefaultEventHandler defaultEventHandler =
36+
new DefaultEventHandler(
37+
eventDispatcherMock,
38+
"Test",
39+
null,
40+
ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER);
41+
42+
@BeforeEach
43+
public void setup() {
44+
defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock);
45+
46+
// todo: remove
47+
when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache);
48+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());
49+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResources(any());
50+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResourceUids(any());
51+
doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any());
52+
doAnswer(
53+
invocation -> {
54+
final var resourceId = (String) invocation.getArgument(0);
55+
customResourceCache.cleanup(resourceId);
56+
return null;
57+
})
58+
.when(defaultEventSourceManagerMock)
59+
.cleanup(any());
60+
}
61+
62+
@Test
63+
public void withPredicate() {
64+
TestCustomResource cr1 = testCustomResource(UUID.randomUUID().toString());
65+
cr1.getSpec().setValue("1");
66+
TestCustomResource cr2 = testCustomResource(UUID.randomUUID().toString());
67+
cr2.getSpec().setValue("2");
68+
TestCustomResource cr3 = testCustomResource(UUID.randomUUID().toString());
69+
cr3.getSpec().setValue("3");
70+
71+
customResourceCache.cacheResource(cr1);
72+
customResourceCache.cacheResource(cr2);
73+
customResourceCache.cacheResource(cr3);
74+
75+
defaultEventHandler.handleEvent(
76+
new DefaultEvent(
77+
c -> {
78+
var tcr = ((TestCustomResource) c);
79+
return Objects.equals("1", tcr.getSpec().getValue())
80+
|| Objects.equals("3", tcr.getSpec().getValue());
81+
},
82+
null));
83+
84+
verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2))
85+
.handleExecution(any());
86+
87+
waitMinimalTime();
88+
89+
ArgumentCaptor<ExecutionScope> executionScopeArgumentCaptor =
90+
ArgumentCaptor.forClass(ExecutionScope.class);
91+
92+
verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2))
93+
.handleExecution(executionScopeArgumentCaptor.capture());
94+
95+
assertThat(executionScopeArgumentCaptor.getAllValues())
96+
.hasSize(2)
97+
.allSatisfy(
98+
s -> {
99+
assertThat(s.getEvents()).isNotEmpty().hasOnlyElementsOfType(DefaultEvent.class);
100+
});
101+
}
102+
103+
private void waitMinimalTime() {
104+
try {
105+
Thread.sleep(1000);
106+
} catch (InterruptedException e) {
107+
throw new IllegalStateException(e);
108+
}
109+
}
110+
111+
private CustomResourceEvent prepareCREvent() {
112+
return prepareCREvent(UUID.randomUUID().toString());
113+
}
114+
115+
private CustomResourceEvent prepareCREvent(String uid) {
116+
TestCustomResource customResource = testCustomResource(uid);
117+
customResourceCache.cacheResource(customResource);
118+
return new CustomResourceEvent(Watcher.Action.MODIFIED, customResource, null);
119+
}
120+
}

operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,9 @@ public void setup() {
6969
// todo: remove
7070
when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache);
7171
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());
72+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any());
73+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResources(any());
74+
doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResourceUids(any());
7275
doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any());
7376
doAnswer(
7477
invocation -> {

0 commit comments

Comments
 (0)