Skip to content

fix: run event source start on specific thread pool #1606

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 15 commits into from
Nov 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,13 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -60,6 +64,31 @@ public synchronized static ExecutorServiceManager instance() {
return instance;
}

public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
Function<T, Void> task, Function<T, String> threadNamer) {
final var instrumented = new InstrumentedExecutorService(
Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors()));

try {
instrumented.invokeAll(stream.parallel().map(item -> (Callable<Void>) () -> {
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
thread.setName(threadNamer.apply(item));
try {
task.apply(item);
return null;
} finally {
// restore original name
thread.setName(name);
}
}).collect(Collectors.toList()));
shutdown(instrumented);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

public ExecutorService executorService() {
return executor;
}
Expand All @@ -71,17 +100,18 @@ public ExecutorService workflowExecutorService() {
private void doStop() {
try {
log.debug("Closing executor");
executor.shutdown();
workflowExecutor.shutdown();
if (!workflowExecutor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
workflowExecutor.shutdownNow(); // if we timed out, waiting, cancel everything
}
if (!executor.awaitTermination(terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executor.shutdownNow(); // if we timed out, waiting, cancel everything
}

shutdown(executor);
shutdown(workflowExecutor);
} catch (InterruptedException e) {
log.debug("Exception closing executor: {}", e.getLocalizedMessage());
Thread.currentThread().interrupt();
}
}

private static void shutdown(ExecutorService executorService) throws InterruptedException {
executorService.shutdown();
if (!executorService.awaitTermination(instance().terminationTimeoutSeconds, TimeUnit.SECONDS)) {
executorService.shutdownNow(); // if we timed out, waiting, cancel everything
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -360,8 +360,8 @@ public void changeNamespaces(Set<String> namespaces) {
}

public synchronized void startEventProcessing() {
log.info("Started event processing for controller: {}", configuration.getName());
eventProcessor.start();
log.info("Started event processing for controller: {}", configuration.getName());
}

private void throwMissingCRDException(String crdName, String specVersion, String controllerName) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.List;
import java.util.Objects;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

Expand All @@ -13,6 +14,7 @@
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.MissingCRDException;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.NamespaceChangeable;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.processing.Controller;
Expand Down Expand Up @@ -65,20 +67,36 @@ public void postProcessDefaultEventSourcesAfterProcessorInitializer() {
@Override
public synchronized void start() {
startEventSource(eventSources.namedControllerResourceEventSource());
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER))
.parallel().forEach(this::startEventSource);
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT))
.parallel().forEach(this::startEventSource);

ExecutorServiceManager.executeAndWaitForAllToComplete(
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.RESOURCE_STATE_LOADER)),
this::startEventSource,
getThreadNamer("start"));

ExecutorServiceManager.executeAndWaitForAllToComplete(
eventSources.additionalNamedEventSources()
.filter(es -> es.priority().equals(EventSourceStartPriority.DEFAULT)),
this::startEventSource,
getThreadNamer("start"));
}

private static Function<NamedEventSource, String> getThreadNamer(String stage) {
return es -> {
final var name = es.name();
return es.priority() + " " + stage + " -> "
+ (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name);
};
}

@Override
public synchronized void stop() {
stopEventSource(eventSources.namedControllerResourceEventSource());
eventSources.additionalNamedEventSources().parallel().forEach(this::stopEventSource);
ExecutorServiceManager.executeAndWaitForAllToComplete(
eventSources.additionalNamedEventSources(),
this::stopEventSource,
getThreadNamer("stop"));
eventSources.clear();

}

@SuppressWarnings("rawtypes")
Expand All @@ -94,7 +112,7 @@ private void logEventSourceEvent(NamedEventSource eventSource, String event) {
}
}

