Skip to content

Temporal resource cache in Event Source #965

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 34 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
1cb3457
feat: up to date resource from get resource
csviri Feb 16, 2022
5ecfe4b
feature: populate change to informer after latest update
csviri Feb 16, 2022
cdd91b3
fix: sample change
csviri Feb 17, 2022
b501d19
fix: integration tests
csviri Feb 17, 2022
f53b46e
fix: temporal cache for reading resources after update
csviri Feb 17, 2022
58d52cd
docs: temporal cache
csviri Feb 17, 2022
71e54df
fix: remove not used method
csviri Feb 17, 2022
d0033fa
fix: unit test for temp cache
csviri Feb 17, 2022
40e4063
fix: unit test wip
csviri Feb 17, 2022
38fbed8
fix: unit test dependent resource
csviri Feb 18, 2022
bf71615
fix: format
csviri Feb 18, 2022
2dbdbf3
Merge branch 'next' into dr-up-to-date-get-resource
csviri Feb 18, 2022
9d31856
Merge branch 'next' into dr-up-to-date-get-resource
csviri Feb 21, 2022
ec2a72c
feature: Handling just updated resources in informers and related log…
csviri Feb 22, 2022
6e3c5ed
fix: remove feature flag for informer event filter
csviri Feb 23, 2022
b4286bf
fix: unit and integration tests
csviri Feb 23, 2022
49f008c
fix: logging
csviri Feb 23, 2022
26e14d5
fix: loggings
csviri Feb 23, 2022
e6a0aa8
fix: test condition
csviri Feb 23, 2022
73afe1e
fix: bulletproof resource version filter
csviri Feb 23, 2022
a39dc1c
fix: recording wip
csviri Feb 24, 2022
b2910a3
fix: algorithm for event syncing
csviri Feb 24, 2022
c176c7f
docs: explanation
csviri Feb 24, 2022
3d13950
fix: format
csviri Feb 24, 2022
87c6b20
fix: dependent resource integration
csviri Feb 24, 2022
980a4c1
Merge branch 'next' into temporal-resource-cache-in-eventsource
csviri Feb 24, 2022
30d8cd8
fix: build fix merged next
csviri Feb 24, 2022
56bbe97
fix: dependent support resource update
csviri Feb 24, 2022
253c805
fix: integration test
csviri Feb 24, 2022
abc15f4
docs: wip
csviri Feb 24, 2022
1fade82
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri Feb 24, 2022
2c6e1be
fix: format, some fixes from review
csviri Feb 24, 2022
a65d24e
fix: fixes for CR
csviri Feb 24, 2022
51e6494
fix: naming and unit tests
csviri Feb 25, 2022
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ protected DefaultInformerConfiguration(ConfigurationService service, String labe
Objects.requireNonNullElseGet(associatedWith, () -> ResourceID::fromResource);
}


public PrimaryResourcesRetriever<R> getPrimaryResourcesRetriever() {
return secondaryToPrimaryResourcesIdSet;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ public void start() throws OperatorException {
}

final var context = new EventSourceContext<>(
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
eventSourceManager.getControllerResourceEventSource(),
configurationService(), kubernetesClient);

prepareEventSources(context).forEach(eventSourceManager::registerEventSource);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,4 @@
* @return the label selector
*/
String labelSelector() default EMPTY_STRING;

}
Original file line number Diff line number Diff line change
Expand Up @@ -75,30 +75,46 @@ private void configureWith(ConfigurationService configService, String labelSelec
.withPrimaryResourcesRetriever(primaryResourcesRetriever)
.withAssociatedSecondaryResourceIdentifier(secondaryResourceIdentifier)
.build();
configureWith(configService, new InformerEventSource<>(ic, client), addOwnerReference);
configureWith(new InformerEventSource<>(ic, client), addOwnerReference);
}

/**
* Use to share informers between event more resources.
*
* @param configurationService get configs
*
* @param informerEventSource informer to use
* @param addOwnerReference to the created resource
*/
public void configureWith(ConfigurationService configurationService,
public void configureWith(
InformerEventSource<R, P> informerEventSource,
boolean addOwnerReference) {
this.informerEventSource = informerEventSource;
this.addOwnerReference = addOwnerReference;
}

public void create(R target, P primary, Context context) {
prepare(target, primary, "Creating").create(target);
var resourceID = ResourceID.fromResource(target);
try {
informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID);
var created = prepare(target, primary, "Creating").create(target);
informerEventSource.handleRecentResourceCreate(created);
} catch (RuntimeException e) {
informerEventSource.cleanupOnCreateOrUpdateEventFiltering(resourceID);
throw e;
}
}

