Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

## Table of Contents

- [What's new (2026-06-22) — Transactional Outbox](#whats-new-2026-06-22--transactional-outbox)
- [What's new (2026-06-22) — Optimistic-Concurrency Versioned Store](#whats-new-2026-06-22--optimistic-concurrency-versioned-store)
- [What's new (2026-06-22) — Per-Stream Sequence-Gap Detection](#whats-new-2026-06-22--per-stream-sequence-gap-detection)
- [What's new (2026-06-22) — Time-Windowed Deduplication](#whats-new-2026-06-22--time-windowed-deduplication)
Expand Down Expand Up @@ -159,6 +160,12 @@

---

## What's new (2026-06-22) — Transactional Outbox

Durably buffer events and drain them at-least-once. Full reference: [`docs/source/Eng/doc/new_features/v107_features_doc.rst`](docs/source/Eng/doc/new_features/v107_features_doc.rst).

- **`Outbox`** (`AC_outbox_enqueue`, `AC_outbox_pending`): `events.cloud_events` posts synchronously with no durability — a crash or network blip loses the event. The outbox persists each event first, then `drain`s pending entries through an injected sink with at-least-once delivery: a sink failure leaves the entry pending for retry until `max_attempts`, after which it is dead-lettered. `save` / `load` keep events across restarts. Pure-stdlib, deterministic.

## What's new (2026-06-22) — Optimistic-Concurrency Versioned Store

Update only if the version is unchanged (compare-and-swap / If-Match). Full reference: [`docs/source/Eng/doc/new_features/v106_features_doc.rst`](docs/source/Eng/doc/new_features/v106_features_doc.rst).
Expand Down
7 changes: 7 additions & 0 deletions README/README_zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

## 目录

- [本次更新 (2026-06-22) — 事务型 Outbox](#本次更新-2026-06-22--事务型-outbox)
- [本次更新 (2026-06-22) — 乐观并发版本存储](#本次更新-2026-06-22--乐观并发版本存储)
- [本次更新 (2026-06-22) — 逐流序号间隙检测](#本次更新-2026-06-22--逐流序号间隙检测)
- [本次更新 (2026-06-22) — 时间窗口去重](#本次更新-2026-06-22--时间窗口去重)
Expand Down Expand Up @@ -162,6 +163,12 @@

平滑噪声值序列。完整参考:[`docs/source/Zh/doc/new_features/v102_features_doc.rst`](../docs/source/Zh/doc/new_features/v102_features_doc.rst)。

## 本次更新 (2026-06-22) — 事务型 Outbox

持久化缓冲事件并以至少一次传递排空。完整参考:[`docs/source/Zh/doc/new_features/v107_features_doc.rst`](../docs/source/Zh/doc/new_features/v107_features_doc.rst)。

- **`Outbox`**(`AC_outbox_enqueue`、`AC_outbox_pending`):`events.cloud_events` 同步发送且无持久化——当机或网络抖动就会丢失事件。Outbox 先持久化每个事件,再通过注入的 sink 以至少一次传递 `drain` 待传递项目:sink 失败时项目维持待传递以供重试,直到 `max_attempts`,之后列为死信。`save` / `load` 让事件能跨重启存活。纯标准库、确定。

## 本次更新 (2026-06-22) — 乐观并发版本存储

只在版本未变时更新(compare-and-swap / If-Match)。完整参考:[`docs/source/Zh/doc/new_features/v106_features_doc.rst`](../docs/source/Zh/doc/new_features/v106_features_doc.rst)。
Expand Down
7 changes: 7 additions & 0 deletions README/README_zh-TW.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

## 目錄

- [本次更新 (2026-06-22) — 交易型 Outbox](#本次更新-2026-06-22--交易型-outbox)
- [本次更新 (2026-06-22) — 樂觀並行版本儲存](#本次更新-2026-06-22--樂觀並行版本儲存)
- [本次更新 (2026-06-22) — 逐串流序號間隙偵測](#本次更新-2026-06-22--逐串流序號間隙偵測)
- [本次更新 (2026-06-22) — 時間視窗去重](#本次更新-2026-06-22--時間視窗去重)
Expand Down Expand Up @@ -162,6 +163,12 @@

平滑雜訊值序列。完整參考:[`docs/source/Zh/doc/new_features/v102_features_doc.rst`](../docs/source/Zh/doc/new_features/v102_features_doc.rst)。

## 本次更新 (2026-06-22) — 交易型 Outbox

持久化緩衝事件並以至少一次傳遞排空。完整參考:[`docs/source/Zh/doc/new_features/v107_features_doc.rst`](../docs/source/Zh/doc/new_features/v107_features_doc.rst)。

- **`Outbox`**(`AC_outbox_enqueue`、`AC_outbox_pending`):`events.cloud_events` 同步發送且無持久化——當機或網路抖動就會丟失事件。Outbox 先持久化每個事件,再透過注入的 sink 以至少一次傳遞 `drain` 待傳遞項目:sink 失敗時項目維持待傳遞以供重試,直到 `max_attempts`,之後列為死信。`save` / `load` 讓事件能跨重啟存活。純標準函式庫、具決定性。

## 本次更新 (2026-06-22) — 樂觀並行版本儲存

只在版本未變時更新(compare-and-swap / If-Match)。完整參考:[`docs/source/Zh/doc/new_features/v106_features_doc.rst`](../docs/source/Zh/doc/new_features/v106_features_doc.rst)。
Expand Down
46 changes: 46 additions & 0 deletions docs/source/Eng/doc/new_features/v107_features_doc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
Transactional Outbox
====================

``events.cloud_events`` posts immediately and synchronously — a crash between
"did the work" and "sent the event" loses it, and a network blip drops it (no
durability, no retry, no replay). The transactional-outbox pattern persists each
event first and drains it later with at-least-once delivery and a dead-letter
cap, so events survive sink outages.

Pure standard library (``json``); imports no ``PySide6``. The delivery ``sink``
is injected and the store is in-memory with JSON persistence, so draining is
fully deterministic in CI.

Headless API
------------

.. code-block:: python

from je_auto_control import Outbox

box = Outbox()
box.enqueue({"type": "order.created", "id": 7}) # buffered, pending
box.enqueue({"type": "order.paid", "id": 7})

result = box.drain(post_to_webhook, max_batch=100, max_attempts=5)
# {"sent": 2, "failed": 0, "remaining": 0}

box.pending() # entries still awaiting delivery
box.dead_letters() # entries that exhausted their attempts

``enqueue`` appends an event as pending and returns its id. ``drain`` delivers
up to ``max_batch`` pending entries through the injected ``sink``; a sink
exception leaves the entry pending for retry until ``max_attempts``, after which
it is dead-lettered (recorded with its error). Delivery is at-least-once: a sink
that succeeds but is interrupted before the entry is marked sent will be retried.
``save`` / ``load`` persist the whole buffer as JSON so events outlive the
process.

Executor commands
-----------------

``AC_outbox_enqueue`` returns ``{id, pending}``; ``AC_outbox_pending`` returns
``{pending}``. Both use a named-instance registry and are exposed as MCP tools
(``ac_outbox_enqueue`` / ``ac_outbox_pending``) and as Script Builder commands
under **Flow**. Draining requires a callable sink, so it stays a headless / API
operation.
1 change: 1 addition & 0 deletions docs/source/Eng/eng_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ Comprehensive guides for all AutoControl features.
doc/new_features/v104_features_doc
doc/new_features/v105_features_doc
doc/new_features/v106_features_doc
doc/new_features/v107_features_doc
doc/ocr_backends/ocr_backends_doc
doc/observability/observability_doc
doc/operations_layer/operations_layer_doc
Expand Down
38 changes: 38 additions & 0 deletions docs/source/Zh/doc/new_features/v107_features_doc.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
交易型 Outbox
=============

``events.cloud_events`` 立即且同步發送——在「完成工作」與「送出事件」之間若當機,事件就遺失;網路抖動也會
丟失(沒有持久化、沒有重試、沒有重播)。交易型 outbox 模式先持久化每個事件,稍後再以至少一次(at-least-once)
傳遞與死信上限來排空(drain),讓事件能在接收端故障時存活。

純標準函式庫(``json``);不匯入 ``PySide6``。傳遞用的 ``sink`` 以注入方式提供,儲存為記憶體內並具 JSON
持久化,因此排空在 CI 中完全具決定性。

無頭 API
--------

.. code-block:: python

from je_auto_control import Outbox

box = Outbox()
box.enqueue({"type": "order.created", "id": 7}) # 已緩衝、待傳遞
box.enqueue({"type": "order.paid", "id": 7})

result = box.drain(post_to_webhook, max_batch=100, max_attempts=5)
# {"sent": 2, "failed": 0, "remaining": 0}

box.pending() # 仍待傳遞的項目
box.dead_letters() # 已用盡重試次數的項目

``enqueue`` 將事件附加為待傳遞並回傳其 id。``drain`` 透過注入的 ``sink`` 傳遞至多 ``max_batch`` 個待傳遞項目;
``sink`` 拋出例外時,該項目維持待傳遞以供重試,直到 ``max_attempts``,之後被列為死信(連同錯誤一併記錄)。
傳遞為至少一次:若 ``sink`` 成功但在標記為已送出前被中斷,該項目會被重試。``save`` / ``load`` 以 JSON
持久化整個緩衝區,讓事件能在行程結束後存活。

執行器命令
----------

``AC_outbox_enqueue`` 回傳 ``{id, pending}``;``AC_outbox_pending`` 回傳 ``{pending}``。兩者使用具名實例登錄,
並以 MCP 工具(``ac_outbox_enqueue`` / ``ac_outbox_pending``)以及 Script Builder 中 **Flow** 分類下的命令提供。
排空需要可呼叫的 sink,因此維持為無頭 / API 操作。
1 change: 1 addition & 0 deletions docs/source/Zh/zh_index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,7 @@ AutoControl 所有功能的完整使用指南。
doc/new_features/v104_features_doc
doc/new_features/v105_features_doc
doc/new_features/v106_features_doc
doc/new_features/v107_features_doc
doc/ocr_backends/ocr_backends_doc
doc/observability/observability_doc
doc/operations_layer/operations_layer_doc
Expand Down
3 changes: 3 additions & 0 deletions je_auto_control/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,8 @@
from je_auto_control.utils.optimistic import (
VersionConflict, VersionedStore, check_if_match, if_match_header,
)
# Transactional outbox (durable at-least-once event delivery)
from je_auto_control.utils.outbox import Outbox
# CI workflow annotations (GitHub Actions)
from je_auto_control.utils.ci_annotations import (
emit_annotations, format_annotation,
Expand Down Expand Up @@ -940,6 +942,7 @@ def start_autocontrol_gui(*args, **kwargs):
"IdempotencyConflict", "IdempotencyStore", "request_fingerprint",
"DedupWindow", "SequenceTracker",
"VersionConflict", "VersionedStore", "check_if_match", "if_match_header",
"Outbox",
"emit_annotations", "format_annotation",
"ClipboardHistory", "default_clipboard_history",
"analyze_heal_log", "heal_stats", "scan_secrets",
Expand Down
16 changes: 16 additions & 0 deletions je_auto_control/gui/script_builder/command_schema.py
Original file line number Diff line number Diff line change
Expand Up @@ -2050,6 +2050,22 @@ def _add_resilience_specs(specs: List[CommandSpec]) -> None:
),
description="Read a versioned record {value, version}.",
))
specs.append(CommandSpec(
"AC_outbox_enqueue", "Flow", "Outbox: Enqueue",
fields=(
FieldSpec("name", FieldType.STRING, placeholder="orders"),
FieldSpec("event", FieldType.STRING,
placeholder='{"type": "order.created", "id": 7}'),
),
description="Durably buffer an event for at-least-once delivery.",
))
specs.append(CommandSpec(
"AC_outbox_pending", "Flow", "Outbox: Pending",
fields=(
FieldSpec("name", FieldType.STRING, placeholder="orders"),
),
description="List events still awaiting successful delivery.",
))
specs.append(CommandSpec(
"AC_diff_rows", "Data", "Dataset Diff: Rows by Key",
fields=(
Expand Down
23 changes: 23 additions & 0 deletions je_auto_control/utils/executor/action_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -2933,6 +2933,27 @@ def _rate_limit(name: str, rate: float = 1.0, capacity: float = 1.0,
_DEDUP_WINDOWS: Dict[str, Any] = {}
_SEQUENCE_TRACKERS: Dict[str, Any] = {}
_VERSIONED_STORES: Dict[str, Any] = {}
_OUTBOXES: Dict[str, Any] = {}


def _outbox_enqueue(name: str, event: Any) -> Dict[str, Any]:
"""Adapter: enqueue an event into a named outbox."""
import json
from je_auto_control.utils.outbox import Outbox
if isinstance(event, str):
try:
event = json.loads(event)
except ValueError:
pass
outbox = _OUTBOXES.setdefault(name, Outbox())
return {"id": outbox.enqueue(event), "pending": len(outbox.pending())}


def _outbox_pending(name: str) -> Dict[str, Any]:
"""Adapter: list pending entries of a named outbox."""
from je_auto_control.utils.outbox import Outbox
outbox = _OUTBOXES.setdefault(name, Outbox())
return {"pending": outbox.pending()}


def _cas_put(name: str, key: str, value: Any,
Expand Down Expand Up @@ -4615,6 +4636,8 @@ def __init__(self):
"AC_sequence_observe": _sequence_observe,
"AC_cas_put": _cas_put,
"AC_cas_get": _cas_get,
"AC_outbox_enqueue": _outbox_enqueue,
"AC_outbox_pending": _outbox_pending,
"AC_detect_drift": _detect_drift,
"AC_categorical_drift": _categorical_drift,
"AC_diff_rows": _diff_rows,
Expand Down
25 changes: 24 additions & 1 deletion je_auto_control/utils/mcp_server/tools/_factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -3576,6 +3576,29 @@ def optimistic_tools() -> List[MCPTool]:
]


def outbox_tools() -> List[MCPTool]:
return [
MCPTool(
name="ac_outbox_enqueue",
description=("Enqueue 'event' into named outbox 'name' for durable "
"at-least-once delivery. Returns {id, pending}."),
input_schema=schema(
{"name": {"type": "string"}, "event": {"type": "object"}},
["name", "event"]),
handler=h.outbox_enqueue,
annotations=NON_DESTRUCTIVE,
),
MCPTool(
name="ac_outbox_pending",
description=("List the pending entries of named outbox 'name'. "
"Returns {pending}."),
input_schema=schema({"name": {"type": "string"}}, ["name"]),
handler=h.outbox_pending,
annotations=READ_ONLY,
),
]


def sequence_gap_tools() -> List[MCPTool]:
return [
MCPTool(
Expand Down Expand Up @@ -5615,7 +5638,7 @@ def media_assert_tools() -> List[MCPTool]:
data_profile_tools, http_problem_tools, dotenv_tools,
sse_client_tools, layered_config_tools, data_drift_tools, schema_compat_tools,
timeseries_tools, anomaly_tools, smoothing_tools, idempotency_tools,
dedup_window_tools, sequence_gap_tools, optimistic_tools,
dedup_window_tools, sequence_gap_tools, optimistic_tools, outbox_tools,
dataset_diff_tools, referential_tools, link_header_tools, multipart_tools,
http_content_tools, cookie_jar_tools, http_conditional_tools,
saga_tools, decision_table_tools, locator_repair_tools,
Expand Down
10 changes: 10 additions & 0 deletions je_auto_control/utils/mcp_server/tools/_handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1952,6 +1952,16 @@ def cas_get(name, key):
return _cas_get(name, key)


def outbox_enqueue(name, event):
from je_auto_control.utils.executor.action_executor import _outbox_enqueue
return _outbox_enqueue(name, event)


def outbox_pending(name):
from je_auto_control.utils.executor.action_executor import _outbox_pending
return _outbox_pending(name)


def detect_drift(reference, current, threshold=0.25, bins=10):
from je_auto_control.utils.executor.action_executor import _detect_drift
return _detect_drift(reference, current, threshold, bins)
Expand Down
4 changes: 4 additions & 0 deletions je_auto_control/utils/outbox/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
"""Transactional outbox for durable at-least-once event delivery."""
from je_auto_control.utils.outbox.outbox import Outbox

__all__ = ["Outbox"]
88 changes: 88 additions & 0 deletions je_auto_control/utils/outbox/outbox.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
"""Transactional outbox: durably buffer events and drain them at-least-once.

``events.cloud_events`` posts immediately and synchronously — a crash between
"did the work" and "sent the event" loses it, and a network blip drops it (no
durability, no retry, no replay). The outbox pattern persists each event and
drains it later with at-least-once delivery and a dead-letter cap.

Pure standard library (``json``); imports no ``PySide6``. The delivery ``sink``
is injected and the store is in-memory with JSON persistence, so draining is
fully deterministic in CI.
"""
import json
from pathlib import Path
from typing import Any, Callable, Dict, List

Sink = Callable[[Any], Any]


class Outbox:
"""An ordered buffer of events drained to a sink with retry + dead-letter."""

def __init__(self) -> None:
self._events: List[Dict[str, Any]] = []
self._counter = 0

def enqueue(self, event: Any) -> str:
"""Append ``event`` as pending; return its id."""
self._counter += 1
entry_id = str(self._counter)
self._events.append({"id": entry_id, "event": event,
"status": "pending", "attempts": 0})
return entry_id

def pending(self) -> List[Dict[str, Any]]:
"""Entries still awaiting successful delivery."""
return [entry for entry in self._events if entry["status"] == "pending"]

def dead_letters(self) -> List[Dict[str, Any]]:
"""Entries that exhausted their delivery attempts."""
return [entry for entry in self._events if entry["status"] == "failed"]

def drain(self, sink: Sink, *, max_batch: int = 100,
max_attempts: int = 5) -> Dict[str, int]:
"""Deliver up to ``max_batch`` pending entries via ``sink``.

On a sink exception the entry is retried until ``max_attempts``, then
dead-lettered. Returns ``{sent, failed, remaining}``.
"""
sent = 0
failed = 0
for entry in self.pending()[:max_batch]:
entry["attempts"] += 1
try:
sink(entry["event"])
except Exception as error: # pylint: disable=broad-exception-caught
if entry["attempts"] >= max_attempts:
entry["status"] = "failed"
entry["error"] = str(error)
failed += 1
continue
entry["status"] = "sent"
sent += 1
return {"sent": sent, "failed": failed, "remaining": len(self.pending())}

def to_dict(self) -> Dict[str, Any]:
"""Return the outbox state as a plain dict."""
return {"counter": self._counter,
"events": [dict(entry) for entry in self._events]}

@classmethod
def from_dict(cls, data: Dict[str, Any]) -> "Outbox":
"""Build an outbox from a :meth:`to_dict` mapping."""
outbox = cls()
outbox._counter = int(data.get("counter", 0))
outbox._events = [dict(entry) for entry in data.get("events", [])]
return outbox

def save(self, path: str) -> str:
"""Persist the outbox to ``path`` as JSON; return the path."""
out = Path(path)
out.parent.mkdir(parents=True, exist_ok=True)
out.write_text(json.dumps(self.to_dict(), indent=2), encoding="utf-8")
return str(out)

@classmethod
def load(cls, path: str) -> "Outbox":
"""Load an outbox from a JSON file."""
return cls.from_dict(json.loads(Path(path).read_text(encoding="utf-8")))
Loading
Loading