From 81140f4bb9a3bd46963b2902845357ea53ec07b1 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=B6=20Potato?= <2498898516@qq.com> Date: Sat, 13 Jun 2026 19:22:58 +0800 Subject: [PATCH 1/3] feat(workflows): Add AIContextProvider message event and test, Fix the issue where the internal agent in Workflow is unable to pass the content of AIContextProvider to the Workflow. --- .../AgentAIContextProviderMsgEvent.cs | 11 ++ .../Specialized/AIAgentHostExecutor.cs | 96 ++++++++++++- .../WorkflowSession.cs | 5 + .../AIContextProviderWorkflow.cs | 127 ++++++++++++++++++ 4 files changed, 235 insertions(+), 4 deletions(-) create mode 100644 dotnet/src/Microsoft.Agents.AI.Workflows/AgentAIContextProviderMsgEvent.cs create mode 100644 dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/AgentAIContextProviderMsgEvent.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentAIContextProviderMsgEvent.cs new file mode 100644 index 00000000000..05c9aa036a1 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/AgentAIContextProviderMsgEvent.cs @@ -0,0 +1,11 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows; + +internal sealed class AgentAIContextProviderMsgEvent(IReadOnlyList messages) : WorkflowEvent(messages) +{ + public IReadOnlyList Messages { get; } = messages; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs index cd20fc4336e..dd28f02eeaa 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs @@ -6,6 +6,7 @@ using System.Text.Json; using System.Threading; using System.Threading.Tasks; +using Microsoft.Agents.AI; using Microsoft.Extensions.AI; namespace Microsoft.Agents.AI.Workflows.Specialized; @@ -219,13 +220,16 @@ private async ValueTask InvokeAgentAsync(IEnumerable { AgentResponse response; AIAgentUnservicedRequestsCollector collector = new(this._userInputHandler, this._functionCallHandler); + AgentSession session = await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false); + List? historyBefore = await this.GetStoredChatHistorySnapshotAsync(session, cancellationToken).ConfigureAwait(false); + List requestMessages = messages as List ?? messages.ToList(); if (emitUpdateEvents) { // Run the agent in streaming mode only when agent run update events are to be emitted. IAsyncEnumerable agentStream = this._agent.RunStreamingAsync( - messages, - await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false), + requestMessages, + session, cancellationToken: cancellationToken); List updates = []; @@ -241,8 +245,8 @@ await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false), else { // Otherwise, run the agent in non-streaming mode. - response = await this._agent.RunAsync(messages, - await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false), + response = await this._agent.RunAsync(requestMessages, + session, cancellationToken: cancellationToken) .ConfigureAwait(false); @@ -254,11 +258,95 @@ await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false), await context.YieldOutputAsync(response, cancellationToken).ConfigureAwait(false); } + await this.EmitEnrichedRequestMessagesAsync(historyBefore, session, context, cancellationToken).ConfigureAwait(false); + await collector.SubmitAsync(context, cancellationToken).ConfigureAwait(false); return response; } + /// + /// Get a snapshot of the chat history for the given session. + /// + /// The session to get the chat history for. + /// Cancellation token. + /// + private async ValueTask?> GetStoredChatHistorySnapshotAsync(AgentSession session, CancellationToken cancellationToken) + { + ChatHistoryProvider? provider = this._agent.GetService(); + if (provider is null) + { + return null; + } + // if the provider is InMemoryChatHistoryProvider, get the messages directly + if (provider is InMemoryChatHistoryProvider inMemoryProvider) + { + return [.. inMemoryProvider.GetMessages(session)]; + } + + // otherwise, invoke the provider to get the messages +#pragma warning disable MAAI001 // Type is for evaluation purposes only and is subject to change or removal in future updates. Suppress this diagnostic to proceed. + ChatHistoryProvider.InvokingContext invokingContext = new(this._agent, session, []); +#pragma warning restore MAAI001 + IEnumerable messages = await provider.InvokingAsync(invokingContext, cancellationToken).ConfigureAwait(false); + return [.. messages]; + } + + /// + /// Detects request messages that were injected by during the + /// latest agent invocation and raises them as so that + /// the workflow layer can persist them into its chat history. + /// + /// + /// + /// This method compares the agent's stored chat history before and after the agent run. + /// Any newly-added messages whose equals + /// are considered enriched + /// request messages and are forwarded to the workflow event stream. + /// + /// + /// If is (e.g. the agent does not + /// expose a ), this method performs no work. + /// + /// + /// + /// Snapshot of the agent's stored chat history taken before the agent was invoked. + /// + /// The agent session used for the invocation. + /// The current workflow context used to emit events. + /// + /// The to monitor for cancellation requests. + /// + private async ValueTask EmitEnrichedRequestMessagesAsync( + List? historyBefore, + AgentSession session, + IWorkflowContext context, + CancellationToken cancellationToken) + { + if (historyBefore is null) + { + return; + } + + List? historyAfter = await this.GetStoredChatHistorySnapshotAsync(session, cancellationToken).ConfigureAwait(false); + if (historyAfter is null || historyAfter.Count <= historyBefore.Count) + { + return; + } + + List enrichedRequestMessages = + [ + .. historyAfter + .Skip(historyBefore.Count) + .Where(message => message.GetAgentRequestMessageSourceType() == AgentRequestMessageSourceType.AIContextProvider) + ]; + + if (enrichedRequestMessages.Count > 0) + { + await context.AddEventAsync(new AgentAIContextProviderMsgEvent(enrichedRequestMessages), cancellationToken).ConfigureAwait(false); + } + } + /// /// Content types that represent meaningful conversational content portable across agents. /// Messages containing only content types not in this set (e.g. reasoning tokens, web search diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs index 812bda11509..e099cb6b677 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/WorkflowSession.cs @@ -477,6 +477,11 @@ IAsyncEnumerable InvokeStageAsync( yield return update; break; + case AgentAIContextProviderMsgEvent requestMessages: + // Add the message in the AIContentProvider to the ChatHistoryProvider of the Workflow. + this.ChatHistoryProvider.AddMessages(this, requestMessages.Messages); + break; + case WorkflowErrorEvent workflowError: Exception? exception = workflowError.Exception; if (exception is TargetInvocationException tie && tie.InnerException != null) diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs new file mode 100644 index 00000000000..1ba028d8f72 --- /dev/null +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs @@ -0,0 +1,127 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System; +using System.Collections.Generic; +using System.Linq; +using System.Runtime.CompilerServices; +using System.Text.Json; +using System.Threading; +using System.Threading.Tasks; +using FluentAssertions; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows.UnitTests; + +/// +/// Validates that messages injected by into an inner agent +/// are correctly persisted into the workflow's chat history, without leaking to downstream agents. +/// +public class AIContextProviderWorkflow +{ + private const string UserText = "Where is Taggia?"; + private const string ContextText = "Taggia is a city in Liguria."; + private const string FirstAgentResponseText = "Taggia is in Liguria."; + + /// + /// Ensures that AIContextProvider-injected messages appear in the workflow session's + /// chat history and survive serialization (regression test for the bug where such + /// messages were lost because WorkflowHostAgent only persisted model outputs). + /// + [Fact] + public async Task Test_WorkflowAsAgent_SerializesAIContextProviderRequestMessagesAsync() + { + // Arrange + ChatClientAgent innerAgent = CreateContextAwareAgent(); + AIAgent workflowAgent = AgentWorkflowBuilder.BuildSequential(innerAgent).AsAIAgent(); + AgentSession session = await workflowAgent.CreateSessionAsync(); + + // Act + await workflowAgent.RunAsync(new ChatMessage(ChatRole.User, UserText), session); + JsonElement serializedSession = await workflowAgent.SerializeSessionAsync(session); + + // Assert + WorkflowSession workflowSession = session.Should().BeOfType().Subject; + string[] historyTexts = + [ + .. workflowSession.ChatHistoryProvider + .GetAllMessages(workflowSession) + .Select(message => message.Text) + ]; + + historyTexts.Should().Contain(UserText); + historyTexts.Should().Contain(ContextText); + historyTexts.Should().Contain(FirstAgentResponseText); + serializedSession.GetRawText().Should().Contain(ContextText); + } + + /// + /// Ensures that AIContextProvider-injected messages are saved to workflow history + /// but are NOT forwarded as part of the input to subsequent agents in the workflow. + /// + [Fact] + public async Task Test_WorkflowAsAgent_DoesNotForwardAIContextProviderRequestMessagesToDownstreamAgentAsync() + { + // Arrange + ChatClientAgent innerAgent = CreateContextAwareAgent(); + RecordingEchoAgent downstreamAgent = new(id: "downstream", name: "downstream", prefix: "downstream:"); + AIAgent workflowAgent = AgentWorkflowBuilder.BuildSequential(innerAgent, downstreamAgent).AsAIAgent(); + + // Act + await workflowAgent.RunAsync(new ChatMessage(ChatRole.User, UserText), await workflowAgent.CreateSessionAsync()); + + // Assert + downstreamAgent.RecordedInputs.Should().ContainSingle(); + string[] downstreamTexts = [.. downstreamAgent.RecordedInputs[0].Select(message => message.Text)]; + downstreamTexts.Should().Contain(FirstAgentResponseText); + downstreamTexts.Should().NotContain(ContextText); + } + + /// Builds an agent whose IChatClient always replies with , prepopulated with a . + private static ChatClientAgent CreateContextAwareAgent() + { + return new ChatClientAgent( + new StubChatClient(_ => new ChatResponse([new ChatMessage(ChatRole.Assistant, FirstAgentResponseText)])), + new ChatClientAgentOptions + { + Name = "inner", + AIContextProviders = [new StaticAIContextProvider(ContextText)] + }); + } + + /// Always injects a single System message containing the configured text. + private sealed class StaticAIContextProvider(string text) : AIContextProvider + { + protected override ValueTask ProvideAIContextAsync(InvokingContext context, CancellationToken cancellationToken = default) + { + return new(new AIContext + { + Messages = [new ChatMessage(ChatRole.System, text)] + }); + } + } + + /// Test double for that returns deterministic responses via the supplied factory. + private sealed class StubChatClient(Func, ChatResponse> responseFactory) : IChatClient + { + public Task GetResponseAsync(IEnumerable messages, ChatOptions? options = null, CancellationToken cancellationToken = default) + => Task.FromResult(responseFactory(messages)); + + public async IAsyncEnumerable GetStreamingResponseAsync( + IEnumerable messages, + ChatOptions? options = null, + [EnumeratorCancellation] CancellationToken cancellationToken = default) + { + ChatResponse response = await this.GetResponseAsync(messages, options, cancellationToken).ConfigureAwait(false); + foreach (ChatResponseUpdate update in response.ToChatResponseUpdates()) + { + yield return update; + } + } + + public object? GetService(Type serviceType, object? serviceKey = null) => null; + + public void Dispose() + { + } + } +} From 876ba93ee9b9b6791608d517a67ab0bbeabae863 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=B6=20Potato?= <2498898516@qq.com> Date: Sat, 13 Jun 2026 20:59:00 +0800 Subject: [PATCH 2/3] perf: Capture chat history snapshots on demand, without having to obtain them every time. --- .../Specialized/AIAgentHostExecutor.cs | 14 ++++++++++++-- 1 file changed, 12 insertions(+), 2 deletions(-) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs index dd28f02eeaa..72270bc841e 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs @@ -221,7 +221,10 @@ private async ValueTask InvokeAgentAsync(IEnumerable AgentResponse response; AIAgentUnservicedRequestsCollector collector = new(this._userInputHandler, this._functionCallHandler); AgentSession session = await this.EnsureSessionAsync(context, cancellationToken).ConfigureAwait(false); - List? historyBefore = await this.GetStoredChatHistorySnapshotAsync(session, cancellationToken).ConfigureAwait(false); + bool shouldCaptureEnrichedRequestMessages = this.HasAIContextProviders(); + List? historyBefore = shouldCaptureEnrichedRequestMessages + ? await this.GetStoredChatHistorySnapshotAsync(session, cancellationToken).ConfigureAwait(false) + : null; List requestMessages = messages as List ?? messages.ToList(); if (emitUpdateEvents) @@ -258,13 +261,20 @@ private async ValueTask InvokeAgentAsync(IEnumerable await context.YieldOutputAsync(response, cancellationToken).ConfigureAwait(false); } - await this.EmitEnrichedRequestMessagesAsync(historyBefore, session, context, cancellationToken).ConfigureAwait(false); + if (shouldCaptureEnrichedRequestMessages) + { + await this.EmitEnrichedRequestMessagesAsync(historyBefore, session, context, cancellationToken).ConfigureAwait(false); + } await collector.SubmitAsync(context, cancellationToken).ConfigureAwait(false); return response; } + private bool HasAIContextProviders() + => this._agent.GetService()?.AIContextProviders is { Count: > 0 } + || this._agent.GetService()?.AIContextProviders?.Any() == true; + /// /// Get a snapshot of the chat history for the given session. /// From 8d3528bf2b25c65d8efc3a2928a6314ba5dca337 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=B6=20Potato?= <2498898516@qq.com> Date: Sat, 13 Jun 2026 21:48:59 +0800 Subject: [PATCH 3/3] fix: Ensure that the messages can be properly filled in when they are being truncated. Correct the file name of the test document. --- .../Specialized/AIAgentHostExecutor.cs | 49 ++++++++++++++- ...w.cs => AIContextProviderWorkflowTests.cs} | 62 ++++++++++++++++++- 2 files changed, 107 insertions(+), 4 deletions(-) rename dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/{AIContextProviderWorkflow.cs => AIContextProviderWorkflowTests.cs} (67%) diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs index 72270bc841e..55a2c543c95 100644 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/AIAgentHostExecutor.cs @@ -339,7 +339,13 @@ private async ValueTask EmitEnrichedRequestMessagesAsync( } List? historyAfter = await this.GetStoredChatHistorySnapshotAsync(session, cancellationToken).ConfigureAwait(false); - if (historyAfter is null || historyAfter.Count <= historyBefore.Count) + if (historyAfter is null) + { + return; + } + + int firstNewMessageIndex = FindFirstDivergenceIndex(historyBefore, historyAfter); + if (firstNewMessageIndex >= historyAfter.Count) { return; } @@ -347,7 +353,7 @@ private async ValueTask EmitEnrichedRequestMessagesAsync( List enrichedRequestMessages = [ .. historyAfter - .Skip(historyBefore.Count) + .Skip(firstNewMessageIndex) .Where(message => message.GetAgentRequestMessageSourceType() == AgentRequestMessageSourceType.AIContextProvider) ]; @@ -357,6 +363,45 @@ .. historyAfter } } + /// + /// Finds the first index where diverges from . + /// Ensure that the messages can be properly filled in when they are being truncated. + /// + /// The index of the first new or changed message. + private static int FindFirstDivergenceIndex(List historyBefore, List historyAfter) + { + int commonLength = Math.Min(historyBefore.Count, historyAfter.Count); + for (int i = 0; i < commonLength; i++) + { + if (!MessagesCompare(historyBefore[i], historyAfter[i])) + { + return i; + } + } + + return commonLength; + } + + /// + /// Compare two messages + /// + /// Previous messages + /// Cuurrent messages + /// + private static bool MessagesCompare(ChatMessage before, ChatMessage after) + { + if (before.MessageId is not null && after.MessageId is not null) + { + return string.Equals(before.MessageId, after.MessageId, StringComparison.Ordinal); + } + + return before.Role == after.Role + && string.Equals(before.AuthorName, after.AuthorName, StringComparison.Ordinal) + && string.Equals(before.Text, after.Text, StringComparison.Ordinal) + && before.GetAgentRequestMessageSourceType() == after.GetAgentRequestMessageSourceType() + && string.Equals(before.GetAgentRequestMessageSourceId(), after.GetAgentRequestMessageSourceId(), StringComparison.Ordinal); + } + /// /// Content types that represent meaningful conversational content portable across agents. /// Messages containing only content types not in this set (e.g. reasoning tokens, web search diff --git a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflowTests.cs similarity index 67% rename from dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs rename to dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflowTests.cs index 1ba028d8f72..65a004a77bf 100644 --- a/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflow.cs +++ b/dotnet/tests/Microsoft.Agents.AI.Workflows.UnitTests/AIContextProviderWorkflowTests.cs @@ -16,7 +16,7 @@ namespace Microsoft.Agents.AI.Workflows.UnitTests; /// Validates that messages injected by into an inner agent /// are correctly persisted into the workflow's chat history, without leaking to downstream agents. /// -public class AIContextProviderWorkflow +public class AIContextProviderWorkflowTests { private const string UserText = "Where is Taggia?"; private const string ContextText = "Taggia is a city in Liguria."; @@ -54,6 +54,32 @@ .. workflowSession.ChatHistoryProvider serializedSession.GetRawText().Should().Contain(ContextText); } + /// + /// Ensures that AIContextProvider-injected messages are still persisted when inner chat history is pruned. + /// + [Fact] + public async Task Test_WorkflowAsAgent_SerializesAIContextProviderRequestMessagesWhenInnerHistoryIsPrunedAsync() + { + // Arrange + RetainingChatHistoryProvider chatHistoryProvider = new(maxStoredMessages: 2); + chatHistoryProvider.Add(new ChatMessage(ChatRole.User, "Previous question") { MessageId = "previous-user" }); + chatHistoryProvider.Add(new ChatMessage(ChatRole.Assistant, "Previous answer") { MessageId = "previous-assistant" }); + ChatClientAgent innerAgent = CreateContextAwareAgent(chatHistoryProvider); + AIAgent workflowAgent = AgentWorkflowBuilder.BuildSequential(innerAgent).AsAIAgent(); + AgentSession session = await workflowAgent.CreateSessionAsync(); + + // Act + await workflowAgent.RunAsync(new ChatMessage(ChatRole.User, UserText), session); + + // Assert + WorkflowSession workflowSession = session.Should().BeOfType().Subject; + workflowSession.ChatHistoryProvider + .GetAllMessages(workflowSession) + .Select(message => message.Text) + .Should() + .Contain(ContextText); + } + /// /// Ensures that AIContextProvider-injected messages are saved to workflow history /// but are NOT forwarded as part of the input to subsequent agents in the workflow. @@ -77,13 +103,14 @@ public async Task Test_WorkflowAsAgent_DoesNotForwardAIContextProviderRequestMes } /// Builds an agent whose IChatClient always replies with , prepopulated with a . - private static ChatClientAgent CreateContextAwareAgent() + private static ChatClientAgent CreateContextAwareAgent(ChatHistoryProvider? chatHistoryProvider = null) { return new ChatClientAgent( new StubChatClient(_ => new ChatResponse([new ChatMessage(ChatRole.Assistant, FirstAgentResponseText)])), new ChatClientAgentOptions { Name = "inner", + ChatHistoryProvider = chatHistoryProvider, AIContextProviders = [new StaticAIContextProvider(ContextText)] }); } @@ -100,6 +127,37 @@ protected override ValueTask ProvideAIContextAsync(InvokingContext co } } + private sealed class RetainingChatHistoryProvider(int maxStoredMessages) : ChatHistoryProvider + { + private readonly List _messages = []; + + public void Add(ChatMessage message) + { + this._messages.Add(message); + } + + protected override ValueTask> InvokingCoreAsync(InvokingContext context, CancellationToken cancellationToken = default) + { + return new(this._messages.Concat(context.RequestMessages)); + } + + protected override ValueTask StoreChatHistoryAsync(InvokedContext context, CancellationToken cancellationToken = default) + { + this._messages.AddRange(context.RequestMessages); + if (context.ResponseMessages is not null) + { + this._messages.AddRange(context.ResponseMessages); + } + + if (this._messages.Count > maxStoredMessages) + { + this._messages.RemoveRange(0, this._messages.Count - maxStoredMessages); + } + + return default; + } + } + /// Test double for that returns deterministic responses via the supplied factory. private sealed class StubChatClient(Func, ChatResponse> responseFactory) : IChatClient {