Skip to content

Commit 0fc616f

Browse files
authored
feat: workflows can be created without associated dependent resource (#1632)
This is a first step towards workflow graphs being resolved at build time from a factory, the dependent resources being injected as needed at runtime via the resolve mechanism.
1 parent c6fb5f1 commit 0fc616f

26 files changed

+709
-510
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/ConfigurationService.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
import io.javaoperatorsdk.operator.api.monitoring.Metrics;
1717
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
1818
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResourceFactory;
19+
import io.javaoperatorsdk.operator.processing.dependent.workflow.ManagedWorkflowFactory;
1920

2021
import com.fasterxml.jackson.core.JsonProcessingException;
2122
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -193,4 +194,8 @@ default Optional<InformerStoppedHandler> getInformerStoppedHandler() {
193194
}
194195
});
195196
}
197+
198+
default ManagedWorkflowFactory getWorkflowFactory() {
199+
return ManagedWorkflowFactory.DEFAULT;
200+
}
196201
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/dependent/DependentResource.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,4 +64,8 @@ default Optional<R> getSecondaryResource(P primary, Context<P> context) {
6464
static String defaultNameFor(Class<? extends DependentResource> dependentResourceClass) {
6565
return dependentResourceClass.getName();
6666
}
67+
68+
default boolean isDeletable() {
69+
return this instanceof Deleter;
70+
}
6771
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/Controller.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -77,12 +77,12 @@ public Controller(Reconciler<P> reconciler,
7777
this.reconciler = reconciler;
7878
this.configuration = configuration;
7979
this.kubernetesClient = kubernetesClient;
80-
this.metrics = Optional.ofNullable(ConfigurationServiceProvider.instance().getMetrics())
81-
.orElse(Metrics.NOOP);
80+
final var configurationService = ConfigurationServiceProvider.instance();
81+
this.metrics = Optional.ofNullable(configurationService.getMetrics()).orElse(Metrics.NOOP);
8282
contextInitializer = reconciler instanceof ContextInitializer;
8383
isCleaner = reconciler instanceof Cleaner;
84-
managedWorkflow =
85-
ManagedWorkflow.workflowFor(kubernetesClient, configuration.getDependentResources());
84+
managedWorkflow = configurationService.getWorkflowFactory().workflowFor(configuration);
85+
managedWorkflow.resolve(kubernetesClient, configuration.getDependentResources());
8686

8787
eventSourceManager = new EventSourceManager<>(this);
8888
eventProcessor = new EventProcessor<>(eventSourceManager);

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/dependent/kubernetes/KubernetesDependentResource.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -294,4 +294,9 @@ public KubernetesDependentResourceConfig configFrom(KubernetesDependent kubeDepe
294294
public Optional<KubernetesDependentResourceConfig<R>> configuration() {
295295
return Optional.ofNullable(kubernetesDependentResourceConfig);
296296
}
297+
298+
@Override
299+
public boolean isDeletable() {
300+
return super.isDeletable() && !garbageCollected;
301+
}
297302
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,103 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.LinkedList;
4+
import java.util.List;
5+
import java.util.Optional;
6+
7+
import io.fabric8.kubernetes.api.model.HasMetadata;
8+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
9+
10+
@SuppressWarnings("rawtypes")
11+
abstract class AbstractDependentResourceNode<R, P extends HasMetadata>
12+
implements DependentResourceNode<R, P> {
13+
14+
private final List<DependentResourceNode> dependsOn = new LinkedList<>();
15+
private final List<DependentResourceNode> parents = new LinkedList<>();
16+
private final String name;
17+
private Condition<R, P> reconcilePrecondition;
18+
private Condition<R, P> deletePostcondition;
19+
private Condition<R, P> readyPostcondition;
20+
private DependentResource<R, P> dependentResource;
21+
22+
protected AbstractDependentResourceNode(String name) {
23+
this.name = name;
24+
}
25+
26+
@Override
27+
public List<? extends DependentResourceNode> getDependsOn() {
28+
return dependsOn;
29+
}
30+
31+
@Override
32+
public void addParent(DependentResourceNode parent) {
33+
parents.add(parent);
34+
}
35+
36+
@Override
37+
public void addDependsOnRelation(DependentResourceNode node) {
38+
node.addParent(this);
39+
dependsOn.add(node);
40+
}
41+
42+
@Override
43+
public List<DependentResourceNode> getParents() {
44+
return parents;
45+
}
46+
47+
@Override
48+
public String getName() {
49+
return name;
50+
}
51+
52+
@Override
53+
public Optional<Condition<R, P>> getReconcilePrecondition() {
54+
return Optional.ofNullable(reconcilePrecondition);
55+
}
56+
57+
@Override
58+
public Optional<Condition<R, P>> getDeletePostcondition() {
59+
return Optional.ofNullable(deletePostcondition);
60+
}
61+
62+
public void setReconcilePrecondition(Condition<R, P> reconcilePrecondition) {
63+
this.reconcilePrecondition = reconcilePrecondition;
64+
}
65+
66+
public void setDeletePostcondition(Condition<R, P> cleanupCondition) {
67+
this.deletePostcondition = cleanupCondition;
68+
}
69+
70+
@Override
71+
public Optional<Condition<R, P>> getReadyPostcondition() {
72+
return Optional.ofNullable(readyPostcondition);
73+
}
74+
75+
public void setReadyPostcondition(Condition<R, P> readyPostcondition) {
76+
this.readyPostcondition = readyPostcondition;
77+
}
78+
79+
public DependentResource<R, P> getDependentResource() {
80+
return dependentResource;
81+
}
82+
83+
public void setDependentResource(DependentResource<R, P> dependentResource) {
84+
this.dependentResource = dependentResource;
85+
}
86+
87+
@Override
88+
public boolean equals(Object o) {
89+
if (this == o) {
90+
return true;
91+
}
92+
if (o == null || getClass() != o.getClass()) {
93+
return false;
94+
}
95+
AbstractDependentResourceNode<?, ?> that = (AbstractDependentResourceNode<?, ?>) o;
96+
return name.equals(that.name);
97+
}
98+
99+
@Override
100+
public int hashCode() {
101+
return name.hashCode();
102+
}
103+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,114 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import java.util.HashMap;
4+
import java.util.Map;
5+
import java.util.Map.Entry;
6+
import java.util.Optional;
7+
import java.util.Set;
8+
import java.util.concurrent.ConcurrentHashMap;
9+
import java.util.concurrent.Future;
10+
import java.util.stream.Collectors;
11+
12+
import org.slf4j.Logger;
13+
14+
import io.fabric8.kubernetes.api.model.HasMetadata;
15+
import io.javaoperatorsdk.operator.api.reconciler.Context;
16+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
17+
18+
@SuppressWarnings("rawtypes")
19+
public abstract class AbstractWorkflowExecutor<P extends HasMetadata> {
20+
21+
protected final Workflow<P> workflow;
22+
protected final P primary;
23+
protected final Context<P> context;
24+
/**
25+
* Covers both deleted and reconciled
26+
*/
27+
private final Set<DependentResourceNode> alreadyVisited = ConcurrentHashMap.newKeySet();
28+
private final Map<DependentResourceNode, Future<?>> actualExecutions = new HashMap<>();
29+
private final Map<DependentResourceNode, Exception> exceptionsDuringExecution =
30+
new ConcurrentHashMap<>();
31+
32+
public AbstractWorkflowExecutor(Workflow<P> workflow, P primary, Context<P> context) {
33+
this.workflow = workflow;
34+
this.primary = primary;
35+
this.context = context;
36+
}
37+
38+
protected abstract Logger logger();
39+
40+
protected synchronized void waitForScheduledExecutionsToRun() {
41+
while (true) {
42+
try {
43+
this.wait();
44+
if (noMoreExecutionsScheduled()) {
45+
break;
46+
} else {
47+
logger().warn("Notified but still resources under execution. This should not happen.");
48+
}
49+
} catch (InterruptedException e) {
50+
logger().warn("Thread interrupted", e);
51+
Thread.currentThread().interrupt();
52+
}
53+
}
54+
}
55+
56+
protected boolean noMoreExecutionsScheduled() {
57+
return actualExecutions.isEmpty();
58+
}
59+
60+
protected boolean alreadyVisited(DependentResourceNode<?, P> dependentResourceNode) {
61+
return alreadyVisited.contains(dependentResourceNode);
62+
}
63+
64+
protected void markAsVisited(DependentResourceNode<?, P> dependentResourceNode) {
65+
alreadyVisited.add(dependentResourceNode);
66+
}
67+
68+
protected boolean isExecutingNow(DependentResourceNode<?, P> dependentResourceNode) {
69+
return actualExecutions.containsKey(dependentResourceNode);
70+
}
71+
72+
protected void markAsExecuting(DependentResourceNode<?, P> dependentResourceNode,
73+
Future<?> future) {
74+
actualExecutions.put(dependentResourceNode, future);
75+
}
76+
77+
protected synchronized void handleExceptionInExecutor(
78+
DependentResourceNode<?, P> dependentResourceNode,
79+
RuntimeException e) {
80+
exceptionsDuringExecution.put(dependentResourceNode, e);
81+
}
82+
83+
protected boolean isInError(DependentResourceNode<?, P> dependentResourceNode) {
84+
return exceptionsDuringExecution.containsKey(dependentResourceNode);
85+
}
86+
87+
protected Map<DependentResource, Exception> getErroredDependents() {
88+
return exceptionsDuringExecution.entrySet().stream()
89+
.collect(
90+
Collectors.toMap(e -> workflow.getDependentResourceFor(e.getKey()), Entry::getValue));
91+
}
92+
93+
protected synchronized void handleNodeExecutionFinish(
94+
DependentResourceNode<?, P> dependentResourceNode) {
95+
logger().debug("Finished execution for: {}", dependentResourceNode);
96+
actualExecutions.remove(dependentResourceNode);
97+
if (noMoreExecutionsScheduled()) {
98+
this.notifyAll();
99+
}
100+
}
101+
102+
@SuppressWarnings("unchecked")
103+
protected <R> DependentResource<R, P> getDependentResourceFor(DependentResourceNode<R, P> drn) {
104+
return (DependentResource<R, P>) workflow.getDependentResourceFor(drn);
105+
}
106+
107+
protected <R> boolean isConditionMet(Optional<Condition<R, P>> condition,
108+
DependentResource<R, P> dependentResource) {
109+
return condition.map(c -> c.isMet(primary,
110+
dependentResource.getSecondaryResource(primary, context).orElse(null),
111+
context))
112+
.orElse(true);
113+
}
114+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package io.javaoperatorsdk.operator.processing.dependent.workflow;
2+
3+
import io.fabric8.kubernetes.api.model.HasMetadata;
4+
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
5+
6+
class DefaultDependentResourceNode<R, P extends HasMetadata>
7+
extends AbstractDependentResourceNode<R, P> {
8+
9+
public DefaultDependentResourceNode(DependentResource<R, P> dependentResource) {
10+
this(dependentResource, null, null);
11+
}
12+
13+
public DefaultDependentResourceNode(DependentResource<R, P> dependentResource,
14+
Condition<R, P> reconcilePrecondition, Condition<R, P> deletePostcondition) {
15+
super(getNameFor(dependentResource));
16+
setDependentResource(dependentResource);
17+
setReconcilePrecondition(reconcilePrecondition);
18+
setDeletePostcondition(deletePostcondition);
19+
}
20+
21+
@SuppressWarnings("rawtypes")
22+
static String getNameFor(DependentResource dependentResource) {
23+
return DependentResource.defaultNameFor(dependentResource.getClass()) + "#"
24+
+ dependentResource.hashCode();
25+
}
26+
27+
@Override
28+
public String toString() {
29+
return "DependentResourceNode{" + getDependentResource() + '}';
30+
}
31+
}
Original file line numberDiff line numberDiff line change
@@ -1,65 +1,65 @@
11
package io.javaoperatorsdk.operator.processing.dependent.workflow;
22

3+
import java.util.HashMap;
34
import java.util.List;
45
import java.util.Map;
5-
import java.util.stream.Collectors;
66

77
import io.fabric8.kubernetes.api.model.HasMetadata;
88
import io.fabric8.kubernetes.client.KubernetesClient;
99
import io.javaoperatorsdk.operator.api.config.dependent.DependentResourceSpec;
1010
import io.javaoperatorsdk.operator.api.reconciler.Context;
11-
import io.javaoperatorsdk.operator.api.reconciler.dependent.Deleter;
1211
import io.javaoperatorsdk.operator.api.reconciler.dependent.DependentResource;
13-
import io.javaoperatorsdk.operator.api.reconciler.dependent.GarbageCollected;
1412

1513
@SuppressWarnings("rawtypes")
1614
public class DefaultManagedWorkflow<P extends HasMetadata> implements ManagedWorkflow<P> {
1715

1816
private final Workflow<P> workflow;
19-
private final boolean isCleaner;
2017
private final boolean isEmptyWorkflow;
21-
private final Map<String, DependentResource> dependentResourcesByName;
22-
23-
DefaultManagedWorkflow(KubernetesClient client,
24-
List<DependentResourceSpec> dependentResourceSpecs,
25-
ManagedWorkflowSupport managedWorkflowSupport) {
26-
managedWorkflowSupport.checkForNameDuplication(dependentResourceSpecs);
27-
dependentResourcesByName = dependentResourceSpecs
28-
.stream().collect(Collectors.toMap(DependentResourceSpec::getName,
29-
spec -> managedWorkflowSupport.createAndConfigureFrom(spec, client)));
18+
private boolean resolved;
3019

20+
DefaultManagedWorkflow(List<DependentResourceSpec> dependentResourceSpecs, Workflow<P> workflow) {
3121
isEmptyWorkflow = dependentResourceSpecs.isEmpty();
32-
workflow =
33-
managedWorkflowSupport.createWorkflow(dependentResourceSpecs, dependentResourcesByName);
34-
isCleaner = checkIfCleaner();
22+
this.workflow = workflow;
3523
}
3624

3725
public WorkflowReconcileResult reconcile(P primary, Context<P> context) {
26+
checkIfResolved();
3827
return workflow.reconcile(primary, context);
3928
}
4029

4130
public WorkflowCleanupResult cleanup(P primary, Context<P> context) {
31+
checkIfResolved();
4232
return workflow.cleanup(primary, context);
4333
}
4434

45-
private boolean checkIfCleaner() {
46-
for (var dr : workflow.getDependentResources()) {
47-
if (dr instanceof Deleter && !(dr instanceof GarbageCollected)) {
48-
return true;
49-
}
50-
}
51-
return false;
52-
}
53-
5435
public boolean isCleaner() {
55-
return isCleaner;
36+
return workflow.hasCleaner();
5637
}
5738

5839
public boolean isEmptyWorkflow() {
5940
return isEmptyWorkflow;
6041
}
6142

6243
public Map<String, DependentResource> getDependentResourcesByName() {
63-
return dependentResourcesByName;
44+
checkIfResolved();
45+
final var nodes = workflow.nodes();
46+
final var result = new HashMap<String, DependentResource>(nodes.size());
47+
nodes.forEach((key, drn) -> result.put(key, workflow.getDependentResourceFor(drn)));
48+
return result;
49+
}
50+
51+
@Override
52+
public ManagedWorkflow<P> resolve(KubernetesClient client, List<DependentResourceSpec> specs) {
53+
if (!resolved) {
54+
workflow.resolve(client, specs);
55+
resolved = true;
56+
}
57+
return this;
58+
}
59+
60+
private void checkIfResolved() {
61+
if (!resolved) {
62+
throw new IllegalStateException("resolve should be called before");
63+
}
6464
}
6565
}

0 commit comments

Comments
 (0)