diff --git a/src/autoskillit/core/CLAUDE.md b/src/autoskillit/core/CLAUDE.md index 4cefcec4b2..12a1ea0d39 100644 --- a/src/autoskillit/core/CLAUDE.md +++ b/src/autoskillit/core/CLAUDE.md @@ -22,6 +22,7 @@ Sub-packages: types/ (see types/CLAUDE.md) and runtime/ (see runtime/CLAUDE.md). | `_plugin_cache.py` | Plugin cache lifecycle: retiring cache, install locking, kitchen registry | | `_plugin_ids.py` | `DIRECT_PREFIX`, `MARKETPLACE_PREFIX`, `detect_autoskillit_mcp_prefix` (stdlib-only) | | `_install_detect.py` | `is_dev_install()` — editable-install detection for config resolution | +| `_execution_marker.py` | `execution_marker` async context manager — unified write/heartbeat/cleanup for stale-detector suppression markers | | `_step_context.py` | `current_step_name`, `current_order_id` ContextVars for pipeline step attribution | | `feature_flags.py` | `is_feature_enabled()` — IL-0 feature gate resolution primitive | | `tool_sequence_analysis.py` | Cross-session tool call sequence DFG analysis (stdlib-only) | diff --git a/src/autoskillit/core/__init__.pyi b/src/autoskillit/core/__init__.pyi index d4098b76ca..fdc76bfd96 100644 --- a/src/autoskillit/core/__init__.pyi +++ b/src/autoskillit/core/__init__.pyi @@ -4,6 +4,7 @@ from ._cmd_runner import CmdRunner as CmdRunner from ._cmd_runner import default_cmd_runner as default_cmd_runner from ._cmd_runner import run_gh as run_gh from ._cmd_runner import run_git as run_git +from ._execution_marker import execution_marker as execution_marker from ._install_detect import DirectUrlInfo as DirectUrlInfo from ._install_detect import _is_release_tag as _is_release_tag from ._install_detect import _is_stable_track as _is_stable_track diff --git a/src/autoskillit/core/_execution_marker.py b/src/autoskillit/core/_execution_marker.py new file mode 100644 index 0000000000..a8c09d8674 --- /dev/null +++ b/src/autoskillit/core/_execution_marker.py @@ -0,0 +1,92 @@ +"""Unified execution marker protocol for stale-detector suppression. + +Async context manager that writes a ``{label}-in-progress-{session_id}-{uuid}.marker`` +file, heartbeats its mtime, and deletes it on exit. Lives in ``core/`` (IL-0) so +both ``fleet/_api.py`` (IL-2) and ``server/tools/`` (IL-3) can import it without +violating layer constraints. +""" + +from __future__ import annotations + +import asyncio +import os +import uuid +from collections.abc import AsyncGenerator +from contextlib import asynccontextmanager +from pathlib import Path + +import anyio + +from .io import write_versioned_json +from .logging import get_logger + +logger = get_logger(__name__) + + +async def _touch_marker(marker_path: Path, interval: float) -> None: + try: + marker_path.touch() + except OSError: + logger.warning("execution_marker: touch failed %s", marker_path, exc_info=True) + while True: + await anyio.sleep(interval) + try: + marker_path.touch() + except OSError: + logger.warning("execution_marker: touch failed %s", marker_path, exc_info=True) + + +@asynccontextmanager +async def execution_marker( + marker_dir: Path | None, + session_id: str, + label: str, + heartbeat_interval: float = 30.0, +) -> AsyncGenerator[Path | None]: + """Write, heartbeat, and clean up an execution marker. + + Yields the marker ``Path`` on success, or ``None`` when ``marker_dir`` is + ``None`` or the initial write fails (suppression disabled, same semantics + as ``fleet/_api.py``'s dispatch marker). + """ + if marker_dir is None: + yield None + return + + marker_path = marker_dir / f"{label}-in-progress-{session_id}-{uuid.uuid4().hex[:8]}.marker" + try: + write_versioned_json( + marker_path, + { + "label": label, + "orchestrator_pid": os.getpid(), + "session_id": session_id, + }, + schema_version=1, + ) + except OSError: + logger.warning("execution_marker_write_failed", marker=str(marker_path), exc_info=True) + yield None + return + + hb_task: asyncio.Task[None] | None = None + try: + hb_task = asyncio.get_running_loop().create_task( + _touch_marker(marker_path, heartbeat_interval) + ) + yield marker_path + finally: + if hb_task is not None: + hb_task.cancel() + try: + await hb_task + except asyncio.CancelledError: + pass + except Exception: + logger.warning("execution_marker: heartbeat failed", exc_info=True) + try: + marker_path.unlink(missing_ok=True) + except OSError: + logger.warning( + "execution_marker_unlink_failed", marker=str(marker_path), exc_info=True + ) diff --git a/src/autoskillit/core/types/_type_protocols_execution.py b/src/autoskillit/core/types/_type_protocols_execution.py index ba63e32356..d47aa5be20 100644 --- a/src/autoskillit/core/types/_type_protocols_execution.py +++ b/src/autoskillit/core/types/_type_protocols_execution.py @@ -67,6 +67,8 @@ async def run( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: ... async def dispatch_food_truck( diff --git a/src/autoskillit/core/types/_type_subprocess.py b/src/autoskillit/core/types/_type_subprocess.py index 9d757ac851..d7b9bfbb08 100644 --- a/src/autoskillit/core/types/_type_subprocess.py +++ b/src/autoskillit/core/types/_type_subprocess.py @@ -130,14 +130,14 @@ class SubprocessRunner(Protocol): Parameters ---------- marker_dir : Path | None - Directory containing ``dispatch-in-progress-{session_id}-*.marker`` files. - When non-None, the session log monitor checks for active dispatch markers + Directory containing ``*-in-progress-{session_id}-*.marker`` files. + When non-None, the session log monitor checks for active execution markers before issuing stale-kill signals, suppressing kills while a fleet dispatch - is in progress. Default ``None`` (no suppression). + or run_skill call is in progress. Default ``None`` (no suppression). session_id : str | None - Caller's session identity, used to scope dispatch-marker glob patterns to - the originating fleet session. Threaded from fleet dispatch through headless - execution to ``_session_log_monitor``'s ``caller_session_id`` parameter. + Caller's session identity, used to scope execution-marker glob patterns to + the originating session. Threaded from fleet dispatch / run_skill through + headless execution to ``_session_log_monitor``'s ``caller_session_id`` parameter. Default ``None`` (match any marker). """ diff --git a/src/autoskillit/execution/headless/__init__.py b/src/autoskillit/execution/headless/__init__.py index 5891bd4638..4c4aa288b5 100644 --- a/src/autoskillit/execution/headless/__init__.py +++ b/src/autoskillit/execution/headless/__init__.py @@ -135,6 +135,8 @@ async def run_headless_core( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: """Shared headless runner used by run_skill. @@ -235,6 +237,8 @@ async def run_headless_core( step_backend=step_backend, model_identifier=resolved_model or "", profile_name=profile_name, + marker_dir=marker_dir, + session_id=caller_session_id, ) @@ -277,6 +281,8 @@ async def run( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: cfg = self._ctx.config.run_skill effective_timeout = timeout if timeout is not None else cfg.timeout @@ -313,6 +319,8 @@ async def run( resume_checkpoint=resume_checkpoint, resume_message=resume_message, backend_override=backend_override, + marker_dir=marker_dir, + caller_session_id=caller_session_id, ) async def dispatch_food_truck( diff --git a/src/autoskillit/execution/process/__init__.py b/src/autoskillit/execution/process/__init__.py index b22a641d7e..9cbfbe86f1 100644 --- a/src/autoskillit/execution/process/__init__.py +++ b/src/autoskillit/execution/process/__init__.py @@ -46,7 +46,7 @@ from autoskillit.execution.process._process_monitor import ( _has_active_api_connection, _has_active_child_processes, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, ) @@ -84,7 +84,7 @@ "RaceSignals", "_has_active_api_connection", "_has_active_child_processes", - "_has_active_dispatch_marker", + "_has_active_execution_marker", "_heartbeat", "_jsonl_contains_marker", "_jsonl_has_record_type", diff --git a/src/autoskillit/execution/process/_process_monitor.py b/src/autoskillit/execution/process/_process_monitor.py index b0ad8b378f..b95b9a4315 100644 --- a/src/autoskillit/execution/process/_process_monitor.py +++ b/src/autoskillit/execution/process/_process_monitor.py @@ -156,18 +156,18 @@ def _has_active_child_processes(pid: int) -> bool: return active -def _has_active_dispatch_marker( +def _has_active_execution_marker( marker_dir: Path, session_id: str | None = None, max_marker_age: float = 60.0, ) -> bool: - """Return True if any dispatch-in-progress marker was touched within max_marker_age seconds.""" + """Return True if any execution-in-progress marker was touched within max_marker_age secs.""" try: now = time.time() pattern = ( - f"dispatch-in-progress-{session_id}-*.marker" + f"*-in-progress-{session_id}-*.marker" if session_id is not None - else "dispatch-in-progress-*.marker" + else "*-in-progress-*.marker" ) for p in marker_dir.glob(pattern): try: @@ -390,7 +390,7 @@ async def _session_log_monitor( elapsed, pid, ) - elif marker_dir is not None and _has_active_dispatch_marker( + elif marker_dir is not None and _has_active_execution_marker( marker_dir, session_id=caller_session_id ): if suppression_start is None: diff --git a/src/autoskillit/execution/process/_process_race.py b/src/autoskillit/execution/process/_process_race.py index b40fc70ea3..3c32e050d1 100644 --- a/src/autoskillit/execution/process/_process_race.py +++ b/src/autoskillit/execution/process/_process_race.py @@ -14,7 +14,7 @@ from autoskillit.execution.process._process_monitor import ( _has_active_api_connection, _has_active_child_processes, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, ) @@ -189,7 +189,7 @@ async def _watch_stdout_idle( last_growth_time = _time.monotonic() suppression_start_marker = None elif _time.monotonic() - last_growth_time >= idle_output_timeout: - if marker_dir is not None and _has_active_dispatch_marker( + if marker_dir is not None and _has_active_execution_marker( marker_dir, session_id=session_id ): now = _time.monotonic() @@ -240,7 +240,7 @@ async def _watch_child_activity( """Extend the wall-clock CancelScope.deadline when child processes are active. Polls _has_active_child_processes, _has_active_api_connection, and - _has_active_dispatch_marker every _poll_interval seconds. When any + _has_active_execution_marker every _poll_interval seconds. When any returns True, pushes timeout_scope.deadline forward (up to max_extension_seconds beyond the original deadline). @@ -267,7 +267,7 @@ async def _watch_child_activity( or _has_active_api_connection(pid) or ( marker_dir is not None - and _has_active_dispatch_marker(marker_dir, session_id=session_id) + and _has_active_execution_marker(marker_dir, session_id=session_id) ) ) if not active: diff --git a/src/autoskillit/fleet/__init__.py b/src/autoskillit/fleet/__init__.py index b54e0990c7..2a4c23287a 100644 --- a/src/autoskillit/fleet/__init__.py +++ b/src/autoskillit/fleet/__init__.py @@ -63,8 +63,10 @@ from .state_recovery import ( classify_stale_dispatch, derive_orchestrator_resume_spec, + find_completed_dispatch, find_dispatch_for_issue, has_blocking_dispatch, + has_completed_dispatch, ) from .state_types import ( _INFRASTRUCTURE_FAILURE_REASONS, # noqa: F401 @@ -125,7 +127,9 @@ "GateRecordResult", "append_dispatch_record", "build_protected_campaign_ids", + "find_completed_dispatch", "has_blocking_dispatch", + "has_completed_dispatch", "has_failed_dispatch", "record_gate_outcome", "mark_dispatch_interrupted", diff --git a/src/autoskillit/fleet/_api.py b/src/autoskillit/fleet/_api.py index d3407b5aa8..032352a3b5 100644 --- a/src/autoskillit/fleet/_api.py +++ b/src/autoskillit/fleet/_api.py @@ -3,8 +3,6 @@ from __future__ import annotations import asyncio -import functools -import os import time from collections.abc import Callable from pathlib import Path @@ -23,7 +21,6 @@ claude_code_project_dir, get_logger, truncate_text, - write_versioned_json, ) from autoskillit.fleet._capture import _extract_captures, _normalize_capture_spec from autoskillit.fleet._expressions import _CAMPAIGN_REF_RE, _interpolate_campaign_refs @@ -38,6 +35,7 @@ if TYPE_CHECKING: from autoskillit.fleet.sidecar import IssueSidecarEntry + from autoskillit.fleet.state import DispatchRecord, DispatchStateHandle from autoskillit.pipeline.context import ToolContext logger = get_logger(__name__) @@ -106,24 +104,6 @@ def _post_dispatch_cleanup( ) -async def _touch_dispatch_marker( - marker_path: Path, interval: float = 30.0, trigger: anyio.Event | None = None -) -> None: - """Periodically touch marker_path to refresh mtime; runs until trigger is set.""" - try: - marker_path.touch() - except OSError: - logger.warning("_touch_dispatch_marker: failed to touch %s", marker_path, exc_info=True) - while trigger is None or not trigger.is_set(): - await anyio.sleep(interval) - try: - marker_path.touch() - except OSError: - logger.warning( - "_touch_dispatch_marker: failed to touch %s", marker_path, exc_info=True - ) - - async def execute_dispatch( tool_ctx: ToolContext, recipe: str, @@ -232,6 +212,24 @@ def _reject(error_code: FleetErrorCode, message: str, **kwargs: Any) -> Dispatch lock.release() +def _build_success_short_circuit( + record: DispatchRecord, + handle: DispatchStateHandle, +) -> DispatchResult: + """Return a DispatchResult that mirrors a prior succeeded dispatch without re-launching.""" + return DispatchResult( + outcome=DispatchCompleted( + success=True, + dispatch_status=DispatchStatus.SUCCESS, + dispatch_id=record.dispatch_id, + dispatched_session_id=record.dispatched_session_id, + reason=record.reason, + token_usage=dict(record.token_usage), + ), + per_dispatch_state_path=handle.state_path, + ) + + async def _run_dispatch( tool_ctx: ToolContext, recipe: str, @@ -361,28 +359,43 @@ async def _run_dispatch( dispatches_dir.mkdir(parents=True, exist_ok=True) campaign_id = tool_ctx.kitchen_id + recipe_snapshot = { + "recipe_name": recipe_obj.name, + "recipe_path": str(recipe_obj.path), + "recipe_version": recipe_obj.recipe_version or "", + "content_hash": recipe_obj.content_hash or "", + "effective_ingredients": dict(effective_ingredients), + } prior_session_chain: list[str] = [] prior_dispatched_session_id = "" if resume_session_id and prior_dispatch_id: - handle = DispatchStateHandle.open_continued(dispatches_dir, prior_dispatch_id) try: + handle = DispatchStateHandle.open_continued(dispatches_dir, prior_dispatch_id) prior_state = read_state(handle.state_path) if prior_state: for d in prior_state.dispatches: if d.name == effective_name: + if d.status == DispatchStatus.SUCCESS: + logger.info( + "resume_skipped_prior_success", + dispatch_name=effective_name, + prior_dispatch_id=prior_dispatch_id, + ) + return _build_success_short_circuit(d, handle) prior_session_chain = list(d.session_chain) prior_dispatched_session_id = d.dispatched_session_id break except (OSError, ValueError, KeyError, TypeError): logger.warning("failed to read prior session chain from state", exc_info=True) + handle = DispatchStateHandle.create_fresh( + dispatches_dir, + campaign_id, + effective_name, + "", + [DispatchRecord(name=effective_name, caller_session_id=caller_session_id)], + recipe_snapshot, + ) else: - recipe_snapshot = { - "recipe_name": recipe_obj.name, - "recipe_path": str(recipe_obj.path), - "recipe_version": recipe_obj.recipe_version or "", - "content_hash": recipe_obj.content_hash or "", - "effective_ingredients": dict(effective_ingredients), - } handle = DispatchStateHandle.create_fresh( dispatches_dir, campaign_id, @@ -572,71 +585,50 @@ def _on_spawn(pid: int, ticks: int) -> None: ) marker_dir: Path | None = None - marker_path: Path | None = None try: marker_dir = claude_code_project_dir(str(tool_ctx.project_dir)) except OSError: pass - if marker_dir is not None: - marker_path = marker_dir / f"dispatch-in-progress-{caller_session_id}-{dispatch_id}.marker" - try: - write_versioned_json( - marker_path, - { - "dispatch_id": dispatch_id, - "orchestrator_pid": os.getpid(), - "session_id": caller_session_id, - }, - schema_version=1, - ) - except OSError: - logger.warning("dispatch_marker_write_failed", marker=str(marker_path), exc_info=True) - marker_dir = None - marker_path = None + from autoskillit.core import execution_marker # noqa: PLC0415 _dispatch_completed_normally = False - _hb_trigger = anyio.Event() try: - async with anyio.create_task_group() as tg: - if marker_path is not None: - tg.start_soon( - functools.partial(_touch_dispatch_marker, marker_path, trigger=_hb_trigger) - ) - try: - skill_result = await tool_ctx.executor.dispatch_food_truck( - orchestrator_prompt=prompt, - cwd=str(tool_ctx.project_dir), - completion_marker=completion_marker, - prior_completion_markers=prior_completion_markers, - resume_session_id=resume_session_id, - resume_checkpoint=resume_checkpoint, - kitchen_id=tool_ctx.kitchen_id, - order_id=dispatch_id, - campaign_id=campaign_id, - dispatch_id=dispatch_id, - caller_session_id=caller_session_id, - project_dir=str(tool_ctx.project_dir), - marker_dir=marker_dir, - session_id=caller_session_id, - timeout=resolved_timeout, - idle_output_timeout=float(idle_output_timeout) - if idle_output_timeout is not None - else None, - env_extras={ - "AUTOSKILLIT_PROJECT_DIR": str(tool_ctx.project_dir), - "AUTOSKILLIT_CAMPAIGN_ID": campaign_id, - "AUTOSKILLIT_DISPATCH_ID": dispatch_id, - "AUTOSKILLIT_SESSION_DEADLINE": str(started_at + resolved_timeout), - }, - requires_packs=list(full_recipe.requires_packs) or ["kitchen-core"], - on_spawn=_on_spawn, - sentinel_contract=sentinel_contract, - resume_message=resume_message, - ) - finally: - _hb_trigger.set() - tg.cancel_scope.cancel() + async with execution_marker( + marker_dir, + caller_session_id, + "dispatch", + ): + skill_result = await tool_ctx.executor.dispatch_food_truck( + orchestrator_prompt=prompt, + cwd=str(tool_ctx.project_dir), + completion_marker=completion_marker, + prior_completion_markers=prior_completion_markers, + resume_session_id=resume_session_id, + resume_checkpoint=resume_checkpoint, + kitchen_id=tool_ctx.kitchen_id, + order_id=dispatch_id, + campaign_id=campaign_id, + dispatch_id=dispatch_id, + caller_session_id=caller_session_id, + project_dir=str(tool_ctx.project_dir), + marker_dir=marker_dir, + session_id=caller_session_id, + timeout=resolved_timeout, + idle_output_timeout=float(idle_output_timeout) + if idle_output_timeout is not None + else None, + env_extras={ + "AUTOSKILLIT_PROJECT_DIR": str(tool_ctx.project_dir), + "AUTOSKILLIT_CAMPAIGN_ID": campaign_id, + "AUTOSKILLIT_DISPATCH_ID": dispatch_id, + "AUTOSKILLIT_SESSION_DEADLINE": str(started_at + resolved_timeout), + }, + requires_packs=list(full_recipe.requires_packs) or ["kitchen-core"], + on_spawn=_on_spawn, + sentinel_contract=sentinel_contract, + resume_message=resume_message, + ) ended_at = time.time() _dispatch_completed_normally = True @@ -659,13 +651,6 @@ def _on_spawn(pid: int, ticks: int) -> None: ) raise finally: - if marker_path is not None: - try: - marker_path.unlink(missing_ok=True) - except OSError: - logger.warning( - "dispatch_marker_unlink_failed", marker=str(marker_path), exc_info=True - ) if not _dispatch_completed_normally: from autoskillit.fleet._label_cleanup import cleanup_orphaned_labels # noqa: PLC0415 diff --git a/src/autoskillit/fleet/state.py b/src/autoskillit/fleet/state.py index 0849f68fd8..bb5484d587 100644 --- a/src/autoskillit/fleet/state.py +++ b/src/autoskillit/fleet/state.py @@ -25,7 +25,9 @@ ) from autoskillit.fleet.state_gates import record_gate_outcome from autoskillit.fleet.state_recovery import ( + find_completed_dispatch, has_blocking_dispatch, + has_completed_dispatch, has_failed_dispatch, resume_campaign_from_state, ) @@ -56,7 +58,9 @@ # re-exported from state_gates "record_gate_outcome", # re-exported from state_recovery + "find_completed_dispatch", "has_blocking_dispatch", + "has_completed_dispatch", "has_failed_dispatch", "resume_campaign_from_state", # re-exported from state_types diff --git a/src/autoskillit/fleet/state_recovery.py b/src/autoskillit/fleet/state_recovery.py index c718e9537f..1827f12072 100644 --- a/src/autoskillit/fleet/state_recovery.py +++ b/src/autoskillit/fleet/state_recovery.py @@ -30,8 +30,10 @@ __all__ = [ "classify_stale_dispatch", "derive_orchestrator_resume_spec", + "find_completed_dispatch", "find_dispatch_for_issue", "has_blocking_dispatch", + "has_completed_dispatch", "has_failed_dispatch", "resume_campaign_from_state", ] @@ -116,6 +118,33 @@ def has_blocking_dispatch(state_path: Path) -> bool: return False +def has_completed_dispatch(state_path: Path, dispatch_name: str) -> bool: + """Return True if the named dispatch already has SUCCESS status. + + Returns False when the file is missing or corrupted (fail-open). + """ + return find_completed_dispatch(state_path, dispatch_name) is not None + + +def find_completed_dispatch(state_path: Path, dispatch_name: str) -> DispatchRecord | None: + """Return the DispatchRecord if the named dispatch has SUCCESS status. + + Returns None when the file is missing, corrupted, or no matching SUCCESS + record exists (fail-open). + """ + from autoskillit.fleet.state import read_state # noqa: PLC0415 + + if not state_path.exists(): + return None + state = read_state(state_path) + if state is None: + return None + for d in state.dispatches: + if d.name == dispatch_name and d.status == DispatchStatus.SUCCESS: + return d + return None + + def _is_abandon_kill_metadata(retry_reason: str, infra_exit_category: str) -> bool: """Return True when stored kill metadata indicates resume would be futile.""" if retry_reason in _ABANDON_REASONS: diff --git a/src/autoskillit/recipes/implementation.json b/src/autoskillit/recipes/implementation.json index c620c5df3d..727657d4ae 100644 --- a/src/autoskillit/recipes/implementation.json +++ b/src/autoskillit/recipes/implementation.json @@ -1427,6 +1427,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (no-CI path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2337,6 +2338,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (unconfirmed path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2391,6 +2393,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2407,6 +2410,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (error path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { diff --git a/src/autoskillit/recipes/implementation.yaml b/src/autoskillit/recipes/implementation.yaml index b3fe6ac0f1..70e60a522d 100644 --- a/src/autoskillit/recipes/implementation.yaml +++ b/src/autoskillit/recipes/implementation.yaml @@ -1318,6 +1318,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (no-CI path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2150,6 +2151,7 @@ steps: path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2205,6 +2207,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2224,6 +2227,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (error path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: diff --git a/src/autoskillit/recipes/merge-prs.json b/src/autoskillit/recipes/merge-prs.json index a0ccdc330e..e8c5e4ccd0 100644 --- a/src/autoskillit/recipes/merge-prs.json +++ b/src/autoskillit/recipes/merge-prs.json @@ -1698,6 +1698,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -1714,6 +1715,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (error path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { diff --git a/src/autoskillit/recipes/merge-prs.yaml b/src/autoskillit/recipes/merge-prs.yaml index 8c6d95fd53..bc6a69f931 100644 --- a/src/autoskillit/recipes/merge-prs.yaml +++ b/src/autoskillit/recipes/merge-prs.yaml @@ -1658,6 +1658,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -1677,6 +1678,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (error path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: diff --git a/src/autoskillit/recipes/remediation.json b/src/autoskillit/recipes/remediation.json index 1c6df0bcf5..ccef31e9e9 100644 --- a/src/autoskillit/recipes/remediation.json +++ b/src/autoskillit/recipes/remediation.json @@ -1547,6 +1547,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (no-CI path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2429,6 +2430,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (unconfirmed path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2511,6 +2513,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { @@ -2527,6 +2530,7 @@ "description": "Run post-pipeline diagnostic analysis on session logs (error path)", "tool": "run_skill", "model": "", + "stale_threshold": 2400, "optional": true, "skip_when_false": "inputs.post_run_diagnostics", "with": { diff --git a/src/autoskillit/recipes/remediation.yaml b/src/autoskillit/recipes/remediation.yaml index d76ce6be16..16644db945 100644 --- a/src/autoskillit/recipes/remediation.yaml +++ b/src/autoskillit/recipes/remediation.yaml @@ -1404,6 +1404,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (no-CI path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2204,6 +2205,7 @@ steps: path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2291,6 +2293,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: @@ -2310,6 +2313,7 @@ steps: description: Run post-pipeline diagnostic analysis on session logs (error path) tool: run_skill model: '' + stale_threshold: 2400 optional: true skip_when_false: inputs.post_run_diagnostics with: diff --git a/src/autoskillit/server/tools/tools_execution.py b/src/autoskillit/server/tools/tools_execution.py index 164d5b6b5e..bec0152e5e 100644 --- a/src/autoskillit/server/tools/tools_execution.py +++ b/src/autoskillit/server/tools/tools_execution.py @@ -671,39 +671,59 @@ async def run_skill( _sn_token = _current_step_name.set(_canonical_step_name(step_name)) _oid_token = _current_order_id.set(effective_order_id) + from autoskillit.core import ( # noqa: PLC0415 + claude_code_project_dir, + execution_marker, + find_caller_session_id, + ) + + _marker_dir: Path | None = None + try: + _marker_dir = claude_code_project_dir(str(tool_ctx.project_dir)) + except OSError: + pass + _orchestrator_sid = find_caller_session_id(project_dir=tool_ctx.project_dir) + _start = time.monotonic() try: - skill_result = await tool_ctx.executor.run( - resolved_command, - cwd, - model=effective_model, - add_dirs=skill_add_dirs, - step_name=step_name, - kitchen_id=tool_ctx.kitchen_id, - order_id=effective_order_id, - expected_output_patterns=expected_output_patterns, - write_behavior=write_spec, - stale_threshold=float(stale_threshold) - if stale_threshold is not None - else None, - idle_output_timeout=float(idle_output_timeout) - if idle_output_timeout is not None - else None, - completion_marker=invocation_marker, - recipe_name=tool_ctx.recipe_name, - recipe_content_hash=tool_ctx.recipe_content_hash, - recipe_composite_hash=tool_ctx.recipe_composite_hash, - recipe_version=tool_ctx.recipe_version, - allowed_write_prefix=allowed_write_prefix, - allowed_write_prefixes=allowed_write_prefixes, - readonly_skill=is_read_only, - write_watch_dirs=write_watch_dirs, - provider_extras=provider_extras, - profile_name=profile_name_out, - provider_name=profile_name_out, - backend_override=backend_override, - resume_session_id=resume_session_id, - ) + async with execution_marker( + _marker_dir, + _orchestrator_sid, + "run-skill", + ): + skill_result = await tool_ctx.executor.run( + resolved_command, + cwd, + model=effective_model, + add_dirs=skill_add_dirs, + step_name=step_name, + kitchen_id=tool_ctx.kitchen_id, + order_id=effective_order_id, + expected_output_patterns=expected_output_patterns, + write_behavior=write_spec, + stale_threshold=float(stale_threshold) + if stale_threshold is not None + else None, + idle_output_timeout=float(idle_output_timeout) + if idle_output_timeout is not None + else None, + completion_marker=invocation_marker, + recipe_name=tool_ctx.recipe_name, + recipe_content_hash=tool_ctx.recipe_content_hash, + recipe_composite_hash=tool_ctx.recipe_composite_hash, + recipe_version=tool_ctx.recipe_version, + allowed_write_prefix=allowed_write_prefix, + allowed_write_prefixes=allowed_write_prefixes, + readonly_skill=is_read_only, + write_watch_dirs=write_watch_dirs, + provider_extras=provider_extras, + profile_name=profile_name_out, + provider_name=profile_name_out, + backend_override=backend_override, + resume_session_id=resume_session_id, + marker_dir=_marker_dir, + caller_session_id=_orchestrator_sid, + ) if skill_result.success: tool_ctx.audit.record_success(skill_command) _clear_run_skill_state(tool_ctx.project_dir) diff --git a/src/autoskillit/server/tools/tools_fleet_dispatch.py b/src/autoskillit/server/tools/tools_fleet_dispatch.py index 0f0be53a0a..c19815086e 100755 --- a/src/autoskillit/server/tools/tools_fleet_dispatch.py +++ b/src/autoskillit/server/tools/tools_fleet_dispatch.py @@ -275,6 +275,19 @@ async def dispatch_food_truck( caller_session_id = find_caller_session_id(project_dir=tool_ctx.project_dir) effective_name = dispatch_name or recipe + if campaign_state_path_str: + from autoskillit.fleet import find_completed_dispatch # noqa: PLC0415 + + prior_record = find_completed_dispatch(Path(campaign_state_path_str), effective_name) + if prior_record is not None: + return DispatchCompleted( + success=True, + dispatch_status=DispatchStatus.SUCCESS, + dispatch_id=prior_record.dispatch_id, + dispatched_session_id=prior_record.dispatched_session_id, + reason="prior dispatch already succeeded", + ).to_envelope() + if skip_when: dispatches_dir = tool_ctx.temp_dir / "dispatches" accumulated_captures = read_all_campaign_captures(dispatches_dir, tool_ctx.kitchen_id) diff --git a/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md b/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md index c150a262a1..ebe24edbf6 100644 --- a/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md +++ b/src/autoskillit/skills_extended/analyze-pipeline-health/SKILL.md @@ -36,6 +36,7 @@ Coordinator skill that reads session logs from a pipeline run, groups them by st - Filter sessions.jsonl by kitchen_id to scope to this pipeline run - Spawn scanner subagents in parallel (one per step group) - Use model: "haiku" for scanner subagents +- Cap each scanner's investigation budget: set `maxTurns` to the limit in the agent definition and include a wall-clock soft-deadline instruction in the scanner prompt (e.g. "complete your analysis within 15 minutes; report partial findings if you reach the limit") - Report "no issues found" clearly when the pipeline is clean - Issue all Task calls in a single message to maximize parallelism diff --git a/tests/_test_filter.py b/tests/_test_filter.py index 4e09f7dd7e..e4cccf4d67 100644 --- a/tests/_test_filter.py +++ b/tests/_test_filter.py @@ -244,6 +244,7 @@ class ImportContext(enum.StrEnum): ), "_type_exceptions": frozenset({"core", "fleet", "recipe", "server"}), "_step_context": frozenset({"core", "execution", "pipeline", "server"}), + "_execution_marker": frozenset({"core", "execution", "fleet", "server"}), } # Narrow per-module cascade for execution/. Modules not listed here fall through diff --git a/tests/arch/CLAUDE.md b/tests/arch/CLAUDE.md index 058ca207e7..a086b53fbe 100644 --- a/tests/arch/CLAUDE.md +++ b/tests/arch/CLAUDE.md @@ -74,7 +74,7 @@ AST enforcement, sub-package layer contracts, and architectural invariant tests. | `test_variadic_ordering.py` | Architectural invariant: positional initial_prompt must precede all variadic CLI flags in build_interactive_cmd | | `test_flag_metadata_coverage.py` | Closed categorization guard: every ClaudeFlags member must appear in VARIADIC_CLAUDE_FLAGS or NON_VARIADIC_CLAUDE_FLAGS | | `test_interactive_ordering_gate.py` | AST guard: _session_launch.py and _cook.py must import and call assert_interactive_ordering before subprocess invocation | -| `test_watcher_signal_consistency.py` | Structural guard: all process watchers must call `_has_active_dispatch_marker` | +| `test_watcher_signal_consistency.py` | Structural guard: all process watchers must call `_has_active_execution_marker` | | `test_write_restriction_coverage.py` | Architectural invariant: skills with prose write restrictions in NEVER blocks have runtime enforcement (read_only, output_dir, or allowlist) | | `test_model_identity_contract.py` | AST guard: detect_model_drift must use normalize_model_id and _models_match — raw alias/full-ID comparison is a false-positive source | | `test_helpers_exports.py` | Asserts shared test helpers export required symbols (strip_markdown_code_regions) | diff --git a/tests/arch/test_execution_source_split.py b/tests/arch/test_execution_source_split.py index 8037854206..4bd55df122 100644 --- a/tests/arch/test_execution_source_split.py +++ b/tests/arch/test_execution_source_split.py @@ -16,7 +16,7 @@ "_headless_execute.py", ] HEADLESS_SIZE_BUDGETS = { - "headless/__init__.py": 460, + "headless/__init__.py": 465, "headless/_headless_helpers.py": 220, "headless/_headless_execute.py": 595, "headless/_headless_recovery.py": 366, diff --git a/tests/arch/test_watcher_signal_consistency.py b/tests/arch/test_watcher_signal_consistency.py index c623ad89dc..ece69e9f0e 100644 --- a/tests/arch/test_watcher_signal_consistency.py +++ b/tests/arch/test_watcher_signal_consistency.py @@ -1,4 +1,4 @@ -"""Structural guard: all process watchers must call _has_active_dispatch_marker.""" +"""Structural guard: all process watchers must call _has_active_execution_marker.""" from __future__ import annotations @@ -12,7 +12,7 @@ _PROCESS_RACE = Path("src/autoskillit/execution/process/_process_race.py") _PROCESS_MONITOR = Path("src/autoskillit/execution/process/_process_monitor.py") -_WATCHERS_THAT_MUST_CHECK_DISPATCH_MARKER = frozenset( +_WATCHERS_THAT_MUST_CHECK_EXECUTION_MARKER = frozenset( { "_watch_child_activity", "_session_log_monitor", @@ -37,13 +37,15 @@ def _functions_calling_predicate(source_path: Path, predicate: str) -> set[str]: return result -@pytest.mark.parametrize("watcher", sorted(_WATCHERS_THAT_MUST_CHECK_DISPATCH_MARKER)) -def test_watcher_calls_has_active_dispatch_marker(watcher: str) -> None: - """Each watcher in the set must call _has_active_dispatch_marker.""" - callers_race = _functions_calling_predicate(_PROCESS_RACE, "_has_active_dispatch_marker") - callers_monitor = _functions_calling_predicate(_PROCESS_MONITOR, "_has_active_dispatch_marker") +@pytest.mark.parametrize("watcher", sorted(_WATCHERS_THAT_MUST_CHECK_EXECUTION_MARKER)) +def test_watcher_calls_has_active_execution_marker(watcher: str) -> None: + """Each watcher in the set must call _has_active_execution_marker.""" + callers_race = _functions_calling_predicate(_PROCESS_RACE, "_has_active_execution_marker") + callers_monitor = _functions_calling_predicate( + _PROCESS_MONITOR, "_has_active_execution_marker" + ) all_callers = callers_race | callers_monitor assert watcher in all_callers, ( - f"{watcher} does not call _has_active_dispatch_marker. " + f"{watcher} does not call _has_active_execution_marker. " f"Functions that do: {sorted(all_callers)}" ) diff --git a/tests/execution/test_headless_provider_forwarding.py b/tests/execution/test_headless_provider_forwarding.py index 6ca701cdbf..91cf5333d3 100644 --- a/tests/execution/test_headless_provider_forwarding.py +++ b/tests/execution/test_headless_provider_forwarding.py @@ -929,3 +929,86 @@ async def fake_runner(cmd, **kwargs): ) backend.stream_parser.assert_called_once_with(completion_marker="%%TEST_MARKER%%") + + +@pytest.mark.anyio +async def test_run_headless_core_forwards_marker_dir_and_caller_session_id( + minimal_ctx, tmp_path, monkeypatch +) -> None: + """run_headless_core passes marker_dir + caller_session_id to _execute_claude_headless.""" + from autoskillit.core import CmdSpec + from autoskillit.execution.headless import run_headless_core + + execute_kwargs: dict = {} + marker_dir = tmp_path / "markers" + marker_dir.mkdir() + + backend = _mock_backend() + backend.build_skill_session_cmd.return_value = CmdSpec( + cmd=("claude", "--print", "test"), env={} + ) + minimal_ctx.backend = backend + + async def fake_execute(spec, cwd, ctx, **kwargs): # noqa: ARG001 + execute_kwargs.update(kwargs) + return _STUB_RESULT + + monkeypatch.setattr("autoskillit.execution.headless._execute_claude_headless", fake_execute) + + await run_headless_core( + "/autoskillit:probe", + str(tmp_path), + minimal_ctx, + marker_dir=marker_dir, + caller_session_id="orchestrator-session-abc", + ) + + assert execute_kwargs.get("marker_dir") == marker_dir + assert execute_kwargs.get("session_id") == "orchestrator-session-abc" + + +@pytest.mark.anyio +async def test_default_executor_run_forwards_marker_dir_and_caller_session_id( + minimal_ctx, tmp_path, monkeypatch +) -> None: + """DefaultHeadlessExecutor.run() passes marker_dir + caller_session_id to run_headless_core.""" + import autoskillit.execution.headless as _headless_mod + from autoskillit.execution.headless import DefaultHeadlessExecutor + + captured: dict = {} + marker_dir = tmp_path / "markers" + marker_dir.mkdir() + + async def fake_core(skill_command, cwd, ctx, **kwargs): # noqa: ARG001 + captured.update(kwargs) + return _STUB_RESULT + + monkeypatch.setattr(_headless_mod, "run_headless_core", fake_core) + + executor = DefaultHeadlessExecutor(minimal_ctx) + await executor.run( + "/autoskillit:probe", + str(tmp_path), + marker_dir=marker_dir, + caller_session_id="orchestrator-session-xyz", + ) + + assert captured.get("marker_dir") == marker_dir + assert captured.get("caller_session_id") == "orchestrator-session-xyz" + + +def test_headless_executor_protocol_includes_marker_dir_params() -> None: + """HeadlessExecutor.run() Protocol includes marker_dir and caller_session_id.""" + import inspect + + from autoskillit.core.types import HeadlessExecutor + + sig = inspect.signature(HeadlessExecutor.run) + assert "marker_dir" in sig.parameters, ( + "marker_dir missing from HeadlessExecutor.run() signature" + ) + assert "caller_session_id" in sig.parameters, ( + "caller_session_id missing from HeadlessExecutor.run() signature" + ) + assert sig.parameters["marker_dir"].default is None + assert sig.parameters["caller_session_id"].default is None diff --git a/tests/execution/test_process_deadline_extension.py b/tests/execution/test_process_deadline_extension.py index 8eff2579f7..1f17059bcb 100644 --- a/tests/execution/test_process_deadline_extension.py +++ b/tests/execution/test_process_deadline_extension.py @@ -185,7 +185,7 @@ async def test_extends_deadline_when_dispatch_marker_active(monkeypatch, tmp_pat lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, **kw: True, ) trigger = anyio.Event() @@ -228,7 +228,7 @@ async def test_no_extension_when_marker_inactive(monkeypatch, tmp_path) -> None: lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, **kw: False, ) trigger = anyio.Event() diff --git a/tests/execution/test_process_heartbeat.py b/tests/execution/test_process_heartbeat.py index be26effa97..dab0b231a3 100644 --- a/tests/execution/test_process_heartbeat.py +++ b/tests/execution/test_process_heartbeat.py @@ -18,7 +18,7 @@ from autoskillit.execution.process import ( _has_active_api_connection, _has_active_child_processes, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, run_managed_async, @@ -750,19 +750,19 @@ async def test_stream_parser_skips_empty_result(self, tmp_path): ) -class TestHasActiveDispatchMarker: - """Unit tests for _has_active_dispatch_marker.""" +class TestHasActiveExecutionMarker: + """Unit tests for _has_active_execution_marker.""" def test_dispatch_marker_nonexistent_dir_returns_false(self, tmp_path): """Nonexistent marker directory returns False without raising.""" - result = _has_active_dispatch_marker(tmp_path / "nonexistent") + result = _has_active_execution_marker(tmp_path / "nonexistent") assert result is False def test_dispatch_marker_fresh_marker_returns_true(self, tmp_path): """A fresh marker file causes the function to return True.""" marker = tmp_path / "dispatch-in-progress-sess1-abc.marker" marker.touch() - result = _has_active_dispatch_marker(tmp_path) + result = _has_active_execution_marker(tmp_path) assert result is True def test_dispatch_marker_expired_marker_returns_false(self, tmp_path): @@ -772,15 +772,15 @@ def test_dispatch_marker_expired_marker_returns_false(self, tmp_path): old_time = time.time() - 120 _os.utime(marker, (old_time, old_time)) - result = _has_active_dispatch_marker(tmp_path, max_marker_age=60.0) + result = _has_active_execution_marker(tmp_path, max_marker_age=60.0) assert result is False def test_dispatch_marker_session_id_filters(self, tmp_path): """When session_id is provided, only markers matching that session are considered.""" (tmp_path / "dispatch-in-progress-abc-001.marker").touch() (tmp_path / "dispatch-in-progress-xyz-002.marker").touch() - result_matching = _has_active_dispatch_marker(tmp_path, session_id="abc") - result_non_matching = _has_active_dispatch_marker(tmp_path, session_id="def") + result_matching = _has_active_execution_marker(tmp_path, session_id="abc") + result_non_matching = _has_active_execution_marker(tmp_path, session_id="def") assert result_matching is True assert result_non_matching is False @@ -788,13 +788,13 @@ def test_dispatch_marker_none_session_id_matches_all(self, tmp_path): """session_id=None matches any dispatch-in-progress marker.""" marker = tmp_path / "dispatch-in-progress-xyz-001.marker" marker.touch() - result = _has_active_dispatch_marker(tmp_path, session_id=None) + result = _has_active_execution_marker(tmp_path, session_id=None) assert result is True def test_dispatch_marker_no_logger_calls(self): """The function body contains no logger.* attribute access calls.""" - source = inspect.getsource(_has_active_dispatch_marker) + source = inspect.getsource(_has_active_execution_marker) tree = ast.parse(source) for node in ast.walk(tree): if ( @@ -802,4 +802,21 @@ def test_dispatch_marker_no_logger_calls(self): and isinstance(node.value, ast.Name) and node.value.id == "logger" ): - pytest.fail(f"Found logger.{node.attr} in _has_active_dispatch_marker body") + pytest.fail(f"Found logger.{node.attr} in _has_active_execution_marker body") + + @pytest.mark.parametrize( + "prefix", + ["dispatch-in-progress", "run-skill-in-progress"], + ) + def test_execution_marker_matches_both_prefixes(self, tmp_path, prefix): + """Both dispatch-in-progress-* and run-skill-in-progress-* markers are matched.""" + marker = tmp_path / f"{prefix}-sess1-step.marker" + marker.touch() + result = _has_active_execution_marker(tmp_path) + assert result is True, f"{prefix}-* marker not matched by *-in-progress-* glob pattern" + + def test_wrong_session_id_gives_no_match(self, tmp_path): + """Marker for session-A does not match when globbing for session-B.""" + (tmp_path / "run-skill-in-progress-session-A-step.marker").touch() + assert _has_active_execution_marker(tmp_path, session_id="session-B") is False + assert _has_active_execution_marker(tmp_path, session_id="session-A") is True diff --git a/tests/execution/test_process_idle_watchdog.py b/tests/execution/test_process_idle_watchdog.py index 934a989c28..132cab9536 100644 --- a/tests/execution/test_process_idle_watchdog.py +++ b/tests/execution/test_process_idle_watchdog.py @@ -106,7 +106,7 @@ async def test_watch_stdout_idle_suppression_evaluated_fires_once_during_suppres await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -286,7 +286,7 @@ async def test_watch_stdout_idle_suppressed_by_dispatch_marker( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -330,7 +330,7 @@ async def test_watch_stdout_idle_fires_when_suppression_cap_exceeded( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -390,7 +390,7 @@ async def test_watch_stdout_idle_suppression_timer_resets_on_growth( await anyio.Path(stdout_file).write_bytes(b"initial\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -433,7 +433,7 @@ async def test_watch_stdout_idle_emits_suppression_warning( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) @@ -483,7 +483,7 @@ async def test_watch_stdout_idle_marker_false_fires_immediately( await anyio.Path(stdout_file).write_bytes(b"initial output\n") monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: False, ) @@ -515,7 +515,7 @@ async def test_watch_stdout_idle_dispatch_marker_suppresses_stall( ) -> None: """Active dispatch marker suppresses idle stall within the suppression window.""" monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) stdout_file = tmp_path / "stdout.txt" @@ -606,7 +606,7 @@ async def test_watch_stdout_idle_marker_suppression_bounded( ) -> None: """Suppression cap exceeded — idle stall fires despite active marker.""" monkeypatch.setattr( - "autoskillit.execution.process._process_race._has_active_dispatch_marker", + "autoskillit.execution.process._process_race._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) stdout_file = tmp_path / "stdout.txt" @@ -638,4 +638,57 @@ async def test_watch_stdout_idle_marker_suppression_bounded( assert elapsed < 5.0 +@pytest.mark.anyio +async def test_stdout_idle_NOT_fired_when_execution_marker_active( + tmp_path: anyio.Path, +) -> None: + """IDLE_STALL NOT fired when stdout silent but run-skill-in-progress marker is fresh. + + Failing before implementation: run-skill-in-progress-* marker is invisible to + dispatch-in-progress-* glob, so IDLE_STALL fires immediately. + After fix: marker matched by *-in-progress-* glob, stall suppressed. + """ + stdout_file = tmp_path / "stdout.txt" + await anyio.Path(stdout_file).write_bytes(b"initial output\n") + + marker_dir = tmp_path / "marker_dir" + await anyio.Path(marker_dir).mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + await anyio.Path(marker_path).write_text("{}") + + acc = RaceAccumulator() + trigger = anyio.Event() + + async def touch_marker() -> None: + for _ in range(100): + await anyio.sleep(0.02) + try: + await anyio.Path(marker_path).touch() + except OSError: + break + + with anyio.move_on_after(0.5): + async with anyio.create_task_group() as tg: + tg.start_soon(touch_marker) + tg.start_soon( + functools.partial( + _watch_stdout_idle, + stdout_file, + 0.05, # idle_output_timeout — short to trigger quickly if not suppressed + acc, + trigger, + 0.02, # _poll_interval + marker_dir=marker_dir, + session_id="caller-session", + max_suppression_seconds=10.0, + ) + ) + await trigger.wait() + + assert not acc.idle_stall, ( + "IDLE_STALL fired even though run-skill execution marker was active. " + "The *-in-progress-* glob fix is needed." + ) + + # test write diff --git a/tests/execution/test_process_session_log_monitor.py b/tests/execution/test_process_session_log_monitor.py index 9937cb6295..7787bf80de 100644 --- a/tests/execution/test_process_session_log_monitor.py +++ b/tests/execution/test_process_session_log_monitor.py @@ -777,6 +777,102 @@ async def test_phase2_no_spurious_read_on_preexisting_content(self, tmp_path): assert result.status == ChannelBStatus.STALE +@pytest.mark.anyio +async def test_stale_NOT_fired_when_execution_marker_active(tmp_path): + """Stale NOT fired when JSONL silent but a run-skill-in-progress marker is active and fresh. + + This is a TDD guard for the execution marker blind spot: current code uses + dispatch-in-progress-* glob, so run-skill-in-progress-* markers are invisible. + After the fix (glob -> *-in-progress-*), this test must pass. + """ + import json + + session_file = tmp_path / "abc123.jsonl" + session_file.write_text( + json.dumps({"type": "assistant", "message": {"role": "assistant", "content": "working"}}) + + "\n" + ) + spawn_time = time.time() - 1 + + marker_dir = tmp_path / "marker_dir" + marker_dir.mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + marker_path.write_text("{}") + + monitor_returned = anyio.Event() + monitor_status: list[object] = [] + + async def run_monitor() -> None: + result = await _session_log_monitor( + tmp_path, + "DONE", + stale_threshold=0.05, + spawn_time=spawn_time, + marker_dir=marker_dir, + caller_session_id="caller-session", + _phase1_poll=0.01, + _phase2_poll=0.05, + max_suppression_seconds=10.0, + ) + monitor_status.append(result.status) + monitor_returned.set() + + async def touch_marker() -> None: + for _ in range(60): + await anyio.sleep(0.05) + try: + marker_path.touch() + except OSError: + break + + with anyio.move_on_after(0.6): + async with anyio.create_task_group() as tg: + tg.start_soon(touch_marker) + tg.start_soon(run_monitor) + + assert not monitor_returned.is_set(), ( + f"Monitor returned early: stale fired with active run-skill execution marker. " + f"Status: {monitor_status}" + ) + assert monitor_status == [], ( + f"Monitor emitted a status despite not returning: {monitor_status}" + ) + + +@pytest.mark.anyio +async def test_stale_fired_when_execution_marker_expired(tmp_path): + """Stale fires when JSONL is silent and the run-skill execution marker is expired.""" + import json + import os + + session_file = tmp_path / "abc123.jsonl" + session_file.write_text( + json.dumps({"type": "assistant", "message": {"role": "assistant", "content": "working"}}) + + "\n" + ) + spawn_time = time.time() - 1 + + marker_dir = tmp_path / "marker_dir" + marker_dir.mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + marker_path.write_text("{}") + past = time.time() - 120 # 120s old — well beyond default max_marker_age=60s + os.utime(marker_path, (past, past)) + + with anyio.fail_after(3.0): + result = await _session_log_monitor( + tmp_path, + "DONE", + stale_threshold=0.05, + spawn_time=spawn_time, + marker_dir=marker_dir, + caller_session_id="caller-session", + _phase1_poll=0.01, + _phase2_poll=0.05, + ) + assert result.status == ChannelBStatus.STALE + + @pytest.mark.anyio async def test_watch_session_log_passes_marker_dir_to_monitor_kwargs( tmp_path: anyio.Path, monkeypatch: pytest.MonkeyPatch diff --git a/tests/execution/test_process_session_log_monitor_dispatch_marker.py b/tests/execution/test_process_session_log_monitor_dispatch_marker.py index 231195e06d..a834af49c2 100644 --- a/tests/execution/test_process_session_log_monitor_dispatch_marker.py +++ b/tests/execution/test_process_session_log_monitor_dispatch_marker.py @@ -20,12 +20,12 @@ class TestStaleSuppressionDispatchMarker: Tests the following scenarios for dispatch marker stale suppression: - T1: Active marker (True→False via monkeypatch) suppresses stale, then fires - T2: Expired marker (mtime > 60s) does NOT suppress stale (real helper, no monkeypatch — - exercises the mtime-expiry threshold in _has_active_dispatch_marker directly) + exercises the mtime-expiry threshold in _has_active_execution_marker directly) - T3: No marker_dir (None) skips dispatch check entirely (regression guard) - T4: Bounded suppression fires after max_suppression_seconds - T5: Session-scoped matching — sessionA marker does not suppress sessionB - Monkeypatch target: autoskillit.execution.process._process_monitor._has_active_dispatch_marker + Monkeypatch target: autoskillit.execution.process._process_monitor._has_active_execution_marker Convention: caller_session_id= is the kwarg used at call sites. Note: T1, T4 monkeypatch the helper; T2, T5 use real marker files to test helper internals. """ @@ -43,7 +43,7 @@ def side_effect_fn(marker_dir, session_id=None): return call_count["n"] == 1 monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", side_effect_fn, ) with anyio.fail_after(5.0): @@ -107,7 +107,7 @@ async def test_stale_marker_suppression_bounded(self, tmp_path, monkeypatch): session_file.write_text("") spawn_time = time.time() - 10 monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) with anyio.fail_after(3.0): @@ -197,7 +197,7 @@ def fake_dispatch_marker(marker_dir, session_id=None): fake_child_proc, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", fake_dispatch_marker, ) with anyio.fail_after(5.0): @@ -233,7 +233,7 @@ async def test_dispatch_marker_bounded_by_max_suppression(self, tmp_path, monkey lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) with structlog.testing.capture_logs() as logs: @@ -285,7 +285,7 @@ def fake_dispatch_marker(marker_dir, session_id=None): fake_child_proc, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", fake_dispatch_marker, ) with structlog.testing.capture_logs() as logs: @@ -327,7 +327,7 @@ async def test_dispatch_marker_bounded_kill_emits_warning_with_fields( lambda pid: False, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", lambda marker_dir, session_id=None: True, ) with structlog.testing.capture_logs() as logs: @@ -352,7 +352,7 @@ async def test_dispatch_marker_bounded_kill_emits_warning_with_fields( @pytest.mark.anyio async def test_marker_dir_none_skips_dispatch_check(self, tmp_path, monkeypatch): - """When marker_dir is None (default), _has_active_dispatch_marker is never called.""" + """When marker_dir is None (default), _has_active_execution_marker is never called.""" session_file = tmp_path / "abc123.jsonl" session_file.write_text("") spawn_time = time.time() - 10 @@ -377,7 +377,7 @@ def track_dispatch_marker(marker_dir, session_id=None): fake_child_proc, ) monkeypatch.setattr( - "autoskillit.execution.process._process_monitor._has_active_dispatch_marker", + "autoskillit.execution.process._process_monitor._has_active_execution_marker", track_dispatch_marker, ) with anyio.fail_after(5.0): diff --git a/tests/execution/test_process_session_log_monitor_stale_suppression.py b/tests/execution/test_process_session_log_monitor_stale_suppression.py index 7ef2295ff4..81bfda1478 100644 --- a/tests/execution/test_process_session_log_monitor_stale_suppression.py +++ b/tests/execution/test_process_session_log_monitor_stale_suppression.py @@ -3,6 +3,7 @@ from __future__ import annotations import time +from pathlib import Path from unittest.mock import patch import anyio @@ -316,3 +317,86 @@ def _api_conn(pid): assert elapsed >= 0.15, ( f"elapsed {elapsed:.2f}s below 0.15s — suppression may not have fired" ) + + +class TestExecutionMarkerSuppression: + """Execution marker (run-skill-in-progress-*) suppression for _session_log_monitor. + + Covers the run_skill MCP tool blind spot: markers named run-skill-in-progress-* + must suppress stale kills just like dispatch-in-progress-* markers. + """ + + @pytest.mark.anyio + async def test_execution_marker_suppression_bounded_by_max_suppression_seconds(self, tmp_path): + """Stale fires after max_suppression_seconds despite fresh run-skill marker. + + Failing before implementation: run-skill-in-progress-* marker is invisible to + dispatch-in-progress-* glob, so STALE fires immediately (elapsed << max_suppression). + After fix: marker found, suppression active, fires only after max_suppression_seconds. + """ + session_file = tmp_path / "abc123.jsonl" + session_file.write_text( + '{"type": "assistant", "message": {"role": "assistant", "content": "working"}}\n' + ) + spawn_time = time.time() - 1 + + marker_dir = tmp_path / "marker_dir" + marker_dir.mkdir() + marker_path = marker_dir / "run-skill-in-progress-caller-session-step1.marker" + marker_path.write_text("{}") + + max_suppression = 0.3 + + async def touch_marker() -> None: + for _ in range(200): + await anyio.sleep(0.02) + try: + marker_path.touch() + except OSError: + break + + start = time.monotonic() + with anyio.fail_after(5.0): + async with anyio.create_task_group() as tg: + tg.start_soon(touch_marker) + result = await _session_log_monitor( + tmp_path, + "DONE", + stale_threshold=0.05, + spawn_time=spawn_time, + marker_dir=marker_dir, + caller_session_id="caller-session", + _phase1_poll=0.01, + _phase2_poll=0.05, + max_suppression_seconds=max_suppression, + ) + tg.cancel_scope.cancel() + + elapsed = time.monotonic() - start + assert result.status == ChannelBStatus.STALE + assert elapsed >= max_suppression, ( + f"STALE fired after {elapsed:.3f}s, expected >= {max_suppression}s. " + "run-skill execution marker suppression not working — " + "marker may not be matched by the glob pattern." + ) + assert elapsed < 5.0, ( + f"STALE fired after {elapsed:.3f}s — expected prompt firing after " + f"max_suppression={max_suppression}s, not near the 5s timeout ceiling." + ) + + +class TestExecutionMarkerLifecycle: + @pytest.mark.anyio + async def test_execution_marker_lifecycle(self, tmp_path: Path) -> None: + from autoskillit.core._execution_marker import execution_marker + + marker_dir = tmp_path / "markers" + marker_dir.mkdir() + + async with execution_marker(marker_dir, "session-123", "run-skill") as path: + assert path is not None + matches = list(marker_dir.glob("run-skill-in-progress-session-123-*.marker")) + assert len(matches) == 1 + + remaining = list(marker_dir.glob("*-in-progress-*.marker")) + assert remaining == [], f"marker not cleaned up: {remaining}" diff --git a/tests/execution/test_process_submodules.py b/tests/execution/test_process_submodules.py index 696f87b2a6..2a15c3934b 100644 --- a/tests/execution/test_process_submodules.py +++ b/tests/execution/test_process_submodules.py @@ -21,7 +21,7 @@ "RaceSignals", "_has_active_api_connection", "_has_active_child_processes", - "_has_active_dispatch_marker", + "_has_active_execution_marker", "_heartbeat", "_jsonl_contains_marker", "_jsonl_has_record_type", @@ -99,7 +99,7 @@ def test_process_monitor_exports(): are defined in _process_monitor submodule.""" from autoskillit.execution.process._process_monitor import ( _has_active_api_connection, - _has_active_dispatch_marker, + _has_active_execution_marker, _heartbeat, _session_log_monitor, ) @@ -112,9 +112,9 @@ def test_process_monitor_exports(): assert ( _has_active_api_connection.__module__ == "autoskillit.execution.process._process_monitor" ) - assert callable(_has_active_dispatch_marker) + assert callable(_has_active_execution_marker) assert ( - _has_active_dispatch_marker.__module__ == "autoskillit.execution.process._process_monitor" + _has_active_execution_marker.__module__ == "autoskillit.execution.process._process_monitor" ) diff --git a/tests/fakes.py b/tests/fakes.py index 7240a20ab7..4589781122 100644 --- a/tests/fakes.py +++ b/tests/fakes.py @@ -94,6 +94,8 @@ class ExecutorCall: resume_checkpoint: SessionCheckpoint | None = None resume_message: str | None = None backend_override: str | None = None + marker_dir: Path | None = None + caller_session_id: str | None = None @dataclasses.dataclass @@ -193,6 +195,8 @@ async def run( resume_checkpoint: SessionCheckpoint | None = None, resume_message: str | None = None, backend_override: str | None = None, + marker_dir: Path | None = None, + caller_session_id: str | None = None, ) -> SkillResult: self.calls.append( ExecutorCall( @@ -226,6 +230,8 @@ async def run( resume_checkpoint=resume_checkpoint, resume_message=resume_message, backend_override=backend_override, + marker_dir=marker_dir, + caller_session_id=caller_session_id, ) ) if self._queue: diff --git a/tests/fleet/CLAUDE.md b/tests/fleet/CLAUDE.md index 02c59a61f3..6b0645fec4 100644 --- a/tests/fleet/CLAUDE.md +++ b/tests/fleet/CLAUDE.md @@ -12,7 +12,7 @@ Fleet campaign dispatch, state persistence, and sidecar tests. | `test_dispatch_reaper.py` | Tests for `fleet._dispatch_reaper.reap_stale_dispatches` — orphan kill, dead pid, recycled pid, create_time fallback, dry-run, idempotency | | `test_api.py` | Tests for fleet._api module (Group J) | | `test_api_split_integrity.py` | Structural guard: fleet `_api.py` split — verifies new modules export expected symbols and public API surface is preserved | -| `test_api_dispatch_marker.py` | Tests for _run_dispatch marker lifecycle and _touch_dispatch_marker heartbeat | +| `test_api_dispatch_marker.py` | Tests for _run_dispatch marker lifecycle via execution_marker context manager | | `test_campaign_capture.py` | Tests for campaign capture extraction and ingredient interpolation (Group J) | | `test_capture_roundtrip.py` | Tests for prompt-extractor field name alignment — verifies sentinel examples use bare names matching `_extract_captures` expectations | | `test_checkpoint_bridge.py` | Tests for checkpoint_from_sidecar converting IssueSidecarEntry to SessionCheckpoint | diff --git a/tests/fleet/test_api.py b/tests/fleet/test_api.py index 15e9944ce7..c5f04f9bf3 100644 --- a/tests/fleet/test_api.py +++ b/tests/fleet/test_api.py @@ -312,60 +312,49 @@ async def test_execute_dispatch_returns_dispatch_result_with_state_path_on_succe assert result.per_dispatch_state_path.exists() -class TestTouchDispatchMarker: - """Unit tests for _touch_dispatch_marker helper.""" +class TestTouchMarker: + """Unit tests for _touch_marker helper (core._execution_marker).""" @pytest.mark.anyio - async def test_touch_dispatch_marker_creates_heartbeat_loop(self, tmp_path: Path) -> None: - """Heartbeat loop refreshes mtime until trigger is set.""" + async def test_touch_marker_creates_heartbeat_loop(self, tmp_path: Path) -> None: + """Heartbeat loop refreshes mtime until cancelled.""" import os - from autoskillit.fleet._api import _touch_dispatch_marker + from autoskillit.core._execution_marker import _touch_marker marker = tmp_path / "test.marker" marker.touch() - os.utime(marker, (0, 0)) # Set to epoch 0 so any real touch shows as newer + os.utime(marker, (0, 0)) original_mtime_ns = marker.stat().st_mtime_ns - trigger = anyio.Event() - - async def _heartbeat_wrapper() -> None: - await _touch_dispatch_marker(marker, interval=0.05, trigger=trigger) - async with anyio.create_task_group() as tg: - tg.start_soon(_heartbeat_wrapper) - # 1.0s gives ~20 heartbeat cycles; tolerates WSL2 CLOCK_REALTIME backward - # jumps of up to ~0.9s from NTP sync without causing a spurious failure. + tg.start_soon(_touch_marker, marker, 0.05) await anyio.sleep(1.0) - trigger.set() + tg.cancel_scope.cancel() new_mtime_ns = marker.stat().st_mtime_ns assert new_mtime_ns > original_mtime_ns, "marker mtime should have been refreshed" @pytest.mark.anyio - async def test_touch_dispatch_marker_oserror_does_not_propagate(self, tmp_path: Path) -> None: + async def test_touch_marker_oserror_does_not_propagate(self, tmp_path: Path) -> None: """OSError during touch logs but does not raise.""" - from autoskillit.fleet._api import _touch_dispatch_marker + from autoskillit.core._execution_marker import _touch_marker subdir = tmp_path / "subdir" subdir.mkdir() marker_file = subdir / "test.marker" - marker_file.touch() # create the file first - marker_file.chmod(0o000) # make unwritable so touch() fails - trigger = anyio.Event() - - async def _heartbeat_wrapper() -> None: - await _touch_dispatch_marker(marker_file, interval=0.05, trigger=trigger) + marker_file.touch() + marker_file.chmod(0o000) try: async with anyio.create_task_group() as tg: - tg.start_soon(_heartbeat_wrapper) + tg.start_soon(_touch_marker, marker_file, 0.05) await anyio.sleep(0.12) - trigger.set() + tg.cancel_scope.cancel() except OSError: - pytest.fail("_touch_dispatch_marker should not propagate OSError") + pytest.fail("_touch_marker should not propagate OSError") finally: - marker_file.chmod(0o644) # restore for cleanup + marker_file.chmod(0o644) class TestDispatchMarkerLifecycle: @@ -514,11 +503,12 @@ async def _capture_marker(*args, **kwargs): assert len(captured_json) == 1, "should have captured marker JSON" data = captured_json[0] - assert "dispatch_id" in data + assert "label" in data assert "orchestrator_pid" in data assert "session_id" in data assert isinstance(data["orchestrator_pid"], int) assert isinstance(data["session_id"], str) + assert data["label"] == "dispatch" class TestWriteDispatchToCampaignStateRefusal: diff --git a/tests/fleet/test_api_dispatch_marker.py b/tests/fleet/test_api_dispatch_marker.py index 0c45cf296c..9bf11afe25 100644 --- a/tests/fleet/test_api_dispatch_marker.py +++ b/tests/fleet/test_api_dispatch_marker.py @@ -1,4 +1,4 @@ -"""Tests for _run_dispatch marker lifecycle and _touch_dispatch_marker heartbeat.""" +"""Tests for _run_dispatch marker lifecycle via execution_marker context manager.""" from __future__ import annotations @@ -8,7 +8,8 @@ import anyio import pytest -from autoskillit.fleet._api import _run_dispatch, _touch_dispatch_marker +from autoskillit.core._execution_marker import _touch_marker +from autoskillit.fleet._api import _run_dispatch from tests.fleet._helpers import _no_sleep_quota_checker, _noop_quota_refresher, _setup_dispatch pytestmark = [pytest.mark.layer("fleet"), pytest.mark.medium, pytest.mark.feature("fleet")] @@ -25,7 +26,7 @@ async def test_marker_created_before_dispatch(tool_ctx, monkeypatch, tmp_path: P ) async def _asserting_dispatch(**_kw): - found = list(marker_dir.glob("dispatch-in-progress--*.marker")) + found = list(marker_dir.glob("*-in-progress-*.marker")) assert len(found) == 1, f"Expected 1 marker, found {len(found)}" raise asyncio.CancelledError @@ -67,7 +68,7 @@ async def test_marker_deleted_after_success(tool_ctx, monkeypatch, tmp_path: Pat quota_refresher=_noop_quota_refresher, ) - remaining = list(marker_dir.glob("dispatch-in-progress--*.marker")) + remaining = list(marker_dir.glob("*-in-progress-*.marker")) assert remaining == [], f"Expected no marker files, found {remaining}" @@ -101,7 +102,7 @@ async def _raise_runtime_error(**_kw): except* RuntimeError: pass - remaining = list(marker_dir.glob("dispatch-in-progress--*.marker")) + remaining = list(marker_dir.glob("*-in-progress-*.marker")) assert remaining == [], f"Expected no marker files after exception, found {remaining}" @@ -111,14 +112,12 @@ async def test_heartbeat_refreshes_mtime(tmp_path: Path) -> None: marker_path.write_text("{}") initial_mtime = marker_path.stat().st_mtime - trigger = anyio.Event() - async with anyio.create_task_group() as tg: - tg.start_soon(_touch_dispatch_marker, marker_path, 0.05, trigger) + tg.start_soon(_touch_marker, marker_path, 0.05) async def _stop(): await anyio.sleep(2.1) - trigger.set() + tg.cancel_scope.cancel() tg.start_soon(_stop) diff --git a/tests/fleet/test_resume_preflight.py b/tests/fleet/test_resume_preflight.py index 4b8f18315b..e91cbc48a0 100644 --- a/tests/fleet/test_resume_preflight.py +++ b/tests/fleet/test_resume_preflight.py @@ -214,3 +214,187 @@ async def test_session_id_mismatch_logs_warning(self, tool_ctx, monkeypatch, tmp assert mismatch_logs, f"Expected session_id_continuity_mismatch warning, got: {logs}" assert mismatch_logs[0]["resume_session_id"] == "original-session-id" assert mismatch_logs[0]["returned_session_id"] == "returned-different-session" + + +class TestResumeSuccessGuard: + @pytest.mark.anyio + async def test_resume_rejected_when_prior_dispatch_succeeded(self, tool_ctx, monkeypatch): + """_run_dispatch returns cached SUCCESS when prior dispatch already succeeded.""" + from autoskillit.fleet import DispatchRecord, DispatchStatus, write_initial_state + from autoskillit.fleet._api import execute_dispatch + from autoskillit.fleet.state_types import DispatchCompleted + + _setup_dispatch(tool_ctx, monkeypatch) + + dispatches_dir = tool_ctx.temp_dir / "dispatches" + dispatches_dir.mkdir(parents=True, exist_ok=True) + prior_id = "prior-dispatch-succeeded" + state_path = dispatches_dir / f"{prior_id}.json" + write_initial_state( + state_path, + tool_ctx.kitchen_id, + "camp", + "", + [ + DispatchRecord( + name="test-recipe", + status=DispatchStatus.SUCCESS, + dispatch_id="completed-dispatch-id", + dispatched_session_id="completed-session-id", + reason="completed_clean", + ) + ], + ) + + result = await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="some-session-id", + prior_dispatch_id=prior_id, + ) + + assert isinstance(result.outcome, DispatchCompleted) + assert result.outcome.success is True + assert result.outcome.dispatch_status == DispatchStatus.SUCCESS + assert result.outcome.dispatch_id == "completed-dispatch-id" + assert result.outcome.dispatched_session_id == "completed-session-id" + assert result.outcome.reason == "completed_clean" + assert len(tool_ctx.executor.dispatch_calls) == 0 + + @pytest.mark.anyio + async def test_resume_proceeds_when_prior_dispatch_resumable( + self, tool_ctx, monkeypatch, tmp_path + ): + """_run_dispatch proceeds normally when prior dispatch is RESUMABLE.""" + from autoskillit.fleet import DispatchRecord, DispatchStatus, write_initial_state + from autoskillit.fleet._api import execute_dispatch + + _setup_dispatch(tool_ctx, monkeypatch) + + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.touch() + monkeypatch.setattr("autoskillit.fleet._api.claude_code_log_path", lambda *_: jsonl_file) + monkeypatch.setattr( + "autoskillit.fleet._api.parse_l3_result_block", + lambda **_: _make_no_sentinel(), + ) + + dispatches_dir = tool_ctx.temp_dir / "dispatches" + dispatches_dir.mkdir(parents=True, exist_ok=True) + prior_id = "prior-dispatch-resumable" + state_path = dispatches_dir / f"{prior_id}.json" + write_initial_state( + state_path, + tool_ctx.kitchen_id, + "camp", + "", + [ + DispatchRecord( + name="test-recipe", + status=DispatchStatus.RESUMABLE, + dispatched_session_id="resumable-session-id", + ) + ], + ) + + await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="resumable-session-id", + prior_dispatch_id=prior_id, + ) + + assert len(tool_ctx.executor.dispatch_calls) == 1 + + @pytest.mark.anyio + async def test_resume_proceeds_when_prior_dispatch_not_in_state( + self, tool_ctx, monkeypatch, tmp_path + ): + """_run_dispatch proceeds when prior state has no matching dispatch (fail-open).""" + from autoskillit.fleet import DispatchRecord, DispatchStatus, write_initial_state + from autoskillit.fleet._api import execute_dispatch + + _setup_dispatch(tool_ctx, monkeypatch) + + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.touch() + monkeypatch.setattr("autoskillit.fleet._api.claude_code_log_path", lambda *_: jsonl_file) + monkeypatch.setattr( + "autoskillit.fleet._api.parse_l3_result_block", + lambda **_: _make_no_sentinel(), + ) + + dispatches_dir = tool_ctx.temp_dir / "dispatches" + dispatches_dir.mkdir(parents=True, exist_ok=True) + prior_id = "prior-dispatch-other-name" + state_path = dispatches_dir / f"{prior_id}.json" + write_initial_state( + state_path, + tool_ctx.kitchen_id, + "camp", + "", + [DispatchRecord(name="other-recipe", status=DispatchStatus.SUCCESS)], + ) + + await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="some-session-id", + prior_dispatch_id=prior_id, + ) + + assert len(tool_ctx.executor.dispatch_calls) == 1 + + @pytest.mark.anyio + async def test_resume_proceeds_when_prior_state_missing(self, tool_ctx, monkeypatch, tmp_path): + """_run_dispatch proceeds when prior state file does not exist (fail-open).""" + from autoskillit.fleet._api import execute_dispatch + + _setup_dispatch(tool_ctx, monkeypatch) + + jsonl_file = tmp_path / "session.jsonl" + jsonl_file.touch() + monkeypatch.setattr("autoskillit.fleet._api.claude_code_log_path", lambda *_: jsonl_file) + monkeypatch.setattr( + "autoskillit.fleet._api.parse_l3_result_block", + lambda **_: _make_no_sentinel(), + ) + + prior_id = "prior-dispatch-nonexistent" + + await execute_dispatch( + tool_ctx=tool_ctx, + recipe="test-recipe", + task="t", + ingredients=None, + dispatch_name=None, + timeout_sec=None, + prompt_builder=lambda **_: "prompt", + quota_checker=_no_sleep_quota_checker, + quota_refresher=_noop_quota_refresher, + resume_session_id="some-session-id", + prior_dispatch_id=prior_id, + ) + + assert len(tool_ctx.executor.dispatch_calls) == 1 diff --git a/tests/fleet/test_state.py b/tests/fleet/test_state.py index 6a3909242f..34a1354155 100644 --- a/tests/fleet/test_state.py +++ b/tests/fleet/test_state.py @@ -944,6 +944,42 @@ def test_mixed_infrastructure_and_logic_failure_halts(self, tmp_path: Path) -> N assert has_failed_dispatch(sp) is True +class TestHasCompletedDispatch: + def test_has_completed_dispatch_returns_true_for_success(self, tmp_path: Path) -> None: + """has_completed_dispatch returns True when the named dispatch has SUCCESS status.""" + from autoskillit.fleet.state_recovery import has_completed_dispatch + + sp = _state_path(tmp_path) + write_initial_state( + sp, + "cid", + "camp", + "/m.yaml", + [DispatchRecord(name="d1", status=DispatchStatus.SUCCESS)], + ) + assert has_completed_dispatch(sp, "d1") is True + + def test_has_completed_dispatch_returns_false_for_non_success(self, tmp_path: Path) -> None: + """has_completed_dispatch returns False for RESUMABLE, PENDING, FAILURE, etc.""" + from autoskillit.fleet.state_recovery import has_completed_dispatch + + sp = _state_path(tmp_path) + write_initial_state(sp, "cid", "camp", "/m.yaml", _make_dispatches("d1", "d2", "d3")) + append_dispatch_record( + sp, DispatchRecord(name="d2", status=DispatchStatus.FAILURE, reason="task-failed") + ) + assert has_completed_dispatch(sp, "d1") is False + assert has_completed_dispatch(sp, "d2") is False + assert has_completed_dispatch(sp, "d3") is False + + def test_has_completed_dispatch_returns_false_for_missing_state(self, tmp_path: Path) -> None: + """has_completed_dispatch returns False when state file is missing (fail-open).""" + from autoskillit.fleet.state_recovery import has_completed_dispatch + + sp = tmp_path / "nonexistent" / "state.json" + assert has_completed_dispatch(sp, "any-dispatch") is False + + class TestRetryReasonPropagation: def test_retry_reason_stored_in_dispatch_record(self, tmp_path: Path) -> None: sp = _state_path(tmp_path) diff --git a/tests/infra/test_schema_read_convention.py b/tests/infra/test_schema_read_convention.py index 57255f1bae..bd0c8481f1 100644 --- a/tests/infra/test_schema_read_convention.py +++ b/tests/infra/test_schema_read_convention.py @@ -81,7 +81,9 @@ def _scan_read_versioned_json_callers() -> set[str]: "src/autoskillit/planner/consolidation.py": "Transient single-pipeline-run artifacts", "src/autoskillit/planner/validation.py": "Transient single-pipeline-run artifacts", "src/autoskillit/execution/_recording_skills.py": "Informational manifest — never read back", - "src/autoskillit/fleet/_api.py": "Progress signal — written and deleted, never read back", + "src/autoskillit/core/_execution_marker.py": ( + "Progress signal — written and deleted, never read back" + ), "src/autoskillit/execution/session_log.py": ( "token_usage.json readers use dual-key fallback, not version-gated reading" ), diff --git a/tests/test_test_filter_core_cascade.py b/tests/test_test_filter_core_cascade.py index d736144872..6014c3eb5b 100644 --- a/tests/test_test_filter_core_cascade.py +++ b/tests/test_test_filter_core_cascade.py @@ -106,6 +106,7 @@ def test_all_entries_present(self) -> None: "_type_constants_registries", "_type_exceptions", "_step_context", + "_execution_marker", } assert set(MODULE_CASCADE_CORE.keys()) == expected_stems