Skip to content

Primary to Secondary Mapper #1300

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Jun 24, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 29 additions & 22 deletions docs/documentation/features.md
Original file line number Diff line number Diff line change
Expand Up @@ -377,22 +377,12 @@ public class TomcatReconciler implements Reconciler<Tomcat>, EventSourceInitiali

@Override
public List<EventSource> prepareEventSources(EventSourceContext<Tomcat> context) {
SharedIndexInformer<Deployment> deploymentInformer =
kubernetesClient.apps()
.deployments()
.inAnyNamespace()
.withLabel("app.kubernetes.io/managed-by", "tomcat-operator")
.runnableInformer(0);

return List.of(
new InformerEventSource<>(deploymentInformer, d -> {
var ownerReferences = d.getMetadata().getOwnerReferences();
if (!ownerReferences.isEmpty()) {
return Set.of(new ResourceID(ownerReferences.get(0).getName(), d.getMetadata().getNamespace()));
} else {
return EMPTY_SET;
}
}));
var configMapEventSource =
new InformerEventSource<>(InformerConfiguration.from(Deployment.class, context)
.withLabelSelector(SELECTOR)
.withSecondaryToPrimaryMapper(Mappers.fromAnnotation(ANNOTATION_NAME,ANNOTATION_NAMESPACE)
.build(), context));
return EventSourceInitializer.nameEventSources(configMapEventSource);
}
...
}
Expand All @@ -401,21 +391,38 @@ public class TomcatReconciler implements Reconciler<Tomcat>, EventSourceInitiali
In the example above an `InformerEventSource` is registered (more on this specific eventsource later). Multiple things
are going on here:

1. An `SharedIndexInformer` (class from fabric8 Kubernetes client) is created. This will watch and produce events for
1. In the background `SharedIndexInformer` (class from fabric8 Kubernetes client) is created. This will watch and produce events for
`Deployments` in every namespace, but will filter them based on label. So `Deployments` which are not managed by
`tomcat-operator` (the label is not present on them) will not trigger a reconciliation.
2. In the next step
an [InformerEventSource](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSource.java)
is created, which wraps the `SharedIndexInformer`. In addition to that a mapping functions is provided, **this maps
the event of the watched resource (in this case `Deployment`) to the custom resources to reconcile**. Not that in
this case this is a simple task, since `Deployment` is already created with an owner reference. Therefore,
the `ResourceID`
what identifies the custom resource to reconcile is created from the owner reference.
is created, which wraps the `SharedIndexInformer`. In addition to that a mapping functions is provided,
with `withSecondaryToPrimaryMapper`, this maps the event of the watched resource (in this case `Deployment`) to the
custom resources to reconcile. Note that usually this is covered by a default mapper , when `Deployment`
is created with an owner reference, the default mapper gets the mapping information from there. Thus,
the `ResourceID` what identifies the custom resource to reconcile is created from the owner reference.
For sake of the example a mapper is added that maps secondary to primary resource based on annotations.

Note that a set of `ResourceID` is returned, this is usually just a set with one element. The possibility to specify
multiple values are there to cover some rare corner cases. If an irrelevant resource is observed, an empty set can
be returned to not reconcile any custom resource.

### Managing Relation between Primary and Secondary Resources

As already touched in previous section, a `SecondaryToPrimaryMapper` is required to map events to trigger reconciliation
of the primary resource. By default, this is handled with a mapper that utilizes owner references. If an owner reference
cannot be used (for example resources are in different namespace), other mapper can be provided, typically an annotation
based on is provided.

