Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions sdk/tests/test_async_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -317,3 +317,68 @@ def test_multiple_custom_interceptors_preserved(self):
mock_ch.return_value = MagicMock()
AsyncConfigClient("localhost:9090", interceptors=[a, b])
assert mock_ch.call_args.kwargs["interceptors"] == [a, b]

def test_insecure_token_emits_warning(self):
import warnings

with patch("opendecree.async_client.create_aio_channel") as mock_ch:
mock_ch.return_value = MagicMock()
with warnings.catch_warnings(record=True) as w:
warnings.simplefilter("always")
AsyncConfigClient("localhost:9090", insecure=True, token="tok")
assert len(w) == 1
assert issubclass(w[0].category, UserWarning)
assert "cleartext" in str(w[0].message)

@pytest.mark.asyncio
async def test_get_server_version(self):
client = self._make_client()
from opendecree.types import ServerVersion

sv = ServerVersion(version="0.2.0", commit="abc123")
with patch(
"opendecree.async_client.async_fetch_server_version",
new_callable=AsyncMock,
return_value=sv,
) as mock_fetch:
result = await client.get_server_version()
assert result == sv
mock_fetch.assert_called_once()
# Second call uses cache — fetch not called again
with patch(
"opendecree.async_client.async_fetch_server_version",
new_callable=AsyncMock,
) as mock_fetch2:
result2 = await client.get_server_version()
assert result2 == sv
mock_fetch2.assert_not_called()

@pytest.mark.asyncio
async def test_check_version_true_triggers_compat_check_on_first_call(self):
with patch("opendecree.async_client.create_aio_channel") as mock_ch:
mock_ch.return_value = MagicMock()
client = AsyncConfigClient("localhost:9090", check_version=True)
client._stub = MagicMock()

from opendecree._generated.centralconfig.v1 import types_pb2

mock_resp = MagicMock()
mock_resp.value.HasField.return_value = True
mock_resp.value.value = types_pb2.TypedValue(string_value="hello")
client._stub.GetField = AsyncMock(return_value=mock_resp)

from opendecree.types import ServerVersion

sv = ServerVersion(version="0.2.0", commit="abc")
with patch(
"opendecree.async_client.async_fetch_server_version",
new_callable=AsyncMock,
return_value=sv,
):
with patch("opendecree.async_client.check_version_compatible") as mock_check:
result = await client.get("t1", "some.field")

mock_check.assert_called_once_with("0.2.0")
assert result == "hello"
# version_checked flag prevents second check
assert client._version_checked is True
99 changes: 99 additions & 0 deletions sdk/tests/test_async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,30 @@ def test_default_max_queue_size(self):
assert f._max_queue_size == _DEFAULT_MAX_QUEUE_SIZE
assert _DEFAULT_MAX_QUEUE_SIZE == 1024

@pytest.mark.asyncio
async def test_changes_spurious_wake_skipped(self):
f = AsyncWatchedField("x", str, "")
f._load_initial("a")

# Set event without putting anything in the queue → spurious wake
f._queue_event.set()

async def _collect_first():
async for c in f.changes():
return c

task = asyncio.create_task(_collect_first())
# Yield twice so the task can process the spurious wake (clears event, loops back)
await asyncio.sleep(0)
await asyncio.sleep(0)

# Now deliver a real change + stop
change = Change(field_path="x", old_value="a", new_value="b", version=1)
f._update("b", change)

result = await asyncio.wait_for(task, timeout=2.0)
assert result.new_value == "b"

@pytest.mark.asyncio
async def test_changes_iterator_after_overflow(self):
f = AsyncWatchedField("x", str, "", max_queue_size=2)
Expand Down Expand Up @@ -410,6 +434,81 @@ async def empty_stream():
_, sub_kwargs = stub.Subscribe.call_args
assert sub_kwargs.get("metadata") == auth_meta

@pytest.mark.asyncio
async def test_processes_stream_response(self):
from opendecree._generated.centralconfig.v1 import types_pb2

w = self._make_watcher()
fee = w.field("fee", float, default=0.0)

response = MagicMock()
response.change.field_path = "fee"
response.change.HasField.side_effect = lambda name: name in ("old_value", "new_value")
response.change.old_value = types_pb2.TypedValue(string_value="0.0")
response.change.new_value = types_pb2.TypedValue(string_value="1.5")
response.change.version = 1
response.change.changed_by = ""

async def stream_with_one_response():
yield response
# Stream ends normally (server closed)

w._stub.Subscribe = MagicMock(return_value=stream_with_one_response())

await w.start()
await asyncio.sleep(0.2)
await w.stop()

assert fee.value == pytest.approx(1.5)

@pytest.mark.asyncio
async def test_stream_loop_returns_when_stopped_during_iteration(self):
from opendecree._generated.centralconfig.v1 import types_pb2

w = self._make_watcher()
w.field("fee", float, default=0.0)

