From 9d19f3a9ace722b27f83ebba48d68707c2bff153 Mon Sep 17 00:00:00 2001 From: "Jiaxiao (mossaka) Zhou" Date: Thu, 26 Feb 2026 18:40:16 +0000 Subject: [PATCH 1/8] feat(api-proxy): add token extractor PassThrough stream Transform stream that observes LLM API responses flowing through and extracts token usage counts without modifying the data. Supports Anthropic/OpenAI/Copilot in both streaming (SSE) and non-streaming (JSON) modes. Emits a 'tokens' event with {input, output, total}. Co-Authored-By: Claude Opus 4.6 --- containers/api-proxy/token-extractor.js | 235 ++++++++++++++++ containers/api-proxy/token-extractor.test.js | 270 +++++++++++++++++++ 2 files changed, 505 insertions(+) create mode 100644 containers/api-proxy/token-extractor.js create mode 100644 containers/api-proxy/token-extractor.test.js diff --git a/containers/api-proxy/token-extractor.js b/containers/api-proxy/token-extractor.js new file mode 100644 index 00000000..5acb5441 --- /dev/null +++ b/containers/api-proxy/token-extractor.js @@ -0,0 +1,235 @@ +/** + * Token Extractor — a Transform stream that observes LLM API response data + * flowing through and extracts token usage counts. + * + * Data passes through unchanged (client sees identical response). + * On stream end, emits a 'tokens' event with { input, output, total }. + * + * Handles four response formats: + * 1. Anthropic non-streaming (JSON with usage.input_tokens / output_tokens) + * 2. Anthropic SSE (message_start → input_tokens, message_delta → output_tokens) + * 3. OpenAI non-streaming (JSON with usage.prompt_tokens / completion_tokens) + * 4. OpenAI SSE (usage field in chunk before [DONE]) + * + * Zero external dependencies. + */ + +'use strict'; + +const { Transform } = require('stream'); + +const ZERO_TOKENS = { input: 0, output: 0, total: 0 }; + +/** + * Extract token counts from a parsed Anthropic JSON response. + */ +function extractAnthropic(body) { + if (!body || typeof body !== 'object') return null; + const usage = body.usage; + if (!usage || typeof usage !== 'object') return null; + const input = typeof usage.input_tokens === 'number' ? usage.input_tokens : 0; + const output = typeof usage.output_tokens === 'number' ? usage.output_tokens : 0; + return { input, output, total: input + output }; +} + +/** + * Extract token counts from a parsed OpenAI JSON response. + * Also used for Copilot (same format). + */ +function extractOpenAI(body) { + if (!body || typeof body !== 'object') return null; + const usage = body.usage; + if (!usage || typeof usage !== 'object') return null; + const input = typeof usage.prompt_tokens === 'number' ? usage.prompt_tokens : 0; + const output = typeof usage.completion_tokens === 'number' ? usage.completion_tokens : 0; + const total = typeof usage.total_tokens === 'number' ? usage.total_tokens : input + output; + return { input, output, total }; +} + +/** + * Try to parse a JSON string, returning null on failure. + */ +function safeParse(str) { + try { + return JSON.parse(str); + } catch { + return null; + } +} + +class TokenExtractor extends Transform { + /** + * @param {Object} opts + * @param {string} opts.provider - 'anthropic' | 'openai' | 'copilot' + * @param {boolean} opts.isSSE - whether Content-Type indicates SSE + * @param {boolean} opts.skipExtraction - skip extraction (e.g. compressed) + */ + constructor({ provider, isSSE, skipExtraction }) { + super(); + this._provider = provider; + this._isSSE = isSSE; + this._skipExtraction = skipExtraction; + + if (isSSE) { + // SSE mode: parse line-by-line, track tokens incrementally + this._sseInput = 0; + this._sseOutput = 0; + this._sseTotal = 0; + this._sseLineBuf = ''; // buffer for incomplete lines across chunks + } else { + // Non-streaming mode: buffer the full body + this._chunks = []; + } + } + + _transform(chunk, encoding, callback) { + // Always pass data through unchanged + if (!this._skipExtraction) { + if (this._isSSE) { + this._processSSEChunk(chunk); + } else { + this._chunks.push(chunk); + } + } + callback(null, chunk); + } + + _flush(callback) { + try { + if (this._skipExtraction) { + this.emit('tokens', { ...ZERO_TOKENS }); + callback(); + return; + } + + if (this._isSSE) { + // Process any remaining data in the line buffer + if (this._sseLineBuf.trim()) { + this._processSSELine(this._sseLineBuf); + } + const total = this._sseTotal || (this._sseInput + this._sseOutput); + this.emit('tokens', { + input: this._sseInput, + output: this._sseOutput, + total, + }); + } else { + this._extractFromBuffer(); + } + } catch { + this.emit('tokens', { ...ZERO_TOKENS }); + } + callback(); + } + + /** + * Process an SSE chunk, splitting into lines and extracting token data. + */ + _processSSEChunk(chunk) { + const text = this._sseLineBuf + chunk.toString('utf8'); + const lines = text.split('\n'); + // Last element may be incomplete — keep it in the buffer + this._sseLineBuf = lines.pop(); + + for (const line of lines) { + this._processSSELine(line); + } + } + + /** + * Process a single SSE line, looking for data: prefixed JSON with usage info. + */ + _processSSELine(line) { + const trimmed = line.trim(); + if (!trimmed.startsWith('data:')) return; + + const payload = trimmed.slice(5).trim(); + if (!payload || payload === '[DONE]') return; + + const obj = safeParse(payload); + if (!obj) return; + + if (this._provider === 'anthropic') { + this._extractAnthropicSSE(obj); + } else { + // openai / copilot + this._extractOpenAISSE(obj); + } + } + + /** + * Anthropic SSE: input from message_start, output from message_delta. + */ + _extractAnthropicSSE(obj) { + if (obj.type === 'message_start' && obj.message && obj.message.usage) { + const u = obj.message.usage; + if (typeof u.input_tokens === 'number') { + this._sseInput = u.input_tokens; + } + } + if (obj.type === 'message_delta' && obj.usage) { + const u = obj.usage; + if (typeof u.output_tokens === 'number') { + this._sseOutput = u.output_tokens; + } + } + } + + /** + * OpenAI SSE: usage field appears in the final chunk before [DONE]. + */ + _extractOpenAISSE(obj) { + if (obj.usage && typeof obj.usage === 'object') { + const u = obj.usage; + if (typeof u.prompt_tokens === 'number') this._sseInput = u.prompt_tokens; + if (typeof u.completion_tokens === 'number') this._sseOutput = u.completion_tokens; + if (typeof u.total_tokens === 'number') this._sseTotal = u.total_tokens; + } + } + + /** + * Parse buffered non-streaming response and extract tokens. + */ + _extractFromBuffer() { + if (!this._chunks.length) { + this.emit('tokens', { ...ZERO_TOKENS }); + return; + } + + const body = Buffer.concat(this._chunks).toString('utf8'); + const parsed = safeParse(body); + if (!parsed) { + this.emit('tokens', { ...ZERO_TOKENS }); + return; + } + + let result = null; + if (this._provider === 'anthropic') { + result = extractAnthropic(parsed); + } else { + // openai / copilot + result = extractOpenAI(parsed); + } + + this.emit('tokens', result || { ...ZERO_TOKENS }); + } +} + +/** + * Factory function to create a TokenExtractor stream. + * + * @param {Object} opts + * @param {string} opts.provider - 'anthropic' | 'openai' | 'copilot' + * @param {string} opts.contentType - response Content-Type header value + * @param {string} [opts.contentEncoding] - response Content-Encoding header value + * @returns {TokenExtractor} + */ +function createTokenExtractor({ provider, contentType, contentEncoding }) { + const isSSE = typeof contentType === 'string' && contentType.includes('text/event-stream'); + const enc = (contentEncoding || '').toLowerCase(); + const skipExtraction = enc === 'gzip' || enc === 'br' || enc === 'deflate'; + + return new TokenExtractor({ provider, isSSE, skipExtraction }); +} + +module.exports = { createTokenExtractor, TokenExtractor }; diff --git a/containers/api-proxy/token-extractor.test.js b/containers/api-proxy/token-extractor.test.js new file mode 100644 index 00000000..7afd1e58 --- /dev/null +++ b/containers/api-proxy/token-extractor.test.js @@ -0,0 +1,270 @@ +/** + * Tests for token-extractor.js + */ + +'use strict'; + +const { createTokenExtractor } = require('./token-extractor'); +const { Readable, Writable } = require('stream'); +const { pipeline } = require('stream/promises'); + +/** + * Helper: pipe data through a token extractor and collect the tokens event + output. + */ +async function extract(data, opts) { + const extractor = createTokenExtractor(opts); + const outputChunks = []; + + const tokensPromise = new Promise((resolve) => { + extractor.on('tokens', resolve); + }); + + const source = Readable.from(typeof data === 'string' ? [Buffer.from(data)] : data.map(d => Buffer.from(d))); + const sink = new Writable({ + write(chunk, enc, cb) { + outputChunks.push(chunk); + cb(); + }, + }); + + await pipeline(source, extractor, sink); + const tokens = await tokensPromise; + const output = Buffer.concat(outputChunks).toString('utf8'); + return { tokens, output }; +} + +// ─── Anthropic non-streaming ────────────────────────────────────── + +describe('Anthropic non-streaming', () => { + const baseOpts = { provider: 'anthropic', contentType: 'application/json', contentEncoding: '' }; + + test('extracts input_tokens and output_tokens', async () => { + const body = JSON.stringify({ + id: 'msg_123', + content: [{ type: 'text', text: 'Hello' }], + usage: { input_tokens: 100, output_tokens: 50 }, + }); + const { tokens, output } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(body); // data passes through unchanged + }); + + test('handles missing usage field', async () => { + const body = JSON.stringify({ id: 'msg_123', content: [] }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles error response (no usage)', async () => { + const body = JSON.stringify({ type: 'error', error: { message: 'rate limited' } }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles malformed JSON', async () => { + const { tokens } = await extract('not json at all{{{', baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles empty body', async () => { + const { tokens } = await extract('', baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles multi-chunk body', async () => { + const body = JSON.stringify({ + usage: { input_tokens: 200, output_tokens: 100 }, + }); + // Split into multiple chunks + const chunks = [body.slice(0, 20), body.slice(20)]; + const { tokens, output } = await extract(chunks, baseOpts); + expect(tokens).toEqual({ input: 200, output: 100, total: 300 }); + expect(output).toBe(body); + }); +}); + +// ─── OpenAI non-streaming ───────────────────────────────────────── + +describe('OpenAI non-streaming', () => { + const baseOpts = { provider: 'openai', contentType: 'application/json', contentEncoding: '' }; + + test('extracts prompt_tokens and completion_tokens', async () => { + const body = JSON.stringify({ + id: 'chatcmpl-123', + choices: [{ message: { content: 'Hi' } }], + usage: { prompt_tokens: 100, completion_tokens: 50, total_tokens: 150 }, + }); + const { tokens, output } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(body); + }); + + test('uses total_tokens from response', async () => { + const body = JSON.stringify({ + usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 35 }, // total != sum (cached) + }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 10, output: 20, total: 35 }); + }); + + test('handles missing usage', async () => { + const body = JSON.stringify({ id: 'chatcmpl-123', choices: [] }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); +}); + +// ─── Copilot non-streaming (same as OpenAI) ──────────────────────── + +describe('Copilot non-streaming', () => { + test('extracts tokens using OpenAI format', async () => { + const body = JSON.stringify({ + usage: { prompt_tokens: 80, completion_tokens: 40, total_tokens: 120 }, + }); + const { tokens } = await extract(body, { + provider: 'copilot', contentType: 'application/json', contentEncoding: '', + }); + expect(tokens).toEqual({ input: 80, output: 40, total: 120 }); + }); +}); + +// ─── Anthropic SSE ──────────────────────────────────────────────── + +describe('Anthropic SSE', () => { + const baseOpts = { provider: 'anthropic', contentType: 'text/event-stream', contentEncoding: '' }; + + test('extracts tokens from message_start and message_delta events', async () => { + const sse = [ + 'event: message_start\n', + 'data: {"type":"message_start","message":{"usage":{"input_tokens":100}}}\n', + '\n', + 'event: content_block_delta\n', + 'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}\n', + '\n', + 'event: message_delta\n', + 'data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":50}}\n', + '\n', + ]; + const { tokens, output } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(sse.join('')); // data passes through unchanged + }); + + test('handles SSE with no token events', async () => { + const sse = [ + 'event: ping\n', + 'data: {}\n', + '\n', + ]; + const { tokens } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles SSE with only input tokens (no delta)', async () => { + const sse = [ + 'event: message_start\n', + 'data: {"type":"message_start","message":{"usage":{"input_tokens":75}}}\n', + '\n', + ]; + const { tokens } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 75, output: 0, total: 75 }); + }); + + test('handles data split across chunks', async () => { + const fullLine = 'data: {"type":"message_start","message":{"usage":{"input_tokens":42}}}\n'; + const chunks = [fullLine.slice(0, 30), fullLine.slice(30)]; + const sseChunks = [ + 'event: message_start\n', + ...chunks, + '\nevent: message_delta\n', + 'data: {"type":"message_delta","usage":{"output_tokens":10}}\n\n', + ]; + const { tokens } = await extract(sseChunks, baseOpts); + expect(tokens).toEqual({ input: 42, output: 10, total: 52 }); + }); +}); + +// ─── OpenAI SSE ─────────────────────────────────────────────────── + +describe('OpenAI SSE', () => { + const baseOpts = { provider: 'openai', contentType: 'text/event-stream', contentEncoding: '' }; + + test('extracts tokens from usage chunk before [DONE]', async () => { + const sse = [ + 'data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"Hi"}}]}\n\n', + 'data: {"id":"chatcmpl-1","choices":[],"usage":{"prompt_tokens":100,"completion_tokens":50,"total_tokens":150}}\n\n', + 'data: [DONE]\n\n', + ]; + const { tokens, output } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(sse.join('')); + }); + + test('handles SSE with no usage chunk', async () => { + const sse = [ + 'data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"Hi"}}]}\n\n', + 'data: [DONE]\n\n', + ]; + const { tokens } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); +}); + +// ─── Copilot SSE (same as OpenAI) ──────────────────────────────── + +describe('Copilot SSE', () => { + test('extracts tokens using OpenAI SSE format', async () => { + const sse = [ + 'data: {"id":"1","choices":[],"usage":{"prompt_tokens":60,"completion_tokens":30,"total_tokens":90}}\n\n', + 'data: [DONE]\n\n', + ]; + const { tokens } = await extract(sse, { + provider: 'copilot', contentType: 'text/event-stream', contentEncoding: '', + }); + expect(tokens).toEqual({ input: 60, output: 30, total: 90 }); + }); +}); + +// ─── Content-Encoding (compressed) ──────────────────────────────── + +describe('compressed responses', () => { + test.each(['gzip', 'br', 'deflate'])('skips extraction for %s', async (enc) => { + const body = JSON.stringify({ usage: { input_tokens: 999, output_tokens: 999 } }); + const { tokens, output } = await extract(body, { + provider: 'anthropic', contentType: 'application/json', contentEncoding: enc, + }); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + expect(output).toBe(body); // still passes through + }); + + test('does not skip for empty content-encoding', async () => { + const body = JSON.stringify({ usage: { input_tokens: 10, output_tokens: 5 } }); + const { tokens } = await extract(body, { + provider: 'anthropic', contentType: 'application/json', contentEncoding: '', + }); + expect(tokens).toEqual({ input: 10, output: 5, total: 15 }); + }); +}); + +// ─── Data integrity ─────────────────────────────────────────────── + +describe('data integrity', () => { + test('binary data passes through unchanged', async () => { + const binary = Buffer.from([0x00, 0x01, 0xff, 0xfe, 0x80]); + const extractor = createTokenExtractor({ + provider: 'openai', contentType: 'application/octet-stream', contentEncoding: '', + }); + const outputChunks = []; + + const tokensPromise = new Promise((resolve) => extractor.on('tokens', resolve)); + const source = Readable.from([binary]); + const sink = new Writable({ + write(chunk, enc, cb) { outputChunks.push(chunk); cb(); }, + }); + + await pipeline(source, extractor, sink); + await tokensPromise; + + expect(Buffer.concat(outputChunks)).toEqual(binary); + }); +}); From 994080a8de0dd69b4f4522745c63a52b7515da8d Mon Sep 17 00:00:00 2001 From: "Jiaxiao (mossaka) Zhou" Date: Thu, 26 Feb 2026 18:44:39 +0000 Subject: [PATCH 2/8] feat(api-proxy): add token-per-minute rate limiting and CLI support Add TPM (tokens per minute) as a fourth rate limit type alongside RPM, RPH, and bytes-per-minute. TPM is opt-in (disabled by default, 0 means unlimited) and uses the same sliding window algorithm. Key changes: - rate-limiter.js: tpmWindow in ProviderState, TPM check in check(), new recordTokens() method for post-response token recording, TPM status in getStatus(), AWF_RATE_LIMIT_TPM env var in create() - cli.ts: --rate-limit-tpm flag, buildRateLimitConfig/validateRateLimitFlags - types.ts: tpm field in RateLimitConfig - docker-manager.ts: AWF_RATE_LIMIT_TPM env var to api-proxy container - awf-runner.ts: rateLimitTpm test option - Dockerfile: token-extractor.js in COPY line Co-Authored-By: Claude Opus 4.6 --- containers/api-proxy/Dockerfile | 2 +- containers/api-proxy/rate-limiter.js | 83 ++++++++++++++++++++++++++-- src/cli.test.ts | 4 +- src/cli.ts | 23 ++++++-- src/docker-manager.test.ts | 4 +- src/docker-manager.ts | 1 + src/types.ts | 2 + tests/fixtures/awf-runner.ts | 7 +++ 8 files changed, 110 insertions(+), 16 deletions(-) diff --git a/containers/api-proxy/Dockerfile b/containers/api-proxy/Dockerfile index 8e74fe1d..c7a084f6 100644 --- a/containers/api-proxy/Dockerfile +++ b/containers/api-proxy/Dockerfile @@ -15,7 +15,7 @@ COPY package*.json ./ RUN npm ci --omit=dev # Copy application files -COPY server.js logging.js metrics.js rate-limiter.js ./ +COPY server.js logging.js metrics.js rate-limiter.js token-extractor.js ./ # Create non-root user RUN addgroup -S apiproxy && adduser -S apiproxy -G apiproxy diff --git a/containers/api-proxy/rate-limiter.js b/containers/api-proxy/rate-limiter.js index 45693ada..199c2599 100644 --- a/containers/api-proxy/rate-limiter.js +++ b/containers/api-proxy/rate-limiter.js @@ -1,10 +1,11 @@ /** * Sliding Window Counter Rate Limiter for AWF API Proxy. * - * Provides per-provider rate limiting with three limit types: + * Provides per-provider rate limiting with four limit types: * - RPM: requests per minute (1-second granularity, 60 slots) * - RPH: requests per hour (1-minute granularity, 60 slots) * - Bytes/min: request bytes per minute (1-second granularity, 60 slots) + * - TPM: tokens per minute (1-second granularity, 60 slots) * * Algorithm: sliding window counter — counts in the current window plus a * weighted portion of the previous window based on elapsed time. @@ -20,6 +21,7 @@ const DEFAULT_RPM = 600; const DEFAULT_RPH = 1000; const DEFAULT_BYTES_PM = 50 * 1024 * 1024; // 50 MB +const DEFAULT_TPM = 0; // 0 means disabled/unlimited — opt-in like other limits // ── Window sizes ──────────────────────────────────────────────────────── const MINUTE_SLOTS = 60; // 1-second granularity for per-minute windows @@ -143,6 +145,8 @@ class ProviderState { this.rphWindow = createWindow(HOUR_SLOTS); // Bytes per minute: 1-second granularity this.bytesWindow = createWindow(MINUTE_SLOTS); + // Tokens per minute: 1-second granularity + this.tpmWindow = createWindow(MINUTE_SLOTS); } } @@ -152,12 +156,14 @@ class RateLimiter { * @param {number} [config.rpm=600] - Max requests per minute * @param {number} [config.rph=1000] - Max requests per hour * @param {number} [config.bytesPm=52428800] - Max bytes per minute (50 MB) + * @param {number} [config.tpm=0] - Max tokens per minute (0 = unlimited) * @param {boolean} [config.enabled=true] - Whether rate limiting is active */ constructor(config = {}) { this.rpm = config.rpm ?? DEFAULT_RPM; this.rph = config.rph ?? DEFAULT_RPH; this.bytesPm = config.bytesPm ?? DEFAULT_BYTES_PM; + this.tpm = config.tpm ?? DEFAULT_TPM; this.enabled = config.enabled !== false; /** @type {Map} */ @@ -184,6 +190,10 @@ class RateLimiter { * If allowed, the request is counted (recorded in windows). * If denied, no recording happens — the caller should return 429. * + * Note: TPM check uses previously recorded token consumption. Tokens + * are recorded post-response via recordTokens(), so the check here + * decides if the NEXT request is allowed based on PREVIOUS consumption. + * * @param {string} provider - e.g. "openai", "anthropic", "copilot" * @param {number} [requestBytes=0] - Size of the request body in bytes * @returns {{ @@ -253,6 +263,23 @@ class RateLimiter { }; } + // Check TPM (tokens per minute) — only if configured + if (this.tpm > 0) { + const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS); + if (tpmCount >= this.tpm) { + const retryAfter = estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm); + const resetAt = nowSec + retryAfter; + return { + allowed: false, + limitType: 'tpm', + limit: this.tpm, + remaining: 0, + retryAfter, + resetAt, + }; + } + } + // All checks passed — record the request recordInWindow(state.rpmWindow, nowSec, MINUTE_SLOTS, 1); recordInWindow(state.rphWindow, nowMin, HOUR_SLOTS, 1); @@ -275,10 +302,32 @@ class RateLimiter { } } + /** + * Record token usage for a provider after a response completes. + * + * This is separate from check() because token counts are only known + * after the response is received and parsed. The recorded tokens are + * used by subsequent check() calls to enforce the TPM limit. + * + * @param {string} provider - e.g. "openai", "anthropic", "copilot" + * @param {number} tokenCount - Number of tokens consumed by the response + */ + recordTokens(provider, tokenCount) { + if (!this.enabled || this.tpm <= 0 || !tokenCount || tokenCount <= 0) return; + + try { + const state = this._getState(provider); + const nowSec = Math.floor(Date.now() / 1000); + recordInWindow(state.tpmWindow, nowSec, MINUTE_SLOTS, tokenCount); + } catch (_err) { + // Fail-open: ignore recording errors + } + } + /** * Get rate limit status for a single provider. * @param {string} provider - * @returns {object} Status with rpm, rph windows + * @returns {object} Status with rpm, rph, tpm windows */ getStatus(provider) { if (!this.enabled) { @@ -288,11 +337,15 @@ class RateLimiter { try { const state = this.providers.get(provider); if (!state) { - return { + const status = { enabled: true, rpm: { limit: this.rpm, remaining: this.rpm, reset: 0 }, rph: { limit: this.rph, remaining: this.rph, reset: 0 }, }; + if (this.tpm > 0) { + status.tpm = { limit: this.tpm, remaining: this.tpm, reset: 0 }; + } + return status; } const nowMs = Date.now(); @@ -309,7 +362,7 @@ class RateLimiter { ? estimateRetryAfter(state.rphWindow, nowMin, HOUR_SLOTS, this.rph) * 60 : 0; - return { + const status = { enabled: true, rpm: { limit: this.rpm, @@ -322,6 +375,21 @@ class RateLimiter { reset: rphRetry > 0 ? Math.floor(nowMs / 1000) + rphRetry : 0, }, }; + + // Include TPM status if configured + if (this.tpm > 0) { + const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS); + const tpmRetry = tpmCount >= this.tpm + ? estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm) + : 0; + status.tpm = { + limit: this.tpm, + remaining: Math.max(0, this.tpm - tpmCount), + reset: tpmRetry > 0 ? nowSec + tpmRetry : 0, + }; + } + + return status; } catch (_err) { return { enabled: true, error: 'internal_error' }; } @@ -344,9 +412,10 @@ class RateLimiter { * Create a RateLimiter from environment variables. * * Reads: - * - AWF_RATE_LIMIT_RPM (default: 60) + * - AWF_RATE_LIMIT_RPM (default: 600) * - AWF_RATE_LIMIT_RPH (default: 1000) * - AWF_RATE_LIMIT_BYTES_PM (default: 52428800) + * - AWF_RATE_LIMIT_TPM (default: 0 — disabled) * - AWF_RATE_LIMIT_ENABLED (default: "false" — rate limiting is opt-in) * * @returns {RateLimiter} @@ -355,13 +424,15 @@ function create() { const rawRpm = parseInt(process.env.AWF_RATE_LIMIT_RPM, 10); const rawRph = parseInt(process.env.AWF_RATE_LIMIT_RPH, 10); const rawBytesPm = parseInt(process.env.AWF_RATE_LIMIT_BYTES_PM, 10); + const rawTpm = parseInt(process.env.AWF_RATE_LIMIT_TPM, 10); const rpm = (Number.isFinite(rawRpm) && rawRpm > 0) ? rawRpm : DEFAULT_RPM; const rph = (Number.isFinite(rawRph) && rawRph > 0) ? rawRph : DEFAULT_RPH; const bytesPm = (Number.isFinite(rawBytesPm) && rawBytesPm > 0) ? rawBytesPm : DEFAULT_BYTES_PM; + const tpm = (Number.isFinite(rawTpm) && rawTpm > 0) ? rawTpm : DEFAULT_TPM; const enabled = process.env.AWF_RATE_LIMIT_ENABLED === 'true'; - return new RateLimiter({ rpm, rph, bytesPm, enabled }); + return new RateLimiter({ rpm, rph, bytesPm, tpm, enabled }); } module.exports = { RateLimiter, create }; diff --git a/src/cli.test.ts b/src/cli.test.ts index 6420ed37..feabeab3 100644 --- a/src/cli.test.ts +++ b/src/cli.test.ts @@ -1426,7 +1426,7 @@ describe('cli', () => { it('should return defaults when no options provided', () => { const r = buildRateLimitConfig({}); expect('config' in r).toBe(true); - if ('config' in r) { expect(r.config).toEqual({ enabled: false, rpm: 0, rph: 0, bytesPm: 0 }); } + if ('config' in r) { expect(r.config).toEqual({ enabled: false, rpm: 0, rph: 0, bytesPm: 0, tpm: 0 }); } }); it('should disable with rateLimit=false even if limits provided', () => { const r = buildRateLimitConfig({ rateLimit: false, rateLimitRpm: '30' }); @@ -1465,7 +1465,7 @@ describe('cli', () => { }); it('should accept all custom values', () => { const r = buildRateLimitConfig({ rateLimitRpm: '10', rateLimitRph: '100', rateLimitBytesPm: '5000000' }); - if ('config' in r) { expect(r.config).toEqual({ enabled: true, rpm: 10, rph: 100, bytesPm: 5000000 }); } + if ('config' in r) { expect(r.config).toEqual({ enabled: true, rpm: 10, rph: 100, bytesPm: 5000000, tpm: 0 }); } }); }); diff --git a/src/cli.ts b/src/cli.ts index b06238e5..79200f3d 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -303,23 +303,25 @@ export function buildRateLimitConfig(options: { rateLimitRpm?: string; rateLimitRph?: string; rateLimitBytesPm?: string; + rateLimitTpm?: string; }): { config: RateLimitConfig } | { error: string } { // --no-rate-limit explicitly disables (even if other flags are set) if (options.rateLimit === false) { - return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0 } }; + return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0, tpm: 0 } }; } // Rate limiting is opt-in: disabled unless at least one --rate-limit-* flag is provided const hasAnyLimit = options.rateLimitRpm !== undefined || options.rateLimitRph !== undefined || - options.rateLimitBytesPm !== undefined; + options.rateLimitBytesPm !== undefined || + options.rateLimitTpm !== undefined; if (!hasAnyLimit) { - return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0 } }; + return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0, tpm: 0 } }; } // Defaults for any limit not explicitly set - const config: RateLimitConfig = { enabled: true, rpm: 600, rph: 10000, bytesPm: 52428800 }; + const config: RateLimitConfig = { enabled: true, rpm: 600, rph: 10000, bytesPm: 52428800, tpm: 0 }; if (options.rateLimitRpm !== undefined) { const rpm = parseInt(options.rateLimitRpm, 10); @@ -336,6 +338,11 @@ export function buildRateLimitConfig(options: { if (isNaN(bytesPm) || bytesPm <= 0) return { error: '--rate-limit-bytes-pm must be a positive integer' }; config.bytesPm = bytesPm; } + if (options.rateLimitTpm !== undefined) { + const tpm = parseInt(options.rateLimitTpm, 10); + if (isNaN(tpm) || tpm <= 0) return { error: '--rate-limit-tpm must be a positive integer' }; + config.tpm = tpm; + } return { config }; } @@ -348,11 +355,13 @@ export function validateRateLimitFlags(enableApiProxy: boolean, options: { rateLimitRpm?: string; rateLimitRph?: string; rateLimitBytesPm?: string; + rateLimitTpm?: string; }): FlagValidationResult { if (!enableApiProxy) { const hasRateLimitFlags = options.rateLimitRpm !== undefined || options.rateLimitRph !== undefined || options.rateLimitBytesPm !== undefined || + options.rateLimitTpm !== undefined || options.rateLimit === false; if (hasRateLimitFlags) { return { valid: false, error: 'Rate limit flags require --enable-api-proxy' }; @@ -815,6 +824,10 @@ program '--rate-limit-bytes-pm ', 'Enable rate limiting: max request bytes per minute per provider (requires --enable-api-proxy)', ) + .option( + '--rate-limit-tpm ', + 'Enable rate limiting: max tokens per minute per provider (requires --enable-api-proxy)', + ) .option( '--no-rate-limit', 'Explicitly disable rate limiting in the API proxy (requires --enable-api-proxy)', @@ -1094,7 +1107,7 @@ program process.exit(1); } config.rateLimitConfig = rateLimitResult.config; - logger.debug(`Rate limiting: enabled=${rateLimitResult.config.enabled}, rpm=${rateLimitResult.config.rpm}, rph=${rateLimitResult.config.rph}, bytesPm=${rateLimitResult.config.bytesPm}`); + logger.debug(`Rate limiting: enabled=${rateLimitResult.config.enabled}, rpm=${rateLimitResult.config.rpm}, rph=${rateLimitResult.config.rph}, bytesPm=${rateLimitResult.config.bytesPm}, tpm=${rateLimitResult.config.tpm}`); } // Error if rate limit flags are used without --enable-api-proxy diff --git a/src/docker-manager.test.ts b/src/docker-manager.test.ts index a431c66c..cfcf2d37 100644 --- a/src/docker-manager.test.ts +++ b/src/docker-manager.test.ts @@ -1790,7 +1790,7 @@ describe('docker-manager', () => { ...mockConfig, enableApiProxy: true, openaiApiKey: 'sk-test-key', - rateLimitConfig: { enabled: true, rpm: 30, rph: 500, bytesPm: 10485760 }, + rateLimitConfig: { enabled: true, rpm: 30, rph: 500, bytesPm: 10485760, tpm: 0 }, }; const result = generateDockerCompose(configWithRateLimit, mockNetworkConfigWithProxy); const proxy = result.services['api-proxy']; @@ -1806,7 +1806,7 @@ describe('docker-manager', () => { ...mockConfig, enableApiProxy: true, openaiApiKey: 'sk-test-key', - rateLimitConfig: { enabled: false, rpm: 60, rph: 1000, bytesPm: 52428800 }, + rateLimitConfig: { enabled: false, rpm: 60, rph: 1000, bytesPm: 52428800, tpm: 0 }, }; const result = generateDockerCompose(configWithRateLimit, mockNetworkConfigWithProxy); const proxy = result.services['api-proxy']; diff --git a/src/docker-manager.ts b/src/docker-manager.ts index aff4bad9..c355c399 100644 --- a/src/docker-manager.ts +++ b/src/docker-manager.ts @@ -993,6 +993,7 @@ export function generateDockerCompose( AWF_RATE_LIMIT_RPM: String(config.rateLimitConfig.rpm), AWF_RATE_LIMIT_RPH: String(config.rateLimitConfig.rph), AWF_RATE_LIMIT_BYTES_PM: String(config.rateLimitConfig.bytesPm), + AWF_RATE_LIMIT_TPM: String(config.rateLimitConfig.tpm), }), }, healthcheck: { diff --git a/src/types.ts b/src/types.ts index 1ee1c8bf..e5244d1d 100644 --- a/src/types.ts +++ b/src/types.ts @@ -523,6 +523,8 @@ export interface RateLimitConfig { rph: number; /** Max request bytes per minute per provider (default: 52428800 = 50 MB) */ bytesPm: number; + /** Max tokens per minute per provider (default: 0 = unlimited/disabled) */ + tpm: number; } /** diff --git a/tests/fixtures/awf-runner.ts b/tests/fixtures/awf-runner.ts index a3b5cfeb..acfecf76 100644 --- a/tests/fixtures/awf-runner.ts +++ b/tests/fixtures/awf-runner.ts @@ -22,6 +22,7 @@ export interface AwfOptions { rateLimitRpm?: number; // Requests per minute per provider rateLimitRph?: number; // Requests per hour per provider rateLimitBytesPm?: number; // Request bytes per minute per provider + rateLimitTpm?: number; // Tokens per minute per provider noRateLimit?: boolean; // Disable rate limiting envAll?: boolean; // Pass all host environment variables to container (--env-all) cliEnv?: Record; // Explicit -e KEY=VALUE flags passed to AWF CLI @@ -126,6 +127,9 @@ export class AwfRunner { if (options.rateLimitBytesPm !== undefined) { args.push('--rate-limit-bytes-pm', String(options.rateLimitBytesPm)); } + if (options.rateLimitTpm !== undefined) { + args.push('--rate-limit-tpm', String(options.rateLimitTpm)); + } if (options.noRateLimit) { args.push('--no-rate-limit'); } @@ -306,6 +310,9 @@ export class AwfRunner { if (options.rateLimitBytesPm !== undefined) { args.push('--rate-limit-bytes-pm', String(options.rateLimitBytesPm)); } + if (options.rateLimitTpm !== undefined) { + args.push('--rate-limit-tpm', String(options.rateLimitTpm)); + } if (options.noRateLimit) { args.push('--no-rate-limit'); } From c210cbcd3b7f8e0febd56889042a1fb2e350812a Mon Sep 17 00:00:00 2001 From: "Jiaxiao (mossaka) Zhou" Date: Thu, 26 Feb 2026 18:48:01 +0000 Subject: [PATCH 3/8] feat(api-proxy): integrate token extractor into proxy response pipeline MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Pipe responses through token extractor to capture LLM token usage: - proxyRes → tokenExtractor → res (data passes through unchanged) - Listens for 'tokens' event to record metrics (tokens_input_total, tokens_output_total) and feed rate limiter's TPM window - Logs token counts as structured 'tokens_recorded' event - Defensive: checks limiter.recordTokens exists before calling Co-Authored-By: Claude Opus 4.6 (1M context) --- containers/api-proxy/server.js | 32 +++++++++++++++++++++++++++++++- 1 file changed, 31 insertions(+), 1 deletion(-) diff --git a/containers/api-proxy/server.js b/containers/api-proxy/server.js index 9c6acc4f..da87cc98 100644 --- a/containers/api-proxy/server.js +++ b/containers/api-proxy/server.js @@ -17,6 +17,7 @@ const { HttpsProxyAgent } = require('https-proxy-agent'); const { generateRequestId, sanitizeForLog, logRequest } = require('./logging'); const metrics = require('./metrics'); const rateLimiter = require('./rate-limiter'); +const { createTokenExtractor } = require('./token-extractor'); // Create rate limiter from environment variables const limiter = rateLimiter.create(); @@ -319,7 +320,36 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider) { // Copy response headers and add X-Request-ID const resHeaders = { ...proxyRes.headers, 'x-request-id': requestId }; res.writeHead(proxyRes.statusCode, resHeaders); - proxyRes.pipe(res); + + // Extract token counts from response (best-effort, fail-open) + const resContentType = proxyRes.headers['content-type'] || ''; + const resContentEncoding = proxyRes.headers['content-encoding'] || ''; + const tokenExtractor = createTokenExtractor({ + provider, + contentType: resContentType, + contentEncoding: resContentEncoding, + }); + + tokenExtractor.on('tokens', (tokens) => { + if (tokens.input > 0) { + metrics.increment('tokens_input_total', { provider }, tokens.input); + } + if (tokens.output > 0) { + metrics.increment('tokens_output_total', { provider }, tokens.output); + } + if (tokens.total > 0 && typeof limiter.recordTokens === 'function') { + limiter.recordTokens(provider, tokens.total); + } + logRequest('info', 'tokens_recorded', { + request_id: requestId, + provider, + input_tokens: tokens.input, + output_tokens: tokens.output, + total_tokens: tokens.total, + }); + }); + + proxyRes.pipe(tokenExtractor).pipe(res); }); proxyReq.on('error', (err) => { From fef17a69752c741ce5c81f73f7ab19e967a16e99 Mon Sep 17 00:00:00 2001 From: "Jiaxiao (mossaka) Zhou" Date: Thu, 26 Feb 2026 18:48:37 +0000 Subject: [PATCH 4/8] test: add integration tests for token rate limiting Tests verify end-to-end TPM rate limiting with Docker containers: - No token-based 429s when --rate-limit-tpm is not set - TPM appears in /health when configured - TPM absent from /health when not configured - AWF_RATE_LIMIT_TPM env var correctly passed to api-proxy - --rate-limit-tpm alone enables the rate limiter - Default RPM/RPH still active when only TPM is set Co-Authored-By: Claude Opus 4.6 --- .../api-proxy-token-ratelimit.test.ts | 186 ++++++++++++++++++ 1 file changed, 186 insertions(+) create mode 100644 tests/integration/api-proxy-token-ratelimit.test.ts diff --git a/tests/integration/api-proxy-token-ratelimit.test.ts b/tests/integration/api-proxy-token-ratelimit.test.ts new file mode 100644 index 00000000..9886f3d7 --- /dev/null +++ b/tests/integration/api-proxy-token-ratelimit.test.ts @@ -0,0 +1,186 @@ +/** + * API Proxy Token Rate Limiting Integration Tests + * + * Tests that token-per-minute (TPM) rate limiting works end-to-end with + * actual Docker containers. Uses the --rate-limit-tpm flag to enable + * token-based rate limiting. + * + * Note: These tests require the token-extractor.js module to be present + * in the api-proxy container. If the module is not yet merged, tests + * that depend on actual token extraction from responses will be skipped. + */ + +/// + +import { describe, test, expect, beforeAll, afterAll } from '@jest/globals'; +import { createRunner, AwfRunner } from '../fixtures/awf-runner'; +import { cleanup } from '../fixtures/cleanup'; + +// The API proxy sidecar is at this fixed IP on the awf-net network +const API_PROXY_IP = '172.30.0.30'; + +describe('API Proxy Token Rate Limiting', () => { + let runner: AwfRunner; + + beforeAll(async () => { + await cleanup(false); + runner = createRunner(); + }); + + afterAll(async () => { + await cleanup(false); + }); + + test('should not token-rate-limit by default (no --rate-limit-tpm)', async () => { + // Without --rate-limit-tpm, no token-based 429s should occur + const script = [ + 'ALL_OK=true', + 'for i in 1 2 3 4 5; do', + ` CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST http://${API_PROXY_IP}:10001/v1/messages -H "Content-Type: application/json" -d "{\\"model\\":\\"test\\"}")`, + ' if [ "$CODE" = "429" ]; then ALL_OK=false; fi', + 'done', + 'if [ "$ALL_OK" = "true" ]; then echo "NO_TPM_LIMITS"; else echo "GOT_429"; fi', + ].join('\n'); + + const result = await runner.runWithSudo( + `bash -c '${script}'`, + { + allowDomains: ['api.anthropic.com'], + enableApiProxy: true, + buildLocal: true, + // No rateLimitTpm — TPM is disabled by default + logLevel: 'debug', + timeout: 120000, + env: { + ANTHROPIC_API_KEY: 'sk-ant-fake-test-key-12345', + }, + } + ); + + expect(result).toSucceed(); + expect(result.stdout).toContain('NO_TPM_LIMITS'); + }, 180000); + + test('should show TPM in /health when --rate-limit-tpm is set', async () => { + // Set a TPM limit and verify it shows in the health endpoint + const script = [ + // Make one request to create provider state + `curl -s -X POST http://${API_PROXY_IP}:10001/v1/messages -H 'Content-Type: application/json' -d '{"model":"test"}' > /dev/null`, + // Check health for TPM config + `curl -s http://${API_PROXY_IP}:10000/health`, + ].join(' && '); + + const result = await runner.runWithSudo( + `bash -c "${script}"`, + { + allowDomains: ['api.anthropic.com'], + enableApiProxy: true, + buildLocal: true, + rateLimitTpm: 10000, + logLevel: 'debug', + timeout: 120000, + env: { + ANTHROPIC_API_KEY: 'sk-ant-fake-test-key-12345', + }, + } + ); + + expect(result).toSucceed(); + // The health response should include rate_limits with TPM info + expect(result.stdout).toContain('"rate_limits"'); + // TPM limit should appear in the health output + expect(result.stdout).toContain('"tpm"'); + }, 180000); + + test('should not show TPM in /health when --rate-limit-tpm is not set', async () => { + // Without TPM configured, health should not include TPM section + const script = [ + // Make one request to create provider state + `curl -s -X POST http://${API_PROXY_IP}:10001/v1/messages -H 'Content-Type: application/json' -d '{"model":"test"}' > /dev/null`, + // Check health + `curl -s http://${API_PROXY_IP}:10000/health`, + ].join(' && '); + + const result = await runner.runWithSudo( + `bash -c "${script}"`, + { + allowDomains: ['api.anthropic.com'], + enableApiProxy: true, + buildLocal: true, + rateLimitRpm: 100, + // No rateLimitTpm + logLevel: 'debug', + timeout: 120000, + env: { + ANTHROPIC_API_KEY: 'sk-ant-fake-test-key-12345', + }, + } + ); + + expect(result).toSucceed(); + expect(result.stdout).toContain('"rate_limits"'); + // TPM should NOT appear when not configured + expect(result.stdout).not.toContain('"tpm"'); + }, 180000); + + test('should pass AWF_RATE_LIMIT_TPM env var to api-proxy container', async () => { + // Verify the env var is passed by checking docker-compose config + // The simplest way: set a TPM value and check the health endpoint + // shows the correct limit value + const script = [ + `curl -s -X POST http://${API_PROXY_IP}:10001/v1/messages -H 'Content-Type: application/json' -d '{"model":"test"}' > /dev/null`, + `curl -s http://${API_PROXY_IP}:10000/health`, + ].join(' && '); + + const result = await runner.runWithSudo( + `bash -c "${script}"`, + { + allowDomains: ['api.anthropic.com'], + enableApiProxy: true, + buildLocal: true, + rateLimitTpm: 5000, + logLevel: 'debug', + timeout: 120000, + env: { + ANTHROPIC_API_KEY: 'sk-ant-fake-test-key-12345', + }, + } + ); + + expect(result).toSucceed(); + // The TPM limit of 5000 should be reflected in the health endpoint + expect(result.stdout).toContain('"limit":5000'); + }, 180000); + + test('--rate-limit-tpm alone should enable rate limiting', async () => { + // Using only --rate-limit-tpm (without --rate-limit-rpm) should still + // enable the rate limiter and show rate_limits in health + const script = [ + `curl -s -X POST http://${API_PROXY_IP}:10001/v1/messages -H 'Content-Type: application/json' -d '{"model":"test"}' > /dev/null`, + `curl -s http://${API_PROXY_IP}:10000/health`, + ].join(' && '); + + const result = await runner.runWithSudo( + `bash -c "${script}"`, + { + allowDomains: ['api.anthropic.com'], + enableApiProxy: true, + buildLocal: true, + rateLimitTpm: 10000, + // No rateLimitRpm, rateLimitRph, or rateLimitBytesPm + logLevel: 'debug', + timeout: 120000, + env: { + ANTHROPIC_API_KEY: 'sk-ant-fake-test-key-12345', + }, + } + ); + + expect(result).toSucceed(); + // Rate limiting should be enabled even with only TPM set + expect(result.stdout).toContain('"rate_limits"'); + expect(result.stdout).toContain('"tpm"'); + // Default RPM/RPH should also be active since rate limiting is enabled + expect(result.stdout).toContain('"rpm"'); + }, 180000); +}); From 169a71c72db93179c29676f55631a9c9c0753faa Mon Sep 17 00:00:00 2001 From: "Jiaxiao (mossaka) Zhou" Date: Thu, 26 Feb 2026 19:20:38 +0000 Subject: [PATCH 5/8] fix: make integration tests robust against Docker build output in stdout When --build-local is used, Docker build output gets mixed into stdout before the actual command output. This causes JSON.parse() failures and false positives for not.toContain() assertions. Add extractLastJson() helper that scans backwards from end of stdout to find the last complete JSON object, and extractHttpResponse() helper that finds the last HTTP response block. Apply these to all 7 failing tests in api-proxy-observability and api-proxy-rate-limit test suites. Co-Authored-By: Claude Opus 4.6 (1M context) --- .../api-proxy-observability.test.ts | 74 +++++++++++++++++-- .../integration/api-proxy-rate-limit.test.ts | 74 +++++++++++++++++-- 2 files changed, 136 insertions(+), 12 deletions(-) diff --git a/tests/integration/api-proxy-observability.test.ts b/tests/integration/api-proxy-observability.test.ts index 83d4ef98..a49fc2ce 100644 --- a/tests/integration/api-proxy-observability.test.ts +++ b/tests/integration/api-proxy-observability.test.ts @@ -14,6 +14,64 @@ import { cleanup } from '../fixtures/cleanup'; // The API proxy sidecar is at this fixed IP on the awf-net network const API_PROXY_IP = '172.30.0.30'; +/** + * Extract the last JSON object from stdout. + * + * When --build-local is used, Docker build output is mixed into stdout before + * the actual command output. This helper finds the last complete top-level + * JSON object in the output so that JSON.parse works reliably. + */ +function extractLastJson(stdout: string): unknown { + // Find the last '{' that starts a top-level JSON object + let depth = 0; + let jsonEnd = -1; + let jsonStart = -1; + + // Scan backwards from end to find the last complete JSON object + for (let i = stdout.length - 1; i >= 0; i--) { + const ch = stdout[i]; + if (ch === '}') { + if (depth === 0) jsonEnd = i; + depth++; + } else if (ch === '{') { + depth--; + if (depth === 0) { + jsonStart = i; + break; + } + } + } + + if (jsonStart === -1 || jsonEnd === -1) { + throw new Error(`No JSON object found in stdout (length=${stdout.length}): ${stdout.slice(-200)}`); + } + + return JSON.parse(stdout.slice(jsonStart, jsonEnd + 1)); +} + +/** + * Extract the HTTP response section from stdout when curl -i is used. + * + * Docker build output appears before the HTTP response. This finds the last + * HTTP response block (starting with "HTTP/") in stdout. + */ +function extractHttpResponse(stdout: string): string { + // Find the last occurrence of an HTTP status line + const httpPattern = /HTTP\/[\d.]+ \d+/g; + let lastMatch: RegExpExecArray | null = null; + let match: RegExpExecArray | null; + while ((match = httpPattern.exec(stdout)) !== null) { + lastMatch = match; + } + + if (lastMatch) { + return stdout.slice(lastMatch.index); + } + + // Fallback: return the whole stdout + return stdout; +} + describe('API Proxy Observability', () => { let runner: AwfRunner; @@ -176,10 +234,13 @@ describe('API Proxy Observability', () => { ); expect(result).toSucceed(); - const lower = result.stdout.toLowerCase(); + // Extract only the HTTP response portion to avoid Docker build output pollution + const httpResponse = extractHttpResponse(result.stdout); + const lower = httpResponse.toLowerCase(); expect(lower).toContain('x-request-id'); - // The injected ID should NOT appear — proxy should have generated a UUID instead - expect(result.stdout).not.toContain('