Skip to content

fix: resource cache interface for InformerEventSource #758

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 5 commits into from
Dec 17, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 0 additions & 15 deletions operator-framework-core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,20 +105,5 @@
<artifactId>awaitility</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>jcache</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
package io.javaoperatorsdk.operator.processing.event.source;

import java.util.Collections;
import java.util.Map;
import java.util.Optional;

import javax.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.concurrent.ConcurrentHashMap;

import io.javaoperatorsdk.operator.OperatorException;
import io.javaoperatorsdk.operator.processing.event.Event;
Expand All @@ -25,13 +23,9 @@
*/
public abstract class CachingEventSource<T> extends LifecycleAwareEventSource {

private static final Logger log = LoggerFactory.getLogger(CachingEventSource.class);
protected Map<ResourceID, T> cache = new ConcurrentHashMap<>();

protected Cache<ResourceID, T> cache;

public CachingEventSource(Cache<ResourceID, T> cache) {
this.cache = cache;
}
public CachingEventSource() {}

protected void handleDelete(ResourceID relatedResourceID) {
if (!isRunning()) {
Expand All @@ -56,8 +50,8 @@ protected void handleEvent(T value, ResourceID relatedResourceID) {
}
}

public Cache<ResourceID, T> getCache() {
return cache;
public Map<ResourceID, T> getCache() {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is too restrictive… I think we should probably simplify ResourceCache and return that instead maybe?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if following, we return all the values, can it be less restrictive?

On the other hand the resource cache is now about K8S resource, and this class is dedicated to non k8s resources. So not sure if we should unify those. Those have totally different requirements. And approach how we are handling them.

Copy link
Collaborator Author

@csviri csviri Dec 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or maybe just a very simple way.

Like having a simple interface with one method
getValue(ResourceID id)

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be addressed in a subsequent PR, let's merge this as-is for now.

return Collections.unmodifiableMap(cache);
}

public Optional<T> getCachedValue(ResourceID resourceID) {
Expand All @@ -67,6 +61,5 @@ public Optional<T> getCachedValue(ResourceID resourceID) {
@Override
public void stop() throws OperatorException {
super.stop();
cache.close();
}
}
Original file line number Diff line number Diff line change
@@ -1,16 +1,10 @@
package io.javaoperatorsdk.operator.processing.event.source.inbound;

import javax.cache.Cache;

import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.CachingEventSource;

public class CachingInboundEventSource<T> extends CachingEventSource<T> {

public CachingInboundEventSource(Cache<ResourceID, T> cache) {
super(cache);
}

public void handleResourceEvent(T resource, ResourceID relatedResourceID) {
super.handleEvent(resource, relatedResourceID);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.stream.Stream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -16,8 +19,10 @@
import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.AbstractEventSource;
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;

public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource {
public class InformerEventSource<T extends HasMetadata> extends AbstractEventSource
implements ResourceCache<T> {

private static final Logger log = LoggerFactory.getLogger(InformerEventSource.class);

Expand Down Expand Up @@ -132,4 +137,22 @@ public T getAssociated(HasMetadata resource) {
public SharedInformer<T> getSharedInformer() {
return sharedInformer;
}

@Override
public Optional<T> get(ResourceID resourceID) {
return Optional.ofNullable(sharedInformer.getStore()
.getByKey(Cache.namespaceKeyFunc(resourceID.getNamespace().orElse(null),
resourceID.getName())));
}

@Override
public Stream<T> list(Predicate<T> predicate) {
return sharedInformer.getStore().list().stream().filter(predicate);
}

@Override
public Stream<T> list(String namespace, Predicate<T> predicate) {
return sharedInformer.getStore().list().stream()
.filter(v -> namespace.equals(v.getMetadata().getNamespace()) && predicate.test(v));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Predicate;

import javax.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -44,14 +42,13 @@ public class PerResourcePollingEventSource<T, R extends HasMetadata>
private final long period;

public PerResourcePollingEventSource(ResourceSupplier<T, R> resourceSupplier,
ResourceCache<R> resourceCache, long period, Cache<ResourceID, T> cache) {
this(resourceSupplier, resourceCache, period, cache, null);
ResourceCache<R> resourceCache, long period) {
this(resourceSupplier, resourceCache, period, null);
}

public PerResourcePollingEventSource(ResourceSupplier<T, R> resourceSupplier,
ResourceCache<R> resourceCache, long period, Cache<ResourceID, T> cache,
ResourceCache<R> resourceCache, long period,
Predicate<R> registerPredicate) {
super(cache);
this.resourceSupplier = resourceSupplier;
this.resourceCache = resourceCache;
this.period = period;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,6 @@

import java.util.*;
import java.util.function.Supplier;
import java.util.stream.StreamSupport;

import javax.cache.Cache;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand All @@ -22,8 +19,7 @@ public class PollingEventSource<T> extends CachingEventSource<T> {
private final long period;

public PollingEventSource(Supplier<Map<ResourceID, T>> supplier,
long period, Cache<ResourceID, T> cache) {
super(cache);
long period) {
this.supplierToPoll = supplier;
this.period = period;
}
Expand All @@ -46,8 +42,8 @@ public void run() {
protected void getStateAndFillCache() {
var values = supplierToPoll.get();
values.forEach((k, v) -> super.handleEvent(v, k));
StreamSupport.stream(cache.spliterator(), false)
.filter(e -> !values.containsKey(e.getKey())).map(Cache.Entry::getKey)
cache.keySet().stream()
.filter(e -> !values.containsKey(e))
.forEach(super::handleDelete);
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,19 +1,11 @@
package io.javaoperatorsdk.operator.processing.event.source;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import io.javaoperatorsdk.operator.processing.event.Event;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;

import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;

import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*;
import static org.assertj.core.api.Assertions.assertThat;
Expand All @@ -22,16 +14,11 @@
class CachingEventSourceTest {

private CachingEventSource<SampleExternalResource> cachingEventSource;
private Cache<ResourceID, SampleExternalResource> cache;
private EventHandler eventHandlerMock = mock(EventHandler.class);

@BeforeEach
public void setup() {
CachingProvider cachingProvider = new CaffeineCachingProvider();
CacheManager cacheManager = cachingProvider.getCacheManager();
cache = cacheManager.createCache("test-caching", new MutableConfiguration<>());

cachingEventSource = new SimpleCachingEventSource(cache);
cachingEventSource = new SimpleCachingEventSource();
cachingEventSource.setEventHandler(eventHandlerMock);
cachingEventSource.start();
}
Expand Down Expand Up @@ -89,9 +76,6 @@ public void noEventOnDeleteIfResourceWasNotInCacheBefore() {

public static class SimpleCachingEventSource
extends CachingEventSource<SampleExternalResource> {
public SimpleCachingEventSource(Cache<ResourceID, SampleExternalResource> cache) {
super(cache);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@

import java.util.Optional;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand All @@ -17,10 +12,7 @@
import io.javaoperatorsdk.operator.processing.event.source.controller.ResourceCache;
import io.javaoperatorsdk.operator.sample.simple.TestCustomResource;

import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.*;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.*;
Expand All @@ -32,22 +24,17 @@ class PerResourcePollingEventSourceTest {
private PerResourcePollingEventSource.ResourceSupplier<SampleExternalResource, TestCustomResource> supplier =
mock(PerResourcePollingEventSource.ResourceSupplier.class);
private ResourceCache<TestCustomResource> resourceCache = mock(ResourceCache.class);
private Cache<ResourceID, SampleExternalResource> cache;
private EventHandler eventHandler = mock(EventHandler.class);
private TestCustomResource testCustomResource = TestUtils.testCustomResource();

@BeforeEach
public void setup() {
CachingProvider cachingProvider = new CaffeineCachingProvider();
CacheManager cacheManager = cachingProvider.getCacheManager();
cache = cacheManager.createCache("test-caching", new MutableConfiguration<>());

when(resourceCache.get(any())).thenReturn(Optional.of(testCustomResource));
when(supplier.getResources(any()))
.thenReturn(Optional.of(SampleExternalResource.testResource1()));

pollingEventSource =
new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, cache);
new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD);
pollingEventSource.setEventHandler(eventHandler);
}

Expand All @@ -63,7 +50,7 @@ public void pollsTheResourceAfterAwareOfIt() throws InterruptedException {

@Test
public void registeringTaskOnAPredicate() throws InterruptedException {
pollingEventSource = new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD, cache,
pollingEventSource = new PerResourcePollingEventSource<>(supplier, resourceCache, PERIOD,
testCustomResource -> testCustomResource.getMetadata().getGeneration() > 1);
pollingEventSource.setEventHandler(eventHandler);
pollingEventSource.start();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,11 +4,6 @@
import java.util.Map;
import java.util.function.Supplier;

import javax.cache.Cache;
import javax.cache.CacheManager;
import javax.cache.configuration.MutableConfiguration;
import javax.cache.spi.CachingProvider;

import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -17,25 +12,18 @@
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource;

import com.github.benmanes.caffeine.jcache.spi.CaffeineCachingProvider;

import static io.javaoperatorsdk.operator.processing.event.source.SampleExternalResource.*;
import static org.mockito.Mockito.*;

class PollingEventSourceTest {

private PollingEventSource<SampleExternalResource> pollingEventSource;
private Supplier<Map<ResourceID, SampleExternalResource>> supplier = mock(Supplier.class);
private Cache<ResourceID, SampleExternalResource> cache;
private EventHandler eventHandler = mock(EventHandler.class);

@BeforeEach
public void setup() {
CachingProvider cachingProvider = new CaffeineCachingProvider();
CacheManager cacheManager = cachingProvider.getCacheManager();
cache = cacheManager.createCache("test-caching", new MutableConfiguration<>());

pollingEventSource = new PollingEventSource<>(supplier, 50, cache);
pollingEventSource = new PollingEventSource<>(supplier, 50);
pollingEventSource.setEventHandler(eventHandler);
}

Expand Down
17 changes: 0 additions & 17 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,6 @@
<formatter-maven-plugin.version>2.17.1</formatter-maven-plugin.version>
<directory-maven-plugin.version>1.0</directory-maven-plugin.version>
<impsort-maven-plugin.version>1.6.2</impsort-maven-plugin.version>
<jcache.version>1.1.1</jcache.version>
<caffein.version>3.0.4</caffein.version>
</properties>

<modules>
Expand Down Expand Up @@ -170,21 +168,6 @@
<artifactId>operator-framework</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
<version>${caffein.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>jcache</artifactId>
<version>${caffein.version}</version>
</dependency>
</dependencies>
</dependencyManagement>

Expand Down
13 changes: 0 additions & 13 deletions sample-operators/mysql-schema/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -67,19 +67,6 @@
<artifactId>jackson-dataformat-yaml</artifactId>
<version>2.13.0</version>
</dependency>
<dependency>
<groupId>javax.cache</groupId>
<artifactId>cache-api</artifactId>
<version>${jcache.version}</version>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>caffeine</artifactId>
</dependency>
<dependency>
<groupId>com.github.ben-manes.caffeine</groupId>
<artifactId>jcache</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
Loading