Skip to content

Commit f55eaca

Browse files
authored
Merge branch 'master' into openshift_visit
2 parents cb2a7e7 + 921a469 commit f55eaca

File tree

40 files changed

+638
-476
lines changed

40 files changed

+638
-476
lines changed

CHANGELOG.md

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,12 +8,15 @@
88
* Fix #3295: Fix wrong kind getting registered in KubernetesDeserializer in SharedInformerFactory
99
* Fix #3318: Informer relist add/update should not always be sync events
1010
* Fix #3328: Allow for generic watches of known types
11+
* Fix #3240: MicroTime serialises incorrectly; add custom serializer/deserializer for MicroTime
1112
* Fix #3329: Moved the parsing of resource(String) to the common base client
1213
* Fix #3330: Added usage of the openshift specific handlers for resource(String/HasMetadata) to pickup the right Readiness
1314

1415
#### Improvements
16+
* Fix #3285: adding waiting for list contexts `Informable.informOnCondition`
1517
* Fix #3284: refined how builders are obtained / used by HasMetadataOperation
1618
* Fix #3314: Add more detailed information about what is generated by the CRD generator
19+
* Fix #3307: refined the support for `Readiable.isReady`
1720

1821
#### Dependency Upgrade
1922
* Update Tekton Pipeline Model to v0.25.0
@@ -22,6 +25,10 @@
2225
* Fix #3291: Retrying the HTTP operation in case of IOException too
2326
* Fix #2712: Add support for watching logs in multi-container Controller resources (Deployments, StatefulSets, ReplicaSet etc)
2427

28+
#### _**Note**_: Breaking changes in the API
29+
##### DSL Changes:
30+
- #3307 `Readiable.isReady` returns a boolean rather than a Boolean
31+
2532
### 5.5.0 (2021-06-30)
2633

2734
#### Bugs

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Informable.java

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,10 @@
2323

2424
import java.util.List;
2525
import java.util.Map;
26+
import java.util.concurrent.CompletableFuture;
27+
import java.util.concurrent.Future;
2628
import java.util.function.Function;
29+
import java.util.function.Predicate;
2730

2831
public interface Informable<T> {
2932

@@ -34,6 +37,21 @@ public interface Informable<T> {
3437
*/
3538
Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexers);
3639

40+
/**
41+
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
42+
* and provides a store of all the current resources.
43+
* <p>This returned informer will not support resync.
44+
* <p>This call will be blocking for the initial list and watch.
45+
* <p>You are expected to call stop to terminate the underlying Watch.
46+
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
47+
* so consider non-blocking handler operations for more than one handler.
48+
*
49+
* @return a running {@link SharedIndexInformer}
50+
*/
51+
default SharedIndexInformer<T> inform() {
52+
return inform(null, 0);
53+
}
54+
3755
/**
3856
* Similar to a {@link Watch}, but will attempt to handle failures after successfully started.
3957
* and provides a store of all the current resources.
@@ -56,12 +74,21 @@ default SharedIndexInformer<T> inform(ResourceEventHandler<T> handler) {
5674
* <p>This call will be blocking for the initial list and watch.
5775
* <p>You are expected to call stop to terminate the underlying Watch.
5876
* <p>Additional handlers can be added, but processing of the events will be in the websocket thread,
59-
* so consider non-blocking handleroperations for more than one handler.
77+
* so consider non-blocking handler operations for more than one handler.
6078
*
6179
* @param handler to notify
6280
* @param resync the resync period or 0 for no resync
6381
* @return a running {@link SharedIndexInformer}
6482
*/
6583
SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync);
84+
85+
/**
86+
* Return a {@link Future} when the list at this context satisfies the given {@link Predicate}.
87+
* The predicate will be tested against the state of the underlying informer store on every event.
88+
* The returned future should be cancelled by the caller if not waiting for completion to close the underlying informer
89+
* @param condition the {@link Predicate} to test
90+
* @return a {@link CompletableFuture} of the list of items after the condition is met
91+
*/
92+
CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition);
6693

6794
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/Readiable.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,5 +17,9 @@
1717

