Skip to content

Primary to Secondary Mapper #1300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.api.config.Utils;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;

Expand All @@ -19,15 +20,18 @@ public interface InformerConfiguration<R extends HasMetadata>
class DefaultInformerConfiguration<R extends HasMetadata> extends
DefaultResourceConfiguration<R> implements InformerConfiguration<R> {

private final PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private final SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private final boolean followControllerNamespaceChanges;

protected DefaultInformerConfiguration(String labelSelector,
Class<R> resourceClass,
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
Set<String> namespaces, boolean followControllerNamespaceChanges) {
super(labelSelector, resourceClass, namespaces);
this.followControllerNamespaceChanges = followControllerNamespaceChanges;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper =
Objects.requireNonNullElse(secondaryToPrimaryMapper,
Mappers.fromOwnerReference());
Expand All @@ -41,6 +45,10 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
return secondaryToPrimaryMapper;
}

@Override
public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper() {
return (PrimaryToSecondaryMapper<P>) primaryToSecondaryMapper;
}
}

/**
Expand All @@ -53,9 +61,12 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {

SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();

<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

@SuppressWarnings("unused")
class InformerConfigurationBuilder<R extends HasMetadata> {

private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private Set<String> namespaces;
private String labelSelector;
Expand All @@ -66,6 +77,12 @@ private InformerConfigurationBuilder(Class<R> resourceClass) {
this.resourceClass = resourceClass;
}

public <P extends HasMetadata> InformerConfigurationBuilder<R> withPrimaryToSecondaryMapper(
PrimaryToSecondaryMapper<P> primaryToSecondaryMapper) {
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
return this;
}

public InformerConfigurationBuilder<R> withSecondaryToPrimaryMapper(
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
Expand Down Expand Up @@ -136,6 +153,7 @@ public InformerConfigurationBuilder<R> withLabelSelector(String labelSelector) {

public InformerConfiguration<R> build() {
return new DefaultInformerConfiguration<>(labelSelector, resourceClass,
primaryToSecondaryMapper,
secondaryToPrimaryMapper,
namespaces, inheritControllerNamespacesOnChange);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

public interface PrimaryToSecondaryMapper<P extends HasMetadata> {

Set<ResourceID> toSecondaryResourceIDs(P primary);
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;

/**
* <p>
Expand Down Expand Up @@ -74,28 +75,33 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
private final EventRecorder<R> eventRecorder = new EventRecorder<>();
// we need direct control for the indexer to propagate the just update resource also to the index
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;

public InformerEventSource(
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
super(context.getClient().resources(configuration.getResourceClass()), configuration);
this.configuration = configuration;
primaryToSecondaryIndex =
new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
this(configuration, context.getClient());
}

public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
super(client.resources(configuration.getResourceClass()), configuration);
this.configuration = configuration;
primaryToSecondaryIndex =
new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (primaryToSecondaryMapper == null) {
primaryToSecondaryIndex =
new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
} else {
primaryToSecondaryIndex = null;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it'd be cleaner to have a no-op index instead of setting it to null and having to guard for null everywhere else?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure about that, it is very explicit this way, that the index is not used in that case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But also fine with that. Will try how that looks.

Copy link
Collaborator Author

@csviri csviri Jun 24, 2022

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One doesn't prevent the other: we could have a NOOP index and still be able to make it explicit when needed (i.e. useSecondaryToPrimaryIndex could check if the set index is the NOOP one instead of null where we need to know for sure whether to use it or not). Not a showstopper in any case.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Changed.

}
}

@Override
public void onAdd(R resource) {
if (log.isDebugEnabled()) {
log.debug("On add event received for resource id: {}", ResourceID.fromResource(resource));
}
primaryToSecondaryIndex.onAddOrUpdate(resource);
if (useSecondaryToPrimaryIndex()) {
primaryToSecondaryIndex.onAddOrUpdate(resource);
}
onAddOrUpdate("add", resource, () -> InformerEventSource.super.onAdd(resource));
}

Expand All @@ -104,7 +110,9 @@ public void onUpdate(R oldObject, R newObject) {
if (log.isDebugEnabled()) {
log.debug("On update event received for resource id: {}", ResourceID.fromResource(newObject));
}
primaryToSecondaryIndex.onAddOrUpdate(newObject);
if (useSecondaryToPrimaryIndex()) {
primaryToSecondaryIndex.onAddOrUpdate(newObject);
}
onAddOrUpdate("update", newObject,
() -> InformerEventSource.super.onUpdate(oldObject, newObject));
}
Expand All @@ -114,7 +122,9 @@ public void onDelete(R resource, boolean b) {
if (log.isDebugEnabled()) {
log.debug("On delete event received for resource id: {}", ResourceID.fromResource(resource));
}
primaryToSecondaryIndex.onDelete(resource);
if (useSecondaryToPrimaryIndex()) {
primaryToSecondaryIndex.onDelete(resource);
}
super.onDelete(resource, b);
propagateEvent(resource);
}
Expand Down Expand Up @@ -177,8 +187,13 @@ private void propagateEvent(R object) {

@Override
public Set<R> getSecondaryResources(P primary) {
var secondaryIDs =
primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary));
Set<ResourceID> secondaryIDs;
if (useSecondaryToPrimaryIndex()) {
secondaryIDs =
primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary));
} else {
secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary);
}
return secondaryIDs.stream().map(this::get).flatMap(Optional::stream)
.collect(Collectors.toSet());
}
Expand All @@ -201,7 +216,9 @@ public synchronized void handleRecentResourceCreate(ResourceID resourceID, R res
}

private void handleRecentCreateOrUpdate(R resource, Runnable runnable) {
primaryToSecondaryIndex.onAddOrUpdate(resource);
if (useSecondaryToPrimaryIndex()) {
primaryToSecondaryIndex.onAddOrUpdate(resource);
}
if (eventRecorder.isRecordingFor(ResourceID.fromResource(resource))) {
handleRecentResourceOperationAndStopEventRecording(resource);
} else {
Expand Down Expand Up @@ -247,6 +264,10 @@ private void handleRecentResourceOperationAndStopEventRecording(R resource) {
}
}

private boolean useSecondaryToPrimaryIndex() {
return this.primaryToSecondaryMapper == null;
}

@Override
public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID,
R resource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
import org.slf4j.LoggerFactory;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.fabric8.kubernetes.client.CustomResource;
import io.fabric8.kubernetes.client.LocalPortForward;
import io.javaoperatorsdk.operator.Operator;
import io.javaoperatorsdk.operator.ReconcilerUtils;
import io.javaoperatorsdk.operator.RegisteredController;
import io.javaoperatorsdk.operator.api.config.ConfigurationService;
import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider;
Expand All @@ -32,15 +34,17 @@ public class LocalOperatorExtension extends AbstractOperatorExtension {

private final Operator operator;
private final List<ReconcilerSpec> reconcilers;
private final List<PortFowardSpec> portForwards;
private final List<PortForwardSpec> portForwards;
private final List<LocalPortForward> localPortForwards;
private final List<Class<? extends CustomResource>> additionalCustomResourceDefinitions;
private final Map<Reconciler, RegisteredController> registeredControllers;

private LocalOperatorExtension(
ConfigurationService configurationService,
List<ReconcilerSpec> reconcilers,
List<HasMetadata> infrastructure,
List<PortFowardSpec> portForwards,
List<PortForwardSpec> portForwards,
List<Class<? extends CustomResource>> additionalCustomResourceDefinitions,
Duration infrastructureTimeout,
boolean preserveNamespaceOnError,
boolean waitForNamespaceDeletion,
Expand All @@ -55,6 +59,7 @@ private LocalOperatorExtension(
this.reconcilers = reconcilers;
this.portForwards = portForwards;
this.localPortForwards = new ArrayList<>(portForwards.size());
this.additionalCustomResourceDefinitions = additionalCustomResourceDefinitions;
this.operator = new Operator(getKubernetesClient(), this.configurationService);
this.registeredControllers = new HashMap<>();
}
Expand Down Expand Up @@ -114,10 +119,12 @@ protected void before(ExtensionContext context) {
.withName(podName).portForward(ref.getPort(), ref.getLocalPort()));
}

additionalCustomResourceDefinitions
.forEach(cr -> applyCrd(ReconcilerUtils.getResourceTypeName(cr)));

for (var ref : reconcilers) {
final var config = configurationService.getConfigurationFor(ref.reconciler);
final var oconfig = override(config).settingNamespace(namespace);
final var path = "/META-INF/fabric8/" + config.getResourceTypeName() + "-v1.yml";

if (ref.retry != null) {
oconfig.withRetry(ref.retry);
Expand All @@ -126,17 +133,7 @@ protected void before(ExtensionContext context) {
ref.controllerConfigurationOverrider.accept(oconfig);
}

try (InputStream is = getClass().getResourceAsStream(path)) {
final var crd = kubernetesClient.load(is);
crd.createOrReplace();
Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little
LOGGER.debug("Applied CRD with name: {}", config.getResourceTypeName());
} catch (InterruptedException ex) {
LOGGER.error("Interrupted.", ex);
Thread.currentThread().interrupt();
} catch (Exception ex) {
throw new IllegalStateException("Cannot apply CRD yaml: " + path, ex);
}
applyCrd(config.getResourceTypeName());

if (ref.reconciler instanceof KubernetesClientAware) {
((KubernetesClientAware) ref.reconciler).setKubernetesClient(kubernetesClient);
Expand All @@ -150,6 +147,21 @@ protected void before(ExtensionContext context) {
this.operator.start();
}

private void applyCrd(String resourceTypeName) {
String path = "/META-INF/fabric8/" + resourceTypeName + "-v1.yml";
try (InputStream is = getClass().getResourceAsStream(path)) {
final var crd = getKubernetesClient().load(is);
crd.createOrReplace();
Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little
LOGGER.debug("Applied CRD with path: {}", path);
} catch (InterruptedException ex) {
LOGGER.error("Interrupted.", ex);
Thread.currentThread().interrupt();
} catch (Exception ex) {
throw new IllegalStateException("Cannot apply CRD yaml: " + path, ex);
}
}

protected void after(ExtensionContext context) {
super.after(context);

Expand All @@ -172,12 +184,14 @@ protected void after(ExtensionContext context) {
@SuppressWarnings("rawtypes")
public static class Builder extends AbstractBuilder<Builder> {
private final List<ReconcilerSpec> reconcilers;
private final List<PortFowardSpec> portForwards;
private final List<PortForwardSpec> portForwards;
private final List<Class<? extends CustomResource>> additionalCustomResourceDefinitions;

protected Builder() {
super();
this.reconcilers = new ArrayList<>();
this.portForwards = new ArrayList<>();
this.additionalCustomResourceDefinitions = new ArrayList<>();
}

public Builder withReconciler(
Expand Down Expand Up @@ -217,31 +231,39 @@ public Builder withReconciler(Class<? extends Reconciler> value) {

public Builder withPortForward(String namespace, String labelKey, String labelValue, int port,
int localPort) {
portForwards.add(new PortFowardSpec(namespace, labelKey, labelValue, port, localPort));
portForwards.add(new PortForwardSpec(namespace, labelKey, labelValue, port, localPort));
return this;
}

public Builder withAdditionalCustomResourceDefinition(
Class<? extends CustomResource> customResource) {
additionalCustomResourceDefinitions.add(customResource);
return this;
}


public LocalOperatorExtension build() {
return new LocalOperatorExtension(
configurationService,
reconcilers,
infrastructure,
portForwards,
additionalCustomResourceDefinitions,
infrastructureTimeout,
preserveNamespaceOnError,
waitForNamespaceDeletion,
oneNamespacePerClass);
}
}

private static class PortFowardSpec {
private static class PortForwardSpec {
final String namespace;
final String labelKey;
final String labelValue;
final int port;
final int localPort;

public PortFowardSpec(String namespace, String labelKey, String labelValue, int port,
public PortForwardSpec(String namespace, String labelKey, String labelValue, int port,
int localPort) {
this.namespace = namespace;
this.labelKey = labelKey;
Expand Down
Loading