From a44e59dc1d848ceb14b18ada757c7def9d635877 Mon Sep 17 00:00:00 2001 From: Xumin <100666470+StarlightIbuki@users.noreply.github.com> Date: Tue, 5 Nov 2024 14:23:21 +0800 Subject: [PATCH] fix(clustering/rpc): add missing dao hooks of upsert/delete/update_by (#13819) And also emit more debug logs --------- Co-authored-by: Chrono --- kong/clustering/rpc/manager.lua | 18 ++++++++++++ kong/clustering/rpc/socket.lua | 2 ++ kong/clustering/services/sync/hooks.lua | 28 ++++++++++++++++++- kong/db/dao/init.lua | 6 ++++ .../09-hybrid_mode/01-sync_spec.lua | 22 +++++++-------- 5 files changed, 64 insertions(+), 12 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 7881b1661ffe..5d614997567a 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -18,6 +18,7 @@ local cjson = require("cjson.safe") local ngx_var = ngx.var local ngx_ERR = ngx.ERR +local ngx_DEBUG = ngx.DEBUG local ngx_log = ngx.log local ngx_exit = ngx.exit local ngx_time = ngx.time @@ -172,12 +173,22 @@ function _M:call(node_id, method, ...) local params = {...} + ngx_log(ngx_DEBUG, + "[rpc] calling ", method, + "(node_id: ", node_id, ")", + " via ", res == "local" and "local" or "concentrator" + ) + if res == "local" then res, err = self:_local_call(node_id, method, params) + if not res then + ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err) return nil, err end + ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded") + return res end @@ -188,14 +199,21 @@ function _M:call(node_id, method, ...) assert(fut:start()) local ok, err = fut:wait(5) + if err then + ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", err) + return nil, err end if ok then + ngx_log(ngx_DEBUG, "[rpc] ", method, " succeeded") + return fut.result end + ngx_log(ngx_DEBUG, "[rpc] ", method, " failed, err: ", fut.error.message) + return nil, fut.error.message end diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index 95ef614df73d..ca8a80d622cd 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -148,6 +148,8 @@ function _M:start() if payload.method then -- invoke + ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") + local dispatch_cb = self.manager.callbacks.callbacks[payload.method] if not dispatch_cb then local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index ae7bbbe90620..654ce74f1c59 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -59,6 +59,8 @@ function _M:notify_all_nodes() return end + ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version) + local msg = { default = { new_version = latest_version, }, } for _, node in ipairs(get_all_nodes_with_sync_cap()) do @@ -113,6 +115,9 @@ end function _M:register_dao_hooks() local function is_db_export(name) local db_export = kong.db[name].schema.db_export + + ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", db_export) + return db_export == nil or db_export == true end @@ -131,6 +136,8 @@ function _M:register_dao_hooks() return end + ngx_log(ngx_DEBUG, "[kong.sync.v2] failed. Canceling ", name) + local res, err = self.strategy:cancel_txn() if not res then ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err)) @@ -142,6 +149,8 @@ function _M:register_dao_hooks() return entity end + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to writing ", name) + return self:entity_delta_writer(entity, name, options, ws_id) end @@ -150,7 +159,9 @@ function _M:register_dao_hooks() return entity end - -- set lmdb value to ngx_null then return entity + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name) + + -- set lmdb value to ngx_null then return row return self:entity_delta_writer(entity, name, options, ws_id, true) end @@ -174,6 +185,21 @@ function _M:register_dao_hooks() ["dao:upsert:pre"] = pre_hook_func, ["dao:upsert:fail"] = fail_hook_func, ["dao:upsert:post"] = post_hook_writer_func, + + -- dao:upsert_by + ["dao:upsert_by:pre"] = pre_hook_func, + ["dao:upsert_by:fail"] = fail_hook_func, + ["dao:upsert_by:post"] = post_hook_writer_func, + + -- dao:delete_by + ["dao:delete_by:pre"] = pre_hook_func, + ["dao:delete_by:fail"] = fail_hook_func, + ["dao:delete_by:post"] = post_hook_delete_func, + + -- dao:update_by + ["dao:update_by:pre"] = pre_hook_func, + ["dao:update_by:fail"] = fail_hook_func, + ["dao:update_by:post"] = post_hook_writer_func, } for ev, func in pairs(dao_hooks) do diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index a20cb6813e7c..b064bf126124 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -814,12 +814,14 @@ local function generate_foreign_key_methods(schema) local entity_to_update, rbw_entity, err, err_t = check_update(self, unique_value, entity, options, name) if not entity_to_update then + run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options) return nil, err, err_t end local row, err_t = self.strategy:update_by_field(name, unique_value, entity_to_update, options) if not row then + run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options) return nil, tostring(err_t), err_t end @@ -869,12 +871,14 @@ local function generate_foreign_key_methods(schema) local row, err_t = self.strategy:upsert_by_field(name, unique_value, entity_to_upsert, options) if not row then + run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options) return nil, err, err_t end @@ -928,9 +932,11 @@ local function generate_foreign_key_methods(schema) local rows_affected rows_affected, err_t = self.strategy:delete_by_field(name, unique_value, options) if err_t then + run_hook("dao:delete_by:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t elseif not rows_affected then + run_hook("dao:delete_by:post", nil, self.schema.name, options, entity.ws_id, nil) return nil end diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index f434f7c37531..48b60395027c 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -12,9 +12,13 @@ local uuid = require("kong.tools.uuid").uuid local KEY_AUTH_PLUGIN -for _, inc_sync in ipairs { "on", "off" } do +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do +--- XXX FIXME: enable inc_sync = on +-- skips the rest of the tests. We will fix them in a follow-up PR +local skip_inc_sync = inc_sync == "on" and pending or describe + describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() @@ -623,11 +627,7 @@ describe("CP/DP #version check #" .. strategy, function() end) end) ---- XXX FIXME: enable inc_sync = on --- skips the rest of the tests. We will fix them in a follow-up PR -local skip_inc_sync = inc_sync == "on" and pending or describe - -skip_inc_sync("CP/DP config sync #" .. strategy, function() +describe("CP/DP config sync #" .. strategy, function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -776,12 +776,12 @@ skip_inc_sync("CP/DP labels #" .. strategy, function() describe("status API", function() it("shows DP status", function() - helpers.wait_until(function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + helpers.wait_until(function() local res = assert(admin_client:get("/clustering/data-planes")) local body = assert.res_status(200, res) local json = cjson.decode(body)