From 2102fd939efbf51cf21020cf36b996bf253c3a6d Mon Sep 17 00:00:00 2001 From: chronolaw Date: Mon, 14 Oct 2024 11:07:03 +0800 Subject: [PATCH] refactor do_sync/sync_handler --- kong/clustering/services/sync/rpc.lua | 212 ++++++++++++++------------ 1 file changed, 111 insertions(+), 101 deletions(-) diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 5fe100ef734b..ba075bf3ff33 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -187,127 +187,137 @@ function _M:init(manager, is_cp) end -local function do_sync(premature) - if premature then - return +local function sync_handler() + local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", + { default = + { version = + tonumber(declarative.get_current_hash()) or 0, + }, + }) + if not ns_deltas then + ngx_log(ngx_ERR, "sync get_delta error: ", err) + return true end - local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() - -- here must be 2 times - for _ = 1, 2 do - local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", - { default = - { version = - tonumber(declarative.get_current_hash()) or 0, - }, - }) - if not ns_deltas then - ngx_log(ngx_ERR, "sync get_delta error: ", err) - return true + local ns_delta + + for namespace, delta in pairs(ns_deltas) do + if namespace == "default" then + ns_delta = delta + break -- should we break here? + end + end + + if not ns_delta then + return nil, "default namespace does not exist inside params" + end + + if #ns_delta.deltas == 0 then + ngx_log(ngx_DEBUG, "no delta to sync") + return true + end + + local t = txn.begin(512) + + if ns_delta.wipe then + t:db_drop(false) + end + + local db = kong.db + + local version = 0 + local crud_events = {} + local crud_events_n = 0 + + for _, delta in ipairs(ns_delta.deltas) do + local delta_type = delta.type + local delta_row = delta.row + local ev + + if delta_row ~= ngx_null then + -- upsert the entity + -- does the entity already exists? + local old_entity, err = db[delta_type]:select(delta_row) + if err then + return nil, err end - local ns_delta + local crud_event_type = "create" - for namespace, delta in pairs(ns_deltas) do - if namespace == "default" then - ns_delta = delta - break -- should we break here? + if old_entity then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err end - end - if not ns_delta then - return nil, "default namespace does not exist inside params" + crud_event_type = "update" end - if #ns_delta.deltas == 0 then - ngx_log(ngx_DEBUG, "no delta to sync") - return true + local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) + if not res then + return nil, err end - local t = txn.begin(512) + ev = { delta_type, crud_event_type, delta_row, old_entity, } - if ns_delta.wipe then - t:db_drop(false) + else + -- delete the entity + local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key + if err then + return nil, err end - local db = kong.db - - local version = 0 - local crud_events = {} - local crud_events_n = 0 - - for _, delta in ipairs(ns_delta.deltas) do - local delta_type = delta.type - local delta_row = delta.row - local ev - - if delta_row ~= ngx_null then - -- upsert the entity - -- does the entity already exists? - local old_entity, err = db[delta_type]:select(delta_row) - if err then - return nil, err - end - - local crud_event_type = "create" - - if old_entity then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) - if not res then - return nil, err - end - - crud_event_type = "update" - end - - local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) - if not res then - return nil, err - end - - ev = { delta_type, crud_event_type, delta_row, old_entity, } - - else - -- delete the entity - local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key - if err then - return nil, err - end - - if old_entity then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) - if not res then - return nil, err - end - end - - ev = { delta_type, "delete", old_entity, } + if old_entity then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err end + end + + ev = { delta_type, "delete", old_entity, } + end - crud_events_n = crud_events_n + 1 - crud_events[crud_events_n] = ev + crud_events_n = crud_events_n + 1 + crud_events[crud_events_n] = ev - -- XXX TODO: could delta.version be nil or ngx.null - if type(delta.version) == "number" and delta.version ~= version then - version = delta.version - end - end + -- XXX TODO: could delta.version be nil or ngx.null + if type(delta.version) == "number" and delta.version ~= version then + version = delta.version + end + end -- for _, delta - t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) - local ok, err = t:commit() - if not ok then - return nil, err - end + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + local ok, err = t:commit() + if not ok then + return nil, err + end - if ns_delta.wipe then - kong.core_cache:purge() - kong.cache:purge() + if ns_delta.wipe then + kong.core_cache:purge() + kong.cache:purge() - else - for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.row, old_entity - db[event[1]]:post_crud_event(event[2], event[3], event[4]) - end + else + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.row, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) + end + end + + return true +end + + +local function do_sync(premature) + if premature then + return + end + + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() + -- here must be 2 times + for _ = 1, 2 do + local ok, err = sync_handler() + if not ok then + return nil, err end end -- for _, delta