Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion xds/src/main/java/io/grpc/xds/PriorityLoadBalancer.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import io.grpc.Status;
import io.grpc.SynchronizationContext;
import io.grpc.SynchronizationContext.ScheduledHandle;
import io.grpc.internal.GrpcUtil;
import io.grpc.util.ForwardingLoadBalancerHelper;
import io.grpc.util.GracefulSwitchLoadBalancer;
import io.grpc.xds.PriorityLoadBalancerProvider.PriorityLbConfig;
Expand Down Expand Up @@ -73,6 +74,8 @@ final class PriorityLoadBalancer extends LoadBalancer {
private SubchannelPicker currentPicker;
// Set to true if currently in the process of handling resolved addresses.
private boolean handlingResolvedAddresses;
static boolean enablePriorityLbChildPolicyCache =
GrpcUtil.getFlag("GRPC_EXPERIMENTAL_ENABLE_PRIORITY_LB_CHILD_POLICY_CACHE", false);

PriorityLoadBalancer(Helper helper) {
this.helper = checkNotNull(helper, "helper");
Expand All @@ -98,7 +101,12 @@ public Status acceptResolvedAddresses(ResolvedAddresses resolvedAddresses) {
if (!prioritySet.contains(priority)) {
ChildLbState childLbState = children.get(priority);
if (childLbState != null) {
childLbState.deactivate();
if (enablePriorityLbChildPolicyCache) {
childLbState.deactivate();
} else {
childLbState.tearDown();
children.remove(priority);
}
}
}
}
Expand Down
179 changes: 141 additions & 38 deletions xds/src/test/java/io/grpc/xds/PriorityLoadBalancerTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,106 @@ public void tearDown() {

@Test
public void acceptResolvedAddresses() {
boolean originalFlagVal = PriorityLoadBalancer.enablePriorityLbChildPolicyCache;
PriorityLoadBalancer.enablePriorityLbChildPolicyCache = true;
try {
SocketAddress socketAddress = new InetSocketAddress(8080);
EquivalentAddressGroup eag = new EquivalentAddressGroup(socketAddress);
eag = AddressFilter.setPathFilter(eag, ImmutableList.of("p1"));
List<EquivalentAddressGroup> addresses = ImmutableList.of(eag);
Attributes attributes =
Attributes.newBuilder().set(Attributes.Key.create("fakeKey"), "fakeValue").build();
Object fooConfig0 = new Object();
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(newChildConfig(fooLbProvider, fooConfig0), true);
Object barConfig0 = new Object();
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(newChildConfig(barLbProvider, barConfig0), true);
Object fooConfig1 = new Object();
PriorityChildConfig priorityChildConfig2 =
new PriorityChildConfig(newChildConfig(fooLbProvider, fooConfig1), true);
PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p0", priorityChildConfig0, "p1", priorityChildConfig1,
"p2", priorityChildConfig2),
ImmutableList.of("p0", "p1", "p2"));
Status status = priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(addresses)
.setAttributes(attributes)
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
assertThat(fooBalancers).hasSize(1);
assertThat(barBalancers).isEmpty();
LoadBalancer fooBalancer0 = Iterables.getOnlyElement(fooBalancers);
verify(fooBalancer0).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
ResolvedAddresses addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).isEmpty();
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(fooConfig0);

// Fail over to p1.
fakeClock.forwardTime(10, TimeUnit.SECONDS);
assertThat(fooBalancers).hasSize(1);
assertThat(barBalancers).hasSize(1);
LoadBalancer barBalancer0 = Iterables.getOnlyElement(barBalancers);
verify(barBalancer0).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(Iterables.getOnlyElement(addressesReceived.getAddresses()).getAddresses())
.containsExactly(socketAddress);
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(barConfig0);

// Fail over to p2.
fakeClock.forwardTime(10, TimeUnit.SECONDS);
assertThat(fooBalancers).hasSize(2);
assertThat(barBalancers).hasSize(1);
LoadBalancer fooBalancer1 = Iterables.getLast(fooBalancers);
verify(fooBalancer1).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(addressesReceived.getAddresses()).isEmpty();
assertThat(addressesReceived.getAttributes()).isEqualTo(attributes);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(fooConfig1);

// New update: p0 and p2 deleted; p1 config changed.
SocketAddress newSocketAddress = new InetSocketAddress(8081);
EquivalentAddressGroup newEag = new EquivalentAddressGroup(newSocketAddress);
newEag = AddressFilter.setPathFilter(newEag, ImmutableList.of("p1"));
List<EquivalentAddressGroup> newAddresses = ImmutableList.of(newEag);
Object newBarConfig = new Object();
PriorityLbConfig newPriorityLbConfig =
new PriorityLbConfig(
ImmutableMap.of("p1",
new PriorityChildConfig(newChildConfig(barLbProvider, newBarConfig), true)),
ImmutableList.of("p1"));
status = priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(newAddresses)
.setLoadBalancingPolicyConfig(newPriorityLbConfig)
.build());
assertThat(status.getCode()).isEqualTo(Status.Code.OK);
assertThat(fooBalancers).hasSize(2);
assertThat(barBalancers).hasSize(1);
verify(barBalancer0, times(2)).acceptResolvedAddresses(resolvedAddressesCaptor.capture());
addressesReceived = resolvedAddressesCaptor.getValue();
assertThat(Iterables.getOnlyElement(addressesReceived.getAddresses()).getAddresses())
.containsExactly(newSocketAddress);
assertThat(addressesReceived.getAttributes()).isEqualTo(Attributes.EMPTY);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(newBarConfig);
verify(fooBalancer0, never()).shutdown();
verify(fooBalancer1, never()).shutdown();
fakeClock.forwardTime(15, TimeUnit.MINUTES);
verify(fooBalancer0).shutdown();
verify(fooBalancer1).shutdown();
verify(barBalancer0, never()).shutdown();
} finally {
PriorityLoadBalancer.enablePriorityLbChildPolicyCache = originalFlagVal;
}
}

