diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java index 608a47567b..d466eef95d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/DefaultEventHandler.java @@ -27,6 +27,7 @@ import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; +import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; /** * Event handler that makes sure that events are processed in a "single threaded" way per resource @@ -188,18 +189,18 @@ void eventProcessingFinished( if (!running) { return; } - + CustomResourceID customResourceID = executionScope.getCustomResourceID(); log.debug( "Event processing finished. Scope: {}, PostExecutionControl: {}", executionScope, postExecutionControl); - unsetUnderExecution(executionScope.getCustomResourceID()); + unsetUnderExecution(customResourceID); // If a delete event present at this phase, it was received during reconciliation. // So we either removed the finalizer during reconciliation or we don't use finalizers. // Either way we don't want to retry. if (retry != null && postExecutionControl.exceptionDuringExecution() && - !eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { + !eventMarker.deleteEventPresent(customResourceID)) { handleRetryOnException(executionScope); // todo revisit monitoring since events are not present anymore // final var monitor = monitor(); executionScope.getEvents().forEach(e -> @@ -210,11 +211,15 @@ void eventProcessingFinished( if (retry != null) { handleSuccessfulExecutionRegardingRetry(executionScope); } - if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { + if (eventMarker.deleteEventPresent(customResourceID)) { cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { - if (eventMarker.eventPresent(executionScope.getCustomResourceID())) { - submitReconciliationExecution(executionScope.getCustomResourceID()); + if (eventMarker.eventPresent(customResourceID)) { + if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) { + submitReconciliationExecution(customResourceID); + } else { + postponeReconciliationAndHandleCacheSyncEvent(customResourceID); + } } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getCustomResource()); @@ -225,6 +230,38 @@ void eventProcessingFinished( } } + private void postponeReconciliationAndHandleCacheSyncEvent(CustomResourceID customResourceID) { + eventSourceManager.getCustomResourceEventSource().whitelistNextEvent(customResourceID); + } + + private boolean isCacheReadyForInstantReconciliation(ExecutionScope executionScope, + PostExecutionControl postExecutionControl) { + if (!postExecutionControl.customResourceUpdatedDuringExecution()) { + return true; + } + String originalResourceVersion = getVersion(executionScope.getCustomResource()); + String customResourceVersionAfterExecution = getVersion(postExecutionControl + .getUpdatedCustomResource() + .orElseThrow(() -> new IllegalStateException( + "Updated custom resource must be present at this point of time"))); + String cachedCustomResourceVersion = getVersion(resourceCache + .getCustomResource(executionScope.getCustomResourceID()) + .orElseThrow(() -> new IllegalStateException( + "Cached custom resource must be present at this point"))); + + if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { + return true; + } + if (cachedCustomResourceVersion.equals(originalResourceVersion)) { + return false; + } + // If the cached resource version equals neither the version before nor after execution + // probably an update happened on the custom resource independent of the framework during + // reconciliation. We cannot tell at this point if it happened before our update or before. + // (Well we could if we would parse resource version, but that should not be done by definition) + return true; + } + private void reScheduleExecutionIfInstructed(PostExecutionControl postExecutionControl, R customResource) { postExecutionControl.getReScheduleDelay().ifPresent(delay -> eventSourceManager diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java index d405e48a5a..b668a772dd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/CustomResourceID.java @@ -47,4 +47,12 @@ public boolean equals(Object o) { public int hashCode() { return Objects.hash(name, namespace); } + + @Override + public String toString() { + return "CustomResourceID{" + + "name='" + name + '\'' + + ", namespace='" + namespace + '\'' + + '}'; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java index ef76244de3..02f27a010a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/DefaultEventSourceManager.java @@ -108,7 +108,7 @@ public Set getRegisteredEventSources() { } @Override - public CustomResourceEventSource getCustomResourceEventSource() { + public CustomResourceEventSource getCustomResourceEventSource() { return customResourceEventSource; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java index 3b56e5b685..58e1b3bf54 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSource.java @@ -41,11 +41,31 @@ public class CustomResourceEventSource> extends A private final Map> sharedIndexInformers = new ConcurrentHashMap<>(); private final ObjectMapper cloningObjectMapper; + private final CustomResourceEventFilter filter; + private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; + public CustomResourceEventSource(ConfiguredController controller) { this.controller = controller; this.cloningObjectMapper = controller.getConfiguration().getConfigurationService().getObjectMapper(); + + var filters = new CustomResourceEventFilter[] { + CustomResourceEventFilters.finalizerNeededAndApplied(), + CustomResourceEventFilters.markedForDeletion(), + CustomResourceEventFilters.and( + controller.getConfiguration().getEventFilter(), + CustomResourceEventFilters.generationAware()), + null + }; + + if (controller.getConfiguration().isGenerationAware()) { + onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>(); + filters[filters.length - 1] = onceWhitelistEventFilterEventFilter; + } else { + onceWhitelistEventFilterEventFilter = null; + } + filter = CustomResourceEventFilters.or(filters); } @Override @@ -90,7 +110,7 @@ public void start() { @Override public void close() throws IOException { eventHandler.close(); - for (SharedIndexInformer informer : sharedIndexInformers.values()) { + for (SharedIndexInformer informer : sharedIndexInformers.values()) { try { log.info("Closing informer {} -> {}", controller, informer); informer.close(); @@ -104,13 +124,6 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource log.debug( "Event received for resource: {}", getName(customResource)); - final CustomResourceEventFilter filter = CustomResourceEventFilters.or( - CustomResourceEventFilters.finalizerNeededAndApplied(), - CustomResourceEventFilters.markedForDeletion(), - CustomResourceEventFilters.and( - controller.getConfiguration().getEventFilter(), - CustomResourceEventFilters.generationAware())); - if (filter.acceptChange(controller.getConfiguration(), oldResource, customResource)) { eventHandler.handleEvent( new CustomResourceEvent(action, CustomResourceID.fromResource(customResource))); @@ -171,4 +184,16 @@ private T clone(T customResource) { throw new IllegalStateException(e); } } + + /** + * This will ensure that the next event received after this method is called will not be filtered + * out. + * + * @param customResourceID - to which the event is related + */ + public void whitelistNextEvent(CustomResourceID customResourceID) { + if (onceWhitelistEventFilterEventFilter != null) { + onceWhitelistEventFilterEventFilter.whitelistNextEvent(customResourceID); + } + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java new file mode 100644 index 0000000000..4c144e65e6 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilter.java @@ -0,0 +1,36 @@ +package io.javaoperatorsdk.operator.processing.event.internal; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.client.CustomResource; +import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; + +public class OnceWhitelistEventFilterEventFilter> + implements CustomResourceEventFilter { + + private static final Logger log = + LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class); + + private final ConcurrentMap whiteList = + new ConcurrentHashMap<>(); + + @Override + public boolean acceptChange(ControllerConfiguration configuration, T oldResource, + T newResource) { + CustomResourceID customResourceID = CustomResourceID.fromResource(newResource); + boolean res = whiteList.remove(customResourceID, customResourceID); + if (res) { + log.debug("Accepting whitelisted event for CR id: {}", customResourceID); + } + return res; + } + + public void whitelistNextEvent(CustomResourceID customResourceID) { + whiteList.putIfAbsent(customResourceID, customResourceID); + } +} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java index c1e887de86..49dc35abdb 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/TestUtils.java @@ -32,6 +32,7 @@ public static TestCustomResource testCustomResource(CustomResourceID id) { resource.setMetadata( new ObjectMetaBuilder() .withName(id.getName()) + .withResourceVersion("1") .withGeneration(1L) .withNamespace(id.getNamespace().orElse(null)) .build()); diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java index 1ecc551951..bc1b43fb21 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/DefaultEventHandlerTest.java @@ -17,6 +17,7 @@ import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; +import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource; import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; import io.javaoperatorsdk.operator.processing.event.internal.TimerEventSource; import io.javaoperatorsdk.operator.processing.retry.GenericRetry; @@ -39,15 +40,15 @@ class DefaultEventHandlerTest { private EventDispatcher eventDispatcherMock = mock(EventDispatcher.class); private DefaultEventSourceManager defaultEventSourceManagerMock = mock(DefaultEventSourceManager.class); - private ResourceCache resourceCache = mock(ResourceCache.class); + private ResourceCache resourceCacheMock = mock(ResourceCache.class); private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private DefaultEventHandler defaultEventHandler = - new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", null, eventMarker); + new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test", null, eventMarker); private DefaultEventHandler defaultEventHandlerWithRetry = - new DefaultEventHandler(eventDispatcherMock, resourceCache, "Test", + new DefaultEventHandler(eventDispatcherMock, resourceCacheMock, "Test", GenericRetry.defaultLimitedExponentialRetry(), eventMarker); @BeforeEach @@ -68,7 +69,7 @@ public void dispatchesEventsIfNoExecutionInProgress() { @Test public void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); - when(resourceCache.getCustomResource(event.getRelatedCustomResourceID())) + when(resourceCacheMock.getCustomResource(event.getRelatedCustomResourceID())) .thenReturn(Optional.empty()); defaultEventHandler.handleEvent(event); @@ -213,7 +214,7 @@ public void cleansUpWhenDeleteEventReceivedAndNoEventPresent() { @Test public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() { - var cr = testCustomResource(new CustomResourceID(UUID.randomUUID().toString())); + var cr = testCustomResource(); var crEvent = prepareCREvent(CustomResourceID.fromResource(cr)); eventMarker.markDeleteEventReceived(crEvent.getRelatedCustomResourceID()); var executionScope = new ExecutionScope(cr, null); @@ -225,6 +226,60 @@ public void cleansUpAfterExecutionIfOnlyDeleteEventMarkLeft() { .cleanupForCustomResource(eq(crEvent.getRelatedCustomResourceID())); } + @Test + public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { + var crID = new CustomResourceID("test-cr", TEST_NAMESPACE); + var cr = testCustomResource(crID); + var updatedCr = testCustomResource(crID); + updatedCr.getMetadata().setResourceVersion("2"); + var mockCREventSource = mock(CustomResourceEventSource.class); + eventMarker.markEventReceived(crID); + when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr)); + when(defaultEventSourceManagerMock.getCustomResourceEventSource()) + .thenReturn(mockCREventSource); + + defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null), + PostExecutionControl.customResourceUpdated(updatedCr)); + + verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID)); + } + + @Test + public void dontWhitelistsEventWhenOtherChangeDuringExecution() { + var crID = new CustomResourceID("test-cr", TEST_NAMESPACE); + var cr = testCustomResource(crID); + var updatedCr = testCustomResource(crID); + updatedCr.getMetadata().setResourceVersion("2"); + var otherChangeCR = testCustomResource(crID); + otherChangeCR.getMetadata().setResourceVersion("3"); + var mockCREventSource = mock(CustomResourceEventSource.class); + eventMarker.markEventReceived(crID); + when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(otherChangeCR)); + when(defaultEventSourceManagerMock.getCustomResourceEventSource()) + .thenReturn(mockCREventSource); + + defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null), + PostExecutionControl.customResourceUpdated(updatedCr)); + + verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); + } + + @Test + public void dontWhitelistsEventIfUpdatedEventInCache() { + var crID = new CustomResourceID("test-cr", TEST_NAMESPACE); + var cr = testCustomResource(crID); + var mockCREventSource = mock(CustomResourceEventSource.class); + eventMarker.markEventReceived(crID); + when(resourceCacheMock.getCustomResource(eq(crID))).thenReturn(Optional.of(cr)); + when(defaultEventSourceManagerMock.getCustomResourceEventSource()) + .thenReturn(mockCREventSource); + + defaultEventHandler.eventProcessingFinished(new ExecutionScope(cr, null), + PostExecutionControl.customResourceUpdated(cr)); + + verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); + } + private CustomResourceID eventAlreadyUnderProcessing() { when(eventDispatcherMock.handleExecution(any())) .then( @@ -243,7 +298,7 @@ private CustomResourceEvent prepareCREvent() { private CustomResourceEvent prepareCREvent(CustomResourceID uid) { TestCustomResource customResource = testCustomResource(uid); - when(resourceCache.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource)); + when(resourceCacheMock.getCustomResource(eq(uid))).thenReturn(Optional.of(customResource)); return new CustomResourceEvent(ResourceAction.UPDATED, CustomResourceID.fromResource(customResource)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java index 4bf7d1afd7..0bd736b6c2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/CustomResourceEventSourceTest.java @@ -14,6 +14,7 @@ import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.DefaultControllerConfiguration; import io.javaoperatorsdk.operator.processing.ConfiguredController; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -103,16 +104,36 @@ public void handlesAllEventIfNotGenerationAware() { } @Test - public void eventNotMarkedForLastGenerationIfNoFinalizer() { + public void eventWithNoGenerationProcessedIfNoFinalizer() { TestCustomResource customResource1 = TestUtils.testCustomResource(); customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1, customResource1); + verify(eventHandler, times(1)).handleEvent(any()); + } - customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource1, - customResource1); - verify(eventHandler, times(2)).handleEvent(any()); + @Test + public void handlesNextEventIfWhitelisted() { + TestCustomResource customResource = TestUtils.testCustomResource(); + customResource.getMetadata().setFinalizers(List.of(FINALIZER)); + customResourceEventSource.whitelistNextEvent(CustomResourceID.fromResource(customResource)); + + customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource, + customResource); + + verify(eventHandler, times(1)).handleEvent(any()); + } + + @Test + public void notHandlesNextEventIfNotWhitelisted() { + TestCustomResource customResource = TestUtils.testCustomResource(); + customResource.getMetadata().setFinalizers(List.of(FINALIZER)); + + customResourceEventSource.eventReceived(ResourceAction.UPDATED, customResource, + customResource); + + verify(eventHandler, times(0)).handleEvent(any()); } private static class TestConfiguredController extends ConfiguredController { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java new file mode 100644 index 0000000000..fde6bb2a7f --- /dev/null +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/internal/OnceWhitelistEventFilterEventFilterTest.java @@ -0,0 +1,43 @@ +package io.javaoperatorsdk.operator.processing.event.internal; + +import org.junit.jupiter.api.Test; + +import io.javaoperatorsdk.operator.TestUtils; +import io.javaoperatorsdk.operator.processing.event.CustomResourceID; + +import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; +import static org.assertj.core.api.Assertions.assertThat; + +class OnceWhitelistEventFilterEventFilterTest { + + private OnceWhitelistEventFilterEventFilter filter = new OnceWhitelistEventFilterEventFilter<>(); + + @Test + public void notAcceptCustomResourceNotWhitelisted() { + assertThat(filter.acceptChange(null, + testCustomResource(), testCustomResource())).isFalse(); + } + + @Test + public void allowCustomResourceWhitelisted() { + var cr = TestUtils.testCustomResource(); + + filter.whitelistNextEvent(CustomResourceID.fromResource(cr)); + + assertThat(filter.acceptChange(null, + cr, cr)).isTrue(); + } + + @Test + public void allowCustomResourceWhitelistedOnlyOnce() { + var cr = TestUtils.testCustomResource(); + + filter.whitelistNextEvent(CustomResourceID.fromResource(cr)); + + assertThat(filter.acceptChange(null, + cr, cr)).isTrue(); + assertThat(filter.acceptChange(null, + cr, cr)).isFalse(); + } + +}