Skip to content

Commit

Permalink
send messages by batch
Browse files Browse the repository at this point in the history
  • Loading branch information
ggazzo committed Apr 2, 2020
1 parent 2174d01 commit 41d795b
Showing 1 changed file with 48 additions and 36 deletions.
84 changes: 48 additions & 36 deletions lib/server/push.api.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,15 @@ Push.setBadge = function(/* id, count */) {

var isConfigured = false;

var sendWorker = function(task, interval) {
var scheduleTask = function(task, interval) {
if (typeof Push.Log === 'function') {
Push.Log('Push: Send worker started, using interval:', interval);
}
if (Push.debug) {
console.log('Push: Send worker started, using interval: ' + interval);
}

return Meteor.setInterval(function() {
return Meteor.setTimeout(function() {
// xxx: add exponential backoff on error
try {
task();
Expand Down Expand Up @@ -668,41 +668,46 @@ Push.Configure = function(options) {
} // Else could not reserve
}; // EO sendNotification

sendWorker(function() {
const processQueue = async function() {

if (isSendingNotification) {
return;
}
if (isSendingNotification) {
return;
}

try {
// Set send fence
isSendingNotification = true;

// Set send fence
isSendingNotification = true;
// var countSent = 0;
var batchSize = options.sendBatchSize || 1;

// var countSent = 0;
var batchSize = options.sendBatchSize || 1;
var now = +new Date();

var now = +new Date();
// Find notifications that are not being or already sent
var pendingNotifications = Push.notifications.find({ $and: [
// Message is not sent
{ sent : false },
// And not being sent by other instances
{ sending: { $lt: now } },
// And not queued for future
{ $or: [
{ delayUntil: { $exists: false } },
{ delayUntil: { $lte: new Date() } }
]
}
]}, {
// Sort by created date
sort: { createdAt: 1 },
limit: batchSize
}).fetch();

// Find notifications that are not being or already sent
var pendingNotifications = Push.notifications.find({ $and: [
// Message is not sent
{ sent : false },
// And not being sent by other instances
{ sending: { $lt: now } },
// And not queued for future
{ $or: [
{ delayUntil: { $exists: false } },
{ delayUntil: { $lte: new Date() } }
]
}
]}, {
// Sort by created date
sort: { createdAt: 1 },
limit: batchSize
});

pendingNotifications.forEach(function(notification) {
if(pendingNotifications.length === 0) {
return scheduleTask(processQueue, options.sendInterval || 15000);
}

await Promise.all(pendingNotifications.map(function(notification) {
return new Promise((resolve, reject) => {
Meteor.defer(() => {
try {
sendNotification(notification);
} catch(error) {
Expand All @@ -712,14 +717,21 @@ Push.Configure = function(options) {
if (Push.debug) {
console.log('Push: Could not send notification id: "' + notification._id + '", Error: ' + error.message);
}
} finally {
resolve();
}
}); // EO forEach
} finally {
});
})
})); // EO forEach

// Remove the send fence
isSendingNotification = false;
}
}, options.sendInterval || 15000); // Default every 15th sec
// Remove the send fence
isSendingNotification = false;

processQueue(); // continue asap

}

scheduleTask(processQueue, options.sendInterval || 15000);

} else {
if (Push.debug) {
Expand Down

0 comments on commit 41d795b

Please sign in to comment.