From 6db77dabf7e953c3539c61d2a21ef62513c71ecb Mon Sep 17 00:00:00 2001 From: JeffreyChen Date: Mon, 22 Jun 2026 10:52:58 +0800 Subject: [PATCH] Add transactional outbox for durable at-least-once event delivery --- README.md | 7 ++ README/README_zh-CN.md | 7 ++ README/README_zh-TW.md | 7 ++ .../doc/new_features/v107_features_doc.rst | 46 ++++++++ docs/source/Eng/eng_index.rst | 1 + .../Zh/doc/new_features/v107_features_doc.rst | 38 +++++++ docs/source/Zh/zh_index.rst | 1 + je_auto_control/__init__.py | 3 + .../gui/script_builder/command_schema.py | 16 +++ .../utils/executor/action_executor.py | 23 ++++ .../utils/mcp_server/tools/_factories.py | 25 ++++- .../utils/mcp_server/tools/_handlers.py | 10 ++ je_auto_control/utils/outbox/__init__.py | 4 + je_auto_control/utils/outbox/outbox.py | 88 +++++++++++++++ test/unit_test/headless/test_outbox_batch.py | 104 ++++++++++++++++++ 15 files changed, 379 insertions(+), 1 deletion(-) create mode 100644 docs/source/Eng/doc/new_features/v107_features_doc.rst create mode 100644 docs/source/Zh/doc/new_features/v107_features_doc.rst create mode 100644 je_auto_control/utils/outbox/__init__.py create mode 100644 je_auto_control/utils/outbox/outbox.py create mode 100644 test/unit_test/headless/test_outbox_batch.py diff --git a/README.md b/README.md index 7fb68aa9..a5a6c5bc 100644 --- a/README.md +++ b/README.md @@ -13,6 +13,7 @@ ## Table of Contents +- [What's new (2026-06-22) — Transactional Outbox](#whats-new-2026-06-22--transactional-outbox) - [What's new (2026-06-22) — Optimistic-Concurrency Versioned Store](#whats-new-2026-06-22--optimistic-concurrency-versioned-store) - [What's new (2026-06-22) — Per-Stream Sequence-Gap Detection](#whats-new-2026-06-22--per-stream-sequence-gap-detection) - [What's new (2026-06-22) — Time-Windowed Deduplication](#whats-new-2026-06-22--time-windowed-deduplication) @@ -159,6 +160,12 @@ --- +## What's new (2026-06-22) — Transactional Outbox + +Durably buffer events and drain them at-least-once. Full reference: [`docs/source/Eng/doc/new_features/v107_features_doc.rst`](docs/source/Eng/doc/new_features/v107_features_doc.rst). + +- **`Outbox`** (`AC_outbox_enqueue`, `AC_outbox_pending`): `events.cloud_events` posts synchronously with no durability — a crash or network blip loses the event. The outbox persists each event first, then `drain`s pending entries through an injected sink with at-least-once delivery: a sink failure leaves the entry pending for retry until `max_attempts`, after which it is dead-lettered. `save` / `load` keep events across restarts. Pure-stdlib, deterministic. + ## What's new (2026-06-22) — Optimistic-Concurrency Versioned Store Update only if the version is unchanged (compare-and-swap / If-Match). Full reference: [`docs/source/Eng/doc/new_features/v106_features_doc.rst`](docs/source/Eng/doc/new_features/v106_features_doc.rst). diff --git a/README/README_zh-CN.md b/README/README_zh-CN.md index 804030f8..1b77da7e 100644 --- a/README/README_zh-CN.md +++ b/README/README_zh-CN.md @@ -12,6 +12,7 @@ ## 目录 +- [本次更新 (2026-06-22) — 事务型 Outbox](#本次更新-2026-06-22--事务型-outbox) - [本次更新 (2026-06-22) — 乐观并发版本存储](#本次更新-2026-06-22--乐观并发版本存储) - [本次更新 (2026-06-22) — 逐流序号间隙检测](#本次更新-2026-06-22--逐流序号间隙检测) - [本次更新 (2026-06-22) — 时间窗口去重](#本次更新-2026-06-22--时间窗口去重) @@ -162,6 +163,12 @@ 平滑噪声值序列。完整参考:[`docs/source/Zh/doc/new_features/v102_features_doc.rst`](../docs/source/Zh/doc/new_features/v102_features_doc.rst)。 +## 本次更新 (2026-06-22) — 事务型 Outbox + +持久化缓冲事件并以至少一次传递排空。完整参考:[`docs/source/Zh/doc/new_features/v107_features_doc.rst`](../docs/source/Zh/doc/new_features/v107_features_doc.rst)。 + +- **`Outbox`**(`AC_outbox_enqueue`、`AC_outbox_pending`):`events.cloud_events` 同步发送且无持久化——当机或网络抖动就会丢失事件。Outbox 先持久化每个事件,再通过注入的 sink 以至少一次传递 `drain` 待传递项目:sink 失败时项目维持待传递以供重试,直到 `max_attempts`,之后列为死信。`save` / `load` 让事件能跨重启存活。纯标准库、确定。 + ## 本次更新 (2026-06-22) — 乐观并发版本存储 只在版本未变时更新(compare-and-swap / If-Match)。完整参考:[`docs/source/Zh/doc/new_features/v106_features_doc.rst`](../docs/source/Zh/doc/new_features/v106_features_doc.rst)。 diff --git a/README/README_zh-TW.md b/README/README_zh-TW.md index 22db9472..8b3cdc60 100644 --- a/README/README_zh-TW.md +++ b/README/README_zh-TW.md @@ -12,6 +12,7 @@ ## 目錄 +- [本次更新 (2026-06-22) — 交易型 Outbox](#本次更新-2026-06-22--交易型-outbox) - [本次更新 (2026-06-22) — 樂觀並行版本儲存](#本次更新-2026-06-22--樂觀並行版本儲存) - [本次更新 (2026-06-22) — 逐串流序號間隙偵測](#本次更新-2026-06-22--逐串流序號間隙偵測) - [本次更新 (2026-06-22) — 時間視窗去重](#本次更新-2026-06-22--時間視窗去重) @@ -162,6 +163,12 @@ 平滑雜訊值序列。完整參考:[`docs/source/Zh/doc/new_features/v102_features_doc.rst`](../docs/source/Zh/doc/new_features/v102_features_doc.rst)。 +## 本次更新 (2026-06-22) — 交易型 Outbox + +持久化緩衝事件並以至少一次傳遞排空。完整參考:[`docs/source/Zh/doc/new_features/v107_features_doc.rst`](../docs/source/Zh/doc/new_features/v107_features_doc.rst)。 + +- **`Outbox`**(`AC_outbox_enqueue`、`AC_outbox_pending`):`events.cloud_events` 同步發送且無持久化——當機或網路抖動就會丟失事件。Outbox 先持久化每個事件,再透過注入的 sink 以至少一次傳遞 `drain` 待傳遞項目:sink 失敗時項目維持待傳遞以供重試,直到 `max_attempts`,之後列為死信。`save` / `load` 讓事件能跨重啟存活。純標準函式庫、具決定性。 + ## 本次更新 (2026-06-22) — 樂觀並行版本儲存 只在版本未變時更新(compare-and-swap / If-Match)。完整參考:[`docs/source/Zh/doc/new_features/v106_features_doc.rst`](../docs/source/Zh/doc/new_features/v106_features_doc.rst)。 diff --git a/docs/source/Eng/doc/new_features/v107_features_doc.rst b/docs/source/Eng/doc/new_features/v107_features_doc.rst new file mode 100644 index 00000000..3af254f2 --- /dev/null +++ b/docs/source/Eng/doc/new_features/v107_features_doc.rst @@ -0,0 +1,46 @@ +Transactional Outbox +==================== + +``events.cloud_events`` posts immediately and synchronously — a crash between +"did the work" and "sent the event" loses it, and a network blip drops it (no +durability, no retry, no replay). The transactional-outbox pattern persists each +event first and drains it later with at-least-once delivery and a dead-letter +cap, so events survive sink outages. + +Pure standard library (``json``); imports no ``PySide6``. The delivery ``sink`` +is injected and the store is in-memory with JSON persistence, so draining is +fully deterministic in CI. + +Headless API +------------ + +.. code-block:: python + + from je_auto_control import Outbox + + box = Outbox() + box.enqueue({"type": "order.created", "id": 7}) # buffered, pending + box.enqueue({"type": "order.paid", "id": 7}) + + result = box.drain(post_to_webhook, max_batch=100, max_attempts=5) + # {"sent": 2, "failed": 0, "remaining": 0} + + box.pending() # entries still awaiting delivery + box.dead_letters() # entries that exhausted their attempts + +``enqueue`` appends an event as pending and returns its id. ``drain`` delivers +up to ``max_batch`` pending entries through the injected ``sink``; a sink +exception leaves the entry pending for retry until ``max_attempts``, after which +it is dead-lettered (recorded with its error). Delivery is at-least-once: a sink +that succeeds but is interrupted before the entry is marked sent will be retried. +``save`` / ``load`` persist the whole buffer as JSON so events outlive the +process. + +Executor commands +----------------- + +``AC_outbox_enqueue`` returns ``{id, pending}``; ``AC_outbox_pending`` returns +``{pending}``. Both use a named-instance registry and are exposed as MCP tools +(``ac_outbox_enqueue`` / ``ac_outbox_pending``) and as Script Builder commands +under **Flow**. Draining requires a callable sink, so it stays a headless / API +operation. diff --git a/docs/source/Eng/eng_index.rst b/docs/source/Eng/eng_index.rst index 0b2ef089..6466dc30 100644 --- a/docs/source/Eng/eng_index.rst +++ b/docs/source/Eng/eng_index.rst @@ -129,6 +129,7 @@ Comprehensive guides for all AutoControl features. doc/new_features/v104_features_doc doc/new_features/v105_features_doc doc/new_features/v106_features_doc + doc/new_features/v107_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/docs/source/Zh/doc/new_features/v107_features_doc.rst b/docs/source/Zh/doc/new_features/v107_features_doc.rst new file mode 100644 index 00000000..0d6b1874 --- /dev/null +++ b/docs/source/Zh/doc/new_features/v107_features_doc.rst @@ -0,0 +1,38 @@ +交易型 Outbox +============= + +``events.cloud_events`` 立即且同步發送——在「完成工作」與「送出事件」之間若當機,事件就遺失;網路抖動也會 +丟失(沒有持久化、沒有重試、沒有重播)。交易型 outbox 模式先持久化每個事件,稍後再以至少一次(at-least-once) +傳遞與死信上限來排空(drain),讓事件能在接收端故障時存活。 + +純標準函式庫(``json``);不匯入 ``PySide6``。傳遞用的 ``sink`` 以注入方式提供,儲存為記憶體內並具 JSON +持久化,因此排空在 CI 中完全具決定性。 + +無頭 API +-------- + +.. code-block:: python + + from je_auto_control import Outbox + + box = Outbox() + box.enqueue({"type": "order.created", "id": 7}) # 已緩衝、待傳遞 + box.enqueue({"type": "order.paid", "id": 7}) + + result = box.drain(post_to_webhook, max_batch=100, max_attempts=5) + # {"sent": 2, "failed": 0, "remaining": 0} + + box.pending() # 仍待傳遞的項目 + box.dead_letters() # 已用盡重試次數的項目 + +``enqueue`` 將事件附加為待傳遞並回傳其 id。``drain`` 透過注入的 ``sink`` 傳遞至多 ``max_batch`` 個待傳遞項目; +``sink`` 拋出例外時,該項目維持待傳遞以供重試,直到 ``max_attempts``,之後被列為死信(連同錯誤一併記錄)。 +傳遞為至少一次:若 ``sink`` 成功但在標記為已送出前被中斷,該項目會被重試。``save`` / ``load`` 以 JSON +持久化整個緩衝區,讓事件能在行程結束後存活。 + +執行器命令 +---------- + +``AC_outbox_enqueue`` 回傳 ``{id, pending}``;``AC_outbox_pending`` 回傳 ``{pending}``。兩者使用具名實例登錄, +並以 MCP 工具(``ac_outbox_enqueue`` / ``ac_outbox_pending``)以及 Script Builder 中 **Flow** 分類下的命令提供。 +排空需要可呼叫的 sink,因此維持為無頭 / API 操作。 diff --git a/docs/source/Zh/zh_index.rst b/docs/source/Zh/zh_index.rst index e750be4f..eed2fa78 100644 --- a/docs/source/Zh/zh_index.rst +++ b/docs/source/Zh/zh_index.rst @@ -129,6 +129,7 @@ AutoControl 所有功能的完整使用指南。 doc/new_features/v104_features_doc doc/new_features/v105_features_doc doc/new_features/v106_features_doc + doc/new_features/v107_features_doc doc/ocr_backends/ocr_backends_doc doc/observability/observability_doc doc/operations_layer/operations_layer_doc diff --git a/je_auto_control/__init__.py b/je_auto_control/__init__.py index 7a437087..b48e5d28 100644 --- a/je_auto_control/__init__.py +++ b/je_auto_control/__init__.py @@ -211,6 +211,8 @@ from je_auto_control.utils.optimistic import ( VersionConflict, VersionedStore, check_if_match, if_match_header, ) +# Transactional outbox (durable at-least-once event delivery) +from je_auto_control.utils.outbox import Outbox # CI workflow annotations (GitHub Actions) from je_auto_control.utils.ci_annotations import ( emit_annotations, format_annotation, @@ -940,6 +942,7 @@ def start_autocontrol_gui(*args, **kwargs): "IdempotencyConflict", "IdempotencyStore", "request_fingerprint", "DedupWindow", "SequenceTracker", "VersionConflict", "VersionedStore", "check_if_match", "if_match_header", + "Outbox", "emit_annotations", "format_annotation", "ClipboardHistory", "default_clipboard_history", "analyze_heal_log", "heal_stats", "scan_secrets", diff --git a/je_auto_control/gui/script_builder/command_schema.py b/je_auto_control/gui/script_builder/command_schema.py index c80c69fc..cb672e2b 100644 --- a/je_auto_control/gui/script_builder/command_schema.py +++ b/je_auto_control/gui/script_builder/command_schema.py @@ -2050,6 +2050,22 @@ def _add_resilience_specs(specs: List[CommandSpec]) -> None: ), description="Read a versioned record {value, version}.", )) + specs.append(CommandSpec( + "AC_outbox_enqueue", "Flow", "Outbox: Enqueue", + fields=( + FieldSpec("name", FieldType.STRING, placeholder="orders"), + FieldSpec("event", FieldType.STRING, + placeholder='{"type": "order.created", "id": 7}'), + ), + description="Durably buffer an event for at-least-once delivery.", + )) + specs.append(CommandSpec( + "AC_outbox_pending", "Flow", "Outbox: Pending", + fields=( + FieldSpec("name", FieldType.STRING, placeholder="orders"), + ), + description="List events still awaiting successful delivery.", + )) specs.append(CommandSpec( "AC_diff_rows", "Data", "Dataset Diff: Rows by Key", fields=( diff --git a/je_auto_control/utils/executor/action_executor.py b/je_auto_control/utils/executor/action_executor.py index e54e8708..ed9064ea 100644 --- a/je_auto_control/utils/executor/action_executor.py +++ b/je_auto_control/utils/executor/action_executor.py @@ -2933,6 +2933,27 @@ def _rate_limit(name: str, rate: float = 1.0, capacity: float = 1.0, _DEDUP_WINDOWS: Dict[str, Any] = {} _SEQUENCE_TRACKERS: Dict[str, Any] = {} _VERSIONED_STORES: Dict[str, Any] = {} +_OUTBOXES: Dict[str, Any] = {} + + +def _outbox_enqueue(name: str, event: Any) -> Dict[str, Any]: + """Adapter: enqueue an event into a named outbox.""" + import json + from je_auto_control.utils.outbox import Outbox + if isinstance(event, str): + try: + event = json.loads(event) + except ValueError: + pass + outbox = _OUTBOXES.setdefault(name, Outbox()) + return {"id": outbox.enqueue(event), "pending": len(outbox.pending())} + + +def _outbox_pending(name: str) -> Dict[str, Any]: + """Adapter: list pending entries of a named outbox.""" + from je_auto_control.utils.outbox import Outbox + outbox = _OUTBOXES.setdefault(name, Outbox()) + return {"pending": outbox.pending()} def _cas_put(name: str, key: str, value: Any, @@ -4615,6 +4636,8 @@ def __init__(self): "AC_sequence_observe": _sequence_observe, "AC_cas_put": _cas_put, "AC_cas_get": _cas_get, + "AC_outbox_enqueue": _outbox_enqueue, + "AC_outbox_pending": _outbox_pending, "AC_detect_drift": _detect_drift, "AC_categorical_drift": _categorical_drift, "AC_diff_rows": _diff_rows, diff --git a/je_auto_control/utils/mcp_server/tools/_factories.py b/je_auto_control/utils/mcp_server/tools/_factories.py index 05a4ea79..b4400fc4 100644 --- a/je_auto_control/utils/mcp_server/tools/_factories.py +++ b/je_auto_control/utils/mcp_server/tools/_factories.py @@ -3576,6 +3576,29 @@ def optimistic_tools() -> List[MCPTool]: ] +def outbox_tools() -> List[MCPTool]: + return [ + MCPTool( + name="ac_outbox_enqueue", + description=("Enqueue 'event' into named outbox 'name' for durable " + "at-least-once delivery. Returns {id, pending}."), + input_schema=schema( + {"name": {"type": "string"}, "event": {"type": "object"}}, + ["name", "event"]), + handler=h.outbox_enqueue, + annotations=NON_DESTRUCTIVE, + ), + MCPTool( + name="ac_outbox_pending", + description=("List the pending entries of named outbox 'name'. " + "Returns {pending}."), + input_schema=schema({"name": {"type": "string"}}, ["name"]), + handler=h.outbox_pending, + annotations=READ_ONLY, + ), + ] + + def sequence_gap_tools() -> List[MCPTool]: return [ MCPTool( @@ -5615,7 +5638,7 @@ def media_assert_tools() -> List[MCPTool]: data_profile_tools, http_problem_tools, dotenv_tools, sse_client_tools, layered_config_tools, data_drift_tools, schema_compat_tools, timeseries_tools, anomaly_tools, smoothing_tools, idempotency_tools, - dedup_window_tools, sequence_gap_tools, optimistic_tools, + dedup_window_tools, sequence_gap_tools, optimistic_tools, outbox_tools, dataset_diff_tools, referential_tools, link_header_tools, multipart_tools, http_content_tools, cookie_jar_tools, http_conditional_tools, saga_tools, decision_table_tools, locator_repair_tools, diff --git a/je_auto_control/utils/mcp_server/tools/_handlers.py b/je_auto_control/utils/mcp_server/tools/_handlers.py index a20a1962..665dc19c 100644 --- a/je_auto_control/utils/mcp_server/tools/_handlers.py +++ b/je_auto_control/utils/mcp_server/tools/_handlers.py @@ -1952,6 +1952,16 @@ def cas_get(name, key): return _cas_get(name, key) +def outbox_enqueue(name, event): + from je_auto_control.utils.executor.action_executor import _outbox_enqueue + return _outbox_enqueue(name, event) + + +def outbox_pending(name): + from je_auto_control.utils.executor.action_executor import _outbox_pending + return _outbox_pending(name) + + def detect_drift(reference, current, threshold=0.25, bins=10): from je_auto_control.utils.executor.action_executor import _detect_drift return _detect_drift(reference, current, threshold, bins) diff --git a/je_auto_control/utils/outbox/__init__.py b/je_auto_control/utils/outbox/__init__.py new file mode 100644 index 00000000..c2e29932 --- /dev/null +++ b/je_auto_control/utils/outbox/__init__.py @@ -0,0 +1,4 @@ +"""Transactional outbox for durable at-least-once event delivery.""" +from je_auto_control.utils.outbox.outbox import Outbox + +__all__ = ["Outbox"] diff --git a/je_auto_control/utils/outbox/outbox.py b/je_auto_control/utils/outbox/outbox.py new file mode 100644 index 00000000..b76a891c --- /dev/null +++ b/je_auto_control/utils/outbox/outbox.py @@ -0,0 +1,88 @@ +"""Transactional outbox: durably buffer events and drain them at-least-once. + +``events.cloud_events`` posts immediately and synchronously — a crash between +"did the work" and "sent the event" loses it, and a network blip drops it (no +durability, no retry, no replay). The outbox pattern persists each event and +drains it later with at-least-once delivery and a dead-letter cap. + +Pure standard library (``json``); imports no ``PySide6``. The delivery ``sink`` +is injected and the store is in-memory with JSON persistence, so draining is +fully deterministic in CI. +""" +import json +from pathlib import Path +from typing import Any, Callable, Dict, List + +Sink = Callable[[Any], Any] + + +class Outbox: + """An ordered buffer of events drained to a sink with retry + dead-letter.""" + + def __init__(self) -> None: + self._events: List[Dict[str, Any]] = [] + self._counter = 0 + + def enqueue(self, event: Any) -> str: + """Append ``event`` as pending; return its id.""" + self._counter += 1 + entry_id = str(self._counter) + self._events.append({"id": entry_id, "event": event, + "status": "pending", "attempts": 0}) + return entry_id + + def pending(self) -> List[Dict[str, Any]]: + """Entries still awaiting successful delivery.""" + return [entry for entry in self._events if entry["status"] == "pending"] + + def dead_letters(self) -> List[Dict[str, Any]]: + """Entries that exhausted their delivery attempts.""" + return [entry for entry in self._events if entry["status"] == "failed"] + + def drain(self, sink: Sink, *, max_batch: int = 100, + max_attempts: int = 5) -> Dict[str, int]: + """Deliver up to ``max_batch`` pending entries via ``sink``. + + On a sink exception the entry is retried until ``max_attempts``, then + dead-lettered. Returns ``{sent, failed, remaining}``. + """ + sent = 0 + failed = 0 + for entry in self.pending()[:max_batch]: + entry["attempts"] += 1 + try: + sink(entry["event"]) + except Exception as error: # pylint: disable=broad-exception-caught + if entry["attempts"] >= max_attempts: + entry["status"] = "failed" + entry["error"] = str(error) + failed += 1 + continue + entry["status"] = "sent" + sent += 1 + return {"sent": sent, "failed": failed, "remaining": len(self.pending())} + + def to_dict(self) -> Dict[str, Any]: + """Return the outbox state as a plain dict.""" + return {"counter": self._counter, + "events": [dict(entry) for entry in self._events]} + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "Outbox": + """Build an outbox from a :meth:`to_dict` mapping.""" + outbox = cls() + outbox._counter = int(data.get("counter", 0)) + outbox._events = [dict(entry) for entry in data.get("events", [])] + return outbox + + def save(self, path: str) -> str: + """Persist the outbox to ``path`` as JSON; return the path.""" + out = Path(path) + out.parent.mkdir(parents=True, exist_ok=True) + out.write_text(json.dumps(self.to_dict(), indent=2), encoding="utf-8") + return str(out) + + @classmethod + def load(cls, path: str) -> "Outbox": + """Load an outbox from a JSON file.""" + return cls.from_dict(json.loads(Path(path).read_text(encoding="utf-8"))) diff --git a/test/unit_test/headless/test_outbox_batch.py b/test/unit_test/headless/test_outbox_batch.py new file mode 100644 index 00000000..0280b97e --- /dev/null +++ b/test/unit_test/headless/test_outbox_batch.py @@ -0,0 +1,104 @@ +"""Headless tests for the transactional outbox. No Qt.""" +import je_auto_control as ac +from je_auto_control.utils.outbox import Outbox + + +def test_enqueue_marks_pending(): + box = Outbox() + first = box.enqueue({"type": "a"}) + box.enqueue({"type": "b"}) + assert first == "1" + assert [e["event"]["type"] for e in box.pending()] == ["a", "b"] + + +def test_drain_delivers_all(): + box = Outbox() + box.enqueue({"n": 1}) + box.enqueue({"n": 2}) + seen = [] + result = box.drain(seen.append) + assert result == {"sent": 2, "failed": 0, "remaining": 0} + assert [e["n"] for e in seen] == [1, 2] + assert box.pending() == [] + + +def test_drain_retries_then_dead_letters(): + box = Outbox() + box.enqueue({"n": 1}) + + def always_fail(_event): + raise RuntimeError("sink down") + + # Below max_attempts: stays pending for retry. + for _ in range(4): + box.drain(always_fail, max_attempts=5) + assert len(box.pending()) == 1 + assert box.dead_letters() == [] + # Fifth attempt exhausts and dead-letters. + box.drain(always_fail, max_attempts=5) + assert box.pending() == [] + dead = box.dead_letters() + assert len(dead) == 1 and dead[0]["error"] == "sink down" + + +def test_drain_resumes_after_transient_failure(): + box = Outbox() + box.enqueue({"n": 1}) + calls = {"count": 0} + + def flaky(_event): + calls["count"] += 1 + if calls["count"] == 1: + raise RuntimeError("blip") + + box.drain(flaky) # fails, stays pending + result = box.drain(flaky) # succeeds now + assert result["sent"] == 1 + assert box.pending() == [] + + +def test_max_batch_limits_delivery(): + box = Outbox() + for i in range(5): + box.enqueue({"n": i}) + sent = [] + result = box.drain(sent.append, max_batch=2) + assert result == {"sent": 2, "failed": 0, "remaining": 3} + + +def test_save_load_round_trip(tmp_path): + box = Outbox() + box.enqueue({"n": 1}) + path = str(tmp_path / "outbox.json") + box.save(path) + restored = Outbox.load(path) + assert len(restored.pending()) == 1 + assert restored.enqueue({"n": 2}) == "2" # counter preserved + + +# --- wiring --------------------------------------------------------------- + +def test_executor_round_trip(): + name = "outbox-exec-test" + rec = ac.execute_action([[ + "AC_outbox_enqueue", {"name": name, "event": '{"type": "x"}'}]]) + out = next(v for v in rec.values() if isinstance(v, dict)) + assert out["id"] == "1" and out["pending"] == 1 + rec2 = ac.execute_action([["AC_outbox_pending", {"name": name}]]) + pending = next(v for v in rec2.values() if isinstance(v, dict))["pending"] + assert pending[0]["event"]["type"] == "x" + + +def test_wiring(): + known = ac.executor.known_commands() + assert {"AC_outbox_enqueue", "AC_outbox_pending"} <= set(known) + from je_auto_control.utils.mcp_server.tools import build_default_tool_registry + names = {t.name for t in build_default_tool_registry()} + assert {"ac_outbox_enqueue", "ac_outbox_pending"} <= names + from je_auto_control.gui.script_builder.command_schema import _build_specs + specs = {s.command for s in _build_specs()} + assert {"AC_outbox_enqueue", "AC_outbox_pending"} <= specs + + +def test_facade_exports(): + assert hasattr(ac, "Outbox") and "Outbox" in ac.__all__