Adding a `SecondaryToPrimaryMapper` is typically sufficient when there is a one-to-many relationship between primary and
secondary resources. The secondary resources can be mapped to its primary owner, and this is enough information to also
get the resource using the API from the context in reconciler: `context.getSecondaryResources(...)`. There are however
cases when to map the other way around this mapper is not enough, a `PrimaryToSecondaryMapper` is required.
This is typically when there is a many-to-one or many-to-many relationship between resources, thus the primary resource
is referencing a secondary resources. In these cases the mentioned reverse mapper is required to work properly.
See [PrimaryToSecondaryIT](https://github.com/java-operator-sdk/java-operator-sdk/blob/main/operator-framework/src/test/java/io/javaoperatorsdk/operator/PrimaryToSecondaryIT.java)
integration test for a sample.

### Built-in EventSources

There are multiple event-sources provided out of the box, the following are some more central ones:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
import io.javaoperatorsdk.operator.api.config.ResourceConfiguration;
import io.javaoperatorsdk.operator.api.config.Utils;
import io.javaoperatorsdk.operator.api.reconciler.EventSourceContext;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;
import io.javaoperatorsdk.operator.processing.event.source.informer.Mappers;

Expand All @@ -19,15 +20,18 @@ public interface InformerConfiguration<R extends HasMetadata>
class DefaultInformerConfiguration<R extends HasMetadata> extends
DefaultResourceConfiguration<R> implements InformerConfiguration<R> {

private final PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private final SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private final boolean followControllerNamespaceChanges;

protected DefaultInformerConfiguration(String labelSelector,
Class<R> resourceClass,
PrimaryToSecondaryMapper<?> primaryToSecondaryMapper,
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper,
Set<String> namespaces, boolean followControllerNamespaceChanges) {
super(labelSelector, resourceClass, namespaces);
this.followControllerNamespaceChanges = followControllerNamespaceChanges;
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
this.secondaryToPrimaryMapper =
Objects.requireNonNullElse(secondaryToPrimaryMapper,
Mappers.fromOwnerReference());
Expand All @@ -41,6 +45,10 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {
return secondaryToPrimaryMapper;
}

@Override
public <P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper() {
return (PrimaryToSecondaryMapper<P>) primaryToSecondaryMapper;
}
}

/**
Expand All @@ -53,9 +61,12 @@ public SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper() {

SecondaryToPrimaryMapper<R> getSecondaryToPrimaryMapper();

<P extends HasMetadata> PrimaryToSecondaryMapper<P> getPrimaryToSecondaryMapper();

@SuppressWarnings("unused")
class InformerConfigurationBuilder<R extends HasMetadata> {

private PrimaryToSecondaryMapper<?> primaryToSecondaryMapper;
private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private Set<String> namespaces;
private String labelSelector;
Expand All @@ -66,6 +77,12 @@ private InformerConfigurationBuilder(Class<R> resourceClass) {
this.resourceClass = resourceClass;
}

public <P extends HasMetadata> InformerConfigurationBuilder<R> withPrimaryToSecondaryMapper(
PrimaryToSecondaryMapper<P> primaryToSecondaryMapper) {
this.primaryToSecondaryMapper = primaryToSecondaryMapper;
return this;
}

public InformerConfigurationBuilder<R> withSecondaryToPrimaryMapper(
SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
Expand Down Expand Up @@ -136,6 +153,7 @@ public InformerConfigurationBuilder<R> withLabelSelector(String labelSelector) {

public InformerConfiguration<R> build() {
return new DefaultInformerConfiguration<>(labelSelector, resourceClass,
primaryToSecondaryMapper,
secondaryToPrimaryMapper,
namespaces, inheritControllerNamespacesOnChange);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

/**
* Primary to Secondary mapper only needed in some cases, typically when there it many-to-one or
* many-to-many relation between primary and secondary resources. If there is owner reference (or
* reference with annotations) from secondary to primary this is not needed. See
* PrimaryToSecondaryIT integration tests that handles many-to-many relationship.
*
* @param <P> primary resource type
*/
public interface PrimaryToSecondaryMapper<P extends HasMetadata> {

Set<ResourceID> toSecondaryResourceIDs(P primary);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;

class DefaultPrimaryToSecondaryIndex<R extends HasMetadata> implements PrimaryToSecondaryIndex<R> {

private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private Map<ResourceID, Set<ResourceID>> index = new HashMap<>();

public DefaultPrimaryToSecondaryIndex(SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
}

@Override
public synchronized void onAddOrUpdate(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
var resourceSet =
index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet());
resourceSet.add(ResourceID.fromResource(resource));
});
}

@Override
public synchronized void onDelete(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
var secondaryResources = index.get(primaryResource);
secondaryResources.remove(ResourceID.fromResource(resource));
if (secondaryResources.isEmpty()) {
index.remove(primaryResource);
}
});
}

