Skip to content

Commit 83c828b

Browse files
metacosmcsviri
authored andcommitted
fix: make sure we run each task on a different thread
1 parent 824c192 commit 83c828b

File tree

4 files changed

+62
-51
lines changed

4 files changed

+62
-51
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
import java.util.concurrent.Future;
1010
import java.util.concurrent.TimeUnit;
1111
import java.util.concurrent.TimeoutException;
12+
import java.util.function.Function;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.Stream;
1215

1316
import org.slf4j.Logger;
1417
import org.slf4j.LoggerFactory;
1518

16-
import io.javaoperatorsdk.operator.OperatorException;
17-
1819
public class ExecutorServiceManager {
1920
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
2021
private static ExecutorServiceManager instance;
@@ -63,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() {
6364
return instance;
6465
}
6566

67+
public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
68+
Function<T, Void> task, Function<T, String> threadNamer) {
69+
final var instrumented = new InstrumentedExecutorService(
70+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
71+
72+
try {
73+
instrumented.invokeAll(stream.parallel().map(item -> (Callable<Void>) () -> {
74+
// change thread name for easier debugging
75+
final var thread = Thread.currentThread();
76+
final var name = thread.getName();
77+
thread.setName(threadNamer.apply(item));
78+
try {
79+
task.apply(item);
80+
return null;
81+
} finally {
82+
// restore original name
83+
thread.setName(name);
84+
}
85+
}).collect(Collectors.toList()));
86+
shutdown(instrumented);
87+
} catch (InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
}
90+
}
91+
6692
public ExecutorService executorService() {
6793
return executor;
6894
}
@@ -71,36 +97,6 @@ public ExecutorService workflowExecutorService() {
7197
return workflowExecutor;
7298
}
7399

74-
/**
75-
* Runs the specified I/O-bound task and waits for its completion using the new ExecutorService
76-
*
77-
* @param task task to run concurrently
78-
* @param threadNamePrefix the prefix with which to prefix thread names when tasks are run this
79-
* way
80-
*/
81-
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
82-
final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
83-
ExecutorService instrumented = new InstrumentedExecutorService(executor);
84-
// change thread name for easier debugging
85-
final var thread = Thread.currentThread();
86-
final var name = thread.getName();
87-
try {
88-
thread.setName(threadNamePrefix + "-" + thread.getId());
89-
instrumented.submit(task)
90-
.get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(),
91-
TimeUnit.SECONDS);
92-
shutdown(instrumented);
93-
} catch (InterruptedException e) {
94-
thread.interrupt();
95-
throw new OperatorException("Couldn't execute task", e);
96-
} catch (ExecutionException | TimeoutException e) {
97-
throw new OperatorException("Couldn't execute task", e);
98-
} finally {
99-
// restore original name
100-
thread.setName(name);
101-
}
102-
}
103-
104100
private void doStop() {
105101
try {
106102
log.debug("Closing executor");

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.List;
55
import java.util.Objects;
66
import java.util.Set;
7+
import java.util.function.Function;
78
import java.util.stream.Collectors;
89
import java.util.stream.Stream;
910

@@ -67,28 +68,34 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6768
public synchronized void start() {
6869
startEventSource(eventSources.namedControllerResourceEventSource());
6970

70-
// starting event sources on the workflow executor which shouldn't be used at this point
71-
ExecutorServiceManager.executeAndWaitForCompletion(
72-
() -> eventSources.additionalNamedEventSources()
73-
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
74-
.parallel()
75-
.forEach(this::startEventSource),
76-
"LowLevelEventSourceStart");
71+
ExecutorServiceManager.executeAndWaitForAllToComplete(
72+
eventSources.additionalNamedEventSources()
73+
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
74+
this::startEventSource,
75+
getThreadNamer("start"));
7776

78-
ExecutorServiceManager.executeAndWaitForCompletion(
79-
() -> eventSources.additionalNamedEventSources()
80-
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
81-
.parallel().forEach(this::startEventSource),
82-
"DefaultEventSourceStart");
77+
ExecutorServiceManager.executeAndWaitForAllToComplete(
78+
eventSources.additionalNamedEventSources()
79+
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
80+
this::startEventSource,
81+
getThreadNamer("start"));
82+
}
83+
84+
private static Function<NamedEventSource, String> getThreadNamer(String stage) {
85+
return es -> {
86+
final var name = es.name();
87+
return es.priority() + " " + stage + " -> "
88+
+ (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name);
89+
};
8390
}
8491

8592
@Override
8693
public synchronized void stop() {
8794
stopEventSource(eventSources.namedControllerResourceEventSource());
88-
ExecutorServiceManager.executeAndWaitForCompletion(
89-
() -> eventSources.additionalNamedEventSources().parallel()
90-
.forEach(this::stopEventSource),
91-
"EventSourceStop");
95+
ExecutorServiceManager.executeAndWaitForAllToComplete(
96+
eventSources.additionalNamedEventSources(),
97+
this::stopEventSource,
98+
getThreadNamer("stop"));
9299
eventSources.clear();
93100
}
94101

@@ -105,7 +112,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
105112
}
106113
}
107114

108-
private void startEventSource(NamedEventSource eventSource) {
115+
private Void startEventSource(NamedEventSource eventSource) {
109116
try {
110117
logEventSourceEvent(eventSource, "Starting");
111118
eventSource.start();
@@ -115,16 +122,18 @@ private void startEventSource(NamedEventSource eventSource) {
115122
} catch (Exception e) {
116123
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
117124
}
125+
return null;
118126
}
119127

120-
private void stopEventSource(NamedEventSource eventSource) {
128+
private Void stopEventSource(NamedEventSource eventSource) {
121129
try {
122130
logEventSourceEvent(eventSource, "Stopping");
123131
eventSource.stop();
124132
logEventSourceEvent(eventSource, "Stopped");
125133
} catch (Exception e) {
126134
log.warn("Error closing {} -> {}", eventSource.name(), e);
127135
}
136+
return null;
128137
}
129138

130139
public final void registerEventSource(EventSource eventSource) throws OperatorException {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.Optional;
55

66
import io.javaoperatorsdk.operator.OperatorException;
7+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
78
import io.javaoperatorsdk.operator.processing.event.source.Configurable;
89
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
910
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
@@ -13,10 +14,12 @@ class NamedEventSource implements EventSource, EventSourceMetadata {
1314

1415
private final EventSource original;
1516
private final String name;
17+
private final boolean nameSet;
1618

1719
NamedEventSource(EventSource original, String name) {
1820
this.original = original;
1921
this.name = name;
22+
nameSet = !name.equals(EventSourceInitializer.generateNameFor(original));
2023
}
2124

2225
@Override
@@ -95,4 +98,8 @@ public int hashCode() {
9598
public EventSourceStartPriority priority() {
9699
return original.priority();
97100
}
101+
102+
public boolean isNameSet() {
103+
return nameSet;
104+
}
98105
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ private InformerWrapper<T> createEventSource(
114114

115115
@Override
116116
public void stop() {
117-
log.info("Stopping {}", this);
118117
sources.forEach((ns, source) -> {
119118
try {
120119
log.debug("Stopping informer for namespace: {} -> {}", ns, source);

0 commit comments

Comments
 (0)