diff --git a/containers/api-proxy/Dockerfile b/containers/api-proxy/Dockerfile
index 8e74fe1d..c7a084f6 100644
--- a/containers/api-proxy/Dockerfile
+++ b/containers/api-proxy/Dockerfile
@@ -15,7 +15,7 @@ COPY package*.json ./
 RUN npm ci --omit=dev

 # Copy application files
-COPY server.js logging.js metrics.js rate-limiter.js ./
+COPY server.js logging.js metrics.js rate-limiter.js token-extractor.js ./

 # Create non-root user
 RUN addgroup -S apiproxy && adduser -S apiproxy -G apiproxy
diff --git a/containers/api-proxy/rate-limiter.js b/containers/api-proxy/rate-limiter.js
index 45693ada..199c2599 100644
--- a/containers/api-proxy/rate-limiter.js
+++ b/containers/api-proxy/rate-limiter.js
@@ -1,10 +1,11 @@
 /**
  * Sliding Window Counter Rate Limiter for AWF API Proxy.
  *
- * Provides per-provider rate limiting with three limit types:
+ * Provides per-provider rate limiting with four limit types:
  * - RPM: requests per minute (1-second granularity, 60 slots)
  * - RPH: requests per hour (1-minute granularity, 60 slots)
  * - Bytes/min: request bytes per minute (1-second granularity, 60 slots)
+ * - TPM: tokens per minute (1-second granularity, 60 slots)
  *
  * Algorithm: sliding window counter — counts in the current window plus a
  * weighted portion of the previous window based on elapsed time.
@@ -20,6 +21,7 @@
 const DEFAULT_RPM = 600;
 const DEFAULT_RPH = 1000;
 const DEFAULT_BYTES_PM = 50 * 1024 * 1024; // 50 MB
+const DEFAULT_TPM = 0; // 0 means disabled/unlimited — opt-in like other limits

 // ── Window sizes ────────────────────────────────────────────────────────
 const MINUTE_SLOTS = 60; // 1-second granularity for per-minute windows
@@ -143,6 +145,8 @@ class ProviderState {
     this.rphWindow = createWindow(HOUR_SLOTS);
     // Bytes per minute: 1-second granularity
     this.bytesWindow = createWindow(MINUTE_SLOTS);
+    // Tokens per minute: 1-second granularity
+    this.tpmWindow = createWindow(MINUTE_SLOTS);
   }
 }

@@ -152,12 +156,14 @@ class RateLimiter {
    * @param {number} [config.rpm=600] - Max requests per minute
    * @param {number} [config.rph=1000] - Max requests per hour
    * @param {number} [config.bytesPm=52428800] - Max bytes per minute (50 MB)
+   * @param {number} [config.tpm=0] - Max tokens per minute (0 = unlimited)
    * @param {boolean} [config.enabled=true] - Whether rate limiting is active
    */
   constructor(config = {}) {
     this.rpm = config.rpm ?? DEFAULT_RPM;
     this.rph = config.rph ?? DEFAULT_RPH;
     this.bytesPm = config.bytesPm ?? DEFAULT_BYTES_PM;
+    this.tpm = config.tpm ?? DEFAULT_TPM;
     this.enabled = config.enabled !== false;

     /** @type {Map<string, ProviderState>} */
@@ -184,6 +190,10 @@
    * If allowed, the request is counted (recorded in windows).
    * If denied, no recording happens — the caller should return 429.
    *
+   * Note: the TPM check uses previously recorded token consumption. Tokens
+   * are recorded post-response via recordTokens(), so the check here
+   * decides whether the NEXT request is allowed based on PREVIOUS consumption.
+   *
    * @param {string} provider - e.g. 
"openai", "anthropic", "copilot" * @param {number} [requestBytes=0] - Size of the request body in bytes * @returns {{ @@ -253,6 +263,23 @@ class RateLimiter { }; } + // Check TPM (tokens per minute) — only if configured + if (this.tpm > 0) { + const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS); + if (tpmCount >= this.tpm) { + const retryAfter = estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm); + const resetAt = nowSec + retryAfter; + return { + allowed: false, + limitType: 'tpm', + limit: this.tpm, + remaining: 0, + retryAfter, + resetAt, + }; + } + } + // All checks passed — record the request recordInWindow(state.rpmWindow, nowSec, MINUTE_SLOTS, 1); recordInWindow(state.rphWindow, nowMin, HOUR_SLOTS, 1); @@ -275,10 +302,32 @@ class RateLimiter { } } + /** + * Record token usage for a provider after a response completes. + * + * This is separate from check() because token counts are only known + * after the response is received and parsed. The recorded tokens are + * used by subsequent check() calls to enforce the TPM limit. + * + * @param {string} provider - e.g. "openai", "anthropic", "copilot" + * @param {number} tokenCount - Number of tokens consumed by the response + */ + recordTokens(provider, tokenCount) { + if (!this.enabled || this.tpm <= 0 || !tokenCount || tokenCount <= 0) return; + + try { + const state = this._getState(provider); + const nowSec = Math.floor(Date.now() / 1000); + recordInWindow(state.tpmWindow, nowSec, MINUTE_SLOTS, tokenCount); + } catch (_err) { + // Fail-open: ignore recording errors + } + } + /** * Get rate limit status for a single provider. * @param {string} provider - * @returns {object} Status with rpm, rph windows + * @returns {object} Status with rpm, rph, tpm windows */ getStatus(provider) { if (!this.enabled) { @@ -288,11 +337,15 @@ class RateLimiter { try { const state = this.providers.get(provider); if (!state) { - return { + const status = { enabled: true, rpm: { limit: this.rpm, remaining: this.rpm, reset: 0 }, rph: { limit: this.rph, remaining: this.rph, reset: 0 }, }; + if (this.tpm > 0) { + status.tpm = { limit: this.tpm, remaining: this.tpm, reset: 0 }; + } + return status; } const nowMs = Date.now(); @@ -309,7 +362,7 @@ class RateLimiter { ? estimateRetryAfter(state.rphWindow, nowMin, HOUR_SLOTS, this.rph) * 60 : 0; - return { + const status = { enabled: true, rpm: { limit: this.rpm, @@ -322,6 +375,21 @@ class RateLimiter { reset: rphRetry > 0 ? Math.floor(nowMs / 1000) + rphRetry : 0, }, }; + + // Include TPM status if configured + if (this.tpm > 0) { + const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS); + const tpmRetry = tpmCount >= this.tpm + ? estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm) + : 0; + status.tpm = { + limit: this.tpm, + remaining: Math.max(0, this.tpm - tpmCount), + reset: tpmRetry > 0 ? nowSec + tpmRetry : 0, + }; + } + + return status; } catch (_err) { return { enabled: true, error: 'internal_error' }; } @@ -344,9 +412,10 @@ class RateLimiter { * Create a RateLimiter from environment variables. 
* * Reads: - * - AWF_RATE_LIMIT_RPM (default: 60) + * - AWF_RATE_LIMIT_RPM (default: 600) * - AWF_RATE_LIMIT_RPH (default: 1000) * - AWF_RATE_LIMIT_BYTES_PM (default: 52428800) + * - AWF_RATE_LIMIT_TPM (default: 0 — disabled) * - AWF_RATE_LIMIT_ENABLED (default: "false" — rate limiting is opt-in) * * @returns {RateLimiter} @@ -355,13 +424,15 @@ function create() { const rawRpm = parseInt(process.env.AWF_RATE_LIMIT_RPM, 10); const rawRph = parseInt(process.env.AWF_RATE_LIMIT_RPH, 10); const rawBytesPm = parseInt(process.env.AWF_RATE_LIMIT_BYTES_PM, 10); + const rawTpm = parseInt(process.env.AWF_RATE_LIMIT_TPM, 10); const rpm = (Number.isFinite(rawRpm) && rawRpm > 0) ? rawRpm : DEFAULT_RPM; const rph = (Number.isFinite(rawRph) && rawRph > 0) ? rawRph : DEFAULT_RPH; const bytesPm = (Number.isFinite(rawBytesPm) && rawBytesPm > 0) ? rawBytesPm : DEFAULT_BYTES_PM; + const tpm = (Number.isFinite(rawTpm) && rawTpm > 0) ? rawTpm : DEFAULT_TPM; const enabled = process.env.AWF_RATE_LIMIT_ENABLED === 'true'; - return new RateLimiter({ rpm, rph, bytesPm, enabled }); + return new RateLimiter({ rpm, rph, bytesPm, tpm, enabled }); } module.exports = { RateLimiter, create }; diff --git a/containers/api-proxy/server.js b/containers/api-proxy/server.js index 9c6acc4f..da87cc98 100644 --- a/containers/api-proxy/server.js +++ b/containers/api-proxy/server.js @@ -17,6 +17,7 @@ const { HttpsProxyAgent } = require('https-proxy-agent'); const { generateRequestId, sanitizeForLog, logRequest } = require('./logging'); const metrics = require('./metrics'); const rateLimiter = require('./rate-limiter'); +const { createTokenExtractor } = require('./token-extractor'); // Create rate limiter from environment variables const limiter = rateLimiter.create(); @@ -319,7 +320,36 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider) { // Copy response headers and add X-Request-ID const resHeaders = { ...proxyRes.headers, 'x-request-id': requestId }; res.writeHead(proxyRes.statusCode, resHeaders); - proxyRes.pipe(res); + + // Extract token counts from response (best-effort, fail-open) + const resContentType = proxyRes.headers['content-type'] || ''; + const resContentEncoding = proxyRes.headers['content-encoding'] || ''; + const tokenExtractor = createTokenExtractor({ + provider, + contentType: resContentType, + contentEncoding: resContentEncoding, + }); + + tokenExtractor.on('tokens', (tokens) => { + if (tokens.input > 0) { + metrics.increment('tokens_input_total', { provider }, tokens.input); + } + if (tokens.output > 0) { + metrics.increment('tokens_output_total', { provider }, tokens.output); + } + if (tokens.total > 0 && typeof limiter.recordTokens === 'function') { + limiter.recordTokens(provider, tokens.total); + } + logRequest('info', 'tokens_recorded', { + request_id: requestId, + provider, + input_tokens: tokens.input, + output_tokens: tokens.output, + total_tokens: tokens.total, + }); + }); + + proxyRes.pipe(tokenExtractor).pipe(res); }); proxyReq.on('error', (err) => { diff --git a/containers/api-proxy/token-extractor.js b/containers/api-proxy/token-extractor.js new file mode 100644 index 00000000..5acb5441 --- /dev/null +++ b/containers/api-proxy/token-extractor.js @@ -0,0 +1,235 @@ +/** + * Token Extractor — a Transform stream that observes LLM API response data + * flowing through and extracts token usage counts. + * + * Data passes through unchanged (client sees identical response). + * On stream end, emits a 'tokens' event with { input, output, total }. 
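+ *
+ * Intended wiring, as a sketch (mirrors server.js in this PR; the stream
+ * and limiter names here are illustrative, not part of this module):
+ *
+ *   const te = createTokenExtractor({ provider, contentType, contentEncoding });
+ *   te.on('tokens', ({ total }) => limiter.recordTokens(provider, total));
+ *   upstreamRes.pipe(te).pipe(clientRes);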
+ * + * Handles four response formats: + * 1. Anthropic non-streaming (JSON with usage.input_tokens / output_tokens) + * 2. Anthropic SSE (message_start → input_tokens, message_delta → output_tokens) + * 3. OpenAI non-streaming (JSON with usage.prompt_tokens / completion_tokens) + * 4. OpenAI SSE (usage field in chunk before [DONE]) + * + * Zero external dependencies. + */ + +'use strict'; + +const { Transform } = require('stream'); + +const ZERO_TOKENS = { input: 0, output: 0, total: 0 }; + +/** + * Extract token counts from a parsed Anthropic JSON response. + */ +function extractAnthropic(body) { + if (!body || typeof body !== 'object') return null; + const usage = body.usage; + if (!usage || typeof usage !== 'object') return null; + const input = typeof usage.input_tokens === 'number' ? usage.input_tokens : 0; + const output = typeof usage.output_tokens === 'number' ? usage.output_tokens : 0; + return { input, output, total: input + output }; +} + +/** + * Extract token counts from a parsed OpenAI JSON response. + * Also used for Copilot (same format). + */ +function extractOpenAI(body) { + if (!body || typeof body !== 'object') return null; + const usage = body.usage; + if (!usage || typeof usage !== 'object') return null; + const input = typeof usage.prompt_tokens === 'number' ? usage.prompt_tokens : 0; + const output = typeof usage.completion_tokens === 'number' ? usage.completion_tokens : 0; + const total = typeof usage.total_tokens === 'number' ? usage.total_tokens : input + output; + return { input, output, total }; +} + +/** + * Try to parse a JSON string, returning null on failure. + */ +function safeParse(str) { + try { + return JSON.parse(str); + } catch { + return null; + } +} + +class TokenExtractor extends Transform { + /** + * @param {Object} opts + * @param {string} opts.provider - 'anthropic' | 'openai' | 'copilot' + * @param {boolean} opts.isSSE - whether Content-Type indicates SSE + * @param {boolean} opts.skipExtraction - skip extraction (e.g. compressed) + */ + constructor({ provider, isSSE, skipExtraction }) { + super(); + this._provider = provider; + this._isSSE = isSSE; + this._skipExtraction = skipExtraction; + + if (isSSE) { + // SSE mode: parse line-by-line, track tokens incrementally + this._sseInput = 0; + this._sseOutput = 0; + this._sseTotal = 0; + this._sseLineBuf = ''; // buffer for incomplete lines across chunks + } else { + // Non-streaming mode: buffer the full body + this._chunks = []; + } + } + + _transform(chunk, encoding, callback) { + // Always pass data through unchanged + if (!this._skipExtraction) { + if (this._isSSE) { + this._processSSEChunk(chunk); + } else { + this._chunks.push(chunk); + } + } + callback(null, chunk); + } + + _flush(callback) { + try { + if (this._skipExtraction) { + this.emit('tokens', { ...ZERO_TOKENS }); + callback(); + return; + } + + if (this._isSSE) { + // Process any remaining data in the line buffer + if (this._sseLineBuf.trim()) { + this._processSSELine(this._sseLineBuf); + } + const total = this._sseTotal || (this._sseInput + this._sseOutput); + this.emit('tokens', { + input: this._sseInput, + output: this._sseOutput, + total, + }); + } else { + this._extractFromBuffer(); + } + } catch { + this.emit('tokens', { ...ZERO_TOKENS }); + } + callback(); + } + + /** + * Process an SSE chunk, splitting into lines and extracting token data. 
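+   * For example, if one chunk ends with 'data: {"usa' and the next begins
+   * 'ge": {...}}\n', the two fragments are rejoined via _sseLineBuf before
+   * parsing. (Assumes LF-delimited SSE; a trailing CR is removed by the
+   * trim in _processSSELine.)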
+ */ + _processSSEChunk(chunk) { + const text = this._sseLineBuf + chunk.toString('utf8'); + const lines = text.split('\n'); + // Last element may be incomplete — keep it in the buffer + this._sseLineBuf = lines.pop(); + + for (const line of lines) { + this._processSSELine(line); + } + } + + /** + * Process a single SSE line, looking for data: prefixed JSON with usage info. + */ + _processSSELine(line) { + const trimmed = line.trim(); + if (!trimmed.startsWith('data:')) return; + + const payload = trimmed.slice(5).trim(); + if (!payload || payload === '[DONE]') return; + + const obj = safeParse(payload); + if (!obj) return; + + if (this._provider === 'anthropic') { + this._extractAnthropicSSE(obj); + } else { + // openai / copilot + this._extractOpenAISSE(obj); + } + } + + /** + * Anthropic SSE: input from message_start, output from message_delta. + */ + _extractAnthropicSSE(obj) { + if (obj.type === 'message_start' && obj.message && obj.message.usage) { + const u = obj.message.usage; + if (typeof u.input_tokens === 'number') { + this._sseInput = u.input_tokens; + } + } + if (obj.type === 'message_delta' && obj.usage) { + const u = obj.usage; + if (typeof u.output_tokens === 'number') { + this._sseOutput = u.output_tokens; + } + } + } + + /** + * OpenAI SSE: usage field appears in the final chunk before [DONE]. + */ + _extractOpenAISSE(obj) { + if (obj.usage && typeof obj.usage === 'object') { + const u = obj.usage; + if (typeof u.prompt_tokens === 'number') this._sseInput = u.prompt_tokens; + if (typeof u.completion_tokens === 'number') this._sseOutput = u.completion_tokens; + if (typeof u.total_tokens === 'number') this._sseTotal = u.total_tokens; + } + } + + /** + * Parse buffered non-streaming response and extract tokens. + */ + _extractFromBuffer() { + if (!this._chunks.length) { + this.emit('tokens', { ...ZERO_TOKENS }); + return; + } + + const body = Buffer.concat(this._chunks).toString('utf8'); + const parsed = safeParse(body); + if (!parsed) { + this.emit('tokens', { ...ZERO_TOKENS }); + return; + } + + let result = null; + if (this._provider === 'anthropic') { + result = extractAnthropic(parsed); + } else { + // openai / copilot + result = extractOpenAI(parsed); + } + + this.emit('tokens', result || { ...ZERO_TOKENS }); + } +} + +/** + * Factory function to create a TokenExtractor stream. 
+ * + * @param {Object} opts + * @param {string} opts.provider - 'anthropic' | 'openai' | 'copilot' + * @param {string} opts.contentType - response Content-Type header value + * @param {string} [opts.contentEncoding] - response Content-Encoding header value + * @returns {TokenExtractor} + */ +function createTokenExtractor({ provider, contentType, contentEncoding }) { + const isSSE = typeof contentType === 'string' && contentType.includes('text/event-stream'); + const enc = (contentEncoding || '').toLowerCase(); + const skipExtraction = enc === 'gzip' || enc === 'br' || enc === 'deflate'; + + return new TokenExtractor({ provider, isSSE, skipExtraction }); +} + +module.exports = { createTokenExtractor, TokenExtractor }; diff --git a/containers/api-proxy/token-extractor.test.js b/containers/api-proxy/token-extractor.test.js new file mode 100644 index 00000000..7afd1e58 --- /dev/null +++ b/containers/api-proxy/token-extractor.test.js @@ -0,0 +1,270 @@ +/** + * Tests for token-extractor.js + */ + +'use strict'; + +const { createTokenExtractor } = require('./token-extractor'); +const { Readable, Writable } = require('stream'); +const { pipeline } = require('stream/promises'); + +/** + * Helper: pipe data through a token extractor and collect the tokens event + output. + */ +async function extract(data, opts) { + const extractor = createTokenExtractor(opts); + const outputChunks = []; + + const tokensPromise = new Promise((resolve) => { + extractor.on('tokens', resolve); + }); + + const source = Readable.from(typeof data === 'string' ? [Buffer.from(data)] : data.map(d => Buffer.from(d))); + const sink = new Writable({ + write(chunk, enc, cb) { + outputChunks.push(chunk); + cb(); + }, + }); + + await pipeline(source, extractor, sink); + const tokens = await tokensPromise; + const output = Buffer.concat(outputChunks).toString('utf8'); + return { tokens, output }; +} + +// ─── Anthropic non-streaming ────────────────────────────────────── + +describe('Anthropic non-streaming', () => { + const baseOpts = { provider: 'anthropic', contentType: 'application/json', contentEncoding: '' }; + + test('extracts input_tokens and output_tokens', async () => { + const body = JSON.stringify({ + id: 'msg_123', + content: [{ type: 'text', text: 'Hello' }], + usage: { input_tokens: 100, output_tokens: 50 }, + }); + const { tokens, output } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(body); // data passes through unchanged + }); + + test('handles missing usage field', async () => { + const body = JSON.stringify({ id: 'msg_123', content: [] }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles error response (no usage)', async () => { + const body = JSON.stringify({ type: 'error', error: { message: 'rate limited' } }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles malformed JSON', async () => { + const { tokens } = await extract('not json at all{{{', baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles empty body', async () => { + const { tokens } = await extract('', baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles multi-chunk body', async () => { + const body = JSON.stringify({ + usage: { input_tokens: 200, output_tokens: 100 }, + }); + // Split into multiple chunks + const 
chunks = [body.slice(0, 20), body.slice(20)]; + const { tokens, output } = await extract(chunks, baseOpts); + expect(tokens).toEqual({ input: 200, output: 100, total: 300 }); + expect(output).toBe(body); + }); +}); + +// ─── OpenAI non-streaming ───────────────────────────────────────── + +describe('OpenAI non-streaming', () => { + const baseOpts = { provider: 'openai', contentType: 'application/json', contentEncoding: '' }; + + test('extracts prompt_tokens and completion_tokens', async () => { + const body = JSON.stringify({ + id: 'chatcmpl-123', + choices: [{ message: { content: 'Hi' } }], + usage: { prompt_tokens: 100, completion_tokens: 50, total_tokens: 150 }, + }); + const { tokens, output } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(body); + }); + + test('uses total_tokens from response', async () => { + const body = JSON.stringify({ + usage: { prompt_tokens: 10, completion_tokens: 20, total_tokens: 35 }, // total != sum (cached) + }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 10, output: 20, total: 35 }); + }); + + test('handles missing usage', async () => { + const body = JSON.stringify({ id: 'chatcmpl-123', choices: [] }); + const { tokens } = await extract(body, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); +}); + +// ─── Copilot non-streaming (same as OpenAI) ──────────────────────── + +describe('Copilot non-streaming', () => { + test('extracts tokens using OpenAI format', async () => { + const body = JSON.stringify({ + usage: { prompt_tokens: 80, completion_tokens: 40, total_tokens: 120 }, + }); + const { tokens } = await extract(body, { + provider: 'copilot', contentType: 'application/json', contentEncoding: '', + }); + expect(tokens).toEqual({ input: 80, output: 40, total: 120 }); + }); +}); + +// ─── Anthropic SSE ──────────────────────────────────────────────── + +describe('Anthropic SSE', () => { + const baseOpts = { provider: 'anthropic', contentType: 'text/event-stream', contentEncoding: '' }; + + test('extracts tokens from message_start and message_delta events', async () => { + const sse = [ + 'event: message_start\n', + 'data: {"type":"message_start","message":{"usage":{"input_tokens":100}}}\n', + '\n', + 'event: content_block_delta\n', + 'data: {"type":"content_block_delta","delta":{"type":"text_delta","text":"Hello"}}\n', + '\n', + 'event: message_delta\n', + 'data: {"type":"message_delta","delta":{"stop_reason":"end_turn"},"usage":{"output_tokens":50}}\n', + '\n', + ]; + const { tokens, output } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(sse.join('')); // data passes through unchanged + }); + + test('handles SSE with no token events', async () => { + const sse = [ + 'event: ping\n', + 'data: {}\n', + '\n', + ]; + const { tokens } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); + + test('handles SSE with only input tokens (no delta)', async () => { + const sse = [ + 'event: message_start\n', + 'data: {"type":"message_start","message":{"usage":{"input_tokens":75}}}\n', + '\n', + ]; + const { tokens } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 75, output: 0, total: 75 }); + }); + + test('handles data split across chunks', async () => { + const fullLine = 'data: {"type":"message_start","message":{"usage":{"input_tokens":42}}}\n'; + const chunks = [fullLine.slice(0, 
30), fullLine.slice(30)]; + const sseChunks = [ + 'event: message_start\n', + ...chunks, + '\nevent: message_delta\n', + 'data: {"type":"message_delta","usage":{"output_tokens":10}}\n\n', + ]; + const { tokens } = await extract(sseChunks, baseOpts); + expect(tokens).toEqual({ input: 42, output: 10, total: 52 }); + }); +}); + +// ─── OpenAI SSE ─────────────────────────────────────────────────── + +describe('OpenAI SSE', () => { + const baseOpts = { provider: 'openai', contentType: 'text/event-stream', contentEncoding: '' }; + + test('extracts tokens from usage chunk before [DONE]', async () => { + const sse = [ + 'data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"Hi"}}]}\n\n', + 'data: {"id":"chatcmpl-1","choices":[],"usage":{"prompt_tokens":100,"completion_tokens":50,"total_tokens":150}}\n\n', + 'data: [DONE]\n\n', + ]; + const { tokens, output } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 100, output: 50, total: 150 }); + expect(output).toBe(sse.join('')); + }); + + test('handles SSE with no usage chunk', async () => { + const sse = [ + 'data: {"id":"chatcmpl-1","choices":[{"delta":{"content":"Hi"}}]}\n\n', + 'data: [DONE]\n\n', + ]; + const { tokens } = await extract(sse, baseOpts); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + }); +}); + +// ─── Copilot SSE (same as OpenAI) ──────────────────────────────── + +describe('Copilot SSE', () => { + test('extracts tokens using OpenAI SSE format', async () => { + const sse = [ + 'data: {"id":"1","choices":[],"usage":{"prompt_tokens":60,"completion_tokens":30,"total_tokens":90}}\n\n', + 'data: [DONE]\n\n', + ]; + const { tokens } = await extract(sse, { + provider: 'copilot', contentType: 'text/event-stream', contentEncoding: '', + }); + expect(tokens).toEqual({ input: 60, output: 30, total: 90 }); + }); +}); + +// ─── Content-Encoding (compressed) ──────────────────────────────── + +describe('compressed responses', () => { + test.each(['gzip', 'br', 'deflate'])('skips extraction for %s', async (enc) => { + const body = JSON.stringify({ usage: { input_tokens: 999, output_tokens: 999 } }); + const { tokens, output } = await extract(body, { + provider: 'anthropic', contentType: 'application/json', contentEncoding: enc, + }); + expect(tokens).toEqual({ input: 0, output: 0, total: 0 }); + expect(output).toBe(body); // still passes through + }); + + test('does not skip for empty content-encoding', async () => { + const body = JSON.stringify({ usage: { input_tokens: 10, output_tokens: 5 } }); + const { tokens } = await extract(body, { + provider: 'anthropic', contentType: 'application/json', contentEncoding: '', + }); + expect(tokens).toEqual({ input: 10, output: 5, total: 15 }); + }); +}); + +// ─── Data integrity ─────────────────────────────────────────────── + +describe('data integrity', () => { + test('binary data passes through unchanged', async () => { + const binary = Buffer.from([0x00, 0x01, 0xff, 0xfe, 0x80]); + const extractor = createTokenExtractor({ + provider: 'openai', contentType: 'application/octet-stream', contentEncoding: '', + }); + const outputChunks = []; + + const tokensPromise = new Promise((resolve) => extractor.on('tokens', resolve)); + const source = Readable.from([binary]); + const sink = new Writable({ + write(chunk, enc, cb) { outputChunks.push(chunk); cb(); }, + }); + + await pipeline(source, extractor, sink); + await tokensPromise; + + expect(Buffer.concat(outputChunks)).toEqual(binary); + }); +}); diff --git a/src/cli.test.ts b/src/cli.test.ts index 6420ed37..2573e2f4 
100644 --- a/src/cli.test.ts +++ b/src/cli.test.ts @@ -1426,7 +1426,7 @@ describe('cli', () => { it('should return defaults when no options provided', () => { const r = buildRateLimitConfig({}); expect('config' in r).toBe(true); - if ('config' in r) { expect(r.config).toEqual({ enabled: false, rpm: 0, rph: 0, bytesPm: 0 }); } + if ('config' in r) { expect(r.config).toEqual({ enabled: false, rpm: 0, rph: 0, bytesPm: 0, tpm: 0 }); } }); it('should disable with rateLimit=false even if limits provided', () => { const r = buildRateLimitConfig({ rateLimit: false, rateLimitRpm: '30' }); @@ -1465,7 +1465,25 @@ describe('cli', () => { }); it('should accept all custom values', () => { const r = buildRateLimitConfig({ rateLimitRpm: '10', rateLimitRph: '100', rateLimitBytesPm: '5000000' }); - if ('config' in r) { expect(r.config).toEqual({ enabled: true, rpm: 10, rph: 100, bytesPm: 5000000 }); } + if ('config' in r) { expect(r.config).toEqual({ enabled: true, rpm: 10, rph: 100, bytesPm: 5000000, tpm: 0 }); } + }); + it('should enable rate limiting with only TPM flag', () => { + const r = buildRateLimitConfig({ rateLimitTpm: '50000' }); + expect('config' in r).toBe(true); + if ('config' in r) { expect(r.config.enabled).toBe(true); expect(r.config.tpm).toBe(50000); } + }); + it('should error on negative TPM', () => { + expect('error' in buildRateLimitConfig({ rateLimitTpm: '-1' })).toBe(true); + }); + it('should error on zero TPM', () => { + expect('error' in buildRateLimitConfig({ rateLimitTpm: '0' })).toBe(true); + }); + it('should error on non-integer TPM', () => { + expect('error' in buildRateLimitConfig({ rateLimitTpm: 'abc' })).toBe(true); + }); + it('should accept TPM with RPM together', () => { + const r = buildRateLimitConfig({ rateLimitRpm: '100', rateLimitTpm: '10000' }); + if ('config' in r) { expect(r.config).toEqual({ enabled: true, rpm: 100, rph: 10000, bytesPm: 52428800, tpm: 10000 }); } }); }); @@ -1490,6 +1508,9 @@ describe('cli', () => { it('should fail when --no-rate-limit used without api proxy', () => { expect(validateRateLimitFlags(false, { rateLimit: false }).valid).toBe(false); }); + it('should fail when --rate-limit-tpm used without api proxy', () => { + expect(validateRateLimitFlags(false, { rateLimitTpm: '10000' }).valid).toBe(false); + }); it('should pass when all flags used with api proxy enabled', () => { const r = validateRateLimitFlags(true, { rateLimitRpm: '10', rateLimitRph: '100', rateLimit: false }); expect(r.valid).toBe(true); diff --git a/src/cli.ts b/src/cli.ts index b06238e5..79200f3d 100644 --- a/src/cli.ts +++ b/src/cli.ts @@ -303,23 +303,25 @@ export function buildRateLimitConfig(options: { rateLimitRpm?: string; rateLimitRph?: string; rateLimitBytesPm?: string; + rateLimitTpm?: string; }): { config: RateLimitConfig } | { error: string } { // --no-rate-limit explicitly disables (even if other flags are set) if (options.rateLimit === false) { - return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0 } }; + return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0, tpm: 0 } }; } // Rate limiting is opt-in: disabled unless at least one --rate-limit-* flag is provided const hasAnyLimit = options.rateLimitRpm !== undefined || options.rateLimitRph !== undefined || - options.rateLimitBytesPm !== undefined; + options.rateLimitBytesPm !== undefined || + options.rateLimitTpm !== undefined; if (!hasAnyLimit) { - return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0 } }; + return { config: { enabled: false, rpm: 0, rph: 0, bytesPm: 0, tpm: 0 } }; } // 
Defaults for any limit not explicitly set
-  const config: RateLimitConfig = { enabled: true, rpm: 600, rph: 10000, bytesPm: 52428800 };
+  const config: RateLimitConfig = { enabled: true, rpm: 600, rph: 10000, bytesPm: 52428800, tpm: 0 };

   if (options.rateLimitRpm !== undefined) {
     const rpm = parseInt(options.rateLimitRpm, 10);
@@ -336,6 +338,11 @@ export function buildRateLimitConfig(options: {
     if (isNaN(bytesPm) || bytesPm <= 0) return { error: '--rate-limit-bytes-pm must be a positive integer' };
     config.bytesPm = bytesPm;
   }
+  if (options.rateLimitTpm !== undefined) {
+    const tpm = parseInt(options.rateLimitTpm, 10);
+    if (isNaN(tpm) || tpm <= 0) return { error: '--rate-limit-tpm must be a positive integer' };
+    config.tpm = tpm;
+  }

   return { config };
 }
@@ -348,11 +355,13 @@ export function validateRateLimitFlags(enableApiProxy: boolean, options: {
   rateLimitRpm?: string;
   rateLimitRph?: string;
   rateLimitBytesPm?: string;
+  rateLimitTpm?: string;
 }): FlagValidationResult {
   if (!enableApiProxy) {
     const hasRateLimitFlags = options.rateLimitRpm !== undefined ||
       options.rateLimitRph !== undefined ||
       options.rateLimitBytesPm !== undefined ||
+      options.rateLimitTpm !== undefined ||
       options.rateLimit === false;
     if (hasRateLimitFlags) {
       return { valid: false, error: 'Rate limit flags require --enable-api-proxy' };
     }
@@ -815,6 +824,10 @@ program
     '--rate-limit-bytes-pm <number>',
     'Enable rate limiting: max request bytes per minute per provider (requires --enable-api-proxy)',
   )
+  .option(
+    '--rate-limit-tpm <number>',
+    'Enable rate limiting: max tokens per minute per provider (requires --enable-api-proxy)',
+  )
   .option(
     '--no-rate-limit',
     'Explicitly disable rate limiting in the API proxy (requires --enable-api-proxy)',
   )
@@ -1094,7 +1107,7 @@ program
       process.exit(1);
     }
     config.rateLimitConfig = rateLimitResult.config;
-    logger.debug(`Rate limiting: enabled=${rateLimitResult.config.enabled}, rpm=${rateLimitResult.config.rpm}, rph=${rateLimitResult.config.rph}, bytesPm=${rateLimitResult.config.bytesPm}`);
+    logger.debug(`Rate limiting: enabled=${rateLimitResult.config.enabled}, rpm=${rateLimitResult.config.rpm}, rph=${rateLimitResult.config.rph}, bytesPm=${rateLimitResult.config.bytesPm}, tpm=${rateLimitResult.config.tpm}`);
   }

   // Error if rate limit flags are used without --enable-api-proxy
diff --git a/src/docker-manager.test.ts b/src/docker-manager.test.ts
index a431c66c..cfcf2d37 100644
--- a/src/docker-manager.test.ts
+++ b/src/docker-manager.test.ts
@@ -1790,7 +1790,7 @@ describe('docker-manager', () => {
         ...mockConfig,
         enableApiProxy: true,
         openaiApiKey: 'sk-test-key',
-        rateLimitConfig: { enabled: true, rpm: 30, rph: 500, bytesPm: 10485760 },
+        rateLimitConfig: { enabled: true, rpm: 30, rph: 500, bytesPm: 10485760, tpm: 0 },
       };
       const result = generateDockerCompose(configWithRateLimit, mockNetworkConfigWithProxy);
       const proxy = result.services['api-proxy'];
@@ -1806,7 +1806,7 @@ describe('docker-manager', () => {
         ...mockConfig,
         enableApiProxy: true,
         openaiApiKey: 'sk-test-key',
-        rateLimitConfig: { enabled: false, rpm: 60, rph: 1000, bytesPm: 52428800 },
+        rateLimitConfig: { enabled: false, rpm: 60, rph: 1000, bytesPm: 52428800, tpm: 0 },
       };
       const result = generateDockerCompose(configWithRateLimit, mockNetworkConfigWithProxy);
       const proxy = result.services['api-proxy'];
diff --git a/src/docker-manager.ts b/src/docker-manager.ts
index aff4bad9..c355c399 100644
--- a/src/docker-manager.ts
+++ b/src/docker-manager.ts
@@ -993,6 +993,7 @@ export function generateDockerCompose(
           AWF_RATE_LIMIT_RPM: 
String(config.rateLimitConfig.rpm),
           AWF_RATE_LIMIT_RPH: String(config.rateLimitConfig.rph),
           AWF_RATE_LIMIT_BYTES_PM: String(config.rateLimitConfig.bytesPm),
+          AWF_RATE_LIMIT_TPM: String(config.rateLimitConfig.tpm),
         }),
       },
       healthcheck: {
diff --git a/src/types.ts b/src/types.ts
index 1ee1c8bf..e5244d1d 100644
--- a/src/types.ts
+++ b/src/types.ts
@@ -523,6 +523,8 @@ export interface RateLimitConfig {
   rph: number;
   /** Max request bytes per minute per provider (default: 52428800 = 50 MB) */
   bytesPm: number;
+  /** Max tokens per minute per provider (default: 0 = unlimited/disabled) */
+  tpm: number;
 }

 /**
diff --git a/tests/fixtures/awf-runner.ts b/tests/fixtures/awf-runner.ts
index a3b5cfeb..acfecf76 100644
--- a/tests/fixtures/awf-runner.ts
+++ b/tests/fixtures/awf-runner.ts
@@ -22,6 +22,7 @@ export interface AwfOptions {
   rateLimitRpm?: number; // Requests per minute per provider
   rateLimitRph?: number; // Requests per hour per provider
   rateLimitBytesPm?: number; // Request bytes per minute per provider
+  rateLimitTpm?: number; // Tokens per minute per provider
   noRateLimit?: boolean; // Disable rate limiting
   envAll?: boolean; // Pass all host environment variables to container (--env-all)
   cliEnv?: Record<string, string>; // Explicit -e KEY=VALUE flags passed to AWF CLI
@@ -126,6 +127,9 @@ export class AwfRunner {
     if (options.rateLimitBytesPm !== undefined) {
       args.push('--rate-limit-bytes-pm', String(options.rateLimitBytesPm));
     }
+    if (options.rateLimitTpm !== undefined) {
+      args.push('--rate-limit-tpm', String(options.rateLimitTpm));
+    }
     if (options.noRateLimit) {
       args.push('--no-rate-limit');
     }
@@ -306,6 +310,9 @@ export class AwfRunner {
     if (options.rateLimitBytesPm !== undefined) {
       args.push('--rate-limit-bytes-pm', String(options.rateLimitBytesPm));
     }
+    if (options.rateLimitTpm !== undefined) {
+      args.push('--rate-limit-tpm', String(options.rateLimitTpm));
+    }
     if (options.noRateLimit) {
       args.push('--no-rate-limit');
     }
diff --git a/tests/integration/api-proxy-observability.test.ts b/tests/integration/api-proxy-observability.test.ts
index 83d4ef98..a49fc2ce 100644
--- a/tests/integration/api-proxy-observability.test.ts
+++ b/tests/integration/api-proxy-observability.test.ts
@@ -14,6 +14,64 @@ import { cleanup } from '../fixtures/cleanup';
 // The API proxy sidecar is at this fixed IP on the awf-net network
 const API_PROXY_IP = '172.30.0.30';

+/**
+ * Extract the last JSON object from stdout.
+ *
+ * When --build-local is used, Docker build output is mixed into stdout before
+ * the actual command output. This helper finds the last complete top-level
+ * JSON object in the output so that JSON.parse works reliably.
+ */
+function extractLastJson(stdout: string): unknown {
+  // Find the last '{' that starts a top-level JSON object
+  let depth = 0;
+  let jsonEnd = -1;
+  let jsonStart = -1;
+
+  // Scan backwards from end to find the last complete JSON object
+  for (let i = stdout.length - 1; i >= 0; i--) {
+    const ch = stdout[i];
+    if (ch === '}') {
+      if (depth === 0) jsonEnd = i;
+      depth++;
+    } else if (ch === '{') {
+      depth--;
+      if (depth === 0) {
+        jsonStart = i;
+        break;
+      }
+    }
+  }
+
+  if (jsonStart === -1 || jsonEnd === -1) {
+    throw new Error(`No JSON object found in stdout (length=${stdout.length}): ${stdout.slice(-200)}`);
+  }
+
+  return JSON.parse(stdout.slice(jsonStart, jsonEnd + 1));
+}
+
+/**
+ * Extract the HTTP response section from stdout when curl -i is used.
+ *
+ * Docker build output appears before the HTTP response. 
This finds the last + * HTTP response block (starting with "HTTP/") in stdout. + */ +function extractHttpResponse(stdout: string): string { + // Find the last occurrence of an HTTP status line + const httpPattern = /HTTP\/[\d.]+ \d+/g; + let lastMatch: RegExpExecArray | null = null; + let match: RegExpExecArray | null; + while ((match = httpPattern.exec(stdout)) !== null) { + lastMatch = match; + } + + if (lastMatch) { + return stdout.slice(lastMatch.index); + } + + // Fallback: return the whole stdout + return stdout; +} + describe('API Proxy Observability', () => { let runner: AwfRunner; @@ -176,10 +234,13 @@ describe('API Proxy Observability', () => { ); expect(result).toSucceed(); - const lower = result.stdout.toLowerCase(); + // Extract only the HTTP response portion to avoid Docker build output pollution + const httpResponse = extractHttpResponse(result.stdout); + const lower = httpResponse.toLowerCase(); expect(lower).toContain('x-request-id'); - // The injected ID should NOT appear — proxy should have generated a UUID instead - expect(result.stdout).not.toContain('