From 28317b41ee4de84395b6004d103a73c9edb48f6a Mon Sep 17 00:00:00 2001 From: Konrad `ktoso` Malawski Date: Thu, 27 Aug 2020 01:21:51 +0900 Subject: [PATCH] some pocing how metrics could be used --- Sources/SWIM/Metrics.swift | 86 ++++++++++++++++++--- Sources/SWIM/SWIMInstance.swift | 17 +++- Sources/SWIMNIOExample/SWIMNIOHandler.swift | 14 +++- 3 files changed, 99 insertions(+), 18 deletions(-) diff --git a/Sources/SWIM/Metrics.swift b/Sources/SWIM/Metrics.swift index 7d2b558..c0cf070 100644 --- a/Sources/SWIM/Metrics.swift +++ b/Sources/SWIM/Metrics.swift @@ -17,36 +17,41 @@ import Metrics extension SWIM { public struct Metrics { /// Number of members (total) - let members: Gauge + public let members: Gauge /// Number of members (alive) - let membersAlive: Gauge + public let membersAlive: Gauge /// Number of members (suspect) - let membersSuspect: Gauge + public let membersSuspect: Gauge /// Number of members (unreachable) - let membersUnreachable: Gauge + public let membersUnreachable: Gauge /// Number of members (dead) - let membersDead: Gauge + public let membersDead: Gauge /// Records time it takes for ping round-trips - let roundTripTime: Timer // TODO: could do dimensions + public let roundTripTime: Timer // TODO: could do dimensions /// Records time it takes for (every) pingRequest round-trip - let pingRequestResponseTimeAll: Timer - let pingRequestResponseTimeFirst: Timer + public let pingRequestResponseTimeAll: Timer + public let pingRequestResponseTimeFirst: Timer /// Records the incarnation of the SWIM instance. /// /// Incarnation numbers are bumped whenever the node needs to refute some gossip about itself, /// as such the incarnation number *growth* is an interesting indicator of cluster observation churn. 
- let incarnation: Gauge + public let incarnation: Gauge - let successfulProbeCount: Gauge + public let successfulProbeCount: Gauge - let failedProbeCount: Gauge + public let failedProbeCount: Gauge // TODO: message sizes (count and bytes) + public let messageCountInbound: Counter + public let messageBytesInbound: Recorder - init(settings: SWIM.Settings) { + public let messageCountOutbound: Counter + public let messageBytesOutbound: Recorder + + public init(settings: SWIM.Settings) { self.members = Gauge( label: settings.metrics.makeLabel("members"), dimensions: [("status", "all")] @@ -87,6 +92,63 @@ extension SWIM { label: settings.metrics.makeLabel("incarnation"), dimensions: [("type", "failed")] ) + + // TODO: how to best design the labels? + self.messageCountInbound = Counter( + label: settings.metrics.makeLabel("message"), + dimensions: [ + ("type", "count"), + ("direction", "in"), + ] + ) + self.messageBytesInbound = Recorder( + label: settings.metrics.makeLabel("message"), + dimensions: [ + ("type", "bytes"), + ("direction", "in"), + ] + ) + + self.messageCountOutbound = Counter( + label: settings.metrics.makeLabel("message"), + dimensions: [ + ("type", "count"), + ("direction", "out"), + ] + ) + self.messageBytesOutbound = Recorder( + label: settings.metrics.makeLabel("message"), + dimensions: [ + ("type", "bytes"), + ("direction", "out"), + ] + ) + } + } +} + +extension SWIM.Metrics { + /// Update member metrics based on SWIM's membership. 
+ public func updateMembership(_ members: SWIM.Membership) { + var alives = 0 + var suspects = 0 + var unreachables = 0 + var deads = 0 + for member in members { + switch member.status { + case .alive: + alives += 1 + case .suspect: + suspects += 1 + case .unreachable: + unreachables += 1 + case .dead: + deads += 1 + } } + self.membersAlive.record(alives) + self.membersSuspect.record(suspects) + self.membersUnreachable.record(unreachables) + self.membersDead.record(deads) } } diff --git a/Sources/SWIM/SWIMInstance.swift b/Sources/SWIM/SWIMInstance.swift index 114b058..1c7da3c 100644 --- a/Sources/SWIM/SWIMInstance.swift +++ b/Sources/SWIM/SWIMInstance.swift @@ -262,7 +262,11 @@ extension SWIM { private let peer: SWIMPeer /// Main members storage, map to values to obtain current members. - internal var _members: [ClusterMembership.Node: SWIM.Member] + internal var _members: [ClusterMembership.Node: SWIM.Member] { + didSet { + self.metrics.updateMembership(self.members) + } + } /// List of members maintained in random yet stable order, see `addMember` for details. internal var membersToPing: [SWIM.Member] @@ -331,6 +335,13 @@ extension SWIM { private var _incarnation: SWIM.Incarnation = 0 + private func nextIncarnation() { + defer { + self.metrics.incarnation.record(self._incarnation) + } + self._incarnation += 1 + } + /// Creates a new SWIM algorithm instance. 
public init(settings: SWIM.Settings, myself: SWIMPeer) { self.settings = settings @@ -1390,7 +1401,7 @@ extension SWIM.Instance { // the incremented incarnation if suspectedInIncarnation == self.incarnation { self.adjustLHMultiplier(.refutingSuspectMessageAboutSelf) - self._incarnation += 1 + self.nextIncarnation() // refute the suspicion, we clearly are still alive self.addToGossip(member: self.member) return .applied(change: nil) @@ -1412,7 +1423,7 @@ extension SWIM.Instance { // someone suspected us, // so we need to increment our incarnation number to spread our alive status with the incremented incarnation if unreachableInIncarnation == self.incarnation { - self._incarnation += 1 + self.nextIncarnation() return .ignored } else if unreachableInIncarnation > self.incarnation { self.log.warning(""" diff --git a/Sources/SWIMNIOExample/SWIMNIOHandler.swift b/Sources/SWIMNIOExample/SWIMNIOHandler.swift index 2ea19b7..cb9426d 100644 --- a/Sources/SWIMNIOExample/SWIMNIOHandler.swift +++ b/Sources/SWIMNIOExample/SWIMNIOHandler.swift @@ -159,15 +159,17 @@ public final class SWIMNIOHandler: ChannelDuplexHandler { #else let callbackKey = PendingResponseCallbackIdentifier(peerAddress: remoteAddress, sequenceNumber: message.sequenceNumber) #endif - if let callback = self.pendingReplyCallbacks.removeValue(forKey: callbackKey) { + + if let index = self.pendingReplyCallbacks.index(forKey: callbackKey) { + let (storedKey, callback) = self.pendingReplyCallbacks.remove(at: index) // TODO: UIDs of nodes matter self.log.trace("Received response, key: \(callbackKey); Invoking callback...", metadata: [ "pending/callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }), ]) - self.shell.swim.metrics.roundTripTime.recordNanoseconds(callback.nanosecondsSinceCallbackStored()) + self.shell.swim.metrics.roundTripTime.recordNanoseconds(storedKey.nanosecondsSinceCallbackStored().nanoseconds) callback(.success(message)) } else { - self.log.trace("No callback for 
\(callbackKey)... It may have been removed due to a timeout already.", metadata: [ + self.log.trace("No callback for \(callbackKey); It may have been removed due to a timeout already.", metadata: [ "pending callbacks": Logger.MetadataValue.array(self.pendingReplyCallbacks.map { "\($0)" }), ]) } @@ -203,6 +205,9 @@ extension SWIMNIOHandler { throw MissingDataError("No data to read") } + self.shell?.swim.metrics.messageCountInbound.increment() + self.shell?.swim.metrics.messageBytesInbound.record(bytes.readableBytes) + let decoder = SWIMNIODefaultDecoder() decoder.userInfo[.channelUserInfoKey] = channel return try decoder.decode(SWIM.Message.self, from: data) @@ -212,6 +217,9 @@ extension SWIMNIOHandler { let encoder = SWIMNIODefaultEncoder() let data = try encoder.encode(message) + self.shell?.swim.metrics.messageCountOutbound.increment() + self.shell?.swim.metrics.messageBytesOutbound.record(data.count) + let buffer = data.withUnsafeBytes { bytes -> ByteBuffer in var buffer = allocator.buffer(capacity: data.count) buffer.writeBytes(bytes)