Skip to content

Commit

Permalink
feat: add support for aborting requests, uses its own internal abort …
Browse files Browse the repository at this point in the history
…controller
  • Loading branch information
edorgeville committed May 24, 2024
1 parent a306c2e commit 672d641
Showing 1 changed file with 58 additions and 40 deletions.
98 changes: 58 additions & 40 deletions src/link/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ export type TRPCMQTTLinkOptions = {
export const mqttLink = <TRouter extends AnyRouter>(
opts: TRPCMQTTLinkOptions
): TRPCLink<TRouter> => {
// This runs once, at the initialization of the link
return runtime => {
const {
client,
Expand Down Expand Up @@ -50,39 +51,11 @@ export const mqttLink = <TRouter extends AnyRouter>(
}
});

const request = async (message: TRPCMQTTRequest) =>
new Promise<any>((resolve, reject) => {
const correlationId = randomUUID();
const onTimeout = () => {
responseEmitter.off(correlationId, onMessage);
reject(new TRPCClientError('Request timed out after ' + requestTimeoutMs + 'ms'));
};
const timeout = setTimeout(onTimeout, requestTimeoutMs);
const onMessage = (message: TRPCMQTTResponse) => {
clearTimeout(timeout);
resolve(message);
};
responseEmitter.once(correlationId, onMessage);
if (protocolVersion >= 5) {
// MQTT 5.0+, use the correlationData & responseTopic field
const opts = {
properties: {
responseTopic,
correlationData: Buffer.from(correlationId)
}
};
client.publish(requestTopic, JSON.stringify(message), opts);
} else {
// MQTT < 5.0, use the message itself
client.publish(
requestTopic,
JSON.stringify({ ...message, correlationId, responseTopic })
);
}
});

return ({ op }) => {
// This runs every time a procedure is called
return observable(observer => {
const abortController = new AbortController();

const { id, type, path } = op;

try {
Expand Down Expand Up @@ -114,13 +87,56 @@ export const mqttLink = <TRouter extends AnyRouter>(
observer.complete();
};

request({
trpc: {
id,
method: type,
params: { path, input }
}
})
const request = async (message: TRPCMQTTRequest, signal: AbortSignal) =>
new Promise<any>((resolve, reject) => {
const correlationId = randomUUID();
const onTimeout = () => {
responseEmitter.off(correlationId, onMessage);
signal.onabort = null;
reject(new TRPCClientError('Request timed out after ' + requestTimeoutMs + 'ms'));
};
const onAbort = () => {
// This runs when the request is aborted externally
console.log('Aborting request', path);
clearTimeout(timeout);
responseEmitter.off(correlationId, onMessage);
reject(new TRPCClientError('Request aborted'));
};
const timeout = setTimeout(onTimeout, requestTimeoutMs);
signal.onabort = onAbort;
const onMessage = (message: TRPCMQTTResponse) => {
clearTimeout(timeout);
resolve(message);
};
responseEmitter.once(correlationId, onMessage);
if (protocolVersion >= 5) {
// MQTT 5.0+, use the correlationData & responseTopic field
const opts = {
properties: {
responseTopic,
correlationData: Buffer.from(correlationId)
}
};
client.publish(requestTopic, JSON.stringify(message), opts);
} else {
// MQTT < 5.0, use the message itself
client.publish(
requestTopic,
JSON.stringify({ ...message, correlationId, responseTopic })
);
}
});

request(
{
trpc: {
id,
method: type,
params: { path, input }
}
},
abortController.signal
)
.then(onMessage)
.catch(cause => {
observer.error(
Expand All @@ -133,8 +149,10 @@ export const mqttLink = <TRouter extends AnyRouter>(
);
}

// eslint-disable-next-line @typescript-eslint/no-empty-function, prettier/prettier
return () => { };
return () => {
// This runs after every procedure call, whether it was successful, unsuccessful, or externally aborted
abortController.abort();
};
});
};
};
Expand Down

0 comments on commit 672d641

Please sign in to comment.