From 1aabebd53a694b6aa8934c856866c5995b320066 Mon Sep 17 00:00:00 2001 From: roggervalf Date: Wed, 8 Jan 2025 22:12:52 -0500 Subject: [PATCH] refactor: only update nextMillis if producerId matches --- src/commands/updateJobScheduler-6.lua | 34 +++++++++++++-------------- 1 file changed, 16 insertions(+), 18 deletions(-) diff --git a/src/commands/updateJobScheduler-6.lua b/src/commands/updateJobScheduler-6.lua index b2a303ff67..883d4af1d7 100644 --- a/src/commands/updateJobScheduler-6.lua +++ b/src/commands/updateJobScheduler-6.lua @@ -27,6 +27,7 @@ local timestamp = ARGV[4] local nextMillis = ARGV[1] local jobSchedulerId = ARGV[2] local prefixKey = ARGV[5] +local producerId = ARGV[7] -- Includes --- @include "includes/addDelayedJob" @@ -39,31 +40,28 @@ local nextDelayedJobKey = schedulerKey .. ":" .. nextMillis -- Validate that scheduler exists. local prevMillis = rcall("ZSCORE", repeatKey, jobSchedulerId) if prevMillis ~= false then - local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") + local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis - rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) + if producerId == currentDelayedJobId then + local schedulerAttributes = rcall("HMGET", schedulerKey, "name", "data") - local eventsKey = KEYS[5] - local metaKey = KEYS[2] - local maxEvents = getOrSetMaxEvents(metaKey) + rcall("ZADD", repeatKey, nextMillis, jobSchedulerId) - rcall("INCR", KEYS[3]) + local eventsKey = KEYS[5] + local metaKey = KEYS[2] + local maxEvents = getOrSetMaxEvents(metaKey) - local delayedOpts = cmsgpack.unpack(ARGV[3]) - local producerId = ARGV[7] + rcall("INCR", KEYS[3]) - if producerId then - local currentDelayedJobId = "repeat:" .. jobSchedulerId .. ":" .. prevMillis + local delayedOpts = cmsgpack.unpack(ARGV[3]) - if producerId == currentDelayedJobId then - addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], - schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) + addDelayedJob(nextDelayedJobKey, nextDelayedJobId, delayedKey, eventsKey, schedulerAttributes[1], + schedulerAttributes[2] or "{}", delayedOpts, timestamp, jobSchedulerId, maxEvents, KEYS[1], nil, nil) - if ARGV[6] ~= "" then - rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) - end - - return nextDelayedJobId .. "" -- convert to string + if ARGV[6] ~= "" then + rcall("HSET", ARGV[6], "nrjid", nextDelayedJobId) end + + return nextDelayedJobId .. "" -- convert to string end end