Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@
import io.grpc.opentelemetry.GrpcOpenTelemetry.TargetFilter;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.common.AttributesBuilder;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -159,14 +158,6 @@ static String recordMethodName(String fullMethodName, boolean isGeneratedMethod)
return isGeneratedMethod ? fullMethodName : "other";
}

private static Context otelContextWithBaggage() {
Baggage baggage = BAGGAGE_KEY.get();
if (baggage == null) {
return Context.current();
}
return Context.current().with(baggage);
}

private static final class ClientTracer extends ClientStreamTracer {
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> outboundWireSizeUpdater;
@Nullable private static final AtomicLongFieldUpdater<ClientTracer> inboundWireSizeUpdater;
Expand Down Expand Up @@ -282,7 +273,6 @@ public void streamClosed(Status status) {
}

void recordFinishedAttempt() {
Context otelContext = otelContextWithBaggage();
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, fullMethodName)
.put(TARGET_KEY, target)
Expand All @@ -308,15 +298,15 @@ void recordFinishedAttempt() {

if (module.resource.clientAttemptDurationCounter() != null ) {
module.resource.clientAttemptDurationCounter()
.record(attemptNanos * SECONDS_PER_NANO, attribute, otelContext);
.record(attemptNanos * SECONDS_PER_NANO, attribute);
}
if (module.resource.clientTotalSentCompressedMessageSizeCounter() != null) {
module.resource.clientTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attribute, otelContext);
.record(outboundWireSize, attribute);
}
if (module.resource.clientTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.clientTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attribute, otelContext);
.record(inboundWireSize, attribute);
}
}
}
Expand Down Expand Up @@ -448,7 +438,6 @@ void callEnded(Status status) {
}

void recordFinishedCall() {
Context otelContext = otelContextWithBaggage();
if (attemptsPerCall.get() == 0) {
ClientTracer tracer = newClientTracer(null);
tracer.attemptNanos = attemptDelayStopwatch.elapsed(TimeUnit.NANOSECONDS);
Expand All @@ -470,8 +459,7 @@ void recordFinishedCall() {
callLatencyNanos * SECONDS_PER_NANO,
baseAttributes.toBuilder()
.put(STATUS_KEY, status.getCode().toString())
.build(),
otelContext
.build()
);
}

Expand All @@ -480,7 +468,7 @@ void recordFinishedCall() {
long retriesPerCall = Math.max(attemptsPerCall.get() - 1, 0);
if (retriesPerCall > 0) {
module.resource.clientCallRetriesCounter()
.record(retriesPerCall, baseAttributes, otelContext);
.record(retriesPerCall, baseAttributes);
}
}

Expand All @@ -489,7 +477,7 @@ void recordFinishedCall() {
long hedges = hedgedAttemptsPerCall.get();
if (hedges > 0) {
module.resource.clientCallHedgesCounter()
.record(hedges, baseAttributes, otelContext);
.record(hedges, baseAttributes);
}
}

Expand All @@ -498,16 +486,15 @@ void recordFinishedCall() {
long transparentRetries = transparentRetriesPerCall.get();
if (transparentRetries > 0) {
module.resource.clientCallTransparentRetriesCounter()
.record(transparentRetries, baseAttributes, otelContext);
.record(transparentRetries, baseAttributes);
}
}

// Retry delay
if (module.resource.clientCallRetryDelayCounter() != null) {
module.resource.clientCallRetryDelayCounter().record(
retryDelayNanos * SECONDS_PER_NANO,
baseAttributes,
otelContext
baseAttributes
);
}
}
Expand Down Expand Up @@ -553,6 +540,7 @@ private static final class ServerTracer extends ServerStreamTracer {
private final Stopwatch stopwatch;
private volatile long outboundWireSize;
private volatile long inboundWireSize;
private volatile Baggage baggage;

ServerTracer(OpenTelemetryMetricsModule module, String fullMethodName,
List<OpenTelemetryPlugin.ServerStreamPlugin> streamPlugins) {
Expand All @@ -568,13 +556,23 @@ public void serverCallStarted(ServerCallInfo<?, ?> callInfo) {
// which is true for all generated methods. Otherwise, programmatically
// created methods result in high cardinality metrics.
boolean isSampledToLocalTracing = callInfo.getMethodDescriptor().isSampledToLocalTracing();
baggage = BAGGAGE_KEY.get(io.grpc.Context.current());
isGeneratedMethod = isSampledToLocalTracing;
io.opentelemetry.api.common.Attributes attribute =
io.opentelemetry.api.common.Attributes.of(
METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, recordMethodName(fullMethodName, isSampledToLocalTracing));

if (baggage != null) {
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry :
baggage.asMap().entrySet()) {
builder.put(entry.getKey(), entry.getValue().getValue());
}
}

io.opentelemetry.api.common.Attributes attributes = builder.build();

if (module.resource.serverCallCountCounter() != null) {
module.resource.serverCallCountCounter().add(1, attribute);
module.resource.serverCallCountCounter().add(1, attributes);
}
}

Expand Down Expand Up @@ -606,7 +604,6 @@ public void inboundWireSize(long bytes) {
*/
@Override
public void streamClosed(Status status) {
Context otelContext = otelContextWithBaggage();
if (streamClosedUpdater != null) {
if (streamClosedUpdater.getAndSet(this, 1) != 0) {
return;
Expand All @@ -622,22 +619,30 @@ public void streamClosed(Status status) {
AttributesBuilder builder = io.opentelemetry.api.common.Attributes.builder()
.put(METHOD_KEY, recordMethodName(fullMethodName, isGeneratedMethod))
.put(STATUS_KEY, status.getCode().toString());

if (baggage != null) {
for (java.util.Map.Entry<String, io.opentelemetry.api.baggage.BaggageEntry> entry :
baggage.asMap().entrySet()) {
builder.put(entry.getKey(), entry.getValue().getValue());
}
}

for (OpenTelemetryPlugin.ServerStreamPlugin plugin : streamPlugins) {
plugin.addLabels(builder);
}
io.opentelemetry.api.common.Attributes attributes = builder.build();

if (module.resource.serverCallDurationCounter() != null) {
module.resource.serverCallDurationCounter()
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes, otelContext);
.record(elapsedTimeNanos * SECONDS_PER_NANO, attributes);
}
if (module.resource.serverTotalSentCompressedMessageSizeCounter() != null) {
module.resource.serverTotalSentCompressedMessageSizeCounter()
.record(outboundWireSize, attributes, otelContext);
.record(outboundWireSize, attributes);
}
if (module.resource.serverTotalReceivedCompressedMessageSizeCounter() != null) {
module.resource.serverTotalReceivedCompressedMessageSizeCounter()
.record(inboundWireSize, attributes, otelContext);
.record(inboundWireSize, attributes);
}
}
}
Expand All @@ -657,7 +662,8 @@ public ServerStreamTracer newServerStreamTracer(String fullMethodName, Metadata
}
streamPlugins = Collections.unmodifiableList(streamPluginsMutable);
}
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName, streamPlugins);
return new ServerTracer(OpenTelemetryMetricsModule.this, fullMethodName,
streamPlugins);
}
}

Expand Down Expand Up @@ -717,3 +723,4 @@ public void onClose(Status status, Metadata trailers) {
}
}
}

Loading