diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index f98229526..bb18a72f9 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -256,56 +256,14 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { await saveMessagesToThread(requestMessage); - // Always create a passthrough client (all real tools) + model client. - // If mode is smart_tool_selection or code_execution, also create the strategy - // client so we get the gateway meta-tools (SEARCH/DESCRIBE/CALL_TOOL/RUN_CODE). - const isGatewayMode = agent.mode !== "passthrough"; - const [modelClient, passthroughClient, strategyClient] = - await Promise.all([ - clientFromConnection(modelConnection, ctx, false), - createVirtualClientFrom(virtualMcp, ctx, "passthrough"), - isGatewayMode - ? createVirtualClientFrom(virtualMcp, ctx, agent.mode) - : Promise.resolve(null), - ]); - - closeClients = () => { - modelClient.close().catch(() => {}); - passthroughClient.close().catch(() => {}); - strategyClient?.close().catch(() => {}); - }; - - // Add streaming support since agents may use streaming models - const streamableModelClient = withStreamingSupport( - modelClient, - models.connectionId, - modelConnection, - ctx, - { superUser: false }, - ); - - // Extract model provider (can stay outside execute) - const modelProvider = await createModelProviderFromClient( - streamableModelClient, - models, - ); - - // MCP client cleanup on run abort (cancel from any pod), not request abort + // Register abort handler early — closeClients is populated inside + // execute once MCP connections are established. abortSignal.addEventListener("abort", () => { closeClients?.(); failThread!(); }); - // Get server instructions if available (for virtual MCP agents) - const serverInstructions = passthroughClient.getInstructions(); - - // Merge platform instructions with request system messages - const systemPrompt = DECOPILOT_BASE_PROMPT(serverInstructions); - const allSystemMessages: ChatMessage[] = [ - systemPrompt, - ...systemMessages, - ]; - + const isGatewayMode = agent.mode !== "passthrough"; const maxOutputTokens = models.thinking.limits?.maxOutputTokens ?? DEFAULT_MAX_TOKENS; @@ -313,23 +271,63 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { let stepCount = 0; let pendingSave: Promise | null = null; + // Pre-load conversation with a basic system prompt (no agent-specific + // instructions). Agent instructions come from the passthrough MCP + // client which is created inside execute to avoid blocking the HTTP + // response and triggering Cloudflare 524 timeouts. const allMessages = await loadAndMergeMessages( mem, requestMessage, - allSystemMessages, + [DECOPILOT_BASE_PROMPT(), ...systemMessages], windowSize, ); const toolOutputMap = new Map(); - // 4. Create stream with writer access for data parts - // Pass originalMessages so handleUIMessageStreamFinish (used by onFinish) - // can locate tool invocations from previous assistant messages during - // tool-approval continuation flows. const uiStream = createUIMessageStream({ originalMessages: allMessages, execute: async ({ writer }) => { - // Create tools inside execute so they have access to writer - // Always get the full passthrough tools (all real tools from connections) + // Create MCP client connections inside execute so the SSE + // response headers are flushed to the client before this + // potentially slow I/O, preventing Cloudflare 524 timeouts. + const [modelClient, passthroughClient, strategyClient] = + await Promise.all([ + clientFromConnection(modelConnection, ctx, false), + createVirtualClientFrom(virtualMcp, ctx, "passthrough"), + isGatewayMode + ? createVirtualClientFrom(virtualMcp, ctx, agent.mode) + : Promise.resolve(null), + ]); + + closeClients = () => { + modelClient.close().catch(() => {}); + passthroughClient.close().catch(() => {}); + strategyClient?.close().catch(() => {}); + }; + + const streamableModelClient = withStreamingSupport( + modelClient, + models.connectionId, + modelConnection, + ctx, + { superUser: false }, + ); + + const modelProvider = await createModelProviderFromClient( + streamableModelClient, + models, + ); + + // Enrich the pre-loaded messages with agent-specific instructions + // from the virtual MCP now that the client is available. + const serverInstructions = passthroughClient.getInstructions(); + const enrichedMessages = serverInstructions?.trim() + ? allMessages.map((msg) => + msg.id === "decopilot-system" + ? DECOPILOT_BASE_PROMPT(serverInstructions) + : msg, + ) + : allMessages; + const passthroughTools = await toolsFromMCP( passthroughClient, toolOutputMap, @@ -337,8 +335,6 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { toolApprovalLevel, ); - // If using a gateway mode, also get the strategy meta-tools - // (GATEWAY_SEARCH_TOOLS, GATEWAY_DESCRIBE_TOOLS, GATEWAY_CALL_TOOL / GATEWAY_RUN_CODE) const strategyTools = strategyClient ? await toolsFromMCP( strategyClient, @@ -360,17 +356,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { ctx, ); - // Merge all tools: strategy meta-tools override passthrough tools with the same name, - // and built-in tools take final precedence. const tools = { ...passthroughTools, ...strategyTools, ...builtInTools, }; - // In gateway modes, only expose the strategy meta-tools + built-ins to the LLM. - // The passthrough tools are still registered (so the AI SDK won't throw if the - // model calls a discovered tool directly), but the LLM won't see their schemas. const activeToolNames = strategyClient ? ([ ...Object.keys(strategyTools), @@ -378,12 +369,11 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { ] as (keyof typeof tools)[]) : undefined; - // Process conversation with tools for validation const { systemMessages: processedSystemMessages, messages: processedMessages, originalMessages, - } = await processConversation(allMessages, { + } = await processConversation(enrichedMessages, { windowSize, models, tools,