diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java index c5ad134a90..95f85d26d0 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/CustomResourceCache.java @@ -6,12 +6,15 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; import io.fabric8.kubernetes.client.CustomResource; +import java.util.List; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -64,6 +67,30 @@ public Optional getLatestResource(String uuid) { return Optional.ofNullable(resources.get(uuid)).map(this::clone); } + public List getLatestResources(Predicate selector) { + try { + lock.lock(); + return resources.values().stream() + .filter(selector) + .map(this::clone) + .collect(Collectors.toList()); + } finally { + lock.unlock(); + } + } + + public Set getLatestResourcesUids(Predicate selector) { + try { + lock.lock(); + return resources.values().stream() + .filter(selector) + .map(r -> r.getMetadata().getUid()) + .collect(Collectors.toSet()); + } finally { + lock.unlock(); + } + } + private CustomResource clone(CustomResource customResource) { try { return objectMapper.readValue( diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index fb3bb023c7..c25ceba4e2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -19,11 +19,13 @@ import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -108,8 +110,19 @@ public void handleEvent(Event event) { try { lock.lock(); log.debug("Received event: {}", event); - eventBuffer.addEvent(event); - executeBufferedEvents(event.getRelatedCustomResourceUid()); + + Predicate selector = event.getCustomResourcesSelector(); + if (selector == null) { + final String uid = + Objects.requireNonNull(event.getRelatedCustomResourceUid(), "CustomResource UID"); + + selector = customResource -> Objects.equals(uid, customResource.getMetadata().getUid()); + } + + for (String uid : eventSourceManager.getLatestResourceUids(selector)) { + eventBuffer.addEvent(uid, event); + executeBufferedEvents(uid); + } } finally { lock.unlock(); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java index db6a82fc1b..e2b383545a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java @@ -6,13 +6,22 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; class EventBuffer { private final Map> events = new HashMap<>(); + /** @deprecated use {@link #addEvent(String, Event)} */ + @Deprecated public void addEvent(Event event) { - String uid = event.getRelatedCustomResourceUid(); + addEvent(event.getRelatedCustomResourceUid(), event); + } + + public void addEvent(String uid, Event event) { + Objects.requireNonNull(uid, "uid"); + Objects.requireNonNull(event, "event"); + List crEvents = events.computeIfAbsent(uid, (id) -> new ArrayList<>(1)); crEvents.add(event); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java index 79bd68e4d5..638dd802e6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/AbstractEvent.java @@ -1,13 +1,24 @@ package io.javaoperatorsdk.operator.processing.event; +import io.fabric8.kubernetes.client.CustomResource; +import java.util.function.Predicate; + +@SuppressWarnings("rawtypes") public abstract class AbstractEvent implements Event { private final String relatedCustomResourceUid; - + private final Predicate customResourcesSelector; private final EventSource eventSource; public AbstractEvent(String relatedCustomResourceUid, EventSource eventSource) { this.relatedCustomResourceUid = relatedCustomResourceUid; + this.customResourcesSelector = null; + this.eventSource = eventSource; + } + + public AbstractEvent(Predicate customResourcesSelector, EventSource eventSource) { + this.relatedCustomResourceUid = null; + this.customResourcesSelector = customResourcesSelector; this.eventSource = eventSource; } @@ -16,6 +27,10 @@ public String getRelatedCustomResourceUid() { return relatedCustomResourceUid; } + public Predicate getCustomResourcesSelector() { + return customResourcesSelector; + } + @Override public EventSource getEventSource() { return eventSource; @@ -27,6 +42,8 @@ public String toString() { + this.getClass().getName() + ", relatedCustomResourceUid=" + relatedCustomResourceUid + + ", customResourcesSelector=" + + customResourcesSelector + ", eventSource=" + eventSource + " }"; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index b8111e7053..84900cbf8f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -12,9 +12,11 @@ import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import java.util.Collections; +import java.util.List; import java.util.Map; import java.util.Objects; import java.util.Optional; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantLock; import java.util.function.Predicate; @@ -163,6 +165,16 @@ public Optional getLatestResource(String customResourceUid) { return getCache().getLatestResource(customResourceUid); } + // todo: remove + public List getLatestResources(Predicate selector) { + return getCache().getLatestResources(selector); + } + + // todo: remove + public Set getLatestResourceUids(Predicate selector) { + return getCache().getLatestResourcesUids(selector); + } + // todo: remove public void cacheResource(CustomResource resource, Predicate predicate) { getCache().cacheResource(resource, predicate); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/Event.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/Event.java index 5af3b47eb6..a43531fb10 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/Event.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/Event.java @@ -1,8 +1,24 @@ package io.javaoperatorsdk.operator.processing.event; +import io.fabric8.kubernetes.client.CustomResource; +import java.util.function.Predicate; + public interface Event { + /** + * @return the UID of the the {@link CustomResource} for which a reconcile loop should be + * triggered. + * @deprecated use {@link #getCustomResourcesSelector()} + */ + @Deprecated String getRelatedCustomResourceUid(); + /** + * The selector used to determine the {@link CustomResource} for which a reconcile loop should be + * triggered. + */ + Predicate getCustomResourcesSelector(); + + /** @return the {@link EventSource} that has generated the event. */ EventSource getEventSource(); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java new file mode 100644 index 0000000000..376c0118d6 --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/CustomResourceSelectorTest.java @@ -0,0 +1,125 @@ +package io.javaoperatorsdk.operator.processing; + +import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doCallRealMethod; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import io.fabric8.kubernetes.client.Watcher; +import io.javaoperatorsdk.operator.api.config.ConfigurationService; +import io.javaoperatorsdk.operator.processing.event.AbstractEvent; +import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; +import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; +import java.util.Objects; +import java.util.UUID; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.ArgumentCaptor; + +class CustomResourceSelectorTest { + + public static final int FAKE_CONTROLLER_EXECUTION_DURATION = 250; + public static final int SEPARATE_EXECUTION_TIMEOUT = 450; + + private final EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); + private final CustomResourceCache customResourceCache = new CustomResourceCache(); + + private final DefaultEventSourceManager defaultEventSourceManagerMock = + mock(DefaultEventSourceManager.class); + + private final DefaultEventHandler defaultEventHandler = + new DefaultEventHandler( + eventDispatcherMock, + "Test", + null, + ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER); + + @BeforeEach + public void setup() { + defaultEventHandler.setEventSourceManager(defaultEventSourceManagerMock); + + // todo: remove + when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResources(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResourceUids(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any()); + doAnswer( + invocation -> { + final var resourceId = (String) invocation.getArgument(0); + customResourceCache.cleanup(resourceId); + return null; + }) + .when(defaultEventSourceManagerMock) + .cleanup(any()); + } + + @Test + public void dispatchEventsWithPredicate() { + TestCustomResource cr1 = testCustomResource(UUID.randomUUID().toString()); + cr1.getSpec().setValue("1"); + TestCustomResource cr2 = testCustomResource(UUID.randomUUID().toString()); + cr2.getSpec().setValue("2"); + TestCustomResource cr3 = testCustomResource(UUID.randomUUID().toString()); + cr3.getSpec().setValue("3"); + + customResourceCache.cacheResource(cr1); + customResourceCache.cacheResource(cr2); + customResourceCache.cacheResource(cr3); + + defaultEventHandler.handleEvent( + new AbstractEvent( + c -> { + var tcr = ((TestCustomResource) c); + return Objects.equals("1", tcr.getSpec().getValue()) + || Objects.equals("3", tcr.getSpec().getValue()); + }, + null) {}); + + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) + .handleExecution(any()); + + waitMinimalTime(); + + ArgumentCaptor executionScopeArgumentCaptor = + ArgumentCaptor.forClass(ExecutionScope.class); + + verify(eventDispatcherMock, timeout(SEPARATE_EXECUTION_TIMEOUT).times(2)) + .handleExecution(executionScopeArgumentCaptor.capture()); + + assertThat(executionScopeArgumentCaptor.getAllValues()) + .hasSize(2) + .allSatisfy( + s -> { + assertThat(s.getEvents()).isNotEmpty().hasOnlyElementsOfType(AbstractEvent.class); + assertThat(s) + .satisfiesAnyOf( + e -> Objects.equals(cr1.getMetadata().getUid(), e.getCustomResourceUid()), + e -> Objects.equals(cr3.getMetadata().getUid(), e.getCustomResourceUid())); + }); + } + + private void waitMinimalTime() { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + throw new IllegalStateException(e); + } + } + + private CustomResourceEvent prepareCREvent() { + return prepareCREvent(UUID.randomUUID().toString()); + } + + private CustomResourceEvent prepareCREvent(String uid) { + TestCustomResource customResource = testCustomResource(uid); + customResourceCache.cacheResource(customResource); + return new CustomResourceEvent(Watcher.Action.MODIFIED, customResource, null); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 947ac74be8..fbc63ae1fa 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -69,6 +69,9 @@ public void setup() { // todo: remove when(defaultEventSourceManagerMock.getCache()).thenReturn(customResourceCache); doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResource(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResources(any()); + doCallRealMethod().when(defaultEventSourceManagerMock).getLatestResourceUids(any()); doCallRealMethod().when(defaultEventSourceManagerMock).cacheResource(any(), any()); doAnswer( invocation -> {