diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs
index f26f27ad96..506a0d1039 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/LockstepRunEventStream.cs
@@ -18,6 +18,7 @@ internal sealed class LockstepRunEventStream : IRunEventStream
     private int _isDisposed;
     private readonly ISuperStepRunner _stepRunner;
+    private Activity? _sessionActivity;
     public ValueTask GetStatusAsync(CancellationToken cancellationToken = default) => new(this.RunStatus);
@@ -30,7 +31,16 @@ public LockstepRunEventStream(ISuperStepRunner stepRunner)
     public void Start()
     {
-        // No-op for lockstep execution
+        // Save and restore Activity.Current so the long-lived session activity
+        // doesn't leak into caller code via AsyncLocal.
+        Activity? previousActivity = Activity.Current;
+
+        this._sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
+        this._sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
+                              .SetTag(Tags.SessionId, this._stepRunner.SessionId);
+        this._sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted));
+
+        Activity.Current = previousActivity;
     }
     public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPendingRequest, [EnumeratorCancellation] CancellationToken cancellationToken = default)
@@ -44,19 +54,23 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe
         }
 #endif
-        CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);
+        using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(this._stopCancellation.Token, cancellationToken);
         ConcurrentQueue eventSink = [];
         this._stepRunner.OutgoingEvents.EventRaised += OnWorkflowEventAsync;
-        using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
-        activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);
+        // Make the session activity current so the run activity started below is parented under it.
+        Activity.Current = this._sessionActivity;
+
+        // Not 'using' — disposed explicitly in the finally block so the activity is stopped deterministically.
+        Activity? runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
+        runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);
         try
         {
             this.RunStatus = RunStatus.Running;
-            activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
+            runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
             do
             {
@@ -65,7 +79,7 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe
                 {
                     // Because we may be yielding out of this function, we need to ensure that the Activity.Current
                     // is set to our activity for the duration of this loop iteration.
-                    Activity.Current = activity;
+                    Activity.Current = runActivity;
                     // Drain SuperSteps while there are steps to run
                     try
@@ -75,13 +89,13 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe
                     catch (OperationCanceledException)
                     {
                     }
-                    catch (Exception ex) when (activity is not null)
+                    catch (Exception ex) when (runActivity is not null)
                     {
-                        activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
+                        runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
                             { Tags.ErrorType, ex.GetType().FullName },
-                            { Tags.BuildErrorMessage, ex.Message },
+                            { Tags.ErrorMessage, ex.Message },
                         }));
-                        activity.CaptureException(ex);
+                        runActivity.CaptureException(ex);
                         throw;
                     }
@@ -129,12 +143,16 @@ public async IAsyncEnumerable TakeEventStreamAsync(bool blockOnPe
                 }
             } while (!ShouldBreak());
-            activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+            runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
         }
         finally
         {
             this.RunStatus = this._stepRunner.HasUnservicedRequests ? RunStatus.PendingRequests : RunStatus.Idle;
             this._stepRunner.OutgoingEvents.EventRaised -= OnWorkflowEventAsync;
+
+            // Explicitly dispose the Activity so Activity.Stop fires deterministically,
+            // regardless of how the async iterator enumerator is disposed.
+            runActivity?.Dispose();
         }
         ValueTask OnWorkflowEventAsync(object? sender, WorkflowEvent e)
@@ -172,6 +190,14 @@ public ValueTask DisposeAsync()
         {
             this._stopCancellation.Cancel();
+            // Stop the session activity (if any) and clear the field so it cannot be disposed twice.
+            if (this._sessionActivity is not null)
+            {
+                this._sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
+                this._sessionActivity.Dispose();
+                this._sessionActivity = null;
+            }
+
             this._stopCancellation.Dispose();
             this._inputWaiter.Dispose();
         }
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs
index a6c34f2b9f..a09dedd8ad 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Execution/StreamingRunEventStream.cs
@@ -55,13 +55,20 @@ public void Start()
     private async Task RunLoopAsync(CancellationToken cancellationToken)
     {
         using CancellationTokenSource errorSource = new();
-        CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);
+        using CancellationTokenSource linkedSource = CancellationTokenSource.CreateLinkedTokenSource(errorSource.Token, cancellationToken);
         // Subscribe to events - they will flow directly to the channel as they're raised
         this._stepRunner.OutgoingEvents.EventRaised += OnEventRaisedAsync;
