From 863016d5a5912baa6bcfa8c73811aeec9b75b9a8 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 21 Dec 2022 14:47:32 +0100 Subject: [PATCH 01/10] feat: operator can be restarted --- .../io/javaoperatorsdk/operator/Operator.java | 1 + .../operator/OperatorRestartIT.java | 48 +++++++++++++++++++ .../restart/ConfigMapDependentResource.java | 35 ++++++++++++++ .../restart/RestartTestCustomResource.java | 15 ++++++ .../sample/restart/RestartTestReconciler.java | 31 ++++++++++++ 5 files changed, 130 insertions(+) create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java create mode 100644 operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 12437155e9..96acfc9e22 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -137,6 +137,7 @@ public void stop() throws OperatorException { if (configurationService.closeClientOnStop()) { kubernetesClient.close(); } + started = false; } /** diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java new file mode 100644 index 0000000000..ceca82d21c --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java @@ -0,0 +1,48 @@ +package io.javaoperatorsdk.operator; + +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; +import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestCustomResource; +import org.junit.jupiter.api.Test; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource; +import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class OperatorRestartIT { + + @Test + void operatorCanBeRestarted() { + try (var client = new KubernetesClientBuilder().build()) { + LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, + client); + Operator operator = new Operator(); + var reconciler = new RestartTestReconciler(); + operator.register(reconciler); + operator.start(); + + client.resource(testCustomResource()).createOrReplace(); + await().untilAsserted(() -> { + assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0); + }); + var reconcileNumberBeforeStop = reconciler.getNumberOfExecutions(); + operator.stop(); + operator.start(); + + await().untilAsserted(() -> { + assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(reconcileNumberBeforeStop); + }); + } + } + + RestartTestCustomResource testCustomResource() { + RestartTestCustomResource cr = new RestartTestCustomResource(); + cr.setMetadata(new ObjectMetaBuilder() + .withName("test1") + .build()); + return cr; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java new file mode 100644 index 0000000000..edb4e2baff --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/ConfigMapDependentResource.java @@ -0,0 +1,35 @@ +package io.javaoperatorsdk.operator.sample.restart; + +import java.util.Map; + +import io.fabric8.kubernetes.api.model.ConfigMap; +import io.fabric8.kubernetes.api.model.ConfigMapBuilder; +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.CRUDKubernetesDependentResource; +import io.javaoperatorsdk.operator.processing.dependent.kubernetes.KubernetesDependent; + +@KubernetesDependent(labelSelector = "app=restart-test") +public class ConfigMapDependentResource + extends CRUDKubernetesDependentResource { + + public static final String DATA_KEY = "key"; + + public ConfigMapDependentResource() { + super(ConfigMap.class); + } + + @Override + protected ConfigMap desired(RestartTestCustomResource primary, + Context context) { + return new ConfigMapBuilder() + .withMetadata(new ObjectMetaBuilder() + .withLabels(Map.of("app", "restart-test")) + .withName(primary.getMetadata().getName()) + .withNamespace(primary.getMetadata().getNamespace()) + .build()) + .withData(Map.of(DATA_KEY, primary.getMetadata().getName())) + .build(); + + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java new file mode 100644 index 0000000000..a3bcd31053 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestCustomResource.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.restart; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("rt") +public class RestartTestCustomResource + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java new file mode 100644 index 0000000000..decd9b597b --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/restart/RestartTestReconciler.java @@ -0,0 +1,31 @@ +package io.javaoperatorsdk.operator.sample.restart; + +import java.util.concurrent.atomic.AtomicInteger; + +import io.javaoperatorsdk.operator.api.reconciler.Context; +import io.javaoperatorsdk.operator.api.reconciler.ControllerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.Reconciler; +import io.javaoperatorsdk.operator.api.reconciler.UpdateControl; +import io.javaoperatorsdk.operator.api.reconciler.dependent.Dependent; +import io.javaoperatorsdk.operator.support.TestExecutionInfoProvider; + +@ControllerConfiguration( + dependents = @Dependent(type = ConfigMapDependentResource.class)) +public class RestartTestReconciler + implements Reconciler, TestExecutionInfoProvider { + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + RestartTestCustomResource resource, + Context context) { + numberOfExecutions.addAndGet(1); + return UpdateControl.noUpdate(); + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } + +} From 3dae636d180042c1be3c1835d78fcd7a01cc8d4d Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 21 Dec 2022 15:27:39 +0100 Subject: [PATCH 02/10] wip --- .../event/source/informer/InformerManager.java | 16 +++++++++++----- .../informer/ManagedInformerEventSource.java | 4 ++-- 2 files changed, 13 insertions(+), 7 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index c6636a4a5d..d15f0e909c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -46,18 +46,24 @@ public class InformerManager eventHandler; private final Map>> indexers = new HashMap<>(); + public InformerManager(MixedOperation, Resource> client, + C configuration, + ResourceEventHandler eventHandler) { + this.client = client; + this.configuration = configuration; + this.eventHandler = eventHandler; + } + @Override public void start() throws OperatorException { + initSources(); // make sure informers are all started before proceeding further sources.values().parallelStream().forEach(InformerWrapper::start); } - void initSources(MixedOperation, Resource> client, - C configuration, ResourceEventHandler eventHandler) { + private void initSources() { + sources.clear(); cloner = ConfigurationServiceProvider.instance().getResourceCloner(); - this.configuration = configuration; - this.client = client; - this.eventHandler = eventHandler; final var targetNamespaces = configuration.getEffectiveNamespaces(); if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 3e7400b90f..dc48318e44 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -37,14 +37,14 @@ public abstract class ManagedInformerEventSource temporaryResourceCache; - protected InformerManager cache = new InformerManager<>(); + protected InformerManager cache; protected C configuration; protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { super(configuration.getResourceClass()); temporaryResourceCache = new TemporaryResourceCache<>(this); - manager().initSources(client, configuration, this); + cache = new InformerManager<>(client,configuration,this); this.configuration = configuration; } From 508fec1da151a92cedd95e1510442b0dc6af7415 Mon Sep 17 00:00:00 2001 From: csviri Date: Wed, 21 Dec 2022 15:49:46 +0100 Subject: [PATCH 03/10] wip --- .../src/main/java/io/javaoperatorsdk/operator/Operator.java | 2 ++ .../processing/event/source/informer/InformerManager.java | 6 ++++-- .../java/io/javaoperatorsdk/operator/OperatorRestartIT.java | 3 ++- 3 files changed, 8 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 96acfc9e22..45636830d1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -32,6 +32,7 @@ public class Operator implements LifecycleAware { private final LeaderElectionManager leaderElectionManager = new LeaderElectionManager(controllerManager); private volatile boolean started = false; + private volatile boolean clientClosed = false; public Operator() { this((KubernetesClient) null); @@ -137,6 +138,7 @@ public void stop() throws OperatorException { if (configurationService.closeClientOnStop()) { kubernetesClient.close(); } + started = false; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index d15f0e909c..0488a3d790 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -62,9 +62,10 @@ public void start() throws OperatorException { } private void initSources() { - sources.clear(); + if (!sources.isEmpty()) { + throw new IllegalStateException("Some sources already initialized."); + } cloner = ConfigurationServiceProvider.instance().getResourceCloner(); - final var targetNamespaces = configuration.getEffectiveNamespaces(); if (ResourceConfiguration.allNamespacesWatched(targetNamespaces)) { var source = createEventSourceForNamespace(WATCH_ALL_NAMESPACES); @@ -136,6 +137,7 @@ public void stop() { log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e); } }); + sources.clear(); } @Override diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java index ceca82d21c..a3eb1ee71f 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java @@ -19,7 +19,8 @@ void operatorCanBeRestarted() { try (var client = new KubernetesClientBuilder().build()) { LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client); - Operator operator = new Operator(); + // todo check if this is good enough for Quarkus dev mode + Operator operator = new Operator(o->o.withCloseClientOnStop(false)); var reconciler = new RestartTestReconciler(); operator.register(reconciler); operator.start(); From e59ab202749d23569f1238a7ecb85b1a2fb90955 Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 22 Dec 2022 10:03:48 +0100 Subject: [PATCH 04/10] fixes --- .../main/java/io/javaoperatorsdk/operator/Operator.java | 3 ++- .../operator/processing/event/EventProcessor.java | 3 ++- .../operator/processing/event/EventSourceManager.java | 1 - .../processing/event/source/informer/InformerManager.java | 4 ++-- .../event/source/informer/ManagedInformerEventSource.java | 2 +- .../processing/event/source/timer/TimerEventSource.java | 3 ++- .../io/javaoperatorsdk/operator/OperatorRestartIT.java | 7 +++---- 7 files changed, 12 insertions(+), 11 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 45636830d1..18dd43759b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -133,7 +133,8 @@ public void stop() throws OperatorException { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); controllerManager.stop(); - ExecutorServiceManager.stop(); + // todo + // ExecutorServiceManager.stop(); leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { kubernetesClient.close(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 59962e39f1..8a7e2d4f68 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -424,7 +424,8 @@ public void run() { @Override public String toString() { - return controllerName() + " -> " + executionScope; + return controllerName() + " -> " + + (executionScope.getResource() != null ? executionScope : resourceID); } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index 6a6aae471a..be412dcd68 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -97,7 +97,6 @@ public synchronized void stop() { eventSources.additionalNamedEventSources(), this::stopEventSource, getThreadNamer("stop")); - eventSources.clear(); } @SuppressWarnings("rawtypes") diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 0488a3d790..1905a6c8bf 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -47,8 +47,8 @@ public class InformerManager>> indexers = new HashMap<>(); public InformerManager(MixedOperation, Resource> client, - C configuration, - ResourceEventHandler eventHandler) { + C configuration, + ResourceEventHandler eventHandler) { this.client = client; this.configuration = configuration; this.eventHandler = eventHandler; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index dc48318e44..d4fed3b816 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -44,7 +44,7 @@ protected ManagedInformerEventSource( MixedOperation, Resource> client, C configuration) { super(configuration.getResourceClass()); temporaryResourceCache = new TemporaryResourceCache<>(this); - cache = new InformerManager<>(client,configuration,this); + cache = new InformerManager<>(client, configuration, this); this.configuration = configuration; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index f22400453a..3b97d22a1f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -20,7 +20,7 @@ public class TimerEventSource implements ResourceEventAware { private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class); - private final Timer timer = new Timer(true); + private Timer timer; private final AtomicBoolean running = new AtomicBoolean(); private final Map onceTasks = new ConcurrentHashMap<>(); @@ -55,6 +55,7 @@ public void cancelOnceSchedule(ResourceID customResourceUid) { @Override public void start() { + timer = new Timer(true); running.set(true); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java index a3eb1ee71f..ec3e597b1e 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java @@ -1,11 +1,10 @@ package io.javaoperatorsdk.operator; -import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; -import io.javaoperatorsdk.operator.sample.informerrelatedbehavior.InformerRelatedBehaviorTestCustomResource; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; import io.fabric8.kubernetes.client.KubernetesClientBuilder; +import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource; import io.javaoperatorsdk.operator.sample.restart.RestartTestReconciler; @@ -18,9 +17,9 @@ class OperatorRestartIT { void operatorCanBeRestarted() { try (var client = new KubernetesClientBuilder().build()) { LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, - client); + client); // todo check if this is good enough for Quarkus dev mode - Operator operator = new Operator(o->o.withCloseClientOnStop(false)); + Operator operator = new Operator(o -> o.withCloseClientOnStop(false)); var reconciler = new RestartTestReconciler(); operator.register(reconciler); operator.start(); From 14727af2fc7b629aa85b4e260b6b483846d05cae Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 22 Dec 2022 10:43:10 +0100 Subject: [PATCH 05/10] remove not used flag --- .../src/main/java/io/javaoperatorsdk/operator/Operator.java | 1 - 1 file changed, 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 18dd43759b..40d85f4f41 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -32,7 +32,6 @@ public class Operator implements LifecycleAware { private final LeaderElectionManager leaderElectionManager = new LeaderElectionManager(controllerManager); private volatile boolean started = false; - private volatile boolean clientClosed = false; public Operator() { this((KubernetesClient) null); From b3749d313cddade1352653592db89f593eb7fafa Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 22 Dec 2022 10:59:44 +0100 Subject: [PATCH 06/10] indexer fix --- .../processing/event/source/informer/InformerManager.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 1905a6c8bf..ed4ed5c098 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -93,7 +93,6 @@ public void changeNamespaces(Set namespaces) { namespaces.forEach(ns -> { if (!sources.containsKey(ns)) { final InformerWrapper source = createEventSourceForNamespace(ns); - source.addIndexers(this.indexers); source.start(); log.debug("Registered new {} -> {} for namespace: {}", this, source, ns); @@ -113,6 +112,7 @@ private InformerWrapper createEventSourceForNamespace(String namespace) { client.inNamespace(namespace).withLabelSelector(configuration.getLabelSelector()), eventHandler, namespace); } + source.addIndexers(indexers); return source; } @@ -185,7 +185,6 @@ private Optional> getSource(String namespace) { @Override public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); - sources.values().forEach(s -> s.addIndexers(indexers)); } @Override From 71635d7adcdb4bf999ed789f1c7d4644374ea2bf Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 22 Dec 2022 11:18:59 +0100 Subject: [PATCH 07/10] improvemets --- .../java/io/javaoperatorsdk/operator/Operator.java | 4 ++-- .../operator/processing/event/EventProcessor.java | 13 ++----------- .../javaoperatorsdk/operator/OperatorRestartIT.java | 2 +- 3 files changed, 5 insertions(+), 14 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java index 40d85f4f41..b7c2f5e108 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/Operator.java @@ -132,8 +132,8 @@ public void stop() throws OperatorException { log.info( "Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); controllerManager.stop(); - // todo - // ExecutorServiceManager.stop(); + + ExecutorServiceManager.stop(); leaderElectionManager.stop(); if (configurationService.closeClientOnStop()) { kubernetesClient.close(); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 8a7e2d4f68..2ecfc3b009 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -5,14 +5,12 @@ import java.util.Map; import java.util.Optional; import java.util.concurrent.ExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.OperatorException; -import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.ControllerConfiguration; import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; @@ -40,20 +38,19 @@ public class EventProcessor

implements EventHandler, Life private final ControllerConfiguration controllerConfiguration; private final ReconciliationDispatcher

reconciliationDispatcher; private final Retry retry; - private final ExecutorService executor; private final Metrics metrics; private final Cache

cache; private final EventSourceManager

eventSourceManager; private final RateLimiter rateLimiter; private final ResourceStateManager resourceStateManager = new ResourceStateManager(); private final Map metricsMetadata; + private ExecutorService executor; public EventProcessor(EventSourceManager

eventSourceManager) { this( eventSourceManager.getController().getConfiguration(), eventSourceManager.getControllerResourceEventSource(), - ExecutorServiceManager.instance().executorService(), new ReconciliationDispatcher<>(eventSourceManager.getController()), ConfigurationServiceProvider.instance().getMetrics(), eventSourceManager); @@ -68,7 +65,6 @@ public EventProcessor(EventSourceManager

eventSourceManager) { this( controllerConfiguration, eventSourceManager.getControllerResourceEventSource(), - null, reconciliationDispatcher, metrics, eventSourceManager); @@ -78,17 +74,11 @@ public EventProcessor(EventSourceManager

eventSourceManager) { private EventProcessor( ControllerConfiguration controllerConfiguration, Cache

cache, - ExecutorService executor, ReconciliationDispatcher

reconciliationDispatcher, Metrics metrics, EventSourceManager

eventSourceManager) { this.controllerConfiguration = controllerConfiguration; this.running = false; - this.executor = - executor == null - ? new ScheduledThreadPoolExecutor( - ConfigurationService.DEFAULT_RECONCILIATION_THREADS_NUMBER) - : executor; this.reconciliationDispatcher = reconciliationDispatcher; this.retry = controllerConfiguration.getRetry(); this.cache = cache; @@ -376,6 +366,7 @@ public synchronized void stop() { @Override public void start() throws OperatorException { + executor = ExecutorServiceManager.instance().executorService(); this.running = true; handleAlreadyMarkedEvents(); } diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java index ec3e597b1e..67c8fb8615 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java @@ -18,7 +18,7 @@ void operatorCanBeRestarted() { try (var client = new KubernetesClientBuilder().build()) { LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client); - // todo check if this is good enough for Quarkus dev mode + // TODO check if this is good enough for Quarkus dev mode Operator operator = new Operator(o -> o.withCloseClientOnStop(false)); var reconciler = new RestartTestReconciler(); operator.register(reconciler); From 2a3a9e0efdb2da86683aa54c3dae20687de43cfd Mon Sep 17 00:00:00 2001 From: csviri Date: Thu, 22 Dec 2022 11:20:58 +0100 Subject: [PATCH 08/10] wip --- .../operator/processing/event/EventProcessor.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java index 2ecfc3b009..a75f51f981 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventProcessor.java @@ -46,7 +46,6 @@ public class EventProcessor

implements EventHandler, Life private final Map metricsMetadata; private ExecutorService executor; - public EventProcessor(EventSourceManager

eventSourceManager) { this( eventSourceManager.getController().getConfiguration(), @@ -366,6 +365,7 @@ public synchronized void stop() { @Override public void start() throws OperatorException { + // on restart new executor service is created and needs to be set here executor = ExecutorServiceManager.instance().executorService(); this.running = true; handleAlreadyMarkedEvents(); From b29500f9674f4ec512540cd0a371901d4319908b Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 5 Jan 2023 10:45:13 +0100 Subject: [PATCH 09/10] fix: avoid NPE on stop if things were not started --- .../source/informer/InformerManager.java | 6 +++--- .../event/source/timer/TimerEventSource.java | 21 +++++++++++-------- 2 files changed, 15 insertions(+), 12 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index ed4ed5c098..50ad1d6567 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -41,9 +41,9 @@ public class InformerManager> sources = new ConcurrentHashMap<>(); private Cloner cloner; - private C configuration; - private MixedOperation, Resource> client; - private ResourceEventHandler eventHandler; + private final C configuration; + private final MixedOperation, Resource> client; + private final ResourceEventHandler eventHandler; private final Map>> indexers = new HashMap<>(); public InformerManager(MixedOperation, Resource> client, diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java index 3b97d22a1f..fe641e0b0b 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/timer/TimerEventSource.java @@ -4,7 +4,6 @@ import java.util.Timer; import java.util.TimerTask; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -21,15 +20,15 @@ public class TimerEventSource private static final Logger log = LoggerFactory.getLogger(TimerEventSource.class); private Timer timer; - private final AtomicBoolean running = new AtomicBoolean(); private final Map onceTasks = new ConcurrentHashMap<>(); + @SuppressWarnings("unused") public void scheduleOnce(R resource, long delay) { scheduleOnce(ResourceID.fromResource(resource), delay); } public void scheduleOnce(ResourceID resourceID, long delay) { - if (!running.get()) { + if (!isRunning()) { throw new IllegalStateException("The TimerEventSource is not running"); } @@ -55,15 +54,19 @@ public void cancelOnceSchedule(ResourceID customResourceUid) { @Override public void start() { - timer = new Timer(true); - running.set(true); + if (!isRunning()) { + super.start(); + timer = new Timer(true); + } } @Override public void stop() { - running.set(false); - onceTasks.keySet().forEach(this::cancelOnceSchedule); - timer.cancel(); + if (isRunning()) { + onceTasks.keySet().forEach(this::cancelOnceSchedule); + timer.cancel(); + super.stop(); + } } public class EventProducerTimeTask extends TimerTask { @@ -76,7 +79,7 @@ public EventProducerTimeTask(ResourceID customResourceUid) { @Override public void run() { - if (running.get()) { + if (isRunning()) { log.debug("Producing event for custom resource id: {}", customResourceUid); getEventHandler().handleEvent(new Event(customResourceUid)); } From 64f15fce38c453cb68649f103736b3f37546997e Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Thu, 5 Jan 2023 14:40:09 +0100 Subject: [PATCH 10/10] chore: make test more "realistic" --- .../operator/OperatorRestartIT.java | 59 ++++++++++++------- 1 file changed, 37 insertions(+), 22 deletions(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java index 67c8fb8615..45b88a126b 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/OperatorRestartIT.java @@ -1,8 +1,13 @@ package io.javaoperatorsdk.operator; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Order; import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.fabric8.kubernetes.client.KubernetesClient; import io.fabric8.kubernetes.client.KubernetesClientBuilder; import io.javaoperatorsdk.operator.junit.LocallyRunOperatorExtension; import io.javaoperatorsdk.operator.sample.restart.RestartTestCustomResource; @@ -12,30 +17,40 @@ import static org.awaitility.Awaitility.await; class OperatorRestartIT { + private final static KubernetesClient client = new KubernetesClientBuilder().build(); + private final static Operator operator = new Operator(o -> o.withCloseClientOnStop(false)); + private final static RestartTestReconciler reconciler = new RestartTestReconciler(); + private static int reconcileNumberBeforeStop = 0; + + @BeforeAll + static void registerReconciler() { + LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, client); + operator.register(reconciler); + } + + @BeforeEach + void startOperator() { + operator.start(); + } + + @AfterEach + void stopOperator() { + operator.stop(); + } + + @Test + @Order(1) + void createResource() { + client.resource(testCustomResource()).createOrReplace(); + await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0)); + reconcileNumberBeforeStop = reconciler.getNumberOfExecutions(); + } @Test - void operatorCanBeRestarted() { - try (var client = new KubernetesClientBuilder().build()) { - LocallyRunOperatorExtension.applyCrd(RestartTestCustomResource.class, - client); - // TODO check if this is good enough for Quarkus dev mode - Operator operator = new Operator(o -> o.withCloseClientOnStop(false)); - var reconciler = new RestartTestReconciler(); - operator.register(reconciler); - operator.start(); - - client.resource(testCustomResource()).createOrReplace(); - await().untilAsserted(() -> { - assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(0); - }); - var reconcileNumberBeforeStop = reconciler.getNumberOfExecutions(); - operator.stop(); - operator.start(); - - await().untilAsserted(() -> { - assertThat(reconciler.getNumberOfExecutions()).isGreaterThan(reconcileNumberBeforeStop); - }); - } + @Order(2) + void reconcile() { + await().untilAsserted(() -> assertThat(reconciler.getNumberOfExecutions()) + .isGreaterThan(reconcileNumberBeforeStop)); } RestartTestCustomResource testCustomResource() {