Skip to content

Commit

Permalink
feat: transport-dom time out streams, emit aborts (#2074)
Browse files Browse the repository at this point in the history
* time out stalled streams
* cancelled stream requests emit abort event
  • Loading branch information
turbocrime authored Feb 26, 2025
1 parent 521caaa commit a11bfe3
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 25 deletions.
5 changes: 5 additions & 0 deletions .changeset/popular-keys-look.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'@penumbra-zone/transport-dom': patch
---

response streams will now respect the request's timeout configuration.
8 changes: 4 additions & 4 deletions packages/transport-chrome/src/with-dom.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -470,7 +470,7 @@ describe('session client with transport-dom', () => {
});

describe("doesn't emit abort events", () => {
it('can cancel streams before init, but does not emit an abort', async () => {
it.fails('can cancel streams before init, but does not emit an abort', async () => {
const { uncaughtExceptionListener, restoreUncaughtExceptionListener } =
replaceUncaughtExceptionListener();
onTestFinished(restoreUncaughtExceptionListener);
Expand Down Expand Up @@ -503,7 +503,7 @@ describe('session client with transport-dom', () => {
expect(uncaughtExceptionListener).not.toHaveBeenCalled();
});

it('can cancel streams already in progress, but does not emit an abort', async () => {
it.fails('can cancel streams already in progress, but does not emit an abort', async () => {
const { uncaughtExceptionListener, restoreUncaughtExceptionListener } =
replaceUncaughtExceptionListener();
onTestFinished(restoreUncaughtExceptionListener);
Expand Down Expand Up @@ -577,7 +577,7 @@ describe('session client with transport-dom', () => {
});

describe('emits abort events', () => {
it.fails('can cancel streams before init, and emits an abort', async () => {
it('can cancel streams before init, and emits an abort', async () => {
const { uncaughtExceptionListener, restoreUncaughtExceptionListener } =
replaceUncaughtExceptionListener();
onTestFinished(restoreUncaughtExceptionListener);
Expand Down Expand Up @@ -629,7 +629,7 @@ describe('session client with transport-dom', () => {
expect(uncaughtExceptionListener).not.toHaveBeenCalled();
});

it.fails('can cancel streams already in progress, and emits an abort', async () => {
it('can cancel streams already in progress, and emits an abort', async () => {
const { uncaughtExceptionListener, restoreUncaughtExceptionListener } =
replaceUncaughtExceptionListener();
onTestFinished(restoreUncaughtExceptionListener);
Expand Down
10 changes: 5 additions & 5 deletions packages/transport-dom/src/create.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -391,7 +391,7 @@ describe('channel transport', () => {
expect(messages).toMatchObject(introduceResponse);
});

it.fails('should time out streaming requests that stall', async () => {
it('should time out streaming requests that stall', async () => {
otherEnd.mockImplementation((event: MessageEvent<unknown>) => {
const { requestId } = event.data as TransportMessage;
const stream = ReadableStream.from(
Expand Down Expand Up @@ -480,7 +480,7 @@ describe('channel transport', () => {
});

describe("doesn't emit abort events", () => {
it('can cancel streams before init, but does not emit an abort', async () => {
it.fails('can cancel streams before init, but does not emit an abort', async () => {
expect(otherEnd).not.toHaveBeenCalled();

const ac = new AbortController();
Expand Down Expand Up @@ -508,7 +508,7 @@ describe('channel transport', () => {
expect(otherEnd).toHaveBeenCalledOnce();
});

it('can cancel streams already in progress, but does not emit an abort', async () => {
it.fails('can cancel streams already in progress, but does not emit an abort', async () => {
const errorEventListener = vi.fn();
window.addEventListener('error', errorEventListener);
onTestFinished(() => window.removeEventListener('error', errorEventListener));
Expand Down Expand Up @@ -575,7 +575,7 @@ describe('channel transport', () => {
});

describe('emits abort events', () => {
it.fails('can cancel streams before init, and emits an abort', async () => {
it('can cancel streams before init, and emits an abort', async () => {
expect(otherEnd).not.toHaveBeenCalled();

const ac = new AbortController();
Expand Down Expand Up @@ -611,7 +611,7 @@ describe('channel transport', () => {
);
});

it.fails('can cancel streams already in progress, and emits an abort', async () => {
it('can cancel streams already in progress, and emits an abort', async () => {
const defaultTimeoutMs = 200;
const responses: PlainMessage<IntroduceResponse>[] = [
{ sentence: 'something remarkably similar' },
Expand Down
53 changes: 37 additions & 16 deletions packages/transport-dom/src/create.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,21 +154,23 @@ export const createChannelTransport = ({
async unary<I extends Message<I> = AnyMessage, O extends Message<O> = AnyMessage>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
signal: AbortSignal | undefined = new AbortController().signal,
timeoutMs: number | undefined = defaultTimeoutMs,
header: HeadersInit | undefined,
input: PartialMessage<I>,
): Promise<UnaryResponse<I, O>> {
transportFailure.signal.throwIfAborted();
if (transportFailure.signal.aborted) {
throw transportFailure.signal.reason;
}
port ??= await connect();

const requestId = crypto.randomUUID();

const requestFailure = new AbortController();
const deadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;
const requestDeadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;

const response = Promise.race([
rejectOnSignal(transportFailure.signal, requestFailure.signal, deadline, signal),
rejectOnSignal(transportFailure.signal, requestFailure.signal, requestDeadline, signal),
new Promise<TransportMessage>((resolve, reject) => {
pending.set(requestId, (tev: TransportEvent) => {
if (isTransportMessage(tev, requestId)) {
Expand All @@ -182,13 +184,13 @@ export const createChannelTransport = ({
}),
]).finally(() => pending.delete(requestId));

if (!signal?.aborted) {
if (!signal.aborted) {
try {
switch (method.kind) {
case MethodKind.Unary:
{
const message = Any.pack(new method.I(input)).toJson(jsonOptions);
signal?.addEventListener('abort', () =>
signal.addEventListener('abort', () =>
port?.postMessage({ requestId, abort: true } satisfies TransportAbort),
);

Expand Down Expand Up @@ -224,21 +226,23 @@ export const createChannelTransport = ({
async stream<I extends Message<I> = AnyMessage, O extends Message<O> = AnyMessage>(
service: ServiceType,
method: MethodInfo<I, O>,
signal: AbortSignal | undefined,
signal: AbortSignal | undefined = new AbortController().signal,
timeoutMs: number | undefined = defaultTimeoutMs,
header: HeadersInit | undefined,
input: AsyncIterable<PartialMessage<I>>,
): Promise<StreamResponse<I, O>> {
transportFailure.signal.throwIfAborted();
if (transportFailure.signal.aborted) {
throw transportFailure.signal.reason;
}
port ??= await connect();

const requestId = crypto.randomUUID();

const requestFailure = new AbortController();
const deadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;
const requestDeadline = timeoutMs ? AbortSignal.timeout(timeoutMs) : undefined;

const response = Promise.race([
rejectOnSignal(transportFailure.signal, requestFailure.signal, deadline, signal),
rejectOnSignal(transportFailure.signal, requestFailure.signal, requestDeadline, signal),
new Promise<TransportStream>((resolve, reject) => {
pending.set(requestId, (tev: TransportEvent) => {
if (isTransportStream(tev, requestId)) {
Expand All @@ -252,7 +256,7 @@ export const createChannelTransport = ({
}),
]).finally(() => pending.delete(requestId));

if (!signal?.aborted) {
if (!signal.aborted) {
try {
switch (method.kind) {
case MethodKind.ServerStreaming:
Expand All @@ -267,6 +271,9 @@ export const createChannelTransport = ({
// confirm the input stream ended after one message with content
if (done && typeof value === 'object' && value !== null) {
const message = Any.pack(new method.I(value as object)).toJson(jsonOptions);
signal.addEventListener('abort', () =>
port?.postMessage({ requestId, abort: true } satisfies TransportAbort),
);
port.postMessage({
requestId,
message,
Expand All @@ -293,6 +300,9 @@ export const createChannelTransport = ({
cont.enqueue(Any.pack(new method.I(chunk)).toJson(jsonOptions)),
}),
);
signal.addEventListener('abort', () =>
port?.postMessage({ requestId, abort: true } satisfies TransportAbort),
);
port.postMessage(
{
requestId,
Expand All @@ -311,24 +321,35 @@ export const createChannelTransport = ({
}
}

const chunkAc = new AbortController();
const chunkDeadline = chunkAc.signal;
const chunkDeadlineExceeded = () => {
if (timeoutMs) {
chunkAc.abort(ConnectError.from('Stream stalled', Code.DeadlineExceeded));
}
};

return {
service,
method,
stream: true,
header: new Headers((await response).header),
trailer: new Headers((await response).trailer),
message: await response.then(({ stream }) =>
stream.pipeThrough(
message: await response.then(({ stream }) => {
let chunkTimeout = setTimeout(chunkDeadlineExceeded, timeoutMs);
return stream.pipeThrough(
new TransformStream({
transform: (chunk, cont) => {
clearTimeout(chunkTimeout);
chunkTimeout = setTimeout(chunkDeadlineExceeded, timeoutMs);
const o = new method.O();
Any.fromJson(chunk, jsonOptions).unpackTo(o);
cont.enqueue(o);
},
}),
{ signal },
),
),
{ signal: AbortSignal.any([signal, chunkDeadline]) },
);
}),
};
},
};
Expand Down

0 comments on commit a11bfe3

Please sign in to comment.