Skip to content

Commit b6dd3a2

Browse files
committed
fix: make sure we run each task on a different thread
1 parent 804fb0e commit b6dd3a2

File tree

4 files changed

+62
-51
lines changed

4 files changed

+62
-51
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ExecutorServiceManager.java

Lines changed: 28 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -9,12 +9,13 @@
99
import java.util.concurrent.Future;
1010
import java.util.concurrent.TimeUnit;
1111
import java.util.concurrent.TimeoutException;
12+
import java.util.function.Function;
13+
import java.util.stream.Collectors;
14+
import java.util.stream.Stream;
1215

1316
import org.slf4j.Logger;
1417
import org.slf4j.LoggerFactory;
1518

16-
import io.javaoperatorsdk.operator.OperatorException;
17-
1819
public class ExecutorServiceManager {
1920
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
2021
private static ExecutorServiceManager instance;
@@ -63,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() {
6364
return instance;
6465
}
6566

67+
public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
68+
Function<T, Void> task, Function<T, String> threadNamer) {
69+
final var instrumented = new InstrumentedExecutorService(
70+
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));
71+
72+
try {
73+
instrumented.invokeAll(stream.parallel().map(item -> (Callable<Void>) () -> {
74+
// change thread name for easier debugging
75+
final var thread = Thread.currentThread();
76+
final var name = thread.getName();
77+
thread.setName(threadNamer.apply(item));
78+
try {
79+
task.apply(item);
80+
return null;
81+
} finally {
82+
// restore original name
83+
thread.setName(name);
84+
}
85+
}).collect(Collectors.toList()));
86+
shutdown(instrumented);
87+
} catch (InterruptedException e) {
88+
Thread.currentThread().interrupt();
89+
}
90+
}
91+
6692
public ExecutorService executorService() {
6793
return executor;
6894
}
@@ -71,36 +97,6 @@ public ExecutorService workflowExecutorService() {
7197
return workflowExecutor;
7298
}
7399

74-
/**
75-
* Runs the specified I/O-bound task and waits for its completion using the new ExecutorService
76-
*
77-
* @param task task to run concurrently
78-
* @param threadNamePrefix the prefix with which to prefix thread names when tasks are run this
79-
* way
80-
*/
81-
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
82-
final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
83-
ExecutorService instrumented = new InstrumentedExecutorService(executor);
84-
// change thread name for easier debugging
85-
final var thread = Thread.currentThread();
86-
final var name = thread.getName();
87-
try {
88-
thread.setName(threadNamePrefix + "-" + thread.getId());
89-
instrumented.submit(task)
90-
.get(ConfigurationServiceProvider.instance().cacheSyncTimeout().toSeconds(),
91-
TimeUnit.SECONDS);
92-
shutdown(instrumented);
93-
} catch (InterruptedException e) {
94-
thread.interrupt();
95-
throw new OperatorException("Couldn't execute task", e);
96-
} catch (ExecutionException | TimeoutException e) {
97-
throw new OperatorException("Couldn't execute task", e);
98-
} finally {
99-
// restore original name
100-
thread.setName(name);
101-
}
102-
}
103-
104100
private void doStop() {
105101
try {
106102
log.debug("Closing executor");

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSourceManager.java

Lines changed: 27 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import java.util.List;
55
import java.util.Objects;
66
import java.util.Set;
7+
import java.util.function.Function;
78
import java.util.stream.Collectors;
89

910
import org.slf4j.Logger;
@@ -66,28 +67,34 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
6667
public synchronized void start() {
6768
startEventSource(eventSources.namedControllerResourceEventSource());
6869

69-
// starting event sources on the workflow executor which shouldn't be used at this point
70-
ExecutorServiceManager.executeAndWaitForCompletion(
71-
() -> eventSources.additionalNamedEventSources()
72-
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
73-
.parallel()
74-
.forEach(this::startEventSource),
75-
"LowLevelEventSourceStart");
70+
ExecutorServiceManager.executeAndWaitForAllToComplete(
71+
eventSources.additionalNamedEventSources()
72+
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
73+
this::startEventSource,
74+
getThreadNamer("start"));
7675

77-
ExecutorServiceManager.executeAndWaitForCompletion(
78-
() -> eventSources.additionalNamedEventSources()
79-
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
80-
.parallel().forEach(this::startEventSource),
81-
"DefaultEventSourceStart");
76+
ExecutorServiceManager.executeAndWaitForAllToComplete(
77+
eventSources.additionalNamedEventSources()
78+
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
79+
this::startEventSource,
80+
getThreadNamer("start"));
81+
}
82+
83+
private static Function<NamedEventSource, String> getThreadNamer(String stage) {
84+
return es -> {
85+
final var name = es.name();
86+
return es.priority() + " " + stage + " -> "
87+
+ (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name);
88+
};
8289
}
8390

8491
@Override
8592
public synchronized void stop() {
8693
stopEventSource(eventSources.namedControllerResourceEventSource());
87-
ExecutorServiceManager.executeAndWaitForCompletion(
88-
() -> eventSources.additionalNamedEventSources().parallel()
89-
.forEach(this::stopEventSource),
90-
"EventSourceStop");
94+
ExecutorServiceManager.executeAndWaitForAllToComplete(
95+
eventSources.additionalNamedEventSources(),
96+
this::stopEventSource,
97+
getThreadNamer("stop"));
9198
eventSources.clear();
9299
}
93100

@@ -104,7 +111,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
104111
}
105112
}
106113

107-
private void startEventSource(NamedEventSource eventSource) {
114+
private Void startEventSource(NamedEventSource eventSource) {
108115
try {
109116
logEventSourceEvent(eventSource, "Starting");
110117
eventSource.start();
@@ -114,16 +121,18 @@ private void startEventSource(NamedEventSource eventSource) {
114121
} catch (Exception e) {
115122
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
116123
}
124+
return null;
117125
}
118126

119-
private void stopEventSource(NamedEventSource eventSource) {
127+
private Void stopEventSource(NamedEventSource eventSource) {
120128
try {
121129
logEventSourceEvent(eventSource, "Stopping");
122130
eventSource.stop();
123131
logEventSourceEvent(eventSource, "Stopped");
124132
} catch (Exception e) {
125133
log.warn("Error closing {} -> {}", eventSource.name(), e);
126134
}
135+
return null;
127136
}
128137

129138
public final void registerEventSource(EventSource eventSource) throws OperatorException {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/NamedEventSource.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,17 +3,20 @@
33
import java.util.Objects;
44

55
import io.javaoperatorsdk.operator.OperatorException;
6+
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
67
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
78
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
89

910
class NamedEventSource implements EventSource {
1011

1112
private final EventSource original;
1213
private final String name;
14+
private final boolean nameSet;
1315

1416
NamedEventSource(EventSource original, String name) {
1517
this.original = original;
1618
this.name = name;
19+
nameSet = !name.equals(EventSourceInitializer.generateNameFor(original));
1720
}
1821

1922
@Override
@@ -63,4 +66,8 @@ public int hashCode() {
6366
public EventSourceStartPriority priority() {
6467
return original.priority();
6568
}
69+
70+
public boolean isNameSet() {
71+
return nameSet;
72+
}
6673
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,6 @@ private InformerWrapper<T> createEventSource(
110110

111111
@Override
112112
public void stop() {
113-
log.info("Stopping {}", this);
114113
sources.forEach((ns, source) -> {
115114
try {
116115
log.debug("Stopping informer for namespace: {} -> {}", ns, source);

0 commit comments

Comments
 (0)