Skip to content

Commit 54a65a6

Browse files
authored
Merge branch 'next' into generalized_match
2 parents 7bd9674 + d7b5b87 commit 54a65a6

File tree

8 files changed

+179
-56
lines changed

8 files changed

+179
-56
lines changed

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/LeaderElectionManager.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,19 +66,28 @@ private void init(LeaderElectionConfiguration config) {
6666
config.getLeaseDuration(),
6767
config.getRenewDeadline(),
6868
config.getRetryPeriod(),
69-
leaderCallbacks(),
69+
leaderCallbacks(config),
7070
true,
7171
config.getLeaseName()))
7272
.build();
7373
}
7474

7575

7676

77-
private LeaderCallbacks leaderCallbacks() {
77+
private LeaderCallbacks leaderCallbacks(LeaderElectionConfiguration config) {
7878
return new LeaderCallbacks(
79-
this::startLeading,
80-
this::stopLeading,
81-
leader -> log.info("New leader with identity: {}", leader));
79+
() -> {
80+
config.getLeaderCallbacks().ifPresent(LeaderCallbacks::onStartLeading);
81+
LeaderElectionManager.this.startLeading();
82+
},
83+
() -> {
84+
config.getLeaderCallbacks().ifPresent(LeaderCallbacks::onStopLeading);
85+
LeaderElectionManager.this.stopLeading();
86+
},
87+
leader -> {
88+
config.getLeaderCallbacks().ifPresent(cb -> cb.onNewLeader(leader));
89+
log.info("New leader with identity: {}", leader);
90+
});
8291
}
8392

8493
private void startLeading() {

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/config/LeaderElectionConfiguration.java

Lines changed: 15 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
import java.time.Duration;
44
import java.util.Optional;
55

6+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
7+
68
public class LeaderElectionConfiguration {
79

810
public static final Duration LEASE_DURATION_DEFAULT_VALUE = Duration.ofSeconds(15);
@@ -17,13 +19,15 @@ public class LeaderElectionConfiguration {
1719
private final Duration renewDeadline;
1820
private final Duration retryPeriod;
1921

22+
private final LeaderCallbacks leaderCallbacks;
23+
2024
public LeaderElectionConfiguration(String leaseName, String leaseNamespace, String identity) {
2125
this(
2226
leaseName,
2327
leaseNamespace,
2428
LEASE_DURATION_DEFAULT_VALUE,
2529
RENEW_DEADLINE_DEFAULT_VALUE,
26-
RETRY_PERIOD_DEFAULT_VALUE, identity);
30+
RETRY_PERIOD_DEFAULT_VALUE, identity, null);
2731
}
2832

2933
public LeaderElectionConfiguration(String leaseName, String leaseNamespace) {
@@ -32,7 +36,7 @@ public LeaderElectionConfiguration(String leaseName, String leaseNamespace) {
3236
leaseNamespace,
3337
LEASE_DURATION_DEFAULT_VALUE,
3438
RENEW_DEADLINE_DEFAULT_VALUE,
35-
RETRY_PERIOD_DEFAULT_VALUE, null);
39+
RETRY_PERIOD_DEFAULT_VALUE, null, null);
3640
}
3741

3842
public LeaderElectionConfiguration(String leaseName) {
@@ -41,7 +45,7 @@ public LeaderElectionConfiguration(String leaseName) {
4145
null,
4246
LEASE_DURATION_DEFAULT_VALUE,
4347
RENEW_DEADLINE_DEFAULT_VALUE,
44-
RETRY_PERIOD_DEFAULT_VALUE, null);
48+
RETRY_PERIOD_DEFAULT_VALUE, null, null);
4549
}
4650

