Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ internal static class WorkflowFactory
internal static Workflow BuildWorkflow(IChatClient chatClient)
{
// Create executors
var startExecutor = new ConcurrentStartExecutor();
var startExecutor = new ChatForwardingExecutor("Start");
var aggregationExecutor = new ConcurrentAggregationExecutor();
AIAgent frenchAgent = GetLanguageAgent("French", chatClient);
AIAgent englishAgent = GetLanguageAgent("English", chatClient);
Expand All @@ -38,33 +38,11 @@ internal static Workflow BuildWorkflow(IChatClient chatClient)
private static ChatClientAgent GetLanguageAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient, instructions: $"You're a helpful assistant who always responds in {targetLanguage}.", name: $"{targetLanguage}Agent");

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
private sealed class ConcurrentStartExecutor() : Executor("ConcurrentStartExecutor")
{
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
return routeBuilder
.AddHandler<List<ChatMessage>>(this.RouteMessages)
.AddHandler<TurnToken>(this.RouteTurnTokenAsync);
}

private ValueTask RouteMessages(List<ChatMessage> messages, IWorkflowContext context, CancellationToken cancellationToken)
{
return context.SendMessageAsync(messages, cancellationToken: cancellationToken);
}

private ValueTask RouteTurnTokenAsync(TurnToken token, IWorkflowContext context, CancellationToken cancellationToken)
{
return context.SendMessageAsync(token, cancellationToken: cancellationToken);
}
}

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
private sealed class ConcurrentAggregationExecutor() : Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
private sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor"), IResettableExecutor
{
private readonly List<ChatMessage> _messages = [];

Expand All @@ -85,5 +63,12 @@ public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflow
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}

/// <inheritdoc/>
public ValueTask ResetAsync()
{
this._messages.Clear();
return default;
}
}
}
71 changes: 71 additions & 0 deletions dotnet/src/Microsoft.Agents.AI.Workflows/ChatForwardingExecutor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
// Copyright (c) Microsoft. All rights reserved.

using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.AI;

namespace Microsoft.Agents.AI.Workflows;

/// <summary>
/// Provides configuration options for <see cref="ChatForwardingExecutor"/>.
/// </summary>
public class ChatForwardingExecutorOptions
{
/// <summary>
/// Gets or sets the chat role to use when converting string messages to <see cref="ChatMessage"/> instances.
/// If set, the executor will accept string messages and convert them to chat messages with this role.
/// </summary>
public ChatRole? StringMessageChatRole { get; set; }
}

/// <summary>
/// A ChatProtocol executor that forwards all messages it receives. Useful for splitting inputs into parallel
/// processing paths.
/// </summary>
/// <remarks>This executor is designed to be cross-run shareable and can be reset to its initial state. It handles
/// multiple chat-related types, enabling flexible message forwarding scenarios. Thread safety and reusability are
/// ensured by its design.</remarks>
/// <param name="id">The unique identifier for the executor instance. Used to distinguish this executor within the system.</param>
/// <param name="options">Optional configuration settings for the executor. If null, default options are used.</param>
public sealed class ChatForwardingExecutor(string id, ChatForwardingExecutorOptions? options = null) : Executor(id, declareCrossRunShareable: true), IResettableExecutor
{
private readonly ChatRole? _stringMessageChatRole = options?.StringMessageChatRole;

/// <inheritdoc/>
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
{
if (this._stringMessageChatRole.HasValue)
{
routeBuilder = routeBuilder.AddHandler<string>(
(message, context) => context.SendMessageAsync(new ChatMessage(ChatRole.User, message)));
}

return routeBuilder.AddHandler<ChatMessage>(ForwardMessageAsync)
.AddHandler<IEnumerable<ChatMessage>>(ForwardMessagesAsync)
.AddHandler<ChatMessage[]>(ForwardMessagesAsync)
.AddHandler<List<ChatMessage>>(ForwardMessagesAsync)
.AddHandler<TurnToken>(ForwardTurnTokenAsync);
}

private static ValueTask ForwardMessageAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken)
=> context.SendMessageAsync(message, cancellationToken);

// Note that this can be used to split a turn into multiple parallel turns taken, which will cause streaming ChatMessages
// to overlap.
private static ValueTask ForwardTurnTokenAsync(TurnToken message, IWorkflowContext context, CancellationToken cancellationToken)
=> context.SendMessageAsync(message, cancellationToken);

// TODO: This is not ideal, but until we have a way of guaranteeing correct routing of interfaces across serialization
// boundaries, we need to do type unification. It behaves better when used as a handler in ChatProtocolExecutor because
// it is a strictly contravariant use, whereas this forces invariance on the type because it is directly forwarded.
private static ValueTask ForwardMessagesAsync(IEnumerable<ChatMessage> messages, IWorkflowContext context, CancellationToken cancellationToken)
=> context.SendMessageAsync(messages is List<ChatMessage> messageList ? messageList : messages.ToList(), cancellationToken);

private static ValueTask ForwardMessagesAsync(ChatMessage[] messages, IWorkflowContext context, CancellationToken cancellationToken)
=> context.SendMessageAsync(messages, cancellationToken);

/// <inheritdoc/>
public ValueTask ResetAsync() => default;
}

This file was deleted.

Loading