diff --git a/python/packages/core/agent_framework/_workflows/_runner.py b/python/packages/core/agent_framework/_workflows/_runner.py
index ce1661cc7f..c548e76e53 100644
--- a/python/packages/core/agent_framework/_workflows/_runner.py
+++ b/python/packages/core/agent_framework/_workflows/_runner.py
@@ -158,7 +158,28 @@ async def run_until_convergence(self) -> AsyncGenerator[WorkflowEvent, None]:
         self._running = False
 
     async def _run_iteration(self) -> None:
-        async def _deliver_messages(source_executor_id: str, messages: list[WorkflowMessage]) -> None:
+        """Run a single iteration of the workflow.
+
+        Messages are delivered through edge runners. A source executor may have multiple outgoing edge
+        runners. All edge runners run concurrently, but messages sent through the same edge runner are
+        delivered in the order they were sent, preserving per-edge message ordering guarantees.
+
+        What this means in practice:
+        - A message from a source to multiple targets is delivered to all targets concurrently.
+        - Multiple messages from a source to the same target are delivered in the order they were sent.
+        - Multiple messages from different sources to the same target can be delivered to the target one
+          at a time in any order, because asyncio provides concurrency rather than true parallelism.
+        - Multiple messages from different sources to different targets are delivered concurrently to all
+          targets, provided each message targets a unique target; otherwise the previous rules apply to
+          the messages that share a target.
+        - Special case: if a fan-out edge runner (or a derived edge runner that replicates messages to
+          multiple targets, such as multi-selection or switch-case) is used to send messages from a
+          source to explicitly specified targets, the messages are delivered to those targets in the
+          order they were sent. This is because all messages go through the same edge runner instance,
+          which preserves message order.
+        """
+
+        async def _deliver_messages(source_executor_id: str, source_messages: list[WorkflowMessage]) -> None:
             """Outer loop to concurrently deliver messages from all sources to their targets."""
 
             async def _deliver_message_inner(edge_runner: EdgeRunner, message: WorkflowMessage) -> bool:
@@ -172,13 +193,20 @@ async def _deliver_message_inner(edge_runner: EdgeRunner, message: WorkflowMessa
                 logger.debug(f"No outgoing edges found for executor {source_executor_id}; dropping messages.")
                 return
 
-            for message in messages:
-                # Deliver a message through all edge runners associated with the source executor concurrently.
-                tasks = [_deliver_message_inner(edge_runner, message) for edge_runner in associated_edge_runners]
-                await asyncio.gather(*tasks)
+            async def _deliver_messages_for_edge_runner(edge_runner: EdgeRunner) -> None:
+                # Preserve message order per edge runner (and therefore per routed target path)
+                # while still allowing parallelism across different edge runners.
+                for message in source_messages:
+                    await _deliver_message_inner(edge_runner, message)
+
+            tasks = [_deliver_messages_for_edge_runner(edge_runner) for edge_runner in associated_edge_runners]
+            await asyncio.gather(*tasks)
 
-        messages = await self._ctx.drain_messages()
-        tasks = [_deliver_messages(source_executor_id, messages) for source_executor_id, messages in messages.items()]
+        message_batches = await self._ctx.drain_messages()
+        tasks = [
+            _deliver_messages(source_executor_id, source_messages)
+            for source_executor_id, source_messages in message_batches.items()
+        ]
         await asyncio.gather(*tasks)
 
     async def _create_checkpoint_if_enabled(self, previous_checkpoint_id: CheckpointID | None) -> CheckpointID | None:
diff --git a/python/packages/core/tests/workflow/test_runner.py b/python/packages/core/tests/workflow/test_runner.py
index 039c61b07d..eaf69f90b0 100644
--- a/python/packages/core/tests/workflow/test_runner.py
+++ b/python/packages/core/tests/workflow/test_runner.py
@@ -21,7 +21,7 @@
     handler,
 )
 from agent_framework._workflows._const import EXECUTOR_STATE_KEY
-from agent_framework._workflows._edge import SingleEdgeGroup
+from agent_framework._workflows._edge import FanOutEdgeGroup, SingleEdgeGroup
 from agent_framework._workflows._runner import Runner
 from agent_framework._workflows._runner_context import (
     InProcRunnerContext,
@@ -150,6 +150,154 @@ async def test_runner_run_until_convergence_not_completed():
         assert event.type != "status" or event.state != WorkflowRunState.IDLE
 
 
+async def test_runner_run_iteration_preserves_message_order_per_edge_runner() -> None:
+    """Test that _run_iteration preserves message order along the same target path."""
+
+    class RecordingEdgeRunner:
+        def __init__(self) -> None:
+            self.received: list[int] = []
+
+        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
+            message_data = message.data
+            assert isinstance(message_data, MockMessage)
+            self.received.append(message_data.data)
+            return True
+
+    ctx = InProcRunnerContext()
+    state = State()
+    runner = Runner([], {}, state, ctx, "test_name", graph_signature_hash="test_hash")
+
+    edge_runner = RecordingEdgeRunner()
+    runner._edge_runner_map = {"source": [edge_runner]}  # type: ignore[assignment]
+
+    for index in range(5):
+        await ctx.send_message(WorkflowMessage(data=MockMessage(data=index), source_id="source"))
+
+    await runner._run_iteration()
+
+    assert edge_runner.received == [0, 1, 2, 3, 4]
+
+
+async def test_runner_run_iteration_delivers_different_edge_runners_concurrently() -> None:
+    """Test that different edge runners for the same source are executed concurrently."""
+
+    class BlockingEdgeRunner:
+        def __init__(self) -> None:
+            self.started = asyncio.Event()
+            self.release = asyncio.Event()
+            self.call_count = 0
+
+        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
+            self.call_count += 1
+            self.started.set()
+            await self.release.wait()
+            return True
+
+    class ProbeEdgeRunner:
+        def __init__(self) -> None:
+            self.probe_completed = asyncio.Event()
+            self.call_count = 0
+
+        async def send_message(self, message: WorkflowMessage, state: State, ctx: RunnerContext) -> bool:
+            self.call_count += 1
+            self.probe_completed.set()
+            return True
+
+    ctx = InProcRunnerContext()
+    state = State()
+    runner = Runner([], {}, state, ctx, "test_name", graph_signature_hash="test_hash")
+
+    blocking_edge_runner = BlockingEdgeRunner()
+    probe_edge_runner = ProbeEdgeRunner()
+    runner._edge_runner_map = {"source": [blocking_edge_runner, probe_edge_runner]}  # type: ignore[assignment]
+
+    await ctx.send_message(WorkflowMessage(data=MockMessage(data=1), source_id="source"))
+
+    iteration_task = asyncio.create_task(runner._run_iteration())
+
+    await blocking_edge_runner.started.wait()
+    await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=2.0)
+
+    blocking_edge_runner.release.set()
+    await iteration_task
+
+    assert blocking_edge_runner.call_count == 1
+    assert probe_edge_runner.call_count == 1
+
+
+async def test_fanout_edge_runner_delivers_to_multiple_targets_concurrently() -> None:
+    """Test that FanOutEdgeRunner delivers messages to multiple targets concurrently.
+
+    This verifies that when a message is broadcast through a FanOutEdgeGroup (no target_id),
+    the runner delivers to all targets concurrently rather than sequentially.
+    """
+
+    class BlockingExecutor(Executor):
+        """An executor that blocks until released, used to detect concurrent execution."""
+
+        def __init__(self, id: str) -> None:
+            super().__init__(id=id)
+            self.started = asyncio.Event()
+            self.release = asyncio.Event()
+            self.call_count = 0
+
+        @handler
+        async def handle(self, message: MockMessage, ctx: WorkflowContext[MockMessage, int]) -> None:
+            self.call_count += 1
+            self.started.set()
+            await self.release.wait()
+
+    class ProbeExecutor(Executor):
+        """An executor that completes immediately, used to probe concurrent execution."""
+
+        def __init__(self, id: str) -> None:
+            super().__init__(id=id)
+            self.probe_completed = asyncio.Event()
+            self.call_count = 0
+
+        @handler
+        async def handle(self, message: MockMessage, ctx: WorkflowContext[MockMessage, int]) -> None:
+            self.call_count += 1
+            self.probe_completed.set()
+
+    source = MockExecutor(id="source")
+    blocking_target = BlockingExecutor(id="blocking_target")
+    probe_target = ProbeExecutor(id="probe_target")
+
+    # FanOutEdgeGroup broadcasts messages to multiple targets
+    edge_group = FanOutEdgeGroup(source_id=source.id, target_ids=[blocking_target.id, probe_target.id])
+
+    executors: dict[str, Executor] = {
+        source.id: source,
+        blocking_target.id: blocking_target,
+        probe_target.id: probe_target,
+    }
+
+    ctx = InProcRunnerContext()
+    state = State()
+    runner = Runner([edge_group], executors, state, ctx, "test_name", graph_signature_hash="test_hash")
+
+    # Queue a message from the source (it will be delivered to both targets via fan-out)
+    await ctx.send_message(WorkflowMessage(data=MockMessage(data=1), source_id=source.id))
+
+    iteration_task = asyncio.create_task(runner._run_iteration())
+
+    # Wait for the blocking executor to start
+    await blocking_target.started.wait()
+
+    # If fan-out delivers concurrently, the probe should complete while blocking is still waiting.
+    # If delivery were sequential, this would time out: the probe wouldn't start until blocking finishes.
+    await asyncio.wait_for(probe_target.probe_completed.wait(), timeout=2.0)
+
+    # Release the blocking executor to allow the iteration to complete
+    blocking_target.release.set()
+    await iteration_task
+
+    # Both executors should have been called exactly once
+    assert blocking_target.call_count == 1
+    assert probe_target.call_count == 1
+
+
 async def test_runner_already_running():
     """Test that running the runner while it is already running raises an error."""
     executor_a = MockExecutor(id="executor_a")
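
For reviewers who want to see the ordering guarantee in isolation: the following is a minimal standalone sketch of the delivery pattern the new _run_iteration uses, not framework code. FakeEdgeRunner and deliver are hypothetical stand-ins for EdgeRunner and the delivery loop, but the structure is the same as in the patch: a sequential for loop per edge runner, asyncio.gather across edge runners.

import asyncio
import random


class FakeEdgeRunner:
    """Stand-in for an edge runner; records the order in which messages arrive."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.received: list[int] = []

    async def send_message(self, message: int) -> bool:
        # A random delay simulates variable per-message delivery latency.
        await asyncio.sleep(random.uniform(0, 0.01))
        self.received.append(message)
        return True


async def deliver(source_messages: list[int], edge_runners: list[FakeEdgeRunner]) -> None:
    async def deliver_for_edge_runner(edge_runner: FakeEdgeRunner) -> None:
        # Sequential per edge runner: messages keep their send order.
        for message in source_messages:
            await edge_runner.send_message(message)

    # Concurrent across edge runners: one task per runner.
    await asyncio.gather(*(deliver_for_edge_runner(r) for r in edge_runners))


async def main() -> None:
    edge_runners = [FakeEdgeRunner("a"), FakeEdgeRunner("b")]
    await deliver(list(range(5)), edge_runners)
    # Despite randomized latency, every edge runner sees messages in send order.
    assert all(r.received == [0, 1, 2, 3, 4] for r in edge_runners)


asyncio.run(main())

Because each edge runner owns exactly one sequential task, per-runner send order survives arbitrary interleaving across runners. This is the same reasoning behind replacing the old per-message gather with the per-edge-runner gather in the patch above.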