4751
public LeaderElectionConfiguration(
@@ -50,7 +54,7 @@ public LeaderElectionConfiguration(
5054
Duration leaseDuration,
5155
Duration renewDeadline,
5256
Duration retryPeriod) {
53-
this(leaseName, leaseNamespace, leaseDuration, renewDeadline, retryPeriod, null);
57+
this(leaseName, leaseNamespace, leaseDuration, renewDeadline, retryPeriod, null, null);
5458
}
5559

5660
public LeaderElectionConfiguration(
@@ -59,13 +63,15 @@ public LeaderElectionConfiguration(
5963
Duration leaseDuration,
6064
Duration renewDeadline,
6165
Duration retryPeriod,
62-
String identity) {
66+
String identity,
67+
LeaderCallbacks leaderCallbacks) {
6368
this.leaseName = leaseName;
6469
this.leaseNamespace = leaseNamespace;
6570
this.leaseDuration = leaseDuration;
6671
this.renewDeadline = renewDeadline;
6772
this.retryPeriod = retryPeriod;
6873
this.identity = identity;
74+
this.leaderCallbacks = leaderCallbacks;
6975
}
7076

7177
public Optional<String> getLeaseNamespace() {
@@ -91,4 +97,8 @@ public Duration getRetryPeriod() {
9197
public Optional<String> getIdentity() {
9298
return Optional.ofNullable(identity);
9399
}
100+
101+
public Optional<LeaderCallbacks> getLeaderCallbacks() {
102+
return Optional.ofNullable(leaderCallbacks);
103+
}
94104
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
package io.javaoperatorsdk.operator.api.config;
2+
3+
import java.time.Duration;
4+
5+
import io.fabric8.kubernetes.client.extended.leaderelection.LeaderCallbacks;
6+
7+
import static io.javaoperatorsdk.operator.api.config.LeaderElectionConfiguration.*;
8+
9+
public final class LeaderElectionConfigurationBuilder {
10+
11+
private String leaseName;
12+
private String leaseNamespace;
13+
private String identity;
14+
private Duration leaseDuration = LEASE_DURATION_DEFAULT_VALUE;
15+
private Duration renewDeadline = RENEW_DEADLINE_DEFAULT_VALUE;
16+
private Duration retryPeriod = RETRY_PERIOD_DEFAULT_VALUE;
17+
private LeaderCallbacks leaderCallbacks;
18+
19+
private LeaderElectionConfigurationBuilder(String leaseName) {
20+
this.leaseName = leaseName;
21+
}
22+
23+
public static LeaderElectionConfigurationBuilder aLeaderElectionConfiguration(String leaseName) {
24+
return new LeaderElectionConfigurationBuilder(leaseName);
25+
}
26+
27+
public LeaderElectionConfigurationBuilder withLeaseNamespace(String leaseNamespace) {
28+
this.leaseNamespace = leaseNamespace;
29+
return this;
30+
}
31+
32+
public LeaderElectionConfigurationBuilder withIdentity(String identity) {
33+
this.identity = identity;
34+
return this;
35+
}
36+
37+
public LeaderElectionConfigurationBuilder withLeaseDuration(Duration leaseDuration) {
38+
this.leaseDuration = leaseDuration;
39+
return this;
40+
}
41+
42+
public LeaderElectionConfigurationBuilder withRenewDeadline(Duration renewDeadline) {
43+
this.renewDeadline = renewDeadline;
44+
return this;
45+
}
46+
47+
public LeaderElectionConfigurationBuilder withRetryPeriod(Duration retryPeriod) {
48+
this.retryPeriod = retryPeriod;
49+
return this;
50+
}
51+
52+
public LeaderElectionConfigurationBuilder withLeaderCallbacks(LeaderCallbacks leaderCallbacks) {
53+
this.leaderCallbacks = leaderCallbacks;
54+
return this;
55+
}
56+
57+
public LeaderElectionConfiguration build() {
58+
return new LeaderElectionConfiguration(leaseName, leaseNamespace, leaseDuration, renewDeadline,
59+
retryPeriod, identity, leaderCallbacks);
60+
}
61+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
package io.javaoperatorsdk.operator.api.reconciler;
2+
3+
import java.util.Optional;
4+
import java.util.function.Function;
5+
6+
import io.fabric8.kubernetes.api.model.HasMetadata;
7+
import io.javaoperatorsdk.operator.processing.event.source.informer.InformerEventSource;
8+
9+
/**
10+
* Uses a custom index of {@link InformerEventSource} to access the target resource. The index needs
11+
* to be explicitly created when the event source is defined. This approach improves the performance
12+
* to access the resource.
13+
*/
14+
public class IndexDiscriminator<R extends HasMetadata, P extends HasMetadata>
15+
implements ResourceDiscriminator<R, P> {
16+
17+
private final String indexName;
18+
private final String eventSourceName;
19+
private final Function<P, String> keyMapper;
20+
21+
public IndexDiscriminator(String indexName, Function<P, String> keyMapper) {
22+
this(indexName, null, keyMapper);
23+
}
24+
25+
public IndexDiscriminator(String indexName, String eventSourceName,
26+
Function<P, String> keyMapper) {
27+
this.indexName = indexName;
28+
this.eventSourceName = eventSourceName;
29+
this.keyMapper = keyMapper;
30+
}
31+
32+
@Override
33+
public Optional<R> distinguish(Class<R> resource,
34+
P primary,
35+
Context<P> context) {
36+
37+
InformerEventSource<R, P> eventSource =
38+
(InformerEventSource<R, P>) context
39+
.eventSourceRetriever()
40+
.getResourceEventSourceFor(resource, eventSourceName);
41+
var resources = eventSource.byIndex(indexName, keyMapper.apply(primary));
42+
if (resources.isEmpty()) {
43+
return Optional.empty();
44+
} else if (resources.size() > 1) {
45+
throw new IllegalStateException("More than one resource found");
46+
} else {
47+
return Optional.of(resources.get(0));
48+
}
49+
}
50+
}

operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceIDMatcherDiscriminator.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,21 +5,41 @@
55

66
import io.fabric8.kubernetes.api.model.HasMetadata;
77
import io.javaoperatorsdk.operator.processing.event.ResourceID;
8+
import io.javaoperatorsdk.operator.processing.event.source.Cache;
89

910
public class ResourceIDMatcherDiscriminator<R extends HasMetadata, P extends HasMetadata>
1011
implements ResourceDiscriminator<R, P> {
1112

13+
14+
private final String eventSourceName;
1215
private final Function<P, ResourceID> mapper;
1316

1417
public ResourceIDMatcherDiscriminator(Function<P, ResourceID> mapper) {
18+
this(null, mapper);
19+
}
20+
21+
public ResourceIDMatcherDiscriminator(String eventSourceName, Function<P, ResourceID> mapper) {
22+
this.eventSourceName = eventSourceName;
1523
this.mapper = mapper;
1624
}
1725

26+
@SuppressWarnings("unchecked")
1827
@Override
1928
public Optional<R> distinguish(Class<R> resource, P primary, Context<P> context) {
2029
var resourceID = mapper.apply(primary);
21-
return context.getSecondaryResourcesAsStream(resource)
22-
.filter(resourceID::isSameResource)
23-
.findFirst();
30+
if (eventSourceName != null) {
31+
return ((Cache<R>) context.eventSourceRetriever().getResourceEventSourceFor(resource,
32+
eventSourceName))
33+
.get(resourceID);
34+
} else {
35+
var eventSources = context.eventSourceRetriever().getResourceEventSourcesFor(resource);
36+
if (eventSources.size() == 1) {
37+
return ((Cache<R>) eventSources.get(0)).get(resourceID);
38+
} else {
39+
return context.getSecondaryResourcesAsStream(resource)
40+
.filter(resourceID::isSameResource)
41+
.findFirst();
42+
}
43+
}
2444
}
2545
}

operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/indexdiscriminator/IndexDiscriminator.java

Lines changed: 0 additions & 41 deletions
This file was deleted.

operator-framework/src/test/java/io/javaoperatorsdk/operator/sample/indexdiscriminator/IndexDiscriminatorTestReconciler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -81,10 +81,10 @@ public Map<String, EventSource> prepareEventSources(
8181

8282
firstDependentResourceConfigMap
8383
.setResourceDiscriminator(
84-
new IndexDiscriminator(CONFIG_MAP_INDEX_1, FIRST_CONFIG_MAP_SUFFIX_1));
84+
new TestIndexDiscriminator(CONFIG_MAP_INDEX_1, FIRST_CONFIG_MAP_SUFFIX_1));
8585
secondDependentResourceConfigMap
8686
.setResourceDiscriminator(
87-
new IndexDiscriminator(CONFIG_MAP_INDEX_2, FIRST_CONFIG_MAP_SUFFIX_2));
87+
new TestIndexDiscriminator(CONFIG_MAP_INDEX_2, FIRST_CONFIG_MAP_SUFFIX_2));
8888
return EventSourceInitializer.nameEventSources(eventSource);
8989
}
9090

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
package io.javaoperatorsdk.operator.sample.indexdiscriminator;
2+
3+
import io.fabric8.kubernetes.api.model.ConfigMap;
4+
import io.javaoperatorsdk.operator.api.reconciler.IndexDiscriminator;
5+
6+
import static io.javaoperatorsdk.operator.sample.indexdiscriminator.IndexDiscriminatorTestReconciler.configMapKeyFromPrimary;
7+
8+
public class TestIndexDiscriminator
9+
extends IndexDiscriminator<ConfigMap, IndexDiscriminatorTestCustomResource> {
10+
11+
public TestIndexDiscriminator(String indexName, String nameSuffix) {
12+
super(indexName, p -> configMapKeyFromPrimary(p, nameSuffix));
13+
}
14+
}

0 commit comments

Comments
 (0)