-        using Activity? activity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
-        activity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId).SetTag(Tags.SessionId, this._stepRunner.SessionId);
+        // Start the session-level activity that spans the entire run loop lifetime.
+        // Individual run-stage activities are nested within this session activity.
+        Activity? sessionActivity = this._stepRunner.TelemetryContext.StartWorkflowSessionActivity();
+        sessionActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
+            .SetTag(Tags.SessionId, this._stepRunner.SessionId);
+
+        Activity? runActivity = null;
+
+        sessionActivity?.AddEvent(new ActivityEvent(EventNames.SessionStarted));
         try
         {
@@ -70,10 +77,15 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
             await this._inputWaiter.WaitForInputAsync(cancellationToken: linkedSource.Token).ConfigureAwait(false);
             this._runStatus = RunStatus.Running;
-            activity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
             while (!linkedSource.Token.IsCancellationRequested)
             {
+                // Start a new run-stage activity for this input→processing→halt cycle
+                runActivity = this._stepRunner.TelemetryContext.StartWorkflowRunActivity();
+                runActivity?.SetTag(Tags.WorkflowId, this._stepRunner.StartExecutorId)
+                    .SetTag(Tags.SessionId, this._stepRunner.SessionId);
+                runActivity?.AddEvent(new ActivityEvent(EventNames.WorkflowStarted));
+
                 // Run all available supersteps continuously
                 // Events are streamed out in real-time as they happen via the event handler
                 while (this._stepRunner.HasUnprocessedMessages && !linkedSource.Token.IsCancellationRequested)
@@ -93,6 +105,15 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
                 RunStatus capturedStatus = this._runStatus;
                 await this._eventChannel.Writer.WriteAsync(new InternalHaltSignal(currentEpoch, capturedStatus), linkedSource.Token).ConfigureAwait(false);
+                // Close the run-stage activity when processing halts.
+                // A new run activity will be created when the next input arrives.
+                if (runActivity is not null)
+                {
+                    runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+                    runActivity.Dispose();
+                    runActivity = null;
+                }
+
                 // Wait for next input from the consumer
                 // Works for both Idle (no work) and PendingRequests (waiting for responses)
                 await this._inputWaiter.WaitForInputAsync(TimeSpan.FromSeconds(1), linkedSource.Token).ConfigureAwait(false);
@@ -107,14 +128,26 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
         }
         catch (Exception ex)
         {
-            if (activity != null)
+            // Record error on the run-stage activity if one is active
+            if (runActivity is not null)
+            {
+                runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
+                    { Tags.ErrorType, ex.GetType().FullName },
+                    { Tags.ErrorMessage, ex.Message },
+                }));
+                runActivity.CaptureException(ex);
+            }
+
+            // Record the error on the session activity as a session.error event
+            if (sessionActivity is not null)
             {
-                activity.AddEvent(new ActivityEvent(EventNames.WorkflowError, tags: new() {
+                sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionError, tags: new() {
                     { Tags.ErrorType, ex.GetType().FullName },
-                    { Tags.BuildErrorMessage, ex.Message },
+                    { Tags.ErrorMessage, ex.Message },
                 }));
-                activity.CaptureException(ex);
+                sessionActivity.CaptureException(ex);
             }
+
             await this._eventChannel.Writer.WriteAsync(new WorkflowErrorEvent(ex), linkedSource.Token).ConfigureAwait(false);
         }
         finally
