From 841c2c0efb8ab7f786d669daf4699f2bff7242c9 Mon Sep 17 00:00:00 2001 From: BP602 <3460479+BP602@users.noreply.github.com> Date: Fri, 6 Feb 2026 14:13:40 +0100 Subject: [PATCH] chore: snapshot observability and stream fixes before branch split --- bin/claude-proxy.ts | 3 +- src/logger.ts | 74 ++++++- src/proxy/server.ts | 485 +++++++++++++++++++++++++++++++++----------- src/proxy/types.ts | 4 +- 4 files changed, 443 insertions(+), 123 deletions(-) diff --git a/bin/claude-proxy.ts b/bin/claude-proxy.ts index 60b3c62..e2e898f 100644 --- a/bin/claude-proxy.ts +++ b/bin/claude-proxy.ts @@ -4,5 +4,6 @@ import { startProxyServer } from "../src/proxy/server" const port = parseInt(process.env.CLAUDE_PROXY_PORT || "3456", 10) const host = process.env.CLAUDE_PROXY_HOST || "127.0.0.1" +const idleTimeoutSeconds = parseInt(process.env.CLAUDE_PROXY_IDLE_TIMEOUT_SECONDS || "120", 10) -await startProxyServer({ port, host }) +await startProxyServer({ port, host, idleTimeoutSeconds }) diff --git a/src/logger.ts b/src/logger.ts index 81753fe..07005ef 100644 --- a/src/logger.ts +++ b/src/logger.ts @@ -1,10 +1,72 @@ +import { AsyncLocalStorage } from "node:async_hooks" + +type LogFields = Record + +const contextStore = new AsyncLocalStorage() + const shouldLog = () => process.env["OPENCODE_CLAUDE_PROVIDER_DEBUG"] +const shouldLogStreamDebug = () => process.env["OPENCODE_CLAUDE_PROVIDER_STREAM_DEBUG"] -export const claudeLog = (message: string, extra?: Record) => { - if (!shouldLog()) return - const parts = ["[opencode-claude-code-provider]", message] - if (extra && Object.keys(extra).length > 0) { - parts.push(JSON.stringify(extra)) +const isVerboseStreamEvent = (event: string): boolean => { + return event.startsWith("stream.") || event === "response.empty_stream" +} + +const REDACTED_KEYS = new Set([ + "authorization", + "cookie", + "x-api-key", + "apiKey", + "apikey", + "prompt", + "messages", + "content" +]) + +const sanitize = (value: unknown): unknown => { + if (value === null || value === undefined) return value + + if (typeof value === "string") { + if (value.length > 512) { + return `${value.slice(0, 512)}... [truncated=${value.length}]` + } + return value + } + + if (Array.isArray(value)) { + return value.map(sanitize) } - console.debug(parts.join(" ")) + + if (typeof value === "object") { + const out: Record = {} + for (const [k, v] of Object.entries(value as Record)) { + if (REDACTED_KEYS.has(k)) { + if (typeof v === "string") { + out[k] = `[redacted len=${v.length}]` + } else if (Array.isArray(v)) { + out[k] = `[redacted array len=${v.length}]` + } else { + out[k] = "[redacted]" + } + } else { + out[k] = sanitize(v) + } + } + return out + } + + return value +} + +export const withClaudeLogContext = (context: LogFields, fn: () => T): T => { + return contextStore.run(context, fn) +} + +export const claudeLog = (event: string, extra?: LogFields) => { + if (!shouldLog()) return + if (isVerboseStreamEvent(event) && !shouldLogStreamDebug()) return + + const context = contextStore.getStore() || {} + const payload = sanitize({ ts: new Date().toISOString(), event, ...context, ...(extra || {}) }) + + console.debug(`[opencode-claude-code-provider] ${JSON.stringify(payload)}`) } diff --git a/src/proxy/server.ts b/src/proxy/server.ts index b817a86..9c752c1 100644 --- a/src/proxy/server.ts +++ b/src/proxy/server.ts @@ -1,6 +1,7 @@ import { Hono } from "hono" import { cors } from "hono/cors" import { query } from "@anthropic-ai/claude-agent-sdk" +import PQueue from "p-queue" import type { Context } from "hono" import type { ProxyConfig } from "./types" import { DEFAULT_PROXY_CONFIG } from "./types" @@ -10,6 +11,8 @@ import { existsSync } from "fs" import { fileURLToPath } from "url" import { join, dirname } from "path" import { opencodeMcpServer } from "../mcpTools" +import { randomUUID } from "crypto" +import { withClaudeLogContext } from "../logger" const BLOCKED_BUILTIN_TOOLS = [ "Read", "Write", "Edit", "MultiEdit", @@ -28,6 +31,9 @@ const ALLOWED_MCP_TOOLS = [ `mcp__${MCP_SERVER_NAME}__grep` ] +// Queue to serialize Claude Agent SDK queries and avoid ~60s delay on concurrent requests +const requestQueue = new PQueue({ concurrency: 1 }) + function resolveClaudeExecutable(): string { // 1. Try the SDK's bundled cli.js (same dir as this module's SDK) try { @@ -53,6 +59,11 @@ function mapModelToClaudeModel(model: string): "sonnet" | "opus" | "haiku" { return "sonnet" } +function isClosedControllerError(error: unknown): boolean { + if (!(error instanceof Error)) return false + return error.message.includes("Controller is already closed") +} + export function createProxyServer(config: Partial = {}) { const finalConfig = { ...DEFAULT_PROXY_CONFIG, ...config } const app = new Hono() @@ -69,13 +80,25 @@ export function createProxyServer(config: Partial = {}) { }) }) - const handleMessages = async (c: Context) => { - try { - const body = await c.req.json() - const model = mapModelToClaudeModel(body.model || "sonnet") - const stream = body.stream ?? true - - claudeLog("proxy.anthropic.request", { model, stream, messageCount: body.messages?.length }) + const handleMessages = async ( + c: Context, + requestMeta: { requestId: string; endpoint: string; queueEnteredAt: number; queueStartedAt: number } + ) => { + const requestStartAt = Date.now() + + return withClaudeLogContext({ requestId: requestMeta.requestId, endpoint: requestMeta.endpoint }, async () => { + try { + const body = await c.req.json() + const model = mapModelToClaudeModel(body.model || "sonnet") + const stream = body.stream ?? true + + claudeLog("request.received", { + model, + stream, + queueWaitMs: requestMeta.queueStartedAt - requestMeta.queueEnteredAt, + messageCount: Array.isArray(body.messages) ? body.messages.length : 0, + hasSystemPrompt: Boolean(body.system) + }) // Build system context from the request's system prompt let systemContext = "" @@ -114,51 +137,14 @@ export function createProxyServer(config: Partial = {}) { ? `${systemContext}\n\n${conversationParts}` : conversationParts - if (!stream) { - let fullContent = "" - const response = query({ - prompt, - options: { - maxTurns: 100, - model, - pathToClaudeCodeExecutable: claudeExecutable, - disallowedTools: [...BLOCKED_BUILTIN_TOOLS], - allowedTools: [...ALLOWED_MCP_TOOLS], - mcpServers: { - [MCP_SERVER_NAME]: opencodeMcpServer - } - } - }) - - for await (const message of response) { - if (message.type === "assistant") { - for (const block of message.message.content) { - if (block.type === "text") { - fullContent += block.text - } - } - } - } - - // If no text content was produced (e.g. only tool_use), return a fallback - if (!fullContent) { - fullContent = "I can help with that. Could you provide more details about what you'd like me to do?" - } + if (!stream) { + let fullContent = "" + let assistantMessages = 0 + const upstreamStartAt = Date.now() + let firstChunkAt: number | undefined - return c.json({ - id: `msg_${Date.now()}`, - type: "message", - role: "assistant", - content: [{ type: "text", text: fullContent }], - model: body.model, - stop_reason: "end_turn", - usage: { input_tokens: 0, output_tokens: 0 } - }) - } + claudeLog("upstream.start", { mode: "non_stream", model }) - const encoder = new TextEncoder() - const readable = new ReadableStream({ - async start(controller) { try { const response = query({ prompt, @@ -166,7 +152,6 @@ export function createProxyServer(config: Partial = {}) { maxTurns: 100, model, pathToClaudeCodeExecutable: claudeExecutable, - includePartialMessages: true, disallowedTools: [...BLOCKED_BUILTIN_TOOLS], allowedTools: [...ALLOWED_MCP_TOOLS], mcpServers: { @@ -175,89 +160,358 @@ export function createProxyServer(config: Partial = {}) { } }) - const heartbeat = setInterval(() => { - try { - controller.enqueue(encoder.encode(`: ping\n\n`)) - } catch { - clearInterval(heartbeat) + for await (const message of response) { + if (message.type === "assistant") { + assistantMessages += 1 + if (!firstChunkAt) { + firstChunkAt = Date.now() + claudeLog("upstream.first_chunk", { + mode: "non_stream", + model, + ttfbMs: firstChunkAt - upstreamStartAt + }) + } + + for (const block of message.message.content) { + if (block.type === "text") { + fullContent += block.text + } + } } - }, 15_000) + } + + claudeLog("upstream.completed", { + mode: "non_stream", + model, + assistantMessages, + durationMs: Date.now() - upstreamStartAt + }) + } catch (error) { + claudeLog("upstream.failed", { + mode: "non_stream", + model, + durationMs: Date.now() - upstreamStartAt, + error: error instanceof Error ? error.message : String(error) + }) + throw error + } + + // If no text content was produced (e.g. only tool_use), return a fallback + const fallbackUsed = !fullContent + if (fallbackUsed) { + fullContent = "I can help with that. Could you provide more details about what you'd like me to do?" + claudeLog("response.fallback_used", { mode: "non_stream", reason: "no_text_content" }) + } + + claudeLog("response.completed", { + mode: "non_stream", + model, + durationMs: Date.now() - requestStartAt, + responseChars: fullContent.length, + fallbackUsed + }) + + return c.json({ + id: `msg_${Date.now()}`, + type: "message", + role: "assistant", + content: [{ type: "text", text: fullContent }], + model: body.model, + stop_reason: "end_turn", + usage: { input_tokens: 0, output_tokens: 0 } + }) + } - const skipBlockIndices = new Set() + const encoder = new TextEncoder() + const readable = new ReadableStream({ + async start(controller) { + const upstreamStartAt = Date.now() + let firstChunkAt: number | undefined + let heartbeatCount = 0 + let streamEventsSeen = 0 + let eventsForwarded = 0 + let textEventsForwarded = 0 + let bytesSent = 0 + let streamClosed = false + + claudeLog("upstream.start", { mode: "stream", model }) + + const safeEnqueue = (payload: Uint8Array, source: string): boolean => { + if (streamClosed) return false + try { + controller.enqueue(payload) + bytesSent += payload.byteLength + return true + } catch (error) { + if (isClosedControllerError(error)) { + streamClosed = true + claudeLog("stream.client_closed", { source, streamEventsSeen, eventsForwarded }) + return false + } + + claudeLog("stream.enqueue_failed", { + source, + error: error instanceof Error ? error.message : String(error) + }) + throw error + } + } try { - for await (const message of response) { - if (message.type === "stream_event") { - const event = message.event - const eventType = event.type - const eventIndex = (event as any).index as number | undefined - - // Filter out tool_use content blocks — OpenCode expects text only - if (eventType === "content_block_start") { - const block = (event as any).content_block - if (block?.type === "tool_use") { - if (eventIndex !== undefined) skipBlockIndices.add(eventIndex) - continue - } + const response = query({ + prompt, + options: { + maxTurns: 100, + model, + pathToClaudeCodeExecutable: claudeExecutable, + includePartialMessages: true, + disallowedTools: [...BLOCKED_BUILTIN_TOOLS], + allowedTools: [...ALLOWED_MCP_TOOLS], + mcpServers: { + [MCP_SERVER_NAME]: opencodeMcpServer + } + } + }) + + const heartbeat = setInterval(() => { + heartbeatCount += 1 + try { + const payload = encoder.encode(`: ping\n\n`) + if (!safeEnqueue(payload, "heartbeat")) { + clearInterval(heartbeat) + return } + if (heartbeatCount % 5 === 0) { + claudeLog("stream.heartbeat", { count: heartbeatCount }) + } + } catch (error) { + claudeLog("stream.heartbeat_failed", { + count: heartbeatCount, + error: error instanceof Error ? error.message : String(error) + }) + clearInterval(heartbeat) + } + }, 15_000) + + const skipBlockIndices = new Set() - // Skip deltas and stops for tool_use blocks - if (eventIndex !== undefined && skipBlockIndices.has(eventIndex)) { - continue + try { + for await (const message of response) { + if (streamClosed) { + break } - // Override message_delta to always show end_turn - if (eventType === "message_delta") { - const patched = { - ...event, - delta: { ...((event as any).delta || {}), stop_reason: "end_turn" }, - usage: (event as any).usage || { output_tokens: 0 } + if (message.type === "stream_event") { + streamEventsSeen += 1 + if (!firstChunkAt) { + firstChunkAt = Date.now() + claudeLog("upstream.first_chunk", { + mode: "stream", + model, + ttfbMs: firstChunkAt - upstreamStartAt + }) + } + + const event = message.event + const eventType = event.type + const eventIndex = (event as any).index as number | undefined + + // content block indices are message-scoped; reset skip state per message + if (eventType === "message_start") { + skipBlockIndices.clear() + } + + // Filter out tool_use content blocks — OpenCode expects text only + if (eventType === "content_block_start") { + const block = (event as any).content_block + if (block?.type === "tool_use") { + if (eventIndex !== undefined) skipBlockIndices.add(eventIndex) + continue + } + } + + // Skip deltas and stops for tool_use blocks + if (eventIndex !== undefined && skipBlockIndices.has(eventIndex)) { + continue + } + + // Override message_delta to always show end_turn + if (eventType === "message_delta") { + const patched = { + ...event, + delta: { ...((event as any).delta || {}), stop_reason: "end_turn" }, + usage: (event as any).usage || { output_tokens: 0 } + } + const payload = encoder.encode(`event: ${eventType}\ndata: ${JSON.stringify(patched)}\n\n`) + if (!safeEnqueue(payload, `stream_event:${eventType}`)) { + break + } + eventsForwarded += 1 + continue + } + + // Forward all other events (message_start, text deltas, content_block_start/stop for text, message_stop) + const payload = encoder.encode(`event: ${eventType}\ndata: ${JSON.stringify(event)}\n\n`) + if (!safeEnqueue(payload, `stream_event:${eventType}`)) { + break + } + eventsForwarded += 1 + + if (eventType === "content_block_delta") { + const delta = (event as any).delta + if (delta?.type === "text_delta") { + textEventsForwarded += 1 + } } - controller.enqueue(encoder.encode(`event: ${eventType}\ndata: ${JSON.stringify(patched)}\n\n`)) - continue } + } + } finally { + clearInterval(heartbeat) + } - // Forward all other events (message_start, text deltas, content_block_start/stop for text, message_stop) - controller.enqueue(encoder.encode(`event: ${eventType}\ndata: ${JSON.stringify(event)}\n\n`)) + claudeLog("upstream.completed", { + mode: "stream", + model, + durationMs: Date.now() - upstreamStartAt, + streamEventsSeen, + eventsForwarded, + textEventsForwarded + }) + + if (!streamClosed) { + controller.close() + streamClosed = true + + claudeLog("stream.ended", { + model, + streamEventsSeen, + eventsForwarded, + textEventsForwarded, + bytesSent, + durationMs: Date.now() - requestStartAt + }) + + claudeLog("response.completed", { + mode: "stream", + model, + durationMs: Date.now() - requestStartAt, + streamEventsSeen, + eventsForwarded, + textEventsForwarded + }) + + if (textEventsForwarded === 0) { + claudeLog("response.empty_stream", { + model, + streamEventsSeen, + eventsForwarded, + reason: "no_text_deltas_forwarded" + }) } } - } finally { - clearInterval(heartbeat) + } catch (error) { + if (isClosedControllerError(error)) { + streamClosed = true + claudeLog("stream.client_closed", { + source: "stream_catch", + streamEventsSeen, + eventsForwarded, + textEventsForwarded, + durationMs: Date.now() - requestStartAt + }) + return + } + + claudeLog("upstream.failed", { + mode: "stream", + model, + durationMs: Date.now() - upstreamStartAt, + streamEventsSeen, + textEventsForwarded, + error: error instanceof Error ? error.message : String(error) + }) + claudeLog("proxy.anthropic.error", { error: error instanceof Error ? error.message : String(error) }) + safeEnqueue(encoder.encode(`event: error\ndata: ${JSON.stringify({ + type: "error", + error: { type: "api_error", message: error instanceof Error ? error.message : "Unknown error" } + })}\n\n`), "error_event") + if (!streamClosed) { + controller.close() + streamClosed = true + } } + } + }) - controller.close() - } catch (error) { - claudeLog("proxy.anthropic.error", { error: error instanceof Error ? error.message : String(error) }) - controller.enqueue(encoder.encode(`event: error\ndata: ${JSON.stringify({ - type: "error", - error: { type: "api_error", message: error instanceof Error ? error.message : "Unknown error" } - })}\n\n`)) - controller.close() + return new Response(readable, { + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": "no-cache", + Connection: "keep-alive" } - } - }) + }) + } catch (error) { + claudeLog("error.unhandled", { + durationMs: Date.now() - requestStartAt, + error: error instanceof Error ? error.message : String(error) + }) + claudeLog("proxy.error", { error: error instanceof Error ? error.message : String(error) }) + return c.json({ + type: "error", + error: { + type: "api_error", + message: error instanceof Error ? error.message : "Unknown error" + } + }, 500) + } + }) + } - return new Response(readable, { - headers: { - "Content-Type": "text/event-stream", - "Cache-Control": "no-cache", - Connection: "keep-alive" - } + app.post("/v1/messages", (c) => { + const requestId = c.req.header("x-request-id") || randomUUID() + const queueEnteredAt = Date.now() + claudeLog("queue.enter", { + requestId, + endpoint: "/v1/messages", + queueSize: requestQueue.size, + queuePending: requestQueue.pending + }) + + return requestQueue.add(() => { + const queueStartedAt = Date.now() + claudeLog("queue.start", { + requestId, + endpoint: "/v1/messages", + queueSize: requestQueue.size, + queuePending: requestQueue.pending, + queueWaitMs: queueStartedAt - queueEnteredAt }) - } catch (error) { - claudeLog("proxy.error", { error: error instanceof Error ? error.message : String(error) }) - return c.json({ - type: "error", - error: { - type: "api_error", - message: error instanceof Error ? error.message : "Unknown error" - } - }, 500) - } - } + return handleMessages(c, { requestId, endpoint: "/v1/messages", queueEnteredAt, queueStartedAt }) + }) + }) + + app.post("/messages", (c) => { + const requestId = c.req.header("x-request-id") || randomUUID() + const queueEnteredAt = Date.now() + claudeLog("queue.enter", { + requestId, + endpoint: "/messages", + queueSize: requestQueue.size, + queuePending: requestQueue.pending + }) - app.post("/v1/messages", handleMessages) - app.post("/messages", handleMessages) + return requestQueue.add(() => { + const queueStartedAt = Date.now() + claudeLog("queue.start", { + requestId, + endpoint: "/messages", + queueSize: requestQueue.size, + queuePending: requestQueue.pending, + queueWaitMs: queueStartedAt - queueEnteredAt + }) + return handleMessages(c, { requestId, endpoint: "/messages", queueEnteredAt, queueStartedAt }) + }) + }) return { app, config: finalConfig } } @@ -268,6 +522,7 @@ export async function startProxyServer(config: Partial = {}) { const server = Bun.serve({ port: finalConfig.port, hostname: finalConfig.host, + idleTimeout: finalConfig.idleTimeoutSeconds, fetch: app.fetch }) diff --git a/src/proxy/types.ts b/src/proxy/types.ts index ff93f5c..a8cf8b3 100644 --- a/src/proxy/types.ts +++ b/src/proxy/types.ts @@ -2,10 +2,12 @@ export interface ProxyConfig { port: number host: string debug: boolean + idleTimeoutSeconds: number } export const DEFAULT_PROXY_CONFIG: ProxyConfig = { port: 3456, host: "127.0.0.1", - debug: process.env.CLAUDE_PROXY_DEBUG === "1" + debug: process.env.CLAUDE_PROXY_DEBUG === "1", + idleTimeoutSeconds: 120 }