diff --git a/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs b/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs index 736c51d555..1335b02cd8 100644 --- a/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs +++ b/dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs @@ -16,7 +16,7 @@ internal static class WorkflowFactory internal static Workflow BuildWorkflow(IChatClient chatClient) { // Create executors - var startExecutor = new ConcurrentStartExecutor(); + var startExecutor = new ChatForwardingExecutor("Start"); var aggregationExecutor = new ConcurrentAggregationExecutor(); AIAgent frenchAgent = GetLanguageAgent("French", chatClient); AIAgent englishAgent = GetLanguageAgent("English", chatClient); @@ -38,33 +38,11 @@ internal static Workflow BuildWorkflow(IChatClient chatClient) private static ChatClientAgent GetLanguageAgent(string targetLanguage, IChatClient chatClient) => new(chatClient, instructions: $"You're a helpful assistant who always responds in {targetLanguage}.", name: $"{targetLanguage}Agent"); - /// - /// Executor that starts the concurrent processing by sending messages to the agents. - /// - private sealed class ConcurrentStartExecutor() : Executor("ConcurrentStartExecutor") - { - protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) - { - return routeBuilder - .AddHandler>(this.RouteMessages) - .AddHandler(this.RouteTurnTokenAsync); - } - - private ValueTask RouteMessages(List messages, IWorkflowContext context, CancellationToken cancellationToken) - { - return context.SendMessageAsync(messages, cancellationToken: cancellationToken); - } - - private ValueTask RouteTurnTokenAsync(TurnToken token, IWorkflowContext context, CancellationToken cancellationToken) - { - return context.SendMessageAsync(token, cancellationToken: cancellationToken); - } - } - /// /// Executor that aggregates the results from the concurrent agents. /// - private sealed class ConcurrentAggregationExecutor() : Executor>("ConcurrentAggregationExecutor") + private sealed class ConcurrentAggregationExecutor() : + Executor>("ConcurrentAggregationExecutor"), IResettableExecutor { private readonly List _messages = []; @@ -85,5 +63,12 @@ public override async ValueTask HandleAsync(List message, IWorkflow await context.YieldOutputAsync(formattedMessages, cancellationToken); } } + + /// + public ValueTask ResetAsync() + { + this._messages.Clear(); + return default; + } } } diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/ChatForwardingExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/ChatForwardingExecutor.cs new file mode 100644 index 0000000000..5bb2f5e237 --- /dev/null +++ b/dotnet/src/Microsoft.Agents.AI.Workflows/ChatForwardingExecutor.cs @@ -0,0 +1,71 @@ +// Copyright (c) Microsoft. All rights reserved. + +using System.Collections.Generic; +using System.Linq; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.AI; + +namespace Microsoft.Agents.AI.Workflows; + +/// +/// Provides configuration options for . +/// +public class ChatForwardingExecutorOptions +{ + /// + /// Gets or sets the chat role to use when converting string messages to instances. + /// If set, the executor will accept string messages and convert them to chat messages with this role. + /// + public ChatRole? StringMessageChatRole { get; set; } +} + +/// +/// A ChatProtocol executor that forwards all messages it receives. Useful for splitting inputs into parallel +/// processing paths. +/// +/// This executor is designed to be cross-run shareable and can be reset to its initial state. It handles +/// multiple chat-related types, enabling flexible message forwarding scenarios. Thread safety and reusability are +/// ensured by its design. +/// The unique identifier for the executor instance. Used to distinguish this executor within the system. +/// Optional configuration settings for the executor. If null, default options are used. +public sealed class ChatForwardingExecutor(string id, ChatForwardingExecutorOptions? options = null) : Executor(id, declareCrossRunShareable: true), IResettableExecutor +{ + private readonly ChatRole? _stringMessageChatRole = options?.StringMessageChatRole; + + /// + protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) + { + if (this._stringMessageChatRole.HasValue) + { + routeBuilder = routeBuilder.AddHandler( + (message, context) => context.SendMessageAsync(new ChatMessage(ChatRole.User, message))); + } + + return routeBuilder.AddHandler(ForwardMessageAsync) + .AddHandler>(ForwardMessagesAsync) + .AddHandler(ForwardMessagesAsync) + .AddHandler>(ForwardMessagesAsync) + .AddHandler(ForwardTurnTokenAsync); + } + + private static ValueTask ForwardMessageAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken) + => context.SendMessageAsync(message, cancellationToken); + + // Note that this can be used to split a turn into multiple parallel turns taken, which will cause streaming ChatMessages + // to overlap. + private static ValueTask ForwardTurnTokenAsync(TurnToken message, IWorkflowContext context, CancellationToken cancellationToken) + => context.SendMessageAsync(message, cancellationToken); + + // TODO: This is not ideal, but until we have a way of guaranteeing correct routing of interfaces across serialization + // boundaries, we need to do type unification. It behaves better when used as a handler in ChatProtocolExecutor because + // it is a strictly contravariant use, whereas this forces invariance on the type because it is directly forwarded. + private static ValueTask ForwardMessagesAsync(IEnumerable messages, IWorkflowContext context, CancellationToken cancellationToken) + => context.SendMessageAsync(messages is List messageList ? messageList : messages.ToList(), cancellationToken); + + private static ValueTask ForwardMessagesAsync(ChatMessage[] messages, IWorkflowContext context, CancellationToken cancellationToken) + => context.SendMessageAsync(messages, cancellationToken); + + /// + public ValueTask ResetAsync() => default; +} diff --git a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ChatForwardingExecutor.cs b/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ChatForwardingExecutor.cs deleted file mode 100644 index b395dd4216..0000000000 --- a/dotnet/src/Microsoft.Agents.AI.Workflows/Specialized/ChatForwardingExecutor.cs +++ /dev/null @@ -1,20 +0,0 @@ -// Copyright (c) Microsoft. All rights reserved. - -using System.Collections.Generic; -using System.Threading.Tasks; -using Microsoft.Extensions.AI; - -namespace Microsoft.Agents.AI.Workflows.Specialized; - -/// Executor that forwards all messages. -internal sealed class ChatForwardingExecutor(string id) : Executor(id, declareCrossRunShareable: true), IResettableExecutor -{ - protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) => - routeBuilder - .AddHandler((message, context, cancellationToken) => context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken: cancellationToken)) - .AddHandler((message, context, cancellationToken) => context.SendMessageAsync(message, cancellationToken: cancellationToken)) - .AddHandler>((messages, context, cancellationToken) => context.SendMessageAsync(messages, cancellationToken: cancellationToken)) - .AddHandler((turnToken, context, cancellationToken) => context.SendMessageAsync(turnToken, cancellationToken: cancellationToken)); - - public ValueTask ResetAsync() => default; -}