-
Notifications
You must be signed in to change notification settings - Fork 219
feat: workflows can be created without associated dependent resource #1632
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 16 commits
87b4f6c
f671a21
e02b038
aa508e8
5e12ceb
8df1b32
27145f8
d31eb4b
b662118
52a3686
ccaa524
3813feb
da9ca92
46e8cdf
747aefe
9696748
6d597ba
7b4a50f
0a1a381
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -64,4 +64,8 @@ default Optional<R> getSecondaryResource(P primary, Context<P> context) { | |
static String defaultNameFor(Class<? extends DependentResource> dependentResourceClass) { | ||
return dependentResourceClass.getName(); | ||
} | ||
|
||
static boolean canDeleteIfAble(DependentResource<?, ?> dependentResource) { | ||
return dependentResource instanceof Deleter && !(dependentResource instanceof GarbageCollected); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Would rather just check There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This isn't the case in the tests, though… There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Hmm where exactly? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. See 0a1a381 |
||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
package io.javaoperatorsdk.operator.processing.dependent.workflow; | ||
|
||
import java.util.LinkedList; | ||
import java.util.List; | ||
import java.util.Optional; | ||
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; | ||
|
||
@SuppressWarnings("rawtypes") | ||
abstract class AbstractDependentResourceNode<R, P extends HasMetadata> | ||
implements DependentResourceNode<R, P> { | ||
|
||
private final List<DependentResourceNode> dependsOn = new LinkedList<>(); | ||
private final List<DependentResourceNode> parents = new LinkedList<>(); | ||
private final String name; | ||
private Condition<R, P> reconcilePrecondition; | ||
private Condition<R, P> deletePostcondition; | ||
private Condition<R, P> readyPostcondition; | ||
private DependentResource<R, P> dependentResource; | ||
|
||
protected AbstractDependentResourceNode(String name) { | ||
this.name = name; | ||
} | ||
|
||
@Override | ||
public List<? extends DependentResourceNode> getDependsOn() { | ||
return dependsOn; | ||
} | ||
|
||
@Override | ||
public void addParent(DependentResourceNode parent) { | ||
parents.add(parent); | ||
} | ||
|
||
@Override | ||
public void addDependsOnRelation(DependentResourceNode node) { | ||
node.addParent(this); | ||
dependsOn.add(node); | ||
} | ||
|
||
@Override | ||
public List<DependentResourceNode> getParents() { | ||
return parents; | ||
} | ||
|
||
@Override | ||
public String getName() { | ||
return name; | ||
} | ||
|
||
@Override | ||
public Optional<Condition<R, P>> getReconcilePrecondition() { | ||
return Optional.ofNullable(reconcilePrecondition); | ||
} | ||
|
||
@Override | ||
public Optional<Condition<R, P>> getDeletePostcondition() { | ||
return Optional.ofNullable(deletePostcondition); | ||
} | ||
|
||
public void setReconcilePrecondition(Condition<R, P> reconcilePrecondition) { | ||
this.reconcilePrecondition = reconcilePrecondition; | ||
} | ||
|
||
public void setDeletePostcondition(Condition<R, P> cleanupCondition) { | ||
this.deletePostcondition = cleanupCondition; | ||
} | ||
|
||
@Override | ||
public Optional<Condition<R, P>> getReadyPostcondition() { | ||
return Optional.ofNullable(readyPostcondition); | ||
} | ||
|
||
public void setReadyPostcondition(Condition<R, P> readyPostcondition) { | ||
this.readyPostcondition = readyPostcondition; | ||
} | ||
|
||
public DependentResource<R, P> getDependentResource() { | ||
return dependentResource; | ||
} | ||
|
||
public void setDependentResource(DependentResource<R, P> dependentResource) { | ||
this.dependentResource = dependentResource; | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
AbstractDependentResourceNode<?, ?> that = (AbstractDependentResourceNode<?, ?>) o; | ||
return name.equals(that.name); | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return name.hashCode(); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,114 @@ | ||
package io.javaoperatorsdk.operator.processing.dependent.workflow; | ||
|
||
import java.util.HashMap; | ||
import java.util.Map; | ||
import java.util.Map.Entry; | ||
import java.util.Optional; | ||
import java.util.Set; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
import java.util.concurrent.Future; | ||
import java.util.stream.Collectors; | ||
|
||
import org.slf4j.Logger; | ||
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.javaoperatorsdk.operator.api.reconciler.Context; | ||
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; | ||
|
||
@SuppressWarnings("rawtypes") | ||
public abstract class AbstractWorkflowExecutor<P extends HasMetadata> { | ||
|
||
protected final Workflow<P> workflow; | ||
protected final P primary; | ||
protected final Context<P> context; | ||
/** | ||
* Covers both deleted and reconciled | ||
*/ | ||
private final Set<DependentResourceNode> alreadyVisited = ConcurrentHashMap.newKeySet(); | ||
private final Map<DependentResourceNode, Future<?>> actualExecutions = new HashMap<>(); | ||
private final Map<DependentResourceNode, Exception> exceptionsDuringExecution = | ||
new ConcurrentHashMap<>(); | ||
|
||
public AbstractWorkflowExecutor(Workflow<P> workflow, P primary, Context<P> context) { | ||
this.workflow = workflow; | ||
this.primary = primary; | ||
this.context = context; | ||
} | ||
|
||
protected abstract Logger logger(); | ||
|
||
protected synchronized void waitForScheduledExecutionsToRun() { | ||
while (true) { | ||
try { | ||
this.wait(); | ||
if (noMoreExecutionsScheduled()) { | ||
break; | ||
} else { | ||
logger().warn("Notified but still resources under execution. This should not happen."); | ||
} | ||
} catch (InterruptedException e) { | ||
logger().warn("Thread interrupted", e); | ||
Thread.currentThread().interrupt(); | ||
} | ||
} | ||
} | ||
|
||
protected boolean noMoreExecutionsScheduled() { | ||
return actualExecutions.isEmpty(); | ||
} | ||
|
||
protected boolean alreadyVisited(DependentResourceNode<?, P> dependentResourceNode) { | ||
return alreadyVisited.contains(dependentResourceNode); | ||
} | ||
|
||
protected void markAsVisited(DependentResourceNode<?, P> dependentResourceNode) { | ||
alreadyVisited.add(dependentResourceNode); | ||
} | ||
|
||
protected boolean isExecutingNow(DependentResourceNode<?, P> dependentResourceNode) { | ||
return actualExecutions.containsKey(dependentResourceNode); | ||
} | ||
|
||
protected void markAsExecuting(DependentResourceNode<?, P> dependentResourceNode, | ||
Future<?> future) { | ||
actualExecutions.put(dependentResourceNode, future); | ||
} | ||
|
||
protected synchronized void handleExceptionInExecutor( | ||
DependentResourceNode<?, P> dependentResourceNode, | ||
RuntimeException e) { | ||
exceptionsDuringExecution.put(dependentResourceNode, e); | ||
} | ||
|
||
protected boolean isInError(DependentResourceNode<?, P> dependentResourceNode) { | ||
return exceptionsDuringExecution.containsKey(dependentResourceNode); | ||
} | ||
|
||
protected Map<DependentResource, Exception> getErroredDependents() { | ||
return exceptionsDuringExecution.entrySet().stream() | ||
.collect( | ||
Collectors.toMap(e -> workflow.getDependentResourceFor(e.getKey()), Entry::getValue)); | ||
} | ||
|
||
protected synchronized void handleNodeExecutionFinish( | ||
DependentResourceNode<?, P> dependentResourceNode) { | ||
logger().debug("Finished execution for: {}", dependentResourceNode); | ||
actualExecutions.remove(dependentResourceNode); | ||
if (noMoreExecutionsScheduled()) { | ||
this.notifyAll(); | ||
} | ||
} | ||
|
||
@SuppressWarnings("unchecked") | ||
protected <R> DependentResource<R, P> getDependentResourceFor(DependentResourceNode<R, P> drn) { | ||
return (DependentResource<R, P>) workflow.getDependentResourceFor(drn); | ||
} | ||
|
||
protected <R> boolean isConditionMet(Optional<Condition<R, P>> condition, | ||
DependentResource<R, P> dependentResource) { | ||
return condition.map(c -> c.isMet(primary, | ||
dependentResource.getSecondaryResource(primary, context).orElse(null), | ||
context)) | ||
.orElse(true); | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
package io.javaoperatorsdk.operator.processing.dependent.workflow; | ||
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; | ||
|
||
class DefaultDependentResourceNode<R, P extends HasMetadata> | ||
extends AbstractDependentResourceNode<R, P> { | ||
|
||
public DefaultDependentResourceNode(DependentResource<R, P> dependentResource) { | ||
this(dependentResource, null, null); | ||
} | ||
|
||
public DefaultDependentResourceNode(DependentResource<R, P> dependentResource, | ||
Condition<R, P> reconcilePrecondition, Condition<R, P> deletePostcondition) { | ||
super(getNameFor(dependentResource)); | ||
setDependentResource(dependentResource); | ||
setReconcilePrecondition(reconcilePrecondition); | ||
setDeletePostcondition(deletePostcondition); | ||
} | ||
|
||
@SuppressWarnings("rawtypes") | ||
static String getNameFor(DependentResource dependentResource) { | ||
return DependentResource.defaultNameFor(dependentResource.getClass()) + "#" | ||
+ dependentResource.hashCode(); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "DependentResourceNode{" + getDependentResource() + '}'; | ||
} | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,65 +1,65 @@ | ||
package io.javaoperatorsdk.operator.processing.dependent.workflow; | ||
|
||
import java.util.HashMap; | ||
import java.util.List; | ||
import java.util.Map; | ||
import java.util.stream.Collectors; | ||
|
||
import io.fabric8.kubernetes.api.model.HasMetadata; | ||
import io.fabric8.kubernetes.client.KubernetesClient; | ||
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec; | ||
import io.javaoperatorsdk.operator.api.reconciler.Context; | ||
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter; | ||
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource; | ||
import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected; | ||
|
||
@SuppressWarnings("rawtypes") | ||
public class DefaultManagedWorkflow<P extends HasMetadata> implements ManagedWorkflow<P> { | ||
|
||
private final Workflow<P> workflow; | ||
private final boolean isCleaner; | ||
private final boolean isEmptyWorkflow; | ||
private final Map<String, DependentResource> dependentResourcesByName; | ||
|
||
DefaultManagedWorkflow(KubernetesClient client, | ||
List<DependentResourceSpec> dependentResourceSpecs, | ||
ManagedWorkflowSupport managedWorkflowSupport) { | ||
managedWorkflowSupport.checkForNameDuplication(dependentResourceSpecs); | ||
dependentResourcesByName = dependentResourceSpecs | ||
.stream().collect(Collectors.toMap(DependentResourceSpec::getName, | ||
spec -> managedWorkflowSupport.createAndConfigureFrom(spec, client))); | ||
private boolean resolved; | ||
|
||
DefaultManagedWorkflow(List<DependentResourceSpec> dependentResourceSpecs, Workflow<P> workflow) { | ||
isEmptyWorkflow = dependentResourceSpecs.isEmpty(); | ||
workflow = | ||
managedWorkflowSupport.createWorkflow(dependentResourceSpecs, dependentResourcesByName); | ||
isCleaner = checkIfCleaner(); | ||
this.workflow = workflow; | ||
} | ||
|
||
public WorkflowReconcileResult reconcile(P primary, Context<P> context) { | ||
checkIfResolved(); | ||
return workflow.reconcile(primary, context); | ||
} | ||
|
||
public WorkflowCleanupResult cleanup(P primary, Context<P> context) { | ||
checkIfResolved(); | ||
return workflow.cleanup(primary, context); | ||
} | ||
|
||
private boolean checkIfCleaner() { | ||
for (var dr : workflow.getDependentResources()) { | ||
if (dr instanceof Deleter && !(dr instanceof GarbageCollected)) { | ||
return true; | ||
} | ||
} | ||
return false; | ||
} | ||
|
||
public boolean isCleaner() { | ||
return isCleaner; | ||
return workflow.hasCleaner(); | ||
} | ||
|
||
public boolean isEmptyWorkflow() { | ||
return isEmptyWorkflow; | ||
} | ||
|
||
public Map<String, DependentResource> getDependentResourcesByName() { | ||
return dependentResourcesByName; | ||
checkIfResolved(); | ||
final var nodes = workflow.nodes(); | ||
final var result = new HashMap<String, DependentResource>(nodes.size()); | ||
nodes.forEach((key, drn) -> result.put(key, workflow.getDependentResourceFor(drn))); | ||
return result; | ||
} | ||
|
||
@Override | ||
public ManagedWorkflow<P> resolve(KubernetesClient client, List<DependentResourceSpec> specs) { | ||
if (!resolved) { | ||
workflow.resolve(client, specs); | ||
resolved = true; | ||
} | ||
return this; | ||
} | ||
|
||
private void checkIfResolved() { | ||
if (!resolved) { | ||
throw new IllegalStateException("resolve should be called before"); | ||
} | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this naming somehow feels strange
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What would you name it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deletable
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also problem here is that GarbageCollected is only applicatble for KubernetesDependentResource, so it is little smelly that it's on this layer.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, wasn't too sure where to put it but the problem is that the code in this method was spread at different spots that didn't have anything to do with
KubernetesDependentResource
either so it's less smelly now than it was.