Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions sdk/src/opendecree/_watcher_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,14 @@
_RECONNECT_INITIAL = 1.0
_RECONNECT_MAX = 30.0
_RECONNECT_MULTIPLIER = 2.0
_DEFAULT_CHANGE_QUEUE_SIZE = 1024


def _validate_max_queue_size(max_queue_size: int) -> int:
"""Validate a positive change queue size."""
if max_queue_size <= 0:
raise ValueError("max_queue_size must be greater than 0")
return max_queue_size


class _WatchedFieldBase(Generic[T]):
Expand All @@ -35,12 +43,18 @@ def __init__(
self._is_set = False
self._callbacks: list[Callable[[T, T], None]] = []
self._on_callback_error = on_callback_error
self._dropped_changes = 0

@property
def path(self) -> str:
"""The field path this value tracks."""
return self._path

@property
def dropped_changes(self) -> int:
"""Number of queued changes dropped because the change queue was full."""
return self._dropped_changes

def on_change(self, fn: Callable[[T, T], None]) -> Callable[[T, T], None]:
"""Register a callback for value changes. Can be used as a decorator.

Expand Down
46 changes: 42 additions & 4 deletions sdk/src/opendecree/async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,12 @@
from opendecree._convert import typed_value_to_string
from opendecree._stubs import process_get_all_response
from opendecree._watcher_base import (
_DEFAULT_CHANGE_QUEUE_SIZE,
_RECONNECT_INITIAL,
_RECONNECT_MAX,
_RECONNECT_MULTIPLIER,
_WatchedFieldBase,
_validate_max_queue_size,
)
from opendecree.types import Change

Expand All @@ -56,10 +58,12 @@ def __init__(
type_: type[T],
default: T,
*,
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
on_callback_error: Callable[[Exception], None] | None = None,
) -> None:
max_queue_size = _validate_max_queue_size(max_queue_size)
super().__init__(path, type_, default, on_callback_error=on_callback_error)
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue()
self._change_queue: asyncio.Queue[Change | None] = asyncio.Queue(maxsize=max_queue_size)

@property
def value(self) -> T:
Expand All @@ -84,15 +88,40 @@ def _update(self, raw_value: str | None, change: Change) -> None:
"""Update the field value from a raw string. Called by the watcher task."""
old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._change_queue.put_nowait(change)
self._enqueue_change(change)

def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
self._apply_raw(raw_value)

def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
self._change_queue.put_nowait(None)
self._enqueue_stop()

def _enqueue_change(self, change: Change) -> None:
"""Queue a change, dropping the oldest queued change if the queue is full."""
while True:
try:
self._change_queue.put_nowait(change)
return
except asyncio.QueueFull:
try:
self._change_queue.get_nowait()
except asyncio.QueueEmpty:
continue
self._dropped_changes += 1

def _enqueue_stop(self) -> None:
"""Queue the stop sentinel without failing on a full queue."""
while True:
try:
self._change_queue.put_nowait(None)
return
except asyncio.QueueFull:
try:
self._change_queue.get_nowait()
except asyncio.QueueEmpty:
continue


class AsyncConfigWatcher:
Expand Down Expand Up @@ -125,6 +154,7 @@ def field(
type_: type[T],
*,
default: T,
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
on_callback_error: Callable[[Exception], None] | None = None,
) -> AsyncWatchedField[T]:
"""Register a field to watch.
Expand All @@ -135,6 +165,8 @@ def field(
path: Dot-separated field path (e.g., "payments.fee").
type_: Python type to convert values to (str, int, float, bool, timedelta).
default: Default value when the field is null or not set.
max_queue_size: Maximum number of unread changes buffered before
dropping the oldest queued change.
on_callback_error: Optional hook called with the exception when an
on_change callback raises. If not set, the exception is logged.
The hook may re-raise to terminate the watcher's background task.
Expand All @@ -144,7 +176,13 @@ def field(
"""
if self._task is not None:
raise RuntimeError("Cannot register fields after watcher has started")
watched = AsyncWatchedField(path, type_, default, on_callback_error=on_callback_error)
watched = AsyncWatchedField(
path,
type_,
default,
max_queue_size=max_queue_size,
on_callback_error=on_callback_error,
)
self._fields[path] = watched
return watched

Expand Down
46 changes: 42 additions & 4 deletions sdk/src/opendecree/watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@
from opendecree._convert import typed_value_to_string
from opendecree._stubs import process_get_all_response
from opendecree._watcher_base import (
_DEFAULT_CHANGE_QUEUE_SIZE,
_RECONNECT_INITIAL,
_RECONNECT_MAX,
_RECONNECT_MULTIPLIER,
_WatchedFieldBase,
_validate_max_queue_size,
)
from opendecree.types import Change

Expand All @@ -55,11 +57,13 @@ def __init__(
type_: type[T],
default: T,
*,
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
on_callback_error: Callable[[Exception], None] | None = None,
) -> None:
max_queue_size = _validate_max_queue_size(max_queue_size)
super().__init__(path, type_, default, on_callback_error=on_callback_error)
self._lock = threading.Lock()
self._change_queue: queue.Queue[Change] = queue.Queue()
self._change_queue: queue.Queue[Change] = queue.Queue(maxsize=max_queue_size)

@property
def value(self) -> T:
Expand Down Expand Up @@ -90,7 +94,7 @@ def _update(self, raw_value: str | None, change: Change) -> None:
with self._lock:
old, new = self._apply_raw(raw_value)
self._fire_callbacks(old, new)
self._change_queue.put(change)
self._enqueue_change(change)

def _load_initial(self, raw_value: str) -> None:
"""Set initial value from snapshot. No callbacks fired."""
Expand All @@ -99,7 +103,32 @@ def _load_initial(self, raw_value: str) -> None:

def _stop(self) -> None:
"""Signal the changes() iterator to stop."""
self._change_queue.put(_SENTINEL_CHANGE)
self._enqueue_stop()

def _enqueue_change(self, change: Change) -> None:
"""Queue a change, dropping the oldest queued change if the queue is full."""
while True:
try:
self._change_queue.put_nowait(change)
return
except queue.Full:
try:
self._change_queue.get_nowait()
except queue.Empty:
continue
self._dropped_changes += 1

def _enqueue_stop(self) -> None:
"""Queue the stop sentinel without blocking on a full queue."""
while True:
try:
self._change_queue.put_nowait(_SENTINEL_CHANGE)
return
except queue.Full:
try:
self._change_queue.get_nowait()
except queue.Empty:
continue


# Sentinel to signal the changes() iterator to stop.
Expand Down Expand Up @@ -129,6 +158,7 @@ def field(
type_: type[T],
*,
default: T,
max_queue_size: int = _DEFAULT_CHANGE_QUEUE_SIZE,
on_callback_error: Callable[[Exception], None] | None = None,
) -> WatchedField[T]:
"""Register a field to watch.
Expand All @@ -139,6 +169,8 @@ def field(
path: Dot-separated field path (e.g., "payments.fee").
type_: Python type to convert values to (str, int, float, bool, timedelta).
default: Default value when the field is null or not set.
max_queue_size: Maximum number of unread changes buffered before
dropping the oldest queued change.
on_callback_error: Optional hook called with the exception when an
on_change callback raises. If not set, the exception is logged.
The hook may re-raise to terminate the watcher's background loop.
Expand All @@ -148,7 +180,13 @@ def field(
"""
if self._thread is not None:
raise RuntimeError("Cannot register fields after watcher has started")
watched = WatchedField(path, type_, default, on_callback_error=on_callback_error)
watched = WatchedField(
path,
type_,
default,
max_queue_size=max_queue_size,
on_callback_error=on_callback_error,
)
self._fields[path] = watched
return watched

Expand Down
29 changes: 29 additions & 0 deletions sdk/tests/test_async_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,25 @@ async def test_changes_iterator(self):
assert collected[0].new_value == "b"
assert collected[1].new_value == "c"

def test_bounded_queue_drops_oldest_change(self):
f = AsyncWatchedField("x", int, 0, max_queue_size=2)

c1 = Change(field_path="x", old_value="0", new_value="1", version=1)
c2 = Change(field_path="x", old_value="1", new_value="2", version=2)
c3 = Change(field_path="x", old_value="2", new_value="3", version=3)

f._update("1", c1)
f._update("2", c2)
f._update("3", c3)

assert f.dropped_changes == 1
assert f._change_queue.get_nowait().version == 2
assert f._change_queue.get_nowait().version == 3

def test_invalid_max_queue_size_raises(self):
with pytest.raises(ValueError, match="max_queue_size"):
AsyncWatchedField("x", int, 0, max_queue_size=0)

def test_repr(self):
f = AsyncWatchedField("payments.fee", float, 0.01)
assert "payments.fee" in repr(f)
Expand Down Expand Up @@ -161,6 +180,16 @@ def test_register_field(self):
assert isinstance(f, AsyncWatchedField)
assert f.value == 0.01

def test_register_field_with_max_queue_size(self):
w = self._make_watcher()
f = w.field("rate", int, default=0, max_queue_size=1)

f._update("1", Change(field_path="rate", old_value="0", new_value="1", version=1))
f._update("2", Change(field_path="rate", old_value="1", new_value="2", version=2))

assert f.dropped_changes == 1
assert f._change_queue.get_nowait().version == 2

@pytest.mark.asyncio
async def test_cannot_register_after_start(self):
w = self._make_watcher()
Expand Down
33 changes: 33 additions & 0 deletions sdk/tests/test_watcher.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,27 @@ def test_changes_iterator(self):
assert collected[0].new_value == "b"
assert collected[1].new_value == "c"

def test_bounded_queue_drops_oldest_change(self):
f = WatchedField("x", int, 0, max_queue_size=2)

from opendecree.types import Change

c1 = Change(field_path="x", old_value="0", new_value="1", version=1)
c2 = Change(field_path="x", old_value="1", new_value="2", version=2)
c3 = Change(field_path="x", old_value="2", new_value="3", version=3)

f._update("1", c1)
f._update("2", c2)
f._update("3", c3)

assert f.dropped_changes == 1
assert f._change_queue.get_nowait().version == 2
assert f._change_queue.get_nowait().version == 3

def test_invalid_max_queue_size_raises(self):
with pytest.raises(ValueError, match="max_queue_size"):
WatchedField("x", int, 0, max_queue_size=0)

def test_repr(self):
f = WatchedField("payments.fee", float, 0.01)
assert "payments.fee" in repr(f)
Expand Down Expand Up @@ -187,6 +208,18 @@ def test_register_field(self):
assert isinstance(f, WatchedField)
assert f.value == 0.01

def test_register_field_with_max_queue_size(self):
w = self._make_watcher()
f = w.field("payments.fee", int, default=0, max_queue_size=1)

from opendecree.types import Change

f._update("1", Change(field_path="payments.fee", old_value="0", new_value="1", version=1))
f._update("2", Change(field_path="payments.fee", old_value="1", new_value="2", version=2))

assert f.dropped_changes == 1
assert f._change_queue.get_nowait().version == 2

def test_cannot_register_after_start(self):
w = self._make_watcher()
# Mock Subscribe to return an empty iterator.
Expand Down