Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
17 commits
Select commit Hold shift + click to select a range
10ecb3d
test: add failing tests for run-skill execution marker blind spot
Trecek May 31, 2026
bc8efa2
refactor: rename _has_active_dispatch_marker -> _has_active_execution…
Trecek May 31, 2026
a5bfcd0
feat: thread marker_dir and caller_session_id through run() call chain
Trecek May 31, 2026
b3948cb
feat: extract execution_marker context manager and wire run_skill mar…
Trecek May 31, 2026
aa101f3
test: add TestExecutionMarkerLifecycle to close marker lifecycle veri…
Trecek May 31, 2026
1705732
test: add failing tests for pre-resume success guard and has_complete…
Trecek May 31, 2026
88a8f6c
feat: implement pre-resume success guard and has_completed_dispatch r…
Trecek May 31, 2026
d88c2d0
feat: bound analyze-pipeline-health stale_threshold to 2400s and cap …
Trecek May 31, 2026
a5f301a
chore: commit pending session changes
Trecek May 31, 2026
88e4ee2
fix: catch FileNotFoundError from open_continued in _run_dispatch
Trecek May 31, 2026
e447882
fix(review): use uuid suffix and anyio task group in execution_marker
Trecek May 31, 2026
1d77c2d
fix(review): forward stored dispatch_id/session_id in pre-flight guard
Trecek May 31, 2026
c04cbe6
fix(review): strengthen test assertions for monitor status and dispat…
Trecek May 31, 2026
36a9889
fix(review): use asyncio.create_task with proper exception handling
Trecek May 31, 2026
85b34f5
fix(review): add upper-bound elapsed assertion to execution marker su…
Trecek May 31, 2026
8a157f0
fix(review): add parameter-existence assertions before default-value …
Trecek May 31, 2026
e2edc1b
fix(review): register _execution_marker in core stem cascade map
Trecek May 31, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/autoskillit/core/CLAUDE.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ Sub-packages: types/ (see types/CLAUDE.md) and runtime/ (see runtime/CLAUDE.md).
| `_plugin_cache.py` | Plugin cache lifecycle: retiring cache, install locking, kitchen registry |
| `_plugin_ids.py` | `DIRECT_PREFIX`, `MARKETPLACE_PREFIX`, `detect_autoskillit_mcp_prefix` (stdlib-only) |
| `_install_detect.py` | `is_dev_install()` — editable-install detection for config resolution |
| `_execution_marker.py` | `execution_marker` async context manager — unified write/heartbeat/cleanup for stale-detector suppression markers |
| `_step_context.py` | `current_step_name`, `current_order_id` ContextVars for pipeline step attribution |
| `feature_flags.py` | `is_feature_enabled()` — IL-0 feature gate resolution primitive |
| `tool_sequence_analysis.py` | Cross-session tool call sequence DFG analysis (stdlib-only) |
Expand Down
1 change: 1 addition & 0 deletions src/autoskillit/core/__init__.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ from ._cmd_runner import CmdRunner as CmdRunner
from ._cmd_runner import default_cmd_runner as default_cmd_runner
from ._cmd_runner import run_gh as run_gh
from ._cmd_runner import run_git as run_git
from ._execution_marker import execution_marker as execution_marker
from ._install_detect import DirectUrlInfo as DirectUrlInfo
from ._install_detect import _is_release_tag as _is_release_tag
from ._install_detect import _is_stable_track as _is_stable_track
Expand Down
92 changes: 92 additions & 0 deletions src/autoskillit/core/_execution_marker.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
"""Unified execution marker protocol for stale-detector suppression.

Async context manager that writes a ``{label}-in-progress-{session_id}-{uuid}.marker``
file, heartbeats its mtime, and deletes it on exit. Lives in ``core/`` (IL-0) so
both ``fleet/_api.py`` (IL-2) and ``server/tools/`` (IL-3) can import it without
violating layer constraints.
"""

from __future__ import annotations

import asyncio
import os
import uuid
from collections.abc import AsyncGenerator
from contextlib import asynccontextmanager
from pathlib import Path

import anyio

from .io import write_versioned_json
from .logging import get_logger

logger = get_logger(__name__)


