Skip to content

Commit

Permalink
fix(event_hooks): fixed an issue where event_hooks entities don't wor…
Browse files Browse the repository at this point in the history
…k in Data Planes (#10147)

* add a module variable initialized to avoid register worker_event callback multiple times

* fix(tests): add http call to provide license to CP

* Update kong/enterprise_edition/event_hooks.lua

* update: use pagination.page_size

---------

Co-authored-by: Xiaochen Wang <wangxiaochen0@gmail.com>
Co-authored-by: Yufu Zhao <ms2008vip@gmail.com>
  • Loading branch information
3 people authored Sep 24, 2024
1 parent 3b68c1e commit 0751591
Show file tree
Hide file tree
Showing 4 changed files with 347 additions and 67 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
message: 'Fixed an issue where EventHooks is not working in Data Planes.'
type: bugfix
scope: Clustering
164 changes: 106 additions & 58 deletions kong/enterprise_edition/event_hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,14 @@ local balancer = require "kong.runloop.balancer"

local fmt = string.format
local ipairs = ipairs
local pairs = pairs
local ngx_null = ngx.null
local md5 = ngx.md5
local hmac_sha1 = ngx.hmac_sha1
local timer_at = ngx.timer.at

local GLOBAL_QUERY_OPTS = { workspace = ngx_null, show_ws_id = true }

local QUEUE_OPTS = {
max_batch_size = 1,
max_coalescing_delay = 0,
Expand All @@ -43,6 +46,7 @@ QUEUE_OPTS.max_bytes = nil
local template

local _M = {}
local initialized = false

local events = {}

Expand Down Expand Up @@ -86,8 +90,8 @@ end


_M.publish = function(source, event, opts)
if not _M.enabled() then
return
if not source then
return nil
end

if not events[source] then
Expand Down Expand Up @@ -127,10 +131,15 @@ _M.register = function(entity)
return
end

local callback = _M.callback(entity)
local source = entity.source
local event = entity.event ~= ngx_null and entity.event or nil
local callback = references[entity.id]

if callback then
kong.worker_events.unregister(callback, prefix(source), event)
end

callback = _M.callback(entity)
references[entity.id] = callback

return kong.worker_events.register(callback, prefix(source), event)
Expand All @@ -152,6 +161,16 @@ _M.unregister = function(entity)
end


_M.unregister_all = function()
if not _M.enabled() then
return
end

for _, callback in pairs(references) do
kong.worker_events.unregister(callback)
end
end

local function field_digest(source, event, data)
local fields = events[source] and events[source][event] and
events[source][event].unique
Expand Down Expand Up @@ -302,6 +321,22 @@ _M.callback = function(entity)
return wrap
end

_M.reconfigure = function()
_M.unregister_all()

local page_size
if kong.db.event_hooks.pagination then
page_size = kong.db.event_hooks.pagination.page_size
end

for entity, err in kong.db.event_hooks:each(page_size, GLOBAL_QUERY_OPTS) do
if err then
kong.log.err(err)
else
_M.register(entity)
end
end
end

_M.test = function(entity, data)
-- Get an unwrapped callback, since we want it sync
Expand All @@ -326,74 +361,87 @@ _M.register_events = function(events_handler)
return
end

local dao_adapter = function(data)
return {
entity = data.entity,
old_entity = data.old_entity,
schema = data.schema and data.schema.name,
operation = data.operation,
}
end
-- publish all kong events
local operations = { "create", "update", "delete" }
for _, op in ipairs(operations) do
_M.publish("dao:crud", op, {
fields = { "operation", "entity", "old_entity", "schema" },
adapter = dao_adapter,
})
end
for name, _ in pairs(kong.db.daos) do
_M.publish("crud", name, {
fields = { "operation", "entity", "old_entity", "schema" },
adapter = dao_adapter,
})
if not initialized then
local dao_adapter = function(data)
return {
entity = data.entity,
old_entity = data.old_entity,
schema = data.schema and data.schema.name,
operation = data.operation,
}
end

-- publish all kong events
local operations = { "create", "update", "delete" }
for _, op in ipairs(operations) do
_M.publish("crud", name .. ":" .. op, {
_M.publish("dao:crud", op, {
fields = { "operation", "entity", "old_entity", "schema" },
adapter = dao_adapter,
})
end
end

events_handler.register(function(data, event, source, pid)
local ok, err = _M.emit(source, event, dao_adapter(data))
if not ok then
kong.log.warn("failed to emit event: ", err)
end
end, "crud")
for name, _ in pairs(kong.db.daos) do
_M.publish("crud", name, {
fields = { "operation", "entity", "old_entity", "schema" },
adapter = dao_adapter,
})

events_handler.register(function(data, event, source, pid)
local ok, err = _M.emit(source, event, dao_adapter(data))
if not ok then
kong.log.warn("failed to emit event: ", err)
end
end, "dao:crud")

events_handler.register(_M.crud, "crud", "event_hooks")

-- register a callback to trigger an event_hook balanacer health
-- event
balancer.subscribe_to_healthcheck_events(function(upstream_id, ip, port, hostname, health)
local ok, err = _M.emit("balancer", "health", {
upstream_id = upstream_id,
ip = ip,
port = port,
hostname = hostname,
health = health,
})
if not ok then
kong.log.warn("failed to emit event: ", err)
for _, op in ipairs(operations) do
_M.publish("crud", name .. ":" .. op, {
fields = { "operation", "entity", "old_entity", "schema" },
adapter = dao_adapter,
})
end
end
end)

_M.publish("balancer", "health", {
fields = { "upstream_id", "ip", "port", "hostname", "health" },
})
events_handler.register(function(data, event, source, pid)
local ok, err = _M.emit(source, event, dao_adapter(data))
if not ok then
kong.log.warn("failed to emit event: ", err)
end
end, "crud")

events_handler.register(function(data, event, source, pid)
local ok, err = _M.emit(source, event, dao_adapter(data))
if not ok then
kong.log.warn("failed to emit event: ", err)
end
end, "dao:crud")

events_handler.register(_M.crud, "crud", "event_hooks")

events_handler.register(_M.reconfigure, "declarative", "reconfigure")

-- register a callback to trigger an event_hook balanacer health
-- event
balancer.subscribe_to_healthcheck_events(function(upstream_id, ip, port, hostname, health)
local ok, err = _M.emit("balancer", "health", {
upstream_id = upstream_id,
ip = ip,
port = port,
hostname = hostname,
health = health,
})
if not ok then
kong.log.warn("failed to emit event: ", err)
end
end)

_M.publish("balancer", "health", {
fields = { "upstream_id", "ip", "port", "hostname", "health" },
})
initialized = true
end

-- XXX not so sure this timer is good? the idea is to not hog kong
-- on startup for this secondary feature
timer_at(0, function()
for entity, err in kong.db.event_hooks:each(1000) do
local page_size
if kong.db.event_hooks.pagination then
page_size = kong.db.event_hooks.pagination.page_size
end

for entity, err in kong.db.event_hooks:each(page_size, GLOBAL_QUERY_OPTS) do
if err then
kong.log.err(err)
else
Expand Down
Loading

0 comments on commit 0751591

Please sign in to comment.