From 8597ef7caf0e9a19fe6164a4e938472637cf7167 Mon Sep 17 00:00:00 2001 From: JY Tan Date: Fri, 6 Feb 2026 16:00:08 -0800 Subject: [PATCH 1/3] Commit --- drift/core/tracing/td_attributes.py | 6 ++ drift/core/tracing/td_span_processor.py | 13 ++++ drift/instrumentation/django/middleware.py | 8 +++ tests/unit/test_td_span_processor.py | 69 ++++++++++++++++++++++ 4 files changed, 96 insertions(+) diff --git a/drift/core/tracing/td_attributes.py b/drift/core/tracing/td_attributes.py index c0bc0c9..e96bbb3 100644 --- a/drift/core/tracing/td_attributes.py +++ b/drift/core/tracing/td_attributes.py @@ -39,5 +39,11 @@ class TdSpanAttributes: TRANSFORM_METADATA = "td.transform_metadata" STACK_TRACE = "td.stack_trace" + # Export control + # Set by framework middleware (e.g., Django) after exporting a full span + # via sdk.collect_span(). Tells TdSpanProcessor.on_end() to skip the span + # so it doesn't produce a duplicate empty root span. + EXPORTED_BY_INSTRUMENTATION = "td.exported_by_instrumentation" + # Replay mode REPLAY_TRACE_ID = "td.replay_trace_id" diff --git a/drift/core/tracing/td_span_processor.py b/drift/core/tracing/td_span_processor.py index be616d8..4e4522d 100644 --- a/drift/core/tracing/td_span_processor.py +++ b/drift/core/tracing/td_span_processor.py @@ -122,6 +122,19 @@ def on_end(self, span: ReadableSpan) -> None: ) return + # Skip spans already exported by framework middleware (e.g., Django _capture_span). + # Those middlewares create a full CleanSpanData with HTTP body data and export it + # directly via sdk.collect_span(). Processing them again here would produce a + # duplicate root span with empty inputValue/outputValue. + attributes = dict(span.attributes) if span.attributes else {} + from .td_attributes import TdSpanAttributes + + if attributes.get(TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION): + logger.debug( + f"[TdSpanProcessor] Skipping span '{span.name}' - already exported by instrumentation" + ) + return + try: # Convert OTel span to CleanSpanData logger.debug(f"[TdSpanProcessor] Converting span '{span.name}' to CleanSpanData") diff --git a/drift/instrumentation/django/middleware.py b/drift/instrumentation/django/middleware.py index d052429..d4d6fe3 100644 --- a/drift/instrumentation/django/middleware.py +++ b/drift/instrumentation/django/middleware.py @@ -457,6 +457,10 @@ def dict_to_schema_merges(merges_dict): sdk.collect_span(clean_span) + # Mark OTel span so TdSpanProcessor.on_end() skips it - we already + # exported the full span with HTTP body data above. + span_info.span.set_attribute(TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION, True) + def _capture_error_span(self, request: HttpRequest, exception: Exception, span_info: SpanInfo) -> None: """Create and collect an error span. @@ -553,3 +557,7 @@ def dict_to_schema_merges(merges_dict): ) sdk.collect_span(clean_span) + + # Mark OTel span so TdSpanProcessor.on_end() skips it - we already + # exported the full error span above. + span_info.span.set_attribute(TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION, True) diff --git a/tests/unit/test_td_span_processor.py b/tests/unit/test_td_span_processor.py index eae2f61..647d01d 100644 --- a/tests/unit/test_td_span_processor.py +++ b/tests/unit/test_td_span_processor.py @@ -235,6 +235,75 @@ def test_processes_drift_span_in_record_mode(self, mocker): processor._batch_processor.add_span.assert_called_once_with(mock_clean_span) + def test_skips_span_already_exported_by_instrumentation(self, mocker): + """Should skip spans with EXPORTED_BY_INSTRUMENTATION attribute set. + + Framework middlewares (e.g., Django _capture_span) export a full + CleanSpanData with HTTP body data via sdk.collect_span() and then + set this attribute. Processing the span again in on_end() would + produce a duplicate empty root span. + """ + from drift.core.tracing.td_attributes import TdSpanAttributes + + mock_exporter = mocker.MagicMock() + processor = TdSpanProcessor( + exporter=mock_exporter, + mode=TuskDriftMode.RECORD, + ) + processor._started = True + processor._batch_processor = mocker.MagicMock() + + mock_span = self._create_mock_span(mocker) + mock_span.attributes = { + TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION: True, + } + + mock_converter = mocker.patch("drift.core.tracing.td_span_processor.otel_span_to_clean_span_data") + + processor.on_end(mock_span) + + # Should not convert or add to batch — span was already exported + mock_converter.assert_not_called() + processor._batch_processor.add_span.assert_not_called() + + def test_processes_span_without_exported_flag(self, mocker): + """Should process spans that don't have EXPORTED_BY_INSTRUMENTATION set. + + Child spans (redis, psycopg2, etc.) and framework spans that use + the OTel-only export path should still be processed normally. + """ + mock_exporter = mocker.MagicMock() + processor = TdSpanProcessor( + exporter=mock_exporter, + mode=TuskDriftMode.RECORD, + ) + processor._started = True + processor._batch_processor = mocker.MagicMock() + + mock_blocking = mocker.patch("drift.core.tracing.td_span_processor.TraceBlockingManager") + mock_blocking_instance = mocker.MagicMock() + mock_blocking_instance.is_trace_blocked.return_value = False + mock_blocking.get_instance.return_value = mock_blocking_instance + + mock_converter = mocker.patch("drift.core.tracing.td_span_processor.otel_span_to_clean_span_data") + mock_clean_span = mocker.MagicMock() + mock_clean_span.trace_id = "a" * 32 + mock_clean_span.name = "redis.MGET" + mock_clean_span.kind = SpanKind.CLIENT + mock_clean_span.status.code = StatusCode.OK + mock_clean_span.to_proto.return_value = mocker.MagicMock() + mock_converter.return_value = mock_clean_span + + mock_span = self._create_mock_span(mocker, name="redis.MGET") + # No EXPORTED_BY_INSTRUMENTATION attribute — should process normally + mock_span.attributes = {} + + mocker.patch("drift.core.tracing.td_span_processor.should_block_span", return_value=False) + processor.on_end(mock_span) + + mock_converter.assert_called_once() + processor._batch_processor.add_span.assert_called_once_with(mock_clean_span) + def test_skips_blocked_trace(self, mocker): """Should skip processing when trace is blocked.""" mock_exporter = mocker.MagicMock() From 738c59368997792557a68ad8b3b4ccf0712277bb Mon Sep 17 00:00:00 2001 From: JY Tan Date: Fri, 6 Feb 2026 16:13:44 -0800 Subject: [PATCH 2/3] Fix lint --- drift/instrumentation/psycopg/instrumentation.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/drift/instrumentation/psycopg/instrumentation.py b/drift/instrumentation/psycopg/instrumentation.py index f2f5f66..5226aee 100644 --- a/drift/instrumentation/psycopg/instrumentation.py +++ b/drift/instrumentation/psycopg/instrumentation.py @@ -243,7 +243,7 @@ async def patched_async_connect(conninfo="", **kwargs): # Replace the classmethod with our patched version AsyncConnection.connect = classmethod( lambda cls, conninfo="", **kwargs: patched_async_connect(conninfo, **kwargs) - ) # type: ignore[method-assign] + ) logger.info("psycopg.AsyncConnection.connect instrumented") # Also patch AsyncConnectionPool to inject cursor_factory @@ -281,7 +281,7 @@ def patched_init(pool_self, conninfo="", **kwargs): return original_init(pool_self, conninfo, **kwargs) - AsyncConnectionPool.__init__ = patched_init # type: ignore[method-assign] + AsyncConnectionPool.__init__ = patched_init logger.info("psycopg_pool.AsyncConnectionPool.__init__ instrumented") def _create_async_cursor_factory(self, sdk: TuskDrift, base_factory=None): @@ -296,7 +296,7 @@ def _create_async_cursor_factory(self, sdk: TuskDrift, base_factory=None): from psycopg import AsyncCursor as BaseAsyncCursor except ImportError: logger.warning("[ASYNC_CURSOR_FACTORY] Could not import psycopg.AsyncCursor") - BaseAsyncCursor = object # type: ignore + BaseAsyncCursor = object base = base_factory or BaseAsyncCursor From 4c0f6301305ee6a315c2b8838f14edb3e2ebe172 Mon Sep 17 00:00:00 2001 From: JY Tan Date: Fri, 6 Feb 2026 16:24:06 -0800 Subject: [PATCH 3/3] Refactor --- drift/core/tracing/td_attributes.py | 6 - drift/core/tracing/td_span_processor.py | 13 -- drift/instrumentation/django/middleware.py | 205 +++++---------------- tests/unit/test_td_span_processor.py | 69 ------- 4 files changed, 46 insertions(+), 247 deletions(-) diff --git a/drift/core/tracing/td_attributes.py b/drift/core/tracing/td_attributes.py index e96bbb3..c0bc0c9 100644 --- a/drift/core/tracing/td_attributes.py +++ b/drift/core/tracing/td_attributes.py @@ -39,11 +39,5 @@ class TdSpanAttributes: TRANSFORM_METADATA = "td.transform_metadata" STACK_TRACE = "td.stack_trace" - # Export control - # Set by framework middleware (e.g., Django) after exporting a full span - # via sdk.collect_span(). Tells TdSpanProcessor.on_end() to skip the span - # so it doesn't produce a duplicate empty root span. - EXPORTED_BY_INSTRUMENTATION = "td.exported_by_instrumentation" - # Replay mode REPLAY_TRACE_ID = "td.replay_trace_id" diff --git a/drift/core/tracing/td_span_processor.py b/drift/core/tracing/td_span_processor.py index 4e4522d..be616d8 100644 --- a/drift/core/tracing/td_span_processor.py +++ b/drift/core/tracing/td_span_processor.py @@ -122,19 +122,6 @@ def on_end(self, span: ReadableSpan) -> None: ) return - # Skip spans already exported by framework middleware (e.g., Django _capture_span). - # Those middlewares create a full CleanSpanData with HTTP body data and export it - # directly via sdk.collect_span(). Processing them again here would produce a - # duplicate root span with empty inputValue/outputValue. - attributes = dict(span.attributes) if span.attributes else {} - from .td_attributes import TdSpanAttributes - - if attributes.get(TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION): - logger.debug( - f"[TdSpanProcessor] Skipping span '{span.name}' - already exported by instrumentation" - ) - return - try: # Convert OTel span to CleanSpanData logger.debug(f"[TdSpanProcessor] Converting span '{span.name}' to CleanSpanData") diff --git a/drift/instrumentation/django/middleware.py b/drift/instrumentation/django/middleware.py index d4d6fe3..464ce85 100644 --- a/drift/instrumentation/django/middleware.py +++ b/drift/instrumentation/django/middleware.py @@ -2,12 +2,15 @@ from __future__ import annotations +import json import logging import time from collections.abc import Callable from typing import TYPE_CHECKING from opentelemetry.trace import SpanKind as OTelSpanKind +from opentelemetry.trace import Status +from opentelemetry.trace import StatusCode as OTelStatusCode logger = logging.getLogger(__name__) @@ -17,13 +20,8 @@ from ...core.tracing import TdSpanAttributes from ...core.tracing.span_utils import CreateSpanOptions, SpanInfo, SpanUtils from ...core.types import ( - CleanSpanData, - Duration, PackageType, SpanKind, - SpanStatus, - StatusCode, - Timestamp, TuskDriftMode, replay_trace_id_context, span_kind_context, @@ -282,7 +280,12 @@ def _normalize_html_response(self, response: HttpResponse) -> HttpResponse: return normalize_html_response(response) def _capture_span(self, request: HttpRequest, response: HttpResponse, span_info: SpanInfo) -> None: - """Create and collect a span from request/response data. + """Finalize span with request/response data by setting OTel attributes. + + Sets INPUT_VALUE, OUTPUT_VALUE, schema merges, and status on the OTel + span. When span.end() is called, TdSpanProcessor.on_end() converts + these attributes to CleanSpanData and exports it - the same single-write + pattern used by Flask (WSGI handler) and FastAPI. Args: request: Django HttpRequest object @@ -294,12 +297,7 @@ def _capture_span(self, request: HttpRequest, response: HttpResponse, span_info: if not start_time_ns or not span_info.span.is_recording(): return - # Use trace_id and span_id from span_info trace_id = span_info.trace_id - span_id = span_info.span_id - - end_time_ns = time.time_ns() - duration_ns = end_time_ns - start_time_ns # Build input_value using WSGI utilities request_body = getattr(request, "_drift_request_body", None) @@ -357,7 +355,8 @@ def _capture_span(self, request: HttpRequest, response: HttpResponse, span_info: f"Blocking trace {trace_id} - binary response: {content_type} " f"(decoded as {decoded_type.name if decoded_type else 'unknown'})" ) - return # Skip span creation + span_info.span.set_status(Status(OTelStatusCode.ERROR, "Binary content blocked")) + return # Apply transforms if present transform_metadata = None @@ -372,97 +371,41 @@ def _capture_span(self, request: HttpRequest, response: HttpResponse, span_info: input_value = span_data.input_value or input_value output_value = span_data.output_value or output_value - # Build schema merges and generate schemas - # Note: Django uses direct CleanSpanData creation instead of OTel spans, - # so we need to generate schemas here instead of in the converter - from ...core.json_schema_helper import JsonSchemaHelper - - input_schema_merges_dict = build_input_schema_merges(input_value) - output_schema_merges_dict = build_output_schema_merges(output_value) - - # Convert dict back to SchemaMerge objects for JsonSchemaHelper - from ...core.json_schema_helper import DecodedType, EncodingType, SchemaMerge - - def dict_to_schema_merges(merges_dict): - result = {} - for key, merge_data in merges_dict.items(): - encoding = EncodingType(merge_data["encoding"]) if "encoding" in merge_data else None - decoded_type = DecodedType(merge_data["decoded_type"]) if "decoded_type" in merge_data else None - match_importance = merge_data.get("match_importance") - result[key] = SchemaMerge( - encoding=encoding, decoded_type=decoded_type, match_importance=match_importance - ) - return result - - input_schema_merges = dict_to_schema_merges(input_schema_merges_dict) - output_schema_merges = dict_to_schema_merges(output_schema_merges_dict) - - input_schema_info = JsonSchemaHelper.generate_schema_and_hash(input_value, input_schema_merges) - output_schema_info = JsonSchemaHelper.generate_schema_and_hash(output_value, output_schema_merges) - - from ...core.drift_sdk import TuskDrift - - sdk = TuskDrift.get_instance() - # Derive timestamp from start_time_ns - timestamp_seconds = start_time_ns // 1_000_000_000 - timestamp_nanos = start_time_ns % 1_000_000_000 - duration_seconds = duration_ns // 1_000_000_000 - duration_nanos = duration_ns % 1_000_000_000 - - # Match Node SDK: >= 400 is considered an error - if status_code >= 400: - status = SpanStatus(code=StatusCode.ERROR, message=f"HTTP {status_code}") - else: - status = SpanStatus(code=StatusCode.OK, message="") - # Django-specific: use route template for span name to avoid cardinality explosion method = request.method or "" route_template = getattr(request, "_drift_route_template", None) if route_template: - # Use route template (e.g., "users//") span_name = f"{method} {route_template}" else: - # Fallback to literal path (e.g., for 404s) span_name = f"{method} {request.path}" + span_info.span.set_attribute(TdSpanAttributes.NAME, span_name) - # Only create and collect span in RECORD mode - # In REPLAY mode, we only set up context for child spans but don't record the root span - if sdk.mode == TuskDriftMode.RECORD: - clean_span = CleanSpanData( - trace_id=trace_id, - span_id=span_id, - parent_span_id="", - name=span_name, - package_name="django", - instrumentation_name="DjangoInstrumentation", - submodule_name=method, - package_type=PackageType.HTTP, - kind=SpanKind.SERVER, - input_value=input_value, - output_value=output_value, - input_schema=input_schema_info.schema, - output_schema=output_schema_info.schema, - input_value_hash=input_schema_info.decoded_value_hash, - output_value_hash=output_schema_info.decoded_value_hash, - input_schema_hash=input_schema_info.decoded_schema_hash, - output_schema_hash=output_schema_info.decoded_schema_hash, - status=status, - is_pre_app_start=span_info.is_pre_app_start, - is_root_span=True, - timestamp=Timestamp(seconds=timestamp_seconds, nanos=timestamp_nanos), - duration=Duration(seconds=duration_seconds, nanos=duration_nanos), - transform_metadata=transform_metadata, - metadata=None, - ) + # Set data attributes - TdSpanProcessor.on_end() reads these to build CleanSpanData + span_info.span.set_attribute(TdSpanAttributes.INPUT_VALUE, json.dumps(input_value)) + span_info.span.set_attribute(TdSpanAttributes.OUTPUT_VALUE, json.dumps(output_value)) + + # Set schema merge hints (schemas are generated at export time by the converter) + input_schema_merges = build_input_schema_merges(input_value) + output_schema_merges = build_output_schema_merges(output_value) + span_info.span.set_attribute(TdSpanAttributes.INPUT_SCHEMA_MERGES, json.dumps(input_schema_merges)) + span_info.span.set_attribute(TdSpanAttributes.OUTPUT_SCHEMA_MERGES, json.dumps(output_schema_merges)) - sdk.collect_span(clean_span) + if transform_metadata: + span_info.span.set_attribute(TdSpanAttributes.TRANSFORM_METADATA, json.dumps(transform_metadata)) - # Mark OTel span so TdSpanProcessor.on_end() skips it - we already - # exported the full span with HTTP body data above. - span_info.span.set_attribute(TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION, True) + # Set status based on HTTP status code + if status_code >= 400: + span_info.span.set_status(Status(OTelStatusCode.ERROR, f"HTTP {status_code}")) + else: + span_info.span.set_status(Status(OTelStatusCode.OK)) def _capture_error_span(self, request: HttpRequest, exception: Exception, span_info: SpanInfo) -> None: - """Create and collect an error span. + """Finalize span with error data by setting OTel attributes. + + Sets INPUT_VALUE, OUTPUT_VALUE (with error info), schema merges, and + ERROR status on the OTel span. When span.end() is called, + TdSpanProcessor.on_end() converts and exports - same pattern as + Flask/FastAPI. Args: request: Django HttpRequest object @@ -474,13 +417,6 @@ def _capture_error_span(self, request: HttpRequest, exception: Exception, span_i if not start_time_ns or not span_info.span.is_recording(): return - # Use trace_id and span_id from span_info - trace_id = span_info.trace_id - span_id = span_info.span_id - - end_time_ns = time.time_ns() - duration_ns = end_time_ns - start_time_ns - # Build input_value request_body = getattr(request, "_drift_request_body", None) input_value = build_input_value(request.META, request_body) @@ -494,70 +430,21 @@ def _capture_error_span(self, request: HttpRequest, exception: Exception, span_i str(exception), ) - # Build schema merges and generate schemas - from ...core.json_schema_helper import DecodedType, EncodingType, JsonSchemaHelper, SchemaMerge - - input_schema_merges_dict = build_input_schema_merges(input_value) - output_schema_merges_dict = build_output_schema_merges(output_value) - - def dict_to_schema_merges(merges_dict): - result = {} - for key, merge_data in merges_dict.items(): - encoding = EncodingType(merge_data["encoding"]) if "encoding" in merge_data else None - decoded_type = DecodedType(merge_data["decoded_type"]) if "decoded_type" in merge_data else None - match_importance = merge_data.get("match_importance") - result[key] = SchemaMerge( - encoding=encoding, decoded_type=decoded_type, match_importance=match_importance - ) - return result - - input_schema_merges = dict_to_schema_merges(input_schema_merges_dict) - output_schema_merges = dict_to_schema_merges(output_schema_merges_dict) - - input_schema_info = JsonSchemaHelper.generate_schema_and_hash(input_value, input_schema_merges) - output_schema_info = JsonSchemaHelper.generate_schema_and_hash(output_value, output_schema_merges) - - from ...core.drift_sdk import TuskDrift - - sdk = TuskDrift.get_instance() - timestamp_seconds = start_time_ns // 1_000_000_000 - timestamp_nanos = start_time_ns % 1_000_000_000 - duration_seconds = duration_ns // 1_000_000_000 - duration_nanos = duration_ns % 1_000_000_000 - + # Update span name with route template method = request.method or "" route_template = getattr(request, "_drift_route_template", None) span_name = f"{method} {route_template}" if route_template else f"{method} {request.path}" + span_info.span.set_attribute(TdSpanAttributes.NAME, span_name) - clean_span = CleanSpanData( - trace_id=trace_id, - span_id=span_id, - parent_span_id="", - name=span_name, - package_name="django", - instrumentation_name="DjangoInstrumentation", - submodule_name=method, - package_type=PackageType.HTTP, - kind=SpanKind.SERVER, - input_value=input_value, - output_value=output_value, - input_schema=input_schema_info.schema, - output_schema=output_schema_info.schema, - input_value_hash=input_schema_info.decoded_value_hash, - output_value_hash=output_schema_info.decoded_value_hash, - input_schema_hash=input_schema_info.decoded_schema_hash, - output_schema_hash=output_schema_info.decoded_schema_hash, - status=SpanStatus(code=StatusCode.ERROR, message=f"Exception: {type(exception).__name__}"), - is_pre_app_start=span_info.is_pre_app_start, - is_root_span=True, - timestamp=Timestamp(seconds=timestamp_seconds, nanos=timestamp_nanos), - duration=Duration(seconds=duration_seconds, nanos=duration_nanos), - transform_metadata=None, - metadata=None, - ) + # Set data attributes - TdSpanProcessor.on_end() reads these to build CleanSpanData + span_info.span.set_attribute(TdSpanAttributes.INPUT_VALUE, json.dumps(input_value)) + span_info.span.set_attribute(TdSpanAttributes.OUTPUT_VALUE, json.dumps(output_value)) - sdk.collect_span(clean_span) + # Set schema merge hints (schemas are generated at export time by the converter) + input_schema_merges = build_input_schema_merges(input_value) + output_schema_merges = build_output_schema_merges(output_value) + span_info.span.set_attribute(TdSpanAttributes.INPUT_SCHEMA_MERGES, json.dumps(input_schema_merges)) + span_info.span.set_attribute(TdSpanAttributes.OUTPUT_SCHEMA_MERGES, json.dumps(output_schema_merges)) - # Mark OTel span so TdSpanProcessor.on_end() skips it - we already - # exported the full error span above. - span_info.span.set_attribute(TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION, True) + # Set error status + span_info.span.set_status(Status(OTelStatusCode.ERROR, f"Exception: {type(exception).__name__}")) diff --git a/tests/unit/test_td_span_processor.py b/tests/unit/test_td_span_processor.py index 647d01d..eae2f61 100644 --- a/tests/unit/test_td_span_processor.py +++ b/tests/unit/test_td_span_processor.py @@ -235,75 +235,6 @@ def test_processes_drift_span_in_record_mode(self, mocker): processor._batch_processor.add_span.assert_called_once_with(mock_clean_span) - def test_skips_span_already_exported_by_instrumentation(self, mocker): - """Should skip spans with EXPORTED_BY_INSTRUMENTATION attribute set. - - Framework middlewares (e.g., Django _capture_span) export a full - CleanSpanData with HTTP body data via sdk.collect_span() and then - set this attribute. Processing the span again in on_end() would - produce a duplicate empty root span. - """ - from drift.core.tracing.td_attributes import TdSpanAttributes - - mock_exporter = mocker.MagicMock() - processor = TdSpanProcessor( - exporter=mock_exporter, - mode=TuskDriftMode.RECORD, - ) - processor._started = True - processor._batch_processor = mocker.MagicMock() - - mock_span = self._create_mock_span(mocker) - mock_span.attributes = { - TdSpanAttributes.EXPORTED_BY_INSTRUMENTATION: True, - } - - mock_converter = mocker.patch("drift.core.tracing.td_span_processor.otel_span_to_clean_span_data") - - processor.on_end(mock_span) - - # Should not convert or add to batch — span was already exported - mock_converter.assert_not_called() - processor._batch_processor.add_span.assert_not_called() - - def test_processes_span_without_exported_flag(self, mocker): - """Should process spans that don't have EXPORTED_BY_INSTRUMENTATION set. - - Child spans (redis, psycopg2, etc.) and framework spans that use - the OTel-only export path should still be processed normally. - """ - mock_exporter = mocker.MagicMock() - processor = TdSpanProcessor( - exporter=mock_exporter, - mode=TuskDriftMode.RECORD, - ) - processor._started = True - processor._batch_processor = mocker.MagicMock() - - mock_blocking = mocker.patch("drift.core.tracing.td_span_processor.TraceBlockingManager") - mock_blocking_instance = mocker.MagicMock() - mock_blocking_instance.is_trace_blocked.return_value = False - mock_blocking.get_instance.return_value = mock_blocking_instance - - mock_converter = mocker.patch("drift.core.tracing.td_span_processor.otel_span_to_clean_span_data") - mock_clean_span = mocker.MagicMock() - mock_clean_span.trace_id = "a" * 32 - mock_clean_span.name = "redis.MGET" - mock_clean_span.kind = SpanKind.CLIENT - mock_clean_span.status.code = StatusCode.OK - mock_clean_span.to_proto.return_value = mocker.MagicMock() - mock_converter.return_value = mock_clean_span - - mock_span = self._create_mock_span(mocker, name="redis.MGET") - # No EXPORTED_BY_INSTRUMENTATION attribute — should process normally - mock_span.attributes = {} - - mocker.patch("drift.core.tracing.td_span_processor.should_block_span", return_value=False) - processor.on_end(mock_span) - - mock_converter.assert_called_once() - processor._batch_processor.add_span.assert_called_once_with(mock_clean_span) - def test_skips_blocked_trace(self, mocker): """Should skip processing when trace is blocked.""" mock_exporter = mocker.MagicMock()