diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index f9a600f1a7..704945925f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -188,7 +188,7 @@ public void start() throws OperatorException { if (reconciler instanceof EventSourceInitializer) { ((EventSourceInitializer) reconciler) .prepareEventSources(new EventSourceContext<>( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), configurationService(), kubernetesClient)) .forEach(eventSourceManager::registerEventSource); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 29f9adab55..a88d80783d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -29,7 +29,6 @@ import io.javaoperatorsdk.operator.processing.retry.RetryExecution; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; -import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion; class EventProcessor implements EventHandler, LifecycleAware { @@ -50,7 +49,7 @@ class EventProcessor implements EventHandler, LifecycleAw EventProcessor(EventSourceManager eventSourceManager) { this( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), ExecutorServiceManager.instance().executorService(), eventSourceManager.getController().getConfiguration().getName(), new ReconciliationDispatcher<>(eventSourceManager.getController()), @@ -73,7 +72,7 @@ class EventProcessor implements EventHandler, LifecycleAw Retry retry, Metrics metrics) { this( - eventSourceManager.getControllerResourceEventSource().getResourceCache(), + eventSourceManager.getControllerResourceEventSource(), null, relatedControllerName, reconciliationDispatcher, @@ -208,12 +207,12 @@ void eventProcessingFinished( if (eventMarker.deleteEventPresent(resourceID)) { cleanupForDeletedEvent(executionScope.getCustomResourceID()); } else { + postExecutionControl.getUpdatedCustomResource().ifPresent(r -> { + eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate( + r, executionScope.getResource()); + }); if (eventMarker.eventPresent(resourceID)) { - if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) { - submitReconciliationExecution(resourceID); - } else { - postponeReconciliationAndHandleCacheSyncEvent(resourceID); - } + submitReconciliationExecution(resourceID); } else { reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource()); } @@ -223,41 +222,6 @@ void eventProcessingFinished( } } - private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) { - eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID); - } - - private boolean isCacheReadyForInstantReconciliation( - ExecutionScope executionScope, PostExecutionControl postExecutionControl) { - if (!postExecutionControl.customResourceUpdatedDuringExecution()) { - return true; - } - String originalResourceVersion = getVersion(executionScope.getResource()); - String customResourceVersionAfterExecution = - getVersion( - postExecutionControl - .getUpdatedCustomResource() - .orElseThrow( - () -> new IllegalStateException( - "Updated custom resource must be present at this point of time"))); - String cachedCustomResourceVersion = - getVersion( - cache - .get(executionScope.getCustomResourceID()) - .orElseThrow( - () -> new IllegalStateException( - "Cached custom resource must be present at this point"))); - - if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) { - return true; - } - // If the cached resource version equals neither the version before nor after execution - // probably an update happened on the custom resource independent of the framework during - // reconciliation. We cannot tell at this point if it happened before our update or before. - // (Well we could if we would parse resource version, but that should not be done by definition) - return !cachedCustomResourceVersion.equals(originalResourceVersion); - } - private void reScheduleExecutionIfInstructed( PostExecutionControl postExecutionControl, R customResource) { postExecutionControl diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java index 9feabf40bf..e2b4e89ebd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java @@ -5,6 +5,8 @@ import java.util.Objects; import java.util.Optional; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Predicate; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,6 +24,7 @@ import io.javaoperatorsdk.operator.processing.MDCUtils; import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource; +import io.javaoperatorsdk.operator.processing.event.source.ResourceCache; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID; @@ -29,7 +32,7 @@ public class ControllerResourceEventSource extends AbstractResourceEventSource - implements ResourceEventHandler { + implements ResourceEventHandler, ResourceCache { public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace"; @@ -40,8 +43,8 @@ public class ControllerResourceEventSource new ConcurrentHashMap<>(); private final ResourceEventFilter filter; - private final OnceWhitelistEventFilterEventFilter onceWhitelistEventFilterEventFilter; private final ControllerResourceCache cache; + private final TemporaryResourceCache temporaryResourceCache; public ControllerResourceEventSource(Controller controller) { super(controller.getConfiguration().getResourceClass()); @@ -50,20 +53,12 @@ public ControllerResourceEventSource(Controller controller) { var cloner = configurationService != null ? configurationService.getResourceCloner() : ConfigurationService.DEFAULT_CLONER; this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner); - + temporaryResourceCache = new TemporaryResourceCache<>(cache); var filters = new ResourceEventFilter[] { ResourceEventFilters.finalizerNeededAndApplied(), ResourceEventFilters.markedForDeletion(), - ResourceEventFilters.generationAware(), - null + ResourceEventFilters.generationAware() }; - - if (controller.getConfiguration().isGenerationAware()) { - onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>(); - filters[filters.length - 1] = onceWhitelistEventFilterEventFilter; - } else { - onceWhitelistEventFilterEventFilter = null; - } if (controller.getConfiguration().getEventFilter() != null) { filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters)); } else { @@ -126,6 +121,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource try { log.debug( "Event received for resource: {}", getName(customResource)); + temporaryResourceCache.removeResourceFromCache(customResource); MDCUtils.addResourceInfo(customResource); controller.getEventSourceManager().broadcastOnResourceEvent(action, customResource, oldResource); @@ -158,12 +154,31 @@ public void onDelete(T resource, boolean b) { eventReceived(ResourceAction.DELETED, resource, null); } + + @Override public Optional get(ResourceID resourceID) { - return cache.get(resourceID); + Optional resource = temporaryResourceCache.getResourceFromCache(resourceID); + if (resource.isPresent()) { + log.debug("Resource found in temporal cache for Resource ID: {}", resourceID); + return resource; + } else { + return cache.get(resourceID); + } + } + + @Override + public Stream keys() { + return cache.keys(); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list(predicate); } - public ControllerResourceCache getResourceCache() { - return cache; + @Override + public Stream list(String namespace, Predicate predicate) { + return cache.list(namespace, predicate); } /** @@ -178,19 +193,6 @@ public SharedIndexInformer getInformer(String namespace) { return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY)); } - /** - * This will ensure that the next event received after this method is called will not be filtered - * out. - * - * @param resourceID - to which the event is related - */ - public void whitelistNextEvent(ResourceID resourceID) { - if (onceWhitelistEventFilterEventFilter != null) { - onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID); - } - } - - private void handleKubernetesClientException(Exception e) { KubernetesClientException ke = (KubernetesClientException) e; if (404 == ke.getCode()) { @@ -204,6 +206,13 @@ private void handleKubernetesClientException(Exception e) { @Override public Optional getAssociated(T primary) { - return cache.get(ResourceID.fromResource(primary)); + return get(ResourceID.fromResource(primary)); } + + public void handleRecentResourceUpdate(T resource, + T previousResourceVersion) { + temporaryResourceCache.putUpdatedResource(resource, + previousResourceVersion.getMetadata().getResourceVersion()); + } + } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java deleted file mode 100644 index 8262ff1c21..0000000000 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source.controller; - -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import io.fabric8.kubernetes.api.model.HasMetadata; -import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; -import io.javaoperatorsdk.operator.processing.event.ResourceID; - -public class OnceWhitelistEventFilterEventFilter - implements ResourceEventFilter { - - private static final Logger log = - LoggerFactory.getLogger(OnceWhitelistEventFilterEventFilter.class); - - private final ConcurrentMap whiteList = new ConcurrentHashMap<>(); - - @Override - public boolean acceptChange(ControllerConfiguration configuration, T oldResource, - T newResource) { - ResourceID resourceID = ResourceID.fromResource(newResource); - boolean res = whiteList.remove(resourceID, resourceID); - if (res) { - log.debug("Accepting whitelisted event for CR id: {}", resourceID); - } - return res; - } - - public void whitelistNextEvent(ResourceID resourceID) { - whiteList.putIfAbsent(resourceID, resourceID); - } -} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java new file mode 100644 index 0000000000..fb402b0b86 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/TemporaryResourceCache.java @@ -0,0 +1,50 @@ +package io.javaoperatorsdk.operator.processing.event.source.controller; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +public class TemporaryResourceCache { + + private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class); + + private final Map cache = new HashMap<>(); + private final ControllerResourceCache managedInformerEventSource; + + public TemporaryResourceCache(ControllerResourceCache managedInformerEventSource) { + this.managedInformerEventSource = managedInformerEventSource; + } + + public synchronized void removeResourceFromCache(T resource) { + cache.remove(ResourceID.fromResource(resource)); + } + + public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) { + var resourceId = ResourceID.fromResource(newResource); + var informerCacheResource = managedInformerEventSource.get(resourceId); + if (informerCacheResource.isEmpty()) { + log.debug("No cached value present for resource: {}", newResource); + return; + } + // if this is not true that means the cache was already updated + if (informerCacheResource.get().getMetadata().getResourceVersion() + .equals(previousResourceVersion)) { + log.debug("Putting resource to temporal cache with id: {}", resourceId); + cache.put(resourceId, newResource); + } else { + // if something is in cache it's surely obsolete now + cache.remove(resourceId); + } + } + + public synchronized Optional getResourceFromCache(ResourceID resourceID) { + return Optional.ofNullable(cache.get(resourceID)); + } +} + diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java index 5a3f73742e..7422e6f572 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventProcessorTest.java @@ -13,7 +13,6 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.api.monitoring.Metrics; -import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceCache; import io.javaoperatorsdk.operator.processing.event.source.controller.ControllerResourceEventSource; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceEvent; @@ -44,8 +43,6 @@ class EventProcessorTest { private ReconciliationDispatcher reconciliationDispatcherMock = mock(ReconciliationDispatcher.class); private EventSourceManager eventSourceManagerMock = mock(EventSourceManager.class); - private ControllerResourceCache resourceCacheMock = - mock(ControllerResourceCache.class); private TimerEventSource retryTimerEventSourceMock = mock(TimerEventSource.class); private ControllerResourceEventSource controllerResourceEventSourceMock = mock(ControllerResourceEventSource.class); @@ -58,7 +55,6 @@ public void setup() { when(eventSourceManagerMock.getControllerResourceEventSource()) .thenReturn(controllerResourceEventSourceMock); - when(controllerResourceEventSourceMock.getResourceCache()).thenReturn(resourceCacheMock); eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, @@ -83,7 +79,8 @@ public void dispatchesEventsIfNoExecutionInProgress() { @Test public void skipProcessingIfLatestCustomResourceNotInCache() { Event event = prepareCREvent(); - when(resourceCacheMock.get(event.getRelatedCustomResourceID())).thenReturn(Optional.empty()); + when(controllerResourceEventSourceMock.get(event.getRelatedCustomResourceID())) + .thenReturn(Optional.empty()); eventProcessor.handleEvent(event); @@ -214,57 +211,6 @@ public void doNotFireEventsIfClosing() { verify(reconciliationDispatcherMock, timeout(50).times(0)).handleExecution(any()); } - @Test - public void whitelistNextEventIfTheCacheIsNotPropagatedAfterAnUpdate() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var updatedCr = testCustomResource(crID); - updatedCr.getMetadata().setResourceVersion("2"); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(cr)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(updatedCr)); - - verify(mockCREventSource, times(1)).whitelistNextEvent(eq(crID)); - } - - @Test - public void dontWhitelistsEventWhenOtherChangeDuringExecution() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var updatedCr = testCustomResource(crID); - updatedCr.getMetadata().setResourceVersion("2"); - var otherChangeCR = testCustomResource(crID); - otherChangeCR.getMetadata().setResourceVersion("3"); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(otherChangeCR)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(updatedCr)); - - verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); - } - - @Test - public void dontWhitelistsEventIfUpdatedEventInCache() { - var crID = new ResourceID("test-cr", TEST_NAMESPACE); - var cr = testCustomResource(crID); - var mockCREventSource = mock(ControllerResourceEventSource.class); - eventProcessor.getEventMarker().markEventReceived(crID); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(cr)); - when(eventSourceManagerMock.getControllerResourceEventSource()).thenReturn(mockCREventSource); - - eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null), - PostExecutionControl.customResourceUpdated(cr)); - - verify(mockCREventSource, times(0)).whitelistNextEvent(eq(crID)); - } - @Test public void cancelScheduleOnceEventsOnSuccessfulExecution() { var crID = new ResourceID("test-cr", TEST_NAMESPACE); @@ -282,7 +228,8 @@ public void startProcessedMarkedEventReceivedBefore() { eventProcessor = spy(new EventProcessor(reconciliationDispatcherMock, eventSourceManagerMock, "Test", null, metricsMock)); - when(resourceCacheMock.get(eq(crID))).thenReturn(Optional.of(testCustomResource())); + when(controllerResourceEventSourceMock.get(eq(crID))) + .thenReturn(Optional.of(testCustomResource())); eventProcessor.handleEvent(new Event(crID)); verify(reconciliationDispatcherMock, timeout(100).times(0)).handleExecution(any()); @@ -311,7 +258,7 @@ private ResourceEvent prepareCREvent() { private ResourceEvent prepareCREvent(ResourceID uid) { TestCustomResource customResource = testCustomResource(uid); - when(resourceCacheMock.get(eq(uid))).thenReturn(Optional.of(customResource)); + when(controllerResourceEventSourceMock.get(eq(uid))).thenReturn(Optional.of(customResource)); return new ResourceEvent(ResourceAction.UPDATED, ResourceID.fromResource(customResource)); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java deleted file mode 100644 index f82bea55c7..0000000000 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/OnceWhitelistEventFilterEventFilterTest.java +++ /dev/null @@ -1,41 +0,0 @@ -package io.javaoperatorsdk.operator.processing.event.source; - -import org.junit.jupiter.api.Test; - -import io.javaoperatorsdk.operator.TestUtils; -import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.controller.OnceWhitelistEventFilterEventFilter; - -import static io.javaoperatorsdk.operator.TestUtils.testCustomResource; -import static org.assertj.core.api.Assertions.assertThat; - -class OnceWhitelistEventFilterEventFilterTest { - - private OnceWhitelistEventFilterEventFilter filter = new OnceWhitelistEventFilterEventFilter<>(); - - @Test - public void notAcceptCustomResourceNotWhitelisted() { - assertThat(filter.acceptChange(null, - testCustomResource(), testCustomResource())).isFalse(); - } - - @Test - public void allowCustomResourceWhitelisted() { - var cr = TestUtils.testCustomResource(); - - filter.whitelistNextEvent(ResourceID.fromResource(cr)); - - assertThat(filter.acceptChange(null, cr, cr)).isTrue(); - } - - @Test - public void allowCustomResourceWhitelistedOnlyOnce() { - var cr = TestUtils.testCustomResource(); - - filter.whitelistNextEvent(ResourceID.fromResource(cr)); - - assertThat(filter.acceptChange(null, cr, cr)).isTrue(); - assertThat(filter.acceptChange(null, cr, cr)).isFalse(); - } - -} diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java index 5859115ee2..7f20918f13 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSourceTest.java @@ -14,7 +14,6 @@ import io.javaoperatorsdk.operator.processing.Controller; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.EventSourceManager; -import io.javaoperatorsdk.operator.processing.event.ResourceID; import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSourceTestBase; import io.javaoperatorsdk.operator.sample.simple.TestCustomResource; @@ -103,27 +102,6 @@ public void eventWithNoGenerationProcessedIfNoFinalizer() { verify(eventHandler, times(1)).handleEvent(any()); } - @Test - public void handlesNextEventIfWhitelisted() { - TestCustomResource customResource = TestUtils.testCustomResource(); - customResource.getMetadata().setFinalizers(List.of(FINALIZER)); - source.whitelistNextEvent(ResourceID.fromResource(customResource)); - - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); - - verify(eventHandler, times(1)).handleEvent(any()); - } - - @Test - public void notHandlesNextEventIfNotWhitelisted() { - TestCustomResource customResource = TestUtils.testCustomResource(); - customResource.getMetadata().setFinalizers(List.of(FINALIZER)); - - source.eventReceived(ResourceAction.UPDATED, customResource, customResource); - - verify(eventHandler, times(0)).handleEvent(any()); - } - @Test public void callsBroadcastsOnResourceEvents() { TestCustomResource customResource1 = TestUtils.testCustomResource();