-
Notifications
You must be signed in to change notification settings - Fork 1.2k
.NET: Fixing issue where OpenTelemetry span is never exported in .NET in-process workflow execution #4196
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
.NET: Fixing issue where OpenTelemetry span is never exported in .NET in-process workflow execution #4196
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -60,7 +60,7 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) | |||||||||||||||||||||||||
| // Subscribe to events - they will flow directly to the channel as they're raised | ||||||||||||||||||||||||||
| this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync; | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); | ||||||||||||||||||||||||||
| Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity(); | ||||||||||||||||||||||||||
| activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| try | ||||||||||||||||||||||||||
|
|
@@ -93,6 +93,16 @@ private async Task RunLoopAsync(CancellationToken cancellationToken) | |||||||||||||||||||||||||
| RunStatus capturedStatus = this._runStatus; | ||||||||||||||||||||||||||
| await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false); | ||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||
| // Stop the workflow.run Activity when the workflow reaches Idle so the span is | ||||||||||||||||||||||||||
| // exported to telemetry backends immediately, rather than waiting for the run loop | ||||||||||||||||||||||||||
alliscode marked this conversation as resolved.
Show resolved
Hide resolved
|
||||||||||||||||||||||||||
| // to be cancelled/disposed. | ||||||||||||||||||||||||||
| if (activity is not null && capturedStatus == RunStatus.Idle) | ||||||||||||||||||||||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is this the only status that the workflow will wait for another run call?
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. There is also (Idle)PendingRequests, but I do not think that it is right to add the completed event in that case. |
||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||
| activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); | ||||||||||||||||||||||||||
| activity.Dispose(); | ||||||||||||||||||||||||||
| activity = null; | ||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||
|
Comment on lines
+96
to
+104
|
||||||||||||||||||||||||||
| // Stop the workflow.run Activity when the workflow reaches Idle so the span is | |
| // exported to telemetry backends immediately, rather than waiting for the run loop | |
| // to be cancelled/disposed. | |
| if (activity is not null && capturedStatus == RunStatus.Idle) | |
| { | |
| activity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted)); | |
| activity.Dispose(); | |
| activity = null; | |
| } | |
| // Keep the workflow.run Activity open across Idle so that subsequent inputs | |
| // processed by this run loop continue to be correlated under the same span. | |
| // The Activity will be completed when the run loop is cancelled/disposed. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we retrigger from input, and the activity is null, we should recreate it with a "resume"-like event? Technically, the Workflow execution never "completed". I wonder if we should rethink how we model the lifecycle via events.
If spans can nest, maybe we should look at it more like:
WorkflowSession
{
Start
WorkflowRunStage
{
ReceivedInput
FinishedProcessing/Halted
}
WorkflowRunStage
{
ReceivedInput
FinishedProcessing/Halted
}
WorkflowRunStage
{
ReceivedInput
FinishedProcessing/Halted
}
End
}
With better names, though. Then we can keep the outer span open until the finally, but open/close the run-level span as here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, I agree that it feels weird to close the root activity when it's still running and open for input. Let me sync with Tao on how Python is modeling this.
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| @@ -0,0 +1,204 @@ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Copyright (c) Microsoft. All rights reserved. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Collections.Concurrent; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Collections.Generic; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Diagnostics; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Linq; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using System.Threading.Tasks; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using FluentAssertions; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Microsoft.Agents.AI.Workflows.InProc; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using Microsoft.Agents.AI.Workflows.Observability; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| namespace Microsoft.Agents.AI.Workflows.UnitTests; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Regression test for https://github.com/microsoft/agent-framework/issues/4155 | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Verifies that the workflow.run Activity is properly stopped/disposed so it gets exported | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// to telemetry backends. The ActivityStopped callback must fire for the workflow.run span. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| [Collection("ObservabilityTests")] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public sealed class WorkflowRunActivityStopTests : IDisposable | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly ActivityListener _activityListener; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly ConcurrentBag<Activity> _startedActivities = []; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private readonly ConcurrentBag<Activity> _stoppedActivities = []; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private bool _isDisposed; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public WorkflowRunActivityStopTests() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this._activityListener = new ActivityListener | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Sample = (ref ActivityCreationOptions<ActivityContext> options) => ActivitySamplingResult.AllData, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ActivityStarted = activity => this._startedActivities.Add(activity), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ActivityStopped = activity => this._stoppedActivities.Add(activity), | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| }; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| ActivitySource.AddActivityListener(this._activityListener); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public void Dispose() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if (!this._isDisposed) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this._activityListener?.Dispose(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| this._isDisposed = true; | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Creates a simple sequential workflow with OpenTelemetry enabled. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| private static Workflow CreateWorkflow() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Func<string, string> uppercaseFunc = s => s.ToUpperInvariant(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Func<string, string> reverseFunc = s => new string(s.Reverse().ToArray()); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| WorkflowBuilder builder = new(uppercase); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return builder.WithOpenTelemetry().Build(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Verifies that the workflow.run Activity is stopped (and thus exportable) when | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// using the Lockstep execution environment. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Bug: The Activity created by LockstepRunEventStream.TakeEventStreamAsync is never | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// disposed because yield break in async iterators does not trigger using disposal. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| [Fact] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public async Task WorkflowRunActivity_IsStopped_Lockstep() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Arrange | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Act | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var workflow = CreateWorkflow(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await run.DisposeAsync(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assert - workflow.run should have been started | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var startedWorkflowRuns = this._startedActivities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .Where(a => a.RootId == testActivity.RootId && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assert - workflow.run should have been stopped (i.e., Dispose/Stop was called) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // This is the core assertion for issue #4155: the ActivityStopped callback must fire | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var stoppedWorkflowRuns = this._stoppedActivities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .Where(a => a.RootId == testActivity.RootId && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stoppedWorkflowRuns.Should().HaveCount(1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Verifies that the workflow.run Activity is stopped when using the OffThread (Default) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// execution environment (StreamingRunEventStream). | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| [Fact] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public async Task WorkflowRunActivity_IsStopped_OffThread() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Arrange | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Act | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var workflow = CreateWorkflow(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await run.DisposeAsync(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assert - workflow.run should have been started | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var startedWorkflowRuns = this._startedActivities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .Where(a => a.RootId == testActivity.RootId && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assert - workflow.run should have been stopped | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var stoppedWorkflowRuns = this._stoppedActivities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .Where(a => a.RootId == testActivity.RootId && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stoppedWorkflowRuns.Should().HaveCount(1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// Verifies that the workflow.run Activity is stopped when using the streaming API | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// This matches the exact usage pattern described in the issue. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// </summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| [Fact] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread() | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Arrange | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var workflow = CreateWorkflow(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await using StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| await foreach (WorkflowEvent evt in run.WatchStreamAsync()) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Consume all events | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assert - workflow.run should have been started | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var startedWorkflowRuns = this._startedActivities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .Where(a => a.RootId == testActivity.RootId && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| startedWorkflowRuns.Should().HaveCount(1, "workflow.run Activity should be started"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| // Assert - workflow.run should have been stopped | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| var stoppedWorkflowRuns = this._stoppedActivities | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .Where(a => a.RootId == testActivity.RootId && | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| .ToList(); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| stoppedWorkflowRuns.Should().HaveCount(1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| "workflow.run Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)"); | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| } | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| /// <summary> | |
| /// <summary> | |
| /// Verifies that a new workflow.run activity is started and stopped for each | |
| /// streaming invocation, even when using the same workflow in a multi-turn pattern. | |
| /// </summary> | |
| [Fact] | |
| public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurn() | |
| { | |
| // Arrange | |
| using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start(); | |
| var workflow = CreateWorkflow(); | |
| // Act - first streaming run | |
| await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!")) | |
| { | |
| await foreach (WorkflowEvent evt in run1.WatchStreamAsync()) | |
| { | |
| // Consume all events from first turn | |
| } | |
| } | |
| // Act - second streaming run (multi-turn scenario with same workflow) | |
| await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!")) | |
| { | |
| await foreach (WorkflowEvent evt in run2.WatchStreamAsync()) | |
| { | |
| // Consume all events from second turn | |
| } | |
| } | |
| // Assert - two workflow.run activities should have been started | |
| var startedWorkflowRuns = this._startedActivities | |
| .Where(a => a.RootId == testActivity.RootId && | |
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | |
| .ToList(); | |
| startedWorkflowRuns.Should().HaveCount(2, | |
| "each streaming invocation should start its own workflow.run Activity"); | |
| // Assert - both workflow.run activities should have been stopped | |
| var stoppedWorkflowRuns = this._stoppedActivities | |
| .Where(a => a.RootId == testActivity.RootId && | |
| a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal)) | |
| .ToList(); | |
| stoppedWorkflowRuns.Should().HaveCount(2, | |
| "each workflow.run Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios"); | |
| } | |
| /// <summary> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Wouldn't
usingdispose the object for us in cases of error and cancellation?