@@ -60,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
// Subscribe to events - they will flow directly to the channel as they're raised
this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;

- using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
+ Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
Contributor

Wouldn't using dispose the object for us in cases of error and cancellation?
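
For reference on the question above: a using declaration does stop the Activity when an exception or cancellation unwinds the scope, so removing it means the code must call Dispose() itself on every exit path. The sketch below compares the two forms against a local listener. The source name Sketch.UsingDisposal, the span names, and the helper methods are made up for this illustration and are not part of the PR.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class UsingDisposalSketch
{
    // Hypothetical source name, used only for this illustration.
    private static readonly ActivitySource Source = new("Sketch.UsingDisposal");

    // Runs the body with a listener attached and returns the names of the
    // activities whose ActivityStopped callback fired.
    public static List<string> CollectStopped(Action body)
    {
        var stopped = new List<string>();
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStopped = activity => stopped.Add(activity.OperationName),
        };
        ActivitySource.AddActivityListener(listener);

        try
        {
            body();
        }
        catch (OperationCanceledException)
        {
            // Swallow the simulated cancellation; only the callbacks matter here.
        }

        return stopped;
    }

    // Form 1: the using declaration disposes (and thereby stops) the Activity
    // when the exception leaves the method.
    public static void WithUsing()
    {
        using Activity? activity = Source.StartActivity("with-using");
        throw new OperationCanceledException("simulated cancellation");
    }

    // Form 2: without using, an explicit finally is needed to get the same guarantee.
    public static void WithTryFinally()
    {
        Activity? activity = Source.StartActivity("with-try-finally");
        try
        {
            throw new OperationCanceledException("simulated cancellation");
        }
        finally
        {
            activity?.Dispose();
        }
    }
}
```

Both forms fire ActivityStopped on the failure path. The manual form is only needed when the Activity has to be stopped earlier than the end of the enclosing scope, which is what this PR does at Idle.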

activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);

try
@@ -93,6 +93,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
RunStatus capturedStatus = this._runStatus;
await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false);

+ // Stop the workflow.run Activity when the workflow reaches Idle so the span is
+ // exported to telemetry backends immediately, rather than waiting for the run loop
+ // to be cancelled/disposed.
+ if (activity is not null && capturedStatus == RunStatus.Idle)
Contributor

Is this the only status in which the workflow waits for another run call?

Member

There is also (Idle)PendingRequests, but I do not think that it is right to add the completed event in that case.

+ {
+ activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+ activity.Dispose();
+ activity = null;
+ }
Comment on lines +96 to +104
Copilot AI Feb 23, 2026

RunLoopAsync disposes and nulls the workflow run Activity when the workflow reaches RunStatus.Idle, but the run loop continues and can process additional inputs (it calls WaitForInputAsync(...) and then sets _runStatus = RunStatus.Running). Since activity is never re-created after being set to null, subsequent turns will run without a workflow_invoke span, and child spans (e.g., executor.process) will lose their parent correlation. Consider creating a new workflow-run activity each time the run loop resumes from a halted state (and adding the corresponding WorkflowStarted/tags), or alternatively keep the activity open until the run is actually ended if the intent is to span the whole session.

Suggested change
- // Stop the workflow.run Activity when the workflow reaches Idle so the span is
- // exported to telemetry backends immediately, rather than waiting for the run loop
- // to be cancelled/disposed.
- if (activity is not null && capturedStatus == RunStatus.Idle)
- {
- activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
- activity.Dispose();
- activity = null;
- }
+ // Keep the workflow.run Activity open across Idle so that subsequent inputs
+ // processed by this run loop continue to be correlated under the same span.
+ // The Activity will be completed when the run loop is cancelled/disposed.

Member
@lokitoth lokitoth Feb 24, 2026

If we retrigger from input and the activity is null, should we recreate it with a "resume"-like event? Technically, the workflow execution never "completed". I wonder if we should rethink how we model the lifecycle via events.

If spans can nest, maybe we should look at it more like:

WorkflowSession
{
   Start
   WorkflowRunStage
   {
      ReceivedInput
      FinishedProcessing/Halted
   }
   WorkflowRunStage
   {
      ReceivedInput
      FinishedProcessing/Halted
   }
   WorkflowRunStage
   {
      ReceivedInput
      FinishedProcessing/Halted
   }
   End
}

With better names, though. Then we can keep the outer span open until the finally, but open/close the run-level span as here?
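
A minimal sketch of the nested shape proposed above, assuming an outer session span that stays open for the whole loop and an inner span that is opened per input and closed at each halt. The source name, the span names (workflow.session, workflow.run_stage), and the event names are placeholders invented for this illustration. They are not the names used by the framework.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class NestedSpanSketch
{
    // Hypothetical source name, used only for this illustration.
    private static readonly ActivitySource Source = new("Sketch.NestedWorkflowSpans");

    // Simulates one session that processes several inputs and returns, in stop
    // order, each stopped span's name together with its parent's name (if any).
    public static List<(string Name, string? Parent)> RunSession(IEnumerable<string> inputs)
    {
        var stopped = new List<(string Name, string? Parent)>();
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStopped = a => stopped.Add((a.OperationName, a.Parent?.OperationName)),
        };
        ActivitySource.AddActivityListener(listener);

        // Outer span: stays open until the session ends.
        using (Activity? session = Source.StartActivity("workflow.session"))
        {
            session?.AddEvent(new ActivityEvent("session.started"));

            foreach (string input in inputs)
            {
                // Inner span: one per turn. It is started while the session span is
                // Activity.Current, so it becomes a child of the session span.
                using Activity? stage = Source.StartActivity("workflow.run_stage");
                stage?.AddEvent(new ActivityEvent("input.received"));
                stage?.SetTag("input.length", input.Length);
                stage?.AddEvent(new ActivityEvent("halted"));
                // Disposed at the end of each iteration, so the turn is exportable
                // immediately while the session span remains open.
            }

            session?.AddEvent(new ActivityEvent("session.ended"));
        }

        return stopped;
    }
}
```

With this shape each turn's span is exported as soon as it halts, and spans started during a turn still have a live parent, which addresses the lost-correlation concern raised earlier in the thread.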

Member Author

Yeah, I agree that it feels weird to close the root activity when it's still running and open for input. Let me sync with Tao on how Python is modeling this.


// Wait for next input from the consumer
// Works for both Idle (no work) and PendingRequests (waiting for responses)
await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
@@ -124,7 +134,13 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)

// Mark as ended when run loop exits
this._runStatus = RunStatus.Ended;
- activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+
+ // Safety net: stop the activity if not already stopped (e.g. on cancellation or error)
+ if (activity is not null)
+ {
+ activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+ activity.Dispose();
+ }
}

async ValueTask OnEventRaisedAsync(object? sender, WorkflowEvent e)
@@ -0,0 +1,204 @@
// Copyright (c) Microsoft. All rights reserved.

using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;
using FluentAssertions;
using Microsoft.Agents.AI.Workflows.InProc;
using Microsoft.Agents.AI.Workflows.Observability;

namespace Microsoft.Agents.AI.Workflows.UnitTests;

/// <summary>
/// Regression test for https://github.com/microsoft/agent-framework/issues/4155
/// Verifies that the workflow.run Activity is properly stopped/disposed so it gets exported
/// to telemetry backends. The ActivityStopped callback must fire for the workflow.run span.
/// </summary>
[Collection("ObservabilityTests")]
public sealed class WorkflowRunActivityStopTests : IDisposable
{
    private readonly ActivityListener _activityListener;
    private readonly ConcurrentBag<Activity> _startedActivities = [];
    private readonly ConcurrentBag<Activity> _stoppedActivities = [];
    private bool _isDisposed;

    public WorkflowRunActivityStopTests()
    {
        this._activityListener = new ActivityListener
        {
            ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!),
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStarted = activity => this._startedActivities.Add(activity),
            ActivityStopped = activity => this._stoppedActivities.Add(activity),
        };
        ActivitySource.AddActivityListener(this._activityListener);
    }

    public void Dispose()
    {
        if (!this._isDisposed)
        {
            this._activityListener?.Dispose();
            this._isDisposed = true;
        }
    }

    /// <summary>
    /// Creates a simple sequential workflow with OpenTelemetry enabled.
    /// </summary>
    private static Workflow CreateWorkflow()
    {
        Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
        var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");

        Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray());
        var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");

        WorkflowBuilder builder = new(uppercase);
        builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);

        return builder.WithOpenTelemetry().Build();
    }

    /// <summary>
    /// Verifies that the workflow.run Activity is stopped (and thus exportable) when
    /// using the Lockstep execution environment.
    /// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never
    /// disposed because yield break in async iterators does not trigger using disposal.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_Lockstep()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - workflow.run should have been started
        var startedWorkflowRuns = this._startedActivities
            .Where(a => a.RootId == testActivity.RootId &&
                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
            .ToList();
        startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started");

        // Assert - workflow.run should have been stopped (i.e., Dispose/Stop was called)
        // This is the core assertion for issue #4155: the ActivityStopped callback must fire
        var stoppedWorkflowRuns = this._stoppedActivities
            .Where(a => a.RootId == testActivity.RootId &&
                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
            .ToList();
        stoppedWorkflowRuns.Should().HaveCount(1,
            "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that the workflow.run Activity is stopped when using the OffThread (Default)
    /// execution environment (StreamingRunEventStream).
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_OffThread()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - workflow.run should have been started
        var startedWorkflowRuns = this._startedActivities
            .Where(a => a.RootId == testActivity.RootId &&
                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
            .ToList();
        startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started");

        // Assert - workflow.run should have been stopped
        var stoppedWorkflowRuns = this._stoppedActivities
            .Where(a => a.RootId == testActivity.RootId &&
                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
            .ToList();
        stoppedWorkflowRuns.Should().HaveCount(1,
            "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
    /// Verifies that the workflow.run Activity is stopped when using the streaming API
    /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment.
    /// This matches the exact usage pattern described in the issue.
    /// </summary>
    [Fact]
    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread()
    {
        // Arrange
        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start();

        // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue
        var workflow = CreateWorkflow();
        await using StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            // Consume all events
        }

        // Assert - workflow.run should have been started
        var startedWorkflowRuns = this._startedActivities
            .Where(a => a.RootId == testActivity.RootId &&
                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
            .ToList();
        startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started");

        // Assert - workflow.run should have been stopped
        var stoppedWorkflowRuns = this._stoppedActivities
            .Where(a => a.RootId == testActivity.RootId &&
                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
            .ToList();
        stoppedWorkflowRuns.Should().HaveCount(1,
            "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
    }

    /// <summary>
Copilot AI Feb 23, 2026

The tests validate that the first workflow_invoke activity is stopped when the workflow becomes idle, but they don’t cover a common follow-up scenario: resuming the same StreamingRun/session with another input after reaching Idle and then streaming again. Adding a regression test that sends a second input after the first idle halt (and asserts a second run activity is started/stopped) would both document the intended multi-turn behavior and catch cases where the activity isn’t re-created after being disposed.

Suggested change
-    /// <summary>
+    /// <summary>
+    /// Verifies that a new workflow.run activity is started and stopped for each
+    /// streaming invocation, even when using the same workflow in a multi-turn pattern.
+    /// </summary>
+    [Fact]
+    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurn()
+    {
+        // Arrange
+        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start();
+        var workflow = CreateWorkflow();
+        // Act - first streaming run
+        await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"))
+        {
+            await foreach (WorkflowEvent evt in run1.WatchStreamAsync())
+            {
+                // Consume all events from first turn
+            }
+        }
+        // Act - second streaming run (multi-turn scenario with same workflow)
+        await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!"))
+        {
+            await foreach (WorkflowEvent evt in run2.WatchStreamAsync())
+            {
+                // Consume all events from second turn
+            }
+        }
+        // Assert - two workflow.run activities should have been started
+        var startedWorkflowRuns = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
+            .ToList();
+        startedWorkflowRuns.Should().HaveCount(2,
+            "each streaming invocation should start its own workflow.run Activity");
+        // Assert - both workflow.run activities should have been stopped
+        var stoppedWorkflowRuns = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal))
+            .ToList();
+        stoppedWorkflowRuns.Should().HaveCount(2,
+            "each workflow.run Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios");
+    }
+    /// <summary>

    /// Verifies that all started activities (not just workflow.run) are properly stopped.
    /// This ensures no spans are "leaked" without being exported.
    /// </summary>
    [Fact]
    public async Task AllActivities_AreStopped_AfterWorkflowCompletion()
    {
        // Arrange
        using var testActivity = new Activity("AllActivitiesStopTest").Start();

        // Act
        var workflow = CreateWorkflow();
        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
        await run.DisposeAsync();

        // Assert - every started activity should also be stopped
        var started = this._startedActivities
            .Where(a => a.RootId == testActivity.RootId)
            .Select(a => a.Id)
            .ToHashSet();

        var stopped = this._stoppedActivities
            .Where(a => a.RootId == testActivity.RootId)
            .Select(a => a.Id)
            .ToHashSet();

        var neverStopped = started.Except(stopped).ToList();
        if (neverStopped.Count > 0)
        {
            var neverStoppedNames = this._startedActivities
                .Where(a => neverStopped.Contains(a.Id))
                .Select(a => a.OperationName)
                .ToList();
            neverStoppedNames.Should().BeEmpty(
                "all started activities should be stopped so they are exported. " +
                $"Activities started but never stopped: [{string.Join(", ", neverStoppedNames)}]");
        }
    }
}