async def _touch_marker(marker_path: Path, interval: float) -> None:
    """Heartbeat loop: touch ``marker_path`` immediately, then after every ``interval`` seconds.

    The loop has no exit condition of its own; it runs until an exception
    (for example task cancellation during the sleep) propagates out of it.
    An ``OSError`` raised by ``touch`` is logged as a warning and the loop
    keeps going.
    """
    while True:
        try:
            marker_path.touch()
        except OSError:
            logger.warning("execution_marker: touch failed %s", marker_path, exc_info=True)
        # Sleeping after the touch (rather than before) gives the same
        # touch/sleep/touch/... sequence with a single touch call site.
        await anyio.sleep(interval)


@asynccontextmanager
async def execution_marker(
marker_dir: Path | None,
session_id: str,
label: str,
heartbeat_interval: float = 30.0,
) -> AsyncGenerator[Path | None]:
"""Write, heartbeat, and clean up an execution marker.

Yields the marker ``Path`` on success, or ``None`` when ``marker_dir`` is
``None`` or the initial write fails (suppression disabled, same semantics
as ``fleet/_api.py``'s dispatch marker).
"""
if marker_dir is None:
yield None
return

marker_path = marker_dir / f"{label}-in-progress-{session_id}-{uuid.uuid4().hex[:8]}.marker"
try:
write_versioned_json(
marker_path,
{
"label": label,
"orchestrator_pid": os.getpid(),
"session_id": session_id,
},
schema_version=1,
)
except OSError:
logger.warning("execution_marker_write_failed", marker=str(marker_path), exc_info=True)
yield None
return

hb_task: asyncio.Task[None] | None = None
try:
hb_task = asyncio.get_running_loop().create_task(
_touch_marker(marker_path, heartbeat_interval)
)
yield marker_path
finally:
if hb_task is not None:
hb_task.cancel()
try:
await hb_task
except asyncio.CancelledError:
pass
except Exception:
logger.warning("execution_marker: heartbeat failed", exc_info=True)
try:
marker_path.unlink(missing_ok=True)
except OSError:
logger.warning(
"execution_marker_unlink_failed", marker=str(marker_path), exc_info=True
)
2 changes: 2 additions & 0 deletions src/autoskillit/core/types/_type_protocols_execution.py
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ async def run(
resume_checkpoint: SessionCheckpoint | None = None,
resume_message: str | None = None,
backend_override: str | None = None,
marker_dir: Path | None = None,
caller_session_id: str | None = None,
) -> SkillResult: ...

