diff --git a/src/classes/job-scheduler.ts b/src/classes/job-scheduler.ts index b36fb9c4d0..1e7fe9e31a 100644 --- a/src/classes/job-scheduler.ts +++ b/src/classes/job-scheduler.ts @@ -185,6 +185,7 @@ export class JobScheduler extends QueueBase { const jobId = await this.scripts.updateJobSchedulerNextMillis( jobSchedulerId, nextMillis, + JSON.stringify(typeof jobData === 'undefined' ? {} : jobData), Job.optsAsJSON(mergedOpts), producerId, ); diff --git a/src/classes/scripts.ts b/src/classes/scripts.ts index 93241de272..8350c9dff1 100644 --- a/src/classes/scripts.ts +++ b/src/classes/scripts.ts @@ -354,6 +354,7 @@ export class Scripts { async updateJobSchedulerNextMillis( jobSchedulerId: string, nextMillis: number, + templateData: string, delayedJobOpts: JobsOptions, // The job id of the job that produced this next iteration producerId?: string, @@ -375,6 +376,7 @@ export class Scripts { const args = [ nextMillis, jobSchedulerId, + templateData, pack(delayedJobOpts), Date.now(), queueKeys[''], diff --git a/src/commands/updateJobScheduler-7.lua b/src/commands/updateJobScheduler-7.lua index 4a6b052312..326b63de25 100644 --- a/src/commands/updateJobScheduler-7.lua +++ b/src/commands/updateJobScheduler-7.lua @@ -12,10 +12,11 @@ ARGV[1] next milliseconds ARGV[2] jobs scheduler id - ARGV[3] msgpacked delayed opts - ARGV[4] timestamp - ARGV[5] prefix key - ARGV[6] producer id + ARGV[3] Json stringified delayed data + ARGV[4] msgpacked delayed opts + ARGV[5] timestamp + ARGV[6] prefix key + ARGV[7] producer id Output: next delayed job id - OK @@ -23,11 +24,11 @@ local rcall = redis.call local repeatKey = KEYS[6] local delayedKey = KEYS[4] -local timestamp = ARGV[4] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[2] -local prefixKey = ARGV[5] -local producerId = ARGV[6] +local timestamp = ARGV[5] +local prefixKey = ARGV[6] +local producerId = ARGV[7] -- Includes --- @include "includes/addDelayedJob" @@ -53,10 +54,18 @@ if prevMillis ~= false then rcall("INCR", KEYS[3]) - local delayedOpts = cmsgpack.unpack(ARGV[3]) + local delayedOpts = cmsgpack.unpack(ARGV[4]) + + -- TODO: remove this workaround in next breaking change, + -- all job-schedulers must save job data + local templateData = schedulerAttributes[2] or ARGV[3] + + if templateData and templateData ~= '{}' then + rcall("HSET", schedulerKey, "data", templateData) + end addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], - schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + templateData or '{}', delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) if KEYS[7] ~= "" then rcall("HSET", KEYS[7], "nrjid", nextDelayedJobId) diff --git a/tests/test_job_scheduler.ts b/tests/test_job_scheduler.ts index beca75885f..0918941573 100644 --- a/tests/test_job_scheduler.ts +++ b/tests/test_job_scheduler.ts @@ -1306,6 +1306,7 @@ describe('Job Scheduler', function () { } } catch (error) { console.log(error); + reject(error); } }); }); @@ -1328,6 +1329,72 @@ describe('Job Scheduler', function () { delayStub.restore(); }); + describe('when template data is only present in delayed job', function () { + it('should continue saving data in next delayed jobs', async function () { + const client = await queue.client; + + const date = new Date('2017-05-05 13:12:00'); + this.clock.setSystemTime(date); + + const nextTick = ONE_DAY + 10 * ONE_SECOND; + const delay = 5 * ONE_SECOND + 500; + + const worker = new Worker( + queueName, + async () => { + await client.hdel(`${prefix}:${queueName}:repeat:repeat`, 'data'); + this.clock.tick(nextTick); + }, + { + autorun: false, + connection, + prefix, + skipStalledCheck: true, + skipLockRenewal: true, + }, + ); + const delayStub = sinon.stub(worker, 'delay').callsFake(async () => { + console.log('delay'); + }); + const templateData = { foo: 'bar' }; + + let prev: Job; + let counter = 0; + const completing = new Promise((resolve, reject) => { + worker.on('completed', async job => { + try { + expect(job.data).to.deep.equal(templateData); + + counter++; + if (counter == 5) { + resolve(); + } + } catch (error) { + console.log(error); + reject(error); + } + }); + }); + + await queue.upsertJobScheduler( + 'repeat', + { + pattern: '0 1 * * *', + endDate: new Date('2017-05-10 01:00:00'), + }, + { data: { foo: 'bar' } }, + ); + + this.clock.tick(nextTick + delay); + + worker.run(); + + await completing; + await worker.close(); + delayStub.restore(); + }); + }); + describe('when utc option is provided', function () { it('repeats once a day for 5 days', async function () { this.timeout(8000);