Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(clustering): delta version is null #13789

Merged
merged 3 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 13 additions & 6 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ local txn = require("resty.lmdb.transaction")
local declarative = require("kong.db.declarative")
local constants = require("kong.constants")
local concurrency = require("kong.concurrency")
local isempty = require("table.isempty")


local insert_entity_for_txn = declarative.insert_entity_for_txn
Expand Down Expand Up @@ -93,7 +94,7 @@ function _M:init_cp(manager)
end

-- is the node empty? If so, just do a full sync to bring it up to date faster
if default_namespace_version == 0 or latest_version == ngx_null or
if default_namespace_version == 0 or
latest_version - default_namespace_version > FULL_SYNC_THRESHOLD
then
-- we need to full sync because holes are found
Expand All @@ -118,7 +119,7 @@ function _M:init_cp(manager)
return nil, err
end

if #res == 0 then
if isempty(res) then
ngx_log(ngx_DEBUG,
"[kong.sync.v2] no delta for node_id: ", node_id,
", current_version: ", default_namespace_version,
Expand Down Expand Up @@ -213,16 +214,18 @@ local function do_sync()
return nil, "default namespace does not exist inside params"
end

if #ns_delta.deltas == 0 then
if isempty(ns_delta.deltas) then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
end

-- we should find the correct default workspace
-- and replace the old one with it
local default_ws_changed
for _, delta in ipairs(ns_delta.deltas) do
if delta.type == "workspaces" and delta.row.name == "default" then
kong.default_workspace = delta.row.id
default_ws_changed = true
break
end
end
Expand Down Expand Up @@ -295,8 +298,10 @@ local function do_sync()
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
-- delta.version should not be nil or ngx.null
assert(type(delta.version) == "number")

if delta.version ~= version then
version = delta.version
end
end -- for _, delta
Expand All @@ -305,7 +310,9 @@ local function do_sync()
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))

-- store the correct default workspace uuid
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
if default_ws_changed then
t:set(DECLARATIVE_DEFAULT_WORKSPACE_KEY, kong.default_workspace)
end

local ok, err = t:commit()
if not ok then
Expand Down
8 changes: 7 additions & 1 deletion kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ local buffer = require("string.buffer")

local string_format = string.format
local cjson_encode = cjson.encode
local ngx_null = ngx.null
local ngx_log = ngx.log
local ngx_ERR = ngx.ERR
local ngx_DEBUG = ngx.DEBUG
Expand Down Expand Up @@ -99,7 +100,12 @@ function _M:get_latest_version()
return nil, err
end

return res[1] and res[1].max_version
local ver = res[1] and res[1].max_version
if ver == ngx_null then
return 0
end

return ver
end


Expand Down
4 changes: 4 additions & 0 deletions kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,11 @@ local function export_from_db_impl(emitter, skip_ws, skip_disabled_entities, exp
return nil, err
end

-- it will be ngx.null when the table clustering_sync_version is empty
sync_version = assert(ok[1].max)
if sync_version == null then
sync_version = 0
end
end

emitter:emit_toplevel({
Expand Down
Loading