diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java index 5ccc5894a1..01e3ffb7c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java @@ -16,9 +16,15 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.util.List; +import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; public interface IndexedResourceCache extends ResourceCache { + List byIndex(String indexName, String indexKey); + + default Stream byIndexStream(String indexName, String indexKey) { + return byIndex(indexName, indexKey).stream(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java index dd2844d9f8..672b48e540 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java @@ -24,9 +24,22 @@ @SuppressWarnings("unchecked") public interface ResourceCache extends Cache { + /** + * Lists all resources in the given namespace. + * + * @param namespace the namespace to list resources from + * @return a stream of all cached resources in the namespace + */ default Stream list(String namespace) { return list(namespace, TRUE); } + /** + * Lists resources in the given namespace that match the provided predicate. + * + * @param namespace the namespace to list resources from + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(String namespace, Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java index de4d00d717..b9ef475509 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java @@ -37,7 +37,8 @@ /** * Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an * idiomatic way, in particular to make sure that the latest version of the resource is present in - * the caches for the next reconciliation. + * the caches for the next reconciliation. In other words, it provides read-cache-after-write + * consistency. * * @param

the resource type on which this object operates */ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java index ffcfd2df58..b2c2d2692d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java @@ -25,17 +25,45 @@ public interface Cache { Predicate TRUE = (a) -> true; + /** + * Retrieves a resource from the cache by its {@link ResourceID}. + * + * @param resourceID the identifier of the resource + * @return an Optional containing the resource if present in the cache + */ Optional get(ResourceID resourceID); + /** + * Checks whether a resource with the given {@link ResourceID} exists in the cache. + * + * @param resourceID the identifier of the resource + * @return {@code true} if the resource is present in the cache + */ default boolean contains(ResourceID resourceID) { return get(resourceID).isPresent(); } + /** + * Returns a stream of all {@link ResourceID}s currently in the cache. + * + * @return a stream of resource identifiers + */ Stream keys(); + /** + * Lists all resources in the cache. + * + * @return a stream of all cached resources + */ default Stream list() { return list(TRUE); } + /** + * Lists resources in the cache that match the provided predicate. + * + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 6632ce631e..a0b7938302 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -107,10 +107,6 @@ private void initSources() { } } - C configuration() { - return configuration; - } - public void changeNamespaces(Set namespaces) { var sourcesToRemove = sources.keySet().stream().filter(k -> !namespaces.contains(k)).collect(Collectors.toSet()); @@ -256,12 +252,14 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + @Override + public Stream byIndexStream(String indexName, String indexKey) { + return sources.values().stream().flatMap(s -> s.byIndexStream(indexName, indexKey)); + } + @Override public List byIndex(String indexName, String indexKey) { - return sources.values().stream() - .map(s -> s.byIndex(indexName, indexKey)) - .flatMap(List::stream) - .collect(Collectors.toList()); + return byIndexStream(indexName, indexKey).toList(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 26543e8322..3a7528858d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -18,11 +18,13 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -111,7 +113,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< res.ifPresentOrElse( r -> { R latestResource = (R) r.getResource().orElseThrow(); - // as previous resource version we use the one from successful update, since // we process new event here only if that is more recent then the event from our update. // Note that this is equivalent with the scenario when an informer watch connection @@ -222,11 +223,6 @@ public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); } - @Override - public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); - } - void setTemporalResourceCache(TemporaryResourceCache temporaryResourceCache) { this.temporaryResourceCache = temporaryResourceCache; } @@ -239,19 +235,163 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override - public List byIndex(String indexName, String indexKey) { - return manager().byIndex(indexName, indexKey); + public Stream list(String namespace, Predicate predicate) { + return mergeWithTempCacheForList(manager().list(namespace), namespace, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override - public Stream keys() { - return cache.keys(); + public Stream list(Predicate predicate) { + return mergeWithTempCacheForList(manager().list(), null, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override - public Stream list(Predicate predicate) { - return cache.list(predicate); + public Stream byIndexStream(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey); + } + + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ + @Override + public List byIndex(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey) + .collect(Collectors.toList()); + } + + // namespace is filtered on informer manager level + private Stream mergeWithTempCacheForList( + Stream stream, String namespace, Predicate predicate) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + + return stream.filter(filterResourceByPredicate(predicate)); + } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + if (tempResources.isEmpty()) { + return stream.filter(filterResourceByPredicate(predicate)); + } + + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }) + // we filter on predicate only since namespace changes would not be detected anyway. + .filter(filterResourceByPredicate(predicate)) + .toList(); + + return Stream.concat( + tempResources.values().stream() + .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)), + upToDateList.stream()); + } + + private Stream mergeWithTempCacheForIndex( + Stream stream, String indexName, String indexKey) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return stream; + } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + if (tempResources.isEmpty()) { + return stream; + } + + var indexer = indexers.get(indexName); + if (indexer == null) { + throw new IllegalArgumentException("Indexer not found for: " + indexName); + } + + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + if (!indexer.apply(tempResource).contains(indexKey)) { + return null; + } + return tempResource; + } + return r; + }) + .filter(Objects::nonNull) + .toList(); + + // remaining temp resources are ghost resources — include only those matching the index + return Stream.concat( + tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)), + upToDateList.stream()); + } + + private static Predicate filterResourceByPredicate( + Predicate predicate) { + return filterResourceByNamespaceAndPredicate(null, predicate); + } + + private static Predicate filterResourceByNamespaceAndPredicate( + String namespace, Predicate predicate) { + return r -> { + if (namespace != null) { + if (!Optional.of(r) + .map(rr -> Objects.equals(namespace, rr.getMetadata().getNamespace())) + .orElse(false)) { + return false; + } + } + if (predicate != null) { + return predicate.test(r); + } + return true; + }; + } + + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Keys from the temporary resource + * cache (ghost resources) are included in the result. + */ + @Override + public Stream keys() { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return manager().keys(); + } + var managerKeys = manager().keys().collect(Collectors.toSet()); + var tempKeys = temporaryResourceCache.getResources().keySet(); + return Stream.concat( + managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k))); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 5a4486f756..551fb8adb2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -275,4 +276,12 @@ private void checkGhostResources() { public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + synchronized boolean isEmpty() { + return cache.isEmpty(); + } + + synchronized Map getResources() { + return Collections.unmodifiableMap(cache); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index e60ac02280..18659b14f0 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,11 +16,15 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,6 +45,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; @@ -56,6 +61,7 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeastOnce; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; @@ -87,10 +93,12 @@ void setup() { when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any())) .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig); - when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); + when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); when(informerConfig.getGhostResourceCacheCheckInterval()) .thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL); + when(informerConfig.isComparableResourceVersions()).thenReturn(true); + when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); informerEventSource = spy( new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { @@ -533,6 +541,222 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } + @Test + void listReplacesResourceFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, Cache.TRUE).toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void listExcludesResourceWhenTempCacheContainsNewerVersionThatNoLongerMatchesPredicate() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("4"); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = + informerEventSource + .list(null, r -> !"5".equals(r.getMetadata().getResourceVersion())) + .toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenNotInTempCache() { + var original = testDeployment(); + + when(temporaryResourceCache.getResources()).thenReturn(new HashMap<>()); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listReplacesOnlyMatchingResources() { + var dep1 = testDeployment(); + var dep2 = testDeployment(); + dep2.getMetadata().setName("other"); + var newerDep1 = testDeployment(); + newerDep1.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(dep1), newerDep1))); + + var informerManager = mock(InformerManager.class); + when(informerManager.list(nullable(String.class))).thenReturn(Stream.of(dep1, dep2)); + when(informerEventSource.manager()).thenReturn(informerManager); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); + } + + @Test + void byIndexStreamReplacesFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(informerManager); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void byIndexStreamSkipsNewerTempCacheResourceWhenIndexedValueChanged() { + var original = testDeployment(); + original.getMetadata().setLabels(Map.of("app", "key")); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + newer.getMetadata().setLabels(Map.of("app", "other")); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(informerManager); + informerEventSource.addIndexers( + Map.of("idx", d -> List.of(d.getMetadata().getLabels().get("app")))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); + + var mim = mock(InformerManager.class); + when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + when(informerEventSource.manager()).thenReturn(mim); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listAddsGhostResources() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(ghostResource), ghostResource))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(resource)); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.list(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); + } + + @Test + void keysIncludesGhostResourceKeys() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + var resourceId = ResourceID.fromResource(resource); + var ghostResourceId = ResourceID.fromResource(ghostResource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(ghostResourceId)).thenReturn(false); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId); + } + + @Test + void keysDoesNotDuplicateExistingKeys() { + var resource = testDeployment(); + var newerResource = testDeployment(); + newerResource.getMetadata().setResourceVersion("5"); + + var resourceId = ResourceID.fromResource(resource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(resourceId)).thenReturn(true); + when(informerEventSource.manager()).thenReturn(mim); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactly(resourceId); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta());