diff --git a/docs/documentation/features.md b/docs/documentation/features.md index 8cbf9ec841..7a604dc14e 100644 --- a/docs/documentation/features.md +++ b/docs/documentation/features.md @@ -377,22 +377,12 @@ public class TomcatReconciler implements Reconciler, EventSourceInitiali @Override public List prepareEventSources(EventSourceContext context) { - SharedIndexInformer deploymentInformer = - kubernetesClient.apps() - .deployments() - .inAnyNamespace() - .withLabel("app.kubernetes.io/managed-by", "tomcat-operator") - .runnableInformer(0); - - return List.of( - new InformerEventSource<>(deploymentInformer, d -> { - var ownerReferences = d.getMetadata().getOwnerReferences(); - if (!ownerReferences.isEmpty()) { - return Set.of(new ResourceID(ownerReferences.get(0).getName(), d.getMetadata().getNamespace())); - } else { - return EMPTY_SET; - } - })); + var configMapEventSource = + new InformerEventSource<>(InformerConfiguration.from(Deployment.class, context) + .withLabelSelector(SELECTOR) + .withSecondaryToPrimaryMapper(Mappers.fromAnnotation(ANNOTATION_NAME,ANNOTATION_NAMESPACE) + .build(), context)); + return EventSourceInitializer.nameEventSources(configMapEventSource); } ... } @@ -401,21 +391,38 @@ public class TomcatReconciler implements Reconciler, EventSourceInitiali In the example above an `InformerEventSource` is registered (more on this specific eventsource later). Multiple things are going on here: -1. An `SharedIndexInformer` (class from fabric8 Kubernetes client) is created. This will watch and produce events for +1. In the background `SharedIndexInformer` (class from fabric8 Kubernetes client) is created. This will watch and produce events for `Deployments` in every namespace, but will filter them based on label. So `Deployments` which are not managed by `tomcat-operator` (the label is not present on them) will not trigger a reconciliation. 2. In the next step an [InformerEventSource](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java) - is created, which wraps the `SharedIndexInformer`. In addition to that a mapping functions is provided, **this maps - the event of the watched resource (in this case `Deployment`) to the custom resources to reconcile**. Not that in - this case this is a simple task, since `Deployment` is already created with an owner reference. Therefore, - the `ResourceID` - what identifies the custom resource to reconcile is created from the owner reference. + is created, which wraps the `SharedIndexInformer`. In addition to that a mapping functions is provided, + with `withSecondaryToPrimaryMapper`, this maps the event of the watched resource (in this case `Deployment`) to the + custom resources to reconcile. Note that usually this is covered by a default mapper , when `Deployment` + is created with an owner reference, the default mapper gets the mapping information from there. Thus, + the `ResourceID` what identifies the custom resource to reconcile is created from the owner reference. + For sake of the example a mapper is added that maps secondary to primary resource based on annotations. Note that a set of `ResourceID` is returned, this is usually just a set with one element. The possibility to specify multiple values are there to cover some rare corner cases. If an irrelevant resource is observed, an empty set can be returned to not reconcile any custom resource. +### Managing Relation between Primary and Secondary Resources + +As already touched in previous section, a `SecondaryToPrimaryMapper` is required to map events to trigger reconciliation +of the primary resource. By default, this is handled with a mapper that utilizes owner references. If an owner reference +cannot be used (for example resources are in different namespace), other mapper can be provided, typically an annotation +based on is provided. + +Adding a `SecondaryToPrimaryMapper` is typically sufficient when there is a one-to-many relationship between primary and +secondary resources. The secondary resources can be mapped to its primary owner, and this is enough information to also +get the resource using the API from the context in reconciler: `context.getSecondaryResources(...)`. There are however +cases when to map the other way around this mapper is not enough, a `PrimaryToSecondaryMapper` is required. +This is typically when there is a many-to-one or many-to-many relationship between resources, thus the primary resource +is referencing a secondary resources. In these cases the mentioned reverse mapper is required to work properly. +See [PrimaryToSecondaryIT](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/PrimaryToSecondaryIT.java) +integration test for a sample. + ### Built-in EventSources There are multiple event-sources provided out of the box, the following are some more central ones: diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java index 9cb0f1ee1d..ddfae2919e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/informer/InformerConfiguration.java @@ -8,6 +8,7 @@ import io.javaoperatorsdk.operator.api.config.ResourceConfiguration; import io.javaoperatorsdk.operator.api.config.Utils; import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers; @@ -19,15 +20,18 @@ public interface InformerConfiguration class DefaultInformerConfiguration extends DefaultResourceConfiguration implements InformerConfiguration { + private final PrimaryToSecondaryMapper primaryToSecondaryMapper; private final SecondaryToPrimaryMapper secondaryToPrimaryMapper; private final boolean followControllerNamespaceChanges; protected DefaultInformerConfiguration(String labelSelector, Class resourceClass, + PrimaryToSecondaryMapper primaryToSecondaryMapper, SecondaryToPrimaryMapper secondaryToPrimaryMapper, Set namespaces, boolean followControllerNamespaceChanges) { super(labelSelector, resourceClass, namespaces); this.followControllerNamespaceChanges = followControllerNamespaceChanges; + this.primaryToSecondaryMapper = primaryToSecondaryMapper; this.secondaryToPrimaryMapper = Objects.requireNonNullElse(secondaryToPrimaryMapper, Mappers.fromOwnerReference()); @@ -41,6 +45,10 @@ public SecondaryToPrimaryMapper getSecondaryToPrimaryMapper() { return secondaryToPrimaryMapper; } + @Override + public

