Skip to content

Retry Support #252

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Dec 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import io.javaoperatorsdk.operator.processing.EventDispatcher;
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEventSource;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
Expand All @@ -36,19 +37,33 @@ public Operator(KubernetesClient k8sClient) {
this.k8sClient = k8sClient;
}

public <R extends CustomResource> void registerControllerForAllNamespaces(
ResourceController<R> controller, Retry retry) throws OperatorException {
registerController(controller, true, retry);
}

public <R extends CustomResource> void registerControllerForAllNamespaces(
ResourceController<R> controller) throws OperatorException {
registerController(controller, true);
registerController(controller, true, null);
}

public <R extends CustomResource> void registerController(
ResourceController<R> controller, Retry retry, String... targetNamespaces)
throws OperatorException {
registerController(controller, false, retry, targetNamespaces);
}

public <R extends CustomResource> void registerController(
ResourceController<R> controller, String... targetNamespaces) throws OperatorException {
registerController(controller, false, targetNamespaces);
registerController(controller, false, null, targetNamespaces);
}

@SuppressWarnings("rawtypes")
private <R extends CustomResource> void registerController(
ResourceController<R> controller, boolean watchAllNamespaces, String... targetNamespaces)
ResourceController<R> controller,
boolean watchAllNamespaces,
Retry retry,
String... targetNamespaces)
throws OperatorException {
Class<R> resClass = getCustomResourceClass(controller);
CustomResourceDefinitionContext crd = getCustomResourceDefinitionForController(controller);
Expand All @@ -67,10 +82,10 @@ private <R extends CustomResource> void registerController(
CustomResourceCache customResourceCache = new CustomResourceCache();
DefaultEventHandler defaultEventHandler =
new DefaultEventHandler(
customResourceCache, eventDispatcher, controller.getClass().getName());
customResourceCache, eventDispatcher, controller.getClass().getName(), retry);
DefaultEventSourceManager eventSourceManager =
new DefaultEventSourceManager(defaultEventHandler);
defaultEventHandler.setDefaultEventSourceManager(eventSourceManager);
new DefaultEventSourceManager(defaultEventHandler, retry != null);
defaultEventHandler.setEventSourceManager(eventSourceManager);
eventDispatcher.setEventSourceManager(eventSourceManager);

customResourceClients.put(resClass, (CustomResourceOperationsImpl) client);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,13 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventList;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import java.util.Optional;

public interface Context<T extends CustomResource> {

EventSourceManager getEventSourceManager();

EventList getEvents();

Optional<RetryInfo> getRetryInfo();
}
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,17 @@
import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.processing.event.EventList;
import io.javaoperatorsdk.operator.processing.event.EventSourceManager;
import java.util.Optional;

public class DefaultContext<T extends CustomResource> implements Context<T> {

private final RetryInfo retryInfo;
private final EventList events;
private final EventSourceManager eventSourceManager;

public DefaultContext(EventSourceManager eventSourceManager, EventList events) {
public DefaultContext(
EventSourceManager eventSourceManager, EventList events, RetryInfo retryInfo) {
this.retryInfo = retryInfo;
this.events = events;
this.eventSourceManager = eventSourceManager;
}
Expand All @@ -23,4 +27,9 @@ public EventSourceManager getEventSourceManager() {
public EventList getEvents() {
return events;
}

@Override
public Optional<RetryInfo> getRetryInfo() {
return Optional.ofNullable(retryInfo);
}
}
Original file line number Diff line number Diff line change
@@ -1,20 +1,8 @@
package io.javaoperatorsdk.operator.api;

public class RetryInfo {
public interface RetryInfo {

private int retryNumber;
private boolean lastAttempt;
int getAttemptCount();

public RetryInfo(int retryNumber, boolean lastAttempt) {
this.retryNumber = retryNumber;
this.lastAttempt = lastAttempt;
}

public int getRetryNumber() {
return retryNumber;
}

public boolean isLastAttempt() {
return lastAttempt;
}
boolean isLastAttempt();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getVersion;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager;
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.retry.Retry;
import io.javaoperatorsdk.operator.processing.retry.RetryExecution;
import java.util.*;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;
Expand All @@ -30,16 +34,20 @@ public class DefaultEventHandler implements EventHandler {
private final Set<String> underProcessing = new HashSet<>();
private final ScheduledThreadPoolExecutor executor;
private final EventDispatcher eventDispatcher;
private DefaultEventSourceManager defaultEventSourceManager;
private final Retry retry;
private final Map<String, RetryExecution> retryState = new HashMap<>();
private DefaultEventSourceManager eventSourceManager;

private final ReentrantLock lock = new ReentrantLock();

public DefaultEventHandler(
CustomResourceCache customResourceCache,
EventDispatcher eventDispatcher,
String relatedControllerName) {
String relatedControllerName,
Retry retry) {
this.customResourceCache = customResourceCache;
this.eventDispatcher = eventDispatcher;
this.retry = retry;
eventBuffer = new EventBuffer();
executor =
new ScheduledThreadPoolExecutor(
Expand All @@ -52,8 +60,8 @@ public Thread newThread(Runnable runnable) {
});
}

public void setDefaultEventSourceManager(DefaultEventSourceManager defaultEventSourceManager) {
this.defaultEventSourceManager = defaultEventSourceManager;
public void setEventSourceManager(DefaultEventSourceManager eventSourceManager) {
this.eventSourceManager = eventSourceManager;
}

@Override
Expand All @@ -79,7 +87,8 @@ private void executeBufferedEvents(String customResourceUid) {
ExecutionScope executionScope =
new ExecutionScope(
eventBuffer.getAndRemoveEventsForExecution(customResourceUid),
latestCustomResource.get());
latestCustomResource.get(),
retryInfo(customResourceUid));
log.debug("Executing events for custom resource. Scope: {}", executionScope);
executor.execute(new ExecutionConsumer(executionScope, eventDispatcher, this));
} else {
Expand All @@ -93,12 +102,28 @@ private void executeBufferedEvents(String customResourceUid) {
}
}

private RetryInfo retryInfo(String customResourceUid) {
return retryState.get(customResourceUid);
}

void eventProcessingFinished(
ExecutionScope executionScope, PostExecutionControl postExecutionControl) {
try {
lock.lock();
log.debug("Event processing finished. Scope: {}", executionScope);
log.debug(
"Event processing finished. Scope: {}, PostExecutionControl: {}",
executionScope,
postExecutionControl);
unsetUnderExecution(executionScope.getCustomResourceUid());

if (retry != null && postExecutionControl.exceptionDuringExecution()) {
handleRetryOnException(executionScope);
return;
}

if (retry != null) {
markSuccessfulExecutionRegardingRetry(executionScope);
}
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) {
cleanupAfterDeletedEvent(executionScope.getCustomResourceUid());
} else {
Expand All @@ -110,6 +135,53 @@ void eventProcessingFinished(
}
}

/**
* Regarding the events there are 2 approaches we can take. Either retry always when there are new
* events (received meanwhile retry is in place or already in buffer) instantly or always wait
* according to the retry timing if there was an exception.
*/
private void handleRetryOnException(ExecutionScope executionScope) {
RetryExecution execution = getOrInitRetryExecution(executionScope);
boolean newEventsExists = eventBuffer.newEventsExists(executionScope.getCustomResourceUid());
eventBuffer.putBackEvents(executionScope.getCustomResourceUid(), executionScope.getEvents());

if (newEventsExists) {
log.debug("New events exists for for resource id: {}", executionScope.getCustomResourceUid());
executeBufferedEvents(executionScope.getCustomResourceUid());
return;
}
Optional<Long> nextDelay = execution.nextDelay();

nextDelay.ifPresent(
delay -> {
log.debug(
"Scheduling timer event for retry with delay:{} for resource: {}",
delay,
executionScope.getCustomResourceUid());
eventSourceManager
.getRetryTimerEventSource()
.scheduleOnce(executionScope.getCustomResource(), delay);
});
}

private void markSuccessfulExecutionRegardingRetry(ExecutionScope executionScope) {
log.debug(
"Marking successful execution for resource: {}", executionScope.getCustomResourceUid());
retryState.remove(executionScope.getCustomResourceUid());
eventSourceManager
.getRetryTimerEventSource()
.cancelOnceSchedule(executionScope.getCustomResourceUid());
}

private RetryExecution getOrInitRetryExecution(ExecutionScope executionScope) {
RetryExecution retryExecution = retryState.get(executionScope.getCustomResourceUid());
if (retryExecution == null) {
retryExecution = retry.initExecution();
retryState.put(executionScope.getCustomResourceUid(), retryExecution);
}
return retryExecution;
}

/**
* Here we try to cache the latest resource after an update. The goal is to solve a concurrency
* issue we've seen: If an execution is finished, where we updated a custom resource, but there
Expand Down Expand Up @@ -146,7 +218,7 @@ private void cacheUpdatedResourceIfChanged(
}

private void cleanupAfterDeletedEvent(String customResourceUid) {
defaultEventSourceManager.cleanup(customResourceUid);
eventSourceManager.cleanup(customResourceUid);
eventBuffer.cleanup(customResourceUid);
customResourceCache.cleanup(customResourceUid);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,16 @@ public void addEvent(Event event) {
crEvents.add(event);
}

public boolean newEventsExists(String resourceId) {
return events.get(resourceId) != null && !events.get(resourceId).isEmpty();
}

public void putBackEvents(String resourceUid, List<Event> oldEvents) {
List<Event> crEvents =
events.computeIfAbsent(resourceUid, (id) -> new ArrayList<>(oldEvents.size()));
crEvents.addAll(0, oldEvents);
}

public boolean containsEvents(String customResourceId) {
return events.get(customResourceId) != null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,16 @@ public void setEventSourceManager(EventSourceManager eventSourceManager) {
this.eventSourceManager = eventSourceManager;
}

public PostExecutionControl handleEvent(ExecutionScope event) {
public PostExecutionControl handleExecution(ExecutionScope executionScope) {
try {
return handDispatch(event);
return handleDispatch(executionScope);
} catch (RuntimeException e) {
log.error("Error during event processing {} failed.", event, e);
return PostExecutionControl.defaultDispatch();
log.error("Error during event processing {} failed.", executionScope, e);
return PostExecutionControl.exceptionDuringExecution(e);
}
}

private PostExecutionControl handDispatch(ExecutionScope executionScope) {
private PostExecutionControl handleDispatch(ExecutionScope executionScope) {
CustomResource resource = executionScope.getCustomResource();
log.debug(
"Handling events: {} for resource {}", executionScope.getEvents(), resource.getMetadata());
Expand All @@ -72,7 +72,10 @@ private PostExecutionControl handDispatch(ExecutionScope executionScope) {
return PostExecutionControl.defaultDispatch();
}
Context context =
new DefaultContext(eventSourceManager, new EventList(executionScope.getEvents()));
new DefaultContext(
eventSourceManager,
new EventList(executionScope.getEvents()),
executionScope.getRetryInfo());
if (markedForDeletion(resource)) {
return handleDelete(resource, context);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ class ExecutionConsumer implements Runnable {

@Override
public void run() {
PostExecutionControl postExecutionControl = eventDispatcher.handleEvent(executionScope);
PostExecutionControl postExecutionControl = eventDispatcher.handleExecution(executionScope);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just loud thinking, wouldn't the code be simpler if we had 2 ExecutionConsumers called
SimpleConsumer and RetryableConsumer and let the consumer to take care of the retrying rather than the handler?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@psycho-ir thing is this is closely relates with the events are comming in while the controller is executed, that layer does not have enough info basically now. (Also its just really a wraper around executor).

defaultEventHandler.eventProcessingFinished(executionScope, postExecutionControl);
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.javaoperatorsdk.operator.processing;

import io.fabric8.kubernetes.client.CustomResource;
import io.javaoperatorsdk.operator.api.RetryInfo;
import io.javaoperatorsdk.operator.processing.event.Event;
import java.util.List;

Expand All @@ -10,9 +11,12 @@ public class ExecutionScope {
// the latest custom resource from cache
private CustomResource customResource;

public ExecutionScope(List<Event> list, CustomResource customResource) {
private RetryInfo retryInfo;

public ExecutionScope(List<Event> list, CustomResource customResource, RetryInfo retryInfo) {
this.events = list;
this.customResource = customResource;
this.retryInfo = retryInfo;
}

public List<Event> getEvents() {
Expand All @@ -38,4 +42,8 @@ public String toString() {
+ customResource.getMetadata().getResourceVersion()
+ '}';
}

public RetryInfo getRetryInfo() {
return retryInfo;
}
}
Loading