Skip to content

Commit

Permalink
a few multiqueue fixes
Browse files Browse the repository at this point in the history
do not use sharding worker mode
make sure mqids are ints and not strings
set the mq callback before sending the message
delete callbacks afterwards to prevent memory leak
  • Loading branch information
giorgi-o committed Mar 31, 2023
1 parent d06d59e commit 24c2afc
Showing 1 changed file with 9 additions and 3 deletions.
12 changes: 9 additions & 3 deletions misc/multiqueue.js
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ export const useMultiqueue = () => config.useMultiqueue && client.shard && clien

let mqMessageId = 0;
const callbacks = {};
const setCallback = (mqid, callback) => callbacks[mqid] = callback;
const setCallback = (mqid, callback) => callbacks[parseInt(mqid)] = callback;

export const sendMQRequest = async (type, params={}, callback=()=>{}) => {
const message = {
Expand All @@ -25,8 +25,8 @@ export const sendMQRequest = async (type, params={}, callback=()=>{}) => {
params
}

await sendShardMessage(message);
setCallback(mqMessageId, callback);
await sendShardMessage(message);
}

export const sendMQResponse = async (mqid, params={}) => {
Expand All @@ -48,9 +48,15 @@ export const handleMQRequest = async (message) => {
export const handleMQResponse = async (message) => {
const [shardId, mqid] = message.mqid.split(":");

// check the response is intended for this shard
if(!client.shard.ids.includes(parseInt(shardId))) return;
if(!callbacks[mqid]) return;

// check we have a callback registered
if(!callbacks[mqid]) return console.error(`No callback registered for MQ response ${message.mqid}!`);

// do the thing
callbacks[mqid](message);
delete callbacks[mqid];
}


Expand Down

0 comments on commit 24c2afc

Please sign in to comment.