Skip to content

Commit

Permalink
refactor do_sync/sync_handler
Browse files Browse the repository at this point in the history
  • Loading branch information
chronolaw committed Oct 14, 2024
1 parent 8fbb26f commit 2102fd9
Showing 1 changed file with 111 additions and 101 deletions.
212 changes: 111 additions & 101 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -187,127 +187,137 @@ function _M:init(manager, is_cp)
end


local function do_sync(premature)
if premature then
return
local function sync_handler()
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
for _ = 1, 2 do
local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta",
{ default =
{ version =
tonumber(declarative.get_current_hash()) or 0,
},
})
if not ns_deltas then
ngx_log(ngx_ERR, "sync get_delta error: ", err)
return true
local ns_delta

for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
end
end

if not ns_delta then
return nil, "default namespace does not exist inside params"
end

if #ns_delta.deltas == 0 then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
end

local t = txn.begin(512)

if ns_delta.wipe then
t:db_drop(false)
end

local db = kong.db

local version = 0
local crud_events = {}
local crud_events_n = 0

for _, delta in ipairs(ns_delta.deltas) do
local delta_type = delta.type
local delta_row = delta.row
local ev

if delta_row ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
if err then
return nil, err
end

local ns_delta
local crud_event_type = "create"

for namespace, delta in pairs(ns_deltas) do
if namespace == "default" then
ns_delta = delta
break -- should we break here?
if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end

if not ns_delta then
return nil, "default namespace does not exist inside params"
crud_event_type = "update"
end

if #ns_delta.deltas == 0 then
ngx_log(ngx_DEBUG, "no delta to sync")
return true
local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
if not res then
return nil, err
end

local t = txn.begin(512)
ev = { delta_type, crud_event_type, delta_row, old_entity, }

if ns_delta.wipe then
t:db_drop(false)
else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
if err then
return nil, err
end

local db = kong.db

local version = 0
local crud_events = {}
local crud_events_n = 0

for _, delta in ipairs(ns_delta.deltas) do
local delta_type = delta.type
local delta_row = delta.row
local ev

if delta_row ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
if err then
return nil, err
end

local crud_event_type = "create"

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end

crud_event_type = "update"
end

local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }

else
-- delete the entity
local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key
if err then
return nil, err
end

if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end

ev = { delta_type, "delete", old_entity, }
if old_entity then
local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil)
if not res then
return nil, err
end
end

ev = { delta_type, "delete", old_entity, }
end

crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev
crud_events_n = crud_events_n + 1
crud_events[crud_events_n] = ev

-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
version = delta.version
end
end
-- XXX TODO: could delta.version be nil or ngx.null
if type(delta.version) == "number" and delta.version ~= version then
version = delta.version
end
end -- for _, delta

t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
end
t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version))
local ok, err = t:commit()
if not ok then
return nil, err
end

if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()
if ns_delta.wipe then
kong.core_cache:purge()
kong.cache:purge()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end

return true
end


local function do_sync(premature)
if premature then
return
end

local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function()
-- here must be 2 times
for _ = 1, 2 do
local ok, err = sync_handler()
if not ok then
return nil, err
end
end -- for _, delta

Expand Down

0 comments on commit 2102fd9

Please sign in to comment.