diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua index d73620709c41..ca8a80d622cd 100644 --- a/kong/clustering/rpc/socket.lua +++ b/kong/clustering/rpc/socket.lua @@ -149,6 +149,7 @@ function _M:start() -- invoke ngx_log(ngx_DEBUG, "[rpc] got RPC call: ", payload.method, " (id: ", payload.id, ")") + local dispatch_cb = self.manager.callbacks.callbacks[payload.method] if not dispatch_cb then local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 8ae44ca062b7..80d892ced627 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -59,7 +59,7 @@ function _M:notify_all_nodes() return end - ngx_log(ngx_DEBUG, "[rpc:sync] notifying all nodes of new version: ", latest_version) + ngx_log(ngx_DEBUG, "[kong.sync.v2] notifying all nodes of new version: ", latest_version) local msg = { default = { new_version = latest_version, }, } @@ -114,7 +114,8 @@ end -- only control plane has these delta operations function _M:register_dao_hooks() local function is_db_export(name) - ngx_log(ngx_DEBUG, "[rpc:sync] name: ", name, " db_export: ", kong.db[name].schema.db_export) + ngx_log(ngx_DEBUG, "[kong.sync.v2] name: ", name, " db_export: ", kong.db[name].schema.db_export) + local db_export = kong.db[name].schema.db_export return db_export == nil or db_export == true end @@ -134,7 +135,8 @@ function _M:register_dao_hooks() return end - ngx_log(ngx_DEBUG, "[rpc:sync] failed. Canceling ", name) + ngx_log(ngx_DEBUG, "[kong.sync.v2] failed. Canceling ", name) + local res, err = self.strategy:cancel_txn() if not res then ngx_log(ngx_ERR, "unable to cancel cancel_txn: ", tostring(err)) @@ -146,7 +148,8 @@ function _M:register_dao_hooks() return entity end - ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to writing ", name) + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to writing ", name) + return self:entity_delta_writer(entity, name, options, ws_id) end @@ -155,7 +158,8 @@ function _M:register_dao_hooks() return entity end - ngx_log(ngx_DEBUG, "[rpc:sync] new delta due to deleting ", name) + ngx_log(ngx_DEBUG, "[kong.sync.v2] new delta due to deleting ", name) + -- set lmdb value to ngx_null then return row return self:entity_delta_writer(entity, name, options, ws_id, true) end @@ -180,6 +184,21 @@ function _M:register_dao_hooks() ["dao:upsert:pre"] = pre_hook_func, ["dao:upsert:fail"] = fail_hook_func, ["dao:upsert:post"] = post_hook_writer_func, + + -- dao:upsert_by + ["dao:upsert_by:pre"] = pre_hook_func, + ["dao:upsert_by:fail"] = fail_hook_func, + ["dao:upsert_by:post"] = post_hook_writer_func, + + -- dao:delete_by + ["dao:delete_by:pre"] = pre_hook_func, + ["dao:delete_by:fail"] = fail_hook_func, + ["dao:delete_by:post"] = post_hook_delete_func, + + -- dao:update_by + ["dao:update_by:pre"] = pre_hook_func, + ["dao:update_by:fail"] = fail_hook_func, + ["dao:update_by:post"] = post_hook_writer_func, } for ev, func in pairs(dao_hooks) do diff --git a/kong/db/dao/init.lua b/kong/db/dao/init.lua index a20cb6813e7c..b064bf126124 100644 --- a/kong/db/dao/init.lua +++ b/kong/db/dao/init.lua @@ -814,12 +814,14 @@ local function generate_foreign_key_methods(schema) local entity_to_update, rbw_entity, err, err_t = check_update(self, unique_value, entity, options, name) if not entity_to_update then + run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options) return nil, err, err_t end local row, err_t = self.strategy:update_by_field(name, unique_value, entity_to_update, options) if not row then + run_hook("dao:update_by:fail", err_t, entity_to_update, self.schema.name, options) return nil, tostring(err_t), err_t end @@ -869,12 +871,14 @@ local function generate_foreign_key_methods(schema) local row, err_t = self.strategy:upsert_by_field(name, unique_value, entity_to_upsert, options) if not row then + run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options) return nil, tostring(err_t), err_t end local ws_id = row.ws_id row, err, err_t = self:row_to_entity(row, options) if not row then + run_hook("dao:upsert_by:fail", err_t, entity_to_upsert, self.schema.name, options) return nil, err, err_t end @@ -928,9 +932,11 @@ local function generate_foreign_key_methods(schema) local rows_affected rows_affected, err_t = self.strategy:delete_by_field(name, unique_value, options) if err_t then + run_hook("dao:delete_by:fail", err_t, entity, self.schema.name, options) return nil, tostring(err_t), err_t elseif not rows_affected then + run_hook("dao:delete_by:post", nil, self.schema.name, options, entity.ws_id, nil) return nil end diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index f434f7c37531..48b60395027c 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -12,9 +12,13 @@ local uuid = require("kong.tools.uuid").uuid local KEY_AUTH_PLUGIN -for _, inc_sync in ipairs { "on", "off" } do +for _, inc_sync in ipairs { "on", "off" } do for _, strategy in helpers.each_strategy() do +--- XXX FIXME: enable inc_sync = on +-- skips the rest of the tests. We will fix them in a follow-up PR +local skip_inc_sync = inc_sync == "on" and pending or describe + describe("CP/DP communication #" .. strategy .. " inc_sync=" .. inc_sync, function() lazy_setup(function() @@ -623,11 +627,7 @@ describe("CP/DP #version check #" .. strategy, function() end) end) ---- XXX FIXME: enable inc_sync = on --- skips the rest of the tests. We will fix them in a follow-up PR -local skip_inc_sync = inc_sync == "on" and pending or describe - -skip_inc_sync("CP/DP config sync #" .. strategy, function() +describe("CP/DP config sync #" .. strategy, function() lazy_setup(function() helpers.get_db_utils(strategy) -- runs migrations @@ -776,12 +776,12 @@ skip_inc_sync("CP/DP labels #" .. strategy, function() describe("status API", function() it("shows DP status", function() - helpers.wait_until(function() - local admin_client = helpers.admin_client() - finally(function() - admin_client:close() - end) + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + helpers.wait_until(function() local res = assert(admin_client:get("/clustering/data-planes")) local body = assert.res_status(200, res) local json = cjson.decode(body)