Skip to content

Commit 78d067e

Browse files
metacosmcsviri
authored andcommitted
fix: run event source start on specific thread pool
Fixes #1603
1 parent b81a192 commit 78d067e

File tree

3 files changed

+59
-28
lines changed

3 files changed

+59
-28
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import java.util.concurrent.Callable;
66
import java.util.concurrent.ExecutionException;
77
import java.util.concurrent.ExecutorService;
8+
import java.util.concurrent.ForkJoinPool;
89
import java.util.concurrent.Future;
910
import java.util.concurrent.TimeUnit;
1011
import java.util.concurrent.TimeoutException;
@@ -17,6 +18,9 @@ public class ExecutorServiceManager {
1718
private static ExecutorServiceManager instance;
1819
private final ExecutorService executor;
1920
private final ExecutorService workflowExecutor;
21+
22+
private static final ForkJoinPool threadPool = new ForkJoinPool(
23+
Runtime.getRuntime().availableProcessors());
2024
private final int terminationTimeoutSeconds;
2125

2226
private ExecutorServiceManager(ExecutorService executor, ExecutorService workflowExecutor,
@@ -68,6 +72,21 @@ public ExecutorService workflowExecutorService() {
6872
return workflowExecutor;
6973
}
7074

75+
public static void executeInParallel(Runnable callable) {
76+
executeInParallel(() -> {
77+
callable.run();
78+
return null;
79+
});
80+
}
81+
82+
public static <T> T executeInParallel(Callable<T> callable) {
83+
try {
84+
return threadPool.submit(callable).get();
85+
} catch (InterruptedException | ExecutionException e) {
86+
throw new RuntimeException(e);
87+
}
88+
}
89+
7190
private void doStop() {
7291
try {
7392
log.debug("Closing executor");
@@ -80,6 +99,7 @@ private void doStop() {
8099
executor.shutdownNow(); // if we timed out, waiting, cancel everything
81100
}
82101

102+
threadPool.shutdown();
83103
} catch (InterruptedException e) {
84104
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
85105
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import io.fabric8.kubernetes.api.model.HasMetadata;
1414
import io.javaoperatorsdk.operator.MissingCRDException;
1515
import io.javaoperatorsdk.operator.OperatorException;
16+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
1617
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
1718
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
1819
import io.javaoperatorsdk.operator.processing.Controller;
@@ -65,20 +66,24 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6566
@Override
6667
public synchronized void start() {
6768
startEventSource(eventSources.namedControllerResourceEventSource());
68-
eventSources.additionalNamedEventSources()
69+
70+
// starting event sources on the workflow executor which shouldn't be used at this point
71+
ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
6972
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
70-
.parallel().forEach(this::startEventSource);
71-
eventSources.additionalNamedEventSources()
73+
.parallel()
74+
.forEach(this::startEventSource));
75+
76+
ExecutorServiceManager.executeInParallel(() -> eventSources.additionalNamedEventSources()
7277
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
73-
.parallel().forEach(this::startEventSource);
78+
.parallel().forEach(this::startEventSource));
7479
}
7580

7681
@Override
7782
public synchronized void stop() {
7883
stopEventSource(eventSources.namedControllerResourceEventSource());
79-
eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource);
84+
ExecutorServiceManager.executeInParallel(
85+
() -> eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource));
8086
eventSources.clear();
81-
8287
}
8388

8489
@SuppressWarnings("rawtypes")

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 28 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import io.javaoperatorsdk.operator.ReconcilerUtils;
2525
import io.javaoperatorsdk.operator.api.config.Cloner;
2626
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
27+
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
2728
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
2829
import io.javaoperatorsdk.operator.processing.LifecycleAware;
2930
import io.javaoperatorsdk.operator.processing.event.ResourceID;
@@ -45,7 +46,8 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat
4546

4647
@Override
4748
public void start() throws OperatorException {
48-
sources.values().parallelStream().forEach(LifecycleAware::start);
49+
ExecutorServiceManager
50+
.executeInParallel(() -> sources.values().parallelStream().forEach(LifecycleAware::start));
4951
}
5052

5153
void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
@@ -86,18 +88,19 @@ public void changeNamespaces(Set<String> namespaces) {
8688
log.debug("Stopped informer {} for namespaces: {}", this, sourcesToRemove);
8789
sourcesToRemove.forEach(k -> sources.remove(k).stop());
8890

89-
namespaces.forEach(ns -> {
90-
if (!sources.containsKey(ns)) {
91-
final var source =
92-
createEventSource(
93-
client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
94-
eventHandler, ns);
95-
source.addIndexers(this.indexers);
96-
source.start();
97-
log.debug("Registered new {} -> {} for namespace: {}", this, source,
98-
ns);
99-
}
100-
});
91+
ExecutorServiceManager.executeInParallel(
92+
() -> namespaces.forEach(ns -> {
93+
if (!sources.containsKey(ns)) {
94+
final var source =
95+
createEventSource(
96+
client.inNamespace(ns).withLabelSelector(configuration.getLabelSelector()),
97+
eventHandler, ns);
98+
source.addIndexers(this.indexers);
99+
source.start();
100+
log.debug("Registered new {} -> {} for namespace: {}", this, source,
101+
ns);
102+
}
103+
}));
101104
}
102105

103106

@@ -113,15 +116,18 @@ private InformerWrapper<T> createEventSource(
113116

114117
@Override
115118
public void stop() {
116-
log.info("Stopping {}", this);
117-
sources.forEach((ns, source) -> {
118-
try {
119-
log.debug("Stopping informer for namespace: {} -> {}", ns, source);
120-
source.stop();
121-
} catch (Exception e) {
122-
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
123-
}
124-
});
119+
ExecutorServiceManager.executeInParallel(
120+
() -> {
121+
log.info("Stopping {}", this);
122+
sources.forEach((ns, source) -> {
123+
try {
124+
log.debug("Stopping informer for namespace: {} -> {}", ns, source);
125+
source.stop();
126+
} catch (Exception e) {
127+
log.warn("Error stopping informer for namespace: {} -> {}", ns, source, e);
128+
}
129+
});
130+
});
125131
}
126132

127133
@Override

0 commit comments

Comments
 (0)