-
Notifications
You must be signed in to change notification settings - Fork 220
Removing events from context #596
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 41 commits
Commits
Show all changes
48 commits
Select commit
Hold shift + click to select a range
898e2c2
WIP
csviri be4e5cf
Addressing Custom Resource by Name and Namespace refactor + Informer …
csviri b2ab4b9
Build is fixed, (test failing)
csviri 9145b52
Test fixes
csviri 4ef27bf
Merge branch 'master' into informer-creventsource
csviri 34cc2a1
minor update
csviri 4b15974
EventSourceManager small fix
csviri 77033e6
Merge branch 'access-event-source-manager' into informer-creventsource
csviri e1b5926
Unit tests fixed
csviri e7d1b99
fix: DefaultEventHandler init from EventSourceManager
csviri a1f92c6
fix: custom resource selector test improvement
csviri e928a3e
fix: wip test imrpovements
csviri f35a340
fix: test improvements
csviri 5d5817e
fix: further improvements
csviri 544ce35
Merge branch 'v2' into informer-creventsource
csviri 75ad7d2
fix: build
csviri 7012fb3
feature: add mvn jar to gitignore
csviri 3746122
Exposing CustomResourceEventSource and informers
csviri f0f2e91
fix: cleanup
csviri 6638a48
fix: remove caching optimization since it not possible anymore with i…
csviri 1d786ef
fix: formatting
csviri a5343b7
refactor: make name/namespace final
metacosm 9e0430a
feature: Simple label selector support
csviri 7f48b9a
Merge branch 'informer-creventsource' of github.com:java-operator-sdk…
csviri 0aa29a1
fix: formatting
csviri 3ad2fc5
fix: code inspection reports
csviri 86b8185
Merge branch 'v2' into informer-creventsource
csviri 92d3ed3
fix: merge from v2
csviri d194d25
fix: removed most deprecated apis
csviri 7ee0b7d
wip: started to remove events from variouse layers
csviri 150e875
fix: progress with implementation and tests
csviri a03cfb9
fix: Updated informer mapping to CustomResourceID
csviri 6b1d7fe
Merge branch 'informer-creventsource' into removing-events-from-context
csviri f97ced5
fix: imports
csviri 109b7bc
fix: decorational changes
csviri 874c25e
fix: event marker unit test
csviri ff2b32a
Default Event Handler Unit tests
csviri 1b66865
Merge branch 'v2' into removing-events-from-context
csviri b6c87f0
fix: fixes after merge
csviri af50089
fix: changes from code review
csviri 58c3f5c
fix: method naming
csviri 6635b54
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri f40ee4e
Update operator-framework-core/src/main/java/io/javaoperatorsdk/opera…
csviri 73c32a8
fix: comment
csviri a8a1ea6
Merge branch 'v2' into removing-events-from-context
csviri e75cad4
fix: fixes from merge
csviri b9fc1fc
fix: remove not used method
csviri 0d409ee
fix: formatting
csviri File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -20,11 +20,12 @@ | |
import io.javaoperatorsdk.operator.processing.event.DefaultEventSourceManager; | ||
import io.javaoperatorsdk.operator.processing.event.Event; | ||
import io.javaoperatorsdk.operator.processing.event.EventHandler; | ||
import io.javaoperatorsdk.operator.processing.event.internal.CustomResourceEvent; | ||
import io.javaoperatorsdk.operator.processing.event.internal.ResourceAction; | ||
import io.javaoperatorsdk.operator.processing.retry.GenericRetry; | ||
import io.javaoperatorsdk.operator.processing.retry.Retry; | ||
import io.javaoperatorsdk.operator.processing.retry.RetryExecution; | ||
|
||
import static io.javaoperatorsdk.operator.EventListUtils.containsCustomResourceDeletedEvent; | ||
import static io.javaoperatorsdk.operator.processing.KubernetesResourceUtils.getName; | ||
|
||
/** | ||
|
@@ -38,7 +39,6 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even | |
@Deprecated | ||
private static EventMonitor monitor = EventMonitor.NOOP; | ||
|
||
private final EventBuffer eventBuffer; | ||
private final Set<CustomResourceID> underProcessing = new HashSet<>(); | ||
private final EventDispatcher<R> eventDispatcher; | ||
private final Retry retry; | ||
|
@@ -50,6 +50,7 @@ public class DefaultEventHandler<R extends CustomResource<?, ?>> implements Even | |
private volatile boolean running; | ||
private final ResourceCache<R> resourceCache; | ||
private DefaultEventSourceManager<R> eventSourceManager; | ||
private final EventMarker eventMarker; | ||
|
||
public DefaultEventHandler(ConfiguredController<R> controller, ResourceCache<R> resourceCache) { | ||
this( | ||
|
@@ -58,18 +59,20 @@ public DefaultEventHandler(ConfiguredController<R> controller, ResourceCache<R> | |
controller.getConfiguration().getName(), | ||
new EventDispatcher<>(controller), | ||
GenericRetry.fromConfiguration(controller.getConfiguration().getRetryConfiguration()), | ||
controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor()); | ||
controller.getConfiguration().getConfigurationService().getMetrics().getEventMonitor(), | ||
new EventMarker()); | ||
} | ||
|
||
DefaultEventHandler(EventDispatcher<R> eventDispatcher, ResourceCache<R> resourceCache, | ||
String relatedControllerName, | ||
Retry retry) { | ||
this(resourceCache, null, relatedControllerName, eventDispatcher, retry, null); | ||
Retry retry, EventMarker eventMarker) { | ||
this(resourceCache, null, relatedControllerName, eventDispatcher, retry, null, eventMarker); | ||
} | ||
|
||
private DefaultEventHandler(ResourceCache<R> resourceCache, ExecutorService executor, | ||
String relatedControllerName, | ||
EventDispatcher<R> eventDispatcher, Retry retry, EventMonitor monitor) { | ||
EventDispatcher<R> eventDispatcher, Retry retry, EventMonitor monitor, | ||
EventMarker eventMarker) { | ||
this.running = true; | ||
this.executor = | ||
executor == null | ||
|
@@ -79,9 +82,9 @@ private DefaultEventHandler(ResourceCache<R> resourceCache, ExecutorService exec | |
this.controllerName = relatedControllerName; | ||
this.eventDispatcher = eventDispatcher; | ||
this.retry = retry; | ||
eventBuffer = new EventBuffer(); | ||
this.resourceCache = resourceCache; | ||
this.eventMonitor = monitor != null ? monitor : EventMonitor.NOOP; | ||
this.eventMarker = eventMarker; | ||
} | ||
|
||
public void setEventSourceManager(DefaultEventSourceManager<R> eventSourceManager) { | ||
|
@@ -113,71 +116,75 @@ private EventMonitor monitor() { | |
|
||
@Override | ||
public void handleEvent(Event event) { | ||
lock.lock(); | ||
try { | ||
lock.lock(); | ||
log.debug("Received event: {}", event); | ||
if (!this.running) { | ||
log.debug("Skipping event: {} because the event handler is shutting down", event); | ||
return; | ||
} | ||
final var monitor = monitor(); | ||
eventBuffer.addEvent(event.getRelatedCustomResourceID(), event); | ||
monitor.processedEvent(event.getRelatedCustomResourceID(), event); | ||
executeBufferedEvents(event.getRelatedCustomResourceID()); | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
@Override | ||
public void close() { | ||
try { | ||
lock.lock(); | ||
this.running = false; | ||
handleEventMarking(event); | ||
if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) { | ||
submitReconciliationExecution(event.getRelatedCustomResourceID()); | ||
} else { | ||
cleanupForDeletedEvent(event.getRelatedCustomResourceID()); | ||
} | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
private boolean executeBufferedEvents(CustomResourceID customResourceUid) { | ||
boolean newEventForResourceId = eventBuffer.containsEvents(customResourceUid); | ||
private boolean submitReconciliationExecution(CustomResourceID customResourceUid) { | ||
boolean controllerUnderExecution = isControllerUnderExecution(customResourceUid); | ||
Optional<R> latestCustomResource = | ||
resourceCache.getCustomResource(customResourceUid); | ||
|
||
if (!controllerUnderExecution && newEventForResourceId && latestCustomResource.isPresent()) { | ||
if (!controllerUnderExecution | ||
&& latestCustomResource.isPresent()) { | ||
setUnderExecutionProcessing(customResourceUid); | ||
ExecutionScope executionScope = | ||
new ExecutionScope( | ||
eventBuffer.getAndRemoveEventsForExecution(customResourceUid), | ||
latestCustomResource.get(), | ||
retryInfo(customResourceUid)); | ||
eventMarker.unMarkEventReceived(customResourceUid); | ||
log.debug("Executing events for custom resource. Scope: {}", executionScope); | ||
executor.execute(new ControllerExecution(executionScope)); | ||
return true; | ||
} else { | ||
log.debug( | ||
"Skipping executing controller for resource id: {}. Events in queue: {}." | ||
"Skipping executing controller for resource id: {}." | ||
+ " Controller in execution: {}. Latest CustomResource present: {}", | ||
customResourceUid, | ||
newEventForResourceId, | ||
controllerUnderExecution, | ||
latestCustomResource.isPresent()); | ||
if (latestCustomResource.isEmpty()) { | ||
log.warn("no custom resource found in cache for CustomResourceID: {}", customResourceUid); | ||
log.warn("no custom resource found in cache for CustomResourceID: {}", | ||
customResourceUid); | ||
} | ||
return false; | ||
} | ||
} | ||
|
||
private void handleEventMarking(Event event) { | ||
if (event instanceof CustomResourceEvent && | ||
((CustomResourceEvent) event).getAction() == ResourceAction.DELETED) { | ||
eventMarker.markDeleteEventReceived(event); | ||
} else if (!eventMarker.deleteEventPresent(event.getRelatedCustomResourceID())) { | ||
eventMarker.markEventReceived(event); | ||
} | ||
} | ||
|
||
private RetryInfo retryInfo(CustomResourceID customResourceUid) { | ||
return retryState.get(customResourceUid); | ||
} | ||
|
||
void eventProcessingFinished( | ||
ExecutionScope<R> executionScope, PostExecutionControl<R> postExecutionControl) { | ||
lock.lock(); | ||
try { | ||
lock.lock(); | ||
if (!running) { | ||
return; | ||
} | ||
|
@@ -188,23 +195,29 @@ void eventProcessingFinished( | |
postExecutionControl); | ||
unsetUnderExecution(executionScope.getCustomResourceID()); | ||
|
||
if (retry != null && postExecutionControl.exceptionDuringExecution()) { | ||
// If a delete event present it was received during reconciliation. | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure I understand this comment… :( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. tried to rephrase it. |
||
// So we either removed the finalizer during reconciliation or we don't use one | ||
// for the cr. Neither way we want to retry. | ||
if (retry != null && postExecutionControl.exceptionDuringExecution() && | ||
!eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { | ||
handleRetryOnException(executionScope); | ||
final var monitor = monitor(); | ||
executionScope.getEvents() | ||
.forEach(e -> monitor.failedEvent(executionScope.getCustomResourceID(), e)); | ||
// todo revisit monitoring since events are not present anymore | ||
csviri marked this conversation as resolved.
Show resolved
Hide resolved
|
||
// final var monitor = monitor(); executionScope.getEvents().forEach(e -> | ||
// monitor.failedEvent(executionScope.getCustomResourceID(), e)); | ||
return; | ||
} | ||
|
||
if (retry != null) { | ||
markSuccessfulExecutionRegardingRetry(executionScope); | ||
handleSuccessfulExecutionRegardingRetry(executionScope); | ||
} | ||
if (containsCustomResourceDeletedEvent(executionScope.getEvents())) { | ||
cleanupAfterDeletedEvent(executionScope.getCustomResourceID()); | ||
if (eventMarker.deleteEventPresent(executionScope.getCustomResourceID())) { | ||
cleanupForDeletedEvent(executionScope.getCustomResourceID()); | ||
} else { | ||
var executed = executeBufferedEvents(executionScope.getCustomResourceID()); | ||
if (!executed) { | ||
reScheduleExecutionIfInstructed(postExecutionControl, executionScope.getCustomResource()); | ||
if (eventMarker.eventPresent(executionScope.getCustomResourceID())) { | ||
submitReconciliationExecution(executionScope.getCustomResourceID()); | ||
} else { | ||
reScheduleExecutionIfInstructed(postExecutionControl, | ||
executionScope.getCustomResource()); | ||
} | ||
} | ||
} finally { | ||
|
@@ -227,13 +240,13 @@ private void reScheduleExecutionIfInstructed(PostExecutionControl<R> postExecuti | |
private void handleRetryOnException(ExecutionScope<R> executionScope) { | ||
RetryExecution execution = getOrInitRetryExecution(executionScope); | ||
var customResourceID = executionScope.getCustomResourceID(); | ||
boolean newEventsExists = eventBuffer | ||
.newEventsExists(customResourceID); | ||
eventBuffer.putBackEvents(customResourceID, executionScope.getEvents()); | ||
boolean eventPresent = eventMarker.eventPresent(customResourceID); | ||
eventMarker.markEventReceived(customResourceID); | ||
|
||
if (newEventsExists) { | ||
log.debug("New events exists for for resource id: {}", customResourceID); | ||
executeBufferedEvents(customResourceID); | ||
if (eventPresent) { | ||
log.debug("New events exists for for resource id: {}", | ||
customResourceID); | ||
submitReconciliationExecution(customResourceID); | ||
return; | ||
} | ||
Optional<Long> nextDelay = execution.nextDelay(); | ||
|
@@ -251,7 +264,7 @@ private void handleRetryOnException(ExecutionScope<R> executionScope) { | |
() -> log.error("Exhausted retries for {}", executionScope)); | ||
} | ||
|
||
private void markSuccessfulExecutionRegardingRetry(ExecutionScope<R> executionScope) { | ||
private void handleSuccessfulExecutionRegardingRetry(ExecutionScope<R> executionScope) { | ||
log.debug( | ||
"Marking successful execution for resource: {}", | ||
getName(executionScope.getCustomResource())); | ||
|
@@ -270,9 +283,9 @@ private RetryExecution getOrInitRetryExecution(ExecutionScope<R> executionScope) | |
return retryExecution; | ||
} | ||
|
||
private void cleanupAfterDeletedEvent(CustomResourceID customResourceUid) { | ||
private void cleanupForDeletedEvent(CustomResourceID customResourceUid) { | ||
eventSourceManager.cleanup(customResourceUid); | ||
eventBuffer.cleanup(customResourceUid); | ||
eventMarker.cleanup(customResourceUid); | ||
} | ||
|
||
private boolean isControllerUnderExecution(CustomResourceID customResourceUid) { | ||
|
@@ -287,6 +300,15 @@ private void unsetUnderExecution(CustomResourceID customResourceUid) { | |
underProcessing.remove(customResourceUid); | ||
} | ||
|
||
@Override | ||
public void close() { | ||
lock.lock(); | ||
try { | ||
this.running = false; | ||
} finally { | ||
lock.unlock(); | ||
} | ||
} | ||
|
||
private class ControllerExecution implements Runnable { | ||
private final ExecutionScope<R> executionScope; | ||
|
45 changes: 0 additions & 45 deletions
45
...ator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/EventBuffer.java
This file was deleted.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, that's a significant change. So if you don't have a finalizer, you don't get a chance to clean up associated components? That seems wrong to me, the SDK should always give the users the opportunity to clean things up, regardless of finalizers.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The thing is, that users either use finalizers or all dependent resources are cleaned up using owner reference by kubernetes. Otherwise it's not correct. Typically if the operator is down, the delete event will be missed, and cleanup would not happen anyways. So I think it is actually good to force this pattern.