PrimaryToSecondaryMapper

getPrimaryToSecondaryMapper() { + return (PrimaryToSecondaryMapper

) primaryToSecondaryMapper; + } } /** @@ -53,9 +61,12 @@ public SecondaryToPrimaryMapper getSecondaryToPrimaryMapper() { SecondaryToPrimaryMapper getSecondaryToPrimaryMapper(); +

PrimaryToSecondaryMapper

getPrimaryToSecondaryMapper(); + @SuppressWarnings("unused") class InformerConfigurationBuilder { + private PrimaryToSecondaryMapper primaryToSecondaryMapper; private SecondaryToPrimaryMapper secondaryToPrimaryMapper; private Set namespaces; private String labelSelector; @@ -66,6 +77,12 @@ private InformerConfigurationBuilder(Class resourceClass) { this.resourceClass = resourceClass; } + public

InformerConfigurationBuilder withPrimaryToSecondaryMapper( + PrimaryToSecondaryMapper

primaryToSecondaryMapper) { + this.primaryToSecondaryMapper = primaryToSecondaryMapper; + return this; + } + public InformerConfigurationBuilder withSecondaryToPrimaryMapper( SecondaryToPrimaryMapper secondaryToPrimaryMapper) { this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; @@ -136,6 +153,7 @@ public InformerConfigurationBuilder withLabelSelector(String labelSelector) { public InformerConfiguration build() { return new DefaultInformerConfiguration<>(labelSelector, resourceClass, + primaryToSecondaryMapper, secondaryToPrimaryMapper, namespaces, inheritControllerNamespacesOnChange); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/PrimaryToSecondaryMapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/PrimaryToSecondaryMapper.java new file mode 100644 index 0000000000..866a2b2251 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/PrimaryToSecondaryMapper.java @@ -0,0 +1,19 @@ +package io.javaoperatorsdk.operator.processing.event.source; + +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +/** + * Primary to Secondary mapper only needed in some cases, typically when there it many-to-one or + * many-to-many relation between primary and secondary resources. If there is owner reference (or + * reference with annotations) from secondary to primary this is not needed. See + * PrimaryToSecondaryIT integration tests that handles many-to-many relationship. + * + * @param

primary resource type + */ +public interface PrimaryToSecondaryMapper

