From 07780053e65939fa6c1c136d94bd7aa26f862381 Mon Sep 17 00:00:00 2001 From: Erwan d'Orgeville Date: Fri, 24 May 2024 08:28:59 -0400 Subject: [PATCH 1/6] feat(test): add abortSignal test --- test/appRouter.ts | 6 +++++- test/index.test.ts | 13 +++++++++++++ 2 files changed, 18 insertions(+), 1 deletion(-) diff --git a/test/appRouter.ts b/test/appRouter.ts index cb8c42d..6e12747 100644 --- a/test/appRouter.ts +++ b/test/appRouter.ts @@ -24,5 +24,9 @@ export const appRouter = router({ .mutation(({ input }) => { state.count += input; return state.count; - }) + }), + takesASecondToResolve: publicProcedure.query(async () => { + await new Promise(resolve => setTimeout(resolve, 2000)); + return 'done'; + }) }); diff --git a/test/index.test.ts b/test/index.test.ts index a05586e..4a18713 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -57,6 +57,19 @@ test('countUp mutation', async () => { expect(addTwo).toBe(3); }); +test('abortSignal is handled & event listeners cleaned up', async () => { + const controller = new AbortController(); + const promise = client.takesASecondToResolve.query(undefined, { + signal: controller.signal + }); + + controller.abort(); + await expect(promise).rejects.toThrow('aborted'); + + // Only the server should still be listening, client should have cleaned up + expect(mqttClient.listeners('message').length).toBe(1); +}); + afterAll(async () => { mqttClient.end(); broker.close(); From f5440042d820cbfdbd26181632766ce099419cef Mon Sep 17 00:00:00 2001 From: Erwan d'Orgeville Date: Fri, 24 May 2024 09:27:33 -0400 Subject: [PATCH 2/6] fix(test): remove wrong event listener count test, rely on Jest open handle detection instead --- test/index.test.ts | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/test/index.test.ts b/test/index.test.ts index 4a18713..7407668 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -57,7 +57,7 @@ test('countUp mutation', async () => { expect(addTwo).toBe(3); }); -test('abortSignal is handled & event listeners cleaned up', async () => { +test('abortSignal is handled', async () => { const controller = new AbortController(); const promise = client.takesASecondToResolve.query(undefined, { signal: controller.signal @@ -65,9 +65,6 @@ test('abortSignal is handled & event listeners cleaned up', async () => { controller.abort(); await expect(promise).rejects.toThrow('aborted'); - - // Only the server should still be listening, client should have cleaned up - expect(mqttClient.listeners('message').length).toBe(1); }); afterAll(async () => { From f9ca6da527967d2efed47ad8ac733c3d14e5d83f Mon Sep 17 00:00:00 2001 From: Erwan d'Orgeville Date: Fri, 24 May 2024 09:27:58 -0400 Subject: [PATCH 3/6] fix(test): rename procedure to `slow` --- test/appRouter.ts | 2 +- test/index.test.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/test/appRouter.ts b/test/appRouter.ts index 6e12747..db85e59 100644 --- a/test/appRouter.ts +++ b/test/appRouter.ts @@ -25,7 +25,7 @@ export const appRouter = router({ state.count += input; return state.count; }), - takesASecondToResolve: publicProcedure.query(async () => { + slow: publicProcedure.query(async () => { await new Promise(resolve => setTimeout(resolve, 2000)); return 'done'; }) diff --git a/test/index.test.ts b/test/index.test.ts index 7407668..1293c17 100644 --- a/test/index.test.ts +++ b/test/index.test.ts @@ -59,7 +59,7 @@ test('countUp mutation', async () => { test('abortSignal is handled', async () => { const controller = new AbortController(); - const promise = client.takesASecondToResolve.query(undefined, { + const promise = client.slow.query(undefined, { signal: controller.signal }); From a306c2e0f297d614f253d511a8b6d296bbbe1018 Mon Sep 17 00:00:00 2001 From: Erwan d'Orgeville Date: Fri, 24 May 2024 09:29:06 -0400 Subject: [PATCH 4/6] fix(test): increase `slow` to 10s --- test/appRouter.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/appRouter.ts b/test/appRouter.ts index db85e59..fe00d48 100644 --- a/test/appRouter.ts +++ b/test/appRouter.ts @@ -26,7 +26,7 @@ export const appRouter = router({ return state.count; }), slow: publicProcedure.query(async () => { - await new Promise(resolve => setTimeout(resolve, 2000)); + await new Promise(resolve => setTimeout(resolve, 10 * 1000)); return 'done'; }) }); From 672d641ad29059f67d89961213126ca7745afb20 Mon Sep 17 00:00:00 2001 From: Erwan d'Orgeville Date: Fri, 24 May 2024 09:29:44 -0400 Subject: [PATCH 5/6] feat: add support for aborting requests, uses its own internal abort controller --- src/link/index.ts | 98 ++++++++++++++++++++++++++++------------------- 1 file changed, 58 insertions(+), 40 deletions(-) diff --git a/src/link/index.ts b/src/link/index.ts index a731f92..4948c29 100644 --- a/src/link/index.ts +++ b/src/link/index.ts @@ -17,6 +17,7 @@ export type TRPCMQTTLinkOptions = { export const mqttLink = ( opts: TRPCMQTTLinkOptions ): TRPCLink => { + // This runs once, at the initialization of the link return runtime => { const { client, @@ -50,39 +51,11 @@ export const mqttLink = ( } }); - const request = async (message: TRPCMQTTRequest) => - new Promise((resolve, reject) => { - const correlationId = randomUUID(); - const onTimeout = () => { - responseEmitter.off(correlationId, onMessage); - reject(new TRPCClientError('Request timed out after ' + requestTimeoutMs + 'ms')); - }; - const timeout = setTimeout(onTimeout, requestTimeoutMs); - const onMessage = (message: TRPCMQTTResponse) => { - clearTimeout(timeout); - resolve(message); - }; - responseEmitter.once(correlationId, onMessage); - if (protocolVersion >= 5) { - // MQTT 5.0+, use the correlationData & responseTopic field - const opts = { - properties: { - responseTopic, - correlationData: Buffer.from(correlationId) - } - }; - client.publish(requestTopic, JSON.stringify(message), opts); - } else { - // MQTT < 5.0, use the message itself - client.publish( - requestTopic, - JSON.stringify({ ...message, correlationId, responseTopic }) - ); - } - }); - return ({ op }) => { + // This runs every time a procedure is called return observable(observer => { + const abortController = new AbortController(); + const { id, type, path } = op; try { @@ -114,13 +87,56 @@ export const mqttLink = ( observer.complete(); }; - request({ - trpc: { - id, - method: type, - params: { path, input } - } - }) + const request = async (message: TRPCMQTTRequest, signal: AbortSignal) => + new Promise((resolve, reject) => { + const correlationId = randomUUID(); + const onTimeout = () => { + responseEmitter.off(correlationId, onMessage); + signal.onabort = null; + reject(new TRPCClientError('Request timed out after ' + requestTimeoutMs + 'ms')); + }; + const onAbort = () => { + // This runs when the request is aborted externally + console.log('Aborting request', path); + clearTimeout(timeout); + responseEmitter.off(correlationId, onMessage); + reject(new TRPCClientError('Request aborted')); + }; + const timeout = setTimeout(onTimeout, requestTimeoutMs); + signal.onabort = onAbort; + const onMessage = (message: TRPCMQTTResponse) => { + clearTimeout(timeout); + resolve(message); + }; + responseEmitter.once(correlationId, onMessage); + if (protocolVersion >= 5) { + // MQTT 5.0+, use the correlationData & responseTopic field + const opts = { + properties: { + responseTopic, + correlationData: Buffer.from(correlationId) + } + }; + client.publish(requestTopic, JSON.stringify(message), opts); + } else { + // MQTT < 5.0, use the message itself + client.publish( + requestTopic, + JSON.stringify({ ...message, correlationId, responseTopic }) + ); + } + }); + + request( + { + trpc: { + id, + method: type, + params: { path, input } + } + }, + abortController.signal + ) .then(onMessage) .catch(cause => { observer.error( @@ -133,8 +149,10 @@ export const mqttLink = ( ); } - // eslint-disable-next-line @typescript-eslint/no-empty-function, prettier/prettier - return () => { }; + return () => { + // This runs after every procedure call, whether it was successful, unsuccessful, or externally aborted + abortController.abort(); + }; }); }; }; From 309b44186ef4c17e6c4296b827ba72e53504e228 Mon Sep 17 00:00:00 2001 From: Erwan d'Orgeville Date: Fri, 24 May 2024 09:35:55 -0400 Subject: [PATCH 6/6] fix: remove leftover console.log --- src/link/index.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/link/index.ts b/src/link/index.ts index 4948c29..e3d45d8 100644 --- a/src/link/index.ts +++ b/src/link/index.ts @@ -97,7 +97,6 @@ export const mqttLink = ( }; const onAbort = () => { // This runs when the request is aborted externally - console.log('Aborting request', path); clearTimeout(timeout); responseEmitter.off(correlationId, onMessage); reject(new TRPCClientError('Request aborted'));