Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
112 changes: 51 additions & 61 deletions apps/mesh/src/api/routes/decopilot/routes.ts
Original file line number Diff line number Diff line change
Expand Up @@ -256,89 +256,85 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {

await saveMessagesToThread(requestMessage);

// Always create a passthrough client (all real tools) + model client.
// If mode is smart_tool_selection or code_execution, also create the strategy
// client so we get the gateway meta-tools (SEARCH/DESCRIBE/CALL_TOOL/RUN_CODE).
const isGatewayMode = agent.mode !== "passthrough";
const [modelClient, passthroughClient, strategyClient] =
await Promise.all([
clientFromConnection(modelConnection, ctx, false),
createVirtualClientFrom(virtualMcp, ctx, "passthrough"),
isGatewayMode
? createVirtualClientFrom(virtualMcp, ctx, agent.mode)
: Promise.resolve(null),
]);

closeClients = () => {
modelClient.close().catch(() => {});
passthroughClient.close().catch(() => {});
strategyClient?.close().catch(() => {});
};

// Add streaming support since agents may use streaming models
const streamableModelClient = withStreamingSupport(
modelClient,
models.connectionId,
modelConnection,
ctx,
{ superUser: false },
);

// Extract model provider (can stay outside execute)
const modelProvider = await createModelProviderFromClient(
streamableModelClient,
models,
);

// MCP client cleanup on run abort (cancel from any pod), not request abort
// Register abort handler early — closeClients is populated inside
// execute once MCP connections are established.
abortSignal.addEventListener("abort", () => {
closeClients?.();
failThread!();
});

// Get server instructions if available (for virtual MCP agents)
const serverInstructions = passthroughClient.getInstructions();

// Merge platform instructions with request system messages
const systemPrompt = DECOPILOT_BASE_PROMPT(serverInstructions);
const allSystemMessages: ChatMessage[] = [
systemPrompt,
...systemMessages,
];

const isGatewayMode = agent.mode !== "passthrough";
const maxOutputTokens =
models.thinking.limits?.maxOutputTokens ?? DEFAULT_MAX_TOKENS;

let streamFinished = false;
let stepCount = 0;
let pendingSave: Promise<void> | null = null;

// Pre-load conversation with a basic system prompt (no agent-specific
// instructions). Agent instructions come from the passthrough MCP
// client which is created inside execute to avoid blocking the HTTP
// response and triggering Cloudflare 524 timeouts.
const allMessages = await loadAndMergeMessages(
mem,
requestMessage,
allSystemMessages,
[DECOPILOT_BASE_PROMPT(), ...systemMessages],
windowSize,
);

const toolOutputMap = new Map<string, string>();
// 4. Create stream with writer access for data parts
// Pass originalMessages so handleUIMessageStreamFinish (used by onFinish)
// can locate tool invocations from previous assistant messages during
// tool-approval continuation flows.
const uiStream = createUIMessageStream({
originalMessages: allMessages,
execute: async ({ writer }) => {
// Create tools inside execute so they have access to writer
// Always get the full passthrough tools (all real tools from connections)
// Create MCP client connections inside execute so the SSE
// response headers are flushed to the client before this
// potentially slow I/O, preventing Cloudflare 524 timeouts.
const [modelClient, passthroughClient, strategyClient] =
await Promise.all([
clientFromConnection(modelConnection, ctx, false),
createVirtualClientFrom(virtualMcp, ctx, "passthrough"),
isGatewayMode
? createVirtualClientFrom(virtualMcp, ctx, agent.mode)
: Promise.resolve(null),
]);

closeClients = () => {
modelClient.close().catch(() => {});
passthroughClient.close().catch(() => {});
strategyClient?.close().catch(() => {});
};

const streamableModelClient = withStreamingSupport(
modelClient,
models.connectionId,
modelConnection,
ctx,
{ superUser: false },
);

const modelProvider = await createModelProviderFromClient(
streamableModelClient,
models,
);

// Enrich the pre-loaded messages with agent-specific instructions
// from the virtual MCP now that the client is available.
const serverInstructions = passthroughClient.getInstructions();
const enrichedMessages = serverInstructions?.trim()
? allMessages.map((msg) =>
msg.id === "decopilot-system"
? DECOPILOT_BASE_PROMPT(serverInstructions)
: msg,
)
: allMessages;

const passthroughTools = await toolsFromMCP(
passthroughClient,
toolOutputMap,
writer,
toolApprovalLevel,
);

// If using a gateway mode, also get the strategy meta-tools
// (GATEWAY_SEARCH_TOOLS, GATEWAY_DESCRIBE_TOOLS, GATEWAY_CALL_TOOL / GATEWAY_RUN_CODE)
const strategyTools = strategyClient
? await toolsFromMCP(
strategyClient,
Expand All @@ -360,30 +356,24 @@ export function createDecopilotRoutes(deps: DecopilotDeps) {
ctx,
);

// Merge all tools: strategy meta-tools override passthrough tools with the same name,
// and built-in tools take final precedence.
const tools = {
...passthroughTools,
...strategyTools,
...builtInTools,
};

// In gateway modes, only expose the strategy meta-tools + built-ins to the LLM.
// The passthrough tools are still registered (so the AI SDK won't throw if the
// model calls a discovered tool directly), but the LLM won't see their schemas.
const activeToolNames = strategyClient
? ([
...Object.keys(strategyTools),
...Object.keys(builtInTools),
] as (keyof typeof tools)[])
: undefined;

// Process conversation with tools for validation
const {
systemMessages: processedSystemMessages,
messages: processedMessages,
originalMessages,
} = await processConversation(allMessages, {
} = await processConversation(enrichedMessages, {
windowSize,
models,
tools,
Expand Down