From 7487e2d9b95a250830fc3cdd1e400160c1a1ea56 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20=E2=9A=A1?= Date: Fri, 20 Feb 2026 14:01:27 -0500 Subject: [PATCH 1/2] fix: normalize provider-prefixed model names in extractModel() Strip any / prefix before MODEL_MAP lookup, not just the hardcoded 'claude-code-cli/' prefix. This ensures requests with model names like 'claude-max/claude-sonnet-4' correctly map to 'sonnet' instead of falling back to the opus default. Changes: - Replace fixed /^claude-code-cli\// regex with /^[^/]+\// to handle any provider prefix generically - Remove now-redundant hardcoded 'claude-code-cli/*' entries from MODEL_MAP (covered by the generic prefix strip) - Add src/adapter/openai-to-cli.test.ts with 5 tests covering unprefixed names, claude-code-cli/ prefix, claude-max/ prefix, short aliases, and unknown-model fallback Fixes #23 --- src/adapter/openai-to-cli.test.ts | 33 +++++++++++++++++++++++++++++++ src/adapter/openai-to-cli.ts | 8 ++------ 2 files changed, 35 insertions(+), 6 deletions(-) create mode 100644 src/adapter/openai-to-cli.test.ts diff --git a/src/adapter/openai-to-cli.test.ts b/src/adapter/openai-to-cli.test.ts new file mode 100644 index 0000000..8de3840 --- /dev/null +++ b/src/adapter/openai-to-cli.test.ts @@ -0,0 +1,33 @@ +import { test } from "node:test"; +import assert from "node:assert/strict"; +import { extractModel } from "./openai-to-cli.js"; + +test("extractModel with unprefixed model names", () => { + assert.equal(extractModel("claude-opus-4"), "opus"); + assert.equal(extractModel("claude-sonnet-4"), "sonnet"); + assert.equal(extractModel("claude-haiku-4"), "haiku"); +}); + +test("extractModel with claude-code-cli/ prefix", () => { + assert.equal(extractModel("claude-code-cli/claude-opus-4"), "opus"); + assert.equal(extractModel("claude-code-cli/claude-sonnet-4"), "sonnet"); + assert.equal(extractModel("claude-code-cli/claude-haiku-4"), "haiku"); +}); + +test("extractModel with claude-max/ prefix", () => { + assert.equal(extractModel("claude-max/claude-opus-4"), "opus"); + assert.equal(extractModel("claude-max/claude-sonnet-4"), "sonnet"); + assert.equal(extractModel("claude-max/claude-haiku-4"), "haiku"); +}); + +test("extractModel with short aliases", () => { + assert.equal(extractModel("opus"), "opus"); + assert.equal(extractModel("sonnet"), "sonnet"); + assert.equal(extractModel("haiku"), "haiku"); +}); + +test("extractModel with unknown model falls back to opus", () => { + assert.equal(extractModel("unknown-model"), "opus"); + assert.equal(extractModel("gpt-4"), "opus"); + assert.equal(extractModel("some-provider/unknown-model"), "opus"); +}); diff --git a/src/adapter/openai-to-cli.ts b/src/adapter/openai-to-cli.ts index c8ecaa1..9337634 100644 --- a/src/adapter/openai-to-cli.ts +++ b/src/adapter/openai-to-cli.ts @@ -17,10 +17,6 @@ const MODEL_MAP: Record = { "claude-opus-4": "opus", "claude-sonnet-4": "sonnet", "claude-haiku-4": "haiku", - // With provider prefix - "claude-code-cli/claude-opus-4": "opus", - "claude-code-cli/claude-sonnet-4": "sonnet", - "claude-code-cli/claude-haiku-4": "haiku", // Aliases "opus": "opus", "sonnet": "sonnet", @@ -36,8 +32,8 @@ export function extractModel(model: string): ClaudeModel { return MODEL_MAP[model]; } - // Try stripping provider prefix - const stripped = model.replace(/^claude-code-cli\//, ""); + // Try stripping provider prefix (e.g. "claude-max/", "claude-code-cli/") + const stripped = model.replace(/^[^/]+\//, ""); if (MODEL_MAP[stripped]) { return MODEL_MAP[stripped]; } From 004a107f13f48ed976924aeff6061b9c39a81daf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Sven=20=E2=9A=A1?= Date: Sat, 21 Feb 2026 15:23:58 -0500 Subject: [PATCH 2/2] feat: add concurrency control and fix [object Object] content serialization - Add Semaphore + RequestTracker for concurrent subprocess limiting (default 5) - Wire concurrency module into routes.ts with acquire/release around CLI calls - Expose active/waiting/capacity stats on /health endpoint - Fix [object Object] bug: contentToString() handles array content parts - Add OpenAIContentPart type, support array content in OpenAIChatMessage - Normalize provider-prefixed model names in extractModel() - Add --permission-mode bypassPermissions to spawned Claude subprocesses Closes #23 Co-Authored-By: Claude Sonnet 4.6 --- src/adapter/openai-to-cli.ts | 24 ++++++-- src/concurrency/request-tracker.ts | 50 +++++++++++++++ src/concurrency/semaphore.ts | 98 ++++++++++++++++++++++++++++++ src/server/routes.ts | 90 +++++++++++++++++++-------- src/subprocess/manager.ts | 2 + src/types/openai.ts | 8 ++- 6 files changed, 243 insertions(+), 29 deletions(-) create mode 100644 src/concurrency/request-tracker.ts create mode 100644 src/concurrency/semaphore.ts diff --git a/src/adapter/openai-to-cli.ts b/src/adapter/openai-to-cli.ts index 9337634..9873d31 100644 --- a/src/adapter/openai-to-cli.ts +++ b/src/adapter/openai-to-cli.ts @@ -2,7 +2,7 @@ * Converts OpenAI chat request format to Claude CLI input */ -import type { OpenAIChatRequest } from "../types/openai.js"; +import type { OpenAIChatRequest, OpenAIContentPart } from "../types/openai.js"; export type ClaudeModel = "opus" | "sonnet" | "haiku"; @@ -42,6 +42,21 @@ export function extractModel(model: string): ClaudeModel { return "opus"; } +/** + * Extract a plain string from a message content field. + * Handles both the simple string form and the array-of-parts form that the + * OpenAI API allows (e.g. [{type:"text", text:"…"}, {type:"image_url",…}]). + * Non-text parts (images, etc.) are silently dropped since the Claude CLI + * only accepts text input. + */ +function contentToString(content: string | OpenAIContentPart[]): string { + if (typeof content === "string") return content; + return content + .filter((p) => p.type === "text" && typeof p.text === "string") + .map((p) => p.text as string) + .join(""); +} + /** * Convert OpenAI messages array to a single prompt string for Claude CLI * @@ -52,20 +67,21 @@ export function messagesToPrompt(messages: OpenAIChatRequest["messages"]): strin const parts: string[] = []; for (const msg of messages) { + const text = contentToString(msg.content); switch (msg.role) { case "system": // System messages become context instructions - parts.push(`\n${msg.content}\n\n`); + parts.push(`\n${text}\n\n`); break; case "user": // User messages are the main prompt - parts.push(msg.content); + parts.push(text); break; case "assistant": // Previous assistant responses for context - parts.push(`\n${msg.content}\n\n`); + parts.push(`\n${text}\n\n`); break; } } diff --git a/src/concurrency/request-tracker.ts b/src/concurrency/request-tracker.ts new file mode 100644 index 0000000..8e58094 --- /dev/null +++ b/src/concurrency/request-tracker.ts @@ -0,0 +1,50 @@ +/** + * Request Tracker + * + * Tracks in-flight requests for health monitoring, + * debugging, and graceful shutdown. + */ + +export interface ActiveRequest { + requestId: string; + model: string; + stream: boolean; + startedAt: number; + pid?: number; +} + +class RequestTracker { + private requests = new Map(); + + add(info: ActiveRequest): void { + this.requests.set(info.requestId, info); + } + + remove(requestId: string): void { + this.requests.delete(requestId); + } + + setPid(requestId: string, pid: number): void { + const req = this.requests.get(requestId); + if (req) req.pid = pid; + } + + get(requestId: string): ActiveRequest | undefined { + return this.requests.get(requestId); + } + + getAll(): ActiveRequest[] { + return Array.from(this.requests.values()); + } + + get count(): number { + return this.requests.size; + } + + /** Check if there are any active requests */ + get idle(): boolean { + return this.requests.size === 0; + } +} + +export const requestTracker = new RequestTracker(); diff --git a/src/concurrency/semaphore.ts b/src/concurrency/semaphore.ts new file mode 100644 index 0000000..17afd48 --- /dev/null +++ b/src/concurrency/semaphore.ts @@ -0,0 +1,98 @@ +/** + * Async Semaphore for limiting concurrent operations + * + * Used to cap the number of simultaneous Claude CLI subprocesses + * to prevent resource exhaustion under concurrent load. + */ + +interface QueueEntry { + resolve: () => void; + reject: (err: Error) => void; + timer: NodeJS.Timeout | null; +} + +export class Semaphore { + private current = 0; + private readonly queue: QueueEntry[] = []; + + constructor( + private readonly max: number, + private readonly queueTimeout: number = 120000 // 2 minutes default + ) {} + + /** + * Acquire a permit. Resolves immediately if capacity is available, + * otherwise queues and waits. Rejects if queue timeout expires. + */ + async acquire(): Promise { + if (this.current < this.max) { + this.current++; + return; + } + + return new Promise((resolve, reject) => { + const entry: QueueEntry = { + resolve: () => { + if (entry.timer) clearTimeout(entry.timer); + resolve(); + }, + reject, + timer: null, + }; + + entry.timer = setTimeout(() => { + const idx = this.queue.indexOf(entry); + if (idx !== -1) { + this.queue.splice(idx, 1); + reject( + new Error( + `Request queued too long (${this.queueTimeout}ms). ` + + `${this.current} of ${this.max} slots busy, ${this.queue.length} still queued.` + ) + ); + } + }, this.queueTimeout); + + this.queue.push(entry); + }); + } + + /** + * Release a permit. If requests are queued, the next one gets the slot. + */ + release(): void { + if (this.queue.length > 0) { + const next = this.queue.shift()!; + // Don't decrement — slot transfers directly to next waiter + next.resolve(); + } else { + this.current--; + } + } + + /** Number of active permits */ + get active(): number { + return this.current; + } + + /** Number of requests waiting in queue */ + get waiting(): number { + return this.queue.length; + } + + /** Maximum concurrent permits */ + get capacity(): number { + return this.max; + } + + /** + * Drain all waiters with an error (used during shutdown). + */ + drain(reason: string): void { + while (this.queue.length > 0) { + const entry = this.queue.shift()!; + if (entry.timer) clearTimeout(entry.timer); + entry.reject(new Error(reason)); + } + } +} diff --git a/src/server/routes.ts b/src/server/routes.ts index ffe2e5b..dd56280 100644 --- a/src/server/routes.ts +++ b/src/server/routes.ts @@ -14,6 +14,12 @@ import { } from "../adapter/cli-to-openai.js"; import type { OpenAIChatRequest } from "../types/openai.js"; import type { ClaudeCliAssistant, ClaudeCliResult, ClaudeCliStreamEvent } from "../types/claude-cli.js"; +import { Semaphore } from "../concurrency/semaphore.js"; +import { requestTracker } from "../concurrency/request-tracker.js"; + +const MAX_CONCURRENT = parseInt(process.env.MAX_CONCURRENT ?? "5", 10); +const QUEUE_TIMEOUT = parseInt(process.env.QUEUE_TIMEOUT_MS ?? "120000", 10); +const semaphore = new Semaphore(MAX_CONCURRENT, QUEUE_TIMEOUT); /** * Handle POST /v1/chat/completions @@ -45,10 +51,17 @@ export async function handleChatCompletions( const cliInput = openaiToCli(body); const subprocess = new ClaudeSubprocess(); - if (stream) { - await handleStreamingResponse(req, res, subprocess, cliInput, requestId); - } else { - await handleNonStreamingResponse(res, subprocess, cliInput, requestId); + await semaphore.acquire(); + requestTracker.add({ requestId, model: body.model ?? "claude-sonnet-4", stream, startedAt: Date.now() }); + try { + if (stream) { + await handleStreamingResponse(req, res, subprocess, cliInput, requestId); + } else { + await handleNonStreamingResponse(res, subprocess, cliInput, requestId); + } + } finally { + semaphore.release(); + requestTracker.remove(requestId); } } catch (error) { const message = error instanceof Error ? error.message : "Unknown error"; @@ -97,6 +110,20 @@ async function handleStreamingResponse( let isFirst = true; let lastModel = "claude-sonnet-4"; let isComplete = false; + let hasContent = false; + + // Helper: send a well-formed SSE error event and close + const sendStreamError = (message: string, type = "server_error") => { + if (!res.writableEnded) { + res.write( + `data: ${JSON.stringify({ + error: { message, type, code: null }, + })}\n\n` + ); + res.write("data: [DONE]\n\n"); + res.end(); + } + }; // Handle actual client disconnect (response stream closed) res.on("close", () => { @@ -127,6 +154,7 @@ async function handleStreamingResponse( }; res.write(`data: ${JSON.stringify(chunk)}\n\n`); isFirst = false; + hasContent = true; } }); @@ -148,29 +176,32 @@ async function handleStreamingResponse( }); subprocess.on("error", (error: Error) => { - console.error("[Streaming] Error:", error.message); - if (!res.writableEnded) { - res.write( - `data: ${JSON.stringify({ - error: { message: error.message, type: "server_error", code: null }, - })}\n\n` - ); - res.end(); - } + console.error("[Streaming] Subprocess error:", error.message); + isComplete = true; + sendStreamError(error.message); resolve(); }); subprocess.on("close", (code: number | null) => { // Subprocess exited - ensure response is closed - if (!res.writableEnded) { - if (code !== 0 && !isComplete) { - // Abnormal exit without result - send error - res.write(`data: ${JSON.stringify({ - error: { message: `Process exited with code ${code}`, type: "server_error", code: null }, - })}\n\n`); + if (!isComplete) { + if (code !== 0 || !hasContent) { + // Abnormal exit or no content produced — signal a real error + const msg = code !== 0 + ? `Claude CLI exited with code ${code} — may be rate-limited or unauthenticated` + : "Claude CLI exited without producing a response"; + console.error("[Streaming] Abnormal close:", msg); + sendStreamError(msg); + } else { + // Clean exit but no result event (shouldn't happen, but handle gracefully) + if (!res.writableEnded) { + const doneChunk = createDoneChunk(requestId, lastModel); + res.write(`data: ${JSON.stringify(doneChunk)}\n\n`); + res.write("data: [DONE]\n\n"); + res.end(); + } } - res.write("data: [DONE]\n\n"); - res.end(); + isComplete = true; } resolve(); }); @@ -181,6 +212,7 @@ async function handleStreamingResponse( sessionId: cliInput.sessionId, }).catch((err) => { console.error("[Streaming] Subprocess start error:", err); + sendStreamError(err instanceof Error ? err.message : String(err)); reject(err); }); }); @@ -218,11 +250,15 @@ async function handleNonStreamingResponse( if (finalResult) { res.json(cliResultToOpenai(finalResult, requestId)); } else if (!res.headersSent) { - res.status(500).json({ + const msg = code !== 0 + ? `Claude CLI exited with code ${code} — may be rate-limited or unauthenticated` + : "Claude CLI exited without producing a response"; + console.error("[NonStreaming] Abnormal close:", msg); + res.status(503).json({ error: { - message: `Claude CLI exited with code ${code} without response`, + message: msg, type: "server_error", - code: null, + code: "upstream_unavailable", }, }); } @@ -289,5 +325,11 @@ export function handleHealth(_req: Request, res: Response): void { status: "ok", provider: "claude-code-cli", timestamp: new Date().toISOString(), + concurrency: { + active: semaphore.active, + waiting: semaphore.waiting, + capacity: semaphore.capacity, + }, + requests: requestTracker.getAll(), }); } diff --git a/src/subprocess/manager.ts b/src/subprocess/manager.ts index 6551a81..58c9721 100644 --- a/src/subprocess/manager.ts +++ b/src/subprocess/manager.ts @@ -132,6 +132,8 @@ export class ClaudeSubprocess extends EventEmitter { "--print", // Non-interactive mode "--output-format", "stream-json", // JSON streaming output + "--permission-mode", + "bypassPermissions", "--verbose", // Required for stream-json "--include-partial-messages", // Enable streaming chunks "--model", diff --git a/src/types/openai.ts b/src/types/openai.ts index c116658..512687f 100644 --- a/src/types/openai.ts +++ b/src/types/openai.ts @@ -3,9 +3,15 @@ * Used for Clawdbot integration */ +export interface OpenAIContentPart { + type: "text" | "image_url" | string; + text?: string; + image_url?: { url: string; detail?: string }; +} + export interface OpenAIChatMessage { role: "system" | "user" | "assistant"; - content: string; + content: string | OpenAIContentPart[]; } export interface OpenAIChatRequest {