diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index d632cca8112e..725db7ddbf14 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -95,16 +95,106 @@ local Queue_mt = { } -local function make_queue_key(name) +local function _make_queue_key(name) return (workspaces.get_workspace_id() or "") .. "." .. name end +local function _remaining_capacity(self) + local remaining_entries = self.max_entries - self:count() + local max_bytes = self.max_bytes + + -- we enqueue entries one by one, + -- so it is impossible to have a negative value + assert(remaining_entries >= 0, "queue should not be over capacity") + + if not max_bytes then + return remaining_entries + end + + local remaining_bytes = max_bytes - self.bytes_queued + + -- we check remaining_bytes before enqueueing an entry, + -- so it is impossible to have a negative value + assert(remaining_bytes >= 0, "queue should not be over capacity") + + return remaining_entries, remaining_bytes +end + + +local function _is_reaching_max_entries(self) + -- `()` is used to get the first return value only + return (_remaining_capacity(self)) == 0 +end + + +local function _will_exceed_max_entries(self) + -- `()` is used to get the first return value only + return (_remaining_capacity(self)) - 1 < 0 +end + + +local function _is_entry_too_large(self, entry) + local max_bytes = self.max_bytes + + if not max_bytes then + return false + end + + if type(entry) ~= "string" then + -- handle non-string entry, including `nil` + return false + end + + return #entry > max_bytes +end + + +local function _is_reaching_max_bytes(self) + if not self.max_bytes then + return false + end + + local _, remaining_bytes = _remaining_capacity(self) + return remaining_bytes == 0 +end + + +local function _will_exceed_max_bytes(self, entry) + if not self.max_bytes then + return false + end + + if type(entry) ~= "string" then + -- handle non-string entry, including `nil` + return false + end + + local _, remaining_bytes = _remaining_capacity(self) + return #entry > remaining_bytes +end + + +local function _is_full(self) + return _is_reaching_max_entries(self) or _is_reaching_max_bytes(self) +end + + +local function _can_enqueue(self, entry) + return not ( + _is_full(self) or + _is_entry_too_large(self, entry) or + _will_exceed_max_entries(self) or + _will_exceed_max_bytes(self, entry) + ) +end + + local queues = {} function Queue.exists(name) - return queues[make_queue_key(name)] and true or false + return queues[_make_queue_key(name)] and true or false end ------------------------------------------------------------------------------- @@ -115,7 +205,7 @@ end local function get_or_create_queue(queue_conf, handler, handler_conf) local name = assert(queue_conf.name) - local key = make_queue_key(name) + local key = _make_queue_key(name) local queue = queues[key] if queue then @@ -193,6 +283,28 @@ function Queue:count() end +function Queue.is_full(queue_conf) + local queue = queues[_make_queue_key(queue_conf.name)] + if not queue then + -- treat non-existing queues as not full as they will be created on demand + return false + end + + return _is_full(queue) +end + + +function Queue.can_enqueue(queue_conf, entry) + local queue = queues[_make_queue_key(queue_conf.name)] + if not queue then + -- treat non-existing queues as not full as they will be created on demand + return false + end + + return _can_enqueue(queue, entry) +end + + -- Delete the frontmost entry from the queue and adjust the current utilization variables. function Queue:delete_frontmost_entry() if self.max_bytes then @@ -260,9 +372,9 @@ function Queue:process_once() for _ = 1, entry_count do self:delete_frontmost_entry() end - if self.queue_full then + if self.already_dropped_entries then self:log_info('queue resumed processing') - self.queue_full = false + self.already_dropped_entries = false end local start_time = now() @@ -393,38 +505,54 @@ local function enqueue(self, entry) self.warned = nil end - if self:count() == self.max_entries then - if not self.queue_full then - self.queue_full = true - self:log_err("queue full, dropping old entries until processing is successful again") - end + if _is_reaching_max_entries(self) then + self:log_err("queue full, dropping old entries until processing is successful again") self:drop_oldest_entry() + self.already_dropped_entries = true end + if _is_entry_too_large(self, entry) then + local err_msg = string.format( + "string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)", + #entry, + self.max_bytes + ) + self:log_err(err_msg) + + return nil, err_msg + end + + if _will_exceed_max_bytes(self, entry) then + local dropped = 0 + + repeat + self:drop_oldest_entry() + dropped = dropped + 1 + self.already_dropped_entries = true + until not _will_exceed_max_bytes(self, entry) + + self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped) + end + + -- safety guard + -- The queue should not be full if we are running into this situation. + -- Since the dropping logic is complicated, + -- further maintenancers might introduce bugs, + -- so I added this assertion to detect this kind of bug early. + -- It's better to crash early than leak memory + -- as analyze memory leak is hard. + assert( + -- assert that enough space is available on the queue now + _can_enqueue(self, entry), + "queue should not be full after dropping entries" + ) + if self.max_bytes then if type(entry) ~= "string" then self:log_err("queuing non-string entry to a queue that has queue.max_bytes set, capacity monitoring will not be correct") - else - if #entry > self.max_bytes then - local message = string.format( - "string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)", - #entry, self.max_bytes) - self:log_err(message) - return nil, message - end - - local dropped = 0 - while self:count() > 0 and (self.bytes_queued + #entry) > self.max_bytes do - self:drop_oldest_entry() - dropped = dropped + 1 - end - if dropped > 0 then - self.queue_full = true - self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped) - end - - self.bytes_queued = self.bytes_queued + #entry end + + self.bytes_queued = self.bytes_queued + #entry end self.entries[self.back] = entry @@ -442,11 +570,41 @@ function Queue.enqueue(queue_conf, handler, handler_conf, value) assert(type(handler) == "function", "arg #2 (handler) must be a function") assert(handler_conf == nil or type(handler_conf) == "table", - "arg #3 (handler_conf) must be a table") - + "arg #3 (handler_conf) must be a table or nil") assert(type(queue_conf.name) == "string", "arg #1 (queue_conf) must include a name") + assert( + type(queue_conf.max_batch_size) == "number", + "arg #1 (queue_conf) max_batch_size must be a number" + ) + assert( + type(queue_conf.max_coalescing_delay) == "number", + "arg #1 (queue_conf) max_coalescing_delay must be a number" + ) + assert( + type(queue_conf.max_entries) == "number", + "arg #1 (queue_conf) max_entries must be a number" + ) + assert( + type(queue_conf.max_retry_time) == "number", + "arg #1 (queue_conf) max_retry_time must be a number" + ) + assert( + type(queue_conf.initial_retry_delay) == "number", + "arg #1 (queue_conf) initial_retry_delay must be a number" + ) + assert( + type(queue_conf.max_retry_delay) == "number", + "arg #1 (queue_conf) max_retry_delay must be a number" + ) + + local max_bytes_type = type(queue_conf.max_bytes) + assert( + max_bytes_type == "nil" or max_bytes_type == "number", + "arg #1 (queue_conf) max_bytes must be a number or nil" + ) + local queue = get_or_create_queue(queue_conf, handler, handler_conf) return enqueue(queue, value) end @@ -454,12 +612,14 @@ end -- For testing, the _exists() function is provided to allow a test to wait for the -- queue to have been completely processed. function Queue._exists(name) - local queue = queues[make_queue_key(name)] + local queue = queues[_make_queue_key(name)] return queue and queue:count() > 0 end +-- [[ For testing purposes only Queue._CAPACITY_WARNING_THRESHOLD = CAPACITY_WARNING_THRESHOLD +-- ]] return Queue diff --git a/spec/01-unit/27-queue_spec.lua b/spec/01-unit/27-queue_spec.lua index 8960d5076198..ec166c295a4e 100644 --- a/spec/01-unit/27-queue_spec.lua +++ b/spec/01-unit/27-queue_spec.lua @@ -788,4 +788,80 @@ describe("plugin queue", function() assert.match_re(log_messages, 'WARN \\[\\] queue continue-processing: handler could not process entries: .*: hard error') assert.match_re(log_messages, 'ERR \\[\\] queue continue-processing: could not send entries, giving up after \\d retries. 1 queue entries were lost') end) + + it("sanity check for function Queue.is_full() & Queue.can_enqueue()", function() + local queue_conf = { + name = "queue-full-checking-too-many-entries", + max_batch_size = 99999, -- avoiding automatically flushing, + max_entries = 2, + max_bytes = nil, -- avoiding bytes limit + max_coalescing_delay = 99999, -- avoiding automatically flushing, + max_retry_time = 60, + initial_retry_delay = 1, + max_retry_delay = 60, + } + + local function enqueue(queue_conf, entry) + Queue.enqueue( + queue_conf, + function() + return true + end, + nil, + entry + ) + end + + assert.is_false(Queue.is_full(queue_conf)) + assert.is_false(Queue.can_enqueue(queue_conf, "One")) + enqueue(queue_conf, "One") + assert.is_false(Queue.is_full(queue_conf)) + + assert.is_true(Queue.can_enqueue(queue_conf, "Two")) + enqueue(queue_conf, "Two") + assert.is_true(Queue.is_full(queue_conf)) + + assert.is_false(Queue.can_enqueue(queue_conf, "Three")) + + + queue_conf = { + name = "queue-full-checking-too-many-bytes", + max_batch_size = 99999, -- avoiding automatically flushing, + max_entries = 99999, -- big enough to avoid entries limit + max_bytes = 2, + max_coalescing_delay = 99999, -- avoiding automatically flushing, + max_retry_time = 60, + initial_retry_delay = 1, + max_retry_delay = 60, + } + + assert.is_false(Queue.is_full(queue_conf)) + assert.is_false(Queue.can_enqueue(queue_conf, "1")) + enqueue(queue_conf, "1") + assert.is_false(Queue.is_full(queue_conf)) + + assert.is_true(Queue.can_enqueue(queue_conf, "2")) + enqueue(queue_conf, "2") + assert.is_true(Queue.is_full(queue_conf)) + + assert.is_false(Queue.can_enqueue(queue_conf, "3")) + + queue_conf = { + name = "queue-full-checking-too-large-entry", + max_batch_size = 99999, -- avoiding automatically flushing, + max_entries = 99999, -- big enough to avoid entries limit + max_bytes = 3, + max_coalescing_delay = 99999, -- avoiding automatically flushing, + max_retry_time = 60, + initial_retry_delay = 1, + max_retry_delay = 60, + } + + enqueue(queue_conf, "1") + + assert.is_false(Queue.is_full(queue_conf)) + assert.is_true(Queue.can_enqueue(queue_conf, "1")) + assert.is_true(Queue.can_enqueue(queue_conf, "11")) + assert.is_false(Queue.can_enqueue(queue_conf, "111")) + end) end)