Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(incremental): reduce the use of timers #13732

Merged
merged 1 commit into from
Oct 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 1 addition & 7 deletions kong/clustering/services/sync/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,7 @@ function _M:init_worker()
-- sync to CP ASAP
assert(self.rpc:sync_once(FIRST_SYNC_DELAY))

assert(ngx.timer.every(EACH_SYNC_DELAY, function(premature)
if premature then
return
end

assert(self.rpc:sync_once())
end))
assert(self.rpc:sync_every(EACH_SYNC_DELAY))
end


Expand Down
212 changes: 113 additions & 99 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -182,137 +182,151 @@ function _M:init(manager, is_cp)
end


function _M:sync_once(delay)
local hdl, err = ngx.timer.at(delay or 0, function(premature)
if premature then
return
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
for _ = 1, 2 do
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
end

local ns_delta
local function do_sync(premature)
if premature then
return
end

for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end
local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
for _ = 1, 2 do
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
end

if not ns_delta then
return nil, "default namespace does not exist inside params"
end
local ns_delta

if #ns_delta.deltas == 0 then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end

local t = txn.begin(512)
if not ns_delta then
return nil, "default namespace does not exist inside params"
end

if ns_delta.wipe then
t:db_drop(false)
end
if #ns_delta.deltas == 0 then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
end

local db = kong.db
local t = txn.begin(512)

local version = 0
local crud_events = {}
local crud_events_n = 0
if ns_delta.wipe then
t:db_drop(false)
end

for _, delta in ipairs(ns_delta.deltas) do
local delta_type = delta.type
local delta_row = delta.row
local ev
local db = kong.db

if delta_row ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
if err then
return nil, err
end
local version = 0
local crud_events = {}
local crud_events_n = 0

local crud_event_type = "create"
for _, delta in ipairs(ns_delta.deltas) do
local delta_type = delta.type
local delta_row = delta.row
local ev

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
if delta_row ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
if err then
return nil, err
end

crud_event_type = "update"
end
local crud_event_type = "create"

local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }
crud_event_type = "update"
end

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
if err then
return nil, err
end
local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
if not res then
return nil, err
end

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end
ev = { delta_type, crud_event_type, delta_row, old_entity, }

ev = { delta_type, "delete", old_entity, }
else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
if err then
return nil, err
end

crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
version = delta.version
if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end

ev = { delta_type, "delete", old_entity, }
end

t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
version = delta.version
end
end

t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
end

if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()
if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end -- for _, delta
end
end -- for _, delta

return true
end)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
return true
end)
if not res and err ~= "timeout" then
ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err)
end
end


function _M:sync_once(delay)
local hdl, err = ngx.timer.at(delay or 0, do_sync)

if not hdl then
return nil, err
end

return true
end


function _M:sync_every(delay)
local hdl, err = ngx.timer.every(delay, do_sync)

if not hdl then
return nil, err
Expand Down
Loading