diff --git a/src/adapter/anthropic-to-cli.ts b/src/adapter/anthropic-to-cli.ts
new file mode 100644
index 0000000..59bcc74
--- /dev/null
+++ b/src/adapter/anthropic-to-cli.ts
@@ -0,0 +1,224 @@
+/**
+ * Converts Anthropic Messages API request to Claude CLI input
+ */
+
+import fs from "fs";
+import path from "path";
+import os from "os";
+import crypto from "crypto";
+import type {
+  AnthropicMessagesRequest,
+  AnthropicContentBlock,
+  AnthropicImageContent,
+} from "../types/anthropic.js";
+import type { ClaudeModel } from "./openai-to-cli.js";
+
+/** Input for one Claude CLI invocation, derived from a Messages API request. */
+export interface CliInput {
+  prompt: string;
+  model: ClaudeModel;
+  sessionId?: string;
+  tempFiles: string[]; // temp files to clean up after request
+}
+
+/**
+ * Exact model-name lookup. A Map rather than an object literal, so that a
+ * request-supplied name such as "constructor" cannot resolve to an inherited
+ * Object.prototype member.
+ */
+const MODEL_MAP = new Map<string, ClaudeModel>([
+  ["claude-opus-4", "opus"],
+  ["claude-opus-4-20250514", "opus"],
+  ["claude-opus-4-6", "opus"],
+  ["claude-opus-4-6-20250725", "opus"],
+  ["claude-sonnet-4", "sonnet"],
+  ["claude-sonnet-4-20250514", "sonnet"],
+  ["claude-sonnet-4-6", "sonnet"],
+  ["claude-sonnet-4-6-20250725", "sonnet"],
+  ["claude-haiku-4", "haiku"],
+  ["claude-haiku-4-5-20251001", "haiku"],
+  ["opus", "opus"],
+  ["sonnet", "sonnet"],
+  ["haiku", "haiku"],
+]);
+
+/**
+ * Extract Claude model alias from Anthropic model string.
+ *
+ * Resolution order: exact table match, then substring match on the family
+ * name, then "sonnet" as the default.
+ *
+ * @param model - Model name taken from the request body.
+ * @returns The CLI alias to pass to `--model`.
+ */
+export function extractModel(model: string): ClaudeModel {
+  // The value comes from unvalidated JSON, so it may be missing or not a string.
+  if (typeof model !== "string") return "sonnet";
+
+  const exact = MODEL_MAP.get(model);
+  if (exact) return exact;
+
+  // Fuzzy match
+  if (model.includes("opus")) return "opus";
+  if (model.includes("sonnet")) return "sonnet";
+  if (model.includes("haiku")) return "haiku";
+  return "sonnet";
+}
+
+/** File extension per supported image media type; anything else is saved as ".png". */
+const MIME_TO_EXT = new Map<string, string>([
+  ["image/jpeg", ".jpg"],
+  ["image/png", ".png"],
+  ["image/gif", ".gif"],
+  ["image/webp", ".webp"],
+]);
+
+/** Where an image block ended up: a temp file written here, or a caller-supplied URL. */
+type ImageRef = { kind: "file"; path: string } | { kind: "url"; url: string };
+
+/**
+ * Resolve an image block to something the prompt can point at.
+ *
+ * base64 sources are decoded and written to a fresh file under os.tmpdir();
+ * url sources are passed through untouched. The result is tagged so callers
+ * never have to guess from the string whether a file was created.
+ *
+ * @returns The reference, or null when the block carries no usable source.
+ */
+function saveImageToTempFile(image: AnthropicImageContent): ImageRef | null {
+  const source = image.source;
+  if (!source) return null;
+
+  if (source.type === "base64" && source.data) {
+    const ext = MIME_TO_EXT.get(source.media_type ?? "") ?? ".png";
+    const id = crypto.randomBytes(8).toString("hex");
+    const filePath = path.join(os.tmpdir(), `claude-proxy-img-${id}${ext}`);
+    // "wx" refuses to overwrite an existing path; 0o600 keeps the image
+    // readable by this user only inside the shared temp directory.
+    fs.writeFileSync(filePath, Buffer.from(source.data, "base64"), {
+      flag: "wx",
+      mode: 0o600,
+    });
+    return { kind: "file", path: filePath };
+  }
+  if (source.type === "url" && source.url) {
+    return { kind: "url", url: source.url };
+  }
+  return null;
+}
+
+/**
+ * Serialize content blocks to text, handling images by saving to temp files.
+ *
+ * @param content - Message content: a plain string or an array of blocks.
+ * @param tempFiles - Out-parameter: the path of every temp file written while
+ *   serializing is appended so the caller can delete it afterwards.
+ * @returns The blocks rendered as text, one per line; unknown block types
+ *   are skipped.
+ */
+function contentToText(
+  content: string | AnthropicContentBlock[],
+  tempFiles: string[]
+): string {
+  if (typeof content === "string") return content;
+  // Request bodies are not schema-validated; treat anything else as empty.
+  if (!Array.isArray(content)) return "";
+
+  const parts: string[] = [];
+  for (const block of content) {
+    if (block.type === "text") {
+      parts.push(block.text);
+    } else if (block.type === "image") {
+      const ref = saveImageToTempFile(block);
+      if (ref?.kind === "file") {
+        // Only files created here are registered for deletion. Deciding by
+        // the shape of the string would let a url source such as "/etc/hosts"
+        // be unlinked during cleanup.
+        tempFiles.push(ref.path);
+        parts.push(`[User sent an image. View it with the Read tool at: ${ref.path}]`);
+      } else if (ref?.kind === "url") {
+        // URL-based image
+        parts.push(`[User sent an image from URL: ${ref.url}]`);
+      }
+    } else if (block.type === "tool_use") {
+      parts.push(`[Tool call: ${block.name}(${JSON.stringify(block.input)})]`);
+    } else if (block.type === "tool_result") {
+      // content may be a string, a list of blocks, or absent on the wire.
+      const text =
+        typeof block.content === "string"
+          ? block.content
+          : (block.content ?? []).map((c) => c.text).join("");
+      parts.push(`[Tool result: ${text}]`);
+    }
+  }
+  return parts.join("\n");
+}
+
+/**
+ * Convert Anthropic Messages request to a single prompt for Claude CLI
+ *
+ * Claude Code CLI in --print mode expects a single prompt.
+ * We format system + messages into a readable prompt.
+ *
+ * Side effect: base64 images are written to temp files. Their paths are
+ * returned in `tempFiles`, and the caller must pass them to cleanupTempFiles.
+ */
+export function anthropicToCli(request: AnthropicMessagesRequest): CliInput {
+  const parts: string[] = [];
+  const tempFiles: string[] = [];
+
+  // System prompt
+  if (request.system) {
+    const systemText = typeof request.system === "string"
+      ? request.system
+      : request.system
+          .filter((b) => b.type === "text")
+          .map((b) => b.text)
+          .join("\n");
+    if (systemText) {
+      // NOTE(review): system and assistant text are set off only by newlines,
+      // so they are not distinguishable from user text in the prompt —
+      // confirm that no delimiter markup was intended around them.
+      parts.push(`\n${systemText}\n\n`);
+    }
+  }
+
+  // Messages
+  try {
+    for (const msg of request.messages) {
+      const text = contentToText(msg.content, tempFiles);
+      if (msg.role === "user") {
+        parts.push(text);
+      } else if (msg.role === "assistant") {
+        parts.push(`\n${text}\n\n`);
+      }
+    }
+  } catch (error) {
+    // If we throw, the caller never receives tempFiles, so remove them here.
+    cleanupTempFiles(tempFiles);
+    throw error;
+  }
+
+  return {
+    prompt: parts.join("\n").trim(),
+    model: extractModel(request.model),
+    sessionId: request.metadata?.user_id,
+    tempFiles,
+  };
+}
+
+/**
+ * Clean up temp files created during request processing.
+ *
+ * Best effort: a path that is already gone or cannot be removed is skipped,
+ * so calling this more than once with the same list is safe.
+ */
+export function cleanupTempFiles(files: string[]): void {
+  for (const f of files) {
+    try {
+      if (fs.existsSync(f)) fs.unlinkSync(f);
+    } catch {
+      // ignore cleanup errors
+    }
+  }
+}
diff --git a/src/adapter/cli-to-anthropic.ts b/src/adapter/cli-to-anthropic.ts
new file mode 100644
index 0000000..9aa6768
--- /dev/null
+++ b/src/adapter/cli-to-anthropic.ts
@@ -0,0 +1,213 @@
+/**
+ * Converts Claude CLI output to Anthropic Messages API response format
+ *
+ * Key insight: the CLI's stream-json events already use the same event types
+ * as the Anthropic streaming API (message_start, content_block_delta, etc.),
+ * so the conversion is mostly passthrough with minor reshaping.
+ */
+
+import type { ClaudeCliAssistant, ClaudeCliResult, ClaudeCliStreamEvent } from "../types/claude-cli.js";
+import type {
+  AnthropicMessagesResponse,
+  AnthropicMessageStartEvent,
+  AnthropicContentBlockStartEvent,
+  AnthropicContentBlockDeltaEvent,
+  AnthropicContentBlockStopEvent,
+  AnthropicMessageDeltaEvent,
+  AnthropicMessageStopEvent,
+  AnthropicStreamEvent,
+  AnthropicUsage,
+} from "../types/anthropic.js";
+
+/**
+ * Map CLI stop_reason to Anthropic stop_reason.
+ *
+ * Missing reasons map to null, the four values the Messages API defines pass
+ * through unchanged, and anything else is reported as "end_turn".
+ */
+function mapStopReason(
+  reason: string | null
+): AnthropicMessagesResponse["stop_reason"] {
+  if (!reason) return null;
+  if (reason === "end_turn" || reason === "stop_sequence" || reason === "max_tokens" || reason === "tool_use") {
+    return reason;
+  }
+  return "end_turn";
+}
+
+/**
+ * Convert CLI assistant message to full Anthropic Messages response (non-streaming)
+ *
+ * @param message - Assistant event emitted by the CLI.
+ * @param requestId - Value reported as the response `id`.
+ */
+export function cliAssistantToAnthropic(
+  message: ClaudeCliAssistant,
+  requestId: string
+): AnthropicMessagesResponse {
+  return {
+    id: requestId,
+    type: "message",
+    role: "assistant",
+    model: message.message.model,
+    // Blocks without a string `text` (presumably tool_use, now that the CLI is
+    // allowed to use tools) would otherwise be emitted as `text: undefined`.
+    content: message.message.content
+      .filter((c) => typeof c.text === "string")
+      .map((c) => ({
+        type: "text" as const,
+        text: c.text,
+      })),
+    stop_reason: mapStopReason(message.message.stop_reason) || "end_turn",
+    stop_sequence: null,
+    usage: {
+      input_tokens: message.message.usage.input_tokens,
+      output_tokens: message.message.usage.output_tokens,
+      cache_creation_input_tokens: message.message.usage.cache_creation_input_tokens,
+      cache_read_input_tokens: message.message.usage.cache_read_input_tokens,
+    },
+  };
+}
+
+/**
+ * Convert CLI result to Anthropic Messages response (non-streaming fallback)
+ *
+ * The model name is the first key of `result.modelUsage`; an error result is
+ * reported with a null stop_reason.
+ */
+export function cliResultToAnthropic(
+  result: ClaudeCliResult,
+  requestId: string
+): AnthropicMessagesResponse {
+  // modelUsage may be absent or an empty object; fall back in both cases
+  // instead of returning `model: undefined`.
+  const modelName =
+    Object.keys(result.modelUsage ?? {})[0] ?? "claude-sonnet-4";
+
+  return {
+    id: requestId,
+    type: "message",
+    role: "assistant",
+    model: modelName,
+    content: [
+      {
+        type: "text",
+        text: result.result,
+      },
+    ],
+    stop_reason: result.is_error ? null : "end_turn",
+    stop_sequence: null,
+    usage: {
+      input_tokens: result.usage?.input_tokens ?? 0,
+      output_tokens: result.usage?.output_tokens ?? 0,
+      cache_creation_input_tokens: result.usage?.cache_creation_input_tokens,
+      cache_read_input_tokens: result.usage?.cache_read_input_tokens,
+    },
+  };
+}
+
+/**
+ * Create message_start SSE event
+ *
+ * Token counts are zero placeholders; the final output count is carried by
+ * the message_delta event.
+ */
+export function createMessageStartEvent(
+  requestId: string,
+  model: string
+): AnthropicMessageStartEvent {
+  return {
+    type: "message_start",
+    message: {
+      id: requestId,
+      type: "message",
+      role: "assistant",
+      model,
+      content: [],
+      stop_reason: null,
+      stop_sequence: null,
+      usage: {
+        input_tokens: 0,
+        output_tokens: 0,
+      },
+    },
+  };
+}
+
+/**
+ * Create content_block_start SSE event (always an empty text block)
+ */
+export function createContentBlockStartEvent(
+  index: number
+): AnthropicContentBlockStartEvent {
+  return {
+    type: "content_block_start",
+    index,
+    content_block: {
+      type: "text",
+      text: "",
+    },
+  };
+}
+
+/**
+ * Convert CLI content_delta to Anthropic content_block_delta
+ *
+ * A delta without text becomes an empty text_delta.
+ */
+export function cliDeltaToAnthropic(
+  event: ClaudeCliStreamEvent,
+  index: number
+): AnthropicContentBlockDeltaEvent {
+  return {
+    type: "content_block_delta",
+    index,
+    delta: {
+      type: "text_delta",
+      text: event.event.delta?.text || "",
+    },
+  };
+}
+
+/**
+ * Create content_block_stop SSE event
+ */
+export function createContentBlockStopEvent(
+  index: number
+): AnthropicContentBlockStopEvent {
+  return {
+    type: "content_block_stop",
+    index,
+  };
+}
+
+/**
+ * Create message_delta SSE event
+ *
+ * @param stopReason - Why generation ended.
+ * @param outputTokens - Final output token count for the message.
+ */
+export function createMessageDeltaEvent(
+  stopReason: AnthropicMessagesResponse["stop_reason"],
+  outputTokens: number
+): AnthropicMessageDeltaEvent {
+  return {
+    type: "message_delta",
+    delta: {
+      stop_reason: stopReason,
+      stop_sequence: null,
+    },
+    usage: {
+      output_tokens: outputTokens,
+    },
+  };
+}
+
+/**
+ * Create message_stop SSE event
+ */
+export function createMessageStopEvent(): AnthropicMessageStopEvent {
+  return {
+    type: "message_stop",
+  };
+}
diff --git a/src/server/index.ts b/src/server/index.ts
index de8b73d..fb9b328 100644
--- a/src/server/index.ts
+++ b/src/server/index.ts
@@ -6,7 +6,7 @@
 
 import express, { Express, Request, Response, NextFunction } from "express";
 import { createServer, Server } from "http";
