Skip to content

Race condition fix #1011

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Mar 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ public void start() throws OperatorException {
if (reconciler instanceof EventSourceInitializer) {
((EventSourceInitializer<R>) reconciler)
.prepareEventSources(new EventSourceContext<>(
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
eventSourceManager.getControllerResourceEventSource(),
configurationService(), kubernetesClient))
.forEach(eventSourceManager::registerEventSource);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {

Expand All @@ -50,7 +49,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw

EventProcessor(EventSourceManager<R> eventSourceManager) {
this(
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
eventSourceManager.getController().getConfiguration().getName(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
Expand All @@ -73,7 +72,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
Retry retry,
Metrics metrics) {
this(
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
eventSourceManager.getControllerResourceEventSource(),
null,
relatedControllerName,
reconciliationDispatcher,
Expand Down Expand Up @@ -208,12 +207,12 @@ void eventProcessingFinished(
if (eventMarker.deleteEventPresent(resourceID)) {
cleanupForDeletedEvent(executionScope.getCustomResourceID());
} else {
postExecutionControl.getUpdatedCustomResource().ifPresent(r -> {
eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate(
r, executionScope.getResource());
});
if (eventMarker.eventPresent(resourceID)) {
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
submitReconciliationExecution(resourceID);
} else {
postponeReconciliationAndHandleCacheSyncEvent(resourceID);
}
submitReconciliationExecution(resourceID);
} else {
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
}
Expand All @@ -223,41 +222,6 @@ void eventProcessingFinished(
}
}

private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) {
eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID);
}

private boolean isCacheReadyForInstantReconciliation(
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
return true;
}
String originalResourceVersion = getVersion(executionScope.getResource());
String customResourceVersionAfterExecution =
getVersion(
postExecutionControl
.getUpdatedCustomResource()
.orElseThrow(
() -> new IllegalStateException(
"Updated custom resource must be present at this point of time")));
String cachedCustomResourceVersion =
getVersion(
cache
.get(executionScope.getCustomResourceID())
.orElseThrow(
() -> new IllegalStateException(
"Cached custom resource must be present at this point")));

if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
return true;
}
// If the cached resource version equals neither the version before nor after execution
// probably an update happened on the custom resource independent of the framework during
// reconciliation. We cannot tell at this point if it happened before our update or before.
// (Well we could if we would parse resource version, but that should not be done by definition)
return !cachedCustomResourceVersion.equals(originalResourceVersion);
}

private void reScheduleExecutionIfInstructed(
PostExecutionControl<R> postExecutionControl, R customResource) {
postExecutionControl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
import java.util.Objects;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,14 +24,15 @@
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

public class ControllerResourceEventSource<T extends HasMetadata>
extends AbstractResourceEventSource<T, T>
implements ResourceEventHandler<T> {
implements ResourceEventHandler<T>, ResourceCache<T> {

public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";

Expand All @@ -40,8 +43,8 @@ public class ControllerResourceEventSource<T extends HasMetadata>
new ConcurrentHashMap<>();

private final ResourceEventFilter<T> filter;
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
private final ControllerResourceCache<T> cache;
private final TemporaryResourceCache<T> temporaryResourceCache;

public ControllerResourceEventSource(Controller<T> controller) {
super(controller.getConfiguration().getResourceClass());
Expand All @@ -50,20 +53,12 @@ public ControllerResourceEventSource(Controller<T> controller) {
var cloner = configurationService != null ? configurationService.getResourceCloner()
: ConfigurationService.DEFAULT_CLONER;
this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner);

temporaryResourceCache = new TemporaryResourceCache<>(cache);
var filters = new ResourceEventFilter[] {
ResourceEventFilters.finalizerNeededAndApplied(),
ResourceEventFilters.markedForDeletion(),
ResourceEventFilters.generationAware(),
null
ResourceEventFilters.generationAware()
};

if (controller.getConfiguration().isGenerationAware()) {
onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>();
filters[filters.length - 1] = onceWhitelistEventFilterEventFilter;
} else {
onceWhitelistEventFilterEventFilter = null;
}
if (controller.getConfiguration().getEventFilter() != null) {
filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters));
} else {
Expand Down Expand Up @@ -126,6 +121,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
try {
log.debug(
"Event received for resource: {}", getName(customResource));
temporaryResourceCache.removeResourceFromCache(customResource);
MDCUtils.addResourceInfo(customResource);
controller.getEventSourceManager().broadcastOnResourceEvent(action, customResource,
oldResource);
Expand Down Expand Up @@ -158,12 +154,31 @@ public void onDelete(T resource, boolean b) {
eventReceived(ResourceAction.DELETED, resource, null);
}


@Override
public Optional<T> get(ResourceID resourceID) {
return cache.get(resourceID);
Optional<T> resource = temporaryResourceCache.getResourceFromCache(resourceID);
if (resource.isPresent()) {
log.debug("Resource found in temporal cache for Resource ID: {}", resourceID);
return resource;
} else {
return cache.get(resourceID);
}
}

@Override
public Stream<ResourceID> keys() {
return cache.keys();
}

@Override
public Stream<T> list(Predicate<T> predicate) {
return cache.list(predicate);
}

public ControllerResourceCache<T> getResourceCache() {
return cache;
@Override
public Stream<T> list(String namespace, Predicate<T> predicate) {
return cache.list(namespace, predicate);
}

/**
Expand All @@ -178,19 +193,6 @@ public SharedIndexInformer<T> getInformer(String namespace) {
return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY));
}

/**
* This will ensure that the next event received after this method is called will not be filtered
* out.
*
* @param resourceID - to which the event is related
*/
public void whitelistNextEvent(ResourceID resourceID) {
if (onceWhitelistEventFilterEventFilter != null) {
onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID);
}
}


private void handleKubernetesClientException(Exception e) {
KubernetesClientException ke = (KubernetesClientException) e;
if (404 == ke.getCode()) {
Expand All @@ -204,6 +206,13 @@ private void handleKubernetesClientException(Exception e) {

@Override
public Optional<T> getAssociated(T primary) {
return cache.get(ResourceID.fromResource(primary));
return get(ResourceID.fromResource(primary));
}

public void handleRecentResourceUpdate(T resource,
T previousResourceVersion) {
temporaryResourceCache.putUpdatedResource(resource,
previousResourceVersion.getMetadata().getResourceVersion());
}

}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
package io.javaoperatorsdk.operator.processing.event.source.controller;

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class TemporaryResourceCache<T extends HasMetadata> {

private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);

private final Map<ResourceID, T> cache = new HashMap<>();
private final ControllerResourceCache<T> managedInformerEventSource;

public TemporaryResourceCache(ControllerResourceCache<T> managedInformerEventSource) {
this.managedInformerEventSource = managedInformerEventSource;
}

public synchronized void removeResourceFromCache(T resource) {
cache.remove(ResourceID.fromResource(resource));
}

public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) {
var resourceId = ResourceID.fromResource(newResource);
var informerCacheResource = managedInformerEventSource.get(resourceId);
if (informerCacheResource.isEmpty()) {
log.debug("No cached value present for resource: {}", newResource);
return;
}
// if this is not true that means the cache was already updated
if (informerCacheResource.get().getMetadata().getResourceVersion()
.equals(previousResourceVersion)) {
log.debug("Putting resource to temporal cache with id: {}", resourceId);
cache.put(resourceId, newResource);
} else {
// if something is in cache it's surely obsolete now
cache.remove(resourceId);
}
}

public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}
}

Loading