Skip to content

Commit

Permalink
[Flight] Allow aborting encodeReply (#31106)
Browse files Browse the repository at this point in the history
Allow aborting encoding arguments to a Server Action if a Promise
doesn't resolve. That way at least part of the arguments can be used on
the receiving side. This leaves it unresolved in the stream rather than
encoding an error.

This should error on the receiving side when the stream closes but it
doesn't right now in the Edge/Browser versions because closing happens
immediately before we've had a chance to call `.then()` so the Chunks
are still in pending state. This is an existing bug also in
FlightClient.
  • Loading branch information
sebmarkbage authored Oct 1, 2024
1 parent d8c90fa commit 99c056a
Show file tree
Hide file tree
Showing 7 changed files with 102 additions and 11 deletions.
17 changes: 16 additions & 1 deletion packages/react-client/src/ReactFlightReplyClient.js
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ export function processReply(
temporaryReferences: void | TemporaryReferenceSet,
resolve: (string | FormData) => void,
reject: (error: mixed) => void,
): void {
): (reason: mixed) => void {
let nextPartId = 1;
let pendingParts = 0;
let formData: null | FormData = null;
Expand Down Expand Up @@ -841,6 +841,19 @@ export function processReply(
return JSON.stringify(model, resolveToJSON);
}

function abort(reason: mixed): void {
if (pendingParts > 0) {
pendingParts = 0; // Don't resolve again later.
// Resolve with what we have so far, which may have holes at this point.
// They'll error when the stream completes on the server.
if (formData === null) {
resolve(json);
} else {
resolve(formData);
}
}
}

const json = serializeModel(root, 0);

if (formData === null) {
Expand All @@ -854,6 +867,8 @@ export function processReply(
resolve(formData);
}
}

return abort;
}

const boundCache: WeakMap<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ function createFromFetch<T>(

function encodeReply(
value: ReactServerValue,
options?: {temporaryReferences?: TemporaryReferenceSet},
options?: {temporaryReferences?: TemporaryReferenceSet, signal?: AbortSignal},
): Promise<
string | URLSearchParams | FormData,
> /* We don't use URLSearchParams yet but maybe */ {
return new Promise((resolve, reject) => {
processReply(
const abort = processReply(
value,
'',
options && options.temporaryReferences
Expand All @@ -135,6 +135,18 @@ function encodeReply(
resolve,
reject,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort((signal: any).reason);
} else {
const listener = () => {
abort((signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ function createFromFetch<T>(

function encodeReply(
value: ReactServerValue,
options?: {temporaryReferences?: TemporaryReferenceSet},
options?: {temporaryReferences?: TemporaryReferenceSet, signal?: AbortSignal},
): Promise<
string | URLSearchParams | FormData,
> /* We don't use URLSearchParams yet but maybe */ {
return new Promise((resolve, reject) => {
processReply(
const abort = processReply(
value,
'',
options && options.temporaryReferences
Expand All @@ -134,6 +134,18 @@ function encodeReply(
resolve,
reject,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort((signal: any).reason);
} else {
const listener = () => {
abort((signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ function createFromFetch<T>(

function encodeReply(
value: ReactServerValue,
options?: {temporaryReferences?: TemporaryReferenceSet},
options?: {temporaryReferences?: TemporaryReferenceSet, signal?: AbortSignal},
): Promise<
string | URLSearchParams | FormData,
> /* We don't use URLSearchParams yet but maybe */ {
return new Promise((resolve, reject) => {
processReply(
const abort = processReply(
value,
'',
options && options.temporaryReferences
Expand All @@ -163,6 +163,18 @@ function encodeReply(
resolve,
reject,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort((signal: any).reason);
} else {
const listener = () => {
abort((signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -618,4 +618,20 @@ describe('ReactFlightDOMReply', () => {
const root = await ReactServerDOMServer.decodeReply(body, webpackServerMap);
expect(root.prop.obj).toBe(root.prop);
});

it('can abort an unresolved model and get the partial result', async () => {
const promise = new Promise(r => {});
const controller = new AbortController();
const bodyPromise = ReactServerDOMClient.encodeReply(
{promise: promise, hello: 'world'},
{signal: controller.signal},
);
controller.abort();

const result = await ReactServerDOMServer.decodeReply(await bodyPromise);
expect(result.hello).toBe('world');
// TODO: await result.promise should reject at this point because the stream
// has closed but that's a bug in both ReactFlightReplyServer and ReactFlightClient.
// It just halts in this case.
});
});
Original file line number Diff line number Diff line change
Expand Up @@ -120,12 +120,12 @@ function createFromFetch<T>(

function encodeReply(
value: ReactServerValue,
options?: {temporaryReferences?: TemporaryReferenceSet},
options?: {temporaryReferences?: TemporaryReferenceSet, signal?: AbortSignal},
): Promise<
string | URLSearchParams | FormData,
> /* We don't use URLSearchParams yet but maybe */ {
return new Promise((resolve, reject) => {
processReply(
const abort = processReply(
value,
'',
options && options.temporaryReferences
Expand All @@ -134,6 +134,18 @@ function encodeReply(
resolve,
reject,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort((signal: any).reason);
} else {
const listener = () => {
abort((signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,12 +149,12 @@ function createFromFetch<T>(

function encodeReply(
value: ReactServerValue,
options?: {temporaryReferences?: TemporaryReferenceSet},
options?: {temporaryReferences?: TemporaryReferenceSet, signal?: AbortSignal},
): Promise<
string | URLSearchParams | FormData,
> /* We don't use URLSearchParams yet but maybe */ {
return new Promise((resolve, reject) => {
processReply(
const abort = processReply(
value,
'',
options && options.temporaryReferences
Expand All @@ -163,6 +163,18 @@ function encodeReply(
resolve,
reject,
);
if (options && options.signal) {
const signal = options.signal;
if (signal.aborted) {
abort((signal: any).reason);
} else {
const listener = () => {
abort((signal: any).reason);
signal.removeEventListener('abort', listener);
};
signal.addEventListener('abort', listener);
}
}
});
}

Expand Down

0 comments on commit 99c056a

Please sign in to comment.