async def dispatch_food_truck(
Expand Down
12 changes: 6 additions & 6 deletions src/autoskillit/core/types/_type_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,14 @@ class SubprocessRunner(Protocol):
Parameters
----------
marker_dir : Path | None
Directory containing ``dispatch-in-progress-{session_id}-*.marker`` files.
When non-None, the session log monitor checks for active dispatch markers
Directory containing ``*-in-progress-{session_id}-*.marker`` files.
When non-None, the session log monitor checks for active execution markers
before issuing stale-kill signals, suppressing kills while a fleet dispatch
is in progress. Default ``None`` (no suppression).
or run_skill call is in progress. Default ``None`` (no suppression).
session_id : str | None
Caller's session identity, used to scope dispatch-marker glob patterns to
the originating fleet session. Threaded from fleet dispatch through headless
execution to ``_session_log_monitor``'s ``caller_session_id`` parameter.
Caller's session identity, used to scope execution-marker glob patterns to
the originating session. Threaded from fleet dispatch / run_skill through
headless execution to ``_session_log_monitor``'s ``caller_session_id`` parameter.
Default ``None`` (match any marker).
"""

Expand Down
8 changes: 8 additions & 0 deletions src/autoskillit/execution/headless/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,8 @@ async def run_headless_core(
resume_checkpoint: SessionCheckpoint | None = None,
resume_message: str | None = None,
backend_override: str | None = None,
marker_dir: Path | None = None,
caller_session_id: str | None = None,
) -> SkillResult:
"""Shared headless runner used by run_skill.

Expand Down Expand Up @@ -235,6 +237,8 @@ async def run_headless_core(
step_backend=step_backend,
model_identifier=resolved_model or "",
profile_name=profile_name,
marker_dir=marker_dir,
session_id=caller_session_id,
)


Expand Down Expand Up @@ -277,6 +281,8 @@ async def run(
resume_checkpoint: SessionCheckpoint | None = None,
resume_message: str | None = None,
backend_override: str | None = None,
marker_dir: Path | None = None,
caller_session_id: str | None = None,
) -> SkillResult:
cfg = self._ctx.config.run_skill
effective_timeout = timeout if timeout is not None else cfg.timeout
Expand Down Expand Up @@ -313,6 +319,8 @@ async def run(
resume_checkpoint=resume_checkpoint,
resume_message=resume_message,
backend_override=backend_override,
marker_dir=marker_dir,
caller_session_id=caller_session_id,
)

async def dispatch_food_truck(
Expand Down
4 changes: 2 additions & 2 deletions src/autoskillit/execution/process/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
from autoskillit.execution.process._process_monitor import (
_has_active_api_connection,
_has_active_child_processes,
_has_active_dispatch_marker,
_has_active_execution_marker,
_heartbeat,
_session_log_monitor,
)
Expand Down Expand Up @@ -84,7 +84,7 @@
"RaceSignals",
"_has_active_api_connection",
"_has_active_child_processes",
"_has_active_dispatch_marker",
"_has_active_execution_marker",
"_heartbeat",
"_jsonl_contains_marker",
"_jsonl_has_record_type",
Expand Down
10 changes: 5 additions & 5 deletions src/autoskillit/execution/process/_process_monitor.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,18 +156,18 @@ def _has_active_child_processes(pid: int) -> bool:
return active


def _has_active_dispatch_marker(
def _has_active_execution_marker(
marker_dir: Path,
session_id: str | None = None,
max_marker_age: float = 60.0,
) -> bool:
"""Return True if any dispatch-in-progress marker was touched within max_marker_age seconds."""
"""Return True if any execution-in-progress marker was touched within max_marker_age secs."""
try:
now = time.time()
pattern = (
f"dispatch-in-progress-{session_id}-*.marker"
f"*-in-progress-{session_id}-*.marker"
if session_id is not None
else "dispatch-in-progress-*.marker"
else "*-in-progress-*.marker"
)
for p in marker_dir.glob(pattern):
try:
Expand Down Expand Up @@ -390,7 +390,7 @@ async def _session_log_monitor(
elapsed,
pid,
)
elif marker_dir is not None and _has_active_dispatch_marker(
elif marker_dir is not None and _has_active_execution_marker(
marker_dir, session_id=caller_session_id
):
if suppression_start is None:
Expand Down
8 changes: 4 additions & 4 deletions src/autoskillit/execution/process/_process_race.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
from autoskillit.execution.process._process_monitor import (
_has_active_api_connection,
_has_active_child_processes,
_has_active_dispatch_marker,
_has_active_execution_marker,
_heartbeat,
_session_log_monitor,
)
Expand Down Expand Up @@ -189,7 +189,7 @@ async def _watch_stdout_idle(
last_growth_time = _time.monotonic()
suppression_start_marker = None
elif _time.monotonic() - last_growth_time >= idle_output_timeout:
if marker_dir is not None and _has_active_dispatch_marker(
if marker_dir is not None and _has_active_execution_marker(
marker_dir, session_id=session_id
):
now = _time.monotonic()
Expand Down Expand Up @@ -240,7 +240,7 @@ async def _watch_child_activity(
"""Extend the wall-clock CancelScope.deadline when child processes are active.

Polls _has_active_child_processes, _has_active_api_connection, and
_has_active_dispatch_marker every _poll_interval seconds. When any
_has_active_execution_marker every _poll_interval seconds. When any
returns True, pushes timeout_scope.deadline forward (up to
max_extension_seconds beyond the original deadline).

Expand All @@ -267,7 +267,7 @@ async def _watch_child_activity(
or _has_active_api_connection(pid)
or (
marker_dir is not None
and _has_active_dispatch_marker(marker_dir, session_id=session_id)
and _has_active_execution_marker(marker_dir, session_id=session_id)
)
)
if not active:
Expand Down
4 changes: 4 additions & 0 deletions src/autoskillit/fleet/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,10 @@
from .state_recovery import (
classify_stale_dispatch,
derive_orchestrator_resume_spec,
find_completed_dispatch,
find_dispatch_for_issue,
has_blocking_dispatch,
has_completed_dispatch,
)
from .state_types import (
_INFRASTRUCTURE_FAILURE_REASONS, # noqa: F401
Expand Down Expand Up @@ -125,7 +127,9 @@
"GateRecordResult",
"append_dispatch_record",
"build_protected_campaign_ids",
"find_completed_dispatch",
"has_blocking_dispatch",
"has_completed_dispatch",
"has_failed_dispatch",
"record_gate_outcome",
"mark_dispatch_interrupted",
Expand Down
Loading
Loading