diff --git a/api/src/main/java/io/grpc/ClientStreamTracer.java b/api/src/main/java/io/grpc/ClientStreamTracer.java index 42e1fdfebea..07ceb11fa59 100644 --- a/api/src/main/java/io/grpc/ClientStreamTracer.java +++ b/api/src/main/java/io/grpc/ClientStreamTracer.java @@ -57,6 +57,23 @@ public void streamCreated(@Grpc.TransportAttr Attributes transportAttrs, Metadat public void createPendingStream() { } + /** + * A delay segment started with a specific reason during load balancing. + * + * @param reasonToken the reason for the delay, e.g., "pick_first:connecting" + * @since 1.82.0 + */ + public void delayStarted(String reasonToken) { + } + + /** + * The current delay segment ended. + * + * @since 1.82.0 + */ + public void delayEnded() { + } + /** * Headers has been sent to the socket. */ diff --git a/api/src/main/java/io/grpc/LoadBalancer.java b/api/src/main/java/io/grpc/LoadBalancer.java index 3187ae8ef1b..d3af8822058 100644 --- a/api/src/main/java/io/grpc/LoadBalancer.java +++ b/api/src/main/java/io/grpc/LoadBalancer.java @@ -549,25 +549,30 @@ public static final class PickResult { // True if the result is created by withDrop() private final boolean drop; @Nullable private final String authorityOverride; + @Nullable private final String delayReasonToken; private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop) { - this.subchannel = subchannel; - this.streamTracerFactory = streamTracerFactory; - this.status = checkNotNull(status, "status"); - this.drop = drop; - this.authorityOverride = null; + this(subchannel, streamTracerFactory, status, drop, null, null); } private PickResult( @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, Status status, boolean drop, @Nullable String authorityOverride) { + this(subchannel, streamTracerFactory, status, drop, authorityOverride, null); + } + + private PickResult( + @Nullable Subchannel subchannel, @Nullable ClientStreamTracer.Factory streamTracerFactory, + Status status, boolean drop, @Nullable String authorityOverride, + @Nullable String delayReasonToken) { this.subchannel = subchannel; this.streamTracerFactory = streamTracerFactory; this.status = checkNotNull(status, "status"); this.drop = drop; this.authorityOverride = authorityOverride; + this.delayReasonToken = delayReasonToken; } /** @@ -727,6 +732,22 @@ public static PickResult withNoResult() { return NO_RESULT; } + /** + * No decision could be made. The RPC will stay buffered with a specific reason. + * + * @since 1.82.0 + */ + public static PickResult withNoResult(String delayReasonToken) { + Preconditions.checkNotNull(delayReasonToken, "delayReasonToken"); + return new PickResult(null, null, Status.OK, false, null, delayReasonToken); + } + + /** Returns the delay reason token if any. */ + @Nullable + public String getDelayReasonToken() { + return delayReasonToken; + } + /** Returns the authority override if any. */ @ExperimentalApi("https://github.com/grpc/grpc-java/issues/11656") @Nullable diff --git a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java index 5569e1eecf8..b9269350088 100644 --- a/core/src/main/java/io/grpc/internal/DelayedClientTransport.java +++ b/core/src/main/java/io/grpc/internal/DelayedClientTransport.java @@ -157,7 +157,8 @@ public final ClientStream newStream( synchronized (lock) { PickerState newerState = pickerState; if (state == newerState) { - return createPendingStream(args, tracers, pickResult); + String token = pickResult != null ? pickResult.getDelayReasonToken() : null; + return createPendingStream(args, tracers, pickResult, token); } state = newerState; } @@ -173,8 +174,8 @@ public final ClientStream newStream( */ @GuardedBy("lock") private PendingStream createPendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, - PickResult pickResult) { - PendingStream pendingStream = new PendingStream(args, tracers); + PickResult pickResult, @Nullable String delayReasonToken) { + PendingStream pendingStream = new PendingStream(args, tracers, delayReasonToken); if (args.getCallOptions().isWaitForReady() && pickResult != null && pickResult.hasResult()) { pendingStream.lastPickStatus = pickResult.getStatus(); } @@ -303,6 +304,7 @@ final void reprocess(@Nullable SubchannelPicker picker) { final ClientTransport transport = GrpcUtil.getTransportFromPickResult(pickResult, callOptions.isWaitForReady()); if (transport != null) { + stream.endDelay(); Executor executor = defaultAppExecutor; // createRealStream may be expensive. It will start real streams on the transport. If // there are pending requests, they will be serialized too, which may be expensive. Since @@ -315,7 +317,9 @@ final void reprocess(@Nullable SubchannelPicker picker) { executor.execute(runnable); } toRemove.add(stream); - } // else: stay pending + } else { // stay pending + stream.updateDelayReason(pickResult.getDelayReasonToken()); + } } synchronized (lock) { @@ -361,11 +365,44 @@ private class PendingStream extends DelayedStream { private final Context context = Context.current(); private final ClientStreamTracer[] tracers; private volatile Status lastPickStatus; + @Nullable private String delayReasonToken; - private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers) { + private PendingStream(PickSubchannelArgs args, ClientStreamTracer[] tracers, + @Nullable String initialToken) { super("connecting_and_lb"); this.args = args; this.tracers = tracers; + this.delayReasonToken = initialToken; + if (initialToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(initialToken); + } + } + } + + void updateDelayReason(String newToken) { + if (!java.util.Objects.equals(delayReasonToken, newToken)) { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + } + delayReasonToken = newToken; + if (newToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayStarted(newToken); + } + } + } + } + + void endDelay() { + if (delayReasonToken != null) { + for (ClientStreamTracer tracer : tracers) { + tracer.delayEnded(); + } + delayReasonToken = null; + } } /** Runnable may be null. */ @@ -391,6 +428,7 @@ private Runnable createRealStream(ClientTransport transport, String authorityOve @Override public void cancel(Status reason) { + endDelay(); super.cancel(reason); synchronized (lock) { if (reportTransportTerminated != null) { diff --git a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java index e7679ea14cc..e4c2b3b9933 100644 --- a/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java +++ b/core/src/main/java/io/grpc/internal/ForwardingClientStreamTracer.java @@ -39,6 +39,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java index cf4b4c94e04..4111700fffe 100644 --- a/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java +++ b/core/src/main/java/io/grpc/internal/PickFirstLoadBalancer.java @@ -38,6 +38,8 @@ * list and sticking to the first that works. */ final class PickFirstLoadBalancer extends LoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("pick_first:connecting"); private final Helper helper; private Subchannel subchannel; private ConnectivityState currentState = IDLE; @@ -83,7 +85,7 @@ public void onSubchannelState(ConnectivityStateInfo stateInfo) { // The channel state does not get updated when doing name resolving today, so for the moment // let LB report CONNECTION and call subchannel.requestConnection() immediately. - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); subchannel.requestConnection(); } else { subchannel.updateAddresses(servers); @@ -135,7 +137,7 @@ private void processSubchannelState(Subchannel subchannel, ConnectivityStateInfo case CONNECTING: // It's safe to use RequestConnectionPicker here, so when coming from IDLE we could leave // the current picker in-place. But ignoring the potential optimization is simpler. - picker = new FixedResultPicker(PickResult.withNoResult()); + picker = new FixedResultPicker(CONNECTING_RESULT); break; case READY: picker = new FixedResultPicker(PickResult.withSubchannel(subchannel)); diff --git a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java index d7e1d4ca4f6..8f34295a701 100644 --- a/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java +++ b/core/src/test/java/io/grpc/internal/DelayedClientTransportTest.java @@ -772,6 +772,55 @@ public void pendingStream_appendTimeoutInsight_waitForReady_withLastPickFailure( + " connecting_and_lb_delay=[0-9]+ns, was_still_waiting]"); } + @Test + public void streamDelayMetrics() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + delayedTransport.newStream(method, headers, callOptions, customTracers); + + InOrder inOrder = inOrder(mockTracer); + inOrder.verify(mockTracer).delayStarted("pick_first:connecting"); + + SubchannelPicker customDelayPicker = mock(SubchannelPicker.class); + when(customDelayPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("rls:lookup_pending")); + + delayedTransport.reprocess(customDelayPicker); + + inOrder.verify(mockTracer).delayEnded(); + inOrder.verify(mockTracer).delayStarted("rls:lookup_pending"); + + delayedTransport.reprocess(mockPicker); + + inOrder.verify(mockTracer).delayEnded(); + } + + @Test + public void streamDelayMetrics_cancelled() { + ClientStreamTracer mockTracer = mock(ClientStreamTracer.class); + ClientStreamTracer[] customTracers = new ClientStreamTracer[] { mockTracer }; + + SubchannelPicker connectingPicker = mock(SubchannelPicker.class); + when(connectingPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("pick_first:connecting")); + + delayedTransport.reprocess(connectingPicker); + ClientStream stream = delayedTransport.newStream(method, headers, callOptions, customTracers); + stream.start(streamListener); + + verify(mockTracer).delayStarted("pick_first:connecting"); + + stream.cancel(Status.CANCELLED); + + verify(mockTracer).delayEnded(); + } + private static TransportProvider newTransportProvider(final ClientTransport transport) { return new TransportProvider() { @Override diff --git a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java index 1e130423a45..5bfabd7ea0e 100644 --- a/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java +++ b/core/src/test/java/io/grpc/internal/PickFirstLoadBalancerTest.java @@ -147,8 +147,9 @@ public void pickAfterResolved() throws Exception { verify(mockSubchannel).requestConnection(); // Calling pickSubchannel() twice gave the same result - assertEquals(pickerCaptor.getValue().pickSubchannel(mockArgs), - pickerCaptor.getValue().pickSubchannel(mockArgs)); + PickResult result = pickerCaptor.getValue().pickSubchannel(mockArgs); + assertThat(result.getDelayReasonToken()).isEqualTo("pick_first:connecting"); + assertEquals(result, pickerCaptor.getValue().pickSubchannel(mockArgs)); verifyNoMoreInteractions(mockHelper); } diff --git a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java index a2846fd04c8..2748a2679f1 100644 --- a/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java +++ b/rls/src/main/java/io/grpc/rls/CachingRlsLbClient.java @@ -1050,7 +1050,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { convertRlsServerStatus(response.getStatus(), lbPolicyConfig.getRouteLookupConfig().lookupService())); } else { - return PickResult.withNoResult(); + return PickResult.withNoResult("rls:lookup_pending"); } } diff --git a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java index a52390743a6..d5d94c4dd6e 100644 --- a/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java +++ b/rls/src/test/java/io/grpc/rls/RlsLoadBalancerTest.java @@ -262,6 +262,7 @@ public void lb_working_withDefaultTarget_rlsResponding() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); @@ -493,6 +494,7 @@ public void lb_working_withoutDefaultTarget() throws Exception { PickResult res = picker.pickSubchannel(searchSubchannelArgs); assertThat(res.getStatus().isOk()).isTrue(); assertThat(res.getSubchannel()).isNull(); + assertThat(res.getDelayReasonToken()).isEqualTo("rls:lookup_pending"); // Cache is warm, but still unconnected res = picker.pickSubchannel(searchSubchannelArgs); inOrder.verify(helper).createSubchannel(any(CreateSubchannelArgs.class)); diff --git a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java index 9c9998571e5..1bf24b12a19 100644 --- a/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java +++ b/util/src/main/java/io/grpc/util/ForwardingClientStreamTracer.java @@ -38,6 +38,16 @@ public void createPendingStream() { delegate().createPendingStream(); } + @Override + public void delayStarted(String reasonToken) { + delegate().delayStarted(reasonToken); + } + + @Override + public void delayEnded() { + delegate().delayEnded(); + } + @Override public void outboundHeaders() { delegate().outboundHeaders(); diff --git a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java index 22940e875ac..ab0b2c49c21 100644 --- a/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java +++ b/util/src/main/java/io/grpc/util/RoundRobinLoadBalancer.java @@ -41,8 +41,10 @@ * EquivalentAddressGroup}s from the {@link NameResolver}. */ final class RoundRobinLoadBalancer extends MultiChildLoadBalancer { + private static final PickResult CONNECTING_RESULT = + PickResult.withNoResult("round_robin:connecting"); private final AtomicInteger sequence = new AtomicInteger(new Random().nextInt()); - private SubchannelPicker currentPicker = new FixedResultPicker(PickResult.withNoResult()); + private SubchannelPicker currentPicker = new FixedResultPicker(CONNECTING_RESULT); public RoundRobinLoadBalancer(Helper helper) { super(helper); @@ -68,7 +70,7 @@ protected void updateOverallBalancingState() { } if (isConnecting) { - updateBalancingState(CONNECTING, new FixedResultPicker(PickResult.withNoResult())); + updateBalancingState(CONNECTING, new FixedResultPicker(CONNECTING_RESULT)); } else { updateBalancingState(TRANSIENT_FAILURE, createReadyPicker(getChildLbStates())); } diff --git a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java index 18854ca1bb6..895cf9b4251 100644 --- a/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java +++ b/util/src/test/java/io/grpc/util/RoundRobinLoadBalancerTest.java @@ -86,7 +86,7 @@ public class RoundRobinLoadBalancerTest { private static final Attributes.Key MAJOR_KEY = Attributes.Key.create("major-key"); private static final SubchannelPicker EMPTY_PICKER = - new FixedResultPicker(PickResult.withNoResult()); + new FixedResultPicker(PickResult.withNoResult("round_robin:connecting")); @Rule public final MockitoRule mocks = MockitoJUnit.rule(); diff --git a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java index f6ee60ab1ef..8be155ec0f8 100644 --- a/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java +++ b/xds/src/main/java/io/grpc/xds/CdsLoadBalancer2.java @@ -17,6 +17,7 @@ package io.grpc.xds; import static com.google.common.base.Preconditions.checkNotNull; +import static io.grpc.ConnectivityState.CONNECTING; import static io.grpc.ConnectivityState.TRANSIENT_FAILURE; import static io.grpc.xds.XdsLbPolicies.CDS_POLICY_NAME; import static io.grpc.xds.XdsLbPolicies.PRIORITY_POLICY_NAME; @@ -119,6 +120,8 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) { errorPrefix() + "Unable to find non-dynamic cluster")); } // The dynamic cluster must not have loaded yet + helper.updateBalancingState( + CONNECTING, new FixedResultPicker(PickResult.withNoResult("cds:discovery_pending"))); return Status.OK; } if (!clusterConfigOr.hasValue()) { diff --git a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java index 6e4566de76d..ea26c8cc2bc 100644 --- a/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java @@ -322,7 +322,11 @@ public void updateBalancingState(final ConnectivityState newState, } ConnectivityState oldState = connectivityState; connectivityState = newState; - picker = newPicker; + if (newState == CONNECTING || newState == IDLE) { + picker = new PriorityPicker(newPicker, priority); + } else { + picker = newPicker; + } if (deletionTimer != null && deletionTimer.isPending()) { return; @@ -357,4 +361,41 @@ protected Helper delegate() { } } } + + private static final class PriorityPicker extends SubchannelPicker { + private final SubchannelPicker delegate; + private final String priority; + + PriorityPicker(SubchannelPicker delegate, String priority) { + this.delegate = com.google.common.base.Preconditions.checkNotNull(delegate, "delegate"); + this.priority = com.google.common.base.Preconditions.checkNotNull(priority, "priority"); + } + + @Override + public PickResult pickSubchannel(PickSubchannelArgs args) { + PickResult childResult = delegate.pickSubchannel(args); + if (!childResult.hasResult() && childResult.getDelayReasonToken() != null) { + return PickResult.withNoResult( + "priority_" + priority + ":" + childResult.getDelayReasonToken()); + } + return childResult; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + PriorityPicker that = (PriorityPicker) o; + return delegate.equals(that.delegate) && priority.equals(that.priority); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(delegate, priority); + } + } } diff --git a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java index 513f4d643ea..15cd5dba621 100644 --- a/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java +++ b/xds/src/main/java/io/grpc/xds/RingHashLoadBalancer.java @@ -356,6 +356,8 @@ public static EquivalentAddressGroup stripAttrs(EquivalentAddressGroup eag) { } private static final class RingHashPicker extends SubchannelPicker { + private static final PickResult RING_HASH_CONNECTING_RESULT = + PickResult.withNoResult("ring_hash:connecting"); private final SynchronizationContext syncContext; private final List ring; // Avoid synchronization between pickSubchannel and subchannel's connectivity state change, @@ -453,7 +455,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { // RPCs can be buffered if the next subchannel is pending (per A62). Otherwise, RPCs // are failed unless there is a READY connection. if (subchannelView.connectivityState == CONNECTING) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } if (subchannelView.connectivityState == IDLE) { @@ -463,7 +465,8 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } }); - return PickResult.withNoResult(); // Indicates that this should be retried after backoff + // Indicates that this should be retried after backoff + return RING_HASH_CONNECTING_RESULT; } } } else { @@ -487,7 +490,7 @@ public PickResult pickSubchannel(PickSubchannelArgs args) { } } if (requestedConnection) { - return PickResult.withNoResult(); + return RING_HASH_CONNECTING_RESULT; } } diff --git a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java index ff4813fe6a8..51e0d08f223 100644 --- a/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java +++ b/xds/src/test/java/io/grpc/xds/CdsLoadBalancer2Test.java @@ -343,6 +343,34 @@ public void dynamicCluster() { assertThat(this.lastXdsConfig.getClusters()).doesNotContainKey(clusterName); } + @Test + public void discoverDynamicCluster_pending_emitsToken() { + String clusterName = "cluster2"; + CdsConfig cdsConfig = new CdsConfig(clusterName, /*dynamic=*/ true); + + XdsConfig xdsConfig = new XdsConfig(null, null, null, ImmutableMap.of()); + + loadBalancer.acceptResolvedAddresses(ResolvedAddresses.newBuilder() + .setAddresses(Collections.emptyList()) + .setAttributes(Attributes.newBuilder() + .set(XdsAttributes.XDS_CONFIG, xdsConfig) + .set( + XdsAttributes.XDS_CLUSTER_SUBSCRIPT_REGISTRY, + new XdsConfig.XdsClusterSubscriptionRegistry() { + @Override + public XdsConfig.Subscription subscribeToCluster(String clusterName) { + return mock(XdsConfig.Subscription.class); + } + }) + .build()) + .setLoadBalancingPolicyConfig(cdsConfig) + .build()); + + verify(helper).updateBalancingState(eq(ConnectivityState.CONNECTING), pickerCaptor.capture()); + PickResult result = pickerCaptor.getValue().pickSubchannel(mock(PickSubchannelArgs.class)); + assertThat(result.getDelayReasonToken()).isEqualTo("cds:discovery_pending"); + } + @Test public void discoverAggregateCluster_createsPriorityLbPolicy() { CdsLoadBalancerProvider cdsLoadBalancerProvider = new CdsLoadBalancerProvider(lbRegistry); diff --git a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java index beb568be9ce..6f0db55a8a7 100644 --- a/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java @@ -531,7 +531,8 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - verify(helper).updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); + verify(helper, times(2)) + .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); Helper helper0 = Iterables.getOnlyElement(fooHelpers); @@ -547,7 +548,7 @@ public void connectingResetFailOverIfSeenReadyOrIdleSinceTransientFailure() { helper0.updateBalancingState( CONNECTING, EMPTY_PICKER); - verify(helper, times(2)) + verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); // failover happens @@ -573,7 +574,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); // Nothing important about this verify, other than to provide a baseline - inOrder.verify(helper) + inOrder.verify(helper, times(2)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); assertThat(fooBalancers).hasSize(1); assertThat(fooHelpers).hasSize(1); @@ -591,7 +592,7 @@ public void failoverTimerNotRestartedOnDupConnecting() { fakeClock.forwardTime(5, TimeUnit.SECONDS); assertThat(fooBalancers).hasSize(2); assertThat(fooHelpers).hasSize(2); - inOrder.verify(helper, times(2)) + inOrder.verify(helper, times(3)) .updateBalancingState(eq(CONNECTING), pickerReturns(PickResult.withNoResult())); Helper helper1 = Iterables.getLast(fooHelpers); @@ -869,7 +870,7 @@ public void raceBetweenShutdownAndChildLbBalancingStateUpdate() { .setAddresses(ImmutableList.of()) .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); + verify(helper, times(2)).updateBalancingState(eq(CONNECTING), isA(SubchannelPicker.class)); // LB shutdown and subchannel state change can happen simultaneously. If shutdown runs first, // any further balancing state update should be ignored. @@ -907,7 +908,37 @@ public void noDuplicateOverallBalancingStateUpdate() { .setLoadBalancingPolicyConfig(priorityLbConfig) .build()); - verify(helper, times(4)).updateBalancingState(any(), any()); + verify(helper, times(6)).updateBalancingState(any(), any()); + } + + @Test + public void priorityPicker_prependsToken() throws Exception { + PriorityChildConfig priorityChildConfig0 = + new PriorityChildConfig(newChildConfig(fooLbProvider, new Object()), true); + PriorityLbConfig priorityLbConfig = + new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0")); + + priorityLb.acceptResolvedAddresses( + ResolvedAddresses.newBuilder() + .setAddresses(ImmutableList.of()) + .setLoadBalancingPolicyConfig(priorityLbConfig) + .build()); + + Helper helper0 = Iterables.getOnlyElement(fooHelpers); // priority p0 + + SubchannelPicker mockChildPicker = mock(SubchannelPicker.class); + when(mockChildPicker.pickSubchannel(any(PickSubchannelArgs.class))) + .thenReturn(PickResult.withNoResult("child_token")); + + helper0.updateBalancingState(CONNECTING, mockChildPicker); + + verify(helper, atLeastOnce()) + .updateBalancingState(eq(CONNECTING), pickerCaptor.capture()); + + SubchannelPicker priorityPicker = pickerCaptor.getValue(); + PickResult result = priorityPicker.pickSubchannel(mock(PickSubchannelArgs.class)); + + assertThat(result.getDelayReasonToken()).isEqualTo("priority_p0:child_token"); } private void assertLatestConnectivityState(ConnectivityState expectedState) { diff --git a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java index b515ed81158..931d1f4df8e 100644 --- a/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java +++ b/xds/src/test/java/io/grpc/xds/RingHashLoadBalancerTest.java @@ -160,6 +160,7 @@ public void subchannelLazyConnectUntilPicked() { PickResult result = pickerCaptor.getValue().pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); Subchannel subchannel = Iterables.getOnlyElement(subchannels.values()); int expectedTimes = PickFirstLoadBalancerProvider.isEnabledNewPickFirst() && !PickFirstLoadBalancerProvider.isEnabledHappyEyeballs() ? 1 : 2; @@ -524,6 +525,7 @@ public void pickWithRandomHash_atLeastOneSubchannelConnecting() { PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(0); } @@ -546,6 +548,7 @@ public void pickWithRandomHash_firstSubchannelInTransientFailure_remainingSubcha PickResult result = picker.pickSubchannel(args); assertThat(result.getStatus().isOk()).isTrue(); assertThat(result.getSubchannel()).isNull(); // buffer request + assertThat(result.getDelayReasonToken()).isEqualTo("ring_hash:connecting"); verifyConnection(1); } @@ -1161,6 +1164,7 @@ public void tfWithReadyChild_doesNotTriggerIdleChildConnection() { assertThat(connectionRequestedQueue.poll()).isNull(); } + private List initializeLbSubchannels(RingHashConfig config, List servers, InitializationFlags... initFlags) {