diff --git a/xds/src/main/java/io/grpc/xds/internal/grpcservice/CachedChannelManager.java b/xds/src/main/java/io/grpc/xds/internal/grpcservice/CachedChannelManager.java new file mode 100644 index 00000000000..779cea29e5d --- /dev/null +++ b/xds/src/main/java/io/grpc/xds/internal/grpcservice/CachedChannelManager.java @@ -0,0 +1,138 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.grpcservice; + +import static com.google.common.base.Preconditions.checkNotNull; + +import com.google.auto.value.AutoValue; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ManagedChannel; +import io.grpc.xds.client.ConfiguredChannelCredentials.ChannelCredsConfig; +import io.grpc.xds.internal.grpcservice.GrpcServiceConfig.GoogleGrpcConfig; +import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Function; + +/** + * Concrete class managing the lifecycle of a single ManagedChannel for a GrpcServiceConfig. + */ +public class CachedChannelManager { + private final Function channelCreator; + private final Object lock = new Object(); + + private final AtomicReference channelHolder = new AtomicReference<>(); + private boolean closed; + + /** + * Default constructor for production that creates a channel using the config's target and + * credentials. + */ + public CachedChannelManager() { + this(config -> { + GoogleGrpcConfig googleGrpc = config.googleGrpc(); + return io.grpc.Grpc.newChannelBuilder(googleGrpc.target(), + googleGrpc.configuredChannelCredentials().channelCredentials()).build(); + }); + } + + /** + * Constructor for testing to inject a channel creator. + */ + @VisibleForTesting + public CachedChannelManager(Function channelCreator) { + this.channelCreator = checkNotNull(channelCreator, "channelCreator"); + } + + /** + * Returns a ManagedChannel for the given configuration. If the target or credentials config + * changes, the old channel is shut down and a new one is created. + */ + public ManagedChannel getChannel(GrpcServiceConfig config) { + GoogleGrpcConfig googleGrpc = config.googleGrpc(); + ChannelKey newChannelKey = ChannelKey.of( + googleGrpc.target(), + googleGrpc.configuredChannelCredentials().channelCredsConfig()); + + // 1. Fast path: Lock-free read + ChannelHolder holder = channelHolder.get(); + if (holder != null && holder.channelKey().equals(newChannelKey)) { + return holder.channel(); + } + + ManagedChannel oldChannel = null; + ManagedChannel newChannel; + + // 2. Slow path: Update with locking + synchronized (lock) { + if (closed) { + throw new IllegalStateException("CachedChannelManager is closed"); + } + holder = channelHolder.get(); // Double check + if (holder != null && holder.channelKey().equals(newChannelKey)) { + return holder.channel(); + } + + // 3. Create inside lock to avoid creation storms + newChannel = channelCreator.apply(config); + ChannelHolder newHolder = ChannelHolder.create(newChannelKey, newChannel); + + if (holder != null) { + oldChannel = holder.channel(); + } + channelHolder.set(newHolder); + } + + // 4. Shutdown outside lock + if (oldChannel != null) { + oldChannel.shutdown(); + } + + return newChannel; + } + + /** Removes underlying resources on shutdown. */ + public void close() { + synchronized (lock) { + closed = true; + ChannelHolder holder = channelHolder.getAndSet(null); + if (holder != null) { + holder.channel().shutdown(); + } + } + } + + @AutoValue + abstract static class ChannelKey { + static ChannelKey of(String target, ChannelCredsConfig credentialsConfig) { + return new AutoValue_CachedChannelManager_ChannelKey(target, credentialsConfig); + } + + abstract String target(); + + abstract ChannelCredsConfig channelCredsConfig(); + } + + @AutoValue + abstract static class ChannelHolder { + static ChannelHolder create(ChannelKey channelKey, ManagedChannel channel) { + return new AutoValue_CachedChannelManager_ChannelHolder(channelKey, channel); + } + + abstract ChannelKey channelKey(); + + abstract ManagedChannel channel(); + } +} diff --git a/xds/src/test/java/io/grpc/xds/internal/grpcservice/CachedChannelManagerTest.java b/xds/src/test/java/io/grpc/xds/internal/grpcservice/CachedChannelManagerTest.java new file mode 100644 index 00000000000..76a3d469797 --- /dev/null +++ b/xds/src/test/java/io/grpc/xds/internal/grpcservice/CachedChannelManagerTest.java @@ -0,0 +1,165 @@ +/* + * Copyright 2026 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.xds.internal.grpcservice; + +import static com.google.common.truth.Truth.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.common.collect.ImmutableList; +import io.grpc.ManagedChannel; +import io.grpc.xds.client.ConfiguredChannelCredentials; +import io.grpc.xds.client.ConfiguredChannelCredentials.ChannelCredsConfig; +import io.grpc.xds.internal.grpcservice.GrpcServiceConfig.GoogleGrpcConfig; +import java.util.function.Function; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnit; +import org.mockito.junit.MockitoRule; + +/** + * Unit tests for {@link CachedChannelManager}. + */ +@RunWith(JUnit4.class) +public class CachedChannelManagerTest { + + @Rule + public final MockitoRule mocks = MockitoJUnit.rule(); + + @Mock + private Function mockCreator; + + @Mock + private ManagedChannel mockChannel1; + + @Mock + private ManagedChannel mockChannel2; + + private CachedChannelManager manager; + + private GrpcServiceConfig config1; + private GrpcServiceConfig config2; + + @Before + public void setUp() { + manager = new CachedChannelManager(mockCreator); + + config1 = buildConfig("authz.service.com", "creds1"); + config2 = buildConfig("authz.service.com", "creds2"); // Different creds instance + } + + private GrpcServiceConfig buildConfig(String target, String credsType) { + ChannelCredsConfig credsConfig = mock(ChannelCredsConfig.class); + when(credsConfig.type()).thenReturn(credsType); + + ConfiguredChannelCredentials creds = ConfiguredChannelCredentials.create( + mock(io.grpc.ChannelCredentials.class), credsConfig); + + GoogleGrpcConfig googleGrpc = GoogleGrpcConfig.builder() + .target(target) + .configuredChannelCredentials(creds) + .build(); + + return GrpcServiceConfig.builder() + .googleGrpc(googleGrpc) + .initialMetadata(ImmutableList.of()) + .build(); + } + + @Test + public void getChannel_sameConfig_returnsCached() { + when(mockCreator.apply(config1)).thenReturn(mockChannel1); + + ManagedChannel channela = manager.getChannel(config1); + ManagedChannel channelb = manager.getChannel(config1); + + assertThat(channela).isSameInstanceAs(mockChannel1); + assertThat(channelb).isSameInstanceAs(mockChannel1); + verify(mockCreator, org.mockito.Mockito.times(1)).apply(config1); + } + + @Test + public void getChannel_differentConfig_shutsDownOldAndReturnsNew() { + when(mockCreator.apply(config1)).thenReturn(mockChannel1); + when(mockCreator.apply(config2)).thenReturn(mockChannel2); + + ManagedChannel channel1 = manager.getChannel(config1); + assertThat(channel1).isSameInstanceAs(mockChannel1); + + ManagedChannel channel2 = manager.getChannel(config2); + assertThat(channel2).isSameInstanceAs(mockChannel2); + + verify(mockChannel1).shutdown(); + verify(mockCreator, org.mockito.Mockito.times(1)).apply(config1); + verify(mockCreator, org.mockito.Mockito.times(1)).apply(config2); + } + + @Test + public void close_shutsDownChannel() { + when(mockCreator.apply(config1)).thenReturn(mockChannel1); + + manager.getChannel(config1); + manager.close(); + + verify(mockChannel1).shutdown(); + } + + @Test + public void getChannel_afterClose_throwsException() { + when(mockCreator.apply(config1)).thenReturn(mockChannel1); + + manager.getChannel(config1); + manager.close(); + + try { + manager.getChannel(config1); + org.junit.Assert.fail("Expected IllegalStateException"); + } catch (IllegalStateException e) { + assertThat(e).hasMessageThat().contains("CachedChannelManager is closed"); + } + } + + @Test + public void constructor_defaultCreatesChannel() { + CachedChannelManager defaultManager = new CachedChannelManager(); + io.grpc.ChannelCredentials creds = io.grpc.InsecureChannelCredentials.create(); + ChannelCredsConfig credsConfig = mock(ChannelCredsConfig.class); + when(credsConfig.type()).thenReturn("insecure"); + ConfiguredChannelCredentials configuredCreds = + ConfiguredChannelCredentials.create(creds, credsConfig); + GoogleGrpcConfig googleGrpc = GoogleGrpcConfig.builder() + .target("localhost:8080") + .configuredChannelCredentials(configuredCreds) + .build(); + GrpcServiceConfig config = GrpcServiceConfig.builder() + .googleGrpc(googleGrpc) + .initialMetadata(ImmutableList.of()) + .build(); + + ManagedChannel channel = defaultManager.getChannel(config); + assertThat(channel).isNotNull(); + + channel.shutdownNow(); + defaultManager.close(); + } + +}