Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor/rename delta row #13807

Merged
merged 3 commits into from
Oct 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 11 additions & 11 deletions kong/clustering/services/sync/hooks.lua
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,10 @@ function _M:notify_all_nodes()
end


function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
function _M:entity_delta_writer(entity, name, options, ws_id, is_delete)
-- composite key, like { id = ... }
local schema = kong.db[name].schema
local pk = schema:extract_pk_values(row)
local pk = schema:extract_pk_values(entity)

assert(schema:validate_primary_key(pk))

Expand All @@ -87,7 +87,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete)
type = name,
pk = pk,
ws_id = ws_id,
row = is_delete and ngx_null or row,
entity = is_delete and ngx_null or entity,
},
}

Expand All @@ -105,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete)

self:notify_all_nodes()

return row -- for other hooks
return entity -- for other hooks
end


Expand Down Expand Up @@ -137,21 +137,21 @@ function _M:register_dao_hooks()
end
end

local function post_hook_writer_func(row, name, options, ws_id)
local function post_hook_writer_func(entity, name, options, ws_id)
if not is_db_export(name) then
return row
return entity
end

return self:entity_delta_writer(row, name, options, ws_id)
return self:entity_delta_writer(entity, name, options, ws_id)
end

local function post_hook_delete_func(row, name, options, ws_id, cascade_entries)
local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries)
if not is_db_export(name) then
return row
return entity
end

-- set lmdb value to ngx_null then return row
return self:entity_delta_writer(row, name, options, ws_id, true)
-- set lmdb value to ngx_null then return entity
return self:entity_delta_writer(entity, name, options, ws_id, true)
end

local dao_hooks = {
Expand Down
18 changes: 9 additions & 9 deletions kong/clustering/services/sync/rpc.lua
Original file line number Diff line number Diff line change
Expand Up @@ -213,8 +213,8 @@ local function do_sync()
-- and replace the old one with it
local default_ws_changed
for _, delta in ipairs(deltas) do
if delta.type == "workspaces" and delta.row.name == "default" then
kong.default_workspace = delta.row.id
if delta.type == "workspaces" and delta.entity.name == "default" then
kong.default_workspace = delta.entity.id
default_ws_changed = true
break
end
Expand All @@ -236,20 +236,20 @@ local function do_sync()
local crud_events_n = 0

-- delta should look like:
-- { type = ..., row = { ... }, version = 1, ws_id = ..., }
-- { type = ..., entity = { ... }, version = 1, ws_id = ..., }
for _, delta in ipairs(deltas) do
local delta_type = delta.type
local delta_row = delta.row
local delta_entity = delta.entity
local ev

-- delta must have ws_id to generate the correct lmdb key
-- set the correct workspace for item
opts.workspace = assert(delta.ws_id)

if delta_row ~= ngx_null then
if delta_entity ~= ngx_null then
-- upsert the entity
-- does the entity already exists?
local old_entity, err = db[delta_type]:select(delta_row)
local old_entity, err = db[delta_type]:select(delta_entity)
if err then
return nil, err
end
Expand All @@ -264,12 +264,12 @@ local function do_sync()
end
end

local res, err = insert_entity_for_txn(t, delta_type, delta_row, opts)
local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts)
if not res then
return nil, err
end

ev = { delta_type, crud_event_type, delta_row, old_entity, }
ev = { delta_type, crud_event_type, delta_entity, old_entity, }

else
-- delete the entity
Expand Down Expand Up @@ -319,7 +319,7 @@ local function do_sync()

else
for _, event in ipairs(crud_events) do
-- delta_type, crud_event_type, delta.row, old_entity
-- delta_type, crud_event_type, delta.entity, old_entity
db[event[1]]:post_crud_event(event[2], event[3], event[4])
end
end
Expand Down
8 changes: 4 additions & 4 deletions kong/clustering/services/sync/strategies/postgres.lua
Original file line number Diff line number Diff line change
Expand Up @@ -67,14 +67,14 @@ local NEW_VERSION_QUERY = [[
new_version integer;
BEGIN
INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, row) VALUES %s;
INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s;
END $$;
]]


-- deltas: {
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", row = "JSON", }
-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", }
-- }
function _M:insert_delta(deltas)
local buf = buffer.new()
Expand All @@ -83,7 +83,7 @@ function _M:insert_delta(deltas)
self.connector:escape_literal(d.type),
self.connector:escape_literal(cjson_encode(d.pk)),
self.connector:escape_literal(d.ws_id or kong.default_workspace),
self.connector:escape_literal(cjson_encode(d.row)))
self.connector:escape_literal(cjson_encode(d.entity)))
end

local sql = string_format(NEW_VERSION_QUERY, buf:get())
Expand Down
2 changes: 1 addition & 1 deletion kong/db/declarative/export.lua
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,7 @@ local sync_emitter = {

emit_entity = function(self, entity_name, entity_data)
self.out_n = self.out_n + 1
self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version,
self.out[self.out_n] = { type = entity_name , entity = entity_data, version = self.sync_version,
ws_id = kong.default_workspace, }
end,

Expand Down
2 changes: 1 addition & 1 deletion kong/db/migrations/core/024_370_to_380.lua
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ return {
"type" TEXT NOT NULL,
"pk" JSON NOT NULL,
"ws_id" UUID NOT NULL,
"row" JSON,
"entity" JSON,
FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE
);
CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,6 @@ describe("database migration", function()
assert.table_has_column("clustering_sync_delta", "type", "text")
assert.table_has_column("clustering_sync_delta", "pk", "json")
assert.table_has_column("clustering_sync_delta", "ws_id", "uuid")
assert.table_has_column("clustering_sync_delta", "row", "json")
assert.table_has_column("clustering_sync_delta", "entity", "json")
end)
end)
Loading