Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import io.javaoperatorsdk.operator.processing.event.source.filter.OnAddFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnDeleteFilter;
import io.javaoperatorsdk.operator.processing.event.source.filter.OnUpdateFilter;
import io.javaoperatorsdk.operator.processing.event.source.informer.TemporaryResourceCache;

import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_COMPARABLE_RESOURCE_VERSION;
import static io.javaoperatorsdk.operator.api.reconciler.Constants.DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES;
Expand Down Expand Up @@ -143,11 +142,9 @@
boolean comparableResourceVersions() default DEFAULT_COMPARABLE_RESOURCE_VERSION;

/**
* For read-cache-after-write consistency there are some corner cases where we need to check the
* caches see {@link TemporaryResourceCache} periodically. This is the period in milliseconds.
* Applicable only if {@link #comparableResourceVersions()} is true.
*
* @since 5.3.0
* @deprecated Ghost resource checking is now triggered by the informer's onList callback. This
* setting is no longer used.
Comment thread
csviri marked this conversation as resolved.
*/
@Deprecated(forRemoval = true)
long ghostResourceCacheCheckInterval() default DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS;
}
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,6 @@ public class InformerConfiguration<R extends HasMetadata> {
private Long informerListLimit;
private FieldSelector fieldSelector;
private Boolean comparableResourceVersions;
private Duration ghostResourceCacheCheckInterval;

protected InformerConfiguration(
Class<R> resourceClass,
Expand All @@ -70,8 +69,7 @@ protected InformerConfiguration(
ItemStore<R> itemStore,
Long informerListLimit,
FieldSelector fieldSelector,
Boolean comparableResourceVersions,
Duration ghostResourceCacheCheckInterval) {
Boolean comparableResourceVersions) {
this(resourceClass);
this.name = name;
this.namespaces = namespaces;
Expand All @@ -85,7 +83,6 @@ protected InformerConfiguration(
this.informerListLimit = informerListLimit;
this.fieldSelector = fieldSelector;
this.comparableResourceVersions = comparableResourceVersions;
this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
}

private InformerConfiguration(Class<R> resourceClass) {
Expand Down Expand Up @@ -121,8 +118,7 @@ public static <R extends HasMetadata> InformerConfiguration<R>.Builder builder(
original.itemStore,
original.informerListLimit,
original.fieldSelector,
original.comparableResourceVersions,
original.ghostResourceCacheCheckInterval)
original.comparableResourceVersions)
.builder;
}

Expand Down Expand Up @@ -301,10 +297,6 @@ public boolean isComparableResourceVersions() {
return comparableResourceVersions;
}

public Duration getGhostResourceCacheCheckInterval() {
return ghostResourceCacheCheckInterval;
}

@SuppressWarnings("UnusedReturnValue")
public class Builder {

Expand All @@ -323,9 +315,6 @@ public InformerConfiguration<R> buildForController() {
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
}

if (ghostResourceCacheCheckInterval == null) {
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
}
return InformerConfiguration.this;
}

Expand All @@ -341,10 +330,6 @@ public InformerConfiguration<R> build() {
comparableResourceVersions = DEFAULT_COMPARABLE_RESOURCE_VERSION;
}

if (ghostResourceCacheCheckInterval == null) {
ghostResourceCacheCheckInterval = DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL;
}

return InformerConfiguration.this;
}

Expand Down Expand Up @@ -392,8 +377,6 @@ public InformerConfiguration<R>.Builder initFromAnnotation(
.map(f -> new FieldSelector.Field(f.path(), f.value(), f.negated()))
.toList()));
withComparableResourceVersions(informerConfig.comparableResourceVersions());
withGhostResourceCacheCheckInterval(
Duration.ofMillis(informerConfig.ghostResourceCacheCheckInterval()));
}
return this;
}
Expand Down Expand Up @@ -500,8 +483,8 @@ public Builder withComparableResourceVersions(boolean comparableResourceVersions
return this;
}

