Skip to content

Commit

Permalink
Record begin and end timestamps for relays (#2701)
Browse files Browse the repository at this point in the history
To fight jamming attempts, or even just to detect them, we need to know how fast relayed HTLCs are fulfilled. We now measure this and store it in the audit database. Previously, the "IN" and "OUT" directions for the same HTLC stored the same timestamp (corresponding to when the HTLC was fulfilled); we now use the timestamp at which we received the UpdateAddHtlc for the "IN" direction.
  • Loading branch information
thomash-acinq authored Jun 23, 2023
1 parent 1519dd0 commit 9db0063
Show file tree
Hide file tree
Showing 18 changed files with 141 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ object Origin {
object Hot {
def apply(replyTo: ActorRef, upstream: Upstream): Hot = upstream match {
case u: Upstream.Local => Origin.LocalHot(replyTo, u.id)
case u: Upstream.Trampoline => Origin.TrampolineRelayedHot(replyTo, u.adds)
case u: Upstream.Trampoline => Origin.TrampolineRelayedHot(replyTo, u.adds.map(_.add))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -76,12 +76,12 @@ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorL
.withTag(PaymentTags.Relay, PaymentTags.RelayType(e))
.record((e.amountIn - e.amountOut).truncateToSatoshi.toLong)
e match {
case TrampolinePaymentRelayed(_, incoming, outgoing, _, _, _) =>
case TrampolinePaymentRelayed(_, incoming, outgoing, _, _) =>
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Received).record(incoming.length)
PaymentMetrics.PaymentParts.withTag(PaymentTags.Direction, PaymentTags.Directions.Sent).record(outgoing.length)
incoming.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentReceived))
outgoing.foreach(p => channelsDb.updateChannelMeta(p.channelId, ChannelEvent.EventType.PaymentSent))
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _) =>
case ChannelPaymentRelayed(_, _, _, fromChannelId, toChannelId, _, _) =>
channelsDb.updateChannelMeta(fromChannelId, ChannelEvent.EventType.PaymentReceived)
channelsDb.updateChannelMeta(toChannelId, ChannelEvent.EventType.PaymentSent)
}
Expand Down
24 changes: 12 additions & 12 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgAuditDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
migration910(statement)
}
if (v < 11) {
migration1011(statement)
migration1011(statement)
}
case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do
case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion")
Expand Down Expand Up @@ -228,10 +228,10 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Postgres) {
inTransaction { pg =>
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, startedAt, settledAt) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", startedAt), RelayedPart(toChannelId, amountOut, "OUT", "channel", settledAt))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) =>
using(pg.prepareStatement("INSERT INTO audit.relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setString(1, e.paymentHash.toHex)
statement.setLong(2, nextTrampolineAmount.toLong)
Expand All @@ -240,7 +240,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", i.receivedAt)) ++ outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", o.settledAt))
}
for (p <- payments) {
using(pg.prepareStatement("INSERT INTO audit.relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
Expand All @@ -249,7 +249,7 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
statement.setString(3, p.channelId.toHex)
statement.setString(4, p.direction)
statement.setString(5, p.relayType)
statement.setTimestamp(6, e.timestamp.toSqlTimestamp)
statement.setTimestamp(6, p.timestamp.toSqlTimestamp)
statement.executeUpdate()
}
}
Expand Down Expand Up @@ -425,15 +425,15 @@ class PgAuditDb(implicit ds: DataSource) extends AuditDb with Logging {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash (MPP) in both cases (trampoline and channel).
// NB: we may link the wrong in-out parts, but the overall sum will be correct: we sort by amounts to minimize the risk of mismatch.
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.IncomingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.OutgoingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
parts.headOption match {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, in.receivedAt, out.settledAt)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) =>
case Some(RelayedPart(_, _, _, "trampoline", _)) =>
val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey))
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,10 +216,10 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {

override def add(e: PaymentRelayed): Unit = withMetrics("audit/add-payment-relayed", DbBackends.Sqlite) {
val payments = e match {
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, ts) =>
case ChannelPaymentRelayed(amountIn, amountOut, _, fromChannelId, toChannelId, startedAt, settledAt) =>
// non-trampoline relayed payments have one input and one output
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", ts), RelayedPart(toChannelId, amountOut, "OUT", "channel", ts))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, ts) =>
Seq(RelayedPart(fromChannelId, amountIn, "IN", "channel", startedAt), RelayedPart(toChannelId, amountOut, "OUT", "channel", settledAt))
case TrampolinePaymentRelayed(_, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) =>
using(sqlite.prepareStatement("INSERT INTO relayed_trampoline VALUES (?, ?, ?, ?)")) { statement =>
statement.setBytes(1, e.paymentHash.toArray)
statement.setLong(2, nextTrampolineAmount.toLong)
Expand All @@ -228,8 +228,8 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.executeUpdate()
}
// trampoline relayed payments do MPP aggregation and may have M inputs and N outputs
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", ts)) ++
outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", ts))
incoming.map(i => RelayedPart(i.channelId, i.amount, "IN", "trampoline", i.receivedAt)) ++
outgoing.map(o => RelayedPart(o.channelId, o.amount, "OUT", "trampoline", o.settledAt))
}
for (p <- payments) {
using(sqlite.prepareStatement("INSERT INTO relayed VALUES (?, ?, ?, ?, ?, ?)")) { statement =>
Expand All @@ -238,7 +238,7 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
statement.setBytes(3, p.channelId.toArray)
statement.setString(4, p.direction)
statement.setString(5, p.relayType)
statement.setLong(6, e.timestamp.toLong)
statement.setLong(6, p.timestamp.toLong)
statement.executeUpdate()
}
}
Expand Down Expand Up @@ -397,15 +397,15 @@ class SqliteAuditDb(val sqlite: Connection) extends AuditDb with Logging {
case (paymentHash, parts) =>
// We may have been routing multiple payments for the same payment_hash (MPP) in both cases (trampoline and channel).
// NB: we may link the wrong in-out parts, but the overall sum will be correct: we sort by amounts to minimize the risk of mismatch.
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.Part(p.amount, p.channelId)).sortBy(_.amount)
val incoming = parts.filter(_.direction == "IN").map(p => PaymentRelayed.IncomingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
val outgoing = parts.filter(_.direction == "OUT").map(p => PaymentRelayed.OutgoingPart(p.amount, p.channelId, p.timestamp)).sortBy(_.amount)
parts.headOption match {
case Some(RelayedPart(_, _, _, "channel", timestamp)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, timestamp)
case Some(RelayedPart(_, _, _, "channel", _)) => incoming.zip(outgoing).map {
case (in, out) => ChannelPaymentRelayed(in.amount, out.amount, paymentHash, in.channelId, out.channelId, in.receivedAt, out.settledAt)
}
case Some(RelayedPart(_, _, _, "trampoline", timestamp)) =>
case Some(RelayedPart(_, _, _, "trampoline", _)) =>
val (nextTrampolineAmount, nextTrampolineNodeId) = trampolineByHash.getOrElse(paymentHash, (0 msat, PlaceHolderPubKey))
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount, timestamp) :: Nil
TrampolinePaymentRelayed(paymentHash, incoming, outgoing, nextTrampolineNodeId, nextTrampolineAmount) :: Nil
case _ => Nil
}
}.toSeq.sortBy(_.timestamp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ object Monitoring {
val PaymentAttempt = Kamon.histogram("payment.attempt", "Number of attempts before a payment succeeds")
val SentPaymentDuration = Kamon.timer("payment.duration.sent", "Outgoing payment duration")
val ReceivedPaymentDuration = Kamon.timer("payment.duration.received", "Incoming payment duration")
val RelayedPaymentDuration = Kamon.timer("payment.duration.relayed", "Duration of pending downstream HTLCs during a relay")

// The goal of this metric is to measure whether retrying MPP payments on failing channels yields useful results.
// Once enough data has been collected, we will update the MultiPartPaymentLifecycle logic accordingly.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,22 +86,29 @@ case class PaymentFailed(id: UUID, paymentHash: ByteVector32, failures: Seq[Paym
sealed trait PaymentRelayed extends PaymentEvent {
val amountIn: MilliSatoshi
val amountOut: MilliSatoshi
val timestamp: TimestampMilli
val startedAt: TimestampMilli
val settledAt: TimestampMilli
}

case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, timestamp: TimestampMilli = TimestampMilli.now()) extends PaymentRelayed
case class ChannelPaymentRelayed(amountIn: MilliSatoshi, amountOut: MilliSatoshi, paymentHash: ByteVector32, fromChannelId: ByteVector32, toChannelId: ByteVector32, startedAt: TimestampMilli, settledAt: TimestampMilli) extends PaymentRelayed {
override val timestamp: TimestampMilli = settledAt
}

case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi, timestamp: TimestampMilli = TimestampMilli.now()) extends PaymentRelayed {
case class TrampolinePaymentRelayed(paymentHash: ByteVector32, incoming: PaymentRelayed.Incoming, outgoing: PaymentRelayed.Outgoing, nextTrampolineNodeId: PublicKey, nextTrampolineAmount: MilliSatoshi) extends PaymentRelayed {
override val amountIn: MilliSatoshi = incoming.map(_.amount).sum
override val amountOut: MilliSatoshi = outgoing.map(_.amount).sum
override val startedAt: TimestampMilli = incoming.map(_.receivedAt).minOption.getOrElse(TimestampMilli.now())
override val settledAt: TimestampMilli = outgoing.map(_.settledAt).maxOption.getOrElse(TimestampMilli.now())
override val timestamp: TimestampMilli = settledAt
}

object PaymentRelayed {

case class Part(amount: MilliSatoshi, channelId: ByteVector32)
/**
 * One incoming part of a relayed payment.
 *
 * @param amount     amount of this part.
 * @param channelId  channel associated with this part.
 * @param receivedAt timestamp attached to this part; the audit databases store it as the timestamp of the "IN" row.
 */
case class IncomingPart(amount: MilliSatoshi, channelId: ByteVector32, receivedAt: TimestampMilli)
/**
 * One outgoing part of a relayed payment.
 *
 * @param amount    amount of this part.
 * @param channelId channel associated with this part.
 * @param settledAt timestamp attached to this part; the audit databases store it as the timestamp of the "OUT" row.
 */
case class OutgoingPart(amount: MilliSatoshi, channelId: ByteVector32, settledAt: TimestampMilli)

type Incoming = Seq[Part]
type Outgoing = Seq[Part]
type Incoming = Seq[IncomingPart]
type Outgoing = Seq[OutgoingPart]

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import fr.acinq.eclair.payment.send.Recipient
import fr.acinq.eclair.router.Router.{BlindedHop, ChannelHop, Route}
import fr.acinq.eclair.wire.protocol.PaymentOnion.{FinalPayload, IntermediatePayload, PerHopPayload}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Feature, Features, MilliSatoshi, ShortChannelId, UInt64, randomKey}
import fr.acinq.eclair.{CltvExpiry, CltvExpiryDelta, Feature, Features, MilliSatoshi, ShortChannelId, TimestampMilli, UInt64, randomKey}
import scodec.bits.ByteVector
import scodec.{Attempt, DecodeResult}

Expand Down Expand Up @@ -241,10 +241,12 @@ object OutgoingPaymentPacket {
sealed trait Upstream
object Upstream {
case class Local(id: UUID) extends Upstream
case class Trampoline(adds: Seq[UpdateAddHtlc]) extends Upstream {
val amountIn: MilliSatoshi = adds.map(_.amountMsat).sum
val expiryIn: CltvExpiry = adds.map(_.cltvExpiry).min
case class Trampoline(adds: Seq[ReceivedHtlc]) extends Upstream {
val amountIn: MilliSatoshi = adds.map(_.add.amountMsat).sum
val expiryIn: CltvExpiry = adds.map(_.add.cltvExpiry).min
}

/**
 * An upstream HTLC paired with a timestamp.
 *
 * @param add        the upstream `UpdateAddHtlc` message.
 * @param receivedAt timestamp associated with `add` (presumably the time the HTLC was received; confirm at the construction site).
 */
case class ReceivedHtlc(add: UpdateAddHtlc, receivedAt: TimestampMilli)
}
// @formatter:on

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,10 @@ import fr.acinq.eclair.payment.{ChannelPaymentRelayed, IncomingPaymentPacket}
import fr.acinq.eclair.wire.protocol.FailureMessageCodecs.createBadOnionFailure
import fr.acinq.eclair.wire.protocol.PaymentOnion.IntermediatePayload
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Logs, NodeParams, TimestampSecond, channel, nodeFee}
import fr.acinq.eclair.{Logs, NodeParams, TimestampMilli, TimestampSecond, channel, nodeFee}

import java.util.UUID
import java.util.concurrent.TimeUnit
import scala.concurrent.duration.DurationLong
import scala.util.Random

Expand Down Expand Up @@ -103,7 +104,8 @@ class ChannelRelay private(nodeParams: NodeParams,
register: ActorRef,
channels: Map[ByteVector32, Relayer.OutgoingChannel],
r: IncomingPaymentPacket.ChannelRelayPacket,
context: ActorContext[ChannelRelay.Command]) {
context: ActorContext[ChannelRelay.Command],
startedAt: TimestampMilli = TimestampMilli.now()) {

import ChannelRelay._

Expand Down Expand Up @@ -155,13 +157,15 @@ class ChannelRelay private(nodeParams: NodeParams,
case WrappedAddResponse(RES_ADD_SETTLED(o: Origin.ChannelRelayedHot, htlc, fulfill: HtlcResult.Fulfill)) =>
context.log.debug("relaying fulfill to upstream")
val cmd = CMD_FULFILL_HTLC(o.originHtlcId, fulfill.paymentPreimage, commit = true)
context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(o.amountIn, o.amountOut, htlc.paymentHash, o.originChannelId, htlc.channelId))
context.system.eventStream ! EventStream.Publish(ChannelPaymentRelayed(o.amountIn, o.amountOut, htlc.paymentHash, o.originChannelId, htlc.channelId, startedAt, TimestampMilli.now()))
recordRelayDuration(isSuccess = true)
safeSendAndStop(o.originChannelId, cmd)

case WrappedAddResponse(RES_ADD_SETTLED(o: Origin.ChannelRelayedHot, _, fail: HtlcResult.Fail)) =>
context.log.debug("relaying fail to upstream")
Metrics.recordPaymentRelayFailed(Tags.FailureType.Remote, Tags.RelayType.Channel)
val cmd = translateRelayFailure(o.originHtlcId, fail)
recordRelayDuration(isSuccess = false)
safeSendAndStop(o.originChannelId, cmd)
}

Expand Down Expand Up @@ -310,4 +314,9 @@ class ChannelRelay private(nodeParams: NodeParams,
}
}

/**
 * Records how long this relay has been running in the `RelayedPaymentDuration` timer.
 *
 * The duration is the time elapsed between `startedAt` and the moment this method is called,
 * reported in milliseconds and tagged as a channel relay.
 *
 * @param isSuccess value of the `Success` tag attached to the measurement.
 */
private def recordRelayDuration(isSuccess: Boolean): Unit = {
  val elapsed = TimestampMilli.now() - startedAt
  val timer = Metrics.RelayedPaymentDuration
    .withTag(Tags.Relay, Tags.RelayType.Channel)
    .withTag(Tags.Success, isSuccess)
  timer.record(elapsed.toMillis, TimeUnit.MILLISECONDS)
}
}
Loading

0 comments on commit 9db0063

Please sign in to comment.