Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions src/strands/agent/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,16 @@
SlidingWindowConversationManager,
SummarizingConversationManager,
)
from .state_machine import AgentExecutionState, AgentStateMachine, CHECKPOINT_STATES, InvalidStateTransitionError

__all__ = [
"Agent",
"AgentBase",
"AgentResult",
"AgentExecutionState",
"AgentStateMachine",
"CHECKPOINT_STATES",
"InvalidStateTransitionError",
"ConversationManager",
"NullConversationManager",
"SlidingWindowConversationManager",
Expand Down
32 changes: 32 additions & 0 deletions src/strands/agent/agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
HookRegistry,
MessageAddedEvent,
)
from ..hooks.events import AgentStateTransitionEvent
from ..hooks.registry import TEvent
from ..interrupt import _InterruptState
from ..models.bedrock import BedrockModel
Expand Down Expand Up @@ -69,6 +70,7 @@
SlidingWindowConversationManager,
)
from .state import AgentState
from .state_machine import AgentExecutionState, AgentStateMachine

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -279,6 +281,11 @@ def __init__(

self._interrupt_state = _InterruptState()

# Formal execution state machine — tracks the agent's lifecycle phase and
# emits AgentStateTransitionEvent hook events on every transition.
self.state_machine = AgentStateMachine()
self.state_machine.add_listener(self._on_state_transition)

# Initialize lock for guarding concurrent invocations
# Using threading.Lock instead of asyncio.Lock because run_async() creates
# separate event loops in different threads, so asyncio.Lock wouldn't work
Expand Down Expand Up @@ -330,6 +337,12 @@ def __init__(

self.hooks.invoke_callbacks(AgentInitializedEvent(agent=self))

def _on_state_transition(
self, old_state: AgentExecutionState, new_state: AgentExecutionState
) -> None:
"""Fire an :class:`~strands.hooks.events.AgentStateTransitionEvent` hook on every state change."""
self.hooks.invoke_callbacks(AgentStateTransitionEvent(agent=self, old_state=old_state, new_state=new_state))

def cancel(self) -> None:
"""Cancel the currently running agent invocation.

Expand Down Expand Up @@ -744,6 +757,7 @@ async def stream_async(
)

try:
self.state_machine.transition(AgentExecutionState.INITIALIZING)
self._interrupt_state.resume(prompt)

self.event_loop_metrics.reset_usage_metrics()
Expand Down Expand Up @@ -793,6 +807,13 @@ async def stream_async(
# Clear cancel signal to allow agent reuse after cancellation
self._cancel_signal.clear()

# Return to IDLE unless we're paused at an INTERRUPTED checkpoint,
# which persists so external code can observe the paused state.
# Use reset() rather than transition() so exceptions in any running
# state (e.g. MODEL_CALL) don't strand the state machine.
if self.state_machine.state != AgentExecutionState.INTERRUPTED:
self.state_machine.reset()

if self._invocation_lock.locked():
self._invocation_lock.release()

Expand Down Expand Up @@ -857,6 +878,13 @@ async def _run_loop(
# Capture the result from the final event if available
if isinstance(event, EventLoopStopEvent):
agent_result = AgentResult(*event["stop"])
stop_reason = event["stop"][0]
if stop_reason == "interrupt":
self.state_machine.transition(AgentExecutionState.INTERRUPTED)
elif stop_reason == "cancelled":
self.state_machine.transition(AgentExecutionState.CANCELLED)
else:
self.state_machine.transition(AgentExecutionState.COMPLETED)

finally:
self.conversation_manager.apply_management(self)
Expand All @@ -872,6 +900,7 @@ async def _run_loop(
# raise TypeError if the resume input is not valid interrupt responses.
self._interrupt_state.resume(after_invocation_event.resume)
current_messages = await self._convert_prompt_to_messages(after_invocation_event.resume)
self.state_machine.transition(AgentExecutionState.INITIALIZING)
else:
current_messages = None

Expand Down Expand Up @@ -914,6 +943,9 @@ async def _execute_event_loop_cycle(
if self._session_manager:
self._session_manager.sync_agent(self)

# Return to INITIALIZING so event_loop_cycle can re-enter MODEL_CALL cleanly
self.state_machine.try_transition(AgentExecutionState.INITIALIZING)

events = self._execute_event_loop_cycle(invocation_state, structured_output_context)
async for event in events:
yield event
Expand Down
Loading