From 7970a71d19bafa2d84363976bf20099ebd207835 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 5 Mar 2026 13:37:25 +0100 Subject: [PATCH 1/5] refactor(resampler): extract `_emit_window` method Extract window emission logic from `resample()` into dedicated `_emit_window()` method to allow code sharing between Timer-based and Event-driven resampler implementations. Signed-off-by: Malte Schaaf --- .../sdk/timeseries/_resampling/_resampler.py | 61 +++++++++++-------- 1 file changed, 35 insertions(+), 26 deletions(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index db2631c2e..baa5301c8 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -160,14 +160,6 @@ async def resample(self, *, one_shot: bool = False) -> None: Args: one_shot: Wether the resampling should run only for one resampling period. - - Raises: - ResamplingError: If some timeseries source or sink encounters any - errors while receiving or sending samples. In this case the - timer still runs and the timeseries will keep receiving data. - The user should remove (and re-add if desired) the faulty - timeseries from the resampler before calling this method - again). """ # We use a tolerance of 10% of the resampling period tolerance = timedelta( @@ -200,28 +192,45 @@ async def resample(self, *, one_shot: bool = False) -> None: case unexpected: assert_never(unexpected) - # We need to make a copy here because we need to match the results to the - # current resamplers, and since we await here, new resamplers could be added - # or removed from the dict while we awaiting the resampling, which would - # cause the results to be out of sync. - resampler_sources = list(self._resamplers) - results = await asyncio.gather( - *[r.resample(next_tick_time) for r in self._resamplers.values()], - return_exceptions=True, - ) + await self._emit_window(next_tick_time) - exceptions = { - source: result - for source, result in zip(resampler_sources, results) - # CancelledError inherits from BaseException, but we don't want - # to catch *all* BaseExceptions here. - if isinstance(result, (Exception, asyncio.CancelledError)) - } - if exceptions: - raise ResamplingError(exceptions) if one_shot: break + async def _emit_window(self, window_end: datetime) -> None: + """Emit resampled samples for all timeseries at the given window boundary. + + Args: + window_end: The timestamp marking the end of the resampling window. + + Raises: + ResamplingError: If some timeseries source or sink encounters any + errors while receiving or sending samples. In this case the + timer still runs and the timeseries will keep receiving data. + The user should remove (and re-add if desired) the faulty + timeseries from the resampler before calling this method + again). + """ + # We need to make a copy here because we need to match the results to the + # current resamplers, and since we await here, new resamplers could be added + # or removed from the dict while we awaiting the resampling, which would + # cause the results to be out of sync. + resampler_sources = list(self._resamplers) + results = await asyncio.gather( + *[r.resample(window_end) for r in self._resamplers.values()], + return_exceptions=True, + ) + + exceptions = { + source: result + for source, result in zip(resampler_sources, results) + # CancelledError inherits from BaseException, but we don't want + # to catch *all* BaseExceptions here. + if isinstance(result, (Exception, asyncio.CancelledError)) + } + if exceptions: + raise ResamplingError(exceptions) + def _calculate_window_end(self) -> tuple[datetime, timedelta]: """Calculate the end of the current resampling window. From 03c253ae9cb23114827ad875c98912cd8dfbf27a Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 5 Mar 2026 13:40:35 +0100 Subject: [PATCH 2/5] feat(resampler): add sample callback mechanism to `StreamingHelper` Add ability for `StreamingHelper` to notify external consumers when samples arrive via a callback function. This enables event-driven resampler implementations to receive samples without polling internal buffers. Changes: - Added `_sample_callback` attribute to store the callback function - Added `register_sample_callback()` method to register an async callback - Modified `_receive_samples()` to invoke the callback when a sample is added to the buffer This mechanism is used by `EventResampler` to implement event-driven sample processing instead of timer-based polling. Signed-off-by: Malte Schaaf --- .../sdk/timeseries/_resampling/_resampler.py | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/src/frequenz/sdk/timeseries/_resampling/_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_resampler.py index baa5301c8..8924ff44a 100644 --- a/src/frequenz/sdk/timeseries/_resampling/_resampler.py +++ b/src/frequenz/sdk/timeseries/_resampling/_resampler.py @@ -12,7 +12,7 @@ from bisect import bisect, bisect_left from collections import deque from datetime import datetime, timedelta, timezone -from typing import assert_never +from typing import Awaitable, Callable, assert_never from frequenz.channels.timer import Timer, TriggerAllMissed, _to_microseconds from frequenz.quantities import Quantity @@ -537,6 +537,9 @@ def __init__( self._helper: _ResamplingHelper = helper self._source: Source = source self._sink: Sink = sink + self._sample_callback: Callable[[Sample[Quantity]], Awaitable[None]] | None = ( + None + ) self._receiving_task: asyncio.Task[None] = asyncio.create_task( self._receive_samples() ) @@ -554,6 +557,22 @@ async def stop(self) -> None: """Cancel the receiving task.""" await cancel_and_await(self._receiving_task) + def register_sample_callback( + self, + callback: Callable[[Sample[Quantity]], Awaitable[None]] | None, + ) -> None: + """Register a callback to be invoked when a sample arrives. + + The callback is called asynchronously each time a sample is received + from the source. This allows consumers (like EventResampler) to be + notified of incoming samples without polling internal buffers. + + Args: + callback: An async function to call when a sample arrives. + If `None`, no callback will be called on new samples. + """ + self._sample_callback = callback + async def _receive_samples(self) -> None: """Pass received samples to the helper. @@ -564,6 +583,9 @@ async def _receive_samples(self) -> None: if sample.value is not None and not sample.value.isnan(): self._helper.add_sample((sample.timestamp, sample.value.base_value)) + if self._sample_callback: + await self._sample_callback(sample) + # We need the noqa because pydoclint can't figure out that `recv_exception` is an # `Exception` instance. async def resample(self, timestamp: datetime) -> None: # noqa: DOC503 From 20615e79264c9bc60960d90f0f1b076ace498adf Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 5 Mar 2026 14:01:46 +0100 Subject: [PATCH 3/5] feat(resampler): implement `EventResampler` for cascaded resampling MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add `EventResampler` class that uses event-driven window management instead of timer-based intervals. This solves data loss issues when cascading resamplers. Problem: When cascading Timer-based resamplers (e.g., 1s → 10s) with `align_to=UNIX_EPOCH`, samples can be lost at window boundaries due to timing synchronization. Solution: EventResampler opens/closes windows based on sample arrival timestamps instead of fixed intervals, ensuring no data loss at boundaries. Changes: - Added `EventResampler` class that inherits from `Resampler` - Windows are emitted when a sample arrives with `timestamp >= window_end` - Maintains window alignment through simple addition of `resampling_period` - Uses sample callback mechanism from `StreamingHelper` for event-driven processing `EventResampler` is optimized for cascaded resampling and should not be used directly with raw, irregular data. Signed-off-by: Malte Schaaf --- .../_resampling/_event_resampler.py | 213 ++++++++++++++++++ 1 file changed, 213 insertions(+) create mode 100644 src/frequenz/sdk/timeseries/_resampling/_event_resampler.py diff --git a/src/frequenz/sdk/timeseries/_resampling/_event_resampler.py b/src/frequenz/sdk/timeseries/_resampling/_event_resampler.py new file mode 100644 index 000000000..5e48f7794 --- /dev/null +++ b/src/frequenz/sdk/timeseries/_resampling/_event_resampler.py @@ -0,0 +1,213 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Event-driven resampler for cascaded resampling stages.""" + +import asyncio +import logging +from datetime import datetime, timedelta, timezone + +from frequenz.quantities import Quantity + +from .._base_types import Sample +from ._base_types import Sink, Source +from ._config import ResamplerConfig +from ._resampler import Resampler, _ResamplingHelper, _StreamingHelper + +_logger = logging.getLogger(__name__) + + +class EventResampler(Resampler): + """Event-driven resampler for cascaded resampling stages. + + Unlike the standard Timer-based Resampler which uses fixed wall-clock + intervals, EventResampler is triggered by incoming data. Windows are + emitted when a sample arrives that falls outside the current window, + not on a fixed timer schedule. + + Problem Solved: + When cascading Timer-based resamplers (e.g., 1s → 10s) with + align_to=UNIX_EPOCH, samples can be lost at window boundaries due to + timing synchronization issues. EventResampler eliminates this by + opening/closing windows based on actual data arrival. + + Important: This resampler is optimized for continuous data streams + where samples arrive at regular or semi-regular intervals. It is not + suitable for handling raw, irregular data directly from sources. + + Best Used: + Stage 1: Timer-based Resampler (handles raw, irregular data) + Stage 2+: Event-based Resampler (handles continuous data from Stage 1) + + Example: + config = ResamplerConfig( + resampling_period=timedelta(seconds=10), + resampling_function=..., + ) + resampler = EventResampler(config) + resampler.add_timeseries("my_source", source, sink) + await resampler.resample() + + Note: If a long gap occurs without incoming samples (no data for multiple periods), + the corresponding windows will be emitted all at once when data resumes. This is + acceptable for cascaded resampling since the input typically comes from another + Resampler with guaranteed continuous output. + """ + + # pylint: disable=super-init-not-called + def __init__(self, config: ResamplerConfig) -> None: + """Initialize EventResampler. + + This does not call super().__init__() to avoid starting any timers + + Args: + config: Resampler configuration + """ + self._config = config + """The configuration for this resampler.""" + + self._resamplers: dict[Source, _StreamingHelper] = {} + """A mapping between sources and the streaming helper handling that source.""" + + window_end, _ = self._calculate_window_end() + self._window_end: datetime = window_end + """The time in which the current window ends. + + This is used to make sure every resampling window is generated at + precise times. We can't rely on the timer timestamp because timers will + never fire at the exact requested time, so if we don't use a precise + time for the end of the window, the resampling windows we produce will + have different sizes. + + The window end will also be aligned to the `config.align_to` time, so + the window end is deterministic. + """ + + self._window_lock = asyncio.Lock() + """Lock protecting access to `_window_end` during window state transitions.""" + + self._sample_queue: asyncio.Queue[Sample[Quantity]] = asyncio.Queue() + """Queue for samples awaiting processing. Filled by `_StreamingHelper` callbacks, + consumed by the event loop in `resample()`. + """ + + # OVERRIDDEN: Register callback to receive samples asynchronously for + # event-driven window management. + def add_timeseries(self, name: str, source: Source, sink: Sink) -> bool: + """Start resampling a new timeseries. + + Registers the timeseries and sets up a sample callback to enqueue + incoming samples for event-driven processing. + + Args: + name: The name of the timeseries (for logging purposes). + source: The source of the timeseries to resample. + sink: The sink to use to send the resampled data. + + Returns: + `True` if the timeseries was added, `False` if the timeseries was + not added because there already a timeseries using the provided + receiver. + """ + if source in self._resamplers: + return False + + resampler = _StreamingHelper( + _ResamplingHelper(name, self._config), source, sink + ) + + # Register the callback to receive samples from the streaming helper. + resampler.register_sample_callback(self._enqueue_sample) + + self._resamplers[source] = resampler + return True + + async def _enqueue_sample(self, sample: Sample[Quantity]) -> None: + """Add a sample to the processing queue. + + Args: + sample: The sample to enqueue. + """ + await self._sample_queue.put(sample) + + # OVERRIDDEN: no warm-up period needed for event-driven sample accumulation. + def _calculate_window_end(self) -> tuple[datetime, timedelta]: + """Calculate the end of the first resampling window. + + Calculates the next multiple of resampling_period after the current time, + respecting align_to configuration. + + Returns: + A tuple (window_end, delay_time) where: + - window_end: datetime when the first window should end + - delay_time: always timedelta(0) for EventResampler + """ + now = datetime.now(timezone.utc) + period = self._config.resampling_period + align_to = self._config.align_to + + if align_to is None: + return (now + period, timedelta(0)) + + elapsed = (now - align_to) % period + + return ( + (now + (period - elapsed), timedelta(0)) + if elapsed > timedelta(0) + else (now, timedelta(0)) + ) + + async def resample(self, *, one_shot: bool = False) -> None: + """Start event-driven resampling. + + Processes incoming samples from the queue continuously. Windows are + emitted when a sample arrives with a timestamp >= current window_end. + This is in contrast to Timer-based resampling which emits windows at + fixed intervals regardless of data arrival. + + Args: + one_shot: If True, waits for the first window to be emitted, then exits. + + Raises: + asyncio.CancelledError: If the task is cancelled. + """ + try: + while True: + sample = await self._sample_queue.get() + emmitted = await self._process_sample(sample) + + if one_shot and emmitted: + return + + except asyncio.CancelledError: + _logger.info("EventResampler task cancelled") + raise + + async def _process_sample( + self, + sample: Sample[Quantity], + ) -> bool: + """Process an incoming sample and manage window state. + + This method checks if the incoming sample falls outside the current + window and emits completed windows as needed. Returns True if any + windows were emitted. + + Args: + sample: Incoming sample to process + + Returns: + True if at least one window was emitted, False otherwise. + """ + async with self._window_lock: + emmitted = False + while sample.timestamp >= self._window_end: + _logger.debug( + "EventResampler: Sample at %s >= window end %s, closing window", + sample.timestamp, + self._window_end, + ) + await self._emit_window(self._window_end) + self._window_end += self._config.resampling_period + emmitted = True + return emmitted From 75c286e1fb010b79b2019d1291f74b04eea38de7 Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 5 Mar 2026 16:50:30 +0100 Subject: [PATCH 4/5] test(resampler): add comprehensive tests for `EventResampler` Add test suite for `EventResampler` covering window initialization, boundary conditions, and alignment behavior. Tests are parametrized to verify correct behavior with and without `align_to` configuration. Changes: - Added tests for `EventResampler` initialization and window end calculation - Added tests for sample processing before, at, and after window boundaries - Added tests for correct behavior when samples cross multiple windows - Added tests verifying window alignment is maintained through simple addition - Added key test demonstrating no data loss at window boundaries Tests use parametrized fixtures to cover both aligned and non-aligned window scenarios, ensuring the event-driven window management works correctly in all cases. Signed-off-by: Malte Schaaf --- .../_resampling/test_event_resampler.py | 298 ++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 tests/timeseries/_resampling/test_event_resampler.py diff --git a/tests/timeseries/_resampling/test_event_resampler.py b/tests/timeseries/_resampling/test_event_resampler.py new file mode 100644 index 000000000..0920a6cd9 --- /dev/null +++ b/tests/timeseries/_resampling/test_event_resampler.py @@ -0,0 +1,298 @@ +# License: MIT +# Copyright © 2026 Frequenz Energy-as-a-Service GmbH + +"""Tests for the `EventResampler` class.""" + +from dataclasses import dataclass +from datetime import datetime, timedelta, timezone +from unittest.mock import AsyncMock, patch + +import pytest +from frequenz.quantities import Quantity + +from frequenz.sdk.timeseries import Sample +from frequenz.sdk.timeseries._resampling._config import ResamplerConfig +from frequenz.sdk.timeseries._resampling._event_resampler import EventResampler +from frequenz.sdk.timeseries._resampling._resampler import Resampler + +# pylint: disable=protected-access + + +@dataclass +class ResamplerTestCase: + """Data class for holding test case parameters for EventResampler tests.""" + + align_to: datetime | None + """Alignment point for windows. If None, windows are aligned to the first sample time.""" + + first_window_end: datetime + """Expected end time of the first window based on the configuration and start time.""" + + +@pytest.fixture +def now() -> datetime: + """Fixture providing a fixed current time for testing.""" + return datetime(2024, 1, 1, 12, 0, 5, tzinfo=timezone.utc) + + +@pytest.fixture( + params=[ + ResamplerTestCase( + align_to=None, + first_window_end=datetime(2024, 1, 1, 12, 0, 15, tzinfo=timezone.utc), + ), + ResamplerTestCase( + align_to=datetime(1970, 1, 1, tzinfo=timezone.utc), + first_window_end=datetime(2024, 1, 1, 12, 0, 10, tzinfo=timezone.utc), + ), + ], + ids=["no_alignment", "with_alignment"], +) +def resampler_case(request: pytest.FixtureRequest) -> ResamplerTestCase: + """Fixture for EventResampler test cases.""" + assert isinstance(request.param, ResamplerTestCase) + return request.param + + +@pytest.fixture +def resampler_config(resampler_case: ResamplerTestCase) -> ResamplerConfig: + """Create a basic resampler config for testing.""" + return ResamplerConfig( + resampling_period=timedelta(seconds=10), + max_data_age_in_periods=1, + align_to=resampler_case.align_to, + ) + + +@pytest.fixture +def first_window_end(resampler_case: ResamplerTestCase) -> datetime: + """Fixture providing the expected first window end time based on the test case.""" + return resampler_case.first_window_end + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_event_resampler_initialization( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Event Resampler initializes without errors.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + assert resampler.config == resampler_config + assert len(resampler._resamplers) == 0 + assert resampler._window_end == first_window_end + + +@pytest.mark.asyncio +async def test_event_resampler_inherits_from_resampler( + resampler_config: ResamplerConfig, +) -> None: + """Event Resampler is a Resampler subclass.""" + resampler = EventResampler(resampler_config) + assert isinstance(resampler, Resampler) + assert hasattr(resampler, "add_timeseries") + assert hasattr(resampler, "remove_timeseries") + assert callable(resampler.add_timeseries) + assert callable(resampler.remove_timeseries) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_window_initialization( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Window initializes correctly on first sample.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + assert resampler._window_end == first_window_end + + sample = Sample(now, Quantity(42.0)) + await resampler._process_sample(sample) + + assert resampler._window_end == first_window_end + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_before_first_window_boundary( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Samples before window boundary don't trigger emit.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 + sample1 = Sample(now + timedelta(seconds=1), Quantity(10.0)) + await resampler._process_sample(sample1) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + # Process sample still within first window + sample2 = Sample(now + timedelta(seconds=3), Quantity(20.0)) + await resampler._process_sample(sample2) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_at_window_boundary_triggers_emit( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Sample at window boundary triggers emit and opens new window.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 + sample1 = Sample(now + timedelta(seconds=1), Quantity(10.0)) + await resampler._process_sample(sample1) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + # Sample 2 at boundary + sample2 = Sample(now + timedelta(seconds=10), Quantity(20.0)) + await resampler._process_sample(sample2) + + mock_emit_window.assert_called_once_with(first_window_end) + assert resampler._window_end == first_window_end + timedelta(seconds=10) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_after_window_boundary( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Sample after window boundary triggers emit.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 + sample1 = Sample(now + timedelta(seconds=1), Quantity(10.0)) + await resampler._process_sample(sample1) + + assert resampler._window_end == first_window_end + mock_emit_window.assert_not_called() + + # Sample 2 at boundary + sample2 = Sample(now + timedelta(seconds=11), Quantity(20.0)) + await resampler._process_sample(sample2) + + mock_emit_window.assert_called_once_with(first_window_end) + assert resampler._window_end == first_window_end + timedelta(seconds=10) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_sample_crossing_multiple_windows( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Sample crossing multiple windows emits each one.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + # Sample 1 at 2s + sample1 = Sample(now + timedelta(seconds=2), Quantity(10.0)) + await resampler._process_sample(sample1) + mock_emit_window.assert_not_called() + assert resampler._window_end == first_window_end + + # Sample 2 at 32s + sample2 = Sample(now + timedelta(seconds=32), Quantity(20.0)) + await resampler._process_sample(sample2) + assert mock_emit_window.call_count == 3 + assert resampler._window_end == first_window_end + timedelta(seconds=30) + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_window_alignment_maintained( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """Windows remain aligned when using simple addition.""" + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + send_sequence = [1, 15, 25, 35] # Sample times in seconds + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + for offset in send_sequence: + sample = Sample(now + timedelta(seconds=offset), Quantity(float(offset))) + await resampler._process_sample(sample) + + for i, call_args in enumerate(mock_emit_window.call_args_list): + window_end = call_args.args[0] # Extract the first argument from the call + expected = first_window_end + i * resampler_config.resampling_period + assert window_end == expected + + +@patch("frequenz.sdk.timeseries._resampling._event_resampler.datetime") +@pytest.mark.asyncio +async def test_key_benefit_no_data_loss_at_boundaries( + mock_datetime: AsyncMock, + resampler_config: ResamplerConfig, + now: datetime, + first_window_end: datetime, +) -> None: + """ + Key benefit: No data loss at window boundaries. + + This test demonstrates the main value of EventResampler compared + to cascaded TimerResamplers: samples arriving at boundaries are + never lost. + """ + mock_datetime.now.return_value = now + resampler = EventResampler(resampler_config) + arriving_samples = [1.0, 5.0, 9.5, 10.0, 10.1, 15.0, 20.0, 20.5] + + with patch.object( + resampler, "_emit_window", new_callable=AsyncMock + ) as mock_emit_window: + for i, sample_offset in enumerate(arriving_samples): + sample = Sample(now + timedelta(seconds=sample_offset), Quantity(i)) + await resampler._process_sample(sample) + + assert mock_emit_window.call_count == 2 + assert mock_emit_window.call_args_list[0].args[0] == first_window_end + assert mock_emit_window.call_args_list[1].args[ + 0 + ] == first_window_end + timedelta(seconds=10) + assert resampler._window_end == (first_window_end + timedelta(seconds=20)) From a9e30301ffb12884e0da00a554a04a5c75b5a76d Mon Sep 17 00:00:00 2001 From: Malte Schaaf Date: Thu, 5 Mar 2026 16:53:55 +0100 Subject: [PATCH 5/5] docs(ReleaseNotes): update release notes Signed-off-by: Malte Schaaf --- RELEASE_NOTES.md | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index 5f93b7d7b..4fa7acd3d 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -11,6 +11,10 @@ ## New Features - `Resampler`: The resampler can now be configured to have the resampling window closed to the right (default) or left, and to also set the resampler timestamp to the right (default) or left end of the window being resampled. You can configure setting the new options `closed` and `label` in the `ResamplerConfig`. +- `EventResampler`: A new event-driven resampler for cascaded resampling stages. Unlike the timer-based `Resampler`, `EventResampler` emits windows when sample timestamps exceed window boundaries, eliminating data loss at window boundaries in cascaded scenarios. See the class documentation for usage guidelines. +- `StreamingHelper`: Added callback mechanism via `register_sample_callback()` to notify external consumers when samples arrive, enabling event-driven resampling without polling internal buffers. +- `Resampler._emit_window()`: Extracted window emission logic into a dedicated method for code sharing between timer-based and event-driven resampler implementations. + ## Bug Fixes