diff --git a/src/classes/worker.ts b/src/classes/worker.ts index b79ddf4648..81052cec8c 100644 --- a/src/classes/worker.ts +++ b/src/classes/worker.ts @@ -835,14 +835,14 @@ will never work with more accuracy than 1ms. */ }); const handleCompleted = async (result: ResultType) => { - jobsInProgress.delete(inProgressItem); - if (!this.connection.closing) { const completed = await job.moveToCompleted( result, token, fetchNextCallback() && !(this.closing || this.paused), ); + jobsInProgress.delete(inProgressItem); + this.emit('completed', job, result, 'active'); span?.addEvent('job completed', { @@ -857,8 +857,6 @@ will never work with more accuracy than 1ms. */ }; const handleFailed = async (err: Error) => { - jobsInProgress.delete(inProgressItem); - if (!this.connection.closing) { try { // Check if the job was manually rate-limited @@ -877,6 +875,8 @@ will never work with more accuracy than 1ms. */ } const result = await job.moveToFailed(err, token, true); + jobsInProgress.delete(inProgressItem); + this.emit('failed', job, err, 'active'); span?.addEvent('job failed', { @@ -911,6 +911,8 @@ will never work with more accuracy than 1ms. */ const failed = await handleFailed(err); return failed; } finally { + jobsInProgress.delete(inProgressItem); + span?.setAttributes({ [TelemetryAttributes.JobFinishedTimestamp]: Date.now(), [TelemetryAttributes.JobProcessedTimestamp]: processedOn, @@ -1121,6 +1123,9 @@ will never work with more accuracy than 1ms. */ for (const item of jobsInProgress) { const { job, ts } = item; + + // In theory ts should always be defined here so this + // check should not be necessary. if (!ts) { item.ts = now; continue; @@ -1141,7 +1146,7 @@ will never work with more accuracy than 1ms. */ } this.startLockExtenderTimer(jobsInProgress); - }, this.opts.lockRenewTime / 2); + }, this.opts.lockRenewTime / 10); } } } @@ -1207,8 +1212,6 @@ will never work with more accuracy than 1ms. */ ); for (const jobId of erroredJobIds) { - // TODO: Send signal to process function that the job has been lost. - this.emit( 'error', new Error(`could not renew lock for job ${jobId}`), diff --git a/tests/test_worker.ts b/tests/test_worker.ts index 0b0fecc633..beb9d52958 100644 --- a/tests/test_worker.ts +++ b/tests/test_worker.ts @@ -1984,7 +1984,7 @@ describe('workers', function () { connection, prefix, lockDuration: 1000, - lockRenewTime: 3000, // The lock will not be updated in time + lockRenewTime: 15000, // The lock will not be updated in time }, ); await worker.waitUntilReady();