From a72c8e42e6b467d80b2194eab888e40f50d9abdb Mon Sep 17 00:00:00 2001 From: Qi Date: Wed, 5 Jun 2024 12:06:22 +0800 Subject: [PATCH] feat(queue): implement functions `Quque.is_full` & `Queue.will_full` Generating entries is expensive in some cases, so we'd better have a way to observe state of the Queue. --- kong/tools/queue.lua | 179 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 152 insertions(+), 27 deletions(-) diff --git a/kong/tools/queue.lua b/kong/tools/queue.lua index d632cca8112e..102f33a27a3f 100644 --- a/kong/tools/queue.lua +++ b/kong/tools/queue.lua @@ -100,6 +100,95 @@ local function make_queue_key(name) end +local function internal_remaining_capacity(self) + local remaining_entries = self.max_entries - self:count() + + -- we enqueue entries one by one, + -- so it is impossible to have a negative value + assert(remaining_entries >= 0, "queue should not be over capacity") + + if not self.max_bytes then + return remaining_entries + end + + local remaining_bytes = self.max_bytes - self.bytes_queued + + -- we checking remaining_bytes before enqueuing an entry, + -- so it is impossible to have a negative value + assert(remaining_bytes >= 0, "queue should not be over capacity") + + return remaining_entries, remaining_bytes +end + + +local function internal_is_reaching_max_entries(self) + local remaining_entries = internal_remaining_capacity(self) + return remaining_entries == 0 +end + + +local function internal_will_exceed_max_entries(self) + local remaining_entries = internal_remaining_capacity(self) + return remaining_entries - 1 < 0 +end + + +local function internal_is_entry_too_large(self, entry) + local max_bytes = self.max_bytes + + if max_bytes == nil then + return false + end + + if type(entry) ~= "string" then + -- handle non-string entry, including `nil` + return false + end + + return #entry > max_bytes +end + + +local function internal_is_reaching_max_bytes(self) + if self.max_bytes == nil then + return false + end + + local _, remaining_bytes = internal_remaining_capacity(self) + assert(type(remaining_bytes) == "number", "remaining_bytes must be a number") + return remaining_bytes == 0 +end + + +local function internal_will_exceed_max_bytes(self, entry) + if not self.max_bytes then + return false + end + + if type(entry) ~= "string" then + -- handle non-string entry, including `nil` + return false + end + + local _, remaining_bytes = internal_remaining_capacity(self) + assert(type(remaining_bytes) == "number", "remaining_bytes must be a number") + return #entry > remaining_bytes +end + + +local function internal_is_full(self) + return internal_is_reaching_max_entries(self) or internal_is_reaching_max_bytes(self) +end + + +local function internal_will_full(self, entry) + return internal_is_full(self) or + internal_is_entry_too_large(self, entry) or + internal_will_exceed_max_entries(self) or + internal_will_exceed_max_bytes(self, entry) +end + + local queues = {} @@ -193,6 +282,28 @@ function Queue:count() end +function Queue.is_full(queue_conf) + local queue = queues[make_queue_key(queue_conf.name)] + if not queue then + -- treat non-existing queues as not full as they will be created on demand + return false + end + + return internal_is_full(queue) +end + + +function Queue.will_full(queue_conf, entry) + local queue = queues[make_queue_key(queue_conf.name)] + if not queue then + -- treat non-existing queues as not full as they will be created on demand + return false + end + + return internal_will_full(queue, entry) +end + + -- Delete the frontmost entry from the queue and adjust the current utilization variables. function Queue:delete_frontmost_entry() if self.max_bytes then @@ -260,9 +371,9 @@ function Queue:process_once() for _ = 1, entry_count do self:delete_frontmost_entry() end - if self.queue_full then + if self.already_dropped_entries then self:log_info('queue resumed processing') - self.queue_full = false + self.already_dropped_entries = false end local start_time = now() @@ -393,38 +504,50 @@ local function enqueue(self, entry) self.warned = nil end - if self:count() == self.max_entries then - if not self.queue_full then - self.queue_full = true - self:log_err("queue full, dropping old entries until processing is successful again") - end + if internal_is_reaching_max_entries(self) then + self:log_err("queue full, dropping old entries until processing is successful again") self:drop_oldest_entry() + self.already_dropped_entries = true end + if internal_is_entry_too_large(self, entry) then + local err_msg = string.format( + "string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)", + #entry, + self.max_bytes + ) + self:log_err(err_msg) + + return nil, err_msg + end + + if internal_will_exceed_max_bytes(self, entry) then + local dropped = 0 + + repeat + self:drop_oldest_entry() + dropped = dropped + 1 + self.already_dropped_entries = true + until not internal_will_exceed_max_bytes(self, entry) + + self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped) + end + + -- safe guard + -- this should never happen if the dropping logic is correct, + -- but it's better to be safe than memory leaky + assert( + -- bracing the function call to get the first return value only + internal_is_full(self) == false, + "queue should not be full after dropping entries" + ) + if self.max_bytes then if type(entry) ~= "string" then self:log_err("queuing non-string entry to a queue that has queue.max_bytes set, capacity monitoring will not be correct") - else - if #entry > self.max_bytes then - local message = string.format( - "string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)", - #entry, self.max_bytes) - self:log_err(message) - return nil, message - end - - local dropped = 0 - while self:count() > 0 and (self.bytes_queued + #entry) > self.max_bytes do - self:drop_oldest_entry() - dropped = dropped + 1 - end - if dropped > 0 then - self.queue_full = true - self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped) - end - - self.bytes_queued = self.bytes_queued + #entry end + + self.bytes_queued = self.bytes_queued + #entry end self.entries[self.back] = entry @@ -459,7 +582,9 @@ function Queue._exists(name) end +-- [[ For testing purposes only Queue._CAPACITY_WARNING_THRESHOLD = CAPACITY_WARNING_THRESHOLD +-- ]] return Queue