Skip to content

Commit

Permalink
Merge pull request #1 from raw-ash/revamp
Browse files Browse the repository at this point in the history
Revamp
  • Loading branch information
raw-ash authored Nov 16, 2020
2 parents c3ddfaa + 2966299 commit b886f61
Show file tree
Hide file tree
Showing 19 changed files with 549 additions and 440 deletions.
5 changes: 3 additions & 2 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
.idea/
node_modules/
.idea
node_modules
.devcontainer
package-lock.json
45 changes: 22 additions & 23 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[![](https://img.shields.io/bower/l/mi.svg?style=for-the-badge)](https://github.com/BashWorld/mpu)
[![](https://img.shields.io/bower/l/mi.svg?style=for-the-badge)](https://github.com/raw-ash/mpu)

Helps in passing messages within a cluster between
1) Master to worker(s)
Expand All @@ -13,34 +13,39 @@ Make sure that the `mp-unit.init` function for the master is called after the cl

```js
let os = require('os');
let cluster = require('cluster');
let mpu = require('mp-unit');
let totalWorkers = os.cpus().length;
let cluster = require('cluster');
let totalWorkers = 3;

function logWithTabs(workerId,message){
const tabString = '\t\t\t'.repeat(workerId-1);
console.log(tabString+message);
}

let FUN_KEY = "CHECK";
let MSG_MAP = {};
let fun = function (){
console.log("FROM WORKER "+cluster.worker.id+" := "+process.pid);
return Promise.resolve();
let MSG_MAP = {
[FUN_KEY]: (workerId) => {
logWithTabs(workerId,`[${workerId}] In Worker ${cluster.worker.id}`);
return Promise.resolve();
}
};
MSG_MAP[FUN_KEY] = fun;

let params = {
WORKERS : totalWorkers,
MSG_MAP : MSG_MAP,
TIMEOUT : 10
};
function init(){
mpu.init(MSG_MAP,{FAMILY_SIZE:totalWorkers});
}

if(cluster.isMaster){
for(let i = 0;i<totalWorkers;++i){
cluster.fork();
}
mpu.init(params);
init();
}
else if(cluster.isWorker){
mpu.init(params);
mpu.sendW2WS(FUN_KEY)
init();
logWithTabs(cluster.worker.id,`[${cluster.worker.id}] Start`);
mpu.sendToSiblings({message:FUN_KEY, siblingIds:-1, value:cluster.worker.id})
.then(function (){
logWithTabs(cluster.worker.id,`[${cluster.worker.id}] End`);
cluster.worker.disconnect();
});
}
Expand All @@ -53,10 +58,4 @@ around in the cluster.
The above example shows how a single worker can ask all worker processes (including itself)
to execute a function. Even though every worker process is trying to ask all worker processes
to execute the `fun` function, `mp-unit` module makes sure that only a single call goes to all processes.
Thus, working as though only a single process called the `sendW2WS` (Worker-To-Workers) function.

### Updates
Added sendW2M (Worker-to-Master) function
(future update will add a flag to make sure that only a single call is executed,
if called from all the workers
)
Thus, working as though only a single process called the `sendToSiblings` (Worker-To-Workers) function.
20 changes: 9 additions & 11 deletions index.js
Original file line number Diff line number Diff line change
@@ -1,11 +1,9 @@
exports.init = require('./src/init').init;
exports.sendW2WS = require('./src/main').w2ws;
exports.sendW2M = require('./src/main').w2m;
exports.sendM2W = require('./src/main').m2w;
exports.addToMap = require('./src/constants').addToMap;
exports.removeFromMap = require('./src/constants').removeFromMap;
let constants = require('./src/constants');
exports.setFailureMessage = constants.setFailureMessage;
exports.setProgressMessage = constants.setProgressMessage;
exports.setTimout = constants.setTimeout;
exports.setTotalWorkers = constants.setTotalWorkers;
const init = require('./src/init');
const main = require('./src/main');

module.exports = {
init : init.init,
sendToMaster : main.sendToMaster,
sendToSibling : main.sendToSibling,
sendToSiblings : main.sendToSiblings
};
101 changes: 0 additions & 101 deletions package-lock.json

This file was deleted.

8 changes: 2 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "mp-unit",
"version": "1.4.0",
"version": "2.0.0",
"description": "Utility to pass messages to other processes within a cluster",
"license": "MIT",
"repository": "BashWorld/mpu",
Expand All @@ -11,9 +11,5 @@
"message",
"passing"
],
"dependencies": {
"cluster": "^0.7.7",
"os": "^0.1.1",
"q": "^1.5.1"
}
"dependencies": {}
}
18 changes: 18 additions & 0 deletions src/addChildListeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
const MSG_HELPER = require('./msgHelper');
const JOB_TRACKER = require('./jobTracker');
const executeMessage = require('./executeMessage');

function addChildListeners(childId){
process.on('message',function (message) {
if(MSG_HELPER.isGoingForward(message)){
if(MSG_HELPER.isMessageForThisChild(childId,message)){
executeMessage(message);
}
}
else{
JOB_TRACKER.stopTracking(message);
}
});
}

module.exports = addChildListeners;
89 changes: 89 additions & 0 deletions src/addParentListeners.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
const cluster = require('cluster');
const MSG_HELPER = require('./msgHelper');
const MSG_CONSTANTS = require('./constants');
const executeMessage = require('./executeMessage');

function isValidSibling(siblingId){
return cluster.workers[siblingId] !== undefined;
}

function forwardMessageToSibling(message, worker){
const siblingId = MSG_HELPER.getSiblingId(message);
if(isValidSibling(siblingId)){
cluster.workers[siblingId].send(message);
}
else{
worker.send(MSG_CONSTANTS.ERROR_MESSAGE.SIBLING_MIA);
}
}

function forwardMessageToGivenSiblings(message, worker){
const siblingIds = MSG_HELPER.getSiblingIds(message);
const validSiblingIds = siblingIds.filter(isValidSibling);
if(MSG_HELPER.ignoreChecks(message)){
MSG_HELPER.setSiblingIds(message, validSiblingIds);
validSiblingIds.forEach(siblingId => {
cluster.workers[siblingId].send(message);
});
}
else{
if(siblingIds.length === validSiblingIds.length){
siblingIds.forEach(siblingId => {
cluster.workers[siblingId].send(message);
});
}
else{
worker.send(MSG_CONSTANTS.ERROR_MESSAGE.SIBLINGS_MIA);
}
}
}

function forwardMessageToAllSiblings(message){
for(const workerId in cluster.workers){
cluster.workers[workerId].send(message);
}
}

function forwardMessage(message,worker){
if(MSG_HELPER.isMessageForParent(message)){
executeMessage(message,worker);
}
else if(MSG_HELPER.isMessageForAllSiblings(message)){
forwardMessageToAllSiblings(message);
}
else if(MSG_HELPER.isMessageForSiblings(message)){
forwardMessageToGivenSiblings(message,worker);
}
else if(MSG_HELPER.isMessageForSibling(message)){
forwardMessageToSibling(message,worker)
}
else{
worker.send(MSG_CONSTANTS.ERROR_MESSAGE.INVALID_MSG);
}
}

function sendBackMessage(message){
if(MSG_HELPER.isMessageGoingBackToSibling(message)){
const siblingId = MSG_HELPER.getSourceSiblingId(message);
if(MSG_HELPER.isMessageForAllSiblings(message)){
message.dest.numOfWorkers = Object.keys(cluster.workers).length;
}
cluster.workers[siblingId].send(message);
}
}

function addParentListeners(){
for(let workerId in cluster.workers){
let worker = cluster.workers[workerId];
worker.on('message', function (message) {
if(MSG_HELPER.isGoingForward(message)){
forwardMessage(message,worker);
}
else{
sendBackMessage(message);
}
});
}
}

module.exports = addParentListeners;
Loading

0 comments on commit b886f61

Please sign in to comment.