Skip to content

Commit

Permalink
Merge pull request #70 from taskrabbit/cleanOldWorkers
Browse files Browse the repository at this point in the history
cleanOldWorkers
  • Loading branch information
evantahler committed May 7, 2015
2 parents 9795ae2 + 0a598d7 commit 142f354
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 1 deletion.
75 changes: 75 additions & 0 deletions lib/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,81 @@ queue.prototype.allWorkingOn = function(callback){
});
};

queue.prototype.forceCleanWorker = function(workerName, callback){
var self = this;
self.workers(function(err, workers){
var queues = workers[workerName];
var errorPayload;
if(err){ callback(err); }
else if(!queues){ callback(new Error('worker not round')); }
else{
self.workingOn(workerName, queues, function(err, workingOn){
if(err){ callback(err); }
else if(workingOn){
workingOn = JSON.parse(workingOn);
errorPayload = {
worker: workerName,
queue: workingOn.queue,
payload: workingOn.payload,
exception: 'Worker Timeout (killed manually)',
error: 'Worker Timeout (killed manually)',
backtrace: null,
failed_at: (new Date()).toString()
};
self.connection.redis.incr(self.connection.key('stat', 'failed'));
self.connection.redis.incr(self.connection.key('stat', 'failed', workerName));
self.connection.redis.rpush(self.connection.key('failed'), JSON.stringify(errorPayload));
}

self.connection.redis.del([
self.connection.key('stat', 'failed', workerName),
self.connection.key('stat', 'processed', workerName),
self.connection.key('worker', workerName),
self.connection.redis.srem(self.connection.key('workers'), workerName + ':' + queues)
], function(err, data){
callback(err, errorPayload);
});

});
}
});
};

queue.prototype.cleanOldWorkers = function(age, callback){
// note: this method will remove the data created by a "stuck" worker and move the payload to the error queue
// however, it will not actually remove any processes which may be running. A job *may* be running that you have removed
var self = this;
var results = {};
self.allWorkingOn(function(err, data){
if(err && typeof callback === 'function'){
callback(err);
}else if((!data || hashLength(data) && typeof callback === 'function' ) === 0){
callback(null, results);
}else{
var started = 0;
for(var workerName in data){
started++;
if(Date.now() - Date.parse(data[workerName].run_at) > age){
self.forceCleanWorker(workerName, function(error, errorPayload){
if(errorPayload && errorPayload.worker ){ results[errorPayload.worker] = errorPayload; }
started--;
if(started === 0 && typeof callback === 'function'){
callback(null, results);
}
});
}else{
process.nextTick(function(){
started--;
if(started === 0 && typeof callback === 'function'){
callback(null, results);
}
});
}
}
}
});
};

queue.prototype.failedCount = function(callback){
var self = this;
self.connection.redis.llen(self.connection.key('failed'), function(err, length){
Expand Down
63 changes: 62 additions & 1 deletion test/core/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -402,7 +402,7 @@ describe('queue', function(){
describe('worker status', function(){
var workerA;
var workerB;
var timeout = 100;
var timeout = 500;

var jobs = {
"slowJob": {
Expand Down Expand Up @@ -479,6 +479,67 @@ describe('queue', function(){
queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
});

it('can remove stuck workers', function(done){
var age = 1;
var listener = workerA.on('job', function(q, job, failure){
workerA.removeAllListeners('job');

queue.allWorkingOn(function(err, data){
var paylaod = data['workerA'].payload;
paylaod.queue.should.equal('test_queue');
paylaod.class.should.equal('slowJob');

queue.cleanOldWorkers(age, function(err, data){
should.not.exist(err);
Object.keys(data).length.should.equal(1);
data.workerA.queue.should.equal('test_queue');
data.workerA.worker.should.equal('workerA');
data.workerA.payload.class.should.equal('slowJob');

specHelper.redis.rpop(specHelper.namespace + ":" + "failed", function(err, data){
data = JSON.parse(data);
data.queue.should.equal(specHelper.queue);
data.exception.should.equal('Worker Timeout (killed manually)');
data.error.should.equal('Worker Timeout (killed manually)');
data.payload.class.should.equal('slowJob');

queue.allWorkingOn(function(err, data){
Object.keys(data).length.should.equal(1);
data.workerB.should.equal('started');
done();
});
});
});
});
});

queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
});

it('will not remove stuck jobs within the timelimit', function(done){
var age = 999;
var listener = workerA.on('job', function(q, job, failure){
workerA.removeAllListeners('job');

queue.cleanOldWorkers(age, function(err, data){
should.not.exist(err);
Object.keys(data).length.should.equal(0);
queue.allWorkingOn(function(err, data){
var paylaod = data['workerA'].payload;
paylaod.queue.should.equal('test_queue');
paylaod.class.should.equal('slowJob');

done();
});
});
});

queue.enqueue(specHelper.queue, "slowJob");
workerA.start();
});

});

});
Expand Down

0 comments on commit 142f354

Please sign in to comment.