Skip to content

Commit a72cf28

Browse files
authored
Merge pull request #81 from dlecocq/dan/heartbeat-worker-count
Heartbeat Worker Registration
2 parents 9d2cca3 + 235ef51 commit a72cf28

File tree

2 files changed

+21
-0
lines changed

2 files changed

+21
-0
lines changed

job.lua

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -652,6 +652,9 @@ function QlessJob:heartbeat(now, worker, data)
652652
-- Add this job to the list of jobs handled by this worker
653653
redis.call('zadd', 'ql:w:' .. worker .. ':jobs', expires, self.jid)
654654

655+
-- Make sure we this worker to the list of seen workers
656+
redis.call('zadd', 'ql:workers', now, worker)
657+
655658
-- And now we should just update the locks
656659
local queue = Qless.queue(
657660
redis.call('hget', QlessJob.ns .. self.jid, 'queue'))

test/test_worker.py

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,24 @@ def test_basic(self):
2424
'stalled': 0
2525
}])
2626

27+
def test_worker_registration_lapse(self):
28+
'''Workers that fail to check in expire from the list of active workers'''
29+
self.lua('config.set', 0, 'max-worker-age', 10)
30+
self.lua('pop', 1, 'queue', 'worker', 1)
31+
self.assertEqual(self.lua('workers', 15), {})
32+
33+
def test_worker_heartbeat_registration(self):
34+
'''Heartbeating registers a worker as being active.'''
35+
self.lua('config.set', 0, 'max-worker-age', 10)
36+
self.lua('put', 0, 'worker', 'queue', 'jid', 'klass', {}, 0)
37+
self.lua('pop', 1, 'queue', 'worker', 1)
38+
self.lua('heartbeat', 10, 'jid', 'worker', {})
39+
self.assertEqual(self.lua('workers', 15), [{
40+
'name': 'worker',
41+
'jobs': 1,
42+
'stalled': 0,
43+
}])
44+
2745
def test_stalled(self):
2846
'''We should be able to detect stalled jobs'''
2947
self.lua('put', 0, 'worker', 'queue', 'jid', 'klass', {}, 0)

0 commit comments

Comments
 (0)