Skip to content
2 changes: 1 addition & 1 deletion containers/api-proxy/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ COPY package*.json ./
RUN npm ci --omit=dev

# Copy application files
COPY server.js logging.js metrics.js rate-limiter.js ./
COPY server.js logging.js metrics.js rate-limiter.js token-extractor.js ./

# Create non-root user
RUN addgroup -S apiproxy && adduser -S apiproxy -G apiproxy
Expand Down
83 changes: 77 additions & 6 deletions containers/api-proxy/rate-limiter.js
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
/**
* Sliding Window Counter Rate Limiter for AWF API Proxy.
*
* Provides per-provider rate limiting with three limit types:
* Provides per-provider rate limiting with four limit types:
* - RPM: requests per minute (1-second granularity, 60 slots)
* - RPH: requests per hour (1-minute granularity, 60 slots)
* - Bytes/min: request bytes per minute (1-second granularity, 60 slots)
* - TPM: tokens per minute (1-second granularity, 60 slots)
*
* Algorithm: sliding window counter — counts in the current window plus a
* weighted portion of the previous window based on elapsed time.
Expand All @@ -20,6 +21,7 @@
const DEFAULT_RPM = 600;
const DEFAULT_RPH = 1000;
const DEFAULT_BYTES_PM = 50 * 1024 * 1024; // 50 MB
const DEFAULT_TPM = 0; // 0 means disabled/unlimited — opt-in like other limits

// ── Window sizes ────────────────────────────────────────────────────────
const MINUTE_SLOTS = 60; // 1-second granularity for per-minute windows
Expand Down Expand Up @@ -143,6 +145,8 @@ class ProviderState {
this.rphWindow = createWindow(HOUR_SLOTS);
// Bytes per minute: 1-second granularity
this.bytesWindow = createWindow(MINUTE_SLOTS);
// Tokens per minute: 1-second granularity
this.tpmWindow = createWindow(MINUTE_SLOTS);
}
}

Expand All @@ -152,12 +156,14 @@ class RateLimiter {
* @param {number} [config.rpm=600] - Max requests per minute
* @param {number} [config.rph=1000] - Max requests per hour
* @param {number} [config.bytesPm=52428800] - Max bytes per minute (50 MB)
* @param {number} [config.tpm=0] - Max tokens per minute (0 = unlimited)
* @param {boolean} [config.enabled=true] - Whether rate limiting is active
*/
constructor(config = {}) {
this.rpm = config.rpm ?? DEFAULT_RPM;
this.rph = config.rph ?? DEFAULT_RPH;
this.bytesPm = config.bytesPm ?? DEFAULT_BYTES_PM;
this.tpm = config.tpm ?? DEFAULT_TPM;
this.enabled = config.enabled !== false;

/** @type {Map<string, ProviderState>} */
Expand All @@ -184,6 +190,10 @@ class RateLimiter {
* If allowed, the request is counted (recorded in windows).
* If denied, no recording happens — the caller should return 429.
*
* Note: TPM check uses previously recorded token consumption. Tokens
* are recorded post-response via recordTokens(), so the check here
* decides if the NEXT request is allowed based on PREVIOUS consumption.
*
* @param {string} provider - e.g. "openai", "anthropic", "copilot"
* @param {number} [requestBytes=0] - Size of the request body in bytes
* @returns {{
Expand Down Expand Up @@ -253,6 +263,23 @@ class RateLimiter {
};
}

// Check TPM (tokens per minute) — only if configured
if (this.tpm > 0) {
const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS);
if (tpmCount >= this.tpm) {
const retryAfter = estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm);
const resetAt = nowSec + retryAfter;
return {
allowed: false,
limitType: 'tpm',
limit: this.tpm,
remaining: 0,
Copy link

Copilot AI Feb 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

check() can now return limitType: 'tpm'. The API proxy’s 429 builder in containers/api-proxy/server.js currently only maps rpm/rph/bytes_pm for the human label and window field, so a TPM rejection will produce a confusing label and an incorrect window value (it will fall through to the bytes-per-minute case). Please update the server-side mapping to include 'tpm' (and a distinct window like per_minute_tokens) so clients can interpret the error correctly.

Suggested change
remaining: 0,
remaining: Math.max(0, this.tpm - tpmCount),

Copilot uses AI. Check for mistakes.
retryAfter,
resetAt,
};
}
}

// All checks passed — record the request
recordInWindow(state.rpmWindow, nowSec, MINUTE_SLOTS, 1);
recordInWindow(state.rphWindow, nowMin, HOUR_SLOTS, 1);
Expand All @@ -275,10 +302,32 @@ class RateLimiter {
}
}

/**
* Record token usage for a provider after a response completes.
*
* This is separate from check() because token counts are only known
* after the response is received and parsed. The recorded tokens are
* used by subsequent check() calls to enforce the TPM limit.
*
* @param {string} provider - e.g. "openai", "anthropic", "copilot"
* @param {number} tokenCount - Number of tokens consumed by the response
*/
recordTokens(provider, tokenCount) {
if (!this.enabled || this.tpm <= 0 || !tokenCount || tokenCount <= 0) return;

try {
const state = this._getState(provider);
const nowSec = Math.floor(Date.now() / 1000);
recordInWindow(state.tpmWindow, nowSec, MINUTE_SLOTS, tokenCount);
} catch (_err) {
// Fail-open: ignore recording errors
}
}

