Skip to content

fix: improvements on informer start and logging #1749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.javaoperatorsdk.operator.OperatorException;

public class ExecutorServiceManager {
private static final Logger log = LoggerFactory.getLogger(ExecutorServiceManager.class);
private static ExecutorServiceManager instance;
Expand Down Expand Up @@ -82,7 +84,16 @@ public static <T> void executeAndWaitForAllToComplete(Stream<T> stream,
// restore original name
thread.setName(name);
}
}).collect(Collectors.toList()));
}).collect(Collectors.toList())).forEach(f -> {
try {
// to find out any exceptions
f.get();
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Isn't this defeating the purpose? I mean this will block each callable in turn…

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Those callables are already finsihed that time. (At least if I correctly understand the javadocs of invoke all :) )

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah yes, you are right!

} catch (ExecutionException e) {
throw new OperatorException(e.getCause());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
shutdown(instrumented);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,7 +335,7 @@ public synchronized void start(boolean startEventProcessor) throws OperatorExcep
if (startEventProcessor) {
eventProcessor.start();
}
log.info("'{}' controller started, pending event sources initialization", controllerName);
log.info("'{}' controller started", controllerName);
} catch (MissingCRDException e) {
stop();
throwMissingCRDException(e.getCrdName(), e.getSpecVersion(), controllerName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ private static Function<NamedEventSource, String> getThreadNamer(String stage) {
return es -> {
final var name = es.name();
return es.priority() + " " + stage + " -> "
+ (es.isNameSet() ? name + " " + es.original().getClass().getSimpleName() : name);
+ (es.isNameSet() ? name + " " + es.original().getClass() : es.original());
};
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.api.config.Cloner;
import io.javaoperatorsdk.operator.api.config.ConfigurationServiceProvider;
import io.javaoperatorsdk.operator.api.config.ExecutorServiceManager;
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.health.InformerHealthIndicator;
import io.javaoperatorsdk.operator.processing.LifecycleAware;
Expand Down Expand Up @@ -58,7 +59,13 @@ public InformerManager(MixedOperation<T, KubernetesResourceList<T>, Resource<T>>
public void start() throws OperatorException {
initSources();
// make sure informers are all started before proceeding further
sources.values().parallelStream().forEach(InformerWrapper::start);
ExecutorServiceManager.executeAndWaitForAllToComplete(sources.values().stream(),
iw -> {
iw.start();
return null;
},
iw -> "InformerStarter-" + iw.getTargetNamespace() + "-"
+ configuration.getResourceClass().getSimpleName());
}

private void initSources() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,12 +73,19 @@ public void start() throws OperatorException {
final var name = thread.getName();
try {
thread.setName(informerInfo() + " " + thread.getId());
final var resourceName = informer.getApiTypeClass().getSimpleName();
log.debug("Starting informer for namespace: {} resource: {}", namespaceIdentifier,
resourceName);
var start = informer.start();
// note that in case we don't put here timeout and stopOnInformerErrorDuringStartup is
// false, and there is a rbac issue the get never returns; therefore operator never really
// starts
log.trace("Waiting informer to start namespace: {} resource: {}", namespaceIdentifier,
resourceName);
start.toCompletableFuture().get(configService.cacheSyncTimeout().toMillis(),
TimeUnit.MILLISECONDS);
log.debug("Started informer for namespace: {} resource: {}", namespaceIdentifier,
resourceName);
} catch (TimeoutException | ExecutionException e) {
if (configService.stopOnInformerErrorDuringStartup()) {
log.error("Informer startup error. Operator will be stopped. Informer: {}", informer, e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,4 +163,11 @@ public ResourceConfiguration<R> getInformerConfiguration() {
public C configuration() {
return manager().configuration();
}

@Override
public String toString() {
return getClass().getSimpleName() + "{" +
"resourceClass: " + configuration.getResourceClass().getSimpleName() +
"}";
}
}