From fdec46b4adb805ca4998b96e27208702ba1e01c4 Mon Sep 17 00:00:00 2001 From: zeevdr Date: Sun, 31 May 2026 09:54:38 +0300 Subject: [PATCH] feat(sdk): add otel=True instrumentation flag for Python SDK Adds opt-in OpenTelemetry gRPC instrumentation to ConfigClient and AsyncConfigClient. When otel=True, an OTel interceptor is wired outermost on the channel so get/set/watch RPCs appear in application traces. - New _otel.py module with lazy imports (no dep cost when otel=False) - otel optional extra: pip install opendecree[otel] - Works for sync client, async client, and watchers (inherit channel) - OTel interceptor is outermost (wraps user interceptors + auth) - 100% test coverage; 10 new unit tests Co-Authored-By: Claude Closes #23 --- sdk/docs/configuration.md | 30 ++++++ sdk/pyproject.toml | 7 ++ sdk/src/opendecree/_otel.py | 41 +++++++ sdk/src/opendecree/async_client.py | 16 ++- sdk/src/opendecree/client.py | 14 ++- sdk/tests/test_otel.py | 168 +++++++++++++++++++++++++++++ 6 files changed, 273 insertions(+), 3 deletions(-) create mode 100644 sdk/src/opendecree/_otel.py create mode 100644 sdk/tests/test_otel.py diff --git a/sdk/docs/configuration.md b/sdk/docs/configuration.md index 7e1b4a2..6103b82 100644 --- a/sdk/docs/configuration.md +++ b/sdk/docs/configuration.md @@ -21,6 +21,9 @@ ConfigClient( # Behavior timeout: float = 10.0, # default RPC timeout in seconds retry: RetryConfig | None = RetryConfig(), # retry config (None to disable) + + # Observability + otel: bool = False, # wire OTel gRPC interceptor (requires opendecree[otel]) ) ``` @@ -146,6 +149,33 @@ client.set( Use `idempotency_key` only when the write is genuinely safe to apply more than once — for example, setting a field to a known constant value. +## OpenTelemetry instrumentation + +Pass `otel=True` to wire an OpenTelemetry gRPC interceptor. Get, set, and watch RPCs will appear as spans in your application traces. + +```python +from opendecree import ConfigClient + +client = ConfigClient("localhost:9090", otel=True) +``` + +Requires the optional extra: + +``` +pip install 'opendecree[otel]' +``` + +The OTel interceptor is outermost — it wraps both user-supplied interceptors and the SDK's internal auth interceptor, so every outbound RPC is traced end-to-end. The same flag works on `AsyncConfigClient`: + +```python +from opendecree import AsyncConfigClient + +async with AsyncConfigClient("localhost:9090", otel=True) as client: + val = await client.get("tenant-id", "payments.fee") +``` + +The watcher inherits the client's already-instrumented channel — no additional configuration needed. + ## Timeouts The `timeout` parameter sets the default per-RPC deadline in seconds: diff --git a/sdk/pyproject.toml b/sdk/pyproject.toml index cae4ce4..48cb811 100644 --- a/sdk/pyproject.toml +++ b/sdk/pyproject.toml @@ -28,6 +28,9 @@ dependencies = [ ] [project.optional-dependencies] +otel = [ + "opentelemetry-instrumentation-grpc>=0.50b0", +] dev = [ "pytest>=9.0.3", "pytest-cov>=7.1.0", @@ -80,6 +83,10 @@ ignore_missing_imports = true module = "google.rpc.*" ignore_missing_imports = true +[[tool.mypy.overrides]] +module = "opentelemetry.*" +ignore_missing_imports = true + [tool.pytest.ini_options] testpaths = ["tests"] asyncio_mode = "auto" diff --git a/sdk/src/opendecree/_otel.py b/sdk/src/opendecree/_otel.py new file mode 100644 index 0000000..1c07c1e --- /dev/null +++ b/sdk/src/opendecree/_otel.py @@ -0,0 +1,41 @@ +"""Optional OpenTelemetry gRPC instrumentation. + +Lazily imports opentelemetry-instrumentation-grpc so the core SDK does not +require OTel as a dependency. Install the optional extra: + + pip install 'opendecree[otel]' +""" + +from __future__ import annotations + +from typing import Any + +_HINT = "Install: pip install 'opendecree[otel]'" + + +def make_sync_interceptor() -> Any: + """Return an OpenTelemetryClientInterceptor for use with grpc.intercept_channel.""" + try: + from opentelemetry.instrumentation.grpc import OpenTelemetryClientInterceptor + except ImportError as exc: + raise ImportError( + f"opentelemetry-instrumentation-grpc is required when otel=True. {_HINT}" + ) from exc + return OpenTelemetryClientInterceptor() + + +def make_aio_interceptors() -> list[Any]: + """Return OTel interceptors for a grpc.aio channel. + + Uses the internal _aio_client module — there is no public per-channel API + for async OTel instrumentation in opentelemetry-instrumentation-grpc. + """ + try: + from opentelemetry.instrumentation.grpc._aio_client import ( + OpenTelemetryAioClientInterceptor, + ) + except ImportError as exc: + raise ImportError( + f"opentelemetry-instrumentation-grpc is required when otel=True. {_HINT}" + ) from exc + return [OpenTelemetryAioClientInterceptor()] diff --git a/sdk/src/opendecree/async_client.py b/sdk/src/opendecree/async_client.py index c44878f..167b8da 100644 --- a/sdk/src/opendecree/async_client.py +++ b/sdk/src/opendecree/async_client.py @@ -55,6 +55,7 @@ def __init__( retry: RetryConfig | None = None, check_version: bool = False, interceptors: list[Any] | None = None, + otel: bool = False, ) -> None: """Create a new AsyncConfigClient. @@ -80,6 +81,10 @@ def __init__( interceptors: Optional list of :class:`grpc.aio.ClientInterceptor` instances to inject (e.g., for logging, tracing, or metrics). Passed directly to the ``grpc.aio`` channel. + otel: When True, wire an OpenTelemetry gRPC client interceptor so + get/set/watch RPCs appear in your application traces. Requires + ``pip install 'opendecree[otel]'``. The OTel interceptor is + outermost, wrapping all other interceptors. """ self._timeout = timeout self._retry = retry if retry is not None else RetryConfig() @@ -105,12 +110,21 @@ def __init__( self._auth_metadata = _build_metadata( subject=subject, role=role, tenant_id=tenant_id, token=metadata_token ) + + # OTel interceptors are outermost; user interceptors follow. + all_interceptors: list[Any] = [] + if otel: + from opendecree._otel import make_aio_interceptors + + all_interceptors.extend(make_aio_interceptors()) + all_interceptors.extend(interceptors or []) + self._channel = create_aio_channel( target, insecure=insecure, credentials=credentials, token=channel_token, - interceptors=interceptors, + interceptors=all_interceptors or None, ) cs_pb2, cs_grpc = ensure_stubs() diff --git a/sdk/src/opendecree/client.py b/sdk/src/opendecree/client.py index e32c813..abd7b28 100644 --- a/sdk/src/opendecree/client.py +++ b/sdk/src/opendecree/client.py @@ -58,6 +58,7 @@ def __init__( retry: RetryConfig | None = None, check_version: bool = False, interceptors: list[Any] | None = None, + otel: bool = False, ) -> None: """Create a new ConfigClient. @@ -86,6 +87,10 @@ def __init__( (e.g., for logging, tracing, or metrics). User-supplied interceptors are applied outermost (before the SDK's internal auth interceptor). + otel: When True, wire an OpenTelemetry gRPC client interceptor so + get/set/watch RPCs appear in your application traces. Requires + ``pip install 'opendecree[otel]'``. The OTel interceptor is + outermost, wrapping all other interceptors. """ self._timeout = timeout self._retry = retry if retry is not None else RetryConfig() @@ -109,8 +114,13 @@ def __init__( metadata = _build_metadata( subject=subject, role=role, tenant_id=tenant_id, token=metadata_token ) - # User interceptors are outermost; auth interceptor runs inside them. - all_interceptors: list[Any] = list(interceptors) if interceptors else [] + # OTel → user interceptors → auth interceptor (outermost first). + all_interceptors: list[Any] = [] + if otel: + from opendecree._otel import make_sync_interceptor + + all_interceptors.append(make_sync_interceptor()) + all_interceptors.extend(interceptors or []) if metadata: all_interceptors.append(AuthInterceptor(metadata)) diff --git a/sdk/tests/test_otel.py b/sdk/tests/test_otel.py new file mode 100644 index 0000000..686719c --- /dev/null +++ b/sdk/tests/test_otel.py @@ -0,0 +1,168 @@ +"""Tests for OTel opt-in instrumentation flag.""" + +import sys +from types import ModuleType +from unittest.mock import MagicMock, patch + +import pytest + + +def _fake_otel_modules() -> dict[str, ModuleType]: + """Build a minimal fake opentelemetry-instrumentation-grpc module tree.""" + fake_interceptor = MagicMock(name="OpenTelemetryClientInterceptor") + fake_aio_interceptor = MagicMock(name="OpenTelemetryAioClientInterceptor") + + grpc_mod = ModuleType("opentelemetry.instrumentation.grpc") + grpc_mod.OpenTelemetryClientInterceptor = fake_interceptor # type: ignore[attr-defined] + + aio_client_mod = ModuleType("opentelemetry.instrumentation.grpc._aio_client") + aio_client_mod.OpenTelemetryAioClientInterceptor = fake_aio_interceptor # type: ignore[attr-defined] + + otel_mod = ModuleType("opentelemetry") + instr_mod = ModuleType("opentelemetry.instrumentation") + + return { + "opentelemetry": otel_mod, + "opentelemetry.instrumentation": instr_mod, + "opentelemetry.instrumentation.grpc": grpc_mod, + "opentelemetry.instrumentation.grpc._aio_client": aio_client_mod, + } + + +class TestMakeSyncInterceptor: + def test_returns_interceptor_when_package_installed(self): + from opendecree._otel import make_sync_interceptor + + with patch.dict(sys.modules, _fake_otel_modules()): + result = make_sync_interceptor() + assert result is not None + + def test_raises_import_error_when_not_installed(self): + from opendecree._otel import make_sync_interceptor + + blocked = {k: None for k in _fake_otel_modules()} # type: ignore[assignment] + with patch.dict(sys.modules, blocked): + with pytest.raises(ImportError, match="opendecree\\[otel\\]"): + make_sync_interceptor() + + +class TestMakeAioInterceptors: + def test_returns_list_when_package_installed(self): + from opendecree._otel import make_aio_interceptors + + with patch.dict(sys.modules, _fake_otel_modules()): + result = make_aio_interceptors() + assert isinstance(result, list) + assert len(result) == 1 + + def test_raises_import_error_when_not_installed(self): + from opendecree._otel import make_aio_interceptors + + blocked = {k: None for k in _fake_otel_modules()} # type: ignore[assignment] + with patch.dict(sys.modules, blocked): + with pytest.raises(ImportError, match="opendecree\\[otel\\]"): + make_aio_interceptors() + + +class TestSyncClientOtel: + def test_otel_false_does_not_call_make_interceptor(self): + with patch("opendecree.client.create_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with patch("opendecree.client.grpc.intercept_channel"): + with patch("opendecree._otel.make_sync_interceptor") as mock_otel: + import opendecree + + opendecree.ConfigClient("localhost:9090") + mock_otel.assert_not_called() + + def test_otel_true_prepends_interceptor(self): + fake_otel_interceptor = MagicMock(name="otel_interceptor") + + with patch("opendecree.client.create_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with patch("opendecree.client.grpc.intercept_channel") as mock_intercept: + mock_intercept.return_value = MagicMock() + with patch.dict(sys.modules, _fake_otel_modules()): + # Make make_sync_interceptor return our fake + with patch( + "opendecree._otel.make_sync_interceptor", + return_value=fake_otel_interceptor, + ): + import opendecree + + opendecree.ConfigClient("localhost:9090", otel=True) + + interceptors_used = mock_intercept.call_args[0][1:] + assert interceptors_used[0] is fake_otel_interceptor + + def test_otel_true_otel_before_user_interceptors(self): + fake_otel_interceptor = MagicMock(name="otel_interceptor") + user_interceptor = MagicMock(name="user_interceptor") + + with patch("opendecree.client.create_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with patch("opendecree.client.grpc.intercept_channel") as mock_intercept: + mock_intercept.return_value = MagicMock() + with patch( + "opendecree._otel.make_sync_interceptor", + return_value=fake_otel_interceptor, + ): + import opendecree + + opendecree.ConfigClient( + "localhost:9090", otel=True, interceptors=[user_interceptor] + ) + + interceptors_used = mock_intercept.call_args[0][1:] + otel_idx = list(interceptors_used).index(fake_otel_interceptor) + user_idx = list(interceptors_used).index(user_interceptor) + assert otel_idx < user_idx + + +class TestAsyncClientOtel: + def test_otel_false_does_not_call_make_interceptors(self): + with patch("opendecree.async_client.create_aio_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with patch("opendecree._otel.make_aio_interceptors") as mock_otel: + import opendecree + + opendecree.AsyncConfigClient("localhost:9090") + mock_otel.assert_not_called() + + def test_otel_true_passes_interceptors_to_channel(self): + fake_otel_interceptor = MagicMock(name="aio_otel_interceptor") + + with patch("opendecree.async_client.create_aio_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with patch( + "opendecree._otel.make_aio_interceptors", + return_value=[fake_otel_interceptor], + ): + import opendecree + + opendecree.AsyncConfigClient("localhost:9090", otel=True) + + interceptors_passed = mock_ch.call_args.kwargs.get("interceptors") + assert interceptors_passed is not None + assert fake_otel_interceptor in interceptors_passed + + def test_otel_true_otel_before_user_interceptors(self): + fake_otel_interceptor = MagicMock(name="aio_otel_interceptor") + user_interceptor = MagicMock(name="user_interceptor") + + with patch("opendecree.async_client.create_aio_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with patch( + "opendecree._otel.make_aio_interceptors", + return_value=[fake_otel_interceptor], + ): + import opendecree + + opendecree.AsyncConfigClient( + "localhost:9090", otel=True, interceptors=[user_interceptor] + ) + + interceptors_passed = mock_ch.call_args.kwargs.get("interceptors") + otel_idx = interceptors_passed.index(fake_otel_interceptor) + user_idx = interceptors_passed.index(user_interceptor) + assert otel_idx < user_idx