@Override
public synchronized Set<ResourceID> getSecondaryResources(ResourceID primary) {
var resourceIDs = index.get(primary);
if (resourceIDs == null) {
return Collections.emptySet();
} else {
return Collections.unmodifiableSet(resourceIDs);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.PrimaryToSecondaryMapper;

/**
* <p>
Expand Down Expand Up @@ -74,20 +75,23 @@ public class InformerEventSource<R extends HasMetadata, P extends HasMetadata>
private final EventRecorder<R> eventRecorder = new EventRecorder<>();
// we need direct control for the indexer to propagate the just update resource also to the index
private final PrimaryToSecondaryIndex<R> primaryToSecondaryIndex;
private final PrimaryToSecondaryMapper<P> primaryToSecondaryMapper;

public InformerEventSource(
InformerConfiguration<R> configuration, EventSourceContext<P> context) {
super(context.getClient().resources(configuration.getResourceClass()), configuration);
this.configuration = configuration;
primaryToSecondaryIndex =
new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
this(configuration, context.getClient());
}

public InformerEventSource(InformerConfiguration<R> configuration, KubernetesClient client) {
super(client.resources(configuration.getResourceClass()), configuration);
this.configuration = configuration;
primaryToSecondaryIndex =
new PrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
primaryToSecondaryMapper = configuration.getPrimaryToSecondaryMapper();
if (primaryToSecondaryMapper == null) {
primaryToSecondaryIndex =
new DefaultPrimaryToSecondaryIndex<>(configuration.getSecondaryToPrimaryMapper());
} else {
primaryToSecondaryIndex = NOOPPrimaryToSecondaryIndex.getInstance();
}
}

@Override
Expand Down Expand Up @@ -177,8 +181,13 @@ private void propagateEvent(R object) {

@Override
public Set<R> getSecondaryResources(P primary) {
var secondaryIDs =
primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary));
Set<ResourceID> secondaryIDs;
if (useSecondaryToPrimaryIndex()) {
secondaryIDs =
primaryToSecondaryIndex.getSecondaryResources(ResourceID.fromResource(primary));
} else {
secondaryIDs = primaryToSecondaryMapper.toSecondaryResourceIDs(primary);
}
return secondaryIDs.stream().map(this::get).flatMap(Optional::stream)
.collect(Collectors.toSet());
}
Expand Down Expand Up @@ -247,6 +256,10 @@ private void handleRecentResourceOperationAndStopEventRecording(R resource) {
}
}

private boolean useSecondaryToPrimaryIndex() {
return this.primaryToSecondaryMapper == null;
}

@Override
public synchronized void prepareForCreateOrUpdateEventFiltering(ResourceID resourceID,
R resource) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

class NOOPPrimaryToSecondaryIndex<R extends HasMetadata>
implements PrimaryToSecondaryIndex<R> {

@SuppressWarnings("rawtypes")
private static final NOOPPrimaryToSecondaryIndex instance = new NOOPPrimaryToSecondaryIndex();

public static <T extends HasMetadata> NOOPPrimaryToSecondaryIndex<T> getInstance() {
return instance;
}

private NOOPPrimaryToSecondaryIndex() {}

@Override
public void onAddOrUpdate(R resource) {
// empty method because of noop implementation
}

@Override
public void onDelete(R resource) {
// empty method because of noop implementation
}

@Override
public Set<ResourceID> getSecondaryResources(ResourceID primary) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
@@ -1,49 +1,15 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.Set;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper;

class PrimaryToSecondaryIndex<R extends HasMetadata> {
public interface PrimaryToSecondaryIndex<R extends HasMetadata> {

private SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper;
private Map<ResourceID, Set<ResourceID>> index = new HashMap<>();
void onAddOrUpdate(R resource);

public PrimaryToSecondaryIndex(SecondaryToPrimaryMapper<R> secondaryToPrimaryMapper) {
this.secondaryToPrimaryMapper = secondaryToPrimaryMapper;
}
void onDelete(R resource);

public synchronized void onAddOrUpdate(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
var resourceSet =
index.computeIfAbsent(primaryResource, pr -> ConcurrentHashMap.newKeySet());
resourceSet.add(ResourceID.fromResource(resource));
});
}

public synchronized void onDelete(R resource) {
Set<ResourceID> primaryResources = secondaryToPrimaryMapper.toPrimaryResourceIDs(resource);
primaryResources.forEach(
primaryResource -> {
var secondaryResources = index.get(primaryResource);
secondaryResources.remove(ResourceID.fromResource(resource));
if (secondaryResources.isEmpty()) {
index.remove(primaryResource);
}
});
}

public synchronized Set<ResourceID> getSecondaryResources(ResourceID primary) {
var resourceIDs = index.get(primary);
if (resourceIDs == null) {
return Collections.emptySet();
} else {
return Collections.unmodifiableSet(resourceIDs);
}
}
Set<ResourceID> getSecondaryResources(ResourceID primary);
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

class PrimaryToSecondaryIndexTest {
class DefaultPrimaryToSecondaryIndexTest {

private SecondaryToPrimaryMapper<ConfigMap> secondaryToPrimaryMapperMock =
mock(SecondaryToPrimaryMapper.class);
private PrimaryToSecondaryIndex<ConfigMap> primaryToSecondaryIndex =
new PrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock);
private DefaultPrimaryToSecondaryIndex<ConfigMap> primaryToSecondaryIndex =
new DefaultPrimaryToSecondaryIndex<>(secondaryToPrimaryMapperMock);

private ResourceID primaryID1 = new ResourceID("id1", "default");
private ResourceID primaryID2 = new ResourceID("id2", "default");
Expand Down
Loading