diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java index 38ed240c97..c4193b9a2a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java @@ -5,9 +5,13 @@ import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.function.Function; +import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -60,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() { return instance; } + public static void executeAndWaitForAllToComplete(Stream stream, + Function task, Function threadNamer) { + final var instrumented = new InstrumentedExecutorService( + Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors())); + + try { + instrumented.invokeAll(stream.parallel().map(item -> (Callable) () -> { + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); + thread.setName(threadNamer.apply(item)); + try { + task.apply(item); + return null; + } finally { + // restore original name + thread.setName(name); + } + }).collect(Collectors.toList())); + shutdown(instrumented); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + public ExecutorService executorService() { return executor; } @@ -71,17 +100,18 @@ public ExecutorService workflowExecutorService() { private void doStop() { try { log.debug("Closing executor"); - executor.shutdown(); - workflowExecutor.shutdown(); - if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { - workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything - } - if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) { - executor.shutdownNow(); // if we timed out, waiting, cancel everything - } - + shutdown(executor); + shutdown(workflowExecutor); } catch (InterruptedException e) { log.debug("Exception closing executor: {}", e.getLocalizedMessage()); + Thread.currentThread().interrupt(); + } + } + + private static void shutdown(ExecutorService executorService) throws InterruptedException { + executorService.shutdown(); + if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) { + executorService.shutdownNow(); // if we timed out, waiting, cancel everything } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java index a34c4b38f1..ca27ad4f2c 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java @@ -360,8 +360,8 @@ public void changeNamespaces(Set namespaces) { } public synchronized void startEventProcessing() { - log.info("Started event processing for controller: {}", configuration.getName()); eventProcessor.start(); + log.info("Started event processing for controller: {}", configuration.getName()); } private void throwMissingCRDException(String crdName, String specVersion, String controllerName) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java index a85633bb00..8e9c981031 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java @@ -4,6 +4,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Function; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -13,6 +14,7 @@ import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.MissingCRDException; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager; import io.javaoperatorsdk.operator.api.config.NamespaceChangeable; import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.processing.Controller; @@ -65,20 +67,36 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() { @Override public synchronized void start() { startEventSource(eventSources.namedControllerResourceEventSource()); - eventSources.additionalNamedEventSources() - .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) - .parallel().forEach(this::startEventSource); - eventSources.additionalNamedEventSources() - .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)) - .parallel().forEach(this::startEventSource); + + ExecutorServiceManager.executeAndWaitForAllToComplete( + eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)), + this::startEventSource, + getThreadNamer("start")); + + ExecutorServiceManager.executeAndWaitForAllToComplete( + eventSources.additionalNamedEventSources() + .filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)), + this::startEventSource, + getThreadNamer("start")); + } + + private static Function getThreadNamer(String stage) { + return es -> { + final var name = es.name(); + return es.priority() + " " + stage + " -> " + + (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name); + }; } @Override public synchronized void stop() { stopEventSource(eventSources.namedControllerResourceEventSource()); - eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource); + ExecutorServiceManager.executeAndWaitForAllToComplete( + eventSources.additionalNamedEventSources(), + this::stopEventSource, + getThreadNamer("stop")); eventSources.clear(); - } @SuppressWarnings("rawtypes") @@ -94,7 +112,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) { } } - private void startEventSource(NamedEventSource eventSource) { + private Void startEventSource(NamedEventSource eventSource) { try { logEventSourceEvent(eventSource, "Starting"); eventSource.start(); @@ -104,9 +122,10 @@ private void startEventSource(NamedEventSource eventSource) { } catch (Exception e) { throw new OperatorException("Couldn't start source " + eventSource.name(), e); } + return null; } - private void stopEventSource(NamedEventSource eventSource) { + private Void stopEventSource(NamedEventSource eventSource) { try { logEventSourceEvent(eventSource, "Stopping"); eventSource.stop(); @@ -114,6 +133,7 @@ private void stopEventSource(NamedEventSource eventSource) { } catch (Exception e) { log.warn("Error closing {} -> {}", eventSource.name(), e); } + return null; } public final void registerEventSource(EventSource eventSource) throws OperatorException { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java index 13d5a10323..4787e675c9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java @@ -4,6 +4,7 @@ import java.util.Optional; import io.javaoperatorsdk.operator.OperatorException; +import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer; import io.javaoperatorsdk.operator.processing.event.source.Configurable; import io.javaoperatorsdk.operator.processing.event.source.EventSource; import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority; @@ -13,10 +14,12 @@ class NamedEventSource implements EventSource, EventSourceMetadata { private final EventSource original; private final String name; + private final boolean nameSet; NamedEventSource(EventSource original, String name) { this.original = original; this.name = name; + nameSet = !name.equals(EventSourceInitializer.generateNameFor(original)); } @Override @@ -95,4 +98,8 @@ public int hashCode() { public EventSourceStartPriority priority() { return original.priority(); } + + public boolean isNameSet() { + return nameSet; + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 0d22ccdef0..ba7aa9f67f 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -45,7 +45,8 @@ public class InformerManager, Resource> client, @@ -113,7 +114,6 @@ private InformerWrapper createEventSource( @Override public void stop() { - log.info("Stopping {}", this); sources.forEach((ns, source) -> { try { log.debug("Stopping informer for namespace: {} -> {}", ns, source); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 601cb0c10c..de81815a96 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -64,7 +64,11 @@ public void start() throws OperatorException { if (!configService.stopOnInformerErrorDuringStartup()) { informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t)); } + // change thread name for easier debugging + final var thread = Thread.currentThread(); + final var name = thread.getName(); try { + thread.setName(informerInfo() + " " + thread.getId()); var start = informer.start(); // note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is // false, and there is a rbac issue the get never returns; therefore operator never really @@ -81,6 +85,9 @@ public void start() throws OperatorException { } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IllegalStateException(e); + } finally { + // restore original name + thread.setName(name); } } catch (Exception e) { @@ -143,6 +150,10 @@ public List byIndex(String indexName, String indexKey) { @Override public String toString() { - return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')'; + return informerInfo() + " (" + informer + ')'; + } + + private String informerInfo() { + return "InformerWrapper [" + versionedFullResourceName() + "]"; } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java index ab345145da..51a4b597f7 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourceManagerTest.java @@ -4,6 +4,7 @@ import org.junit.jupiter.api.Test; +import io.fabric8.kubernetes.api.model.ConfigMap; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.MockKubernetesClient; import io.javaoperatorsdk.operator.OperatorException; @@ -171,9 +172,9 @@ void changesNamespacesOnControllerAndInformerEventSources() { } private EventSourceManager initManager() { - final var configuration = MockControllerConfiguration.forResource(HasMetadata.class); + final var configuration = MockControllerConfiguration.forResource(ConfigMap.class); final Controller controller = new Controller(mock(Reconciler.class), configuration, - MockKubernetesClient.client(HasMetadata.class)); + MockKubernetesClient.client(ConfigMap.class)); return new EventSourceManager(controller); } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 8b5c0202c0..a33658415e 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -10,6 +10,7 @@ import io.fabric8.kubernetes.api.model.apps.Deployment; import io.fabric8.kubernetes.client.KubernetesClient; import io.javaoperatorsdk.operator.MockKubernetesClient; +import io.javaoperatorsdk.operator.OperatorException; import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider; import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler; import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; @@ -259,7 +260,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { // by default informer fails to start if there is an exception in the client on start. // Throws the exception further. - assertThrows(RuntimeException.class, () -> informerEventSource.start()); + assertThrows(OperatorException.class, () -> informerEventSource.start()); verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } finally { ConfigurationServiceProvider.reset();