Skip to content

Commit

Permalink
some pocing how metrics could be used
Browse files Browse the repository at this point in the history
  • Loading branch information
ktoso committed Aug 26, 2020
1 parent c5ff126 commit 28317b4
Show file tree
Hide file tree
Showing 3 changed files with 99 additions and 18 deletions.
86 changes: 74 additions & 12 deletions Sources/SWIM/Metrics.swift
Original file line number Diff line number Diff line change
Expand Up @@ -17,36 +17,41 @@ import Metrics
extension SWIM {
public struct Metrics {
/// Number of members (total)
let members: Gauge
public let members: Gauge
/// Number of members (alive)
let membersAlive: Gauge
public let membersAlive: Gauge
/// Number of members (suspect)
let membersSuspect: Gauge
public let membersSuspect: Gauge
/// Number of members (unreachable)
let membersUnreachable: Gauge
public let membersUnreachable: Gauge
/// Number of members (dead)
let membersDead: Gauge
public let membersDead: Gauge

/// Records time it takes for ping round-trips
let roundTripTime: Timer // TODO: could do dimensions
public let roundTripTime: Timer // TODO: could do dimensions

/// Records time it takes for (every) pingRequest round-trip
let pingRequestResponseTimeAll: Timer
let pingRequestResponseTimeFirst: Timer
public let pingRequestResponseTimeAll: Timer
public let pingRequestResponseTimeFirst: Timer

/// Records the incarnation of the SWIM instance.
///
/// Incarnation numbers are bumped whenever the node needs to refute some gossip about itself,
/// as such the incarnation number *growth* is an interesting indicator of cluster observation churn.
let incarnation: Gauge
public let incarnation: Gauge

let successfulProbeCount: Gauge
public let successfulProbeCount: Gauge

let failedProbeCount: Gauge
public let failedProbeCount: Gauge

// TODO: message sizes (count and bytes)
public let messageCountInbound: Counter
public let messageBytesInbound: Recorder

init(settings: SWIM.Settings) {
public let messageCountOutbound: Counter
public let messageBytesOutbound: Recorder

public init(settings: SWIM.Settings) {
self.members = Gauge(
label: settings.metrics.makeLabel("members"),
dimensions: [("status", "all")]
Expand Down Expand Up @@ -87,6 +92,63 @@ extension SWIM {
label: settings.metrics.makeLabel("incarnation"),
dimensions: [("type", "failed")]
)

// TODO: how to best design the labels?
self.messageCountInbound = Counter(
label: settings.metrics.makeLabel("message"),
dimensions: [
("type", "count"),
("direction", "in"),
]
)
self.messageBytesInbound = Recorder(
label: settings.metrics.makeLabel("message"),
dimensions: [
("type", "bytes"),
("direction", "in"),
]
)

self.messageCountOutbound = Counter(
label: settings.metrics.makeLabel("message"),
dimensions: [
("type", "count"),
("direction", "out"),
]
)
self.messageBytesOutbound = Recorder(
label: settings.metrics.makeLabel("message"),
dimensions: [
("type", "bytes"),
("direction", "out"),
]
)
}
}
}

extension SWIM.Metrics {
/// Update member metrics metrics based on SWIM's membership.
public func updateMembership(_ members: SWIM.Membership) {
var alives = 0
var suspects = 0
var unreachables = 0
var deads = 0
for member in members {
switch member.status {
case .alive:
alives += 1
case .suspect:
suspects += 1
case .unreachable:
unreachables += 1
case .dead:
deads += 1
}
}
self.membersAlive.record(alives)
self.membersSuspect.record(suspects)
self.membersUnreachable.record(unreachables)
self.membersDead.record(deads)
}
}
17 changes: 14 additions & 3 deletions Sources/SWIM/SWIMInstance.swift
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,11 @@ extension SWIM {
private let peer: SWIMPeer

/// Main members storage, map to values to obtain current members.
internal var _members: [ClusterMembership.Node: SWIM.Member]
internal var _members: [ClusterMembership.Node: SWIM.Member] {
didSet {
self.metrics.updateMembership(self.members)
}
}

/// List of members maintained in random yet stable order, see `addMember` for details.
internal var membersToPing: [SWIM.Member]
Expand Down Expand Up @@ -331,6 +335,13 @@ extension SWIM {

private var _incarnation: SWIM.Incarnation = 0

private func nextIncarnation() {
defer {
self.metrics.incarnation.record(self._incarnation)
}
self._incarnation += 1
}

/// Creates a new SWIM algorithm instance.
public init(settings: SWIM.Settings, myself: SWIMPeer) {
self.settings = settings
Expand Down Expand Up @@ -1390,7 +1401,7 @@ extension SWIM.Instance {
// the incremented incarnation
if suspectedInIncarnation == self.incarnation {
self.adjustLHMultiplier(.refutingSuspectMessageAboutSelf)
self._incarnation += 1
self.nextIncarnation()
// refute the suspicion, we clearly are still alive
self.addToGossip(member: self.member)
return .applied(change: nil)
Expand All @@ -1412,7 +1423,7 @@ extension SWIM.Instance {
// someone suspected us,
// so we need to increment our incarnation number to spread our alive status with the incremented incarnation
if unreachableInIncarnation == self.incarnation {
self._incarnation += 1
self.nextIncarnation()
return .ignored
} else if unreachableInIncarnation > self.incarnation {
self.log.warning("""
Expand Down
14 changes: 11 additions & 3 deletions Sources/SWIMNIOExample/SWIMNIOHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -159,15 +159,17 @@ public final class SWIMNIOHandler: ChannelDuplexHandler {
#else
let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber)
#endif
if let callback = self.pendingReplyCallbacks.removeValue(forKey: callbackKey) {

if let index = self.pendingReplyCallbacks.index(forKey: callbackKey) {
let (storedKey, callback) = self.pendingReplyCallbacks.remove(at: index)
// TODO: UIDs of nodes matter
self.log.trace("Received response, key: \(callbackKey); Invoking callback...", metadata: [
"pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
])
self.shell.swim.metrics.roundTripTime.recordNanoseconds(callback.nanosecondsSinceCallbackStored())
self.shell.swim.metrics.roundTripTime.recordNanoseconds(storedKey.nanosecondsSinceCallbackStored().nanoseconds)
callback(.success(message))
} else {
self.log.trace("No callback for \(callbackKey)... It may have been removed due to a timeout already.", metadata: [
self.log.trace("No callback for \(callbackKey); It may have been removed due to a timeout already.", metadata: [
"pending callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }),
])
}
Expand Down Expand Up @@ -203,6 +205,9 @@ extension SWIMNIOHandler {
throw MissingDataError("No data to read")
}

self.shell?.swim.metrics.messageCountInbound.increment()
self.shell?.swim.metrics.messageBytesInbound.record(bytes.readableBytes)

let decoder = SWIMNIODefaultDecoder()
decoder.userInfo[.channelUserInfoKey] = channel
return try decoder.decode(SWIM.Message.self, from: data)
Expand All @@ -212,6 +217,9 @@ extension SWIMNIOHandler {
let encoder = SWIMNIODefaultEncoder()
let data = try encoder.encode(message)

self.shell?.swim.metrics.messageCountOutbound.increment()
self.shell?.swim.metrics.messageBytesOutbound.record(data.count)

let buffer = data.withUnsafeBytes { bytes -> ByteBuffer in
var buffer = allocator.buffer(capacity: data.count)
buffer.writeBytes(bytes)
Expand Down

0 comments on commit 28317b4

Please sign in to comment.