Skip to content

Commit

Permalink
fix(clustering/rpc): add missing dao hooks of upsert/delete/update_by (
Browse files Browse the repository at this point in the history
…#13819)

And also emit more debug logs

---------

Co-authored-by: Chrono <chrono_cpp@me.com>
  • Loading branch information
StarlightIbuki and chronolaw authored Nov 5, 2024
1 parent 6cb8235 commit a44e59d
Show file tree
Hide file tree
Showing 5 changed files with 64 additions and 12 deletions.
18 changes: 18 additions & 0 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ local cjson = require("cjson.safe")

local ngx_var = ngx.var
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
local ngx_log = ngx.log
local ngx_exit = ngx.exit
local ngx_time = ngx.time
Expand Down Expand Up @@ -172,12 +173,22 @@ function _M:call(node_id, method, ...)

local params = {...}

ngx_log(ngx_DEBUG,
"[rpc] calling ", method,
"(node_id: ", node_id, ")",
" via ", res == "local" and "local" or "concentrator"
)

if res == "local" then
res, err = self:_local_call(node_id, method, params)

if not res then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)
return nil, err
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")

return res
end

Expand All @@ -188,14 +199,21 @@ function _M:call(node_id, method, ...)
assert(fut:start())

local ok, err = fut:wait(5)

if err then
ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err)

return nil, err
end

if ok then
ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded")

return fut.result
end

ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message)

return nil, fut.error.message
end

Expand Down
2 changes: 2 additions & 0 deletions kong/clustering/rpc/socket.lua
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ function _M:start()
if payload.method then
-- invoke

ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")")

local dispatch_cb = self.manager.callbacks.callbacks[payload.method]
if not dispatch_cb then
local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND))
Expand Down
28 changes: 27 additions & 1 deletion kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ function _M:notify_all_nodes()
return
end

ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version)

local msg = { default = { new_version = latest_version, }, }

for _, node in ipairs(get_all_nodes_with_sync_cap()) do
Expand Down Expand Up @@ -113,6 +115,9 @@ end
function _M:register_dao_hooks()
local function is_db_export(name)
local db_export = kong.db[name].schema.db_export

ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", db_export)

return db_export == nil or db_export == true
end

Expand All @@ -131,6 +136,8 @@ function _M:register_dao_hooks()
return
end

ngx_log(ngx_DEBUG, "[kong.sync.v2] failed. Canceling ", name)

local res, err = self.strategy:cancel_txn()
if not res then
ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err))
Expand All @@ -142,6 +149,8 @@ function _M:register_dao_hooks()
return entity
end

ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to writing ", name)

return self:entity_delta_writer(entity, name, options, ws_id)
end

Expand All @@ -150,7 +159,9 @@ function _M:register_dao_hooks()
return entity
end

-- set lmdb value to ngx_null then return entity
ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name)

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(entity, name, options, ws_id, true)
end

Expand All @@ -174,6 +185,21 @@ function _M:register_dao_hooks()
["dao:upsert:pre"] = pre_hook_func,
["dao:upsert:fail"] = fail_hook_func,
["dao:upsert:post"] = post_hook_writer_func,

-- dao:upsert_by
["dao:upsert_by:pre"] = pre_hook_func,
["dao:upsert_by:fail"] = fail_hook_func,
["dao:upsert_by:post"] = post_hook_writer_func,

-- dao:delete_by
["dao:delete_by:pre"] = pre_hook_func,
["dao:delete_by:fail"] = fail_hook_func,
["dao:delete_by:post"] = post_hook_delete_func,

-- dao:update_by
["dao:update_by:pre"] = pre_hook_func,
["dao:update_by:fail"] = fail_hook_func,
["dao:update_by:post"] = post_hook_writer_func,
}

for ev, func in pairs(dao_hooks) do
Expand Down
6 changes: 6 additions & 0 deletions kong/db/dao/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -814,12 +814,14 @@ local function generate_foreign_key_methods(schema)
local entity_to_update, rbw_entity, err, err_t = check_update(self, unique_value,
entity, options, name)
if not entity_to_update then
run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options)
return nil, err, err_t
end

local row, err_t = self.strategy:update_by_field(name, unique_value,
entity_to_update, options)
if not row then
run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options)
return nil, tostring(err_t), err_t
end

Expand Down Expand Up @@ -869,12 +871,14 @@ local function generate_foreign_key_methods(schema)
local row, err_t = self.strategy:upsert_by_field(name, unique_value,
entity_to_upsert, options)
if not row then
run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options)
return nil, tostring(err_t), err_t
end

local ws_id = row.ws_id
row, err, err_t = self:row_to_entity(row, options)
if not row then
run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options)
return nil, err, err_t
end

Expand Down Expand Up @@ -928,9 +932,11 @@ local function generate_foreign_key_methods(schema)
local rows_affected
rows_affected, err_t = self.strategy:delete_by_field(name, unique_value, options)
if err_t then
run_hook("dao:delete_by:fail", err_t, entity, self.schema.name, options)
return nil, tostring(err_t), err_t

elseif not rows_affected then
run_hook("dao:delete_by:post", nil, self.schema.name, options, entity.ws_id, nil)
return nil
end

Expand Down
22 changes: 11 additions & 11 deletions spec/02-integration/09-hybrid_mode/01-sync_spec.lua
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,13 @@ local uuid = require("kong.tools.uuid").uuid
local KEY_AUTH_PLUGIN


for _, inc_sync in ipairs { "on", "off" } do
for _, inc_sync in ipairs { "on", "off" } do
for _, strategy in helpers.each_strategy() do

--- XXX FIXME: enable inc_sync = on
-- skips the rest of the tests. We will fix them in a follow-up PR
local skip_inc_sync = inc_sync == "on" and pending or describe

describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function()

lazy_setup(function()
Expand Down Expand Up @@ -623,11 +627,7 @@ describe("CP/DP #version check #" .. strategy, function()
end)
end)

--- XXX FIXME: enable inc_sync = on
-- skips the rest of the tests. We will fix them in a follow-up PR
local skip_inc_sync = inc_sync == "on" and pending or describe

skip_inc_sync("CP/DP config sync #" .. strategy, function()
describe("CP/DP config sync #" .. strategy, function()
lazy_setup(function()
helpers.get_db_utils(strategy) -- runs migrations

Expand Down Expand Up @@ -776,12 +776,12 @@ skip_inc_sync("CP/DP labels #" .. strategy, function()

describe("status API", function()
it("shows DP status", function()
helpers.wait_until(function()
local admin_client = helpers.admin_client()
finally(function()
admin_client:close()
end)
local admin_client = helpers.admin_client()
finally(function()
admin_client:close()
end)

helpers.wait_until(function()
local res = assert(admin_client:get("/clustering/data-planes"))
local body = assert.res_status(200, res)
local json = cjson.decode(body)
Expand Down

1 comment on commit a44e59d

@github-actions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Bazel Build

Docker image available kong/kong:a44e59dc1d848ceb14b18ada757c7def9d635877
Artifacts available https://github.com/Kong/kong/actions/runs/11678714547

Please sign in to comment.