From 62d1abac98e81ff3e31187ac684086d819473b2b Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 8 Jan 2025 21:50:32 -0500 Subject: [PATCH 1/3] fix(job-scheduler): add next delayed job only when previousMillis matches with producerId --- src/classes/scripts.ts | 1 + src/commands/updateJobScheduler-6.lua | 80 +++++++++++++++------------ tests/test_job_scheduler.ts | 67 +++++++++++++++++++++- 3 files changed, 110 insertions(+), 38 deletions(-) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index e5e832eda5..b9b60217a1 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -378,6 +378,7 @@ export class Scripts { Date.now(), queueKeys[''], producerId ? this.queue.toKey(producerId) : '', + producerId, ]; return this.execCommand(client, 'updateJobScheduler', keys.concat(args)); diff --git a/src/commands/updateJobScheduler-6.lua b/src/commands/updateJobScheduler-6.lua index 8e9040de80..b2a303ff67 100644 --- a/src/commands/updateJobScheduler-6.lua +++ b/src/commands/updateJobScheduler-6.lua @@ -1,23 +1,24 @@ --[[ Updates a job scheduler and adds next delayed job - Input: - KEYS[1] 'marker', - KEYS[2] 'meta' - KEYS[3] 'id' - KEYS[4] 'delayed' - KEYS[5] events stream key - KEYS[6] 'repeat' key - - ARGV[1] next milliseconds - ARGV[2] jobs scheduler id - ARGV[3] msgpacked delayed opts - ARGV[4] timestamp - ARGV[5] prefix key - ARGV[6] producer key - - Output: - next delayed job id - OK + Input: + KEYS[1] 'marker', + KEYS[2] 'meta' + KEYS[3] 'id' + KEYS[4] 'delayed' + KEYS[5] events stream key + KEYS[6] 'repeat' key + + ARGV[1] next milliseconds + ARGV[2] jobs scheduler id + ARGV[3] msgpacked delayed opts + ARGV[4] timestamp + ARGV[5] prefix key + ARGV[6] producer key + ARGV[7] producer id + + Output: + next delayed job id - OK ]] local rcall = redis.call local repeatKey = KEYS[6] @@ -38,24 +39,31 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis -- Validate that scheduler exists. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then - local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") - - rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) - - local eventsKey = KEYS[5] - local metaKey = KEYS[2] - local maxEvents = getOrSetMaxEvents(metaKey) - - rcall("INCR", KEYS[3]) - - local delayedOpts = cmsgpack.unpack(ARGV[3]) - - addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], - schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) - - if ARGV[6] ~= "" then - rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) + local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") + + rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) + + local eventsKey = KEYS[5] + local metaKey = KEYS[2] + local maxEvents = getOrSetMaxEvents(metaKey) + + rcall("INCR", KEYS[3]) + + local delayedOpts = cmsgpack.unpack(ARGV[3]) + local producerId = ARGV[7] + + if producerId then + local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + + if producerId == currentDelayedJobId then + addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], + schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + + if ARGV[6] ~= "" then + rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) + end + + return nextDelayedJobId .. "" -- convert to string end - - return nextDelayedJobId .. "" -- convert to string + end end diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index f053f9cfd7..beca75885f 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -13,8 +13,7 @@ import { getNextMillis, Worker, } from '../src/classes'; -import { JobsOptions } from '../src/types'; -import { removeAllQueueData } from '../src/utils'; +import { delay, removeAllQueueData } from '../src/utils'; const moment = require('moment'); @@ -2047,6 +2046,70 @@ describe('Job Scheduler', function () { await worker.close(); }); + describe('when overriding a scheduler', function () { + it('should not continue adding new delayed jobs from previous delayed record', async function () { + this.clock.restore(); + + const repeatOpts = { pattern: '*/2 * * * * *' }; + + let count = 0; + const worker = new Worker( + queueName, + async () => { + if (count === 0) { + await delay(2000); + await queue.pause(); // keep job in waiting list + } + }, + { connection, prefix }, + ); + + const completing = new Promise(async resolve => { + worker.on('completed', async () => { + count++; + if (count === 1) { + const waitingCount = await queue.getWaitingCount(); + expect(waitingCount).to.eql(1); + + await queue.upsertJobScheduler( + 'test', + { pattern: '*/15 * * * * *' }, + { + data: { foo: 'baz' }, + }, + ); + + const waitingCount2 = await queue.getWaitingCount(); + expect(waitingCount2).to.eql(1); + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.eql(1); + + await queue.resume(); + } else { + resolve(); + } + }); + }); + + await queue.upsertJobScheduler('test', repeatOpts, { + data: { foo: 'bar' }, + }); + + await completing; + + const schedulerCount = await queue.getJobSchedulersCount(); + expect(schedulerCount).to.eql(1); + + const delayedCount = await queue.getDelayedCount(); + expect(delayedCount).to.eql(1); + + const totalJobs = await queue.getJobCountByTypes(); + expect(totalJobs).to.eql(3); // 2 completed, 1 delayed + + await worker.close(); + }); + }); + it('should allow adding a repeatable job after removing it', async function () { const repeat = { pattern: '*/5 * * * *', From ec03f8f357a76730d48371ea33f358d605b9b8f5 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 8 Jan 2025 22:12:52 -0500 Subject: [PATCH 2/3] refactor: only update nextMillis if producerId matches --- src/commands/updateJobScheduler-6.lua | 34 +++++++++++++-------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/commands/updateJobScheduler-6.lua b/src/commands/updateJobScheduler-6.lua index b2a303ff67..883d4af1d7 100644 --- a/src/commands/updateJobScheduler-6.lua +++ b/src/commands/updateJobScheduler-6.lua @@ -27,6 +27,7 @@ local timestamp = ARGV[4] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[2] local prefixKey = ARGV[5] +local producerId = ARGV[7] -- Includes --- @include "includes/addDelayedJob" @@ -39,31 +40,28 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis -- Validate that scheduler exists. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then - local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") + local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis - rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) + if producerId == currentDelayedJobId then + local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") - local eventsKey = KEYS[5] - local metaKey = KEYS[2] - local maxEvents = getOrSetMaxEvents(metaKey) + rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) - rcall("INCR", KEYS[3]) + local eventsKey = KEYS[5] + local metaKey = KEYS[2] + local maxEvents = getOrSetMaxEvents(metaKey) - local delayedOpts = cmsgpack.unpack(ARGV[3]) - local producerId = ARGV[7] + rcall("INCR", KEYS[3]) - if producerId then - local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + local delayedOpts = cmsgpack.unpack(ARGV[3]) - if producerId == currentDelayedJobId then - addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], - schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], + schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) - if ARGV[6] ~= "" then - rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) - end - - return nextDelayedJobId .. "" -- convert to string + if ARGV[6] ~= "" then + rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) end + + return nextDelayedJobId .. "" -- convert to string end end From d327bb4af5640670976e13f78be8e7beaf833358 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Sun, 12 Jan 2025 11:26:26 -0500 Subject: [PATCH 3/3] refactor(job-scheduler): move arg as a key in updateJobScheduler --- src/classes/scripts.ts | 2 +- ...dateJobScheduler-6.lua => updateJobScheduler-7.lua} | 10 +++++----- 2 files changed, 6 insertions(+), 6 deletions(-) rename src/commands/{updateJobScheduler-6.lua => updateJobScheduler-7.lua} (91%) diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index b9b60217a1..93241de272 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -369,6 +369,7 @@ export class Scripts { queueKeys.delayed, queueKeys.events, queueKeys.repeat, + producerId ? this.queue.toKey(producerId) : '', ]; const args = [ @@ -377,7 +378,6 @@ export class Scripts { pack(delayedJobOpts), Date.now(), queueKeys[''], - producerId ? this.queue.toKey(producerId) : '', producerId, ]; diff --git a/src/commands/updateJobScheduler-6.lua b/src/commands/updateJobScheduler-7.lua similarity index 91% rename from src/commands/updateJobScheduler-6.lua rename to src/commands/updateJobScheduler-7.lua index 883d4af1d7..4a6b052312 100644 --- a/src/commands/updateJobScheduler-6.lua +++ b/src/commands/updateJobScheduler-7.lua @@ -8,14 +8,14 @@ KEYS[4] 'delayed' KEYS[5] events stream key KEYS[6] 'repeat' key + KEYS[7] producer key ARGV[1] next milliseconds ARGV[2] jobs scheduler id ARGV[3] msgpacked delayed opts ARGV[4] timestamp ARGV[5] prefix key - ARGV[6] producer key - ARGV[7] producer id + ARGV[6] producer id Output: next delayed job id - OK @@ -27,7 +27,7 @@ local timestamp = ARGV[4] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[2] local prefixKey = ARGV[5] -local producerId = ARGV[7] +local producerId = ARGV[6] -- Includes --- @include "includes/addDelayedJob" @@ -58,8 +58,8 @@ if prevMillis ~= false then addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) - if ARGV[6] ~= "" then - rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) + if KEYS[7] ~= "" then + rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId) end return nextDelayedJobId .. "" -- convert to string