diff --git a/src/__tests__/server.ts b/src/__tests__/server.ts index a6a9fb52..d58fc65c 100644 --- a/src/__tests__/server.ts +++ b/src/__tests__/server.ts @@ -1888,6 +1888,71 @@ describe('Disconnect/close', () => { await waitForComplete(); }); + + it('should dispose of all subscriptions on close even if some return is problematic', async () => { + let resolveReturn: () => void = () => { + throw new Error('Return resolved early'); + }; + let i = 0; + + const { + url, + waitForConnect, + waitForOperation, + waitForComplete, + waitForClientClose, + } = await startTServer({ + onOperation(_ctx, _msg, _args, result) { + const origReturn = (result as AsyncGenerator).return; + (result as AsyncGenerator).return = async (...args) => { + if (++i === 1) { + // slow down the first return + await new Promise((resolve) => (resolveReturn = resolve)); + } + return origReturn(...args); + }; + return result; + }, + }); + + const client = await createTClient(url); + + client.ws.send( + stringifyMessage({ + type: MessageType.ConnectionInit, + }), + ); + await waitForConnect(); + + client.ws.send( + stringifyMessage({ + type: MessageType.Subscribe, + id: '1', + payload: { + query: 'subscription { ping(key: "slow") }', + }, + }), + ); + await waitForOperation(); + + client.ws.send( + stringifyMessage({ + type: MessageType.Subscribe, + id: '2', + payload: { + query: 'subscription { ping(key: "ok") }', + }, + }), + ); + await waitForOperation(); + + client.ws.close(4321, 'Byebye'); + await waitForClientClose(); + + await waitForComplete(); + resolveReturn(); + await waitForComplete(); + }); }); it('should only accept a Set, Array or string in handleProtocol', () => { diff --git a/src/server.ts b/src/server.ts index ff116a7f..8212a602 100644 --- a/src/server.ts +++ b/src/server.ts @@ -848,10 +848,18 @@ export function makeServer< // wait for close, cleanup and the disconnect callback return async (code, reason) => { if (connectionInitWait) clearTimeout(connectionInitWait); - for (const [id, sub] of Object.entries(ctx.subscriptions)) { - if (isAsyncGenerator(sub)) await sub.return(undefined); - delete ctx.subscriptions[id]; // deleting the subscription means no further activity should take place - } + + const subs = { ...ctx.subscriptions }; + // @ts-expect-error: I can write + ctx.subscriptions = {}; // deleting the subscription means no further activity should take place + + // we return all iterable subscriptions immediatelly, independant of the order + await Promise.all( + Object.values(subs) + .filter(isAsyncGenerator) + .map((sub) => sub.return(undefined)), + ); + if (ctx.acknowledged) await onDisconnect?.(ctx, code, reason); await onClose?.(ctx, code, reason); };