Skip to content

Commit

Permalink
refactor(clustering/rpc): dp does not need concentrator
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw committed Nov 5, 2024
1 parent 5e5256c commit 02169bf
Showing 1 changed file with 12 additions and 5 deletions.
17 changes: 12 additions & 5 deletions kong/clustering/rpc/manager.lua
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ local _MT = { __index = _M, }
local server = require("resty.websocket.server")
local client = require("resty.websocket.client")
local socket = require("kong.clustering.rpc.socket")
local concentrator = require("kong.clustering.rpc.concentrator")
local future = require("kong.clustering.rpc.future")
local utils = require("kong.clustering.rpc.utils")
local callbacks = require("kong.clustering.rpc.callbacks")
Expand Down Expand Up @@ -42,15 +41,17 @@ function _M.new(conf, node_id)
-- clients[node_id]: { socket1 => true, socket2 => true, ... }
clients = {},
client_capabilities = {},
client_ips = {}, -- store DP node's ip addr
node_id = node_id,
conf = conf,
cluster_cert = assert(clustering_tls.get_cluster_cert(conf)),
cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)),
callbacks = callbacks.new(),
}

self.concentrator = concentrator.new(self, kong.db)
if conf.role == "control_plane" then
self.concentrator = require("kong.clustering.rpc.concentrator").new(self, kong.db)
self.client_ips = {}, -- store DP node's ip addr
end

return setmetatable(self, _MT)
end
Expand All @@ -59,7 +60,10 @@ end
function _M:_add_socket(socket, capabilities_list)
local sockets = self.clients[socket.node_id]
if not sockets then
assert(self.concentrator:_enqueue_subscribe(socket.node_id))
if self.concentrator then
assert(self.concentrator:_enqueue_subscribe(socket.node_id))
end

sockets = setmetatable({}, { __mode = "k", })
self.clients[socket.node_id] = sockets
end
Expand Down Expand Up @@ -87,7 +91,10 @@ function _M:_remove_socket(socket)
self.clients[node_id] = nil
self.client_ips[node_id] = nil
self.client_capabilities[node_id] = nil
assert(self.concentrator:_enqueue_unsubscribe(node_id))

if self.concentrator then
assert(self.concentrator:_enqueue_unsubscribe(node_id))
end
end
end

Expand Down

0 comments on commit 02169bf

Please sign in to comment.