Skip to content

Commit 6b9d132

Browse files
committed
WIP: fix: WorkflowAsAgent Sample
* Also makes ChatForwardingExecutor public MERGE PENDING #1781
1 parent 8a3cab3 commit 6b9d132

File tree

4 files changed

+61
-31
lines changed

4 files changed

+61
-31
lines changed

dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/Program.cs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
using Microsoft.Agents.AI;
66
using Microsoft.Agents.AI.Workflows;
77
using Microsoft.Extensions.AI;
8+
using OpenAI;
89

910
namespace WorkflowAsAnAgentsSample;
1011

dotnet/samples/GettingStarted/Workflows/Agents/WorkflowAsAnAgent/WorkflowFactory.cs

Lines changed: 9 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ internal static class WorkflowFactory
1616
internal static Workflow BuildWorkflow(IChatClient chatClient)
1717
{
1818
// Create executors
19-
var startExecutor = new ConcurrentStartExecutor();
19+
var startExecutor = new ChatForwardingExecutor("Start");
2020
var aggregationExecutor = new ConcurrentAggregationExecutor();
2121
AIAgent frenchAgent = GetLanguageAgent("French", chatClient);
2222
AIAgent englishAgent = GetLanguageAgent("English", chatClient);
@@ -38,34 +38,11 @@ internal static Workflow BuildWorkflow(IChatClient chatClient)
3838
private static ChatClientAgent GetLanguageAgent(string targetLanguage, IChatClient chatClient) =>
3939
new(chatClient, instructions: $"You're a helpful assistant who always responds in {targetLanguage}.", name: $"{targetLanguage}Agent");
4040

41-
/// <summary>
42-
/// Executor that starts the concurrent processing by sending messages to the agents.
43-
/// </summary>
44-
private sealed class ConcurrentStartExecutor() :
45-
Executor<List<ChatMessage>>("ConcurrentStartExecutor")
46-
{
47-
/// <summary>
48-
/// Starts the concurrent processing by sending messages to the agents.
49-
/// </summary>
50-
/// <param name="message">The user message to process</param>
51-
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
52-
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
53-
/// The default is <see cref="CancellationToken.None"/>.</param>
54-
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
55-
{
56-
// Broadcast the message to all connected agents. Receiving agents will queue
57-
// the message but will not start processing until they receive a turn token.
58-
await context.SendMessageAsync(message, cancellationToken: cancellationToken);
59-
// Broadcast the turn token to kick off the agents.
60-
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken: cancellationToken);
61-
}
62-
}
63-
6441
/// <summary>
6542
/// Executor that aggregates the results from the concurrent agents.
6643
/// </summary>
6744
private sealed class ConcurrentAggregationExecutor() :
68-
Executor<ChatMessage>("ConcurrentAggregationExecutor")
45+
Executor<ChatMessage>("ConcurrentAggregationExecutor"), IResettableExecutor
6946
{
7047
private readonly List<ChatMessage> _messages = [];
7148

@@ -86,5 +63,12 @@ public override async ValueTask HandleAsync(ChatMessage message, IWorkflowContex
8663
await context.YieldOutputAsync(formattedMessages, cancellationToken);
8764
}
8865
}
66+
67+
/// <inheritdoc/>
68+
public ValueTask ResetAsync()
69+
{
70+
this._messages.Clear();
71+
return default;
72+
}
8973
}
9074
}
Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -4,17 +4,26 @@
44
using System.Threading.Tasks;
55
using Microsoft.Extensions.AI;
66

7-
namespace Microsoft.Agents.AI.Workflows.Specialized;
7+
namespace Microsoft.Agents.AI.Workflows;
88

9-
/// <summary>Executor that forwards all messages.</summary>
10-
internal sealed class ChatForwardingExecutor(string id) : Executor(id, declareCrossRunShareable: true), IResettableExecutor
9+
/// <summary>
10+
/// A ChatProtocol executor that forwards all messages it receives. Useful for splitting inputs into parallel
11+
/// processing paths.
12+
/// </summary>
13+
/// <remarks>This executor is designed to be cross-run shareable and can be reset to its initial state. It handles
14+
/// multiple chat-related types, enabling flexible message forwarding scenarios. Thread safety and reusability are
15+
/// ensured by its design.</remarks>
16+
/// <param name="id">The unique identifier for the executor instance. Used to distinguish this executor within the system.</param>
17+
public sealed class ChatForwardingExecutor(string id) : Executor(id, declareCrossRunShareable: true), IResettableExecutor
1118
{
12-
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
19+
/// <inheritdoc/>
20+
protected sealed override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder) =>
1321
routeBuilder
1422
.AddHandler<string>((message, context, cancellationToken) => context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken: cancellationToken))
1523
.AddHandler<ChatMessage>((message, context, cancellationToken) => context.SendMessageAsync(message, cancellationToken: cancellationToken))
1624
.AddHandler<List<ChatMessage>>((messages, context, cancellationToken) => context.SendMessageAsync(messages, cancellationToken: cancellationToken))
1725
.AddHandler<TurnToken>((turnToken, context, cancellationToken) => context.SendMessageAsync(turnToken, cancellationToken: cancellationToken));
1826

