Skip to content

Commit

Permalink
fix(job-scheduler): validate if next delayed job exists
Browse files Browse the repository at this point in the history
  • Loading branch information
roggervalf committed Jan 16, 2025
1 parent 71733cc commit 685e384
Show file tree
Hide file tree
Showing 4 changed files with 110 additions and 53 deletions.
28 changes: 15 additions & 13 deletions src/classes/job-scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -165,22 +165,24 @@ export class JobScheduler extends QueueBase {
producerId,
);

const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);
if (jobId) {
const job = new this.Job<T, R, N>(
this,
jobName,
jobData,
mergedOpts,
jobId,
);

job.id = jobId;
job.id = jobId;

span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});
span?.setAttributes({
[TelemetryAttributes.JobSchedulerId]: jobSchedulerId,
[TelemetryAttributes.JobId]: job.id,
});

return job;
return job;
}
} else {
const jobId = await this.scripts.updateJobSchedulerNextMillis(
jobSchedulerId,
Expand Down
35 changes: 18 additions & 17 deletions src/commands/addJobScheduler-6.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,37 +84,38 @@ local schedulerKey = repeatKey .. ":" .. jobSchedulerId
local nextDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. nextMillis
local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis

local nextDelayedJobDoesNotExist = rcall("EXISTS", nextDelayedJobKey) ~= 1
-- If we are overriding a repeatable job we must delete the delayed job for
-- the next iteration.
local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId)
if prevMillis ~= false then
local delayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis
local prevDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis

if rcall("ZSCORE", delayedKey, delayedJobId) ~= false
and (rcall("EXISTS", nextDelayedJobKey) ~= 1
or delayedJobId == nextDelayedJobId) then
removeJob(delayedJobId, true, prefixKey, true --[[remove debounce key]])
rcall("ZREM", delayedKey, delayedJobId)
if rcall("ZSCORE", delayedKey, prevDelayedJobId) ~= false and then
removeJob(prevDelayedJobId, true, prefixKey, true --[[remove debounce key]])
rcall("ZREM", delayedKey, prevDelayedJobId)
end
end

local schedulerOpts = cmsgpack.unpack(ARGV[2])

storeRepeatableJob(jobSchedulerId, schedulerKey, repeatKey, nextMillis, schedulerOpts, ARGV[4], templateOpts)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)
if nextDelayedJobDoesNotExist then
local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])
rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[6])
local delayedOpts = cmsgpack.unpack(ARGV[6])

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts,
timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerOpts['name'], ARGV[4], delayedOpts,
timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if ARGV[9] ~= "" then
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
end
if ARGV[9] ~= "" then
rcall("HSET", ARGV[9], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
return nextDelayedJobId .. "" -- convert to string
end
38 changes: 20 additions & 18 deletions src/commands/updateJobScheduler-7.lua
Original file line number Diff line number Diff line change
Expand Up @@ -48,29 +48,31 @@ if prevMillis ~= false then

rcall("ZADD", repeatKey, nextMillis, jobSchedulerId)

local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)
if rcall("EXISTS", nextDelayedJobKey) ~= 1 then
local eventsKey = KEYS[5]
local metaKey = KEYS[2]
local maxEvents = getOrSetMaxEvents(metaKey)

rcall("INCR", KEYS[3])
rcall("INCR", KEYS[3])

local delayedOpts = cmsgpack.unpack(ARGV[4])
local delayedOpts = cmsgpack.unpack(ARGV[4])

-- TODO: remove this workaround in next breaking change,
-- all job-schedulers must save job data
local templateData = schedulerAttributes[2] or ARGV[3]
-- TODO: remove this workaround in next breaking change,
-- all job-schedulers must save job data
local templateData = schedulerAttributes[2] or ARGV[3]

if templateData and templateData ~= '{}' then
rcall("HSET", schedulerKey, "data", templateData)
end
if templateData and templateData ~= '{}' then
rcall("HSET", schedulerKey, "data", templateData)
end

addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)

if KEYS[7] ~= "" then
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
end
addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1],
templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil)
if KEYS[7] ~= "" then
rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId)
end

return nextDelayedJobId .. "" -- convert to string
return nextDelayedJobId .. "" -- convert to string
end
end
end
62 changes: 57 additions & 5 deletions tests/test_job_scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ describe('Job Scheduler', function () {
await queue.close();
await repeat.close();
await queueEvents.close();
await removeAllQueueData(new IORedis(redisHost), queueName);
//await removeAllQueueData(new IORedis(redisHost), queueName);
});

afterAll(async function () {
Expand Down Expand Up @@ -241,8 +241,9 @@ describe('Job Scheduler', function () {
const repeatableJobs = await queue.getJobSchedulers();
expect(repeatableJobs.length).to.be.eql(1);
await this.clock.tickAsync(ONE_MINUTE);
const delayed = await queue.getDelayed();
expect(delayed).to.have.length(1);
const counts = await queue.getJobCounts();
expect(counts.delayed).to.be.eql(0);
expect(counts.completed).to.be.eql(1);

await worker.close();
});
Expand Down Expand Up @@ -1537,6 +1538,8 @@ describe('Job Scheduler', function () {

describe('when repeatable job is promoted', function () {
it('keeps one repeatable and one delayed after being processed', async function () {
this.clock.restore();

const repeatOpts = {
pattern: '0 * 1 * *',
};
Expand All @@ -1562,16 +1565,65 @@ describe('Job Scheduler', function () {
const delayedCount2 = await queue.getDelayedCount();
expect(delayedCount2).to.be.equal(1);

const configs = await repeat.getRepeatableJobs(0, -1, true);
const schedulersCount = await queue.getJobSchedulersCount();

expect(delayedCount).to.be.equal(1);

const count = await queue.count();

expect(count).to.be.equal(1);
expect(configs).to.have.length(1);
expect(schedulersCount).to.be.equal(1);
await worker.close();
});

describe('when scheduler is removed and re-added', function () {
it('should not add next delayed job if already existed in different state than delayed', async function () {
this.clock.restore();

const repeatOpts = {
pattern: '0 * 1 * *',
};

const worker = new Worker(
queueName,
async () => {
await queue.removeJobScheduler('test');
await queue.upsertJobScheduler('test', repeatOpts);
},
{
connection,
prefix,
},
);

const completing = new Promise<void>(resolve => {
worker.on('completed', () => {
resolve();
});
});

const repeatableJob = await queue.upsertJobScheduler(
'test',
repeatOpts,
);
const delayedCount = await queue.getDelayedCount();
expect(delayedCount).to.be.equal(1);

await repeatableJob!.promote();
await completing;

const delayedCountAfter = await queue.getDelayedCount();
expect(delayedCountAfter).to.be.equal(0);

const schedulersCount = await queue.getJobSchedulersCount();

const counts = await queue.getJobCounts();

expect(counts.completed).to.be.equal(1);
expect(schedulersCount).to.be.equal(1);
await worker.close();
});
});
});

it('should allow removing a named repeatable job', async function () {
Expand Down

0 comments on commit 685e384

Please sign in to comment.