Skip to content

Commit

Permalink
fix(server): Return all subscriptions regardless of the return invoca…
Browse files Browse the repository at this point in the history
…tion order
  • Loading branch information
enisdenjo committed Mar 27, 2024
1 parent 100cd69 commit f442288
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 4 deletions.
65 changes: 65 additions & 0 deletions src/__tests__/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1888,6 +1888,71 @@ describe('Disconnect/close', () => {

await waitForComplete();
});

it('should dispose of all subscriptions on close even if some return is problematic', async () => {
let resolveReturn: () => void = () => {
throw new Error('Return resolved early');
};
let i = 0;

const {
url,
waitForConnect,
waitForOperation,
waitForComplete,
waitForClientClose,
} = await startTServer({
onOperation(_ctx, _msg, _args, result) {
const origReturn = (result as AsyncGenerator).return;
(result as AsyncGenerator).return = async (...args) => {
if (++i === 1) {
// slow down the first return
await new Promise<void>((resolve) => (resolveReturn = resolve));
}
return origReturn(...args);
};
return result;
},
});

const client = await createTClient(url);

client.ws.send(
stringifyMessage<MessageType.ConnectionInit>({
type: MessageType.ConnectionInit,
}),
);
await waitForConnect();

client.ws.send(
stringifyMessage<MessageType.Subscribe>({
type: MessageType.Subscribe,
id: '1',
payload: {
query: 'subscription { ping(key: "slow") }',
},
}),
);
await waitForOperation();

client.ws.send(
stringifyMessage<MessageType.Subscribe>({
type: MessageType.Subscribe,
id: '2',
payload: {
query: 'subscription { ping(key: "ok") }',
},
}),
);
await waitForOperation();

client.ws.close(4321, 'Byebye');
await waitForClientClose();

await waitForComplete();
resolveReturn();
await waitForComplete();
});
});

it('should only accept a Set, Array or string in handleProtocol', () => {
Expand Down
16 changes: 12 additions & 4 deletions src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -848,10 +848,18 @@ export function makeServer<
// wait for close, cleanup and the disconnect callback
return async (code, reason) => {
if (connectionInitWait) clearTimeout(connectionInitWait);
for (const [id, sub] of Object.entries(ctx.subscriptions)) {
if (isAsyncGenerator(sub)) await sub.return(undefined);
delete ctx.subscriptions[id]; // deleting the subscription means no further activity should take place
}

const subs = { ...ctx.subscriptions };
// @ts-expect-error: I can write
ctx.subscriptions = {}; // deleting the subscription means no further activity should take place

// we return all iterable subscriptions immediatelly, independant of the order
await Promise.all(
Object.values(subs)
.filter(isAsyncGenerator)
.map((sub) => sub.return(undefined)),
);

if (ctx.acknowledged) await onDisconnect?.(ctx, code, reason);
await onClose?.(ctx, code, reason);
};
Expand Down

0 comments on commit f442288

Please sign in to comment.