Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@
import com.azure.cosmos.implementation.HttpConstants;
import com.azure.cosmos.implementation.OperationCancelledException;
import com.azure.cosmos.implementation.TestConfigurations;
import com.azure.cosmos.models.CosmosChangeFeedRequestOptions;
import com.azure.cosmos.models.CosmosContainerProperties;
import com.azure.cosmos.models.CosmosItemRequestOptions;
import com.azure.cosmos.models.CosmosItemResponse;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.FeedRange;
import com.azure.cosmos.models.PartitionKey;
import com.azure.cosmos.models.SqlQuerySpec;
import com.azure.cosmos.rx.TestSuiteBase;
Expand Down Expand Up @@ -258,6 +260,41 @@ public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeout() {
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryChangeFeedWithEndToEndTimeoutPolicyInOptionsShouldTimeout() {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since wiring CosmosEndToEndOperationLatencyPolicyConfig also introduces the availability strategy, we could increase test coverage for change feed combined with the availability strategy.

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@FabianMeiswinkel - what is the risk here - https://github.com/Azure/azure-sdk-for-java/blob/main/sdk/cosmos/azure-cosmos/src/main/java/com/azure/cosmos/implementation/query/ChangeFeedFetcher.java#L318? There might be some diagnostic gaps, which can be fixed in a later PR, but I wanted to understand a bit more about the risks of enabling the availability strategy for change feed against multi-writer accounts.

if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
throw new SkipException("Failure injection only supported for DIRECT mode");
}

CosmosAsyncClient cosmosClient = initializeClient(endToEndOperationLatencyPolicyConfig);
FaultInjectionRule faultInjectionRule = null;
try {
CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig =
new CosmosEndToEndOperationLatencyPolicyConfigBuilder(Duration.ofSeconds(1))
.build();

CosmosChangeFeedRequestOptions options =
CosmosChangeFeedRequestOptions.createForProcessingFromBeginning(FeedRange.forFullRange());
options.setCosmosEndToEndOperationLatencyPolicyConfig(endToEndOperationLatencyPolicyConfig);

faultInjectionRule = injectFailure(createdContainer, FaultInjectionOperationType.READ_FEED_ITEM, null);
CosmosPagedFlux<TestObject> changeFeedPagedFlux =
createdContainer.queryChangeFeed(options, TestObject.class);

StepVerifier.create(changeFeedPagedFlux)
.expectErrorMatches(throwable -> throwable instanceof OperationCancelledException
&& ((OperationCancelledException) throwable).getSubStatusCode()
== HttpConstants.SubStatusCodes.CLIENT_OPERATION_TIMEOUT)
.verify();
} finally {
if (faultInjectionRule != null) {
faultInjectionRule.disable();
}

safeClose(cosmosClient);
}
}

