From 2cf57c3191291b2bc565633a0aac559c5189dfdf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 30 Apr 2026 15:38:49 +0200 Subject: [PATCH 01/19] feat: strong consistency variant for informer list and byIndex (#3316) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: Chris Laprun Signed-off-by: Attila Mészáros --- .../api/reconciler/IndexedResourceCache.java | 4 + .../api/reconciler/ResourceOperations.java | 3 +- .../source/informer/InformerManager.java | 9 +- .../source/informer/InformerWrapper.java | 5 + .../informer/ManagedInformerEventSource.java | 132 +++++++++++++++-- .../informer/TemporaryResourceCache.java | 8 + .../informer/InformerEventSourceTest.java | 138 +++++++++++++++++- 7 files changed, 282 insertions(+), 17 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java index 5ccc5894a1..b8f8f3381a 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java @@ -16,9 +16,13 @@ package io.javaoperatorsdk.operator.api.reconciler; import java.util.List; +import java.util.stream.Stream; import io.fabric8.kubernetes.api.model.HasMetadata; public interface IndexedResourceCache extends ResourceCache { + List byIndex(String indexName, String indexKey); + + Stream byIndexStream(String indexName, String indexKey); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java index de4d00d717..6f9e73d8ed 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java @@ -37,7 +37,8 @@ /** * Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an * idiomatic way, in particular to make sure that the latest version of the resource is present in - * the caches for the next reconciliation. + * the caches for the next reconciliation. In other words provides read-cache-after-write + * consistency. * * @param

the resource type on which this object operates */ diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 6632ce631e..a3ef72a2a3 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -256,12 +256,13 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + public Stream byIndexStream(String indexName, String indexKey) { + return sources.values().stream().map(s -> s.byIndex(indexName, indexKey)).flatMap(List::stream); + } + @Override public List byIndex(String indexName, String indexKey) { - return sources.values().stream() - .map(s -> s.byIndex(indexName, indexKey)) - .flatMap(List::stream) - .collect(Collectors.toList()); + return byIndexStream(indexName, indexKey).collect(Collectors.toList()); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 541068aa93..8885c225c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -195,6 +195,11 @@ public List byIndex(String indexName, String indexKey) { return informer.getIndexer().byIndex(indexName, indexKey); } + @Override + public Stream byIndexStream(String indexName, String indexKey) { + return byIndex(indexName, indexKey).stream(); + } + @Override public String toString() { return informerInfo() + " (" + informer + ')'; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 26543e8322..69a5f36bf4 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -23,6 +23,7 @@ import java.util.function.Function; import java.util.function.Predicate; import java.util.function.UnaryOperator; +import java.util.stream.Collectors; import java.util.stream.Stream; import org.slf4j.Logger; @@ -111,7 +112,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator< res.ifPresentOrElse( r -> { R latestResource = (R) r.getResource().orElseThrow(); - // as previous resource version we use the one from successful update, since // we process new event here only if that is more recent then the event from our update. // Note that this is equivalent with the scenario when an informer watch connection @@ -222,11 +222,6 @@ public Optional getCachedValue(ResourceID resourceID) { return get(resourceID); } - @Override - public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); - } - void setTemporalResourceCache(TemporaryResourceCache temporaryResourceCache) { this.temporaryResourceCache = temporaryResourceCache; } @@ -239,19 +234,134 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + @Override + public Stream list(String namespace, Predicate predicate) { + return manager().list(namespace, predicate); + } + + @Override + public Stream list(Predicate predicate) { + return cache.list(predicate); + } + @Override public List byIndex(String indexName, String indexKey) { return manager().byIndex(indexName, indexKey); } - @Override - public Stream keys() { - return cache.keys(); + public Stream byIndexStream(String indexName, String indexKey) { + return manager().byIndexStream(indexName, indexKey); + } + + /** + * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is + * useful when resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + */ + public Stream listWithStrongConsistency(String namespace, Predicate predicate) { + return mergeWithWithTempCacheResources( + manager().list(namespace, predicate), namespace, predicate); + } + + /** + * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when + * resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + */ + public Stream listWithStrongConsistency(Predicate predicate) { + return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); + } + + /** + * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is + * useful when resources are updated using {@link + * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + */ + public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { + return mergeWithWithTempCacheResources( + manager().byIndexStream(indexName, indexKey), indexName, indexKey); + } + + private Stream mergeWithWithTempCacheResources( + Stream stream, String indexName, String indexKey) { + return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey); + } + + private Stream mergeWithWithTempCacheResources( + Stream stream, String namespace, Predicate predicate) { + return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null); + } + + private Stream mergeWithWithTempCacheResources( + Stream stream, + String namespace, + Predicate predicate, + String indexName, + String indexKey) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return stream; + } + var allTempResources = temporaryResourceCache.getResources(); + Map tempResources; + if (namespace == null && predicate == null) { + tempResources = new HashMap<>(allTempResources); + } else { + // filtering the temp cache according the user input (predicate, namespace) + tempResources = + allTempResources.entrySet().stream() + .filter( + e -> { + if (namespace != null) { + var res = + e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); + if (!res) return false; + } + if (predicate != null) { + return predicate.test(e.getValue()); + } + return true; + }) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + if (tempResources.isEmpty()) { + return stream; + } + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + // removing the id from the related temp resources + // this is important so we can detect ghost resources: + // all that remains is ghost resource + var tempResource = tempResources.remove(resourceID); + // using the latest version + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }) + .toList(); + Stream tempResourceStream; + // ghost resource handling + if (indexName != null && indexKey != null) { + var indexer = indexers.get(indexName); + if (indexer == null) { + throw new IllegalArgumentException("Indexer not found for: " + indexName); + } + // we check if the ghost resource is part of the index + tempResourceStream = + tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); + } else { + tempResourceStream = tempResources.values().stream(); + } + return Stream.concat(tempResourceStream, upToDateList.stream()); } @Override - public Stream list(Predicate predicate) { - return cache.list(predicate); + public Stream keys() { + return cache.keys(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 5a4486f756..39543843b8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -275,4 +275,12 @@ private void checkGhostResources() { public synchronized Optional getResourceFromCache(ResourceID resourceID) { return Optional.ofNullable(cache.get(resourceID)); } + + synchronized boolean isEmpty() { + return cache.isEmpty(); + } + + synchronized Map getResources() { + return new HashMap<>(cache); + } } diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index e60ac02280..7313cc3a48 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,11 +16,14 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.Set; import java.util.concurrent.CountDownLatch; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.stream.Stream; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -56,7 +59,9 @@ import static org.mockito.ArgumentMatchers.argThat; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isNull; +import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -87,10 +92,12 @@ void setup() { when(secondaryToPrimaryMapper.toPrimaryResourceIDs(any())) .thenReturn(Set.of(ResourceID.fromResource(testDeployment()))); when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig); - when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); + when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class); when(informerConfig.getGhostResourceCacheCheckInterval()) .thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL); + when(informerConfig.isComparableResourceVersions()).thenReturn(true); + when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET); informerEventSource = spy( new InformerEventSource<>(informerEventSourceConfiguration, clientMock) { @@ -533,6 +540,135 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { verify(informerStoppedHandler, atLeastOnce()).onStop(any(), eq(exception)); } + @Test + void listWithStrongConsistencyReplacesResourceFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { + var original = testDeployment(); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of()); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listWithStrongConsistencyReplacesOnlyMatchingResources() { + var dep1 = testDeployment(); + var dep2 = testDeployment(); + dep2.getMetadata().setName("other"); + var newerDep1 = testDeployment(); + newerDep1.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(dep1), newerDep1)); + + var informerManager = mock(InformerManager.class); + when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); + doReturn(informerManager).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); + } + + @Test + void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { + var original = testDeployment(); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(informerManager).when(informerEventSource).manager(); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); + + var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + + assertThat(result).containsExactly(newer); + } + + @Test + void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("5"); + var olderTemp = testDeployment(); + olderTemp.getMetadata().setResourceVersion("3"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + + var mim = mock(InformerManager.class); + when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); + + var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + + assertThat(result).containsExactly(original); + } + + @Test + void listWithStrongConsistencyAddsGhostResources() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + when(temporaryResourceCache.getResources()) + .thenReturn(Map.of(ResourceID.fromResource(ghostResource), ghostResource)); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + + assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); From 88b60c23600a6d4a3cdfdf2e2ed1836799d73d63 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 12 May 2026 20:20:05 +0200 Subject: [PATCH 02/19] Read-cache-after-write aware list and by index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../api/reconciler/ResourceCache.java | 13 ++ .../processing/event/source/Cache.java | 28 +++ .../informer/ManagedInformerEventSource.java | 185 ++++++++++-------- .../informer/InformerEventSourceTest.java | 130 ++++++++++-- 4 files changed, 251 insertions(+), 105 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java index dd2844d9f8..672b48e540 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceCache.java @@ -24,9 +24,22 @@ @SuppressWarnings("unchecked") public interface ResourceCache extends Cache { + /** + * Lists all resources in the given namespace. + * + * @param namespace the namespace to list resources from + * @return a stream of all cached resources in the namespace + */ default Stream list(String namespace) { return list(namespace, TRUE); } + /** + * Lists resources in the given namespace that match the provided predicate. + * + * @param namespace the namespace to list resources from + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(String namespace, Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java index ffcfd2df58..b2c2d2692d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/Cache.java @@ -25,17 +25,45 @@ public interface Cache { Predicate TRUE = (a) -> true; + /** + * Retrieves a resource from the cache by its {@link ResourceID}. + * + * @param resourceID the identifier of the resource + * @return an Optional containing the resource if present in the cache + */ Optional get(ResourceID resourceID); + /** + * Checks whether a resource with the given {@link ResourceID} exists in the cache. + * + * @param resourceID the identifier of the resource + * @return {@code true} if the resource is present in the cache + */ default boolean contains(ResourceID resourceID) { return get(resourceID).isPresent(); } + /** + * Returns a stream of all {@link ResourceID}s currently in the cache. + * + * @return a stream of resource identifiers + */ Stream keys(); + /** + * Lists all resources in the cache. + * + * @return a stream of all cached resources + */ default Stream list() { return list(TRUE); } + /** + * Lists resources in the cache that match the provided predicate. + * + * @param predicate filter to apply on the resources + * @return a stream of cached resources matching the predicate + */ Stream list(Predicate predicate); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 69a5f36bf4..d33f514f78 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -18,6 +18,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.Optional; import java.util.Set; import java.util.function.Function; @@ -234,134 +235,150 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(String namespace, Predicate predicate) { - return manager().list(namespace, predicate); + return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate); } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. + */ @Override public Stream list(Predicate predicate) { - return cache.list(predicate); - } - - @Override - public List byIndex(String indexName, String indexKey) { - return manager().byIndex(indexName, indexKey); - } - - public Stream byIndexStream(String indexName, String indexKey) { - return manager().byIndexStream(indexName, indexKey); + return mergeWithTempCacheForList(cache.list(predicate), null, predicate); } /** - * Like {@link #list(String, Predicate)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. */ - public Stream listWithStrongConsistency(String namespace, Predicate predicate) { - return mergeWithWithTempCacheResources( - manager().list(namespace, predicate), namespace, predicate); + @Override + public Stream byIndexStream(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey); } /** - * Like {@link #list(Predicate)} but for read-cache-after-write consistency. This is useful when - * resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Results are merged with the + * temporary resource cache to ensure recently written resources are reflected in the output. */ - public Stream listWithStrongConsistency(Predicate predicate) { - return mergeWithWithTempCacheResources(cache.list(predicate), null, predicate); + @Override + public List byIndex(String indexName, String indexKey) { + return mergeWithTempCacheForIndex( + manager().byIndexStream(indexName, indexKey), indexName, indexKey) + .collect(Collectors.toList()); } - /** - * Like {@link #byIndexStream(String, String)} but for read-cache-after-write consistency. This is - * useful when resources are updated using {@link - * io.javaoperatorsdk.operator.api.reconciler.ResourceOperations}. - */ - public Stream byIndexStreamWithStrongConsistency(String indexName, String indexKey) { - return mergeWithWithTempCacheResources( - manager().byIndexStream(indexName, indexKey), indexName, indexKey); - } + private Stream mergeWithTempCacheForList( + Stream stream, String namespace, Predicate predicate) { + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); + } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + if (tempResources.isEmpty()) { + return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); + } - private Stream mergeWithWithTempCacheResources( - Stream stream, String indexName, String indexKey) { - return mergeWithWithTempCacheResources(stream, null, null, indexName, indexKey); - } + var upToDateList = + stream + .map( + r -> { + var resourceID = ResourceID.fromResource(r); + var tempResource = tempResources.remove(resourceID); + if (tempResource != null + && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + return tempResource; + } + return r; + }) + .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)) + .toList(); - private Stream mergeWithWithTempCacheResources( - Stream stream, String namespace, Predicate predicate) { - return mergeWithWithTempCacheResources(stream, namespace, predicate, null, null); + return Stream.concat( + tempResources.values().stream() + .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)), + upToDateList.stream()); } - private Stream mergeWithWithTempCacheResources( - Stream stream, - String namespace, - Predicate predicate, - String indexName, - String indexKey) { + private Stream mergeWithTempCacheForIndex( + Stream stream, String indexName, String indexKey) { if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return stream; } - var allTempResources = temporaryResourceCache.getResources(); - Map tempResources; - if (namespace == null && predicate == null) { - tempResources = new HashMap<>(allTempResources); - } else { - // filtering the temp cache according the user input (predicate, namespace) - tempResources = - allTempResources.entrySet().stream() - .filter( - e -> { - if (namespace != null) { - var res = - e.getKey().getNamespace().map(ns -> ns.equals(namespace)).orElse(false); - if (!res) return false; - } - if (predicate != null) { - return predicate.test(e.getValue()); - } - return true; - }) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - } + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); if (tempResources.isEmpty()) { return stream; } + + var indexer = indexers.get(indexName); + if (indexer == null) { + throw new IllegalArgumentException("Indexer not found for: " + indexName); + } + var upToDateList = stream .map( r -> { var resourceID = ResourceID.fromResource(r); - // removing the id from the related temp resources - // this is important so we can detect ghost resources: - // all that remains is ghost resource var tempResource = tempResources.remove(resourceID); - // using the latest version if (tempResource != null && ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) { + if (!indexer.apply(tempResource).contains(indexKey)) { + return null; + } return tempResource; } return r; }) + .filter(Objects::nonNull) .toList(); - Stream tempResourceStream; - // ghost resource handling - if (indexName != null && indexKey != null) { - var indexer = indexers.get(indexName); - if (indexer == null) { - throw new IllegalArgumentException("Indexer not found for: " + indexName); + + // remaining temp resources are ghost resources — include only those matching the index + return Stream.concat( + tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)), + upToDateList.stream()); + } + + private static Predicate filterResourceByNamespaceAndPredicate( + String namespace, Predicate predicate) { + return r -> { + if (namespace != null) { + var res = Optional.of(r).map(ns -> ns.equals(namespace)).orElse(false); + if (!res) return false; } - // we check if the ghost resource is part of the index - tempResourceStream = - tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)); - } else { - tempResourceStream = tempResources.values().stream(); - } - return Stream.concat(tempResourceStream, upToDateList.stream()); + if (predicate != null) { + return predicate.test(r); + } + return true; + }; } + /** + * {@inheritDoc} + * + *

This implementation is read-cache-after-write consistent. Keys from the temporary resource + * cache (ghost resources) are included in the result. + */ @Override public Stream keys() { - return cache.keys(); + if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { + return manager().keys(); + } + var tempKeys = temporaryResourceCache.getResources().keySet(); + return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k))); } @Override diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 7313cc3a48..2a68101984 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -16,6 +16,7 @@ package io.javaoperatorsdk.operator.processing.event.source.informer; import java.time.Duration; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; @@ -541,40 +542,62 @@ void informerStoppedHandlerShouldBeCalledWhenInformerStops() { } @Test - void listWithStrongConsistencyReplacesResourceFromTempCache() { + void listReplacesResourceFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(newer); } @Test - void listWithStrongConsistencyKeepsResourceWhenNotInTempCache() { + void listExcludesResourceWhenTempCacheContainsNewerVersionThatNoLongerMatchesPredicate() { + var original = testDeployment(); + original.getMetadata().setResourceVersion("4"); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var mim = mock(InformerManager.class); + when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + doReturn(mim).when(informerEventSource).manager(); + + var result = + informerEventSource + .list(null, r -> !"5".equals(r.getMetadata().getResourceVersion())) + .toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenNotInTempCache() { var original = testDeployment(); - when(temporaryResourceCache.getResources()).thenReturn(Map.of()); + when(temporaryResourceCache.getResources()).thenReturn(new HashMap<>()); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency("default", r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(original); } @Test - void listWithStrongConsistencyReplacesOnlyMatchingResources() { + void listReplacesOnlyMatchingResources() { var dep1 = testDeployment(); var dep2 = testDeployment(); dep2.getMetadata().setName("other"); @@ -582,93 +605,158 @@ void listWithStrongConsistencyReplacesOnlyMatchingResources() { newerDep1.getMetadata().setResourceVersion("5"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(dep1), newerDep1)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(dep1), newerDep1))); var informerManager = mock(InformerManager.class); when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); doReturn(informerManager).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(newerDep1, dep2); } @Test - void byIndexStreamWithStrongConsistencyReplacesFromTempCache() { + void byIndexStreamReplacesFromTempCache() { var original = testDeployment(); var newer = testDeployment(); newer.getMetadata().setResourceVersion("5"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), newer)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); var informerManager = mock(InformerManager.class); when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); doReturn(informerManager).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(newer); } @Test - void listWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void byIndexStreamSkipsNewerTempCacheResourceWhenIndexedValueChanged() { + var original = testDeployment(); + original.getMetadata().setLabels(Map.of("app", "key")); + var newer = testDeployment(); + newer.getMetadata().setResourceVersion("5"); + newer.getMetadata().setLabels(Map.of("app", "other")); + + when(temporaryResourceCache.getResources()) + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); + + var informerManager = mock(InformerManager.class); + when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); + doReturn(informerManager).when(informerEventSource).manager(); + informerEventSource.addIndexers( + Map.of("idx", d -> List.of(d.getMetadata().getLabels().get("app")))); + + var result = informerEventSource.byIndexStream("idx", "key").toList(); + + assertThat(result).isEmpty(); + } + + @Test + void listKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); olderTemp.getMetadata().setResourceVersion("3"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactly(original); } @Test - void byIndexStreamWithStrongConsistencyKeepsResourceWhenTempCacheHasOlderVersion() { + void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { var original = testDeployment(); original.getMetadata().setResourceVersion("5"); var olderTemp = testDeployment(); olderTemp.getMetadata().setResourceVersion("3"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(original), olderTemp)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); var mim = mock(InformerManager.class); when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); - var result = informerEventSource.byIndexStreamWithStrongConsistency("idx", "key").toList(); + var result = informerEventSource.byIndexStream("idx", "key").toList(); assertThat(result).containsExactly(original); } @Test - void listWithStrongConsistencyAddsGhostResources() { + void listAddsGhostResources() { var resource = testDeployment(); var ghostResource = testDeployment(); ghostResource.getMetadata().setName("ghost"); when(temporaryResourceCache.getResources()) - .thenReturn(Map.of(ResourceID.fromResource(ghostResource), ghostResource)); + .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(ghostResource), ghostResource))); var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.listWithStrongConsistency(null, r -> true).toList(); + var result = informerEventSource.list(null, r -> true).toList(); assertThat(result).containsExactlyInAnyOrder(resource, ghostResource); } + @Test + void keysIncludesGhostResourceKeys() { + var resource = testDeployment(); + var ghostResource = testDeployment(); + ghostResource.getMetadata().setName("ghost"); + + var resourceId = ResourceID.fromResource(resource); + var ghostResourceId = ResourceID.fromResource(ghostResource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(ghostResourceId, ghostResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(ghostResourceId)).thenReturn(false); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactlyInAnyOrder(resourceId, ghostResourceId); + } + + @Test + void keysDoesNotDuplicateExistingKeys() { + var resource = testDeployment(); + var newerResource = testDeployment(); + newerResource.getMetadata().setResourceVersion("5"); + + var resourceId = ResourceID.fromResource(resource); + + when(temporaryResourceCache.getResources()).thenReturn(Map.of(resourceId, newerResource)); + when(temporaryResourceCache.isEmpty()).thenReturn(false); + + var mim = mock(InformerManager.class); + when(mim.keys()).thenReturn(Stream.of(resourceId)); + when(mim.contains(resourceId)).thenReturn(true); + doReturn(mim).when(informerEventSource).manager(); + + var result = informerEventSource.keys().toList(); + + assertThat(result).containsExactly(resourceId); + } + Deployment testDeployment() { Deployment deployment = new Deployment(); deployment.setMetadata(new ObjectMeta()); From dde2a2812e5c693dc9551de7d5a2b7ce07e582da Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Tue, 12 May 2026 21:39:49 +0200 Subject: [PATCH 03/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- AGENTS.md | 2 +- .../event/source/informer/ManagedInformerEventSource.java | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/AGENTS.md b/AGENTS.md index c1b0494034..d723865979 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -186,7 +186,7 @@ public class ConfigMapDependent extends CRUDKubernetesDependentResource Predicate filterResourceByNamespaceAnd String namespace, Predicate predicate) { return r -> { if (namespace != null) { - var res = Optional.of(r).map(ns -> ns.equals(namespace)).orElse(false); + var res = + Optional.of(r) + .map(rr -> namespace.equals(rr.getMetadata().getNamespace())) + .orElse(false); if (!res) return false; } if (predicate != null) { From d4d28a6a669517811e4c2bb9dc0dc9972ee62424 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 09:23:19 +0200 Subject: [PATCH 04/19] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../operator/api/reconciler/ResourceOperations.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java index 6f9e73d8ed..b9ef475509 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/ResourceOperations.java @@ -37,7 +37,7 @@ /** * Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an * idiomatic way, in particular to make sure that the latest version of the resource is present in - * the caches for the next reconciliation. In other words provides read-cache-after-write + * the caches for the next reconciliation. In other words, it provides read-cache-after-write * consistency. * * @param

the resource type on which this object operates From 2a42812433b0ef19e6226f64c6f0aa502302fca4 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 09:23:46 +0200 Subject: [PATCH 05/19] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- AGENTS.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/AGENTS.md b/AGENTS.md index d723865979..c1b0494034 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -186,7 +186,7 @@ public class ConfigMapDependent extends CRUDKubernetesDependentResource Date: Wed, 13 May 2026 09:26:37 +0200 Subject: [PATCH 06/19] fixes from PR MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerManager.java | 1 + .../source/informer/ManagedInformerEventSource.java | 10 +++++----- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index a3ef72a2a3..004a5cabb5 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -256,6 +256,7 @@ public void addIndexers(Map>> indexers) { this.indexers.putAll(indexers); } + @Override public Stream byIndexStream(String indexName, String indexKey) { return sources.values().stream().map(s -> s.byIndex(indexName, indexKey)).flatMap(List::stream); } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 6e71c29641..32c131c8dd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -356,11 +356,11 @@ private static Predicate filterResourceByNamespaceAnd String namespace, Predicate predicate) { return r -> { if (namespace != null) { - var res = - Optional.of(r) - .map(rr -> namespace.equals(rr.getMetadata().getNamespace())) - .orElse(false); - if (!res) return false; + if (!Optional.of(r) + .map(rr -> Objects.equals(namespace, rr.getMetadata().getNamespace())) + .orElse(false)) { + return false; + } } if (predicate != null) { return predicate.test(r); From f652d12cdc230034952a05fb8836386757734d17 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 09:43:59 +0200 Subject: [PATCH 07/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/ManagedInformerEventSource.java | 18 +++++++++++++----- .../informer/InformerEventSourceTest.java | 8 ++++---- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 32c131c8dd..4ca71d668d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -243,7 +243,7 @@ public void addIndexers(Map>> indexers) { */ @Override public Stream list(String namespace, Predicate predicate) { - return mergeWithTempCacheForList(manager().list(namespace, predicate), namespace, predicate); + return mergeWithTempCacheForList(manager().list(namespace), namespace, predicate); } /** @@ -254,7 +254,7 @@ public Stream list(String namespace, Predicate predicate) { */ @Override public Stream list(Predicate predicate) { - return mergeWithTempCacheForList(cache.list(predicate), null, predicate); + return mergeWithTempCacheForList(cache.list(), null, predicate); } /** @@ -282,14 +282,16 @@ public List byIndex(String indexName, String indexKey) { .collect(Collectors.toList()); } + // namespace is filtered on informer manager level private Stream mergeWithTempCacheForList( Stream stream, String namespace, Predicate predicate) { if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { - return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); + + return stream.filter(filterResourceByPredicate(predicate)); } var tempResources = new HashMap<>(temporaryResourceCache.getResources()); if (tempResources.isEmpty()) { - return stream.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)); + return stream.filter(filterResourceByPredicate(predicate)); } var upToDateList = @@ -304,7 +306,8 @@ private Stream mergeWithTempCacheForList( } return r; }) - .filter(filterResourceByNamespaceAndPredicate(namespace, predicate)) + // we filter on predicate only since namespace changes would not be detected any ways. + .filter(filterResourceByPredicate(predicate)) .toList(); return Stream.concat( @@ -352,6 +355,11 @@ private Stream mergeWithTempCacheForIndex( upToDateList.stream()); } + private static Predicate filterResourceByPredicate( + Predicate predicate) { + return filterResourceByNamespaceAndPredicate(null, predicate); + } + private static Predicate filterResourceByNamespaceAndPredicate( String namespace, Predicate predicate) { return r -> { diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 2a68101984..2ad84af01c 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -588,7 +588,7 @@ void listKeepsResourceWhenNotInTempCache() { when(temporaryResourceCache.getResources()).thenReturn(new HashMap<>()); var mim = mock(InformerManager.class); - when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.list(null, r -> true).toList(); @@ -608,7 +608,7 @@ void listReplacesOnlyMatchingResources() { .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(dep1), newerDep1))); var informerManager = mock(InformerManager.class); - when(informerManager.list(nullable(String.class), any())).thenReturn(Stream.of(dep1, dep2)); + when(informerManager.list(nullable(String.class))).thenReturn(Stream.of(dep1, dep2)); doReturn(informerManager).when(informerEventSource).manager(); var result = informerEventSource.list(null, r -> true).toList(); @@ -668,7 +668,7 @@ void listKeepsResourceWhenTempCacheHasOlderVersion() { .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), olderTemp))); var mim = mock(InformerManager.class); - when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.list(null, r -> true).toList(); @@ -706,7 +706,7 @@ void listAddsGhostResources() { .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(ghostResource), ghostResource))); var mim = mock(InformerManager.class); - when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(resource)); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(resource)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.list(null, r -> true).toList(); From 3d2160069ace9d56034ce1c5c9f1f3865530f29d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 09:46:36 +0200 Subject: [PATCH 08/19] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../processing/event/source/informer/InformerManager.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index 004a5cabb5..f6a03f9f82 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -258,7 +258,7 @@ public void addIndexers(Map>> indexers) { @Override public Stream byIndexStream(String indexName, String indexKey) { - return sources.values().stream().map(s -> s.byIndex(indexName, indexKey)).flatMap(List::stream); + return sources.values().stream().flatMap(s -> s.byIndexStream(indexName, indexKey)); } @Override From 1666e5285194d3347aca7bc9a7f67931582fd751 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 09:48:04 +0200 Subject: [PATCH 09/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 4ca71d668d..150764d6eb 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -289,7 +289,7 @@ private Stream mergeWithTempCacheForList( return stream.filter(filterResourceByPredicate(predicate)); } - var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + var tempResources = temporaryResourceCache.getResources(); if (tempResources.isEmpty()) { return stream.filter(filterResourceByPredicate(predicate)); } From da8ee3e642ee3fd5938baeb6485790cc6eb2558f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 10:07:50 +0200 Subject: [PATCH 10/19] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 150764d6eb..50929cfc2d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -306,7 +306,7 @@ private Stream mergeWithTempCacheForList( } return r; }) - // we filter on predicate only since namespace changes would not be detected any ways. + // we filter on predicate only since namespace changes would not be detected anyway. .filter(filterResourceByPredicate(predicate)) .toList(); From f2ff90267a3bf5a58907deede461efa9bceaed43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 10:08:03 +0200 Subject: [PATCH 11/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 50929cfc2d..76098654fd 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -321,7 +321,7 @@ private Stream mergeWithTempCacheForIndex( if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return stream; } - var tempResources = new HashMap<>(temporaryResourceCache.getResources()); + var tempResources = temporaryResourceCache.getResources(); if (tempResources.isEmpty()) { return stream; } From 56e47ea809a296789ffef08345dfdb5da9611b6a Mon Sep 17 00:00:00 2001 From: Chris Laprun Date: Wed, 13 May 2026 10:21:54 +0200 Subject: [PATCH 12/19] refactor: minor improvements MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Chris Laprun Signed-off-by: Attila Mészáros --- .../operator/api/reconciler/IndexedResourceCache.java | 4 +++- .../processing/event/source/informer/InformerManager.java | 2 +- .../processing/event/source/informer/InformerWrapper.java | 5 ----- .../event/source/informer/ManagedInformerEventSource.java | 2 +- .../event/source/informer/InformerEventSourceTest.java | 3 ++- 5 files changed, 7 insertions(+), 9 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java index b8f8f3381a..01e3ffb7c8 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/api/reconciler/IndexedResourceCache.java @@ -24,5 +24,7 @@ public interface IndexedResourceCache extends ResourceCac List byIndex(String indexName, String indexKey); - Stream byIndexStream(String indexName, String indexKey); + default Stream byIndexStream(String indexName, String indexKey) { + return byIndex(indexName, indexKey).stream(); + } } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index f6a03f9f82..aa770ca6ff 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -263,7 +263,7 @@ public Stream byIndexStream(String indexName, String indexKey) { @Override public List byIndex(String indexName, String indexKey) { - return byIndexStream(indexName, indexKey).collect(Collectors.toList()); + return byIndexStream(indexName, indexKey).toList(); } @Override diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java index 8885c225c8..541068aa93 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerWrapper.java @@ -195,11 +195,6 @@ public List byIndex(String indexName, String indexKey) { return informer.getIndexer().byIndex(indexName, indexKey); } - @Override - public Stream byIndexStream(String indexName, String indexKey) { - return byIndex(indexName, indexKey).stream(); - } - @Override public String toString() { return informerInfo() + " (" + informer + ')'; diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 76098654fd..e14f213a1e 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -254,7 +254,7 @@ public Stream list(String namespace, Predicate predicate) { */ @Override public Stream list(Predicate predicate) { - return mergeWithTempCacheForList(cache.list(), null, predicate); + return mergeWithTempCacheForList(manager().list(), null, predicate); } /** diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 2ad84af01c..0f53f7b2b8 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -45,6 +45,7 @@ import io.javaoperatorsdk.operator.api.reconciler.Constants; import io.javaoperatorsdk.operator.processing.event.EventHandler; import io.javaoperatorsdk.operator.processing.event.ResourceID; +import io.javaoperatorsdk.operator.processing.event.source.Cache; import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils; import io.javaoperatorsdk.operator.processing.event.source.ResourceAction; import io.javaoperatorsdk.operator.processing.event.source.SecondaryToPrimaryMapper; @@ -554,7 +555,7 @@ void listReplacesResourceFromTempCache() { when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); - var result = informerEventSource.list(null, r -> true).toList(); + var result = informerEventSource.list(null, Cache.TRUE).toList(); assertThat(result).containsExactly(newer); } From 74d0bd927a9620246819c3250ecf2000c46355df Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 10:29:37 +0200 Subject: [PATCH 13/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/InformerEventSourceTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 0f53f7b2b8..1c4a2260f2 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -552,7 +552,7 @@ void listReplacesResourceFromTempCache() { .thenReturn(new HashMap<>(Map.of(ResourceID.fromResource(original), newer))); var mim = mock(InformerManager.class); - when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); + when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); doReturn(mim).when(informerEventSource).manager(); var result = informerEventSource.list(null, Cache.TRUE).toList(); From ff2753025884b14cc2c567c16772eeb20a51de15 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 12:58:54 +0200 Subject: [PATCH 14/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../informer/InformerEventSourceTest.java | 23 +++++++++---------- 1 file changed, 11 insertions(+), 12 deletions(-) diff --git a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java index 1c4a2260f2..18659b14f0 100644 --- a/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java +++ b/operator-framework-core/src/test/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerEventSourceTest.java @@ -63,7 +63,6 @@ import static org.mockito.ArgumentMatchers.isNull; import static org.mockito.ArgumentMatchers.nullable; import static org.mockito.Mockito.atLeastOnce; -import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.spy; @@ -553,7 +552,7 @@ void listReplacesResourceFromTempCache() { var mim = mock(InformerManager.class); when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource.list(null, Cache.TRUE).toList(); @@ -572,7 +571,7 @@ void listExcludesResourceWhenTempCacheContainsNewerVersionThatNoLongerMatchesPre var mim = mock(InformerManager.class); when(mim.list(nullable(String.class), any())).thenReturn(Stream.of(original)); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource @@ -590,7 +589,7 @@ void listKeepsResourceWhenNotInTempCache() { var mim = mock(InformerManager.class); when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource.list(null, r -> true).toList(); @@ -610,7 +609,7 @@ void listReplacesOnlyMatchingResources() { var informerManager = mock(InformerManager.class); when(informerManager.list(nullable(String.class))).thenReturn(Stream.of(dep1, dep2)); - doReturn(informerManager).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(informerManager); var result = informerEventSource.list(null, r -> true).toList(); @@ -628,7 +627,7 @@ void byIndexStreamReplacesFromTempCache() { var informerManager = mock(InformerManager.class); when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); - doReturn(informerManager).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(informerManager); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); var result = informerEventSource.byIndexStream("idx", "key").toList(); @@ -649,7 +648,7 @@ void byIndexStreamSkipsNewerTempCacheResourceWhenIndexedValueChanged() { var informerManager = mock(InformerManager.class); when(informerManager.byIndexStream(any(), any())).thenReturn(Stream.of(original)); - doReturn(informerManager).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(informerManager); informerEventSource.addIndexers( Map.of("idx", d -> List.of(d.getMetadata().getLabels().get("app")))); @@ -670,7 +669,7 @@ void listKeepsResourceWhenTempCacheHasOlderVersion() { var mim = mock(InformerManager.class); when(mim.list(nullable(String.class))).thenReturn(Stream.of(original)); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource.list(null, r -> true).toList(); @@ -689,7 +688,7 @@ void byIndexStreamKeepsResourceWhenTempCacheHasOlderVersion() { var mim = mock(InformerManager.class); when(mim.byIndexStream(any(), any())).thenReturn(Stream.of(original)); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); informerEventSource.addIndexers(Map.of("idx", d -> List.of("key"))); var result = informerEventSource.byIndexStream("idx", "key").toList(); @@ -708,7 +707,7 @@ void listAddsGhostResources() { var mim = mock(InformerManager.class); when(mim.list(nullable(String.class))).thenReturn(Stream.of(resource)); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource.list(null, r -> true).toList(); @@ -730,7 +729,7 @@ void keysIncludesGhostResourceKeys() { var mim = mock(InformerManager.class); when(mim.keys()).thenReturn(Stream.of(resourceId)); when(mim.contains(ghostResourceId)).thenReturn(false); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource.keys().toList(); @@ -751,7 +750,7 @@ void keysDoesNotDuplicateExistingKeys() { var mim = mock(InformerManager.class); when(mim.keys()).thenReturn(Stream.of(resourceId)); when(mim.contains(resourceId)).thenReturn(true); - doReturn(mim).when(informerEventSource).manager(); + when(informerEventSource.manager()).thenReturn(mim); var result = informerEventSource.keys().toList(); From b067d1172e3c85b8e229b8c90884d365396f7252 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 13:02:20 +0200 Subject: [PATCH 15/19] Potential fix for pull request finding MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com> Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index e14f213a1e..73d48535b1 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -388,8 +388,9 @@ public Stream keys() { if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return manager().keys(); } + var managerKeys = manager().keys().collect(Collectors.toSet()); var tempKeys = temporaryResourceCache.getResources().keySet(); - return Stream.concat(manager().keys(), tempKeys.stream().filter(k -> !manager().contains(k))); + return Stream.concat(managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k))); } @Override From 8a1e0eca3c5ed3e6a905e4420dc38d42094492c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 13:03:01 +0200 Subject: [PATCH 16/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../processing/event/source/informer/InformerManager.java | 4 ---- .../event/source/informer/ManagedInformerEventSource.java | 3 ++- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java index aa770ca6ff..a0b7938302 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/InformerManager.java @@ -107,10 +107,6 @@ private void initSources() { } } - C configuration() { - return configuration; - } - public void changeNamespaces(Set namespaces) { var sourcesToRemove = sources.keySet().stream().filter(k -> !namespaces.contains(k)).collect(Collectors.toSet()); diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index 73d48535b1..ae3f018d02 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -390,7 +390,8 @@ public Stream keys() { } var managerKeys = manager().keys().collect(Collectors.toSet()); var tempKeys = temporaryResourceCache.getResources().keySet(); - return Stream.concat(managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k))); + return Stream.concat( + managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k))); } @Override From f33bd82dbff5c35d629c896b945e427c295a5593 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Wed, 13 May 2026 21:51:08 +0200 Subject: [PATCH 17/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../event/source/informer/ManagedInformerEventSource.java | 4 ++-- .../event/source/informer/TemporaryResourceCache.java | 3 ++- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java index ae3f018d02..3a7528858d 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/ManagedInformerEventSource.java @@ -289,7 +289,7 @@ private Stream mergeWithTempCacheForList( return stream.filter(filterResourceByPredicate(predicate)); } - var tempResources = temporaryResourceCache.getResources(); + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); if (tempResources.isEmpty()) { return stream.filter(filterResourceByPredicate(predicate)); } @@ -321,7 +321,7 @@ private Stream mergeWithTempCacheForIndex( if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) { return stream; } - var tempResources = temporaryResourceCache.getResources(); + var tempResources = new HashMap<>(temporaryResourceCache.getResources()); if (tempResources.isEmpty()) { return stream; } diff --git a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java index 39543843b8..551fb8adb2 100644 --- a/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java +++ b/operator-framework-core/src/main/java/io/javaoperatorsdk/operator/processing/event/source/informer/TemporaryResourceCache.java @@ -15,6 +15,7 @@ */ package io.javaoperatorsdk.operator.processing.event.source.informer; +import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.Optional; @@ -281,6 +282,6 @@ synchronized boolean isEmpty() { } synchronized Map getResources() { - return new HashMap<>(cache); + return Collections.unmodifiableMap(cache); } } From c44e0c6283e01e0fa90cdcf725df3e36985f5dff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 14 May 2026 09:26:33 +0200 Subject: [PATCH 18/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../CachingFilteringUpdateReconciler.java | 47 +++++++++++++++++-- 1 file changed, 42 insertions(+), 5 deletions(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java index 0e9a953129..8157839115 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java @@ -36,21 +36,25 @@ public class CachingFilteringUpdateReconciler implements Reconciler { + public static final String RESOURCE_VERSION_INDEX = "resourceVersionIndex"; private final AtomicBoolean issueFound = new AtomicBoolean(false); + InformerEventSource configMapEventSource; + @Override public UpdateControl reconcile( CachingFilteringUpdateCustomResource resource, Context context) { - context.resourceOperations().serverSideApply(prepareCM(resource, 1)); + var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 1)); var cachedCM = context.getSecondaryResource(ConfigMap.class); if (cachedCM.isEmpty()) { issueFound.set(true); throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource)); } - - var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2)); + checkListContainsCM(updated); + checkIfResourceVersionIndexContainsUpdated(updated); + updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2)); cachedCM = context.getSecondaryResource(ConfigMap.class); if (!cachedCM .orElseThrow() @@ -61,12 +65,43 @@ public UpdateControl reconcile( throw new IllegalStateException( "Update error for resource: " + ResourceID.fromResource(resource)); } + checkListContainsCM(updated); + checkIfResourceVersionIndexContainsUpdated(updated); ensureStatusExists(resource); resource.getStatus().setUpdated(true); return UpdateControl.patchStatus(resource); } + private void checkIfResourceVersionIndexContainsUpdated(ConfigMap updated) { + if (configMapEventSource + .byIndex(RESOURCE_VERSION_INDEX, updated.getMetadata().getResourceVersion()) + .stream() + .noneMatch( + r -> + ResourceID.fromResource(r).equals(ResourceID.fromResource(updated)) + && r.getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion()))) { + throw new IllegalStateException( + "Index does not contain resource: " + ResourceID.fromResource(updated)); + } + } + + private void checkListContainsCM(ConfigMap updated) { + if (configMapEventSource + .list() + .noneMatch( + r -> + ResourceID.fromResource(r).equals(ResourceID.fromResource(updated)) + && r.getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion()))) { + throw new IllegalStateException( + "List does not contain resource: " + ResourceID.fromResource(updated)); + } + } + private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int num) { var cm = new ConfigMapBuilder() @@ -84,13 +119,15 @@ private static ConfigMap prepareCM(CachingFilteringUpdateCustomResource p, int n @Override public List> prepareEventSources( EventSourceContext context) { - InformerEventSource cmES = + configMapEventSource = new InformerEventSource<>( InformerEventSourceConfiguration.from( ConfigMap.class, CachingFilteringUpdateCustomResource.class) .build(), context); - return List.of(cmES); + configMapEventSource.addIndexers( + Map.of(RESOURCE_VERSION_INDEX, cm -> List.of(cm.getMetadata().getResourceVersion()))); + return List.of(configMapEventSource); } private void ensureStatusExists(CachingFilteringUpdateCustomResource resource) { From 43a1a901d7fc83400049d4ff368085e3320bcbb1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Attila=20M=C3=A9sz=C3=A1ros?= Date: Thu, 14 May 2026 09:28:37 +0200 Subject: [PATCH 19/19] wip MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Attila Mészáros --- .../CachingFilteringUpdateReconciler.java | 50 ++++++++++--------- 1 file changed, 26 insertions(+), 24 deletions(-) diff --git a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java index 8157839115..c8fc206106 100644 --- a/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java +++ b/operator-framework/src/test/java/io/javaoperatorsdk/operator/baseapi/cachingfilteringupdate/CachingFilteringUpdateReconciler.java @@ -39,38 +39,40 @@ public class CachingFilteringUpdateReconciler public static final String RESOURCE_VERSION_INDEX = "resourceVersionIndex"; private final AtomicBoolean issueFound = new AtomicBoolean(false); - InformerEventSource configMapEventSource; + private InformerEventSource configMapEventSource; @Override public UpdateControl reconcile( CachingFilteringUpdateCustomResource resource, Context context) { + try { + var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 1)); + var cachedCM = context.getSecondaryResource(ConfigMap.class); + if (cachedCM.isEmpty()) { + throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource)); + } + checkListContainsCM(updated); + checkIfResourceVersionIndexContainsUpdated(updated); + updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2)); + cachedCM = context.getSecondaryResource(ConfigMap.class); + if (!cachedCM + .orElseThrow() + .getMetadata() + .getResourceVersion() + .equals(updated.getMetadata().getResourceVersion())) { + throw new IllegalStateException( + "Update error for resource: " + ResourceID.fromResource(resource)); + } + checkListContainsCM(updated); + checkIfResourceVersionIndexContainsUpdated(updated); - var updated = context.resourceOperations().serverSideApply(prepareCM(resource, 1)); - var cachedCM = context.getSecondaryResource(ConfigMap.class); - if (cachedCM.isEmpty()) { + ensureStatusExists(resource); + resource.getStatus().setUpdated(true); + return UpdateControl.patchStatus(resource); + } catch (IllegalStateException e) { issueFound.set(true); - throw new IllegalStateException("Error for resource: " + ResourceID.fromResource(resource)); + throw e; } - checkListContainsCM(updated); - checkIfResourceVersionIndexContainsUpdated(updated); - updated = context.resourceOperations().serverSideApply(prepareCM(resource, 2)); - cachedCM = context.getSecondaryResource(ConfigMap.class); - if (!cachedCM - .orElseThrow() - .getMetadata() - .getResourceVersion() - .equals(updated.getMetadata().getResourceVersion())) { - issueFound.set(true); - throw new IllegalStateException( - "Update error for resource: " + ResourceID.fromResource(resource)); - } - checkListContainsCM(updated); - checkIfResourceVersionIndexContainsUpdated(updated); - - ensureStatusExists(resource); - resource.getStatus().setUpdated(true); - return UpdateControl.patchStatus(resource); } private void checkIfResourceVersionIndexContainsUpdated(ConfigMap updated) {