@Deprecated(forRemoval = true)
public Builder withGhostResourceCacheCheckInterval(Duration ghostResourceCacheCheckInterval) {
InformerConfiguration.this.ghostResourceCacheCheckInterval = ghostResourceCacheCheckInterval;
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -291,9 +291,9 @@ public Builder<R> withComparableResourceVersion(boolean comparableResourceVersio
return this;
}

@Deprecated(forRemoval = true)
public Builder<R> withGhostResourceCacheCheckInterval(
Duration ghostResourceCacheCheckInterval) {
config.withGhostResourceCacheCheckInterval(ghostResourceCacheCheckInterval);
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@ public final class Constants {
public static final boolean DEFAULT_FOLLOW_CONTROLLER_NAMESPACE_CHANGES = true;
public static final boolean DEFAULT_COMPARABLE_RESOURCE_VERSION = true;

@Deprecated(forRemoval = true)
public static final long DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS = 3 * 60 * 1000;

@Deprecated(forRemoval = true)
public static final Duration DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL =
Duration.ofMillis(DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL_MILLIS);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -153,15 +153,7 @@ public synchronized void start() {
if (isRunning()) {
return;
}
temporaryResourceCache =
new TemporaryResourceCache<>(
comparableResourceVersions,
configuration.getInformerConfig().getGhostResourceCacheCheckInterval().toMillis(),
controllerConfiguration
.getConfigurationService()
.getExecutorServiceManager()
.scheduledExecutorService(),
this);
temporaryResourceCache = new TemporaryResourceCache<>(comparableResourceVersions, this);
this.cache = new InformerManager<>(client, configuration, this);
cache.setControllerConfiguration(controllerConfiguration);
cache.addIndexers(indexers);
Expand All @@ -178,6 +170,11 @@ public synchronized void stop() {
manager().stop();
}

@Override
public void onList(String resourceVersion, boolean remainedEmpty) {
temporaryResourceCache.checkGhostResources();
}

@Override
public void handleRecentResourceUpdate(
ResourceID resourceID, R resource, R previousVersionOfResource) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -71,18 +69,9 @@ public enum EventHandling {

public TemporaryResourceCache(
boolean comparableResourceVersions,
long ghostResourceCheckInterval,
ScheduledExecutorService ghostCheckExecutor,
ManagedInformerEventSource<T, ?, ?> managedInformerEventSource) {
this.comparableResourceVersions = comparableResourceVersions;
this.managedInformerEventSource = managedInformerEventSource;
if (comparableResourceVersions) {
ghostCheckExecutor.scheduleWithFixedDelay(
this::checkGhostResources,
ghostResourceCheckInterval,
ghostResourceCheckInterval,
TimeUnit.MILLISECONDS);
}
}

public synchronized void startEventFilteringModify(ResourceID resourceID) {
Expand Down Expand Up @@ -238,9 +227,10 @@ private String getLastSyncResourceVersion(String namespace) {
* resources: when we create a resource that is deleted right after by third party and the related
* informer have a disconnected watch and this watch needs to do a re-list when connected again.
* In this case neither the ADD nor DELETE event will be propagated to the informer, but we
* explicitly add resources to this cache. Those are cleaned up by this check.
* explicitly add resources to this cache. Those are cleaned up by this check, which is triggered
* by the informer's onList callback.
*/
private void checkGhostResources() {
public void checkGhostResources() {
log.debug("Checking for ghost resources.");
var iterator = cache.entrySet().iterator();
while (iterator.hasNext()) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import io.javaoperatorsdk.operator.api.config.ControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.ResolvedControllerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.api.reconciler.Reconciler;
import io.javaoperatorsdk.operator.api.reconciler.UpdateControl;
import io.javaoperatorsdk.operator.processing.Controller;
Expand Down Expand Up @@ -64,8 +63,6 @@ public void setup() {
when(controllerConfig.getConfigurationService()).thenReturn(new BaseConfigurationService());
var ic = mock(InformerConfiguration.class);
when(controllerConfig.getInformerConfig()).thenReturn(ic);
when(ic.getGhostResourceCacheCheckInterval())
.thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL);

setUpSource(new ControllerEventSource<>(testController), true, controllerConfig);
}
Expand Down Expand Up @@ -329,7 +326,6 @@ public TestConfiguration(
.withOnUpdateFilter(onUpdateFilter)
.withGenericFilter(genericFilter)
.withComparableResourceVersions(true)
.withGhostResourceCacheCheckInterval(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL)
.buildForController(),
false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
Expand All @@ -38,7 +36,6 @@
import io.javaoperatorsdk.operator.api.config.InformerStoppedHandler;
import io.javaoperatorsdk.operator.api.config.informer.InformerConfiguration;
import io.javaoperatorsdk.operator.api.config.informer.InformerEventSourceConfiguration;
import io.javaoperatorsdk.operator.api.reconciler.Constants;
import io.javaoperatorsdk.operator.processing.event.EventHandler;
import io.javaoperatorsdk.operator.processing.event.ResourceID;
import io.javaoperatorsdk.operator.processing.event.source.EventFilterTestUtils;
Expand Down Expand Up @@ -89,8 +86,6 @@ void setup() {
when(informerEventSourceConfiguration.getInformerConfig()).thenReturn(informerConfig);
when(informerConfig.getEffectiveNamespaces(any())).thenReturn(DEFAULT_NAMESPACES_SET);
when(informerEventSourceConfiguration.getResourceClass()).thenReturn(Deployment.class);
when(informerConfig.getGhostResourceCacheCheckInterval())
.thenReturn(Constants.DEFAULT_GHOST_RESOURCE_CHECK_INTERVAL);
informerEventSource =
spy(
new InformerEventSource<>(informerEventSourceConfiguration, clientMock) {
Expand Down Expand Up @@ -349,8 +344,7 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() {
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
when(mim.get(any())).thenReturn(Optional.empty());

var ghostCheckExecutor = Executors.newScheduledThreadPool(1);
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes));
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
informerEventSource.setTemporalResourceCache(temporaryResourceCache);

// put resource in cache and start a filtering update
Expand All @@ -363,15 +357,12 @@ void ghostCheckRemovesCachedResourceDuringFilteringUpdate() {
when(mim.lastSyncResourceVersion(any())).thenReturn("3");

// ghost check should remove the cached resource
await()
.untilAsserted(
() -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty());
temporaryResourceCache.checkGhostResources();
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();

// complete the filtering update - the resource should not reappear
temporaryResourceCache.doneEventFilterModify(resourceId, "2");
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();

ghostCheckExecutor.shutdownNow();
}

@Test
Expand All @@ -383,8 +374,7 @@ void ghostCheckRunsConcurrentlyWithPutResource() {
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
when(mim.get(any())).thenReturn(Optional.empty());

var ghostCheckExecutor = Executors.newScheduledThreadPool(1);
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes));
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
informerEventSource.setTemporalResourceCache(temporaryResourceCache);

// put a resource that will become a ghost
Expand All @@ -394,22 +384,16 @@ void ghostCheckRunsConcurrentlyWithPutResource() {
// advance sync version so ghost check removes it
when(mim.lastSyncResourceVersion(any())).thenReturn("3");

await()
.untilAsserted(
() ->
assertThat(
temporaryResourceCache.getResourceFromCache(
ResourceID.fromResource(deployment)))
.isEmpty());
temporaryResourceCache.checkGhostResources();
assertThat(temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(deployment)))
.isEmpty();

// now put a newer resource - should succeed even after ghost removal
var newerDeployment = deploymentWithResourceVersion(4);
temporaryResourceCache.putResource(newerDeployment);
assertThat(
temporaryResourceCache.getResourceFromCache(ResourceID.fromResource(newerDeployment)))
.isPresent();

ghostCheckExecutor.shutdownNow();
}

@Test
Expand All @@ -421,8 +405,7 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
when(mim.lastSyncResourceVersion(any())).thenReturn("1");
when(mim.get(any())).thenReturn(Optional.empty());

var ghostCheckExecutor = Executors.newScheduledThreadPool(1);
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, 50, ghostCheckExecutor, mes));
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
informerEventSource.setTemporalResourceCache(temporaryResourceCache);

// start filtering update and put resource
Expand All @@ -434,9 +417,8 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
// namespace becomes unwatched - ghost check should clean up
when(mim.isWatchingNamespace(any())).thenReturn(false);

await()
.untilAsserted(
() -> assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty());
temporaryResourceCache.checkGhostResources();
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();

// complete the filtering update
var doneResult = temporaryResourceCache.doneEventFilterModify(resourceId, "2");
Expand All @@ -446,8 +428,6 @@ void filteringUpdateAndGhostCheckWithNamespaceChange() {
// put should be rejected since namespace is no longer watched
temporaryResourceCache.putResource(deploymentWithResourceVersion(3));
assertThat(temporaryResourceCache.getResourceFromCache(resourceId)).isEmpty();

ghostCheckExecutor.shutdownNow();
}

private void assertNoEventProduced() {
Expand Down Expand Up @@ -496,8 +476,7 @@ private void withRealTemporaryResourceCache() {
when(mes.manager()).thenReturn(mim);
when(mim.lastSyncResourceVersion(any())).thenReturn("1");

temporaryResourceCache =
spy(new TemporaryResourceCache<>(true, 100, mock(ScheduledExecutorService.class), mes));
temporaryResourceCache = spy(new TemporaryResourceCache<>(true, mes));
informerEventSource.setTemporalResourceCache(temporaryResourceCache);
}

Expand Down
Loading
Loading