1818
public interface Readiable {
1919

20-
Boolean isReady();
20+
/**
21+
* Check if the resource is ready. If no readiness check exists, this is just an existence check.
22+
* @return true if the resource exists and is ready.
23+
*/
24+
boolean isReady();
2125
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/VisitFromServerGetWatchDeleteRecreateWaitApplicable.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,6 @@ public interface VisitFromServerGetWatchDeleteRecreateWaitApplicable<T> extends
2525
Watchable<Watcher<T>>,
2626
Waitable<T, T>,
2727
VisitFromServerWritable<T>,
28-
DryRunable<VisitFromServerWritable<T>> {
28+
DryRunable<VisitFromServerWritable<T>>,
29+
Readiable {
2930
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/base/BaseOperation.java

Lines changed: 63 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,6 @@
1818
import io.fabric8.kubernetes.api.model.ObjectReference;
1919
import io.fabric8.kubernetes.client.dsl.WritableOperation;
2020
import io.fabric8.kubernetes.client.utils.CreateOrReplaceHelper;
21-
import org.slf4j.Logger;
22-
import org.slf4j.LoggerFactory;
2321

2422
import io.fabric8.kubernetes.api.builder.TypedVisitor;
2523
import io.fabric8.kubernetes.api.builder.Visitor;
@@ -78,7 +76,7 @@
7876
import java.util.concurrent.CompletableFuture;
7977
import java.util.concurrent.ExecutionException;
8078
import java.util.concurrent.TimeUnit;
81-
import java.util.concurrent.TimeoutException;
79+
import java.util.concurrent.atomic.AtomicReference;
8280
import java.util.function.Consumer;
8381
import java.util.function.Function;
8482
import java.util.function.Predicate;
@@ -94,7 +92,6 @@ public class BaseOperation<T extends HasMetadata, L extends KubernetesResourceLi
9492
MixedOperation<T, L, R>,
9593
Resource<T> {
9694

97-
private static final Logger LOG = LoggerFactory.getLogger(BaseOperation.class);
9895
private static final String READ_ONLY_UPDATE_EXCEPTION_MESSAGE = "Cannot update read-only resources";
9996
private static final String READ_ONLY_EDIT_EXCEPTION_MESSAGE = "Cannot edit read-only resources";
10097

@@ -953,8 +950,12 @@ public Readiness getReadiness() {
953950
}
954951

955952
@Override
956-
public final Boolean isReady() {
957-
return getReadiness().isReady(get());
953+
public final boolean isReady() {
954+
T item = fromServer().get();
955+
if (item == null) {
956+
return false;
957+
}
958+
return getReadiness().isReady(item);
958959
}
959960

960961
@Override
@@ -964,56 +965,70 @@ public T waitUntilReady(long amount, TimeUnit timeUnit) {
964965

965966
@Override
966967
public T waitUntilCondition(Predicate<T> condition, long amount, TimeUnit timeUnit) {
967-
CompletableFuture<T> future = new CompletableFuture<>();
968-
// tests the condition, trapping any exceptions
969-
Consumer<T> tester = obj -> {
968+
CompletableFuture<T> futureCondition = informOnCondition(l -> {
969+
if (l.isEmpty()) {
970+
return condition.test(null);
971+
}
972+
return condition.test(l.get(0));
973+
}).thenApply(l -> l.isEmpty() ? null : l.get(0));
974+
975+
if (!Utils.waitUntilReady(futureCondition, amount, timeUnit)) {
976+
futureCondition.cancel(true);
977+
T i = getItem();
978+
if (i != null) {
979+
throw new KubernetesClientTimeoutException(i, amount, timeUnit);
980+
}
981+
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
982+
}
983+
return futureCondition.getNow(null);
984+
}
985+
986+
@Override
987+
public CompletableFuture<List<T>> informOnCondition(Predicate<List<T>> condition) {
988+
CompletableFuture<List<T>> future = new CompletableFuture<>();
989+
AtomicReference<Runnable> tester = new AtomicReference<>();
990+
991+
// create an informer that supplies the tester with events and empty list handling
992+
SharedIndexInformer<T> informer = this.createInformer(0);
993+
994+
// prevent unnecessary watches and handle closure
995+
future.whenComplete((r, t) -> informer.stop());
996+
997+
// use the cache to evaluate the list predicate, trapping any exceptions
998+
Runnable test = () -> {
970999
try {
971-
if (condition.test(obj)) {
972-
future.complete(obj);
1000+
// could skip if lastResourceVersion has not changed
1001+
List<T> list = informer.getStore().list();
1002+
if (condition.test(list)) {
1003+
future.complete(list);
9731004
}
9741005
} catch (Exception e) {
9751006
future.completeExceptionally(e);
9761007
}
9771008
};
978-
// start an informer that supplies the tester with events and empty list handling
979-
try (SharedIndexInformer<T> informer = this.createInformer(0, l -> {
980-
if (l.getItems().isEmpty()) {
981-
tester.accept(null);
982-
}
983-
}, new ResourceEventHandler<T>() {
984-
1009+
tester.set(test);
1010+
1011+
informer.addEventHandler(new ResourceEventHandler<T>() {
9851012
@Override
9861013
public void onAdd(T obj) {
987-
tester.accept(obj);
1014+
test.run();
9881015
}
989-
9901016
@Override
991-
public void onUpdate(T oldObj, T newObj) {
992-
tester.accept(newObj);
1017+
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
1018+
test.run();
9931019
}
994-
9951020
@Override
996-
public void onDelete(T obj, boolean deletedFinalStateUnknown) {
997-
tester.accept(null);
1021+
public void onUpdate(T oldObj, T newObj) {
1022+
test.run();
9981023
}
999-
})) {
1000-
// prevent unnecessary watches
1001-
future.whenComplete((r,t) -> informer.stop());
1002-
informer.run();
1003-
return future.get(amount, timeUnit);
1004-
} catch (InterruptedException e) {
1005-
Thread.currentThread().interrupt();
1006-
throw KubernetesClientException.launderThrowable(e.getCause());
1007-
} catch (ExecutionException e) {
1008-
throw KubernetesClientException.launderThrowable(e.getCause());
1009-
} catch (TimeoutException e) {
1010-
T i = getItem();
1011-
if (i != null) {
1012-
throw new KubernetesClientTimeoutException(i, amount, timeUnit);
1024+
@Override
1025+
public void onNothing() {
1026+
test.run();
10131027
}
1014-
throw new KubernetesClientTimeoutException(getKind(), getName(), getNamespace(), amount, timeUnit);
1015-
}
1016-
}
1028+
});
1029+
informer.run();
1030+
return future;
1031+
}
10171032

10181033
public void setType(Class<T> type) {
10191034
this.type = type;
@@ -1041,14 +1056,17 @@ public Informable<T> withIndexers(Map<String, Function<T, List<String>>> indexer
10411056

10421057
@Override
10431058
public SharedIndexInformer<T> inform(ResourceEventHandler<T> handler, long resync) {
1044-
DefaultSharedIndexInformer<T, L> result = createInformer(resync, null, handler);
1059+
DefaultSharedIndexInformer<T, L> result = createInformer(resync);
1060+
if (handler != null) {
1061+
result.addEventHandler(handler);
1062+
}
10451063
// synchronous start list/watch must succeed in the calling thread
10461064
// initial add events will be processed in the calling thread as well
10471065
result.run();
10481066
return result;
10491067
}
10501068

1051-
private DefaultSharedIndexInformer<T, L> createInformer(long resync, Consumer<L> onList, ResourceEventHandler<T> handler) {
1069+
private DefaultSharedIndexInformer<T, L> createInformer(long resync) {
10521070
T i = getItem();
10531071
String name = (Utils.isNotNullOrEmpty(getName()) || i != null) ? checkName(i) : null;
10541072

@@ -1060,11 +1078,7 @@ public L list(ListOptions params, String namespace, OperationContext context) {
10601078
if (name != null) {
10611079
params.setFieldSelector("metadata.name="+name);
10621080
}
1063-
L result = BaseOperation.this.list(params);
1064-
if (onList != null) {
1065-
onList.accept(result);
1066-
}
1067-
return result;
1081+
return BaseOperation.this.list(params);
10681082
}
10691083

10701084
@Override
@@ -1075,9 +1089,6 @@ public Watch watch(ListOptions params, String namespace, OperationContext contex
10751089
if (indexers != null) {
10761090
informer.addIndexers(indexers);
10771091
}
1078-
if (handler != null) {
1079-
informer.addEventHandler(handler);
1080-
}
10811092
return informer;
10821093
}
10831094
}

kubernetes-client/src/main/java/io/fabric8/kubernetes/client/dsl/internal/NamespaceVisitFromServerGetWatchDeleteRecreateWaitApplicableImpl.java

Lines changed: 18 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ public HasMetadata apply() {
126126

127127
@Override
128128
public HasMetadata createOrReplace() {
129-
HasMetadata meta = acceptVisitors(item, visitors);
129+
HasMetadata meta = get();
130130
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
131131
String namespaceToUse = meta.getMetadata().getNamespace();
132132
if (Boolean.TRUE.equals(deletingExisting)) {
@@ -143,29 +143,22 @@ public Waitable<HasMetadata, HasMetadata> createOrReplaceAnd() {
143143
@Override
144144
public Boolean delete() {
145145
//First pass check before deleting
146-
HasMetadata meta = acceptVisitors(item, visitors);
146+
HasMetadata meta = get();
147147
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
148148
return h.delete(client, config, meta.getMetadata().getNamespace(), propagationPolicy, gracePeriodSeconds, meta, dryRun);
149149
}
150150

151151
@Override
152152
public HasMetadata get() {
153+
HasMetadata meta = acceptVisitors(item, visitors);
153154
if (fromServer) {
154-
HasMetadata meta = acceptVisitors(item, visitors);
155155
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
156-
HasMetadata reloaded = h.reload(client, config, meta.getMetadata().getNamespace(), meta);
157-
if (reloaded != null) {
158-
HasMetadata edited = reloaded;
159-
//Let's apply any visitor that might have been specified.
160-
for (Visitor v : visitors) {
161-
edited = h.edit(edited).accept(v).build();
162-
}
163-
return edited;
156+
meta = h.reload(client, config, meta.getMetadata().getNamespace(), meta);
157+
if (meta != null) {
158+
return acceptVisitors(meta, visitors);
164159
}
165-
return null;
166-
} else {
167-
return acceptVisitors(item, visitors);
168160
}
161+
return meta;
169162
}
170163

171164
@Override
@@ -216,21 +209,21 @@ public Waitable<HasMetadata, HasMetadata> withWaitRetryBackoff(long initialBacko
216209

217210
@Override
218211
public Watch watch(Watcher<HasMetadata> watcher) {
219-
HasMetadata meta = acceptVisitors(item, visitors);
212+
HasMetadata meta = get();
220213
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
221214
return h.watch(client, config, meta.getMetadata().getNamespace(), meta, watcher);
222215
}
223216

224217
@Override
225218
public Watch watch(String resourceVersion, Watcher<HasMetadata> watcher) {
226-
HasMetadata meta = acceptVisitors(item, visitors);
219+
HasMetadata meta = get();
227220
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
228221
return h.watch(client, config, meta.getMetadata().getNamespace(), meta, resourceVersion, watcher);
229222
}
230223

231224
@Override
232225
public Watch watch(ListOptions options, Watcher<HasMetadata> watcher) {
233-
HasMetadata meta = acceptVisitors(item, visitors);
226+
HasMetadata meta = get();
234227
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
235228
return h.watch(client, config, meta.getMetadata().getNamespace(), meta, options, watcher);
236229
}
@@ -240,13 +233,17 @@ protected Readiness getReadiness() {
240233
}
241234

242235
@Override
243-
public final Boolean isReady() {
244-
return getReadiness().isReady(get());
236+
public final boolean isReady() {
237+
HasMetadata meta = fromServer().get();
238+
if (meta == null) {
239+
return false;
240+
}
241+
return getReadiness().isReady(meta);
245242
}
246243

247244
@Override
248245
public HasMetadata waitUntilReady(long amount, TimeUnit timeUnit) {
249-
HasMetadata meta = acceptVisitors(get(), visitors);
246+
HasMetadata meta = get();
250247
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
251248
return h.waitUntilReady(client, config, meta.getMetadata().getNamespace(), meta, amount, timeUnit);
252249
}
@@ -259,12 +256,11 @@ public VisitFromServerWritable<HasMetadata> dryRun(boolean isDryRun) {
259256
@Override
260257
public HasMetadata waitUntilCondition(Predicate<HasMetadata> condition, long amount,
261258
TimeUnit timeUnit) {
262-
HasMetadata meta = acceptVisitors(get(), visitors);
259+
HasMetadata meta = get();
263260
ResourceHandler<HasMetadata, ?> h = handlerOf(meta);
264261
return h.waitUntilCondition(client, config, meta.getMetadata().getNamespace(), meta, condition, amount, timeUnit);
265262
}
266263

267-
268264
private static HasMetadata acceptVisitors(HasMetadata item, List<Visitor> visitors) {
269265
ResourceHandler<HasMetadata, ?> h = handlerOf(item);
270266
VisitableBuilder<HasMetadata, ?> builder = h.edit(item);

0 commit comments

Comments
 (0)