Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 35 additions & 7 deletions python/packages/core/agent_framework/_workflows/_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -158,7 +158,28 @@ async def run_until_convergence(self) -> AsyncGenerator[WorkflowEvent, None]:
self._running = False

async def _run_iteration(self) -> None:
async def _deliver_messages(source_executor_id: str, messages: list[WorkflowMessage]) -> None:
"""Run a single iteration of the workflow.

Messages are delivered through edge runners. A source executor may have multiple outgoing edge
runners. All edge runners run concurrently, but messages sent through the same edge runner are
delivered in the order they were sent to preserve message ordering guarantees per edge.

What this means in practice:
- A message from a source to multiple targets is delivered to all targets concurrently.
- Multiple messages from a source to the same target are delivered in the order they were sent.
- Multiple messages from different sources to the same target can be delivered to the target one
at a time in any order, because true parallelism is not realized in Python.
- Multiple messages from different sources to different targets are delivered concurrently to all
targets, assuming each message is targeting a unique target; otherwise delivery falls back to the
previous rules when multiple messages target the same target.
- Special case: if using a fan-out edge runner (or derived edge runner that replicates messages
to multiple targets such as multi-selection or switch-case) to send messages to targets from
a source by specifying the target, the messages will be delivered to the specified targets
in the order they were sent. This is because all messages go through the same edge runner instance
which preserves message order.
"""

async def _deliver_messages(source_executor_id: str, source_messages: list[WorkflowMessage]) -> None:
"""Outer loop to concurrently deliver messages from all sources to their targets."""

