Skip to content

Commit

Permalink
feat(queue): implement functions Quque.is_full & Queue.will_full
Browse files Browse the repository at this point in the history
Generating entries is expensive in some cases,
so we'd better have a way to observe state of the Queue.
  • Loading branch information
ADD-SP committed Jun 5, 2024
1 parent 4adb677 commit a72c8e4
Showing 1 changed file with 152 additions and 27 deletions.
179 changes: 152 additions & 27 deletions kong/tools/queue.lua
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,95 @@ local function make_queue_key(name)
end


local function internal_remaining_capacity(self)
local remaining_entries = self.max_entries - self:count()

-- we enqueue entries one by one,
-- so it is impossible to have a negative value
assert(remaining_entries >= 0, "queue should not be over capacity")

if not self.max_bytes then
return remaining_entries
end

local remaining_bytes = self.max_bytes - self.bytes_queued

-- we checking remaining_bytes before enqueuing an entry,
-- so it is impossible to have a negative value
assert(remaining_bytes >= 0, "queue should not be over capacity")

return remaining_entries, remaining_bytes
end


local function internal_is_reaching_max_entries(self)
local remaining_entries = internal_remaining_capacity(self)
return remaining_entries == 0
end


local function internal_will_exceed_max_entries(self)
local remaining_entries = internal_remaining_capacity(self)
return remaining_entries - 1 < 0
end


local function internal_is_entry_too_large(self, entry)
local max_bytes = self.max_bytes

if max_bytes == nil then
return false
end

if type(entry) ~= "string" then
-- handle non-string entry, including `nil`
return false
end

return #entry > max_bytes
end


local function internal_is_reaching_max_bytes(self)
if self.max_bytes == nil then
return false
end

local _, remaining_bytes = internal_remaining_capacity(self)
assert(type(remaining_bytes) == "number", "remaining_bytes must be a number")
return remaining_bytes == 0
end


local function internal_will_exceed_max_bytes(self, entry)
if not self.max_bytes then
return false
end

if type(entry) ~= "string" then
-- handle non-string entry, including `nil`
return false
end

local _, remaining_bytes = internal_remaining_capacity(self)
assert(type(remaining_bytes) == "number", "remaining_bytes must be a number")
return #entry > remaining_bytes
end


local function internal_is_full(self)
return internal_is_reaching_max_entries(self) or internal_is_reaching_max_bytes(self)
end


local function internal_will_full(self, entry)
return internal_is_full(self) or
internal_is_entry_too_large(self, entry) or
internal_will_exceed_max_entries(self) or
internal_will_exceed_max_bytes(self, entry)
end


local queues = {}


Expand Down Expand Up @@ -193,6 +282,28 @@ function Queue:count()
end


function Queue.is_full(queue_conf)
local queue = queues[make_queue_key(queue_conf.name)]
if not queue then
-- treat non-existing queues as not full as they will be created on demand
return false
end

return internal_is_full(queue)
end


function Queue.will_full(queue_conf, entry)
local queue = queues[make_queue_key(queue_conf.name)]
if not queue then
-- treat non-existing queues as not full as they will be created on demand
return false
end

return internal_will_full(queue, entry)
end


-- Delete the frontmost entry from the queue and adjust the current utilization variables.
function Queue:delete_frontmost_entry()
if self.max_bytes then
Expand Down Expand Up @@ -260,9 +371,9 @@ function Queue:process_once()
for _ = 1, entry_count do
self:delete_frontmost_entry()
end
if self.queue_full then
if self.already_dropped_entries then
self:log_info('queue resumed processing')
self.queue_full = false
self.already_dropped_entries = false
end

local start_time = now()
Expand Down Expand Up @@ -393,38 +504,50 @@ local function enqueue(self, entry)
self.warned = nil
end

if self:count() == self.max_entries then
if not self.queue_full then
self.queue_full = true
self:log_err("queue full, dropping old entries until processing is successful again")
end
if internal_is_reaching_max_entries(self) then
self:log_err("queue full, dropping old entries until processing is successful again")
self:drop_oldest_entry()
self.already_dropped_entries = true
end

if internal_is_entry_too_large(self, entry) then
local err_msg = string.format(
"string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)",
#entry,
self.max_bytes
)
self:log_err(err_msg)

return nil, err_msg
end

if internal_will_exceed_max_bytes(self, entry) then
local dropped = 0

repeat
self:drop_oldest_entry()
dropped = dropped + 1
self.already_dropped_entries = true
until not internal_will_exceed_max_bytes(self, entry)

self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped)
end

-- safe guard
-- this should never happen if the dropping logic is correct,
-- but it's better to be safe than memory leaky
assert(
-- bracing the function call to get the first return value only
internal_is_full(self) == false,
"queue should not be full after dropping entries"
)

if self.max_bytes then
if type(entry) ~= "string" then
self:log_err("queuing non-string entry to a queue that has queue.max_bytes set, capacity monitoring will not be correct")
else
if #entry > self.max_bytes then
local message = string.format(
"string to be queued is longer (%d bytes) than the queue's max_bytes (%d bytes)",
#entry, self.max_bytes)
self:log_err(message)
return nil, message
end

local dropped = 0
while self:count() > 0 and (self.bytes_queued + #entry) > self.max_bytes do
self:drop_oldest_entry()
dropped = dropped + 1
end
if dropped > 0 then
self.queue_full = true
self:log_err("byte capacity exceeded, %d queue entries were dropped", dropped)
end

self.bytes_queued = self.bytes_queued + #entry
end

self.bytes_queued = self.bytes_queued + #entry
end

self.entries[self.back] = entry
Expand Down Expand Up @@ -459,7 +582,9 @@ function Queue._exists(name)
end


-- [[ For testing purposes only
Queue._CAPACITY_WARNING_THRESHOLD = CAPACITY_WARNING_THRESHOLD
-- ]]


return Queue

0 comments on commit a72c8e4

Please sign in to comment.