diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java index b53b92e122..f827b04a6d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/EventSources.java @@ -1,10 +1,10 @@ package io.javaoperatorsdk.operator.processing.event; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentNavigableMap; import java.util.concurrent.ConcurrentSkipListMap; import java.util.stream.Collectors; @@ -93,7 +93,8 @@ public void add(NamedEventSource eventSource) { + keyAsString(getResourceType(original), name) + " class/name combination"); } - sources.computeIfAbsent(keyFor(original), k -> new HashMap<>()).put(name, eventSource); + sources.computeIfAbsent(keyFor(original), k -> new ConcurrentHashMap<>()).put(name, + eventSource); } @SuppressWarnings("rawtypes") diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java index db2a69609d..ad4faecc64 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/EventSourcesTest.java @@ -1,5 +1,11 @@ package io.javaoperatorsdk.operator.processing.event; +import java.util.ConcurrentModificationException; +import java.util.concurrent.Phaser; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + import org.junit.jupiter.api.Test; import io.fabric8.kubernetes.api.model.ConfigMap; @@ -184,4 +190,45 @@ void getEventSourcesShouldWork() { assertThat(eventSources.getEventSources(Service.class)).isEmpty(); } + + @Test + void testConcurrentAddRemoveAndGet() throws InterruptedException { + final var concurrentExceptionFound = new AtomicBoolean(false); + for (int i = 0; i < 1000 && !concurrentExceptionFound.get(); i++) { + final var eventSources = new EventSources(); + var eventSourceList = + IntStream.range(1, 20).mapToObj(n -> { + var mockResES = mock(ResourceEventSource.class); + NamedEventSource eventSource = mock(NamedEventSource.class); + when(eventSource.original()).thenReturn(mockResES); + when(eventSource.name()).thenReturn("name" + n); + when(mockResES.resourceType()).thenReturn(HasMetadata.class); + return eventSource; + }).collect(Collectors.toList()); + + IntStream.range(1, 10).forEach(n -> eventSources.add(eventSourceList.get(n - 1))); + + var phaser = new Phaser(2); + + var t1 = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + IntStream.range(11, 20).forEach(n -> eventSources.add(eventSourceList.get(n - 1))); + }); + var t2 = new Thread(() -> { + phaser.arriveAndAwaitAdvance(); + try { + eventSources.getEventSources(HasMetadata.class); + } catch (ConcurrentModificationException e) { + concurrentExceptionFound.set(true); + } + }); + t1.start(); + t2.start(); + t1.join(); + t2.join(); + } + assertThat(concurrentExceptionFound) + .withFailMessage("ConcurrentModificationException thrown") + .isFalse(); + } }