diff --git a/sdk/src/opendecree/_watcher_base.py b/sdk/src/opendecree/_watcher_base.py index 9714920..26aea77 100644 --- a/sdk/src/opendecree/_watcher_base.py +++ b/sdk/src/opendecree/_watcher_base.py @@ -15,6 +15,14 @@ _RECONNECT_INITIAL = 1.0 _RECONNECT_MAX = 30.0 _RECONNECT_MULTIPLIER = 2.0 +_DEFAULT_CHANGE_QUEUE_SIZE = 1024 + + +def _validate_max_queue_size(max_queue_size: int) -> int: + """Validate a positive change queue size.""" + if max_queue_size <= 0: + raise ValueError("max_queue_size must be greater than 0") + return max_queue_size class _WatchedFieldBase(Generic[T]): @@ -35,12 +43,18 @@ def __init__( self._is_set = False self._callbacks: list[Callable[[T, T], None]] = [] self._on_callback_error = on_callback_error + self._dropped_changes = 0 @property def path(self) -> str: """The field path this value tracks.""" return self._path + @property + def dropped_changes(self) -> int: + """Number of queued changes dropped because the change queue was full.""" + return self._dropped_changes + def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]: """Register a callback for value changes. Can be used as a decorator. diff --git a/sdk/src/opendecree/async_watcher.py b/sdk/src/opendecree/async_watcher.py index f7d0073..1ffddd9 100644 --- a/sdk/src/opendecree/async_watcher.py +++ b/sdk/src/opendecree/async_watcher.py @@ -30,10 +30,12 @@ from opendecree._convert import typed_value_to_string from opendecree._stubs import process_get_all_response from opendecree._watcher_base import ( + _DEFAULT_CHANGE_QUEUE_SIZE, _RECONNECT_INITIAL, _RECONNECT_MAX, _RECONNECT_MULTIPLIER, _WatchedFieldBase, + _validate_max_queue_size, ) from opendecree.types import Change @@ -56,10 +58,12 @@ def __init__( type_: type[T], default: T, *, + max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE, on_callback_error: Callable[[Exception], None] | None = None, ) -> None: + max_queue_size = _validate_max_queue_size(max_queue_size) super().__init__(path, type_, default, on_callback_error=on_callback_error) - self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue() + self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue(maxsize=max_queue_size) @property def value(self) -> T: @@ -84,7 +88,7 @@ def _update(self, raw_value: str | None, change: Change) -> None: """Update the field value from a raw string. Called by the watcher task.""" old, new = self._apply_raw(raw_value) self._fire_callbacks(old, new) - self._change_queue.put_nowait(change) + self._enqueue_change(change) def _load_initial(self, raw_value: str) -> None: """Set initial value from snapshot. No callbacks fired.""" @@ -92,7 +96,32 @@ def _load_initial(self, raw_value: str) -> None: def _stop(self) -> None: """Signal the changes() iterator to stop.""" - self._change_queue.put_nowait(None) + self._enqueue_stop() + + def _enqueue_change(self, change: Change) -> None: + """Queue a change, dropping the oldest queued change if the queue is full.""" + while True: + try: + self._change_queue.put_nowait(change) + return + except asyncio.QueueFull: + try: + self._change_queue.get_nowait() + except asyncio.QueueEmpty: + continue + self._dropped_changes += 1 + + def _enqueue_stop(self) -> None: + """Queue the stop sentinel without failing on a full queue.""" + while True: + try: + self._change_queue.put_nowait(None) + return + except asyncio.QueueFull: + try: + self._change_queue.get_nowait() + except asyncio.QueueEmpty: + continue class AsyncConfigWatcher: @@ -125,6 +154,7 @@ def field( type_: type[T], *, default: T, + max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE, on_callback_error: Callable[[Exception], None] | None = None, ) -> AsyncWatchedField[T]: """Register a field to watch. @@ -135,6 +165,8 @@ def field( path: Dot-separated field path (e.g., "payments.fee"). type_: Python type to convert values to (str, int, float, bool, timedelta). default: Default value when the field is null or not set. + max_queue_size: Maximum number of unread changes buffered before + dropping the oldest queued change. on_callback_error: Optional hook called with the exception when an on_change callback raises. If not set, the exception is logged. The hook may re-raise to terminate the watcher's background task. @@ -144,7 +176,13 @@ def field( """ if self._task is not None: raise RuntimeError("Cannot register fields after watcher has started") - watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error) + watched = AsyncWatchedField( + path, + type_, + default, + max_queue_size=max_queue_size, + on_callback_error=on_callback_error, + ) self._fields[path] = watched return watched diff --git a/sdk/src/opendecree/watcher.py b/sdk/src/opendecree/watcher.py index 86f3c20..d22103e 100644 --- a/sdk/src/opendecree/watcher.py +++ b/sdk/src/opendecree/watcher.py @@ -29,10 +29,12 @@ from opendecree._convert import typed_value_to_string from opendecree._stubs import process_get_all_response from opendecree._watcher_base import ( + _DEFAULT_CHANGE_QUEUE_SIZE, _RECONNECT_INITIAL, _RECONNECT_MAX, _RECONNECT_MULTIPLIER, _WatchedFieldBase, + _validate_max_queue_size, ) from opendecree.types import Change @@ -55,11 +57,13 @@ def __init__( type_: type[T], default: T, *, + max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE, on_callback_error: Callable[[Exception], None] | None = None, ) -> None: + max_queue_size = _validate_max_queue_size(max_queue_size) super().__init__(path, type_, default, on_callback_error=on_callback_error) self._lock = threading.Lock() - self._change_queue: queue.Queue[Change] = queue.Queue() + self._change_queue: queue.Queue[Change] = queue.Queue(maxsize=max_queue_size) @property def value(self) -> T: @@ -90,7 +94,7 @@ def _update(self, raw_value: str | None, change: Change) -> None: with self._lock: old, new = self._apply_raw(raw_value) self._fire_callbacks(old, new) - self._change_queue.put(change) + self._enqueue_change(change) def _load_initial(self, raw_value: str) -> None: """Set initial value from snapshot. No callbacks fired.""" @@ -99,7 +103,32 @@ def _load_initial(self, raw_value: str) -> None: def _stop(self) -> None: """Signal the changes() iterator to stop.""" - self._change_queue.put(_SENTINEL_CHANGE) + self._enqueue_stop() + + def _enqueue_change(self, change: Change) -> None: + """Queue a change, dropping the oldest queued change if the queue is full.""" + while True: + try: + self._change_queue.put_nowait(change) + return + except queue.Full: + try: + self._change_queue.get_nowait() + except queue.Empty: + continue + self._dropped_changes += 1 + + def _enqueue_stop(self) -> None: + """Queue the stop sentinel without blocking on a full queue.""" + while True: + try: + self._change_queue.put_nowait(_SENTINEL_CHANGE) + return + except queue.Full: + try: + self._change_queue.get_nowait() + except queue.Empty: + continue # Sentinel to signal the changes() iterator to stop. @@ -129,6 +158,7 @@ def field( type_: type[T], *, default: T, + max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE, on_callback_error: Callable[[Exception], None] | None = None, ) -> WatchedField[T]: """Register a field to watch. @@ -139,6 +169,8 @@ def field( path: Dot-separated field path (e.g., "payments.fee"). type_: Python type to convert values to (str, int, float, bool, timedelta). default: Default value when the field is null or not set. + max_queue_size: Maximum number of unread changes buffered before + dropping the oldest queued change. on_callback_error: Optional hook called with the exception when an on_change callback raises. If not set, the exception is logged. The hook may re-raise to terminate the watcher's background loop. @@ -148,7 +180,13 @@ def field( """ if self._thread is not None: raise RuntimeError("Cannot register fields after watcher has started") - watched = WatchedField(path, type_, default, on_callback_error=on_callback_error) + watched = WatchedField( + path, + type_, + default, + max_queue_size=max_queue_size, + on_callback_error=on_callback_error, + ) self._fields[path] = watched return watched diff --git a/sdk/tests/test_async_watcher.py b/sdk/tests/test_async_watcher.py index 76171a7..5709081 100644 --- a/sdk/tests/test_async_watcher.py +++ b/sdk/tests/test_async_watcher.py @@ -85,6 +85,25 @@ async def test_changes_iterator(self): assert collected[0].new_value == "b" assert collected[1].new_value == "c" + def test_bounded_queue_drops_oldest_change(self): + f = AsyncWatchedField("x", int, 0, max_queue_size=2) + + c1 = Change(field_path="x", old_value="0", new_value="1", version=1) + c2 = Change(field_path="x", old_value="1", new_value="2", version=2) + c3 = Change(field_path="x", old_value="2", new_value="3", version=3) + + f._update("1", c1) + f._update("2", c2) + f._update("3", c3) + + assert f.dropped_changes == 1 + assert f._change_queue.get_nowait().version == 2 + assert f._change_queue.get_nowait().version == 3 + + def test_invalid_max_queue_size_raises(self): + with pytest.raises(ValueError, match="max_queue_size"): + AsyncWatchedField("x", int, 0, max_queue_size=0) + def test_repr(self): f = AsyncWatchedField("payments.fee", float, 0.01) assert "payments.fee" in repr(f) @@ -161,6 +180,16 @@ def test_register_field(self): assert isinstance(f, AsyncWatchedField) assert f.value == 0.01 + def test_register_field_with_max_queue_size(self): + w = self._make_watcher() + f = w.field("rate", int, default=0, max_queue_size=1) + + f._update("1", Change(field_path="rate", old_value="0", new_value="1", version=1)) + f._update("2", Change(field_path="rate", old_value="1", new_value="2", version=2)) + + assert f.dropped_changes == 1 + assert f._change_queue.get_nowait().version == 2 + @pytest.mark.asyncio async def test_cannot_register_after_start(self): w = self._make_watcher() diff --git a/sdk/tests/test_watcher.py b/sdk/tests/test_watcher.py index 3099b4f..73d8f58 100644 --- a/sdk/tests/test_watcher.py +++ b/sdk/tests/test_watcher.py @@ -101,6 +101,27 @@ def test_changes_iterator(self): assert collected[0].new_value == "b" assert collected[1].new_value == "c" + def test_bounded_queue_drops_oldest_change(self): + f = WatchedField("x", int, 0, max_queue_size=2) + + from opendecree.types import Change + + c1 = Change(field_path="x", old_value="0", new_value="1", version=1) + c2 = Change(field_path="x", old_value="1", new_value="2", version=2) + c3 = Change(field_path="x", old_value="2", new_value="3", version=3) + + f._update("1", c1) + f._update("2", c2) + f._update("3", c3) + + assert f.dropped_changes == 1 + assert f._change_queue.get_nowait().version == 2 + assert f._change_queue.get_nowait().version == 3 + + def test_invalid_max_queue_size_raises(self): + with pytest.raises(ValueError, match="max_queue_size"): + WatchedField("x", int, 0, max_queue_size=0) + def test_repr(self): f = WatchedField("payments.fee", float, 0.01) assert "payments.fee" in repr(f) @@ -187,6 +208,18 @@ def test_register_field(self): assert isinstance(f, WatchedField) assert f.value == 0.01 + def test_register_field_with_max_queue_size(self): + w = self._make_watcher() + f = w.field("payments.fee", int, default=0, max_queue_size=1) + + from opendecree.types import Change + + f._update("1", Change(field_path="payments.fee", old_value="0", new_value="1", version=1)) + f._update("2", Change(field_path="payments.fee", old_value="1", new_value="2", version=2)) + + assert f.dropped_changes == 1 + assert f._change_queue.get_nowait().version == 2 + def test_cannot_register_after_start(self): w = self._make_watcher() # Mock Subscribe to return an empty iterator.