Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: abort signal support #4

Merged
merged 6 commits into from
May 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
97 changes: 57 additions & 40 deletions src/link/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type TRPCMQTTLinkOptions = {
export const mqttLink = <TRouter extends AnyRouter>(
opts: TRPCMQTTLinkOptions
): TRPCLink<TRouter> => {
// This runs once, at the initialization of the link
return runtime => {
const {
client,
Expand Down Expand Up @@ -50,39 +51,11 @@ export const mqttLink = <TRouter extends AnyRouter>(
}
});

const request = async (message: TRPCMQTTRequest) =>
new Promise<any>((resolve, reject) => {
const correlationId = randomUUID();
const onTimeout = () => {
responseEmitter.off(correlationId, onMessage);
reject(new TRPCClientError('Request timed out after ' + requestTimeoutMs + 'ms'));
};
const timeout = setTimeout(onTimeout, requestTimeoutMs);
const onMessage = (message: TRPCMQTTResponse) => {
clearTimeout(timeout);
resolve(message);
};
responseEmitter.once(correlationId, onMessage);
if (protocolVersion >= 5) {
// MQTT 5.0+, use the correlationData & responseTopic field
const opts = {
properties: {
responseTopic,
correlationData: Buffer.from(correlationId)
}
};
client.publish(requestTopic, JSON.stringify(message), opts);
} else {
// MQTT < 5.0, use the message itself
client.publish(
requestTopic,
JSON.stringify({ ...message, correlationId, responseTopic })
);
}
});

return ({ op }) => {
// This runs every time a procedure is called
return observable(observer => {
const abortController = new AbortController();

const { id, type, path } = op;

try {
Expand Down Expand Up @@ -114,13 +87,55 @@ export const mqttLink = <TRouter extends AnyRouter>(
observer.complete();
};

request({
trpc: {
id,
method: type,
params: { path, input }
}
})
const request = async (message: TRPCMQTTRequest, signal: AbortSignal) =>
new Promise<any>((resolve, reject) => {
const correlationId = randomUUID();
const onTimeout = () => {
responseEmitter.off(correlationId, onMessage);
signal.onabort = null;
reject(new TRPCClientError('Request timed out after ' + requestTimeoutMs + 'ms'));
};
const onAbort = () => {
// This runs when the request is aborted externally
clearTimeout(timeout);
responseEmitter.off(correlationId, onMessage);
reject(new TRPCClientError('Request aborted'));
};
const timeout = setTimeout(onTimeout, requestTimeoutMs);
signal.onabort = onAbort;
const onMessage = (message: TRPCMQTTResponse) => {
clearTimeout(timeout);
resolve(message);
};
responseEmitter.once(correlationId, onMessage);
if (protocolVersion >= 5) {
// MQTT 5.0+, use the correlationData & responseTopic field
const opts = {
properties: {
responseTopic,
correlationData: Buffer.from(correlationId)
}
};
client.publish(requestTopic, JSON.stringify(message), opts);
} else {
// MQTT < 5.0, use the message itself
client.publish(
requestTopic,
JSON.stringify({ ...message, correlationId, responseTopic })
);
}
});

request(
{
trpc: {
id,
method: type,
params: { path, input }
}
},
abortController.signal
)
.then(onMessage)
.catch(cause => {
observer.error(
Expand All @@ -133,8 +148,10 @@ export const mqttLink = <TRouter extends AnyRouter>(
);
}

// eslint-disable-next-line @typescript-eslint/no-empty-function, prettier/prettier
return () => { };
return () => {
// This runs after every procedure call, whether it was successful, unsuccessful, or externally aborted
abortController.abort();
};
});
};
};
Expand Down
6 changes: 5 additions & 1 deletion test/appRouter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,5 +24,9 @@ export const appRouter = router({
.mutation(({ input }) => {
state.count += input;
return state.count;
})
}),
slow: publicProcedure.query(async () => {
await new Promise(resolve => setTimeout(resolve, 10 * 1000));
return 'done';
})
});
10 changes: 10 additions & 0 deletions test/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,16 @@ test('countUp mutation', async () => {
expect(addTwo).toBe(3);
});

test('abortSignal is handled', async () => {
const controller = new AbortController();
const promise = client.slow.query(undefined, {
signal: controller.signal
});

controller.abort();
await expect(promise).rejects.toThrow('aborted');
});

afterAll(async () => {
mqttClient.end();
broker.close();
Expand Down
Loading