private void startEventSource(NamedEventSource eventSource) {
private Void startEventSource(NamedEventSource eventSource) {
try {
logEventSourceEvent(eventSource, "Starting");
eventSource.start();
Expand All @@ -104,16 +122,18 @@ private void startEventSource(NamedEventSource eventSource) {
} catch (Exception e) {
throw new OperatorException("Couldn't start source " + eventSource.name(), e);
}
return null;
}

private void stopEventSource(NamedEventSource eventSource) {
private Void stopEventSource(NamedEventSource eventSource) {
try {
logEventSourceEvent(eventSource, "Stopping");
eventSource.stop();
logEventSourceEvent(eventSource, "Stopped");
} catch (Exception e) {
log.warn("Error closing {} -> {}", eventSource.name(), e);
}
return null;
}

public final void registerEventSource(EventSource eventSource) throws OperatorException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Optional;

import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceInitializer;
import io.javaoperatorsdk.operator.processing.event.source.Configurable;
import io.javaoperatorsdk.operator.processing.event.source.EventSource;
import io.javaoperatorsdk.operator.processing.event.source.EventSourceStartPriority;
Expand All @@ -13,10 +14,12 @@ class NamedEventSource implements EventSource, EventSourceMetadata {

private final EventSource original;
private final String name;
private final boolean nameSet;

NamedEventSource(EventSource original, String name) {
this.original = original;
this.name = name;
nameSet = !name.equals(EventSourceInitializer.generateNameFor(original));
}

@Override
Expand Down Expand Up @@ -95,4 +98,8 @@ public int hashCode() {
public EventSourceStartPriority priority() {
return original.priority();
}

public boolean isNameSet() {
return nameSet;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,8 @@ public class InformerManager<T extends HasMetadata, C extends ResourceConfigurat

@Override
public void start() throws OperatorException {
sources.values().parallelStream().forEach(LifecycleAware::start);
// make sure informers are all started before proceeding further
sources.values().parallelStream().forEach(InformerWrapper::start);
}

void initSources(MixedOperation<T, KubernetesResourceList<T>, Resource<T>> client,
Expand Down Expand Up @@ -113,7 +114,6 @@ private InformerWrapper<T> createEventSource(

@Override
public void stop() {
log.info("Stopping {}", this);
sources.forEach((ns, source) -> {
try {
log.debug("Stopping informer for namespace: {} -> {}", ns, source);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,11 @@ public void start() throws OperatorException {
if (!configService.stopOnInformerErrorDuringStartup()) {
informer.exceptionHandler((b, t) -> !ExceptionHandler.isDeserializationException(t));
}
// change thread name for easier debugging
final var thread = Thread.currentThread();
final var name = thread.getName();
try {
thread.setName(informerInfo() + " " + thread.getId());
var start = informer.start();
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
Expand All @@ -81,6 +85,9 @@ public void start() throws OperatorException {
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
throw new IllegalStateException(e);
} finally {
// restore original name
thread.setName(name);
}

} catch (Exception e) {
Expand Down Expand Up @@ -143,6 +150,10 @@ public List<T> byIndex(String indexName, String indexKey) {

@Override
public String toString() {
return "InformerWrapper [" + versionedFullResourceName() + "] (" + informer + ')';
return informerInfo() + " (" + informer + ')';
}

private String informerInfo() {
return "InformerWrapper [" + versionedFullResourceName() + "]";
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

import org.junit.jupiter.api.Test;

import io.fabric8.kubernetes.api.model.ConfigMap;
import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.OperatorException;
Expand Down Expand Up @@ -171,9 +172,9 @@ void changesNamespacesOnControllerAndInformerEventSources() {
}

private EventSourceManager initManager() {
final var configuration = MockControllerConfiguration.forResource(HasMetadata.class);
final var configuration = MockControllerConfiguration.forResource(ConfigMap.class);
final Controller controller = new Controller(mock(Reconciler.class), configuration,
MockKubernetesClient.client(HasMetadata.class));
MockKubernetesClient.client(ConfigMap.class));
return new EventSourceManager(controller);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import io.fabric8.kubernetes.api.model.apps.Deployment;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.javaoperatorsdk.operator.MockKubernetesClient;
import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
Expand Down Expand Up @@ -259,7 +260,7 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() {

// by default informer fails to start if there is an exception in the client on start.
// Throws the exception further.
assertThrows(RuntimeException.class, () -> informerEventSource.start());
assertThrows(OperatorException.class, () -> informerEventSource.start());
verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception));
} finally {
ConfigurationServiceProvider.reset();
Expand Down