1
1
package io .javaoperatorsdk .operator .processing .event ;
2
2
3
- import java .util .ArrayList ;
4
- import java .util .Collection ;
3
+ import java .util .HashMap ;
5
4
import java .util .Iterator ;
6
5
import java .util .LinkedHashSet ;
7
- import java .util .List ;
6
+ import java .util .Map ;
8
7
import java .util .Objects ;
9
8
import java .util .Optional ;
10
9
import java .util .Set ;
11
10
import java .util .concurrent .ConcurrentNavigableMap ;
12
11
import java .util .concurrent .ConcurrentSkipListMap ;
13
12
import java .util .concurrent .locks .ReentrantLock ;
14
13
import java .util .stream .Collectors ;
14
+ import java .util .stream .Stream ;
15
15
16
16
import org .slf4j .Logger ;
17
17
import org .slf4j .LoggerFactory ;
@@ -87,7 +87,7 @@ public void start() {
87
87
}
88
88
89
89
@ SuppressWarnings ("rawtypes" )
90
- private void logEventSourceEvent (EventSource eventSource , String event ) {
90
+ private void logEventSourceEvent (NamedEventSource eventSource , String event ) {
91
91
if (log .isDebugEnabled ()) {
92
92
if (eventSource instanceof ResourceEventSource ) {
93
93
ResourceEventSource source = (ResourceEventSource ) eventSource ;
@@ -119,17 +119,24 @@ public void stop() {
119
119
eventProcessor .stop ();
120
120
}
121
121
122
- public final void registerEventSource (EventSource eventSource )
122
+ public final void registerEventSource (EventSource eventSource ) throws OperatorException {
123
+ registerEventSource (null , eventSource );
124
+ }
125
+
126
+ public final void registerEventSource (String name , EventSource eventSource )
123
127
throws OperatorException {
124
128
Objects .requireNonNull (eventSource , "EventSource must not be null" );
125
129
lock .lock ();
126
130
try {
127
- eventSources .add (eventSource );
131
+ if (name == null || name .isBlank ()) {
132
+ name = EventSource .defaultNameFor (eventSource );
133
+ }
134
+ eventSources .add (name , eventSource );
128
135
eventSource .setEventHandler (eventProcessor );
129
136
} catch (IllegalStateException | MissingCRDException e ) {
130
137
throw e ; // leave untouched
131
138
} catch (Exception e ) {
132
- throw new OperatorException ("Couldn't register event source: " + eventSource . name () + " for "
139
+ throw new OperatorException ("Couldn't register event source: " + name + " for "
133
140
+ controller .getConfiguration ().getName () + " controller`" , e );
134
141
} finally {
135
142
lock .unlock ();
@@ -161,7 +168,9 @@ EventHandler getEventHandler() {
161
168
}
162
169
163
170
Set <EventSource > getRegisteredEventSources () {
164
- return eventSources .all ();
171
+ return eventSources .flatMappedSources ()
172
+ .map (NamedEventSource ::original )
173
+ .collect (Collectors .toCollection (LinkedHashSet ::new ));
165
174
}
166
175
167
176
public ControllerResourceEventSource <R > getControllerResourceEventSource () {
@@ -191,8 +200,46 @@ Controller<R> getController() {
191
200
return controller ;
192
201
}
193
202
194
- private static class EventSources <R extends HasMetadata > implements Iterable <EventSource > {
195
- private final ConcurrentNavigableMap <String , List <EventSource >> sources =
203
+ static class NamedEventSource implements EventSource {
204
+ private final EventSource original ;
205
+ private final String name ;
206
+
207
+ private NamedEventSource (EventSource original , String name ) {
208
+ this .original = original ;
209
+ this .name = name ;
210
+ }
211
+
212
+ @ Override
213
+ public void start () throws OperatorException {
214
+ original .start ();
215
+ }
216
+
217
+ @ Override
218
+ public void stop () throws OperatorException {
219
+ original .stop ();
220
+ }
221
+
222
+ @ Override
223
+ public void setEventHandler (EventHandler handler ) {
224
+ original .setEventHandler (handler );
225
+ }
226
+
227
+ public String name () {
228
+ return name ;
229
+ }
230
+
231
+ @ Override
232
+ public String toString () {
233
+ return original + " named: '" + name + "'}" ;
234
+ }
235
+
236
+ public EventSource original () {
237
+ return original ;
238
+ }
239
+ }
240
+
241
+ private static class EventSources <R extends HasMetadata > implements Iterable <NamedEventSource > {
242
+ private final ConcurrentNavigableMap <String , Map <String , EventSource >> sources =
196
243
new ConcurrentSkipListMap <>();
197
244
private final TimerEventSource <R > retryAndRescheduleTimerEventSource = new TimerEventSource <>();
198
245
private ControllerResourceEventSource <R > controllerResourceEventSource ;
@@ -208,34 +255,34 @@ TimerEventSource<R> retryEventSource() {
208
255
}
209
256
210
257
@ Override
211
- public Iterator <EventSource > iterator () {
212
- return sources . values (). stream (). flatMap ( Collection :: stream ).iterator ();
258
+ public Iterator <NamedEventSource > iterator () {
259
+ return flatMappedSources ( ).iterator ();
213
260
}
214
261
215
- public Set < EventSource > all () {
216
- return sources .values ().stream ().flatMap (Collection :: stream )
217
- .collect ( Collectors . toCollection ( LinkedHashSet :: new ));
262
+ private Stream < NamedEventSource > flatMappedSources () {
263
+ return sources .values ().stream ().flatMap (c -> c . entrySet (). stream ( )
264
+ .map ( esEntry -> new NamedEventSource ( esEntry . getValue (), esEntry . getKey ()) ));
218
265
}
219
266
220
267
public void clear () {
221
268
sources .clear ();
222
269
}
223
270
224
- public boolean contains (EventSource source ) {
271
+ public boolean contains (String name , EventSource source ) {
225
272
final var eventSources = sources .get (keyFor (source ));
226
273
if (eventSources == null || eventSources .isEmpty ()) {
227
274
return false ;
228
275
}
229
- return findMatchingSource ( name ( source ), eventSources ). isPresent ( );
276
+ return eventSources . containsKey ( name );
230
277
}
231
278
232
- public void add (EventSource eventSource ) {
233
- if (contains (eventSource )) {
279
+ public void add (String name , EventSource eventSource ) {
280
+ if (contains (name , eventSource )) {
234
281
throw new IllegalArgumentException ("An event source is already registered for the "
235
- + keyAsString (getDependentType (eventSource ), name ( eventSource ) )
282
+ + keyAsString (getDependentType (eventSource ), name )
236
283
+ " class/name combination" );
237
284
}
238
- sources .computeIfAbsent (keyFor (eventSource ), k -> new ArrayList <>()).add ( eventSource );
285
+ sources .computeIfAbsent (keyFor (eventSource ), k -> new HashMap <>()).put ( name , eventSource );
239
286
}
240
287
241
288
@ SuppressWarnings ("rawtypes" )
@@ -245,10 +292,6 @@ private Class<?> getDependentType(EventSource source) {
245
292
: source .getClass ();
246
293
}
247
294
248
- private String name (EventSource source ) {
249
- return source .name ();
250
- }
251
-
252
295
private String keyFor (EventSource source ) {
253
296
return keyFor (getDependentType (source ));
254
297
}
@@ -278,15 +321,15 @@ public <S> ResourceEventSource<R, S> get(Class<S> dependentType, String name) {
278
321
final var size = sourcesForType .size ();
279
322
final EventSource source ;
280
323
if (size == 1 ) {
281
- source = sourcesForType .get ( 0 );
324
+ source = sourcesForType .values (). stream (). findFirst (). orElse ( null );
282
325
} else {
283
326
if (name == null || name .isBlank ()) {
284
327
throw new IllegalArgumentException ("There are multiple EventSources registered for type "
285
328
+ dependentType .getCanonicalName ()
286
329
+ ", you need to provide a name to specify which EventSource you want to query. Known names: "
287
- + sourcesForType . stream (). map ( this :: name ). collect ( Collectors . joining ( "," )));
330
+ + String . join ( "," , sourcesForType . keySet ( )));
288
331
}
289
- source = findMatchingSource ( name , sourcesForType ). orElse ( null );
332
+ source = sourcesForType . get ( name );
290
333
291
334
if (source == null ) {
292
335
return null ;
@@ -309,11 +352,6 @@ public <S> ResourceEventSource<R, S> get(Class<S> dependentType, String name) {
309
352
return res ;
310
353
}
311
354
312
- private Optional <EventSource > findMatchingSource (String name ,
313
- List <EventSource > sourcesForType ) {
314
- return sourcesForType .stream ().filter (es -> name (es ).equals (name )).findAny ();
315
- }
316
-
317
355
@ SuppressWarnings ("rawtypes" )
318
356
private String keyAsString (Class dependentType , String name ) {
319
357
return name != null && name .length () > 0
0 commit comments