Skip to content

Commit 04fe712

Browse files
committed
fix(dynamic-rate-limit): validate job existence
1 parent 347b618 commit 04fe712

File tree

3 files changed

+86
-25
lines changed

3 files changed

+86
-25
lines changed

src/classes/scripts.ts

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1387,13 +1387,11 @@ export class Scripts {
13871387
*/
13881388
async moveJobFromActiveToWait(jobId: string, token: string) {
13891389
const client = await this.queue.client;
1390-
const lockKey = `${this.queue.toKey(jobId)}:lock`;
13911390

13921391
const keys: (string | number)[] = [
13931392
this.queue.keys.active,
13941393
this.queue.keys.wait,
13951394
this.queue.keys.stalled,
1396-
lockKey,
13971395
this.queue.keys.paused,
13981396
this.queue.keys.meta,
13991397
this.queue.keys.limiter,
@@ -1404,13 +1402,22 @@ export class Scripts {
14041402

14051403
const args = [jobId, token, this.queue.toKey(jobId)];
14061404

1407-
const pttl = await this.execCommand(
1405+
const result = await this.execCommand(
14081406
client,
14091407
'moveJobFromActiveToWait',
14101408
keys.concat(args),
14111409
);
14121410

1413-
return pttl < 0 ? 0 : pttl;
1411+
if (result < 0) {
1412+
throw this.finishedErrors({
1413+
code: result,
1414+
jobId,
1415+
command: 'moveJobFromActiveToWait',
1416+
state: 'active',
1417+
});
1418+
}
1419+
1420+
return result;
14141421
}
14151422

14161423
async obliterate(opts: { force: boolean; count: number }): Promise<number> {

src/commands/moveJobFromActiveToWait-10.lua renamed to src/commands/moveJobFromActiveToWait-9.lua

Lines changed: 28 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -5,13 +5,12 @@
55
KEYS[2] wait key
66
77
KEYS[3] stalled key
8-
KEYS[4] job lock key
9-
KEYS[5] paused key
10-
KEYS[6] meta key
11-
KEYS[7] limiter key
12-
KEYS[8] prioritized key
13-
KEYS[9] marker key
14-
KEYS[10] event key
8+
KEYS[4] paused key
9+
KEYS[5] meta key
10+
KEYS[6] limiter key
11+
KEYS[7] prioritized key
12+
KEYS[8] marker key
13+
KEYS[9] event key
1514
1615
ARGV[1] job id
1716
ARGV[2] lock token
@@ -24,37 +23,45 @@ local rcall = redis.call
2423
--- @include "includes/pushBackJobWithPriority"
2524
--- @include "includes/getOrSetMaxEvents"
2625
--- @include "includes/getTargetQueueList"
26+
--- @include "includes/removeLock"
2727

2828
local jobId = ARGV[1]
2929
local token = ARGV[2]
30-
local lockKey = KEYS[4]
30+
local jobKey = ARGV[3]
3131

32-
local lockToken = rcall("GET", lockKey)
33-
local pttl = rcall("PTTL", KEYS[7])
34-
if lockToken == token then
35-
local metaKey = KEYS[6]
32+
if rcall("EXISTS", jobKey) == 1 then
33+
local errorCode = removeLock(jobKey, KEYS[3], token, jobId)
34+
if errorCode < 0 then
35+
return errorCode
36+
end
37+
38+
local metaKey = KEYS[5]
3639
local removed = rcall("LREM", KEYS[1], 1, jobId)
3740
if removed > 0 then
38-
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5])
39-
40-
rcall("SREM", KEYS[3], jobId)
41+
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4])
4142

4243
local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0
4344

4445
if priority > 0 then
45-
pushBackJobWithPriority(KEYS[8], priority, jobId)
46+
pushBackJobWithPriority(KEYS[7], priority, jobId)
4647
else
47-
addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId)
48+
addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId)
4849
end
4950

50-
rcall("DEL", lockKey)
51-
5251
local maxEvents = getOrSetMaxEvents(metaKey)
5352

5453
-- Emit waiting event
55-
rcall("XADD", KEYS[10], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
54+
rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
5655
"jobId", jobId)
5756
end
57+
else
58+
return -1
5859
end
5960

60-
return pttl
61+
local pttl = rcall("PTTL", KEYS[6])
62+
63+
if pttl > 0 then
64+
return pttl
65+
else
66+
return 0
67+
end

tests/test_rate_limiter.ts

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -423,6 +423,53 @@ describe('Rate Limiter', function () {
423423
await worker.close();
424424
});
425425

426+
describe('when job does not exist', () => {
427+
it('should fail with job existence error', async () => {
428+
const dynamicLimit = 250;
429+
const duration = 100;
430+
431+
const worker = new Worker(
432+
queueName,
433+
async job => {
434+
if (job.attemptsStarted === 1) {
435+
await queue.rateLimit(dynamicLimit);
436+
await queue.obliterate({ force: true });
437+
throw Worker.RateLimitError();
438+
}
439+
},
440+
{
441+
autorun: false,
442+
concurrency: 10,
443+
drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker.
444+
limiter: {
445+
max: 2,
446+
duration,
447+
},
448+
connection,
449+
prefix,
450+
},
451+
);
452+
453+
await worker.waitUntilReady();
454+
455+
const failing = new Promise<void>(resolve => {
456+
worker.on('error', err => {
457+
expect(err.message).to.be.equal(
458+
`Missing key for job ${job.id}. moveJobFromActiveToWait`,
459+
);
460+
resolve();
461+
});
462+
});
463+
464+
const job = await queue.add('test', { foo: 'bar' });
465+
466+
worker.run();
467+
468+
await failing;
469+
await worker.close();
470+
}).timeout(4000);
471+
});
472+
426473
describe('when rate limit is too low', () => {
427474
it('should move job to wait anyway', async function () {
428475
this.timeout(4000);

0 commit comments

Comments
 (0)