From cf78cef966bfbd73c643396059b5b95c16ee5770 Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 23 Feb 2026 22:00:27 +0530 Subject: [PATCH 1/8] fix otel baggage prop --- .../grpc/opentelemetry/GrpcOpenTelemetry.java | 2 +- .../OpenTelemetryMetricsModule.java | 26 ++++++++++++------- 2 files changed, 18 insertions(+), 10 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 6904340ac74..444a0246391 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins, - builder.targetFilter); + openTelemetrySdk.getPropagators(), builder.targetFilter); this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index b05884305dc..55fb19739a9 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -49,6 +49,7 @@ import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -100,24 +101,28 @@ final class OpenTelemetryMetricsModule { private final boolean localityEnabled; private final boolean backendServiceEnabled; private final ImmutableList plugins; + private final ContextPropagators aggregators; @Nullable private final TargetFilter targetAttributeFilter; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, - Collection optionalLabels, List plugins) { - this(stopwatchSupplier, resource, optionalLabels, plugins, null); + Collection optionalLabels, List plugins, + ContextPropagators aggregators) { + this(stopwatchSupplier, resource, optionalLabels, plugins, aggregators, null); } OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, + ContextPropagators aggregators, @Nullable TargetFilter targetAttributeFilter) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); + this.aggregators = checkNotNull(aggregators, "aggregators"); this.targetAttributeFilter = targetAttributeFilter; } @@ -159,8 +164,7 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } - private static Context otelContextWithBaggage() { - Baggage baggage = BAGGAGE_KEY.get(); + private static Context otelContextWithBaggage(Baggage baggage) { if (baggage == null) { return Context.current(); } @@ -282,7 +286,7 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { - Context otelContext = otelContextWithBaggage(); + Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -448,7 +452,7 @@ void callEnded(Status status) { } void recordFinishedCall() { - Context otelContext = otelContextWithBaggage(); + Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -553,13 +557,15 @@ private static final class ServerTracer extends ServerStreamTracer { private final Stopwatch stopwatch; private volatile long outboundWireSize; private volatile long inboundWireSize; + private final Context otelContext; ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName, - List streamPlugins) { + List streamPlugins, Context otelContext) { this.module = checkNotNull(module, "module"); this.fullMethodName = fullMethodName; this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins"); this.stopwatch = module.stopwatchSupplier.get().start(); + this.otelContext = checkNotNull(otelContext, "otelContext"); } @Override @@ -606,7 +612,6 @@ public void inboundWireSize(long bytes) { */ @Override public void streamClosed(Status status) { - Context otelContext = otelContextWithBaggage(); if (streamClosedUpdater != null) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; @@ -657,7 +662,10 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins); + Context context = aggregators.getTextMapPropagator().extract( + Context.current(), headers, MetadataGetter.getInstance()); + return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, + context); } } From 661c9411503022ceca073a5997ff55357fde8c36 Mon Sep 17 00:00:00 2001 From: agravator Date: Tue, 24 Feb 2026 09:40:44 +0530 Subject: [PATCH 2/8] fix test case --- .../OpenTelemetryMetricsModuleTest.java | 63 +++++++++++-------- 1 file changed, 36 insertions(+), 27 deletions(-) diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 391f94cefea..d14e596beb1 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -65,6 +65,8 @@ import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; +import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; @@ -1245,7 +1247,8 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1313,7 +1316,8 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1378,7 +1382,7 @@ public void clientBackendServiceMetrics_present() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1447,7 +1451,7 @@ public void clientBackendServiceMetrics_missing() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1631,37 +1635,39 @@ public void serverBasicMetrics() { @Test public void serverBaggagePropagationToMetrics() { - // 1. Create module and tracer factory using the mock resource - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); - ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - ServerStreamTracer tracer = - tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); - - // 2. Define the test baggage and gRPC context + // 1. Define the test baggage Baggage testBaggage = Baggage.builder() .put("user-id", "67") .build(); - // This simulates the context that the Tracing module would have created - io.grpc.Context grpcContext = io.grpc.Context.current() - .withValue(OpenTelemetryConstants.BAGGAGE_KEY, testBaggage); + // 2. Inject baggage into headers + Metadata headers = new Metadata(); + openTelemetryTesting.getOpenTelemetry().getPropagators().getTextMapPropagator() + .inject(Context.root().with(testBaggage), headers, new TextMapSetter() { + @Override + public void set(Metadata carrier, String key, String value) { + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + }); + + // 3. Create module and tracer factory using the mock resource + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); + ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), + headers); - // 3. Attach the gRPC context, trigger metric recording, and detach - io.grpc.Context previousContext = grpcContext.attach(); - try { - tracer.streamClosed(Status.OK); - } finally { - grpcContext.detach(previousContext); - } + // 4. Trigger metric recording + tracer.streamClosed(Status.OK); - // 4. Verify the record call and capture the OTel Context + // 5. Verify the record call and capture the OTel Context verify(mockServerCallDurationHistogram).record( anyDouble(), any(io.opentelemetry.api.common.Attributes.class), contextCaptor.capture()); - // 5. Assert on the captured OTel Context + // 6. Assert on the captured OTel Context io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); @@ -1802,13 +1808,15 @@ public void targetAttributeFilter_rejectsTarget_mapsToOther() { private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators()); } private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + openTelemetryTesting.getOpenTelemetry().getPropagators(), filter); } static class CallInfo extends ServerCallInfo { @@ -1848,7 +1856,8 @@ public void serverBaggagePropagation_EndToEnd() throws Exception { OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + otel.getPropagators()); // 2. Create Server with *both* tracer factories server = InProcessServerBuilder.forName(serverName) From 396adea656550ac547103ff8bea68bcf5f819d65 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 25 Feb 2026 13:41:23 +0530 Subject: [PATCH 3/8] otel: add tests --- .../OpenTelemetryMetricsModule.java | 17 +- .../OpenTelemetryMetricsModuleTest.java | 180 ++++++++++++++++++ 2 files changed, 189 insertions(+), 8 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 55fb19739a9..df8230cda83 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -101,28 +101,28 @@ final class OpenTelemetryMetricsModule { private final boolean localityEnabled; private final boolean backendServiceEnabled; private final ImmutableList plugins; - private final ContextPropagators aggregators; + private final ContextPropagators contextPropagators; @Nullable private final TargetFilter targetAttributeFilter; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, - ContextPropagators aggregators) { - this(stopwatchSupplier, resource, optionalLabels, plugins, aggregators, null); + ContextPropagators contextPropagators) { + this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null); } OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, - ContextPropagators aggregators, - @Nullable TargetFilter targetAttributeFilter) { + ContextPropagators contextPropagators, + @Nullable TargetFilter targetAttributeFilter) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); - this.aggregators = checkNotNull(aggregators, "aggregators"); + this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators"); this.targetAttributeFilter = targetAttributeFilter; } @@ -580,7 +580,7 @@ public void serverCallStarted(ServerCallInfo callInfo) { METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); if (module.resource.serverCallCountCounter() != null) { - module.resource.serverCallCountCounter().add(1, attribute); + module.resource.serverCallCountCounter().add(1, attribute, otelContext); } } @@ -662,7 +662,7 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - Context context = aggregators.getTextMapPropagator().extract( + Context context = contextPropagators.getTextMapPropagator().extract( Context.current(), headers, MetadataGetter.getInstance()); return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, context); @@ -725,3 +725,4 @@ public void onClose(Status status, Metadata trailers) { } } } + diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index d14e596beb1..3b4ab1732b1 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -28,8 +28,13 @@ import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyDouble; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.grpc.Attributes; import io.grpc.CallOptions; @@ -38,6 +43,7 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; +import io.grpc.ForwardingClientCall; import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor; @@ -62,10 +68,15 @@ import io.grpc.testing.protobuf.SimpleServiceGrpc; import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; +import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.common.AttributeKey; import io.opentelemetry.api.metrics.DoubleHistogram; +import io.opentelemetry.api.metrics.LongCounter; +import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.context.Context; +import io.opentelemetry.context.propagation.ContextPropagators; +import io.opentelemetry.context.propagation.TextMapPropagator; import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -76,6 +87,8 @@ import java.util.List; import java.util.Map; import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -96,6 +109,7 @@ */ @RunWith(JUnit4.class) public class OpenTelemetryMetricsModuleTest { + // ... existing code ... private static final CallOptions.Key CUSTOM_OPTION = CallOptions.Key.createWithDefault("option1", "default"); @@ -1910,4 +1924,170 @@ public void unaryRpc(SimpleRequest request, StreamObserver respo responseObserver.onCompleted(); } } + + @Test + public void serverMetricsShouldRecordContextWithBaggage() { + // Mocks + DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); + OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); + when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); + + // ContextPropagators with Baggage + ContextPropagators propagators = ContextPropagators.create( + TextMapPropagator.composite(W3CBaggagePropagator.getInstance())); + + // Module + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + Stopwatch::createUnstarted, + resource, + ImmutableList.of(), + ImmutableList.of(), + propagators); + + // Baggage to inject + Baggage baggage = Baggage.builder().put("my-baggage-key", "my-baggage-value").build(); + Metadata headers = new Metadata(); + propagators.getTextMapPropagator().inject(Context.root().with(baggage), headers, + new MetadataSetter()); + + // Create Tracer + io.grpc.ServerStreamTracer.Factory factory = module.getServerTracerFactory(); + io.grpc.ServerStreamTracer tracer = factory.newServerStreamTracer("test/method", headers); + + // Close stream logic + tracer.streamClosed(Status.OK); + + // Verify record called with context (which should have baggage) + verify(serverCallDurationCounter).record( + anyDouble(), + any(), + org.mockito.ArgumentMatchers.argThat(ctx -> { + Baggage b = Baggage.fromContext(ctx); + return "my-baggage-value".equals(b.getEntryValue("my-baggage-key")); + })); + } + + @Test + public void serverMetrics_withExternalExecutor_propagatesBaggage() throws Exception { + // Setup Mocks & Resource + DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); + LongCounter serverCallCountCounter = mock(LongCounter.class); + LongHistogram serverTotalSentCompressedMessageSizeCounter = mock(LongHistogram.class); + LongHistogram serverTotalReceivedCompressedMessageSizeCounter = mock(LongHistogram.class); + OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); + when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); + when(resource.serverCallCountCounter()).thenReturn(serverCallCountCounter); + when(resource.serverTotalSentCompressedMessageSizeCounter()) + .thenReturn(serverTotalSentCompressedMessageSizeCounter); + when(resource.serverTotalReceivedCompressedMessageSizeCounter()) + .thenReturn(serverTotalReceivedCompressedMessageSizeCounter); + + // Setup Propagators + ContextPropagators propagators = ContextPropagators.create(W3CBaggagePropagator.getInstance()); + + // Initialize Module + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + Stopwatch::createUnstarted, + resource, + ImmutableList.of(), + ImmutableList.of(), + propagators); + + // Setup Server with Wrapped Executor (Custom Executor) + ExecutorService customExecutor = Executors.newFixedThreadPool(2); + java.util.concurrent.Executor rawExecutor = customExecutor; + + String serverName = InProcessServerBuilder.generateName(); + io.grpc.Server server = InProcessServerBuilder.forName(serverName) + .executor(rawExecutor) + .addService(new SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc( + SimpleRequest request, + StreamObserver responseObserver) { + responseObserver.onNext(SimpleResponse.getDefaultInstance()); + responseObserver.onCompleted(); + } + }) + .addStreamTracerFactory(module.getServerTracerFactory()) + .build().start(); + + // Client Interceptor to inject baggage + ClientInterceptor baggageInterceptor = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + propagators.getTextMapPropagator().inject(Context.current(), headers, + new MetadataSetter()); + super.start(responseListener, headers); + } + }; + } + }; + + // Setup Client and Inject Baggage + io.grpc.ManagedChannel channel = InProcessChannelBuilder.forName(serverName) + .intercept(baggageInterceptor) + .directExecutor() + .build(); + SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc + .newBlockingStub(channel); + + // Define multiple Baggage items + Baggage testBaggage = Baggage.builder() + .put("key1", "value1") + .put("key2", "value/with/special:chars") + .build(); + + // Make the call with Baggage in Context + try (io.opentelemetry.context.Scope scope = Context.current().with(testBaggage).makeCurrent()) { + stub.unaryRpc(SimpleRequest.getDefaultInstance()); + } + + // Shutdown and Wait + channel.shutdownNow(); + server.shutdown().awaitTermination(5, TimeUnit.SECONDS); + customExecutor.shutdownNow(); + + // Verification Logic for Baggage + org.mockito.ArgumentMatcher baggageMatcher = ctx -> { + Baggage b = Baggage.fromContext(ctx); + return "value1".equals(b.getEntryValue("key1")) + && "value/with/special:chars".equals(b.getEntryValue("key2")); + }; + + // Verify all metrics recorded with correct baggage + // Use timeout to avoid race conditions as metrics might be recorded + // asynchronously + verify(serverCallDurationCounter, timeout(5000)).record( + anyDouble(), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + + verify(serverCallCountCounter, timeout(5000)).add( + org.mockito.ArgumentMatchers.eq(1L), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + + verify(serverTotalSentCompressedMessageSizeCounter, timeout(5000)).record( + org.mockito.ArgumentMatchers.anyLong(), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + + verify(serverTotalReceivedCompressedMessageSizeCounter, timeout(5000)).record( + org.mockito.ArgumentMatchers.anyLong(), + any(), // Attributes + org.mockito.ArgumentMatchers.argThat(baggageMatcher)); + } + + private static class MetadataSetter implements TextMapSetter { + @Override + public void set(Metadata carrier, String key, String value) { + carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); + } + } } From 96dccf448a8bf81f77b3b9d655b56764d6657562 Mon Sep 17 00:00:00 2001 From: agravator Date: Wed, 25 Feb 2026 14:10:13 +0530 Subject: [PATCH 4/8] fix format --- .../OpenTelemetryMetricsModuleTest.java | 21 ++++++++++--------- 1 file changed, 11 insertions(+), 10 deletions(-) diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 3b4ab1732b1..5e4c11804e7 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -31,7 +31,6 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import static org.mockito.Mockito.when; import com.google.common.base.Stopwatch; import com.google.common.collect.ImmutableList; @@ -1929,8 +1928,9 @@ public void unaryRpc(SimpleRequest request, StreamObserver respo public void serverMetricsShouldRecordContextWithBaggage() { // Mocks DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); - OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); - when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(serverCallDurationCounter) + .build(); // ContextPropagators with Baggage ContextPropagators propagators = ContextPropagators.create( @@ -1974,13 +1974,14 @@ public void serverMetrics_withExternalExecutor_propagatesBaggage() throws Except LongCounter serverCallCountCounter = mock(LongCounter.class); LongHistogram serverTotalSentCompressedMessageSizeCounter = mock(LongHistogram.class); LongHistogram serverTotalReceivedCompressedMessageSizeCounter = mock(LongHistogram.class); - OpenTelemetryMetricsResource resource = mock(OpenTelemetryMetricsResource.class); - when(resource.serverCallDurationCounter()).thenReturn(serverCallDurationCounter); - when(resource.serverCallCountCounter()).thenReturn(serverCallCountCounter); - when(resource.serverTotalSentCompressedMessageSizeCounter()) - .thenReturn(serverTotalSentCompressedMessageSizeCounter); - when(resource.serverTotalReceivedCompressedMessageSizeCounter()) - .thenReturn(serverTotalReceivedCompressedMessageSizeCounter); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(serverCallDurationCounter) + .serverCallCountCounter(serverCallCountCounter) + .serverTotalSentCompressedMessageSizeCounter( + serverTotalSentCompressedMessageSizeCounter) + .serverTotalReceivedCompressedMessageSizeCounter( + serverTotalReceivedCompressedMessageSizeCounter) + .build(); // Setup Propagators ContextPropagators propagators = ContextPropagators.create(W3CBaggagePropagator.getInstance()); From 04daa070c71c7b01f7b73c51d261919eead2a75a Mon Sep 17 00:00:00 2001 From: agravator Date: Thu, 26 Feb 2026 15:00:45 +0530 Subject: [PATCH 5/8] cover missing cases --- .../OpenTelemetryMetricsModule.java | 12 +- .../OpenTelemetryMetricsModuleTest.java | 247 ++++++++++++++++++ 2 files changed, 256 insertions(+), 3 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index df8230cda83..d14ffb9abc6 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -632,17 +632,23 @@ public void streamClosed(Status status) { } io.opentelemetry.api.common.Attributes attributes = builder.build(); + Context ctxToRecord = otelContext; + Baggage currentBaggage = BAGGAGE_KEY.get(); + if (currentBaggage != null && !currentBaggage.isEmpty()) { + ctxToRecord = ctxToRecord.with(currentBaggage); + } + if (module.resource.serverCallDurationCounter() != null) { module.resource.serverCallDurationCounter() - .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext); + .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, ctxToRecord); } if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) { module.resource.serverTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attributes, otelContext); + .record(outboundWireSize, attributes, ctxToRecord); } if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.serverTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attributes, otelContext); + .record(inboundWireSize, attributes, ctxToRecord); } } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 5e4c11804e7..cd3ba2dd1c0 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -17,6 +17,7 @@ package io.grpc.opentelemetry; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; +import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -42,12 +43,14 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; +import io.grpc.Contexts; import io.grpc.ForwardingClientCall; import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; +import io.grpc.ServerInterceptor; import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer.ServerCallInfo; @@ -59,7 +62,9 @@ import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; +import io.grpc.stub.ClientCalls; import io.grpc.stub.MetadataUtils; +import io.grpc.stub.ServerCalls; import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; import io.grpc.testing.protobuf.SimpleRequest; @@ -83,6 +88,7 @@ import java.io.IOException; import java.io.InputStream; import java.util.Arrays; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; @@ -2091,4 +2097,245 @@ public void set(Metadata carrier, String key, String value) { carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); } } + + @Test + public void clientMetric_baggagePropagation_externalExecutor() throws Exception { + String target = "target:///"; + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + // Mock the instrument to verify Context + DoubleHistogram mockHistogram = mock(DoubleHistogram.class); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .clientAttemptDurationCounter(mockHistogram) + .build(); + + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setSampledToLocalTracing(true) + .build(); + + ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("service") + .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(String req, StreamObserver responseObserver) { + responseObserver.onNext("Response"); + responseObserver.onCompleted(); + } + })) + .build(); + + server = InProcessServerBuilder.forName("client-baggage-test") + .directExecutor() + .addService(serviceDef) + .build().start(); + + InProcessChannelBuilder channelBuilder = + InProcessChannelBuilder.forName("client-baggage-test") + .executor(executor); + + // Use the module's interceptor + ClientInterceptor interceptor = module.getClientInterceptor(target); + channel = channelBuilder.intercept(interceptor).build(); + + Baggage baggage = Baggage.builder().put("client_key", "client_value").build(); + try (io.opentelemetry.context.Scope scope = Context.current().with(baggage).makeCurrent()) { + ClientCalls.blockingUnaryCall(channel, methodDescriptor, CallOptions.DEFAULT, "Request"); + } + + ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(Context.class); + + // Use atLeastOnce() and timeout to allow for async execution + verify(mockHistogram, timeout(1000).atLeastOnce()) + .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + boolean found = false; + for (Context ctx : contextCaptor.getAllValues()) { + Baggage b = Baggage.fromContext(ctx); + if ("client_value".equals(b.getEntryValue("client_key"))) { + found = true; + break; + } + } + assertTrue("Client baggage not found in metrics context", found); + } finally { + executor.shutdown(); + } + } + + @Test + public void serverMetric_baggagePropagation_externalExecutor() throws Exception { + ExecutorService executor = Executors.newSingleThreadExecutor(); + try { + // Mock the instrument + DoubleHistogram mockHistogram = mock(DoubleHistogram.class); + OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() + .serverCallDurationCounter(mockHistogram) + .build(); + + // Configure module with propagation + ContextPropagators propagators = + ContextPropagators.create(W3CBaggagePropagator.getInstance()); + + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), propagators); + + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + .setFullMethodName("service/method") + .setRequestMarshaller(MARSHALLER) + .setResponseMarshaller(MARSHALLER) + .setSampledToLocalTracing(true) + .build(); + + ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("service") + .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(String req, StreamObserver responseObserver) { + responseObserver.onNext("Response"); + responseObserver.onCompleted(); + } + })) + .build(); + + // Use external executor by setting it on builder + InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("server-baggage-test") + .executor(executor) + .addService(serviceDef) + .addStreamTracerFactory(module.getServerTracerFactory()); + + server = serverBuilder.build().start(); + + channel = InProcessChannelBuilder.forName("server-baggage-test") + .directExecutor() + .build(); + + // We need to inject Baggage into the call. + ClientInterceptor baggageInjector = new ClientInterceptor() { + @Override + public ClientCall interceptCall( + MethodDescriptor method, CallOptions callOptions, Channel next) { + return new ForwardingClientCall.SimpleForwardingClientCall( + next.newCall(method, callOptions)) { + @Override + public void start(Listener responseListener, Metadata headers) { + // Inject baggage manually into headers + Baggage baggage = Baggage.builder().put("server_key", "server_value").build(); + propagators.getTextMapPropagator().inject(Context.current().with(baggage), headers, + new MetadataSetter()); + super.start(responseListener, headers); + } + }; + } + }; + + ClientCalls.blockingUnaryCall( + ClientInterceptors.intercept(channel, baggageInjector), + methodDescriptor, CallOptions.DEFAULT, "Request"); + + ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(Context.class); + + verify(mockHistogram, timeout(1000).atLeastOnce()) + .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + boolean found = false; + for (Context ctx : contextCaptor.getAllValues()) { + Baggage b = Baggage.fromContext(ctx); + if ("server_value".equals(b.getEntryValue("server_key"))) { + found = true; + break; + } + } + assertTrue("Server baggage not found in metrics context", found); + } finally { + executor.shutdown(); + } + } + + @Test + public void serverMetric_interceptedBaggage() throws Exception { + // This test verifies if baggage added by a ServerInterceptor is visible to + // OpenTelemetry metrics Context. + + OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( + () -> Stopwatch.createUnstarted(), + resource, + Collections.emptyList(), /* optionalLabels */ + Collections.emptyList(), /* plugins */ + ContextPropagators.noop()); + + ServerInterceptor baggageInterceptor = new ServerInterceptor() { + @Override + public ServerCall.Listener interceptCall( + ServerCall call, Metadata headers, ServerCallHandler next) { + // Add baggage to the context + Baggage baggage = Baggage.builder() + .put("interceptor_key", "interceptor_value") + .build(); + io.grpc.Context ctx = io.grpc.Context.current().withValue(BAGGAGE_KEY, baggage); + return Contexts.interceptCall(ctx, call, headers, next); + } + }; + + MethodDescriptor methodDescriptor = + MethodDescriptor.newBuilder() + .setType(MethodDescriptor.MethodType.UNARY) + // Matching existing method name in test class + .setFullMethodName("package1.service2/method3") + .setRequestMarshaller(MARSHALLER) // Use existing MARSHALLER + .setResponseMarshaller(MARSHALLER) + .setSampledToLocalTracing(true) + .build(); + + ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("package1.service2") + .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( + new ServerCalls.UnaryMethod() { + @Override + public void invoke(String req, StreamObserver responseObserver) { + responseObserver.onNext("Response"); + responseObserver.onCompleted(); + } + })) + .build(); + + InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("interceptor-test") + .addService(serviceDef) + .intercept(baggageInterceptor) + .addStreamTracerFactory(module.getServerTracerFactory()); + + server = serverBuilder.build().start(); + + // Use a real channel but we don't need metrics on client side for this test + InProcessChannelBuilder channelBuilder = InProcessChannelBuilder.forName("interceptor-test"); + channel = channelBuilder.build(); + + ClientCalls.blockingUnaryCall(channel, methodDescriptor, CallOptions.DEFAULT, "Request"); + + // Verify that record was called with a Context containing the baggage + // Reusing contextCaptor from the class + verify(mockServerCallDurationHistogram, timeout(1000).atLeastOnce()) + .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), + contextCaptor.capture()); + + boolean found = false; + for (io.opentelemetry.context.Context ctx : contextCaptor.getAllValues()) { + // Baggage from OTEL Context + Baggage otelBaggage = Baggage.fromContext(ctx); + if ("interceptor_value".equals(otelBaggage.getEntryValue("interceptor_key"))) { + found = true; + break; + } + } + assertThat(found).isTrue(); + } } From 8480558d74bdfb8681e6fba55aa8a7022dc8c779 Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 2 Mar 2026 15:58:27 +0530 Subject: [PATCH 6/8] otel: baggage prop --- .../grpc/opentelemetry/GrpcOpenTelemetry.java | 2 +- .../OpenTelemetryMetricsModule.java | 88 +-- .../OpenTelemetryMetricsModuleTest.java | 700 +++++------------- 3 files changed, 217 insertions(+), 573 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java index 444a0246391..6904340ac74 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/GrpcOpenTelemetry.java @@ -101,7 +101,7 @@ private GrpcOpenTelemetry(Builder builder) { this.optionalLabels = ImmutableList.copyOf(builder.optionalLabels); this.openTelemetryMetricsModule = new OpenTelemetryMetricsModule( STOPWATCH_SUPPLIER, resource, optionalLabels, builder.plugins, - openTelemetrySdk.getPropagators(), builder.targetFilter); + builder.targetFilter); this.openTelemetryTracingModule = new OpenTelemetryTracingModule(openTelemetrySdk); this.sink = new OpenTelemetryMetricSink(meter, enableMetrics, disableDefault, optionalLabels); } diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index d14ffb9abc6..45318e3abc4 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -48,8 +48,6 @@ import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.propagation.ContextPropagators; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -101,28 +99,24 @@ final class OpenTelemetryMetricsModule { private final boolean localityEnabled; private final boolean backendServiceEnabled; private final ImmutableList plugins; - private final ContextPropagators contextPropagators; @Nullable private final TargetFilter targetAttributeFilter; OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, - Collection optionalLabels, List plugins, - ContextPropagators contextPropagators) { - this(stopwatchSupplier, resource, optionalLabels, plugins, contextPropagators, null); + Collection optionalLabels, List plugins) { + this(stopwatchSupplier, resource, optionalLabels, plugins, null); } OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, Collection optionalLabels, List plugins, - ContextPropagators contextPropagators, - @Nullable TargetFilter targetAttributeFilter) { + @Nullable TargetFilter targetAttributeFilter) { this.resource = checkNotNull(resource, "resource"); this.stopwatchSupplier = checkNotNull(stopwatchSupplier, "stopwatchSupplier"); this.localityEnabled = optionalLabels.contains(LOCALITY_KEY.getKey()); this.backendServiceEnabled = optionalLabels.contains(BACKEND_SERVICE_KEY.getKey()); this.plugins = ImmutableList.copyOf(plugins); - this.contextPropagators = checkNotNull(contextPropagators, "contextPropagators"); this.targetAttributeFilter = targetAttributeFilter; } @@ -164,13 +158,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } - private static Context otelContextWithBaggage(Baggage baggage) { - if (baggage == null) { - return Context.current(); - } - return Context.current().with(baggage); - } - private static final class ClientTracer extends ClientStreamTracer { @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; @@ -286,7 +273,6 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { - Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -312,15 +298,15 @@ void recordFinishedAttempt() { if (module.resource.clientAttemptDurationCounter() != null ) { module.resource.clientAttemptDurationCounter() - .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext); + .record(attemptNanos * SECONDS_PER_NANO, attribute); } if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) { module.resource.clientTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attribute, otelContext); + .record(outboundWireSize, attribute); } if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.clientTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attribute, otelContext); + .record(inboundWireSize, attribute); } } } @@ -452,7 +438,6 @@ void callEnded(Status status) { } void recordFinishedCall() { - Context otelContext = otelContextWithBaggage(BAGGAGE_KEY.get()); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -474,8 +459,7 @@ void recordFinishedCall() { callLatencyNanos * SECONDS_PER_NANO, baseAttributes.toBuilder() .put(STATUS_KEY, status.getCode().toString()) - .build(), - otelContext + .build() ); } @@ -484,7 +468,7 @@ void recordFinishedCall() { long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0); if (retriesPerCall > 0) { module.resource.clientCallRetriesCounter() - .record(retriesPerCall, baseAttributes, otelContext); + .record(retriesPerCall, baseAttributes); } } @@ -493,7 +477,7 @@ void recordFinishedCall() { long hedges = hedgedAttemptsPerCall.get(); if (hedges > 0) { module.resource.clientCallHedgesCounter() - .record(hedges, baseAttributes, otelContext); + .record(hedges, baseAttributes); } } @@ -502,7 +486,7 @@ void recordFinishedCall() { long transparentRetries = transparentRetriesPerCall.get(); if (transparentRetries > 0) { module.resource.clientCallTransparentRetriesCounter() - .record(transparentRetries, baseAttributes, otelContext); + .record(transparentRetries, baseAttributes); } } @@ -510,8 +494,7 @@ void recordFinishedCall() { if (module.resource.clientCallRetryDelayCounter() != null) { module.resource.clientCallRetryDelayCounter().record( retryDelayNanos * SECONDS_PER_NANO, - baseAttributes, - otelContext + baseAttributes ); } } @@ -557,15 +540,14 @@ private static final class ServerTracer extends ServerStreamTracer { private final Stopwatch stopwatch; private volatile long outboundWireSize; private volatile long inboundWireSize; - private final Context otelContext; + private volatile Baggage baggage; ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName, - List streamPlugins, Context otelContext) { + List streamPlugins) { this.module = checkNotNull(module, "module"); this.fullMethodName = fullMethodName; this.streamPlugins = checkNotNull(streamPlugins, "streamPlugins"); this.stopwatch = module.stopwatchSupplier.get().start(); - this.otelContext = checkNotNull(otelContext, "otelContext"); } @Override @@ -574,13 +556,23 @@ public void serverCallStarted(ServerCallInfo callInfo) { // which is true for all generated methods. Otherwise, programmatically // created methods result in high cardinality metrics. boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing(); + baggage = BAGGAGE_KEY.get(io.grpc.Context.current()); isGeneratedMethod = isSampledToLocalTracing; - io.opentelemetry.api.common.Attributes attribute = - io.opentelemetry.api.common.Attributes.of( - METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); + + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); + + if (baggage != null) { + for (java.util.Map.Entry entry : + baggage.asMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue().getValue()); + } + } + + io.opentelemetry.api.common.Attributes attributes = builder.build(); if (module.resource.serverCallCountCounter() != null) { - module.resource.serverCallCountCounter().add(1, attribute, otelContext); + module.resource.serverCallCountCounter().add(1, attributes); } } @@ -627,28 +619,30 @@ public void streamClosed(Status status) { AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod)) .put(STATUS_KEY, status.getCode().toString()); + + if (baggage != null) { + for (java.util.Map.Entry entry : + baggage.asMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue().getValue()); + } + } + for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } io.opentelemetry.api.common.Attributes attributes = builder.build(); - Context ctxToRecord = otelContext; - Baggage currentBaggage = BAGGAGE_KEY.get(); - if (currentBaggage != null && !currentBaggage.isEmpty()) { - ctxToRecord = ctxToRecord.with(currentBaggage); - } - if (module.resource.serverCallDurationCounter() != null) { module.resource.serverCallDurationCounter() - .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, ctxToRecord); + .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes); } if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) { module.resource.serverTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attributes, ctxToRecord); + .record(outboundWireSize, attributes); } if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.serverTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attributes, ctxToRecord); + .record(inboundWireSize, attributes); } } } @@ -668,10 +662,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - Context context = contextPropagators.getTextMapPropagator().extract( - Context.current(), headers, MetadataGetter.getInstance()); - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins, - context); + return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, + streamPlugins); } } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index cd3ba2dd1c0..90b328befa6 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -17,7 +17,6 @@ package io.grpc.opentelemetry; import static io.grpc.ClientStreamTracer.NAME_RESOLUTION_DELAYED; -import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.BAGGAGE_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.LOCALITY_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.METHOD_KEY; import static io.grpc.opentelemetry.internal.OpenTelemetryConstants.STATUS_KEY; @@ -28,13 +27,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyDouble; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import io.grpc.Attributes; import io.grpc.CallOptions; @@ -43,57 +37,33 @@ import io.grpc.ClientInterceptor; import io.grpc.ClientInterceptors; import io.grpc.ClientStreamTracer; -import io.grpc.Contexts; -import io.grpc.ForwardingClientCall; import io.grpc.KnownLength; import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.ServerCall; import io.grpc.ServerCallHandler; -import io.grpc.ServerInterceptor; import io.grpc.ServerServiceDefinition; import io.grpc.ServerStreamTracer; import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; -import io.grpc.stub.ClientCalls; -import io.grpc.stub.MetadataUtils; -import io.grpc.stub.ServerCalls; -import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; -import io.grpc.testing.protobuf.SimpleRequest; -import io.grpc.testing.protobuf.SimpleResponse; -import io.grpc.testing.protobuf.SimpleServiceGrpc; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; -import io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator; import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.metrics.DoubleHistogram; -import io.opentelemetry.api.metrics.LongCounter; -import io.opentelemetry.api.metrics.LongHistogram; import io.opentelemetry.api.metrics.Meter; -import io.opentelemetry.context.Context; -import io.opentelemetry.context.propagation.ContextPropagators; -import io.opentelemetry.context.propagation.TextMapPropagator; -import io.opentelemetry.context.propagation.TextMapSetter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.testing.junit4.OpenTelemetryRule; import java.io.IOException; import java.io.InputStream; import java.util.Arrays; -import java.util.Collections; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import javax.annotation.Nullable; @@ -195,14 +165,9 @@ public String parse(InputStream stream) { private ServerCall.Listener mockServerCallListener; @Captor private ArgumentCaptor statusCaptor; - @Mock - private DoubleHistogram mockServerCallDurationHistogram; - @Captor - private ArgumentCaptor contextCaptor; + private io.grpc.Server server; private io.grpc.ManagedChannel channel; - private OpenTelemetryMetricsResource resource; - private final String serverName = "E2ETestServer-" + Math.random(); private final FakeClock fakeClock = new FakeClock(); private final MethodDescriptor method = @@ -222,9 +187,7 @@ public String parse(InputStream stream) { public void setUp() throws Exception { testMeter = openTelemetryTesting.getOpenTelemetry() .getMeter(OpenTelemetryConstants.INSTRUMENTATION_SCOPE); - resource = OpenTelemetryMetricsResource.builder() - .serverCallDurationCounter(mockServerCallDurationHistogram) - .build(); + } @After @@ -1267,7 +1230,7 @@ public void clientLocalityMetrics_present() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), - emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1336,7 +1299,7 @@ public void clientLocalityMetrics_missing() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), - emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1401,7 +1364,7 @@ public void clientBackendServiceMetrics_present() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1470,7 +1433,7 @@ public void clientBackendServiceMetrics_missing() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList(), openTelemetryTesting.getOpenTelemetry().getPropagators()); + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1652,46 +1615,6 @@ public void serverBasicMetrics() { } - @Test - public void serverBaggagePropagationToMetrics() { - // 1. Define the test baggage - Baggage testBaggage = Baggage.builder() - .put("user-id", "67") - .build(); - - // 2. Inject baggage into headers - Metadata headers = new Metadata(); - openTelemetryTesting.getOpenTelemetry().getPropagators().getTextMapPropagator() - .inject(Context.root().with(testBaggage), headers, new TextMapSetter() { - @Override - public void set(Metadata carrier, String key, String value) { - carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); - } - }); - - // 3. Create module and tracer factory using the mock resource - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), - openTelemetryTesting.getOpenTelemetry().getPropagators()); - ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - ServerStreamTracer tracer = tracerFactory.newServerStreamTracer(method.getFullMethodName(), - headers); - - // 4. Trigger metric recording - tracer.streamClosed(Status.OK); - - // 5. Verify the record call and capture the OTel Context - verify(mockServerCallDurationHistogram).record( - anyDouble(), - any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); - - // 6. Assert on the captured OTel Context - io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); - Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); - - assertEquals("67", capturedBaggage.getEntryValue("user-id")); - } @Test public void targetAttributeFilter_notSet_usesOriginalTarget() { @@ -1827,15 +1750,14 @@ public void targetAttributeFilter_rejectsTarget_mapsToOther() { private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), - openTelemetryTesting.getOpenTelemetry().getPropagators()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); } private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), - openTelemetryTesting.getOpenTelemetry().getPropagators(), filter); + filter); } static class CallInfo extends ServerCallInfo { @@ -1870,472 +1792,202 @@ public String getAuthority() { } @Test - public void serverBaggagePropagation_EndToEnd() throws Exception { - // 1. Create Both Modules - OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); - OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); - OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), - otel.getPropagators()); - - // 2. Create Server with *both* tracer factories - server = InProcessServerBuilder.forName(serverName) - .addService(new SimpleServiceImpl()) // <-- Uses the helper class below - .addStreamTracerFactory(tracingModule.getServerTracerFactory()) - .addStreamTracerFactory(metricsModule.getServerTracerFactory()) - .build() - .start(); - - // 3. Create Client Channel - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); - - // 4. Manually create baggage headers - Metadata headers = new Metadata(); - headers.put(Metadata.Key.of("baggage", Metadata.ASCII_STRING_MARSHALLER), - "choice=red_pill_or_blue_pill"); - - // 5. Make the gRPC call with these headers - ClientInterceptor headerAttachingInterceptor = - MetadataUtils.newAttachHeadersInterceptor(headers); - - // Now, create the stub and apply that interceptor - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub(channel) - .withInterceptors(headerAttachingInterceptor); - - // Use the imported SimpleRequest - stub.unaryRpc(SimpleRequest.getDefaultInstance()); - - // 6. Verify the Mock - verify(mockServerCallDurationHistogram).record( - anyDouble(), - any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); + public void serverMetrics_recordsBaggageInAttributes() { + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - // 7. Assert on the captured Context - io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); - Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); + Baggage baggage = Baggage.builder() + .put("baggage-key-1", "baggage-val-1") + .build(); - assertEquals("red_pill_or_blue_pill", capturedBaggage.getEntryValue("choice")); - } + io.grpc.Context grpcContext = io.grpc.Context.current() + .withValue(OpenTelemetryConstants.BAGGAGE_KEY, baggage); - /** - * A simple service implementation for the E2E test. - */ - private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { - @Override - public void unaryRpc(SimpleRequest request, StreamObserver responseObserver) { - responseObserver.onNext(SimpleResponse.getDefaultInstance()); - responseObserver.onCompleted(); + io.grpc.Context previous = grpcContext.attach(); + try { + ServerStreamTracer tracer = tracerFactory.newServerStreamTracer( + method.getFullMethodName(), new Metadata()); + tracer.serverCallStarted( + new CallInfo<>(method, Attributes.EMPTY, null)); + + tracer.inboundMessage(0); + tracer.inboundWireSize(34); + fakeClock.forwardTime(100, java.util.concurrent.TimeUnit.MILLISECONDS); + tracer.outboundMessage(0); + tracer.outboundWireSize(1028); + fakeClock.forwardTime(16, java.util.concurrent.TimeUnit.MILLISECONDS); + tracer.inboundMessage(1); + tracer.inboundWireSize(154); + tracer.outboundMessage(1); + tracer.outboundWireSize(99); + fakeClock.forwardTime(24, java.util.concurrent.TimeUnit.MILLISECONDS); + tracer.streamClosed(Status.CANCELLED); + } finally { + grpcContext.detach(previous); } - } - @Test - public void serverMetricsShouldRecordContextWithBaggage() { - // Mocks - DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); - OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() - .serverCallDurationCounter(serverCallDurationCounter) - .build(); - - // ContextPropagators with Baggage - ContextPropagators propagators = ContextPropagators.create( - TextMapPropagator.composite(W3CBaggagePropagator.getInstance())); - - // Module - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - Stopwatch::createUnstarted, - resource, - ImmutableList.of(), - ImmutableList.of(), - propagators); - - // Baggage to inject - Baggage baggage = Baggage.builder().put("my-baggage-key", "my-baggage-value").build(); - Metadata headers = new Metadata(); - propagators.getTextMapPropagator().inject(Context.root().with(baggage), headers, - new MetadataSetter()); - - // Create Tracer - io.grpc.ServerStreamTracer.Factory factory = module.getServerTracerFactory(); - io.grpc.ServerStreamTracer tracer = factory.newServerStreamTracer("test/method", headers); - - // Close stream logic - tracer.streamClosed(Status.OK); - - // Verify record called with context (which should have baggage) - verify(serverCallDurationCounter).record( - anyDouble(), - any(), - org.mockito.ArgumentMatchers.argThat(ctx -> { - Baggage b = Baggage.fromContext(ctx); - return "my-baggage-value".equals(b.getEntryValue("my-baggage-key")); - })); - } - - @Test - public void serverMetrics_withExternalExecutor_propagatesBaggage() throws Exception { - // Setup Mocks & Resource - DoubleHistogram serverCallDurationCounter = mock(DoubleHistogram.class); - LongCounter serverCallCountCounter = mock(LongCounter.class); - LongHistogram serverTotalSentCompressedMessageSizeCounter = mock(LongHistogram.class); - LongHistogram serverTotalReceivedCompressedMessageSizeCounter = mock(LongHistogram.class); - OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() - .serverCallDurationCounter(serverCallDurationCounter) - .serverCallCountCounter(serverCallCountCounter) - .serverTotalSentCompressedMessageSizeCounter( - serverTotalSentCompressedMessageSizeCounter) - .serverTotalReceivedCompressedMessageSizeCounter( - serverTotalReceivedCompressedMessageSizeCounter) - .build(); - - // Setup Propagators - ContextPropagators propagators = ContextPropagators.create(W3CBaggagePropagator.getInstance()); - - // Initialize Module - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - Stopwatch::createUnstarted, - resource, - ImmutableList.of(), - ImmutableList.of(), - propagators); - - // Setup Server with Wrapped Executor (Custom Executor) - ExecutorService customExecutor = Executors.newFixedThreadPool(2); - java.util.concurrent.Executor rawExecutor = customExecutor; - - String serverName = InProcessServerBuilder.generateName(); - io.grpc.Server server = InProcessServerBuilder.forName(serverName) - .executor(rawExecutor) - .addService(new SimpleServiceGrpc.SimpleServiceImplBase() { - @Override - public void unaryRpc( - SimpleRequest request, - StreamObserver responseObserver) { - responseObserver.onNext(SimpleResponse.getDefaultInstance()); - responseObserver.onCompleted(); - } - }) - .addStreamTracerFactory(module.getServerTracerFactory()) - .build().start(); - - // Client Interceptor to inject baggage - ClientInterceptor baggageInterceptor = new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ForwardingClientCall.SimpleForwardingClientCall( - next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - propagators.getTextMapPropagator().inject(Context.current(), headers, - new MetadataSetter()); - super.start(responseListener, headers); - } - }; - } - }; - - // Setup Client and Inject Baggage - io.grpc.ManagedChannel channel = InProcessChannelBuilder.forName(serverName) - .intercept(baggageInterceptor) - .directExecutor() + io.opentelemetry.api.common.Attributes startAttributes = + io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, method.getFullMethodName()) + .put("baggage-key-1", "baggage-val-1") .build(); - SimpleServiceGrpc.SimpleServiceBlockingStub stub = SimpleServiceGrpc - .newBlockingStub(channel); - // Define multiple Baggage items - Baggage testBaggage = Baggage.builder() - .put("key1", "value1") - .put("key2", "value/with/special:chars") + io.opentelemetry.api.common.Attributes endAttributes = + io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, method.getFullMethodName()) + .put(STATUS_KEY, Code.CANCELLED.toString()) + .put("baggage-key-1", "baggage-val-1") .build(); - // Make the call with Baggage in Context - try (io.opentelemetry.context.Scope scope = Context.current().with(testBaggage).makeCurrent()) { - stub.unaryRpc(SimpleRequest.getDefaultInstance()); - } - - // Shutdown and Wait - channel.shutdownNow(); - server.shutdown().awaitTermination(5, TimeUnit.SECONDS); - customExecutor.shutdownNow(); - - // Verification Logic for Baggage - org.mockito.ArgumentMatcher baggageMatcher = ctx -> { - Baggage b = Baggage.fromContext(ctx); - return "value1".equals(b.getEntryValue("key1")) - && "value/with/special:chars".equals(b.getEntryValue("key2")); - }; - - // Verify all metrics recorded with correct baggage - // Use timeout to avoid race conditions as metrics might be recorded - // asynchronously - verify(serverCallDurationCounter, timeout(5000)).record( - anyDouble(), - any(), // Attributes - org.mockito.ArgumentMatchers.argThat(baggageMatcher)); - - verify(serverCallCountCounter, timeout(5000)).add( - org.mockito.ArgumentMatchers.eq(1L), - any(), // Attributes - org.mockito.ArgumentMatchers.argThat(baggageMatcher)); - - verify(serverTotalSentCompressedMessageSizeCounter, timeout(5000)).record( - org.mockito.ArgumentMatchers.anyLong(), - any(), // Attributes - org.mockito.ArgumentMatchers.argThat(baggageMatcher)); - - verify(serverTotalReceivedCompressedMessageSizeCounter, timeout(5000)).record( - org.mockito.ArgumentMatchers.anyLong(), - any(), // Attributes - org.mockito.ArgumentMatchers.argThat(baggageMatcher)); - } - - private static class MetadataSetter implements TextMapSetter { - @Override - public void set(Metadata carrier, String key, String value) { - carrier.put(Metadata.Key.of(key, Metadata.ASCII_STRING_MARSHALLER), value); - } + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_COUNT) + .hasUnit("{call}") + .hasLongSumSatisfying( + longSum -> longSum + .hasPointsSatisfying( + point -> point + .hasAttributes(startAttributes) + .hasValue(1))), + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasUnit("By") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(1028L + 99) + .hasAttributes(endAttributes))), + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasUnit("By") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(34L + 154) + .hasAttributes(endAttributes))), + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_DURATION) + .hasUnit("s") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(0.1 + 0.016 + 0.024) + .hasAttributes(endAttributes)))); } @Test - public void clientMetric_baggagePropagation_externalExecutor() throws Exception { - String target = "target:///"; - ExecutorService executor = Executors.newSingleThreadExecutor(); - try { - // Mock the instrument to verify Context - DoubleHistogram mockHistogram = mock(DoubleHistogram.class); - OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() - .clientAttemptDurationCounter(mockHistogram) - .build(); - - OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); - - MethodDescriptor methodDescriptor = - MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(MARSHALLER) - .setResponseMarshaller(MARSHALLER) - .setSampledToLocalTracing(true) - .build(); - - ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("service") - .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( - new ServerCalls.UnaryMethod() { - @Override - public void invoke(String req, StreamObserver responseObserver) { - responseObserver.onNext("Response"); - responseObserver.onCompleted(); - } - })) - .build(); - - server = InProcessServerBuilder.forName("client-baggage-test") - .directExecutor() - .addService(serviceDef) - .build().start(); - - InProcessChannelBuilder channelBuilder = - InProcessChannelBuilder.forName("client-baggage-test") - .executor(executor); - - // Use the module's interceptor - ClientInterceptor interceptor = module.getClientInterceptor(target); - channel = channelBuilder.intercept(interceptor).build(); - - Baggage baggage = Baggage.builder().put("client_key", "client_value").build(); - try (io.opentelemetry.context.Scope scope = Context.current().with(baggage).makeCurrent()) { - ClientCalls.blockingUnaryCall(channel, methodDescriptor, CallOptions.DEFAULT, "Request"); + public void serverMetrics_recordsBaggageInAttributes_endToEnd_customSdk() throws Exception { + io.opentelemetry.api.OpenTelemetry customSdk = new io.opentelemetry.api.OpenTelemetry() { + @Override + public io.opentelemetry.api.trace.TracerProvider getTracerProvider() { + return openTelemetryTesting.getOpenTelemetry().getTracerProvider(); } - ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(Context.class); - - // Use atLeastOnce() and timeout to allow for async execution - verify(mockHistogram, timeout(1000).atLeastOnce()) - .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); - - boolean found = false; - for (Context ctx : contextCaptor.getAllValues()) { - Baggage b = Baggage.fromContext(ctx); - if ("client_value".equals(b.getEntryValue("client_key"))) { - found = true; - break; - } + @Override + public io.opentelemetry.api.metrics.MeterProvider getMeterProvider() { + return openTelemetryTesting.getOpenTelemetry().getMeterProvider(); } - assertTrue("Client baggage not found in metrics context", found); - } finally { - executor.shutdown(); - } - } - - @Test - public void serverMetric_baggagePropagation_externalExecutor() throws Exception { - ExecutorService executor = Executors.newSingleThreadExecutor(); - try { - // Mock the instrument - DoubleHistogram mockHistogram = mock(DoubleHistogram.class); - OpenTelemetryMetricsResource resource = OpenTelemetryMetricsResource.builder() - .serverCallDurationCounter(mockHistogram) - .build(); - - // Configure module with propagation - ContextPropagators propagators = - ContextPropagators.create(W3CBaggagePropagator.getInstance()); - - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), propagators); - - MethodDescriptor methodDescriptor = - MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - .setFullMethodName("service/method") - .setRequestMarshaller(MARSHALLER) - .setResponseMarshaller(MARSHALLER) - .setSampledToLocalTracing(true) - .build(); - - ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("service") - .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( - new ServerCalls.UnaryMethod() { - @Override - public void invoke(String req, StreamObserver responseObserver) { - responseObserver.onNext("Response"); - responseObserver.onCompleted(); - } - })) - .build(); - // Use external executor by setting it on builder - InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("server-baggage-test") - .executor(executor) - .addService(serviceDef) - .addStreamTracerFactory(module.getServerTracerFactory()); - - server = serverBuilder.build().start(); - - channel = InProcessChannelBuilder.forName("server-baggage-test") - .directExecutor() - .build(); - - // We need to inject Baggage into the call. - ClientInterceptor baggageInjector = new ClientInterceptor() { - @Override - public ClientCall interceptCall( - MethodDescriptor method, CallOptions callOptions, Channel next) { - return new ForwardingClientCall.SimpleForwardingClientCall( - next.newCall(method, callOptions)) { - @Override - public void start(Listener responseListener, Metadata headers) { - // Inject baggage manually into headers - Baggage baggage = Baggage.builder().put("server_key", "server_value").build(); - propagators.getTextMapPropagator().inject(Context.current().with(baggage), headers, - new MetadataSetter()); - super.start(responseListener, headers); - } - }; - } - }; - - ClientCalls.blockingUnaryCall( - ClientInterceptors.intercept(channel, baggageInjector), - methodDescriptor, CallOptions.DEFAULT, "Request"); - - ArgumentCaptor contextCaptor = ArgumentCaptor.forClass(Context.class); - - verify(mockHistogram, timeout(1000).atLeastOnce()) - .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); - - boolean found = false; - for (Context ctx : contextCaptor.getAllValues()) { - Baggage b = Baggage.fromContext(ctx); - if ("server_value".equals(b.getEntryValue("server_key"))) { - found = true; - break; - } + @Override + public io.opentelemetry.context.propagation.ContextPropagators getPropagators() { + return io.opentelemetry.context.propagation.ContextPropagators.create( + io.opentelemetry.context.propagation.TextMapPropagator.composite( + io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator.getInstance(), + io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator.getInstance())); } - assertTrue("Server baggage not found in metrics context", found); - } finally { - executor.shutdown(); - } - } - - @Test - public void serverMetric_interceptedBaggage() throws Exception { - // This test verifies if baggage added by a ServerInterceptor is visible to - // OpenTelemetry metrics Context. - - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - () -> Stopwatch.createUnstarted(), - resource, - Collections.emptyList(), /* optionalLabels */ - Collections.emptyList(), /* plugins */ - ContextPropagators.noop()); - ServerInterceptor baggageInterceptor = new ServerInterceptor() { @Override - public ServerCall.Listener interceptCall( - ServerCall call, Metadata headers, ServerCallHandler next) { - // Add baggage to the context - Baggage baggage = Baggage.builder() - .put("interceptor_key", "interceptor_value") - .build(); - io.grpc.Context ctx = io.grpc.Context.current().withValue(BAGGAGE_KEY, baggage); - return Contexts.interceptCall(ctx, call, headers, next); + public io.opentelemetry.api.logs.LoggerProvider getLogsBridge() { + return openTelemetryTesting.getOpenTelemetry().getLogsBridge(); } }; - MethodDescriptor methodDescriptor = - MethodDescriptor.newBuilder() - .setType(MethodDescriptor.MethodType.UNARY) - // Matching existing method name in test class - .setFullMethodName("package1.service2/method3") - .setRequestMarshaller(MARSHALLER) // Use existing MARSHALLER - .setResponseMarshaller(MARSHALLER) - .setSampledToLocalTracing(true) - .build(); - - ServerServiceDefinition serviceDef = ServerServiceDefinition.builder("package1.service2") - .addMethod(methodDescriptor, ServerCalls.asyncUnaryCall( - new ServerCalls.UnaryMethod() { - @Override - public void invoke(String req, StreamObserver responseObserver) { - responseObserver.onNext("Response"); - responseObserver.onCompleted(); - } - })) + GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() + .sdk(customSdk) + .enableTracing(true) + .enableMetrics(com.google.common.collect.ImmutableList.of( + "grpc.server.call.duration", + "grpc.server.call.rcvd_total_compressed_message_size", + "grpc.server.call.sent_total_compressed_message_size")) .build(); - InProcessServerBuilder serverBuilder = InProcessServerBuilder.forName("interceptor-test") - .addService(serviceDef) - .intercept(baggageInterceptor) - .addStreamTracerFactory(module.getServerTracerFactory()); - + java.util.concurrent.ExecutorService executor = + java.util.concurrent.Executors.newSingleThreadExecutor(); + io.grpc.inprocess.InProcessServerBuilder serverBuilder = + io.grpc.inprocess.InProcessServerBuilder.forName("test-server").executor(executor); + + serverBuilder.addService( + new io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc( + io.grpc.testing.protobuf.SimpleRequest request, + io.grpc.stub.StreamObserver observer) { + observer.onNext(io.grpc.testing.protobuf.SimpleResponse.getDefaultInstance()); + observer.onCompleted(); + } + }); + + grpcOpenTelemetry.configureServerBuilder(serverBuilder); server = serverBuilder.build().start(); - // Use a real channel but we don't need metrics on client side for this test - InProcessChannelBuilder channelBuilder = InProcessChannelBuilder.forName("interceptor-test"); + io.grpc.inprocess.InProcessChannelBuilder channelBuilder = + io.grpc.inprocess.InProcessChannelBuilder.forName("test-server").directExecutor(); + grpcOpenTelemetry.configureChannelBuilder(channelBuilder); channel = channelBuilder.build(); + io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceBlockingStub stub = + io.grpc.testing.protobuf.SimpleServiceGrpc.newBlockingStub(channel); - ClientCalls.blockingUnaryCall(channel, methodDescriptor, CallOptions.DEFAULT, "Request"); - - // Verify that record was called with a Context containing the baggage - // Reusing contextCaptor from the class - verify(mockServerCallDurationHistogram, timeout(1000).atLeastOnce()) - .record(anyDouble(), any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); - - boolean found = false; - for (io.opentelemetry.context.Context ctx : contextCaptor.getAllValues()) { - // Baggage from OTEL Context - Baggage otelBaggage = Baggage.fromContext(ctx); - if ("interceptor_value".equals(otelBaggage.getEntryValue("interceptor_key"))) { - found = true; - break; - } + Baggage baggage = Baggage.builder().put("my.baggage.key", "my.baggage.value").build(); + try (io.opentelemetry.context.Scope scope = + io.opentelemetry.context.Context.root().with(baggage).makeCurrent()) { + stub.unaryRpc(io.grpc.testing.protobuf.SimpleRequest.getDefaultInstance()); } - assertThat(found).isTrue(); + + io.opentelemetry.api.common.Attributes expectedAttributes = + io.opentelemetry.api.common.Attributes.builder() + .put("grpc.method", "grpc.testing.SimpleService/UnaryRpc") + .put("grpc.status", "OK") + .put("my.baggage.key", "my.baggage.value") + .build(); + + assertThat(openTelemetryTesting.getMetrics()) + .filteredOn(m -> m.getName().equals("grpc.server.call.duration")) + .satisfiesExactly(metric -> assertThat(metric) + .hasName("grpc.server.call.duration") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(expectedAttributes)))); + + assertThat(openTelemetryTesting.getMetrics()) + .filteredOn(m -> m.getName().equals( + "grpc.server.call.sent_total_compressed_message_size")) + .satisfiesExactly(metric -> assertThat(metric) + .hasName("grpc.server.call.sent_total_compressed_message_size") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(expectedAttributes)))); + + assertThat(openTelemetryTesting.getMetrics()) + .filteredOn(m -> m.getName().equals( + "grpc.server.call.rcvd_total_compressed_message_size")) + .satisfiesExactly(metric -> assertThat(metric) + .hasName("grpc.server.call.rcvd_total_compressed_message_size") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(expectedAttributes)))); } } From 0e2f9b51bbff8541bac12ae513720c2571870b18 Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 2 Mar 2026 16:01:08 +0530 Subject: [PATCH 7/8] fix format --- .../io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java | 1 - 1 file changed, 1 deletion(-) diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 90b328befa6..27f9aa4071f 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -84,7 +84,6 @@ */ @RunWith(JUnit4.class) public class OpenTelemetryMetricsModuleTest { - // ... existing code ... private static final CallOptions.Key CUSTOM_OPTION = CallOptions.Key.createWithDefault("option1", "default"); From e9a62ed878e1f17e5234519d7a500a555377138b Mon Sep 17 00:00:00 2001 From: agravator Date: Mon, 2 Mar 2026 16:07:12 +0530 Subject: [PATCH 8/8] fix format --- .../OpenTelemetryMetricsModule.java | 2 +- .../OpenTelemetryMetricsModuleTest.java | 18 +++++++++--------- 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index 45318e3abc4..f63fffd368c 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -104,7 +104,7 @@ final class OpenTelemetryMetricsModule { OpenTelemetryMetricsModule(Supplier stopwatchSupplier, OpenTelemetryMetricsResource resource, - Collection optionalLabels, List plugins) { + Collection optionalLabels, List plugins) { this(stopwatchSupplier, resource, optionalLabels, plugins, null); } diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 27f9aa4071f..6a6c111a4f9 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -1228,8 +1228,8 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), - emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1297,8 +1297,8 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), - emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1363,7 +1363,7 @@ public void clientBackendServiceMetrics_present() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1432,7 +1432,7 @@ public void clientBackendServiceMetrics_missing() { enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.backend_service"), - emptyList()); + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1749,14 +1749,14 @@ public void targetAttributeFilter_rejectsTarget_mapsToOther() { private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); } private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), - filter); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + filter); } static class CallInfo extends ServerCallInfo {