From 2984e9128e9859d7007fd1d83a8c27d54c68d68c Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Mon, 22 Jun 2026 06:41:58 +0800 Subject: [PATCH 1/2] Add time-series transforms (rate / downsample / resample) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit observability counters store only the current value (nothing turns a counter into a per-second rate) and cost_telemetry only buckets by day. Add Prometheus-style reset-aware ts_rate / ts_irate / ts_increase / ts_delta / ts_idelta plus ts_downsample (tumbling buckets) and ts_resample (grid, last/linear/none fill) over (timestamp, value) series. No wall clock — windows use the series' own timestamps. Wired through facade, executor (AC_ts_rate / AC_ts_downsample), MCP, and the Script Builder with a headless test batch and EN/Zh docs. --- README.md | 7 + README/README_zh-CN.md | 7 + README/README_zh-TW.md | 7 + .../Eng/doc/new_features/v98_features_doc.rst | 43 ++++++ docs/source/Eng/eng_index.rst | 1 + .../Zh/doc/new_features/v98_features_doc.rst | 37 +++++ docs/source/Zh/zh_index.rst | 1 + je_auto_control/__init__.py | 7 + .../gui/script_builder/command_schema.py | 20 +++ .../utils/executor/action_executor.py | 23 +++ .../utils/mcp_server/tools/_factories.py | 29 ++++ .../utils/mcp_server/tools/_handlers.py | 10 ++ je_auto_control/utils/timeseries/__init__.py | 10 ++ .../utils/timeseries/timeseries.py | 133 ++++++++++++++++++ .../headless/test_timeseries_batch.py | 95 +++++++++++++ 15 files changed, 430 insertions(+) create mode 100644 docs/source/Eng/doc/new_features/v98_features_doc.rst create mode 100644 docs/source/Zh/doc/new_features/v98_features_doc.rst create mode 100644 je_auto_control/utils/timeseries/__init__.py create mode 100644 je_auto_control/utils/timeseries/timeseries.py create mode 100644 test/unit_test/headless/test_timeseries_batch.py diff --git a/README.md b/README.md index c5bb6a50..0e3929cc 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ ## Table of Contents +- [What's new (2026-06-22) — Time-Series Transforms](#whats-new-2026-06-22--time-series-transforms) - [What's new (2026-06-22) — Unicode Text Normalisation & Slugify](#whats-new-2026-06-22--unicode-text-normalisation--slugify) - [What's new (2026-06-22) — JSON-Schema Compatibility Checking](#whats-new-2026-06-22--json-schema-compatibility-checking) - [What's new (2026-06-22) — Typed Configuration Schema](#whats-new-2026-06-22--typed-configuration-schema) @@ -150,6 +151,12 @@ --- +## What's new (2026-06-22) — Time-Series Transforms + +Turn counters into rates; downsample and resample. Full reference: [`docs/source/Eng/doc/new_features/v98_features_doc.rst`](docs/source/Eng/doc/new_features/v98_features_doc.rst). + +- **`ts_rate` / `ts_irate` / `ts_increase` / `ts_delta` / `ts_downsample` / `ts_resample`** (`AC_ts_rate`, `AC_ts_downsample`): `observability` counters store only the current value (no counter→rate anywhere) and `cost_telemetry` only buckets by day. This adds Prometheus-style reset-aware rate/increase/delta over `(timestamp, value)` series, tumbling-bucket downsampling (avg/sum/min/max/first/last/count), and grid resampling (last/linear/none). No wall clock — deterministic. Pure-stdlib. + ## What's new (2026-06-22) — Unicode Text Normalisation & Slugify Canonicalize text before fuzzy/search/OCR matching. Full reference: [`docs/source/Eng/doc/new_features/v97_features_doc.rst`](docs/source/Eng/doc/new_features/v97_features_doc.rst). diff --git a/README/README_zh-CN.md b/README/README_zh-CN.md index 8aa13d5d..0d97977a 100644 --- a/README/README_zh-CN.md +++ b/README/README_zh-CN.md @@ -12,6 +12,7 @@ ## 目录 +- [本次更新 (2026-06-22) — 时间序列变换](#本次更新-2026-06-22--时间序列变换) - [本次更新 (2026-06-22) — Unicode 文本规范化与 Slug](#本次更新-2026-06-22--unicode-文本规范化与-slug) - [本次更新 (2026-06-22) — JSON-Schema 兼容性检查](#本次更新-2026-06-22--json-schema-兼容性检查) - [本次更新 (2026-06-22) — 具类型的配置结构](#本次更新-2026-06-22--具类型的配置结构) @@ -149,6 +150,12 @@ --- +## 本次更新 (2026-06-22) — 时间序列变换 + +把计数器转成速率;降采样与重采样。完整参考:[`docs/source/Zh/doc/new_features/v98_features_doc.rst`](../docs/source/Zh/doc/new_features/v98_features_doc.rst)。 + +- **`ts_rate` / `ts_irate` / `ts_increase` / `ts_delta` / `ts_downsample` / `ts_resample`**(`AC_ts_rate`、`AC_ts_downsample`):`observability` 计数器只存当前值(无处可把计数器转速率),`cost_telemetry` 只以天分桶。本功能在 `(timestamp, value)` 序列上加入 Prometheus 风格、具重置感知的 rate/increase/delta、tumbling-bucket 降采样(avg/sum/min/max/first/last/count)与网格重采样(last/linear/none)。不读 wall clock、确定。纯标准库。 + ## 本次更新 (2026-06-22) — Unicode 文本规范化与 Slug 在 fuzzy/search/OCR 匹配前规范化文本。完整参考:[`docs/source/Zh/doc/new_features/v97_features_doc.rst`](../docs/source/Zh/doc/new_features/v97_features_doc.rst)。 diff --git a/README/README_zh-TW.md b/README/README_zh-TW.md index 3848c564..26b8e618 100644 --- a/README/README_zh-TW.md +++ b/README/README_zh-TW.md @@ -12,6 +12,7 @@ ## 目錄 +- [本次更新 (2026-06-22) — 時間序列轉換](#本次更新-2026-06-22--時間序列轉換) - [本次更新 (2026-06-22) — Unicode 文字正規化與 Slug](#本次更新-2026-06-22--unicode-文字正規化與-slug) - [本次更新 (2026-06-22) — JSON-Schema 相容性檢查](#本次更新-2026-06-22--json-schema-相容性檢查) - [本次更新 (2026-06-22) — 具型別的設定結構](#本次更新-2026-06-22--具型別的設定結構) @@ -149,6 +150,12 @@ --- +## 本次更新 (2026-06-22) — 時間序列轉換 + +把計數器轉成速率;降採樣與重採樣。完整參考:[`docs/source/Zh/doc/new_features/v98_features_doc.rst`](../docs/source/Zh/doc/new_features/v98_features_doc.rst)。 + +- **`ts_rate` / `ts_irate` / `ts_increase` / `ts_delta` / `ts_downsample` / `ts_resample`**(`AC_ts_rate`、`AC_ts_downsample`):`observability` 計數器只存當前值(無處可把計數器轉速率),`cost_telemetry` 只以天分桶。本功能在 `(timestamp, value)` 序列上加入 Prometheus 風格、具重置感知的 rate/increase/delta、tumbling-bucket 降採樣(avg/sum/min/max/first/last/count)與網格重採樣(last/linear/none)。不讀 wall clock、具決定性。純標準函式庫。 + ## 本次更新 (2026-06-22) — Unicode 文字正規化與 Slug 在 fuzzy/search/OCR 比對前正規化文字。完整參考:[`docs/source/Zh/doc/new_features/v97_features_doc.rst`](../docs/source/Zh/doc/new_features/v97_features_doc.rst)。 diff --git a/docs/source/Eng/doc/new_features/v98_features_doc.rst b/docs/source/Eng/doc/new_features/v98_features_doc.rst new file mode 100644 index 00000000..56bff8fe --- /dev/null +++ b/docs/source/Eng/doc/new_features/v98_features_doc.rst @@ -0,0 +1,43 @@ +Time-Series Transforms +====================== + +``observability`` counters and gauges store only the *current* value — nothing +turned a counter into a per-second rate — and ``cost_telemetry`` only buckets by +a fixed day. This adds Prometheus-style ``rate`` / ``irate`` / ``increase`` / +``delta`` (reset-aware) plus tumbling-bucket ``downsample`` and grid +``resample`` over ``(timestamp, value)`` sequences. + +Pure standard library (``bisect``); imports no ``PySide6``. No wall clock is +read — windows use the series' own timestamps — so every function is fully +deterministic in CI. + +Headless API +------------ + +.. code-block:: python + + from je_auto_control import ts_rate, ts_increase, ts_downsample, ts_resample + + series = [(0, 0), (10, 50), (20, 120)] # (timestamp_s, counter_value) + ts_rate(series) # 6.0 (120 over 20s) + ts_rate(series, window_s=10) # rate over the last 10s only + ts_increase(series) # 120.0 (reset-aware) + + ts_downsample([(0, 1), (3, 3), (5, 10)], 5, "avg") # [(0, 2.0), (5, 10.0)] + ts_resample([(0, 0), (20, 20)], 10, fill="linear") # [(0,0),(10,10),(20,20)] + +``ts_rate`` / ``ts_increase`` treat a value drop as a counter reset (Prometheus +semantics); ``ts_irate`` is the instant rate from the last two samples; +``ts_delta`` / ``ts_idelta`` are gauge first-to-last and last-two differences. +``ts_downsample`` rolls the series into ``bucket_s`` tumbling buckets aggregated +by ``avg`` / ``sum`` / ``min`` / ``max`` / ``first`` / ``last`` / ``count``. +``ts_resample`` aligns to a fixed grid, filling with ``"last"`` (carry forward), +``"linear"`` (interpolate), or ``None`` (gaps). + +Executor commands +----------------- + +``AC_ts_rate`` returns ``{rate}`` for a ``series`` (optional ``window_s``); +``AC_ts_downsample`` returns ``{buckets}`` for a ``series`` and ``bucket_s`` +(optional ``agg``). Both are exposed as MCP tools (``ac_ts_rate`` / +``ac_ts_downsample``) and as Script Builder commands under **Data**. diff --git a/docs/source/Eng/eng_index.rst b/docs/source/Eng/eng_index.rst index 5e52e253..f984ad30 100644 --- a/docs/source/Eng/eng_index.rst +++ b/docs/source/Eng/eng_index.rst @@ -120,6 +120,7 @@ Comprehensive guides for all AutoControl features. doc/new_features/v95_features_doc doc/new_features/v96_features_doc doc/new_features/v97_features_doc + doc/new_features/v98_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/docs/source/Zh/doc/new_features/v98_features_doc.rst b/docs/source/Zh/doc/new_features/v98_features_doc.rst new file mode 100644 index 00000000..9cc9db71 --- /dev/null +++ b/docs/source/Zh/doc/new_features/v98_features_doc.rst @@ -0,0 +1,37 @@ +時間序列轉換 +========== + +``observability`` 的計數器與量規只儲存*當前*值 —— 沒有任何東西能把計數器轉成每秒速率 —— 而 +``cost_telemetry`` 只以固定的「天」分桶。本功能在 ``(timestamp, value)`` 序列上加入 Prometheus 風格的 +``rate`` / ``irate`` / ``increase`` / ``delta``(具重置感知),以及 tumbling-bucket ``downsample`` 與 +網格 ``resample``。 + +純標準函式庫(``bisect``);不匯入 ``PySide6``。不讀取 wall clock —— 視窗使用序列自身的時間戳 —— 因此每個 +函式皆完全具決定性。 + +無頭 API +-------- + +.. code-block:: python + + from je_auto_control import ts_rate, ts_increase, ts_downsample, ts_resample + + series = [(0, 0), (10, 50), (20, 120)] # (timestamp_s, counter_value) + ts_rate(series) # 6.0(20 秒內 120) + ts_rate(series, window_s=10) # 只看最後 10 秒的速率 + ts_increase(series) # 120.0(重置感知) + + ts_downsample([(0, 1), (3, 3), (5, 10)], 5, "avg") # [(0, 2.0), (5, 10.0)] + ts_resample([(0, 0), (20, 20)], 10, fill="linear") # [(0,0),(10,10),(20,20)] + +``ts_rate`` / ``ts_increase`` 把值下降視為計數器重置(Prometheus 語意);``ts_irate`` 是最後兩個樣本的 +瞬時速率;``ts_delta`` / ``ts_idelta`` 是量規的首尾差與最後兩點差。``ts_downsample`` 把序列滾成 ``bucket_s`` +的 tumbling 桶,以 ``avg`` / ``sum`` / ``min`` / ``max`` / ``first`` / ``last`` / ``count`` 聚合。 +``ts_resample`` 對齊到固定網格,以 ``"last"``(前向填補)、``"linear"``(內插)或 ``None``(留缺)填值。 + +執行器命令 +---------- + +``AC_ts_rate`` 對 ``series``(可選 ``window_s``)回傳 ``{rate}``;``AC_ts_downsample`` 對 ``series`` 與 +``bucket_s``(可選 ``agg``)回傳 ``{buckets}``。兩者皆以 MCP 工具(``ac_ts_rate`` / ``ac_ts_downsample``) +以及 Script Builder 中 **Data** 分類下的命令提供。 diff --git a/docs/source/Zh/zh_index.rst b/docs/source/Zh/zh_index.rst index ed23b32a..1775667f 100644 --- a/docs/source/Zh/zh_index.rst +++ b/docs/source/Zh/zh_index.rst @@ -120,6 +120,7 @@ AutoControl 所有功能的完整使用指南。 doc/new_features/v95_features_doc doc/new_features/v96_features_doc doc/new_features/v97_features_doc + doc/new_features/v98_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/je_auto_control/__init__.py b/je_auto_control/__init__.py index 0c0985b2..6c313630 100644 --- a/je_auto_control/__init__.py +++ b/je_auto_control/__init__.py @@ -414,6 +414,11 @@ ) # Mergeable streaming latency digest + exact percentiles from je_auto_control.utils.percentiles import LatencyDigest, exact_percentiles +# Time-series transforms: rate / irate / delta / downsample / resample +from je_auto_control.utils.timeseries import ( + ts_delta, ts_downsample, ts_idelta, ts_increase, ts_irate, ts_rate, + ts_resample, +) # Bulkhead concurrency isolation + rate-limit header parsing from je_auto_control.utils.bulkhead import ( Bulkhead, BulkheadFullError, next_delay, parse_ratelimit, parse_retry_after, @@ -978,6 +983,8 @@ def start_autocontrol_gui(*args, **kwargs): "run_experiment", "BurnRule", "burn_alerts", "burn_rate", "default_burn_rules", "evaluate_slo", "LatencyDigest", "exact_percentiles", + "ts_delta", "ts_downsample", "ts_idelta", "ts_increase", "ts_irate", + "ts_rate", "ts_resample", "Bulkhead", "BulkheadFullError", "next_delay", "parse_ratelimit", "parse_retry_after", "Cassette", "CassetteMissError", diff --git a/je_auto_control/gui/script_builder/command_schema.py b/je_auto_control/gui/script_builder/command_schema.py index 8a194672..76ecfc32 100644 --- a/je_auto_control/gui/script_builder/command_schema.py +++ b/je_auto_control/gui/script_builder/command_schema.py @@ -1921,6 +1921,26 @@ def _add_resilience_specs(specs: List[CommandSpec]) -> None: ), description="Classify JSON-Schema changes as backward/forward/full.", )) + specs.append(CommandSpec( + "AC_ts_rate", "Data", "Time-Series: Counter Rate", + fields=( + FieldSpec("series", FieldType.STRING, + placeholder="[[0, 0], [10, 50], [20, 120]]"), + FieldSpec("window_s", FieldType.FLOAT, optional=True), + ), + description="Per-second counter rate (reset-aware) over a series.", + )) + specs.append(CommandSpec( + "AC_ts_downsample", "Data", "Time-Series: Downsample", + fields=( + FieldSpec("series", FieldType.STRING, + placeholder="[[0, 1], [5, 3], [12, 9]]"), + FieldSpec("bucket_s", FieldType.FLOAT, placeholder="10"), + FieldSpec("agg", FieldType.STRING, optional=True, + placeholder="avg|sum|min|max|first|last|count"), + ), + description="Roll a series into tumbling buckets by aggregate.", + )) specs.append(CommandSpec( "AC_diff_rows", "Data", "Dataset Diff: Rows by Key", fields=( diff --git a/je_auto_control/utils/executor/action_executor.py b/je_auto_control/utils/executor/action_executor.py index d459705f..02d9b6e8 100644 --- a/je_auto_control/utils/executor/action_executor.py +++ b/je_auto_control/utils/executor/action_executor.py @@ -3369,6 +3369,27 @@ def _percentiles(samples: Any, qs: Any = None) -> Dict[str, Any]: return {"percentiles": {str(q): value for q, value in result.items()}} +def _ts_rate(series: Any, window_s: Any = None) -> Dict[str, Any]: + """Adapter: per-second counter rate over a (ts, value) series.""" + import json + from je_auto_control.utils.timeseries import ts_rate + if isinstance(series, str): + series = json.loads(series) + window = float(window_s) if window_s is not None else None + return {"rate": ts_rate(series, window_s=window)} + + +def _ts_downsample(series: Any, bucket_s: Any, + agg: str = "avg") -> Dict[str, Any]: + """Adapter: downsample a (ts, value) series into tumbling buckets.""" + import json + from je_auto_control.utils.timeseries import ts_downsample + if isinstance(series, str): + series = json.loads(series) + buckets = ts_downsample(series, float(bucket_s), agg) + return {"buckets": [list(point) for point in buckets]} + + def _evaluate_slo(records: Any, target: float, window_s: Optional[float] = None) -> Dict[str, Any]: """Adapter: SLI + error budget for outcome records (list or JSON string).""" @@ -4464,6 +4485,8 @@ def __init__(self): "AC_resolve_config": _resolve_config, "AC_explain_config": _explain_config, "AC_check_compatibility": _check_compatibility, + "AC_ts_rate": _ts_rate, + "AC_ts_downsample": _ts_downsample, "AC_detect_drift": _detect_drift, "AC_categorical_drift": _categorical_drift, "AC_diff_rows": _diff_rows, diff --git a/je_auto_control/utils/mcp_server/tools/_factories.py b/je_auto_control/utils/mcp_server/tools/_factories.py index c19fa56f..fb975d1b 100644 --- a/je_auto_control/utils/mcp_server/tools/_factories.py +++ b/je_auto_control/utils/mcp_server/tools/_factories.py @@ -3548,6 +3548,34 @@ def dataset_diff_tools() -> List[MCPTool]: ] +def timeseries_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_ts_rate", + description=("Per-second counter rate (reset-aware) over a 'series' " + "of [timestamp, value] pairs, optional 'window_s'. " + "Returns {rate}."), + input_schema=schema( + {"series": {"type": "array"}, "window_s": {"type": "number"}}, + ["series"]), + handler=h.ts_rate, + annotations=READ_ONLY, + ), + MCPTool( + name="ac_ts_downsample", + description=("Roll a 'series' of [timestamp, value] pairs into " + "'bucket_s' tumbling buckets aggregated by 'agg' " + "(avg/sum/min/max/first/last/count). Returns {buckets}."), + input_schema=schema( + {"series": {"type": "array"}, "bucket_s": {"type": "number"}, + "agg": {"type": "string"}}, + ["series", "bucket_s"]), + handler=h.ts_downsample, + annotations=READ_ONLY, + ), + ] + + def schema_compat_tools() -> List[MCPTool]: return [ MCPTool( @@ -5411,6 +5439,7 @@ def media_assert_tools() -> List[MCPTool]: secret_ref_tools, config_schema_tools, config_redaction_tools, data_profile_tools, http_problem_tools, dotenv_tools, sse_client_tools, layered_config_tools, data_drift_tools, schema_compat_tools, + timeseries_tools, dataset_diff_tools, referential_tools, link_header_tools, multipart_tools, http_content_tools, cookie_jar_tools, http_conditional_tools, saga_tools, decision_table_tools, locator_repair_tools, diff --git a/je_auto_control/utils/mcp_server/tools/_handlers.py b/je_auto_control/utils/mcp_server/tools/_handlers.py index 9c68be9d..20b1683a 100644 --- a/je_auto_control/utils/mcp_server/tools/_handlers.py +++ b/je_auto_control/utils/mcp_server/tools/_handlers.py @@ -1880,6 +1880,16 @@ def check_compatibility(old, new, mode="backward"): return _check_compatibility(old, new, mode) +def ts_rate(series, window_s=None): + from je_auto_control.utils.executor.action_executor import _ts_rate + return _ts_rate(series, window_s) + + +def ts_downsample(series, bucket_s, agg="avg"): + from je_auto_control.utils.executor.action_executor import _ts_downsample + return _ts_downsample(series, bucket_s, agg) + + def detect_drift(reference, current, threshold=0.25, bins=10): from je_auto_control.utils.executor.action_executor import _detect_drift return _detect_drift(reference, current, threshold, bins) diff --git a/je_auto_control/utils/timeseries/__init__.py b/je_auto_control/utils/timeseries/__init__.py new file mode 100644 index 00000000..92ca092e --- /dev/null +++ b/je_auto_control/utils/timeseries/__init__.py @@ -0,0 +1,10 @@ +"""Time-series transforms (rate / downsample / resample) for AutoControl.""" +from je_auto_control.utils.timeseries.timeseries import ( + ts_delta, ts_downsample, ts_idelta, ts_increase, ts_irate, ts_rate, + ts_resample, +) + +__all__ = [ + "ts_delta", "ts_downsample", "ts_idelta", "ts_increase", "ts_irate", + "ts_rate", "ts_resample", +] diff --git a/je_auto_control/utils/timeseries/timeseries.py b/je_auto_control/utils/timeseries/timeseries.py new file mode 100644 index 00000000..66afc3f0 --- /dev/null +++ b/je_auto_control/utils/timeseries/timeseries.py @@ -0,0 +1,133 @@ +"""Time-series transforms over ``(timestamp, value)`` sequences. + +``observability`` counters/gauges store only the *current* value — nothing turns +a counter into a per-second rate, and ``cost_telemetry`` only buckets by a fixed +day. This adds Prometheus-style ``rate`` / ``irate`` / ``increase`` / ``delta`` +(reset-aware) plus tumbling-bucket ``downsample`` and grid ``resample``. + +Pure standard library (``bisect``); imports no ``PySide6``. No wall clock is +read — windows use the series' own timestamps — so every function is fully +deterministic in CI. +""" +import bisect +from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple + +Point = Tuple[float, float] +Series = Sequence[Point] + +_AGGS: Dict[str, Callable[[List[float]], float]] = { + "avg": lambda values: sum(values) / len(values), + "sum": sum, + "min": min, + "max": max, + "first": lambda values: values[0], + "last": lambda values: values[-1], + "count": len, +} + + +def _sorted(series: Series) -> List[Point]: + return sorted((float(ts), float(value)) for ts, value in series) + + +def ts_increase(series: Series) -> float: + """Total counter increase over the series, treating drops as resets.""" + points = _sorted(series) + if len(points) < 2: + return 0.0 + total = 0.0 + previous = points[0][1] + for _, value in points[1:]: + total += (value - previous) if value >= previous else value + previous = value + return total + + +def ts_rate(series: Series, *, window_s: Optional[float] = None) -> float: + """Per-second rate of a monotonic counter (reset-aware), over an optional window.""" + points = _sorted(series) + if window_s is not None and points: + cutoff = points[-1][0] - window_s + points = [point for point in points if point[0] >= cutoff] + if len(points) < 2: + return 0.0 + span = points[-1][0] - points[0][0] + return ts_increase(points) / span if span > 0 else 0.0 + + +def ts_irate(series: Series) -> float: + """Instant per-second rate from the last two samples (reset-aware).""" + points = _sorted(series) + if len(points) < 2: + return 0.0 + (time_a, value_a), (time_b, value_b) = points[-2], points[-1] + span = time_b - time_a + if span <= 0: + return 0.0 + increase = (value_b - value_a) if value_b >= value_a else value_b + return increase / span + + +def ts_delta(series: Series) -> float: + """First-to-last difference (for gauges).""" + points = _sorted(series) + return (points[-1][1] - points[0][1]) if len(points) >= 2 else 0.0 + + +def ts_idelta(series: Series) -> float: + """Difference of the last two samples (for gauges).""" + points = _sorted(series) + return (points[-1][1] - points[-2][1]) if len(points) >= 2 else 0.0 + + +def ts_downsample(series: Series, bucket_s: float, + agg: str = "avg") -> List[Point]: + """Roll the series into ``bucket_s`` tumbling buckets aggregated by ``agg``.""" + if bucket_s <= 0: + raise ValueError("bucket_s must be positive") + func = _AGGS.get(agg) + if func is None: + raise ValueError(f"unknown agg: {agg!r}") + buckets: Dict[float, List[float]] = {} + for ts, value in _sorted(series): + start = (ts // bucket_s) * bucket_s + buckets.setdefault(start, []).append(value) + return [(start, float(func(values))) + for start, values in sorted(buckets.items())] + + +def _value_at(times: List[float], values: List[float], at: float, + fill: Optional[str]) -> Optional[float]: + index = bisect.bisect_right(times, at) - 1 + if index < 0: + return values[0] if fill in ("last", "linear") else None + if times[index] == at or fill == "last": + return values[index] + if fill == "linear" and index + 1 < len(times): + time_0, value_0 = times[index], values[index] + time_1, value_1 = times[index + 1], values[index + 1] + return value_0 + (value_1 - value_0) * (at - time_0) / (time_1 - time_0) + if fill == "linear": + return values[index] + return None + + +def ts_resample(series: Series, bucket_s: float, *, + fill: Optional[str] = "last") -> List[Tuple[float, Any]]: + """Align the series to a fixed ``bucket_s`` grid, filling per ``fill``. + + ``fill`` is ``"last"`` (carry forward), ``"linear"`` (interpolate), or + ``None`` (gaps become ``None``). + """ + if bucket_s <= 0: + raise ValueError("bucket_s must be positive") + points = _sorted(series) + if not points: + return [] + times = [point[0] for point in points] + values = [point[1] for point in points] + start = (times[0] // bucket_s) * bucket_s + steps = int((times[-1] - start) // bucket_s) + 1 + return [(start + index * bucket_s, + _value_at(times, values, start + index * bucket_s, fill)) + for index in range(steps)] diff --git a/test/unit_test/headless/test_timeseries_batch.py b/test/unit_test/headless/test_timeseries_batch.py new file mode 100644 index 00000000..95c9c4a2 --- /dev/null +++ b/test/unit_test/headless/test_timeseries_batch.py @@ -0,0 +1,95 @@ +"""Headless tests for time-series transforms. Pure stdlib, no Qt.""" +import json + +import pytest + +import je_auto_control as ac +from je_auto_control.utils.timeseries import ( + ts_delta, ts_downsample, ts_idelta, ts_increase, ts_irate, ts_rate, + ts_resample, +) + + +def test_rate_and_increase(): + series = [(0, 0), (10, 50), (20, 120)] + assert ts_increase(series) == pytest.approx(120.0) + assert ts_rate(series) == pytest.approx(6.0) # 120 over 20s + + +def test_rate_handles_counter_reset(): + series = [(0, 90), (10, 100), (20, 5), (30, 25)] # reset between 100 and 5 + # increase = (100-90) + reset(5) + (25-5) = 10 + 5 + 20 = 35 + assert ts_increase(series) == pytest.approx(35.0) + assert ts_rate(series) == pytest.approx(35.0 / 30) + + +def test_rate_window(): + series = [(0, 0), (10, 10), (20, 30), (30, 60)] + # window 10 keeps the last two points (20,30),(30,60) → 30 over 10s + assert ts_rate(series, window_s=10) == pytest.approx(3.0) + + +def test_irate_and_delta(): + series = [(0, 0), (10, 10), (20, 40)] + assert ts_irate(series) == pytest.approx(3.0) # (40-10)/10 + assert ts_delta(series) == pytest.approx(40.0) + assert ts_idelta(series) == pytest.approx(30.0) + + +def test_downsample_aggregates(): + series = [(0, 1), (3, 3), (5, 10), (12, 9)] + buckets = ts_downsample(series, 5, "avg") + assert buckets[0] == (0, pytest.approx(2.0)) # bucket [0,5): 1,3 + assert dict(ts_downsample(series, 5, "max"))[5] == pytest.approx(10.0) + assert dict(ts_downsample(series, 5, "count"))[0] == 2 + + +def test_downsample_validation(): + with pytest.raises(ValueError): + ts_downsample([(0, 1)], 0) + with pytest.raises(ValueError): + ts_downsample([(0, 1)], 5, "median") + + +def test_resample_fill_modes(): + series = [(0, 0), (20, 20)] + last = ts_resample(series, 10, fill="last") + assert last == [(0, 0.0), (10, 0.0), (20, 20.0)] + linear = ts_resample(series, 10, fill="linear") + assert linear[1][1] == pytest.approx(10.0) # midpoint interpolated + nofill = ts_resample(series, 10, fill=None) + assert nofill[1][1] is None + + +def test_empty_series(): + assert ts_rate([]) == 0.0 and ts_resample([], 5) == [] + + +# --- wiring --------------------------------------------------------------- + +def test_executor_round_trip(): + series = json.dumps([[0, 0], [10, 50], [20, 120]]) + rec = ac.execute_action([["AC_ts_rate", {"series": series}]]) + assert next(v for v in rec.values() + if isinstance(v, dict))["rate"] == pytest.approx(6.0) + rec2 = ac.execute_action([[ + "AC_ts_downsample", {"series": series, "bucket_s": 10}]]) + buckets = next(v for v in rec2.values() if isinstance(v, dict))["buckets"] + assert len(buckets) == 3 + + +def test_wiring(): + known = ac.executor.known_commands() + assert {"AC_ts_rate", "AC_ts_downsample"} <= set(known) + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {t.name for t in build_default_tool_registry()} + assert {"ac_ts_rate", "ac_ts_downsample"} <= names + from je_auto_control.gui.script_builder.command_schema import _build_specs + specs = {s.command for s in _build_specs()} + assert {"AC_ts_rate", "AC_ts_downsample"} <= specs + + +def test_facade_exports(): + for attr in ("ts_rate", "ts_irate", "ts_increase", "ts_delta", "ts_idelta", + "ts_downsample", "ts_resample"): + assert hasattr(ac, attr) and attr in ac.__all__ From b4aa9ad8d6b2336eced9817cba1e9934c1d12c5f Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Mon, 22 Jun 2026 06:48:09 +0800 Subject: [PATCH 2/2] Use pytest.approx for float comparisons in timeseries test (Sonar S1244) --- test/unit_test/headless/test_timeseries_batch.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/unit_test/headless/test_timeseries_batch.py b/test/unit_test/headless/test_timeseries_batch.py index 95c9c4a2..fd1a4ac7 100644 --- a/test/unit_test/headless/test_timeseries_batch.py +++ b/test/unit_test/headless/test_timeseries_batch.py @@ -54,7 +54,8 @@ def test_downsample_validation(): def test_resample_fill_modes(): series = [(0, 0), (20, 20)] last = ts_resample(series, 10, fill="last") - assert last == [(0, 0.0), (10, 0.0), (20, 20.0)] + assert [t for t, _ in last] == [0, 10, 20] + assert [v for _, v in last] == pytest.approx([0.0, 0.0, 20.0]) linear = ts_resample(series, 10, fill="linear") assert linear[1][1] == pytest.approx(10.0) # midpoint interpolated nofill = ts_resample(series, 10, fill=None) @@ -62,7 +63,7 @@ def test_resample_fill_modes(): def test_empty_series(): - assert ts_rate([]) == 0.0 and ts_resample([], 5) == [] + assert ts_rate([]) == pytest.approx(0.0) and ts_resample([], 5) == [] # --- wiring ---------------------------------------------------------------