Skip to content

Commit

Permalink
Refactor/rename delta row (#13807)
Browse files Browse the repository at this point in the history
* change db filed name

* rename in rpc.sync

* rename in export
  • Loading branch information
chronolaw authored Oct 30, 2024
1 parent a7ba490 commit 003e0ca
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 27 deletions.
22 changes: 11 additions & 11 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ function _M:notify_all_nodes()
end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(row)
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

Expand All @@ -87,7 +87,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
type = name,
pk = pk,
ws_id = ws_id,
row = is_delete and ngx_null or row,
entity = is_delete and ngx_null or entity,
},
}

Expand All @@ -105,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete)

self:notify_all_nodes()

return row -- for other hooks
return entity -- for other hooks
end


Expand Down Expand Up @@ -137,21 +137,21 @@ function _M:register_dao_hooks()
end
end

local function post_hook_writer_func(row, name, options, ws_id)
local function post_hook_writer_func(entity, name, options, ws_id)
if not is_db_export(name) then
return row
return entity
end

return self:entity_delta_writer(row, name, options, ws_id)
return self:entity_delta_writer(entity, name, options, ws_id)
end

local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries)
if not is_db_export(name) then
return row
return entity
end

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(row, name, options, ws_id, true)
-- set lmdb value to ngx_null then return entity
return self:entity_delta_writer(entity, name, options, ws_id, true)
end

local dao_hooks = {
Expand Down
18 changes: 9 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ local function do_sync()
-- and replace the old one with it
local default_ws_changed
for _, delta in ipairs(deltas) do
if delta.type == "workspaces" and delta.row.name == "default" then
kong.default_workspace = delta.row.id
if delta.type == "workspaces" and delta.entity.name == "default" then
kong.default_workspace = delta.entity.id
default_ws_changed = true
break
end
Expand All @@ -236,20 +236,20 @@ local function do_sync()
local crud_events_n = 0

-- delta should look like:
-- { type = ..., row = { ... }, version = 1, ws_id = ..., }
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_type = delta.type
local delta_row = delta.row
local delta_entity = delta.entity
local ev

-- delta must have ws_id to generate the correct lmdb key
-- set the correct workspace for item
opts.workspace = assert(delta.ws_id)

if delta_row ~= ngx_null then
if delta_entity ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end
Expand All @@ -264,12 +264,12 @@ local function do_sync()
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_row, opts)
local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }
ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
-- delete the entity
Expand Down Expand Up @@ -319,7 +319,7 @@ local function do_sync()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end
Expand Down
8 changes: 4 additions & 4 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, row) VALUES %s;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
Expand All @@ -83,7 +83,7 @@ function _M:insert_delta(deltas)
self.connector:escape_literal(d.type),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.row)))
self.connector:escape_literal(cjson_encode(d.entity)))
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())
Expand Down
2 changes: 1 addition & 1 deletion kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ local sync_emitter = {

emit_entity = function(self, entity_name, entity_data)
self.out_n = self.out_n + 1
self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version,
self.out[self.out_n] = { type = entity_name , entity = entity_data, version = self.sync_version,
ws_id = kong.default_workspace, }
end,

Expand Down
2 changes: 1 addition & 1 deletion kong/db/migrations/core/024_370_to_380.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ return {
"type" TEXT NOT NULL,
"pk" JSON NOT NULL,
"ws_id" UUID NOT NULL,
"row" JSON,
"entity" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ describe("database migration", function()
assert.table_has_column("clustering_sync_delta", "type", "text")
assert.table_has_column("clustering_sync_delta", "pk", "json")
assert.table_has_column("clustering_sync_delta", "ws_id", "uuid")
assert.table_has_column("clustering_sync_delta", "row", "json")
assert.table_has_column("clustering_sync_delta", "entity", "json")
end)
end)

0 comments on commit 003e0ca

Please sign in to comment.