Skip to content

Commit

Permalink
Add new param to forbid adding new members
Browse files Browse the repository at this point in the history
  • Loading branch information
yngvar-antonsson committed Jan 22, 2025
1 parent a12d707 commit 6947d2f
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 9 deletions.
25 changes: 19 additions & 6 deletions membership.lua
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ local _sync_trigger = stash.get('_sync_trigger') or fiber.cond()
local _ack_trigger = stash.get('_ack_trigger') or fiber.cond()
local _ack_cache = stash.get('_ack_cache') or {}
local _resolve_cache = stash.get('_resolve_cache') or {}
local _global_params = stash.get('global_params')

local function after_reload()
stash.set('_ack_cache', _ack_cache)
Expand Down Expand Up @@ -484,7 +485,8 @@ local function _protocol_step()
end
-- calculate time difference between local time and member time
local delta = _get_clock_delta(ack_data)
members.set(uri, member.status, member.incarnation, { clock_delta = delta }) -- update timstamp
members.set(uri, member.status, member.incarnation, { clock_delta = delta },
{ forbid_new = _global_params.forbid_new_members }) -- update timstamp
return
end
end
Expand Down Expand Up @@ -519,7 +521,8 @@ local function _protocol_step()
end
-- calculate time difference between local time and member time
local delta = _get_clock_delta(ack_data)
members.set(uri, member.status, member.incarnation, { clock_delta = delta })
members.set(uri, member.status, member.incarnation, { clock_delta = delta },
{ forbid_new = _global_params.forbid_new_members })
return
elseif members.get(uri).status == opts.ALIVE then
local myself = members.get(advertise_uri)
Expand Down Expand Up @@ -601,12 +604,22 @@ end
-- either hostname or IP address being advertised to other members
-- @tparam number port
-- UDP port to bind and advertise
-- @tparam table opts Available options:
--
-- @tparam ?boolean opts.forbid_new_members
-- Forbids to add new members to the group. New members could be added
-- only with `probe_uri`.
-- @treturn boolean `true`
-- @raise Socket bind error
local function init(advertise_host, port)
checks('string', 'number')
local function init(advertise_host, port, params)
checks('string', 'number', { forbid_new_members = '?boolean' })

local parts = uri_tools.parse(advertise_host)

if params and params.forbid_new_members ~= nil then
_global_params.forbid_new_members = params.forbid_new_members
end

if _sock == nil or _sock:name().port ~= port then
local family = parts.ipv6 and 'AF_INET6' or 'AF_INET'
local addr = parts.ipv6 and '::' or '0.0.0.0'
Expand Down Expand Up @@ -924,8 +937,8 @@ end
-- @treturn ?string Possible errors:
--
-- * `"parse error"` - if the URI can not be parsed
-- * `"ping was not sent"` - if hostname could not be reloved
-- * `"no reponce"` - if member does not responf within 0.2 seconds
-- * `"ping was not sent"` - if hostname could not be resloved
-- * `"no responce"` - if member does not respond within 0.2 seconds
local function probe_uri(uri)
checks('string')
local parts = uri_tools.parse(uri)
Expand Down
5 changes: 4 additions & 1 deletion membership/events.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ local _expired = table.copy(stash.get('events._expired')) or {
local _subscribers = table.copy(stash.get('events._subscribers')) or {
-- [fiber.cond] = true
}
local _global_params = stash.get('global_params')

setmetatable(_subscribers, {__mode = 'k'})

function events.after_reload()
Expand Down Expand Up @@ -148,7 +150,8 @@ function events.handle(event)
opts.STATUS_NAMES[event.status]
)
end
members.set(event.uri, event.status, event.incarnation, { payload = event.payload })
members.set(event.uri, event.status, event.incarnation, { payload = event.payload },
{ forbid_new = _global_params.forbid_new_members })

for cond, _ in pairs(_subscribers) do
cond:broadcast()
Expand Down
11 changes: 9 additions & 2 deletions membership/members.lua
Original file line number Diff line number Diff line change
Expand Up @@ -84,10 +84,17 @@ function members.filter_excluding(state, uri1, uri2)
return ret
end

function members.set(uri, status, incarnation, params)
checks('string', 'number', 'number', { payload = '?table', clock_delta = '?number' })
function members.set(uri, status, incarnation, params, set_opts)
checks('string', 'number', 'number',
{ payload = '?table', clock_delta = '?number', },
{ forbid_new = '?boolean', }
)

local member = _all_members[uri]
if member == nil and set_opts and set_opts.forbid_new then
opts.log_debug("New member %s was skipped", uri)
return
end
if member and incarnation < member.incarnation then
error('Can not downgrade incarnation')
end
Expand Down
3 changes: 3 additions & 0 deletions membership/stash.lua
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
local S = rawget(_G, '__membership_stash') or {}
S.global_params = S.global_params or {
forbid_new_members = false,
}

local log = require('log')

Expand Down

0 comments on commit 6947d2f

Please sign in to comment.