From 10ecb3d4cb31dda88702ad4369d6a7e098f5cc65 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 18:23:32 -0700 Subject: [PATCH 01/17] test: add failing tests for run-skill execution marker blind spot Adds TDD tests that fail until the execution marker glob is generalized: - test_stale_NOT_fired_when_execution_marker_active: session monitor must suppress stale when run-skill-in-progress-* marker is fresh - test_stale_fired_when_execution_marker_expired: marker present but stale still fires when execution marker is expired - test_execution_marker_suppression_bounded_by_max_suppression_seconds: suppression is bounded even with continuously-touched run-skill marker - test_stdout_idle_NOT_fired_when_execution_marker_active: idle watchdog must suppress IDLE_STALL when run-skill-in-progress-* marker is fresh Current dispatch-in-progress-* glob does not match run-skill-in-progress-* markers, so tests 1a, 1c, 1d fail before the fix. Co-Authored-By: Claude Sonnet 4.6 --- tests/execution/test_process_idle_watchdog.py | 53 +++++++++++ .../test_process_session_log_monitor.py | 93 +++++++++++++++++++ ...s_session_log_monitor_stale_suppression.py | 62 +++++++++++++ 3 files changed, 208 insertions(+) diff --git a/tests/execution/test_process_idle_watchdog.py b/tests/execution/test_process_idle_watchdog.py index 934a989c28..a4c62bec18 100644 --- a/tests/execution/test_process_idle_watchdog.py +++ b/tests/execution/test_process_idle_watchdog.py @@ -638,4 +638,57 @@ async def test_watch_stdout_idle_marker_suppression_bounded( assert elapsed < 5.0 +@pytest.mark.anyio +async def test_stdout_idle_NOT_fired_when_execution_marker_active( + tmp_path: anyio.Path, +) -> None: + """IDLE_STALL NOT fired when stdout silent but run-skill-in-progress marker is fresh. + + Failing before implementation: run-skill-in-progress-* marker is invisible to + dispatch-in-progress-* glob, so IDLE_STALL fires immediately. + After fix: marker matched by *-in-progress-* glob, stall suppressed. + """ + stdout_file = tmp_path / "stdout.txt" + await anyio.Path(stdout_file).write_bytes(b"initial output\n") + + marker_dir = tmp_path / "marker_dir" + await anyio.Path(marker_dir).mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + await anyio.Path(marker_path).write_text("{}") + + acc = RaceAccumulator() + trigger = anyio.Event() + + async def touch_marker() -> None: + for _ in range(100): + await anyio.sleep(0.02) + try: + await anyio.Path(marker_path).touch() + except OSError: + break + + with anyio.move_on_after(0.5): + async with anyio.create_task_group() as tg: + tg.start_soon(touch_marker) + tg.start_soon( + functools.partial( + _watch_stdout_idle, + stdout_file, + 0.05, # idle_output_timeout — short to trigger quickly if not suppressed + acc, + trigger, + 0.02, # _poll_interval + marker_dir=marker_dir, + session_id="caller-session", + max_suppression_seconds=10.0, + ) + ) + await trigger.wait() + + assert not acc.idle_stall, ( + "IDLE_STALL fired even though run-skill execution marker was active. " + "The *-in-progress-* glob fix is needed." + ) + + # test write diff --git a/tests/execution/test_process_session_log_monitor.py b/tests/execution/test_process_session_log_monitor.py index 9937cb6295..dbc612ac87 100644 --- a/tests/execution/test_process_session_log_monitor.py +++ b/tests/execution/test_process_session_log_monitor.py @@ -777,6 +777,99 @@ async def test_phase2_no_spurious_read_on_preexisting_content(self, tmp_path): assert result.status == ChannelBStatus.STALE +@pytest.mark.anyio +async def test_stale_NOT_fired_when_execution_marker_active(tmp_path): + """Stale NOT fired when JSONL silent but a run-skill-in-progress marker is active and fresh. + + This is a TDD guard for the execution marker blind spot: current code uses + dispatch-in-progress-* glob, so run-skill-in-progress-* markers are invisible. + After the fix (glob -> *-in-progress-*), this test must pass. + """ + import json + + session_file = tmp_path / "abc123.jsonl" + session_file.write_text( + json.dumps({"type": "assistant", "message": {"role": "assistant", "content": "working"}}) + + "\n" + ) + spawn_time = time.time() - 1 + + marker_dir = tmp_path / "marker_dir" + marker_dir.mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + marker_path.write_text("{}") + + monitor_returned = anyio.Event() + monitor_status: list[object] = [] + + async def run_monitor() -> None: + result = await _session_log_monitor( + tmp_path, + "DONE", + stale_threshold=0.05, + spawn_time=spawn_time, + marker_dir=marker_dir, + caller_session_id="caller-session", + _phase1_poll=0.01, + _phase2_poll=0.05, + max_suppression_seconds=10.0, + ) + monitor_status.append(result.status) + monitor_returned.set() + + async def touch_marker() -> None: + for _ in range(60): + await anyio.sleep(0.05) + try: + marker_path.touch() + except OSError: + break + + with anyio.move_on_after(0.6): + async with anyio.create_task_group() as tg: + tg.start_soon(touch_marker) + tg.start_soon(run_monitor) + + assert not monitor_returned.is_set(), ( + f"Monitor returned early: stale fired with active run-skill execution marker. " + f"Status: {monitor_status}" + ) + + +@pytest.mark.anyio +async def test_stale_fired_when_execution_marker_expired(tmp_path): + """Stale fires when JSONL is silent and the run-skill execution marker is expired.""" + import json + import os + + session_file = tmp_path / "abc123.jsonl" + session_file.write_text( + json.dumps({"type": "assistant", "message": {"role": "assistant", "content": "working"}}) + + "\n" + ) + spawn_time = time.time() - 1 + + marker_dir = tmp_path / "marker_dir" + marker_dir.mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + marker_path.write_text("{}") + past = time.time() - 120 # 120s old — well beyond default max_marker_age=60s + os.utime(marker_path, (past, past)) + + with anyio.fail_after(3.0): + result = await _session_log_monitor( + tmp_path, + "DONE", + stale_threshold=0.05, + spawn_time=spawn_time, + marker_dir=marker_dir, + caller_session_id="caller-session", + _phase1_poll=0.01, + _phase2_poll=0.05, + ) + assert result.status == ChannelBStatus.STALE + + @pytest.mark.anyio async def test_watch_session_log_passes_marker_dir_to_monitor_kwargs( tmp_path: anyio.Path, monkeypatch: pytest.MonkeyPatch diff --git a/tests/execution/test_process_session_log_monitor_stale_suppression.py b/tests/execution/test_process_session_log_monitor_stale_suppression.py index 7ef2295ff4..2082273578 100644 --- a/tests/execution/test_process_session_log_monitor_stale_suppression.py +++ b/tests/execution/test_process_session_log_monitor_stale_suppression.py @@ -316,3 +316,65 @@ def _api_conn(pid): assert elapsed >= 0.15, ( f"elapsed {elapsed:.2f}s below 0.15s — suppression may not have fired" ) + + +class TestExecutionMarkerSuppression: + """Execution marker (run-skill-in-progress-*) suppression for _session_log_monitor. + + Covers the run_skill MCP tool blind spot: markers named run-skill-in-progress-* + must suppress stale kills just like dispatch-in-progress-* markers. + """ + + @pytest.mark.anyio + async def test_execution_marker_suppression_bounded_by_max_suppression_seconds(self, tmp_path): + """Stale fires after max_suppression_seconds despite fresh run-skill marker. + + Failing before implementation: run-skill-in-progress-* marker is invisible to + dispatch-in-progress-* glob, so STALE fires immediately (elapsed << max_suppression). + After fix: marker found, suppression active, fires only after max_suppression_seconds. + """ + session_file = tmp_path / "abc123.jsonl" + session_file.write_text( + '{"type": "assistant", "message": {"role": "assistant", "content": "working"}}\n' + ) + spawn_time = time.time() - 1 + + marker_dir = tmp_path / "marker_dir" + marker_dir.mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + marker_path.write_text("{}") + + max_suppression = 0.3 + + async def touch_marker() -> None: + for _ in range(200): + await anyio.sleep(0.02) + try: + marker_path.touch() + except OSError: + break + + start = time.monotonic() + with anyio.fail_after(5.0): + async with anyio.create_task_group() as tg: + tg.start_soon(touch_marker) + result = await _session_log_monitor( + tmp_path, + "DONE", + stale_threshold=0.05, + spawn_time=spawn_time, + marker_dir=marker_dir, + caller_session_id="caller-session", + _phase1_poll=0.01, + _phase2_poll=0.05, + max_suppression_seconds=max_suppression, + ) + tg.cancel_scope.cancel() + + elapsed = time.monotonic() - start + assert result.status == ChannelBStatus.STALE + assert elapsed >= max_suppression, ( + f"STALE fired after {elapsed:.3f}s, expected >= {max_suppression}s. " + "run-skill execution marker suppression not working — " + "marker may not be matched by the glob pattern." + ) From bc8efa288e48bc1b6125e28d5d84afd24ecd82be Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 18:27:58 -0700 Subject: [PATCH 02/17] refactor: rename _has_active_dispatch_marker -> _has_active_execution_marker Generalizes the marker detection function and its glob pattern to match both dispatch-in-progress-* and run-skill-in-progress-* markers: Old glob: dispatch-in-progress-{session_id}-*.marker New glob: *-in-progress-{session_id}-*.marker This makes the stale detector's suppression mechanism work for any execution context, not just fleet dispatches. All callers in _process_race.py (_watch_stdout_idle, _watch_child_activity) and _process_monitor.py (_session_log_monitor) are updated. Updated all test references: monkeypatch targets, imports, class names, frozenset, arch guard predicate string, and CLAUDE.md table entry. Adds parametrized backward-compat test confirming both prefixes match and a wrong-session-id guard test. Co-Authored-By: Claude Sonnet 4.6 --- .../core/types/_type_subprocess.py | 12 +++--- src/autoskillit/execution/process/__init__.py | 4 +- .../execution/process/_process_monitor.py | 10 ++--- .../execution/process/_process_race.py | 8 ++-- tests/arch/CLAUDE.md | 2 +- tests/arch/test_watcher_signal_consistency.py | 18 +++++---- .../test_process_deadline_extension.py | 4 +- tests/execution/test_process_heartbeat.py | 39 +++++++++++++------ tests/execution/test_process_idle_watchdog.py | 16 ++++---- ...ess_session_log_monitor_dispatch_marker.py | 20 +++++----- tests/execution/test_process_submodules.py | 8 ++-- 11 files changed, 80 insertions(+), 61 deletions(-) diff --git a/src/autoskillit/core/types/_type_subprocess.py b/src/autoskillit/core/types/_type_subprocess.py index 9d757ac851..d7b9bfbb08 100644 --- a/src/autoskillit/core/types/_type_subprocess.py +++ b/src/autoskillit/core/types/_type_subprocess.py @@ -130,14 +130,14 @@ class SubprocessRunner(Protocol): Parameters ---------- marker_dir : Path | None - Directory containing ``dispatch-in-progress-{session_id}-*.marker`` files. - When non-None, the session log monitor checks for active dispatch markers + Directory containing ``*-in-progress-{session_id}-*.marker`` files. + When non-None, the session log monitor checks for active execution markers before issuing stale-kill signals, suppressing kills while a fleet dispatch - is in progress. Default ``None`` (no suppression). + or run_skill call is in progress. Default ``None`` (no suppression). session_id : str | None - Caller's session identity, used to scope dispatch-marker glob patterns to - the originating fleet session. Threaded from fleet dispatch through headless - execution to ``_session_log_monitor``'s ``caller_session_id`` parameter. + Caller's session identity, used to scope execution-marker glob patterns to + the originating session. Threaded from fleet dispatch / run_skill through + headless execution to ``_session_log_monitor``'s ``caller_session_id`` parameter. Default ``None`` (match any marker). """ diff --git a/src/autoskillit/execution/process/__init__.py b/src/autoskillit/execution/process/__init__.py index b22a641d7e..9cbfbe86f1 100644 --- a/src/autoskillit/execution/process/__init__.py +++ b/src/autoskillit/execution/process/__init__.py @@ -46,7 +46,7 @@ from autoskillit.execution.process._process_monitor import ( _has_active_api_connection, _has_active_child_processes, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, ) @@ -84,7 +84,7 @@ "RaceSignals", "_has_active_api_connection", "_has_active_child_processes", - "_has_active_dispatch_marker", + "_has_active_execution_marker", "_heartbeat", "_jsonl_contains_marker", "_jsonl_has_record_type", diff --git a/src/autoskillit/execution/process/_process_monitor.py b/src/autoskillit/execution/process/_process_monitor.py index b0ad8b378f..b95b9a4315 100644 --- a/src/autoskillit/execution/process/_process_monitor.py +++ b/src/autoskillit/execution/process/_process_monitor.py @@ -156,18 +156,18 @@ def _has_active_child_processes(pid: int) -> bool: return active -def _has_active_dispatch_marker( +def _has_active_execution_marker( marker_dir: Path, session_id: str | None = None, max_marker_age: float = 60.0, ) -> bool: - """Return True if any dispatch-in-progress marker was touched within max_marker_age seconds.""" + """Return True if any execution-in-progress marker was touched within max_marker_age secs.""" try: now = time.time() pattern = ( - f"dispatch-in-progress-{session_id}-*.marker" + f"*-in-progress-{session_id}-*.marker" if session_id is not None - else "dispatch-in-progress-*.marker" + else "*-in-progress-*.marker" ) for p in marker_dir.glob(pattern): try: @@ -390,7 +390,7 @@ async def _session_log_monitor( elapsed, pid, ) - elif marker_dir is not None and _has_active_dispatch_marker( + elif marker_dir is not None and _has_active_execution_marker( marker_dir, session_id=caller_session_id ): if suppression_start is None: diff --git a/src/autoskillit/execution/process/_process_race.py b/src/autoskillit/execution/process/_process_race.py index b40fc70ea3..3c32e050d1 100644 --- a/src/autoskillit/execution/process/_process_race.py +++ b/src/autoskillit/execution/process/_process_race.py @@ -14,7 +14,7 @@ from autoskillit.execution.process._process_monitor import ( _has_active_api_connection, _has_active_child_processes, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, ) @@ -189,7 +189,7 @@ async def _watch_stdout_idle( last_growth_time = _time.monotonic() suppression_start_marker = None elif _time.monotonic() - last_growth_time >= idle_output_timeout: - if marker_dir is not None and _has_active_dispatch_marker( + if marker_dir is not None and _has_active_execution_marker( marker_dir, session_id=session_id ): now = _time.monotonic() @@ -240,7 +240,7 @@ async def _watch_child_activity( """Extend the wall-clock CancelScope.deadline when child processes are active. Polls _has_active_child_processes, _has_active_api_connection, and - _has_active_dispatch_marker every _poll_interval seconds. When any + _has_active_execution_marker every _poll_interval seconds. When any returns True, pushes timeout_scope.deadline forward (up to max_extension_seconds beyond the original deadline). @@ -267,7 +267,7 @@ async def _watch_child_activity( or _has_active_api_connection(pid) or ( marker_dir is not None - and _has_active_dispatch_marker(marker_dir, session_id=session_id) + and _has_active_execution_marker(marker_dir, session_id=session_id) ) ) if not active: diff --git a/tests/arch/CLAUDE.md b/tests/arch/CLAUDE.md index 058ca207e7..a086b53fbe 100644 --- a/tests/arch/CLAUDE.md +++ b/tests/arch/CLAUDE.md @@ -74,7 +74,7 @@ AST enforcement, sub-package layer contracts, and architectural invariant tests. | `test_variadic_ordering.py` | Architectural invariant: positional initial_prompt must precede all variadic CLI flags in build_interactive_cmd | | `test_flag_metadata_coverage.py` | Closed categorization guard: every ClaudeFlags member must appear in VARIADIC_CLAUDE_FLAGS or NON_VARIADIC_CLAUDE_FLAGS | | `test_interactive_ordering_gate.py` | AST guard: _session_launch.py and _cook.py must import and call assert_interactive_ordering before subprocess invocation | -| `test_watcher_signal_consistency.py` | Structural guard: all process watchers must call `_has_active_dispatch_marker` | +| `test_watcher_signal_consistency.py` | Structural guard: all process watchers must call `_has_active_execution_marker` | | `test_write_restriction_coverage.py` | Architectural invariant: skills with prose write restrictions in NEVER blocks have runtime enforcement (read_only, output_dir, or allowlist) | | `test_model_identity_contract.py` | AST guard: detect_model_drift must use normalize_model_id and _models_match — raw alias/full-ID comparison is a false-positive source | | `test_helpers_exports.py` | Asserts shared test helpers export required symbols (strip_markdown_code_regions) | diff --git a/tests/arch/test_watcher_signal_consistency.py b/tests/arch/test_watcher_signal_consistency.py index c623ad89dc..ece69e9f0e 100644 --- a/tests/arch/test_watcher_signal_consistency.py +++ b/tests/arch/test_watcher_signal_consistency.py @@ -1,4 +1,4 @@ -"""Structural guard: all process watchers must call _has_active_dispatch_marker.""" +"""Structural guard: all process watchers must call _has_active_execution_marker.""" from __future__ import annotations @@ -12,7 +12,7 @@ _PROCESS_RACE = Path("src/autoskillit/execution/process/_process_race.py") _PROCESS_MONITOR = Path("src/autoskillit/execution/process/_process_monitor.py") -_WATCHERS_THAT_MUST_CHECK_DISPATCH_MARKER = frozenset( +_WATCHERS_THAT_MUST_CHECK_EXECUTION_MARKER = frozenset( { "_watch_child_activity", "_session_log_monitor", @@ -37,13 +37,15 @@ def _functions_calling_predicate(source_path: Path, predicate: str) -> set[str]: return result -@pytest.mark.parametrize("watcher", sorted(_WATCHERS_THAT_MUST_CHECK_DISPATCH_MARKER)) -def test_watcher_calls_has_active_dispatch_marker(watcher: str) -> None: - """Each watcher in the set must call _has_active_dispatch_marker.""" - callers_race = _functions_calling_predicate(_PROCESS_RACE, "_has_active_dispatch_marker") - callers_monitor = _functions_calling_predicate(_PROCESS_MONITOR, "_has_active_dispatch_marker") +@pytest.mark.parametrize("watcher", sorted(_WATCHERS_THAT_MUST_CHECK_EXECUTION_MARKER)) +def test_watcher_calls_has_active_execution_marker(watcher: str) -> None: + """Each watcher in the set must call _has_active_execution_marker.""" + callers_race = _functions_calling_predicate(_PROCESS_RACE, "_has_active_execution_marker") + callers_monitor = _functions_calling_predicate( + _PROCESS_MONITOR, "_has_active_execution_marker" + ) all_callers = callers_race | callers_monitor assert watcher in all_callers, ( - f"{watcher} does not call _has_active_dispatch_marker. " + f"{watcher} does not call _has_active_execution_marker. " f"Functions that do: {sorted(all_callers)}" ) diff --git a/tests/execution/test_process_deadline_extension.py b/tests/execution/test_process_deadline_extension.py index 8eff2579f7..1f17059bcb 100644 --- a/tests/execution/test_process_deadline_extension.py +++ b/tests/execution/test_process_deadline_extension.py @@ -185,7 +185,7 @@ async def test_extends_deadline_when_dispatch_marker_active(monkeypatch, tmp_pat lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, **kw: True, ) trigger = anyio.Event() @@ -228,7 +228,7 @@ async def test_no_extension_when_marker_inactive(monkeypatch, tmp_path) -> None: lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, **kw: False, ) trigger = anyio.Event() diff --git a/tests/execution/test_process_heartbeat.py b/tests/execution/test_process_heartbeat.py index be26effa97..dab0b231a3 100644 --- a/tests/execution/test_process_heartbeat.py +++ b/tests/execution/test_process_heartbeat.py @@ -18,7 +18,7 @@ from autoskillit.execution.process import ( _has_active_api_connection, _has_active_child_processes, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, run_managed_async, @@ -750,19 +750,19 @@ async def test_stream_parser_skips_empty_result(self, tmp_path): ) -class TestHasActiveDispatchMarker: - """Unit tests for _has_active_dispatch_marker.""" +class TestHasActiveExecutionMarker: + """Unit tests for _has_active_execution_marker.""" def test_dispatch_marker_nonexistent_dir_returns_false(self, tmp_path): """Nonexistent marker directory returns False without raising.""" - result = _has_active_dispatch_marker(tmp_path / "nonexistent") + result = _has_active_execution_marker(tmp_path / "nonexistent") assert result is False def test_dispatch_marker_fresh_marker_returns_true(self, tmp_path): """A fresh marker file causes the function to return True.""" marker = tmp_path / "dispatch-in-progress-sess1-abc.marker" marker.touch() - result = _has_active_dispatch_marker(tmp_path) + result = _has_active_execution_marker(tmp_path) assert result is True def test_dispatch_marker_expired_marker_returns_false(self, tmp_path): @@ -772,15 +772,15 @@ def test_dispatch_marker_expired_marker_returns_false(self, tmp_path): old_time = time.time() - 120 _os.utime(marker, (old_time, old_time)) - result = _has_active_dispatch_marker(tmp_path, max_marker_age=60.0) + result = _has_active_execution_marker(tmp_path, max_marker_age=60.0) assert result is False def test_dispatch_marker_session_id_filters(self, tmp_path): """When session_id is provided, only markers matching that session are considered.""" (tmp_path / "dispatch-in-progress-abc-001.marker").touch() (tmp_path / "dispatch-in-progress-xyz-002.marker").touch() - result_matching = _has_active_dispatch_marker(tmp_path, session_id="abc") - result_non_matching = _has_active_dispatch_marker(tmp_path, session_id="def") + result_matching = _has_active_execution_marker(tmp_path, session_id="abc") + result_non_matching = _has_active_execution_marker(tmp_path, session_id="def") assert result_matching is True assert result_non_matching is False @@ -788,13 +788,13 @@ def test_dispatch_marker_none_session_id_matches_all(self, tmp_path): """session_id=None matches any dispatch-in-progress marker.""" marker = tmp_path / "dispatch-in-progress-xyz-001.marker" marker.touch() - result = _has_active_dispatch_marker(tmp_path, session_id=None) + result = _has_active_execution_marker(tmp_path, session_id=None) assert result is True def test_dispatch_marker_no_logger_calls(self): """The function body contains no logger.* attribute access calls.""" - source = inspect.getsource(_has_active_dispatch_marker) + source = inspect.getsource(_has_active_execution_marker) tree = ast.parse(source) for node in ast.walk(tree): if ( @@ -802,4 +802,21 @@ def test_dispatch_marker_no_logger_calls(self): and isinstance(node.value, ast.Name) and node.value.id == "logger" ): - pytest.fail(f"Found logger.{node.attr} in _has_active_dispatch_marker body") + pytest.fail(f"Found logger.{node.attr} in _has_active_execution_marker body") + + @pytest.mark.parametrize( + "prefix", + ["dispatch-in-progress", "run-skill-in-progress"], + ) + def test_execution_marker_matches_both_prefixes(self, tmp_path, prefix): + """Both dispatch-in-progress-* and run-skill-in-progress-* markers are matched.""" + marker = tmp_path / f"{prefix}-sess1-step.marker" + marker.touch() + result = _has_active_execution_marker(tmp_path) + assert result is True, f"{prefix}-* marker not matched by *-in-progress-* glob pattern" + + def test_wrong_session_id_gives_no_match(self, tmp_path): + """Marker for session-A does not match when globbing for session-B.""" + (tmp_path / "run-skill-in-progress-session-A-step.marker").touch() + assert _has_active_execution_marker(tmp_path, session_id="session-B") is False + assert _has_active_execution_marker(tmp_path, session_id="session-A") is True diff --git a/tests/execution/test_process_idle_watchdog.py b/tests/execution/test_process_idle_watchdog.py index a4c62bec18..132cab9536 100644 --- a/tests/execution/test_process_idle_watchdog.py +++ b/tests/execution/test_process_idle_watchdog.py @@ -106,7 +106,7 @@ async def test_watch_stdout_idle_suppression_evaluated_fires_once_during_suppres await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -286,7 +286,7 @@ async def test_watch_stdout_idle_suppressed_by_dispatch_marker( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -330,7 +330,7 @@ async def test_watch_stdout_idle_fires_when_suppression_cap_exceeded( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -390,7 +390,7 @@ async def test_watch_stdout_idle_suppression_timer_resets_on_growth( await anyio.Path(stdout_file).write_bytes(b"initial\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -433,7 +433,7 @@ async def test_watch_stdout_idle_emits_suppression_warning( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -483,7 +483,7 @@ async def test_watch_stdout_idle_marker_false_fires_immediately( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: False, ) @@ -515,7 +515,7 @@ async def test_watch_stdout_idle_dispatch_marker_suppresses_stall( ) -> None: """Active dispatch marker suppresses idle stall within the suppression window.""" monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) stdout_file = tmp_path / "stdout.txt" @@ -606,7 +606,7 @@ async def test_watch_stdout_idle_marker_suppression_bounded( ) -> None: """Suppression cap exceeded — idle stall fires despite active marker.""" monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) stdout_file = tmp_path / "stdout.txt" diff --git a/tests/execution/test_process_session_log_monitor_dispatch_marker.py b/tests/execution/test_process_session_log_monitor_dispatch_marker.py index 231195e06d..a834af49c2 100644 --- a/tests/execution/test_process_session_log_monitor_dispatch_marker.py +++ b/tests/execution/test_process_session_log_monitor_dispatch_marker.py @@ -20,12 +20,12 @@ class TestStaleSuppressionDispatchMarker: Tests the following scenarios for dispatch marker stale suppression: - T1: Active marker (True→False via monkeypatch) suppresses stale, then fires - T2: Expired marker (mtime > 60s) does NOT suppress stale (real helper, no monkeypatch — - exercises the mtime-expiry threshold in _has_active_dispatch_marker directly) + exercises the mtime-expiry threshold in _has_active_execution_marker directly) - T3: No marker_dir (None) skips dispatch check entirely (regression guard) - T4: Bounded suppression fires after max_suppression_seconds - T5: Session-scoped matching — sessionA marker does not suppress sessionB - Monkeypatch target: autoskillit.execution.process._process_monitor._has_active_dispatch_marker + Monkeypatch target: autoskillit.execution.process._process_monitor._has_active_execution_marker Convention: caller_session_id= is the kwarg used at call sites. Note: T1, T4 monkeypatch the helper; T2, T5 use real marker files to test helper internals. """ @@ -43,7 +43,7 @@ def side_effect_fn(marker_dir, session_id=None): return call_count["n"] == 1 monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", side_effect_fn, ) with anyio.fail_after(5.0): @@ -107,7 +107,7 @@ async def test_stale_marker_suppression_bounded(self, tmp_path, monkeypatch): session_file.write_text("") spawn_time = time.time() - 10 monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) with anyio.fail_after(3.0): @@ -197,7 +197,7 @@ def fake_dispatch_marker(marker_dir, session_id=None): fake_child_proc, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", fake_dispatch_marker, ) with anyio.fail_after(5.0): @@ -233,7 +233,7 @@ async def test_dispatch_marker_bounded_by_max_suppression(self, tmp_path, monkey lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) with structlog.testing.capture_logs() as logs: @@ -285,7 +285,7 @@ def fake_dispatch_marker(marker_dir, session_id=None): fake_child_proc, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", fake_dispatch_marker, ) with structlog.testing.capture_logs() as logs: @@ -327,7 +327,7 @@ async def test_dispatch_marker_bounded_kill_emits_warning_with_fields( lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) with structlog.testing.capture_logs() as logs: @@ -352,7 +352,7 @@ async def test_dispatch_marker_bounded_kill_emits_warning_with_fields( @pytest.mark.anyio async def test_marker_dir_none_skips_dispatch_check(self, tmp_path, monkeypatch): - """When marker_dir is None (default), _has_active_dispatch_marker is never called.""" + """When marker_dir is None (default), _has_active_execution_marker is never called.""" session_file = tmp_path / "abc123.jsonl" session_file.write_text("") spawn_time = time.time() - 10 @@ -377,7 +377,7 @@ def track_dispatch_marker(marker_dir, session_id=None): fake_child_proc, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", track_dispatch_marker, ) with anyio.fail_after(5.0): diff --git a/tests/execution/test_process_submodules.py b/tests/execution/test_process_submodules.py index 696f87b2a6..2a15c3934b 100644 --- a/tests/execution/test_process_submodules.py +++ b/tests/execution/test_process_submodules.py @@ -21,7 +21,7 @@ "RaceSignals", "_has_active_api_connection", "_has_active_child_processes", - "_has_active_dispatch_marker", + "_has_active_execution_marker", "_heartbeat", "_jsonl_contains_marker", "_jsonl_has_record_type", @@ -99,7 +99,7 @@ def test_process_monitor_exports(): are defined in _process_monitor submodule.""" from autoskillit.execution.process._process_monitor import ( _has_active_api_connection, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, ) @@ -112,9 +112,9 @@ def test_process_monitor_exports(): assert ( _has_active_api_connection.__module__ == "autoskillit.execution.process._process_monitor" ) - assert callable(_has_active_dispatch_marker) + assert callable(_has_active_execution_marker) assert ( - _has_active_dispatch_marker.__module__ == "autoskillit.execution.process._process_monitor" + _has_active_execution_marker.__module__ == "autoskillit.execution.process._process_monitor" ) From a5bfcd0ad7abe12bfc81e59e9e177d5e605de96d Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 18:31:11 -0700 Subject: [PATCH 03/17] feat: thread marker_dir and caller_session_id through run() call chain Adds marker_dir: Path | None and caller_session_id: str | None to: - HeadlessExecutor.run() Protocol - DefaultHeadlessExecutor.run() concrete method - run_headless_core() function Both params are forwarded to _execute_claude_headless() as marker_dir and session_id respectively, making the stale detector's marker-based suppression available for run_skill sessions (not just dispatch_food_truck). Adds integration tests verifying the forwarding chain: - test_run_headless_core_forwards_marker_dir_and_caller_session_id - test_default_executor_run_forwards_marker_dir_and_caller_session_id - test_headless_executor_protocol_includes_marker_dir_params Co-Authored-By: Claude Sonnet 4.6 --- .../core/types/_type_protocols_execution.py | 2 + .../execution/headless/__init__.py | 8 ++ .../test_headless_provider_forwarding.py | 77 +++++++++++++++++++ 3 files changed, 87 insertions(+) diff --git a/src/autoskillit/core/types/_type_protocols_execution.py b/src/autoskillit/core/types/_type_protocols_execution.py index ba63e32356..d47aa5be20 100644 --- a/src/autoskillit/core/types/_type_protocols_execution.py +++ b/src/autoskillit/core/types/_type_protocols_execution.py @@ -67,6 +67,8 @@ async def run( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: ... async def dispatch_food_truck( diff --git a/src/autoskillit/execution/headless/__init__.py b/src/autoskillit/execution/headless/__init__.py index 5891bd4638..4c4aa288b5 100644 --- a/src/autoskillit/execution/headless/__init__.py +++ b/src/autoskillit/execution/headless/__init__.py @@ -135,6 +135,8 @@ async def run_headless_core( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: """Shared headless runner used by run_skill. @@ -235,6 +237,8 @@ async def run_headless_core( step_backend=step_backend, model_identifier=resolved_model or "", profile_name=profile_name, + marker_dir=marker_dir, + session_id=caller_session_id, ) @@ -277,6 +281,8 @@ async def run( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: cfg = self._ctx.config.run_skill effective_timeout = timeout if timeout is not None else cfg.timeout @@ -313,6 +319,8 @@ async def run( resume_checkpoint=resume_checkpoint, resume_message=resume_message, backend_override=backend_override, + marker_dir=marker_dir, + caller_session_id=caller_session_id, ) async def dispatch_food_truck( diff --git a/tests/execution/test_headless_provider_forwarding.py b/tests/execution/test_headless_provider_forwarding.py index 6ca701cdbf..f0a5f25a9c 100644 --- a/tests/execution/test_headless_provider_forwarding.py +++ b/tests/execution/test_headless_provider_forwarding.py @@ -929,3 +929,80 @@ async def fake_runner(cmd, **kwargs): ) backend.stream_parser.assert_called_once_with(completion_marker="%%TEST_MARKER%%") + + +@pytest.mark.anyio +async def test_run_headless_core_forwards_marker_dir_and_caller_session_id( + minimal_ctx, tmp_path, monkeypatch +) -> None: + """run_headless_core passes marker_dir + caller_session_id to _execute_claude_headless.""" + from autoskillit.core import CmdSpec + from autoskillit.execution.headless import run_headless_core + + execute_kwargs: dict = {} + marker_dir = tmp_path / "markers" + marker_dir.mkdir() + + backend = _mock_backend() + backend.build_skill_session_cmd.return_value = CmdSpec( + cmd=("claude", "--print", "test"), env={} + ) + minimal_ctx.backend = backend + + async def fake_execute(spec, cwd, ctx, **kwargs): # noqa: ARG001 + execute_kwargs.update(kwargs) + return _STUB_RESULT + + monkeypatch.setattr("autoskillit.execution.headless._execute_claude_headless", fake_execute) + + await run_headless_core( + "/autoskillit:probe", + str(tmp_path), + minimal_ctx, + marker_dir=marker_dir, + caller_session_id="orchestrator-session-abc", + ) + + assert execute_kwargs.get("marker_dir") == marker_dir + assert execute_kwargs.get("session_id") == "orchestrator-session-abc" + + +@pytest.mark.anyio +async def test_default_executor_run_forwards_marker_dir_and_caller_session_id( + minimal_ctx, tmp_path, monkeypatch +) -> None: + """DefaultHeadlessExecutor.run() passes marker_dir + caller_session_id to run_headless_core.""" + import autoskillit.execution.headless as _headless_mod + from autoskillit.execution.headless import DefaultHeadlessExecutor + + captured: dict = {} + marker_dir = tmp_path / "markers" + marker_dir.mkdir() + + async def fake_core(skill_command, cwd, ctx, **kwargs): # noqa: ARG001 + captured.update(kwargs) + return _STUB_RESULT + + monkeypatch.setattr(_headless_mod, "run_headless_core", fake_core) + + executor = DefaultHeadlessExecutor(minimal_ctx) + await executor.run( + "/autoskillit:probe", + str(tmp_path), + marker_dir=marker_dir, + caller_session_id="orchestrator-session-xyz", + ) + + assert captured.get("marker_dir") == marker_dir + assert captured.get("caller_session_id") == "orchestrator-session-xyz" + + +def test_headless_executor_protocol_includes_marker_dir_params() -> None: + """HeadlessExecutor.run() Protocol includes marker_dir and caller_session_id.""" + import inspect + + from autoskillit.core.types import HeadlessExecutor + + sig = inspect.signature(HeadlessExecutor.run) + assert sig.parameters["marker_dir"].default is None + assert sig.parameters["caller_session_id"].default is None From b3948cba36a93801ec3c5b865b68b2bde48da0d6 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 19:00:09 -0700 Subject: [PATCH 04/17] feat: extract execution_marker context manager and wire run_skill marker creation Create core/_execution_marker.py with a shared async context manager that handles marker write, heartbeat, and cleanup. Refactor fleet/_api.py to use execution_marker instead of inline marker logic, and wire run_skill in tools_execution.py to create markers before blocking on executor.run(). Co-Authored-By: Claude Opus 4.6 (1M context) --- src/autoskillit/core/CLAUDE.md | 1 + src/autoskillit/core/__init__.pyi | 1 + src/autoskillit/core/_execution_marker.py | 91 +++++++++++++ src/autoskillit/fleet/_api.py | 121 ++++++------------ .../server/tools/tools_execution.py | 82 +++++++----- tests/arch/test_execution_source_split.py | 2 +- tests/fakes.py | 6 + tests/fleet/CLAUDE.md | 2 +- tests/fleet/test_api.py | 31 +++-- tests/fleet/test_api_dispatch_marker.py | 13 +- tests/infra/test_schema_read_convention.py | 4 +- 11 files changed, 213 insertions(+), 141 deletions(-) create mode 100644 src/autoskillit/core/_execution_marker.py diff --git a/src/autoskillit/core/CLAUDE.md b/src/autoskillit/core/CLAUDE.md index 4cefcec4b2..12a1ea0d39 100644 --- a/src/autoskillit/core/CLAUDE.md +++ b/src/autoskillit/core/CLAUDE.md @@ -22,6 +22,7 @@ Sub-packages: types/ (see types/CLAUDE.md) and runtime/ (see runtime/CLAUDE.md). | `_plugin_cache.py` | Plugin cache lifecycle: retiring cache, install locking, kitchen registry | | `_plugin_ids.py` | `DIRECT_PREFIX`, `MARKETPLACE_PREFIX`, `detect_autoskillit_mcp_prefix` (stdlib-only) | | `_install_detect.py` | `is_dev_install()` — editable-install detection for config resolution | +| `_execution_marker.py` | `execution_marker` async context manager — unified write/heartbeat/cleanup for stale-detector suppression markers | | `_step_context.py` | `current_step_name`, `current_order_id` ContextVars for pipeline step attribution | | `feature_flags.py` | `is_feature_enabled()` — IL-0 feature gate resolution primitive | | `tool_sequence_analysis.py` | Cross-session tool call sequence DFG analysis (stdlib-only) | diff --git a/src/autoskillit/core/__init__.pyi b/src/autoskillit/core/__init__.pyi index d4098b76ca..fdc76bfd96 100644 --- a/src/autoskillit/core/__init__.pyi +++ b/src/autoskillit/core/__init__.pyi @@ -4,6 +4,7 @@ from ._cmd_runner import CmdRunner as CmdRunner from ._cmd_runner import default_cmd_runner as default_cmd_runner from ._cmd_runner import run_gh as run_gh from ._cmd_runner import run_git as run_git +from ._execution_marker import execution_marker as execution_marker from ._install_detect import DirectUrlInfo as DirectUrlInfo from ._install_detect import _is_release_tag as _is_release_tag from ._install_detect import _is_stable_track as _is_stable_track diff --git a/src/autoskillit/core/_execution_marker.py b/src/autoskillit/core/_execution_marker.py new file mode 100644 index 0000000000..2dc38522b8 --- /dev/null +++ b/src/autoskillit/core/_execution_marker.py @@ -0,0 +1,91 @@ +"""Unified execution marker protocol for stale-detector suppression. + +Async context manager that writes a ``*-in-progress-{session_id}-{label}.marker`` +file, heartbeats its mtime, and deletes it on exit. Lives in ``core/`` (IL-0) so +both ``fleet/_api.py`` (IL-2) and ``server/tools/`` (IL-3) can import it without +violating layer constraints. +""" + +from __future__ import annotations + +import asyncio +import os +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from pathlib import Path + +import anyio + +from .io import write_versioned_json +from .logging import get_logger + +logger = get_logger(__name__) + + +async def _touch_marker(marker_path: Path, interval: float, trigger: anyio.Event | None) -> None: + try: + marker_path.touch() + except OSError: + logger.warning("execution_marker: touch failed %s", marker_path, exc_info=True) + while trigger is None or not trigger.is_set(): + await anyio.sleep(interval) + try: + marker_path.touch() + except OSError: + logger.warning("execution_marker: touch failed %s", marker_path, exc_info=True) + + +@asynccontextmanager +async def execution_marker( + marker_dir: Path | None, + session_id: str, + label: str, + heartbeat_interval: float = 30.0, +) -> AsyncGenerator[Path | None]: + """Write, heartbeat, and clean up an execution marker. + + Yields the marker ``Path`` on success, or ``None`` when ``marker_dir`` is + ``None`` or the initial write fails (suppression disabled, same semantics + as ``fleet/_api.py``'s dispatch marker). + """ + if marker_dir is None: + yield None + return + + marker_path = marker_dir / f"{label}-in-progress-{session_id}-{label}.marker" + try: + write_versioned_json( + marker_path, + { + "label": label, + "orchestrator_pid": os.getpid(), + "session_id": session_id, + }, + schema_version=1, + ) + except OSError: + logger.warning("execution_marker_write_failed", marker=str(marker_path), exc_info=True) + yield None + return + + hb_task: asyncio.Task[None] | None = None + trigger = anyio.Event() + try: + hb_task = asyncio.get_running_loop().create_task( + _touch_marker(marker_path, heartbeat_interval, trigger) + ) + yield marker_path + finally: + trigger.set() + if hb_task is not None: + hb_task.cancel() + try: + await hb_task + except (asyncio.CancelledError, Exception): + pass + try: + marker_path.unlink(missing_ok=True) + except OSError: + logger.warning( + "execution_marker_unlink_failed", marker=str(marker_path), exc_info=True + ) diff --git a/src/autoskillit/fleet/_api.py b/src/autoskillit/fleet/_api.py index d3407b5aa8..6a6f1f8ca8 100644 --- a/src/autoskillit/fleet/_api.py +++ b/src/autoskillit/fleet/_api.py @@ -3,8 +3,6 @@ from __future__ import annotations import asyncio -import functools -import os import time from collections.abc import Callable from pathlib import Path @@ -23,7 +21,6 @@ claude_code_project_dir, get_logger, truncate_text, - write_versioned_json, ) from autoskillit.fleet._capture import _extract_captures, _normalize_capture_spec from autoskillit.fleet._expressions import _CAMPAIGN_REF_RE, _interpolate_campaign_refs @@ -106,24 +103,6 @@ def _post_dispatch_cleanup( ) -async def _touch_dispatch_marker( - marker_path: Path, interval: float = 30.0, trigger: anyio.Event | None = None -) -> None: - """Periodically touch marker_path to refresh mtime; runs until trigger is set.""" - try: - marker_path.touch() - except OSError: - logger.warning("_touch_dispatch_marker: failed to touch %s", marker_path, exc_info=True) - while trigger is None or not trigger.is_set(): - await anyio.sleep(interval) - try: - marker_path.touch() - except OSError: - logger.warning( - "_touch_dispatch_marker: failed to touch %s", marker_path, exc_info=True - ) - - async def execute_dispatch( tool_ctx: ToolContext, recipe: str, @@ -572,71 +551,50 @@ def _on_spawn(pid: int, ticks: int) -> None: ) marker_dir: Path | None = None - marker_path: Path | None = None try: marker_dir = claude_code_project_dir(str(tool_ctx.project_dir)) except OSError: pass - if marker_dir is not None: - marker_path = marker_dir / f"dispatch-in-progress-{caller_session_id}-{dispatch_id}.marker" - try: - write_versioned_json( - marker_path, - { - "dispatch_id": dispatch_id, - "orchestrator_pid": os.getpid(), - "session_id": caller_session_id, - }, - schema_version=1, - ) - except OSError: - logger.warning("dispatch_marker_write_failed", marker=str(marker_path), exc_info=True) - marker_dir = None - marker_path = None + from autoskillit.core import execution_marker # noqa: PLC0415 _dispatch_completed_normally = False - _hb_trigger = anyio.Event() try: - async with anyio.create_task_group() as tg: - if marker_path is not None: - tg.start_soon( - functools.partial(_touch_dispatch_marker, marker_path, trigger=_hb_trigger) - ) - try: - skill_result = await tool_ctx.executor.dispatch_food_truck( - orchestrator_prompt=prompt, - cwd=str(tool_ctx.project_dir), - completion_marker=completion_marker, - prior_completion_markers=prior_completion_markers, - resume_session_id=resume_session_id, - resume_checkpoint=resume_checkpoint, - kitchen_id=tool_ctx.kitchen_id, - order_id=dispatch_id, - campaign_id=campaign_id, - dispatch_id=dispatch_id, - caller_session_id=caller_session_id, - project_dir=str(tool_ctx.project_dir), - marker_dir=marker_dir, - session_id=caller_session_id, - timeout=resolved_timeout, - idle_output_timeout=float(idle_output_timeout) - if idle_output_timeout is not None - else None, - env_extras={ - "AUTOSKILLIT_PROJECT_DIR": str(tool_ctx.project_dir), - "AUTOSKILLIT_CAMPAIGN_ID": campaign_id, - "AUTOSKILLIT_DISPATCH_ID": dispatch_id, - "AUTOSKILLIT_SESSION_DEADLINE": str(started_at + resolved_timeout), - }, - requires_packs=list(full_recipe.requires_packs) or ["kitchen-core"], - on_spawn=_on_spawn, - sentinel_contract=sentinel_contract, - resume_message=resume_message, - ) - finally: - _hb_trigger.set() - tg.cancel_scope.cancel() + async with execution_marker( + marker_dir, + caller_session_id, + "dispatch", + ): + skill_result = await tool_ctx.executor.dispatch_food_truck( + orchestrator_prompt=prompt, + cwd=str(tool_ctx.project_dir), + completion_marker=completion_marker, + prior_completion_markers=prior_completion_markers, + resume_session_id=resume_session_id, + resume_checkpoint=resume_checkpoint, + kitchen_id=tool_ctx.kitchen_id, + order_id=dispatch_id, + campaign_id=campaign_id, + dispatch_id=dispatch_id, + caller_session_id=caller_session_id, + project_dir=str(tool_ctx.project_dir), + marker_dir=marker_dir, + session_id=caller_session_id, + timeout=resolved_timeout, + idle_output_timeout=float(idle_output_timeout) + if idle_output_timeout is not None + else None, + env_extras={ + "AUTOSKILLIT_PROJECT_DIR": str(tool_ctx.project_dir), + "AUTOSKILLIT_CAMPAIGN_ID": campaign_id, + "AUTOSKILLIT_DISPATCH_ID": dispatch_id, + "AUTOSKILLIT_SESSION_DEADLINE": str(started_at + resolved_timeout), + }, + requires_packs=list(full_recipe.requires_packs) or ["kitchen-core"], + on_spawn=_on_spawn, + sentinel_contract=sentinel_contract, + resume_message=resume_message, + ) ended_at = time.time() _dispatch_completed_normally = True @@ -659,13 +617,6 @@ def _on_spawn(pid: int, ticks: int) -> None: ) raise finally: - if marker_path is not None: - try: - marker_path.unlink(missing_ok=True) - except OSError: - logger.warning( - "dispatch_marker_unlink_failed", marker=str(marker_path), exc_info=True - ) if not _dispatch_completed_normally: from autoskillit.fleet._label_cleanup import cleanup_orphaned_labels # noqa: PLC0415 diff --git a/src/autoskillit/server/tools/tools_execution.py b/src/autoskillit/server/tools/tools_execution.py index 164d5b6b5e..bec0152e5e 100644 --- a/src/autoskillit/server/tools/tools_execution.py +++ b/src/autoskillit/server/tools/tools_execution.py @@ -671,39 +671,59 @@ async def run_skill( _sn_token = _current_step_name.set(_canonical_step_name(step_name)) _oid_token = _current_order_id.set(effective_order_id) + from autoskillit.core import ( # noqa: PLC0415 + claude_code_project_dir, + execution_marker, + find_caller_session_id, + ) + + _marker_dir: Path | None = None + try: + _marker_dir = claude_code_project_dir(str(tool_ctx.project_dir)) + except OSError: + pass + _orchestrator_sid = find_caller_session_id(project_dir=tool_ctx.project_dir) + _start = time.monotonic() try: - skill_result = await tool_ctx.executor.run( - resolved_command, - cwd, - model=effective_model, - add_dirs=skill_add_dirs, - step_name=step_name, - kitchen_id=tool_ctx.kitchen_id, - order_id=effective_order_id, - expected_output_patterns=expected_output_patterns, - write_behavior=write_spec, - stale_threshold=float(stale_threshold) - if stale_threshold is not None - else None, - idle_output_timeout=float(idle_output_timeout) - if idle_output_timeout is not None - else None, - completion_marker=invocation_marker, - recipe_name=tool_ctx.recipe_name, - recipe_content_hash=tool_ctx.recipe_content_hash, - recipe_composite_hash=tool_ctx.recipe_composite_hash, - recipe_version=tool_ctx.recipe_version, - allowed_write_prefix=allowed_write_prefix, - allowed_write_prefixes=allowed_write_prefixes, - readonly_skill=is_read_only, - write_watch_dirs=write_watch_dirs, - provider_extras=provider_extras, - profile_name=profile_name_out, - provider_name=profile_name_out, - backend_override=backend_override, - resume_session_id=resume_session_id, - ) + async with execution_marker( + _marker_dir, + _orchestrator_sid, + "run-skill", + ): + skill_result = await tool_ctx.executor.run( + resolved_command, + cwd, + model=effective_model, + add_dirs=skill_add_dirs, + step_name=step_name, + kitchen_id=tool_ctx.kitchen_id, + order_id=effective_order_id, + expected_output_patterns=expected_output_patterns, + write_behavior=write_spec, + stale_threshold=float(stale_threshold) + if stale_threshold is not None + else None, + idle_output_timeout=float(idle_output_timeout) + if idle_output_timeout is not None + else None, + completion_marker=invocation_marker, + recipe_name=tool_ctx.recipe_name, + recipe_content_hash=tool_ctx.recipe_content_hash, + recipe_composite_hash=tool_ctx.recipe_composite_hash, + recipe_version=tool_ctx.recipe_version, + allowed_write_prefix=allowed_write_prefix, + allowed_write_prefixes=allowed_write_prefixes, + readonly_skill=is_read_only, + write_watch_dirs=write_watch_dirs, + provider_extras=provider_extras, + profile_name=profile_name_out, + provider_name=profile_name_out, + backend_override=backend_override, + resume_session_id=resume_session_id, + marker_dir=_marker_dir, + caller_session_id=_orchestrator_sid, + ) if skill_result.success: tool_ctx.audit.record_success(skill_command) _clear_run_skill_state(tool_ctx.project_dir) diff --git a/tests/arch/test_execution_source_split.py b/tests/arch/test_execution_source_split.py index 8037854206..4bd55df122 100644 --- a/tests/arch/test_execution_source_split.py +++ b/tests/arch/test_execution_source_split.py @@ -16,7 +16,7 @@ "_headless_execute.py", ] HEADLESS_SIZE_BUDGETS = { - "headless/__init__.py": 460, + "headless/__init__.py": 465, "headless/_headless_helpers.py": 220, "headless/_headless_execute.py": 595, "headless/_headless_recovery.py": 366, diff --git a/tests/fakes.py b/tests/fakes.py index 7240a20ab7..4589781122 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -94,6 +94,8 @@ class ExecutorCall: resume_checkpoint: SessionCheckpoint | None = None resume_message: str | None = None backend_override: str | None = None + marker_dir: Path | None = None + caller_session_id: str | None = None @dataclasses.dataclass @@ -193,6 +195,8 @@ async def run( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: self.calls.append( ExecutorCall( @@ -226,6 +230,8 @@ async def run( resume_checkpoint=resume_checkpoint, resume_message=resume_message, backend_override=backend_override, + marker_dir=marker_dir, + caller_session_id=caller_session_id, ) ) if self._queue: diff --git a/tests/fleet/CLAUDE.md b/tests/fleet/CLAUDE.md index 02c59a61f3..6b0645fec4 100644 --- a/tests/fleet/CLAUDE.md +++ b/tests/fleet/CLAUDE.md @@ -12,7 +12,7 @@ Fleet campaign dispatch, state persistence, and sidecar tests. | `test_dispatch_reaper.py` | Tests for `fleet._dispatch_reaper.reap_stale_dispatches` — orphan kill, dead pid, recycled pid, create_time fallback, dry-run, idempotency | | `test_api.py` | Tests for fleet._api module (Group J) | | `test_api_split_integrity.py` | Structural guard: fleet `_api.py` split — verifies new modules export expected symbols and public API surface is preserved | -| `test_api_dispatch_marker.py` | Tests for _run_dispatch marker lifecycle and _touch_dispatch_marker heartbeat | +| `test_api_dispatch_marker.py` | Tests for _run_dispatch marker lifecycle via execution_marker context manager | | `test_campaign_capture.py` | Tests for campaign capture extraction and ingredient interpolation (Group J) | | `test_capture_roundtrip.py` | Tests for prompt-extractor field name alignment — verifies sentinel examples use bare names matching `_extract_captures` expectations | | `test_checkpoint_bridge.py` | Tests for checkpoint_from_sidecar converting IssueSidecarEntry to SessionCheckpoint | diff --git a/tests/fleet/test_api.py b/tests/fleet/test_api.py index 15e9944ce7..0a2fdd5c4b 100644 --- a/tests/fleet/test_api.py +++ b/tests/fleet/test_api.py @@ -312,30 +312,28 @@ async def test_execute_dispatch_returns_dispatch_result_with_state_path_on_succe assert result.per_dispatch_state_path.exists() -class TestTouchDispatchMarker: - """Unit tests for _touch_dispatch_marker helper.""" +class TestTouchMarker: + """Unit tests for _touch_marker helper (core._execution_marker).""" @pytest.mark.anyio - async def test_touch_dispatch_marker_creates_heartbeat_loop(self, tmp_path: Path) -> None: + async def test_touch_marker_creates_heartbeat_loop(self, tmp_path: Path) -> None: """Heartbeat loop refreshes mtime until trigger is set.""" import os - from autoskillit.fleet._api import _touch_dispatch_marker + from autoskillit.core._execution_marker import _touch_marker marker = tmp_path / "test.marker" marker.touch() - os.utime(marker, (0, 0)) # Set to epoch 0 so any real touch shows as newer + os.utime(marker, (0, 0)) original_mtime_ns = marker.stat().st_mtime_ns trigger = anyio.Event() async def _heartbeat_wrapper() -> None: - await _touch_dispatch_marker(marker, interval=0.05, trigger=trigger) + await _touch_marker(marker, interval=0.05, trigger=trigger) async with anyio.create_task_group() as tg: tg.start_soon(_heartbeat_wrapper) - # 1.0s gives ~20 heartbeat cycles; tolerates WSL2 CLOCK_REALTIME backward - # jumps of up to ~0.9s from NTP sync without causing a spurious failure. await anyio.sleep(1.0) trigger.set() @@ -343,19 +341,19 @@ async def _heartbeat_wrapper() -> None: assert new_mtime_ns > original_mtime_ns, "marker mtime should have been refreshed" @pytest.mark.anyio - async def test_touch_dispatch_marker_oserror_does_not_propagate(self, tmp_path: Path) -> None: + async def test_touch_marker_oserror_does_not_propagate(self, tmp_path: Path) -> None: """OSError during touch logs but does not raise.""" - from autoskillit.fleet._api import _touch_dispatch_marker + from autoskillit.core._execution_marker import _touch_marker subdir = tmp_path / "subdir" subdir.mkdir() marker_file = subdir / "test.marker" - marker_file.touch() # create the file first - marker_file.chmod(0o000) # make unwritable so touch() fails + marker_file.touch() + marker_file.chmod(0o000) trigger = anyio.Event() async def _heartbeat_wrapper() -> None: - await _touch_dispatch_marker(marker_file, interval=0.05, trigger=trigger) + await _touch_marker(marker_file, interval=0.05, trigger=trigger) try: async with anyio.create_task_group() as tg: @@ -363,9 +361,9 @@ async def _heartbeat_wrapper() -> None: await anyio.sleep(0.12) trigger.set() except OSError: - pytest.fail("_touch_dispatch_marker should not propagate OSError") + pytest.fail("_touch_marker should not propagate OSError") finally: - marker_file.chmod(0o644) # restore for cleanup + marker_file.chmod(0o644) class TestDispatchMarkerLifecycle: @@ -514,11 +512,12 @@ async def _capture_marker(*args, **kwargs): assert len(captured_json) == 1, "should have captured marker JSON" data = captured_json[0] - assert "dispatch_id" in data + assert "label" in data assert "orchestrator_pid" in data assert "session_id" in data assert isinstance(data["orchestrator_pid"], int) assert isinstance(data["session_id"], str) + assert data["label"] == "dispatch" class TestWriteDispatchToCampaignStateRefusal: diff --git a/tests/fleet/test_api_dispatch_marker.py b/tests/fleet/test_api_dispatch_marker.py index 0c45cf296c..cf9b83e798 100644 --- a/tests/fleet/test_api_dispatch_marker.py +++ b/tests/fleet/test_api_dispatch_marker.py @@ -1,4 +1,4 @@ -"""Tests for _run_dispatch marker lifecycle and _touch_dispatch_marker heartbeat.""" +"""Tests for _run_dispatch marker lifecycle via execution_marker context manager.""" from __future__ import annotations @@ -8,7 +8,8 @@ import anyio import pytest -from autoskillit.fleet._api import _run_dispatch, _touch_dispatch_marker +from autoskillit.core._execution_marker import _touch_marker +from autoskillit.fleet._api import _run_dispatch from tests.fleet._helpers import _no_sleep_quota_checker, _noop_quota_refresher, _setup_dispatch pytestmark = [pytest.mark.layer("fleet"), pytest.mark.medium, pytest.mark.feature("fleet")] @@ -25,7 +26,7 @@ async def test_marker_created_before_dispatch(tool_ctx, monkeypatch, tmp_path: P ) async def _asserting_dispatch(**_kw): - found = list(marker_dir.glob("dispatch-in-progress--*.marker")) + found = list(marker_dir.glob("*-in-progress-*.marker")) assert len(found) == 1, f"Expected 1 marker, found {len(found)}" raise asyncio.CancelledError @@ -67,7 +68,7 @@ async def test_marker_deleted_after_success(tool_ctx, monkeypatch, tmp_path: Pat quota_refresher=_noop_quota_refresher, ) - remaining = list(marker_dir.glob("dispatch-in-progress--*.marker")) + remaining = list(marker_dir.glob("*-in-progress-*.marker")) assert remaining == [], f"Expected no marker files, found {remaining}" @@ -101,7 +102,7 @@ async def _raise_runtime_error(**_kw): except* RuntimeError: pass - remaining = list(marker_dir.glob("dispatch-in-progress--*.marker")) + remaining = list(marker_dir.glob("*-in-progress-*.marker")) assert remaining == [], f"Expected no marker files after exception, found {remaining}" @@ -114,7 +115,7 @@ async def test_heartbeat_refreshes_mtime(tmp_path: Path) -> None: trigger = anyio.Event() async with anyio.create_task_group() as tg: - tg.start_soon(_touch_dispatch_marker, marker_path, 0.05, trigger) + tg.start_soon(_touch_marker, marker_path, 0.05, trigger) async def _stop(): await anyio.sleep(2.1) diff --git a/tests/infra/test_schema_read_convention.py b/tests/infra/test_schema_read_convention.py index 57255f1bae..bd0c8481f1 100644 --- a/tests/infra/test_schema_read_convention.py +++ b/tests/infra/test_schema_read_convention.py @@ -81,7 +81,9 @@ def _scan_read_versioned_json_callers() -> set[str]: "src/autoskillit/planner/consolidation.py": "Transient single-pipeline-run artifacts", "src/autoskillit/planner/validation.py": "Transient single-pipeline-run artifacts", "src/autoskillit/execution/_recording_skills.py": "Informational manifest — never read back", - "src/autoskillit/fleet/_api.py": "Progress signal — written and deleted, never read back", + "src/autoskillit/core/_execution_marker.py": ( + "Progress signal — written and deleted, never read back" + ), "src/autoskillit/execution/session_log.py": ( "token_usage.json readers use dual-key fallback, not version-gated reading" ), From aa101f37a7262a5cc790403b99178ddd5035311c Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 19:34:23 -0700 Subject: [PATCH 05/17] test: add TestExecutionMarkerLifecycle to close marker lifecycle verification gap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Verifies that execution_marker context manager creates a marker file on entry and deletes it on exit — the lifecycle unit test that was missing from the stale-detector run_skill marker blind spot audit. Co-Authored-By: Claude Sonnet 4.6 --- ...ss_session_log_monitor_stale_suppression.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/tests/execution/test_process_session_log_monitor_stale_suppression.py b/tests/execution/test_process_session_log_monitor_stale_suppression.py index 2082273578..235e3113f9 100644 --- a/tests/execution/test_process_session_log_monitor_stale_suppression.py +++ b/tests/execution/test_process_session_log_monitor_stale_suppression.py @@ -3,6 +3,7 @@ from __future__ import annotations import time +from pathlib import Path from unittest.mock import patch import anyio @@ -378,3 +379,20 @@ async def touch_marker() -> None: "run-skill execution marker suppression not working — " "marker may not be matched by the glob pattern." ) + + +class TestExecutionMarkerLifecycle: + @pytest.mark.anyio + async def test_execution_marker_lifecycle(self, tmp_path: Path) -> None: + from autoskillit.core._execution_marker import execution_marker + + marker_dir = tmp_path / "markers" + marker_dir.mkdir() + + async with execution_marker(marker_dir, "session-123", "run-skill") as path: + assert path is not None + matches = list(marker_dir.glob("run-skill-in-progress-session-123-*.marker")) + assert len(matches) == 1 + + remaining = list(marker_dir.glob("*-in-progress-*.marker")) + assert remaining == [], f"marker not cleaned up: {remaining}" From 17057324cec3f3bb205eaac5a24892f0be39b947 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 20:14:14 -0700 Subject: [PATCH 06/17] test: add failing tests for pre-resume success guard and has_completed_dispatch Add TestResumeSuccessGuard to test_resume_preflight.py covering the _run_dispatch early-return path when prior dispatch already succeeded. test_resume_rejected_when_prior_dispatch_succeeded fails until the guard is wired in Phase 2. Add TestHasCompletedDispatch to test_state.py and implement has_completed_dispatch in state_recovery.py (name-specific SUCCESS check, fail-open on missing/corrupted state). Co-Authored-By: Claude Sonnet 4.6 --- src/autoskillit/fleet/state_recovery.py | 19 +++ tests/fleet/test_resume_preflight.py | 148 ++++++++++++++++++++++++ tests/fleet/test_state.py | 36 ++++++ 3 files changed, 203 insertions(+) diff --git a/src/autoskillit/fleet/state_recovery.py b/src/autoskillit/fleet/state_recovery.py index c718e9537f..e296693ff0 100644 --- a/src/autoskillit/fleet/state_recovery.py +++ b/src/autoskillit/fleet/state_recovery.py @@ -32,6 +32,7 @@ "derive_orchestrator_resume_spec", "find_dispatch_for_issue", "has_blocking_dispatch", + "has_completed_dispatch", "has_failed_dispatch", "resume_campaign_from_state", ] @@ -116,6 +117,24 @@ def has_blocking_dispatch(state_path: Path) -> bool: return False +def has_completed_dispatch(state_path: Path, dispatch_name: str) -> bool: + """Return True if the named dispatch already has SUCCESS status. + + Returns False when the file is missing or corrupted (fail-open). + """ + from autoskillit.fleet.state import read_state # noqa: PLC0415 + + if not state_path.exists(): + return False + state = read_state(state_path) + if state is None: + return False + for d in state.dispatches: + if d.name == dispatch_name: + return d.status == DispatchStatus.SUCCESS + return False + + def _is_abandon_kill_metadata(retry_reason: str, infra_exit_category: str) -> bool: """Return True when stored kill metadata indicates resume would be futile.""" if retry_reason in _ABANDON_REASONS: diff --git a/tests/fleet/test_resume_preflight.py b/tests/fleet/test_resume_preflight.py index 4b8f18315b..8b3f334442 100644 --- a/tests/fleet/test_resume_preflight.py +++ b/tests/fleet/test_resume_preflight.py @@ -214,3 +214,151 @@ async def test_session_id_mismatch_logs_warning(self, tool_ctx, monkeypatch, tmp assert mismatch_logs, f"Expected session_id_continuity_mismatch warning, got: {logs}" assert mismatch_logs[0]["resume_session_id"] == "original-session-id" assert mismatch_logs[0]["returned_session_id"] == "returned-different-session" + + +class TestResumeSuccessGuard: + @pytest.mark.anyio + async def test_resume_rejected_when_prior_dispatch_succeeded(self, tool_ctx, monkeypatch): + """_run_dispatch returns cached SUCCESS when prior dispatch already succeeded.""" + from autoskillit.fleet import DispatchRecord, DispatchStatus, write_initial_state + from autoskillit.fleet._api import execute_dispatch + from autoskillit.fleet.state_types import DispatchCompleted + + _setup_dispatch(tool_ctx, monkeypatch) + + dispatches_dir = tool_ctx.temp_dir / "dispatches" + dispatches_dir.mkdir(parents=True, exist_ok=True) + prior_id = "prior-dispatch-succeeded" + state_path = dispatches_dir / f"{prior_id}.json" + write_initial_state( + state_path, + tool_ctx.kitchen_id, + "camp", + "", + [ + DispatchRecord( + name="test-recipe", + status=DispatchStatus.SUCCESS, + dispatch_id="completed-dispatch-id", + dispatched_session_id="completed-session-id", + reason="completed_clean", + ) + ], + ) + + result = await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="some-session-id", + prior_dispatch_id=prior_id, + ) + + assert isinstance(result.outcome, DispatchCompleted) + assert result.outcome.success is True + assert result.outcome.dispatch_status == DispatchStatus.SUCCESS + assert len(tool_ctx.executor.dispatch_calls) == 0 + + @pytest.mark.anyio + async def test_resume_proceeds_when_prior_dispatch_resumable( + self, tool_ctx, monkeypatch, tmp_path + ): + """_run_dispatch proceeds normally when prior dispatch is RESUMABLE.""" + from autoskillit.fleet import DispatchRecord, DispatchStatus, write_initial_state + from autoskillit.fleet._api import execute_dispatch + + _setup_dispatch(tool_ctx, monkeypatch) + + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.touch() + monkeypatch.setattr("autoskillit.fleet._api.claude_code_log_path", lambda *_: jsonl_file) + monkeypatch.setattr( + "autoskillit.fleet._api.parse_l3_result_block", + lambda **_: _make_no_sentinel(), + ) + + dispatches_dir = tool_ctx.temp_dir / "dispatches" + dispatches_dir.mkdir(parents=True, exist_ok=True) + prior_id = "prior-dispatch-resumable" + state_path = dispatches_dir / f"{prior_id}.json" + write_initial_state( + state_path, + tool_ctx.kitchen_id, + "camp", + "", + [ + DispatchRecord( + name="test-recipe", + status=DispatchStatus.RESUMABLE, + dispatched_session_id="resumable-session-id", + ) + ], + ) + + await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="resumable-session-id", + prior_dispatch_id=prior_id, + ) + + assert len(tool_ctx.executor.dispatch_calls) == 1 + + @pytest.mark.anyio + async def test_resume_proceeds_when_prior_dispatch_not_in_state( + self, tool_ctx, monkeypatch, tmp_path + ): + """_run_dispatch proceeds when prior state has no matching dispatch (fail-open).""" + from autoskillit.fleet import DispatchRecord, DispatchStatus, write_initial_state + from autoskillit.fleet._api import execute_dispatch + + _setup_dispatch(tool_ctx, monkeypatch) + + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.touch() + monkeypatch.setattr("autoskillit.fleet._api.claude_code_log_path", lambda *_: jsonl_file) + monkeypatch.setattr( + "autoskillit.fleet._api.parse_l3_result_block", + lambda **_: _make_no_sentinel(), + ) + + dispatches_dir = tool_ctx.temp_dir / "dispatches" + dispatches_dir.mkdir(parents=True, exist_ok=True) + prior_id = "prior-dispatch-other-name" + state_path = dispatches_dir / f"{prior_id}.json" + write_initial_state( + state_path, + tool_ctx.kitchen_id, + "camp", + "", + [DispatchRecord(name="other-recipe", status=DispatchStatus.SUCCESS)], + ) + + await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="some-session-id", + prior_dispatch_id=prior_id, + ) + + assert len(tool_ctx.executor.dispatch_calls) == 1 diff --git a/tests/fleet/test_state.py b/tests/fleet/test_state.py index 6a3909242f..34a1354155 100644 --- a/tests/fleet/test_state.py +++ b/tests/fleet/test_state.py @@ -944,6 +944,42 @@ def test_mixed_infrastructure_and_logic_failure_halts(self, tmp_path: Path) -> N assert has_failed_dispatch(sp) is True +class TestHasCompletedDispatch: + def test_has_completed_dispatch_returns_true_for_success(self, tmp_path: Path) -> None: + """has_completed_dispatch returns True when the named dispatch has SUCCESS status.""" + from autoskillit.fleet.state_recovery import has_completed_dispatch + + sp = _state_path(tmp_path) + write_initial_state( + sp, + "cid", + "camp", + "/m.yaml", + [DispatchRecord(name="d1", status=DispatchStatus.SUCCESS)], + ) + assert has_completed_dispatch(sp, "d1") is True + + def test_has_completed_dispatch_returns_false_for_non_success(self, tmp_path: Path) -> None: + """has_completed_dispatch returns False for RESUMABLE, PENDING, FAILURE, etc.""" + from autoskillit.fleet.state_recovery import has_completed_dispatch + + sp = _state_path(tmp_path) + write_initial_state(sp, "cid", "camp", "/m.yaml", _make_dispatches("d1", "d2", "d3")) + append_dispatch_record( + sp, DispatchRecord(name="d2", status=DispatchStatus.FAILURE, reason="task-failed") + ) + assert has_completed_dispatch(sp, "d1") is False + assert has_completed_dispatch(sp, "d2") is False + assert has_completed_dispatch(sp, "d3") is False + + def test_has_completed_dispatch_returns_false_for_missing_state(self, tmp_path: Path) -> None: + """has_completed_dispatch returns False when state file is missing (fail-open).""" + from autoskillit.fleet.state_recovery import has_completed_dispatch + + sp = tmp_path / "nonexistent" / "state.json" + assert has_completed_dispatch(sp, "any-dispatch") is False + + class TestRetryReasonPropagation: def test_retry_reason_stored_in_dispatch_record(self, tmp_path: Path) -> None: sp = _state_path(tmp_path) From 88a8f6cce9941a5c6d78a7e6f16703ad8cf24e7f Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 20:15:48 -0700 Subject: [PATCH 07/17] feat: implement pre-resume success guard and has_completed_dispatch re-exports Wire _build_success_short_circuit into _run_dispatch: when prior dispatch already has DispatchStatus.SUCCESS, return the cached result immediately without launching a subprocess. Add has_completed_dispatch re-exports through state.py and fleet/__init__ so callers can use `from autoskillit.fleet import has_completed_dispatch`. Add dispatch_food_truck MCP tool pre-flight guard: if campaign state already shows SUCCESS for effective_name, return the cached result before execute_dispatch is called (prevents semaphore acquisition and state handle creation for duplicate calls). Co-Authored-By: Claude Sonnet 4.6 --- src/autoskillit/fleet/__init__.py | 2 ++ src/autoskillit/fleet/_api.py | 26 +++++++++++++++++++ src/autoskillit/fleet/state.py | 2 ++ .../server/tools/tools_fleet_dispatch.py | 12 +++++++++ 4 files changed, 42 insertions(+) diff --git a/src/autoskillit/fleet/__init__.py b/src/autoskillit/fleet/__init__.py index b54e0990c7..642216f3ad 100644 --- a/src/autoskillit/fleet/__init__.py +++ b/src/autoskillit/fleet/__init__.py @@ -65,6 +65,7 @@ derive_orchestrator_resume_spec, find_dispatch_for_issue, has_blocking_dispatch, + has_completed_dispatch, ) from .state_types import ( _INFRASTRUCTURE_FAILURE_REASONS, # noqa: F401 @@ -126,6 +127,7 @@ "append_dispatch_record", "build_protected_campaign_ids", "has_blocking_dispatch", + "has_completed_dispatch", "has_failed_dispatch", "record_gate_outcome", "mark_dispatch_interrupted", diff --git a/src/autoskillit/fleet/_api.py b/src/autoskillit/fleet/_api.py index 6a6f1f8ca8..d45164ad32 100644 --- a/src/autoskillit/fleet/_api.py +++ b/src/autoskillit/fleet/_api.py @@ -35,6 +35,7 @@ if TYPE_CHECKING: from autoskillit.fleet.sidecar import IssueSidecarEntry + from autoskillit.fleet.state import DispatchRecord, DispatchStateHandle from autoskillit.pipeline.context import ToolContext logger = get_logger(__name__) @@ -211,6 +212,24 @@ def _reject(error_code: FleetErrorCode, message: str, **kwargs: Any) -> Dispatch lock.release() +def _build_success_short_circuit( + record: DispatchRecord, + handle: DispatchStateHandle, +) -> DispatchResult: + """Return a DispatchResult that mirrors a prior succeeded dispatch without re-launching.""" + return DispatchResult( + outcome=DispatchCompleted( + success=True, + dispatch_status=DispatchStatus.SUCCESS, + dispatch_id=record.dispatch_id, + dispatched_session_id=record.dispatched_session_id, + reason=record.reason, + token_usage=dict(record.token_usage), + ), + per_dispatch_state_path=handle.state_path, + ) + + async def _run_dispatch( tool_ctx: ToolContext, recipe: str, @@ -349,6 +368,13 @@ async def _run_dispatch( if prior_state: for d in prior_state.dispatches: if d.name == effective_name: + if d.status == DispatchStatus.SUCCESS: + logger.info( + "resume_skipped_prior_success", + dispatch_name=effective_name, + prior_dispatch_id=prior_dispatch_id, + ) + return _build_success_short_circuit(d, handle) prior_session_chain = list(d.session_chain) prior_dispatched_session_id = d.dispatched_session_id break diff --git a/src/autoskillit/fleet/state.py b/src/autoskillit/fleet/state.py index 0849f68fd8..6a725aa322 100644 --- a/src/autoskillit/fleet/state.py +++ b/src/autoskillit/fleet/state.py @@ -26,6 +26,7 @@ from autoskillit.fleet.state_gates import record_gate_outcome from autoskillit.fleet.state_recovery import ( has_blocking_dispatch, + has_completed_dispatch, has_failed_dispatch, resume_campaign_from_state, ) @@ -57,6 +58,7 @@ "record_gate_outcome", # re-exported from state_recovery "has_blocking_dispatch", + "has_completed_dispatch", "has_failed_dispatch", "resume_campaign_from_state", # re-exported from state_types diff --git a/src/autoskillit/server/tools/tools_fleet_dispatch.py b/src/autoskillit/server/tools/tools_fleet_dispatch.py index 0f0be53a0a..ff52801fed 100755 --- a/src/autoskillit/server/tools/tools_fleet_dispatch.py +++ b/src/autoskillit/server/tools/tools_fleet_dispatch.py @@ -275,6 +275,18 @@ async def dispatch_food_truck( caller_session_id = find_caller_session_id(project_dir=tool_ctx.project_dir) effective_name = dispatch_name or recipe + if campaign_state_path_str: + from autoskillit.fleet import has_completed_dispatch # noqa: PLC0415 + + if has_completed_dispatch(Path(campaign_state_path_str), effective_name): + return DispatchCompleted( + success=True, + dispatch_status=DispatchStatus.SUCCESS, + dispatch_id="", + dispatched_session_id="", + reason="prior dispatch already succeeded", + ).to_envelope() + if skip_when: dispatches_dir = tool_ctx.temp_dir / "dispatches" accumulated_captures = read_all_campaign_captures(dispatches_dir, tool_ctx.kitchen_id) From d88c2d05eaa6334b3fda98c38786b610799c7cbd Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 20:17:49 -0700 Subject: [PATCH 08/17] feat: bound analyze-pipeline-health stale_threshold to 2400s and cap scanner depth Add stale_threshold: 2400 to all run_diagnostic* steps in implementation, remediation, and merge-prs recipes (10 steps total). The default 1200s is too short for multi-subagent scanner work; 2400s matches the existing convention for long-running steps (implement, retry_worktree). Add scanner maxTurns and wall-clock soft-deadline guidance to analyze-pipeline-health SKILL.md to bound investigation per scanner and prevent the stale detector misfiring window. Co-Authored-By: Claude Sonnet 4.6 --- src/autoskillit/recipes/implementation.yaml | 4 ++++ src/autoskillit/recipes/merge-prs.yaml | 2 ++ src/autoskillit/recipes/remediation.yaml | 4 ++++ .../skills_extended/analyze-pipeline-health/SKILL.md | 1 + 4 files changed, 11 insertions(+) diff --git a/src/autoskillit/recipes/implementation.yaml b/src/autoskillit/recipes/implementation.yaml index b3fe6ac0f1..70e60a522d 100644 --- a/src/autoskillit/recipes/implementation.yaml +++ b/src/autoskillit/recipes/implementation.yaml @@ -1318,6 +1318,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (no-CI path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2150,6 +2151,7 @@ steps: path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2205,6 +2207,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2224,6 +2227,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (error path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: diff --git a/src/autoskillit/recipes/merge-prs.yaml b/src/autoskillit/recipes/merge-prs.yaml index 8c6d95fd53..bc6a69f931 100644 --- a/src/autoskillit/recipes/merge-prs.yaml +++ b/src/autoskillit/recipes/merge-prs.yaml @@ -1658,6 +1658,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -1677,6 +1678,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (error path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: diff --git a/src/autoskillit/recipes/remediation.yaml b/src/autoskillit/recipes/remediation.yaml index d76ce6be16..16644db945 100644 --- a/src/autoskillit/recipes/remediation.yaml +++ b/src/autoskillit/recipes/remediation.yaml @@ -1404,6 +1404,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (no-CI path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2204,6 +2205,7 @@ steps: path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2291,6 +2293,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2310,6 +2313,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (error path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: diff --git a/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md b/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md index c150a262a1..ebe24edbf6 100644 --- a/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md +++ b/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md @@ -36,6 +36,7 @@ Coordinator skill that reads session logs from a pipeline run, groups them by st - Filter sessions.jsonl by kitchen_id to scope to this pipeline run - Spawn scanner subagents in parallel (one per step group) - Use model: "haiku" for scanner subagents +- Cap each scanner's investigation budget: set `maxTurns` to the limit in the agent definition and include a wall-clock soft-deadline instruction in the scanner prompt (e.g. "complete your analysis within 15 minutes; report partial findings if you reach the limit") - Report "no issues found" clearly when the pipeline is clean - Issue all Task calls in a single message to maximize parallelism From a5f301a8c32da7c34ec4f2ddef10ad920d012cf2 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 20:40:52 -0700 Subject: [PATCH 09/17] chore: commit pending session changes --- src/autoskillit/recipes/implementation.json | 4 ++++ src/autoskillit/recipes/merge-prs.json | 2 ++ src/autoskillit/recipes/remediation.json | 4 ++++ 3 files changed, 10 insertions(+) diff --git a/src/autoskillit/recipes/implementation.json b/src/autoskillit/recipes/implementation.json index c620c5df3d..727657d4ae 100644 --- a/src/autoskillit/recipes/implementation.json +++ b/src/autoskillit/recipes/implementation.json @@ -1427,6 +1427,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (no-CI path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2337,6 +2338,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (unconfirmed path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2391,6 +2393,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2407,6 +2410,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (error path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { diff --git a/src/autoskillit/recipes/merge-prs.json b/src/autoskillit/recipes/merge-prs.json index a0ccdc330e..e8c5e4ccd0 100644 --- a/src/autoskillit/recipes/merge-prs.json +++ b/src/autoskillit/recipes/merge-prs.json @@ -1698,6 +1698,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -1714,6 +1715,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (error path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { diff --git a/src/autoskillit/recipes/remediation.json b/src/autoskillit/recipes/remediation.json index 1c6df0bcf5..ccef31e9e9 100644 --- a/src/autoskillit/recipes/remediation.json +++ b/src/autoskillit/recipes/remediation.json @@ -1547,6 +1547,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (no-CI path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2429,6 +2430,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (unconfirmed path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2511,6 +2513,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2527,6 +2530,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (error path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { From 88e4ee2a95a68395a9f894207b529f958d0e6a0c Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 21:09:44 -0700 Subject: [PATCH 10/17] fix: catch FileNotFoundError from open_continued in _run_dispatch Move DispatchStateHandle.open_continued inside the try block so that FileNotFoundError (a subclass of OSError) is caught by the existing except handler. Add a create_fresh fallback in the except so handle is always bound after the try/except, enabling fail-open resume when the prior state file is absent. Extract recipe_snapshot before the if/else so it is available in both the except fallback and the else branch. Add test_resume_proceeds_when_prior_state_missing to TestResumeSuccessGuard to verify the fail-open behaviour. Co-Authored-By: Claude Sonnet 4.6 --- src/autoskillit/fleet/_api.py | 24 +++++++++++++------- tests/fleet/test_resume_preflight.py | 33 ++++++++++++++++++++++++++++ 2 files changed, 49 insertions(+), 8 deletions(-) diff --git a/src/autoskillit/fleet/_api.py b/src/autoskillit/fleet/_api.py index d45164ad32..032352a3b5 100644 --- a/src/autoskillit/fleet/_api.py +++ b/src/autoskillit/fleet/_api.py @@ -359,11 +359,18 @@ async def _run_dispatch( dispatches_dir.mkdir(parents=True, exist_ok=True) campaign_id = tool_ctx.kitchen_id + recipe_snapshot = { + "recipe_name": recipe_obj.name, + "recipe_path": str(recipe_obj.path), + "recipe_version": recipe_obj.recipe_version or "", + "content_hash": recipe_obj.content_hash or "", + "effective_ingredients": dict(effective_ingredients), + } prior_session_chain: list[str] = [] prior_dispatched_session_id = "" if resume_session_id and prior_dispatch_id: - handle = DispatchStateHandle.open_continued(dispatches_dir, prior_dispatch_id) try: + handle = DispatchStateHandle.open_continued(dispatches_dir, prior_dispatch_id) prior_state = read_state(handle.state_path) if prior_state: for d in prior_state.dispatches: @@ -380,14 +387,15 @@ async def _run_dispatch( break except (OSError, ValueError, KeyError, TypeError): logger.warning("failed to read prior session chain from state", exc_info=True) + handle = DispatchStateHandle.create_fresh( + dispatches_dir, + campaign_id, + effective_name, + "", + [DispatchRecord(name=effective_name, caller_session_id=caller_session_id)], + recipe_snapshot, + ) else: - recipe_snapshot = { - "recipe_name": recipe_obj.name, - "recipe_path": str(recipe_obj.path), - "recipe_version": recipe_obj.recipe_version or "", - "content_hash": recipe_obj.content_hash or "", - "effective_ingredients": dict(effective_ingredients), - } handle = DispatchStateHandle.create_fresh( dispatches_dir, campaign_id, diff --git a/tests/fleet/test_resume_preflight.py b/tests/fleet/test_resume_preflight.py index 8b3f334442..598b6c0359 100644 --- a/tests/fleet/test_resume_preflight.py +++ b/tests/fleet/test_resume_preflight.py @@ -362,3 +362,36 @@ async def test_resume_proceeds_when_prior_dispatch_not_in_state( ) assert len(tool_ctx.executor.dispatch_calls) == 1 + + @pytest.mark.anyio + async def test_resume_proceeds_when_prior_state_missing(self, tool_ctx, monkeypatch, tmp_path): + """_run_dispatch proceeds when prior state file does not exist (fail-open).""" + from autoskillit.fleet._api import execute_dispatch + + _setup_dispatch(tool_ctx, monkeypatch) + + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.touch() + monkeypatch.setattr("autoskillit.fleet._api.claude_code_log_path", lambda *_: jsonl_file) + monkeypatch.setattr( + "autoskillit.fleet._api.parse_l3_result_block", + lambda **_: _make_no_sentinel(), + ) + + prior_id = "prior-dispatch-nonexistent" + + await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="some-session-id", + prior_dispatch_id=prior_id, + ) + + assert len(tool_ctx.executor.dispatch_calls) == 1 From e44788216154d53f42fd2d2b601f5ea63da70149 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 21:56:29 -0700 Subject: [PATCH 11/17] fix(review): use uuid suffix and anyio task group in execution_marker Replace duplicate label suffix in marker filename with uuid4 hex to prevent collisions between concurrent executions sharing the same label + session_id. Replace asyncio.create_task with anyio task group for structured concurrency and proper exception visibility. Co-Authored-By: Claude Opus 4.6 --- src/autoskillit/core/_execution_marker.py | 41 +++++++++-------------- 1 file changed, 16 insertions(+), 25 deletions(-) diff --git a/src/autoskillit/core/_execution_marker.py b/src/autoskillit/core/_execution_marker.py index 2dc38522b8..0c24585cfa 100644 --- a/src/autoskillit/core/_execution_marker.py +++ b/src/autoskillit/core/_execution_marker.py @@ -1,6 +1,6 @@ """Unified execution marker protocol for stale-detector suppression. -Async context manager that writes a ``*-in-progress-{session_id}-{label}.marker`` +Async context manager that writes a ``{label}-in-progress-{session_id}-{uuid}.marker`` file, heartbeats its mtime, and deletes it on exit. Lives in ``core/`` (IL-0) so both ``fleet/_api.py`` (IL-2) and ``server/tools/`` (IL-3) can import it without violating layer constraints. @@ -8,8 +8,8 @@ from __future__ import annotations -import asyncio import os +import uuid from collections.abc import AsyncGenerator from contextlib import asynccontextmanager from pathlib import Path @@ -22,12 +22,12 @@ logger = get_logger(__name__) -async def _touch_marker(marker_path: Path, interval: float, trigger: anyio.Event | None) -> None: +async def _touch_marker(marker_path: Path, interval: float) -> None: try: marker_path.touch() except OSError: logger.warning("execution_marker: touch failed %s", marker_path, exc_info=True) - while trigger is None or not trigger.is_set(): + while True: await anyio.sleep(interval) try: marker_path.touch() @@ -52,7 +52,7 @@ async def execution_marker( yield None return - marker_path = marker_dir / f"{label}-in-progress-{session_id}-{label}.marker" + marker_path = marker_dir / f"{label}-in-progress-{session_id}-{uuid.uuid4().hex[:8]}.marker" try: write_versioned_json( marker_path, @@ -68,24 +68,15 @@ async def execution_marker( yield None return - hb_task: asyncio.Task[None] | None = None - trigger = anyio.Event() - try: - hb_task = asyncio.get_running_loop().create_task( - _touch_marker(marker_path, heartbeat_interval, trigger) - ) - yield marker_path - finally: - trigger.set() - if hb_task is not None: - hb_task.cancel() - try: - await hb_task - except (asyncio.CancelledError, Exception): - pass + async with anyio.create_task_group() as tg: + tg.start_soon(_touch_marker, marker_path, heartbeat_interval) try: - marker_path.unlink(missing_ok=True) - except OSError: - logger.warning( - "execution_marker_unlink_failed", marker=str(marker_path), exc_info=True - ) + yield marker_path + finally: + tg.cancel_scope.cancel() + try: + marker_path.unlink(missing_ok=True) + except OSError: + logger.warning( + "execution_marker_unlink_failed", marker=str(marker_path), exc_info=True + ) From 1d77c2de45b4349e93b17791c28cd6441059eb65 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 21:57:43 -0700 Subject: [PATCH 12/17] fix(review): forward stored dispatch_id/session_id in pre-flight guard Add find_completed_dispatch helper that returns the full DispatchRecord for a completed dispatch. Use it in the dispatch_food_truck MCP tool pre-flight guard to forward actual dispatch_id and dispatched_session_id instead of hardcoded empty strings. Co-Authored-By: Claude Opus 4.6 --- src/autoskillit/fleet/__init__.py | 2 ++ src/autoskillit/fleet/state.py | 2 ++ src/autoskillit/fleet/state_recovery.py | 20 ++++++++++++++----- .../server/tools/tools_fleet_dispatch.py | 9 +++++---- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/src/autoskillit/fleet/__init__.py b/src/autoskillit/fleet/__init__.py index 642216f3ad..2a4c23287a 100644 --- a/src/autoskillit/fleet/__init__.py +++ b/src/autoskillit/fleet/__init__.py @@ -63,6 +63,7 @@ from .state_recovery import ( classify_stale_dispatch, derive_orchestrator_resume_spec, + find_completed_dispatch, find_dispatch_for_issue, has_blocking_dispatch, has_completed_dispatch, @@ -126,6 +127,7 @@ "GateRecordResult", "append_dispatch_record", "build_protected_campaign_ids", + "find_completed_dispatch", "has_blocking_dispatch", "has_completed_dispatch", "has_failed_dispatch", diff --git a/src/autoskillit/fleet/state.py b/src/autoskillit/fleet/state.py index 6a725aa322..bb5484d587 100644 --- a/src/autoskillit/fleet/state.py +++ b/src/autoskillit/fleet/state.py @@ -25,6 +25,7 @@ ) from autoskillit.fleet.state_gates import record_gate_outcome from autoskillit.fleet.state_recovery import ( + find_completed_dispatch, has_blocking_dispatch, has_completed_dispatch, has_failed_dispatch, @@ -57,6 +58,7 @@ # re-exported from state_gates "record_gate_outcome", # re-exported from state_recovery + "find_completed_dispatch", "has_blocking_dispatch", "has_completed_dispatch", "has_failed_dispatch", diff --git a/src/autoskillit/fleet/state_recovery.py b/src/autoskillit/fleet/state_recovery.py index e296693ff0..1827f12072 100644 --- a/src/autoskillit/fleet/state_recovery.py +++ b/src/autoskillit/fleet/state_recovery.py @@ -30,6 +30,7 @@ __all__ = [ "classify_stale_dispatch", "derive_orchestrator_resume_spec", + "find_completed_dispatch", "find_dispatch_for_issue", "has_blocking_dispatch", "has_completed_dispatch", @@ -122,17 +123,26 @@ def has_completed_dispatch(state_path: Path, dispatch_name: str) -> bool: Returns False when the file is missing or corrupted (fail-open). """ + return find_completed_dispatch(state_path, dispatch_name) is not None + + +def find_completed_dispatch(state_path: Path, dispatch_name: str) -> DispatchRecord | None: + """Return the DispatchRecord if the named dispatch has SUCCESS status. + + Returns None when the file is missing, corrupted, or no matching SUCCESS + record exists (fail-open). + """ from autoskillit.fleet.state import read_state # noqa: PLC0415 if not state_path.exists(): - return False + return None state = read_state(state_path) if state is None: - return False + return None for d in state.dispatches: - if d.name == dispatch_name: - return d.status == DispatchStatus.SUCCESS - return False + if d.name == dispatch_name and d.status == DispatchStatus.SUCCESS: + return d + return None def _is_abandon_kill_metadata(retry_reason: str, infra_exit_category: str) -> bool: diff --git a/src/autoskillit/server/tools/tools_fleet_dispatch.py b/src/autoskillit/server/tools/tools_fleet_dispatch.py index ff52801fed..c19815086e 100755 --- a/src/autoskillit/server/tools/tools_fleet_dispatch.py +++ b/src/autoskillit/server/tools/tools_fleet_dispatch.py @@ -276,14 +276,15 @@ async def dispatch_food_truck( effective_name = dispatch_name or recipe if campaign_state_path_str: - from autoskillit.fleet import has_completed_dispatch # noqa: PLC0415 + from autoskillit.fleet import find_completed_dispatch # noqa: PLC0415 - if has_completed_dispatch(Path(campaign_state_path_str), effective_name): + prior_record = find_completed_dispatch(Path(campaign_state_path_str), effective_name) + if prior_record is not None: return DispatchCompleted( success=True, dispatch_status=DispatchStatus.SUCCESS, - dispatch_id="", - dispatched_session_id="", + dispatch_id=prior_record.dispatch_id, + dispatched_session_id=prior_record.dispatched_session_id, reason="prior dispatch already succeeded", ).to_envelope() From c04cbe6913fbc40fbe6a65e10b6f84c1c6928e8c Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 21:58:13 -0700 Subject: [PATCH 13/17] fix(review): strengthen test assertions for monitor status and dispatch fields Add monitor_status emptiness assertion in test_stale_NOT_fired_when_execution_marker_active to distinguish STALE from other early-exit conditions. Add dispatch_id, dispatched_session_id, and reason field assertions in test_resume_rejected_when_prior_dispatch_succeeded. Co-Authored-By: Claude Opus 4.6 --- tests/execution/test_process_session_log_monitor.py | 3 +++ tests/fleet/test_resume_preflight.py | 3 +++ 2 files changed, 6 insertions(+) diff --git a/tests/execution/test_process_session_log_monitor.py b/tests/execution/test_process_session_log_monitor.py index dbc612ac87..7787bf80de 100644 --- a/tests/execution/test_process_session_log_monitor.py +++ b/tests/execution/test_process_session_log_monitor.py @@ -834,6 +834,9 @@ async def touch_marker() -> None: f"Monitor returned early: stale fired with active run-skill execution marker. " f"Status: {monitor_status}" ) + assert monitor_status == [], ( + f"Monitor emitted a status despite not returning: {monitor_status}" + ) @pytest.mark.anyio diff --git a/tests/fleet/test_resume_preflight.py b/tests/fleet/test_resume_preflight.py index 598b6c0359..e91cbc48a0 100644 --- a/tests/fleet/test_resume_preflight.py +++ b/tests/fleet/test_resume_preflight.py @@ -263,6 +263,9 @@ async def test_resume_rejected_when_prior_dispatch_succeeded(self, tool_ctx, mon assert isinstance(result.outcome, DispatchCompleted) assert result.outcome.success is True assert result.outcome.dispatch_status == DispatchStatus.SUCCESS + assert result.outcome.dispatch_id == "completed-dispatch-id" + assert result.outcome.dispatched_session_id == "completed-session-id" + assert result.outcome.reason == "completed_clean" assert len(tool_ctx.executor.dispatch_calls) == 0 @pytest.mark.anyio From 36a9889a16f25d80d6e4342ce862d2fd78f6bc34 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 22:06:11 -0700 Subject: [PATCH 14/17] fix(review): use asyncio.create_task with proper exception handling Revert from anyio task group (yield inside task group body causes cleanup issues with @asynccontextmanager) to asyncio.create_task with split exception handling: CancelledError silenced, other exceptions logged. Update tests to match new _touch_marker 2-arg signature. Co-Authored-By: Claude Opus 4.6 --- src/autoskillit/core/_execution_marker.py | 32 +++++++++++++++-------- tests/fleet/test_api.py | 19 ++++---------- tests/fleet/test_api_dispatch_marker.py | 6 ++--- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/autoskillit/core/_execution_marker.py b/src/autoskillit/core/_execution_marker.py index 0c24585cfa..a8c09d8674 100644 --- a/src/autoskillit/core/_execution_marker.py +++ b/src/autoskillit/core/_execution_marker.py @@ -8,6 +8,7 @@ from __future__ import annotations +import asyncio import os import uuid from collections.abc import AsyncGenerator @@ -68,15 +69,24 @@ async def execution_marker( yield None return - async with anyio.create_task_group() as tg: - tg.start_soon(_touch_marker, marker_path, heartbeat_interval) - try: - yield marker_path - finally: - tg.cancel_scope.cancel() + hb_task: asyncio.Task[None] | None = None + try: + hb_task = asyncio.get_running_loop().create_task( + _touch_marker(marker_path, heartbeat_interval) + ) + yield marker_path + finally: + if hb_task is not None: + hb_task.cancel() try: - marker_path.unlink(missing_ok=True) - except OSError: - logger.warning( - "execution_marker_unlink_failed", marker=str(marker_path), exc_info=True - ) + await hb_task + except asyncio.CancelledError: + pass + except Exception: + logger.warning("execution_marker: heartbeat failed", exc_info=True) + try: + marker_path.unlink(missing_ok=True) + except OSError: + logger.warning( + "execution_marker_unlink_failed", marker=str(marker_path), exc_info=True + ) diff --git a/tests/fleet/test_api.py b/tests/fleet/test_api.py index 0a2fdd5c4b..c5f04f9bf3 100644 --- a/tests/fleet/test_api.py +++ b/tests/fleet/test_api.py @@ -317,7 +317,7 @@ class TestTouchMarker: @pytest.mark.anyio async def test_touch_marker_creates_heartbeat_loop(self, tmp_path: Path) -> None: - """Heartbeat loop refreshes mtime until trigger is set.""" + """Heartbeat loop refreshes mtime until cancelled.""" import os from autoskillit.core._execution_marker import _touch_marker @@ -327,15 +327,10 @@ async def test_touch_marker_creates_heartbeat_loop(self, tmp_path: Path) -> None os.utime(marker, (0, 0)) original_mtime_ns = marker.stat().st_mtime_ns - trigger = anyio.Event() - - async def _heartbeat_wrapper() -> None: - await _touch_marker(marker, interval=0.05, trigger=trigger) - async with anyio.create_task_group() as tg: - tg.start_soon(_heartbeat_wrapper) + tg.start_soon(_touch_marker, marker, 0.05) await anyio.sleep(1.0) - trigger.set() + tg.cancel_scope.cancel() new_mtime_ns = marker.stat().st_mtime_ns assert new_mtime_ns > original_mtime_ns, "marker mtime should have been refreshed" @@ -350,16 +345,12 @@ async def test_touch_marker_oserror_does_not_propagate(self, tmp_path: Path) -> marker_file = subdir / "test.marker" marker_file.touch() marker_file.chmod(0o000) - trigger = anyio.Event() - - async def _heartbeat_wrapper() -> None: - await _touch_marker(marker_file, interval=0.05, trigger=trigger) try: async with anyio.create_task_group() as tg: - tg.start_soon(_heartbeat_wrapper) + tg.start_soon(_touch_marker, marker_file, 0.05) await anyio.sleep(0.12) - trigger.set() + tg.cancel_scope.cancel() except OSError: pytest.fail("_touch_marker should not propagate OSError") finally: diff --git a/tests/fleet/test_api_dispatch_marker.py b/tests/fleet/test_api_dispatch_marker.py index cf9b83e798..9bf11afe25 100644 --- a/tests/fleet/test_api_dispatch_marker.py +++ b/tests/fleet/test_api_dispatch_marker.py @@ -112,14 +112,12 @@ async def test_heartbeat_refreshes_mtime(tmp_path: Path) -> None: marker_path.write_text("{}") initial_mtime = marker_path.stat().st_mtime - trigger = anyio.Event() - async with anyio.create_task_group() as tg: - tg.start_soon(_touch_marker, marker_path, 0.05, trigger) + tg.start_soon(_touch_marker, marker_path, 0.05) async def _stop(): await anyio.sleep(2.1) - trigger.set() + tg.cancel_scope.cancel() tg.start_soon(_stop) From 85b34f561dff384fc119cbe938fefaf7fc0ae6c7 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 22:26:46 -0700 Subject: [PATCH 15/17] fix(review): add upper-bound elapsed assertion to execution marker suppression test Adds assert elapsed < 5.0 after the lower-bound check so a broken suppression path (marker never matched) cannot silently pass when the test happens to run slowly. Co-Authored-By: Claude Opus 4.6 --- .../test_process_session_log_monitor_stale_suppression.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/execution/test_process_session_log_monitor_stale_suppression.py b/tests/execution/test_process_session_log_monitor_stale_suppression.py index 235e3113f9..81bfda1478 100644 --- a/tests/execution/test_process_session_log_monitor_stale_suppression.py +++ b/tests/execution/test_process_session_log_monitor_stale_suppression.py @@ -379,6 +379,10 @@ async def touch_marker() -> None: "run-skill execution marker suppression not working — " "marker may not be matched by the glob pattern." ) + assert elapsed < 5.0, ( + f"STALE fired after {elapsed:.3f}s — expected prompt firing after " + f"max_suppression={max_suppression}s, not near the 5s timeout ceiling." + ) class TestExecutionMarkerLifecycle: From 8a157f0cc4a273b7afd0c6ca17224f5c37d34845 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 22:26:57 -0700 Subject: [PATCH 16/17] fix(review): add parameter-existence assertions before default-value checks Adds assert "marker_dir" in sig.parameters and assert "caller_session_id" in sig.parameters before accessing .default, so a missing parameter produces an actionable assertion message instead of an uninformative KeyError. Co-Authored-By: Claude Opus 4.6 --- tests/execution/test_headless_provider_forwarding.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/tests/execution/test_headless_provider_forwarding.py b/tests/execution/test_headless_provider_forwarding.py index f0a5f25a9c..91cf5333d3 100644 --- a/tests/execution/test_headless_provider_forwarding.py +++ b/tests/execution/test_headless_provider_forwarding.py @@ -1004,5 +1004,11 @@ def test_headless_executor_protocol_includes_marker_dir_params() -> None: from autoskillit.core.types import HeadlessExecutor sig = inspect.signature(HeadlessExecutor.run) + assert "marker_dir" in sig.parameters, ( + "marker_dir missing from HeadlessExecutor.run() signature" + ) + assert "caller_session_id" in sig.parameters, ( + "caller_session_id missing from HeadlessExecutor.run() signature" + ) assert sig.parameters["marker_dir"].default is None assert sig.parameters["caller_session_id"].default is None From e2edc1b5264c0e6dd6d1cac939070774656205c5 Mon Sep 17 00:00:00 2001 From: Trecek Date: Sat, 30 May 2026 22:35:12 -0700 Subject: [PATCH 17/17] fix(review): register _execution_marker in core stem cascade map New core/ module _execution_marker was missing from MODULE_CASCADE_CORE, causing test_all_core_stems_classified to fail. Maps it to the four consuming packages: core, execution, fleet, server. Co-Authored-By: Claude Opus 4.6 --- tests/_test_filter.py | 1 + tests/test_test_filter_core_cascade.py | 1 + 2 files changed, 2 insertions(+) diff --git a/tests/_test_filter.py b/tests/_test_filter.py index 4e09f7dd7e..e4cccf4d67 100644 --- a/tests/_test_filter.py +++ b/tests/_test_filter.py @@ -244,6 +244,7 @@ class ImportContext(enum.StrEnum): ), "_type_exceptions": frozenset({"core", "fleet", "recipe", "server"}), "_step_context": frozenset({"core", "execution", "pipeline", "server"}), + "_execution_marker": frozenset({"core", "execution", "fleet", "server"}), } # Narrow per-module cascade for execution/. Modules not listed here fall through diff --git a/tests/test_test_filter_core_cascade.py b/tests/test_test_filter_core_cascade.py index d736144872..6014c3eb5b 100644 --- a/tests/test_test_filter_core_cascade.py +++ b/tests/test_test_filter_core_cascade.py @@ -106,6 +106,7 @@ def test_all_entries_present(self) -> None: "_type_constants_registries", "_type_exceptions", "_step_context", + "_execution_marker", } assert set(MODULE_CASCADE_CORE.keys()) == expected_stems