feat: workflows can be created without associated dependent resource #1632

Merged
merged 19 commits on Dec 2, 2022

Commits
87b4f6c
feat: make it possible to control how workflows are created
metacosm Nov 25, 2022
f671a21
refactor: rename DependentResourceNode to DefaultDependentResourceNode
metacosm Nov 25, 2022
e02b038
refactor: remove unneeded methods on Workflow
metacosm Nov 25, 2022
aa508e8
refactor: extract DependentResourceNode interface and use it
metacosm Nov 25, 2022
5e12ceb
feat: workflows can be created without associated dependent resource
metacosm Nov 25, 2022
8df1b32
fix: format
metacosm Nov 25, 2022
27145f8
refactor: extract common behavior from workflow reconcile and cleanup
metacosm Nov 26, 2022
d31eb4b
refactor: extract common behavior from node executors
metacosm Nov 26, 2022
b662118
refactor: extract canDeleteIfAble method
metacosm Nov 26, 2022
52a3686
fix: wait should be called from synchronized block
metacosm Nov 26, 2022
ccaa524
fix: format
metacosm Nov 26, 2022
3813feb
refactor: share code between node implementations
metacosm Nov 27, 2022
da9ca92
refactor: simplify dependent resource resolution
metacosm Nov 27, 2022
46e8cdf
refactor: move WorkflowBuilder to same directory, protect constructors
metacosm Nov 27, 2022
747aefe
refactor: reduce need to expose ManagedWorkflowSupport
metacosm Nov 28, 2022
9696748
refactor: associate nodes with their name
metacosm Nov 28, 2022
6d597ba
refactor: rename canDeleteIfAble to isDeletable
metacosm Dec 2, 2022
7b4a50f
refactor: isDeletable is now an instance method, overridable in impls
metacosm Dec 2, 2022
0a1a381
fix: tests
metacosm Dec 2, 2022
@@ -16,6 +16,7 @@
 import io.javaoperatorsdk.operator.api.monitoring.Metrics;
 import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
 import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceFactory;
+import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -193,4 +194,8 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
       }
     });
   }
+
+  default ManagedWorkflowFactory getWorkflowFactory() {
+    return ManagedWorkflowFactory.DEFAULT;
+  }
 }
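The new `getWorkflowFactory()` hook is a default method, so existing `ConfigurationService` implementations keep the stock factory unless they opt in to overriding it. The following self-contained sketch illustrates this extension-point pattern; the interfaces here are simplified stand-ins, not the SDK's actual types (the real `workflowFor` takes a controller configuration, not a `String`):

```java
public class WorkflowFactoryDemo {

    // Simplified stand-in for ManagedWorkflowFactory with a DEFAULT singleton,
    // mirroring the shape of the interface added in this PR.
    interface ManagedWorkflowFactory {
        ManagedWorkflowFactory DEFAULT = config -> "default-workflow:" + config;

        String workflowFor(String config);
    }

    // Simplified stand-in for ConfigurationService: the hook is a default
    // method returning DEFAULT, so overriding is purely opt-in.
    interface ConfigurationService {
        default ManagedWorkflowFactory getWorkflowFactory() {
            return ManagedWorkflowFactory.DEFAULT;
        }
    }

    static String buildWorkflow(ConfigurationService service, String config) {
        return service.getWorkflowFactory().workflowFor(config);
    }

    public static void main(String[] args) {
        // No override: the default factory is used
        System.out.println(buildWorkflow(new ConfigurationService() {}, "crd"));

        // An implementation can swap in its own factory
        ConfigurationService custom = new ConfigurationService() {
            @Override
            public ManagedWorkflowFactory getWorkflowFactory() {
                return config -> "custom-workflow:" + config;
            }
        };
        System.out.println(buildWorkflow(custom, "crd"));
    }
}
```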
@@ -64,4 +64,8 @@ default Optional<R> getSecondaryResource(P primary, Context<P> context) {
static String defaultNameFor(Class<? extends DependentResource> dependentResourceClass) {
return dependentResourceClass.getName();
}

static boolean canDeleteIfAble(DependentResource<?, ?> dependentResource) {
Collaborator: This naming somehow feels strange.

Collaborator (author): What would you name it?

Collaborator: deletable?

Collaborator: Also, a problem here is that GarbageCollected is only applicable to KubernetesDependentResource, so it is a little smelly that it sits at this layer.

Collaborator (author): Yeah, I wasn't too sure where to put it, but the code in this method was previously spread across spots that didn't have anything to do with KubernetesDependentResource either, so it's less smelly now than it was.

return dependentResource instanceof Deleter && !(dependentResource instanceof GarbageCollected);
Collaborator: I would rather just check Deleter here, and override this in KubernetesDependentResource to also check GarbageCollected there. That would better express that GarbageCollected only applies to KubernetesDependentResource.

Collaborator (author): This isn't the case in the tests, though…

Collaborator: Hmm, where exactly?

Collaborator (author): See 0a1a381
}
}
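The semantics debated in the thread above can be pinned down with a small self-contained example: a dependent resource counts as deletable only when it implements Deleter but is not also GarbageCollected, in which case Kubernetes itself cleans it up. The marker interfaces and resource classes below are hypothetical stand-ins, not the SDK's types:

```java
public class DeletableDemo {

    // Empty stand-ins for the SDK's marker interfaces
    interface Deleter {}
    interface GarbageCollected extends Deleter {}

    // Mirrors the canDeleteIfAble check from the diff above
    static boolean canDeleteIfAble(Object dependentResource) {
        return dependentResource instanceof Deleter
            && !(dependentResource instanceof GarbageCollected);
    }

    static class ManagedSecret implements Deleter {}           // needs explicit cleanup
    static class OwnedConfigMap implements GarbageCollected {} // GC'd by Kubernetes
    static class ReadOnlyResource {}                           // never deleted

    public static void main(String[] args) {
        System.out.println(canDeleteIfAble(new ManagedSecret()));    // true
        System.out.println(canDeleteIfAble(new OwnedConfigMap()));   // false
        System.out.println(canDeleteIfAble(new ReadOnlyResource())); // false
    }
}
```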
@@ -77,12 +77,12 @@ public Controller(Reconciler<P> reconciler,
     this.reconciler = reconciler;
     this.configuration = configuration;
     this.kubernetesClient = kubernetesClient;
-    this.metrics = Optional.ofNullable(ConfigurationServiceProvider.instance().getMetrics())
-        .orElse(Metrics.NOOP);
+    final var configurationService = ConfigurationServiceProvider.instance();
+    this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP);
     contextInitializer = reconciler instanceof ContextInitializer;
     isCleaner = reconciler instanceof Cleaner;
-    managedWorkflow =
-        ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources());
+    managedWorkflow = configurationService.getWorkflowFactory().workflowFor(configuration);
+    managedWorkflow.resolve(kubernetesClient, configuration.getDependentResources());
 
     eventSourceManager = new EventSourceManager<>(this);
     eventProcessor = new EventProcessor<>(eventSourceManager);
@@ -0,0 +1,103 @@
package io.javaoperatorsdk.operator.processing.dependent.workflow;

import java.util.LinkedList;
import java.util.List;
import java.util.Optional;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;

@SuppressWarnings("rawtypes")
abstract class AbstractDependentResourceNode<R, P extends HasMetadata>
implements DependentResourceNode<R, P> {

private final List<DependentResourceNode> dependsOn = new LinkedList<>();
private final List<DependentResourceNode> parents = new LinkedList<>();
private final String name;
private Condition<R, P> reconcilePrecondition;
private Condition<R, P> deletePostcondition;
private Condition<R, P> readyPostcondition;
private DependentResource<R, P> dependentResource;

protected AbstractDependentResourceNode(String name) {
this.name = name;
}

@Override
public List<? extends DependentResourceNode> getDependsOn() {
return dependsOn;
}

@Override
public void addParent(DependentResourceNode parent) {
parents.add(parent);
}

@Override
public void addDependsOnRelation(DependentResourceNode node) {
node.addParent(this);
dependsOn.add(node);
}

@Override
public List<DependentResourceNode> getParents() {
return parents;
}

@Override
public String getName() {
return name;
}

@Override
public Optional<Condition<R, P>> getReconcilePrecondition() {
return Optional.ofNullable(reconcilePrecondition);
}

@Override
public Optional<Condition<R, P>> getDeletePostcondition() {
return Optional.ofNullable(deletePostcondition);
}

public void setReconcilePrecondition(Condition<R, P> reconcilePrecondition) {
this.reconcilePrecondition = reconcilePrecondition;
}

public void setDeletePostcondition(Condition<R, P> cleanupCondition) {
this.deletePostcondition = cleanupCondition;
}

@Override
public Optional<Condition<R, P>> getReadyPostcondition() {
return Optional.ofNullable(readyPostcondition);
}

public void setReadyPostcondition(Condition<R, P> readyPostcondition) {
this.readyPostcondition = readyPostcondition;
}

public DependentResource<R, P> getDependentResource() {
return dependentResource;
}

public void setDependentResource(DependentResource<R, P> dependentResource) {
this.dependentResource = dependentResource;
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (o == null || getClass() != o.getClass()) {
return false;
}
AbstractDependentResourceNode<?, ?> that = (AbstractDependentResourceNode<?, ?>) o;
return name.equals(that.name);
}

@Override
public int hashCode() {
return name.hashCode();
}
}
@@ -0,0 +1,114 @@
package io.javaoperatorsdk.operator.processing.dependent.workflow;

import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import org.slf4j.Logger;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.Context;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;

@SuppressWarnings("rawtypes")
public abstract class AbstractWorkflowExecutor<P extends HasMetadata> {

protected final Workflow<P> workflow;
protected final P primary;
protected final Context<P> context;
/**
* Covers both deleted and reconciled
*/
private final Set<DependentResourceNode> alreadyVisited = ConcurrentHashMap.newKeySet();
private final Map<DependentResourceNode, Future<?>> actualExecutions = new HashMap<>();
private final Map<DependentResourceNode, Exception> exceptionsDuringExecution =
new ConcurrentHashMap<>();

public AbstractWorkflowExecutor(Workflow<P> workflow, P primary, Context<P> context) {
this.workflow = workflow;
this.primary = primary;
this.context = context;
}

protected abstract Logger logger();

protected synchronized void waitForScheduledExecutionsToRun() {
while (true) {
try {
this.wait();
if (noMoreExecutionsScheduled()) {
break;
} else {
logger().warn("Notified but still resources under execution. This should not happen.");
}
} catch (InterruptedException e) {
logger().warn("Thread interrupted", e);
Thread.currentThread().interrupt();
}
}
}

protected boolean noMoreExecutionsScheduled() {
return actualExecutions.isEmpty();
}

protected boolean alreadyVisited(DependentResourceNode<?, P> dependentResourceNode) {
return alreadyVisited.contains(dependentResourceNode);
}

protected void markAsVisited(DependentResourceNode<?, P> dependentResourceNode) {
alreadyVisited.add(dependentResourceNode);
}

protected boolean isExecutingNow(DependentResourceNode<?, P> dependentResourceNode) {
return actualExecutions.containsKey(dependentResourceNode);
}

protected void markAsExecuting(DependentResourceNode<?, P> dependentResourceNode,
Future<?> future) {
actualExecutions.put(dependentResourceNode, future);
}

protected synchronized void handleExceptionInExecutor(
DependentResourceNode<?, P> dependentResourceNode,
RuntimeException e) {
exceptionsDuringExecution.put(dependentResourceNode, e);
}

protected boolean isInError(DependentResourceNode<?, P> dependentResourceNode) {
return exceptionsDuringExecution.containsKey(dependentResourceNode);
}

protected Map<DependentResource, Exception> getErroredDependents() {
return exceptionsDuringExecution.entrySet().stream()
.collect(
Collectors.toMap(e -> workflow.getDependentResourceFor(e.getKey()), Entry::getValue));
}

protected synchronized void handleNodeExecutionFinish(
DependentResourceNode<?, P> dependentResourceNode) {
logger().debug("Finished execution for: {}", dependentResourceNode);
actualExecutions.remove(dependentResourceNode);
if (noMoreExecutionsScheduled()) {
this.notifyAll();
}
}

@SuppressWarnings("unchecked")
protected <R> DependentResource<R, P> getDependentResourceFor(DependentResourceNode<R, P> drn) {
return (DependentResource<R, P>) workflow.getDependentResourceFor(drn);
}

protected <R> boolean isConditionMet(Optional<Condition<R, P>> condition,
DependentResource<R, P> dependentResource) {
return condition.map(c -> c.isMet(primary,
dependentResource.getSecondaryResource(primary, context).orElse(null),
context))
.orElse(true);
}
}
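The coordination above relies on the classic monitor idiom that commit 52a3686 fixed: wait() is only legal while holding the object's monitor, and the waiter should loop on its condition to tolerate spurious or early wake-ups. Below is a stripped-down, runnable sketch of that same pattern, under the assumption that tracking node names in a map is a fair stand-in for the executor's DependentResourceNode bookkeeping:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExecutorSyncDemo {

    // tracks in-flight work, keyed by node name (stand-in for DependentResourceNode)
    private final Map<String, Object> actualExecutions = new ConcurrentHashMap<>();

    boolean noMoreExecutionsScheduled() {
        return actualExecutions.isEmpty();
    }

    // wait() must be called while holding this monitor, inside a condition loop
    synchronized void waitForScheduledExecutionsToRun() throws InterruptedException {
        while (!noMoreExecutionsScheduled()) {
            this.wait();
        }
    }

    // workers call this when a node finishes; the last one wakes the coordinator
    synchronized void handleNodeExecutionFinish(String node) {
        actualExecutions.remove(node);
        if (noMoreExecutionsScheduled()) {
            this.notifyAll();
        }
    }

    static boolean runDemo() throws InterruptedException {
        var demo = new ExecutorSyncDemo();
        ExecutorService pool = Executors.newFixedThreadPool(2);
        for (String node : new String[] {"configmap", "deployment"}) {
            demo.actualExecutions.put(node, new Object()); // register before scheduling
            pool.submit(() -> {
                try {
                    Thread.sleep(20); // simulated reconcile work
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
                demo.handleNodeExecutionFinish(node);
            });
        }
        demo.waitForScheduledExecutionsToRun();
        pool.shutdown();
        return demo.noMoreExecutionsScheduled();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("all executions finished: " + runDemo());
    }
}
```

Registering each node before submitting its task avoids the race where a fast worker removes its entry before the coordinator ever sees it.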
@@ -0,0 +1,31 @@
package io.javaoperatorsdk.operator.processing.dependent.workflow;

import io.fabric8.kubernetes.api.model.HasMetadata;
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;

class DefaultDependentResourceNode<R, P extends HasMetadata>
extends AbstractDependentResourceNode<R, P> {

public DefaultDependentResourceNode(DependentResource<R, P> dependentResource) {
this(dependentResource, null, null);
}

public DefaultDependentResourceNode(DependentResource<R, P> dependentResource,
Condition<R, P> reconcilePrecondition, Condition<R, P> deletePostcondition) {
super(getNameFor(dependentResource));
setDependentResource(dependentResource);
setReconcilePrecondition(reconcilePrecondition);
setDeletePostcondition(deletePostcondition);
}

@SuppressWarnings("rawtypes")
static String getNameFor(DependentResource dependentResource) {
return DependentResource.defaultNameFor(dependentResource.getClass()) + "#"
+ dependentResource.hashCode();
}

@Override
public String toString() {
return "DependentResourceNode{" + getDependentResource() + '}';
}
}
@@ -1,65 +1,65 @@
 package io.javaoperatorsdk.operator.processing.dependent.workflow;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.stream.Collectors;
 
 import io.fabric8.kubernetes.api.model.HasMetadata;
 import io.fabric8.kubernetes.client.KubernetesClient;
 import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
 import io.javaoperatorsdk.operator.api.reconciler.Context;
-import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
 import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
-import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected;
 
 @SuppressWarnings("rawtypes")
 public class DefaultManagedWorkflow<P extends HasMetadata> implements ManagedWorkflow<P> {
 
   private final Workflow<P> workflow;
-  private final boolean isCleaner;
   private final boolean isEmptyWorkflow;
-  private final Map<String, DependentResource> dependentResourcesByName;
+  private boolean resolved;
 
-  DefaultManagedWorkflow(KubernetesClient client,
-      List<DependentResourceSpec> dependentResourceSpecs,
-      ManagedWorkflowSupport managedWorkflowSupport) {
-    managedWorkflowSupport.checkForNameDuplication(dependentResourceSpecs);
-    dependentResourcesByName = dependentResourceSpecs
-        .stream().collect(Collectors.toMap(DependentResourceSpec::getName,
-            spec -> managedWorkflowSupport.createAndConfigureFrom(spec, client)));
+  DefaultManagedWorkflow(List<DependentResourceSpec> dependentResourceSpecs, Workflow<P> workflow) {
     isEmptyWorkflow = dependentResourceSpecs.isEmpty();
-    workflow =
-        managedWorkflowSupport.createWorkflow(dependentResourceSpecs, dependentResourcesByName);
-    isCleaner = checkIfCleaner();
+    this.workflow = workflow;
   }
 
   public WorkflowReconcileResult reconcile(P primary, Context<P> context) {
+    checkIfResolved();
     return workflow.reconcile(primary, context);
   }
 
   public WorkflowCleanupResult cleanup(P primary, Context<P> context) {
+    checkIfResolved();
     return workflow.cleanup(primary, context);
   }
 
-  private boolean checkIfCleaner() {
-    for (var dr : workflow.getDependentResources()) {
-      if (dr instanceof Deleter && !(dr instanceof GarbageCollected)) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   public boolean isCleaner() {
-    return isCleaner;
+    return workflow.hasCleaner();
   }
 
   public boolean isEmptyWorkflow() {
     return isEmptyWorkflow;
   }
 
   public Map<String, DependentResource> getDependentResourcesByName() {
-    return dependentResourcesByName;
+    checkIfResolved();
+    final var nodes = workflow.nodes();
+    final var result = new HashMap<String, DependentResource>(nodes.size());
+    nodes.forEach((key, drn) -> result.put(key, workflow.getDependentResourceFor(drn)));
+    return result;
   }
+
+  @Override
+  public ManagedWorkflow<P> resolve(KubernetesClient client, List<DependentResourceSpec> specs) {
+    if (!resolved) {
+      workflow.resolve(client, specs);
+      resolved = true;
+    }
+    return this;
+  }
+
+  private void checkIfResolved() {
+    if (!resolved) {
+      throw new IllegalStateException("resolve should be called before");
+    }
+  }
 }
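DefaultManagedWorkflow now separates construction from resolution: the workflow can be built without a Kubernetes client, and checkIfResolved() makes any premature use fail fast. A minimal self-contained sketch of that guard pattern (simplified; the real resolve() wires DependentResourceSpecs to a client):

```java
public class ResolveGuardDemo {

    private boolean resolved;

    // Idempotent: resolving twice is a no-op, matching DefaultManagedWorkflow.resolve
    ResolveGuardDemo resolve() {
        if (!resolved) {
            // the real implementation resolves dependent resource specs here
            resolved = true;
        }
        return this;
    }

    String reconcile() {
        checkIfResolved();
        return "reconciled";
    }

    // Fail fast if the workflow is used before resolution
    private void checkIfResolved() {
        if (!resolved) {
            throw new IllegalStateException("resolve should be called before");
        }
    }

    public static void main(String[] args) {
        ResolveGuardDemo workflow = new ResolveGuardDemo();
        try {
            workflow.reconcile(); // fails: not resolved yet
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
        System.out.println(workflow.resolve().reconcile()); // succeeds after resolve()
    }
}
```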