diff --git a/build.config.mjs b/build.config.mjs index d2670f8..6025b26 100644 --- a/build.config.mjs +++ b/build.config.mjs @@ -19,6 +19,7 @@ export default defineBuildConfig({ "cloudflare", "generic", "service-worker", + "uws", ].map((adapter) => `src/adapters/${adapter}.ts`), ], rolldown: { diff --git a/package.json b/package.json index 4afe036..e8d30a6 100644 --- a/package.json +++ b/package.json @@ -14,6 +14,7 @@ "./cloudflare": "./dist/adapters/cloudflare.mjs", "./generic": "./dist/adapters/generic.mjs", "./service-worker": "./dist/adapters/service-worker.mjs", + "./uws": "./dist/adapters/uws.mjs", "./cli": "./dist/cli.mjs", "./static": "./dist/static.mjs", "./log": "./dist/log.mjs", @@ -86,6 +87,9 @@ "undici": "^7.13.0", "vitest": "^3.2.4" }, + "optionalDependencies": { + "uWebSockets.js": "github:uNetworking/uWebSockets.js#v20.52.0" + }, "packageManager": "pnpm@10.12.4", "engines": { "node": ">=20.16.0" diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b7be11b..8f74aec 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -93,6 +93,10 @@ importers: vitest: specifier: ^3.2.4 version: 3.2.4(@types/node@24.2.0)(jiti@2.5.1) + optionalDependencies: + uWebSockets.js: + specifier: github:uNetworking/uWebSockets.js#v20.52.0 + version: https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3 examples/elysia: devDependencies: @@ -2506,6 +2510,10 @@ packages: engines: {node: '>=14.17'} hasBin: true + uWebSockets.js@https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3: + resolution: {tarball: https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3} + version: 20.52.0 + ufo@1.6.1: resolution: {integrity: sha512-9a4/uxlTWJ4+a5i0ooc1rU7C7YOw3wT+UGqdeNNHWnOF9qcMBgLRS+4IYUqbczewFx4mLEig6gawh7X6mFlEkA==} @@ -4921,6 +4929,9 @@ snapshots: typescript@5.9.2: {} + uWebSockets.js@https://codeload.github.com/uNetworking/uWebSockets.js/tar.gz/cfc9a40d8132a34881813cec3d5f8e3a185b3ce3: + optional: true + ufo@1.6.1: {} uint8array-extras@1.4.0: {} diff --git a/src/adapters/_uws/_common.ts b/src/adapters/_uws/_common.ts new file mode 100644 index 0000000..2a98b36 --- /dev/null +++ b/src/adapters/_uws/_common.ts @@ -0,0 +1,2 @@ +export { kNodeInspect } from "../_node/_common.ts"; +export const kUWSAbort: symbol = /* @__PURE__ */ Symbol.for("srvx.uws.abort"); diff --git a/src/adapters/_uws/headers.ts b/src/adapters/_uws/headers.ts new file mode 100644 index 0000000..007f1a5 --- /dev/null +++ b/src/adapters/_uws/headers.ts @@ -0,0 +1,256 @@ +import { splitSetCookieString } from "cookie-es"; +import { kNodeInspect } from "../_node/_common.ts"; + +import type { UWSServerRequest, UWSServerResponse } from "../../types.ts"; + +export const UWSRequestHeaders: { + new (req: UWSServerRequest): globalThis.Headers; +} = /* @__PURE__ */ (() => { + const _Headers = class Headers implements globalThis.Headers { + _req: UWSServerRequest; + + constructor(req: UWSServerRequest) { + this._req = req; + } + + append(_name: string, _value: string): void { + throw new Error("UWSRequestHeaders are immutable."); + } + + delete(_name: string): void { + throw new Error("UWSRequestHeaders are immutable."); + } + + get(name: string): string | null { + const value = this._req.getHeader(validateHeader(name)); + return value === "" ? null : value; + } + + getSetCookie(): string[] { + const setCookie = this.get("set-cookie"); + if (!setCookie) { + return []; + } + return splitSetCookieString(setCookie); + } + + has(name: string): boolean { + return this.get(validateHeader(name)) !== null; + } + + set(_name: string, _value: string): void { + throw new Error("UWSRequestHeaders are immutable."); + } + + get count(): number { + // Bun-specific addon + throw new Error("Method not implemented."); + } + + getAll(name: "set-cookie" | "Set-Cookie"): string[] { + const lowerName = name.toLowerCase(); + const val = this._req.getHeader(lowerName); + if (lowerName === "set-cookie") { + return val ? splitSetCookieString(val) : []; + } + return val === "" ? [] : val.split(", "); + } + + toJSON(): Record { + const result: Record = {}; + this._req["forEach"]((key, value) => { + result[key] = value; + }); + return result; + } + + forEach( + cb: (value: string, key: string, parent: Headers) => void, + thisArg?: object, + ): void { + this._req["forEach"]((key, value) => { + cb.call(thisArg, value, key, this); + }); + } + + *entries(): HeadersIterator<[string, string]> { + const entries: [string, string][] = []; + this._req["forEach"]((k, v) => { + entries.push([k, v]); + }); + yield* entries; + } + + *keys(): HeadersIterator { + const keys: string[] = []; + this._req["forEach"]((k) => { + keys.push(k); + }); + yield* keys; + } + + *values(): HeadersIterator { + const values: string[] = []; + this._req["forEach"]((_, v) => { + values.push(v); + }); + yield* values; + } + + [Symbol.iterator](): HeadersIterator<[string, string]> { + return this.entries(); + } + + get [Symbol.toStringTag]() { + return "Headers"; + } + + [kNodeInspect]() { + return Object.fromEntries(this.entries()); + } + }; + + Object.setPrototypeOf(_Headers.prototype, globalThis.Headers.prototype); + + return _Headers; +})(); + +export const UWSResponseHeaders: { + new (res: UWSServerResponse): globalThis.Headers; +} = /* @__PURE__ */ (() => { + const _Headers = class Headers implements globalThis.Headers { + _res: UWSServerResponse; + _headers: Record = {}; + + constructor(res: UWSServerResponse) { + this._res = res; + } + + append(name: string, value: string): void { + name = validateHeader(name); + const current = this._headers[name]; + if (current) { + if (Array.isArray(current)) { + current.push(value); + } else { + this._headers[name] = [current, value]; + } + } else { + this._headers[name] = value; + } + this._apply(); + } + + delete(name: string): void { + name = validateHeader(name); + delete this._headers[name]; + this._apply(); + } + + get(name: string): string | null { + const value = this._headers[validateHeader(name)]; + if (value === undefined) { + return null; + } + return Array.isArray(value) ? value.join(", ") : value; + } + + getSetCookie(): string[] { + const setCookie = this._headers["set-cookie"]; + if (!setCookie) { + return []; + } + return Array.isArray(setCookie) ? setCookie : [setCookie]; + } + + has(name: string): boolean { + return this._headers[validateHeader(name)] !== undefined; + } + + set(name: string, value: string): void { + this._headers[validateHeader(name)] = value; + this._apply(); + } + + get count(): number { + // Bun-specific addon + throw new Error("Method not implemented."); + } + + getAll(_name: "set-cookie" | "Set-Cookie"): string[] { + // Bun-specific addon + throw new Error("Method not implemented."); + } + + _apply() { + for (const [key, value] of Object.entries(this._headers)) { + if (Array.isArray(value)) { + // uws allows multiple headers with same name + for (const v of value) { + this._res.writeHeader(key, v); + } + } else { + this._res.writeHeader(key, value); + } + } + } + + toJSON(): Record { + const result: Record = {}; + for (const key in this._headers) { + result[key] = this.get(key)!; + } + return result; + } + + forEach( + cb: (value: string, key: string, parent: Headers) => void, + thisArg?: object, + ): void { + for (const key in this._headers) { + cb.call(thisArg, this.get(key)!, key, this); + } + } + + *entries(): HeadersIterator<[string, string]> { + for (const key in this._headers) { + yield [key, this.get(key)!]; + } + } + + *keys(): HeadersIterator { + for (const key in this._headers) { + yield key; + } + } + + *values(): HeadersIterator { + for (const key in this._headers) { + yield this.get(key)!; + } + } + + [Symbol.iterator](): HeadersIterator<[string, string]> { + return this.entries(); + } + + get [Symbol.toStringTag]() { + return "Headers"; + } + + [kNodeInspect]() { + return this._headers; + } + }; + + Object.setPrototypeOf(_Headers.prototype, globalThis.Headers.prototype); + + return _Headers; +})(); + +function validateHeader(name: string): string { + if (name[0] === ":") { + throw new TypeError(`${JSON.stringify(name)} is an invalid header name.`); + } + return name.toLowerCase(); +} diff --git a/src/adapters/_uws/index.ts b/src/adapters/_uws/index.ts new file mode 100644 index 0000000..58fcbdc --- /dev/null +++ b/src/adapters/_uws/index.ts @@ -0,0 +1,4 @@ +export { UWSRequestHeaders, UWSResponseHeaders } from "./headers.ts"; +export { UWSRequest } from "./request.ts"; +export { UWSResponse } from "./response.ts"; +export { sendUWSResponse } from "./send.ts"; diff --git a/src/adapters/_uws/request.ts b/src/adapters/_uws/request.ts new file mode 100644 index 0000000..272d99a --- /dev/null +++ b/src/adapters/_uws/request.ts @@ -0,0 +1,267 @@ +import { kNodeInspect, kUWSAbort } from "./_common.ts"; +import { UWSRequestHeaders } from "./headers.ts"; + +import type { + UWSServerRequest, + UWSServerResponse, + ServerRequest, + ServerRuntimeContext, +} from "../../types.ts"; + +export type UWSRequestContext = { + req: UWSServerRequest; + res: UWSServerResponse; +}; + +export const UWSRequest = /* @__PURE__ */ (() => { + const unsupportedGetters = [ + "cache", + "credentials", + "destination", + "integrity", + "keepalive", + "mode", + "redirect", + "referrer", + "referrerPolicy", + ] as const; + + const _Request = class Request + implements Omit + { + #headers?: InstanceType; + #bodyUsed: boolean = false; + #abortSignal?: AbortController; + #bodyBytes?: Promise>; + #blobBody?: Promise; + #formDataBody?: Promise; + #jsonBody?: Promise; + #textBody?: Promise; + #bodyStream?: undefined | ReadableStream>; + + _uws: UWSRequestContext; + runtime: ServerRuntimeContext; + + constructor(uwsCtx: UWSRequestContext) { + this._uws = uwsCtx; + this.runtime = { + name: "uws", + uws: uwsCtx, + }; + this._uws.res.onAborted(() => { + this.#abortSignal?.abort(); + }); + } + + get ip() { + const txt = new TextDecoder().decode( + this._uws.res.getRemoteAddressAsText(), + ); + return normalizeIp(txt); + } + + get headers() { + if (!this.#headers) { + this.#headers = new UWSRequestHeaders(this._uws.req); + } + return this.#headers; + } + + clone(): any { + return new _Request({ ...this._uws }); + } + + get url() { + const query = this._uws.req.getQuery(); + return ( + (this._uws.req.getHeader("x-forwarded-proto") === "https" + ? "https://" + : "http://") + + this._uws.req.getHeader("host") + + this._uws.req.getUrl() + + (query ? `?${query}` : "") + ); + } + + get method() { + return this._uws.req.getMethod().toUpperCase(); + } + + get signal() { + if (!this.#abortSignal) { + this.#abortSignal = new AbortController(); + // Allow response pipeline to notify abort + (this._uws.res as unknown as Record void>)[kUWSAbort] = + () => this.#abortSignal?.abort(); + } + return this.#abortSignal.signal; + } + + get bodyUsed() { + return this.#bodyUsed; + } + + get body(): ReadableStream> | null { + if (this.method === "GET" || this.method === "HEAD") { + return null; + } + if (!this.#bodyStream) { + this.#bodyUsed = true; + this.#bodyStream = new ReadableStream({ + start: (controller) => { + this._uws.res.onData((chunk, isLast) => { + controller.enqueue(new Uint8Array(chunk)); + if (isLast) { + controller.close(); + } + }); + }, + }); + } + return this.#bodyStream; + } + + bytes(): Promise> { + if (!this.#bodyBytes) { + const _bodyStream = this.body; + this.#bodyBytes = _bodyStream + ? _readStream(_bodyStream) + : Promise.resolve(new Uint8Array()); + } + return this.#bodyBytes; + } + + arrayBuffer(): Promise { + return this.bytes().then((buff) => buff.buffer); + } + + blob(): Promise { + if (!this.#blobBody) { + this.#blobBody = this.bytes().then((bytes) => { + return new Blob([bytes], { + type: this.headers.get("content-type") || "", + }); + }); + } + return this.#blobBody; + } + + formData(): Promise { + if (!this.#formDataBody) { + this.#formDataBody = new Response(this.body, { + headers: this.headers as unknown as Headers, + }).formData(); + } + return this.#formDataBody; + } + + text(): Promise { + if (!this.#textBody) { + this.#textBody = this.bytes().then((bytes) => { + return new TextDecoder().decode(bytes); + }); + } + return this.#textBody; + } + + json(): Promise { + if (!this.#jsonBody) { + this.#jsonBody = this.text().then((txt) => { + return JSON.parse(txt); + }); + } + return this.#jsonBody; + } + + get [Symbol.toStringTag]() { + return "Request"; + } + + [kNodeInspect]() { + return { + method: this.method, + url: this.url, + headers: this.headers, + }; + } + }; + + for (const key of unsupportedGetters) { + Object.defineProperty(_Request.prototype, key, { + enumerable: true, + configurable: false, + }); + } + + Object.setPrototypeOf(_Request.prototype, globalThis.Request.prototype); + + return _Request; +})() as unknown as { + new (uwsCtx: UWSRequestContext): ServerRequest; +}; + +async function _readStream(stream: ReadableStream) { + const chunks: Uint8Array[] = []; + const reader = stream.getReader(); + while (true) { + const { done, value } = await reader.read(); + if (done) { + break; + } + chunks.push(value); + } + const buffer = new Uint8Array( + chunks.reduce((acc, chunk) => acc + chunk.length, 0), + ); + let offset = 0; + for (const chunk of chunks) { + buffer.set(chunk, offset); + offset += chunk.length; + } + return buffer; +} + +function normalizeIp(txt: string): string { + const lower = txt.toLowerCase(); + + if (lower === "::1") { + return "::1"; + } + + const parts = lower.split(":"); + + if ( + parts.length === 8 && + Number.parseInt(parts[7], 16) === 1 && + parts.slice(0, 7).every((p) => Number.parseInt(p, 16) === 0) + ) { + return "::1"; + } + + const match = lower.match(/:ffff:([0-9a-f]{1,4}):([0-9a-f]{1,4})$/); + + if (match && typeof match.index === "number") { + const prefix = lower.slice(0, Math.max(0, match.index)); + const prefixHextets = prefix.split(":").filter(Boolean); + const isPrefixAllZeros = prefixHextets.every( + (p) => Number.parseInt(p, 16) === 0, + ); + + if (isPrefixAllZeros) { + const hexA = match[1]; + const hexB = match[2]; + const valA = Number.parseInt(hexA, 16); + const valB = Number.parseInt(hexB, 16); + + if (Number.isFinite(valA) && Number.isFinite(valB)) { + const b1 = (valA >> 8) & 0xff; + const b2 = valA & 0xff; + const b3 = (valB >> 8) & 0xff; + const b4 = valB & 0xff; + return `${b1}.${b2}.${b3}.${b4}`; + } + } + } + + return txt; +} diff --git a/src/adapters/_uws/response.ts b/src/adapters/_uws/response.ts new file mode 100644 index 0000000..598f959 --- /dev/null +++ b/src/adapters/_uws/response.ts @@ -0,0 +1,368 @@ +import { splitSetCookieString } from "cookie-es"; +import { Buffer } from "node:buffer"; +import type { Readable as NodeReadable } from "node:stream"; + +export type UWSResponse = InstanceType; + +/** + * Fast Response for uWebSockets.js runtime + * + * It is faster because in most cases it doesn't create a full Response instance. + */ +export const UWSResponse: { + new ( + body?: BodyInit | null, + init?: ResponseInit, + ): globalThis.Response & { + readonly uwsResponse: () => { + status: number; + statusText: string; + headers: [string, string][]; + body: + | string + | Buffer + | Uint8Array + | DataView + | ReadableStream + | NodeReadable + | undefined + | null; + }; + }; +} = /* @__PURE__ */ (() => { + const CONTENT_TYPE = "content-type"; + const JSON_TYPE = "application/json"; + const JSON_HEADER = [[CONTENT_TYPE, JSON_TYPE]] as HeadersInit; + + const _Response = class Response implements globalThis.Response { + #body?: BodyInit | null; + #init?: ResponseInit; + readonly webSocket: any; + + constructor(body?: BodyInit | null, init?: ResponseInit) { + this.#body = body; + this.#init = init; + } + + static json(data: unknown, init?: ResponseInit): Response { + if (init?.headers) { + if (!(init.headers as Record)[CONTENT_TYPE]) { + const initHeaders = new Headers(init.headers); + if (!initHeaders.has(CONTENT_TYPE)) { + initHeaders.set(CONTENT_TYPE, JSON_TYPE); + } + init = { ...init, headers: initHeaders }; + } + } else { + init = init ? { ...init } : {}; + init.headers = JSON_HEADER; + } + return new _Response(JSON.stringify(data), init); + } + + static error(): globalThis.Response { + return globalThis.Response.error(); + } + + static redirect(url: string | URL, status?: number): globalThis.Response { + return globalThis.Response.redirect(url, status); + } + + /** + * Prepare uWebSockets.js response object + */ + uwsResponse() { + const status = this.#init?.status ?? 200; + const statusText = this.#init?.statusText ?? ""; + + const headers: [string, string][] = []; + + const headersInit = this.#init?.headers; + if (this.#headersObj) { + for (const [key, value] of this.#headersObj) { + if (key === "set-cookie") { + for (const setCookie of splitSetCookieString(value)) { + headers.push(["set-cookie", setCookie]); + } + } else { + headers.push([key, value]); + } + } + } else if (headersInit) { + const headerEntries = Array.isArray(headersInit) + ? headersInit + : headersInit.entries + ? (headersInit as Headers).entries() + : Object.entries(headersInit); + for (const [key, value] of headerEntries) { + if (key === "set-cookie") { + for (const setCookie of splitSetCookieString(value)) { + headers.push(["set-cookie", setCookie]); + } + } else { + headers.push([key, value]); + } + } + } + + const bodyInit = this.#body as BodyInit | null | undefined | NodeReadable; + // prettier-ignore + let body: string | Buffer | Uint8Array | DataView | ReadableStream | NodeReadable | undefined | null; + if (bodyInit) { + if (typeof bodyInit === "string") { + body = bodyInit; + } else if (bodyInit instanceof ReadableStream) { + body = bodyInit; + } else if (bodyInit instanceof ArrayBuffer) { + body = Buffer.from(bodyInit); + } else if (bodyInit instanceof Uint8Array) { + body = Buffer.from(bodyInit); + } else if (bodyInit instanceof DataView) { + body = Buffer.from(bodyInit.buffer); + } else if (bodyInit instanceof Blob) { + body = bodyInit.stream(); + if (bodyInit.type) { + headers.push(["content-type", bodyInit.type]); + } + } else if (typeof (bodyInit as NodeReadable).pipe === "function") { + body = bodyInit as NodeReadable; + } else { + const res = new globalThis.Response(bodyInit as BodyInit); + body = res.body as ReadableStream>; + for (const [key, value] of res.headers) { + headers.push([key, value]); + } + } + } + + // Free up memory + this.#body = undefined; + this.#init = undefined; + this.#headersObj = undefined; + this.#responseObj = undefined; + + return { + status, + statusText, + headers, + body, + }; + } + + /** Lazy initialized response instance */ + #responseObj?: globalThis.Response; + + /** Lazy initialized headers instance */ + #headersObj?: Headers; + + clone(): globalThis.Response { + if (this.#responseObj) { + return this.#responseObj.clone(); + } + if (this.#headersObj) { + return new _Response(this.#body, { + ...this.#init, + headers: this.#headersObj, + }); + } + return new _Response(this.#body, this.#init); + } + + get #response(): globalThis.Response { + if (!this.#responseObj) { + this.#responseObj = this.#headersObj + ? new globalThis.Response(this.#body, { + ...this.#init, + headers: this.#headersObj, + }) + : new globalThis.Response(this.#body, this.#init); + // Free up memory + this.#body = undefined; + this.#init = undefined; + this.#headersObj = undefined; + } + return this.#responseObj; + } + + get headers(): Headers { + if (this.#responseObj) { + return this.#responseObj.headers; // Reuse instance + } + if (!this.#headersObj) { + this.#headersObj = new Headers(this.#init?.headers); + } + return this.#headersObj; + } + + get ok(): boolean { + if (this.#responseObj) { + return this.#responseObj.ok; + } + const status = this.#init?.status ?? 200; + return status >= 200 && status < 300; + } + + get redirected(): boolean { + if (this.#responseObj) { + return this.#responseObj.redirected; + } + return false; + } + + get status(): number { + if (this.#responseObj) { + return this.#responseObj.status; + } + return this.#init?.status ?? 200; + } + + get statusText(): string { + if (this.#responseObj) { + return this.#responseObj.statusText; + } + return this.#init?.statusText ?? ""; + } + + get type(): ResponseType { + if (this.#responseObj) { + return this.#responseObj.type; + } + return "default"; + } + + get url(): string { + if (this.#responseObj) { + return this.#responseObj.url; + } + return ""; + } + + // --- body --- + + #fastBody( + as: new (...args: any[]) => T, + ): T | null | false { + const bodyInit = this.#body; + if (bodyInit === null || bodyInit === undefined) { + // No body + return null; + } + if (bodyInit instanceof as) { + // Fast path + return bodyInit; + } + // Not supported + return false; + } + + get body(): ReadableStream> | null { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.body as ReadableStream< + Uint8Array + >; + } + const fastBody = this.#fastBody(ReadableStream); + if (fastBody !== false) { + // Fast path + return fastBody as ReadableStream>; + } + // Slow path + return this.#response.body as ReadableStream>; + } + + get bodyUsed(): boolean { + if (this.#responseObj) { + return this.#responseObj.bodyUsed; + } + return false; + } + + arrayBuffer(): Promise { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.arrayBuffer(); + } + const fastBody = this.#fastBody(ArrayBuffer); + if (fastBody !== false) { + // Fast path + return Promise.resolve(fastBody || new ArrayBuffer(0)); + } + // Slow path + return this.#response.arrayBuffer(); + } + + blob(): Promise { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.blob(); + } + const fastBody = this.#fastBody(Blob); + if (fastBody !== false) { + // Fast path + return Promise.resolve(fastBody || new Blob()); + } + // Slow path + return this.#response.blob(); + } + + bytes(): Promise> { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.bytes() as Promise>; + } + const fastBody = this.#fastBody(Uint8Array); + if (fastBody !== false) { + // Fast path + return Promise.resolve(fastBody || new Uint8Array()); + } + // Slow path + return this.#response.bytes() as Promise>; + } + + formData(): Promise { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.formData(); + } + const fastBody = this.#fastBody(FormData); + if (fastBody !== false) { + // TODO: Content-Type should be one of "multipart/form-data" or "application/x-www-form-urlencoded" + // Fast path + return Promise.resolve(fastBody || new FormData()); + } + // Slow path + return this.#response.formData(); + } + + text(): Promise { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.text(); + } + const bodyInit = this.#body; + if (bodyInit === null || bodyInit === undefined) { + // No body + return Promise.resolve(""); + } + if (typeof bodyInit === "string") { + // Fast path + return Promise.resolve(bodyInit); + } + // Slow path + return this.#response.text(); + } + + json(): Promise { + if (this.#responseObj) { + // Reuse instance + return this.#responseObj.json(); + } + return this.text().then((text) => JSON.parse(text)); + } + }; + + Object.setPrototypeOf(_Response.prototype, globalThis.Response.prototype); + + return _Response as any; +})(); diff --git a/src/adapters/_uws/send.ts b/src/adapters/_uws/send.ts new file mode 100644 index 0000000..37bcaff --- /dev/null +++ b/src/adapters/_uws/send.ts @@ -0,0 +1,232 @@ +import type { UWSServerResponse } from "../../types.ts"; +import type { UWSResponse } from "./response.ts"; +import { kUWSAbort } from "./_common.ts"; + +function isReadableStream(v: unknown): v is ReadableStream { + return typeof ReadableStream !== "undefined" && v instanceof ReadableStream; +} + +function isNodeReadable(v: unknown): v is NodeJS.ReadableStream { + const obj = v as { pipe?: unknown; getReader?: unknown } | null | undefined; + return ( + !!obj && + typeof obj.pipe === "function" && + typeof obj.getReader !== "function" + ); +} + +function hasUwsResponse(v: unknown): v is UWSResponse { + const obj = v as { uwsResponse?: unknown } | null | undefined; + return !!obj && typeof obj.uwsResponse === "function"; +} + +function writeStatusAndHeaders( + res: UWSServerResponse, + status: number, + statusText: string, + headers: Iterable<[string, string]>, +) { + res.cork(() => { + res.writeStatus(`${status} ${statusText || ""}`); + for (const [key, value] of headers) { + res.writeHeader(key, value); + } + }); +} + +async function streamWebReadable( + res: UWSServerResponse, + stream: ReadableStream, +) { + let aborted = false; + res.onAborted(() => { + aborted = true; + try { + // Cancel the readable stream on abort + stream.cancel?.().catch?.(() => { + /* ignore */ + }); + } catch { + /* ignore */ + } + // Propagate to request.signal if available + try { + (res as unknown as Record void>)[kUWSAbort]?.(); + } catch { + /* ignore */ + } + }); + const reader = stream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done || aborted) break; + if (value && value.length > 0) { + // Best-effort backpressure handling; small chunks in tests won't saturate. + res.write(value); + } + } + } finally { + if (!aborted) { + // End only if not aborted + res.end(); + } + } +} + +async function streamNodeReadable( + res: UWSServerResponse, + nodeStream: NodeJS.ReadableStream, +) { + let aborted = false; + const onAborted = () => { + aborted = true; + try { + (nodeStream as unknown as { destroy?: () => void }).destroy?.(); + } catch { + /* ignore */ + } + try { + (res as unknown as Record void>)[kUWSAbort]?.(); + } catch { + /* ignore */ + } + }; + res.onAborted(onAborted); + + await new Promise((resolve) => { + const onData = (chunk: unknown) => { + if (aborted) return; + // Ensure Uint8Array or string per uWS API + if (typeof chunk === "string") { + res.write(chunk); + } else if (chunk instanceof Uint8Array) { + res.write(chunk); + } else if (ArrayBuffer.isView(chunk)) { + const view = chunk as ArrayBufferView; + res.write( + new Uint8Array(view.buffer, view.byteOffset, view.byteLength), + ); + } else { + // Fallback stringify + res.write(String(chunk)); + } + }; + const onEnd = () => { + nodeStream.off("data", onData); + nodeStream.off("end", onEnd); + nodeStream.off("error", onError); + if (!aborted) res.end(); + resolve(); + }; + const onError = () => { + nodeStream.off("data", onData); + nodeStream.off("end", onEnd); + nodeStream.off("error", onError); + if (!aborted) res.end(); + resolve(); + }; + nodeStream.on("data", onData); + nodeStream.once("end", onEnd); + nodeStream.once("error", onError); + }); +} + +export async function sendUWSResponse( + res: UWSServerResponse, + webRes: Response | UWSResponse, +): Promise { + if (res.aborted) { + return; + } + + if (!webRes) { + res.cork(() => { + res.writeStatus("500"); + res.end(); + }); + return; + } + + // If this is a fast UWSResponse, fully handle based on the extracted data. + const maybeFast = webRes as unknown; + if (hasUwsResponse(maybeFast)) { + const fast = (maybeFast as UWSResponse).uwsResponse(); + const { status, statusText, headers } = fast; + const body = fast.body as + | string + | Uint8Array + | ArrayBuffer + | DataView + | ReadableStream + | NodeJS.ReadableStream + | null + | undefined; + + // Streaming bodies + if (isReadableStream(body)) { + writeStatusAndHeaders(res, status, statusText, headers); + await streamWebReadable(res, body); + return; + } + if (isNodeReadable(body)) { + writeStatusAndHeaders(res, status, statusText, headers); + await streamNodeReadable(res, body); + return; + } + + // Non-streaming bodies + writeStatusAndHeaders(res, status, statusText, headers); + if (body === null || body === undefined) { + res.end(); + return; + } + if (typeof body === "string") { + res.end(body); + return; + } + if (body instanceof ArrayBuffer) { + res.end(body); + return; + } + if (body instanceof Uint8Array) { + res.end(body); + return; + } + if (body instanceof DataView) { + res.end(new Uint8Array(body.buffer, body.byteOffset, body.byteLength)); + return; + } + // Fallback + res.end(String(body)); + return; + } + + // Standard Response + const body = (webRes as Response).body; + if (isReadableStream(body)) { + writeStatusAndHeaders( + res, + webRes.status, + webRes.statusText, + (webRes.headers as unknown as Headers).entries(), + ); + await streamWebReadable(res, body as ReadableStream); + return; + } + + // Buffer small/finite bodies + const ab = body ? await (webRes as Response).arrayBuffer() : undefined; + if (res.aborted) return; + writeStatusAndHeaders( + res, + webRes.status, + webRes.statusText, + (webRes.headers as unknown as Headers).entries(), + ); + if (ab) { + res.end(ab); + } else { + res.end(); + } +} diff --git a/src/adapters/uws.ts b/src/adapters/uws.ts new file mode 100644 index 0000000..30ae222 --- /dev/null +++ b/src/adapters/uws.ts @@ -0,0 +1,145 @@ +import type { + FetchHandler, + Server, + ServerOptions, + ServerHandler, + UWSHTTPHandler, +} from "../types.ts"; +import { + createWaitUntil, + fmtURL, + printListening, + resolvePortAndHost, +} from "../_utils.ts"; +import { wrapFetch } from "../_middleware.ts"; +import { UWSRequest } from "./_uws/request.ts"; +import { sendUWSResponse } from "./_uws/send.ts"; +import { errorPlugin } from "../_plugins.ts"; + +import type { us_listen_socket } from "uWebSockets.js"; + +export { FastURL } from "../_url.ts"; +export { UWSRequest } from "./_uws/request.ts"; +export { UWSRequestHeaders, UWSResponseHeaders } from "./_uws/headers.ts"; +export { UWSResponse, UWSResponse as FastResponse } from "./_uws/response.ts"; +export { sendUWSResponse } from "./_uws/send.ts"; + +export function serve(options: ServerOptions): Server { + return new UWSServer(options); +} + +export function toUWSHandler(fetchHandler: FetchHandler): UWSHTTPHandler { + return (nodeRes, nodeReq) => { + const request = new UWSRequest({ req: nodeReq, res: nodeRes }); + const response = fetchHandler(request); + if (response instanceof Promise) { + response.then((resolved) => sendUWSResponse(nodeRes, resolved)); + } else { + sendUWSResponse(nodeRes, response); + } + }; +} + +class UWSServer implements Server { + readonly runtime = "uws"; + readonly uws: Server["uws"] = {}; + readonly options: Server["options"]; + readonly fetch: ServerHandler; + + #wait: ReturnType; + #listeningPromise?: Promise; + #listeningInfo?: { hostname?: string; port: number }; + #listenSocket?: us_listen_socket; + + constructor(options: ServerOptions) { + this.options = { ...options, middleware: [...(options.middleware || [])] }; + this.options.uws ??= {}; + + for (const plugin of options.plugins || []) { + plugin(this); + } + errorPlugin(this); + + this.fetch = wrapFetch(this); + this.#wait = createWaitUntil(); + + if (!options.manual) { + this.serve(); + } + } + + serve(): Promise { + if (this.uws?.server) { + return Promise.resolve(this.#listeningPromise).then(() => this); + } + this.#listeningPromise = (async () => { + const uws = await import("uWebSockets.js").catch((error) => { + console.error( + "Please install uWebSockets.js: `npm install uWebSockets.js`", + ); + throw error; + }); + this.uws!.server = + this.options.uws && + "cert_file_name" in this.options.uws && + this.options.uws.cert_file_name && + "key_file_name" in this.options.uws && + this.options.uws.key_file_name + ? uws.SSLApp(this.options.uws) + : uws.App(this.options.uws); + const handler = toUWSHandler(this.fetch); + this.uws!.server.any("/*", handler); + const { port } = resolvePortAndHost(this.options); + await new Promise((resolve, reject) => { + this.uws!.server!.listen( + port, + (listenSocket: us_listen_socket | false) => { + if (listenSocket) { + this.#listenSocket = listenSocket; + const { port, hostname } = resolvePortAndHost({ + ...this.options, + port: uws.us_socket_local_port(listenSocket), + }); + this.#listeningInfo = { hostname, port }; + printListening(this.options, this.url); + resolve(); + } else { + reject(new Error("Failed to listen on port " + port)); + } + }, + ); + }); + })(); + return this.#listeningPromise.then(() => this); + } + + get url(): string | undefined { + return this.#listeningInfo + ? fmtURL( + this.#listeningInfo.hostname, + this.#listeningInfo.port, + !!( + this.options.uws && + "cert_file_name" in this.options.uws && + this.options.uws.cert_file_name && + "key_file_name" in this.options.uws && + this.options.uws.key_file_name + ), + ) + : undefined; + } + + ready(): Promise { + return Promise.resolve(this.#listeningPromise).then(() => this); + } + + async close(): Promise { + await this.#wait.wait(); + if (this.uws?.server && this.#listenSocket) { + const { us_listen_socket_close } = await import("uWebSockets.js"); + us_listen_socket_close(this.#listenSocket); + this.uws.server.close(); + this.#listenSocket = undefined; + } + } +} diff --git a/src/types.ts b/src/types.ts index 378ec21..f5586a6 100644 --- a/src/types.ts +++ b/src/types.ts @@ -4,6 +4,7 @@ import type * as NodeHttp2 from "node:http2"; import type * as NodeNet from "node:net"; import type * as Bun from "bun"; import type * as CF from "@cloudflare/workers-types"; +import type * as uws from "uWebSockets.js"; // Utils type MaybePromise = T | Promise; @@ -158,6 +159,13 @@ export interface ServerOptions { */ deno?: Deno.ServeOptions; + /** + * uWebSockets.js server options + * + * @docs https://github.com/uNetworking/uWebSockets.js + */ + uws?: uws.AppOptions; + /** * Service worker options */ @@ -185,7 +193,8 @@ export interface Server { | "bun" | "cloudflare" | "service-worker" - | "generic"; + | "generic" + | "uws"; /** * Server options @@ -218,6 +227,11 @@ export interface Server { */ readonly deno?: { server?: Deno.HttpServer }; + /** + * uWebSockets.js context. + */ + readonly uws?: { server?: uws.TemplatedApp }; + /** * Server fetch handler */ @@ -274,6 +288,14 @@ export interface ServerRuntimeContext { server: Bun.Server; }; + /** + * Underlying uWebSockets.js server request context. + */ + uws?: { + req: UWSServerRequest; + res: UWSServerResponse; + }; + /** * Underlying Cloudflare request context. */ @@ -340,3 +362,18 @@ export type NodeHTTPMiddleware = ( ) => unknown | Promise; export type CloudflareFetchHandler = CF.ExportedHandlerFetchHandler; + +export type UWSServerRequest = uws.HttpRequest; + +export type UWSServerResponse = uws.HttpResponse; + +export type UWSHTTPHandler = ( + res: UWSServerResponse, + req: UWSServerRequest, +) => void | Promise; + +export type UWSHTTPMiddleware = ( + req: UWSServerRequest, + res: UWSServerResponse, + next: (error?: Error) => void, +) => unknown | Promise; diff --git a/test/uws.test.ts b/test/uws.test.ts new file mode 100644 index 0000000..46aadea --- /dev/null +++ b/test/uws.test.ts @@ -0,0 +1,84 @@ +import { describe, beforeAll, afterAll } from "vitest"; +import { fetch, Agent } from "undici"; +import type { RequestInfo, RequestInit } from "undici"; +import { addTests } from "./_tests.ts"; +import { serve, FastResponse } from "../src/adapters/uws.ts"; +import { getTLSCert } from "./_utils.ts"; +import { fixture } from "./_fixture.ts"; + +const tls = await getTLSCert(); + +const testConfigs = [ + { + name: "http", + Response: globalThis.Response, + }, + { + name: "http, FastResponse", + Response: FastResponse, + }, + { + name: "https", + Response: globalThis.Response, + serveOptions: { tls }, + }, + { + name: "https, FastResponse", + Response: FastResponse, + serveOptions: { tls }, + }, +]; + +for (const config of testConfigs) { + describe.sequential(`uws (${config.name})`, () => { + const client = getHttpClient(config.serveOptions?.tls); + let server: ReturnType | undefined; + + beforeAll(async () => { + server = serve( + fixture( + { + port: 0, + ...config.serveOptions, + }, + config.Response as unknown as typeof Response, + ), + ); + await server!.ready(); + }); + + afterAll(async () => { + await client.agent?.close(); + await server!.close(); + }); + + addTests({ + url: (path) => server!.url! + path.slice(1), + runtime: "uws", + fetch: client.fetch, + }); + }); +} + +function getHttpClient(tlsOptions?: { key: string; cert: string }) { + if (!tlsOptions) { + return { + fetch: globalThis.fetch, + agent: undefined, + }; + } + const httpsAgent = new Agent({ connect: { ...tls } }); + const fetchWithHttps = ( + input: RequestInfo, + init?: RequestInit, + ): Promise => + fetch(input, { + ...init, + dispatcher: httpsAgent, + }) as unknown as Promise; + + return { + fetch: fetchWithHttps as unknown as typeof globalThis.fetch, + agent: httpsAgent, + }; +}