Skip to content

Commit

Permalink
fix(dynamic-rate-limit): validate job lock cases (#2975)
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf authored Dec 25, 2024
1 parent e8ca2ec commit 8bb27ea
Show file tree
Hide file tree
Showing 7 changed files with 132 additions and 68 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@
"tsc:all": "tsc && tsc -p tsconfig-cjs.json"
},
"dependencies": {
"cron-parser": "^4.6.0",
"cron-parser": "^4.9.0",
"ioredis": "^5.4.1",
"msgpackr": "^1.11.2",
"node-abort-controller": "^3.1.1",
Expand Down
15 changes: 11 additions & 4 deletions src/classes/scripts.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1387,13 +1387,11 @@ export class Scripts {
*/
async moveJobFromActiveToWait(jobId: string, token: string) {
const client = await this.queue.client;
const lockKey = `${this.queue.toKey(jobId)}:lock`;

const keys: (string | number)[] = [
this.queue.keys.active,
this.queue.keys.wait,
this.queue.keys.stalled,
lockKey,
this.queue.keys.paused,
this.queue.keys.meta,
this.queue.keys.limiter,
Expand All @@ -1404,13 +1402,22 @@ export class Scripts {

const args = [jobId, token, this.queue.toKey(jobId)];

const pttl = await this.execCommand(
const result = await this.execCommand(
client,
'moveJobFromActiveToWait',
keys.concat(args),
);

return pttl < 0 ? 0 : pttl;
if (result < 0) {
throw this.finishedErrors({
code: result,
jobId,
command: 'moveJobFromActiveToWait',
state: 'active',
});
}

return result;
}

async obliterate(opts: { force: boolean; count: number }): Promise<number> {
Expand Down
60 changes: 0 additions & 60 deletions src/commands/moveJobFromActiveToWait-10.lua

This file was deleted.

63 changes: 63 additions & 0 deletions src/commands/moveJobFromActiveToWait-9.lua
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
--[[
Function to move job from active state to wait.
Input:
KEYS[1] active key
KEYS[2] wait key
KEYS[3] stalled key
KEYS[4] paused key
KEYS[5] meta key
KEYS[6] limiter key
KEYS[7] prioritized key
KEYS[8] marker key
KEYS[9] event key
ARGV[1] job id
ARGV[2] lock token
ARGV[3] job id key
]]
local rcall = redis.call

-- Includes
--- @include "includes/addJobInTargetList"
--- @include "includes/pushBackJobWithPriority"
--- @include "includes/getOrSetMaxEvents"
--- @include "includes/getTargetQueueList"
--- @include "includes/removeLock"

local jobId = ARGV[1]
local token = ARGV[2]
local jobKey = ARGV[3]

local errorCode = removeLock(jobKey, KEYS[3], token, jobId)
if errorCode < 0 then
return errorCode
end

local metaKey = KEYS[5]
local removed = rcall("LREM", KEYS[1], 1, jobId)
if removed > 0 then
local target, isPausedOrMaxed = getTargetQueueList(metaKey, KEYS[1], KEYS[2], KEYS[4])

local priority = tonumber(rcall("HGET", ARGV[3], "priority")) or 0

if priority > 0 then
pushBackJobWithPriority(KEYS[7], priority, jobId)
else
addJobInTargetList(target, KEYS[8], "RPUSH", isPausedOrMaxed, jobId)
end

local maxEvents = getOrSetMaxEvents(metaKey)

-- Emit waiting event
rcall("XADD", KEYS[9], "MAXLEN", "~", maxEvents, "*", "event", "waiting",
"jobId", jobId)
end

local pttl = rcall("PTTL", KEYS[6])

if pttl > 0 then
return pttl
else
return 0
end
11 changes: 9 additions & 2 deletions tests/test_events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -791,9 +791,15 @@ describe('events', function () {
);

let deduplicatedCounter = 0;
queueEvents.on('deduplicated', ({ jobId }) => {
deduplicatedCounter++;
const deduplication = new Promise<void>(resolve => {
queueEvents.on('deduplicated', () => {
deduplicatedCounter++;
if (deduplicatedCounter == 2) {
resolve();
}
});
});

await job.remove();

await queue.add(
Expand All @@ -814,6 +820,7 @@ describe('events', function () {
{ deduplication: { id: 'a1' } },
);
await secondJob.remove();
await deduplication;

expect(deduplicatedCounter).to.be.equal(2);
});
Expand Down
47 changes: 47 additions & 0 deletions tests/test_rate_limiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -423,6 +423,53 @@ describe('Rate Limiter', function () {
await worker.close();
});

describe('when job does not exist', () => {
it('should fail with job existence error', async () => {
const dynamicLimit = 250;
const duration = 100;

const worker = new Worker(
queueName,
async job => {
if (job.attemptsStarted === 1) {
await queue.rateLimit(dynamicLimit);
await queue.obliterate({ force: true });
throw Worker.RateLimitError();
}
},
{
autorun: false,
concurrency: 10,
drainDelay: 10, // If test hangs, 10 seconds here helps to fail quicker.
limiter: {
max: 2,
duration,
},
connection,
prefix,
},
);

await worker.waitUntilReady();

const failing = new Promise<void>(resolve => {
worker.on('error', err => {
expect(err.message).to.be.equal(
`Missing lock for job ${job.id}. moveJobFromActiveToWait`,
);
resolve();
});
});

const job = await queue.add('test', { foo: 'bar' });

worker.run();

await failing;
await worker.close();
}).timeout(4000);
});

describe('when rate limit is too low', () => {
it('should move job to wait anyway', async function () {
this.timeout(4000);
Expand Down
2 changes: 1 addition & 1 deletion yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,7 @@ create-require@^1.1.0:
resolved "https://registry.yarnpkg.com/create-require/-/create-require-1.1.1.tgz#c1d7e8f1e5f6cfc9ff65f9cd352d37348756c333"
integrity sha512-dcKFX3jn0MpIaXjisoRvexIJVEKzaq7z2rZKxf+MSr9TkdmHmsU4m2lcLojrj/FHl8mk5VxMmYA+ftRkP/3oKQ==

cron-parser@^4.6.0:
cron-parser@^4.9.0:
version "4.9.0"
resolved "https://registry.yarnpkg.com/cron-parser/-/cron-parser-4.9.0.tgz#0340694af3e46a0894978c6f52a6dbb5c0f11ad5"
integrity sha512-p0SaNjrHOnQeR8/VnfGbmg9te2kfyYSQ7Sc/j/6DtPL3JQvKxmjO9TSjNFpujqV3vEYYBvNNvXSxzyksBWAx1Q==
Expand Down

0 comments on commit 8bb27ea

Please sign in to comment.