From e6c6b70801ba2e1e1ea2d762b2cdb3cd372e0420 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 14 Oct 2021 13:04:26 +0200 Subject: [PATCH 1/8] fix: cache handling on update --- .../processing/DefaultEventHandler.java | 45 ++++++++++++++--- .../event/DefaultEventSourceManager.java | 2 +- .../CacheSyncNotificationEventFilter.java | 49 +++++++++++++++++++ .../internal/CustomResourceEventSource.java | 35 ++++++++++--- 4 files changed, 116 insertions(+), 15 deletions(-) create mode 100644 operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 608a47567b..dfa1491855 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -27,6 +27,7 @@ import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; +import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; /** * Event handler that makes sure that events are processed in a "single threaded" way per resource @@ -188,18 +189,18 @@ void eventProcessingFinished( if (!running) { return; } - + CustomResourceID customResourceID = executionScope.getCustomResourceID(); log.debug( "Event processing finished. Scope: {}, PostExecutionControl: {}", executionScope, postExecutionControl); - unsetUnderExecution(executionScope.getCustomResourceID()); + unsetUnderExecution(customResourceID); // If a delete event present at this phase, it was received during reconciliation. // So we either removed the finalizer during reconciliation or we don't use finalizers. // Either way we don't want to retry. if (retry != null && postExecutionControl.exceptionDuringExecution() && - !eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { + !eventMarker.deleteEventPresent(customResourceID)) { handleRetryOnException(executionScope); // todo revisit monitoring since events are not present anymore // final var monitor = monitor(); executionScope.getEvents().forEach(e -> @@ -210,11 +211,15 @@ void eventProcessingFinished( if (retry != null) { handleSuccessfulExecutionRegardingRetry(executionScope); } - if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { + if (eventMarker.deleteEventPresent(customResourceID)) { cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { - if (eventMarker.eventPresent(executionScope.getCustomResourceID())) { - submitReconciliationExecution(executionScope.getCustomResourceID()); + if (eventMarker.eventPresent(customResourceID)) { + if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) { + submitReconciliationExecution(customResourceID); + } else { + postponeReconciliationAndHandleCacheSyncEvent(customResourceID); + } } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getCustomResource()); @@ -225,6 +230,34 @@ void eventProcessingFinished( } } + private void postponeReconciliationAndHandleCacheSyncEvent(CustomResourceID customResourceID) { + eventSourceManager.getCustomResourceEventSource().allowNextEvent(customResourceID); + } + + private boolean isCacheReadyForInstantReconciliation(ExecutionScope executionScope, + PostExecutionControl postExecutionControl) { + if (!postExecutionControl.customResourceUpdatedDuringExecution()) { + return true; + } + String originalResourceVersion = getVersion(executionScope.getCustomResource()); + String customResourceVersionAfterExecution = getVersion(postExecutionControl + .getUpdatedCustomResource().get()); + String cachedCustomResourceVersion = getVersion(resourceCache + .getCustomResource(executionScope.getCustomResourceID()).get()); + + if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { + return true; + } + if (cachedCustomResourceVersion.equals(originalResourceVersion)) { + return false; + } + // If the cached resource version equals neither the version before of after execution + // probably an update happened on the custom resource independent of the framework during + // reconciliation. We cannot tell at this point if it happened before our update or before. + // (Well we could if we would parse resource version, but that should not be done by definition) + return true; + } + private void reScheduleExecutionIfInstructed(PostExecutionControl postExecutionControl, R customResource) { postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index ef76244de3..02f27a010a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -108,7 +108,7 @@ public Set getRegisteredEventSources() { } @Override - public CustomResourceEventSource getCustomResourceEventSource() { + public CustomResourceEventSource getCustomResourceEventSource() { return customResourceEventSource; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java new file mode 100644 index 0000000000..6c12597be7 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java @@ -0,0 +1,49 @@ +package io.javaoperatorsdk.operator.processing.event.internal; + +import java.util.HashSet; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; + +import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; + +public class CacheSyncNotificationEventFilter + implements CustomResourceEventFilter { + + private ReentrantLock lock = new ReentrantLock(); + private Set whiteList = new HashSet<>(); + + @Override + public boolean acceptChange(ControllerConfiguration configuration, T oldResource, + T newResource) { + lock.lock(); + try { + CustomResourceID customResourceID = CustomResourceID.fromResource(newResource); + boolean res = whiteList.contains(customResourceID); + cleanup(customResourceID); + return res; + } finally { + lock.unlock(); + } + } + + public void allowNextEvent(CustomResourceID customResourceID) { + lock.lock(); + try { + whiteList.add(customResourceID); + } finally { + lock.unlock(); + } + } + + public void cleanup(CustomResourceID customResourceID) { + lock.lock(); + try { + whiteList.remove(customResourceID); + } finally { + lock.unlock(); + } + } + +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 3b56e5b685..8b22911757 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -3,6 +3,7 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -41,11 +42,30 @@ public class CustomResourceEventSource> extends A private final Map> sharedIndexInformers = new ConcurrentHashMap<>(); private final ObjectMapper cloningObjectMapper; + private final CustomResourceEventFilter filter; + private final CacheSyncNotificationEventFilter cacheSyncNotificationEventFilter; + public CustomResourceEventSource(ConfiguredController controller) { this.controller = controller; this.cloningObjectMapper = controller.getConfiguration().getConfigurationService().getObjectMapper(); + + var filters = Arrays.stream(new CustomResourceEventFilter[] { + CustomResourceEventFilters.finalizerNeededAndApplied(), + CustomResourceEventFilters.markedForDeletion(), + CustomResourceEventFilters.and( + controller.getConfiguration().getEventFilter(), + CustomResourceEventFilters.generationAware())}) + .collect(Collectors.toList()); + + if (controller.getConfiguration().isGenerationAware()) { + cacheSyncNotificationEventFilter = new CacheSyncNotificationEventFilter<>(); + filters.add(cacheSyncNotificationEventFilter); + } else { + cacheSyncNotificationEventFilter = null; + } + filter = CustomResourceEventFilters.or(filters.toArray(new CustomResourceEventFilter[0])); } @Override @@ -90,7 +110,7 @@ public void start() { @Override public void close() throws IOException { eventHandler.close(); - for (SharedIndexInformer informer : sharedIndexInformers.values()) { + for (SharedIndexInformer informer : sharedIndexInformers.values()) { try { log.info("Closing informer {} -> {}", controller, informer); informer.close(); @@ -104,13 +124,6 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource log.debug( "Event received for resource: {}", getName(customResource)); - final CustomResourceEventFilter filter = CustomResourceEventFilters.or( - CustomResourceEventFilters.finalizerNeededAndApplied(), - CustomResourceEventFilters.markedForDeletion(), - CustomResourceEventFilters.and( - controller.getConfiguration().getEventFilter(), - CustomResourceEventFilters.generationAware())); - if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) { eventHandler.handleEvent( new CustomResourceEvent(action, CustomResourceID.fromResource(customResource))); @@ -171,4 +184,10 @@ private T clone(T customResource) { throw new IllegalStateException(e); } } + + public void allowNextEvent(CustomResourceID customResourceID) { + if (cacheSyncNotificationEventFilter != null) { + cacheSyncNotificationEventFilter.allowNextEvent(customResourceID); + } + } } From 1e53eaac9b4b0ecd74a7a27a229d3e28a487fe9f Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 14 Oct 2021 16:29:50 +0200 Subject: [PATCH 2/8] fix: refining, namings etc --- .../processing/DefaultEventHandler.java | 2 +- .../internal/CustomResourceEventSource.java | 20 ++++++--- ... OnceWhitelistEventFilterEventFilter.java} | 4 +- .../processing/DefaultEventHandlerTest.java | 6 +++ ...ceWhitelistEventFilterEventFilterTest.java | 43 +++++++++++++++++++ 5 files changed, 65 insertions(+), 10 deletions(-) rename operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/{CacheSyncNotificationEventFilter.java => OnceWhitelistEventFilterEventFilter.java} (89%) create mode 100644 operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index dfa1491855..48200d6048 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -231,7 +231,7 @@ void eventProcessingFinished( } private void postponeReconciliationAndHandleCacheSyncEvent(CustomResourceID customResourceID) { - eventSourceManager.getCustomResourceEventSource().allowNextEvent(customResourceID); + eventSourceManager.getCustomResourceEventSource().whitelistNextEvent(customResourceID); } private boolean isCacheReadyForInstantReconciliation(ExecutionScope executionScope, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 8b22911757..7136dfefc8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -43,7 +43,7 @@ public class CustomResourceEventSource> extends A new ConcurrentHashMap<>(); private final ObjectMapper cloningObjectMapper; private final CustomResourceEventFilter filter; - private final CacheSyncNotificationEventFilter cacheSyncNotificationEventFilter; + private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; public CustomResourceEventSource(ConfiguredController controller) { @@ -60,10 +60,10 @@ public CustomResourceEventSource(ConfiguredController controller) { .collect(Collectors.toList()); if (controller.getConfiguration().isGenerationAware()) { - cacheSyncNotificationEventFilter = new CacheSyncNotificationEventFilter<>(); - filters.add(cacheSyncNotificationEventFilter); + onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>(); + filters.add(onceWhitelistEventFilterEventFilter); } else { - cacheSyncNotificationEventFilter = null; + onceWhitelistEventFilterEventFilter = null; } filter = CustomResourceEventFilters.or(filters.toArray(new CustomResourceEventFilter[0])); } @@ -185,9 +185,15 @@ private T clone(T customResource) { } } - public void allowNextEvent(CustomResourceID customResourceID) { - if (cacheSyncNotificationEventFilter != null) { - cacheSyncNotificationEventFilter.allowNextEvent(customResourceID); + /** + * This will ensure that the next event received after this method is called will not be filtered + * out. + * + * @param customResourceID - to which the event is related + */ + public void whitelistNextEvent(CustomResourceID customResourceID) { + if (onceWhitelistEventFilterEventFilter != null) { + onceWhitelistEventFilterEventFilter.whitelistNextEvent(customResourceID); } } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java similarity index 89% rename from operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java rename to operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java index 6c12597be7..c31d578a06 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CacheSyncNotificationEventFilter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java @@ -8,7 +8,7 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -public class CacheSyncNotificationEventFilter +public class OnceWhitelistEventFilterEventFilter implements CustomResourceEventFilter { private ReentrantLock lock = new ReentrantLock(); @@ -28,7 +28,7 @@ public boolean acceptChange(ControllerConfiguration configuration, T oldResou } } - public void allowNextEvent(CustomResourceID customResourceID) { + public void whitelistNextEvent(CustomResourceID customResourceID) { lock.lock(); try { whiteList.add(customResourceID); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 1ecc551951..da39544bc8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -25,6 +25,7 @@ import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; import static io.javaoperatorsdk.operator.processing.event.internal.ResourceAction.DELETED; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -225,6 +226,11 @@ public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() { .cleanupForCustomResource(eq(crEvent.getRelatedCustomResourceID())); } + @Test + public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { + fail("todo"); + } + private CustomResourceID eventAlreadyUnderProcessing() { when(eventDispatcherMock.handleExecution(any())) .then( diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java new file mode 100644 index 0000000000..fde6bb2a7f --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java @@ -0,0 +1,43 @@ +package io.javaoperatorsdk.operator.processing.event.internal; + +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; + +import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; +import static org.assertj.core.api.Assertions.assertThat; + +class OnceWhitelistEventFilterEventFilterTest { + + private OnceWhitelistEventFilterEventFilter filter = new OnceWhitelistEventFilterEventFilter<>(); + + @Test + public void notAcceptCustomResourceNotWhitelisted() { + assertThat(filter.acceptChange(null, + testCustomResource(), testCustomResource())).isFalse(); + } + + @Test + public void allowCustomResourceWhitelisted() { + var cr = TestUtils.testCustomResource(); + + filter.whitelistNextEvent(CustomResourceID.fromResource(cr)); + + assertThat(filter.acceptChange(null, + cr, cr)).isTrue(); + } + + @Test + public void allowCustomResourceWhitelistedOnlyOnce() { + var cr = TestUtils.testCustomResource(); + + filter.whitelistNextEvent(CustomResourceID.fromResource(cr)); + + assertThat(filter.acceptChange(null, + cr, cr)).isTrue(); + assertThat(filter.acceptChange(null, + cr, cr)).isFalse(); + } + +} From 5a8fda790b556907e23e5aba12466497a7b5753c Mon Sep 17 00:00:00 2001 From: csviri Date: Fri, 15 Oct 2021 09:47:54 +0200 Subject: [PATCH 3/8] fix: improvements and unit tests --- .../processing/event/CustomResourceID.java | 8 +++ .../OnceWhitelistEventFilterEventFilter.java | 9 +++ .../javaoperatorsdk/operator/TestUtils.java | 1 + .../processing/DefaultEventHandlerTest.java | 65 ++++++++++++++++--- .../CustomResourceEventSourceTest.java | 29 +++++++-- 5 files changed, 100 insertions(+), 12 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java index d405e48a5a..b668a772dd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java @@ -47,4 +47,12 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(name, namespace); } + + @Override + public String toString() { + return "CustomResourceID{" + + "name='" + name + '\'' + + ", namespace='" + namespace + '\'' + + '}'; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java index c31d578a06..d93c77db49 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java @@ -4,6 +4,9 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantLock; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import io.fabric8.kubernetes.client.CustomResource; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; @@ -11,6 +14,9 @@ public class OnceWhitelistEventFilterEventFilter implements CustomResourceEventFilter { + private static final Logger log = + LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class); + private ReentrantLock lock = new ReentrantLock(); private Set whiteList = new HashSet<>(); @@ -22,6 +28,9 @@ public boolean acceptChange(ControllerConfiguration configuration, T oldResou CustomResourceID customResourceID = CustomResourceID.fromResource(newResource); boolean res = whiteList.contains(customResourceID); cleanup(customResourceID); + if (res) { + log.debug("Accepting whitelisted event for CR id: {}", customResourceID); + } return res; } finally { lock.unlock(); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java index c1e887de86..49dc35abdb 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java @@ -32,6 +32,7 @@ public static TestCustomResource testCustomResource(CustomResourceID id) { resource.setMetadata( new ObjectMetaBuilder() .withName(id.getName()) + .withResourceVersion("1") .withGeneration(1L) .withNamespace(id.getNamespace().orElse(null)) .build()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index da39544bc8..bc1b43fb21 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -17,6 +17,7 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -25,7 +26,6 @@ import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; import static io.javaoperatorsdk.operator.processing.event.internal.ResourceAction.DELETED; import static org.assertj.core.api.Assertions.assertThat; -import static org.assertj.core.api.Assertions.fail; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.*; @@ -40,15 +40,15 @@ class DefaultEventHandlerTest { private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); - private ResourceCache resourceCache = mock(ResourceCache.class); + private ResourceCache resourceCacheMock = mock(ResourceCache.class); private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", null, eventMarker); + new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test", null, eventMarker); private DefaultEventHandler defaultEventHandlerWithRetry = - new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", + new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test", GenericRetry.defaultLimitedExponentialRetry(), eventMarker); @BeforeEach @@ -69,7 +69,7 @@ public void dispatchesEventsIfNoExecutionInProgress() { @Test public void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); - when(resourceCache.getCustomResource(event.getRelatedCustomResourceID())) + when(resourceCacheMock.getCustomResource(event.getRelatedCustomResourceID())) .thenReturn(Optional.empty()); defaultEventHandler.handleEvent(event); @@ -214,7 +214,7 @@ public void cleansUpWhenDeleteEventReceivedAndNoEventPresent() { @Test public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() { - var cr = testCustomResource(new CustomResourceID(UUID.randomUUID().toString())); + var cr = testCustomResource(); var crEvent = prepareCREvent(CustomResourceID.fromResource(cr)); eventMarker.markDeleteEventReceived(crEvent.getRelatedCustomResourceID()); var executionScope = new ExecutionScope(cr, null); @@ -228,7 +228,56 @@ public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() { @Test public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { - fail("todo"); + var crID = new CustomResourceID("test-cr", TEST_NAMESPACE); + var cr = testCustomResource(crID); + var updatedCr = testCustomResource(crID); + updatedCr.getMetadata().setResourceVersion("2"); + var mockCREventSource = mock(CustomResourceEventSource.class); + eventMarker.markEventReceived(crID); + when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr)); + when(defaultEventSourceManagerMock.getCustomResourceEventSource()) + .thenReturn(mockCREventSource); + + defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null), + PostExecutionControl.customResourceUpdated(updatedCr)); + + verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID)); + } + + @Test + public void dontWhitelistsEventWhenOtherChangeDuringExecution() { + var crID = new CustomResourceID("test-cr", TEST_NAMESPACE); + var cr = testCustomResource(crID); + var updatedCr = testCustomResource(crID); + updatedCr.getMetadata().setResourceVersion("2"); + var otherChangeCR = testCustomResource(crID); + otherChangeCR.getMetadata().setResourceVersion("3"); + var mockCREventSource = mock(CustomResourceEventSource.class); + eventMarker.markEventReceived(crID); + when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(otherChangeCR)); + when(defaultEventSourceManagerMock.getCustomResourceEventSource()) + .thenReturn(mockCREventSource); + + defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null), + PostExecutionControl.customResourceUpdated(updatedCr)); + + verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); + } + + @Test + public void dontWhitelistsEventIfUpdatedEventInCache() { + var crID = new CustomResourceID("test-cr", TEST_NAMESPACE); + var cr = testCustomResource(crID); + var mockCREventSource = mock(CustomResourceEventSource.class); + eventMarker.markEventReceived(crID); + when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr)); + when(defaultEventSourceManagerMock.getCustomResourceEventSource()) + .thenReturn(mockCREventSource); + + defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null), + PostExecutionControl.customResourceUpdated(cr)); + + verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); } private CustomResourceID eventAlreadyUnderProcessing() { @@ -249,7 +298,7 @@ private CustomResourceEvent prepareCREvent() { private CustomResourceEvent prepareCREvent(CustomResourceID uid) { TestCustomResource customResource = testCustomResource(uid); - when(resourceCache.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource)); + when(resourceCacheMock.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource)); return new CustomResourceEvent(ResourceAction.UPDATED, CustomResourceID.fromResource(customResource)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 4bf7d1afd7..0bd736b6c2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -14,6 +14,7 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; import io.javaoperatorsdk.operator.processing.ConfiguredController; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -103,16 +104,36 @@ public void handlesAllEventIfNotGenerationAware() { } @Test - public void eventNotMarkedForLastGenerationIfNoFinalizer() { + public void eventWithNoGenerationProcessedIfNoFinalizer() { TestCustomResource customResource1 = TestUtils.testCustomResource(); customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + verify(eventHandler, times(1)).handleEvent(any()); + } - customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1, - customResource1); - verify(eventHandler, times(2)).handleEvent(any()); + @Test + public void handlesNextEventIfWhitelisted() { + TestCustomResource customResource = TestUtils.testCustomResource(); + customResource.getMetadata().setFinalizers(List.of(FINALIZER)); + customResourceEventSource.whitelistNextEvent(CustomResourceID.fromResource(customResource)); + + customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource, + customResource); + + verify(eventHandler, times(1)).handleEvent(any()); + } + + @Test + public void notHandlesNextEventIfNotWhitelisted() { + TestCustomResource customResource = TestUtils.testCustomResource(); + customResource.getMetadata().setFinalizers(List.of(FINALIZER)); + + customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource, + customResource); + + verify(eventHandler, times(0)).handleEvent(any()); } private static class TestConfiguredController extends ConfiguredController { From 2ea48e477f5d52ffa89138972be29153f5f315c4 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 19 Oct 2021 18:53:33 +0200 Subject: [PATCH 4/8] refactor: simplify filter creation --- .../event/internal/CustomResourceEventSource.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 7136dfefc8..58e1b3bf54 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -3,7 +3,6 @@ import java.io.IOException; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -51,21 +50,22 @@ public CustomResourceEventSource(ConfiguredController controller) { this.cloningObjectMapper = controller.getConfiguration().getConfigurationService().getObjectMapper(); - var filters = Arrays.stream(new CustomResourceEventFilter[] { + var filters = new CustomResourceEventFilter[] { CustomResourceEventFilters.finalizerNeededAndApplied(), CustomResourceEventFilters.markedForDeletion(), CustomResourceEventFilters.and( controller.getConfiguration().getEventFilter(), - CustomResourceEventFilters.generationAware())}) - .collect(Collectors.toList()); + CustomResourceEventFilters.generationAware()), + null + }; if (controller.getConfiguration().isGenerationAware()) { onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>(); - filters.add(onceWhitelistEventFilterEventFilter); + filters[filters.length - 1] = onceWhitelistEventFilterEventFilter; } else { onceWhitelistEventFilterEventFilter = null; } - filter = CustomResourceEventFilters.or(filters.toArray(new CustomResourceEventFilter[0])); + filter = CustomResourceEventFilters.or(filters); } @Override From b20722b284c5120abdface284e5794efa7c76b33 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 19 Oct 2021 18:59:00 +0200 Subject: [PATCH 5/8] refactor: simplify white listing filter by using concurrent hash map --- .../OnceWhitelistEventFilterEventFilter.java | 44 +++++-------------- 1 file changed, 11 insertions(+), 33 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java index d93c77db49..4c144e65e6 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java @@ -1,8 +1,7 @@ package io.javaoperatorsdk.operator.processing.event.internal; -import java.util.HashSet; -import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -11,48 +10,27 @@ import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.processing.event.CustomResourceID; -public class OnceWhitelistEventFilterEventFilter +public class OnceWhitelistEventFilterEventFilter> implements CustomResourceEventFilter { private static final Logger log = LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class); - private ReentrantLock lock = new ReentrantLock(); - private Set whiteList = new HashSet<>(); + private final ConcurrentMap whiteList = + new ConcurrentHashMap<>(); @Override public boolean acceptChange(ControllerConfiguration configuration, T oldResource, T newResource) { - lock.lock(); - try { - CustomResourceID customResourceID = CustomResourceID.fromResource(newResource); - boolean res = whiteList.contains(customResourceID); - cleanup(customResourceID); - if (res) { - log.debug("Accepting whitelisted event for CR id: {}", customResourceID); - } - return res; - } finally { - lock.unlock(); + CustomResourceID customResourceID = CustomResourceID.fromResource(newResource); + boolean res = whiteList.remove(customResourceID, customResourceID); + if (res) { + log.debug("Accepting whitelisted event for CR id: {}", customResourceID); } + return res; } public void whitelistNextEvent(CustomResourceID customResourceID) { - lock.lock(); - try { - whiteList.add(customResourceID); - } finally { - lock.unlock(); - } + whiteList.putIfAbsent(customResourceID, customResourceID); } - - public void cleanup(CustomResourceID customResourceID) { - lock.lock(); - try { - whiteList.remove(customResourceID); - } finally { - lock.unlock(); - } - } - } From 700b1802654aad80bb60ef31c9b16f5d60c3a0c3 Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Tue, 19 Oct 2021 19:01:51 +0200 Subject: [PATCH 6/8] fix: typo --- .../operator/processing/DefaultEventHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 48200d6048..ac832bc273 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -251,7 +251,7 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope execution if (cachedCustomResourceVersion.equals(originalResourceVersion)) { return false; } - // If the cached resource version equals neither the version before of after execution + // If the cached resource version equals neither the version before or after execution // probably an update happened on the custom resource independent of the framework during // reconciliation. We cannot tell at this point if it happened before our update or before. // (Well we could if we would parse resource version, but that should not be done by definition) From 04d8c093377a79241308399e5fedbb025542f1c8 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 20 Oct 2021 10:03:37 +0200 Subject: [PATCH 7/8] fix: throw exception in case of missing custom resources --- .../operator/processing/DefaultEventHandler.java | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index ac832bc273..c93c6ec34c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -241,9 +241,11 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope execution } String originalResourceVersion = getVersion(executionScope.getCustomResource()); String customResourceVersionAfterExecution = getVersion(postExecutionControl - .getUpdatedCustomResource().get()); + .getUpdatedCustomResource() + .orElseThrow(() -> new IllegalStateException("Updated custom resource must be present at this point of time"))); String cachedCustomResourceVersion = getVersion(resourceCache - .getCustomResource(executionScope.getCustomResourceID()).get()); + .getCustomResource(executionScope.getCustomResourceID()) + .orElseThrow(() -> new IllegalStateException("Cached custom resource must be present at this point"))); if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { return true; @@ -251,7 +253,7 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope execution if (cachedCustomResourceVersion.equals(originalResourceVersion)) { return false; } - // If the cached resource version equals neither the version before or after execution + // If the cached resource version equals neither the version before nor after execution // probably an update happened on the custom resource independent of the framework during // reconciliation. We cannot tell at this point if it happened before our update or before. // (Well we could if we would parse resource version, but that should not be done by definition) From a0ff4d505119e43df72e07041340b3748294537e Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 20 Oct 2021 10:05:53 +0200 Subject: [PATCH 8/8] fix: formatting --- .../operator/processing/DefaultEventHandler.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index c93c6ec34c..d466eef95d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -242,10 +242,12 @@ private boolean isCacheReadyForInstantReconciliation(ExecutionScope execution String originalResourceVersion = getVersion(executionScope.getCustomResource()); String customResourceVersionAfterExecution = getVersion(postExecutionControl .getUpdatedCustomResource() - .orElseThrow(() -> new IllegalStateException("Updated custom resource must be present at this point of time"))); + .orElseThrow(() -> new IllegalStateException( + "Updated custom resource must be present at this point of time"))); String cachedCustomResourceVersion = getVersion(resourceCache .getCustomResource(executionScope.getCustomResourceID()) - .orElseThrow(() -> new IllegalStateException("Cached custom resource must be present at this point"))); + .orElseThrow(() -> new IllegalStateException( + "Cached custom resource must be present at this point"))); if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { return true;