Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
MatthewWid committed Jan 21, 2025
1 parent 104ec88 commit 5716db0
Show file tree
Hide file tree
Showing 3 changed files with 119 additions and 71 deletions.
157 changes: 86 additions & 71 deletions src/Session.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,20 @@
import {
type IncomingMessage as Http1ServerRequest,
ServerResponse as Http1ServerResponse,
IncomingMessage,
type OutgoingHttpHeaders,
ServerResponse,
} from "node:http";
import type {Http2ServerRequest, Http2ServerResponse} from "node:http2";
import {EventBuffer, type EventBufferOptions} from "./EventBuffer";
import {SseError} from "./lib/SseError";
import {type EventMap, TypedEmitter} from "./lib/TypedEmitter";
import {createPushFromIterable} from "./lib/createPushFromIterable";
import {createPushFromStream} from "./lib/createPushFromStream";
import {generateId} from "./lib/generateId";
import {type SanitizerFunction, sanitize} from "./lib/sanitize";
import {type SerializerFunction, serialize} from "./lib/serialize";
import type { Http2ServerRequest, Http2ServerResponse } from "node:http2";
import { EventBuffer, type EventBufferOptions } from "./EventBuffer";
import { SseError } from "./lib/SseError";
import { type EventMap, TypedEmitter } from "./lib/TypedEmitter";
import { createPushFromIterable } from "./lib/createPushFromIterable";
import { createPushFromStream } from "./lib/createPushFromStream";
import { generateId } from "./lib/generateId";
import { type SanitizerFunction, sanitize } from "./lib/sanitize";
import { type SerializerFunction, serialize } from "./lib/serialize";
import { Socket } from "node:net";

