From 47b898e115237b92f4d61b10947a35b3c368a36b Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 3 Nov 2024 14:35:31 +0800 Subject: [PATCH 1/5] try_connect in rpc --- kong/clustering/rpc/manager.lua | 16 +++++++++++++--- kong/init.lua | 11 +---------- 2 files changed, 14 insertions(+), 13 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 7881b1661ffe..85383c150176 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -274,6 +274,18 @@ function _M:handle_websocket() end +function _M:try_connect(reconnection_delay) + ngx.timer.at(reconnection_delay or 0, function(premature) + self:connect(premature, + "control_plane", -- node_id + self.conf.cluster_control_plane, -- host + "/v2/outlet", -- path + self.cluster_cert.cdata, + self.cluster_cert_key) + end) +end + + function _M:connect(premature, node_id, host, path, cert, key) if premature then return @@ -350,9 +362,7 @@ function _M:connect(premature, node_id, host, path, cert, key) ::err:: if not exiting() then - ngx.timer.at(reconnection_delay, function(premature) - self:connect(premature, node_id, host, path, cert, key) - end) + self:try_connect(reconnection_delay) end end diff --git a/kong/init.lua b/kong/init.lua index e76f67e3f9d7..c271cc44eb3a 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -999,17 +999,8 @@ function Kong.init_worker() -- rpc and incremental sync if kong.rpc and is_http_module then - -- only available in http subsystem - local cluster_tls = require("kong.clustering.tls") - if is_data_plane(kong.configuration) then - ngx.timer.at(0, function(premature) - kong.rpc:connect(premature, - "control_plane", kong.configuration.cluster_control_plane, - "/v2/outlet", - cluster_tls.get_cluster_cert(kong.configuration).cdata, - cluster_tls.get_cluster_cert_key(kong.configuration)) - end) + kong.rpc:try_connect() else -- control_plane kong.rpc.concentrator:start() From 4d719d3ae7dbfdc364c71dbc2feae26ba59cb216 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 3 Nov 2024 14:41:16 +0800 Subject: [PATCH 2/5] init_worker in rpc --- kong/clustering/rpc/manager.lua | 12 ++++++++++++ kong/init.lua | 8 ++------ 2 files changed, 14 insertions(+), 6 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 85383c150176..cbfd91478017 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -286,6 +286,18 @@ function _M:try_connect(reconnection_delay) end +function _M:init_worker() + if self.conf.role == "data_plane" then + -- data_plane will try to connect to cp + self:try_connect() + + else + -- control_plane + self.concentrator:start() + end +end + + function _M:connect(premature, node_id, host, path, cert, key) if premature then return diff --git a/kong/init.lua b/kong/init.lua index c271cc44eb3a..04e14b52070d 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -999,12 +999,8 @@ function Kong.init_worker() -- rpc and incremental sync if kong.rpc and is_http_module then - if is_data_plane(kong.configuration) then - kong.rpc:try_connect() - - else -- control_plane - kong.rpc.concentrator:start() - end + -- rpc init connection + kong.rpc:init_worker() -- init incremental sync if kong.sync then From ff4af7c72e3269bf0c38c99d05ac9de003a9c38e Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 3 Nov 2024 14:46:02 +0800 Subject: [PATCH 3/5] socket.node_id --- kong/clustering/rpc/manager.lua | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index cbfd91478017..206b4cbf8670 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -57,14 +57,16 @@ end function _M:_add_socket(socket, capabilities_list) - local sockets = self.clients[socket.node_id] + local node_id = socket.node_id + + local sockets = self.clients[node_id] if not sockets then - assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + assert(self.concentrator:_enqueue_subscribe(node_id)) sockets = setmetatable({}, { __mode = "k", }) - self.clients[socket.node_id] = sockets + self.clients[node_id] = sockets end - self.client_capabilities[socket.node_id] = { + self.client_capabilities[node_id] = { set = pl_tablex_makeset(capabilities_list), list = capabilities_list, } From b99b605898d4b1e48fafc0623e30665693fcb760 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Sun, 3 Nov 2024 14:55:03 +0800 Subject: [PATCH 4/5] init.lua --- kong/init.lua | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/kong/init.lua b/kong/init.lua index 04e14b52070d..28cc18ba124d 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -997,12 +997,15 @@ function Kong.init_worker() if kong.clustering then -- rpc and incremental sync - if kong.rpc and is_http_module then + if is_http_module then - -- rpc init connection - kong.rpc:init_worker() + -- init rpc connection + if kong.rpc then + kong.rpc:init_worker() + end -- init incremental sync + -- should run after rpc init successfully if kong.sync then kong.sync:init_worker() end From ab2358c64093957fdce436a524e66687b367e413 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 11:05:01 +0800 Subject: [PATCH 5/5] style clean --- kong/init.lua | 26 ++++++++++++-------------- 1 file changed, 12 insertions(+), 14 deletions(-) diff --git a/kong/init.lua b/kong/init.lua index 28cc18ba124d..8a1520f663ea 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -886,7 +886,7 @@ function Kong.init_worker() if kong.clustering then -- full sync dp - + local is_dp_full_sync_agent = process.type() == "privileged agent" and not kong.sync if is_control_plane(kong.configuration) or -- CP needs to support both full and incremental sync @@ -894,7 +894,7 @@ function Kong.init_worker() then kong.clustering:init_worker() end - + -- DP full sync agent skips the rest of the init_worker if is_dp_full_sync_agent then return @@ -995,20 +995,18 @@ function Kong.init_worker() plugin_servers.start() end - if kong.clustering then - -- rpc and incremental sync - if is_http_module then + -- rpc and incremental sync + if is_http_module then - -- init rpc connection - if kong.rpc then - kong.rpc:init_worker() - end + -- init rpc connection + if kong.rpc then + kong.rpc:init_worker() + end - -- init incremental sync - -- should run after rpc init successfully - if kong.sync then - kong.sync:init_worker() - end + -- init incremental sync + -- should run after rpc init successfully + if kong.sync then + kong.sync:init_worker() end end