Skip to content

feat: reading cache just on reconciliation dispatching #1640

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Dec 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -151,11 +151,11 @@ private void submitReconciliationExecution(ResourceState state) {
}
state.setUnderProcessing(true);
final var latest = maybeLatest.get();
ExecutionScope<P> executionScope = new ExecutionScope<>(latest, state.getRetry());
ExecutionScope<P> executionScope = new ExecutionScope<>(state.getRetry());
state.unMarkEventReceived();
metrics.reconcileCustomResource(latest, state.getRetry(), metricsMetadata);
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ReconcilerExecutor(executionScope));
executor.execute(new ReconcilerExecutor(resourceID, executionScope));
} else {
log.debug(
"Skipping executing controller for resource id: {}. Controller in execution: {}. Latest Resource present: {}",
Expand Down Expand Up @@ -388,9 +388,11 @@ private void handleAlreadyMarkedEvents() {

private class ReconcilerExecutor implements Runnable {
private final ExecutionScope<P> executionScope;
private final ResourceID resourceID;

private ReconcilerExecutor(ExecutionScope<P> executionScope) {
private ReconcilerExecutor(ResourceID resourceID, ExecutionScope<P> executionScope) {
this.executionScope = executionScope;
this.resourceID = resourceID;
}

@Override
Expand All @@ -399,6 +401,13 @@ public void run() {
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
var actualResource = cache.get(resourceID);
if (actualResource.isEmpty()) {
log.debug("Skipping execution; primary resource missing from cache: {}",
resourceID);
return;
}
actualResource.ifPresent(executionScope::setResource);
MDCUtils.addResourceInfo(executionScope.getResource());
thread.setName("ReconcilerExecutor-" + controllerName() + "-" + thread.getId());
PostExecutionControl<P> postExecutionControl =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,18 @@
class ExecutionScope<R extends HasMetadata> {

// the latest custom resource from cache
private final R resource;
private R resource;
private final RetryInfo retryInfo;

ExecutionScope(R resource, RetryInfo retryInfo) {
this.resource = resource;
ExecutionScope(RetryInfo retryInfo) {
this.retryInfo = retryInfo;
}

public ExecutionScope<R> setResource(R resource) {
this.resource = resource;
return this;
}

public R getResource() {
return resource;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,9 @@ void ifExecutionInProgressWaitsUntilItsFinished() {
void schedulesAnEventRetryOnException() {
TestCustomResource customResource = testCustomResource();

ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null);
executionScope.setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.exceptionDuringExecution(new RuntimeException("test"));

Expand Down Expand Up @@ -254,7 +256,7 @@ void cancelScheduleOnceEventsOnSuccessfulExecution() {
var crID = new ResourceID("test-cr", TEST_NAMESPACE);
var cr = testCustomResource(crID);

eventProcessor.eventProcessingFinished(new ExecutionScope(cr, null),
eventProcessor.eventProcessingFinished(new ExecutionScope(null).setResource(cr),
PostExecutionControl.defaultDispatch());

verify(retryTimerEventSourceMock, times(1)).cancelOnceSchedule(eq(crID));
Expand Down Expand Up @@ -283,7 +285,8 @@ void startProcessedMarkedEventReceivedBefore() {
@Test
void updatesEventSourceHandlerIfResourceUpdated() {
TestCustomResource customResource = testCustomResource();
ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.customResourceUpdated(customResource);

Expand All @@ -297,7 +300,8 @@ void updatesEventSourceHandlerIfResourceUpdated() {
@Test
void notUpdatesEventSourceHandlerIfResourceUpdated() {
TestCustomResource customResource = testCustomResource();
ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.customResourceStatusPatched(customResource);

Expand All @@ -311,7 +315,8 @@ void notUpdatesEventSourceHandlerIfResourceUpdated() {
void notReschedulesAfterTheFinalizerRemoveProcessed() {
TestCustomResource customResource = testCustomResource();
markForDeletion(customResource);
ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.customResourceFinalizerRemoved(customResource);

Expand All @@ -324,7 +329,8 @@ void notReschedulesAfterTheFinalizerRemoveProcessed() {
void skipEventProcessingIfFinalizerRemoveProcessed() {
TestCustomResource customResource = testCustomResource();
markForDeletion(customResource);
ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.customResourceFinalizerRemoved(customResource);

Expand All @@ -341,7 +347,8 @@ void skipEventProcessingIfFinalizerRemoveProcessed() {
void newResourceAfterMissedDeleteEvent() {
TestCustomResource customResource = testCustomResource();
markForDeletion(customResource);
ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.customResourceFinalizerRemoved(customResource);
var newResource = testCustomResource();
Expand Down Expand Up @@ -377,7 +384,8 @@ void rateLimitsReconciliationSubmission() {
@Test
void schedulesRetryForMarReconciliationInterval() {
TestCustomResource customResource = testCustomResource();
ExecutionScope executionScope = new ExecutionScope(customResource, null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(customResource);
PostExecutionControl postExecutionControl =
PostExecutionControl.defaultDispatch();

Expand All @@ -398,7 +406,8 @@ void schedulesRetryForMarReconciliationIntervalIfRetryExhausted() {
eventSourceManagerMock,
metricsMock));
eventProcessorWithRetry.start();
ExecutionScope executionScope = new ExecutionScope(testCustomResource(), null);
ExecutionScope executionScope =
new ExecutionScope(null).setResource(testCustomResource());
PostExecutionControl postExecutionControl =
PostExecutionControl.exceptionDuringExecution(new RuntimeException());
when(eventProcessorWithRetry.retryEventSource()).thenReturn(retryTimerEventSourceMock);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,6 @@ void propagatesRetryInfoToContextIfFinalizerSet() {

reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource,
new RetryInfo() {
@Override
public int getAttemptCount() {
Expand All @@ -385,7 +384,7 @@ public int getAttemptCount() {
public boolean isLastAttempt() {
return true;
}
}));
}).setResource(testCustomResource));

ArgumentCaptor<Context> contextArgumentCaptor =
ArgumentCaptor.forClass(Context.class);
Expand Down Expand Up @@ -504,7 +503,6 @@ void callErrorStatusHandlerIfImplemented() {

reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource,
new RetryInfo() {
@Override
public int getAttemptCount() {
Expand All @@ -515,7 +513,7 @@ public int getAttemptCount() {
public boolean isLastAttempt() {
return true;
}
}));
}).setResource(testCustomResource));

verify(customResourceFacade, times(1)).updateStatus(testCustomResource);
verify(((ErrorStatusHandler) reconciler), times(1)).updateErrorStatus(eq(testCustomResource),
Expand All @@ -535,8 +533,7 @@ void callErrorStatusHandlerEvenOnFirstError() {
};

var postExecControl = reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource, null));
new ExecutionScope(null).setResource(testCustomResource));
verify(customResourceFacade, times(1)).updateStatus(testCustomResource);
verify(((ErrorStatusHandler) reconciler), times(1)).updateErrorStatus(eq(testCustomResource),
any(), any());
Expand All @@ -555,8 +552,7 @@ void errorHandlerCanInstructNoRetryWithUpdate() {
};

var postExecControl = reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource, null));
new ExecutionScope(null).setResource(testCustomResource));

verify(((ErrorStatusHandler) reconciler), times(1)).updateErrorStatus(eq(testCustomResource),
any(), any());
Expand All @@ -576,8 +572,7 @@ void errorHandlerCanInstructNoRetryNoUpdate() {
};

var postExecControl = reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource, null));
new ExecutionScope(null).setResource(testCustomResource));

verify(((ErrorStatusHandler) reconciler), times(1)).updateErrorStatus(eq(testCustomResource),
any(), any());
Expand All @@ -596,8 +591,7 @@ void errorStatusHandlerCanPatchResource() {


reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource, null));
new ExecutionScope(null).setResource(testCustomResource));

verify(customResourceFacade, times(1)).patchStatus(eq(testCustomResource), any());
verify(((ErrorStatusHandler) reconciler), times(1)).updateErrorStatus(eq(testCustomResource),
Expand All @@ -622,8 +616,7 @@ void ifRetryLimitedToZeroMaxAttemptsErrorHandlerGetsCorrectLastAttempt() {
reconciler.errorHandler = mockErrorHandler;

reconciliationDispatcher.handleExecution(
new ExecutionScope(
testCustomResource, null));
new ExecutionScope(null).setResource(testCustomResource));

verify(mockErrorHandler, times(1)).updateErrorStatus(any(),
ArgumentMatchers.argThat((ArgumentMatcher<Context<TestCustomResource>>) context -> {
Expand Down Expand Up @@ -667,7 +660,7 @@ private void removeFinalizers(CustomResource customResource) {
}

public <T extends HasMetadata> ExecutionScope<T> executionScopeWithCREvent(T resource) {
return new ExecutionScope<>(resource, null);
return (ExecutionScope<T>) new ExecutionScope<>(null).setResource(resource);
}

private class TestReconciler
Expand Down