@Test
public void acceptResolvedAddresses_cacheDisabled() {
SocketAddress socketAddress = new InetSocketAddress(8080);
EquivalentAddressGroup eag = new EquivalentAddressGroup(socketAddress);
eag = AddressFilter.setPathFilter(eag, ImmutableList.of("p1"));
Expand Down Expand Up @@ -233,9 +333,6 @@ public void acceptResolvedAddresses() {
.containsExactly(newSocketAddress);
assertThat(addressesReceived.getAttributes()).isEqualTo(Attributes.EMPTY);
assertThat(addressesReceived.getLoadBalancingPolicyConfig()).isEqualTo(newBarConfig);
verify(fooBalancer0, never()).shutdown();
verify(fooBalancer1, never()).shutdown();
fakeClock.forwardTime(15, TimeUnit.MINUTES);
verify(fooBalancer0).shutdown();
verify(fooBalancer1).shutdown();
verify(barBalancer0, never()).shutdown();
Expand Down Expand Up @@ -297,41 +394,47 @@ public void acceptResolvedAddresses_propagatesChildFailures() {

@Test
public void handleNameResolutionError() {
Object fooConfig0 = new Object();
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(newChildConfig(fooLbProvider, fooConfig0), true);
Object fooConfig1 = new Object();
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(newChildConfig(fooLbProvider, fooConfig1), true);

PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0"));
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
LoadBalancer fooLb0 = Iterables.getOnlyElement(fooBalancers);
Status status = Status.DATA_LOSS.withDescription("fake error");
priorityLb.handleNameResolutionError(status);
verify(fooLb0).handleNameResolutionError(status);

priorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p1", priorityChildConfig1), ImmutableList.of("p1"));
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(2);
LoadBalancer fooLb1 = Iterables.getLast(fooBalancers);
status = Status.UNAVAILABLE.withDescription("fake error");
priorityLb.handleNameResolutionError(status);
// fooLb0 is deactivated but not yet deleted. However, because it is delisted by the latest
// address update, name resolution error will not be propagated to it.
verify(fooLb0, never()).shutdown();
verify(fooLb0, never()).handleNameResolutionError(status);
verify(fooLb1).handleNameResolutionError(status);
boolean originalFlagVal = PriorityLoadBalancer.enablePriorityLbChildPolicyCache;
PriorityLoadBalancer.enablePriorityLbChildPolicyCache = true;
try {
Object fooConfig0 = new Object();
PriorityChildConfig priorityChildConfig0 =
new PriorityChildConfig(newChildConfig(fooLbProvider, fooConfig0), true);
Object fooConfig1 = new Object();
PriorityChildConfig priorityChildConfig1 =
new PriorityChildConfig(newChildConfig(fooLbProvider, fooConfig1), true);

PriorityLbConfig priorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p0", priorityChildConfig0), ImmutableList.of("p0"));
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
LoadBalancer fooLb0 = Iterables.getOnlyElement(fooBalancers);
Status status = Status.DATA_LOSS.withDescription("fake error");
priorityLb.handleNameResolutionError(status);
verify(fooLb0).handleNameResolutionError(status);

priorityLbConfig =
new PriorityLbConfig(ImmutableMap.of("p1", priorityChildConfig1), ImmutableList.of("p1"));
priorityLb.acceptResolvedAddresses(
ResolvedAddresses.newBuilder()
.setAddresses(ImmutableList.<EquivalentAddressGroup>of())
.setLoadBalancingPolicyConfig(priorityLbConfig)
.build());
assertThat(fooBalancers).hasSize(2);
LoadBalancer fooLb1 = Iterables.getLast(fooBalancers);
status = Status.UNAVAILABLE.withDescription("fake error");
priorityLb.handleNameResolutionError(status);
// fooLb0 is deactivated but not yet deleted. However, because it is delisted by the latest
// address update, name resolution error will not be propagated to it.
verify(fooLb0, never()).shutdown();
verify(fooLb0, never()).handleNameResolutionError(status);
verify(fooLb1).handleNameResolutionError(status);
} finally {
PriorityLoadBalancer.enablePriorityLbChildPolicyCache = originalFlagVal;
}
}

@Test
Expand Down
Loading