From 7a330cf78c4d90c0f9128cce4ac44622ce22e272 Mon Sep 17 00:00:00 2001 From: zeevdr Date: Thu, 28 May 2026 20:57:20 +0300 Subject: [PATCH] test: raise unit test coverage to 100% across all SDK modules MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds targeted tests for every previously uncovered branch, covering the 23 missing statements that were keeping coverage below perfection: - _channel.py: invoke the _token_call_credentials inner callback directly - _convert.py: hit the case-_ fallback in typed_value_to_string via a fake TypedValue - _retry.py: async deadline tests — loop-top break (line 116) and sleep-clipping (line 130) - async_client.py: insecure-token UserWarning, get_server_version() caching, and the check_version=True lazy-compat path - async_watcher.py: spurious-wake path in changes(), stream loop body (process_change + stopped-mid-iteration return), and AioRpcError while _stopped=True - watcher.py: queue-cond wait when queue is empty, stream loop body (process_change + stopped-mid-iteration return), and RpcError while stop_event set All 282 tests pass; total coverage 100% (927/927 statements). Closes #112 Co-Authored-By: Claude --- sdk/tests/test_async_client.py | 65 +++++++++++++++ sdk/tests/test_async_watcher.py | 99 +++++++++++++++++++++++ sdk/tests/test_channel.py | 22 ++++++ sdk/tests/test_convert.py | 23 ++++++ sdk/tests/test_retry.py | 52 ++++++++++++ sdk/tests/test_watcher.py | 135 ++++++++++++++++++++++++++++++++ 6 files changed, 396 insertions(+) diff --git a/sdk/tests/test_async_client.py b/sdk/tests/test_async_client.py index 83c35d6..8a2c252 100644 --- a/sdk/tests/test_async_client.py +++ b/sdk/tests/test_async_client.py @@ -317,3 +317,68 @@ def test_multiple_custom_interceptors_preserved(self): mock_ch.return_value = MagicMock() AsyncConfigClient("localhost:9090", interceptors=[a, b]) assert mock_ch.call_args.kwargs["interceptors"] == [a, b] + + def test_insecure_token_emits_warning(self): + import warnings + + with patch("opendecree.async_client.create_aio_channel") as mock_ch: + mock_ch.return_value = MagicMock() + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + AsyncConfigClient("localhost:9090", insecure=True, token="tok") + assert len(w) == 1 + assert issubclass(w[0].category, UserWarning) + assert "cleartext" in str(w[0].message) + + @pytest.mark.asyncio + async def test_get_server_version(self): + client = self._make_client() + from opendecree.types import ServerVersion + + sv = ServerVersion(version="0.2.0", commit="abc123") + with patch( + "opendecree.async_client.async_fetch_server_version", + new_callable=AsyncMock, + return_value=sv, + ) as mock_fetch: + result = await client.get_server_version() + assert result == sv + mock_fetch.assert_called_once() + # Second call uses cache — fetch not called again + with patch( + "opendecree.async_client.async_fetch_server_version", + new_callable=AsyncMock, + ) as mock_fetch2: + result2 = await client.get_server_version() + assert result2 == sv + mock_fetch2.assert_not_called() + + @pytest.mark.asyncio + async def test_check_version_true_triggers_compat_check_on_first_call(self): + with patch("opendecree.async_client.create_aio_channel") as mock_ch: + mock_ch.return_value = MagicMock() + client = AsyncConfigClient("localhost:9090", check_version=True) + client._stub = MagicMock() + + from opendecree._generated.centralconfig.v1 import types_pb2 + + mock_resp = MagicMock() + mock_resp.value.HasField.return_value = True + mock_resp.value.value = types_pb2.TypedValue(string_value="hello") + client._stub.GetField = AsyncMock(return_value=mock_resp) + + from opendecree.types import ServerVersion + + sv = ServerVersion(version="0.2.0", commit="abc") + with patch( + "opendecree.async_client.async_fetch_server_version", + new_callable=AsyncMock, + return_value=sv, + ): + with patch("opendecree.async_client.check_version_compatible") as mock_check: + result = await client.get("t1", "some.field") + + mock_check.assert_called_once_with("0.2.0") + assert result == "hello" + # version_checked flag prevents second check + assert client._version_checked is True diff --git a/sdk/tests/test_async_watcher.py b/sdk/tests/test_async_watcher.py index d2a9acf..d29e2bd 100644 --- a/sdk/tests/test_async_watcher.py +++ b/sdk/tests/test_async_watcher.py @@ -195,6 +195,30 @@ def test_default_max_queue_size(self): assert f._max_queue_size == _DEFAULT_MAX_QUEUE_SIZE assert _DEFAULT_MAX_QUEUE_SIZE == 1024 + @pytest.mark.asyncio + async def test_changes_spurious_wake_skipped(self): + f = AsyncWatchedField("x", str, "") + f._load_initial("a") + + # Set event without putting anything in the queue → spurious wake + f._queue_event.set() + + async def _collect_first(): + async for c in f.changes(): + return c + + task = asyncio.create_task(_collect_first()) + # Yield twice so the task can process the spurious wake (clears event, loops back) + await asyncio.sleep(0) + await asyncio.sleep(0) + + # Now deliver a real change + stop + change = Change(field_path="x", old_value="a", new_value="b", version=1) + f._update("b", change) + + result = await asyncio.wait_for(task, timeout=2.0) + assert result.new_value == "b" + @pytest.mark.asyncio async def test_changes_iterator_after_overflow(self): f = AsyncWatchedField("x", str, "", max_queue_size=2) @@ -410,6 +434,81 @@ async def empty_stream(): _, sub_kwargs = stub.Subscribe.call_args assert sub_kwargs.get("metadata") == auth_meta + @pytest.mark.asyncio + async def test_processes_stream_response(self): + from opendecree._generated.centralconfig.v1 import types_pb2 + + w = self._make_watcher() + fee = w.field("fee", float, default=0.0) + + response = MagicMock() + response.change.field_path = "fee" + response.change.HasField.side_effect = lambda name: name in ("old_value", "new_value") + response.change.old_value = types_pb2.TypedValue(string_value="0.0") + response.change.new_value = types_pb2.TypedValue(string_value="1.5") + response.change.version = 1 + response.change.changed_by = "" + + async def stream_with_one_response(): + yield response + # Stream ends normally (server closed) + + w._stub.Subscribe = MagicMock(return_value=stream_with_one_response()) + + await w.start() + await asyncio.sleep(0.2) + await w.stop() + + assert fee.value == pytest.approx(1.5) + + @pytest.mark.asyncio + async def test_stream_loop_returns_when_stopped_during_iteration(self): + from opendecree._generated.centralconfig.v1 import types_pb2 + + w = self._make_watcher() + w.field("fee", float, default=0.0) + + response = MagicMock() + response.change.field_path = "fee" + response.change.HasField.side_effect = lambda name: name in ("old_value", "new_value") + response.change.old_value = types_pb2.TypedValue(string_value="0.0") + response.change.new_value = types_pb2.TypedValue(string_value="1.5") + response.change.version = 1 + response.change.changed_by = "" + + async def stream_already_stopped(): + w._stopped = True # mark stopped before the loop body runs + yield response + + w._stub.Subscribe = MagicMock(return_value=stream_already_stopped()) + + await w.start() + await asyncio.sleep(0.2) + + assert w._task is not None + assert w._task.done() + await w.stop() + + @pytest.mark.asyncio + async def test_stream_aiorpc_error_while_stopped_exits_cleanly(self): + w = self._make_watcher() + w.field("fee", float, default=0.0) + + async def error_after_stop(): + w._stopped = True # mark stopped before raising + raise FakeRpcError(grpc.StatusCode.UNAVAILABLE, "gone") + yield # makes it an async generator + + w._stub.Subscribe = MagicMock(return_value=error_after_stop()) + + await w.start() + await asyncio.sleep(0.2) + + # Task should have exited cleanly (stopped=True path) + assert w._task is not None + assert w._task.done() + await w.stop() + @pytest.mark.asyncio async def test_task_name_sanitizes_control_chars(self): stub = MagicMock() diff --git a/sdk/tests/test_channel.py b/sdk/tests/test_channel.py index 24b1422..b3b7776 100644 --- a/sdk/tests/test_channel.py +++ b/sdk/tests/test_channel.py @@ -181,3 +181,25 @@ def test_keepalive_override(self): opts = dict(kwargs["options"]) assert opts["grpc.keepalive_time_ms"] == 60000 assert opts["grpc.keepalive_timeout_ms"] == 5000 + + +def test_token_call_credentials_callback_injects_bearer(): + from opendecree._channel import _token_call_credentials + + captured = [] + + def fake_metadata_call_creds(fn): + captured.append(fn) + return MagicMock(spec=grpc.CallCredentials) + + with patch( + "opendecree._channel.grpc.metadata_call_credentials", + side_effect=fake_metadata_call_creds, + ): + _token_call_credentials("secret-tok") + + # Invoke the captured inner callback directly + received = [] + captured[0](None, lambda meta, err: received.append((meta, err))) + assert received[0][0] == [("authorization", "Bearer secret-tok")] + assert received[0][1] is None diff --git a/sdk/tests/test_convert.py b/sdk/tests/test_convert.py index 7dc121a..5a40bf2 100644 --- a/sdk/tests/test_convert.py +++ b/sdk/tests/test_convert.py @@ -276,3 +276,26 @@ def test_typed_value_to_string_time(): tv = types_pb2.TypedValue(time_value=t) result = typed_value_to_string(tv) assert "2023" in result # RFC 3339 format + + +def test_typed_value_to_string_unknown_kind(): + """The case _ fallback returns str(val) for unrecognised kind strings.""" + from unittest.mock import patch + + from opendecree._convert import typed_value_to_string + from opendecree._generated.centralconfig.v1 import types_pb2 + + class FakeTV: + def WhichOneof(self, name): # noqa: N802 + return "exotic_kind" + + def __getattr__(self, name): + return "exotic_value" + + fake_instance = FakeTV() + + # Patch the TypedValue class inside the generated module so isinstance passes. + with patch.object(types_pb2, "TypedValue", FakeTV): + result = typed_value_to_string(fake_instance) + + assert result == "exotic_value" diff --git a/sdk/tests/test_retry.py b/sdk/tests/test_retry.py index 36f4d1c..f888387 100644 --- a/sdk/tests/test_retry.py +++ b/sdk/tests/test_retry.py @@ -224,3 +224,55 @@ async def fake_sleep(s: float) -> None: await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn) assert slept == [] + + +@pytest.mark.asyncio +async def test_async_deadline_already_passed_before_second_attempt(): + """Async loop-top deadline check stops further attempts once budget is exhausted.""" + err = FakeRpcError(grpc.StatusCode.UNAVAILABLE) + call_count = 0 + + async def fn() -> str: + nonlocal call_count + call_count += 1 + raise err + + slept: list[float] = [] + + async def fake_sleep(s: float) -> None: + slept.append(s) + + with patch("opendecree._retry.asyncio.sleep", side_effect=fake_sleep): + # monotonic: [deadline_start, loop-top-0 ok, remaining ok, loop-top-1 expired] + with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.05, 0.2]): + with pytest.raises(grpc.aio.AioRpcError): + await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn) + + assert call_count == 1 # second attempt blocked by loop-top break + + +@pytest.mark.asyncio +async def test_async_deadline_clips_sleep(): + """Async sleep is clipped to remaining budget so total wall time stays bounded.""" + err = FakeRpcError(grpc.StatusCode.UNAVAILABLE) + call_count = 0 + + async def fn() -> str: + nonlocal call_count + call_count += 1 + if call_count == 1: + raise err + return "ok" + + slept: list[float] = [] + + async def fake_sleep(s: float) -> None: + slept.append(s) + + with patch("opendecree._retry.asyncio.sleep", side_effect=fake_sleep): + # monotonic: [deadline_start=0.0, loop-top-0=0.0, remaining-check=0.05, loop-top-1=0.05] + with patch("opendecree._retry.time.monotonic", side_effect=[0.0, 0.0, 0.05, 0.05]): + result = await async_with_retry(RetryConfig(max_attempts=3, total_timeout=0.1), fn) + + assert result == "ok" + assert slept[0] <= 0.05 + 1e-9 # sleep clipped to remaining budget diff --git a/sdk/tests/test_watcher.py b/sdk/tests/test_watcher.py index ad874be..1f8eceb 100644 --- a/sdk/tests/test_watcher.py +++ b/sdk/tests/test_watcher.py @@ -229,6 +229,38 @@ def test_default_max_queue_size(self): assert f._max_queue_size == _DEFAULT_MAX_QUEUE_SIZE assert _DEFAULT_MAX_QUEUE_SIZE == 1024 + def test_changes_blocks_until_change_arrives(self): + import threading + + f = WatchedField("x", str, "") + f._load_initial("a") + + results: list = [] + done = threading.Event() + + def consume(): + for c in f.changes(): + results.append(c) + break + done.set() + + t = threading.Thread(target=consume, daemon=True) + t.start() + + time.sleep(0.05) # let thread enter the wait + + from opendecree.types import Change + + change = Change(field_path="x", old_value="a", new_value="b", version=1) + f._update("b", change) + f._stop() + + done.wait(timeout=2.0) + t.join(timeout=2.0) + + assert len(results) == 1 + assert results[0].new_value == "b" + def test_changes_iterator_after_overflow(self): from opendecree.types import Change @@ -499,3 +531,106 @@ def test_thread_name_sanitizes_control_chars(self): assert "\x1f" not in w._thread.name assert "tenantevil" in w._thread.name w.stop() + + def test_processes_stream_response(self): + from opendecree._generated.centralconfig.v1 import types_pb2 + + stub = MagicMock() + pb2 = MagicMock() + mock_config_resp = MagicMock() + mock_config_resp.config.values = [] + stub.GetConfig.return_value = mock_config_resp + + w = ConfigWatcher(stub, pb2, "t1", timeout=5.0) + fee = w.field("fee", float, default=0.0) + + response = MagicMock() + response.change.field_path = "fee" + response.change.HasField.side_effect = lambda name: name in ("old_value", "new_value") + response.change.old_value = types_pb2.TypedValue(string_value="0.0") + response.change.new_value = types_pb2.TypedValue(string_value="1.5") + response.change.version = 1 + response.change.changed_by = "" + + stub.Subscribe.return_value = iter([response]) + + w.start() + time.sleep(0.2) + w.stop() + + assert fee.value == pytest.approx(1.5) + + def test_stream_loop_returns_when_stopped_during_iteration(self): + stub = MagicMock() + pb2 = MagicMock() + mock_config_resp = MagicMock() + mock_config_resp.config.values = [] + stub.GetConfig.return_value = mock_config_resp + + w = ConfigWatcher(stub, pb2, "t1", timeout=5.0) + w.field("fee", float, default=0.0) + + class StopThenYieldStream: + def __iter__(self): + return self + + def __next__(self): + w._stop_event.set() # mark stopped before the loop body runs + response = MagicMock() + response.change.field_path = "fee" + response.change.HasField.side_effect = lambda name: ( + name + in ( + "old_value", + "new_value", + ) + ) + from opendecree._generated.centralconfig.v1 import types_pb2 + + response.change.old_value = types_pb2.TypedValue(string_value="0.0") + response.change.new_value = types_pb2.TypedValue(string_value="1.5") + response.change.version = 1 + response.change.changed_by = "" + return response + + def cancel(self): + pass + + stub.Subscribe.return_value = StopThenYieldStream() + + w.start() + time.sleep(0.2) + w.stop() + + # Thread exited via early return on line 248 + assert w._thread is None + + def test_stream_rpc_error_while_stopped_exits_cleanly(self): + stub = MagicMock() + pb2 = MagicMock() + mock_config_resp = MagicMock() + mock_config_resp.config.values = [] + stub.GetConfig.return_value = mock_config_resp + + w = ConfigWatcher(stub, pb2, "t1", timeout=5.0) + w.field("fee", float, default=0.0) + + class ErrorStream: + def __iter__(self): + return self + + def __next__(self): + w._stop_event.set() # mark stopped before raising + raise FakeRpcError(grpc.StatusCode.UNAVAILABLE, "gone") + + def cancel(self): + pass + + stub.Subscribe.return_value = ErrorStream() + + w.start() + time.sleep(0.2) + w.stop() + + # Thread should have exited on line 265 return path + assert w._thread is None