From 5716db0c914dd5a22673a56c4891a1ef96062b02 Mon Sep 17 00:00:00 2001 From: Matthew W Date: Tue, 21 Jan 2025 20:53:37 +1100 Subject: [PATCH] wip --- src/Session.ts | 157 +++++++++++++++++++++----------------- src/lib/createRequest.ts | 23 ++++++ src/lib/createResponse.ts | 10 +++ 3 files changed, 119 insertions(+), 71 deletions(-) create mode 100644 src/lib/createRequest.ts create mode 100644 src/lib/createResponse.ts diff --git a/src/Session.ts b/src/Session.ts index fae224e..d8fdfb9 100644 --- a/src/Session.ts +++ b/src/Session.ts @@ -1,17 +1,20 @@ import { type IncomingMessage as Http1ServerRequest, ServerResponse as Http1ServerResponse, + IncomingMessage, type OutgoingHttpHeaders, + ServerResponse, } from "node:http"; -import type {Http2ServerRequest, Http2ServerResponse} from "node:http2"; -import {EventBuffer, type EventBufferOptions} from "./EventBuffer"; -import {SseError} from "./lib/SseError"; -import {type EventMap, TypedEmitter} from "./lib/TypedEmitter"; -import {createPushFromIterable} from "./lib/createPushFromIterable"; -import {createPushFromStream} from "./lib/createPushFromStream"; -import {generateId} from "./lib/generateId"; -import {type SanitizerFunction, sanitize} from "./lib/sanitize"; -import {type SerializerFunction, serialize} from "./lib/serialize"; +import type { Http2ServerRequest, Http2ServerResponse } from "node:http2"; +import { EventBuffer, type EventBufferOptions } from "./EventBuffer"; +import { SseError } from "./lib/SseError"; +import { type EventMap, TypedEmitter } from "./lib/TypedEmitter"; +import { createPushFromIterable } from "./lib/createPushFromIterable"; +import { createPushFromStream } from "./lib/createPushFromStream"; +import { generateId } from "./lib/generateId"; +import { type SanitizerFunction, sanitize } from "./lib/sanitize"; +import { type SerializerFunction, serialize } from "./lib/serialize"; +import { Socket } from "node:net"; interface SessionOptions extends Pick { @@ -99,6 +102,8 @@ interface SessionEvents extends EventMap { * @param options - Options given to the session instance. */ class Session extends TypedEmitter { + count = 0; + /** * The last event ID sent to the client. * @@ -132,17 +137,21 @@ class Session extends TypedEmitter { /** * Raw HTTP request. */ - private req: Http1ServerRequest | Http2ServerRequest; + private req: Http1ServerRequest = new IncomingMessage(new Socket()); /** * Raw HTTP response that is the minimal interface needed and forms the * intersection between the HTTP/1.1 and HTTP/2 server response interfaces. */ - private res: - | Http1ServerResponse - | (Http2ServerResponse & { - write: (chunk: string) => void; - }); + private res: Http1ServerResponse = new ServerResponse(this.req); + + private request: Request; + + private response: Response; + + private writer: WritableStreamDefaultWriter; + + private encoder = new TextEncoder(); private serialize: SerializerFunction; private sanitize: SanitizerFunction; @@ -150,82 +159,71 @@ class Session extends TypedEmitter { private initialRetry: number | null; private keepAliveInterval: number | null; private keepAliveTimer?: ReturnType; - private statusCode: number; - private headers: OutgoingHttpHeaders; constructor( - req: Http1ServerRequest | Http2ServerRequest, - res: Http1ServerResponse | Http2ServerResponse, + req: Request, + res: Response, options: SessionOptions = {} ) { super(); - this.req = req; - this.res = res; - const serializer = options.serializer ?? serialize; const sanitizer = options.sanitizer ?? sanitize; this.serialize = serializer; this.sanitize = sanitizer; - this.buffer = new EventBuffer({serializer, sanitizer}); + this.buffer = new EventBuffer({ serializer, sanitizer }); this.trustClientEventId = options.trustClientEventId ?? true; - this.initialRetry = options.retry === null ? null : (options.retry ?? 2000); + this.initialRetry = options.retry === null ? null : options.retry ?? 2000; this.keepAliveInterval = - options.keepAlive === null ? null : (options.keepAlive ?? 10000); - - this.statusCode = options.statusCode ?? 200; - - this.headers = options.headers ?? {}; + options.keepAlive === null ? null : options.keepAlive ?? 10000; this.state = options.state ?? ({} as State); - this.req.once("close", this.onDisconnected); - this.res.once("close", this.onDisconnected); - - setImmediate(this.initialize); - } + const { readable, writable } = new TransformStream(); + + this.writer = writable.getWriter(); + + this.request = req.clone(); + this.response = new Response(readable, { + status: options.statusCode ?? 200, + headers: { + "Content-Type": "text/event-stream", + "Cache-Control": + "private, no-cache, no-store, no-transform, must-revalidate, max-age=0", + Connection: "keep-alive", + Pragma: "no-cache", + "X-Accel-Buffering": "no", + }, + }); + + if (options.headers) { + for (const [name, value] of Object.entries(options.headers)) { + this.response.headers.set(name, (value as string) ?? ""); + } + } - private initialize = () => { - const url = `http://${this.req.headers.host}${this.req.url}`; - const params = new URL(url).searchParams; + const params = new URL(this.request.url).searchParams; if (this.trustClientEventId) { - const givenLastEventId = - this.req.headers["last-event-id"] ?? + this.lastId = + this.request.headers.get("last-event-id") ?? params.get("lastEventId") ?? params.get("evs_last_event_id") ?? ""; - - this.lastId = givenLastEventId as string; } - const headers: OutgoingHttpHeaders = {}; - - if (this.res instanceof Http1ServerResponse) { - headers["Content-Type"] = "text/event-stream"; - headers["Cache-Control"] = - "private, no-cache, no-store, no-transform, must-revalidate, max-age=0"; - headers["Connection"] = "keep-alive"; - headers["Pragma"] = "no-cache"; - headers["X-Accel-Buffering"] = "no"; - } else { - headers["content-type"] = "text/event-stream"; - headers["cache-control"] = - "private, no-cache, no-store, no-transform, must-revalidate, max-age=0"; - headers["pragma"] = "no-cache"; - headers["x-accel-buffering"] = "no"; - } + this.request.signal.addEventListener("abort", this.onDisconnected); - for (const [name, value] of Object.entries(this.headers)) { - headers[name] = value ?? ""; - } + setImmediate(this.initialize); + } - this.res.writeHead(this.statusCode, headers); + private initialize = async () => { + const params = new URL(this.request.url).searchParams; if (params.has("padding")) { this.buffer.comment(" ".repeat(2049)).dispatch(); @@ -242,7 +240,7 @@ class Session extends TypedEmitter { this.flush(); if (this.keepAliveInterval !== null) { - this.keepAliveTimer = setInterval(this.keepAlive, this.keepAliveInterval); + // this.keepAliveTimer = setInterval(this.keepAlive, this.keepAliveInterval); } this.isConnected = true; @@ -250,9 +248,12 @@ class Session extends TypedEmitter { this.emit("connected"); }; - private onDisconnected = () => { - this.req.removeListener("close", this.onDisconnected); - this.res.removeListener("close", this.onDisconnected); + private onDisconnected = async () => { + console.log("onDisconnected"); + + this.request.signal.removeEventListener("abort", this.onDisconnected); + + await this.writer.close(); if (this.keepAliveTimer) { clearInterval(this.keepAliveTimer); @@ -268,6 +269,10 @@ class Session extends TypedEmitter { this.flush(); }; + getRequest = () => this.request; + + getResponse = () => this.response; + /** * @deprecated see https://github.com/MatthewWid/better-sse/issues/52 */ @@ -329,12 +334,22 @@ class Session extends TypedEmitter { * * @deprecated see https://github.com/MatthewWid/better-sse/issues/52 */ - flush = (): this => { - this.res.write(this.buffer.read()); + flush = async () => { + console.log("Attempting to flush", `"${this.buffer.read()}"`); - this.buffer.clear(); + const encoded = this.encoder.encode(this.buffer.read()); - return this; + console.log("about to ready"); + + await this.writer.ready; + + console.log("passed ready"); + + this.writer.write(encoded).then((value) => { + console.log("flush write back", this.count++, value); + }); + + this.buffer.clear(); }; /** @@ -429,5 +444,5 @@ class Session extends TypedEmitter { }; } -export type {SessionOptions, SessionEvents, DefaultSessionState}; -export {Session}; +export type { SessionOptions, SessionEvents, DefaultSessionState }; +export { Session }; diff --git a/src/lib/createRequest.ts b/src/lib/createRequest.ts new file mode 100644 index 0000000..d457c79 --- /dev/null +++ b/src/lib/createRequest.ts @@ -0,0 +1,23 @@ +import type { IncomingMessage } from "node:http"; + +export const createRequest = (req: IncomingMessage | Request): Request => { + if (req instanceof Request) { + return req.clone(); + } + + const url = `http://${req.headers.host}${req.url}`; + + const controller = new AbortController(); + + req.once("close", () => { + controller.abort(); + }); + + const request = new Request(url, { + method: req.method, + headers: req.headers as Record, + signal: controller.signal, + }); + + return request; +}; diff --git a/src/lib/createResponse.ts b/src/lib/createResponse.ts new file mode 100644 index 0000000..5fd3010 --- /dev/null +++ b/src/lib/createResponse.ts @@ -0,0 +1,10 @@ +import type { ServerResponse } from "http"; +import * as stream from "node:stream"; + +export const createResponse = (res: ServerResponse | Response): Response => { + if (res instanceof Response) { + return res.clone(); + } + + const response = stream.Writable.toWeb(res); +};