interface SessionOptions<State = DefaultSessionState>
extends Pick<EventBufferOptions, "serializer" | "sanitizer"> {
Expand Down Expand Up @@ -99,6 +102,8 @@ interface SessionEvents extends EventMap {
* @param options - Options given to the session instance.
*/
class Session<State = DefaultSessionState> extends TypedEmitter<SessionEvents> {
count = 0;

/**
* The last event ID sent to the client.
*
Expand Down Expand Up @@ -132,100 +137,93 @@ class Session<State = DefaultSessionState> extends TypedEmitter<SessionEvents> {
/**
* Raw HTTP request.
*/
private req: Http1ServerRequest | Http2ServerRequest;
private req: Http1ServerRequest = new IncomingMessage(new Socket());

/**
* Raw HTTP response that is the minimal interface needed and forms the
* intersection between the HTTP/1.1 and HTTP/2 server response interfaces.
*/
private res:
| Http1ServerResponse
| (Http2ServerResponse & {
write: (chunk: string) => void;
});
private res: Http1ServerResponse = new ServerResponse(this.req);

private request: Request;

private response: Response;

private writer: WritableStreamDefaultWriter;

private encoder = new TextEncoder();

private serialize: SerializerFunction;
private sanitize: SanitizerFunction;
private trustClientEventId: boolean;
private initialRetry: number | null;
private keepAliveInterval: number | null;
private keepAliveTimer?: ReturnType<typeof setInterval>;
private statusCode: number;
private headers: OutgoingHttpHeaders;

constructor(
req: Http1ServerRequest | Http2ServerRequest,
res: Http1ServerResponse | Http2ServerResponse,
req: Request,
res: Response,
options: SessionOptions<State> = {}
) {
super();

this.req = req;
this.res = res;

const serializer = options.serializer ?? serialize;
const sanitizer = options.sanitizer ?? sanitize;

this.serialize = serializer;
this.sanitize = sanitizer;

this.buffer = new EventBuffer({serializer, sanitizer});
this.buffer = new EventBuffer({ serializer, sanitizer });

this.trustClientEventId = options.trustClientEventId ?? true;

this.initialRetry = options.retry === null ? null : (options.retry ?? 2000);
this.initialRetry = options.retry === null ? null : options.retry ?? 2000;

this.keepAliveInterval =
options.keepAlive === null ? null : (options.keepAlive ?? 10000);

this.statusCode = options.statusCode ?? 200;

this.headers = options.headers ?? {};
options.keepAlive === null ? null : options.keepAlive ?? 10000;

this.state = options.state ?? ({} as State);

this.req.once("close", this.onDisconnected);
this.res.once("close", this.onDisconnected);

setImmediate(this.initialize);
}
const { readable, writable } = new TransformStream();

this.writer = writable.getWriter();

this.request = req.clone();

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Channel.test.ts:62:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Channel.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "can register and store an active session". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ src/createSession.ts:10:19 ❯ Module.createSession src/createSession.ts:9:2 ❯ Server.<anonymous> src/createSession.test.ts:29:26 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/createSession.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "resolves with an instance of a session". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Channel.test.ts:87:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Channel.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "throws when registering a disconnected session". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Session.test.ts:63:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Session.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "fires the connection event non-synchronously after response headers are sent". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Session.test.ts:76:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Session.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "fires the disconnection event when the client kills the connection". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Channel.test.ts:108:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Channel.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "emits a session registration event". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Session.test.ts:95:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Session.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "fires the disconnection event when the server closes the response stream". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Channel.test.ts:131:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Channel.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "does not emit a registration event if the session is already registered". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.

Check failure on line 191 in src/Session.ts

View workflow job for this annotation

GitHub Actions / test

Unhandled error

TypeError: req.clone is not a function ❯ new Session src/Session.ts:191:22 ❯ Server.<anonymous> src/Channel.test.ts:151:21 ❯ Server.emit node:events:524:28 ❯ parserOnIncoming node:_http_server:1153:12 ❯ HTTPParser.parserOnHeadersComplete node:_http_common:117:17 This error originated in "src/Channel.test.ts" test file. It doesn't mean the error was thrown inside the file itself, but while it was running. The latest test that might've caused the error is "removes a session from the active sessions after deregistering it". It might mean one of the following: - The error was thrown, while Vitest was running this test. - If the error occurred after the test had been completed, this was the last documented test before it was thrown.
this.response = new Response(readable, {
status: options.statusCode ?? 200,
headers: {
"Content-Type": "text/event-stream",
"Cache-Control":
"private, no-cache, no-store, no-transform, must-revalidate, max-age=0",
Connection: "keep-alive",
Pragma: "no-cache",
"X-Accel-Buffering": "no",
},
});

if (options.headers) {
for (const [name, value] of Object.entries(options.headers)) {
this.response.headers.set(name, (value as string) ?? "");
}
}

private initialize = () => {
const url = `http://${this.req.headers.host}${this.req.url}`;
const params = new URL(url).searchParams;
const params = new URL(this.request.url).searchParams;

if (this.trustClientEventId) {
const givenLastEventId =
this.req.headers["last-event-id"] ??
this.lastId =
this.request.headers.get("last-event-id") ??
params.get("lastEventId") ??
params.get("evs_last_event_id") ??
"";

this.lastId = givenLastEventId as string;
}

const headers: OutgoingHttpHeaders = {};

if (this.res instanceof Http1ServerResponse) {
headers["Content-Type"] = "text/event-stream";
headers["Cache-Control"] =
"private, no-cache, no-store, no-transform, must-revalidate, max-age=0";
headers["Connection"] = "keep-alive";
headers["Pragma"] = "no-cache";
headers["X-Accel-Buffering"] = "no";
} else {
headers["content-type"] = "text/event-stream";
headers["cache-control"] =
"private, no-cache, no-store, no-transform, must-revalidate, max-age=0";
headers["pragma"] = "no-cache";
headers["x-accel-buffering"] = "no";
}
this.request.signal.addEventListener("abort", this.onDisconnected);

for (const [name, value] of Object.entries(this.headers)) {
headers[name] = value ?? "";
}
setImmediate(this.initialize);
}

this.res.writeHead(this.statusCode, headers);
private initialize = async () => {
const params = new URL(this.request.url).searchParams;

if (params.has("padding")) {
this.buffer.comment(" ".repeat(2049)).dispatch();
Expand All @@ -242,17 +240,20 @@ class Session<State = DefaultSessionState> extends TypedEmitter<SessionEvents> {
this.flush();

if (this.keepAliveInterval !== null) {
this.keepAliveTimer = setInterval(this.keepAlive, this.keepAliveInterval);
// this.keepAliveTimer = setInterval(this.keepAlive, this.keepAliveInterval);
}

this.isConnected = true;

this.emit("connected");
};

private onDisconnected = () => {
this.req.removeListener("close", this.onDisconnected);
this.res.removeListener("close", this.onDisconnected);
private onDisconnected = async () => {
console.log("onDisconnected");

this.request.signal.removeEventListener("abort", this.onDisconnected);

await this.writer.close();

if (this.keepAliveTimer) {
clearInterval(this.keepAliveTimer);
Expand All @@ -268,6 +269,10 @@ class Session<State = DefaultSessionState> extends TypedEmitter<SessionEvents> {
this.flush();
};

getRequest = () => this.request;

getResponse = () => this.response;

/**
* @deprecated see https://github.com/MatthewWid/better-sse/issues/52
*/
Expand Down Expand Up @@ -329,12 +334,22 @@ class Session<State = DefaultSessionState> extends TypedEmitter<SessionEvents> {
*
* @deprecated see https://github.com/MatthewWid/better-sse/issues/52
*/
flush = (): this => {
this.res.write(this.buffer.read());
flush = async () => {
console.log("Attempting to flush", `"${this.buffer.read()}"`);

this.buffer.clear();
const encoded = this.encoder.encode(this.buffer.read());

return this;
console.log("about to ready");

await this.writer.ready;

console.log("passed ready");

this.writer.write(encoded).then((value) => {
console.log("flush write back", this.count++, value);
});

this.buffer.clear();
};

/**
Expand Down Expand Up @@ -429,5 +444,5 @@ class Session<State = DefaultSessionState> extends TypedEmitter<SessionEvents> {
};
}

export type {SessionOptions, SessionEvents, DefaultSessionState};
export {Session};
export type { SessionOptions, SessionEvents, DefaultSessionState };
export { Session };
23 changes: 23 additions & 0 deletions src/lib/createRequest.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import type { IncomingMessage } from "node:http";

export const createRequest = (req: IncomingMessage | Request): Request => {
if (req instanceof Request) {
return req.clone();
}

const url = `http://${req.headers.host}${req.url}`;

const controller = new AbortController();

req.once("close", () => {
controller.abort();
});

const request = new Request(url, {
method: req.method,
headers: req.headers as Record<string, string>,
signal: controller.signal,
});

return request;
};
10 changes: 10 additions & 0 deletions src/lib/createResponse.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import type { ServerResponse } from "http";
import * as stream from "node:stream";

export const createResponse = (res: ServerResponse | Response): Response => {
if (res instanceof Response) {
return res.clone();
}

const response = stream.Writable.toWeb(res);
};

0 comments on commit 5716db0

Please sign in to comment.