@@ -124,7 +157,20 @@ private async Task RunLoopAsync(CancellationToken cancellationToken)
             // Mark as ended when run loop exits
             this._runStatus = RunStatus.Ended;
-            activity?.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+
+            // Stop the run-stage activity if not already stopped (e.g. on cancellation or error)
+            if (runActivity is not null)
+            {
+                runActivity.AddEvent(new ActivityEvent(EventNames.WorkflowCompleted));
+                runActivity.Dispose();
+            }
+
+            // Stop the session activity — the session always ends when the run loop exits
+            if (sessionActivity is not null)
+            {
+                sessionActivity.AddEvent(new ActivityEvent(EventNames.SessionCompleted));
+                sessionActivity.Dispose();
+            }
         }
         async ValueTask OnEventRaisedAsync(object? sender, WorkflowEvent e)
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs
index 9d2912ecd3..1639fc3c3c 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/ActivityNames.cs
@@ -5,7 +5,8 @@ namespace Microsoft.Agents.AI.Workflows.Observability;
 internal static class ActivityNames
 {
     public const string WorkflowBuild = "workflow.build";
-    public const string WorkflowRun = "workflow_invoke";
+    public const string WorkflowSession = "workflow.session";
+    public const string WorkflowInvoke = "workflow_invoke";
     public const string MessageSend = "message.send";
     public const string ExecutorProcess = "executor.process";
     public const string EdgeGroupProcess = "edge_group.process";
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs
index 8b9f5bbde8..84540efdc8 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/EventNames.cs
@@ -8,6 +8,9 @@ internal static class EventNames
     public const string BuildValidationCompleted = "build.validation_completed";
     public const string BuildCompleted = "build.completed";
     public const string BuildError = "build.error";
+    public const string SessionStarted = "session.started";
+    public const string SessionCompleted = "session.completed";
+    public const string SessionError = "session.error";
     public const string WorkflowStarted = "workflow.started";
     public const string WorkflowCompleted = "workflow.completed";
     public const string WorkflowError = "workflow.error";
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs
index 88c68eceb9..47ce701794 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/Tags.cs
@@ -11,6 +11,7 @@ internal static class Tags
     public const string BuildErrorMessage = "build.error.message";
     public const string BuildErrorType = "build.error.type";
     public const string ErrorType = "error.type";
+    public const string ErrorMessage = "error.message";
     public const string SessionId = "session.id";
     public const string ExecutorId = "executor.id";
     public const string ExecutorType = "executor.type";
diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs
index e4b8d7a851..974ffce5c5 100644
--- a/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs
+++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Observability/WorkflowTelemetryContext.cs
@@ -88,7 +88,25 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource
     }
     ///
-    /// Starts a workflow run activity if enabled.
+    /// Starts a workflow session activity if enabled. This is the outer/parent span
+    /// that represents the entire lifetime of a workflow execution (from start
+    /// until stop, cancellation, or error) within the current trace.
+    /// Individual run stages are typically nested within it.
+    ///
+    /// An activity if telemetry is enabled and workflow run telemetry is not disabled; otherwise null.
+    public Activity? StartWorkflowSessionActivity()
+    {
+        if (!this.IsEnabled || this.Options.DisableWorkflowRun)
+        {
+            return null;
+        }
+
+        return this.ActivitySource.StartActivity(ActivityNames.WorkflowSession);
+    }
+
+    ///
+    /// Starts a workflow run activity if enabled. This represents a single
+    /// input-to-halt cycle within a workflow session.
     ///
     /// An activity if workflow run telemetry is enabled, otherwise null.
     public Activity? StartWorkflowRunActivity()
@@ -98,7 +116,7 @@ public WorkflowTelemetryContext(WorkflowTelemetryOptions options, ActivitySource
             return null;
         }
-        return this.ActivitySource.StartActivity(ActivityNames.WorkflowRun);
+        return this.ActivitySource.StartActivity(ActivityNames.WorkflowInvoke);
     }
     ///
diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
index e7a99d5ca2..af8a9d8e0d 100644
--- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/ObservabilityTests.cs
@@ -43,15 +43,16 @@ public ObservabilityTests()
     /// Create a sample workflow for testing.
     ///
     ///
-    /// This workflow is expected to create 8 activities that will be captured by the tests
+    /// This workflow is expected to create 9 activities that will be captured by the tests
     /// - ActivityNames.WorkflowBuild
-    /// - ActivityNames.WorkflowRun
-    /// -- ActivityNames.EdgeGroupProcess
-    /// -- ActivityNames.ExecutorProcess (UppercaseExecutor)
-    /// --- ActivityNames.MessageSend
-    /// ---- ActivityNames.EdgeGroupProcess
-    /// -- ActivityNames.ExecutorProcess (ReverseTextExecutor)
-    /// --- ActivityNames.MessageSend
+    /// - ActivityNames.WorkflowSession
+    /// -- ActivityNames.WorkflowInvoke
+    /// --- ActivityNames.EdgeGroupProcess
+    /// --- ActivityNames.ExecutorProcess (UppercaseExecutor)
+    /// ---- ActivityNames.MessageSend
+    /// ----- ActivityNames.EdgeGroupProcess
+    /// --- ActivityNames.ExecutorProcess (ReverseTextExecutor)
+    /// ---- ActivityNames.MessageSend
     ///
     /// The created workflow.
     private static Workflow CreateWorkflow()
@@ -74,7 +75,8 @@ private static Dictionary GetExpectedActivityNameCounts() =>
         new()
         {
             { ActivityNames.WorkflowBuild, 1 },
-            { ActivityNames.WorkflowRun, 1 },
+            { ActivityNames.WorkflowSession, 1 },
+            { ActivityNames.WorkflowInvoke, 1 },
             { ActivityNames.EdgeGroupProcess, 2 },
             { ActivityNames.ExecutorProcess, 2 },
             { ActivityNames.MessageSend, 2 }
@@ -113,7 +115,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme
         // Assert
         var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
-        capturedActivities.Should().HaveCount(8, "Exactly 8 activities should be created.");
+        capturedActivities.Should().HaveCount(9, "Exactly 9 activities should be created.");
         // Make sure all expected activities exist and have the correct count
         foreach (var kvp in GetExpectedActivityNameCounts())
@@ -125,7 +127,7 @@ private async Task TestWorkflowEndToEndActivitiesAsync(string executionEnvironme
         }
         // Verify WorkflowRun activity events include workflow lifecycle events
-        var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal));
+        var workflowRunActivity = capturedActivities.First(a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal));
         var activityEvents = workflowRunActivity.Events.ToList();
         activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowStarted, "activity should have workflow started event");
         activityEvents.Should().Contain(e => e.Name == EventNames.WorkflowCompleted, "activity should have workflow completed event");
@@ -273,8 +275,11 @@ public async Task DisableWorkflowRun_PreventsWorkflowRunActivityAsync()
         // Assert
         var capturedActivities = this._capturedActivities.Where(a => a.RootId == testActivity.RootId).ToList();
         capturedActivities.Should().NotContain(
-            a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
+            a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal),
             "WorkflowRun activity should be disabled.");
+        capturedActivities.Should().NotContain(
+            a => a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal),
+            "WorkflowSession activity should also be disabled when DisableWorkflowRun is true.");
         capturedActivities.Should().Contain(
             a => a.OperationName.StartsWith(ActivityNames.WorkflowBuild, StringComparison.Ordinal),
             "Other activities should still be created.");
@@ -303,7 +308,7 @@ public async Task DisableExecutorProcess_PreventsExecutorProcessActivityAsync()
             a => a.OperationName.StartsWith(ActivityNames.ExecutorProcess, StringComparison.Ordinal),
             "ExecutorProcess activity should be disabled.");
         capturedActivities.Should().Contain(
-            a => a.OperationName.StartsWith(ActivityNames.WorkflowRun, StringComparison.Ordinal),
+            a => a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal),
             "Other activities should still be created.");
     }
