.NET: Fixing issue where OpenTelemetry span is never exported in .NET in-process workflow execution #4196
…ity never stopped in streaming OffThread path

The WorkflowRunActivity_IsStopped_Streaming_OffThread test demonstrates that the workflow.run OpenTelemetry Activity created in StreamingRunEventStream.RunLoopAsync is started but never stopped when using the OffThread/Default streaming execution. The background run loop keeps running after event consumption completes, so the using Activity? declaration never disposes until explicit StopAsync() is called.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>

2. Fix workflow.run Activity never stopped in streaming OffThread execution (microsoft#4155)

The workflow.run OpenTelemetry Activity in StreamingRunEventStream.RunLoopAsync was scoped to the method lifetime via 'using'. Since the run loop only exits on cancellation, the Activity was never stopped/exported until explicit disposal.

Fix: Remove 'using' and explicitly dispose the Activity when the workflow reaches Idle status (all supersteps complete). A safety-net disposal in the finally block handles cancellation and error paths.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
| await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false); | ||
|
|
||
| // Stop the workflow.run Activity when the workflow reaches Idle so the span is | ||
| // exported to telemetry backends immediately, rather than waiting for the run loop |
nit: We can determine when the spans are actually sent to the backend, but we have to properly close them. I recommend we adjust this comment.
| // Stop the workflow.run Activity when the workflow reaches Idle so the span is | ||
| // exported to telemetry backends immediately, rather than waiting for the run loop | ||
| // to be cancelled/disposed. | ||
| if (activity is not null && capturedStatus == RunStatus.Idle) |
Is this the only status in which the workflow will wait for another run call?
Pull request overview
This PR aims to ensure OpenTelemetry workflow-run spans (Activity) are reliably stopped/disposed (and therefore exported) during .NET in-process workflow execution, including streaming scenarios, and adds regression tests around activity lifecycle behavior.
Changes:
- Updated `StreamingRunEventStream.RunLoopAsync` to manually manage the workflow-run `Activity` lifecycle (stop on `Idle` and ensure disposal on loop exit).
- Added `WorkflowRunActivityStopTests` to assert workflow-run activities are started and stopped across multiple execution modes.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 3 comments.
| File | Description |
|---|---|
| dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs | Changes workflow-run Activity disposal timing to stop/export spans earlier and adds a safety-net disposal on exit. |
| dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs | Adds regression coverage validating workflow-run activities are stopped/disposed in lockstep, off-thread, and streaming usage. |
| // Stop the workflow.run Activity when the workflow reaches Idle so the span is | ||
| // exported to telemetry backends immediately, rather than waiting for the run loop | ||
| // to be cancelled/disposed. | ||
| if (activity is not null && capturedStatus == RunStatus.Idle) | ||
| { | ||
| activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); | ||
| activity.Dispose(); | ||
| activity = null; | ||
| } |
RunLoopAsync disposes and nulls the workflow run Activity when the workflow reaches RunStatus.Idle, but the run loop continues and can process additional inputs (it calls WaitForInputAsync(...) and then sets _runStatus = RunStatus.Running). Since activity is never re-created after being set to null, subsequent turns will run without a workflow_invoke span, and child spans (e.g., executor.process) will lose their parent correlation. Consider creating a new workflow-run activity each time the run loop resumes from a halted state (and adding the corresponding WorkflowStarted/tags), or alternatively keep the activity open until the run is actually ended if the intent is to span the whole session.
| // Stop the workflow.run Activity when the workflow reaches Idle so the span is | |
| // exported to telemetry backends immediately, rather than waiting for the run loop | |
| // to be cancelled/disposed. | |
| if (activity is not null && capturedStatus == RunStatus.Idle) | |
| { | |
| activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); | |
| activity.Dispose(); | |
| activity = null; | |
| } | |
| // Keep the workflow.run Activity open across Idle so that subsequent inputs | |
| // processed by this run loop continue to be correlated under the same span. | |
| // The Activity will be completed when the run loop is cancelled/disposed. |
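The other option raised in the comment above, starting a fresh run activity each time the loop resumes from a halted state, could be sketched roughly as follows. This is not the repository's code: the `ActivitySource` name, the `RunTurns` helper, and the `sketch.input` tag are hypothetical, and each loop iteration merely stands in for one turn of the real run loop.

```csharp
using System.Collections.Generic;
using System.Diagnostics;

public static class PerTurnActivitySketch
{
    // Hypothetical source name, used only by this sketch.
    private static readonly ActivitySource Source = new("Sketch.PerTurn");

    // Runs one simulated turn per input and returns how many workflow.run
    // activities were started and stopped.
    public static (int Started, int Stopped) RunTurns(IEnumerable<string> inputs)
    {
        int started = 0;
        int stopped = 0;

        // Without a listener that samples the source, StartActivity returns null.
        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.PerTurn",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStarted = a => { if (a.OperationName == "workflow.run") started++; },
            ActivityStopped = a => { if (a.OperationName == "workflow.run") stopped++; },
        };
        ActivitySource.AddActivityListener(listener);

        foreach (string input in inputs)
        {
            // A fresh span per turn: it is stopped when the turn ends, so it can be
            // exported without waiting for the whole session to finish.
            using Activity? run = Source.StartActivity("workflow.run");
            run?.SetTag("sketch.input", input);

            // Spans started during the turn pick up the run span through Activity.Current.
            using Activity? child = Source.StartActivity("executor.process");
        }

        return (started, stopped);
    }
}
```

With two inputs, this starts and stops two `workflow.run` activities, which is the multi-turn behavior a follow-up regression test could assert.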
| using CancellationTokenSource errorSource = new(); | ||
| CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken); | ||
|
|
linkedSource (created via CancellationTokenSource.CreateLinkedTokenSource) is never disposed. This can retain token registrations longer than needed. Wrap it in a using declaration or dispose it in the finally block alongside errorSource and the event unsubscription.
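A minimal sketch of the first suggestion (a `using` declaration on the linked source), assuming nothing else in the method needs the source to outlive the scope. `LinkedTokenSketch` and its `RunLoopAsync` are hypothetical stand-ins, not the real run loop; only the disposal pattern is the point.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LinkedTokenSketch
{
    // Hypothetical stand-in for the run loop. Returns true if it ended by cancellation.
    public static async Task<bool> RunLoopAsync(CancellationToken cancellationToken)
    {
        using CancellationTokenSource errorSource = new();

        // The using declaration disposes linkedSource on every exit path, which
        // releases the registrations it holds on both parent tokens.
        using CancellationTokenSource linkedSource =
            CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);

        try
        {
            // Placeholder for the loop body: wait until either parent token is cancelled.
            await Task.Delay(Timeout.Infinite, linkedSource.Token).ConfigureAwait(false);
            return false;
        }
        catch (OperationCanceledException)
        {
            return true;
        }
    }
}
```

If the linked source has to be disposed in a particular order relative to the event unsubscription, disposing it explicitly in the `finally` block, as the comment also suggests, would be the alternative.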
| "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); | ||
| } | ||
|
|
||
| /// <summary> |
The tests validate that the first workflow_invoke activity is stopped when the workflow becomes idle, but they don’t cover a common follow-up scenario: resuming the same StreamingRun/session with another input after reaching Idle and then streaming again. Adding a regression test that sends a second input after the first idle halt (and asserts a second run activity is started/stopped) would both document the intended multi-turn behavior and catch cases where the activity isn’t re-created after being disposed.
| /// <summary> | |
| /// <summary> | |
| /// Verifies that a new workflow.run activity is started and stopped for each | |
| /// streaming invocation, even when using the same workflow in a multi-turn pattern. | |
| /// </summary> | |
| [Fact] | |
| public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurn() | |
| { | |
| // Arrange | |
| using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start(); | |
| var workflow = CreateWorkflow(); | |
| // Act - first streaming run | |
| await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!")) | |
| { | |
| await foreach (WorkflowEvent evt in run1.WatchStreamAsync()) | |
| { | |
| // Consume all events from first turn | |
| } | |
| } | |
| // Act - second streaming run (multi-turn scenario with same workflow) | |
| await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!")) | |
| { | |
| await foreach (WorkflowEvent evt in run2.WatchStreamAsync()) | |
| { | |
| // Consume all events from second turn | |
| } | |
| } | |
| // Assert - two workflow.run activities should have been started | |
| var startedWorkflowRuns = this._startedActivities | |
| .Where(a => a.RootId == testActivity.RootId && | |
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | |
| .ToList(); | |
| startedWorkflowRuns.Should().HaveCount(2, | |
| "each streaming invocation should start its own workflow.run Activity"); | |
| // Assert - both workflow.run activities should have been stopped | |
| var stoppedWorkflowRuns = this._stoppedActivities | |
| .Where(a => a.RootId == testActivity.RootId && | |
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | |
| .ToList(); | |
| stoppedWorkflowRuns.Should().HaveCount(2, | |
| "each workflow.run Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios"); | |
| } | |
| /// <summary> |
| this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; | ||
|
|
||
| using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); | ||
| Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); |
Wouldn't using dispose the object for us in cases of error and cancellation?
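For context on this question: a `using` declaration does dispose on error and cancellation, but only when the enclosing scope exits, which is the timing problem the commit message describes. A rough sketch of the manual pattern the PR adopts (early stop plus a safety-net disposal in `finally`) is below. The source name, the `Run` helper, and its `reachIdle`/`fail` flags are hypothetical and only simulate the run loop's exit paths.

```csharp
using System;
using System.Collections.Generic;
using System.Diagnostics;

public static class ManualActivityDisposalSketch
{
    // Hypothetical source name, used only by this sketch.
    private static readonly ActivitySource Source = new("Sketch.ManualDisposal");

    // Returns the operation names of the activities that were stopped.
    public static List<string> Run(bool reachIdle, bool fail)
    {
        var stopped = new List<string>();

        using var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == "Sketch.ManualDisposal",
            Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData,
            ActivityStopped = a => stopped.Add(a.OperationName),
        };
        ActivitySource.AddActivityListener(listener);

        // No using declaration: the stop time is controlled manually.
        Activity? activity = Source.StartActivity("workflow.run");
        try
        {
            if (reachIdle)
            {
                // Early stop, analogous to the Idle branch in the diff.
                activity?.Dispose();
                activity = null;
            }

            if (fail)
            {
                throw new InvalidOperationException("simulated failure");
            }
        }
        catch (InvalidOperationException)
        {
            // Swallowed so the sketch can return its result.
        }
        finally
        {
            // Safety net: stops the activity on error or cancellation paths.
            // This is a no-op when the Idle branch already disposed it.
            activity?.Dispose();
        }

        return stopped;
    }
}
```

Either path stops the activity exactly once: the early branch nulls the local after disposing it, so the `finally` block has nothing left to dispose.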
This pull request addresses the issue where workflow run telemetry spans (`Activity` objects) were not always properly stopped and exported, particularly in streaming and lockstep execution environments. The changes ensure that workflow run activities are disposed as soon as the workflow reaches the idle state or when the run loop exits, preventing telemetry data from being lost. Additionally, comprehensive regression tests are added to verify correct activity lifecycle management.

Improvements to Activity Lifecycle Management:
- Ensures the `workflow.run` `Activity` is disposed immediately when the workflow reaches the `Idle` state, so telemetry spans are promptly exported rather than waiting for cancellation or disposal.
- Disposes the `workflow.run` `Activity` if it was not already stopped when the run loop exits, covering cancellation and error scenarios.
- Removes the `using` statement from the activity initialization to allow manual control over the activity's disposal timing.

Testing and Regression Coverage:
- Adds `WorkflowRunActivityStopTests.cs` to verify that workflow run activities are always properly stopped and exported to telemetry backends, covering lockstep, off-thread, and streaming execution environments, as well as ensuring that all started activities are stopped.

Closes #4155
Contribution Checklist