/**
* Get rate limit status for a single provider.
* @param {string} provider
* @returns {object} Status with rpm, rph windows
* @returns {object} Status with rpm, rph, tpm windows
*/
getStatus(provider) {
if (!this.enabled) {
Expand All @@ -288,11 +337,15 @@ class RateLimiter {
try {
const state = this.providers.get(provider);
if (!state) {
return {
const status = {
enabled: true,
rpm: { limit: this.rpm, remaining: this.rpm, reset: 0 },
rph: { limit: this.rph, remaining: this.rph, reset: 0 },
};
if (this.tpm > 0) {
status.tpm = { limit: this.tpm, remaining: this.tpm, reset: 0 };
}
return status;
}

const nowMs = Date.now();
Expand All @@ -309,7 +362,7 @@ class RateLimiter {
? estimateRetryAfter(state.rphWindow, nowMin, HOUR_SLOTS, this.rph) * 60
: 0;

return {
const status = {
enabled: true,
rpm: {
limit: this.rpm,
Expand All @@ -322,6 +375,21 @@ class RateLimiter {
reset: rphRetry > 0 ? Math.floor(nowMs / 1000) + rphRetry : 0,
},
};

// Include TPM status if configured
if (this.tpm > 0) {
const tpmCount = getWindowCount(state.tpmWindow, nowSec, MINUTE_SLOTS);
const tpmRetry = tpmCount >= this.tpm
? estimateRetryAfter(state.tpmWindow, nowSec, MINUTE_SLOTS, this.tpm)
: 0;
status.tpm = {
limit: this.tpm,
remaining: Math.max(0, this.tpm - tpmCount),
reset: tpmRetry > 0 ? nowSec + tpmRetry : 0,
};
}

return status;
} catch (_err) {
return { enabled: true, error: 'internal_error' };
}
Expand All @@ -344,9 +412,10 @@ class RateLimiter {
* Create a RateLimiter from environment variables.
*
* Reads:
* - AWF_RATE_LIMIT_RPM (default: 60)
* - AWF_RATE_LIMIT_RPM (default: 600)
* - AWF_RATE_LIMIT_RPH (default: 1000)
* - AWF_RATE_LIMIT_BYTES_PM (default: 52428800)
* - AWF_RATE_LIMIT_TPM (default: 0 — disabled)
* - AWF_RATE_LIMIT_ENABLED (default: "false" — rate limiting is opt-in)
*
* @returns {RateLimiter}
Expand All @@ -355,13 +424,15 @@ function create() {
const rawRpm = parseInt(process.env.AWF_RATE_LIMIT_RPM, 10);
const rawRph = parseInt(process.env.AWF_RATE_LIMIT_RPH, 10);
const rawBytesPm = parseInt(process.env.AWF_RATE_LIMIT_BYTES_PM, 10);
const rawTpm = parseInt(process.env.AWF_RATE_LIMIT_TPM, 10);

const rpm = (Number.isFinite(rawRpm) && rawRpm > 0) ? rawRpm : DEFAULT_RPM;
const rph = (Number.isFinite(rawRph) && rawRph > 0) ? rawRph : DEFAULT_RPH;
const bytesPm = (Number.isFinite(rawBytesPm) && rawBytesPm > 0) ? rawBytesPm : DEFAULT_BYTES_PM;
const tpm = (Number.isFinite(rawTpm) && rawTpm > 0) ? rawTpm : DEFAULT_TPM;
const enabled = process.env.AWF_RATE_LIMIT_ENABLED === 'true';

return new RateLimiter({ rpm, rph, bytesPm, enabled });
return new RateLimiter({ rpm, rph, bytesPm, tpm, enabled });
}

module.exports = { RateLimiter, create };
32 changes: 31 additions & 1 deletion containers/api-proxy/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ const { HttpsProxyAgent } = require('https-proxy-agent');
const { generateRequestId, sanitizeForLog, logRequest } = require('./logging');
const metrics = require('./metrics');
const rateLimiter = require('./rate-limiter');
const { createTokenExtractor } = require('./token-extractor');

// Create rate limiter from environment variables
const limiter = rateLimiter.create();
Expand Down Expand Up @@ -319,7 +320,36 @@ function proxyRequest(req, res, targetHost, injectHeaders, provider) {
// Copy response headers and add X-Request-ID
const resHeaders = { ...proxyRes.headers, 'x-request-id': requestId };
res.writeHead(proxyRes.statusCode, resHeaders);
proxyRes.pipe(res);

// Extract token counts from response (best-effort, fail-open)
const resContentType = proxyRes.headers['content-type'] || '';
const resContentEncoding = proxyRes.headers['content-encoding'] || '';
const tokenExtractor = createTokenExtractor({
provider,
contentType: resContentType,
contentEncoding: resContentEncoding,
});

tokenExtractor.on('tokens', (tokens) => {
if (tokens.input > 0) {
metrics.increment('tokens_input_total', { provider }, tokens.input);
}
if (tokens.output > 0) {
metrics.increment('tokens_output_total', { provider }, tokens.output);
}
if (tokens.total > 0 && typeof limiter.recordTokens === 'function') {
limiter.recordTokens(provider, tokens.total);
}
logRequest('info', 'tokens_recorded', {
request_id: requestId,
provider,
input_tokens: tokens.input,
output_tokens: tokens.output,
total_tokens: tokens.total,
});
});

proxyRes.pipe(tokenExtractor).pipe(res);
});

proxyReq.on('error', (err) => {
Expand Down
Loading
Loading