
Python: Fix workflow runner concurrent processing #4143

Open
TaoChenOSU wants to merge 1 commit into microsoft:main from
TaoChenOSU:taochen/python-fix-workflow-runner-concurrent-processing

Conversation

Contributor

@TaoChenOSU commented Feb 21, 2026

Motivation and Context

Currently, the runner delivers messages from a source executor one after another, regardless of the target. This means that if a source sends more than one message to multiple targets, the targets receive those messages sequentially. For example, if executor A sends message X to executor B and message Y to executor C, message Y will not be delivered to executor C until executor B finishes processing message X. This breaks our concurrency promise.

Description

The runner will now deliver messages to all targets concurrently.

Summary of the behavior after this change (a minimal sketch follows the list below):

  • A single message from a source to multiple targets will be delivered concurrently to all targets.
  • Multiple messages from a source to the same target will be delivered in the order they are sent.
  • Multiple messages from different sources to the same target will be delivered to the target one at a time, without an ordering guarantee, because Python's concurrency is cooperative rather than truly parallel.
  • Multiple messages from different sources to different targets will be delivered concurrently to all targets, assuming each message targets a unique target; otherwise this falls back to the previous scenario.
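
A minimal sketch of the delivery pattern described above, in plain asyncio. The names here (deliver, deliver_all, the string targets) are illustrative stand-ins rather than the framework's API, and gather-based scheduling is an assumption; the actual _run_iteration() change is quoted in the review below.

import asyncio
from collections.abc import Sequence


async def deliver(target: str, message: str) -> None:
    # Stand-in for handing one message to one target executor.
    await asyncio.sleep(0.01)
    print(f"{target} <- {message}")


async def deliver_all(targets: Sequence[str], messages: Sequence[str]) -> None:
    async def per_target(target: str) -> None:
        # A sequential loop per target preserves per-target message order.
        for message in messages:
            await deliver(target, message)

    # gather() lets different targets make progress concurrently, so
    # delivery to one target is never blocked behind another target.
    await asyncio.gather(*(per_target(t) for t in targets))


asyncio.run(deliver_all(["B", "C"], ["X", "Y"]))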

Contribution Checklist

  • The code builds clean without any errors or warnings
  • The PR follows the Contribution Guidelines
  • All unit tests pass, and I have added new tests where possible
  • Is this a breaking change? If yes, add "[BREAKING]" prefix to the title of the PR.

@TaoChenOSU self-assigned this Feb 21, 2026
@TaoChenOSU added the python and workflows (Related to Workflows in agent-framework) labels Feb 21, 2026
@github-actions bot changed the title from "Fix workflow runner concurrent processing" to "Python: Fix workflow runner concurrent processing" Feb 21, 2026
@markwallace-microsoft
Member

Python Test Coverage

Python Test Coverage Report

File                                       Stmts   Miss  Cover  Missing
packages/core/agent_framework/_workflows
  _runner.py                                 173      2    98%  283–284
TOTAL                                      21264   3315    84%

Python Unit Test Overview

Tests  Skipped  Failures  Errors  Time
4191   240 💤   0 ❌      0 🔥    1m 13s ⏱️

@TaoChenOSU TaoChenOSU marked this pull request as ready for review February 21, 2026 07:33
Copilot AI review requested due to automatic review settings February 21, 2026 07:33
Contributor

Copilot AI left a comment


Pull request overview

This PR updates the Python workflow Runner’s message-delivery loop to improve concurrency so that messages from a single source can be delivered to multiple downstream targets in parallel (instead of being blocked behind one another).

Changes:

  • Refactors Runner._run_iteration() to parallelize message delivery across edge runners while preserving per-edge-runner message ordering.
  • Adds unit tests asserting (1) per-edge-runner ordering and (2) concurrent execution across multiple edge runners for the same source.

Reviewed changes

Copilot reviewed 2 out of 2 changed files in this pull request and generated 4 comments.

File: python/packages/core/agent_framework/_workflows/_runner.py
Description: Changes _run_iteration() delivery scheduling to run different edge runners concurrently while keeping ordering within each edge runner.

File: python/packages/core/tests/workflow/test_runner.py
Description: Adds tests for per-edge-runner ordering and concurrency across multiple edge runners.

Comment on lines +175 to +179
async def _deliver_messages_for_edge_runner(edge_runner: EdgeRunner) -> None:
# Preserve message order per edge runner (and therefore per routed target path)
# while still allowing parallelism across different edge runners.
for message in source_messages:
await _deliver_message_inner(edge_runner, message)

Copilot AI Feb 21, 2026


This change only parallelizes delivery across different EdgeRunner instances. For workflows using a single FanOutEdgeRunner/SwitchCaseEdgeRunner (multiple targets inside one edge group), multiple targeted messages (message.target_id differs) will still be delivered sequentially because they share the same edge runner. That appears to leave the PR’s stated concurrency scenario unfixed for fan-out/switch-case graphs. Consider introducing concurrency keyed by (edge runner, resolved target id) or per-target delivery queues/locks so messages to different targets can run in parallel while preserving per-target ordering.
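
A hedged sketch of the reviewer's "(edge runner, resolved target id)" idea: keep a chain of delivery tasks per key, so order is preserved within a key while different keys run concurrently. KeyedDeliverer and every other name below are hypothetical illustrations, not agent-framework API.

import asyncio
from collections.abc import Coroutine
from typing import Any


class KeyedDeliverer:
    """Deliver concurrently across keys, in order within each key."""

    def __init__(self) -> None:
        # Tail task of the delivery chain for each key.
        self._tails: dict[object, asyncio.Task[None]] = {}

    def send(self, key: object, delivery: Coroutine[Any, Any, None]) -> asyncio.Task[None]:
        previous = self._tails.get(key)

        async def run() -> None:
            if previous is not None:
                # Wait for earlier sends on this key; swallow their errors
                # so one failed delivery doesn't stall the whole chain.
                await asyncio.gather(previous, return_exceptions=True)
            await delivery

        task = asyncio.create_task(run())
        self._tails[key] = task
        return task


async def main() -> None:
    deliverer = KeyedDeliverer()

    async def deliver(target: str, msg: str) -> None:
        await asyncio.sleep(0.01)
        print(f"{target} <- {msg}")

    await asyncio.gather(
        deliverer.send(("runner", "B"), deliver("B", "X1")),
        deliverer.send(("runner", "B"), deliver("B", "X2")),  # ordered after X1
        deliverer.send(("runner", "C"), deliver("C", "Y1")),  # concurrent with B
    )


asyncio.run(main())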

iteration_task = asyncio.create_task(runner._run_iteration())

await blocking_edge_runner.started.wait()
await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=0.2)

Copilot AI Feb 21, 2026


The 0.2s timeout on waiting for probe_edge_runner can be flaky on slower CI/event loops. Consider increasing the timeout (or structuring the test to avoid timing sensitivity) so the concurrency assertion doesn’t intermittently fail under load.

Suggested change
await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=0.2)
await asyncio.wait_for(probe_edge_runner.probe_completed.wait(), timeout=2.0)

message_data = message.data
assert isinstance(message_data, MockMessage)
self.received.append(message_data.data)
await asyncio.sleep(0.005)

Copilot AI Feb 21, 2026


The extra sleep(0.005) makes the test slower without strengthening the ordering assertion (ordering is already guaranteed by awaiting each send). Consider removing it or using await asyncio.sleep(0) if you just need to yield control.

Suggested change
await asyncio.sleep(0.005)

Comment on lines +182 to +184
async def test_runner_run_iteration_delivers_different_edge_runners_concurrently() -> None:
"""Test that different edge runners for the same source are executed concurrently."""


Copilot AI Feb 21, 2026


The new concurrency test only covers parallelism across multiple edge runners. Given the PR motivation is about delivering to different targets concurrently, it would be good to add coverage for FanOutEdgeGroup/SwitchCaseEdgeGroup with targeted messages (target_id set) to ensure messages to different targets within a single edge runner can also be delivered concurrently (or to document that limitation explicitly).
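
One possible shape for that coverage, sketched with plain asyncio events. The actual FanOutEdgeGroup wiring is omitted because its construction isn't shown in this thread, so blocking_target/probe_target are hypothetical stand-ins for two targets behind a single edge runner, and gather stands in for the runner driving the iteration.

import asyncio


async def test_fan_out_targets_progress_concurrently() -> None:
    blocking_started = asyncio.Event()
    release_blocking = asyncio.Event()
    probe_completed = asyncio.Event()

    async def blocking_target() -> None:
        # First target: signal arrival, then hold until released.
        blocking_started.set()
        await release_blocking.wait()

    async def probe_target() -> None:
        # Second target: can only finish while the first is still held,
        # i.e. only if deliveries to the two targets overlap.
        await blocking_started.wait()
        probe_completed.set()

    # In the real test this would be runner._run_iteration() driving a
    # fan-out edge group with two targeted messages.
    iteration = asyncio.gather(blocking_target(), probe_target())

    await blocking_started.wait()
    # Sequential delivery would deadlock here and trip the timeout.
    await asyncio.wait_for(probe_completed.wait(), timeout=2.0)

    release_blocking.set()
    await iteration


asyncio.run(test_fan_out_targets_progress_concurrently())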

