diff --git a/kong/clustering/services/sync/init.lua b/kong/clustering/services/sync/init.lua index 83196c7dd44b..40f1b836241b 100644 --- a/kong/clustering/services/sync/init.lua +++ b/kong/clustering/services/sync/init.lua @@ -56,13 +56,7 @@ function _M:init_worker() -- sync to CP ASAP assert(self.rpc:sync_once(FIRST_SYNC_DELAY)) - assert(ngx.timer.every(EACH_SYNC_DELAY, function(premature) - if premature then - return - end - - assert(self.rpc:sync_once()) - end)) + assert(self.rpc:sync_every(EACH_SYNC_DELAY)) end diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 33ee105d825d..4c760be896e5 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -182,137 +182,151 @@ function _M:init(manager, is_cp) end -function _M:sync_once(delay) - local hdl, err = ngx.timer.at(delay or 0, function(premature) - if premature then - return - end - - local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() - -- here must be 2 times - for _ = 1, 2 do - local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", - { default = - { version = - tonumber(declarative.get_current_hash()) or 0, - }, - }) - if not ns_deltas then - ngx_log(ngx_ERR, "sync get_delta error: ", err) - return true - end - - local ns_delta +local function do_sync(premature) + if premature then + return + end - for namespace, delta in pairs(ns_deltas) do - if namespace == "default" then - ns_delta = delta - break -- should we break here? - end - end + local res, err = concurrency.with_worker_mutex(SYNC_MUTEX_OPTS, function() + -- here must be 2 times + for _ = 1, 2 do + local ns_deltas, err = kong.rpc:call("control_plane", "kong.sync.v2.get_delta", + { default = + { version = + tonumber(declarative.get_current_hash()) or 0, + }, + }) + if not ns_deltas then + ngx_log(ngx_ERR, "sync get_delta error: ", err) + return true + end - if not ns_delta then - return nil, "default namespace does not exist inside params" - end + local ns_delta - if #ns_delta.deltas == 0 then - ngx_log(ngx_DEBUG, "no delta to sync") - return true + for namespace, delta in pairs(ns_deltas) do + if namespace == "default" then + ns_delta = delta + break -- should we break here? end + end - local t = txn.begin(512) + if not ns_delta then + return nil, "default namespace does not exist inside params" + end - if ns_delta.wipe then - t:db_drop(false) - end + if #ns_delta.deltas == 0 then + ngx_log(ngx_DEBUG, "no delta to sync") + return true + end - local db = kong.db + local t = txn.begin(512) - local version = 0 - local crud_events = {} - local crud_events_n = 0 + if ns_delta.wipe then + t:db_drop(false) + end - for _, delta in ipairs(ns_delta.deltas) do - local delta_type = delta.type - local delta_row = delta.row - local ev + local db = kong.db - if delta_row ~= ngx_null then - -- upsert the entity - -- does the entity already exists? - local old_entity, err = db[delta_type]:select(delta_row) - if err then - return nil, err - end + local version = 0 + local crud_events = {} + local crud_events_n = 0 - local crud_event_type = "create" + for _, delta in ipairs(ns_delta.deltas) do + local delta_type = delta.type + local delta_row = delta.row + local ev - if old_entity then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) - if not res then - return nil, err - end + if delta_row ~= ngx_null then + -- upsert the entity + -- does the entity already exists? + local old_entity, err = db[delta_type]:select(delta_row) + if err then + return nil, err + end - crud_event_type = "update" - end + local crud_event_type = "create" - local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) + if old_entity then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) if not res then return nil, err end - ev = { delta_type, crud_event_type, delta_row, old_entity, } + crud_event_type = "update" + end - else - -- delete the entity - local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key - if err then - return nil, err - end + local res, err = insert_entity_for_txn(t, delta_type, delta_row, nil) + if not res then + return nil, err + end - if old_entity then - local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) - if not res then - return nil, err - end - end + ev = { delta_type, crud_event_type, delta_row, old_entity, } - ev = { delta_type, "delete", old_entity, } + else + -- delete the entity + local old_entity, err = kong.db[delta_type]:select({ id = delta.id, }) -- TODO: composite key + if err then + return nil, err end - crud_events_n = crud_events_n + 1 - crud_events[crud_events_n] = ev - - -- XXX TODO: could delta.version be nil or ngx.null - if type(delta.version) == "number" and delta.version ~= version then - version = delta.version + if old_entity then + local res, err = delete_entity_for_txn(t, delta_type, old_entity, nil) + if not res then + return nil, err + end end + + ev = { delta_type, "delete", old_entity, } end - t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) - local ok, err = t:commit() - if not ok then - return nil, err + crud_events_n = crud_events_n + 1 + crud_events[crud_events_n] = ev + + -- XXX TODO: could delta.version be nil or ngx.null + if type(delta.version) == "number" and delta.version ~= version then + version = delta.version end + end + + t:set(DECLARATIVE_HASH_KEY, fmt("%032d", version)) + local ok, err = t:commit() + if not ok then + return nil, err + end - if ns_delta.wipe then - kong.core_cache:purge() - kong.cache:purge() + if ns_delta.wipe then + kong.core_cache:purge() + kong.cache:purge() - else - for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.row, old_entity - db[event[1]]:post_crud_event(event[2], event[3], event[4]) - end + else + for _, event in ipairs(crud_events) do + -- delta_type, crud_event_type, delta.row, old_entity + db[event[1]]:post_crud_event(event[2], event[3], event[4]) end - end -- for _, delta + end + end -- for _, delta - return true - end) - if not res and err ~= "timeout" then - ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) - end + return true end) + if not res and err ~= "timeout" then + ngx_log(ngx_ERR, "unable to create worker mutex and sync: ", err) + end +end + + +function _M:sync_once(delay) + local hdl, err = ngx.timer.at(delay or 0, do_sync) + + if not hdl then + return nil, err + end + + return true +end + + +function _M:sync_every(delay) + local hdl, err = ngx.timer.every(delay, do_sync) if not hdl then return nil, err