-
Notifications
You must be signed in to change notification settings - Fork 2.8k
fix: use shdict instead of events module for nodes data exchange #13066
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
9ad59f2
db63da1
05b9aa5
f484131
541dccf
e13e543
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -31,6 +31,7 @@ local ngx_timer_at = ngx.timer.at | |||||||||||||||
| local ngx_timer_every = ngx.timer.every | ||||||||||||||||
| local log = core.log | ||||||||||||||||
| local json_delay_encode = core.json.delay_encode | ||||||||||||||||
| local process = require("ngx.process") | ||||||||||||||||
| local ngx_worker_id = ngx.worker.id | ||||||||||||||||
| local exiting = ngx.worker.exiting | ||||||||||||||||
| local thread_spawn = ngx.thread.spawn | ||||||||||||||||
|
|
@@ -42,16 +43,22 @@ local null = ngx.null | |||||||||||||||
| local type = type | ||||||||||||||||
| local next = next | ||||||||||||||||
|
|
||||||||||||||||
| local all_services = core.table.new(0, 5) | ||||||||||||||||
| local consul_dict = ngx.shared.consul | ||||||||||||||||
| if not consul_dict then | ||||||||||||||||
| error("lua_shared_dict \"consul\" not configured") | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| local default_service | ||||||||||||||||
| local default_weight | ||||||||||||||||
| local sort_type | ||||||||||||||||
| local skip_service_map = core.table.new(0, 1) | ||||||||||||||||
| local dump_params | ||||||||||||||||
|
|
||||||||||||||||
| local events | ||||||||||||||||
| local events_list | ||||||||||||||||
| local consul_services | ||||||||||||||||
| -- per-worker cache: service_name -> {raw = "json string", nodes = {decoded table}} | ||||||||||||||||
| -- returns the same table instance while the shared dict value is unchanged, | ||||||||||||||||
| -- so that compare_upstream_node() fast-path (old_t == new_t) works | ||||||||||||||||
| local nodes_cache = {} | ||||||||||||||||
|
|
||||||||||||||||
| local default_skip_services = {"consul"} | ||||||||||||||||
| local default_random_range = 5 | ||||||||||||||||
|
|
@@ -66,53 +73,81 @@ local _M = { | |||||||||||||||
| } | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| local function discovery_consul_callback(data, event, source, pid) | ||||||||||||||||
| all_services = data | ||||||||||||||||
| log.notice("update local variable all_services, event is: ", event, | ||||||||||||||||
| "source: ", source, "server pid:", pid, | ||||||||||||||||
| ", all services: ", json_delay_encode(all_services, true)) | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| function _M.all_nodes() | ||||||||||||||||
| return all_services | ||||||||||||||||
| local keys = consul_dict:get_keys(0) | ||||||||||||||||
| local services = core.table.new(0, #keys) | ||||||||||||||||
| for _, key in ipairs(keys) do | ||||||||||||||||
| local value = consul_dict:get(key) | ||||||||||||||||
| if value then | ||||||||||||||||
| local nodes, err = core.json.decode(value) | ||||||||||||||||
| if nodes then | ||||||||||||||||
| services[key] = nodes | ||||||||||||||||
| else | ||||||||||||||||
| log.error("failed to decode nodes for service: ", key, ", error: ", err) | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
| return services | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| function _M.nodes(service_name) | ||||||||||||||||
| if not all_services then | ||||||||||||||||
| log.error("all_services is nil, failed to fetch nodes for : ", service_name) | ||||||||||||||||
| return | ||||||||||||||||
| local value = consul_dict:get(service_name) | ||||||||||||||||
| if not value then | ||||||||||||||||
| log.error("consul service not found: ", service_name, ", return default service") | ||||||||||||||||
| return default_service and {default_service} | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| local resp_list = all_services[service_name] | ||||||||||||||||
| local cached = nodes_cache[service_name] | ||||||||||||||||
| if cached and cached.raw == value then | ||||||||||||||||
| return cached.nodes | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| if not resp_list then | ||||||||||||||||
| log.error("fetch nodes failed by ", service_name, ", return default service") | ||||||||||||||||
| local nodes, err = core.json.decode(value) | ||||||||||||||||
| if not nodes then | ||||||||||||||||
| log.error("fetch nodes failed by ", service_name, ", error: ", err) | ||||||||||||||||
| return default_service and {default_service} | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| log.info("process id: ", ngx_worker_id(), ", all_services[", service_name, "] = ", | ||||||||||||||||
| json_delay_encode(resp_list, true)) | ||||||||||||||||
| nodes_cache[service_name] = {raw = value, nodes = nodes} | ||||||||||||||||
|
|
||||||||||||||||
| log.info("process id: ", ngx_worker_id(), ", [", service_name, "] = ", | ||||||||||||||||
| json_delay_encode(nodes, true)) | ||||||||||||||||
|
|
||||||||||||||||
| return resp_list | ||||||||||||||||
| return nodes | ||||||||||||||||
|
Comment on lines
94
to
+117
|
||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| local function update_all_services(consul_server_url, up_services) | ||||||||||||||||
| -- clean old unused data | ||||||||||||||||
| -- write new/updated values first so readers never see a missing service | ||||||||||||||||
| for k, v in pairs(up_services) do | ||||||||||||||||
| local content, err = core.json.encode(v) | ||||||||||||||||
| if content then | ||||||||||||||||
| local ok, set_err, forcible = consul_dict:set(k, content) | ||||||||||||||||
| if not ok then | ||||||||||||||||
| log.error("failed to set nodes for service: ", k, ", error: ", set_err, | ||||||||||||||||
| ", please consider increasing lua_shared_dict consul size") | ||||||||||||||||
| elseif forcible then | ||||||||||||||||
| log.warn("consul shared dict is full, forcibly evicting items while ", | ||||||||||||||||
| "setting nodes for service: ", k, | ||||||||||||||||
| ", please consider increasing lua_shared_dict consul size") | ||||||||||||||||
| end | ||||||||||||||||
| else | ||||||||||||||||
|
Comment on lines
121
to
+135
|
||||||||||||||||
| log.error("failed to encode nodes for service: ", k, ", error: ", err) | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| -- then delete keys that are no longer present | ||||||||||||||||
| local old_services = consul_services[consul_server_url] or {} | ||||||||||||||||
| for k, _ in pairs(old_services) do | ||||||||||||||||
| all_services[k] = nil | ||||||||||||||||
| if not up_services[k] then | ||||||||||||||||
| consul_dict:delete(k) | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
| core.table.clear(old_services) | ||||||||||||||||
|
|
||||||||||||||||
| for k, v in pairs(up_services) do | ||||||||||||||||
| all_services[k] = v | ||||||||||||||||
| end | ||||||||||||||||
| consul_services[consul_server_url] = up_services | ||||||||||||||||
|
|
||||||||||||||||
| log.info("update all services: ", json_delay_encode(all_services, true)) | ||||||||||||||||
| log.info("update all services to shared dict") | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
|
|
@@ -149,14 +184,21 @@ local function read_dump_services() | |||||||||||||||
| return | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| all_services = entity.services | ||||||||||||||||
| log.info("load dump file into memory success") | ||||||||||||||||
| for k, v in pairs(entity.services) do | ||||||||||||||||
| local content, json_err = core.json.encode(v) | ||||||||||||||||
| if content then | ||||||||||||||||
| consul_dict:set(k, content) | ||||||||||||||||
| else | ||||||||||||||||
| log.error("failed to encode dump service: ", k, ", error: ", json_err) | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
| log.info("load dump file into shared dict success") | ||||||||||||||||
|
Comment on lines
+187
to
+195
|
||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
|
|
||||||||||||||||
| local function write_dump_services() | ||||||||||||||||
| local entity = { | ||||||||||||||||
| services = all_services, | ||||||||||||||||
| services = _M.all_nodes(), | ||||||||||||||||
| last_update = ngx.time(), | ||||||||||||||||
| expire = dump_params.expire, -- later need handle it | ||||||||||||||||
|
Comment on lines
199
to
203
|
||||||||||||||||
| } | ||||||||||||||||
|
|
@@ -556,14 +598,6 @@ function _M.connect(premature, consul_server, retry_delay) | |||||||||||||||
|
|
||||||||||||||||
| update_all_services(consul_server.consul_server_url, up_services) | ||||||||||||||||
|
|
||||||||||||||||
| --update events | ||||||||||||||||
| local post_ok, post_err = events:post(events_list._source, | ||||||||||||||||
| events_list.updating, all_services) | ||||||||||||||||
| if not post_ok then | ||||||||||||||||
| log.error("post_event failure with ", events_list._source, | ||||||||||||||||
| ", update all services error: ", post_err) | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| if dump_params then | ||||||||||||||||
| ngx_timer_at(0, write_dump_services) | ||||||||||||||||
| end | ||||||||||||||||
|
|
@@ -611,35 +645,32 @@ end | |||||||||||||||
|
|
||||||||||||||||
| function _M.init_worker() | ||||||||||||||||
| local consul_conf = local_conf.discovery.consul | ||||||||||||||||
| dump_params = consul_conf.dump | ||||||||||||||||
|
|
||||||||||||||||
| if consul_conf.dump then | ||||||||||||||||
| local dump = consul_conf.dump | ||||||||||||||||
| dump_params = dump | ||||||||||||||||
|
|
||||||||||||||||
| if dump.load_on_init then | ||||||||||||||||
| read_dump_services() | ||||||||||||||||
| end | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| events = require("apisix.events") | ||||||||||||||||
| events_list = events:event_list( | ||||||||||||||||
| "discovery_consul_update_all_services", | ||||||||||||||||
| "updating" | ||||||||||||||||
| ) | ||||||||||||||||
|
|
||||||||||||||||
| if 0 ~= ngx_worker_id() then | ||||||||||||||||
| events:register(discovery_consul_callback, events_list._source, events_list.updating) | ||||||||||||||||
| return | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| log.notice("consul_conf: ", json_delay_encode(consul_conf, true)) | ||||||||||||||||
| default_weight = consul_conf.weight | ||||||||||||||||
| sort_type = consul_conf.sort_type | ||||||||||||||||
| -- set default service, used when the server node cannot be found | ||||||||||||||||
| if consul_conf.default_service then | ||||||||||||||||
| default_service = consul_conf.default_service | ||||||||||||||||
| default_service.weight = default_weight | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| if process.type() ~= "privileged agent" then | ||||||||||||||||
| return | ||||||||||||||||
| end | ||||||||||||||||
|
|
||||||||||||||||
| -- flush stale data that may persist across reloads, | ||||||||||||||||
| -- since consul_services is re-initialized empty | ||||||||||||||||
| consul_dict:flush_all() | ||||||||||||||||
|
Comment on lines
+662
to
+664
|
||||||||||||||||
| -- flush stale data that may persist across reloads, | |
| -- since consul_services is re-initialized empty | |
| consul_dict:flush_all() | |
| -- flush expired stale data that may persist across reloads, | |
| -- since consul_services is re-initialized empty but existing | |
| -- unexpired shared dict entries may still be in use by workers | |
| consul_dict:flush_expired() |
Copilot
AI
Mar 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consul_dict:flush_all() runs in the privileged agent during init_worker(), but it executes after read_dump_services() may have populated the shared dict (and workers also call read_dump_services() before the privileged-agent guard). This can wipe the dump-loaded nodes and leave the dict empty until Consul fetch completes, defeating the dump-on-reload/startup mitigation and potentially causing 503s. Consider either (a) moving the flush earlier (before any dump load) and performing dump load only in the privileged agent, or (b) removing flush_all() and instead cleaning stale keys during update_all_services() in a way that survives reloads.
Copilot
AI
Mar 6, 2026
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This PR changes the cross-worker data sharing mechanism for Consul nodes to rely on lua_shared_dict, but there isn’t a regression test validating the restart/reload scenario from #12398 (e.g., repeated HUP reloads / restarts while sending requests should not produce intermittent 503s once Consul has data). Adding a Test::Nginx case similar to existing HUP-based tests would help prevent regressions.
| -- flush stale data that may persist across reloads, | |
| -- since consul_services is re-initialized empty | |
| consul_dict:flush_all() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nodes()usesconsul_dict:get(service_name). During reloads or refresh patterns that useflush_all(),get()returns nil even though stale values may still exist, causing avoidable 503s. Consider usingget_stale()(and only falling back to default when both fresh+stale are missing). Also consider clearingnodes_cache[service_name]when the dict has no value to avoid retaining entries for removed services indefinitely.