diff --git a/kong/clustering/services/sync/hooks.lua b/kong/clustering/services/sync/hooks.lua index 336bd3226aa1..ae7bbbe90620 100644 --- a/kong/clustering/services/sync/hooks.lua +++ b/kong/clustering/services/sync/hooks.lua @@ -75,10 +75,10 @@ function _M:notify_all_nodes() end -function _M:entity_delta_writer(row, name, options, ws_id, is_delete) +function _M:entity_delta_writer(entity, name, options, ws_id, is_delete) -- composite key, like { id = ... } local schema = kong.db[name].schema - local pk = schema:extract_pk_values(row) + local pk = schema:extract_pk_values(entity) assert(schema:validate_primary_key(pk)) @@ -87,7 +87,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete) type = name, pk = pk, ws_id = ws_id, - row = is_delete and ngx_null or row, + entity = is_delete and ngx_null or entity, }, } @@ -105,7 +105,7 @@ function _M:entity_delta_writer(row, name, options, ws_id, is_delete) self:notify_all_nodes() - return row -- for other hooks + return entity -- for other hooks end @@ -137,21 +137,21 @@ function _M:register_dao_hooks() end end - local function post_hook_writer_func(row, name, options, ws_id) + local function post_hook_writer_func(entity, name, options, ws_id) if not is_db_export(name) then - return row + return entity end - return self:entity_delta_writer(row, name, options, ws_id) + return self:entity_delta_writer(entity, name, options, ws_id) end - local function post_hook_delete_func(row, name, options, ws_id, cascade_entries) + local function post_hook_delete_func(entity, name, options, ws_id, cascade_entries) if not is_db_export(name) then - return row + return entity end - -- set lmdb value to ngx_null then return row - return self:entity_delta_writer(row, name, options, ws_id, true) + -- set lmdb value to ngx_null then return entity + return self:entity_delta_writer(entity, name, options, ws_id, true) end local dao_hooks = { diff --git a/kong/clustering/services/sync/rpc.lua b/kong/clustering/services/sync/rpc.lua index 1a8f749af01d..4d46cef1fb5a 100644 --- a/kong/clustering/services/sync/rpc.lua +++ b/kong/clustering/services/sync/rpc.lua @@ -213,8 +213,8 @@ local function do_sync() -- and replace the old one with it local default_ws_changed for _, delta in ipairs(deltas) do - if delta.type == "workspaces" and delta.row.name == "default" then - kong.default_workspace = delta.row.id + if delta.type == "workspaces" and delta.entity.name == "default" then + kong.default_workspace = delta.entity.id default_ws_changed = true break end @@ -236,20 +236,20 @@ local function do_sync() local crud_events_n = 0 -- delta should look like: - -- { type = ..., row = { ... }, version = 1, ws_id = ..., } + -- { type = ..., entity = { ... }, version = 1, ws_id = ..., } for _, delta in ipairs(deltas) do local delta_type = delta.type - local delta_row = delta.row + local delta_entity = delta.entity local ev -- delta must have ws_id to generate the correct lmdb key -- set the correct workspace for item opts.workspace = assert(delta.ws_id) - if delta_row ~= ngx_null then + if delta_entity ~= ngx_null then -- upsert the entity -- does the entity already exists? - local old_entity, err = db[delta_type]:select(delta_row) + local old_entity, err = db[delta_type]:select(delta_entity) if err then return nil, err end @@ -264,12 +264,12 @@ local function do_sync() end end - local res, err = insert_entity_for_txn(t, delta_type, delta_row, opts) + local res, err = insert_entity_for_txn(t, delta_type, delta_entity, opts) if not res then return nil, err end - ev = { delta_type, crud_event_type, delta_row, old_entity, } + ev = { delta_type, crud_event_type, delta_entity, old_entity, } else -- delete the entity @@ -319,7 +319,7 @@ local function do_sync() else for _, event in ipairs(crud_events) do - -- delta_type, crud_event_type, delta.row, old_entity + -- delta_type, crud_event_type, delta.entity, old_entity db[event[1]]:post_crud_event(event[2], event[3], event[4]) end end diff --git a/kong/clustering/services/sync/strategies/postgres.lua b/kong/clustering/services/sync/strategies/postgres.lua index 616a4e38cc78..bbc397d773cc 100644 --- a/kong/clustering/services/sync/strategies/postgres.lua +++ b/kong/clustering/services/sync/strategies/postgres.lua @@ -67,14 +67,14 @@ local NEW_VERSION_QUERY = [[ new_version integer; BEGIN INSERT INTO clustering_sync_version DEFAULT VALUES RETURNING version INTO new_version; - INSERT INTO clustering_sync_delta (version, type, pk, ws_id, row) VALUES %s; + INSERT INTO clustering_sync_delta (version, type, pk, ws_id, entity) VALUES %s; END $$; ]] -- deltas: { --- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", row = "JSON", } --- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", row = "JSON", } +-- { type = "service", "pk" = { id = "d78eb00f..." }, "ws_id" = "73478cf6...", entity = "JSON", } +-- { type = "route", "pk" = { id = "0a5bac5c..." }, "ws_id" = "73478cf6...", entity = "JSON", } -- } function _M:insert_delta(deltas) local buf = buffer.new() @@ -83,7 +83,7 @@ function _M:insert_delta(deltas) self.connector:escape_literal(d.type), self.connector:escape_literal(cjson_encode(d.pk)), self.connector:escape_literal(d.ws_id or kong.default_workspace), - self.connector:escape_literal(cjson_encode(d.row))) + self.connector:escape_literal(cjson_encode(d.entity))) end local sql = string_format(NEW_VERSION_QUERY, buf:get()) diff --git a/kong/db/declarative/export.lua b/kong/db/declarative/export.lua index acbd0f225d78..e20d3c1d8469 100644 --- a/kong/db/declarative/export.lua +++ b/kong/db/declarative/export.lua @@ -363,7 +363,7 @@ local sync_emitter = { emit_entity = function(self, entity_name, entity_data) self.out_n = self.out_n + 1 - self.out[self.out_n] = { type = entity_name , row = entity_data, version = self.sync_version, + self.out[self.out_n] = { type = entity_name , entity = entity_data, version = self.sync_version, ws_id = kong.default_workspace, } end, diff --git a/kong/db/migrations/core/024_370_to_380.lua b/kong/db/migrations/core/024_370_to_380.lua index afb94a09f7a3..b433500f7edc 100644 --- a/kong/db/migrations/core/024_370_to_380.lua +++ b/kong/db/migrations/core/024_370_to_380.lua @@ -11,7 +11,7 @@ return { "type" TEXT NOT NULL, "pk" JSON NOT NULL, "ws_id" UUID NOT NULL, - "row" JSON, + "entity" JSON, FOREIGN KEY (version) REFERENCES clustering_sync_version(version) ON DELETE CASCADE ); CREATE INDEX IF NOT EXISTS clustering_sync_delta_version_idx ON clustering_sync_delta (version); diff --git a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua index f4a6145d16be..cf0f04513c68 100644 --- a/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua +++ b/spec/05-migration/db/migrations/core/024_370_to_380_spec.lua @@ -12,6 +12,6 @@ describe("database migration", function() assert.table_has_column("clustering_sync_delta", "type", "text") assert.table_has_column("clustering_sync_delta", "pk", "json") assert.table_has_column("clustering_sync_delta", "ws_id", "uuid") - assert.table_has_column("clustering_sync_delta", "row", "json") + assert.table_has_column("clustering_sync_delta", "entity", "json") end) end)