diff --git a/membership.lua b/membership.lua index 9b31d9b..e38bb55 100755 --- a/membership.lua +++ b/membership.lua @@ -37,6 +37,7 @@ local _sync_trigger = stash.get('_sync_trigger') or fiber.cond() local _ack_trigger = stash.get('_ack_trigger') or fiber.cond() local _ack_cache = stash.get('_ack_cache') or {} local _resolve_cache = stash.get('_resolve_cache') or {} +local _global_params = stash.get('global_params') local function after_reload() stash.set('_ack_cache', _ack_cache) @@ -484,7 +485,8 @@ local function _protocol_step() end -- calculate time difference between local time and member time local delta = _get_clock_delta(ack_data) - members.set(uri, member.status, member.incarnation, { clock_delta = delta }) -- update timstamp + members.set(uri, member.status, member.incarnation, { clock_delta = delta }, + { forbid_new = _global_params.forbid_new_members }) -- update timstamp return end end @@ -519,7 +521,8 @@ local function _protocol_step() end -- calculate time difference between local time and member time local delta = _get_clock_delta(ack_data) - members.set(uri, member.status, member.incarnation, { clock_delta = delta }) + members.set(uri, member.status, member.incarnation, { clock_delta = delta }, + { forbid_new = _global_params.forbid_new_members }) return elseif members.get(uri).status == opts.ALIVE then local myself = members.get(advertise_uri) @@ -601,12 +604,22 @@ end -- either hostname or IP address being advertised to other members -- @tparam number port -- UDP port to bind and advertise +-- @tparam table opts Available options: +-- +-- @tparam ?boolean opts.forbid_new_members +-- Forbids to add new members to the group. New members could be added +-- only with `probe_uri`. -- @treturn boolean `true` -- @raise Socket bind error -local function init(advertise_host, port) - checks('string', 'number') +local function init(advertise_host, port, params) + checks('string', 'number', { forbid_new_members = '?boolean' }) local parts = uri_tools.parse(advertise_host) + + if params and params.forbid_new_members ~= nil then + _global_params.forbid_new_members = params.forbid_new_members + end + if _sock == nil or _sock:name().port ~= port then local family = parts.ipv6 and 'AF_INET6' or 'AF_INET' local addr = parts.ipv6 and '::' or '0.0.0.0' @@ -924,8 +937,8 @@ end -- @treturn ?string Possible errors: -- -- * `"parse error"` - if the URI can not be parsed --- * `"ping was not sent"` - if hostname could not be reloved --- * `"no reponce"` - if member does not responf within 0.2 seconds +-- * `"ping was not sent"` - if hostname could not be resloved +-- * `"no responce"` - if member does not respond within 0.2 seconds local function probe_uri(uri) checks('string') local parts = uri_tools.parse(uri) diff --git a/membership/events.lua b/membership/events.lua index 93a38b7..89660d7 100644 --- a/membership/events.lua +++ b/membership/events.lua @@ -23,6 +23,8 @@ local _expired = table.copy(stash.get('events._expired')) or { local _subscribers = table.copy(stash.get('events._subscribers')) or { -- [fiber.cond] = true } +local _global_params = stash.get('global_params') + setmetatable(_subscribers, {__mode = 'k'}) function events.after_reload() @@ -148,7 +150,8 @@ function events.handle(event) opts.STATUS_NAMES[event.status] ) end - members.set(event.uri, event.status, event.incarnation, { payload = event.payload }) + members.set(event.uri, event.status, event.incarnation, { payload = event.payload }, + { forbid_new = _global_params.forbid_new_members }) for cond, _ in pairs(_subscribers) do cond:broadcast() diff --git a/membership/members.lua b/membership/members.lua index 2234723..4cf7597 100644 --- a/membership/members.lua +++ b/membership/members.lua @@ -84,10 +84,17 @@ function members.filter_excluding(state, uri1, uri2) return ret end -function members.set(uri, status, incarnation, params) - checks('string', 'number', 'number', { payload = '?table', clock_delta = '?number' }) +function members.set(uri, status, incarnation, params, set_opts) + checks('string', 'number', 'number', + { payload = '?table', clock_delta = '?number', }, + { forbid_new = '?boolean', } + ) local member = _all_members[uri] + if member == nil and set_opts and set_opts.forbid_new then + opts.log_debug("New member %s was skipped", uri) + return + end if member and incarnation < member.incarnation then error('Can not downgrade incarnation') end diff --git a/membership/stash.lua b/membership/stash.lua index d616291..800515f 100644 --- a/membership/stash.lua +++ b/membership/stash.lua @@ -1,4 +1,7 @@ local S = rawget(_G, '__membership_stash') or {} +S.global_params = S.global_params or { + forbid_new_members = false, +} local log = require('log')