Skip to content

fix: run event source start on specific thread pool #1606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 25, 2022
Merged

Conversation

metacosm
Copy link
Collaborator

Fixes #1603

@metacosm metacosm self-assigned this Nov 16, 2022
@metacosm metacosm requested a review from csviri November 16, 2022 16:18
@@ -17,6 +18,8 @@ public class ExecutorServiceManager {
private static ExecutorServiceManager instance;
private final ExecutorService executor;
private final ExecutorService workflowExecutor;
private final ForkJoinPool threadPool =
new ForkJoinPool(Runtime.getRuntime().availableProcessors());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. Runtime.getRuntime().availableProcessors() is not needed, empty constructor is equivalent to this.
  2. We probably should not use ForkJoinPool for this purpose, since event sources are IO bound operations. A normal executor service would be a good choice imo, maybe a configurable size; or a size that is same as event source number (but with an upper bound)
    see:
    https://www.geeksforgeeks.org/java-forkjoinpool-vs-executorservice/

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point, I originally was using the executor services defined by the ConfigurationService but wasn't seeing changes due to me being dumb (i.e. not using the proper version of the SDK in my testing code) and then forgot to go back to it when I finally managed to get it to work as I was expecting. 😅

@metacosm metacosm requested a review from csviri November 21, 2022 10:51
@@ -41,7 +46,9 @@

@Override
public void start() throws OperatorException {
sources.values().parallelStream().forEach(LifecycleAware::start);
// make sure informers are all started before proceeding further
ExecutorServiceManager.executeAndWaitForCompletion(
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This can cause a deadlock. For example if the Thread count is 1 (but also others). So the event source is an InformerEventSource and on start it also want to start event sources from the same thread pool. This should not use the same executor service.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure I'm following…

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Lets say there are 2 threads in the in the pool. We start 2 InformerEventSources at tha same time. Both started but then both of them starts with the same executor service two InformerManager, but those don't have available threads anymore at that point. Because all the threads available in the executor service are already blocked.

This can be verified if you just create an operator with 1 thread and 1 informer event source, that I assume will never start.

* @param task task to run concurrently
*/
public static void executeAndWaitForCompletion(Runnable task) {
executeAndWaitForCompletion(task, instance().workflowExecutorService());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is a little smelly that this workflow executor is reused for a general purpose executor service IMO.

final var thread = Thread.currentThread();
final var name = thread.getName();
try {
thread.setName(threadNamePrefix + "-" + thread.getId());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is the thread name changes here? It's not propagated to the executor, and basically there is just submits tasks and shutdows the executor.

* way
*/
public static void executeAndWaitForCompletion(Runnable task, String threadNamePrefix) {
final var executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors());
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This does not seems to be right. Creating a new threadpool for all tasks.

ExecutorServiceManager.executeAndWaitForCompletion(
() -> eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
.parallel()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't be using the ForkJoinPool anyways?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, this won't.

.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
.parallel()
.forEach(this::startEventSource),
"LowLevelEventSourceStart");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"LowLevelEventSourceStart");
"ResourceStateLoaderEventSourceStart");

@metacosm metacosm requested a review from csviri November 24, 2022 17:33
@sonarqubecloud
Copy link

Kudos, SonarCloud Quality Gate passed!    Quality Gate passed

Bug A 0 Bugs
Vulnerability A 0 Vulnerabilities
Security Hotspot A 0 Security Hotspots
Code Smell A 0 Code Smells

81.5% 81.5% Coverage
0.0% 0.0% Duplication

@metacosm metacosm merged commit adb0963 into main Nov 25, 2022
@metacosm metacosm deleted the start-thread-pool branch November 25, 2022 09:37
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Move start methods out of the common thread pool
2 participants