From c5a8d1d3ebaa3d141fd789fba9e9b3c8085654aa Mon Sep 17 00:00:00 2001 From: Mikhail Popov Date: Mon, 26 Aug 2024 14:30:41 +0300 Subject: [PATCH 1/6] added not_check_session feature --- xqueue.lua | 43 +++++++++++++++++++++++++++++++++++++------ 1 file changed, 37 insertions(+), 6 deletions(-) diff --git a/xqueue.lua b/xqueue.lua index c3c803e..d9cf07a 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -124,6 +124,9 @@ Interface: -- if number, then with default ttl, otherwise only if set during put/release ttr = true|number, -- requires `runat` field -- if number, then with default ttl, otherwise only if set + + not_check_session = true, -- if true, not check session on ack/bury/release and not release task on disconnect + -- requires 'ttr' }, }) @@ -245,6 +248,7 @@ local methods = {} ---@field ttr boolean|number ---@field ttl_default number? ---@field ttr_default number? +---@field not_check_session boolean ---@class xq:table ---@field NEVER integer (Default: 0) @@ -304,6 +308,7 @@ local methods = {} ---Requires runat field and index. ---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttl_default. ---Requires runat field and index. +---@field not_check_session? boolean should xqueue check session. Requires enabled ttr feature. ---@class xqueue.upgrade.options ---@field format? boxSpaceFormat @@ -745,6 +750,18 @@ function M.upgrade(space,opts,depth) features.ttr = false end + if opts.features.not_check_session then + -- feature not_check_session require ttr because in this case + -- task isn't released on disconnect so to prevent the task from being frozen in the queue we need ttr + if not features.ttr then + error(string.format("feature not_check_session requires enabled ttr" ),2+depth) + end + + features.not_check_session = true + else + features.not_check_session = false + end + if fields.tube then features.tube = true end @@ -1003,6 +1020,10 @@ function M.upgrade(space,opts,depth) if not self.taken[key] then error(string.format( "Task %s not taken by any", key ),2) end + -- if not need to check session that return task + if xq.not_check_session then + return t + end if self.taken[key] ~= box.session.id() then error(string.format( "Task %s taken by %d. Not you (%d)", key, self.taken[key], box.session.id() ),2) end @@ -1149,16 +1170,26 @@ function M.upgrade(space,opts,depth) local old = self.bysid[sid] while next(old) do for key,realkey in pairs(old) do - self.taken[key] = nil + -- if xq with check session we release task on disconnect + -- so need clear from taken + if not xq.features.not_check_session then + self.taken[key] = nil + end old[key] = nil local t = space:get(realkey) if t then if t[ self.fields.status ] == 'T' then - self:wakeup(space:update({ realkey }, { - { '=',self.fields.status,'R' }, - self.have_runat and { '=', self.fields.runat, self.NEVER } or nil - })) - log.info("Rst: T->R {%s}", realkey ) + -- if not check session we doesn't release task + -- it can be ack/bury/release from another connection + if xq.features.not_check_session then + log.info("Rst: task %s taken by another session %s", realkey, sid) + else + self:wakeup(space:update({ realkey }, { + { '=',self.fields.status,'R' }, + self.have_runat and { '=', self.fields.runat, self.NEVER } or nil + })) + log.info("Rst: T->R {%s}", realkey ) + end else log.error( "Rst: %s->? {%s}: wrong status", t[self.fields.status], realkey ) end From 3d7181ad7241be3f7c2117c09d8b337f9a1631a3 Mon Sep 17 00:00:00 2001 From: Mikhail Popov Date: Mon, 26 Aug 2024 18:28:40 +0300 Subject: [PATCH 2/6] fix --- xqueue.lua | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/xqueue.lua b/xqueue.lua index d9cf07a..5d891d0 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -960,8 +960,12 @@ function M.upgrade(space,opts,depth) { '=',xq.fields.runat, xq.NEVER } }) xq.taken[ key ] = nil - if sid then + if self.bysid[ sid ] then self.bysid[ sid ][ key ] = nil + elseif self.features.not_check_session then + log.info("Runat: task {%s} marked as taken by sid=%s but bysid already is null after disconnect", key, sid) + else + log.error("Runat: task {%s} marked as taken by sid=%s but bysid is null", key, sid) end xq:wakeup(u) else @@ -1021,7 +1025,7 @@ function M.upgrade(space,opts,depth) error(string.format( "Task %s not taken by any", key ),2) end -- if not need to check session that return task - if xq.not_check_session then + if self.features.not_check_session then return t end if self.taken[key] ~= box.session.id() then @@ -1037,8 +1041,10 @@ function M.upgrade(space,opts,depth) self.taken[ key ] = nil if self.bysid[ sid ] then self.bysid[ sid ][ key ] = nil + elseif self.features.not_check_session then + log.info("Task {%s} marked as taken by sid=%s but bysid already is null after disconnect", key, sid) else - log.error( "Task {%s} marked as taken by sid=%s but bysid is null", key, sid) + log.error("Task {%s} marked as taken by sid=%s but bysid is null", key, sid) end else log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() ) @@ -1172,7 +1178,7 @@ function M.upgrade(space,opts,depth) for key,realkey in pairs(old) do -- if xq with check session we release task on disconnect -- so need clear from taken - if not xq.features.not_check_session then + if not self.features.not_check_session then self.taken[key] = nil end old[key] = nil @@ -1181,8 +1187,8 @@ function M.upgrade(space,opts,depth) if t[ self.fields.status ] == 'T' then -- if not check session we doesn't release task -- it can be ack/bury/release from another connection - if xq.features.not_check_session then - log.info("Rst: task %s taken by another session %s", realkey, sid) + if self.features.not_check_session then + log.info("Rst: task %s taken by session %s not released", realkey, sid) else self:wakeup(space:update({ realkey }, { { '=',self.fields.status,'R' }, From 7ace6a4197e43aab774e8293e724c7ca5aa92f0c Mon Sep 17 00:00:00 2001 From: Mikhail Popov Date: Mon, 26 Aug 2024 18:31:04 +0300 Subject: [PATCH 3/6] fix comment --- xqueue.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xqueue.lua b/xqueue.lua index 5d891d0..ae68e4d 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -308,7 +308,7 @@ local methods = {} ---Requires runat field and index. ---@field ttr? boolean|number should xqueue allow Time-To-Release on tasks. When specified with number, this value used for ttl_default. ---Requires runat field and index. ----@field not_check_session? boolean should xqueue check session. Requires enabled ttr feature. +---@field not_check_session? boolean should xqueue not check session. Requires enabled ttr feature. ---@class xqueue.upgrade.options ---@field format? boxSpaceFormat From a0b0d7fb38f6ff861309d4eabe36f6eb1e418dbf Mon Sep 17 00:00:00 2001 From: Mikhail Popov Date: Thu, 26 Sep 2024 19:33:23 +0300 Subject: [PATCH 4/6] fix --- README.md | 3 +++ xqueue.lua | 36 ++++++++++++++++++------------------ 2 files changed, 21 insertions(+), 18 deletions(-) diff --git a/README.md b/README.md index 1c029ea..08252de 100644 --- a/README.md +++ b/README.md @@ -80,6 +80,9 @@ M.upgrade(space, { ttr = true|number, -- requires `runat` field -- Time To Release. Task is returned into [R]eady unless processed (turned to ack|release from taken) within time -- if number, then with default ttl, otherwise only if set during take + + not_check_session = true, -- requires 'ttr' + -- if true, not check session on ack/bury/release and not release task on disconnect }, -- Set tubes for which statistics collector will be enabled tube_stats = { 'tube-1', 'tube-2' }, diff --git a/xqueue.lua b/xqueue.lua index ae68e4d..cce3414 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -758,6 +758,16 @@ function M.upgrade(space,opts,depth) end features.not_check_session = true + + setmetatable(self.bysid, { + __serialize = 'map', + __newindex = function(_, _, _) end, + __index = function(_, _) return {} end, + }) + setmetatable(self.taken, { + __serialize = 'map', + __newindex = function(_, _, _) end, + }) else features.not_check_session = false end @@ -882,11 +892,11 @@ function M.upgrade(space,opts,depth) if not r then log.error("Worker for {%s} has error: %s", key, e) else - if xq.taken[ key ] then + if xq.taken[ key ] or self.features.not_check_session then space:ack(task) end end - if xq.taken[ key ] then + if xq.taken[ key ] and not self.features.not_check_session then log.error("Worker for {%s} not released task", key) space:release(task) end @@ -960,12 +970,8 @@ function M.upgrade(space,opts,depth) { '=',xq.fields.runat, xq.NEVER } }) xq.taken[ key ] = nil - if self.bysid[ sid ] then + if sid then self.bysid[ sid ][ key ] = nil - elseif self.features.not_check_session then - log.info("Runat: task {%s} marked as taken by sid=%s but bysid already is null after disconnect", key, sid) - else - log.error("Runat: task {%s} marked as taken by sid=%s but bysid is null", key, sid) end xq:wakeup(u) else @@ -1021,13 +1027,13 @@ function M.upgrade(space,opts,depth) if not t then error(string.format( "Task {%s} was not found", key ),2) end - if not self.taken[key] then - error(string.format( "Task %s not taken by any", key ),2) - end -- if not need to check session that return task if self.features.not_check_session then return t end + if not self.taken[key] then + error(string.format( "Task %s not taken by any", key ),2) + end if self.taken[key] ~= box.session.id() then error(string.format( "Task %s taken by %d. Not you (%d)", key, self.taken[key], box.session.id() ),2) end @@ -1041,12 +1047,10 @@ function M.upgrade(space,opts,depth) self.taken[ key ] = nil if self.bysid[ sid ] then self.bysid[ sid ][ key ] = nil - elseif self.features.not_check_session then - log.info("Task {%s} marked as taken by sid=%s but bysid already is null after disconnect", key, sid) else log.error("Task {%s} marked as taken by sid=%s but bysid is null", key, sid) end - else + elseif not self.features.not_check_session then log.error( "Task {%s} not marked as taken, untake by sid=%s", key, box.session.id() ) end @@ -1176,11 +1180,7 @@ function M.upgrade(space,opts,depth) local old = self.bysid[sid] while next(old) do for key,realkey in pairs(old) do - -- if xq with check session we release task on disconnect - -- so need clear from taken - if not self.features.not_check_session then - self.taken[key] = nil - end + self.taken[key] = nil old[key] = nil local t = space:get(realkey) if t then From 5966af51593ef907c54e6300aa3cbea0c33f6af0 Mon Sep 17 00:00:00 2001 From: Mikhail Popov Date: Thu, 26 Sep 2024 19:36:54 +0300 Subject: [PATCH 5/6] fix condition in worker --- xqueue.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/xqueue.lua b/xqueue.lua index cce3414..fc28b18 100644 --- a/xqueue.lua +++ b/xqueue.lua @@ -896,7 +896,7 @@ function M.upgrade(space,opts,depth) space:ack(task) end end - if xq.taken[ key ] and not self.features.not_check_session then + if xq.taken[ key ] then log.error("Worker for {%s} not released task", key) space:release(task) end From f942352eb7169edf10d95dc8b83dd42bb4642d4e Mon Sep 17 00:00:00 2001 From: Mikhail Popov Date: Sat, 28 Sep 2024 00:05:22 +0300 Subject: [PATCH 6/6] added tests --- test/network_test.lua | 144 +++++++++++++++++++++++++++++++++++++++ test/replicaset_test.lua | 54 ++++++++++++++- 2 files changed, 197 insertions(+), 1 deletion(-) diff --git a/test/network_test.lua b/test/network_test.lua index 19d7506..5f731f8 100644 --- a/test/network_test.lua +++ b/test/network_test.lua @@ -87,3 +87,147 @@ function g.test_untake() tt:close() end + +function g.test_not_untake_with_not_check_session() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + local F = { id = 1, status = 2, runat = 3, payload = 4 } + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + features = { + id = 'time64', + delayed = true, + retval = 'table', + not_check_session = true, + ttr = 60, + }, + fields = { + status = 'status', + runat = 'runat', + }, + }) + + local tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + t.assert(tt:ping(), "connected to self") + + local task = queue:put({ payload = { time = clock.time() } }) + t.assert(task, ":put() has been inserted task") + + local awaiter_fin = fiber.channel() + + fiber.create(function() + local ret, is_processed = queue:wait(task, 3) + t.assert_equals(ret.id, task.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network") + t.assert_equals(task.id, taken.id, "retutned the same task") + t.assert(box.space.queue.xq.taken, nil, "taken table is empty") + t.assert(box.space.queue.xq.bysid, nil, "bysid table is empty") + + -- not untake on close connection + tt:close() + fiber.sleep(1) + + t.assert_equals(queue:get({ task.id }).status, 'T', "task still in T status") + + tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + + local processed_at = clock.time() + local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1}) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + + tt:close() +end + +function g.test_untake_with_not_check_session_by_ttr() + local queue = box.schema.space.create('queue', { if_not_exists = true }) --[[@as xqueue.space]] + queue:format({ + { name = 'id', type = 'unsigned' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }) + local F = { id = 1, status = 2, runat = 3, payload = 4 } + + queue:create_index('primary', { parts = {'id'} }) + queue:create_index('status', { parts = {'status', 'id'} }) + queue:create_index('runat', { parts = {'runat', 'id'} }) + + xqueue.upgrade(queue, { + features = { + id = 'time64', + delayed = true, + retval = 'table', + not_check_session = true, + ttr = 5, + }, + fields = { + status = 'status', + runat = 'runat', + }, + }) + + local tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + t.assert(tt:ping(), "connected to self") + + local task = queue:put({ payload = { time = clock.time() } }) + t.assert(task, ":put() has been inserted task") + + local awaiter_fin = fiber.channel() + + local taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network") + t.assert_equals(task.id, taken.id, "retutned the same task") + t.assert(queue.xq.taken, nil, "taken table is empty") + t.assert(queue.xq.bysid, nil, "bysid table is empty") + + fiber.create(function() + local ret, is_processed = queue:wait(task, 7) + t.assert_equals(ret.id, task.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + -- not untake on close connection + tt:close() + t.assert_equals(queue:get({ task.id }).status, 'T', "task still in T status") + + -- untake by ttr + fiber.sleep(5) + t.assert_equals(queue:get({ task.id }).status, 'R', "task was returned to R status") + + tt = netbox.connect('127.0.0.1:3301', { wait_connected = true }) + + taken = tt:call('box.space.queue:take', {1}, { timeout = 1 }) + t.assert(taken, ":take() returned task via the network (2nd)") + t.assert_equals(task.id, taken.id, "retutned the same task (2nd)") + t.assert(queue.xq.taken, nil, "taken table is empty") + t.assert(queue.xq.bysid, nil, "bysid table is empty") + + local processed_at = clock.time() + local acked = tt:call('box.space.queue:ack', {taken, { update = {{'=', F.payload, { processed_at = processed_at }}} }}, {timeout = 1}) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + + tt:close() +end diff --git a/test/replicaset_test.lua b/test/replicaset_test.lua index 90fd564..cedf8ab 100644 --- a/test/replicaset_test.lua +++ b/test/replicaset_test.lua @@ -189,6 +189,29 @@ function g.test_start(test) retval = 'table', } --[[@as xqueue.upgrade.options]] + local queue_not_check_session = { + name = 'queue_not_check_session', + format = { + { name = 'id', type = 'string' }, + { name = 'status', type = 'string' }, + { name = 'runat', type = 'number' }, + { name = 'payload', type = 'any' }, + }, + features = { + id = 'uuid', + keep = false, + delayed = true, + not_check_session = true, + ttr = 60, + }, + fields = { + id = 'id', + status = 'status', + runat = 'runat', + }, + retval = 'table', + } --[[@as xqueue.upgrade.options]] + local had_error local await = {} for _, srv in pairs(replica_set.servers) do @@ -200,6 +223,12 @@ function g.test_start(test) had_error = err log.error("%s: %s", srv.alias, err) end + + local ok, err = pcall(srv.exec, srv, setup_queue, {queue_not_check_session}, { timeout = 20 }) + if not ok then + had_error = err + log.error("%s: %s", srv.alias, err) + end end) table.insert(await, fib) end @@ -214,6 +243,13 @@ function g.test_start(test) local task = rw:call('box.space.simpleq:put', {{payload = {cookie = cookie}}, { delay = 5 } }) t.assert(task, "task was returned") + local task_not_check_session = rw:call('box.space.queue_not_check_session:put', {{payload = {}} }) + t.assert(task_not_check_session, "task was returned") + + local taken = rw:call('box.space.queue_not_check_session:take', { 1 }, { timeout = 1 }) + t.assert(taken, ":take() returned task from master") + t.assert_equals(task_not_check_session.id, taken.id, "returned the same task") + local SWITCH_TIMEOUT = 10 if params.name == 'raft' then @@ -274,8 +310,24 @@ function g.test_start(test) t.assert_equals(trw.alias, rw.alias, "after rw-switch luatest succesfully derived new leader") end + local awaiter_fin = fiber.channel() + + fiber.create(function() + local ret, is_processed + ret, is_processed = rw:call('box.space.queue_not_check_session:wait', { task_not_check_session, 1 }, { timeout = 1 }) + t.assert_equals(ret.id, task_not_check_session.id, "Task has been awaited") + t.assert_equals(is_processed, true, "Task has been processed by the consumer") + t.assert(awaiter_fin:put({ ret, is_processed }), "awaiter results were measured") + end) + + local acked = rw:call('box.space.queue_not_check_session:ack', { taken }, { timeout = 1 }) + t.assert_equals(acked[1], taken.id, ":ack() returned taken but completed task") + + local awaiter_res = awaiter_fin:get() + t.assert_equals(awaiter_res[1].id, acked[1], "awaiter saw acknowledged task") + t.assert_equals(awaiter_res[2], true, "awaiter saw task as processed") + local task = rw:call('box.space.simpleq:take', { 5 }, { timeout = 6 }) t.assert(task, "delayed task has been succesfully taken from new leader") t.assert_equals(task.payload.cookie, cookie, "task.cookie is valid") - end