-
Notifications
You must be signed in to change notification settings - Fork 220
feat: delay the registration of controller till the operator is started #491
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -18,16 +18,21 @@ | |
|
||
@SuppressWarnings("rawtypes") | ||
public class Operator implements AutoCloseable { | ||
|
||
private static final Logger log = LoggerFactory.getLogger(Operator.class); | ||
private final KubernetesClient k8sClient; | ||
private final ConfigurationService configurationService; | ||
private final List<Closeable> closeables; | ||
private final Object lock; | ||
private final List<ControllerRef> controllers; | ||
private volatile boolean started; | ||
|
||
public Operator(KubernetesClient k8sClient, ConfigurationService configurationService) { | ||
this.k8sClient = k8sClient; | ||
this.configurationService = configurationService; | ||
this.closeables = new ArrayList<>(); | ||
this.lock = new Object(); | ||
this.controllers = new ArrayList<>(); | ||
this.started = false; | ||
|
||
Runtime.getRuntime().addShutdownHook(new Thread(this::close)); | ||
} | ||
|
@@ -45,43 +50,65 @@ public ConfigurationService getConfigurationService() { | |
* where there is no obvious entrypoint to the application which can trigger the injection process | ||
* and start the cluster monitoring processes. | ||
*/ | ||
@SuppressWarnings("unchecked") | ||
public void start() { | ||
final var version = configurationService.getVersion(); | ||
log.info( | ||
"Operator SDK {} (commit: {}) built on {} starting...", | ||
version.getSdkVersion(), | ||
version.getCommit(), | ||
version.getBuiltTime()); | ||
log.info("Client version: {}", Version.clientVersion()); | ||
try { | ||
final var k8sVersion = k8sClient.getVersion(); | ||
if (k8sVersion != null) { | ||
log.info("Server version: {}.{}", k8sVersion.getMajor(), k8sVersion.getMinor()); | ||
synchronized (lock) { | ||
if (started) { | ||
return; | ||
} | ||
|
||
final var version = configurationService.getVersion(); | ||
log.info( | ||
"Operator SDK {} (commit: {}) built on {} starting...", | ||
version.getSdkVersion(), | ||
version.getCommit(), | ||
version.getBuiltTime()); | ||
log.info("Client version: {}", Version.clientVersion()); | ||
try { | ||
final var k8sVersion = k8sClient.getVersion(); | ||
if (k8sVersion != null) { | ||
log.info("Server version: {}.{}", k8sVersion.getMajor(), k8sVersion.getMinor()); | ||
} | ||
} catch (Exception e) { | ||
log.error("Error retrieving the server version. Exiting!", e); | ||
throw new OperatorException("Error retrieving the server version", e); | ||
} | ||
|
||
for (ControllerRef ref : controllers) { | ||
startController(ref.controller, ref.configuration); | ||
} | ||
} catch (Exception e) { | ||
log.error("Error retrieving the server version. Exiting!", e); | ||
throw new OperatorException("Error retrieving the server version", e); | ||
|
||
started = true; | ||
} | ||
} | ||
|
||
/** Stop the operator. */ | ||
@Override | ||
public void close() { | ||
log.info( | ||
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); | ||
synchronized (lock) { | ||
if (!started) { | ||
return; | ||
} | ||
|
||
for (Closeable closeable : this.closeables) { | ||
try { | ||
log.debug("closing {}", closeable); | ||
closeable.close(); | ||
} catch (IOException e) { | ||
log.warn("Error closing {}", closeable, e); | ||
log.info( | ||
"Operator SDK {} is shutting down...", configurationService.getVersion().getSdkVersion()); | ||
|
||
for (Closeable closeable : this.closeables) { | ||
try { | ||
log.debug("closing {}", closeable); | ||
closeable.close(); | ||
} catch (IOException e) { | ||
log.warn("Error closing {}", closeable, e); | ||
} | ||
} | ||
|
||
started = false; | ||
} | ||
} | ||
|
||
/** | ||
* Registers the specified controller with this operator. | ||
* Add a registration requests for the specified controller with this operator. The effective | ||
* registration of the controller is delayed till the operator is started. | ||
* | ||
* @param controller the controller to register | ||
* @param <R> the {@code CustomResource} type associated with the controller | ||
|
@@ -92,6 +119,32 @@ public <R extends CustomResource> void register(ResourceController<R> controller | |
register(controller, null); | ||
} | ||
|
||
/** | ||
* Add a registration requests for the specified controller with this operator, overriding its | ||
* default configuration by the specified one (usually created via {@link | ||
* io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider#override(ControllerConfiguration)}, | ||
* passing it the controller's original configuration. The effective registration of the | ||
* controller is delayed till the operator is started. | ||
* | ||
* @param controller the controller to register | ||
* @param configuration the configuration with which we want to register the controller, if {@code | ||
* null}, the controller's original configuration is used | ||
* @param <R> the {@code CustomResource} type associated with the controller | ||
* @throws OperatorException if a problem occurred during the registration process | ||
*/ | ||
public <R extends CustomResource> void register( | ||
ResourceController<R> controller, ControllerConfiguration<R> configuration) | ||
throws OperatorException { | ||
synchronized (lock) { | ||
if (!started) { | ||
this.controllers.add(new ControllerRef(controller, configuration)); | ||
} else { | ||
this.controllers.add(new ControllerRef(controller, configuration)); | ||
startController(controller, configuration); | ||
} | ||
} | ||
} | ||
|
||
/** | ||
* Registers the specified controller with this operator, overriding its default configuration by | ||
* the specified one (usually created via {@link | ||
|
@@ -104,9 +157,10 @@ public <R extends CustomResource> void register(ResourceController<R> controller | |
* @param <R> the {@code CustomResource} type associated with the controller | ||
* @throws OperatorException if a problem occurred during the registration process | ||
*/ | ||
public <R extends CustomResource> void register( | ||
private <R extends CustomResource> void startController( | ||
ResourceController<R> controller, ControllerConfiguration<R> configuration) | ||
throws OperatorException { | ||
|
||
final var existing = configurationService.getConfigurationFor(controller); | ||
if (existing == null) { | ||
lburgazzoli marked this conversation as resolved.
Show resolved
Hide resolved
|
||
log.warn( | ||
|
@@ -120,7 +174,7 @@ public <R extends CustomResource> void register( | |
configuration = existing; | ||
} | ||
|
||
Class<R> resClass = configuration.getCustomResourceClass(); | ||
final Class<R> resClass = configuration.getCustomResourceClass(); | ||
final String controllerName = configuration.getName(); | ||
final var crdName = configuration.getCRDName(); | ||
final var specVersion = "v1"; | ||
|
@@ -137,10 +191,10 @@ public <R extends CustomResource> void register( | |
CustomResourceUtils.assertCustomResource(resClass, crd); | ||
} | ||
|
||
final var client = k8sClient.customResources(resClass); | ||
try { | ||
DefaultEventSourceManager eventSourceManager = | ||
new DefaultEventSourceManager(controller, configuration, client); | ||
new DefaultEventSourceManager( | ||
controller, configuration, k8sClient.customResources(resClass)); | ||
controller.init(eventSourceManager); | ||
closeables.add(eventSourceManager); | ||
} catch (MissingCRDException e) { | ||
|
@@ -195,4 +249,14 @@ private static <R extends CustomResource> boolean failOnMissingCurrentNS( | |
} | ||
return false; | ||
} | ||
|
||
private static class ControllerRef { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This should be parameterized by the CR type so that the There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I'm not sure how it should be done as the list of ControllerRef would need to be on the base type in any case. Unfortunately, the usage of generic in the project is a little bit confused an inconsistent so the only way I found to make all the methods happy were to remove generics. But I'm happy to change this if you help me understanding the right way of doing it. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I agree that the generic usage is inconsistent in the project. Let's address this in a subsequent PR. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. thx, can you re-trigger the CI ? |
||
public final ResourceController controller; | ||
public final ControllerConfiguration configuration; | ||
|
||
public ControllerRef(ResourceController controller, ControllerConfiguration configuration) { | ||
this.controller = controller; | ||
this.configuration = configuration; | ||
} | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,10 +1,8 @@ | ||
package io.javaoperatorsdk.operator.processing.event; | ||
|
||
import io.fabric8.kubernetes.api.model.KubernetesResourceList; | ||
import io.fabric8.kubernetes.client.CustomResource; | ||
import io.fabric8.kubernetes.client.KubernetesClientException; | ||
import io.fabric8.kubernetes.client.dsl.MixedOperation; | ||
import io.fabric8.kubernetes.client.dsl.Resource; | ||
import io.javaoperatorsdk.operator.MissingCRDException; | ||
import io.javaoperatorsdk.operator.OperatorException; | ||
import io.javaoperatorsdk.operator.api.ResourceController; | ||
|
@@ -45,10 +43,9 @@ public class DefaultEventSourceManager implements EventSourceManager { | |
} | ||
} | ||
|
||
@SuppressWarnings({"rawtypes", "unchecked"}) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Unneeded without the following change. |
||
public <R extends CustomResource<?, ?>> DefaultEventSourceManager( | ||
ResourceController<R> controller, | ||
ControllerConfiguration<R> configuration, | ||
MixedOperation<R, KubernetesResourceList<R>, Resource<R>> client) { | ||
ResourceController controller, ControllerConfiguration configuration, MixedOperation client) { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't think that this change is needed. |
||
this(new DefaultEventHandler(controller, configuration, client), true); | ||
registerEventSource( | ||
CUSTOM_RESOURCE_EVENT_SOURCE_NAME, new CustomResourceEventSource<>(client, configuration)); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The
ControllerRef
object is added to the queue in case a scenario like start, stop, start would be supported.