1
1
package io .javaoperatorsdk .operator ;
2
2
3
- import java .io .Closeable ;
4
3
import java .io .IOException ;
5
- import java .util .ArrayList ;
4
+ import java .util .LinkedList ;
6
5
import java .util .List ;
6
+ import java .util .concurrent .locks .ReentrantLock ;
7
7
8
8
import org .slf4j .Logger ;
9
9
import org .slf4j .LoggerFactory ;
@@ -24,16 +24,13 @@ public class Operator implements AutoCloseable {
24
24
private static final Logger log = LoggerFactory .getLogger (Operator .class );
25
25
private final KubernetesClient k8sClient ;
26
26
private final ConfigurationService configurationService ;
27
- private final Object lock ;
28
- private final List <ConfiguredController > controllers ;
29
- private volatile boolean started ;
27
+ private final ReentrantLock lock = new ReentrantLock () ;
28
+ private final List <ConfiguredController > controllers = new LinkedList <>() ;
29
+ private volatile boolean started = false ;
30
30
31
31
public Operator (KubernetesClient k8sClient , ConfigurationService configurationService ) {
32
32
this .k8sClient = k8sClient ;
33
33
this .configurationService = configurationService ;
34
- this .lock = new Object ();
35
- this .controllers = new ArrayList <>();
36
- this .started = false ;
37
34
DefaultEventHandler .setEventMonitor (new EventMonitor () {
38
35
@ Override
39
36
public void processedEvent (String uid , Event event ) {
@@ -67,10 +64,14 @@ public ConfigurationService getConfigurationService() {
67
64
*/
68
65
@ SuppressWarnings ("unchecked" )
69
66
public void start () {
70
- synchronized (lock ) {
67
+ try {
68
+ lock .lock ();
71
69
if (started ) {
72
70
return ;
73
71
}
72
+ if (controllers .isEmpty ()) {
73
+ throw new OperatorException ("No ResourceController exists. Exiting!" );
74
+ }
74
75
75
76
final var version = configurationService .getVersion ();
76
77
log .info (
@@ -79,10 +80,6 @@ public void start() {
79
80
version .getCommit (),
80
81
version .getBuiltTime ());
81
82
82
- if (controllers .isEmpty ()) {
83
- throw new OperatorException ("No ResourceController exists. Exiting!" );
84
- }
85
-
86
83
log .info ("Client version: {}" , Version .clientVersion ());
87
84
try {
88
85
final var k8sVersion = k8sClient .getVersion ();
@@ -94,33 +91,37 @@ public void start() {
94
91
throw new OperatorException ("Error retrieving the server version" , e );
95
92
}
96
93
97
- controllers .forEach (ConfiguredController ::start );
98
-
94
+ controllers .parallelStream ().forEach (ConfiguredController ::start );
99
95
started = true ;
96
+ } finally {
97
+ lock .unlock ();
100
98
}
101
99
}
102
100
103
101
/** Stop the operator. */
104
102
@ Override
105
103
public void close () {
106
- synchronized (lock ) {
104
+ log .info (
105
+ "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
106
+
107
+ try {
108
+ lock .lock ();
107
109
if (!started ) {
108
110
return ;
109
111
}
110
112
111
- log .info (
112
- "Operator SDK {} is shutting down..." , configurationService .getVersion ().getSdkVersion ());
113
-
114
- for (Closeable closeable : this .controllers ) {
113
+ this .controllers .parallelStream ().forEach (closeable -> {
115
114
try {
116
115
log .debug ("closing {}" , closeable );
117
116
closeable .close ();
118
117
} catch (IOException e ) {
119
118
log .warn ("Error closing {}" , closeable , e );
120
119
}
121
- }
120
+ });
122
121
123
122
started = false ;
123
+ } finally {
124
+ lock .unlock ();
124
125
}
125
126
}
126
127
0 commit comments