27+
/// <inheritdoc/>
1928
public ValueTask ResetAsync() => default;
2029
}

dotnet/src/Microsoft.Agents.AI.Workflows/ChatProtocolExecutor.cs

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -13,8 +13,10 @@ internal class ChatProtocolExecutorOptions
1313
public ChatRole? StringMessageChatRole { get; set; }
1414
}
1515

16-
// TODO: Make this a public type (in a later PR; todo: make an issue)
17-
internal abstract class ChatProtocolExecutor : StatefulExecutor<List<ChatMessage>>
16+
/// <summary>
17+
/// .
18+
/// </summary>
19+
public abstract class ChatProtocolExecutor : StatefulExecutor<List<ChatMessage>>
1820
{
1921
private readonly static Func<List<ChatMessage>> s_initFunction = () => [];
2022
private readonly ChatRole? _stringMessageChatRole;
@@ -25,6 +27,11 @@ internal ChatProtocolExecutor(string id, ChatProtocolExecutorOptions? options =
2527
this._stringMessageChatRole = options?.StringMessageChatRole;
2628
}
2729

30+
/// <summary>
31+
/// .
32+
/// </summary>
33+
/// <param name="routeBuilder"></param>
34+
/// <returns></returns>
2835
protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
2936
{
3037
if (this._stringMessageChatRole.HasValue)
@@ -40,6 +47,13 @@ protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
4047
.AddHandler<TurnToken>(this.TakeTurnAsync);
4148
}
4249

50+
/// <summary>
51+
/// .
52+
/// </summary>
53+
/// <param name="message"></param>
54+
/// <param name="context"></param>
55+
/// <param name="cancellationToken"></param>
56+
/// <returns></returns>
4357
protected ValueTask AddMessageAsync(ChatMessage message, IWorkflowContext context, CancellationToken cancellationToken = default)
4458
{
4559
return this.InvokeWithStateAsync(ForwardMessageAsync, context, cancellationToken: cancellationToken);
@@ -52,6 +66,13 @@ protected ValueTask AddMessageAsync(ChatMessage message, IWorkflowContext contex
5266
}
5367
}
5468

69+
/// <summary>
70+
/// .
71+
/// </summary>
72+
/// <param name="messages"></param>
73+
/// <param name="context"></param>
74+
/// <param name="cancellationToken"></param>
75+
/// <returns></returns>
5576
protected ValueTask AddMessagesAsync(IEnumerable<ChatMessage> messages, IWorkflowContext context, CancellationToken cancellationToken = default)
5677
{
5778
return this.InvokeWithStateAsync(ForwardMessageAsync, context, cancellationToken: cancellationToken);
@@ -64,6 +85,13 @@ protected ValueTask AddMessagesAsync(IEnumerable<ChatMessage> messages, IWorkflo
6485
}
6586
}
6687

88+
/// <summary>
89+
/// .
90+
/// </summary>
91+
/// <param name="token"></param>
92+
/// <param name="context"></param>
93+
/// <param name="cancellationToken"></param>
94+
/// <returns></returns>
6795
public ValueTask TakeTurnAsync(TurnToken token, IWorkflowContext context, CancellationToken cancellationToken = default)
6896
{
6997
return this.InvokeWithStateAsync(InvokeTakeTurnAsync, context, cancellationToken: cancellationToken);
@@ -81,5 +109,13 @@ await this.TakeTurnAsync(maybePendingMessages ?? s_initFunction(), context, toke
81109
}
82110
}
83111

112+
/// <summary>
113+
/// .
114+
/// </summary>
115+
/// <param name="messages"></param>
116+
/// <param name="context"></param>
117+
/// <param name="emitEvents"></param>
118+
/// <param name="cancellationToken"></param>
119+
/// <returns></returns>
84120
protected abstract ValueTask TakeTurnAsync(List<ChatMessage> messages, IWorkflowContext context, bool? emitEvents, CancellationToken cancellationToken = default);
85121
}

0 commit comments

Comments
 (0)