From 6490021b4f19b745f70b4a26578ad0b0549d0889 Mon Sep 17 00:00:00 2001 From: tewbo Date: Sat, 21 Mar 2026 06:27:27 +0300 Subject: [PATCH 1/8] + otel sync tracing support --- examples/opentelemetry/compose-e2e.yaml | 61 +++ examples/opentelemetry/example.py | 42 +++ .../grafana/dashboards/README.md | 5 + .../provisioning/dashboards/dashboards.yaml | 13 + .../provisioning/datasources/datasources.yaml | 22 ++ .../opentelemetry/otel-collector-config.yaml | 44 +++ examples/opentelemetry/prometheus.yaml | 7 + examples/opentelemetry/tempo.yaml | 15 + examples/opentelemetry/ydb_config/README.md | 28 ++ .../ydb_config/otel-tracing-snippet.yaml | 26 ++ .../ydb_config/ydb-config-with-tracing.yaml | 349 ++++++++++++++++++ setup.py | 1 + ydb/connection.py | 4 + ydb/opentelemetry/__init__.py | 8 + ydb/opentelemetry/_plugin.py | 96 +++++ ydb/opentelemetry/tracing.py | 85 +++++ ydb/pool.py | 10 +- ydb/query/session.py | 54 +-- ydb/query/transaction.py | 93 +++-- 19 files changed, 894 insertions(+), 69 deletions(-) create mode 100644 examples/opentelemetry/compose-e2e.yaml create mode 100644 examples/opentelemetry/example.py create mode 100644 examples/opentelemetry/grafana/dashboards/README.md create mode 100644 examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml create mode 100644 examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml create mode 100644 examples/opentelemetry/otel-collector-config.yaml create mode 100644 examples/opentelemetry/prometheus.yaml create mode 100644 examples/opentelemetry/tempo.yaml create mode 100644 examples/opentelemetry/ydb_config/README.md create mode 100644 examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml create mode 100644 examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml create mode 100644 ydb/opentelemetry/__init__.py create mode 100644 ydb/opentelemetry/_plugin.py create mode 100644 ydb/opentelemetry/tracing.py diff --git a/examples/opentelemetry/compose-e2e.yaml b/examples/opentelemetry/compose-e2e.yaml new file mode 100644 index 00000000..933d9a38 --- /dev/null +++ b/examples/opentelemetry/compose-e2e.yaml @@ -0,0 +1,61 @@ +version: "3.3" +services: + ydb: + image: ydbplatform/local-ydb:trunk + restart: always + hostname: localhost + platform: linux/amd64 + environment: + YDB_DEFAULT_LOG_LEVEL: NOTICE + GRPC_TLS_PORT: "2135" + GRPC_PORT: "2136" + MON_PORT: "8765" + YDB_USE_IN_MEMORY_PDISKS: "true" + command: [ "--config-path", "/ydb_config/ydb-config-with-tracing.yaml" ] + ports: + - "2135:2135" + - "2136:2136" + - "8765:8765" + volumes: + - ./ydb_config:/ydb_config:ro + + otel-collector: + image: otel/opentelemetry-collector-contrib:latest + command: [ "--config=/etc/otelcol/config.yaml" ] + volumes: + - ./otel-collector-config.yaml:/etc/otelcol/config.yaml:ro + ports: + - "4317:4317" + - "4318:4318" + - "9464:9464" + - "13133:13133" + - "13317:55679" + + prometheus: + image: prom/prometheus:latest + volumes: + - ./prometheus.yaml:/etc/prometheus/prometheus.yml:ro + ports: + - "9090:9090" + depends_on: [ otel-collector ] + + tempo: + image: grafana/tempo:2.4.1 + command: [ "-config.file=/etc/tempo.yaml" ] + volumes: + - ./tempo.yaml:/etc/tempo.yaml:ro + ports: + - "3200:3200" + depends_on: [ otel-collector ] + + grafana: + image: grafana/grafana:10.4.2 + environment: + GF_AUTH_ANONYMOUS_ENABLED: "true" + GF_AUTH_ANONYMOUS_ORG_ROLE: "Admin" + volumes: + - ./grafana/provisioning:/etc/grafana/provisioning:ro + - ./grafana/dashboards:/var/lib/grafana/dashboards:ro + ports: + - "3000:3000" + depends_on: [ prometheus, tempo ] diff --git a/examples/opentelemetry/example.py b/examples/opentelemetry/example.py new file mode 100644 index 00000000..797caff3 --- /dev/null +++ b/examples/opentelemetry/example.py @@ -0,0 +1,42 @@ +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource + +from random import randint + +resource = Resource(attributes={"service.name": "ydb-python-test"}) + +provider = TracerProvider(resource=resource) + +provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) # или 4317 grpc +) +trace.set_tracer_provider(provider) + +tracer = trace.get_tracer(__name__) + +import ydb +from ydb.opentelemetry import enable_tracing + +enable_tracing() + +endpoint = "grpc://localhost:2136" +database = "/local" + +with ydb.Driver(endpoint=endpoint, database=database, credentials=ydb.default_credentials()) as driver: + driver.wait(timeout=5) + + with tracer.start_as_current_span("ydb-load-test"): + with ydb.QuerySessionPool(driver) as pool: + pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") + rand_value = randint(10000, 99999) + for i in range(rand_value, rand_value + 3): + val = f"value{i}" + pool.execute_with_retries(f"INSERT INTO example (key, value) VALUES ({i}, '{val}')") + + res = pool.execute_with_retries("SELECT * FROM example") + print(res.pop().rows) + +provider.shutdown() \ No newline at end of file diff --git a/examples/opentelemetry/grafana/dashboards/README.md b/examples/opentelemetry/grafana/dashboards/README.md new file mode 100644 index 00000000..eb47493a --- /dev/null +++ b/examples/opentelemetry/grafana/dashboards/README.md @@ -0,0 +1,5 @@ +This folder is intentionally left empty. + +Grafana is provisioned with Tempo + Prometheus datasources; use **Explore** to search traces. + + diff --git a/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml new file mode 100644 index 00000000..5ccefdc1 --- /dev/null +++ b/examples/opentelemetry/grafana/provisioning/dashboards/dashboards.yaml @@ -0,0 +1,13 @@ +apiVersion: 1 + +providers: + - name: 'default' + orgId: 1 + folder: '' + type: file + disableDeletion: true + editable: false + options: + path: /var/lib/grafana/dashboards + + diff --git a/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml new file mode 100644 index 00000000..05ba5bd9 --- /dev/null +++ b/examples/opentelemetry/grafana/provisioning/datasources/datasources.yaml @@ -0,0 +1,22 @@ +apiVersion: 1 + +datasources: + - name: Prometheus + type: prometheus + access: proxy + url: http://prometheus:9090 + isDefault: true + editable: false + + - name: Tempo + type: tempo + access: proxy + url: http://tempo:3200 + editable: false + jsonData: + tracesToMetrics: + datasourceUid: Prometheus + serviceMap: + datasourceUid: Prometheus + + diff --git a/examples/opentelemetry/otel-collector-config.yaml b/examples/opentelemetry/otel-collector-config.yaml new file mode 100644 index 00000000..7f784445 --- /dev/null +++ b/examples/opentelemetry/otel-collector-config.yaml @@ -0,0 +1,44 @@ +receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + http: + endpoint: 0.0.0.0:4318 + +processors: + batch: { } + +exporters: + prometheus: + endpoint: 0.0.0.0:9464 + resource_to_telemetry_conversion: + enabled: true + + otlp/tempo: + endpoint: tempo:4317 + tls: + insecure: true + + debug: + verbosity: detailed + +extensions: + health_check: + endpoint: 0.0.0.0:13133 + + zpages: + endpoint: 0.0.0.0:55679 + +service: + extensions: [ health_check, zpages ] + pipelines: + metrics: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ prometheus ] + + traces: + receivers: [ otlp ] + processors: [ batch ] + exporters: [ otlp/tempo, debug ] diff --git a/examples/opentelemetry/prometheus.yaml b/examples/opentelemetry/prometheus.yaml new file mode 100644 index 00000000..64b31821 --- /dev/null +++ b/examples/opentelemetry/prometheus.yaml @@ -0,0 +1,7 @@ +global: + scrape_interval: 5s + +scrape_configs: + - job_name: otel-collector + static_configs: + - targets: ["otel-collector:9464"] diff --git a/examples/opentelemetry/tempo.yaml b/examples/opentelemetry/tempo.yaml new file mode 100644 index 00000000..43dbb19c --- /dev/null +++ b/examples/opentelemetry/tempo.yaml @@ -0,0 +1,15 @@ +server: + http_listen_port: 3200 + +distributor: + receivers: + otlp: + protocols: + grpc: + endpoint: 0.0.0.0:4317 + +storage: + trace: + backend: local + local: + path: /tmp/tempo diff --git a/examples/opentelemetry/ydb_config/README.md b/examples/opentelemetry/ydb_config/README.md new file mode 100644 index 00000000..cbffaaba --- /dev/null +++ b/examples/opentelemetry/ydb_config/README.md @@ -0,0 +1,28 @@ +# YDB server-side tracing (OpenTelemetry) + +This folder is used to keep a **custom YDB config** that enables server-side OpenTelemetry tracing. + +## 1) Export the default config from a running container + +If YDB is running as `ydb-local`: + +```bash +docker cp ydb-local:/ydb_data/cluster/kikimr_configs/config.yaml ./ydb_config/ydb-config.yaml +``` + +## 2) Enable OpenTelemetry exporter in the config + +Edit `ydb-config.yaml` and add the contents of `otel-tracing-snippet.yaml` (usually as a top-level section). + +Default OTLP endpoint (inside docker-compose network): `grpc://otel-collector:4317` +Default service name (so you can find it in Tempo/Grafana): `ydb` + +## 3) Run with the overridden config + +Restart YDB (the main `compose-e2e.yaml` will automatically use `--config-path` if `ydb-config.yaml` exists): + +```bash +docker-compose -f compose-e2e.yaml up -d --force-recreate ydb +``` + +Now you should see additional server-side traces in Tempo/Grafana (service name defaults to `ydb-local` in the snippet). diff --git a/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml new file mode 100644 index 00000000..bd5978d2 --- /dev/null +++ b/examples/opentelemetry/ydb_config/otel-tracing-snippet.yaml @@ -0,0 +1,26 @@ +tracing_config: + backend: + opentelemetry: + collector_url: grpc://otel-collector:4317 + service_name: ydb + external_throttling: + - scope: + database: /local + max_traces_per_minute: 60 + max_traces_burst: 3 + # Highest tracing detail for *sampled* traces (YDB-generated trace-id). + # Note: requests with an external `traceparent` are traced at level 13 (Detailed) per YDB docs. + sampling: + - scope: + database: /local + fraction: 1 + level: 15 + max_traces_per_minute: 1000 + max_traces_burst: 100 + uploader: + max_exported_spans_per_second: 30 + max_spans_in_batch: 100 + max_bytes_in_batch: 10485760 # 10 MiB + max_export_requests_inflight: 3 + max_batch_accumulation_milliseconds: 5000 + span_export_timeout_seconds: 120 diff --git a/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml new file mode 100644 index 00000000..ef93d0e6 --- /dev/null +++ b/examples/opentelemetry/ydb_config/ydb-config-with-tracing.yaml @@ -0,0 +1,349 @@ +actor_system_config: + batch_executor: 2 + executor: + - name: System + spin_threshold: 0 + threads: 2 + type: BASIC + - name: User + spin_threshold: 0 + threads: 3 + type: BASIC + - name: Batch + spin_threshold: 0 + threads: 2 + type: BASIC + - name: IO + threads: 1 + time_per_mailbox_micro_secs: 100 + type: IO + - name: IC + spin_threshold: 10 + threads: 1 + time_per_mailbox_micro_secs: 100 + type: BASIC + io_executor: 3 + scheduler: + progress_threshold: 10000 + resolution: 1024 + spin_threshold: 0 + service_executor: + - executor_id: 4 + service_name: Interconnect + sys_executor: 0 + user_executor: 1 +blob_storage_config: + service_set: + availability_domains: 1 + groups: + - erasure_species: 0 + group_generation: 1 + group_id: 0 + rings: + - fail_domains: + - vdisk_locations: + - node_id: 1 + pdisk_guid: 1 + pdisk_id: 1 + vdisk_slot_id: 0 + pdisks: + - node_id: 1 + path: SectorMap:1:64 + pdisk_category: 0 + pdisk_guid: 1 + pdisk_id: 1 + vdisks: + - vdisk_id: + domain: 0 + group_generation: 1 + group_id: 0 + ring: 0 + vdisk: 0 + vdisk_location: + node_id: 1 + pdisk_guid: 1 + pdisk_id: 1 + vdisk_slot_id: 0 +channel_profile_config: + profile: + - channel: + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + profile_id: 0 + - channel: + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + - erasure_species: none + pdisk_category: 0 + storage_pool_kind: hdd + profile_id: 1 +domains_config: + domain: + - domain_id: 1 + name: local + storage_pool_types: + - kind: hdd + pool_config: + box_id: 1 + erasure_species: none + kind: hdd + pdisk_filter: + - property: + - type: ROT + vdisk_kind: Default + - kind: hdd1 + pool_config: + box_id: 1 + erasure_species: none + kind: hdd + pdisk_filter: + - property: + - type: ROT + vdisk_kind: Default + - kind: hdd2 + pool_config: + box_id: 1 + erasure_species: none + kind: hdd + pdisk_filter: + - property: + - type: ROT + vdisk_kind: Default + - kind: hdde + pool_config: + box_id: 1 + encryption_mode: 1 + erasure_species: none + kind: hdd + pdisk_filter: + - property: + - type: ROT + vdisk_kind: Default + security_config: + default_users: + - name: root + password: '1234' + state_storage: + - ring: + nto_select: 1 + ring: + - node: + - 1 + use_ring_specific_node_selection: true + ssid: 1 +feature_flags: + enable_drain_on_shutdown: false + enable_mvcc_snapshot_reads: true + enable_persistent_query_stats: true + enable_public_api_external_blobs: false + enable_scheme_transactions_at_scheme_shard: true +federated_query_config: + audit: + enabled: false + uaconfig: + uri: '' + checkpoint_coordinator: + checkpointing_period_millis: 1000 + enabled: true + max_inflight: 1 + storage: + endpoint: '' + common: + ids_prefix: pt + use_bearer_for_ydb: true + control_plane_proxy: + enabled: true + request_timeout: 30s + control_plane_storage: + available_binding: + - DATA_STREAMS + - OBJECT_STORAGE + available_connection: + - YDB_DATABASE + - CLICKHOUSE_CLUSTER + - DATA_STREAMS + - OBJECT_STORAGE + - MONITORING + enabled: true + storage: + endpoint: '' + db_pool: + enabled: true + storage: + endpoint: '' + enabled: false + gateways: + dq: + default_settings: [] + enabled: true + pq: + cluster_mapping: [] + solomon: + cluster_mapping: [] + nodes_manager: + enabled: true + pending_fetcher: + enabled: true + pinger: + ping_period: 30s + private_api: + enabled: true + private_proxy: + enabled: true + resource_manager: + enabled: true + token_accessor: + enabled: true +grpc_config: + ca: /ydb_certs/ca.pem + cert: /ydb_certs/cert.pem + host: '[::]' + key: /ydb_certs/key.pem + services: + - nbs + - legacy + - tablet_service + - yql + - discovery + - cms + - locking + - kesus + - pq + - pqcd + - pqv1 + - topic + - datastreams + - scripting + - clickhouse_internal + - rate_limiter + - analytics + - export + - import + - yq + - keyvalue + - monitoring + - auth + - query_service + - view +interconnect_config: + start_tcp: true +kafka_proxy_config: + enable_kafka_proxy: true + listening_port: 9092 +kqpconfig: + settings: + - name: _ResultRowsLimit + value: '1000' +log_config: + default_level: 5 + entry: [] + sys_log: false +nameservice_config: + node: + - address: ::1 + host: localhost + node_id: 1 + port: 19001 + walle_location: + body: 1 + data_center: '1' + rack: '1' +net_classifier_config: + cms_config_timeout_seconds: 30 + net_data_file_path: /ydb_data/netData.tsv + updater_config: + net_data_update_interval_seconds: 60 + retry_interval_seconds: 30 +pqcluster_discovery_config: + enabled: false +pqconfig: + check_acl: false + cluster_table_path: '' + clusters_update_timeout_sec: 1 + enable_proto_source_id_info: true + enabled: true + max_storage_node_port: 65535 + meta_cache_timeout_sec: 1 + quoting_config: + enable_quoting: false + require_credentials_in_new_protocol: false + root: '' + topics_are_first_class_citizen: true + version_table_path: '' +sqs_config: + enable_dead_letter_queues: true + enable_sqs: false + force_queue_creation_v2: true + force_queue_deletion_v2: true + scheme_cache_hard_refresh_time_seconds: 0 + scheme_cache_soft_refresh_time_seconds: 0 +static_erasure: none +system_tablets: + default_node: + - 1 + flat_schemeshard: + - info: + tablet_id: 72057594046678944 + flat_tx_coordinator: + - node: + - 1 + tx_allocator: + - node: + - 1 + tx_mediator: + - node: + - 1 +table_service_config: + resource_manager: + channel_buffer_size: 262144 + mkql_heavy_program_memory_limit: 1048576 + mkql_light_program_memory_limit: 65536 + verbose_memory_limit_exception: true + sql_version: 1 +tracing_config: + backend: + opentelemetry: + collector_url: grpc://otel-collector:4317 + service_name: ydb + external_throttling: + - scope: + database: /local + max_traces_per_minute: 1000 + max_traces_burst: 100 + sampling: + - scope: + database: /local + fraction: 1.0 + level: 15 + max_traces_per_minute: 1000 +# max_traces_burst: 100 + uploader: + max_exported_spans_per_second: 30 + max_spans_in_batch: 100 + max_bytes_in_batch: 10485760 # 10 MiB + max_export_requests_inflight: 3 + max_batch_accumulation_milliseconds: 5000 + span_export_timeout_seconds: 120 diff --git a/setup.py b/setup.py index da56aebf..454fc467 100644 --- a/setup.py +++ b/setup.py @@ -37,5 +37,6 @@ options={"bdist_wheel": {"universal": True}}, extras_require={ "yc": ["yandexcloud", ], + "tracing": ["opentelemetry-api>=1.0.0", "opentelemetry-sdk>=1.0.0"], } ) diff --git a/ydb/connection.py b/ydb/connection.py index 85187f65..50f4571d 100644 --- a/ydb/connection.py +++ b/ydb/connection.py @@ -24,6 +24,7 @@ import grpc from . import issues, _apis, _utilities from . import default_pem +from .opentelemetry.tracing import get_trace_metadata _stubs_list = ( _apis.TableService.Stub, @@ -176,6 +177,9 @@ def _construct_metadata(driver_config, settings): metadata.extend(getattr(settings, "headers", [])) metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ()))) + + metadata.extend(get_trace_metadata()) + return metadata diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py new file mode 100644 index 00000000..c732dbf4 --- /dev/null +++ b/ydb/opentelemetry/__init__.py @@ -0,0 +1,8 @@ +def enable_tracing(): + """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls.""" + from ydb.opentelemetry._plugin import _enable_tracing + + _enable_tracing() + + +__all__ = ["enable_tracing"] diff --git a/ydb/opentelemetry/_plugin.py b/ydb/opentelemetry/_plugin.py new file mode 100644 index 00000000..828e7000 --- /dev/null +++ b/ydb/opentelemetry/_plugin.py @@ -0,0 +1,96 @@ +from contextlib import contextmanager + +_MIN_OTEL_VERSION = "1.0.0" + +_tracer = None +_enabled = False + + +def _check_dependencies(): + try: + from opentelemetry.version import __version__ as otel_version + except ImportError: + raise ImportError( + "OpenTelemetry packages are required for tracing support. " + "Install them with: pip install ydb[tracing]" + ) from None + + from packaging.version import Version + + if Version(otel_version) < Version(_MIN_OTEL_VERSION): + raise ImportError( + f"OpenTelemetry >= {_MIN_OTEL_VERSION} is required, " + f"but {otel_version} is installed. " + "Upgrade with: pip install ydb[tracing]" + ) + + +def _otel_metadata_hook(): + """Injects W3C Trace Context (traceparent/tracestate) into gRPC metadata.""" + from opentelemetry.propagate import inject + + headers = {} + inject(headers) + return list(headers.items()) + + +@contextmanager +def _otel_span(name, attributes=None, kind=None): + from opentelemetry import trace + + kind_map = { + "client": trace.SpanKind.CLIENT, + "internal": trace.SpanKind.INTERNAL, + } + otel_kind = kind_map.get(kind, trace.SpanKind.CLIENT) + with _tracer.start_as_current_span( + name, + kind=otel_kind, + attributes=attributes or {}, + ) as span: + try: + yield span + except Exception as e: + _otel_set_error(span, e) + raise + + +def _otel_set_error(span, exception): + """Records an exception on the span and sets ERROR status.""" + if span is None: + return + + from opentelemetry.trace import StatusCode + from ydb import issues + + attrs = {} + if isinstance(exception, issues.Error): + status_code = getattr(exception, "status", None) + if status_code is not None: + attrs["db.response.status_code"] = str(status_code) + attrs["error.type"] = status_code.name + else: + attrs["error.type"] = type(exception).__qualname__ + else: + attrs["error.type"] = type(exception).__qualname__ + + span.set_attributes(attrs) + span.set_status(StatusCode.ERROR, str(exception)) + span.record_exception(exception) + + +def _enable_tracing(): + global _enabled, _tracer + + if _enabled: + return + + _check_dependencies() + + from opentelemetry import trace + from ydb.opentelemetry.tracing import _registry + + _tracer = trace.get_tracer("ydb.sdk") + _enabled = True + _registry.set_metadata_hook(_otel_metadata_hook) + _registry.set_span_factory(_otel_span) diff --git a/ydb/opentelemetry/tracing.py b/ydb/opentelemetry/tracing.py new file mode 100644 index 00000000..265eff94 --- /dev/null +++ b/ydb/opentelemetry/tracing.py @@ -0,0 +1,85 @@ +from contextlib import contextmanager + + +@contextmanager +def _noop_span(name, attributes=None, kind=None): + yield None + + +class OtelTracingRegistry: + """Singleton registry for OpenTelemetry tracing. + + Holds the span factory and metadata hook. + By default everything is no-op until :func:`enable_tracing` is called + from :mod:`ydb.opentelemetry`. + """ + + def __init__(self): + self._metadata_hook = None + self._span_factory = _noop_span + + def create_span(self, name, attributes=None, kind=None): + """Create a tracing span (context manager).""" + return self._span_factory(name, attributes, kind=kind) + + def get_trace_metadata(self): + """Return tracing metadata (e.g. W3C traceparent) for gRPC calls.""" + if self._metadata_hook is not None: + return self._metadata_hook() + return [] + + def set_metadata_hook(self, hook): + """Set a hook that returns tracing metadata for gRPC calls. + + *hook* must be a callable returning a list of ``(key, value)`` tuples. + """ + self._metadata_hook = hook + + def set_span_factory(self, factory): + """Set a span factory for tracing SDK operations. + + *factory* must be a context-manager factory: + ``factory(name, attributes, kind) -> context manager yielding span``. + """ + self._span_factory = factory + + +_registry = OtelTracingRegistry() + + + +def create_span(name, attributes=None, kind=None): + """Create a tracing span via the global registry.""" + return _registry.create_span(name, attributes, kind) + + +def get_trace_metadata(): + """Return tracing metadata for gRPC calls.""" + return _registry.get_trace_metadata() + + +def create_ydb_span(name, driver_config, session_id=None, node_id=None, tx_id=None, kind=None): + """Create a span pre-filled with standard YDB attributes. + + :param name: Span name (e.g. ``"ydb.ExecuteQuery"``). + :param driver_config: :class:`ydb.DriverConfig` instance. + :param session_id: Optional session ID. + :param node_id: Optional node ID. + :param tx_id: Optional transaction ID. + :param kind: Optional span kind (``"client"`` or ``"internal"``). + """ + endpoint = getattr(driver_config, "endpoint", None) or "" + host, _, port = endpoint.rpartition(":") + attrs = { + "db.system.name": "ydb", + "db.namespace": getattr(driver_config, "database", None) or "", + "server.address": host, + "server.port": int(port) if port.isdigit() else 0, + } + if session_id is not None: + attrs["ydb.session.id"] = session_id or "" + if node_id is not None: + attrs["ydb.node.id"] = node_id or 0 + if tx_id is not None: + attrs["ydb.tx.id"] = tx_id or "" + return _registry.create_span(name, attributes=attrs, kind=kind) diff --git a/ydb/pool.py b/ydb/pool.py index 1d1374e6..bf03a612 100644 --- a/ydb/pool.py +++ b/ydb/pool.py @@ -10,6 +10,7 @@ from typing import Any, Callable, ContextManager, List, Optional, Set, Tuple, TYPE_CHECKING from . import connection as connection_impl, issues, resolver, _utilities, tracing +from .opentelemetry.tracing import create_ydb_span from abc import abstractmethod from .connection import Connection, EndpointKey @@ -412,10 +413,11 @@ def wait(self, timeout: Optional[float] = None, fail_fast: bool = False) -> None :param timeout: A timeout to wait in seconds :return: None """ - if fail_fast: - self._store.add_fast_fail().result(timeout) - else: - self._store.subscribe().result(timeout) + with create_ydb_span("ydb.Driver.Initialize", self._driver_config, kind="internal"): + if fail_fast: + self._store.add_fast_fail().result(timeout) + else: + self._store.subscribe().result(timeout) def _on_disconnected(self, connection: Connection) -> None: """ diff --git a/ydb/query/session.py b/ydb/query/session.py index b21c6ba4..d9f379f1 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -18,6 +18,7 @@ from .base import QueryExplainResultFormat from .. import _apis, issues, _utilities +from ..opentelemetry.tracing import create_ydb_span from ..settings import BaseRequestSettings from ..connection import _RpcState as RpcState, EndpointKey from .._grpc.grpcwrapper import common_utils @@ -368,8 +369,9 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio if self._closed: raise RuntimeError("Session is already closed.") - self._create_call(settings=settings) - self._attach() + with create_ydb_span("ydb.CreateSession", self._driver._driver_config): + self._create_call(settings=settings) + self._attach() return self @@ -435,30 +437,32 @@ def execute( """ self._check_session_ready_to_use() - stream_it = self._execute_call( - query=query, - parameters=parameters, - commit_tx=True, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - concurrent_result_sets=concurrent_result_sets, - settings=settings, - ) + with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, + session_id=self._session_id, node_id=self._node_id): + stream_it = self._execute_call( + query=query, + parameters=parameters, + commit_tx=True, + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) - return base.SyncResponseContextIterator( - stream_it, - lambda resp: base.wrap_execute_query_response( - rpc_state=None, - response_pb=resp, - session=self, - settings=self._settings, - ), - on_error=self._on_execute_stream_error, - ) + return base.SyncResponseContextIterator( + stream_it, + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + session=self, + settings=self._settings, + ), + on_error=self._on_execute_stream_error, + ) def explain( self, diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 687a5eaf..8631ba52 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -17,6 +17,7 @@ _apis, issues, ) +from ..opentelemetry.tracing import create_ydb_span from .._grpc.grpcwrapper import ydb_topic as _ydb_topic from .._grpc.grpcwrapper import ydb_query as _ydb_query from ..connection import _RpcState as RpcState @@ -553,13 +554,17 @@ def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: self._ensure_prev_stream_finished() - try: - self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT) - self._commit_call(settings) - self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None) - except BaseException as e: # TODO: probably should be less wide - self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e) - raise e + with create_ydb_span("ydb.Commit", self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id): + try: + self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT) + self._commit_call(settings) + self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=None) + except BaseException as e: # TODO: probably should be less wide + self._execute_callbacks_sync(base.TxEvent.AFTER_COMMIT, exc=e) + raise e def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution @@ -579,13 +584,17 @@ def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: self._ensure_prev_stream_finished() - try: - self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK) - self._rollback_call(settings) - self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None) - except BaseException as e: # TODO: probably should be less wide - self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e) - raise e + with create_ydb_span("ydb.Rollback", self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id): + try: + self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK) + self._rollback_call(settings) + self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=None) + except BaseException as e: # TODO: probably should be less wide + self._execute_callbacks_sync(base.TxEvent.AFTER_ROLLBACK, exc=e) + raise e def execute( self, @@ -634,30 +643,34 @@ def execute( """ self._ensure_prev_stream_finished() - stream_it = self._execute_call( - query=query, - commit_tx=commit_tx, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - parameters=parameters, - concurrent_result_sets=concurrent_result_sets, - settings=settings, - ) - - self._prev_stream = base.SyncResponseContextIterator( - stream_it, - lambda resp: base.wrap_execute_query_response( - rpc_state=None, - response_pb=resp, - session=self.session, - tx=self, + with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id): + stream_it = self._execute_call( + query=query, commit_tx=commit_tx, - settings=self.session._settings, - ), - on_error=self.session._on_execute_stream_error, - ) - return self._prev_stream + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + parameters=parameters, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) + + self._prev_stream = base.SyncResponseContextIterator( + stream_it, + lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + session=self.session, + tx=self, + commit_tx=commit_tx, + settings=self.session._settings, + ), + on_error=self.session._on_execute_stream_error, + ) + return self._prev_stream From 5998749c1647611aeaf4eaa07948782369f33d0d Mon Sep 17 00:00:00 2001 From: tewbo Date: Sat, 21 Mar 2026 17:53:37 +0300 Subject: [PATCH 2/8] + add async spans --- examples/opentelemetry/example_async.py | 48 ++++++++++ .../{example.py => example_sync.py} | 9 +- examples/opentelemetry/example_tx.py | 60 ++++++++++++ ydb/aio/connection.py | 4 + ydb/aio/pool.py | 4 +- ydb/aio/query/session.py | 54 ++++++----- ydb/aio/query/transaction.py | 93 +++++++++++-------- 7 files changed, 201 insertions(+), 71 deletions(-) create mode 100644 examples/opentelemetry/example_async.py rename examples/opentelemetry/{example.py => example_sync.py} (81%) create mode 100644 examples/opentelemetry/example_tx.py diff --git a/examples/opentelemetry/example_async.py b/examples/opentelemetry/example_async.py new file mode 100644 index 00000000..4e7aa87a --- /dev/null +++ b/examples/opentelemetry/example_async.py @@ -0,0 +1,48 @@ +import asyncio + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource + +from random import randint + +resource = Resource(attributes={"service.name": "ydb-python-test-async"}) + +provider = TracerProvider(resource=resource) + +provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) +) +trace.set_tracer_provider(provider) + +tracer = trace.get_tracer(__name__) + +import ydb +from ydb.opentelemetry import enable_tracing + +enable_tracing() + +endpoint = "grpc://localhost:2136" +database = "/local" + + +async def main(): + async with ydb.aio.Driver(endpoint=endpoint, database=database, credentials=ydb.default_credentials()) as driver: + await driver.wait(timeout=5) + + with tracer.start_as_current_span("ydb-load-test-async"): + async with ydb.aio.QuerySessionPool(driver) as pool: + await pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") + rand_value = randint(10000, 100000) + val = f"value{rand_value}" + await pool.execute_with_retries(f"INSERT INTO example (key, value) VALUES ({rand_value}, '{val}')") + + res = await pool.execute_with_retries("SELECT * FROM example") + print(res.pop().rows) + + +asyncio.run(main()) + +provider.shutdown() diff --git a/examples/opentelemetry/example.py b/examples/opentelemetry/example_sync.py similarity index 81% rename from examples/opentelemetry/example.py rename to examples/opentelemetry/example_sync.py index 797caff3..dc9f45e8 100644 --- a/examples/opentelemetry/example.py +++ b/examples/opentelemetry/example_sync.py @@ -11,7 +11,7 @@ provider = TracerProvider(resource=resource) provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) # или 4317 grpc + BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) ) trace.set_tracer_provider(provider) @@ -31,10 +31,9 @@ with tracer.start_as_current_span("ydb-load-test"): with ydb.QuerySessionPool(driver) as pool: pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") - rand_value = randint(10000, 99999) - for i in range(rand_value, rand_value + 3): - val = f"value{i}" - pool.execute_with_retries(f"INSERT INTO example (key, value) VALUES ({i}, '{val}')") + rand_value = randint(10000, 100000) + val = f"value{rand_value}" + pool.execute_with_retries(f"INSERT INTO example (key, value) VALUES ({rand_value}, '{val}')") res = pool.execute_with_retries("SELECT * FROM example") print(res.pop().rows) diff --git a/examples/opentelemetry/example_tx.py b/examples/opentelemetry/example_tx.py new file mode 100644 index 00000000..38f74de0 --- /dev/null +++ b/examples/opentelemetry/example_tx.py @@ -0,0 +1,60 @@ +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource + +from random import randint + +resource = Resource(attributes={"service.name": "ydb-python-test-tx"}) + +provider = TracerProvider(resource=resource) + +provider.add_span_processor( + BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) +) +trace.set_tracer_provider(provider) + +tracer = trace.get_tracer(__name__) + +import ydb +from ydb.opentelemetry import enable_tracing + +enable_tracing() + +endpoint = "grpc://localhost:2136" +database = "/local" + +with ydb.Driver(endpoint=endpoint, database=database, credentials=ydb.default_credentials()) as driver: + driver.wait(timeout=5) + + with tracer.start_as_current_span("ydb-tx-test"): + pool = ydb.QuerySessionPool(driver) + + pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") + + # Commit example: insert a row inside an explicit transaction + with tracer.start_as_current_span("commit-flow"): + with pool.checkout() as session: + with session.transaction() as tx: + rand_value = randint(10000, 100000) + with tx.execute(f"INSERT INTO example (key, value) VALUES ({rand_value}, 'committed')") as _: + pass + tx.commit() + + # Rollback example: insert a row and then rollback + with tracer.start_as_current_span("rollback-flow"): + with pool.checkout() as session: + with session.transaction() as tx: + rand_value = randint(10000, 100000) + with tx.execute(f"INSERT INTO example (key, value) VALUES ({rand_value}, 'rolled_back')") as _: + pass + tx.rollback() + + # Verify: only the committed row should be present + res = pool.execute_with_retries("SELECT * FROM example ORDER BY key") + print(res.pop().rows) + + pool.stop() + +provider.shutdown() diff --git a/ydb/aio/connection.py b/ydb/aio/connection.py index 9e03450d..a0278c41 100644 --- a/ydb/aio/connection.py +++ b/ydb/aio/connection.py @@ -26,6 +26,7 @@ from ydb.driver import DriverConfig from ydb.settings import BaseRequestSettings from ydb import issues +from ydb.opentelemetry.tracing import get_trace_metadata # Workaround for good IDE and universal for runtime if TYPE_CHECKING: @@ -71,6 +72,9 @@ async def _construct_metadata( metadata.append((YDB_REQUEST_TYPE_HEADER, settings.request_type)) metadata.append(_utilities.x_ydb_sdk_build_info_header(getattr(driver_config, "_additional_sdk_headers", ()))) + + metadata.extend(get_trace_metadata()) + return metadata diff --git a/ydb/aio/pool.py b/ydb/aio/pool.py index 0e96602c..e3aafbe0 100644 --- a/ydb/aio/pool.py +++ b/ydb/aio/pool.py @@ -6,6 +6,7 @@ from typing import Any, Callable, Optional, Tuple, TYPE_CHECKING from ydb import issues +from ydb.opentelemetry.tracing import create_ydb_span from ydb.pool import ConnectionsCache as _ConnectionsCache, IConnectionPool from .connection import Connection, EndpointKey @@ -244,7 +245,8 @@ async def __wrapper__() -> None: return __wrapper__ async def wait(self, timeout: Optional[float] = 7.0, fail_fast: bool = False) -> None: # type: ignore[override] # async override of sync method - await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0) + with create_ydb_span("ydb.Driver.Initialize", self._driver_config, kind="internal"): + await self._store.get(fast_fail=fail_fast, wait_timeout=timeout if timeout is not None else 7.0) def discovery_debug_details(self) -> str: if self._discovery: diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index 67e62ff6..80a236df 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -19,6 +19,7 @@ from ...query import base from ...query.session import BaseQuerySession +from ...opentelemetry.tracing import create_ydb_span from ..._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT @@ -105,8 +106,9 @@ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "Query if self._closed: raise RuntimeError("Session is already closed") - await self._create_call(settings=settings) - await self._attach() + with create_ydb_span("ydb.CreateSession", self._driver._driver_config): + await self._create_call(settings=settings) + await self._attach() return self @@ -159,30 +161,32 @@ async def execute( """ self._check_session_ready_to_use() - stream_it = await self._execute_call( - query=query, - parameters=parameters, - commit_tx=True, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - concurrent_result_sets=concurrent_result_sets, - settings=settings, - ) + with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, + session_id=self._session_id, node_id=self._node_id): + stream_it = await self._execute_call( + query=query, + parameters=parameters, + commit_tx=True, + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) - return AsyncResponseContextIterator( - it=stream_it, - wrapper=lambda resp: base.wrap_execute_query_response( - rpc_state=None, - response_pb=resp, - session=self, - settings=self._settings, - ), - on_error=self._on_execute_stream_error, - ) + return AsyncResponseContextIterator( + it=stream_it, + wrapper=lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + session=self, + settings=self._settings, + ), + on_error=self._on_execute_stream_error, + ) async def explain( self, diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index 69c77478..746831a4 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -12,6 +12,7 @@ BaseQueryTxContext, QueryTxStateEnum, ) +from ...opentelemetry.tracing import create_ydb_span if TYPE_CHECKING: from .session import QuerySession @@ -106,13 +107,17 @@ async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: await self._ensure_prev_stream_finished() - try: - await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT) - await self._commit_call(settings) - await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None) - except BaseException as e: - await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e) - raise e + with create_ydb_span("ydb.Commit", self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id): + try: + await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT) + await self._commit_call(settings) + await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=None) + except BaseException as e: + await self._execute_callbacks_async(base.TxEvent.AFTER_COMMIT, exc=e) + raise e async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution @@ -133,13 +138,17 @@ async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None await self._ensure_prev_stream_finished() - try: - await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK) - await self._rollback_call(settings) - await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None) - except BaseException as e: - await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e) - raise e + with create_ydb_span("ydb.Rollback", self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id): + try: + await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK) + await self._rollback_call(settings) + await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=None) + except BaseException as e: + await self._execute_callbacks_async(base.TxEvent.AFTER_ROLLBACK, exc=e) + raise e async def execute( self, @@ -187,30 +196,34 @@ async def execute( """ await self._ensure_prev_stream_finished() - stream_it = await self._execute_call( - query=query, - parameters=parameters, - commit_tx=commit_tx, - syntax=syntax, - exec_mode=exec_mode, - stats_mode=stats_mode, - schema_inclusion_mode=schema_inclusion_mode, - result_set_format=result_set_format, - arrow_format_settings=arrow_format_settings, - concurrent_result_sets=concurrent_result_sets, - settings=settings, - ) - - self._prev_stream = AsyncResponseContextIterator( - it=stream_it, - wrapper=lambda resp: base.wrap_execute_query_response( - rpc_state=None, - response_pb=resp, - session=self.session, - tx=self, + with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id): + stream_it = await self._execute_call( + query=query, + parameters=parameters, commit_tx=commit_tx, - settings=self.session._settings, - ), - on_error=self.session._on_execute_stream_error, - ) - return self._prev_stream + syntax=syntax, + exec_mode=exec_mode, + stats_mode=stats_mode, + schema_inclusion_mode=schema_inclusion_mode, + result_set_format=result_set_format, + arrow_format_settings=arrow_format_settings, + concurrent_result_sets=concurrent_result_sets, + settings=settings, + ) + + self._prev_stream = AsyncResponseContextIterator( + it=stream_it, + wrapper=lambda resp: base.wrap_execute_query_response( + rpc_state=None, + response_pb=resp, + session=self.session, + tx=self, + commit_tx=commit_tx, + settings=self.session._settings, + ), + on_error=self.session._on_execute_stream_error, + ) + return self._prev_stream From db012124823276b05946908d8dd1e650d6857520 Mon Sep 17 00:00:00 2001 From: tewbo Date: Tue, 24 Mar 2026 07:47:18 +0300 Subject: [PATCH 3/8] + test and refactor --- examples/opentelemetry/example.py | 65 ++++++ examples/opentelemetry/example_async.py | 48 ---- examples/opentelemetry/example_sync.py | 41 ---- examples/opentelemetry/example_tx.py | 60 ----- tests/tracing/__init__.py | 0 tests/tracing/conftest.py | 55 +++++ tests/tracing/test_tracing_async.py | 243 ++++++++++++++++++++ tests/tracing/test_tracing_sync.py | 281 ++++++++++++++++++++++++ ydb/aio/query/base.py | 18 +- ydb/aio/query/session.py | 13 +- ydb/aio/query/transaction.py | 41 +++- ydb/opentelemetry/__init__.py | 8 +- ydb/opentelemetry/_plugin.py | 122 +++++----- ydb/opentelemetry/tracing.py | 69 +++--- ydb/query/base.py | 18 +- ydb/query/session.py | 13 +- ydb/query/transaction.py | 41 +++- 17 files changed, 853 insertions(+), 283 deletions(-) create mode 100644 examples/opentelemetry/example.py delete mode 100644 examples/opentelemetry/example_async.py delete mode 100644 examples/opentelemetry/example_sync.py delete mode 100644 examples/opentelemetry/example_tx.py create mode 100644 tests/tracing/__init__.py create mode 100644 tests/tracing/conftest.py create mode 100644 tests/tracing/test_tracing_async.py create mode 100644 tests/tracing/test_tracing_sync.py diff --git a/examples/opentelemetry/example.py b/examples/opentelemetry/example.py new file mode 100644 index 00000000..fad3111a --- /dev/null +++ b/examples/opentelemetry/example.py @@ -0,0 +1,65 @@ +"""Minimal example: OpenTelemetry tracing for YDB Python SDK.""" + +import asyncio + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import BatchSpanProcessor +from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter +from opentelemetry.sdk.resources import Resource + +import ydb +from ydb.opentelemetry import enable_tracing + +resource = Resource(attributes={"service.name": "ydb-example"}) +provider = TracerProvider(resource=resource) +provider.add_span_processor(BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317"))) +trace.set_tracer_provider(provider) + +enable_tracing() + +tracer = trace.get_tracer(__name__) + +ENDPOINT = "grpc://localhost:2136" +DATABASE = "/local" + + +def sync_example(): + """Sync: session execute and transaction execute + commit.""" + with ydb.Driver(endpoint=ENDPOINT, database=DATABASE) as driver: + driver.wait(timeout=5) + + with ydb.QuerySessionPool(driver) as pool: + with tracer.start_as_current_span("sync-example"): + pool.execute_with_retries("SELECT 1") + + def tx_callee(session): + with session.transaction() as tx: + list(tx.execute("SELECT 1")) + tx.commit() + + pool.retry_operation_sync(tx_callee) + + +async def async_example(): + """Async: session execute and transaction execute + commit.""" + async with ydb.aio.Driver(endpoint=ENDPOINT, database=DATABASE) as driver: + await driver.wait(timeout=5) + + async with ydb.aio.QuerySessionPool(driver) as pool: + with tracer.start_as_current_span("async-example"): + await pool.execute_with_retries("SELECT 1") + + async def tx_callee(session): + async with session.transaction() as tx: + result = await tx.execute("SELECT 1") + async for _ in result: + pass + await tx.commit() + + await pool.retry_operation_async(tx_callee) + +sync_example() +asyncio.run(async_example()) + +provider.shutdown() diff --git a/examples/opentelemetry/example_async.py b/examples/opentelemetry/example_async.py deleted file mode 100644 index 4e7aa87a..00000000 --- a/examples/opentelemetry/example_async.py +++ /dev/null @@ -1,48 +0,0 @@ -import asyncio - -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import Resource - -from random import randint - -resource = Resource(attributes={"service.name": "ydb-python-test-async"}) - -provider = TracerProvider(resource=resource) - -provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) -) -trace.set_tracer_provider(provider) - -tracer = trace.get_tracer(__name__) - -import ydb -from ydb.opentelemetry import enable_tracing - -enable_tracing() - -endpoint = "grpc://localhost:2136" -database = "/local" - - -async def main(): - async with ydb.aio.Driver(endpoint=endpoint, database=database, credentials=ydb.default_credentials()) as driver: - await driver.wait(timeout=5) - - with tracer.start_as_current_span("ydb-load-test-async"): - async with ydb.aio.QuerySessionPool(driver) as pool: - await pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") - rand_value = randint(10000, 100000) - val = f"value{rand_value}" - await pool.execute_with_retries(f"INSERT INTO example (key, value) VALUES ({rand_value}, '{val}')") - - res = await pool.execute_with_retries("SELECT * FROM example") - print(res.pop().rows) - - -asyncio.run(main()) - -provider.shutdown() diff --git a/examples/opentelemetry/example_sync.py b/examples/opentelemetry/example_sync.py deleted file mode 100644 index dc9f45e8..00000000 --- a/examples/opentelemetry/example_sync.py +++ /dev/null @@ -1,41 +0,0 @@ -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import Resource - -from random import randint - -resource = Resource(attributes={"service.name": "ydb-python-test"}) - -provider = TracerProvider(resource=resource) - -provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) -) -trace.set_tracer_provider(provider) - -tracer = trace.get_tracer(__name__) - -import ydb -from ydb.opentelemetry import enable_tracing - -enable_tracing() - -endpoint = "grpc://localhost:2136" -database = "/local" - -with ydb.Driver(endpoint=endpoint, database=database, credentials=ydb.default_credentials()) as driver: - driver.wait(timeout=5) - - with tracer.start_as_current_span("ydb-load-test"): - with ydb.QuerySessionPool(driver) as pool: - pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") - rand_value = randint(10000, 100000) - val = f"value{rand_value}" - pool.execute_with_retries(f"INSERT INTO example (key, value) VALUES ({rand_value}, '{val}')") - - res = pool.execute_with_retries("SELECT * FROM example") - print(res.pop().rows) - -provider.shutdown() \ No newline at end of file diff --git a/examples/opentelemetry/example_tx.py b/examples/opentelemetry/example_tx.py deleted file mode 100644 index 38f74de0..00000000 --- a/examples/opentelemetry/example_tx.py +++ /dev/null @@ -1,60 +0,0 @@ -from opentelemetry import trace -from opentelemetry.sdk.trace import TracerProvider -from opentelemetry.sdk.trace.export import BatchSpanProcessor -from opentelemetry.exporter.otlp.proto.grpc.trace_exporter import OTLPSpanExporter -from opentelemetry.sdk.resources import Resource - -from random import randint - -resource = Resource(attributes={"service.name": "ydb-python-test-tx"}) - -provider = TracerProvider(resource=resource) - -provider.add_span_processor( - BatchSpanProcessor(OTLPSpanExporter(endpoint="http://localhost:4317/v1/traces")) -) -trace.set_tracer_provider(provider) - -tracer = trace.get_tracer(__name__) - -import ydb -from ydb.opentelemetry import enable_tracing - -enable_tracing() - -endpoint = "grpc://localhost:2136" -database = "/local" - -with ydb.Driver(endpoint=endpoint, database=database, credentials=ydb.default_credentials()) as driver: - driver.wait(timeout=5) - - with tracer.start_as_current_span("ydb-tx-test"): - pool = ydb.QuerySessionPool(driver) - - pool.execute_with_retries("CREATE TABLE IF NOT EXISTS example(key UInt64, value String, PRIMARY KEY (key))") - - # Commit example: insert a row inside an explicit transaction - with tracer.start_as_current_span("commit-flow"): - with pool.checkout() as session: - with session.transaction() as tx: - rand_value = randint(10000, 100000) - with tx.execute(f"INSERT INTO example (key, value) VALUES ({rand_value}, 'committed')") as _: - pass - tx.commit() - - # Rollback example: insert a row and then rollback - with tracer.start_as_current_span("rollback-flow"): - with pool.checkout() as session: - with session.transaction() as tx: - rand_value = randint(10000, 100000) - with tx.execute(f"INSERT INTO example (key, value) VALUES ({rand_value}, 'rolled_back')") as _: - pass - tx.rollback() - - # Verify: only the committed row should be present - res = pool.execute_with_retries("SELECT * FROM example ORDER BY key") - print(res.pop().rows) - - pool.stop() - -provider.shutdown() diff --git a/tests/tracing/__init__.py b/tests/tracing/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/tests/tracing/conftest.py b/tests/tracing/conftest.py new file mode 100644 index 00000000..b54ffd05 --- /dev/null +++ b/tests/tracing/conftest.py @@ -0,0 +1,55 @@ +"""Shared fixtures for OpenTelemetry tracing tests. + +Sets up an in-memory TracerProvider so that spans created by the SDK +can be collected and inspected without any external backend. +""" + +import pytest + +from opentelemetry import trace +from opentelemetry.sdk.trace import TracerProvider +from opentelemetry.sdk.trace.export import SimpleSpanProcessor +from opentelemetry.sdk.trace.export.in_memory_span_exporter import InMemorySpanExporter + +from ydb.opentelemetry.tracing import _registry + + +_provider = TracerProvider() +_exporter = InMemorySpanExporter() +_provider.add_span_processor(SimpleSpanProcessor(_exporter)) +trace.set_tracer_provider(_provider) + + +@pytest.fixture() +def otel_setup(): + """Enable SDK tracing, yield the exporter, then restore noop defaults. + + Each test gets a clean exporter (cleared before and after). + """ + import ydb.opentelemetry._plugin as _plugin + + _exporter.clear() + + _plugin._enabled = False + _plugin._tracer = None + + from ydb.opentelemetry import enable_tracing + + enable_tracing() + + yield _exporter + + # Restore noop state + _registry.set_create_span(None) + _registry.set_metadata_hook(None) + _plugin._enabled = False + _plugin._tracer = None + _exporter.clear() + + +class FakeDriverConfig: + def __init__(self, endpoint="test_endpoint:1337", database="/test_database"): + self.endpoint = endpoint + self.database = database + self.query_client_settings = None + self.tracer = None diff --git a/tests/tracing/test_tracing_async.py b/tests/tracing/test_tracing_async.py new file mode 100644 index 00000000..08bcddba --- /dev/null +++ b/tests/tracing/test_tracing_async.py @@ -0,0 +1,243 @@ +"""Unit tests for OpenTelemetry tracing — asynchronous SDK operations. + +Mirrors the sync tests but exercises the async code paths in ydb.aio.query. +""" + +from opentelemetry.trace import StatusCode, SpanKind +from ydb.query.transaction import QueryTxStateEnum +from .conftest import FakeDriverConfig +from unittest.mock import AsyncMock, MagicMock, patch + +import asyncio +import pytest + +async def _empty_async_iter(): + return + yield # noqa: makes this an async generator + + +def _get_spans(exporter, name=None): + spans = exporter.get_finished_spans() + if name is not None: + spans = [s for s in spans if s.name == name] + return spans + + +def _get_single_span(exporter, name): + spans = _get_spans(exporter, name) + assert len(spans) == 1, f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}" + return spans[0] + + +def _make_async_session_mock(driver_config=None): + """Create a mock that behaves like an async QuerySession after create().""" + cfg = driver_config or FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + + session = MagicMock() + session._driver = driver + session._session_id = "test-session-id" + session._node_id = 12345 + session.session_id = "test-session-id" + session.node_id = 12345 + return session, driver + + +def _make_async_tx(session, driver): + """Create a real async QueryTxContext wired to mocked session/driver.""" + from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite + from ydb.aio.query.transaction import QueryTxContext + + tx = QueryTxContext(driver, session, QuerySerializableReadWrite()) + tx._tx_state._change_state(QueryTxStateEnum.BEGINED) + tx._tx_state.tx_id = "test-tx-id" + return tx + + +class TestAsyncCreateSessionSpan: + @pytest.mark.asyncio + async def test_create_session_emits_span(self, otel_setup): + exporter = otel_setup + + from ydb.aio.query.session import QuerySession + + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = None + qs._closed = False + + with patch.object(QuerySession, "_create_call", new_callable=AsyncMock): + with patch.object(QuerySession, "_attach", new_callable=AsyncMock): + await qs.create() + + span = _get_single_span(exporter, "ydb.CreateSession") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["db.system.name"] == "ydb" + assert attrs["db.namespace"] == "/test_database" + assert attrs["server.address"] == "test_endpoint" + assert attrs["server.port"] == 1337 + + +class TestAsyncExecuteQuerySpan: + @pytest.mark.asyncio + async def test_session_execute_emits_span(self, otel_setup): + exporter = otel_setup + + from ydb.aio.query.session import QuerySession + + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = "test-session-id" + qs._node_id = 12345 + qs._closed = False + + fake_stream = _empty_async_iter() + with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream): + result = await qs.execute("SELECT 1;") + async for _ in result: + pass + + span = _get_single_span(exporter, "ydb.ExecuteQuery") + attrs = dict(span.attributes) + assert attrs["ydb.session.id"] == "test-session-id" + assert attrs["ydb.node.id"] == 12345 + + @pytest.mark.asyncio + async def test_tx_execute_emits_span_with_tx_id(self, otel_setup): + exporter = otel_setup + session, driver = _make_async_session_mock() + tx = _make_async_tx(session, driver) + + fake_stream = _empty_async_iter() + with patch.object(type(tx), "_execute_call", new_callable=AsyncMock, return_value=fake_stream): + tx._prev_stream = None + result = await tx.execute("SELECT 1;") + async for _ in result: + pass + + span = _get_single_span(exporter, "ydb.ExecuteQuery") + attrs = dict(span.attributes) + assert attrs["ydb.tx.id"] == "test-tx-id" + assert attrs["ydb.session.id"] == "test-session-id" + assert attrs["ydb.node.id"] == 12345 + + +class TestAsyncCommitSpan: + @pytest.mark.asyncio + async def test_commit_emits_span(self, otel_setup): + exporter = otel_setup + session, driver = _make_async_session_mock() + tx = _make_async_tx(session, driver) + + with patch.object(type(tx), "_commit_call", new_callable=AsyncMock): + await tx.commit() + + span = _get_single_span(exporter, "ydb.Commit") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["ydb.tx.id"] == "test-tx-id" + assert attrs["ydb.session.id"] == "test-session-id" + + +class TestAsyncRollbackSpan: + @pytest.mark.asyncio + async def test_rollback_emits_span(self, otel_setup): + exporter = otel_setup + session, driver = _make_async_session_mock() + tx = _make_async_tx(session, driver) + + with patch.object(type(tx), "_rollback_call", new_callable=AsyncMock): + await tx.rollback() + + span = _get_single_span(exporter, "ydb.Rollback") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["ydb.tx.id"] == "test-tx-id" + assert attrs["ydb.session.id"] == "test-session-id" + + +class TestAsyncErrorHandling: + @pytest.mark.asyncio + async def test_error_sets_error_status(self, otel_setup): + exporter = otel_setup + + from ydb import issues + + class FakeStatus: + name = "SCHEME_ERROR" + + exc = issues.SchemeError("Table not found") + exc.status = FakeStatus() + + from ydb.aio.query.session import QuerySession + + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = "test-session-id" + qs._node_id = 12345 + qs._closed = False + + with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, side_effect=exc): + with pytest.raises(issues.SchemeError): + await qs.execute("SELECT * FROM non_existing_table") + + span = _get_single_span(exporter, "ydb.ExecuteQuery") + assert span.status.status_code == StatusCode.ERROR + attrs = dict(span.attributes) + assert attrs["error.type"] == "SCHEME_ERROR" + assert len(span.events) > 0 + + +class TestAsyncConcurrentSpansIsolation: + @pytest.mark.asyncio + async def test_parallel_executes_do_not_become_parent_child(self, otel_setup): + """Two concurrent execute calls must produce sibling spans, not parent-child.""" + exporter = otel_setup + + from ydb.aio.query.session import QuerySession + + async def _slow_async_iter(): + await asyncio.sleep(0.5) + return + yield # noqa + + def _make_session(): + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = "test-session-id" + qs._node_id = 1 + qs._closed = False + return qs + + async def do_execute(qs): + fake_stream = _slow_async_iter() + with patch.object(QuerySession, "_execute_call", new_callable=AsyncMock, return_value=fake_stream): + result = await qs.execute("SELECT 1") + async for _ in result: + pass + + qs1 = _make_session() + qs2 = _make_session() + await asyncio.gather(do_execute(qs1), do_execute(qs2)) + + spans = _get_spans(exporter, "ydb.ExecuteQuery") + assert len(spans) == 2 + + ids = {s.context.span_id for s in spans} + for s in spans: + if s.parent is not None: + assert s.parent.span_id not in ids, "Concurrent spans must be siblings, not parent-child" diff --git a/tests/tracing/test_tracing_sync.py b/tests/tracing/test_tracing_sync.py new file mode 100644 index 00000000..1bc6f835 --- /dev/null +++ b/tests/tracing/test_tracing_sync.py @@ -0,0 +1,281 @@ +"""Unit tests for OpenTelemetry tracing — synchronous SDK operations. + +Uses an in-memory span exporter to verify that correct spans, attributes, +parent-child relationships, and error handling are produced by the SDK. +No real YDB connection is needed. +""" + +from unittest.mock import MagicMock, patch +from opentelemetry import trace +from opentelemetry.trace import StatusCode, SpanKind +from ydb.opentelemetry.tracing import _registry, create_ydb_span +from ydb.query.transaction import QueryTxStateEnum +from .conftest import FakeDriverConfig + +import pytest + +def _get_spans(exporter, name=None): + spans = exporter.get_finished_spans() + if name is not None: + spans = [s for s in spans if s.name == name] + return spans + + +def _get_single_span(exporter, name): + spans = _get_spans(exporter, name) + assert len(spans) == 1, f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}" + return spans[0] + + +def _make_session_mock(driver_config=None): + """Create a mock that behaves like a sync QuerySession after create().""" + cfg = driver_config or FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + + session = MagicMock() + session._driver = driver + session._session_id = "test-session-id" + session._node_id = 12345 + session.session_id = "test-session-id" + session.node_id = 12345 + return session, driver + + +def _make_tx(session, driver): + """Create a real QueryTxContext wired to mocked session/driver.""" + from ydb._grpc.grpcwrapper.ydb_query_public_types import QuerySerializableReadWrite + from ydb.query.transaction import QueryTxContext + + tx = QueryTxContext(driver, session, QuerySerializableReadWrite()) + # Simulate that the transaction has been started (so commit/rollback create spans) + tx._tx_state._change_state(QueryTxStateEnum.BEGINED) + tx._tx_state.tx_id = "test-tx-id" + return tx + + +class TestCreateSessionSpan: + def test_create_session_emits_span(self, otel_setup): + exporter = otel_setup + + from ydb.query.session import QuerySession + + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = None + qs._closed = False + + with patch.object(QuerySession, "_create_call", return_value=None): + with patch.object(QuerySession, "_attach", return_value=None): + qs.create() + + span = _get_single_span(exporter, "ydb.CreateSession") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["db.system.name"] == "ydb" + assert attrs["db.namespace"] == "/test_database" + assert attrs["server.address"] == "test_endpoint" + assert attrs["server.port"] == 1337 + assert span.status.status_code == StatusCode.UNSET + + +class TestExecuteQuerySpan: + def test_session_execute_emits_span(self, otel_setup): + exporter = otel_setup + + from ydb.query.session import QuerySession + + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = "test-session-id" + qs._node_id = 12345 + qs._closed = False + + fake_stream = iter([]) # empty stream that raises StopIteration immediately + with patch.object(QuerySession, "_execute_call", return_value=fake_stream): + result = qs.execute("SELECT 1;") + # Consume the iterator to finish the span + list(result) + + span = _get_single_span(exporter, "ydb.ExecuteQuery") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["db.system.name"] == "ydb" + assert attrs["db.namespace"] == "/test_database" + assert attrs["server.address"] == "test_endpoint" + assert attrs["server.port"] == 1337 + assert attrs["ydb.session.id"] == "test-session-id" + assert attrs["ydb.node.id"] == 12345 + + def test_tx_execute_emits_span_with_tx_id(self, otel_setup): + exporter = otel_setup + session, driver = _make_session_mock() + tx = _make_tx(session, driver) + + fake_stream = iter([]) + with patch.object(type(tx), "_execute_call", return_value=fake_stream): + tx._prev_stream = None + result = tx.execute("SELECT 1;") + list(result) + + span = _get_single_span(exporter, "ydb.ExecuteQuery") + attrs = dict(span.attributes) + assert attrs["ydb.tx.id"] == "test-tx-id" + assert attrs["ydb.session.id"] == "test-session-id" + assert attrs["ydb.node.id"] == 12345 + + +class TestCommitSpan: + def test_commit_emits_span(self, otel_setup): + exporter = otel_setup + session, driver = _make_session_mock() + tx = _make_tx(session, driver) + + with patch.object(type(tx), "_commit_call", return_value=None): + tx.commit() + + span = _get_single_span(exporter, "ydb.Commit") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["db.system.name"] == "ydb" + assert attrs["ydb.tx.id"] == "test-tx-id" + assert attrs["ydb.session.id"] == "test-session-id" + assert attrs["ydb.node.id"] == 12345 + + +class TestRollbackSpan: + def test_rollback_emits_span(self, otel_setup): + exporter = otel_setup + session, driver = _make_session_mock() + tx = _make_tx(session, driver) + + with patch.object(type(tx), "_rollback_call", return_value=None): + tx.rollback() + + span = _get_single_span(exporter, "ydb.Rollback") + assert span.kind == SpanKind.CLIENT + attrs = dict(span.attributes) + assert attrs["db.system.name"] == "ydb" + assert attrs["ydb.tx.id"] == "test-tx-id" + assert attrs["ydb.session.id"] == "test-session-id" + assert attrs["ydb.node.id"] == 12345 + + +class TestErrorHandling: + def test_error_sets_error_status_and_attributes(self, otel_setup): + exporter = otel_setup + + from ydb import issues + + exc = issues.SchemeError("Table not found") + + from ydb.query.session import QuerySession + + qs = QuerySession.__new__(QuerySession) + cfg = FakeDriverConfig() + driver = MagicMock() + driver._driver_config = cfg + qs._driver = driver + qs._session_id = "test-session-id" + qs._node_id = 12345 + qs._closed = False + + with patch.object(QuerySession, "_execute_call", side_effect=exc): + with pytest.raises(issues.SchemeError): + qs.execute("SELECT * FROM non_existing_table") + + span = _get_single_span(exporter, "ydb.ExecuteQuery") + assert span.status.status_code == StatusCode.ERROR + attrs = dict(span.attributes) + assert attrs["error.type"] == "SCHEME_ERROR" + assert attrs["db.response.status_code"] == "SCHEME_ERROR" + assert len(span.events) > 0 + + +class TestNoSpansWhenDisabled: + def test_no_spans_without_enable_tracing(self): + """Without enable_tracing(), the registry uses noop — no spans are created.""" + + from tests.tracing.conftest import _exporter + + _registry.set_create_span(None) + _registry.set_metadata_hook(None) + _exporter.clear() + + with create_ydb_span("ydb.CreateSession", FakeDriverConfig()): + pass + + assert len(_exporter.get_finished_spans()) == 0 + + +class TestParentChildRelationship: + def test_sdk_span_is_child_of_user_span(self, otel_setup): + exporter = otel_setup + + tracer = trace.get_tracer("test.tracer") + + with tracer.start_as_current_span("user.operation") as parent_span: + with create_ydb_span("ydb.ExecuteQuery", FakeDriverConfig(), session_id="s1", node_id=1): + pass + + spans = exporter.get_finished_spans() + ydb_span = next(s for s in spans if s.name == "ydb.ExecuteQuery") + user_span = next(s for s in spans if s.name == "user.operation") + + assert ydb_span.parent is not None + assert ydb_span.parent.span_id == user_span.context.span_id + assert ydb_span.context.trace_id == user_span.context.trace_id + + +class TestTraceMetadataInjection: + def test_get_trace_metadata_returns_traceparent(self, otel_setup): + from ydb.opentelemetry.tracing import get_trace_metadata + + tracer = trace.get_tracer("test.tracer") + + with tracer.start_as_current_span("test.span"): + metadata = get_trace_metadata() + + keys = [k for k, v in metadata] + assert "traceparent" in keys + + +class TestDriverInitializeSpan: + def test_driver_initialize_emits_internal_span(self, otel_setup): + exporter = otel_setup + + cfg = FakeDriverConfig() + + with create_ydb_span("ydb.Driver.Initialize", cfg, kind="internal"): + pass + + span = _get_single_span(exporter, "ydb.Driver.Initialize") + assert span.kind == SpanKind.INTERNAL + attrs = dict(span.attributes) + assert attrs["db.system.name"] == "ydb" + assert attrs["db.namespace"] == "/test_database" + + +class TestCommonAttributes: + @pytest.mark.parametrize("endpoint,expected_host,expected_port", [ + ("grpc://host.example.com:2136", "grpc://host.example.com", 2136), + ("localhost:2136", "localhost", 2136), + ]) + def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_port): + exporter = otel_setup + cfg = FakeDriverConfig(endpoint=endpoint, database="/mydb") + + with create_ydb_span("ydb.Test", cfg): + pass + + span = _get_single_span(exporter, "ydb.Test") + attrs = dict(span.attributes) + assert attrs["server.address"] == expected_host + assert attrs["server.port"] == expected_port + assert attrs["db.namespace"] == "/mydb" diff --git a/ydb/aio/query/base.py b/ydb/aio/query/base.py index 66df3703..cbf22e98 100644 --- a/ydb/aio/query/base.py +++ b/ydb/aio/query/base.py @@ -2,9 +2,10 @@ class AsyncResponseContextIterator(_utilities.AsyncResponseIterator): - def __init__(self, it, wrapper, on_error=None): + def __init__(self, it, wrapper, on_error=None, span=None): super().__init__(it, wrapper) self._on_error = on_error + self._span = span async def __aenter__(self) -> "AsyncResponseContextIterator": return self @@ -12,12 +13,27 @@ async def __aenter__(self) -> "AsyncResponseContextIterator": async def _next(self): try: return await super()._next() + except StopAsyncIteration: + self._finish_span() + raise except Exception as e: if self._on_error: self._on_error(e) + self._finish_span(e) raise e + def _finish_span(self, exception=None): + if self._span is not None: + if exception is not None: + self._span.set_error(exception) + self._span.end() + self._span = None + + def __del__(self): + self._finish_span() + async def __aexit__(self, exc_type, exc_val, exc_tb): # To close stream on YDB it is necessary to scroll through it to the end async for _ in self: pass + self._finish_span() diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index 80a236df..bd291069 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -161,8 +161,11 @@ async def execute( """ self._check_session_ready_to_use() - with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, - session_id=self._session_id, node_id=self._node_id): + span = create_ydb_span( + "ydb.ExecuteQuery", self._driver._driver_config, session_id=self._session_id, node_id=self._node_id + ) + + try: stream_it = await self._execute_call( query=query, parameters=parameters, @@ -186,7 +189,13 @@ async def execute( settings=self._settings, ), on_error=self._on_execute_stream_error, + span=span, ) + except Exception as e: + if span is not None: + span.set_error(e) + span.end() + raise async def explain( self, diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index 746831a4..87666984 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -107,10 +107,13 @@ async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: await self._ensure_prev_stream_finished() - with create_ydb_span("ydb.Commit", self._driver._driver_config, - session_id=self.session.session_id, - node_id=self.session.node_id, - tx_id=self._tx_state.tx_id): + with create_ydb_span( + "ydb.Commit", + self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id, + ): try: await self._execute_callbacks_async(base.TxEvent.BEFORE_COMMIT) await self._commit_call(settings) @@ -138,10 +141,13 @@ async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None await self._ensure_prev_stream_finished() - with create_ydb_span("ydb.Rollback", self._driver._driver_config, - session_id=self.session.session_id, - node_id=self.session.node_id, - tx_id=self._tx_state.tx_id): + with create_ydb_span( + "ydb.Rollback", + self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id, + ): try: await self._execute_callbacks_async(base.TxEvent.BEFORE_ROLLBACK) await self._rollback_call(settings) @@ -196,10 +202,15 @@ async def execute( """ await self._ensure_prev_stream_finished() - with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, - session_id=self.session.session_id, - node_id=self.session.node_id, - tx_id=self._tx_state.tx_id): + span = create_ydb_span( + "ydb.ExecuteQuery", + self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id, + ) + + try: stream_it = await self._execute_call( query=query, parameters=parameters, @@ -225,5 +236,11 @@ async def execute( settings=self.session._settings, ), on_error=self.session._on_execute_stream_error, + span=span, ) return self._prev_stream + except Exception as e: + if span is not None: + span.set_error(e) + span.end() + raise diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py index c732dbf4..f9587e9e 100644 --- a/ydb/opentelemetry/__init__.py +++ b/ydb/opentelemetry/__init__.py @@ -1,6 +1,12 @@ def enable_tracing(): """Enable OpenTelemetry trace context propagation and span creation for all YDB gRPC calls.""" - from ydb.opentelemetry._plugin import _enable_tracing + try: + from ydb.opentelemetry._plugin import _enable_tracing + except ImportError: + raise ImportError( + "OpenTelemetry packages are required for tracing support. " + "Install them with: pip install ydb[tracing]" + ) from None _enable_tracing() diff --git a/ydb/opentelemetry/_plugin.py b/ydb/opentelemetry/_plugin.py index 828e7000..81a54186 100644 --- a/ydb/opentelemetry/_plugin.py +++ b/ydb/opentelemetry/_plugin.py @@ -1,82 +1,77 @@ -from contextlib import contextmanager +from opentelemetry import context, trace +from opentelemetry.propagate import inject +from opentelemetry.trace import StatusCode -_MIN_OTEL_VERSION = "1.0.0" +from ydb import issues +from ydb.opentelemetry.tracing import _registry _tracer = None _enabled = False - -def _check_dependencies(): - try: - from opentelemetry.version import __version__ as otel_version - except ImportError: - raise ImportError( - "OpenTelemetry packages are required for tracing support. " - "Install them with: pip install ydb[tracing]" - ) from None - - from packaging.version import Version - - if Version(otel_version) < Version(_MIN_OTEL_VERSION): - raise ImportError( - f"OpenTelemetry >= {_MIN_OTEL_VERSION} is required, " - f"but {otel_version} is installed. " - "Upgrade with: pip install ydb[tracing]" - ) +_KIND_MAP = { + "client": trace.SpanKind.CLIENT, + "internal": trace.SpanKind.INTERNAL, +} def _otel_metadata_hook(): """Injects W3C Trace Context (traceparent/tracestate) into gRPC metadata.""" - from opentelemetry.propagate import inject - headers = {} inject(headers) return list(headers.items()) -@contextmanager -def _otel_span(name, attributes=None, kind=None): - from opentelemetry import trace +def _set_error_on_span(span, exception): + if isinstance(exception, issues.Error) and exception.status is not None: + error_type = exception.status.name + span.set_attribute("db.response.status_code", error_type) + else: + error_type = type(exception).__qualname__ - kind_map = { - "client": trace.SpanKind.CLIENT, - "internal": trace.SpanKind.INTERNAL, - } - otel_kind = kind_map.get(kind, trace.SpanKind.CLIENT) - with _tracer.start_as_current_span( - name, - kind=otel_kind, - attributes=attributes or {}, - ) as span: - try: - yield span - except Exception as e: - _otel_set_error(span, e) - raise + span.set_attribute("error.type", error_type) + span.set_status(StatusCode.ERROR, str(exception)) + span.record_exception(exception) -def _otel_set_error(span, exception): - """Records an exception on the span and sets ERROR status.""" - if span is None: - return +class TracingSpan: + """Wrapper around an OTel span that manages context lifecycle. - from opentelemetry.trace import StatusCode - from ydb import issues - - attrs = {} - if isinstance(exception, issues.Error): - status_code = getattr(exception, "status", None) - if status_code is not None: - attrs["db.response.status_code"] = str(status_code) - attrs["error.type"] = status_code.name - else: - attrs["error.type"] = type(exception).__qualname__ - else: - attrs["error.type"] = type(exception).__qualname__ + Can be used as a context manager or manually + """ - span.set_attributes(attrs) - span.set_status(StatusCode.ERROR, str(exception)) - span.record_exception(exception) + def __init__(self, span, token): + self._span = span + self._token = token + + def set_error(self, exception): + _set_error_on_span(self._span, exception) + + def end(self): + self._span.end() + if self._token is not None: + context.detach(self._token) + self._token = None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + if exc_val is not None: + self.set_error(exc_val) + self.end() + return False + + +def _create_span(name, attributes=None, kind=None): + # Can be used as a context manager or manually + span = _tracer.start_span( + name, + kind=_KIND_MAP.get(kind, trace.SpanKind.CLIENT), + attributes=attributes or {}, + ) + ctx = trace.set_span_in_context(span) + token = context.attach(ctx) + return TracingSpan(span, token) def _enable_tracing(): @@ -85,12 +80,7 @@ def _enable_tracing(): if _enabled: return - _check_dependencies() - - from opentelemetry import trace - from ydb.opentelemetry.tracing import _registry - _tracer = trace.get_tracer("ydb.sdk") _enabled = True _registry.set_metadata_hook(_otel_metadata_hook) - _registry.set_span_factory(_otel_span) + _registry.set_create_span(_create_span) diff --git a/ydb/opentelemetry/tracing.py b/ydb/opentelemetry/tracing.py index 265eff94..07a0ead7 100644 --- a/ydb/opentelemetry/tracing.py +++ b/ydb/opentelemetry/tracing.py @@ -1,26 +1,37 @@ -from contextlib import contextmanager +class _NoopSpan: + """Returned by create_ydb_span when tracing is disabled.""" + def set_error(self, exception): + pass -@contextmanager -def _noop_span(name, attributes=None, kind=None): - yield None + def end(self): + pass + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + return False + + +_NOOP_SPAN = _NoopSpan() class OtelTracingRegistry: """Singleton registry for OpenTelemetry tracing. - Holds the span factory and metadata hook. - By default everything is no-op until :func:`enable_tracing` is called - from :mod:`ydb.opentelemetry`. + By default everything is no-op until :func:`enable_tracing` is called. """ def __init__(self): self._metadata_hook = None - self._span_factory = _noop_span + self._create_span_func = None def create_span(self, name, attributes=None, kind=None): - """Create a tracing span (context manager).""" - return self._span_factory(name, attributes, kind=kind) + """Create a span. Returns a TracingSpan or _NoopSpan.""" + if self._create_span_func is None: + return _NOOP_SPAN + return self._create_span_func(name, attributes, kind=kind) def get_trace_metadata(self): """Return tracing metadata (e.g. W3C traceparent) for gRPC calls.""" @@ -29,45 +40,21 @@ def get_trace_metadata(self): return [] def set_metadata_hook(self, hook): - """Set a hook that returns tracing metadata for gRPC calls. - - *hook* must be a callable returning a list of ``(key, value)`` tuples. - """ self._metadata_hook = hook - def set_span_factory(self, factory): - """Set a span factory for tracing SDK operations. - - *factory* must be a context-manager factory: - ``factory(name, attributes, kind) -> context manager yielding span``. - """ - self._span_factory = factory + def set_create_span(self, func): + self._create_span_func = func _registry = OtelTracingRegistry() - -def create_span(name, attributes=None, kind=None): - """Create a tracing span via the global registry.""" - return _registry.create_span(name, attributes, kind) - - def get_trace_metadata(): """Return tracing metadata for gRPC calls.""" return _registry.get_trace_metadata() -def create_ydb_span(name, driver_config, session_id=None, node_id=None, tx_id=None, kind=None): - """Create a span pre-filled with standard YDB attributes. - - :param name: Span name (e.g. ``"ydb.ExecuteQuery"``). - :param driver_config: :class:`ydb.DriverConfig` instance. - :param session_id: Optional session ID. - :param node_id: Optional node ID. - :param tx_id: Optional transaction ID. - :param kind: Optional span kind (``"client"`` or ``"internal"``). - """ +def _build_ydb_attrs(driver_config, session_id=None, node_id=None, tx_id=None): endpoint = getattr(driver_config, "endpoint", None) or "" host, _, port = endpoint.rpartition(":") attrs = { @@ -82,4 +69,12 @@ def create_ydb_span(name, driver_config, session_id=None, node_id=None, tx_id=No attrs["ydb.node.id"] = node_id or 0 if tx_id is not None: attrs["ydb.tx.id"] = tx_id or "" + return attrs + + +def create_ydb_span(name, driver_config, session_id=None, node_id=None, tx_id=None, kind=None): + """Create a span pre-filled with standard YDB attributes. + Can be used as a context manager or manually. + """ + attrs = _build_ydb_attrs(driver_config, session_id, node_id, tx_id) return _registry.create_span(name, attributes=attrs, kind=kind) diff --git a/ydb/query/base.py b/ydb/query/base.py index e7764e1c..09b8a40b 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -73,9 +73,10 @@ class QueryResultSetFormat(enum.IntEnum): class SyncResponseContextIterator(_utilities.SyncResponseIterator): - def __init__(self, it, wrapper, on_error=None): + def __init__(self, it, wrapper, on_error=None, span=None): super().__init__(it, wrapper) self._on_error = on_error + self._span = span def __enter__(self) -> "SyncResponseContextIterator": return self @@ -83,15 +84,30 @@ def __enter__(self) -> "SyncResponseContextIterator": def _next(self): try: return super()._next() + except StopIteration: + self._finish_span() + raise except Exception as e: if self._on_error: self._on_error(e) + self._finish_span(e) raise e + def _finish_span(self, exception=None): + if self._span is not None: + if exception is not None: + self._span.set_error(exception) + self._span.end() + self._span = None + + def __del__(self): + self._finish_span() + def __exit__(self, exc_type, exc_val, exc_tb): # To close stream on YDB it is necessary to scroll through it to the end for _ in self: pass + self._finish_span() class QueryClientSettings: diff --git a/ydb/query/session.py b/ydb/query/session.py index d9f379f1..91a35dce 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -437,8 +437,11 @@ def execute( """ self._check_session_ready_to_use() - with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, - session_id=self._session_id, node_id=self._node_id): + span = create_ydb_span( + "ydb.ExecuteQuery", self._driver._driver_config, session_id=self._session_id, node_id=self._node_id + ) + + try: stream_it = self._execute_call( query=query, parameters=parameters, @@ -462,7 +465,13 @@ def execute( settings=self._settings, ), on_error=self._on_execute_stream_error, + span=span, ) + except Exception as e: + if span is not None: + span.set_error(e) + span.end() + raise def explain( self, diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 8631ba52..d2aef95c 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -554,10 +554,13 @@ def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: self._ensure_prev_stream_finished() - with create_ydb_span("ydb.Commit", self._driver._driver_config, - session_id=self.session.session_id, - node_id=self.session.node_id, - tx_id=self._tx_state.tx_id): + with create_ydb_span( + "ydb.Commit", + self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id, + ): try: self._execute_callbacks_sync(base.TxEvent.BEFORE_COMMIT) self._commit_call(settings) @@ -584,10 +587,13 @@ def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: self._ensure_prev_stream_finished() - with create_ydb_span("ydb.Rollback", self._driver._driver_config, - session_id=self.session.session_id, - node_id=self.session.node_id, - tx_id=self._tx_state.tx_id): + with create_ydb_span( + "ydb.Rollback", + self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id, + ): try: self._execute_callbacks_sync(base.TxEvent.BEFORE_ROLLBACK) self._rollback_call(settings) @@ -643,10 +649,15 @@ def execute( """ self._ensure_prev_stream_finished() - with create_ydb_span("ydb.ExecuteQuery", self._driver._driver_config, - session_id=self.session.session_id, - node_id=self.session.node_id, - tx_id=self._tx_state.tx_id): + span = create_ydb_span( + "ydb.ExecuteQuery", + self._driver._driver_config, + session_id=self.session.session_id, + node_id=self.session.node_id, + tx_id=self._tx_state.tx_id, + ) + + try: stream_it = self._execute_call( query=query, commit_tx=commit_tx, @@ -672,5 +683,11 @@ def execute( settings=self.session._settings, ), on_error=self.session._on_execute_stream_error, + span=span, ) return self._prev_stream + except Exception as e: + if span is not None: + span.set_error(e) + span.end() + raise From acdc32f4904169caee0c5b4de20c1ac6e9b62519 Mon Sep 17 00:00:00 2001 From: tewbo Date: Tue, 24 Mar 2026 07:58:54 +0300 Subject: [PATCH 4/8] * format --- examples/opentelemetry/example.py | 1 + tests/tracing/conftest.py | 1 - tests/tracing/test_tracing_async.py | 5 ++++- tests/tracing/test_tracing_sync.py | 16 +++++++++++----- ydb/opentelemetry/__init__.py | 3 +-- ydb/query/base.py | 1 - ydb/query/session.py | 24 ++++++++---------------- ydb/query/transaction.py | 24 ++++++++---------------- 8 files changed, 33 insertions(+), 42 deletions(-) diff --git a/examples/opentelemetry/example.py b/examples/opentelemetry/example.py index fad3111a..55be4257 100644 --- a/examples/opentelemetry/example.py +++ b/examples/opentelemetry/example.py @@ -59,6 +59,7 @@ async def tx_callee(session): await pool.retry_operation_async(tx_callee) + sync_example() asyncio.run(async_example()) diff --git a/tests/tracing/conftest.py b/tests/tracing/conftest.py index b54ffd05..94f653b8 100644 --- a/tests/tracing/conftest.py +++ b/tests/tracing/conftest.py @@ -13,7 +13,6 @@ from ydb.opentelemetry.tracing import _registry - _provider = TracerProvider() _exporter = InMemorySpanExporter() _provider.add_span_processor(SimpleSpanProcessor(_exporter)) diff --git a/tests/tracing/test_tracing_async.py b/tests/tracing/test_tracing_async.py index 08bcddba..4b059f2c 100644 --- a/tests/tracing/test_tracing_async.py +++ b/tests/tracing/test_tracing_async.py @@ -11,6 +11,7 @@ import asyncio import pytest + async def _empty_async_iter(): return yield # noqa: makes this an async generator @@ -25,7 +26,9 @@ def _get_spans(exporter, name=None): def _get_single_span(exporter, name): spans = _get_spans(exporter, name) - assert len(spans) == 1, f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}" + assert ( + len(spans) == 1 + ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}" return spans[0] diff --git a/tests/tracing/test_tracing_sync.py b/tests/tracing/test_tracing_sync.py index 1bc6f835..42dfae74 100644 --- a/tests/tracing/test_tracing_sync.py +++ b/tests/tracing/test_tracing_sync.py @@ -14,6 +14,7 @@ import pytest + def _get_spans(exporter, name=None): spans = exporter.get_finished_spans() if name is not None: @@ -23,7 +24,9 @@ def _get_spans(exporter, name=None): def _get_single_span(exporter, name): spans = _get_spans(exporter, name) - assert len(spans) == 1, f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}" + assert ( + len(spans) == 1 + ), f"Expected 1 span named '{name}', got {len(spans)}: {[s.name for s in exporter.get_finished_spans()]}" return spans[0] @@ -263,10 +266,13 @@ def test_driver_initialize_emits_internal_span(self, otel_setup): class TestCommonAttributes: - @pytest.mark.parametrize("endpoint,expected_host,expected_port", [ - ("grpc://host.example.com:2136", "grpc://host.example.com", 2136), - ("localhost:2136", "localhost", 2136), - ]) + @pytest.mark.parametrize( + "endpoint,expected_host,expected_port", + [ + ("grpc://host.example.com:2136", "grpc://host.example.com", 2136), + ("localhost:2136", "localhost", 2136), + ], + ) def test_endpoint_parsing(self, otel_setup, endpoint, expected_host, expected_port): exporter = otel_setup cfg = FakeDriverConfig(endpoint=endpoint, database="/mydb") diff --git a/ydb/opentelemetry/__init__.py b/ydb/opentelemetry/__init__.py index f9587e9e..144e7bc4 100644 --- a/ydb/opentelemetry/__init__.py +++ b/ydb/opentelemetry/__init__.py @@ -4,8 +4,7 @@ def enable_tracing(): from ydb.opentelemetry._plugin import _enable_tracing except ImportError: raise ImportError( - "OpenTelemetry packages are required for tracing support. " - "Install them with: pip install ydb[tracing]" + "OpenTelemetry packages are required for tracing support. " "Install them with: pip install ydb[tracing]" ) from None _enable_tracing() diff --git a/ydb/query/base.py b/ydb/query/base.py index 09b8a40b..1aeb4f6b 100644 --- a/ydb/query/base.py +++ b/ydb/query/base.py @@ -27,7 +27,6 @@ from ydb._topic_common.common import CallFromSyncToAsync, _get_shared_event_loop from ydb._grpc.grpcwrapper.common_utils import to_thread - if typing.TYPE_CHECKING: from .transaction import BaseQueryTxContext from .session import BaseQuerySession diff --git a/ydb/query/session.py b/ydb/query/session.py index 91a35dce..f2ef9b7c 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -145,14 +145,12 @@ def _on_execute_stream_error(self, e: Exception) -> None: @overload def _create_call( self: "BaseQuerySession[SyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> "BaseQuerySession[SyncDriver]": - ... + ) -> "BaseQuerySession[SyncDriver]": ... @overload def _create_call( self: "BaseQuerySession[AsyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: - ... + ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: ... def _create_call( self, settings: Optional[BaseRequestSettings] = None @@ -171,14 +169,12 @@ def _create_call( @overload def _delete_call( self: "BaseQuerySession[SyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> "BaseQuerySession[SyncDriver]": - ... + ) -> "BaseQuerySession[SyncDriver]": ... @overload def _delete_call( self: "BaseQuerySession[AsyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: - ... + ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: ... def _delete_call( self, settings: Optional[BaseRequestSettings] = None @@ -198,14 +194,12 @@ def _delete_call( @overload def _attach_call( self: "BaseQuerySession[SyncDriver]", - ) -> GrpcStreamCall[_apis.ydb_query.SessionState]: - ... + ) -> GrpcStreamCall[_apis.ydb_query.SessionState]: ... @overload def _attach_call( self: "BaseQuerySession[AsyncDriver]", - ) -> Awaitable[GrpcStreamCall[_apis.ydb_query.SessionState]]: - ... + ) -> Awaitable[GrpcStreamCall[_apis.ydb_query.SessionState]]: ... def _attach_call( self, @@ -234,8 +228,7 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, - ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: - ... + ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: ... @overload def _execute_call( @@ -251,8 +244,7 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, - ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: - ... + ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: ... def _execute_call( self, diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index d2aef95c..9b4f427d 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -286,14 +286,12 @@ def _check_external_error_set(self): @overload def _begin_call( self: "BaseQueryTxContext[SyncDriver]", settings: Optional[BaseRequestSettings] - ) -> "BaseQueryTxContext[SyncDriver]": - ... + ) -> "BaseQueryTxContext[SyncDriver]": ... @overload def _begin_call( self: "BaseQueryTxContext[AsyncDriver]", settings: Optional[BaseRequestSettings] - ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: - ... + ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: ... def _begin_call( self, settings: Optional[BaseRequestSettings] @@ -315,14 +313,12 @@ def _begin_call( @overload def _commit_call( self: "BaseQueryTxContext[SyncDriver]", settings: Optional[BaseRequestSettings] - ) -> "BaseQueryTxContext[SyncDriver]": - ... + ) -> "BaseQueryTxContext[SyncDriver]": ... @overload def _commit_call( self: "BaseQueryTxContext[AsyncDriver]", settings: Optional[BaseRequestSettings] - ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: - ... + ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: ... def _commit_call( self, settings: Optional[BaseRequestSettings] @@ -345,14 +341,12 @@ def _commit_call( @overload def _rollback_call( self: "BaseQueryTxContext[SyncDriver]", settings: Optional[BaseRequestSettings] - ) -> "BaseQueryTxContext[SyncDriver]": - ... + ) -> "BaseQueryTxContext[SyncDriver]": ... @overload def _rollback_call( self: "BaseQueryTxContext[AsyncDriver]", settings: Optional[BaseRequestSettings] - ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: - ... + ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: ... def _rollback_call( self, settings: Optional[BaseRequestSettings] @@ -386,8 +380,7 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], - ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: - ... + ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: ... @overload def _execute_call( @@ -403,8 +396,7 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], - ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: - ... + ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: ... def _execute_call( self, From 7bf72a98cfe0b33975bff7015362c275e29e6663 Mon Sep 17 00:00:00 2001 From: tewbo Date: Tue, 24 Mar 2026 08:29:01 +0300 Subject: [PATCH 5/8] * add otel to test requirements --- pyproject.toml | 1 + test-requirements.txt | 2 ++ 2 files changed, 3 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 41e7ef6f..0b08f0b2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -41,6 +41,7 @@ module = [ "requests.*", "ydb.public.api.*", "contrib.ydb.public.api.*", + "opentelemetry.*", ] ignore_missing_imports = true diff --git a/test-requirements.txt b/test-requirements.txt index a5b65963..0976ce50 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -43,6 +43,8 @@ sqlalchemy==1.4.26 pylint-protobuf cython freezegun>=1.3.0 +opentelemetry-api>=1.0.0 +opentelemetry-sdk>=1.0.0 # pytest-cov yandexcloud -e . From 74cc57d2bbb77b542af73c071b188b863fdc646d Mon Sep 17 00:00:00 2001 From: tewbo Date: Tue, 24 Mar 2026 08:41:55 +0300 Subject: [PATCH 6/8] fix black checkstyle --- ydb/query/session.py | 24 ++++++++++++++++-------- ydb/query/transaction.py | 24 ++++++++++++++++-------- 2 files changed, 32 insertions(+), 16 deletions(-) diff --git a/ydb/query/session.py b/ydb/query/session.py index f2ef9b7c..91a35dce 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -145,12 +145,14 @@ def _on_execute_stream_error(self, e: Exception) -> None: @overload def _create_call( self: "BaseQuerySession[SyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> "BaseQuerySession[SyncDriver]": ... + ) -> "BaseQuerySession[SyncDriver]": + ... @overload def _create_call( self: "BaseQuerySession[AsyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: ... + ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: + ... def _create_call( self, settings: Optional[BaseRequestSettings] = None @@ -169,12 +171,14 @@ def _create_call( @overload def _delete_call( self: "BaseQuerySession[SyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> "BaseQuerySession[SyncDriver]": ... + ) -> "BaseQuerySession[SyncDriver]": + ... @overload def _delete_call( self: "BaseQuerySession[AsyncDriver]", settings: Optional[BaseRequestSettings] = None - ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: ... + ) -> Awaitable["BaseQuerySession[AsyncDriver]"]: + ... def _delete_call( self, settings: Optional[BaseRequestSettings] = None @@ -194,12 +198,14 @@ def _delete_call( @overload def _attach_call( self: "BaseQuerySession[SyncDriver]", - ) -> GrpcStreamCall[_apis.ydb_query.SessionState]: ... + ) -> GrpcStreamCall[_apis.ydb_query.SessionState]: + ... @overload def _attach_call( self: "BaseQuerySession[AsyncDriver]", - ) -> Awaitable[GrpcStreamCall[_apis.ydb_query.SessionState]]: ... + ) -> Awaitable[GrpcStreamCall[_apis.ydb_query.SessionState]]: + ... def _attach_call( self, @@ -228,7 +234,8 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, - ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: ... + ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + ... @overload def _execute_call( @@ -244,7 +251,8 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings] = None, concurrent_result_sets: bool = False, settings: Optional[BaseRequestSettings] = None, - ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: ... + ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: + ... def _execute_call( self, diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 9b4f427d..d2aef95c 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -286,12 +286,14 @@ def _check_external_error_set(self): @overload def _begin_call( self: "BaseQueryTxContext[SyncDriver]", settings: Optional[BaseRequestSettings] - ) -> "BaseQueryTxContext[SyncDriver]": ... + ) -> "BaseQueryTxContext[SyncDriver]": + ... @overload def _begin_call( self: "BaseQueryTxContext[AsyncDriver]", settings: Optional[BaseRequestSettings] - ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: ... + ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: + ... def _begin_call( self, settings: Optional[BaseRequestSettings] @@ -313,12 +315,14 @@ def _begin_call( @overload def _commit_call( self: "BaseQueryTxContext[SyncDriver]", settings: Optional[BaseRequestSettings] - ) -> "BaseQueryTxContext[SyncDriver]": ... + ) -> "BaseQueryTxContext[SyncDriver]": + ... @overload def _commit_call( self: "BaseQueryTxContext[AsyncDriver]", settings: Optional[BaseRequestSettings] - ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: ... + ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: + ... def _commit_call( self, settings: Optional[BaseRequestSettings] @@ -341,12 +345,14 @@ def _commit_call( @overload def _rollback_call( self: "BaseQueryTxContext[SyncDriver]", settings: Optional[BaseRequestSettings] - ) -> "BaseQueryTxContext[SyncDriver]": ... + ) -> "BaseQueryTxContext[SyncDriver]": + ... @overload def _rollback_call( self: "BaseQueryTxContext[AsyncDriver]", settings: Optional[BaseRequestSettings] - ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: ... + ) -> Awaitable["BaseQueryTxContext[AsyncDriver]"]: + ... def _rollback_call( self, settings: Optional[BaseRequestSettings] @@ -380,7 +386,8 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], - ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: ... + ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: + ... @overload def _execute_call( @@ -396,7 +403,8 @@ def _execute_call( arrow_format_settings: Optional[base.ArrowFormatSettings], concurrent_result_sets: Optional[bool], settings: Optional[BaseRequestSettings], - ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: ... + ) -> Awaitable[Iterable[_apis.ydb_query.ExecuteQueryResponsePart]]: + ... def _execute_call( self, From de1d6d9c02b033e6355200ef02936f6346ed14a1 Mon Sep 17 00:00:00 2001 From: tewbo Date: Tue, 24 Mar 2026 08:45:33 +0300 Subject: [PATCH 7/8] fix flake8 checkstyle --- tests/conftest.py | 6 ------ 1 file changed, 6 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 9151ede0..6de23fe7 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -261,7 +261,6 @@ def topic_consumer(): @pytest.fixture() -@pytest.mark.asyncio() async def topic_path(driver, topic_consumer, database) -> str: topic_path = database + "/test-topic" @@ -279,7 +278,6 @@ async def topic_path(driver, topic_consumer, database) -> str: @pytest.fixture() -@pytest.mark.asyncio() async def topic2_path(driver, topic_consumer, database) -> str: topic_path = database + "/test-topic2" @@ -297,7 +295,6 @@ async def topic2_path(driver, topic_consumer, database) -> str: @pytest.fixture() -@pytest.mark.asyncio() async def topic_with_two_partitions_path(driver, topic_consumer, database) -> str: topic_path = database + "/test-topic-two-partitions" @@ -317,7 +314,6 @@ async def topic_with_two_partitions_path(driver, topic_consumer, database) -> st @pytest.fixture() -@pytest.mark.asyncio() async def topic_with_messages(driver, topic_consumer, database): topic_path = database + "/test-topic-with-messages" try: @@ -348,7 +344,6 @@ async def topic_with_messages(driver, topic_consumer, database): @pytest.fixture() -@pytest.mark.asyncio() async def topic_with_messages_with_metadata(driver, topic_consumer, database): topic_path = database + "/test-topic-with-messages-with-metadata" try: @@ -373,7 +368,6 @@ async def topic_with_messages_with_metadata(driver, topic_consumer, database): @pytest.fixture() -@pytest.mark.asyncio() async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO: reader = driver.topic_client.reader(topic=topic_path, consumer=topic_consumer) yield reader From 3dda4176f7fe335fa36a718d4d24eda2d07c7590 Mon Sep 17 00:00:00 2001 From: tewbo Date: Tue, 24 Mar 2026 09:37:08 +0300 Subject: [PATCH 8/8] make property from driver config --- tests/conftest.py | 6 ++++++ tests/tracing/test_tracing_sync.py | 2 +- ydb/aio/query/session.py | 4 ++-- ydb/aio/query/transaction.py | 6 +++--- ydb/query/session.py | 10 +++++++--- ydb/query/transaction.py | 10 +++++++--- 6 files changed, 26 insertions(+), 12 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 6de23fe7..9151ede0 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -261,6 +261,7 @@ def topic_consumer(): @pytest.fixture() +@pytest.mark.asyncio() async def topic_path(driver, topic_consumer, database) -> str: topic_path = database + "/test-topic" @@ -278,6 +279,7 @@ async def topic_path(driver, topic_consumer, database) -> str: @pytest.fixture() +@pytest.mark.asyncio() async def topic2_path(driver, topic_consumer, database) -> str: topic_path = database + "/test-topic2" @@ -295,6 +297,7 @@ async def topic2_path(driver, topic_consumer, database) -> str: @pytest.fixture() +@pytest.mark.asyncio() async def topic_with_two_partitions_path(driver, topic_consumer, database) -> str: topic_path = database + "/test-topic-two-partitions" @@ -314,6 +317,7 @@ async def topic_with_two_partitions_path(driver, topic_consumer, database) -> st @pytest.fixture() +@pytest.mark.asyncio() async def topic_with_messages(driver, topic_consumer, database): topic_path = database + "/test-topic-with-messages" try: @@ -344,6 +348,7 @@ async def topic_with_messages(driver, topic_consumer, database): @pytest.fixture() +@pytest.mark.asyncio() async def topic_with_messages_with_metadata(driver, topic_consumer, database): topic_path = database + "/test-topic-with-messages-with-metadata" try: @@ -368,6 +373,7 @@ async def topic_with_messages_with_metadata(driver, topic_consumer, database): @pytest.fixture() +@pytest.mark.asyncio() async def topic_reader(driver, topic_consumer, topic_path) -> ydb.TopicReaderAsyncIO: reader = driver.topic_client.reader(topic=topic_path, consumer=topic_consumer) yield reader diff --git a/tests/tracing/test_tracing_sync.py b/tests/tracing/test_tracing_sync.py index 42dfae74..03c020de 100644 --- a/tests/tracing/test_tracing_sync.py +++ b/tests/tracing/test_tracing_sync.py @@ -223,7 +223,7 @@ def test_sdk_span_is_child_of_user_span(self, otel_setup): tracer = trace.get_tracer("test.tracer") - with tracer.start_as_current_span("user.operation") as parent_span: + with tracer.start_as_current_span("user.operation"): with create_ydb_span("ydb.ExecuteQuery", FakeDriverConfig(), session_id="s1", node_id=1): pass diff --git a/ydb/aio/query/session.py b/ydb/aio/query/session.py index bd291069..4c8c1c99 100644 --- a/ydb/aio/query/session.py +++ b/ydb/aio/query/session.py @@ -106,7 +106,7 @@ async def create(self, settings: Optional[BaseRequestSettings] = None) -> "Query if self._closed: raise RuntimeError("Session is already closed") - with create_ydb_span("ydb.CreateSession", self._driver._driver_config): + with create_ydb_span("ydb.CreateSession", self._driver_config): await self._create_call(settings=settings) await self._attach() @@ -162,7 +162,7 @@ async def execute( self._check_session_ready_to_use() span = create_ydb_span( - "ydb.ExecuteQuery", self._driver._driver_config, session_id=self._session_id, node_id=self._node_id + "ydb.ExecuteQuery", self._driver_config, session_id=self._session_id, node_id=self._node_id ) try: diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index 87666984..cd764067 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -109,7 +109,7 @@ async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: with create_ydb_span( "ydb.Commit", - self._driver._driver_config, + self._driver_config, session_id=self.session.session_id, node_id=self.session.node_id, tx_id=self._tx_state.tx_id, @@ -143,7 +143,7 @@ async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None with create_ydb_span( "ydb.Rollback", - self._driver._driver_config, + self._driver_config, session_id=self.session.session_id, node_id=self.session.node_id, tx_id=self._tx_state.tx_id, @@ -204,7 +204,7 @@ async def execute( span = create_ydb_span( "ydb.ExecuteQuery", - self._driver._driver_config, + self._driver_config, session_id=self.session.session_id, node_id=self.session.node_id, tx_id=self._tx_state.tx_id, diff --git a/ydb/query/session.py b/ydb/query/session.py index 91a35dce..3b546f76 100644 --- a/ydb/query/session.py +++ b/ydb/query/session.py @@ -31,7 +31,7 @@ from .._constants import DEFAULT_INITIAL_RESPONSE_TIMEOUT, DEFAULT_LONG_STREAM_TIMEOUT if TYPE_CHECKING: - from ..driver import Driver as SyncDriver + from ..driver import Driver as SyncDriver, DriverConfig from ..aio.driver import Driver as AsyncDriver @@ -85,6 +85,10 @@ def __init__(self, driver: DriverT, settings: Optional[base.QueryClientSettings] self._last_query_stats = None + @property + def _driver_config(self) -> Optional["DriverConfig"]: + return getattr(self._driver, "_driver_config", None) + @property def session_id(self) -> Optional[str]: return self._session_id @@ -369,7 +373,7 @@ def create(self, settings: Optional[BaseRequestSettings] = None) -> "QuerySessio if self._closed: raise RuntimeError("Session is already closed.") - with create_ydb_span("ydb.CreateSession", self._driver._driver_config): + with create_ydb_span("ydb.CreateSession", self._driver_config): self._create_call(settings=settings) self._attach() @@ -438,7 +442,7 @@ def execute( self._check_session_ready_to_use() span = create_ydb_span( - "ydb.ExecuteQuery", self._driver._driver_config, session_id=self._session_id, node_id=self._node_id + "ydb.ExecuteQuery", self._driver_config, session_id=self._session_id, node_id=self._node_id ) try: diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index d2aef95c..f96b7788 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -245,6 +245,10 @@ def __init__(self, driver: DriverT, session: "BaseQuerySession", tx_mode: base.B self._external_error = None self._last_query_stats = None + @property + def _driver_config(self): + return getattr(self._driver, "_driver_config", None) + @property def session_id(self) -> Optional[str]: """ @@ -556,7 +560,7 @@ def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: with create_ydb_span( "ydb.Commit", - self._driver._driver_config, + self._driver_config, session_id=self.session.session_id, node_id=self.session.node_id, tx_id=self._tx_state.tx_id, @@ -589,7 +593,7 @@ def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: with create_ydb_span( "ydb.Rollback", - self._driver._driver_config, + self._driver_config, session_id=self.session.session_id, node_id=self.session.node_id, tx_id=self._tx_state.tx_id, @@ -651,7 +655,7 @@ def execute( span = create_ydb_span( "ydb.ExecuteQuery", - self._driver._driver_config, + self._driver_config, session_id=self.session.session_id, node_id=self.session.node_id, tx_id=self._tx_state.tx_id,