From 02169bfc0be0d13e00cbf729597b061101eef09c Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 09:44:31 +0800 Subject: [PATCH 1/6] refactor(clustering/rpc): dp does not need concentrator --- kong/clustering/rpc/manager.lua | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 7881b1661ffe..ceaa781507b9 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -5,7 +5,6 @@ local _MT = { __index = _M, } local server = require("resty.websocket.server") local client = require("resty.websocket.client") local socket = require("kong.clustering.rpc.socket") -local concentrator = require("kong.clustering.rpc.concentrator") local future = require("kong.clustering.rpc.future") local utils = require("kong.clustering.rpc.utils") local callbacks = require("kong.clustering.rpc.callbacks") @@ -42,7 +41,6 @@ function _M.new(conf, node_id) -- clients[node_id]: { socket1 => true, socket2 => true, ... } clients = {}, client_capabilities = {}, - client_ips = {}, -- store DP node's ip addr node_id = node_id, conf = conf, cluster_cert = assert(clustering_tls.get_cluster_cert(conf)), @@ -50,7 +48,10 @@ function _M.new(conf, node_id) callbacks = callbacks.new(), } - self.concentrator = concentrator.new(self, kong.db) + if conf.role == "control_plane" then + self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db) + self.client_ips = {}, -- store DP node's ip addr + end return setmetatable(self, _MT) end @@ -59,7 +60,10 @@ end function _M:_add_socket(socket, capabilities_list) local sockets = self.clients[socket.node_id] if not sockets then - assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + if self.concentrator then + assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + end + sockets = setmetatable({}, { __mode = "k", }) self.clients[socket.node_id] = sockets end @@ -87,7 +91,10 @@ function _M:_remove_socket(socket) self.clients[node_id] = nil self.client_ips[node_id] = nil self.client_capabilities[node_id] = nil - assert(self.concentrator:_enqueue_unsubscribe(node_id)) + + if self.concentrator then + assert(self.concentrator:_enqueue_unsubscribe(node_id)) + end end end From 187a0ae774ed0ace0d46ca78a5aa0a2fe9025ece Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 09:47:41 +0800 Subject: [PATCH 2/6] lint fix --- kong/clustering/rpc/manager.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index ceaa781507b9..2b9ac482817c 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -50,7 +50,7 @@ function _M.new(conf, node_id) if conf.role == "control_plane" then self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db) - self.client_ips = {}, -- store DP node's ip addr + self.client_ips = {} -- store DP node's ip addr end return setmetatable(self, _MT) From a2937d4cb6d9cadd98d617cec6e8f2f1f8c1381d Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 09:48:24 +0800 Subject: [PATCH 3/6] fix --- kong/clustering/rpc/manager.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 2b9ac482817c..22837235cedf 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -89,10 +89,10 @@ function _M:_remove_socket(socket) if table_isempty(sockets) then self.clients[node_id] = nil - self.client_ips[node_id] = nil self.client_capabilities[node_id] = nil if self.concentrator then + self.client_ips[node_id] = nil assert(self.concentrator:_enqueue_unsubscribe(node_id)) end end From 1f9d54fcd5f301c95ada1ff5d539aa4e701761c1 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 09:50:11 +0800 Subject: [PATCH 4/6] check self.client_ips --- kong/clustering/rpc/manager.lua | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 22837235cedf..b91882068f3c 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -266,7 +266,9 @@ function _M:handle_websocket() self:_add_socket(s, rpc_capabilities) -- store DP's ip addr - self.client_ips[node_id] = ngx_var.remote_addr + if self.client_ips then + self.client_ips[node_id] = ngx_var.remote_addr + end s:start() local res, err = s:join() From 3d9a76454e027929a957221b97e43ec8b47817c5 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 09:53:39 +0800 Subject: [PATCH 5/6] Revert "check self.client_ips" This reverts commit 1f9d54fcd5f301c95ada1ff5d539aa4e701761c1. --- kong/clustering/rpc/manager.lua | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index b91882068f3c..22837235cedf 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -266,9 +266,7 @@ function _M:handle_websocket() self:_add_socket(s, rpc_capabilities) -- store DP's ip addr - if self.client_ips then - self.client_ips[node_id] = ngx_var.remote_addr - end + self.client_ips[node_id] = ngx_var.remote_addr s:start() local res, err = s:join() From cd54ad7d3ef4b7c1013cac65239c4ce659a28bf0 Mon Sep 17 00:00:00 2001 From: chronolaw Date: Tue, 5 Nov 2024 10:18:58 +0800 Subject: [PATCH 6/6] check self.concentrator --- kong/clustering/rpc/manager.lua | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua index 22837235cedf..0fd55a268080 100644 --- a/kong/clustering/rpc/manager.lua +++ b/kong/clustering/rpc/manager.lua @@ -114,6 +114,9 @@ function _M:_find_node_and_check_capability(node_id, cap) return "local" end + -- now we are on cp side + assert(self.concentrator) + -- does concentrator knows more about this client? local res, err = kong.db.clustering_data_planes:select({ id = node_id }) if err then