@Test(groups = {"fast"}, timeOut = 10000L, retryAnalyzer = FlakyTestRetryAnalyzer.class)
public void queryItemWithEndToEndTimeoutPolicyInOptionsShouldTimeoutWithClientConfig() {
if (getClientBuilder().buildConnectionPolicy().getConnectionMode() != ConnectionMode.DIRECT) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ public final class CosmosChangeFeedRequestOptionsImpl implements OverridableRequ
private boolean completeAfterAllCurrentChangesRetrieved;
private Long endLSN;
private ReadConsistencyStrategy readConsistencyStrategy;
private CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig;

public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toBeCloned) {
if (toBeCloned.continuationState != null) {
Expand Down Expand Up @@ -80,6 +81,7 @@ public CosmosChangeFeedRequestOptionsImpl(CosmosChangeFeedRequestOptionsImpl toB
this.keywordIdentifiers = toBeCloned.keywordIdentifiers;
this.completeAfterAllCurrentChangesRetrieved = toBeCloned.completeAfterAllCurrentChangesRetrieved;
this.endLSN = toBeCloned.endLSN;
this.endToEndOperationLatencyPolicyConfig = toBeCloned.endToEndOperationLatencyPolicyConfig;
}

public CosmosChangeFeedRequestOptionsImpl(
Expand Down Expand Up @@ -296,8 +298,11 @@ public CosmosChangeFeedRequestOptionsImpl setExcludedRegions(List<String> exclud

/**
 * Gets the end-to-end operation latency policy held by these change feed request options.
 *
 * @return the value of the {@code endToEndOperationLatencyPolicyConfig} field; this is
 * {@code null} when no policy has been assigned to the field
 */
@Override
public CosmosEndToEndOperationLatencyPolicyConfig getCosmosEndToEndLatencyPolicyConfig() {
    // Return the stored policy instead of a hard-coded null so that callers
    // resolving the effective policy can observe a request-level override.
    return this.endToEndOperationLatencyPolicyConfig;
}

public void setCosmosEndToEndLatencyPolicyConfig(CosmosEndToEndOperationLatencyPolicyConfig endToEndOperationLatencyPolicyConfig) {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation — Incomplete Wiring: override() missing e2e config propagation

The new endToEndOperationLatencyPolicyConfig field is properly added to the clone constructor and getter/setter, but the override(CosmosRequestOptions) method (line 416) does not propagate it. Compare with CosmosQueryRequestOptionsBase.override() (line 549) which includes:

this.cosmosEndToEndOperationLatencyPolicyConfig = overrideOption(
    cosmosRequestOptions.getCosmosEndToEndLatencyPolicyConfig(),
    this.cosmosEndToEndOperationLatencyPolicyConfig);

Why this matters: queryDocumentChangeFeedFromPagedFluxInternal applies operation policies at line 4848 via CosmosOperationDetails, which calls override() on the underlying options. If a CosmosOperationPolicy sets an e2e timeout via CosmosRequestOptions, that config will be silently dropped for change feed operations — unlike query operations where it propagates correctly.

Suggested fix: Add the following to CosmosChangeFeedRequestOptionsImpl.override():

this.endToEndOperationLatencyPolicyConfig = overrideOption(
    cosmosRequestOptions.getCosmosEndToEndLatencyPolicyConfig(),
    this.endToEndOperationLatencyPolicyConfig);

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

this.endToEndOperationLatencyPolicyConfig = endToEndOperationLatencyPolicyConfig;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1520,6 +1520,50 @@ private static <T> Flux<FeedResponse<T>> getFeedResponseFluxWithTimeout(
});
}

private static <T> Flux<FeedResponse<T>> getChangeFeedResponseFluxWithTimeout(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can getFeedResponseFluxWithTimeout be reused?

Flux<FeedResponse<T>> feedResponseFlux,
CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig,
DiagnosticsClientContext diagnosticsClientContext) {

Duration endToEndTimeout = endToEndPolicyConfig.getEndToEndOperationTimeout();

if (endToEndTimeout.isNegative()) {
return feedResponseFlux
.timeout(endToEndTimeout)
.onErrorMap(throwable -> {
if (throwable instanceof TimeoutException) {
CosmosException cancellationException = getNegativeTimeoutException(null, endToEndTimeout);
cancellationException.setStackTrace(throwable.getStackTrace());

CosmosDiagnostics mostRecentDiagnostics = diagnosticsClientContext.getMostRecentlyCreatedDiagnostics();
if (mostRecentDiagnostics != null) {
BridgeInternal.setCosmosDiagnostics(cancellationException, mostRecentDiagnostics);
}

return cancellationException;
}
return throwable;
});
}

return feedResponseFlux
.timeout(endToEndTimeout)
.onErrorMap(throwable -> {
if (throwable instanceof TimeoutException) {
CosmosException exception = new OperationCancelledException();
exception.setStackTrace(throwable.getStackTrace());

CosmosDiagnostics mostRecentDiagnostics = diagnosticsClientContext.getMostRecentlyCreatedDiagnostics();
if (mostRecentDiagnostics != null) {
BridgeInternal.setCosmosDiagnostics(exception, mostRecentDiagnostics);
}

return exception;
}
return throwable;
});
}

private void addUserAgentSuffix(UserAgentContainer userAgentContainer, Set<UserAgentFeatureFlags> userAgentFeatureFlags) {

if (!this.globalPartitionEndpointManagerForPerPartitionAutomaticFailover.isPerPartitionAutomaticFailoverEnabled()) {
Expand Down Expand Up @@ -4775,7 +4819,25 @@ public <T> Flux<FeedResponse<T>> queryDocumentChangeFeed(
diagnosticsClientContext,
crossRegionAvailabilityContextForRequest);

return changeFeedQueryImpl.executeAsync();
CosmosChangeFeedRequestOptionsImpl implOptions =
ImplementationBridgeHelpers
.CosmosChangeFeedRequestOptionsHelper
.getCosmosChangeFeedRequestOptionsAccessor()
.getImpl(requestOptions);

CosmosEndToEndOperationLatencyPolicyConfig endToEndPolicyConfig =
this.getEffectiveEndToEndOperationLatencyPolicyConfig(
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Recommendation — Behavioral Change: PPAF/client-level e2e timeout now applies to Change Feed Processor

With getEffectiveEndToEndOperationLatencyPolicyConfig resolving the effective policy, change feed operations are now subject to:

  1. Client-level cosmosEndToEndOperationLatencyPolicyConfig (if set on the builder)
  2. PPAF-enforced defaults (ppafEnforcedE2ELatencyPolicyConfigForReads) — since OperationType.ReadFeed returns true for isReadOnlyOperation()

Previously, CosmosChangeFeedRequestOptionsImpl.getCosmosEndToEndLatencyPolicyConfig() returned null, so queryDocumentChangeFeed never applied a timeout. Now, even when the user doesn't explicitly set a timeout on CosmosChangeFeedRequestOptions, the effective config resolution can produce a non-null timeout.

Why this matters: The Change Feed Processor (CFP) creates its own CosmosChangeFeedRequestOptions without setting e2e timeout. PartitionedByIdCollectionRequestOptionsFactory explicitly disables e2e timeout for lease operations (CosmosItemRequestOptions, CosmosQueryRequestOptions) but does not create a disabled config for the data-path change feed queries. If a user configures client-level e2e timeout (e.g., 5s for point reads) and also uses the Change Feed Processor, CFP's change feed queries could now receive unexpected OperationCancelledException.

Suggested action: Consider whether CFP should explicitly set a disabled/null e2e config on its change feed options, or document this behavioral change so users can adjust their client-level config accordingly.

⚠️ AI-generated review — may be incorrect. Agree? → resolve the conversation. Disagree? → reply with your reasoning.

implOptions.getCosmosEndToEndLatencyPolicyConfig(),
ResourceType.Document,
OperationType.ReadFeed);

Flux<FeedResponse<T>> feedResponseFlux = changeFeedQueryImpl.executeAsync();

if (endToEndPolicyConfig != null && endToEndPolicyConfig.isEnabled()) {
return getChangeFeedResponseFluxWithTimeout(feedResponseFlux, endToEndPolicyConfig, diagnosticsClientContext);
}

return feedResponseFlux;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@

import com.azure.cosmos.ConsistencyLevel;
import com.azure.cosmos.CosmosDiagnosticsThresholds;
import com.azure.cosmos.CosmosEndToEndOperationLatencyPolicyConfig;
import com.azure.cosmos.CosmosItemSerializer;
import com.azure.cosmos.ReadConsistencyStrategy;
import com.azure.cosmos.implementation.CosmosChangeFeedRequestOptionsImpl;
Expand Down Expand Up @@ -116,6 +117,20 @@ public CosmosChangeFeedRequestOptions setReadConsistencyStrategy(ReadConsistency
return this;
}

/**
 * Configures the {@link CosmosEndToEndOperationLatencyPolicyConfig} that applies to this request.
 * When a config has also been set on the client, the one supplied here overrides the
 * client level config for this request.
 *
 * @param cosmosEndToEndOperationLatencyPolicyConfig the {@link CosmosEndToEndOperationLatencyPolicyConfig}
 * to use for the request
 * @return this {@code CosmosChangeFeedRequestOptions} instance, so further setters can be chained
 */
public CosmosChangeFeedRequestOptions setCosmosEndToEndOperationLatencyPolicyConfig(
    CosmosEndToEndOperationLatencyPolicyConfig cosmosEndToEndOperationLatencyPolicyConfig) {
    // Hand the policy to the wrapped options object, which stores it.
    this.actualRequestOptions
        .setCosmosEndToEndLatencyPolicyConfig(cosmosEndToEndOperationLatencyPolicyConfig);

    return this;
}

/**
* Gets the maximum number of pages that will be prefetched from the backend asynchronously
* in the background. By pre-fetching these changes the throughput of processing the
Expand Down
Loading