-import { handleChatCompletions, handleModels, handleHealth } from "./routes.js";
+import { handleChatCompletions, handleAnthropicMessages, handleModels, handleHealth } from "./routes.js";
 
 export interface ServerConfig {
   port: number;
@@ -49,6 +49,7 @@ function createApp(): Express {
   app.get("/health", handleHealth);
   app.get("/v1/models", handleModels);
   app.post("/v1/chat/completions", handleChatCompletions);
+  app.post("/v1/messages", handleAnthropicMessages);
 
   // 404 handler
   app.use((_req: Request, res: Response) => {
@@ -103,6 +104,7 @@ export async function startServer(config: ServerConfig): Promise<Server> {
     serverInstance.listen(port, host, () => {
       console.log(`[Server] Claude Code CLI provider running at http://${host}:${port}`);
       console.log(`[Server] OpenAI-compatible endpoint: http://${host}:${port}/v1/chat/completions`);
+      console.log(`[Server] Anthropic Messages endpoint: http://${host}:${port}/v1/messages`);
       resolve(serverInstance!);
     });
   });
diff --git a/src/server/routes.ts b/src/server/routes.ts
index ffe2e5b..d43a050 100644
--- a/src/server/routes.ts
+++ b/src/server/routes.ts
@@ -1,7 +1,7 @@
 /**
  * API Route Handlers
  *
- * Implements OpenAI-compatible endpoints for Clawdbot integration
+ * Implements OpenAI-compatible and Anthropic Messages API endpoints
  */
 
 import type { Request, Response } from "express";
@@ -12,7 +12,19 @@ import {
   cliResultToOpenai,
   createDoneChunk,
 } from "../adapter/cli-to-openai.js";
+import {
anthropicToCli, cleanupTempFiles } from "../adapter/anthropic-to-cli.js";
+import {
+  cliAssistantToAnthropic,
+  cliResultToAnthropic,
+  createMessageStartEvent,
+  createContentBlockStartEvent,
+  cliDeltaToAnthropic,
+  createContentBlockStopEvent,
+  createMessageDeltaEvent,
+  createMessageStopEvent,
+} from "../adapter/cli-to-anthropic.js";
 import type { OpenAIChatRequest } from "../types/openai.js";
+import type { AnthropicMessagesRequest } from "../types/anthropic.js";
 import type { ClaudeCliAssistant, ClaudeCliResult, ClaudeCliStreamEvent } from "../types/claude-cli.js";
 
 /**
@@ -291,3 +303,263 @@ export function handleHealth(_req: Request, res: Response): void {
     timestamp: new Date().toISOString(),
   });
 }
+
+/**
+ * Build the Anthropic error envelope used by every failure path below.
+ */
+function anthropicError(type: string, message: string) {
+  return { type: "error" as const, error: { type, message } };
+}
+
+/**
+ * Write one Server-Sent Event frame. Does nothing once the response has
+ * ended, so late subprocess events never write to a finished stream.
+ */
+function writeSse(res: Response, event: string, data: unknown): void {
+  if (res.writableEnded) return;
+  res.write(`event: ${event}\ndata: ${JSON.stringify(data)}\n\n`);
+}
+
+/**
+ * Handle POST /v1/messages
+ *
+ * Anthropic Messages API compatible endpoint.
+ * Supports both streaming and non-streaming.
+ *
+ * Temp files created while converting the request are removed in the single
+ * `finally` below, whichever way the request ends.
+ */
+export async function handleAnthropicMessages(
+  req: Request,
+  res: Response
+): Promise<void> {
+  const requestId = `msg_${uuidv4().replace(/-/g, "").slice(0, 24)}`;
+
+  try {
+    // req.body can be undefined when no body parser handled the request;
+    // check inside the try so that case is a 400, not an unhandled rejection.
+    const body = req.body as AnthropicMessagesRequest | undefined;
+    if (!body || typeof body !== "object") {
+      res.status(400).json(
+        anthropicError("invalid_request_error", "request body must be a JSON object")
+      );
+      return;
+    }
+
+    // Validate request
+    if (!Array.isArray(body.messages) || body.messages.length === 0) {
+      res.status(400).json(
+        anthropicError("invalid_request_error", "messages is required and must be a non-empty array")
+      );
+      return;
+    }
+
+    if (!body.max_tokens) {
+      res.status(400).json(
+        anthropicError("invalid_request_error", "max_tokens is required")
+      );
+      return;
+    }
+
+    // Convert to CLI input
+    const cliInput = anthropicToCli(body);
+    const subprocess = new ClaudeSubprocess();
+
+    try {
+      if (body.stream === true) {
+        await handleAnthropicStreaming(req, res, subprocess, cliInput, requestId, body.model);
+      } else {
+        await handleAnthropicNonStreaming(res, subprocess, cliInput, requestId);
+      }
+    } finally {
+      cleanupTempFiles(cliInput.tempFiles);
+    }
+  } catch (error) {
+    const message = error instanceof Error ? error.message : "Unknown error";
+    console.error("[handleAnthropicMessages] Error:", message);
+
+    if (!res.headersSent) {
+      res.status(500).json(anthropicError("api_error", message));
+    }
+  }
+}
+
+/**
+ * Handle Anthropic streaming response (SSE)
+ *
+ * Uses the Anthropic SSE format:
+ *   event: message_start / content_block_start / content_block_delta / ...
+ *   data: { ... }
+ *
+ * Always resolves. Once the headers are flushed a failure can no longer be
+ * reported as an HTTP status, so it is sent in-band as an `error` event and
+ * the stream is ended.
+ */
+async function handleAnthropicStreaming(
+  req: Request,
+  res: Response,
+  subprocess: ClaudeSubprocess,
+  cliInput: ReturnType<typeof anthropicToCli>,
+  requestId: string,
+  requestModel: string
+): Promise<void> {
+  // Set SSE headers
+  res.setHeader("Content-Type", "text/event-stream");
+  res.setHeader("Cache-Control", "no-cache");
+  res.setHeader("Connection", "keep-alive");
+  res.setHeader("X-Request-Id", requestId);
+
+  res.flushHeaders();
+
+  return new Promise<void>((resolve) => {
+    let isComplete = false;
+    let contentBlockStarted = false;
+    let totalOutputTokens = 0;
+
+    // Report a failure in-band and end the stream.
+    const failStream = (message: string): void => {
+      writeSse(res, "error", anthropicError("api_error", message));
+      if (!res.writableEnded) res.end();
+    };
+
+    // Send message_start immediately
+    writeSse(res, "message_start", createMessageStartEvent(requestId, requestModel));
+
+    res.on("close", () => {
+      if (!isComplete) {
+        subprocess.kill();
+      }
+      resolve();
+    });
+
+    // Handle content deltas
+    subprocess.on("content_delta", (event: ClaudeCliStreamEvent) => {
+      const text = event.event.delta?.text || "";
+      if (!text || res.writableEnded) return;
+
+      // Send content_block_start on first delta
+      if (!contentBlockStarted) {
+        writeSse(res, "content_block_start", createContentBlockStartEvent(0));
+        contentBlockStarted = true;
+      }
+      writeSse(res, "content_block_delta", cliDeltaToAnthropic(event, 0));
+    });
+
+    // Track the output token count reported by assistant messages
+    subprocess.on("assistant", (message: ClaudeCliAssistant) => {
+      totalOutputTokens = message.message.usage.output_tokens;
+    });
+
+    subprocess.on("result", (result: ClaudeCliResult) => {
+      isComplete = true;
+      if (!res.writableEnded) {
+        totalOutputTokens = result.usage?.output_tokens || totalOutputTokens;
+
+        // Close content block if one was started
+        if (contentBlockStarted) {
+          writeSse(res, "content_block_stop", createContentBlockStopEvent(0));
+        }
+
+        // Send message_delta with stop_reason and usage
+        writeSse(res, "message_delta", createMessageDeltaEvent("end_turn", totalOutputTokens));
+
+        // Send message_stop
+        writeSse(res, "message_stop", createMessageStopEvent());
+
+        res.end();
+      }
+      resolve();
+    });
+
+    subprocess.on("error", (error: Error) => {
+      console.error("[Anthropic Streaming] Error:", error.message);
+      failStream(error.message);
+      resolve();
+    });
+
+    subprocess.on("close", (code: number | null) => {
+      if (!res.writableEnded) {
+        if (code !== 0 && !isComplete) {
+          writeSse(res, "error", anthropicError("api_error", `Process exited with code ${code}`));
+        }
+        res.end();
+      }
+      resolve();
+    });
+
+    subprocess.start(cliInput.prompt, {
+      model: cliInput.model,
+      sessionId: cliInput.sessionId,
+    }).catch((err: unknown) => {
+      console.error("[Anthropic Streaming] Subprocess start error:", err);
+      // The outer handler cannot send a 500 once headers are flushed, so
+      // rejecting would leave the stream open; fail it in-band instead.
+      failStream(err instanceof Error ? err.message : String(err));
+      resolve();
+    });
+  });
+}
+
+/**
+ * Handle Anthropic non-streaming response
+ *
+ * Collects subprocess events and answers once: with the last assistant
+ * message, else the result, else a 500. Always resolves.
+ */
+async function handleAnthropicNonStreaming(
+  res: Response,
+  subprocess: ClaudeSubprocess,
+  cliInput: ReturnType<typeof anthropicToCli>,
+  requestId: string
+): Promise<void> {
+  return new Promise<void>((resolve) => {
+    let finalAssistant: ClaudeCliAssistant | null = null;
+    let finalResult: ClaudeCliResult | null = null;
+
+    // Send a 500 unless a response has already gone out.
+    const fail = (message: string): void => {
+      if (!res.headersSent) {
+        res.status(500).json(anthropicError("api_error", message));
+      }
+    };
+
+    subprocess.on("assistant", (message: ClaudeCliAssistant) => {
+      finalAssistant = message;
+    });
+
+    subprocess.on("result", (result: ClaudeCliResult) => {
+      finalResult = result;
+    });
+
+    subprocess.on("error", (error: Error) => {
+      console.error("[Anthropic NonStreaming] Error:", error.message);
+      fail(error.message);
+      resolve();
+    });
+
+    subprocess.on("close", (code: number | null) => {
+      // The "error" handler may already have answered; calling res.json()
+      // again would throw from inside this event handler.
+      if (!res.headersSent) {
+        if (finalAssistant) {
+          // Prefer assistant message (has full content blocks and usage)
+          res.json(cliAssistantToAnthropic(finalAssistant, requestId));
+        } else if (finalResult) {
+          // Fallback to result (has text string and usage)
+          res.json(cliResultToAnthropic(finalResult, requestId));
+        } else {
+          fail(`Claude CLI exited with code ${code} without response`);
+        }
+      }
+      resolve();
+    });
+
+    subprocess.start(cliInput.prompt, {
+      model: cliInput.model,
+      sessionId: cliInput.sessionId,
+    }).catch((error: unknown) => {
+      fail(error instanceof Error ? error.message : String(error));
+      resolve();
+    });
+  });
+}
diff --git a/src/subprocess/manager.ts b/src/subprocess/manager.ts
index 6551a81..1cf61eb 100644
--- a/src/subprocess/manager.ts
+++ b/src/subprocess/manager.ts
@@ -46,7 +46,7 @@ export class ClaudeSubprocess extends
EventEmitter {
    * Start the Claude CLI subprocess with the given prompt
    */
   async start(prompt: string, options: SubprocessOptions): Promise<void> {
-    const args = this.buildArgs(prompt, options);
+    const args = this.buildArgs(options);
     const timeout = options.timeout || DEFAULT_TIMEOUT;
 
     return new Promise((resolve, reject) => {
@@ -81,7 +81,13 @@ export class ClaudeSubprocess extends EventEmitter {
         }
       });
 
-      // Close stdin since we pass prompt as argument
+      // Write prompt to stdin to avoid E2BIG (ARG_MAX limit) for long prompts.
+      // Without an "error" listener, a failed write (e.g. EPIPE when the CLI
+      // exits before reading its input) would surface as an uncaught exception.
+      this.process.stdin?.on("error", (err: Error) => {
+        console.error(`[Subprocess] stdin error: ${err.message}`);
+      });
+      this.process.stdin?.write(prompt, "utf8");
       this.process.stdin?.end();
 
       console.error(`[Subprocess] Process spawned with PID: ${this.process.pid}`);
@@ -127,17 +133,21 @@ export class ClaudeSubprocess extends EventEmitter {
   /**
    * Build CLI arguments array
    */
-  private buildArgs(prompt: string, options: SubprocessOptions): string[] {
+  private buildArgs(options: SubprocessOptions): string[] {
     const args = [
       "--print", // Non-interactive mode
       "--output-format",
       "stream-json", // JSON streaming output
       "--verbose", // Required for stream-json
       "--include-partial-messages", // Enable streaming chunks
+      // NOTE(review): bypassPermissions lets every prompt drive tool use with
+      // this process's privileges — confirm the server is only reachable by
+      // trusted clients.
+      "--permission-mode", "bypassPermissions", // Allow tool use (works under root unlike --dangerously-skip-permissions)
       "--model",
       options.model, // Model alias (opus/sonnet/haiku)
       "--no-session-persistence", // Don't save sessions
-      prompt, // Pass prompt as argument (more reliable than stdin)
+      // NOTE: prompt is passed via stdin, not as an argument, to avoid E2BIG for long prompts
     ];
 
     if (options.sessionId) {
diff --git a/src/types/anthropic.ts b/src/types/anthropic.ts
new file mode 100644
index 0000000..8d9c213
--- /dev/null
+++ b/src/types/anthropic.ts
@@ -0,0 +1,165 @@
+/**
+ * Types for Anthropic Messages API
+ * https://docs.anthropic.com/en/api/messages
+ */
+
+/** Plain text content block. */
+export interface AnthropicTextContent {
+  type: "text";
+  text: string;
+}
+
+/**
+ * Image payload: inline base64 (`data`, with `media_type`) or a remote `url`,
+ * selected by `type`.
+ */
+export interface AnthropicImageSource {
+  type: "base64" | "url";
+  media_type?: string;
+  data?: string;
+  url?: string;
+}
+
+/** Image content block. */
+export interface AnthropicImageContent {
+  type: "image";
+  source: AnthropicImageSource;
+}
+
+/** A tool invocation: tool `name` plus its JSON `input`. */
+export interface AnthropicToolUseContent {
+  type: "tool_use";
+  id: string;
+  name: string;
+  input: Record<string, unknown>;
+}
+
+/** Output of a tool call, linked to it by `tool_use_id`. */
+export interface AnthropicToolResultContent {
+  type: "tool_result";
+  tool_use_id: string;
+  content: string | AnthropicTextContent[];
+}
+
+/** Any content block, discriminated by `type`. */
+export type AnthropicContentBlock =
+  | AnthropicTextContent
+  | AnthropicImageContent
+  | AnthropicToolUseContent
+  | AnthropicToolResultContent;
+
+/** One conversation turn. */
+export interface AnthropicMessage {
+  role: "user" | "assistant";
+  content: string | AnthropicContentBlock[];
+}
+
+/** Request body of POST /v1/messages. */
+export interface AnthropicMessagesRequest {
+  model: string;
+  messages: AnthropicMessage[];
+  system?: string | AnthropicTextContent[];
+  max_tokens: number;
+  stream?: boolean;
+  temperature?: number;
+  top_p?: number;
+  top_k?: number;
+  stop_sequences?: string[];
+  metadata?: { user_id?: string };
+}
+
+/** Token accounting for a response. */
+export interface AnthropicUsage {
+  input_tokens: number;
+  output_tokens: number;
+  cache_creation_input_tokens?: number;
+  cache_read_input_tokens?: number;
+}
+
+/** Why generation stopped; null while a streamed message is still open. */
+export type AnthropicStopReason =
+  | "end_turn"
+  | "max_tokens"
+  | "stop_sequence"
+  | "tool_use"
+  | null;
+
+/** Non-streaming response body. */
+export interface AnthropicMessagesResponse {
+  id: string;
+  type: "message";
+  role: "assistant";
+  model: string;
+  content: AnthropicContentBlock[];
+  stop_reason: AnthropicStopReason;
+  stop_sequence: string | null;
+  usage: AnthropicUsage;
+}
+
+// Streaming event types
+
+/** First event of a stream: the message envelope with empty content. */
+export interface AnthropicMessageStartEvent {
+  type: "message_start";
+  message: Omit<AnthropicMessagesResponse, "content"> & {
+    content: [];
+  };
+}
+
+/** Opens the content block at `index`. */
+export interface AnthropicContentBlockStartEvent {
+  type: "content_block_start";
+  index: number;
+  content_block: AnthropicContentBlock;
+}
+
+/** Incremental text for the content block at `index`. */
+export interface AnthropicContentBlockDeltaEvent {
+  type: "content_block_delta";
+  index: number;
+  delta: {
+    type: "text_delta";
+    text: string;
+  };
+}
+
+/** Closes the content block at `index`. */
+export interface AnthropicContentBlockStopEvent {
+  type: "content_block_stop";
+  index: number;
+}
+
+/** Final stop reason and output token count for the message. */
+export interface AnthropicMessageDeltaEvent {
+  type: "message_delta";
+  delta: {
+    stop_reason: AnthropicStopReason;
+    stop_sequence: string | null;
+  };
+  usage: {
+    output_tokens: number;
+  };
+}
+
+/** Last event of a stream. */
+export interface AnthropicMessageStopEvent {
+  type: "message_stop";
+}
+
+/** Union of the stream events declared above. */
+export type AnthropicStreamEvent =
+  | AnthropicMessageStartEvent
+  | AnthropicContentBlockStartEvent
+  | AnthropicContentBlockDeltaEvent
+  | AnthropicContentBlockStopEvent
+  | AnthropicMessageDeltaEvent
+  | AnthropicMessageStopEvent;
+
+/** Error envelope: `{ type: "error", error: { type, message } }`. */
+export interface AnthropicError {
+  type: "error";
+  error: {
+    type: string;
+    message: string;
+  };
+}