diff --git a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java index b05884305dc..f63fffd368c 100644 --- a/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java +++ b/opentelemetry/src/main/java/io/grpc/opentelemetry/OpenTelemetryMetricsModule.java @@ -48,7 +48,6 @@ import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributesBuilder; -import io.opentelemetry.context.Context; import java.util.ArrayList; import java.util.Collection; import java.util.Collections; @@ -159,14 +158,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod) return isGeneratedMethod ? fullMethodName : "other"; } - private static Context otelContextWithBaggage() { - Baggage baggage = BAGGAGE_KEY.get(); - if (baggage == null) { - return Context.current(); - } - return Context.current().with(baggage); - } - private static final class ClientTracer extends ClientStreamTracer { @Nullable private static final AtomicLongFieldUpdater outboundWireSizeUpdater; @Nullable private static final AtomicLongFieldUpdater inboundWireSizeUpdater; @@ -282,7 +273,6 @@ public void streamClosed(Status status) { } void recordFinishedAttempt() { - Context otelContext = otelContextWithBaggage(); AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, fullMethodName) .put(TARGET_KEY, target) @@ -308,15 +298,15 @@ void recordFinishedAttempt() { if (module.resource.clientAttemptDurationCounter() != null ) { module.resource.clientAttemptDurationCounter() - .record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext); + .record(attemptNanos * SECONDS_PER_NANO, attribute); } if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) { module.resource.clientTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attribute, otelContext); + .record(outboundWireSize, attribute); } if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.clientTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attribute, otelContext); + .record(inboundWireSize, attribute); } } } @@ -448,7 +438,6 @@ void callEnded(Status status) { } void recordFinishedCall() { - Context otelContext = otelContextWithBaggage(); if (attemptsPerCall.get() == 0) { ClientTracer tracer = newClientTracer(null); tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS); @@ -470,8 +459,7 @@ void recordFinishedCall() { callLatencyNanos * SECONDS_PER_NANO, baseAttributes.toBuilder() .put(STATUS_KEY, status.getCode().toString()) - .build(), - otelContext + .build() ); } @@ -480,7 +468,7 @@ void recordFinishedCall() { long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0); if (retriesPerCall > 0) { module.resource.clientCallRetriesCounter() - .record(retriesPerCall, baseAttributes, otelContext); + .record(retriesPerCall, baseAttributes); } } @@ -489,7 +477,7 @@ void recordFinishedCall() { long hedges = hedgedAttemptsPerCall.get(); if (hedges > 0) { module.resource.clientCallHedgesCounter() - .record(hedges, baseAttributes, otelContext); + .record(hedges, baseAttributes); } } @@ -498,7 +486,7 @@ void recordFinishedCall() { long transparentRetries = transparentRetriesPerCall.get(); if (transparentRetries > 0) { module.resource.clientCallTransparentRetriesCounter() - .record(transparentRetries, baseAttributes, otelContext); + .record(transparentRetries, baseAttributes); } } @@ -506,8 +494,7 @@ void recordFinishedCall() { if (module.resource.clientCallRetryDelayCounter() != null) { module.resource.clientCallRetryDelayCounter().record( retryDelayNanos * SECONDS_PER_NANO, - baseAttributes, - otelContext + baseAttributes ); } } @@ -553,6 +540,7 @@ private static final class ServerTracer extends ServerStreamTracer { private final Stopwatch stopwatch; private volatile long outboundWireSize; private volatile long inboundWireSize; + private volatile Baggage baggage; ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName, List streamPlugins) { @@ -568,13 +556,23 @@ public void serverCallStarted(ServerCallInfo callInfo) { // which is true for all generated methods. Otherwise, programmatically // created methods result in high cardinality metrics. boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing(); + baggage = BAGGAGE_KEY.get(io.grpc.Context.current()); isGeneratedMethod = isSampledToLocalTracing; - io.opentelemetry.api.common.Attributes attribute = - io.opentelemetry.api.common.Attributes.of( - METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); + + AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing)); + + if (baggage != null) { + for (java.util.Map.Entry entry : + baggage.asMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue().getValue()); + } + } + + io.opentelemetry.api.common.Attributes attributes = builder.build(); if (module.resource.serverCallCountCounter() != null) { - module.resource.serverCallCountCounter().add(1, attribute); + module.resource.serverCallCountCounter().add(1, attributes); } } @@ -606,7 +604,6 @@ public void inboundWireSize(long bytes) { */ @Override public void streamClosed(Status status) { - Context otelContext = otelContextWithBaggage(); if (streamClosedUpdater != null) { if (streamClosedUpdater.getAndSet(this, 1) != 0) { return; @@ -622,6 +619,14 @@ public void streamClosed(Status status) { AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder() .put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod)) .put(STATUS_KEY, status.getCode().toString()); + + if (baggage != null) { + for (java.util.Map.Entry entry : + baggage.asMap().entrySet()) { + builder.put(entry.getKey(), entry.getValue().getValue()); + } + } + for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) { plugin.addLabels(builder); } @@ -629,15 +634,15 @@ public void streamClosed(Status status) { if (module.resource.serverCallDurationCounter() != null) { module.resource.serverCallDurationCounter() - .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext); + .record(elapsedTimeNanos * SECONDS_PER_NANO, attributes); } if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) { module.resource.serverTotalSentCompressedMessageSizeCounter() - .record(outboundWireSize, attributes, otelContext); + .record(outboundWireSize, attributes); } if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) { module.resource.serverTotalReceivedCompressedMessageSizeCounter() - .record(inboundWireSize, attributes, otelContext); + .record(inboundWireSize, attributes); } } } @@ -657,7 +662,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata } streamPlugins = Collections.unmodifiableList(streamPluginsMutable); } - return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins); + return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, + streamPlugins); } } @@ -717,3 +723,4 @@ public void onClose(Status status, Metadata trailers) { } } } + diff --git a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java index 391f94cefea..6a6c111a4f9 100644 --- a/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java +++ b/opentelemetry/src/test/java/io/grpc/opentelemetry/OpenTelemetryMetricsModuleTest.java @@ -27,7 +27,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyDouble; import static org.mockito.Mockito.verify; import com.google.common.collect.ImmutableMap; @@ -48,22 +47,13 @@ import io.grpc.ServerStreamTracer.ServerCallInfo; import io.grpc.Status; import io.grpc.Status.Code; -import io.grpc.inprocess.InProcessChannelBuilder; -import io.grpc.inprocess.InProcessServerBuilder; import io.grpc.internal.FakeClock; import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter; import io.grpc.opentelemetry.OpenTelemetryMetricsModule.CallAttemptsTracerFactory; import io.grpc.opentelemetry.internal.OpenTelemetryConstants; -import io.grpc.stub.MetadataUtils; -import io.grpc.stub.StreamObserver; import io.grpc.testing.GrpcServerRule; -import io.grpc.testing.protobuf.SimpleRequest; -import io.grpc.testing.protobuf.SimpleResponse; -import io.grpc.testing.protobuf.SimpleServiceGrpc; -import io.opentelemetry.api.OpenTelemetry; import io.opentelemetry.api.baggage.Baggage; import io.opentelemetry.api.common.AttributeKey; -import io.opentelemetry.api.metrics.DoubleHistogram; import io.opentelemetry.api.metrics.Meter; import io.opentelemetry.sdk.common.InstrumentationScopeInfo; import io.opentelemetry.sdk.metrics.data.MetricData; @@ -174,14 +164,9 @@ public String parse(InputStream stream) { private ServerCall.Listener mockServerCallListener; @Captor private ArgumentCaptor statusCaptor; - @Mock - private DoubleHistogram mockServerCallDurationHistogram; - @Captor - private ArgumentCaptor contextCaptor; + private io.grpc.Server server; private io.grpc.ManagedChannel channel; - private OpenTelemetryMetricsResource resource; - private final String serverName = "E2ETestServer-" + Math.random(); private final FakeClock fakeClock = new FakeClock(); private final MethodDescriptor method = @@ -201,9 +186,7 @@ public String parse(InputStream stream) { public void setUp() throws Exception { testMeter = openTelemetryTesting.getOpenTelemetry() .getMeter(OpenTelemetryConstants.INSTRUMENTATION_SCOPE); - resource = OpenTelemetryMetricsResource.builder() - .serverCallDurationCounter(mockServerCallDurationHistogram) - .build(); + } @After @@ -1245,7 +1228,8 @@ public void clientLocalityMetrics_present() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1313,7 +1297,8 @@ public void clientLocalityMetrics_missing() { OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, enabledMetricsMap, disableDefaultMetrics); OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), emptyList()); + fakeClock.getStopwatchSupplier(), resource, Arrays.asList("grpc.lb.locality"), + emptyList()); OpenTelemetryMetricsModule.CallAttemptsTracerFactory callAttemptsTracerFactory = new CallAttemptsTracerFactory(module, target, method.getFullMethodName(), emptyList()); @@ -1629,44 +1614,6 @@ public void serverBasicMetrics() { } - @Test - public void serverBaggagePropagationToMetrics() { - // 1. Create module and tracer factory using the mock resource - OpenTelemetryMetricsModule module = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); - ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - ServerStreamTracer tracer = - tracerFactory.newServerStreamTracer(method.getFullMethodName(), new Metadata()); - - // 2. Define the test baggage and gRPC context - Baggage testBaggage = Baggage.builder() - .put("user-id", "67") - .build(); - - // This simulates the context that the Tracing module would have created - io.grpc.Context grpcContext = io.grpc.Context.current() - .withValue(OpenTelemetryConstants.BAGGAGE_KEY, testBaggage); - - // 3. Attach the gRPC context, trigger metric recording, and detach - io.grpc.Context previousContext = grpcContext.attach(); - try { - tracer.streamClosed(Status.OK); - } finally { - grpcContext.detach(previousContext); - } - - // 4. Verify the record call and capture the OTel Context - verify(mockServerCallDurationHistogram).record( - anyDouble(), - any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); - - // 5. Assert on the captured OTel Context - io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); - Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); - - assertEquals("67", capturedBaggage.getEntryValue("user-id")); - } @Test public void targetAttributeFilter_notSet_usesOriginalTarget() { @@ -1808,7 +1755,8 @@ private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( private OpenTelemetryMetricsModule newOpenTelemetryMetricsModule( OpenTelemetryMetricsResource resource, TargetFilter filter) { return new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), filter); + fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList(), + filter); } static class CallInfo extends ServerCallInfo { @@ -1843,62 +1791,202 @@ public String getAuthority() { } @Test - public void serverBaggagePropagation_EndToEnd() throws Exception { - // 1. Create Both Modules - OpenTelemetry otel = openTelemetryTesting.getOpenTelemetry(); - OpenTelemetryTracingModule tracingModule = new OpenTelemetryTracingModule(otel); - OpenTelemetryMetricsModule metricsModule = new OpenTelemetryMetricsModule( - fakeClock.getStopwatchSupplier(), resource, emptyList(), emptyList()); + public void serverMetrics_recordsBaggageInAttributes() { + OpenTelemetryMetricsResource resource = GrpcOpenTelemetry.createMetricInstruments(testMeter, + enabledMetricsMap, disableDefaultMetrics); + OpenTelemetryMetricsModule module = newOpenTelemetryMetricsModule(resource); + ServerStreamTracer.Factory tracerFactory = module.getServerTracerFactory(); - // 2. Create Server with *both* tracer factories - server = InProcessServerBuilder.forName(serverName) - .addService(new SimpleServiceImpl()) // <-- Uses the helper class below - .addStreamTracerFactory(tracingModule.getServerTracerFactory()) - .addStreamTracerFactory(metricsModule.getServerTracerFactory()) - .build() - .start(); + Baggage baggage = Baggage.builder() + .put("baggage-key-1", "baggage-val-1") + .build(); - // 3. Create Client Channel - channel = InProcessChannelBuilder.forName(serverName).directExecutor().build(); + io.grpc.Context grpcContext = io.grpc.Context.current() + .withValue(OpenTelemetryConstants.BAGGAGE_KEY, baggage); - // 4. Manually create baggage headers - Metadata headers = new Metadata(); - headers.put(Metadata.Key.of("baggage", Metadata.ASCII_STRING_MARSHALLER), - "choice=red_pill_or_blue_pill"); + io.grpc.Context previous = grpcContext.attach(); + try { + ServerStreamTracer tracer = tracerFactory.newServerStreamTracer( + method.getFullMethodName(), new Metadata()); + tracer.serverCallStarted( + new CallInfo<>(method, Attributes.EMPTY, null)); + + tracer.inboundMessage(0); + tracer.inboundWireSize(34); + fakeClock.forwardTime(100, java.util.concurrent.TimeUnit.MILLISECONDS); + tracer.outboundMessage(0); + tracer.outboundWireSize(1028); + fakeClock.forwardTime(16, java.util.concurrent.TimeUnit.MILLISECONDS); + tracer.inboundMessage(1); + tracer.inboundWireSize(154); + tracer.outboundMessage(1); + tracer.outboundWireSize(99); + fakeClock.forwardTime(24, java.util.concurrent.TimeUnit.MILLISECONDS); + tracer.streamClosed(Status.CANCELLED); + } finally { + grpcContext.detach(previous); + } - // 5. Make the gRPC call with these headers - ClientInterceptor headerAttachingInterceptor = - MetadataUtils.newAttachHeadersInterceptor(headers); + io.opentelemetry.api.common.Attributes startAttributes = + io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, method.getFullMethodName()) + .put("baggage-key-1", "baggage-val-1") + .build(); - // Now, create the stub and apply that interceptor - SimpleServiceGrpc.SimpleServiceBlockingStub stub = - SimpleServiceGrpc.newBlockingStub(channel) - .withInterceptors(headerAttachingInterceptor); + io.opentelemetry.api.common.Attributes endAttributes = + io.opentelemetry.api.common.Attributes.builder() + .put(METHOD_KEY, method.getFullMethodName()) + .put(STATUS_KEY, Code.CANCELLED.toString()) + .put("baggage-key-1", "baggage-val-1") + .build(); - // Use the imported SimpleRequest - stub.unaryRpc(SimpleRequest.getDefaultInstance()); + assertThat(openTelemetryTesting.getMetrics()) + .satisfiesExactlyInAnyOrder( + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_COUNT) + .hasUnit("{call}") + .hasLongSumSatisfying( + longSum -> longSum + .hasPointsSatisfying( + point -> point + .hasAttributes(startAttributes) + .hasValue(1))), + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_SENT_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasUnit("By") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(1028L + 99) + .hasAttributes(endAttributes))), + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_RECV_TOTAL_COMPRESSED_MESSAGE_SIZE) + .hasUnit("By") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(34L + 154) + .hasAttributes(endAttributes))), + metric -> assertThat(metric) + .hasInstrumentationScope(InstrumentationScopeInfo.create( + OpenTelemetryConstants.INSTRUMENTATION_SCOPE)) + .hasName(SERVER_CALL_DURATION) + .hasUnit("s") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point + .hasCount(1) + .hasSum(0.1 + 0.016 + 0.024) + .hasAttributes(endAttributes)))); + } - // 6. Verify the Mock - verify(mockServerCallDurationHistogram).record( - anyDouble(), - any(io.opentelemetry.api.common.Attributes.class), - contextCaptor.capture()); + @Test + public void serverMetrics_recordsBaggageInAttributes_endToEnd_customSdk() throws Exception { + io.opentelemetry.api.OpenTelemetry customSdk = new io.opentelemetry.api.OpenTelemetry() { + @Override + public io.opentelemetry.api.trace.TracerProvider getTracerProvider() { + return openTelemetryTesting.getOpenTelemetry().getTracerProvider(); + } - // 7. Assert on the captured Context - io.opentelemetry.context.Context capturedOtelContext = contextCaptor.getValue(); - Baggage capturedBaggage = Baggage.fromContext(capturedOtelContext); + @Override + public io.opentelemetry.api.metrics.MeterProvider getMeterProvider() { + return openTelemetryTesting.getOpenTelemetry().getMeterProvider(); + } - assertEquals("red_pill_or_blue_pill", capturedBaggage.getEntryValue("choice")); - } + @Override + public io.opentelemetry.context.propagation.ContextPropagators getPropagators() { + return io.opentelemetry.context.propagation.ContextPropagators.create( + io.opentelemetry.context.propagation.TextMapPropagator.composite( + io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator.getInstance(), + io.opentelemetry.api.baggage.propagation.W3CBaggagePropagator.getInstance())); + } - /** - * A simple service implementation for the E2E test. - */ - private static class SimpleServiceImpl extends SimpleServiceGrpc.SimpleServiceImplBase { - @Override - public void unaryRpc(SimpleRequest request, StreamObserver responseObserver) { - responseObserver.onNext(SimpleResponse.getDefaultInstance()); - responseObserver.onCompleted(); + @Override + public io.opentelemetry.api.logs.LoggerProvider getLogsBridge() { + return openTelemetryTesting.getOpenTelemetry().getLogsBridge(); + } + }; + + GrpcOpenTelemetry grpcOpenTelemetry = GrpcOpenTelemetry.newBuilder() + .sdk(customSdk) + .enableTracing(true) + .enableMetrics(com.google.common.collect.ImmutableList.of( + "grpc.server.call.duration", + "grpc.server.call.rcvd_total_compressed_message_size", + "grpc.server.call.sent_total_compressed_message_size")) + .build(); + + java.util.concurrent.ExecutorService executor = + java.util.concurrent.Executors.newSingleThreadExecutor(); + io.grpc.inprocess.InProcessServerBuilder serverBuilder = + io.grpc.inprocess.InProcessServerBuilder.forName("test-server").executor(executor); + + serverBuilder.addService( + new io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceImplBase() { + @Override + public void unaryRpc( + io.grpc.testing.protobuf.SimpleRequest request, + io.grpc.stub.StreamObserver observer) { + observer.onNext(io.grpc.testing.protobuf.SimpleResponse.getDefaultInstance()); + observer.onCompleted(); + } + }); + + grpcOpenTelemetry.configureServerBuilder(serverBuilder); + server = serverBuilder.build().start(); + + io.grpc.inprocess.InProcessChannelBuilder channelBuilder = + io.grpc.inprocess.InProcessChannelBuilder.forName("test-server").directExecutor(); + grpcOpenTelemetry.configureChannelBuilder(channelBuilder); + channel = channelBuilder.build(); + io.grpc.testing.protobuf.SimpleServiceGrpc.SimpleServiceBlockingStub stub = + io.grpc.testing.protobuf.SimpleServiceGrpc.newBlockingStub(channel); + + Baggage baggage = Baggage.builder().put("my.baggage.key", "my.baggage.value").build(); + try (io.opentelemetry.context.Scope scope = + io.opentelemetry.context.Context.root().with(baggage).makeCurrent()) { + stub.unaryRpc(io.grpc.testing.protobuf.SimpleRequest.getDefaultInstance()); } + + io.opentelemetry.api.common.Attributes expectedAttributes = + io.opentelemetry.api.common.Attributes.builder() + .put("grpc.method", "grpc.testing.SimpleService/UnaryRpc") + .put("grpc.status", "OK") + .put("my.baggage.key", "my.baggage.value") + .build(); + + assertThat(openTelemetryTesting.getMetrics()) + .filteredOn(m -> m.getName().equals("grpc.server.call.duration")) + .satisfiesExactly(metric -> assertThat(metric) + .hasName("grpc.server.call.duration") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(expectedAttributes)))); + + assertThat(openTelemetryTesting.getMetrics()) + .filteredOn(m -> m.getName().equals( + "grpc.server.call.sent_total_compressed_message_size")) + .satisfiesExactly(metric -> assertThat(metric) + .hasName("grpc.server.call.sent_total_compressed_message_size") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(expectedAttributes)))); + + assertThat(openTelemetryTesting.getMetrics()) + .filteredOn(m -> m.getName().equals( + "grpc.server.call.rcvd_total_compressed_message_size")) + .satisfiesExactly(metric -> assertThat(metric) + .hasName("grpc.server.call.rcvd_total_compressed_message_size") + .hasHistogramSatisfying( + histogram -> histogram.hasPointsSatisfying( + point -> point.hasAttributes(expectedAttributes)))); } }