response = MagicMock()
response.change.field_path = "fee"
response.change.HasField.side_effect = lambda name: name in ("old_value", "new_value")
response.change.old_value = types_pb2.TypedValue(string_value="0.0")
response.change.new_value = types_pb2.TypedValue(string_value="1.5")
response.change.version = 1
response.change.changed_by = ""

async def stream_already_stopped():
w._stopped = True # mark stopped before the loop body runs
yield response

w._stub.Subscribe = MagicMock(return_value=stream_already_stopped())

await w.start()
await asyncio.sleep(0.2)

assert w._task is not None
assert w._task.done()
await w.stop()

@pytest.mark.asyncio
async def test_stream_aiorpc_error_while_stopped_exits_cleanly(self):
w = self._make_watcher()
w.field("fee", float, default=0.0)

async def error_after_stop():
w._stopped = True # mark stopped before raising
raise FakeRpcError(grpc.StatusCode.UNAVAILABLE, "gone")
yield # makes it an async generator

w._stub.Subscribe = MagicMock(return_value=error_after_stop())

await w.start()
await asyncio.sleep(0.2)

# Task should have exited cleanly (stopped=True path)
assert w._task is not None
assert w._task.done()
await w.stop()

@pytest.mark.asyncio
async def test_task_name_sanitizes_control_chars(self):
stub = MagicMock()
Expand Down
22 changes: 22 additions & 0 deletions sdk/tests/test_channel.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,3 +181,25 @@ def test_keepalive_override(self):
opts = dict(kwargs["options"])
assert opts["grpc.keepalive_time_ms"] == 60000
assert opts["grpc.keepalive_timeout_ms"] == 5000


def test_token_call_credentials_callback_injects_bearer():
from opendecree._channel import _token_call_credentials

captured = []

def fake_metadata_call_creds(fn):
captured.append(fn)
return MagicMock(spec=grpc.CallCredentials)

with patch(
"opendecree._channel.grpc.metadata_call_credentials",
side_effect=fake_metadata_call_creds,
):
_token_call_credentials("secret-tok")

# Invoke the captured inner callback directly
received = []
captured[0](None, lambda meta, err: received.append((meta, err)))
assert received[0][0] == [("authorization", "Bearer secret-tok")]
assert received[0][1] is None
23 changes: 23 additions & 0 deletions sdk/tests/test_convert.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,3 +276,26 @@ def test_typed_value_to_string_time():
tv = types_pb2.TypedValue(time_value=t)
result = typed_value_to_string(tv)
assert "2023" in result # RFC 3339 format


def test_typed_value_to_string_unknown_kind():
"""The case _ fallback returns str(val) for unrecognised kind strings."""
from unittest.mock import patch

from opendecree._convert import typed_value_to_string
from opendecree._generated.centralconfig.v1 import types_pb2

class FakeTV:
def WhichOneof(self, name): # noqa: N802
return "exotic_kind"

def __getattr__(self, name):
return "exotic_value"

fake_instance = FakeTV()

# Patch the TypedValue class inside the generated module so isinstance passes.
with patch.object(types_pb2, "TypedValue", FakeTV):
result = typed_value_to_string(fake_instance)

assert result == "exotic_value"
52 changes: 52 additions & 0 deletions sdk/tests/test_retry.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,55 @@ async def fake_sleep(s: float) -> None:
await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)

assert slept == []


@pytest.mark.asyncio
async def test_async_deadline_already_passed_before_second_attempt():
"""Async loop-top deadline check stops further attempts once budget is exhausted."""
err = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
call_count = 0

async def fn() -> str:
nonlocal call_count
call_count += 1
raise err

slept: list[float] = []

async def fake_sleep(s: float) -> None:
slept.append(s)

with patch("opendecree._retry.asyncio.sleep", side_effect=fake_sleep):
# monotonic: [deadline_start, loop-top-0 ok, remaining ok, loop-top-1 expired]
with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.05, 0.2]):
with pytest.raises(grpc.aio.AioRpcError):
await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)

assert call_count == 1 # second attempt blocked by loop-top break


@pytest.mark.asyncio
async def test_async_deadline_clips_sleep():
"""Async sleep is clipped to remaining budget so total wall time stays bounded."""
err = FakeRpcError(grpc.StatusCode.UNAVAILABLE)
call_count = 0

async def fn() -> str:
nonlocal call_count
call_count += 1
if call_count == 1:
raise err
return "ok"

slept: list[float] = []

async def fake_sleep(s: float) -> None:
slept.append(s)

with patch("opendecree._retry.asyncio.sleep", side_effect=fake_sleep):
# monotonic: [deadline_start=0.0, loop-top-0=0.0, remaining-check=0.05, loop-top-1=0.05]
with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.05, 0.05]):
result = await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn)

assert result == "ok"
assert slept[0] <= 0.05 + 1e-9 # sleep clipped to remaining budget
Loading