From 4a909059da2673a4cc7d5b4368a67505ea9ac9a2 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 18 Dec 2024 22:25:15 -0500 Subject: [PATCH 1/4] fix(dynamic-rate-limit): validate job existence --- src/classes/scripts.ts | 15 ++++-- ...t-10.lua => moveJobFromActiveToWait-9.lua} | 49 +++++++++++-------- tests/test_rate_limiter.ts | 47 ++++++++++++++++++ 3 files changed, 86 insertions(+), 25 deletions(-) rename src/commands/{moveJobFromActiveToWait-10.lua => moveJobFromActiveToWait-9.lua} (54%) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index e4ee4cbeae..a36359a946 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -1387,13 +1387,11 @@ export class Scripts { */ async moveJobFromActiveToWait(jobId: string, token: string) { const client = await this.queue.client; - const lockKey = `${this.queue.toKey(jobId)}:lock`; const keys: (string | number)[] = [ this.queue.keys.active, this.queue.keys.wait, this.queue.keys.stalled, - lockKey, this.queue.keys.paused, this.queue.keys.meta, this.queue.keys.limiter, @@ -1404,13 +1402,22 @@ export class Scripts { const args = [jobId, token, this.queue.toKey(jobId)]; - const pttl = await this.execCommand( + const result = await this.execCommand( client, 'moveJobFromActiveToWait', keys.concat(args), ); - return pttl < 0 ? 0 : pttl; + if (result < 0) { + throw this.finishedErrors({ + code: result, + jobId, + command: 'moveJobFromActiveToWait', + state: 'active', + }); + } + + return result; } async obliterate(opts: { force: boolean; count: number }): Promise { diff --git a/src/commands/moveJobFromActiveToWait-10.lua b/src/commands/moveJobFromActiveToWait-9.lua similarity index 54% rename from src/commands/moveJobFromActiveToWait-10.lua rename to src/commands/moveJobFromActiveToWait-9.lua index e90d6d2d10..57b0864d47 100644 --- a/src/commands/moveJobFromActiveToWait-10.lua +++ b/src/commands/moveJobFromActiveToWait-9.lua @@ -5,13 +5,12 @@ KEYS[2] wait key KEYS[3] stalled key - KEYS[4] job lock key - KEYS[5] paused key - KEYS[6] meta key - KEYS[7] limiter key - KEYS[8] prioritized key - KEYS[9] marker key - KEYS[10] event key + KEYS[4] paused key + KEYS[5] meta key + KEYS[6] limiter key + KEYS[7] prioritized key + KEYS[8] marker key + KEYS[9] event key ARGV[1] job id ARGV[2] lock token @@ -24,37 +23,45 @@ local rcall = redis.call --- @include "includes/pushBackJobWithPriority" --- @include "includes/getOrSetMaxEvents" --- @include "includes/getTargetQueueList" +--- @include "includes/removeLock" local jobId = ARGV[1] local token = ARGV[2] -local lockKey = KEYS[4] +local jobKey = ARGV[3] -local lockToken = rcall("GET", lockKey) -local pttl = rcall("PTTL", KEYS[7]) -if lockToken == token then - local metaKey = KEYS[6] +if rcall("EXISTS", jobKey) == 1 then + local errorCode = removeLock(jobKey, KEYS[3], token, jobId) + if errorCode < 0 then + return errorCode + end + + local metaKey = KEYS[5] local removed = rcall("LREM", KEYS[1], 1, jobId) if removed > 0 then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[5]) - - rcall("SREM", KEYS[3], jobId) + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4]) local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0 if priority > 0 then - pushBackJobWithPriority(KEYS[8], priority, jobId) + pushBackJobWithPriority(KEYS[7], priority, jobId) else - addJobInTargetList(target, KEYS[9], "RPUSH", isPausedOrMaxed, jobId) + addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId) end - rcall("DEL", lockKey) - local maxEvents = getOrSetMaxEvents(metaKey) -- Emit waiting event - rcall("XADD", KEYS[10], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting", "jobId", jobId) end +else + return -1 end -return pttl +local pttl = rcall("PTTL", KEYS[6]) + +if pttl > 0 then + return pttl +else + return 0 +end \ No newline at end of file diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 28e4c5c9ad..3ecaacd949 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -423,6 +423,53 @@ describe('Rate Limiter', function () { await worker.close(); }); + describe('when job does not exist', () => { + it('should fail with job existence error', async () => { + const dynamicLimit = 250; + const duration = 100; + + const worker = new Worker( + queueName, + async job => { + if (job.attemptsStarted === 1) { + await queue.rateLimit(dynamicLimit); + await queue.obliterate({ force: true }); + throw Worker.RateLimitError(); + } + }, + { + autorun: false, + concurrency: 10, + drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker. + limiter: { + max: 2, + duration, + }, + connection, + prefix, + }, + ); + + await worker.waitUntilReady(); + + const failing = new Promise(resolve => { + worker.on('error', err => { + expect(err.message).to.be.equal( + `Missing key for job ${job.id}. moveJobFromActiveToWait`, + ); + resolve(); + }); + }); + + const job = await queue.add('test', { foo: 'bar' }); + + worker.run(); + + await failing; + await worker.close(); + }).timeout(4000); + }); + describe('when rate limit is too low', () => { it('should move job to wait anyway', async function () { this.timeout(4000); From f35317ecf8f98982377bb680f31cf86e222c768a Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 18 Dec 2024 22:43:27 -0500 Subject: [PATCH 2/4] fix(deps): upgrade cron-parser dependency to v4.9.0 for Luxon CVE-2023-22467 --- package.json | 2 +- yarn.lock | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/package.json b/package.json index 3f9acf4eb6..2eb2f4de7d 100644 --- a/package.json +++ b/package.json @@ -53,7 +53,7 @@ "tsc:all": "tsc && tsc -p tsconfig-cjs.json" }, "dependencies": { - "cron-parser": "^4.6.0", + "cron-parser": "^4.9.0", "ioredis": "^5.4.1", "msgpackr": "^1.11.2", "node-abort-controller": "^3.1.1", diff --git a/yarn.lock b/yarn.lock index f68b48af72..6141eb518f 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2241,7 +2241,7 @@ create-require@^1.1.0: resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333" integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ== -cron-parser@^4.6.0: +cron-parser@^4.9.0: version "4.9.0" resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5" integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q== From 413c03bc4dec8aa1b70c373464bf87c31bf9a551 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 18 Dec 2024 23:03:06 -0500 Subject: [PATCH 3/4] chore: fix flaky test --- tests/test_events.ts | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/tests/test_events.ts b/tests/test_events.ts index a1fd791d99..3ec45cc9ce 100644 --- a/tests/test_events.ts +++ b/tests/test_events.ts @@ -791,9 +791,15 @@ describe('events', function () { ); let deduplicatedCounter = 0; - queueEvents.on('deduplicated', ({ jobId }) => { - deduplicatedCounter++; + const deduplication = new Promise(resolve => { + queueEvents.on('deduplicated', () => { + deduplicatedCounter++; + if (deduplicatedCounter == 2) { + resolve(); + } + }); }); + await job.remove(); await queue.add( @@ -814,6 +820,7 @@ describe('events', function () { { deduplication: { id: 'a1' } }, ); await secondJob.remove(); + await deduplication; expect(deduplicatedCounter).to.be.equal(2); }); From 8e0214f26a2adcae44aca5c464778088b21a634b Mon Sep 17 00:00:00 2001 From: roggervalf Date: Thu, 19 Dec 2024 23:06:56 -0500 Subject: [PATCH 4/4] chore: address comments --- src/commands/moveJobFromActiveToWait-9.lua | 40 ++++++++++------------ tests/test_rate_limiter.ts | 2 +- 2 files changed, 19 insertions(+), 23 deletions(-) diff --git a/src/commands/moveJobFromActiveToWait-9.lua b/src/commands/moveJobFromActiveToWait-9.lua index 57b0864d47..6c927a99ec 100644 --- a/src/commands/moveJobFromActiveToWait-9.lua +++ b/src/commands/moveJobFromActiveToWait-9.lua @@ -29,33 +29,29 @@ local jobId = ARGV[1] local token = ARGV[2] local jobKey = ARGV[3] -if rcall("EXISTS", jobKey) == 1 then - local errorCode = removeLock(jobKey, KEYS[3], token, jobId) - if errorCode < 0 then - return errorCode - end +local errorCode = removeLock(jobKey, KEYS[3], token, jobId) +if errorCode < 0 then + return errorCode +end - local metaKey = KEYS[5] - local removed = rcall("LREM", KEYS[1], 1, jobId) - if removed > 0 then - local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4]) +local metaKey = KEYS[5] +local removed = rcall("LREM", KEYS[1], 1, jobId) +if removed > 0 then + local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4]) - local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0 + local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0 - if priority > 0 then - pushBackJobWithPriority(KEYS[7], priority, jobId) - else - addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId) - end + if priority > 0 then + pushBackJobWithPriority(KEYS[7], priority, jobId) + else + addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId) + end - local maxEvents = getOrSetMaxEvents(metaKey) + local maxEvents = getOrSetMaxEvents(metaKey) - -- Emit waiting event - rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting", - "jobId", jobId) - end -else - return -1 + -- Emit waiting event + rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting", + "jobId", jobId) end local pttl = rcall("PTTL", KEYS[6]) diff --git a/tests/test_rate_limiter.ts b/tests/test_rate_limiter.ts index 3ecaacd949..3a9f8c991d 100644 --- a/tests/test_rate_limiter.ts +++ b/tests/test_rate_limiter.ts @@ -455,7 +455,7 @@ describe('Rate Limiter', function () { const failing = new Promise(resolve => { worker.on('error', err => { expect(err.message).to.be.equal( - `Missing key for job ${job.id}. moveJobFromActiveToWait`, + `Missing lock for job ${job.id}. moveJobFromActiveToWait`, ); resolve(); });