Skip to content
Open
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,15 @@
package io.javaoperatorsdk.operator.api.reconciler;

import java.util.List;
import java.util.stream.Stream;

import io.fabric8.kubernetes.api.model.HasMetadata;

public interface IndexedResourceCache<T extends HasMetadata> extends ResourceCache<T> {

List<T> byIndex(String indexName, String indexKey);

default Stream<T> byIndexStream(String indexName, String indexKey) {
return byIndex(indexName, indexKey).stream();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,22 @@
@SuppressWarnings("unchecked")
public interface ResourceCache<T extends HasMetadata> extends Cache<T> {

/**
* Lists all resources in the given namespace.
*
* @param namespace the namespace to list resources from
* @return a stream of all cached resources in the namespace
*/
default Stream<T> list(String namespace) {
return list(namespace, TRUE);
}

/**
* Lists resources in the given namespace that match the provided predicate.
*
* @param namespace the namespace to list resources from
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(String namespace, Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
/**
* Provides useful operations to manipulate resources (server-side apply, patch, etc.) in an
* idiomatic way, in particular to make sure that the latest version of the resource is present in
* the caches for the next reconciliation.
* the caches for the next reconciliation. In other words, it provides read-cache-after-write
* consistency.
*
* @param <P> the resource type on which this object operates
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,45 @@
public interface Cache<T> {
Predicate TRUE = (a) -> true;

/**
* Retrieves a resource from the cache by its {@link ResourceID}.
*
* @param resourceID the identifier of the resource
* @return an Optional containing the resource if present in the cache
*/
Optional<T> get(ResourceID resourceID);

/**
* Checks whether a resource with the given {@link ResourceID} exists in the cache.
*
* @param resourceID the identifier of the resource
* @return {@code true} if the resource is present in the cache
*/
default boolean contains(ResourceID resourceID) {
return get(resourceID).isPresent();
}

/**
* Returns a stream of all {@link ResourceID}s currently in the cache.
*
* @return a stream of resource identifiers
*/
Stream<ResourceID> keys();

/**
* Lists all resources in the cache.
*
* @return a stream of all cached resources
*/
default Stream<T> list() {
return list(TRUE);
}

/**
* Lists resources in the cache that match the provided predicate.
*
* @param predicate filter to apply on the resources
* @return a stream of cached resources matching the predicate
*/
Stream<T> list(Predicate<T> predicate);
}
Original file line number Diff line number Diff line change
Expand Up @@ -107,10 +107,6 @@ private void initSources() {
}
}

C configuration() {
return configuration;
}

public void changeNamespaces(Set<String> namespaces) {
var sourcesToRemove =
sources.keySet().stream().filter(k -> !namespaces.contains(k)).collect(Collectors.toSet());
Expand Down Expand Up @@ -256,12 +252,14 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
}

Comment thread
csviri marked this conversation as resolved.
@Override
public Stream<R> byIndexStream(String indexName, String indexKey) {
Comment thread
github-code-quality[bot] marked this conversation as resolved.
Fixed
return sources.values().stream().flatMap(s -> s.byIndexStream(indexName, indexKey));
}

@Override
public List<R> byIndex(String indexName, String indexKey) {
return sources.values().stream()
.map(s -> s.byIndex(indexName, indexKey))
.flatMap(List::stream)
.collect(Collectors.toList());
return byIndexStream(indexName, indexKey).toList();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,13 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.function.Predicate;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.slf4j.Logger;
Expand Down Expand Up @@ -111,7 +113,6 @@ public R eventFilteringUpdateAndCacheResource(R resourceToUpdate, UnaryOperator<
res.ifPresentOrElse(
r -> {
R latestResource = (R) r.getResource().orElseThrow();

// as previous resource version we use the one from successful update, since
// we process new event here only if that is more recent then the event from our update.
// Note that this is equivalent with the scenario when an informer watch connection
Expand Down Expand Up @@ -222,11 +223,6 @@ public Optional<R> getCachedValue(ResourceID resourceID) {
return get(resourceID);
}

@Override
public Stream<R> list(String namespace, Predicate<R> predicate) {
return manager().list(namespace, predicate);
}

void setTemporalResourceCache(TemporaryResourceCache<R> temporaryResourceCache) {
this.temporaryResourceCache = temporaryResourceCache;
}
Expand All @@ -239,19 +235,163 @@ public void addIndexers(Map<String, Function<R, List<String>>> indexers) {
this.indexers.putAll(indexers);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public List<R> byIndex(String indexName, String indexKey) {
return manager().byIndex(indexName, indexKey);
public Stream<R> list(String namespace, Predicate<R> predicate) {
return mergeWithTempCacheForList(manager().list(namespace), namespace, predicate);
Comment thread
csviri marked this conversation as resolved.
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public Stream<ResourceID> keys() {
return cache.keys();
public Stream<R> list(Predicate<R> predicate) {
return mergeWithTempCacheForList(manager().list(), null, predicate);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public Stream<R> list(Predicate<R> predicate) {
return cache.list(predicate);
public Stream<R> byIndexStream(String indexName, String indexKey) {
return mergeWithTempCacheForIndex(
manager().byIndexStream(indexName, indexKey), indexName, indexKey);
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Results are merged with the
* temporary resource cache to ensure recently written resources are reflected in the output.
*/
@Override
public List<R> byIndex(String indexName, String indexKey) {
return mergeWithTempCacheForIndex(
manager().byIndexStream(indexName, indexKey), indexName, indexKey)
.collect(Collectors.toList());
}

// namespace is filtered on informer manager level
private Stream<R> mergeWithTempCacheForList(
Stream<R> stream, String namespace, Predicate<R> predicate) {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {

return stream.filter(filterResourceByPredicate(predicate));
}
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
if (tempResources.isEmpty()) {
Comment thread
csviri marked this conversation as resolved.
return stream.filter(filterResourceByPredicate(predicate));
}

var upToDateList =
stream
.map(
r -> {
var resourceID = ResourceID.fromResource(r);
var tempResource = tempResources.remove(resourceID);
if (tempResource != null
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
return tempResource;
}
return r;
})
// we filter on predicate only since namespace changes would not be detected anyway.
.filter(filterResourceByPredicate(predicate))
.toList();

return Stream.concat(
tempResources.values().stream()
.filter(filterResourceByNamespaceAndPredicate(namespace, predicate)),
upToDateList.stream());
}

private Stream<R> mergeWithTempCacheForIndex(
Stream<R> stream, String indexName, String indexKey) {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return stream;
}
var tempResources = new HashMap<>(temporaryResourceCache.getResources());
if (tempResources.isEmpty()) {
return stream;
}

var indexer = indexers.get(indexName);
if (indexer == null) {
throw new IllegalArgumentException("Indexer not found for: " + indexName);
}

var upToDateList =
stream
.map(
r -> {
var resourceID = ResourceID.fromResource(r);
var tempResource = tempResources.remove(resourceID);
if (tempResource != null
&& ReconcilerUtilsInternal.compareResourceVersions(tempResource, r) > 0) {
if (!indexer.apply(tempResource).contains(indexKey)) {
return null;
}
return tempResource;
}
return r;
})
.filter(Objects::nonNull)
.toList();

// remaining temp resources are ghost resources — include only those matching the index
return Stream.concat(
tempResources.values().stream().filter(r -> indexer.apply(r).contains(indexKey)),
upToDateList.stream());
}

private static <R extends HasMetadata> Predicate<R> filterResourceByPredicate(
Predicate<R> predicate) {
return filterResourceByNamespaceAndPredicate(null, predicate);
}

private static <R extends HasMetadata> Predicate<R> filterResourceByNamespaceAndPredicate(
String namespace, Predicate<R> predicate) {
return r -> {
if (namespace != null) {
if (!Optional.of(r)
.map(rr -> Objects.equals(namespace, rr.getMetadata().getNamespace()))
.orElse(false)) {
return false;
}
}
if (predicate != null) {
return predicate.test(r);
}
return true;
};
}

/**
* {@inheritDoc}
*
* <p>This implementation is read-cache-after-write consistent. Keys from the temporary resource
* cache (ghost resources) are included in the result.
*/
@Override
public Stream<ResourceID> keys() {
if (!comparableResourceVersions || temporaryResourceCache.isEmpty()) {
return manager().keys();
}
var managerKeys = manager().keys().collect(Collectors.toSet());
var tempKeys = temporaryResourceCache.getResources().keySet();
return Stream.concat(
managerKeys.stream(), tempKeys.stream().filter(k -> !managerKeys.contains(k)));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
*/
package io.javaoperatorsdk.operator.processing.event.source.informer;

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
Expand Down Expand Up @@ -275,4 +276,12 @@ private void checkGhostResources() {
public synchronized Optional<T> getResourceFromCache(ResourceID resourceID) {
return Optional.ofNullable(cache.get(resourceID));
}

synchronized boolean isEmpty() {
return cache.isEmpty();
}

synchronized Map<ResourceID, T> getResources() {
Comment thread
csviri marked this conversation as resolved.
return Collections.unmodifiableMap(cache);
}
}
Loading
Loading