+swim #10 implement tombstones for dead members in instance
ktoso committed Sep 3, 2020
1 parent efdee27 commit c96d8fb
Showing 3 changed files with 162 additions and 7 deletions.
73 changes: 67 additions & 6 deletions Sources/SWIM/SWIMInstance.swift
@@ -271,6 +271,9 @@ extension SWIM {
self._membersToPingIndex
}

/// Tombstones are needed to avoid accidentally re-adding a member that we confirmed as dead already.
internal var removedDeadMemberTombstones: Set<MemberTombstone> = []

private var _sequenceNumber: SWIM.SequenceNumber = 0
/// Sequence numbers are used to identify messages and pair them up into request/replies.
/// - SeeAlso: `SWIM.SequenceNumber`
@@ -392,10 +395,16 @@ extension SWIM {

/// Take care to handle peers both with and without a UID (!); some shells may not be diligent about this.
internal func addMember(_ peer: SWIMPeer, status: SWIM.Status) -> [AddMemberDirective] {
let maybeExistingMember = self.member(for: peer)

var directives: [AddMemberDirective] = []

if self.hasTombstone(peer.node) {
// We have seen this member before and already confirmed it dead; it must never be added again
self.log.debug("Attempt to re-add already confirmed dead peer \(peer), ignoring it.")
directives.append(.memberAlreadyKnownDead(Member(peer: peer, status: .dead, protocolPeriod: 0)))
return directives
}

let maybeExistingMember = self.member(for: peer)
if let existingMember = maybeExistingMember, existingMember.status.supersedes(status) {
// we already have a newer state for this member
directives.append(.newerMemberAlreadyPresent(existingMember))
@@ -429,7 +438,7 @@ extension SWIM {
let member = SWIM.Member(peer: peer, status: status, protocolPeriod: self.protocolPeriod)
self._members[member.node] = member

if self.notMyself(member) && !member.isDead {
if self.notMyself(member), !member.isDead {
// We know this is a new member.
//
// Newly added members are inserted at a random spot in the list of members
@@ -470,6 +479,12 @@ extension SWIM {
/// We already have information about this exact `Member`, and our information is more recent (it has a higher incarnation number).
/// The incoming information was discarded, and the member returned here is the most up-to-date information we have.
case newerMemberAlreadyPresent(SWIM.Member)
/// The member was already part of the cluster, was confirmed dead, and has since been removed.
/// It shall never be part of the cluster again.
///
/// This is only enforced via tombstones, which are kept in the system for a period of time,
/// in the hope that all other nodes also stop gossiping about this known dead member by then.
case memberAlreadyKnownDead(SWIM.Member)
}
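
A minimal sketch (editor's addition, not part of this commit) of how a shell implementation might consume these directives; the handleAddMember helper and the print calls are illustrative assumptions, not library API:

// Sketch: reacting to the directives returned by addMember, including the new tombstone case.
func handleAddMember(_ swim: SWIM.Instance, peer: SWIMPeer) {
    for directive in swim.addMember(peer, status: .alive(incarnation: 0)) {
        switch directive {
        case .added(let member):
            print("now tracking \(member.node); start probing it")
        case .newerMemberAlreadyPresent(let member):
            print("already known with fresher state: \(member.node)")
        case .previousHostPortMemberConfirmedDead(let change):
            print("member on the same host/port replaced a confirmed dead one: \(change)")
        case .memberAlreadyKnownDead(let member):
            // tombstoned: never re-admit a member we already confirmed dead
            print("refusing to re-add \(member.node)")
        }
    }
}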

/// Implements the round-robin yet shuffled member to probe selection as proposed in the SWIM paper.
@@ -534,7 +549,7 @@ extension SWIM {
} else if case .suspect = status {
suspicionStartedAt = self.now()
} else if case .unreachable = status,
case .disabled = self.settings.unreachability {
case SWIM.Settings.UnreachabilitySettings.disabled = self.settings.unreachability {
self.log.warning("Attempted to mark \(peer.node) as `.unreachable`, but unreachability is disabled! Promoting to `.dead`!")
status = .dead
}
@@ -550,6 +565,11 @@ extension SWIM {
if status.isDead {
self._members.removeValue(forKey: peer.node)
self.removeFromMembersToPing(member)
if let uid = member.node.uid {
let deadline = self.protocolPeriod + self.settings.tombstoneTimeToLiveInTicks
let tombstone = MemberTombstone(uid: uid, deadlineProtocolPeriod: deadline)
self.removedDeadMemberTombstones.insert(tombstone)
}
}

self.resetGossipPayloads(member: member)
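
A small worked example (editor's addition) of the deadline arithmetic used when storing the tombstone above; the concrete tick value is illustrative:

// Sketch of the tombstone deadline computation, using the default time to live.
let currentProtocolPeriod: UInt64 = 1_000            // illustrative current tick
let ttlTicks: UInt64 = 4 * 60 * 60                   // default tombstoneTimeToLiveInTicks (14_400)
let deadline = currentProtocolPeriod + ttlTicks      // 15_400
// cleanupTombstones() keeps the tombstone while protocolPeriod < deadline,
// so at the default 1 second probe interval it lives for roughly 4 hours.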
@@ -743,7 +763,7 @@ extension SWIM.Instance {
}

func notMyself(_ peer: SWIMAddressablePeer) -> Bool {
!self.isMyself(peer)
!self.isMyself(peer.node)
}

func isMyself(_ member: SWIM.Member) -> Bool {
@@ -889,6 +909,12 @@ extension SWIM.Instance {
)
}

// 3) periodic cleanup of tombstones
// TODO: could be optimized a bit by keeping the "oldest one" around, so we know whether a scan is due yet, etc.
if self.protocolPeriod % UInt64(self.settings.tombstoneCleanupIntervalInTicks) == 0 {
cleanupTombstones()
}

// 4) ALWAYS schedule the next tick
directives.append(.scheduleNextTick(delay: self.dynamicLHMProtocolInterval))
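
A brief sketch (editor's addition) of the cleanup cadence implied by the modulo check above, assuming the default tombstoneCleanupIntervalInTicks of 5 * 60 = 300:

// The O(n) tombstone sweep runs only on ticks 300, 600, 900, ...;
// on every other tick hasTombstone remains an O(1) Set lookup.
let cleanupIntervalTicks: UInt64 = 5 * 60
for protocolPeriod in UInt64(1) ... 1_000 where protocolPeriod % cleanupIntervalTicks == 0 {
    print("cleanupTombstones() would run at protocol period \(protocolPeriod)")
}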

@@ -1495,12 +1521,14 @@ extension SWIM.Instance {

// the Shell may need to set up a connection if we just made a move from previousStatus: nil,
// so we definitely need to emit this change
return self.addMember(member.peer, status: member.status).map { directive in
return self.addMember(member.peer, status: member.status).compactMap { directive in
switch directive {
case .added(let member):
return .applied(change: SWIM.MemberStatusChangedEvent(previousStatus: nil, member: member))
case .previousHostPortMemberConfirmedDead(let change):
return .applied(change: change)
case .memberAlreadyKnownDead:
return nil
case .newerMemberAlreadyPresent(let member):
return .applied(change: SWIM.MemberStatusChangedEvent(previousStatus: nil, member: member))
}
@@ -1571,6 +1599,39 @@ extension SWIM.Instance {
/// The confirmation had no effect; either the peer was not known, or it is already dead.
case ignored
}

/// Returns whether this node is known to have already been confirmed dead at some point.
func hasTombstone(_ node: Node) -> Bool {
guard let uid = node.uid else {
return false
}

let anythingAsNotTakenIntoAccountInEquality: UInt64 = 0
return self.removedDeadMemberTombstones.contains(.init(uid: uid, deadlineProtocolPeriod: anythingAsNotTakenIntoAccountInEquality))
}

private func cleanupTombstones() { // time to clean up the tombstones
self.removedDeadMemberTombstones = self.removedDeadMemberTombstones.filter {
// keep the ones where their deadline is still in the future
self.protocolPeriod < $0.deadlineProtocolPeriod
}
}

/// Used to store known "confirmed dead" member unique identifiers.
struct MemberTombstone: Hashable {
/// UID of the dead member
let uid: UInt64
/// The protocol period ("tick") deadline after which this tombstone may be cleaned up
let deadlineProtocolPeriod: UInt64

func hash(into hasher: inout Hasher) {
hasher.combine(self.uid)
}

static func == (lhs: MemberTombstone, rhs: MemberTombstone) -> Bool {
lhs.uid == rhs.uid
}
}
}
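
A small sketch (editor's addition) of why hasTombstone can probe the set with a placeholder deadline of 0: MemberTombstone hashes and compares on the uid alone, so the deadline carried by the lookup key does not matter. MemberTombstone is internal, so this only compiles inside the module (or via @testable import, as in the tests); the uid below is illustrative:

var tombstones: Set<SWIM.Instance.MemberTombstone> = []
tombstones.insert(.init(uid: 42, deadlineProtocolPeriod: 15_400))

// Equality and hashing ignore deadlineProtocolPeriod, so a lookup key with
// any deadline (here 0) still matches the stored tombstone for uid 42.
print(tombstones.contains(.init(uid: 42, deadlineProtocolPeriod: 0))) // true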

extension SWIM.Instance: CustomDebugStringConvertible {
25 changes: 24 additions & 1 deletion Sources/SWIM/Settings.swift
@@ -70,6 +70,29 @@ extension SWIM {
}
}

/// When a member is "confirmed dead" we stop gossiping about it. To prevent such a node from accidentally
/// re-joining the cluster because we have completely forgotten about it while it still lingers around, we use tombstones.
///
/// The time to live configures how long tombstones are kept around; this means some accumulating overhead,
/// but added safety in case the node "comes back". Note that this may also be solved at higher-level layers,
/// e.g. by forbidding such a node from even forming a connection to us in a connection-ful implementation, in which case
/// lower timeouts are permissible.
///
/// Assuming the default of 1 second per protocol period (probe interval), the default value amounts to a 4 hour time to live.
public var tombstoneTimeToLiveInTicks: UInt64 = 4 * 60 * 60

/// An interval, expressed as a number of `probeInterval` ticks.
///
/// Every so often the additional task of checking the accumulated tombstones for any overdue ones (see `tombstoneTimeToLiveInTicks`)
/// is performed, and outdated tombstones are removed. This is done so that checking whether a peer has a tombstone remains
/// a plain Set lookup (O(1), performed frequently), while the cleanup only needs to run periodically (O(n)).
public var tombstoneCleanupIntervalInTicks: Int = 5 * 60 {
willSet {
precondition(newValue > 0, "`tombstoneCleanupIntervalInTicks` MUST be > 0")
}
}
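
A minimal configuration sketch (editor's addition): both defaults assume roughly one tick per second, so a shell probing more often needs to scale the tick counts to keep a similar wall-clock time to live. The concrete values below are illustrative, not recommendations:

var settings = SWIM.Settings()
settings.probeInterval = .milliseconds(200)              // 5 ticks per second
settings.tombstoneTimeToLiveInTicks = 4 * 60 * 60 * 5    // 72_000 ticks, roughly 4 hours at 200ms per tick
settings.tombstoneCleanupIntervalInTicks = 5 * 60        // sweep roughly once a minute (300 ticks)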

/// Optional feature: a set of "initial contact points" to automatically contact and join upon starting a node.
///
/// Optionally, a Shell implementation MAY use this setting to automatically contact a set of initial contact point nodes,
@@ -84,7 +107,7 @@
/// Interval at which gossip messages should be issued.
/// This property sets only a base value of probe interval, which will later be multiplied by `SWIM.Instance.localHealthMultiplier`.
/// - SeeAlso: `maxLocalHealthMultiplier`
/// Every `interval` a `fan-out` number of gossip messages will be sent. // TODO which fanout, better docs
/// Every `interval` a `fan-out` number of gossip messages will be sent.
public var probeInterval: DispatchTimeInterval = .seconds(1)

/// Time amount after which a sent ping without ack response is considered timed-out.
71 changes: 71 additions & 0 deletions Tests/SWIMTests/SWIMInstanceTests.swift
@@ -1289,6 +1289,77 @@ final class SWIMInstanceTests: XCTestCase {
XCTAssertNotEqual(swim.nextPeerToPing()?.node, self.second.node)
}

func test_confirmDead_shouldStoreATombstone_disallowAddingAgain() throws {
var settings = SWIM.Settings()
settings.unreachability = .enabled
let swim = SWIM.Instance(settings: settings, myself: self.myself)

_ = swim.addMember(self.second, status: .alive(incarnation: 10))
_ = swim.addMember(self.third, status: .alive(incarnation: 10))

let secondMember = swim.member(for: self.secondNode)!

_ = swim.confirmDead(peer: self.second)
XCTAssertFalse(swim.members.contains(secondMember))
XCTAssertFalse(swim.membersToPing.contains(secondMember))

// "you are already dead"
let directives = swim.addMember(self.second, status: .alive(incarnation: 100))

// no mercy for zombies; don't add it again
XCTAssertTrue(directives.count == 1)
switch directives.first {
case .memberAlreadyKnownDead(let dead):
XCTAssertEqual(dead.status, SWIM.Status.dead)
XCTAssertEqual(dead.node, self.secondNode)
default:
XCTFail("expected .memberAlreadyKnownDead directive, got: \(String(describing: directives.first))")
}
XCTAssertFalse(swim.members.contains(secondMember))
XCTAssertFalse(swim.membersToPing.contains(secondMember))
}

func test_confirmDead_tombstone_shouldExpireAfterConfiguredAmountOfTicks() throws {
var settings = SWIM.Settings()
settings.tombstoneCleanupIntervalInTicks = 3
settings.tombstoneTimeToLiveInTicks = 2
let swim = SWIM.Instance(settings: settings, myself: self.myself)

_ = swim.addMember(self.second, status: .alive(incarnation: 10))
_ = swim.addMember(self.third, status: .alive(incarnation: 10))

let secondMember = swim.member(for: self.secondNode)!

_ = swim.confirmDead(peer: self.second)
XCTAssertFalse(swim.membersToPing.contains(secondMember))

XCTAssertTrue(
swim.removedDeadMemberTombstones
.contains(.init(uid: self.secondNode.uid!, deadlineProtocolPeriod: 0 /* not part of equality*/ ))
)

_ = swim.onPeriodicPingTick()
_ = swim.onPeriodicPingTick()

XCTAssertTrue(
swim.removedDeadMemberTombstones
.contains(.init(uid: self.secondNode.uid!, deadlineProtocolPeriod: 0 /* not part of equality*/ ))
)

_ = swim.onPeriodicPingTick()
_ = swim.onPeriodicPingTick()

XCTAssertFalse(
swim.removedDeadMemberTombstones
.contains(.init(uid: self.secondNode.uid!, deadlineProtocolPeriod: 0 /* not part of equality*/ ))
)

// past the deadline and tombstone expiration, we'd be able to smuggle in that node again...!
_ = swim.addMember(self.second, status: .alive(incarnation: 135_342))
let member = swim.member(for: self.second)
XCTAssertEqual(member?.node, self.secondNode)
}
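
Editor's note on the tick arithmetic in the expiry test above: with tombstoneTimeToLiveInTicks = 2 the tombstone's deadline lies two protocol periods after confirmDead, but cleanupTombstones() only runs on ticks where the protocol period is a multiple of tombstoneCleanupIntervalInTicks = 3. The tombstone therefore still exists after two ticks and is only swept on the following cleanup tick, which is why the test drives four ticks before asserting that it is gone and that the node can be added again.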

// ==== ----------------------------------------------------------------------------------------------------------------
// MARK: Sanity checks