public void update(R actual, R target, P primary, Context context) {
var updatedActual = processor.replaceSpecOnActual(actual, target, context);
prepare(target, primary, "Updating").replace(updatedActual);
var resourceID = ResourceID.fromResource(target);
try {
var updatedActual = processor.replaceSpecOnActual(actual, target, context);
informerEventSource.prepareForCreateOrUpdateEventFiltering(resourceID);
var updated = prepare(target, primary, "Updating").replace(updatedActual);
informerEventSource.handleRecentResourceUpdate(updated,
actual.getMetadata().getResourceVersion());
} catch (RuntimeException e) {
informerEventSource.cleanupOnCreateOrUpdateEventFiltering(resourceID);
throw e;
}
}

public boolean match(R actualResource, R desiredResource, Context context) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAware {

Expand All @@ -50,7 +49,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw

EventProcessor(EventSourceManager<R> eventSourceManager) {
this(
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
eventSourceManager.getControllerResourceEventSource(),
ExecutorServiceManager.instance().executorService(),
eventSourceManager.getController().getConfiguration().getName(),
new ReconciliationDispatcher<>(eventSourceManager.getController()),
Expand All @@ -73,7 +72,7 @@ class EventProcessor<R extends HasMetadata> implements EventHandler, LifecycleAw
Retry retry,
Metrics metrics) {
this(
eventSourceManager.getControllerResourceEventSource().getResourceCache(),
eventSourceManager.getControllerResourceEventSource(),
null,
relatedControllerName,
reconciliationDispatcher,
Expand Down Expand Up @@ -208,12 +207,12 @@ void eventProcessingFinished(
if (eventMarker.deleteEventPresent(resourceID)) {
cleanupForDeletedEvent(executionScope.getCustomResourceID());
} else {
postExecutionControl.getUpdatedCustomResource().ifPresent(r -> {
eventSourceManager.getControllerResourceEventSource().handleRecentResourceUpdate(r,
executionScope.getResource().getMetadata().getResourceVersion());
});
if (eventMarker.eventPresent(resourceID)) {
if (isCacheReadyForInstantReconciliation(executionScope, postExecutionControl)) {
submitReconciliationExecution(resourceID);
} else {
postponeReconciliationAndHandleCacheSyncEvent(resourceID);
}
submitReconciliationExecution(resourceID);
} else {
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getResource());
}
Expand All @@ -223,41 +222,6 @@ void eventProcessingFinished(
}
}

private void postponeReconciliationAndHandleCacheSyncEvent(ResourceID resourceID) {
eventSourceManager.getControllerResourceEventSource().whitelistNextEvent(resourceID);
}

private boolean isCacheReadyForInstantReconciliation(
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) {
if (!postExecutionControl.customResourceUpdatedDuringExecution()) {
return true;
}
String originalResourceVersion = getVersion(executionScope.getResource());
String customResourceVersionAfterExecution =
getVersion(
postExecutionControl
.getUpdatedCustomResource()
.orElseThrow(
() -> new IllegalStateException(
"Updated custom resource must be present at this point of time")));
String cachedCustomResourceVersion =
getVersion(
cache
.get(executionScope.getCustomResourceID())
.orElseThrow(
() -> new IllegalStateException(
"Cached custom resource must be present at this point")));

if (cachedCustomResourceVersion.equals(customResourceVersionAfterExecution)) {
return true;
}
// If the cached resource version equals neither the version before nor after execution
// probably an update happened on the custom resource independent of the framework during
// reconciliation. We cannot tell at this point if it happened before our update or before.
// (Well we could if we would parse resource version, but that should not be done by definition)
return !cachedCustomResourceVersion.equals(originalResourceVersion);
}

private void reScheduleExecutionIfInstructed(
PostExecutionControl<R> postExecutionControl, R customResource) {
postExecutionControl
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
import io.javaoperatorsdk.operator.processing.Controller;
import io.javaoperatorsdk.operator.processing.MDCUtils;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.ResourceCache;
import io.javaoperatorsdk.operator.processing.event.source.informer.ManagedInformerEventSource;

import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName;
Expand All @@ -25,30 +24,19 @@ public class ControllerResourceEventSource<T extends HasMetadata>
implements ResourceEventHandler<T> {

public static final String ANY_NAMESPACE_MAP_KEY = "anyNamespace";

private static final Logger log = LoggerFactory.getLogger(ControllerResourceEventSource.class);

private final Controller<T> controller;
private final ResourceEventFilter<T> filter;
private final OnceWhitelistEventFilterEventFilter<T> onceWhitelistEventFilterEventFilter;

public ControllerResourceEventSource(Controller<T> controller) {
super(controller.getCRClient(), controller.getConfiguration());
this.controller = controller;

var filters = new ResourceEventFilter[] {
ResourceEventFilters.finalizerNeededAndApplied(),
ResourceEventFilters.markedForDeletion(),
ResourceEventFilters.generationAware(),
null
};

if (controller.getConfiguration().isGenerationAware()) {
onceWhitelistEventFilterEventFilter = new OnceWhitelistEventFilterEventFilter<>();
filters[filters.length - 1] = onceWhitelistEventFilterEventFilter;
} else {
onceWhitelistEventFilterEventFilter = null;
}
if (controller.getConfiguration().getEventFilter() != null) {
filter = controller.getConfiguration().getEventFilter().and(ResourceEventFilters.or(filters));
} else {
Expand Down Expand Up @@ -87,36 +75,22 @@ public void eventReceived(ResourceAction action, T resource, T oldResource) {

@Override
public void onAdd(T resource) {
super.onAdd(resource);
eventReceived(ResourceAction.ADDED, resource, null);
}

@Override
public void onUpdate(T oldCustomResource, T newCustomResource) {
super.onUpdate(oldCustomResource, newCustomResource);
eventReceived(ResourceAction.UPDATED, newCustomResource, oldCustomResource);
}

@Override
public void onDelete(T resource, boolean b) {
super.onDelete(resource, b);
eventReceived(ResourceAction.DELETED, resource, null);
}

public ResourceCache<T> getResourceCache() {
return manager();
}

/**
* This will ensure that the next event received after this method is called will not be filtered
* out.
*
* @param resourceID - to which the event is related
*/
public void whitelistNextEvent(ResourceID resourceID) {
if (onceWhitelistEventFilterEventFilter != null) {
onceWhitelistEventFilterEventFilter.whitelistNextEvent(resourceID);
}
}


private void handleKubernetesClientException(Exception e) {
KubernetesClientException ke = (KubernetesClientException) e;
if (404 == ke.getCode()) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public class EventRecorder<R extends HasMetadata> {

private final Map<ResourceID, ArrayList<R>> resourceEvents = new ConcurrentHashMap<>();

void startEventRecording(ResourceID resourceID) {
resourceEvents.putIfAbsent(resourceID, new ArrayList<>(5));
}

public boolean isRecordingFor(ResourceID resourceID) {
return resourceEvents.get(resourceID) != null;
}

public void stopEventRecording(ResourceID resourceID) {
resourceEvents.remove(resourceID);
}

public void recordEvent(R resource) {
resourceEvents.get(ResourceID.fromResource(resource)).add(resource);
}

public boolean containsEventWithResourceVersion(ResourceID resourceID, String resourceVersion) {
List<R> events = resourceEvents.get(resourceID);
if (events == null) {
return false;
}
if (events.isEmpty()) {
return false;
} else {
return events.stream()
.anyMatch(e -> e.getMetadata().getResourceVersion().equals(resourceVersion));
}
}

public boolean containsEventWithVersionButItsNotLastOne(
ResourceID resourceID, String resourceVersion) {
List<R> resources = resourceEvents.get(resourceID);
if (resources == null) {
throw new IllegalStateException(
"Null events list, this is probably a result of invalid usage of the " +
"InformerEventSource. Resource ID: " + resourceID);
}
if (resources.isEmpty()) {
throw new IllegalStateException("No events for resource id: " + resourceID);
}
return !resources
.get(resources.size() - 1)
.getMetadata()
.getResourceVersion()
.equals(resourceVersion);
}

public R getLastEvent(ResourceID resourceID) {
List<R> resources = resourceEvents.get(resourceID);
if (resources == null) {
throw new IllegalStateException(
"Null events list, this is probably a result of invalid usage of the " +
"InformerEventSource. Resource ID: " + resourceID);
}
return resources.get(resources.size() - 1);
}
}
Loading