diff --git
a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs
new file mode 100644
index 0000000000..f35910f26b
--- /dev/null
+++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/WorkflowRunActivityStopTests.cs
@@ -0,0 +1,351 @@
+// Copyright (c) Microsoft. All rights reserved.
+
+using System;
+using System.Collections.Concurrent;
+using System.Diagnostics;
+using System.Linq;
+using System.Threading.Tasks;
+using FluentAssertions;
+using Microsoft.Agents.AI.Workflows.Observability;
+
+namespace Microsoft.Agents.AI.Workflows.UnitTests;
+
+///
+/// Regression test for https://github.com/microsoft/agent-framework/issues/4155
+/// Verifies that the workflow_invoke Activity is properly stopped/disposed so it gets exported
+/// to telemetry backends. The ActivityStopped callback must fire for the workflow_invoke span.
+///
+[Collection("ObservabilityTests")]
+public sealed class WorkflowRunActivityStopTests : IDisposable
+{
+    private readonly ActivityListener _activityListener;
+    private readonly ConcurrentBag _startedActivities = [];
+    private readonly ConcurrentBag _stoppedActivities = [];
+    private bool _isDisposed;
+
+    public WorkflowRunActivityStopTests()
+    {
+        this._activityListener = new ActivityListener
+        {
+            ShouldListenTo = source => source.Name.Contains(typeof(Workflow).Namespace!),
+            Sample = (ref ActivityCreationOptions options) => ActivitySamplingResult.AllData,
+            ActivityStarted = activity => this._startedActivities.Add(activity),
+            ActivityStopped = activity => this._stoppedActivities.Add(activity),
+        };
+        ActivitySource.AddActivityListener(this._activityListener);
+    }
+
+    public void Dispose()
+    {
+        if (!this._isDisposed)
+        {
+            this._activityListener?.Dispose();
+            this._isDisposed = true;
+        }
+    }
+
+    ///
+    /// Creates a simple sequential workflow with OpenTelemetry enabled.
+    ///
+    private static Workflow CreateWorkflow()
+    {
+        Func uppercaseFunc = s => s.ToUpperInvariant();
+        var uppercase = uppercaseFunc.BindAsExecutor("UppercaseExecutor");
+
+        Func reverseFunc = s => new string(s.Reverse().ToArray());
+        var reverse = reverseFunc.BindAsExecutor("ReverseTextExecutor");
+
+        WorkflowBuilder builder = new(uppercase);
+        builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
+
+        return builder.WithOpenTelemetry().Build();
+    }
+
+    ///
+    /// Verifies that the workflow_invoke Activity is stopped (and thus exportable) when
+    /// using the Lockstep execution environment.
+    /// Regression (issue #4155): the run Activity started by LockstepRunEventStream.TakeEventStreamAsync
+    /// must be stopped once the run is disposed; otherwise ActivityStopped never fires for it.
+    ///
+    [Fact]
+    public async Task WorkflowRunActivity_IsStopped_LockstepAsync()
+    {
+        // Arrange
+        using var testActivity = new Activity("WorkflowRunStopTest_Lockstep").Start();
+
+        // Act
+        var workflow = CreateWorkflow();
+        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
+        await run.DisposeAsync();
+
+        // Assert - workflow.session should have been started and stopped
+        var startedSessions = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        startedSessions.Should().HaveCount(1, "workflow.session Activity should be started");
+
+        var stoppedSessions = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        stoppedSessions.Should().HaveCount(1,
+            "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");
+
+        // Assert - workflow_invoke should have been started and stopped
+        var startedWorkflowRuns = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        startedWorkflowRuns.Should().HaveCount(1, "workflow_invoke Activity should be started");
+
+        var stoppedWorkflowRuns = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        stoppedWorkflowRuns.Should().HaveCount(1,
+            "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
+    }
+
+    ///
+    /// Verifies that the workflow_invoke Activity is stopped when using the OffThread (Default)
+    /// execution environment (StreamingRunEventStream).
+    ///
+    [Fact]
+    public async Task WorkflowRunActivity_IsStopped_OffThreadAsync()
+    {
+        // Arrange
+        using var testActivity = new Activity("WorkflowRunStopTest_OffThread").Start();
+
+        // Act
+        var workflow = CreateWorkflow();
+        Run run = await InProcessExecution.OffThread.RunAsync(workflow, "Hello, World!");
+        await run.DisposeAsync();
+
+        // Assert - workflow.session should have been started and stopped
+        var startedSessions = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        startedSessions.Should().HaveCount(1, "workflow.session Activity should be started");
+
+        var stoppedSessions = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        stoppedSessions.Should().HaveCount(1,
+            "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");
+
+        // Assert - workflow_invoke should have been started and stopped
+        var startedWorkflowRuns = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        startedWorkflowRuns.Should().HaveCount(1, "workflow_invoke Activity should be started");
+
+        var stoppedWorkflowRuns = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        stoppedWorkflowRuns.Should().HaveCount(1,
+            "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
+    }
+
+    ///
+    /// Verifies that the workflow_invoke Activity is stopped when using the streaming API
+    /// (StreamingRun.WatchStreamAsync) with the OffThread execution environment.
+    /// This matches the exact usage pattern described in the issue.
+    ///
+    [Fact]
+    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThreadAsync()
+    {
+        // Arrange
+        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread").Start();
+
+        // Act - use streaming path (WatchStreamAsync), which is the pattern from the issue
+        var workflow = CreateWorkflow();
+        StreamingRun run = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!");
+        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
+        {
+            // Consume all events
+        }
+
+        // Dispose the run before asserting — the run Activity is disposed when the
+        // run loop exits, which happens during DisposeAsync. Without this, assertions
+        // can race against the background run loop's finally block.
+        await run.DisposeAsync();
+
+        // Assert - workflow.session should have been started and stopped
+        var startedSessions = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        startedSessions.Should().HaveCount(1, "workflow.session Activity should be started");
+
+        var stoppedSessions = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        stoppedSessions.Should().HaveCount(1,
+            "workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");
+
+        // Assert - workflow_invoke should have been started
+        var startedWorkflowRuns = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        startedWorkflowRuns.Should().HaveCount(1, "workflow_invoke Activity should be started");
+
+        // Assert - workflow_invoke should have been stopped
+        var stoppedWorkflowRuns = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        stoppedWorkflowRuns.Should().HaveCount(1,
+            "workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends (issue #4155)");
+    }
+
+    ///
+    /// Verifies that a new workflow_invoke activity is started and stopped for each
+    /// streaming invocation, even when using the same workflow in a multi-turn pattern,
+    /// and that each session gets its own session activity.
+    ///
+    [Fact]
+    public async Task WorkflowRunActivity_IsStopped_Streaming_OffThread_MultiTurnAsync()
+    {
+        // Arrange
+        using var testActivity = new Activity("WorkflowRunStopTest_Streaming_OffThread_MultiTurn").Start();
+
+        var workflow = CreateWorkflow();
+
+        // Act - first streaming run
+        await using (StreamingRun run1 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Hello, World!"))
+        {
+            await foreach (WorkflowEvent evt in run1.WatchStreamAsync())
+            {
+                // Consume all events from first turn
+            }
+        }
+
+        // Act - second streaming run (multi-turn scenario with same workflow)
+        await using (StreamingRun run2 = await InProcessExecution.OffThread.RunStreamingAsync(workflow, "Second turn!"))
+        {
+            await foreach (WorkflowEvent evt in run2.WatchStreamAsync())
+            {
+                // Consume all events from second turn
+            }
+        }
+
+        // Assert - two workflow.session activities should have been started and stopped
+        var startedSessions = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        startedSessions.Should().HaveCount(2,
+            "each streaming invocation should start its own workflow.session Activity");
+
+        var stoppedSessions = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        stoppedSessions.Should().HaveCount(2,
+            "each workflow.session Activity should be stopped/disposed so it is exported to telemetry backends");
+
+        // Assert - two workflow_invoke activities should have been started and stopped
+        var startedWorkflowRuns = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        startedWorkflowRuns.Should().HaveCount(2,
+            "each streaming invocation should start its own workflow_invoke Activity");
+
+        var stoppedWorkflowRuns = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        stoppedWorkflowRuns.Should().HaveCount(2,
+            "each workflow_invoke Activity should be stopped/disposed so it is exported to telemetry backends in multi-turn scenarios");
+    }
+
+    ///
+    /// Verifies that all started activities (not just workflow_invoke) are properly stopped.
+    /// This ensures no spans are "leaked" without being exported.
+    ///
+    [Fact]
+    public async Task AllActivities_AreStopped_AfterWorkflowCompletionAsync()
+    {
+        // Arrange
+        using var testActivity = new Activity("AllActivitiesStopTest").Start();
+
+        // Act
+        var workflow = CreateWorkflow();
+        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
+        await run.DisposeAsync();
+
+        // Assert - every started activity should also be stopped
+        var started = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId)
+            .Select(a => a.Id)
+            .ToHashSet();
+
+        var stopped = this._stoppedActivities
+            .Where(a => a.RootId == testActivity.RootId)
+            .Select(a => a.Id)
+            .ToHashSet();
+
+        var neverStopped = started.Except(stopped).ToList();
+        if (neverStopped.Count > 0)
+        {
+            var neverStoppedNames = this._startedActivities
+                .Where(a => neverStopped.Contains(a.Id))
+                .Select(a => a.OperationName)
+                .ToList();
+            neverStoppedNames.Should().BeEmpty(
+                "all started activities should be stopped so they are exported. " +
+                $"Activities started but never stopped: [{string.Join(", ", neverStoppedNames)}]");
+        }
+    }
+
+    ///
+    /// Verifies that Activity.Current is not leaked after lockstep RunAsync.
+    /// Application code creating activities after RunAsync returns should not
+    /// be parented under the workflow session span. The run activity should
+    /// still nest correctly under the session.
+    ///
+    [Fact]
+    public async Task Lockstep_SessionActivity_DoesNotLeak_IntoCaller_ActivityCurrentAsync()
+    {
+        // Arrange
+        using var testActivity = new Activity("SessionLeakTest").Start();
+        var workflow = CreateWorkflow();
+
+        // Act — run the workflow via lockstep (Start + drain happen inside RunAsync)
+        Run run = await InProcessExecution.Lockstep.RunAsync(workflow, "Hello, World!");
+
+        // Create an application activity after RunAsync returns.
+        // If the session leaked into Activity.Current, this would be parented under it.
+        using var appActivity = new Activity("AppWork").Start();
+        appActivity.Stop();
+
+        await run.DisposeAsync();
+
+        // Assert — the app activity should be parented under the test root, not the session
+        var sessionActivities = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowSession, StringComparison.Ordinal))
+            .ToList();
+        sessionActivities.Should().HaveCount(1, "one session activity should exist");
+
+        appActivity.ParentId.Should().Be(testActivity.Id,
+            "application activity should be parented under the test root, not the workflow session");
+
+        // Assert — the run activity should still be parented under the session
+        var invokeActivities = this._startedActivities
+            .Where(a => a.RootId == testActivity.RootId &&
+                        a.OperationName.StartsWith(ActivityNames.WorkflowInvoke, StringComparison.Ordinal))
+            .ToList();
+        invokeActivities.Should().HaveCount(1, "one workflow_invoke activity should exist");
+        invokeActivities[0].ParentId.Should().Be(sessionActivities[0].Id,
+            "workflow_invoke activity should be nested under the session activity");
+    }
+}