{ + + Set toSecondaryResourceIDs(P primary); +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java new file mode 100644 index 0000000000..65434cf53d --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndex.java @@ -0,0 +1,52 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; + +class DefaultPrimaryToSecondaryIndex implements PrimaryToSecondaryIndex { + + private SecondaryToPrimaryMapper secondaryToPrimaryMapper; + private Map> index = new HashMap<>(); + + public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper secondaryToPrimaryMapper) { + this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; + } + + @Override + public synchronized void onAddOrUpdate(R resource) { + Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); + primaryResources.forEach( + primaryResource -> { + var resourceSet = + index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet()); + resourceSet.add(ResourceID.fromResource(resource)); + }); + } + + @Override + public synchronized void onDelete(R resource) { + Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); + primaryResources.forEach( + primaryResource -> { + var secondaryResources = index.get(primaryResource); + secondaryResources.remove(ResourceID.fromResource(resource)); + if (secondaryResources.isEmpty()) { + index.remove(primaryResource); + } + }); + } + + @Override + public synchronized Set getSecondaryResources(ResourceID primary) { + var resourceIDs = index.get(primary); + if (resourceIDs == null) { + return Collections.emptySet(); + } else { + return Collections.unmodifiableSet(resourceIDs); + } + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java index 47fe4abcf3..4e0d96d5d9 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java @@ -16,6 +16,7 @@ import io.javaoperatorsdk.operator.processing.event.Event; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; /** *

@@ -74,20 +75,23 @@ public class InformerEventSource private final EventRecorder eventRecorder = new EventRecorder<>(); // we need direct control for the indexer to propagate the just update resource also to the index private final PrimaryToSecondaryIndex primaryToSecondaryIndex; + private final PrimaryToSecondaryMapper

primaryToSecondaryMapper; public InformerEventSource( InformerConfiguration configuration, EventSourceContext

context) { - super(context.getClient().resources(configuration.getResourceClass()), configuration); - this.configuration = configuration; - primaryToSecondaryIndex = - new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); + this(configuration, context.getClient()); } public InformerEventSource(InformerConfiguration configuration, KubernetesClient client) { super(client.resources(configuration.getResourceClass()), configuration); this.configuration = configuration; - primaryToSecondaryIndex = - new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); + primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper(); + if (primaryToSecondaryMapper == null) { + primaryToSecondaryIndex = + new DefaultPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper()); + } else { + primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex.getInstance(); + } } @Override @@ -177,8 +181,13 @@ private void propagateEvent(R object) { @Override public Set getSecondaryResources(P primary) { - var secondaryIDs = - primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary)); + Set secondaryIDs; + if (useSecondaryToPrimaryIndex()) { + secondaryIDs = + primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary)); + } else { + secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary); + } return secondaryIDs.stream().map(this::get).flatMap(Optional::stream) .collect(Collectors.toSet()); } @@ -247,6 +256,10 @@ private void handleRecentResourceOperationAndStopEventRecording(R resource) { } } + private boolean useSecondaryToPrimaryIndex() { + return this.primaryToSecondaryMapper == null; + } + @Override public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID, R resource) { diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java new file mode 100644 index 0000000000..ddc8cbec18 --- /dev/null +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/NOOPPrimaryToSecondaryIndex.java @@ -0,0 +1,34 @@ +package io.javaoperatorsdk.operator.processing.event.source.informer; + +import java.util.Set; + +import io.fabric8.kubernetes.api.model.HasMetadata; +import io.javaoperatorsdk.operator.processing.event.ResourceID; + +class NOOPPrimaryToSecondaryIndex + implements PrimaryToSecondaryIndex { + + @SuppressWarnings("rawtypes") + private static final NOOPPrimaryToSecondaryIndex instance = new NOOPPrimaryToSecondaryIndex(); + + public static NOOPPrimaryToSecondaryIndex getInstance() { + return instance; + } + + private NOOPPrimaryToSecondaryIndex() {} + + @Override + public void onAddOrUpdate(R resource) { + // empty method because of noop implementation + } + + @Override + public void onDelete(R resource) { + // empty method because of noop implementation + } + + @Override + public Set getSecondaryResources(ResourceID primary) { + throw new UnsupportedOperationException(); + } +} diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java index 8918895aa1..7a87b23272 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndex.java @@ -1,49 +1,15 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; -import java.util.*; -import java.util.concurrent.ConcurrentHashMap; +import java.util.Set; import io.fabric8.kubernetes.api.model.HasMetadata; import io.javaoperatorsdk.operator.processing.event.ResourceID; -import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; -class PrimaryToSecondaryIndex { +public interface PrimaryToSecondaryIndex { - private SecondaryToPrimaryMapper secondaryToPrimaryMapper; - private Map> index = new HashMap<>(); + void onAddOrUpdate(R resource); - public PrimaryToSecondaryIndex(SecondaryToPrimaryMapper secondaryToPrimaryMapper) { - this.secondaryToPrimaryMapper = secondaryToPrimaryMapper; - } + void onDelete(R resource); - public synchronized void onAddOrUpdate(R resource) { - Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); - primaryResources.forEach( - primaryResource -> { - var resourceSet = - index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet()); - resourceSet.add(ResourceID.fromResource(resource)); - }); - } - - public synchronized void onDelete(R resource) { - Set primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource); - primaryResources.forEach( - primaryResource -> { - var secondaryResources = index.get(primaryResource); - secondaryResources.remove(ResourceID.fromResource(resource)); - if (secondaryResources.isEmpty()) { - index.remove(primaryResource); - } - }); - } - - public synchronized Set getSecondaryResources(ResourceID primary) { - var resourceIDs = index.get(primary); - if (resourceIDs == null) { - return Collections.emptySet(); - } else { - return Collections.unmodifiableSet(resourceIDs); - } - } + Set getSecondaryResources(ResourceID primary); } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndexTest.java similarity index 94% rename from operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java rename to operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndexTest.java index ca73b135a7..da2d1b7cf0 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/PrimaryToSecondaryIndexTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/DefaultPrimaryToSecondaryIndexTest.java @@ -15,12 +15,12 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; -class PrimaryToSecondaryIndexTest { +class DefaultPrimaryToSecondaryIndexTest { private SecondaryToPrimaryMapper secondaryToPrimaryMapperMock = mock(SecondaryToPrimaryMapper.class); - private PrimaryToSecondaryIndex primaryToSecondaryIndex = - new PrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); + private DefaultPrimaryToSecondaryIndex primaryToSecondaryIndex = + new DefaultPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock); private ResourceID primaryID1 = new ResourceID("id1", "default"); private ResourceID primaryID2 = new ResourceID("id2", "default"); diff --git a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocalOperatorExtension.java b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocalOperatorExtension.java index edccd025a6..2eb0fa5900 100644 --- a/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocalOperatorExtension.java +++ b/operator-framework-junit5/src/main/java/io/javaoperatorsdk/operator/junit/LocalOperatorExtension.java @@ -15,8 +15,10 @@ import org.slf4j.LoggerFactory; import io.fabric8.kubernetes.api.model.HasMetadata; +import io.fabric8.kubernetes.client.CustomResource; import io.fabric8.kubernetes.client.LocalPortForward; import io.javaoperatorsdk.operator.Operator; +import io.javaoperatorsdk.operator.ReconcilerUtils; import io.javaoperatorsdk.operator.RegisteredController; import io.javaoperatorsdk.operator.api.config.ConfigurationService; import io.javaoperatorsdk.operator.api.config.ControllerConfigurationOverrider; @@ -32,15 +34,17 @@ public class LocalOperatorExtension extends AbstractOperatorExtension { private final Operator operator; private final List reconcilers; - private final List portForwards; + private final List portForwards; private final List localPortForwards; + private final List> additionalCustomResourceDefinitions; private final Map registeredControllers; private LocalOperatorExtension( ConfigurationService configurationService, List reconcilers, List infrastructure, - List portForwards, + List portForwards, + List> additionalCustomResourceDefinitions, Duration infrastructureTimeout, boolean preserveNamespaceOnError, boolean waitForNamespaceDeletion, @@ -55,6 +59,7 @@ private LocalOperatorExtension( this.reconcilers = reconcilers; this.portForwards = portForwards; this.localPortForwards = new ArrayList<>(portForwards.size()); + this.additionalCustomResourceDefinitions = additionalCustomResourceDefinitions; this.operator = new Operator(getKubernetesClient(), this.configurationService); this.registeredControllers = new HashMap<>(); } @@ -114,10 +119,12 @@ protected void before(ExtensionContext context) { .withName(podName).portForward(ref.getPort(), ref.getLocalPort())); } + additionalCustomResourceDefinitions + .forEach(cr -> applyCrd(ReconcilerUtils.getResourceTypeName(cr))); + for (var ref : reconcilers) { final var config = configurationService.getConfigurationFor(ref.reconciler); final var oconfig = override(config).settingNamespace(namespace); - final var path = "/META-INF/fabric8/" + config.getResourceTypeName() + "-v1.yml"; if (ref.retry != null) { oconfig.withRetry(ref.retry); @@ -126,17 +133,7 @@ protected void before(ExtensionContext context) { ref.controllerConfigurationOverrider.accept(oconfig); } - try (InputStream is = getClass().getResourceAsStream(path)) { - final var crd = kubernetesClient.load(is); - crd.createOrReplace(); - Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little - LOGGER.debug("Applied CRD with name: {}", config.getResourceTypeName()); - } catch (InterruptedException ex) { - LOGGER.error("Interrupted.", ex); - Thread.currentThread().interrupt(); - } catch (Exception ex) { - throw new IllegalStateException("Cannot apply CRD yaml: " + path, ex); - } + applyCrd(config.getResourceTypeName()); if (ref.reconciler instanceof KubernetesClientAware) { ((KubernetesClientAware) ref.reconciler).setKubernetesClient(kubernetesClient); @@ -150,6 +147,21 @@ protected void before(ExtensionContext context) { this.operator.start(); } + private void applyCrd(String resourceTypeName) { + String path = "/META-INF/fabric8/" + resourceTypeName + "-v1.yml"; + try (InputStream is = getClass().getResourceAsStream(path)) { + final var crd = getKubernetesClient().load(is); + crd.createOrReplace(); + Thread.sleep(CRD_READY_WAIT); // readiness is not applicable for CRD, just wait a little + LOGGER.debug("Applied CRD with path: {}", path); + } catch (InterruptedException ex) { + LOGGER.error("Interrupted.", ex); + Thread.currentThread().interrupt(); + } catch (Exception ex) { + throw new IllegalStateException("Cannot apply CRD yaml: " + path, ex); + } + } + protected void after(ExtensionContext context) { super.after(context); @@ -172,12 +184,14 @@ protected void after(ExtensionContext context) { @SuppressWarnings("rawtypes") public static class Builder extends AbstractBuilder { private final List reconcilers; - private final List portForwards; + private final List portForwards; + private final List> additionalCustomResourceDefinitions; protected Builder() { super(); this.reconcilers = new ArrayList<>(); this.portForwards = new ArrayList<>(); + this.additionalCustomResourceDefinitions = new ArrayList<>(); } public Builder withReconciler( @@ -217,16 +231,24 @@ public Builder withReconciler(Class value) { public Builder withPortForward(String namespace, String labelKey, String labelValue, int port, int localPort) { - portForwards.add(new PortFowardSpec(namespace, labelKey, labelValue, port, localPort)); + portForwards.add(new PortForwardSpec(namespace, labelKey, labelValue, port, localPort)); return this; } + public Builder withAdditionalCustomResourceDefinition( + Class customResource) { + additionalCustomResourceDefinitions.add(customResource); + return this; + } + + public LocalOperatorExtension build() { return new LocalOperatorExtension( configurationService, reconcilers, infrastructure, portForwards, + additionalCustomResourceDefinitions, infrastructureTimeout, preserveNamespaceOnError, waitForNamespaceDeletion, @@ -234,14 +256,14 @@ public LocalOperatorExtension build() { } } - private static class PortFowardSpec { + private static class PortForwardSpec { final String namespace; final String labelKey; final String labelValue; final int port; final int localPort; - public PortFowardSpec(String namespace, String labelKey, String labelValue, int port, + public PortForwardSpec(String namespace, String labelKey, String labelValue, int port, int localPort) { this.namespace = namespace; this.labelKey = labelKey; diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/PrimaryToSecondaryIT.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/PrimaryToSecondaryIT.java new file mode 100644 index 0000000000..b54923c6e6 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/PrimaryToSecondaryIT.java @@ -0,0 +1,59 @@ +package io.javaoperatorsdk.operator; + +import java.time.Duration; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.RegisterExtension; + +import io.fabric8.kubernetes.api.model.ObjectMetaBuilder; +import io.javaoperatorsdk.operator.junit.LocalOperatorExtension; +import io.javaoperatorsdk.operator.sample.primarytosecondary.Cluster; +import io.javaoperatorsdk.operator.sample.primarytosecondary.Job; +import io.javaoperatorsdk.operator.sample.primarytosecondary.JobReconciler; +import io.javaoperatorsdk.operator.sample.primarytosecondary.JobSpec; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.awaitility.Awaitility.await; + +class PrimaryToSecondaryIT { + + public static final String CLUSTER_NAME = "cluster1"; + public static final int MIN_DELAY = 150; + + @RegisterExtension + LocalOperatorExtension operator = + LocalOperatorExtension.builder() + .withAdditionalCustomResourceDefinition(Cluster.class) + .withReconciler(new JobReconciler()) + .build(); + + @Test + void readsSecondaryInManyToOneCases() throws InterruptedException { + operator.create(Cluster.class, cluster()); + Thread.sleep(MIN_DELAY); + operator.create(Job.class, job()); + + await().pollDelay(Duration.ofMillis(300)).untilAsserted( + () -> assertThat(operator.getReconcilerOfType(JobReconciler.class).getNumberOfExecutions()) + .isEqualTo(1)); + } + + Job job() { + var job = new Job(); + job.setMetadata(new ObjectMetaBuilder() + .withName("job1") + .build()); + job.setSpec(new JobSpec()); + job.getSpec().setClusterName(CLUSTER_NAME); + return job; + } + + Cluster cluster() { + Cluster cluster = new Cluster(); + cluster.setMetadata(new ObjectMetaBuilder() + .withName(CLUSTER_NAME) + .build()); + return cluster; + } + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/Cluster.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/Cluster.java new file mode 100644 index 0000000000..ffc43141a4 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/Cluster.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.primarytosecondary; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("clu") +public class Cluster + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/ClusterStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/ClusterStatus.java new file mode 100644 index 0000000000..eac5c48b70 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/ClusterStatus.java @@ -0,0 +1,5 @@ +package io.javaoperatorsdk.operator.sample.primarytosecondary; + +public class ClusterStatus { + +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/Job.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/Job.java new file mode 100644 index 0000000000..5a3d43de79 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/Job.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.primarytosecondary; + +import io.fabric8.kubernetes.api.model.Namespaced; +import io.fabric8.kubernetes.client.CustomResource; +import io.fabric8.kubernetes.model.annotation.Group; +import io.fabric8.kubernetes.model.annotation.ShortNames; +import io.fabric8.kubernetes.model.annotation.Version; + +@Group("sample.javaoperatorsdk") +@Version("v1") +@ShortNames("cjo") +public class Job + extends CustomResource + implements Namespaced { +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobReconciler.java new file mode 100644 index 0000000000..1ac7676265 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobReconciler.java @@ -0,0 +1,62 @@ +package io.javaoperatorsdk.operator.sample.primarytosecondary; + +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; + +import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration; +import io.javaoperatorsdk.operator.api.reconciler.*; +import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.EventSource; +import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper; +import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource; + +@ControllerConfiguration() +public class JobReconciler + implements Reconciler, EventSourceInitializer { + + private static final String JOB_CLUSTER_INDEX = "job-cluster-index"; + + private final AtomicInteger numberOfExecutions = new AtomicInteger(0); + + @Override + public UpdateControl reconcile( + Job resource, Context context) { + + context.getSecondaryResource(Cluster.class) + .orElseThrow(() -> new IllegalStateException("Secondary resource should be present")); + numberOfExecutions.addAndGet(1); + return UpdateControl.noUpdate(); + } + + @Override + public Map prepareEventSources(EventSourceContext context) { + context.getPrimaryCache().addIndexer(JOB_CLUSTER_INDEX, (job -> List + .of(indexKey(job.getSpec().getClusterName(), job.getMetadata().getNamespace())))); + + InformerConfiguration informerConfiguration = + InformerConfiguration.from(Cluster.class, context) + .withSecondaryToPrimaryMapper(cluster -> context.getPrimaryCache() + .byIndex(JOB_CLUSTER_INDEX, indexKey(cluster.getMetadata().getName(), + cluster.getMetadata().getNamespace())) + .stream().map(ResourceID::fromResource).collect(Collectors.toSet())) + .withPrimaryToSecondaryMapper( + (PrimaryToSecondaryMapper) primary -> Set.of(new ResourceID( + primary.getSpec().getClusterName(), primary.getMetadata().getNamespace()))) + .withNamespacesInheritedFromController(context) + .build(); + + return EventSourceInitializer + .nameEventSources(new InformerEventSource<>(informerConfiguration, context)); + } + + private String indexKey(String clusterName, String namespace) { + return clusterName + "#" + namespace; + } + + public int getNumberOfExecutions() { + return numberOfExecutions.get(); + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobSpec.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobSpec.java new file mode 100644 index 0000000000..c7546dea71 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobSpec.java @@ -0,0 +1,15 @@ +package io.javaoperatorsdk.operator.sample.primarytosecondary; + +public class JobSpec { + + private String clusterName; + + public String getClusterName() { + return clusterName; + } + + public JobSpec setClusterName(String clusterName) { + this.clusterName = clusterName; + return this; + } +} diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobStatus.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobStatus.java new file mode 100644 index 0000000000..b2d7cf6259 --- /dev/null +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/primarytosecondary/JobStatus.java @@ -0,0 +1,5 @@ +package io.javaoperatorsdk.operator.sample.primarytosecondary; + +public class JobStatus { + +}