async def _deliver_message_inner(edge_runner: EdgeRunner, message: WorkflowMessage) -> bool:
Expand All @@ -172,13 +193,20 @@ async def _deliver_message_inner(edge_runner: EdgeRunner, message: WorkflowMessa
logger.debug(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
return

for message in messages:
# Deliver a message through all edge runners associated with the source executor concurrently.
tasks = [_deliver_message_inner(edge_runner, message) for edge_runner in associated_edge_runners]
await asyncio.gather(*tasks)
async def _deliver_messages_for_edge_runner(edge_runner: EdgeRunner) -> None:
# Preserve message order per edge runner (and therefore per routed target path)
# while still allowing parallelism across different edge runners.
for message in source_messages:
await _deliver_message_inner(edge_runner, message)

tasks = [_deliver_messages_for_edge_runner(edge_runner) for edge_runner in associated_edge_runners]
await asyncio.gather(*tasks)

messages = await self._ctx.drain_messages()
tasks = [_deliver_messages(source_executor_id, messages) for source_executor_id, messages in messages.items()]
message_batches = await self._ctx.drain_messages()
tasks = [
_deliver_messages(source_executor_id, source_messages)
for source_executor_id, source_messages in message_batches.items()
]
await asyncio.gather(*tasks)

async def _create_checkpoint_if_enabled(self, previous_checkpoint_id: CheckpointID | None) -> CheckpointID | None:
Expand Down
150 changes: 149 additions & 1 deletion python/packages/core/tests/workflow/test_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
handler,
)
from agent_framework._workflows._const import EXECUTOR_STATE_KEY
from agent_framework._workflows._edge import SingleEdgeGroup
from agent_framework._workflows._edge import FanOutEdgeGroup, SingleEdgeGroup
from agent_framework._workflows._runner import Runner
from agent_framework._workflows._runner_context import (
InProcRunnerContext,
Expand Down Expand Up @@ -150,6 +150,154 @@ async def test_runner_run_until_convergence_not_completed():
assert event.type != "status" or event.state != WorkflowRunState.IDLE


async def test_runner_run_iteration_preserves_message_order_per_edge_runner() -> None:
    """Test that _run_iteration preserves message order to the same target path."""

    class OrderTrackingEdgeRunner:
        """Fake edge runner that records the payload of every message it receives."""

        def __init__(self) -> None:
            self.received: list[int] = []

        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
            payload = message.data
            assert isinstance(payload, MockMessage)
            self.received.append(payload.data)
            return True

    runner_ctx = InProcRunnerContext()
    runner_state = State()
    runner = Runner([], {}, runner_state, runner_ctx, "test_name", graph_signature_hash="test_hash")

    tracking_runner = OrderTrackingEdgeRunner()
    runner._edge_runner_map = {"source": [tracking_runner]}  # type: ignore[assignment]

    # Queue five messages from the same source; their payloads encode the send order.
    expected_order = list(range(5))
    for value in expected_order:
        await runner_ctx.send_message(WorkflowMessage(data=MockMessage(data=value), source_id="source"))

    await runner._run_iteration()

    # Delivery through a single edge runner must match the send order exactly.
    assert tracking_runner.received == expected_order


async def test_runner_run_iteration_delivers_different_edge_runners_concurrently() -> None:
    """Test that different edge runners for the same source are executed concurrently."""

    class GatedEdgeRunner:
        """Fake edge runner that signals when it starts, then blocks until released."""

        def __init__(self) -> None:
            self.started = asyncio.Event()
            self.release = asyncio.Event()
            self.call_count = 0

        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
            self.call_count += 1
            self.started.set()
            await self.release.wait()
            return True

    class FastEdgeRunner:
        """Fake edge runner that completes immediately and signals completion."""

        def __init__(self) -> None:
            self.probe_completed = asyncio.Event()
            self.call_count = 0

        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
            self.call_count += 1
            self.probe_completed.set()
            return True

    runner_ctx = InProcRunnerContext()
    runner_state = State()
    runner = Runner([], {}, runner_state, runner_ctx, "test_name", graph_signature_hash="test_hash")

    gated = GatedEdgeRunner()
    fast = FastEdgeRunner()
    runner._edge_runner_map = {"source": [gated, fast]}  # type: ignore[assignment]

    await runner_ctx.send_message(WorkflowMessage(data=MockMessage(data=1), source_id="source"))

    iteration_task = asyncio.create_task(runner._run_iteration())

    # While the gated runner is still blocked, the fast runner must also get the
    # message — proving the two edge runners ran concurrently, not sequentially.
    await gated.started.wait()
    await asyncio.wait_for(fast.probe_completed.wait(), timeout=2.0)

    gated.release.set()
    await iteration_task

    assert gated.call_count == 1
    assert fast.call_count == 1


async def test_fanout_edge_runner_delivers_to_multiple_targets_concurrently() -> None:
    """Test that FanOutEdgeRunner delivers messages to multiple targets concurrently.

    This verifies that when a message is broadcast through a FanOutEdgeGroup (no target_id),
    the runner delivers to all targets concurrently rather than sequentially.
    """

    class GatedExecutor(Executor):
        """An executor that blocks until released, used to detect concurrent execution."""

        def __init__(self, id: str) -> None:
            super().__init__(id=id)
            self.started = asyncio.Event()
            self.release = asyncio.Event()
            self.call_count = 0

        @handler
        async def handle(self, message: MockMessage, ctx: WorkflowContext[MockMessage, int]) -> None:
            self.call_count += 1
            self.started.set()
            await self.release.wait()

    class FastExecutor(Executor):
        """An executor that completes immediately, used to probe concurrent execution."""

        def __init__(self, id: str) -> None:
            super().__init__(id=id)
            self.probe_completed = asyncio.Event()
            self.call_count = 0

        @handler
        async def handle(self, message: MockMessage, ctx: WorkflowContext[MockMessage, int]) -> None:
            self.call_count += 1
            self.probe_completed.set()

    source = MockExecutor(id="source")
    gated_executor = GatedExecutor(id="blocking_target")
    fast_executor = FastExecutor(id="probe_target")

    # FanOutEdgeGroup broadcasts messages to multiple targets.
    fanout_group = FanOutEdgeGroup(source_id=source.id, target_ids=[gated_executor.id, fast_executor.id])

    executors: dict[str, Executor] = {
        node.id: node for node in (source, gated_executor, fast_executor)
    }

    runner_ctx = InProcRunnerContext()
    runner_state = State()
    runner = Runner([fanout_group], executors, runner_state, runner_ctx, "test_name", graph_signature_hash="test_hash")

    # Queue a message from source (will be delivered to both targets via FanOut).
    await runner_ctx.send_message(WorkflowMessage(data=MockMessage(data=1), source_id=source.id))

    iteration_task = asyncio.create_task(runner._run_iteration())

    # Wait until the gated executor has received the message and is blocked.
    await gated_executor.started.wait()

    # If FanOut delivers concurrently, the fast executor completes while the gated
    # one is still blocked; a sequential delivery would make this wait time out.
    await asyncio.wait_for(fast_executor.probe_completed.wait(), timeout=2.0)

    # Unblock the gated executor so the iteration can finish.
    gated_executor.release.set()
    await iteration_task

    # Each target must have handled the broadcast message exactly once.
    assert gated_executor.call_count == 1
    assert fast_executor.call_count == 1


async def test_runner_already_running():
"""Test that running the runner while it is already running raises an error."""
executor_a = MockExecutor(id="executor_a")
Expand Down