diff --git a/.requirements b/.requirements index 0acb3686b48f..abe66cf45f72 100644 --- a/.requirements +++ b/.requirements @@ -11,8 +11,9 @@ LIBEXPAT=2.6.2 LUA_KONG_NGINX_MODULE=a8411f7cf4289049f0bd3e8e40088e7256389ed3 # 0.11.0 LUA_RESTY_LMDB=7d2581cbe30cde18a8482d820c227ca0845c0ded # 1.4.2 LUA_RESTY_EVENTS=8448a92cec36ac04ea522e78f6496ba03c9b1fd8 # 0.2.0 -LUA_RESTY_WEBSOCKET=60eafc3d7153bceb16e6327074e0afc3d94b1316 # 0.4.0 +LUA_RESTY_WEBSOCKET=966c69c39f03029b9b42ec0f8e55aaed7d6eebc0 # 0.4.0.1 ATC_ROUTER=ffd11db657115769bf94f0c4f915f98300bc26b6 # 1.6.2 +SNAPPY=23b3286820105438c5dbb9bc22f1bb85c5812c8a # 1.2.0 KONG_MANAGER=nightly NGX_WASM_MODULE=3bd94e61c55415ccfb0f304fa51143a7d630d6ae diff --git a/build/BUILD.bazel b/build/BUILD.bazel index adecda775a80..865db47cdfe4 100644 --- a/build/BUILD.bazel +++ b/build/BUILD.bazel @@ -103,6 +103,7 @@ kong_directory_genrule( "@openresty", "@openresty//:luajit", "@protoc//:all_srcs", + "@snappy//:snappy", ] + select({ "@kong//:skip_webui_flags": [], "//conditions:default": [ @@ -152,6 +153,9 @@ kong_directory_genrule( tar -cC ${LUAJIT}/share . | tar -xC ${BUILD_DESTDIR}/openresty/luajit/share chmod -R "+rw" ${BUILD_DESTDIR}/openresty/luajit + SNAPPY=${WORKSPACE_PATH}/$(dirname $(echo '$(locations @snappy//:snappy)' | awk '{print $1}')) + cp ${SNAPPY}/libsnappy.so ${BUILD_DESTDIR}/kong/lib + LUAROCKS=${WORKSPACE_PATH}/$(dirname '$(location @luarocks//:luarocks_make)')/luarocks_tree cp -r ${LUAROCKS}/. ${BUILD_DESTDIR}/. diff --git a/build/openresty/repositories.bzl b/build/openresty/repositories.bzl index 8c937144cbdf..dbcb9515830e 100644 --- a/build/openresty/repositories.bzl +++ b/build/openresty/repositories.bzl @@ -10,6 +10,7 @@ load("//build/openresty/atc_router:atc_router_repositories.bzl", "atc_router_rep load("//build/openresty/wasmx:wasmx_repositories.bzl", "wasmx_repositories") load("//build/openresty/wasmx/filters:repositories.bzl", "wasm_filters_repositories") load("//build/openresty/brotli:brotli_repositories.bzl", "brotli_repositories") +load("//build/openresty/snappy:snappy_repositories.bzl", "snappy_repositories") # This is a dummy file to export the module's repository. _NGINX_MODULE_DUMMY_FILE = """ @@ -27,6 +28,7 @@ def openresty_repositories(): wasmx_repositories() wasm_filters_repositories() brotli_repositories() + snappy_repositories() openresty_version = KONG_VAR["OPENRESTY"] diff --git a/build/openresty/snappy/BUILD.bazel b/build/openresty/snappy/BUILD.bazel new file mode 100644 index 000000000000..7830623b5e98 --- /dev/null +++ b/build/openresty/snappy/BUILD.bazel @@ -0,0 +1,206 @@ +# Copyright 2023 Google Inc. All Rights Reserved. +# +# Redistribution and use in source and binary forms, with or without +# modification, are permitted provided that the following conditions are +# met: +# +# * Redistributions of source code must retain the above copyright +# notice, this list of conditions and the following disclaimer. +# * Redistributions in binary form must reproduce the above +# copyright notice, this list of conditions and the following disclaimer +# in the documentation and/or other materials provided with the +# distribution. +# * Neither the name of Google Inc. nor the names of its +# contributors may be used to endorse or promote products derived from +# this software without specific prior written permission. 
+# +# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS +# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT +# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR +# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT +# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, +# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT +# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, +# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY +# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT +# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE +# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + +package(default_visibility = ["//visibility:public"]) + +licenses(["notice"]) + +SNAPPY_VERSION = (1, 1, 10) + +config_setting( + name = "windows", + constraint_values = ["@platforms//os:windows"], +) + +cc_library( + name = "config", + hdrs = ["config.h"], + defines = ["HAVE_CONFIG_H"], +) + +cc_library( + name = "snappy-stubs-public", + hdrs = [":snappy-stubs-public.h"], +) + +cc_library( + name = "snappy-stubs-internal", + srcs = ["snappy-stubs-internal.cc"], + hdrs = ["snappy-stubs-internal.h"], + deps = [ + ":config", + ":snappy-stubs-public", + ], +) + +cc_library( + name = "snappy", + srcs = [ + "snappy.cc", + "snappy-c.cc", + "snappy-internal.h", + "snappy-sinksource.cc", + ], + hdrs = [ + "snappy.h", + "snappy-c.h", + "snappy-sinksource.h", + ], + copts = select({ + ":windows": [], + "//conditions:default": [ + "-Wno-sign-compare", + ], + }), + deps = [ + ":config", + ":snappy-stubs-internal", + ":snappy-stubs-public", + ], +) + +filegroup( + name = "testdata", + srcs = glob(["testdata/*"]), +) + +cc_library( + name = "snappy-test", + testonly = True, + srcs = [ + "snappy-test.cc", + "snappy_test_data.cc", + ], + hdrs = [ + "snappy-test.h", + "snappy_test_data.h", + ], + deps = [":snappy-stubs-internal"], +) + +cc_test( + name = "snappy_benchmark", + srcs = ["snappy_benchmark.cc"], + data = [":testdata"], + deps = [ + ":snappy", + ":snappy-test", + "//third_party/benchmark:benchmark_main", + ], +) + +cc_test( + name = "snappy_unittest", + srcs = [ + "snappy_unittest.cc", + ], + data = [":testdata"], + deps = [ + ":snappy", + ":snappy-test", + "//third_party/googletest:gtest_main", + ], +) + +# Generate a config.h similar to what cmake would produce. 
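+# The feature macros below (HAVE_BUILTIN_*, HAVE_*_H, SNAPPY_IS_BIG_ENDIAN)
+# are derived at compile time from __has_builtin/__has_include and predefined
+# compiler macros rather than from configure-time probes.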
+genrule( + name = "config_h", + outs = ["config.h"], + cmd = """cat <$@ +#define HAVE_STDDEF_H 1 +#define HAVE_STDINT_H 1 +#ifdef __has_builtin +# if !defined(HAVE_BUILTIN_EXPECT) && __has_builtin(__builtin_expect) +# define HAVE_BUILTIN_EXPECT 1 +# endif +# if !defined(HAVE_BUILTIN_CTZ) && __has_builtin(__builtin_ctzll) +# define HAVE_BUILTIN_CTZ 1 +# endif +# if !defined(HAVE_BUILTIN_PREFETCH) && __has_builtin(__builtin_prefetech) +# define HAVE_BUILTIN_PREFETCH 1 +# endif +#elif defined(__GNUC__) && (__GNUC__ > 3 || __GNUC__ == 3 && __GNUC_MINOR__ >= 4) +# ifndef HAVE_BUILTIN_EXPECT +# define HAVE_BUILTIN_EXPECT 1 +# endif +# ifndef HAVE_BUILTIN_CTZ +# define HAVE_BUILTIN_CTZ 1 +# endif +# ifndef HAVE_BUILTIN_PREFETCH +# define HAVE_BUILTIN_PREFETCH 1 +# endif +#endif + +#if defined(_WIN32) && !defined(HAVE_WINDOWS_H) +#define HAVE_WINDOWS_H 1 +#endif + +#ifdef __has_include +# if !defined(HAVE_BYTESWAP_H) && __has_include() +# define HAVE_BYTESWAP_H 1 +# endif +# if !defined(HAVE_UNISTD_H) && __has_include() +# define HAVE_UNISTD_H 1 +# endif +# if !defined(HAVE_SYS_ENDIAN_H) && __has_include() +# define HAVE_SYS_ENDIAN_H 1 +# endif +# if !defined(HAVE_SYS_MMAN_H) && __has_include() +# define HAVE_SYS_MMAN_H 1 +# endif +# if !defined(HAVE_SYS_UIO_H) && __has_include() +# define HAVE_SYS_UIO_H 1 +# endif +# if !defined(HAVE_SYS_TIME_H) && __has_include() +# define HAVE_SYS_TIME_H 1 +# endif +#endif + +#ifndef SNAPPY_IS_BIG_ENDIAN +# ifdef __s390x__ +# define SNAPPY_IS_BIG_ENDIAN 1 +# elif defined(__BYTE_ORDER__) && defined(__ORDER_BIG_ENDIAN__) && __BYTE_ORDER__ == __ORDER_BIG_ENDIAN__ +# define SNAPPY_IS_BIG_ENDIAN 1 +# endif +#endif +EOF +""", +) + +genrule( + name = "snappy_stubs_public_h", + srcs = ["snappy-stubs-public.h.in"], + outs = ["snappy-stubs-public.h"], + # Assume sys/uio.h is available on non-Windows. + # Set the version numbers. + cmd = ("""sed -e 's/$${HAVE_SYS_UIO_H_01}/!_WIN32/g' \ + -e 's/$${PROJECT_VERSION_MAJOR}/%d/g' \ + -e 's/$${PROJECT_VERSION_MINOR}/%d/g' \ + -e 's/$${PROJECT_VERSION_PATCH}/%d/g' \ + $< >$@""" % SNAPPY_VERSION), +) diff --git a/build/openresty/snappy/snappy_repositories.bzl b/build/openresty/snappy/snappy_repositories.bzl new file mode 100644 index 000000000000..4dbd7eeebdb8 --- /dev/null +++ b/build/openresty/snappy/snappy_repositories.bzl @@ -0,0 +1,15 @@ +"""A module defining the dependency snappy""" + +load("@bazel_tools//tools/build_defs/repo:git.bzl", "new_git_repository") +load("@bazel_tools//tools/build_defs/repo:utils.bzl", "maybe") +load("@kong_bindings//:variables.bzl", "KONG_VAR") + +def snappy_repositories(): + maybe( + new_git_repository, + name = "snappy", + branch = KONG_VAR["SNAPPY"], + remote = "https://github.com/google/snappy", + visibility = ["//visibility:public"], # let this to be referenced by openresty build + build_file = "//build/openresty/snappy:BUILD.bazel", + ) diff --git a/changelog/unreleased/kong/cp-dp-rpc.yml b/changelog/unreleased/kong/cp-dp-rpc.yml new file mode 100644 index 000000000000..6dcc77c02e7c --- /dev/null +++ b/changelog/unreleased/kong/cp-dp-rpc.yml @@ -0,0 +1,3 @@ +message: "Remote procedure call (RPC) framework for Hybrid mode deployments." 
+type: feature +scope: Clustering diff --git a/changelog/unreleased/kong/dynamic-log-level-rpc.yml b/changelog/unreleased/kong/dynamic-log-level-rpc.yml new file mode 100644 index 000000000000..69096eb0afe1 --- /dev/null +++ b/changelog/unreleased/kong/dynamic-log-level-rpc.yml @@ -0,0 +1,6 @@ +message: | + Dynamic log level over Hybrid mode RPC which allows setting DP log level + to a different level for specified duration before reverting back + to the `kong.conf` configured value. +type: feature +scope: Clustering diff --git a/kong-3.7.0-0.rockspec b/kong-3.7.0-0.rockspec index 8aeb8719bef1..35a94a8afcac 100644 --- a/kong-3.7.0-0.rockspec +++ b/kong-3.7.0-0.rockspec @@ -42,6 +42,7 @@ dependencies = { "lua-resty-timer-ng == 0.2.7", "lpeg == 1.1.0", "lua-resty-ljsonschema == 1.1.6-2", + "lua-resty-snappy == 1.0-1", } build = { type = "builtin", @@ -84,6 +85,16 @@ build = { ["kong.clustering.compat.checkers"] = "kong/clustering/compat/checkers.lua", ["kong.clustering.config_helper"] = "kong/clustering/config_helper.lua", ["kong.clustering.tls"] = "kong/clustering/tls.lua", + ["kong.clustering.services.debug"] = "kong/clustering/services/debug.lua", + + ["kong.clustering.rpc.callbacks"] = "kong/clustering/rpc/callbacks.lua", + ["kong.clustering.rpc.future"] = "kong/clustering/rpc/future.lua", + ["kong.clustering.rpc.json_rpc_v2"] = "kong/clustering/rpc/json_rpc_v2.lua", + ["kong.clustering.rpc.manager"] = "kong/clustering/rpc/manager.lua", + ["kong.clustering.rpc.queue"] = "kong/clustering/rpc/queue.lua", + ["kong.clustering.rpc.socket"] = "kong/clustering/rpc/socket.lua", + ["kong.clustering.rpc.utils"] = "kong/clustering/rpc/utils.lua", + ["kong.clustering.rpc.concentrator"] = "kong/clustering/rpc/concentrator.lua", ["kong.cluster_events"] = "kong/cluster_events/init.lua", ["kong.cluster_events.strategies.postgres"] = "kong/cluster_events/strategies/postgres.lua", @@ -291,6 +302,7 @@ build = { ["kong.db.migrations.core.020_330_to_340"] = "kong/db/migrations/core/020_330_to_340.lua", ["kong.db.migrations.core.021_340_to_350"] = "kong/db/migrations/core/021_340_to_350.lua", ["kong.db.migrations.core.022_350_to_360"] = "kong/db/migrations/core/022_350_to_360.lua", + ["kong.db.migrations.core.023_360_to_370"] = "kong/db/migrations/core/023_360_to_370.lua", ["kong.db.migrations.operations.200_to_210"] = "kong/db/migrations/operations/200_to_210.lua", ["kong.db.migrations.operations.212_to_213"] = "kong/db/migrations/operations/212_to_213.lua", ["kong.db.migrations.operations.280_to_300"] = "kong/db/migrations/operations/280_to_300.lua", diff --git a/kong/api/routes/debug.lua b/kong/api/routes/debug.lua index dade7628d0c3..b783a9fd1097 100644 --- a/kong/api/routes/debug.lua +++ b/kong/api/routes/debug.lua @@ -11,6 +11,7 @@ local kong = kong local pcall = pcall local type = type local tostring = tostring +local tonumber = tonumber local get_log_level = require("resty.kong.log").get_log_level @@ -127,4 +128,49 @@ routes[cluster_name] = { end } + +if kong.rpc then + routes["/clustering/data-planes/:node_id/log-level"] = { + GET = function(self) + local res, err = + kong.rpc:call(self.params.node_id, "kong.debug.log_level.v1.get_log_level") + if not res then + return kong.response.exit(500, { message = err, }) + end + + return kong.response.exit(200, res) + end, + PUT = function(self) + local new_level = self.params.current_level + local timeout = self.params.timeout and + math.ceil(tonumber(self.params.timeout)) or nil + + if not new_level then + return kong.response.exit(400, { message = 
"Required parameter \"current_level\" is missing.", }) + end + + local res, err = kong.rpc:call(self.params.node_id, + "kong.debug.log_level.v1.set_log_level", + new_level, + timeout) + if not res then + return kong.response.exit(500, { message = err, }) + end + + return kong.response.exit(201) + end, + DELETE = function(self) + local res, err = kong.rpc:call(self.params.node_id, + "kong.debug.log_level.v1.set_log_level", + "warn", + 0) + if not res then + return kong.response.exit(500, { message = err, }) + end + + return kong.response.exit(204) + end, + } +end + return routes diff --git a/kong/clustering/control_plane.lua b/kong/clustering/control_plane.lua index 33d427424e7c..6bdfb24e192a 100644 --- a/kong/clustering/control_plane.lua +++ b/kong/clustering/control_plane.lua @@ -238,6 +238,12 @@ function _M:handle_cp_websocket(cert) local sync_status = CLUSTERING_SYNC_STATUS.UNKNOWN local purge_delay = self.conf.cluster_data_plane_purge_delay local update_sync_status = function() + local rpc_peers + + if self.conf.cluster_rpc then + rpc_peers = kong.rpc:get_peers() + end + local ok ok, err = kong.db.clustering_data_planes:upsert({ id = dp_id }, { last_seen = last_seen, @@ -250,6 +256,8 @@ function _M:handle_cp_websocket(cert) sync_status = sync_status, -- TODO: import may have been failed though labels = data.labels, cert_details = dp_cert_details, + -- only update rpc_capabilities if dp_id is connected + rpc_capabilities = rpc_peers and rpc_peers[dp_id] or {}, }, { ttl = purge_delay }) if not ok then ngx_log(ngx_ERR, _log_prefix, "unable to update clustering data plane status: ", err, log_suffix) diff --git a/kong/clustering/rpc/callbacks.lua b/kong/clustering/rpc/callbacks.lua new file mode 100644 index 000000000000..f4aefcb5b65b --- /dev/null +++ b/kong/clustering/rpc/callbacks.lua @@ -0,0 +1,47 @@ +local _M = {} +local _MT = { __index = _M, } + + +local utils = require("kong.clustering.rpc.utils") + + +local parse_method_name = utils.parse_method_name + + +function _M.new() + local self = { + callbacks = {}, + capabilities = {}, -- updated as register() is called + capabilities_list = {}, -- updated as register() is called + } + + return setmetatable(self, _MT) +end + + +function _M:register(method, func) + if self.callbacks[method] then + error("duplicate registration of " .. method) + end + + local cap, func_or_err = parse_method_name(method) + if not cap then + return nil, "unable to get capabilities: " .. func_or_err + end + + if not self.capabilities[cap] then + self.capabilities[cap] = true + table.insert(self.capabilities_list, cap) + end + self.callbacks[method] = func +end + + +-- returns a list of capabilities of this node, like: +-- ["kong.meta.v1", "kong.debug.v1", ...] 
+function _M:get_capabilities_list() + return self.capabilities_list +end + + +return _M diff --git a/kong/clustering/rpc/concentrator.lua b/kong/clustering/rpc/concentrator.lua new file mode 100644 index 000000000000..a7815d7a6c19 --- /dev/null +++ b/kong/clustering/rpc/concentrator.lua @@ -0,0 +1,303 @@ +local _M = {} +local _MT = { __index = _M, } + + +local uuid = require("resty.jit-uuid") +local queue = require("kong.clustering.rpc.queue") +local cjson = require("cjson") +local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") +local rpc_utils = require("kong.clustering.rpc.utils") + + +local setmetatable = setmetatable +local tostring = tostring +local pcall = pcall +local assert = assert +local string_format = string.format +local cjson_decode = cjson.decode +local cjson_encode = cjson.encode +local exiting = ngx.worker.exiting +local is_timeout = rpc_utils.is_timeout +local ngx_log = ngx.log +local ngx_ERR = ngx.ERR +local ngx_WARN = ngx.WARN +local ngx_DEBUG = ngx.DEBUG + + +local RESP_CHANNEL_PREFIX = "rpc:resp:" -- format: rpc:resp: +local REQ_CHANNEL_PREFIX = "rpc:req:" -- format: rpc:req: + + +local RPC_REQUEST_ENQUEUE_SQL = [[ +BEGIN; + INSERT INTO clustering_rpc_requests ( + "node_id", + "reply_to", + "ttl", + "payload" + ) VALUES ( + %s, + %s, + CURRENT_TIMESTAMP(3) AT TIME ZONE 'UTC' + INTERVAL '%d second', + %s + ); + SELECT pg_notify(%s, NULL); +COMMIT; +]] + + +local RPC_REQUEST_DEQUEUE_SQL = [[ +BEGIN; + DELETE FROM + clustering_rpc_requests + USING ( + SELECT * FROM clustering_rpc_requests WHERE node_id = %s FOR UPDATE SKIP LOCKED + ) q + WHERE q.id = clustering_rpc_requests.id RETURNING clustering_rpc_requests.*; +COMMIT; +]] + + +function _M.new(manager, db) + local self = { + manager = manager, + db = db, + interest = {}, -- id: callback pair + sub_unsub = queue.new(4096), -- pub/sub event queue, executed on the read thread + sequence = 0, + } + + return setmetatable(self, _MT) +end + + +function _M:_get_next_id() + local res = self.sequence + self.sequence = res + 1 + + return res +end + + +local function enqueue_notifications(notifications, notifications_queue) + assert(notifications_queue) + + if notifications then + for _, n in ipairs(notifications) do + assert(notifications_queue:push(n)) + end + end +end + + +function _M:_event_loop(lconn) + local notifications_queue = queue.new(4096) + local rpc_resp_channel_name = RESP_CHANNEL_PREFIX .. self.worker_id + + -- we always subscribe to our worker's receiving channel first + local res, err = lconn:query('LISTEN "' .. rpc_resp_channel_name .. '";') + if not res then + return nil, "unable to subscribe to concentrator response channel: " .. err + end + + while not exiting() do + while true do + local n, err = notifications_queue:pop(0) + if not n then + if err then + return nil, "unable to pop from notifications queue: " .. 
err + end + + break + end + + assert(n.operation == "notification") + + if n.channel == rpc_resp_channel_name then + -- an response for a previous RPC call we asked for + local payload = cjson_decode(n.payload) + assert(payload.jsonrpc == "2.0") + + -- response + local cb = self.interest[payload.id] + self.interest[payload.id] = nil -- edge trigger only once + + if cb then + local res, err = cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] concentrator response interest handler failed: id: ", + payload.id, ", err: ", err) + end + + else + ngx_log(ngx_WARN, "[rpc] no interest for concentrator response id: ", payload.id, ", dropping it") + end + + else + -- other CP inside the cluster asked us to forward a call + assert(n.channel:sub(1, #REQ_CHANNEL_PREFIX) == REQ_CHANNEL_PREFIX, + "unexpected concentrator request channel name: " .. n.channel) + + local target_id = n.channel:sub(#REQ_CHANNEL_PREFIX + 1) + local sql = string_format(RPC_REQUEST_DEQUEUE_SQL, self.db.connector:escape_literal(target_id)) + local calls, err = self.db.connector:query(sql) + if not calls then + return nil, "concentrator request dequeue query failed: " .. err + end + + assert(calls[1] == true) + ngx_log(ngx_DEBUG, "concentrator got ", calls[2].affected_rows, + " calls from database for node ", target_id) + for _, call in ipairs(calls[2]) do + local payload = assert(call.payload) + local reply_to = assert(call.reply_to, + "unknown requester for RPC") + + local res, err = self.manager:_local_call(target_id, payload.method, + payload.params) + if res then + -- call success + res, err = self:_enqueue_rpc_response(reply_to, { + jsonrpc = "2.0", + id = payload.id, + result = res, + }) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC call result: ", err) + end + + else + -- call failure + res, err = self:_enqueue_rpc_response(reply_to, { + jsonrpc = "2.0", + id = payload.id, + error = { + code = jsonrpc.SERVER_ERROR, + message = tostring(err), + } + }) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to enqueue RPC error: ", err) + end + end + end + end + end + + local res, err = lconn:wait_for_notification() + if not res then + if is_timeout(err) then + return nil, "wait_for_notification error: " .. err + end + + repeat + local sql, err = self.sub_unsub:pop(0) + if err then + return nil, err + end + + local _, notifications + res, err, _, notifications = lconn:query(sql or "SELECT 1;") -- keepalive + if not res then + return nil, "query to Postgres failed: " .. 
err + end + + enqueue_notifications(notifications, notifications_queue) + until not sql + + else + notifications_queue:push(res) + end + end +end + + +function _M:start(delay) + if not self.worker_id then + -- this can not be generated inside `:new()` as ngx.worker.id() + -- does not yet exist there and can only be generated inside + -- init_worker phase + self.worker_id = uuid.generate_v5(kong.node.get_id(), + tostring(ngx.worker.id())) + end + + assert(ngx.timer.at(delay or 0, function(premature) + if premature then + return + end + + local lconn = self.db.connector:connect("write") + lconn:settimeout(1000) + self.db.connector:store_connection(nil, "write") + + local _, res_or_perr, err = pcall(self._event_loop, self, lconn) + -- _event_loop never returns true + local delay = math.random(5, 10) + + ngx_log(ngx_ERR, "[rpc] concentrator event loop error: ", + res_or_perr or err, ", reconnecting in ", + math.floor(delay), " seconds") + + local res, err = lconn:disconnect() + if not res then + ngx_log(ngx_ERR, "[rpc] unable to close postgres connection: ", err) + end + + self:start(delay) + end)) +end + + +-- enqueue a RPC request to DP node with ID node_id +function _M:_enqueue_rpc_request(node_id, payload) + local sql = string_format(RPC_REQUEST_ENQUEUE_SQL, + self.db.connector:escape_literal(node_id), + self.db.connector:escape_literal(self.worker_id), + 5, + self.db.connector:escape_literal(cjson_encode(payload)), + self.db.connector:escape_literal(REQ_CHANNEL_PREFIX .. node_id)) + return self.db.connector:query(sql) +end + + +-- enqueue a RPC response from CP worker with ID worker_id +function _M:_enqueue_rpc_response(worker_id, payload) + local sql = string_format("SELECT pg_notify(%s, %s);", + self.db.connector:escape_literal(RESP_CHANNEL_PREFIX .. worker_id), + self.db.connector:escape_literal(cjson_encode(payload))) + return self.db.connector:query(sql) +end + + +-- subscribe to RPC calls for worker with ID node_id +function _M:_enqueue_subscribe(node_id) + return self.sub_unsub:push('LISTEN "' .. REQ_CHANNEL_PREFIX .. node_id .. '";') +end + + +-- unsubscribe to RPC calls for worker with ID node_id +function _M:_enqueue_unsubscribe(node_id) + return self.sub_unsub:push('UNLISTEN "' .. REQ_CHANNEL_PREFIX .. node_id .. 
'";') +end + + +-- asynchronously start executing a RPC, node_id is +-- needed for this implementation, because all nodes +-- over concentrator shares the same "socket" object +-- This way the manager code wouldn't tell the difference +-- between calls made over WebSocket or concentrator +function _M:call(node_id, method, params, callback) + local id = self:_get_next_id() + + self.interest[id] = callback + + return self:_enqueue_rpc_request(node_id, { + jsonrpc = "2.0", + method = method, + params = params, + id = id, + }) +end + + +return _M diff --git a/kong/clustering/rpc/future.lua b/kong/clustering/rpc/future.lua new file mode 100644 index 000000000000..230d8bf09983 --- /dev/null +++ b/kong/clustering/rpc/future.lua @@ -0,0 +1,73 @@ +local _M = {} +local _MT = { __index = _M, } + + +local semaphore = require("ngx.semaphore") + + +local STATE_NEW = 1 +local STATE_IN_PROGRESS = 2 +local STATE_SUCCEED = 3 +local STATE_ERRORED = 4 + + +function _M.new(node_id, socket, method, params) + local self = { + method = method, + params = params, + sema = semaphore.new(), + socket = socket, + node_id = node_id, + id = nil, + result = nil, + error = nil, + state = STATE_NEW, -- STATE_* + } + + return setmetatable(self, _MT) +end + + +-- start executing the future +function _M:start() + assert(self.state == STATE_NEW) + self.state = STATE_IN_PROGRESS + + local callback = function(resp) + assert(resp.jsonrpc == "2.0") + + if resp.result then + -- succeeded + self.result = resp.result + self.state = STATE_SUCCEED + + else + -- errored + self.error = resp.error + self.state = STATE_ERRORED + end + + self.sema:post() + + return true + end + + return self.socket:call(self.node_id, + self.method, + self.params, callback) +end + + +function _M:wait(timeout) + assert(self.state == STATE_IN_PROGRESS) + + local res, err = self.sema:wait(timeout) + if not res then + return res, err + end + + return self.state == STATE_SUCCEED +end + + +return _M diff --git a/kong/clustering/rpc/json_rpc_v2.lua b/kong/clustering/rpc/json_rpc_v2.lua new file mode 100644 index 000000000000..c5ece5d538e9 --- /dev/null +++ b/kong/clustering/rpc/json_rpc_v2.lua @@ -0,0 +1,41 @@ +local assert = assert +local tostring = tostring + + +local _M = { + PARSE_ERROR = -32700, + INVALID_REQUEST = -32600, + METHOD_NOT_FOUND = -32601, + INVALID_PARAMS = -32602, + INTERNAL_ERROR = -32603, + SERVER_ERROR = -32000, +} + + +local ERROR_MSG = { + [_M.PARSE_ERROR] = "Parse error", + [_M.INVALID_REQUEST] = "Invalid Request", + [_M.METHOD_NOT_FOUND] = "Method not found", + [_M.INVALID_PARAMS] = "Invalid params", + [_M.INTERNAL_ERROR] = "Internal error", + [_M.SERVER_ERROR] = "Server error", +} + + +function _M.new_error(id, code, msg) + if not msg then + msg = assert(ERROR_MSG[code], "unknown code: " .. 
tostring(code)) + end + + return { + jsonrpc = "2.0", + id = id, + error = { + code = code, + message = msg, + } + } +end + + +return _M diff --git a/kong/clustering/rpc/manager.lua b/kong/clustering/rpc/manager.lua new file mode 100644 index 000000000000..5104fdab7235 --- /dev/null +++ b/kong/clustering/rpc/manager.lua @@ -0,0 +1,365 @@ +local _M = {} +local _MT = { __index = _M, } + + +local server = require("resty.websocket.server") +local client = require("resty.websocket.client") +local socket = require("kong.clustering.rpc.socket") +local concentrator = require("kong.clustering.rpc.concentrator") +local future = require("kong.clustering.rpc.future") +local utils = require("kong.clustering.rpc.utils") +local callbacks = require("kong.clustering.rpc.callbacks") +local clustering_tls = require("kong.clustering.tls") +local constants = require("kong.constants") +local table_isempty = require("table.isempty") +local pl_tablex = require("pl.tablex") +local cjson = require("cjson.safe") + + +local ngx_var = ngx.var +local ngx_ERR = ngx.ERR +local ngx_log = ngx.log +local ngx_exit = ngx.exit +local ngx_time = ngx.time +local exiting = ngx.worker.exiting +local pl_tablex_makeset = pl_tablex.makeset +local cjson_encode = cjson.encode +local cjson_decode = cjson.decode +local validate_client_cert = clustering_tls.validate_client_cert +local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL + + +local WS_OPTS = { + timeout = constants.CLUSTERING_TIMEOUT, + max_payload_len = kong.configuration.cluster_max_payload, +} +local KONG_VERSION = kong.version + + +-- create a new RPC manager, node_id is own node_id +function _M.new(conf, node_id) + local self = { + -- clients[node_id]: { socket1 => true, socket2 => true, ... } + clients = {}, + client_capabilities = {}, + node_id = node_id, + conf = conf, + cluster_cert = assert(clustering_tls.get_cluster_cert(conf)), + cluster_cert_key = assert(clustering_tls.get_cluster_cert_key(conf)), + callbacks = callbacks.new(), + } + + self.concentrator = concentrator.new(self, kong.db) + + return setmetatable(self, _MT) +end + + +function _M:_add_socket(socket, capabilities_list) + local sockets = self.clients[socket.node_id] + if not sockets then + assert(self.concentrator:_enqueue_subscribe(socket.node_id)) + sockets = setmetatable({}, { __mode = "k", }) + self.clients[socket.node_id] = sockets + end + + self.client_capabilities[socket.node_id] = { + set = pl_tablex_makeset(capabilities_list), + list = capabilities_list, + } + + assert(not sockets[socket]) + + sockets[socket] = true +end + + +function _M:_remove_socket(socket) + local sockets = assert(self.clients[socket.node_id]) + + assert(sockets[socket]) + + sockets[socket] = nil + + if table_isempty(sockets) then + self.clients[socket.node_id] = nil + self.client_capabilities[socket.node_id] = nil + assert(self.concentrator:_enqueue_unsubscribe(socket.node_id)) + end +end + + +-- Helper that finds a node by node_id and check +-- if capability is supported +-- Returns: "local" if found locally, +-- or "concentrator" if found from the concentrator +-- In case of error, return nil, err instead +function _M:_find_node_and_check_capability(node_id, cap) + if self.client_capabilities[node_id] then + if not self.client_capabilities[node_id].set[cap] then + return nil, "requested capability does not exist, capability: " .. + cap .. ", node_id: " .. node_id + end + + return "local" + end + + -- does concentrator knows more about this client? 
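+ -- the node may be connected to another CP in the cluster; its advertised
+ -- capabilities are persisted in clustering_data_planes and calls to it are
+ -- relayed through the concentrator (see :call() below)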
+ local res, err = kong.db.clustering_data_planes:select({ id = node_id }) + if err then + return nil, "unable to query concentrator " .. err + end + + if not res or ngx_time() - res.last_seen > CLUSTERING_PING_INTERVAL * 2 then + return nil, "node is not connected, node_id: " .. node_id + end + + for _, c in ipairs(res.rpc_capabilities) do + if c == cap then + return "concentrator" + end + end + + return nil, "requested capability does not exist, capability: " .. + cap .. ", node_id: " .. node_id +end + + +-- low level helper used internally by :call() and concentrator +-- this one does not consider forwarding using concentrator +-- when node does not exist +function _M:_local_call(node_id, method, params) + if not self.client_capabilities[node_id] then + return nil, "node is not connected, node_id: " .. node_id + end + + local cap = utils.parse_method_name(method) + if not self.client_capabilities[node_id].set[cap] then + return nil, "requested capability does not exist, capability: " .. + cap .. ", node_id: " .. node_id + end + + local s = next(self.clients[node_id]) -- TODO: better LB? + + local fut = future.new(node_id, s, method, params) + assert(fut:start()) + + local ok, err = fut:wait(5) + if err then + return nil, err + end + + if ok then + return fut.result + end + + return nil, fut.error.message +end + + +-- public interface, try call on node_id locally first, +-- if node is not connected, try concentrator next +function _M:call(node_id, method, ...) + local cap = utils.parse_method_name(method) + + local res, err = self:_find_node_and_check_capability(node_id, cap) + if not res then + return nil, err + end + + local params = {...} + + if res == "local" then + res, err = self:_local_call(node_id, method, params) + if not res then + return nil, err + end + + return res + end + + assert(res == "concentrator") + + -- try concentrator + local fut = future.new(node_id, self.concentrator, method, params) + assert(fut:start()) + + local ok, err = fut:wait(5) + if err then + return nil, err + end + + if ok then + return fut.result + end + + return nil, fut.error.message +end + + +-- handle incoming client connections +function _M:handle_websocket() + local kong_version = ngx_var.http_x_kong_version + local node_id = ngx_var.http_x_kong_node_id + local rpc_protocol = ngx_var.http_sec_websocket_protocol + local content_encoding = ngx_var.http_content_encoding + local rpc_capabilities = ngx_var.http_x_kong_rpc_capabilities + + if not kong_version then + ngx_log(ngx_ERR, "[rpc] client did not provide version number") + return ngx_exit(ngx.HTTP_CLOSE) + end + + if not node_id then + ngx_log(ngx_ERR, "[rpc] client did not provide node ID") + return ngx_exit(ngx.HTTP_CLOSE) + end + + if content_encoding ~= "x-snappy-framed" then + ngx_log(ngx_ERR, "[rpc] client does use Snappy compressed frames") + return ngx_exit(ngx.HTTP_CLOSE) + end + + if rpc_protocol ~= "kong.rpc.v1" then + ngx_log(ngx_ERR, "[rpc] unknown RPC protocol: " .. + tostring(rpc_protocol) .. 
+ ", doesn't know how to communicate with client") + return ngx_exit(ngx.HTTP_CLOSE) + end + + if not rpc_capabilities then + ngx_log(ngx_ERR, "[rpc] client did not provide capability list") + return ngx_exit(ngx.HTTP_CLOSE) + end + + rpc_capabilities = cjson_decode(rpc_capabilities) + if not rpc_capabilities then + ngx_log(ngx_ERR, "[rpc] failed to decode client capability list") + return ngx_exit(ngx.HTTP_CLOSE) + end + + local cert, err = validate_client_cert(self.conf, self.cluster_cert, ngx_var.ssl_client_raw_cert) + if not cert then + ngx_log(ngx_ERR, "[rpc] client's certificate failed validation: ", err) + return ngx_exit(ngx.HTTP_CLOSE) + end + + ngx.header["X-Kong-RPC-Capabilities"] = cjson_encode(self.callbacks:get_capabilities_list()) + + local wb, err = server:new(WS_OPTS) + if not wb then + ngx_log(ngx_ERR, "[rpc] unable to establish WebSocket connection with client: ", err) + return ngx_exit(ngx.HTTP_CLOSE) + end + + local s = socket.new(self, wb, node_id) + self:_add_socket(s, rpc_capabilities) + + s:start() + local res, err = s:join() + self:_remove_socket(s) + + if not res then + ngx_log(ngx_ERR, "[rpc] RPC connection broken: ", err, " node_id: ", node_id) + return ngx_exit(ngx.ERROR) + end + + return ngx_exit(ngx.OK) +end + + +function _M:connect(premature, node_id, host, path, cert, key) + if premature then + return + end + + local uri = "wss://" .. host .. path + + local opts = { + ssl_verify = true, + client_cert = cert, + client_priv_key = key, + protocols = "kong.rpc.v1", + headers = { + "X-Kong-Version: " .. KONG_VERSION, + "X-Kong-Node-Id: " .. self.node_id, + "X-Kong-Hostname: " .. kong.node.get_hostname(), + "X-Kong-RPC-Capabilities: " .. cjson_encode(self.callbacks:get_capabilities_list()), + "Content-Encoding: x-snappy-framed" + }, + } + + if self.conf.cluster_mtls == "shared" then + opts.server_name = "kong_clustering" + + else + -- server_name will be set to the host if it is not explicitly defined here + if self.conf.cluster_server_name ~= "" then + opts.server_name = self.conf.cluster_server_name + end + end + + local reconnection_delay = math.random(5, 10) + + local c = assert(client:new(WS_OPTS)) + + local ok, err = c:connect(uri, opts) + if not ok then + ngx_log(ngx_ERR, "[rpc] unable to connect to peer: ", err) + goto err + end + + do + local resp_headers = c:get_resp_headers() + -- FIXME: resp_headers should not be case sensitive + if not resp_headers or not resp_headers["x_kong_rpc_capabilities"] then + ngx_log(ngx_ERR, "[rpc] peer did not provide capability list, node_id: ", node_id) + c:send_close() -- can't do much if this fails + goto err + end + + local capabilities = resp_headers["x_kong_rpc_capabilities"] + capabilities = cjson_decode(capabilities) + if not capabilities then + ngx_log(ngx_ERR, "[rpc] unable to decode peer capability list, node_id: ", node_id, + " list: ", capabilities) + c:send_close() -- can't do much if this fails + goto err + end + + local s = socket.new(self, c, node_id) + s:start() + self:_add_socket(s, capabilities) + + ok, err = s:join() -- main event loop + + self:_remove_socket(s) + + if not ok then + ngx_log(ngx_ERR, "[rpc] connection to node_id: ", node_id, " broken, err: ", + err, ", reconnecting in ", reconnection_delay, " seconds") + end + end + + ::err:: + + if not exiting() then + ngx.timer.at(reconnection_delay, function(premature) + self:connect(premature, node_id, host, path, cert, key) + end) + end +end + + +function _M:get_peers() + local res = {} + + for node_id, cap in pairs(self.client_capabilities) do + 
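+ -- expose the capability list only; the internal lookup set is not returned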
res[node_id] = cap.list + end + + return res +end + + +return _M diff --git a/kong/clustering/rpc/queue.lua b/kong/clustering/rpc/queue.lua new file mode 100644 index 000000000000..7b07705be8f5 --- /dev/null +++ b/kong/clustering/rpc/queue.lua @@ -0,0 +1,73 @@ +local semaphore = require("ngx.semaphore") +local table_new = require("table.new") +local rpc_utils = require("kong.clustering.rpc.utils") + + +local assert = assert +local setmetatable = setmetatable +local math_min = math.min +local is_timeout = rpc_utils.is_timeout + + +local _M = {} +local _MT = { __index = _M, } + + +local DEFAULT_QUEUE_LEN = 128 + + +function _M.new(max_len) + local self = { + semaphore = assert(semaphore.new()), + max = max_len, + + elts = table_new(math_min(max_len, DEFAULT_QUEUE_LEN), 0), + first = 0, + last = -1, + } + + return setmetatable(self, _MT) +end + + +function _M:push(item) + local last = self.last + + if last - self.first + 1 >= self.max then + return nil, "queue overflow" + end + + last = last + 1 + self.last = last + self.elts[last] = item + + self.semaphore:post() + + return true +end + + +function _M:pop(timeout) + local ok, err = self.semaphore:wait(timeout) + if not ok then + if is_timeout(err) then + return nil + end + + return nil, err + end + + local first = self.first + + -- queue can not be empty because semaphore succeed + assert(first <= self.last) + + local item = self.elts[first] + self.elts[first] = nil + self.first = first + 1 + + return item +end + + +return _M diff --git a/kong/clustering/rpc/socket.lua b/kong/clustering/rpc/socket.lua new file mode 100644 index 000000000000..243a44522fc2 --- /dev/null +++ b/kong/clustering/rpc/socket.lua @@ -0,0 +1,284 @@ +-- socket represents an open WebSocket connection +-- unlike the WebSocket object, it can be accessed via different requests +-- with the help of semaphores + + +local _M = {} +local _MT = { __index = _M, } + + +local utils = require("kong.clustering.rpc.utils") +local queue = require("kong.clustering.rpc.queue") +local jsonrpc = require("kong.clustering.rpc.json_rpc_v2") +local constants = require("kong.constants") + + +local assert = assert +local string_format = string.format +local kong = kong +local is_timeout = utils.is_timeout +local compress_payload = utils.compress_payload +local decompress_payload = utils.decompress_payload +local exiting = ngx.worker.exiting +local ngx_time = ngx.time +local ngx_log = ngx.log +local new_error = jsonrpc.new_error + + +local CLUSTERING_PING_INTERVAL = constants.CLUSTERING_PING_INTERVAL +local PING_WAIT = CLUSTERING_PING_INTERVAL * 1.5 +local PING_TYPE = "PING" +local PONG_TYPE = "PONG" +local ngx_WARN = ngx.WARN +local ngx_DEBUG = ngx.DEBUG + + +-- create a new socket wrapper, wb is the WebSocket object to use +-- timeout and max_payload_len must already been set by caller when +-- creating the `wb` object +function _M.new(manager, wb, node_id) + local self = { + wb = wb, + interest = {}, -- id: callback pair + outgoing = queue.new(4096), -- write queue + manager = manager, + node_id = node_id, + sequence = 0, + } + + return setmetatable(self, _MT) +end + + +function _M:_get_next_id() + local res = self.sequence + self.sequence = res + 1 + + return res +end + + +function _M._dispatch(premature, self, cb, payload) + if premature then + return + end + + local res, err = cb(self.node_id, unpack(payload.params)) + if not res then + ngx_log(ngx_WARN, "[rpc] RPC callback failed: ", err) + + res, err = self.outgoing:push(new_error(payload.id, jsonrpc.SERVER_ERROR, + tostring(err))) + if 
not res then + ngx_log(ngx_WARN, "[rpc] unable to push RPC call error: ", err) + end + + return + end + + -- success + res, err = self.outgoing:push({ + jsonrpc = "2.0", + id = payload.id, + result = res, + }) + if not res then + ngx_log(ngx_WARN, "[rpc] unable to push RPC call result: ", err) + end +end + + +-- start reader and writer thread and event loop +function _M:start() + self.read_thread = ngx.thread.spawn(function() + local last_seen = ngx_time() + + while not exiting() do + local data, typ, err = self.wb:recv_frame() + + if err then + if not is_timeout(err) then + return nil, err + end + + local waited = ngx_time() - last_seen + if waited > PING_WAIT then + return nil, "did not receive ping frame from other end within " .. + PING_WAIT .. " seconds" + end + + if waited > CLUSTERING_PING_INTERVAL then + local res, err = self.outgoing:push(PING_TYPE) + if not res then + return nil, "unable to send ping: " .. err + end + end + + -- timeout + goto continue + end + + last_seen = ngx_time() + + if typ == "ping" then + local res, err = self.outgoing:push(PONG_TYPE) + if not res then + return nil, "unable to handle ping: " .. err + end + + goto continue + end + + if typ == "pong" then + ngx_log(ngx_DEBUG, "[rpc] got PONG frame") + + goto continue + end + + if typ == "close" then + return true + end + + assert(typ == "binary") + + local payload = decompress_payload(data) + assert(payload.jsonrpc == "2.0") + + if payload.method then + -- invoke + + local dispatch_cb = self.manager.callbacks.callbacks[payload.method] + if not dispatch_cb then + local res, err = self.outgoing:push(new_error(payload.id, jsonrpc.METHOD_NOT_FOUND)) + if not res then + return nil, "unable to send \"METHOD_NOT_FOUND\" error back to client: " .. err + end + + goto continue + end + + -- call dispatch + local res, err = kong.timer:named_at(string_format("JSON-RPC callback for node_id: %s, id: %d, method: %s", + self.node_id, payload.id, payload.method), + 0, _M._dispatch, self, dispatch_cb, payload) + if not res then + local reso, erro = self.outgoing:push(new_error(payload.id, jsonrpc.INTERNAL_ERROR)) + if not reso then + return nil, "unable to send \"INTERNAL_ERROR\" error back to client: " .. erro + end + + return nil, "unable to dispatch JSON-RPC callback: " .. err + end + + else + -- response + local interest_cb = self.interest[payload.id] + self.interest[payload.id] = nil -- edge trigger only once + + if not interest_cb then + ngx_log(ngx_WARN, "[rpc] no interest for RPC response id: ", payload.id, ", dropping it") + + goto continue + end + + local res, err = interest_cb(payload) + if not res then + ngx_log(ngx_WARN, "[rpc] RPC response interest handler failed: id: ", + payload.id, ", err: ", err) + end + end + + ::continue:: + end + end) + + self.write_thread = ngx.thread.spawn(function() + while not exiting() do + local payload, err = self.outgoing:pop(5) + if err then + return nil, err + end + + if payload then + if payload == PING_TYPE then + local _, err = self.wb:send_ping() + if err then + return nil, "failed to send PING frame to peer: " .. err + + else + ngx_log(ngx_DEBUG, "[rpc] sent PING frame to peer") + end + + elseif payload == PONG_TYPE then + local _, err = self.wb:send_pong() + if err then + return nil, "failed to send PONG frame to peer: " .. 
err + + else + ngx_log(ngx_DEBUG, "[rpc] sent PONG frame to peer") + end + + else + assert(type(payload) == "table") + + local bytes, err = self.wb:send_binary(compress_payload(payload)) + if not bytes then + return nil, err + end + end + end + end + end) +end + + +function _M:join() + local ok, err, perr = ngx.thread.wait(self.write_thread, self.read_thread) + self:stop() + + if not ok then + return nil, err + end + + if perr then + return nil, perr + end + + return true +end + + +function _M:stop() + ngx.thread.kill(self.write_thread) + ngx.thread.kill(self.read_thread) + + if self.wb.close then + self.wb:close() + + else + self.wb:send_close() + end +end + + +-- asynchronously start executing a RPC, _node_id is not +-- needed for this implementation, but it is important +-- for concentrator socket, so we include it just to keep +-- the signature consistent +function _M:call(node_id, method, params, callback) + assert(node_id == self.node_id) + + local id = self:_get_next_id() + + self.interest[id] = callback + + return self.outgoing:push({ + jsonrpc = "2.0", + method = method, + params = params, + id = id, + }) +end + + +return _M diff --git a/kong/clustering/rpc/utils.lua b/kong/clustering/rpc/utils.lua new file mode 100644 index 000000000000..544d2892932f --- /dev/null +++ b/kong/clustering/rpc/utils.lua @@ -0,0 +1,45 @@ +local _M = {} +local pl_stringx = require("pl.stringx") +local cjson = require("cjson") +local snappy = require("resty.snappy") + + +local string_sub = string.sub +local assert = assert +local cjson_encode = cjson.encode +local cjson_decode = cjson.decode +local rfind = pl_stringx.rfind +local snappy_compress = snappy.compress +local snappy_uncompress = snappy.uncompress + + +function _M.parse_method_name(method) + local pos = rfind(method, ".") + if not pos then + return nil, "not a valid method name" + end + + return method:sub(1, pos - 1), method:sub(pos + 1) +end + + +function _M.is_timeout(err) + return err and (err == "timeout" or string_sub(err, -7) == "timeout") +end + + +function _M.compress_payload(payload) + local json = cjson_encode(payload) + local data = assert(snappy_compress(json)) + return data +end + + +function _M.decompress_payload(compressed) + local json = assert(snappy_uncompress(compressed)) + local data = cjson_decode(json) + return data +end + + +return _M diff --git a/kong/clustering/services/debug.lua b/kong/clustering/services/debug.lua new file mode 100644 index 000000000000..387b19e62a1f --- /dev/null +++ b/kong/clustering/services/debug.lua @@ -0,0 +1,70 @@ +local _M = {} + + +local resty_log = require("resty.kong.log") +local constants = require("kong.constants") + + +local tostring = tostring + + +local function rpc_set_log_level(_node_id, new_log_level, timeout) + if not constants.LOG_LEVELS[new_log_level] then + return nil, "unknown log level: " .. 
tostring(new_log_level) + end + + if type(new_log_level) == "string" then + new_log_level = constants.LOG_LEVELS[new_log_level] + end + + local timeout = math.ceil(timeout or constants.DYN_LOG_LEVEL_DEFAULT_TIMEOUT) + + local _, _, original_level = resty_log.get_log_level() + if new_log_level == original_level then + timeout = 0 + end + + -- this function should not fail, if it throws exception, let RPC framework handle it + resty_log.set_log_level(new_log_level, timeout) + + local data = { + log_level = new_log_level, + timeout = timeout, + } + -- broadcast to all workers in a node + local ok, err = kong.worker_events.post("debug", "log_level", data) + if not ok then + return nil, err + end + + -- store in shm so that newly spawned workers can update their log levels + ok, err = ngx.shared.kong:set(constants.DYN_LOG_LEVEL_KEY, new_log_level, timeout) + if not ok then + return nil, err + end + + ok, err = ngx.shared.kong:set(constants.DYN_LOG_LEVEL_TIMEOUT_AT_KEY, ngx.time() + timeout, timeout) + if not ok then + return nil, err + end + + return true +end + + +local function rpc_get_log_level(_node_id) + local current_level, timeout, original_level = resty_log.get_log_level() + return { current_level = constants.LOG_LEVELS[current_level], + timeout = timeout, + original_level = constants.LOG_LEVELS[original_level], + } +end + + +function _M.init(manager) + manager.callbacks:register("kong.debug.log_level.v1.get_log_level", rpc_get_log_level) + manager.callbacks:register("kong.debug.log_level.v1.set_log_level", rpc_set_log_level) +end + + +return _M diff --git a/kong/conf_loader/constants.lua b/kong/conf_loader/constants.lua index 94f402451bfb..cda8a9a9ccdb 100644 --- a/kong/conf_loader/constants.lua +++ b/kong/conf_loader/constants.lua @@ -497,6 +497,7 @@ local CONF_PARSERS = { cluster_max_payload = { typ = "number" }, cluster_use_proxy = { typ = "boolean" }, cluster_dp_labels = { typ = "array" }, + cluster_rpc = { typ = "boolean" }, kic = { typ = "boolean" }, pluginserver_names = { typ = "array" }, diff --git a/kong/constants.lua b/kong/constants.lua index 2941ffae3f6f..0050ab1fee4f 100644 --- a/kong/constants.lua +++ b/kong/constants.lua @@ -256,6 +256,7 @@ local constants = { DYN_LOG_LEVEL_KEY = "kong:dyn_log_level", DYN_LOG_LEVEL_TIMEOUT_AT_KEY = "kong:dyn_log_level_timeout_at", + DYN_LOG_LEVEL_DEFAULT_TIMEOUT = 60, ADMIN_GUI_KCONFIG_CACHE_KEY = "admin:gui:kconfig", diff --git a/kong/db/migrations/core/023_360_to_370.lua b/kong/db/migrations/core/023_360_to_370.lua new file mode 100644 index 000000000000..f769ca0b7bcd --- /dev/null +++ b/kong/db/migrations/core/023_360_to_370.lua @@ -0,0 +1,23 @@ +return { + postgres = { + up = [[ + CREATE TABLE IF NOT EXISTS "clustering_rpc_requests" ( + "id" BIGSERIAL PRIMARY KEY, + "node_id" UUID NOT NULL, + "reply_to" UUID NOT NULL, + "ttl" TIMESTAMP WITH TIME ZONE NOT NULL, + "payload" JSON NOT NULL + ); + + CREATE INDEX IF NOT EXISTS "clustering_rpc_requests_node_id_idx" ON "clustering_rpc_requests" ("node_id"); + + DO $$ + BEGIN + ALTER TABLE IF EXISTS ONLY "clustering_data_planes" ADD "rpc_capabilities" TEXT[]; + EXCEPTION WHEN DUPLICATE_COLUMN THEN + -- Do nothing, accept existing state + END; + $$; + ]] + } +} diff --git a/kong/db/migrations/core/init.lua b/kong/db/migrations/core/init.lua index b19a271ce7aa..2f18b1cb5f76 100644 --- a/kong/db/migrations/core/init.lua +++ b/kong/db/migrations/core/init.lua @@ -20,4 +20,5 @@ return { "020_330_to_340", "021_340_to_350", "022_350_to_360", + "023_360_to_370", } diff --git 
a/kong/db/schema/entities/clustering_data_planes.lua b/kong/db/schema/entities/clustering_data_planes.lua index fb1f43db0990..09aee82548af 100644 --- a/kong/db/schema/entities/clustering_data_planes.lua +++ b/kong/db/schema/entities/clustering_data_planes.lua @@ -46,5 +46,7 @@ return { description = "Certificate details of the DPs.", }, }, + { rpc_capabilities = { type = "set", description = "An array of RPC capabilities this node supports.", + elements = typedefs.capability, } }, }, } diff --git a/kong/db/schema/typedefs.lua b/kong/db/schema/typedefs.lua index 66bfc0a5d724..0b0de71d9ea1 100644 --- a/kong/db/schema/typedefs.lua +++ b/kong/db/schema/typedefs.lua @@ -456,6 +456,11 @@ typedefs.tags = Schema.define { description = "A set of strings representing tags." } +typedefs.capability = Schema.define { + type = "string", + description = "A string representing an RPC capability." +} + local http_protocols = {} for p, s in pairs(constants.PROTOCOLS_WITH_SUBSYSTEM) do if s == "http" then diff --git a/kong/db/strategies/postgres/connector.lua b/kong/db/strategies/postgres/connector.lua index 102259dc5beb..80fd6251fd43 100644 --- a/kong/db/strategies/postgres/connector.lua +++ b/kong/db/strategies/postgres/connector.lua @@ -146,6 +146,7 @@ do local res, err = utils_toposort(table_names, get_table_name_neighbors) if res then + insert(res, 1, "clustering_rpc_requests") insert(res, 1, "cluster_events") end diff --git a/kong/init.lua b/kong/init.lua index 2c837dd0e52b..248ae521c593 100644 --- a/kong/init.lua +++ b/kong/init.lua @@ -687,6 +687,14 @@ function Kong.init() if is_http_module and (is_data_plane(config) or is_control_plane(config)) then kong.clustering = require("kong.clustering").new(config) + + if config.cluster_rpc then + kong.rpc = require("kong.clustering.rpc.manager").new(config, kong.node.get_id()) + + if is_data_plane(config) then + require("kong.clustering.services.debug").init(kong.rpc) + end + end end assert(db.vaults:load_vault_schemas(config.loaded_vaults)) @@ -961,6 +969,23 @@ function Kong.init_worker() if kong.clustering then kong.clustering:init_worker() + + local cluster_tls = require("kong.clustering.tls") + + if kong.rpc and is_http_module then + if is_data_plane(kong.configuration) then + ngx.timer.at(0, function(premature) + kong.rpc:connect(premature, + "control_plane", kong.configuration.cluster_control_plane, + "/v2/outlet", + cluster_tls.get_cluster_cert(kong.configuration).cdata, + cluster_tls.get_cluster_cert_key(kong.configuration)) + end) + + else -- control_plane + kong.rpc.concentrator:start() + end + end end ok, err = wasm.init_worker() @@ -1934,6 +1959,15 @@ function Kong.stream_api() end +function Kong.serve_cluster_rpc_listener(options) + log_init_worker_errors() + + ngx.ctx.KONG_PHASE = PHASES.cluster_listener + + return kong.rpc:handle_websocket() +end + + do local events = require "kong.runloop.events" Kong.stream_config_listener = events.stream_reconfigure_listener diff --git a/kong/templates/kong_defaults.lua b/kong/templates/kong_defaults.lua index 86b22f5760cd..62d117290a9b 100644 --- a/kong/templates/kong_defaults.lua +++ b/kong/templates/kong_defaults.lua @@ -41,6 +41,7 @@ cluster_ocsp = off cluster_max_payload = 16777216 cluster_use_proxy = off cluster_dp_labels = NONE +cluster_rpc = on lmdb_environment_path = dbless.lmdb lmdb_map_size = 2048m diff --git a/kong/templates/nginx_kong.lua b/kong/templates/nginx_kong.lua index 9e4127c489bc..5053c26764ce 100644 --- a/kong/templates/nginx_kong.lua +++ b/kong/templates/nginx_kong.lua @@ -566,6 
+566,14 @@ server { Kong.serve_cluster_listener() } } + +> if cluster_rpc then + location = /v2/outlet { + content_by_lua_block { + Kong.serve_cluster_rpc_listener() + } + } +> end -- cluster_rpc is enabled } > end -- role == "control_plane" diff --git a/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt b/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt index 443c3426f7f8..cb1dca234d03 100644 --- a/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt +++ b/scripts/explain_manifest/fixtures/ubuntu-22.04-amd64.txt @@ -54,6 +54,13 @@ Needed : - libc.so.6 +- Path : /usr/local/kong/lib/libsnappy.so + Needed : + - libstdc++.so.6 + - libm.so.6 + - libgcc_s.so.1 + - libc.so.6 + - Path : /usr/local/kong/lib/libssl.so.3 Needed : - libstdc++.so.6 diff --git a/spec/01-unit/01-db/06-postgres_spec.lua b/spec/01-unit/01-db/06-postgres_spec.lua index c1a8ec127642..04382b144821 100644 --- a/spec/01-unit/01-db/06-postgres_spec.lua +++ b/spec/01-unit/01-db/06-postgres_spec.lua @@ -269,7 +269,7 @@ describe("kong.db [#postgres] connector", function() local ts = connector._get_topologically_sorted_table_names it("prepends cluster_events no matter what", function() - assert.same({"cluster_events"}, ts({})) + assert.same({"cluster_events", "clustering_rpc_requests"}, ts({})) end) it("sorts an array of unrelated schemas alphabetically by name", function() @@ -277,7 +277,7 @@ describe("kong.db [#postgres] connector", function() local b = schema_new({ name = "b", ttl = true, fields = {} }) local c = schema_new({ name = "c", ttl = true, fields = {} }) - assert.same({"cluster_events", "a", "b", "c"}, ts({ c, a, b })) + assert.same({"cluster_events", "clustering_rpc_requests", "a", "b", "c"}, ts({ c, a, b })) end) it("ignores non-ttl schemas", function() @@ -285,7 +285,7 @@ describe("kong.db [#postgres] connector", function() local b = schema_new({ name = "b", fields = {} }) local c = schema_new({ name = "c", ttl = true, fields = {} }) - assert.same({"cluster_events", "a", "c"}, ts({ c, a, b })) + assert.same({"cluster_events", "clustering_rpc_requests", "a", "c"}, ts({ c, a, b })) end) it("it puts destinations first", function() @@ -306,14 +306,14 @@ describe("kong.db [#postgres] connector", function() } }) - assert.same({"cluster_events", "a", "c", "b"}, ts({ a, b, c })) + assert.same({"cluster_events", "clustering_rpc_requests", "a", "c", "b"}, ts({ a, b, c })) end) it("puts core entities first, even when no relations", function() local a = schema_new({ name = "a", ttl = true, fields = {} }) local routes = schema_new({ name = "routes", ttl = true, fields = {} }) - assert.same({"cluster_events", "routes", "a"}, ts({ a, routes })) + assert.same({"cluster_events", "clustering_rpc_requests", "routes", "a"}, ts({ a, routes })) end) it("puts workspaces before core and others, when no relations", function() @@ -321,7 +321,7 @@ describe("kong.db [#postgres] connector", function() local workspaces = schema_new({ name = "workspaces", ttl = true, fields = {} }) local routes = schema_new({ name = "routes", ttl = true, fields = {} }) - assert.same({"cluster_events", "workspaces", "routes", "a"}, ts({ a, routes, workspaces })) + assert.same({"cluster_events", "clustering_rpc_requests", "workspaces", "routes", "a"}, ts({ a, routes, workspaces })) end) it("puts workspaces first, core entities second, and other entities afterwards, even with relations", function() @@ -343,7 +343,7 @@ describe("kong.db [#postgres] connector", function() } }) local workspaces = schema_new({ name = "workspaces", ttl = true, fields = 
{} }) - assert.same({ "cluster_events", "workspaces", "services", "routes", "a", "b" }, + assert.same({ "cluster_events", "clustering_rpc_requests", "workspaces", "services", "routes", "a", "b" }, ts({ services, b, a, workspaces, routes })) end) @@ -358,7 +358,7 @@ describe("kong.db [#postgres] connector", function() { a = { type = "foreign", reference = "a" } } -- we somehow forced workspaces to depend on a } }) - assert.same({ "cluster_events", "a", "workspaces", "services" }, ts({ services, a, workspaces })) + assert.same({ "cluster_events", "clustering_rpc_requests", "a", "workspaces", "services" }, ts({ services, a, workspaces })) end) it("returns an error if cycles are found", function() diff --git a/spec/01-unit/04-prefix_handler_spec.lua b/spec/01-unit/04-prefix_handler_spec.lua index eb0dfd76c7a9..9c3724a98d1f 100644 --- a/spec/01-unit/04-prefix_handler_spec.lua +++ b/spec/01-unit/04-prefix_handler_spec.lua @@ -308,6 +308,32 @@ describe("NGINX conf compiler", function() assert.not_matches("ssl_certificate_by_lua_block", kong_nginx_conf) assert.not_matches("ssl_dhparam", kong_nginx_conf) end) + + it("renders RPC server", function() + local conf = assert(conf_loader(helpers.test_conf_path, { + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_listen = "127.0.0.1:9005", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + local kong_nginx_conf = prefix_handler.compile_kong_conf(conf) + assert.matches("location = /v2/outlet {", kong_nginx_conf) + end) + + it("does not renders RPC server when inert", function() + local conf = assert(conf_loader(helpers.test_conf_path, { + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_listen = "127.0.0.1:9005", + cluster_rpc = "off", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + local kong_nginx_conf = prefix_handler.compile_kong_conf(conf) + assert.not_matches("location = /v2/outlet {", kong_nginx_conf) + end) + describe("handles client_ssl", function() it("on", function() local conf = assert(conf_loader(helpers.test_conf_path, { diff --git a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua index a27d02faf785..59eebe4b887d 100644 --- a/spec/02-integration/09-hybrid_mode/01-sync_spec.lua +++ b/spec/02-integration/09-hybrid_mode/01-sync_spec.lua @@ -69,7 +69,6 @@ describe("CP/DP communication #" .. strategy, function() assert.near(14 * 86400, v.ttl, 3) assert.matches("^(%d+%.%d+)%.%d+", v.version) assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status) - assert.equal(CLUSTERING_SYNC_STATUS.NORMAL, v.sync_status) return true end end diff --git a/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua new file mode 100644 index 000000000000..1f0ce4bbb919 --- /dev/null +++ b/spec/02-integration/18-hybrid_rpc/01-rpc_spec.lua @@ -0,0 +1,62 @@ +local helpers = require "spec.helpers" +local cjson = require("cjson.safe") + +for _, strategy in helpers.each_strategy() do + describe("Hybrid Mode RPC #" .. 
+
+    lazy_setup(function()
+      helpers.get_db_utils(strategy, {
+        "clustering_data_planes",
+      }) -- runs migrations
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:9005",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+
+      assert(helpers.start_kong({
+        role = "data_plane",
+        database = "off",
+        prefix = "servroot2",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_control_plane = "127.0.0.1:9005",
+        proxy_listen = "0.0.0.0:9002",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong("servroot2")
+      helpers.stop_kong()
+    end)
+
+    describe("status API", function()
+      it("shows DP RPC capability status", function()
+        helpers.wait_until(function()
+          local admin_client = helpers.admin_client()
+          finally(function()
+            admin_client:close()
+          end)
+
+          local res = assert(admin_client:get("/clustering/data-planes"))
+          local body = assert.res_status(200, res)
+          local json = cjson.decode(body)
+
+          for _, v in pairs(json.data) do
+            if v.ip == "127.0.0.1" and v.rpc_capabilities and #v.rpc_capabilities ~= 0 then
+              table.sort(v.rpc_capabilities)
+              assert.near(14 * 86400, v.ttl, 3)
+              assert.same({ "kong.debug.log_level.v1", }, v.rpc_capabilities)
+              return true
+            end
+          end
+        end, 10)
+      end)
+    end)
+  end)
+end
diff --git a/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua
new file mode 100644
index 000000000000..fcebad0695fc
--- /dev/null
+++ b/spec/02-integration/18-hybrid_rpc/02-log-level_spec.lua
@@ -0,0 +1,181 @@
+local helpers = require "spec.helpers"
+local cjson = require("cjson.safe")
+
+
+local function obtain_dp_node_id()
+  local dp_node_id
+
+  helpers.wait_until(function()
+    local admin_client = helpers.admin_client()
+    finally(function()
+      admin_client:close()
+    end)
+
+    local res = assert(admin_client:get("/clustering/data-planes"))
+    local body = assert.res_status(200, res)
+    local json = cjson.decode(body)
+
+    for _, v in pairs(json.data) do
+      if v.ip == "127.0.0.1" and ngx.time() - v.last_seen < 3 then
+        dp_node_id = v.id
+        return true
+      end
+    end
+  end, 10)
+
+  return dp_node_id
+end
+
+
+for _, strategy in helpers.each_strategy() do
+  describe("Hybrid Mode RPC #" .. strategy, function()
+
+    lazy_setup(function()
+      helpers.get_db_utils(strategy, {
+        "clustering_data_planes",
+      }) -- runs migrations
+
+      assert(helpers.start_kong({
+        role = "control_plane",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        database = strategy,
+        cluster_listen = "127.0.0.1:9005",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+
+      assert(helpers.start_kong({
+        role = "data_plane",
+        database = "off",
+        prefix = "servroot2",
+        cluster_cert = "spec/fixtures/kong_clustering.crt",
+        cluster_cert_key = "spec/fixtures/kong_clustering.key",
+        cluster_control_plane = "127.0.0.1:9005",
+        proxy_listen = "0.0.0.0:9002",
+        nginx_conf = "spec/fixtures/custom_nginx.template",
+      }))
+    end)
+
+    lazy_teardown(function()
+      helpers.stop_kong("servroot2")
+      helpers.stop_kong()
+    end)
+
+    describe("Dynamic log level over RPC", function()
+      it("can get the current log level", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+        local body = assert.res_status(200, res)
+        local json = cjson.decode(body)
+        assert.equal(0, json.timeout)
+        assert.equal("debug", json.current_level)
+        assert.equal("debug", json.original_level)
+      end)
+
+      it("can set the current log level", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
+          {
+            headers = {
+              ["Content-Type"] = "application/json",
+            },
+            body = {
+              current_level = "info",
+              timeout = 10,
+            },
+          }))
+        assert.res_status(201, res)
+
+        local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+        local body = assert.res_status(200, res)
+        local json = cjson.decode(body)
+        assert.near(10, json.timeout, 3)
+        assert.equal("info", json.current_level)
+        assert.equal("debug", json.original_level)
+      end)
+
+      it("set current log level to original_level turns off feature", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
+          {
+            headers = {
+              ["Content-Type"] = "application/json",
+            },
+            body = {
+              current_level = "info",
+              timeout = 10,
+            },
+          }))
+        assert.res_status(201, res)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
+          {
+            headers = {
+              ["Content-Type"] = "application/json",
+            },
+            body = {
+              current_level = "debug",
+              timeout = 10,
+            },
+          }))
+        assert.res_status(201, res)
+
+        local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level"))
+        local body = assert.res_status(200, res)
+        local json = cjson.decode(body)
+        assert.equal(0, json.timeout)
+        assert.equal("debug", json.current_level)
+        assert.equal("debug", json.original_level)
+      end)
+
+      it("DELETE turns off feature", function()
+        local dp_node_id = obtain_dp_node_id()
+
+        local admin_client = helpers.admin_client()
+        finally(function()
+          admin_client:close()
+        end)
+
+        local res = assert(admin_client:put("/clustering/data-planes/" .. dp_node_id .. "/log-level",
"/log-level", + { + headers = { + ["Content-Type"] = "application/json", + }, + body = { + current_level = "info", + timeout = 10, + }, + })) + assert.res_status(201, res) + + local res = assert(admin_client:delete("/clustering/data-planes/" .. dp_node_id .. "/log-level")) + assert.res_status(204, res) + + local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. "/log-level")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + assert.equal(0, json.timeout) + assert.equal("debug", json.current_level) + assert.equal("debug", json.original_level) + end) + end) + end) +end diff --git a/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua new file mode 100644 index 000000000000..4a6d73cf659c --- /dev/null +++ b/spec/02-integration/18-hybrid_rpc/03-inert_spec.lua @@ -0,0 +1,101 @@ +local helpers = require "spec.helpers" +local cjson = require("cjson.safe") + + +local function obtain_dp_node_id() + local dp_node_id + + helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + + for _, v in pairs(json.data) do + if v.ip == "127.0.0.1" and ngx.time() - v.last_seen < 3 then + dp_node_id = v.id + return true + end + end + end, 10) + + return dp_node_id +end + + +for _, strategy in helpers.each_strategy() do + describe("Hybrid Mode RPC inert #" .. strategy, function() + + lazy_setup(function() + helpers.get_db_utils(strategy, { + "clustering_data_planes", + }) -- runs migrations + + assert(helpers.start_kong({ + role = "control_plane", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + database = strategy, + cluster_listen = "127.0.0.1:9005", + cluster_rpc = "off", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + + assert(helpers.start_kong({ + role = "data_plane", + database = "off", + prefix = "servroot2", + cluster_cert = "spec/fixtures/kong_clustering.crt", + cluster_cert_key = "spec/fixtures/kong_clustering.key", + cluster_control_plane = "127.0.0.1:9005", + cluster_rpc = "off", + proxy_listen = "0.0.0.0:9002", + nginx_conf = "spec/fixtures/custom_nginx.template", + })) + end) + + lazy_teardown(function() + helpers.stop_kong("servroot2") + helpers.stop_kong() + end) + + describe("RPC inert", function() + it("rpc_capability list should be empty", function() + helpers.wait_until(function() + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes")) + local body = assert.res_status(200, res) + local json = cjson.decode(body) + + for _, v in pairs(json.data) do + if v.ip == "127.0.0.1" then + assert.near(14 * 86400, v.ttl, 3) + assert.equal(0, #v.rpc_capabilities) + return true + end + end + end, 10) + end) + + it("can not get the current log level", function() + local dp_node_id = obtain_dp_node_id() + + local admin_client = helpers.admin_client() + finally(function() + admin_client:close() + end) + + local res = assert(admin_client:get("/clustering/data-planes/" .. dp_node_id .. 
"/log-level")) + assert.res_status(404, res) + end) + end) + end) +end diff --git a/spec/05-migration/db/migrations/core/023_360_to_370_spec.lua b/spec/05-migration/db/migrations/core/023_360_to_370_spec.lua new file mode 100644 index 000000000000..d9c52c42ec49 --- /dev/null +++ b/spec/05-migration/db/migrations/core/023_360_to_370_spec.lua @@ -0,0 +1,12 @@ +local uh = require "spec/upgrade_helpers" + +describe("database migration", function() + uh.old_after_up("has created the \"clustering_rpc_requests\" table", function() + assert.database_has_relation("clustering_rpc_requests") + assert.table_has_column("clustering_rpc_requests", "id", "bigint") + assert.table_has_column("clustering_rpc_requests", "node_id", "uuid") + assert.table_has_column("clustering_rpc_requests", "reply_to", "uuid") + assert.table_has_column("clustering_rpc_requests", "ttl", "timestamp with time zone") + assert.table_has_column("clustering_rpc_requests", "payload", "json") + end) +end)