From c84fcf259c821dd33b7a5b5cb7b2627959a637c5 Mon Sep 17 00:00:00 2001 From: Jonas Jesus Date: Thu, 26 Feb 2026 23:20:00 -0300 Subject: [PATCH] fix(decopilot): prevent Cloudflare 524 timeout on stream endpoint Move MCP client creation (clientFromConnection, createVirtualClientFrom, createModelProviderFromClient) from the synchronous request handler into the stream execute callback. This ensures the SSE response headers are flushed to the client immediately, before the potentially slow MCP connection setup begins. Previously, the handler awaited all MCP client connections before returning createUIMessageStreamResponse, causing Cloudflare to timeout (524) when client initialization exceeded 100 seconds. Agent-specific system instructions are now lazily enriched inside execute once the passthrough client is available, identified by the stable message id decopilot-system. Made-with: Cursor --- apps/mesh/src/api/routes/decopilot/routes.ts | 112 +++++++++---------- 1 file changed, 51 insertions(+), 61 deletions(-) diff --git a/apps/mesh/src/api/routes/decopilot/routes.ts b/apps/mesh/src/api/routes/decopilot/routes.ts index f98229526..bb18a72f9 100644 --- a/apps/mesh/src/api/routes/decopilot/routes.ts +++ b/apps/mesh/src/api/routes/decopilot/routes.ts @@ -256,56 +256,14 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { await saveMessagesToThread(requestMessage); - // Always create a passthrough client (all real tools) + model client. - // If mode is smart_tool_selection or code_execution, also create the strategy - // client so we get the gateway meta-tools (SEARCH/DESCRIBE/CALL_TOOL/RUN_CODE). - const isGatewayMode = agent.mode !== "passthrough"; - const [modelClient, passthroughClient, strategyClient] = - await Promise.all([ - clientFromConnection(modelConnection, ctx, false), - createVirtualClientFrom(virtualMcp, ctx, "passthrough"), - isGatewayMode - ? createVirtualClientFrom(virtualMcp, ctx, agent.mode) - : Promise.resolve(null), - ]); - - closeClients = () => { - modelClient.close().catch(() => {}); - passthroughClient.close().catch(() => {}); - strategyClient?.close().catch(() => {}); - }; - - // Add streaming support since agents may use streaming models - const streamableModelClient = withStreamingSupport( - modelClient, - models.connectionId, - modelConnection, - ctx, - { superUser: false }, - ); - - // Extract model provider (can stay outside execute) - const modelProvider = await createModelProviderFromClient( - streamableModelClient, - models, - ); - - // MCP client cleanup on run abort (cancel from any pod), not request abort + // Register abort handler early — closeClients is populated inside + // execute once MCP connections are established. abortSignal.addEventListener("abort", () => { closeClients?.(); failThread!(); }); - // Get server instructions if available (for virtual MCP agents) - const serverInstructions = passthroughClient.getInstructions(); - - // Merge platform instructions with request system messages - const systemPrompt = DECOPILOT_BASE_PROMPT(serverInstructions); - const allSystemMessages: ChatMessage[] = [ - systemPrompt, - ...systemMessages, - ]; - + const isGatewayMode = agent.mode !== "passthrough"; const maxOutputTokens = models.thinking.limits?.maxOutputTokens ?? DEFAULT_MAX_TOKENS; @@ -313,23 +271,63 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { let stepCount = 0; let pendingSave: Promise | null = null; + // Pre-load conversation with a basic system prompt (no agent-specific + // instructions). Agent instructions come from the passthrough MCP + // client which is created inside execute to avoid blocking the HTTP + // response and triggering Cloudflare 524 timeouts. const allMessages = await loadAndMergeMessages( mem, requestMessage, - allSystemMessages, + [DECOPILOT_BASE_PROMPT(), ...systemMessages], windowSize, ); const toolOutputMap = new Map(); - // 4. Create stream with writer access for data parts - // Pass originalMessages so handleUIMessageStreamFinish (used by onFinish) - // can locate tool invocations from previous assistant messages during - // tool-approval continuation flows. const uiStream = createUIMessageStream({ originalMessages: allMessages, execute: async ({ writer }) => { - // Create tools inside execute so they have access to writer - // Always get the full passthrough tools (all real tools from connections) + // Create MCP client connections inside execute so the SSE + // response headers are flushed to the client before this + // potentially slow I/O, preventing Cloudflare 524 timeouts. + const [modelClient, passthroughClient, strategyClient] = + await Promise.all([ + clientFromConnection(modelConnection, ctx, false), + createVirtualClientFrom(virtualMcp, ctx, "passthrough"), + isGatewayMode + ? createVirtualClientFrom(virtualMcp, ctx, agent.mode) + : Promise.resolve(null), + ]); + + closeClients = () => { + modelClient.close().catch(() => {}); + passthroughClient.close().catch(() => {}); + strategyClient?.close().catch(() => {}); + }; + + const streamableModelClient = withStreamingSupport( + modelClient, + models.connectionId, + modelConnection, + ctx, + { superUser: false }, + ); + + const modelProvider = await createModelProviderFromClient( + streamableModelClient, + models, + ); + + // Enrich the pre-loaded messages with agent-specific instructions + // from the virtual MCP now that the client is available. + const serverInstructions = passthroughClient.getInstructions(); + const enrichedMessages = serverInstructions?.trim() + ? allMessages.map((msg) => + msg.id === "decopilot-system" + ? DECOPILOT_BASE_PROMPT(serverInstructions) + : msg, + ) + : allMessages; + const passthroughTools = await toolsFromMCP( passthroughClient, toolOutputMap, @@ -337,8 +335,6 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { toolApprovalLevel, ); - // If using a gateway mode, also get the strategy meta-tools - // (GATEWAY_SEARCH_TOOLS, GATEWAY_DESCRIBE_TOOLS, GATEWAY_CALL_TOOL / GATEWAY_RUN_CODE) const strategyTools = strategyClient ? await toolsFromMCP( strategyClient, @@ -360,17 +356,12 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { ctx, ); - // Merge all tools: strategy meta-tools override passthrough tools with the same name, - // and built-in tools take final precedence. const tools = { ...passthroughTools, ...strategyTools, ...builtInTools, }; - // In gateway modes, only expose the strategy meta-tools + built-ins to the LLM. - // The passthrough tools are still registered (so the AI SDK won't throw if the - // model calls a discovered tool directly), but the LLM won't see their schemas. const activeToolNames = strategyClient ? ([ ...Object.keys(strategyTools), @@ -378,12 +369,11 @@ export function createDecopilotRoutes(deps: DecopilotDeps) { ] as (keyof typeof tools)[]) : undefined; - // Process conversation with tools for validation const { systemMessages: processedSystemMessages, messages: processedMessages, originalMessages, - } = await processConversation(allMessages, { + } = await processConversation(enrichedMessages, { windowSize, models, tools,