Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 30 additions & 0 deletions sdk/docs/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@ ConfigClient(
# Behavior
timeout: float = 10.0, # default RPC timeout in seconds
retry: RetryConfig | None = RetryConfig(), # retry config (None to disable)

# Observability
otel: bool = False, # wire OTel gRPC interceptor (requires opendecree[otel])
)
```

Expand Down Expand Up @@ -146,6 +149,33 @@ client.set(
Use `idempotency_key` only when the write is genuinely safe to apply more than once — for example,
setting a field to a known constant value.

## OpenTelemetry instrumentation

Pass `otel=True` to wire an OpenTelemetry gRPC interceptor. Get, set, and watch RPCs will appear as spans in your application traces.

```python
from opendecree import ConfigClient

client = ConfigClient("localhost:9090", otel=True)
```

Requires the optional extra:

```
pip install 'opendecree[otel]'
```

The OTel interceptor is outermost — it wraps both user-supplied interceptors and the SDK's internal auth interceptor, so every outbound RPC is traced end-to-end. The same flag works on `AsyncConfigClient`:

```python
from opendecree import AsyncConfigClient

async with AsyncConfigClient("localhost:9090", otel=True) as client:
val = await client.get("tenant-id", "payments.fee")
```

The watcher inherits the client's already-instrumented channel — no additional configuration needed.

## Timeouts

The `timeout` parameter sets the default per-RPC deadline in seconds:
Expand Down
7 changes: 7 additions & 0 deletions sdk/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,9 @@ dependencies = [
]

[project.optional-dependencies]
otel = [
"opentelemetry-instrumentation-grpc>=0.50b0",
]
dev = [
"pytest>=9.0.3",
"pytest-cov>=7.1.0",
Expand Down Expand Up @@ -80,6 +83,10 @@ ignore_missing_imports = true
module = "google.rpc.*"
ignore_missing_imports = true

[[tool.mypy.overrides]]
module = "opentelemetry.*"
ignore_missing_imports = true

[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
Expand Down
41 changes: 41 additions & 0 deletions sdk/src/opendecree/_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
"""Optional OpenTelemetry gRPC instrumentation.

Lazily imports opentelemetry-instrumentation-grpc so the core SDK does not
require OTel as a dependency. Install the optional extra:

pip install 'opendecree[otel]'
"""

from __future__ import annotations

from typing import Any

_HINT = "Install: pip install 'opendecree[otel]'"


def make_sync_interceptor() -> Any:
"""Return an OpenTelemetryClientInterceptor for use with grpc.intercept_channel."""
try:
from opentelemetry.instrumentation.grpc import OpenTelemetryClientInterceptor
except ImportError as exc:
raise ImportError(
f"opentelemetry-instrumentation-grpc is required when otel=True. {_HINT}"
) from exc
return OpenTelemetryClientInterceptor()


def make_aio_interceptors() -> list[Any]:
"""Return OTel interceptors for a grpc.aio channel.

Uses the internal _aio_client module — there is no public per-channel API
for async OTel instrumentation in opentelemetry-instrumentation-grpc.
"""
try:
from opentelemetry.instrumentation.grpc._aio_client import (
OpenTelemetryAioClientInterceptor,
)
except ImportError as exc:
raise ImportError(
f"opentelemetry-instrumentation-grpc is required when otel=True. {_HINT}"
) from exc
return [OpenTelemetryAioClientInterceptor()]
16 changes: 15 additions & 1 deletion sdk/src/opendecree/async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ def __init__(
retry: RetryConfig | None = None,
check_version: bool = False,
interceptors: list[Any] | None = None,
otel: bool = False,
) -> None:
"""Create a new AsyncConfigClient.

Expand All @@ -80,6 +81,10 @@ def __init__(
interceptors: Optional list of :class:`grpc.aio.ClientInterceptor`
instances to inject (e.g., for logging, tracing, or metrics).
Passed directly to the ``grpc.aio`` channel.
otel: When True, wire an OpenTelemetry gRPC client interceptor so
get/set/watch RPCs appear in your application traces. Requires
``pip install 'opendecree[otel]'``. The OTel interceptor is
outermost, wrapping all other interceptors.
"""
self._timeout = timeout
self._retry = retry if retry is not None else RetryConfig()
Expand All @@ -105,12 +110,21 @@ def __init__(
self._auth_metadata = _build_metadata(
subject=subject, role=role, tenant_id=tenant_id, token=metadata_token
)

# OTel interceptors are outermost; user interceptors follow.
all_interceptors: list[Any] = []
if otel:
from opendecree._otel import make_aio_interceptors

all_interceptors.extend(make_aio_interceptors())
all_interceptors.extend(interceptors or [])

self._channel = create_aio_channel(
target,
insecure=insecure,
credentials=credentials,
token=channel_token,
interceptors=interceptors,
interceptors=all_interceptors or None,
)

cs_pb2, cs_grpc = ensure_stubs()
Expand Down
14 changes: 12 additions & 2 deletions sdk/src/opendecree/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
retry: RetryConfig | None = None,
check_version: bool = False,
interceptors: list[Any] | None = None,
otel: bool = False,
) -> None:
"""Create a new ConfigClient.

Expand Down Expand Up @@ -86,6 +87,10 @@ def __init__(
(e.g., for logging, tracing, or metrics). User-supplied
interceptors are applied outermost (before the SDK's internal
auth interceptor).
otel: When True, wire an OpenTelemetry gRPC client interceptor so
get/set/watch RPCs appear in your application traces. Requires
``pip install 'opendecree[otel]'``. The OTel interceptor is
outermost, wrapping all other interceptors.
"""
self._timeout = timeout
self._retry = retry if retry is not None else RetryConfig()
Expand All @@ -109,8 +114,13 @@ def __init__(
metadata = _build_metadata(
subject=subject, role=role, tenant_id=tenant_id, token=metadata_token
)
# User interceptors are outermost; auth interceptor runs inside them.
all_interceptors: list[Any] = list(interceptors) if interceptors else []
# OTel → user interceptors → auth interceptor (outermost first).
all_interceptors: list[Any] = []
if otel:
from opendecree._otel import make_sync_interceptor

all_interceptors.append(make_sync_interceptor())
all_interceptors.extend(interceptors or [])
if metadata:
all_interceptors.append(AuthInterceptor(metadata))

Expand Down
168 changes: 168 additions & 0 deletions sdk/tests/test_otel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,168 @@
"""Tests for OTel opt-in instrumentation flag."""

import sys
from types import ModuleType
from unittest.mock import MagicMock, patch

import pytest


def _fake_otel_modules() -> dict[str, ModuleType]:
"""Build a minimal fake opentelemetry-instrumentation-grpc module tree."""
fake_interceptor = MagicMock(name="OpenTelemetryClientInterceptor")
fake_aio_interceptor = MagicMock(name="OpenTelemetryAioClientInterceptor")

grpc_mod = ModuleType("opentelemetry.instrumentation.grpc")
grpc_mod.OpenTelemetryClientInterceptor = fake_interceptor # type: ignore[attr-defined]

aio_client_mod = ModuleType("opentelemetry.instrumentation.grpc._aio_client")
aio_client_mod.OpenTelemetryAioClientInterceptor = fake_aio_interceptor # type: ignore[attr-defined]

otel_mod = ModuleType("opentelemetry")
instr_mod = ModuleType("opentelemetry.instrumentation")

return {
"opentelemetry": otel_mod,
"opentelemetry.instrumentation": instr_mod,
"opentelemetry.instrumentation.grpc": grpc_mod,
"opentelemetry.instrumentation.grpc._aio_client": aio_client_mod,
}


class TestMakeSyncInterceptor:
def test_returns_interceptor_when_package_installed(self):
from opendecree._otel import make_sync_interceptor

with patch.dict(sys.modules, _fake_otel_modules()):
result = make_sync_interceptor()
assert result is not None

def test_raises_import_error_when_not_installed(self):
from opendecree._otel import make_sync_interceptor

blocked = {k: None for k in _fake_otel_modules()} # type: ignore[assignment]
with patch.dict(sys.modules, blocked):
with pytest.raises(ImportError, match="opendecree\\[otel\\]"):
make_sync_interceptor()


class TestMakeAioInterceptors:
def test_returns_list_when_package_installed(self):
from opendecree._otel import make_aio_interceptors

with patch.dict(sys.modules, _fake_otel_modules()):
result = make_aio_interceptors()
assert isinstance(result, list)
assert len(result) == 1

def test_raises_import_error_when_not_installed(self):
from opendecree._otel import make_aio_interceptors

blocked = {k: None for k in _fake_otel_modules()} # type: ignore[assignment]
with patch.dict(sys.modules, blocked):
with pytest.raises(ImportError, match="opendecree\\[otel\\]"):
make_aio_interceptors()


class TestSyncClientOtel:
def test_otel_false_does_not_call_make_interceptor(self):
with patch("opendecree.client.create_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with patch("opendecree.client.grpc.intercept_channel"):
with patch("opendecree._otel.make_sync_interceptor") as mock_otel:
import opendecree

opendecree.ConfigClient("localhost:9090")
mock_otel.assert_not_called()

def test_otel_true_prepends_interceptor(self):
fake_otel_interceptor = MagicMock(name="otel_interceptor")

with patch("opendecree.client.create_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with patch("opendecree.client.grpc.intercept_channel") as mock_intercept:
mock_intercept.return_value = MagicMock()
with patch.dict(sys.modules, _fake_otel_modules()):
# Make make_sync_interceptor return our fake
with patch(
"opendecree._otel.make_sync_interceptor",
return_value=fake_otel_interceptor,
):
import opendecree

opendecree.ConfigClient("localhost:9090", otel=True)

interceptors_used = mock_intercept.call_args[0][1:]
assert interceptors_used[0] is fake_otel_interceptor

def test_otel_true_otel_before_user_interceptors(self):
fake_otel_interceptor = MagicMock(name="otel_interceptor")
user_interceptor = MagicMock(name="user_interceptor")

with patch("opendecree.client.create_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with patch("opendecree.client.grpc.intercept_channel") as mock_intercept:
mock_intercept.return_value = MagicMock()
with patch(
"opendecree._otel.make_sync_interceptor",
return_value=fake_otel_interceptor,
):
import opendecree

opendecree.ConfigClient(
"localhost:9090", otel=True, interceptors=[user_interceptor]
)

interceptors_used = mock_intercept.call_args[0][1:]
otel_idx = list(interceptors_used).index(fake_otel_interceptor)
user_idx = list(interceptors_used).index(user_interceptor)
assert otel_idx < user_idx


class TestAsyncClientOtel:
def test_otel_false_does_not_call_make_interceptors(self):
with patch("opendecree.async_client.create_aio_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with patch("opendecree._otel.make_aio_interceptors") as mock_otel:
import opendecree

opendecree.AsyncConfigClient("localhost:9090")
mock_otel.assert_not_called()

def test_otel_true_passes_interceptors_to_channel(self):
fake_otel_interceptor = MagicMock(name="aio_otel_interceptor")

with patch("opendecree.async_client.create_aio_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with patch(
"opendecree._otel.make_aio_interceptors",
return_value=[fake_otel_interceptor],
):
import opendecree

opendecree.AsyncConfigClient("localhost:9090", otel=True)

interceptors_passed = mock_ch.call_args.kwargs.get("interceptors")
assert interceptors_passed is not None
assert fake_otel_interceptor in interceptors_passed

def test_otel_true_otel_before_user_interceptors(self):
fake_otel_interceptor = MagicMock(name="aio_otel_interceptor")
user_interceptor = MagicMock(name="user_interceptor")

with patch("opendecree.async_client.create_aio_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with patch(
"opendecree._otel.make_aio_interceptors",
return_value=[fake_otel_interceptor],
):
import opendecree

opendecree.AsyncConfigClient(
"localhost:9090", otel=True, interceptors=[user_interceptor]
)

interceptors_passed = mock_ch.call_args.kwargs.get("interceptors")
otel_idx = interceptors_passed.index(fake_otel_interceptor)
user_idx = interceptors_passed.index(user_interceptor)
assert otel_idx < user_idx