|
9 | 9 |
|
10 | 10 | import anyio |
11 | 11 | from anyio.streams.memory import MemoryObjectReceiveStream, MemoryObjectSendStream |
| 12 | +from opentelemetry.propagate import inject |
12 | 13 | from pydantic import BaseModel, TypeAdapter |
13 | 14 | from typing_extensions import Self |
14 | 15 |
|
@@ -266,6 +267,9 @@ async def send_request( |
266 | 267 | # Store the callback for this request |
267 | 268 | self._progress_callbacks[request_id] = progress_callback |
268 | 269 |
|
| 270 | + # Propagate opentelemetry trace context |
| 271 | + self._inject_otel_context(request_data) |
| 272 | + |
269 | 273 | try: |
270 | 274 | jsonrpc_request = JSONRPCRequest(jsonrpc="2.0", id=request_id, **request_data) |
271 | 275 | await self._write_stream.send(SessionMessage(message=jsonrpc_request, metadata=metadata)) |
@@ -298,18 +302,37 @@ async def send_notification( |
298 | 302 | related_request_id: RequestId | None = None, |
299 | 303 | ) -> None: |
300 | 304 | """Emits a notification, which is a one-way message that does not expect a response.""" |
| 305 | + |
| 306 | + request_data = notification.model_dump(by_alias=True, mode="json", exclude_none=True) |
| 307 | + # Propagate opentelemetry trace context |
| 308 | + self._inject_otel_context(request_data) |
| 309 | + jsonrpc_notification = JSONRPCNotification(jsonrpc="2.0", **request_data) |
| 310 | + |
301 | 311 | # Some transport implementations may need to set the related_request_id |
302 | 312 | # to attribute to the notifications to the request that triggered them. |
303 | | - jsonrpc_notification = JSONRPCNotification( |
304 | | - jsonrpc="2.0", |
305 | | - **notification.model_dump(by_alias=True, mode="json", exclude_none=True), |
306 | | - ) |
307 | 313 | session_message = SessionMessage( |
308 | 314 | message=jsonrpc_notification, |
309 | 315 | metadata=ServerMessageMetadata(related_request_id=related_request_id) if related_request_id else None, |
310 | 316 | ) |
311 | 317 | await self._write_stream.send(session_message) |
312 | 318 |
|
| 319 | + def _inject_otel_context(self, request: dict[str, Any]) -> None: |
| 320 | + """Propagate OpenTelemetry context in `_meta`. |
| 321 | +
|
| 322 | + See |
| 323 | + - SEP414 https://github.com/modelcontextprotocol/modelcontextprotocol/pull/414 |
| 324 | + - OpenTelemetry semantic conventions |
| 325 | + https://github.com/open-telemetry/semantic-conventions/blob/v1.39.0/docs/gen-ai/mcp.md |
| 326 | + """ |
| 327 | + |
| 328 | + carrier: dict[str, str] = {} |
| 329 | + inject(carrier) |
| 330 | + if not carrier: |
| 331 | + return |
| 332 | + |
| 333 | + meta: dict[str, Any] = request.setdefault("params", {}).setdefault("_meta", {}) |
| 334 | + meta.update(carrier) |
| 335 | + |
313 | 336 | async def _send_response(self, request_id: RequestId, response: SendResultT | ErrorData) -> None: |
314 | 337 | if isinstance(response, ErrorData): |
315 | 338 | jsonrpc_error = JSONRPCError(jsonrpc="2.0", id=request_id, error=response) |
|
0 commit comments