-
Notifications
You must be signed in to change notification settings - Fork 218
fix: run event source start on specific thread pool #1606
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
@@ -17,6 +18,8 @@ public class ExecutorServiceManager { | |||
private static ExecutorServiceManager instance; | |||
private final ExecutorService executor; | |||
private final ExecutorService workflowExecutor; | |||
private final ForkJoinPool threadPool = | |||
new ForkJoinPool(Runtime.getRuntime().availableProcessors()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Runtime.getRuntime().availableProcessors()
is not needed, empty constructor is equivalent to this.- We probably should not use ForkJoinPool for this purpose, since event sources are IO bound operations. A normal executor service would be a good choice imo, maybe a configurable size; or a size that is same as event source number (but with an upper bound)
see:
https://www.geeksforgeeks.org/java-forkjoinpool-vs-executorservice/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good point, I originally was using the executor services defined by the ConfigurationService
but wasn't seeing changes due to me being dumb (i.e. not using the proper version of the SDK in my testing code) and then forgot to go back to it when I finally managed to get it to work as I was expecting. 😅
@@ -41,7 +46,9 @@ | |||
|
|||
@Override | |||
public void start() throws OperatorException { | |||
sources.values().parallelStream().forEach(LifecycleAware::start); | |||
// make sure informers are all started before proceeding further | |||
ExecutorServiceManager.executeAndWaitForCompletion( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This can cause a deadlock. For example if the Thread count is 1 (but also others). So the event source is an InformerEventSource and on start it also want to start event sources from the same thread pool. This should not use the same executor service.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure I'm following…
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Lets say there are 2 threads in the in the pool. We start 2 InformerEventSources
at tha same time. Both started but then both of them starts with the same executor service two InformerManager, but those don't have available threads anymore at that point. Because all the threads available in the executor service are already blocked.
This can be verified if you just create an operator with 1 thread and 1 informer event source, that I assume will never start.
* @param task task to run concurrently | ||
*/ | ||
public static void executeAndWaitForCompletion(Runnable task) { | ||
executeAndWaitForCompletion(task, instance().workflowExecutorService()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It is a little smelly that this workflow executor is reused for a general purpose executor service IMO.
b0ec0e8
to
1e6a90e
Compare
final var thread = Thread.currentThread(); | ||
final var name = thread.getName(); | ||
try { | ||
thread.setName(threadNamePrefix + "-" + thread.getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is the thread name changes here? It's not propagated to the executor, and basically there is just submits tasks and shutdows the executor.
* way | ||
*/ | ||
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) { | ||
final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This does not seems to be right. Creating a new threadpool for all tasks.
ExecutorServiceManager.executeAndWaitForCompletion( | ||
() -> eventSources.additionalNamedEventSources() | ||
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) | ||
.parallel() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This won't be using the ForkJoinPool anyways?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No, this won't.
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)) | ||
.parallel() | ||
.forEach(this::startEventSource), | ||
"LowLevelEventSourceStart"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"LowLevelEventSourceStart"); | |
"ResourceStateLoaderEventSourceStart"); |
IO-bound operations shouldn't use ForkJoinPools and workflow service should be in use during these operations so it should be available to run these tasks.
This reverts commit 7d007db.
b6dd3a2
to
83c828b
Compare
Kudos, SonarCloud Quality Gate passed! |
Fixes #1603