Skip to content

Commit acd9ec4

Browse files
authored
Race condition fix (#1011)
1 parent bf075d7 commit acd9ec4

File tree

8 files changed

+101
-229
lines changed

8 files changed

+101
-229
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -188,7 +188,7 @@ public void start() throws OperatorException {
188188
if (reconciler instanceof EventSourceInitializer) {
189189
((EventSourceInitializer<R>) reconciler)
190190
.prepareEventSources(new EventSourceContext<>(
191-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
191+
eventSourceManager.getControllerResourceEventSource(),
192192
configurationService(), kubernetesClient))
193193
.forEach(eventSourceManager::registerEventSource);
194194
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java

Lines changed: 7 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@
2929
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
3030

3131
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
32-
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
3332

3433
class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {
3534

@@ -50,7 +49,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
5049

5150
EventProcessor(EventSourceManager<R> eventSourceManager) {
5251
this(
53-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
52+
eventSourceManager.getControllerResourceEventSource(),
5453
ExecutorServiceManager.instance().executorService(),
5554
eventSourceManager.getController().getConfiguration().getName(),
5655
new ReconciliationDispatcher<>(eventSourceManager.getController()),
@@ -73,7 +72,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
7372
Retry retry,
7473
Metrics metrics) {
7574
this(
76-
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
75+
eventSourceManager.getControllerResourceEventSource(),
7776
null,
7877
relatedControllerName,
7978
reconciliationDispatcher,
@@ -208,12 +207,12 @@ void eventProcessingFinished(
208207
if (eventMarker.deleteEventPresent(resourceID)) {
209208
cleanupForDeletedEvent(executionScope.getCustomResourceID());
210209
} else {
210+
postExecutionControl.getUpdatedCustomResource().ifPresent(r -> {
211+
eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate(
212+
r, executionScope.getResource());
213+
});
211214
if (eventMarker.eventPresent(resourceID)) {
212-
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
213-
submitReconciliationExecution(resourceID);
214-
} else {
215-
postponeReconciliationAndHandleCacheSyncEvent(resourceID);
216-
}
215+
submitReconciliationExecution(resourceID);
217216
} else {
218217
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
219218
}
@@ -223,41 +222,6 @@ void eventProcessingFinished(
223222
}
224223
}
225224

226-
private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) {
227-
eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID);
228-
}
229-
230-
private boolean isCacheReadyForInstantReconciliation(
231-
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
232-
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
233-
return true;
234-
}
235-
String originalResourceVersion = getVersion(executionScope.getResource());
236-
String customResourceVersionAfterExecution =
237-
getVersion(
238-
postExecutionControl
239-
.getUpdatedCustomResource()
240-
.orElseThrow(
241-
() -> new IllegalStateException(
242-
"Updated custom resource must be present at this point of time")));
243-
String cachedCustomResourceVersion =
244-
getVersion(
245-
cache
246-
.get(executionScope.getCustomResourceID())
247-
.orElseThrow(
248-
() -> new IllegalStateException(
249-
"Cached custom resource must be present at this point")));
250-
251-
if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
252-
return true;
253-
}
254-
// If the cached resource version equals neither the version before nor after execution
255-
// probably an update happened on the custom resource independent of the framework during
256-
// reconciliation. We cannot tell at this point if it happened before our update or before.
257-
// (Well we could if we would parse resource version, but that should not be done by definition)
258-
return !cachedCustomResourceVersion.equals(originalResourceVersion);
259-
}
260-
261225
private void reScheduleExecutionIfInstructed(
262226
PostExecutionControl<R> postExecutionControl, R customResource) {
263227
postExecutionControl

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/ControllerResourceEventSource.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
import java.util.Objects;
66
import java.util.Optional;
77
import java.util.concurrent.ConcurrentHashMap;
8+
import java.util.function.Predicate;
9+
import java.util.stream.Stream;
810

911
import org.slf4j.Logger;
1012
import org.slf4j.LoggerFactory;
@@ -22,14 +24,15 @@
2224
import io.javaoperatorsdk.operator.processing.MDCUtils;
2325
import io.javaoperatorsdk.operator.processing.event.ResourceID;
2426
import io.javaoperatorsdk.operator.processing.event.source.AbstractResourceEventSource;
27+
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
2528

2629
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
2730
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getUID;
2831
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;
2932

3033
public class ControllerResourceEventSource<T extends HasMetadata>
3134
extends AbstractResourceEventSource<T, T>
32-
implements ResourceEventHandler<T> {
35+
implements ResourceEventHandler<T>, ResourceCache<T> {
3336

3437
public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";
3538

@@ -40,8 +43,8 @@ public class ControllerResourceEventSource<T extends HasMetadata>
4043
new ConcurrentHashMap<>();
4144

4245
private final ResourceEventFilter<T> filter;
43-
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;
4446
private final ControllerResourceCache<T> cache;
47+
private final TemporaryResourceCache<T> temporaryResourceCache;
4548

4649
public ControllerResourceEventSource(Controller<T> controller) {
4750
super(controller.getConfiguration().getResourceClass());
@@ -50,20 +53,12 @@ public ControllerResourceEventSource(Controller<T> controller) {
5053
var cloner = configurationService != null ? configurationService.getResourceCloner()
5154
: ConfigurationService.DEFAULT_CLONER;
5255
this.cache = new ControllerResourceCache<>(sharedIndexInformers, cloner);
53-
56+
temporaryResourceCache = new TemporaryResourceCache<>(cache);
5457
var filters = new ResourceEventFilter[] {
5558
ResourceEventFilters.finalizerNeededAndApplied(),
5659
ResourceEventFilters.markedForDeletion(),
57-
ResourceEventFilters.generationAware(),
58-
null
60+
ResourceEventFilters.generationAware()
5961
};
60-
61-
if (controller.getConfiguration().isGenerationAware()) {
62-
onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>();
63-
filters[filters.length - 1] = onceWhitelistEventFilterEventFilter;
64-
} else {
65-
onceWhitelistEventFilterEventFilter = null;
66-
}
6762
if (controller.getConfiguration().getEventFilter() != null) {
6863
filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters));
6964
} else {
@@ -126,6 +121,7 @@ public void eventReceived(ResourceAction action, T customResource, T oldResource
126121
try {
127122
log.debug(
128123
"Event received for resource: {}", getName(customResource));
124+
temporaryResourceCache.removeResourceFromCache(customResource);
129125
MDCUtils.addResourceInfo(customResource);
130126
controller.getEventSourceManager().broadcastOnResourceEvent(action, customResource,
131127
oldResource);
@@ -158,12 +154,31 @@ public void onDelete(T resource, boolean b) {
158154
eventReceived(ResourceAction.DELETED, resource, null);
159155
}
160156

157+
158+
@Override
161159
public Optional<T> get(ResourceID resourceID) {
162-
return cache.get(resourceID);
160+
Optional<T> resource = temporaryResourceCache.getResourceFromCache(resourceID);
161+
if (resource.isPresent()) {
162+
log.debug("Resource found in temporal cache for Resource ID: {}", resourceID);
163+
return resource;
164+
} else {
165+
return cache.get(resourceID);
166+
}
167+
}
168+
169+
@Override
170+
public Stream<ResourceID> keys() {
171+
return cache.keys();
172+
}
173+
174+
@Override
175+
public Stream<T> list(Predicate<T> predicate) {
176+
return cache.list(predicate);
163177
}
164178

165-
public ControllerResourceCache<T> getResourceCache() {
166-
return cache;
179+
@Override
180+
public Stream<T> list(String namespace, Predicate<T> predicate) {
181+
return cache.list(namespace, predicate);
167182
}
168183

169184
/**
@@ -178,19 +193,6 @@ public SharedIndexInformer<T> getInformer(String namespace) {
178193
return getInformers().get(Objects.requireNonNullElse(namespace, ANY_NAMESPACE_MAP_KEY));
179194
}
180195

181-
/**
182-
* This will ensure that the next event received after this method is called will not be filtered
183-
* out.
184-
*
185-
* @param resourceID - to which the event is related
186-
*/
187-
public void whitelistNextEvent(ResourceID resourceID) {
188-
if (onceWhitelistEventFilterEventFilter != null) {
189-
onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID);
190-
}
191-
}
192-
193-
194196
private void handleKubernetesClientException(Exception e) {
195197
KubernetesClientException ke = (KubernetesClientException) e;
196198
if (404 == ke.getCode()) {
@@ -204,6 +206,13 @@ private void handleKubernetesClientException(Exception e) {
204206

205207
@Override
206208
public Optional<T> getAssociated(T primary) {
207-
return cache.get(ResourceID.fromResource(primary));
209+
return get(ResourceID.fromResource(primary));
208210
}
211+
212+
public void handleRecentResourceUpdate(T resource,
213+
T previousResourceVersion) {
214+
temporaryResourceCache.putUpdatedResource(resource,
215+
previousResourceVersion.getMetadata().getResourceVersion());
216+
}
217+
209218
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/controller/OnceWhitelistEventFilterEventFilter.java

Lines changed: 0 additions & 35 deletions
This file was deleted.
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.javaoperatorsdk.operator.processing.event.source.controller;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import java.util.Optional;
6+
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
10+
import io.fabric8.kubernetes.api.model.HasMetadata;
11+
import io.javaoperatorsdk.operator.processing.event.ResourceID;
12+
13+
public class TemporaryResourceCache<T extends HasMetadata> {
14+
15+
private static final Logger log = LoggerFactory.getLogger(TemporaryResourceCache.class);
16+
17+
private final Map<ResourceID, T> cache = new HashMap<>();
18+
private final ControllerResourceCache<T> managedInformerEventSource;
19+
20+
public TemporaryResourceCache(ControllerResourceCache<T> managedInformerEventSource) {
21+
this.managedInformerEventSource = managedInformerEventSource;
22+
}
23+
24+
public synchronized void removeResourceFromCache(T resource) {
25+
cache.remove(ResourceID.fromResource(resource));
26+
}
27+
28+
public synchronized void putUpdatedResource(T newResource, String previousResourceVersion) {
29+
var resourceId = ResourceID.fromResource(newResource);
30+
var informerCacheResource = managedInformerEventSource.get(resourceId);
31+
if (informerCacheResource.isEmpty()) {
32+
log.debug("No cached value present for resource: {}", newResource);
33+
return;
34+
}
35+
// if this is not true that means the cache was already updated
36+
if (informerCacheResource.get().getMetadata().getResourceVersion()
37+
.equals(previousResourceVersion)) {
38+
log.debug("Putting resource to temporal cache with id: {}", resourceId);
39+
cache.put(resourceId, newResource);
40+
} else {
41+
// if something is in cache it's surely obsolete now
42+
cache.remove(resourceId);
43+
}
44+
}
45+
46+
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
47+
return Optional.ofNullable(cache.get(resourceID));
48+
}
49+
}
50+

0 commit comments

Comments
 (0)