From 5a70982044570878cbe2877cc2ed7db0ecae1cf4 Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Wed, 29 Jan 2025 11:45:23 +0100 Subject: [PATCH 1/5] Add router support for batched splices A splice tx could involve more than one parent channel. The Router must track the set of channels spent by a given tx until matching channel announcements are received or the spend tx is deeply buried. If a splice tx spends more than one parent channel between the same nodes, then there's no way to know which new channel announcement corresponds to which parent channel. We simply update the first one found. --- .../scala/fr/acinq/eclair/router/Router.scala | 6 +- .../fr/acinq/eclair/router/Validation.scala | 68 +++++++--- .../basic/channel/GossipIntegrationSpec.scala | 18 ++- .../fr/acinq/eclair/router/RouterSpec.scala | 126 +++++++++++++++++- 4 files changed, 187 insertions(+), 31 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index f026d7f32e..a849fb42c0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -264,11 +264,11 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm val fundingTxId = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get.fundingTxId log.info("funding tx txId={} of channelId={} has been spent by txId={}: waiting for the spending tx to have enough confirmations before removing the channel from the graph", fundingTxId, shortChannelId, spendingTx.txid) watcher ! WatchTxConfirmed(self, spendingTx.txid, ANNOUNCEMENTS_MINCONF * 2) - stay() using d.copy(spentChannels = d.spentChannels + (spendingTx.txid -> shortChannelId)) + stay() using d.copy(spentChannels = d.spentChannels.updated(spendingTx.txid, d.spentChannels.getOrElse(spendingTx.txid, Set.empty) + shortChannelId)) case Event(WatchTxConfirmedTriggered(_, _, spendingTx), d) => d.spentChannels.get(spendingTx.txid) match { - case Some(shortChannelId) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelId) + case Some(shortChannelIds) => stay() using Validation.handleChannelSpent(d, watcher, nodeParams.db.network, spendingTx.txid, shortChannelIds) case None => stay() } @@ -771,7 +771,7 @@ object Router { excludedChannels: Map[ChannelDesc, ExcludedChannelStatus], // those channels are temporarily excluded from route calculation, because their node returned a TemporaryChannelFailure graphWithBalances: GraphWithBalanceEstimates, sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message - spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried + spentChannels: Map[TxId, Set[RealShortChannelId]], // transactions that spend funding txs that are not yet deeply buried ) { def resolve(scid: ShortChannelId): Option[KnownChannel] = { // let's assume this is a real scid diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index ed6dd28fdf..482b3f7b54 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -113,7 +113,14 @@ object Validation { log.debug("validation successful for shortChannelId={}", c.shortChannelId) remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount - d0.spentChannels.get(tx.txid) match { + val parentScid = d0.spentChannels.get(tx.txid).flatMap { parentScids => + parentScids.find { parentScid => + d0.channels.get(parentScid).exists { parent => + Set(c.nodeId1, c.nodeId2) == Set(parent.nodeId1, parent.nodeId2) + } + } + } + parentScid match { case Some(parentScid) => d0.channels.get(parentScid) match { case Some(parentChannel) => @@ -191,9 +198,12 @@ object Validation { // we need to update the graph because the edge identifiers and capacity change from the parent scid to the new splice scid log.debug("updating the graph for shortChannelId={}", newPubChan.shortChannelId) val graph1 = d.graphWithBalances.updateChannel(ChannelDesc(parentChannel.shortChannelId, parentChannel.nodeId1, parentChannel.nodeId2), ann.shortChannelId, capacity) - val spendingTxs = d.spentChannels.filter(_._2 == parentChannel.shortChannelId).keySet - spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) - val spentChannels1 = d.spentChannels -- spendingTxs + val spentChannels1 = d.spentChannels.collect { + case (txId, parentScids) if (parentScids - parentChannel.shortChannelId).nonEmpty => + txId -> (parentScids - parentChannel.shortChannelId) + } + (d.spentChannels -- spentChannels1.keys).keys.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) + d.copy( // we also add the splice scid -> channelId and remove the parent scid -> channelId mappings channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId, @@ -267,34 +277,41 @@ object Validation { } else d1 } - def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelId: RealShortChannelId)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { + def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - val lostChannel = d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get - log.info("funding tx for channelId={} was spent", shortChannelId) + + val lostChannels = shortChannelIds.map(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get) + log.info("funding tx for channelIds={} was spent", shortChannelIds) // we need to remove nodes that aren't tied to any channels anymore - val channels1 = d.channels - shortChannelId - val prunedChannels1 = d.prunedChannels - shortChannelId - val lostNodes = Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values)) + val channels1 = d.channels -- shortChannelIds + val prunedChannels1 = d.prunedChannels -- shortChannelIds + val lostNodes = lostChannels.flatMap(lostChannel => Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values))) // let's clean the db and send the events - log.info("pruning shortChannelId={} (spent)", shortChannelId) - db.removeChannel(shortChannelId) // NB: this also removes channel updates + log.info("pruning shortChannelIds={} (spent)", shortChannelIds) + shortChannelIds.foreach(db.removeChannel(_)) // NB: this also removes channel updates // we also need to remove updates from the graph - val graphWithBalances1 = d.graphWithBalances - .removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2)) + val graphWithBalances1 = lostChannels.foldLeft(d.graphWithBalances) { (graph, lostChannel) => + graph.removeChannel(ChannelDesc(lostChannel.shortChannelId, lostChannel.nodeId1, lostChannel.nodeId2)) + } // we notify front nodes - ctx.system.eventStream.publish(ChannelLost(shortChannelId)) + shortChannelIds.foreach(shortChannelId => ctx.system.eventStream.publish(ChannelLost(shortChannelId))) lostNodes.foreach { nodeId => log.info("pruning nodeId={} (spent)", nodeId) db.removeNode(nodeId) ctx.system.eventStream.publish(NodeLost(nodeId)) } - // we no longer need to track this or alternative transactions that spent the parent channel - // either this channel was really closed, or it was spliced and the announcement was not received in time - // we will re-add a spliced channel as a new channel later when we receive the announcement - watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) - val spendingTxs = d.spentChannels.filter(_._2 == shortChannelId).keySet - // stop watching the spending txs that will never confirm, we already got confirmations for this spending tx + lostChannels.foreach { + lostChannel => + // we no longer need to track this or alternative transactions that spent the parent channel + // either this channel was really closed, or it was spliced and the announcement was not received in time + // we will re-add a spliced channel as a new channel later when we receive the announcement + watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) + } + + // We do not need to track other spending txs if one or more of their inputs have been spent by this spending tx + val spendingTxs = d.spentChannels.filter(_._2.intersect(shortChannelIds).nonEmpty).keySet + // stop watching the spending txs that will never confirm because this alternate spending tx confirmed (spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) val spentChannels1 = d.spentChannels -- spendingTxs d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) @@ -599,7 +616,14 @@ object Validation { log.debug("this is a known pruned local channel, processing channel_update for channelId={} scid={}", lcu.channelId, ann.shortChannelId) handleChannelUpdate(d, db, nodeParams.currentBlockHeight, Left(lcu)) case Some(ann) => - val d1 = d.spentChannels.get(ann.fundingTxId).flatMap(parentScid => d.channels.get(parentScid)) match { + // A single transaction may splice multiple channels (batching), in which case we have multiple parent + // channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter. + // We only need to update one of the parent channels between the same set of nodes to correctly update + // our graph. + val d1 = d.spentChannels + .getOrElse(ann.fundingTxId, Set.empty) + .flatMap(d.channels.get) + .find(parent => parent.nodeId1 == ann.announcement.nodeId1 && parent.nodeId2 == ann.announcement.nodeId2) match { case Some(parentChannel) => // This is a splice for which we haven't processed the (local) channel_announcement yet. log.debug("processing channel_announcement for local splice with fundingTxId={} channelId={} scid={} (previous={})", ann.fundingTxId, lcu.channelId, ann.shortChannelId, parentChannel.shortChannelId) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala index 1911f21c44..cc89874819 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala @@ -52,13 +52,19 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { val channels = getPeerChannels(alice, bob.nodeId) ++ getPeerChannels(bob, carol.nodeId) assert(channels.map(_.data.channelId).toSet == Set(channelId_ab, channelId_bc)) - eventually { + val scid_ab = eventually { assert(getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx]) assert(getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.isInstanceOf[LocalFundingStatus.ConfirmedFundingTx]) + val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get + // Wait for Alice to receive both initial local channel updates. + inside(getRouterData(alice)) { routerData => + val channel_ab = routerData.channels(scid_ab) + Seq(channel_ab.update_1_opt, channel_ab.update_2_opt).flatten.foreach(u => assert(u.shortChannelId == scid_ab)) + } + scid_ab } // We splice in to increase the capacity of the alice->bob channel. - val scid_ab = getChannelData(alice, channelId_ab).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get val spliceTxId = spliceIn(alice, channelId_ab, 100_000 sat, None).asInstanceOf[RES_SPLICE].fundingTxId // The announcement for the splice transaction and the corresponding channel updates are broadcast. @@ -73,10 +79,12 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { val splice_scid_ab = channelData_alice.commitments.latest.shortChannelId_opt.get assert(splice_scid_ab != scid_ab) assert(channelData_bob.commitments.latest.shortChannelId_opt.contains(splice_scid_ab)) + val scid_bc = getChannelData(bob, channelId_bc).asInstanceOf[DATA_NORMAL].commitments.latest.shortChannelId_opt.get // Alice creates a channel_announcement for the splice transaction and updates the graph. val spliceAnn = inside(getRouterData(alice)) { routerData => - assert(routerData.channels.contains(splice_scid_ab)) + assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc)) + assert(routerData.spentChannels.isEmpty) val channel_ab = routerData.channels(splice_scid_ab) assert(channel_ab.capacity == 200_000.sat) assert(channel_ab.update_1_opt.nonEmpty && channel_ab.update_2_opt.nonEmpty) @@ -91,6 +99,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { // Bob also creates a channel_announcement for the splice transaction and updates the graph. inside(getRouterData(bob)) { routerData => + assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc)) + assert(routerData.spentChannels.isEmpty) assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn)) routerData.channels.get(splice_scid_ab).foreach(c => { assert(c.capacity == 200_000.sat) @@ -106,6 +116,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { // The channel_announcement for the splice propagates to Carol. inside(getRouterData(carol)) { routerData => + assert(routerData.channels.keys == Set(splice_scid_ab, scid_bc)) + assert(routerData.spentChannels.isEmpty) assert(routerData.channels.get(splice_scid_ab).map(_.ann).contains(spliceAnn)) routerData.channels.get(splice_scid_ab).foreach(c => { assert(c.capacity == 200_000.sat) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 4736d68603..a0b24a16a4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -330,6 +330,8 @@ class RouterSpec extends BaseRouterSpec { nextFundingTx } + def batchSpendingTx(spendingTxs: Seq[Transaction]): Transaction = Transaction(version = 0, txIn = spendingTxs.flatMap(_.txIn), txOut = spendingTxs.flatMap(_.txOut), lockTime = 0) + test("properly announce lost channels and nodes") { fixture => import fixture._ val eventListener = TestProbe() @@ -1201,14 +1203,14 @@ class RouterSpec extends BaseRouterSpec { assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) assert(edge_ab.capacity == capacity2 && edge_ba.capacity == capacity2) - // The channel update for the splice is confirmed and the channel is not removed. - router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spendingTx(funding_a, funding_b)) + // The channel update for the splice is confirmed and the channel is updated. + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spliceTx2) eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacity2, None, None) :: Nil)) eventListener.expectMsg(ChannelLost(scid_ab)) peerConnection.expectNoMessage(100 millis) eventListener.expectNoMessage(100 millis) - // The router no longer tracks the parent scid. + // The router no longer tracks the parent scids. val probe = TestProbe() awaitAssert({ probe.send(router, GetRouterData) @@ -1218,4 +1220,122 @@ class RouterSpec extends BaseRouterSpec { }) } + test("update two existing channels with a batch splice") { fixture => + import fixture._ + + val eventListener = TestProbe() + system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) + val peerConnection = TestProbe() + + // Channel ab1 and ab2 will both be spent by the same splice tx. + val spliceTx1 = spendingTx(funding_a, funding_b, publicChannelCapacity - 100_000.sat) + val spliceTx2 = spendingTx(funding_b, funding_c, publicChannelCapacity + 50_000.sat) + val batchSpliceTx = batchSpendingTx(Seq(spliceTx1, spliceTx2)) + + // Channel ab is spent by a splice tx1. + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx1) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx1.txid) + assert(w.minDepth == 12) + } + + // Channel bc is spent by a splice tx2. + router ! WatchExternalChannelSpentTriggered(scid_bc, spliceTx2) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx2.txid) + assert(w.minDepth == 12) + } + + // Channel ab is spent by the batch splice tx. + router ! WatchExternalChannelSpentTriggered(scid_ab, batchSpliceTx) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx.txid) + assert(w.minDepth == 12) + } + + // Channel bc is also spent by the batch splice tx. + router ! WatchExternalChannelSpentTriggered(scid_bc, batchSpliceTx) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx.txid) + assert(w.minDepth == 12) + } + + // Channels ab and bc are also both spent by an RBF of the batch splice tx. + val capacityAB_RBF = publicChannelCapacity - 100_000.sat - 1000.sat + val batchSpliceTx_RBF = batchSpendingTx(Seq(spendingTx(funding_a, funding_b, capacityAB_RBF), spliceTx2)) + router ! WatchExternalChannelSpentTriggered(scid_ab, batchSpliceTx_RBF) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx_RBF.txid) + assert(w.minDepth == 12) + } + router ! WatchExternalChannelSpentTriggered(scid_bc, batchSpliceTx_RBF) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx_RBF.txid) + assert(w.minDepth == 12) + } + + // The router tracks the possible spending txs for scid ab and scid bc. + awaitAssert({ + val probe = TestProbe() + probe.send(router, GetRouterData) + val routerData = probe.expectMsgType[Data] + assert(routerData.spentChannels(spliceTx1.txid) == Set(scid_ab)) + assert(routerData.spentChannels(spliceTx2.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_ab, scid_bc)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_ab, scid_bc)) + }) + + // The splice of channel ab is announced. + val spliceScid = RealShortChannelId(BlockHeight(450000), 1, 0) + val spliceAnn = channelAnnouncement(spliceScid, priv_a, priv_b, priv_funding_a, priv_funding_b) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) + peerConnection.expectNoMessage(100 millis) + assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) + watcher.send(router, ValidateResult(spliceAnn, Right(batchSpliceTx_RBF, UtxoStatus.Unspent))) + peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) + peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) + assert(peerConnection.sender() == router) + + // And the graph should be updated too. + val sender = TestProbe() + sender.send(router, Router.GetRouterData) + val routerData = sender.expectMsgType[Data] + val g = routerData.graphWithBalances.graph + val edge_ab = g.getEdge(ChannelDesc(spliceScid, a, b)).get + val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get + assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) + assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) + assert(edge_ab.capacity == capacityAB_RBF && edge_ba.capacity == capacityAB_RBF) + + // The router still tracks the possible spending txs for scid bc, but not scid ab. + awaitAssert({ + val probe = TestProbe() + probe.send(router, GetRouterData) + val routerData = probe.expectMsgType[Data] + assert(!routerData.spentChannels.contains(spliceTx1.txid)) + assert(routerData.spentChannels(spliceTx2.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc)) + }) + + // The splice channel update for the scid ab splice confirms and the channel is updated. + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, batchSpliceTx_RBF) + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacityAB_RBF, None, None) :: Nil)) + eventListener.expectMsg(ChannelLost(scid_ab)) + // No splice channel update for scid bc was received yet so it is removed. + eventListener.expectMsg(ChannelLost(scid_bc)) + peerConnection.expectNoMessage(100 millis) + eventListener.expectNoMessage(100 millis) + + // The router no longer tracks the parent scids. + awaitAssert({ + val probe = TestProbe() + probe.send(router, GetRouterData) + val routerData = probe.expectMsgType[Data] + assert(routerData.spentChannels.isEmpty) + assert(!routerData.channels.contains(scid_ab)) + assert(!routerData.channels.contains(scid_bc)) + }) + } + } From dcfde3ac3283f943697ce1b78ed0821d48c7e0a2 Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Fri, 31 Jan 2025 16:28:49 +0100 Subject: [PATCH 2/5] Fix issues found by t-bast Also added a function in `BaseRouterSpec` to add a second b-c channel for the batch splice test in `Routerspec`. The alternative to adding a 2nd b-c channel in `BaseRouterSpec` for all tests requires too many other tests to need updating. --- .../fr/acinq/eclair/router/Validation.scala | 46 ++-- .../acinq/eclair/router/BaseRouterSpec.scala | 36 +++ .../fr/acinq/eclair/router/RouterSpec.scala | 251 +++++++++++------- 3 files changed, 205 insertions(+), 128 deletions(-) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala index 482b3f7b54..0e5ecb9468 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Validation.scala @@ -113,25 +113,16 @@ object Validation { log.debug("validation successful for shortChannelId={}", c.shortChannelId) remoteOrigins.foreach(o => sendDecision(o.peerConnection, GossipDecision.Accepted(c))) val capacity = tx.txOut(outputIndex).amount - val parentScid = d0.spentChannels.get(tx.txid).flatMap { parentScids => - parentScids.find { parentScid => - d0.channels.get(parentScid).exists { parent => - Set(c.nodeId1, c.nodeId2) == Set(parent.nodeId1, parent.nodeId2) - } - } - } - parentScid match { - case Some(parentScid) => - d0.channels.get(parentScid) match { - case Some(parentChannel) => - Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel)) - case None => - log.error("spent parent channel shortChannelId={} not found for splice shortChannelId={}", parentScid, c.shortChannelId) - val spendingTxs = d0.spentChannels.filter(_._2 == parentScid).keySet - spendingTxs.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) - val d1 = d0.copy(spentChannels = d0.spentChannels -- spendingTxs) - Some(addPublicChannel(d1, nodeParams, watcher, c, tx.txid, capacity, None)) - } + // A single transaction may splice multiple channels (batching), in which case we have multiple parent + // channels. We cannot know which parent channel this announcement corresponds to, but it doesn't matter. + // We only need to update one of the parent channels between the same set of nodes to correctly update + // our graph. + val parentChannel_opt = d0.spentChannels + .getOrElse(tx.txid, Set.empty) + .flatMap(d0.channels.get) + .find(parent => parent.nodeId1 == c.nodeId1 && parent.nodeId2 == c.nodeId2) + parentChannel_opt match { + case Some(parentChannel) => Some(updateSplicedPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, parentChannel)) case None => Some(addPublicChannel(d0, nodeParams, watcher, c, tx.txid, capacity, None)) } } @@ -202,8 +193,8 @@ object Validation { case (txId, parentScids) if (parentScids - parentChannel.shortChannelId).nonEmpty => txId -> (parentScids - parentChannel.shortChannelId) } - (d.spentChannels -- spentChannels1.keys).keys.foreach(txId => watcher ! UnwatchTxConfirmed(txId)) - + // No need to keep watching transactions that have been removed from spentChannels. + (d.spentChannels.keySet -- spentChannels1.keys).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) d.copy( // we also add the splice scid -> channelId and remove the parent scid -> channelId mappings channels = d.channels + (newPubChan.shortChannelId -> newPubChan) - parentChannel.shortChannelId, @@ -279,15 +270,14 @@ object Validation { def handleChannelSpent(d: Data, watcher: typed.ActorRef[ZmqWatcher.Command], db: NetworkDb, spendingTxId: TxId, shortChannelIds: Set[RealShortChannelId])(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors - - val lostChannels = shortChannelIds.map(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId)).get) - log.info("funding tx for channelIds={} was spent", shortChannelIds) + val lostChannels = shortChannelIds.flatMap(shortChannelId => d.channels.get(shortChannelId).orElse(d.prunedChannels.get(shortChannelId))) + log.info("funding tx for channelIds={} was spent", shortChannelIds.mkString(",")) // we need to remove nodes that aren't tied to any channels anymore val channels1 = d.channels -- shortChannelIds val prunedChannels1 = d.prunedChannels -- shortChannelIds val lostNodes = lostChannels.flatMap(lostChannel => Seq(lostChannel.nodeId1, lostChannel.nodeId2).filterNot(nodeId => hasChannels(nodeId, channels1.values))) // let's clean the db and send the events - log.info("pruning shortChannelIds={} (spent)", shortChannelIds) + log.info("pruning shortChannelIds={} (spent)", shortChannelIds.mkString(",")) shortChannelIds.foreach(db.removeChannel(_)) // NB: this also removes channel updates // we also need to remove updates from the graph val graphWithBalances1 = lostChannels.foldLeft(d.graphWithBalances) { (graph, lostChannel) => @@ -309,9 +299,11 @@ object Validation { watcher ! UnwatchExternalChannelSpent(lostChannel.fundingTxId, outputIndex(lostChannel.ann.shortChannelId)) } - // We do not need to track other spending txs if one or more of their inputs have been spent by this spending tx + // We may have received RBF candidates for this splice: we can find them by looking at transactions that spend one + // of the channels we're removing (note that they may spend a slightly different set of channels). + // Those transactions cannot confirm anymore (they have been double-spent by the current one), so we should stop + // watching them. val spendingTxs = d.spentChannels.filter(_._2.intersect(shortChannelIds).nonEmpty).keySet - // stop watching the spending txs that will never confirm because this alternate spending tx confirmed (spendingTxs - spendingTxId).foreach(txId => watcher ! UnwatchTxConfirmed(txId)) val spentChannels1 = d.spentChannels -- spendingTxs d.copy(nodes = d.nodes -- lostNodes, channels = channels1, prunedChannels = prunedChannels1, graphWithBalances = graphWithBalances1, spentChannels = spentChannels1) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index e28dfb66c0..7090f6853f 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -231,6 +231,42 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi } } + def addChannel(router: ActorRef, watcher: TestProbe, scid: RealShortChannelId, priv1: PrivateKey, priv2: PrivateKey, priv_funding1: PrivateKey, priv_funding2: PrivateKey): (ChannelAnnouncement, ChannelUpdate, ChannelUpdate) = { + val channelAnnoucement = channelAnnouncement(scid, priv1, priv2, priv_funding1, priv_funding2) + val pub1 = priv1.publicKey + val pub2 = priv2.publicKey + val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv1, pub2, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum) + val update2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv2, pub1, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum) + val pub_funding1 = priv_funding1.publicKey + val pub_funding2 = priv_funding2.publicKey + assert(ChannelDesc(update1, channelAnnoucement) == ChannelDesc(channelAnnoucement.shortChannelId, pub1, pub2)) + val sender1 = TestProbe() + val peerConnection = TestProbe() + peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelAnnoucement)) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1)) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update2)) + assert(watcher.expectMsgType[ValidateRequest].ann == channelAnnoucement) + watcher.send(router, ValidateResult(channelAnnoucement, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(pub_funding1, pub_funding2)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) + assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == scid) + peerConnection.expectMsgAllOf( + GossipDecision.Accepted(channelAnnoucement), + GossipDecision.Accepted(update1), + GossipDecision.Accepted(update2) + ) + peerConnection.expectNoMessage() + awaitCond({ + sender1.send(router, GetNodes) + val nodes = sender1.expectMsgType[Iterable[NodeAnnouncement]] + sender1.send(router, GetChannels) + val channels = sender1.expectMsgType[Iterable[ChannelAnnouncement]] + sender1.send(router, GetChannelUpdates) + val updates = sender1.expectMsgType[Iterable[ChannelUpdate]] + nodes.size == 8 && channels.size == 6 && updates.size == 14 + }, max = 10 seconds, interval = 1 second) + (channelAnnoucement, update1, update2) + } + } object BaseRouterSpec { diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index a0b24a16a4..61808d1ade 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -326,11 +326,11 @@ class RouterSpec extends BaseRouterSpec { def spendingTx(node1: PublicKey, node2: PublicKey, capacity: Satoshi = publicChannelCapacity): Transaction = { val fundingScript = write(pay2wsh(Scripts.multiSig2of2(node1, node2))) - val nextFundingTx = Transaction(version = 0, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) + val nextFundingTx = Transaction(version = 2, txIn = TxIn(OutPoint(fundingTx(node1, node2, capacity), 0), fundingScript, 0) :: Nil, txOut = TxOut(capacity, fundingScript) :: Nil, lockTime = 0) nextFundingTx } - def batchSpendingTx(spendingTxs: Seq[Transaction]): Transaction = Transaction(version = 0, txIn = spendingTxs.flatMap(_.txIn), txOut = spendingTxs.flatMap(_.txOut), lockTime = 0) + def batchSpendingTx(spendingTxs: Seq[Transaction]): Transaction = Transaction(version = 2, txIn = spendingTxs.flatMap(_.txIn), txOut = spendingTxs.flatMap(_.txOut), lockTime = 0) test("properly announce lost channels and nodes") { fixture => import fixture._ @@ -1155,12 +1155,39 @@ class RouterSpec extends BaseRouterSpec { } } + def processSpliceChannelAnnouncement(fixture: FixtureParam, parentScid: RealShortChannelId, channelAnnouncement: ChannelAnnouncement, spliceTx: Transaction, newCapacity: Satoshi): Unit = { + import fixture._ + // A splice of the channel is announced and validated. + val peerConnection = TestProbe() + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelAnnouncement)) + peerConnection.expectNoMessage(100 millis) + assert(watcher.expectMsgType[ValidateRequest].ann == channelAnnouncement) + watcher.send(router, ValidateResult(channelAnnouncement, Right(spliceTx, UtxoStatus.Unspent))) + assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == channelAnnouncement.shortChannelId) + watcher.expectMsgType[UnwatchExternalChannelSpent] // unwatch the parent channel + peerConnection.expectMsg(TransportHandler.ReadAck(channelAnnouncement)) + peerConnection.expectMsg(GossipDecision.Accepted(channelAnnouncement)) + assert(peerConnection.sender() == router) + + // And the graph should be updated too. + val sender = TestProbe() + sender.send(router, Router.GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + val g = routerData.graphWithBalances.graph + val edge_ab = g.getEdge(ChannelDesc(channelAnnouncement.shortChannelId, channelAnnouncement.nodeId1, channelAnnouncement.nodeId2)).get + val edge_ba = g.getEdge(ChannelDesc(channelAnnouncement.shortChannelId, channelAnnouncement.nodeId2, channelAnnouncement.nodeId1)).get + assert(g.getEdge(ChannelDesc(parentScid, channelAnnouncement.nodeId1, channelAnnouncement.nodeId2)).isEmpty) + assert(g.getEdge(ChannelDesc(parentScid, channelAnnouncement.nodeId2, channelAnnouncement.nodeId1)).isEmpty) + assert(newCapacity == spliceTx.txOut(channelAnnouncement.shortChannelId.outputIndex).amount) + assert(edge_ab.capacity == newCapacity && edge_ba.capacity == newCapacity) + } + } + test("update an existing channel after a splice") { fixture => import fixture._ val eventListener = TestProbe() system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) - val peerConnection = TestProbe() // Channel ab is spent by a splice tx. val capacity1 = publicChannelCapacity - 100_000.sat @@ -1173,8 +1200,8 @@ class RouterSpec extends BaseRouterSpec { eventListener.expectNoMessage(100 millis) // Channel ab is spent and confirmed by an RBF of splice tx. - val capacity2 = publicChannelCapacity - 100_000.sat - 1000.sat - val spliceTx2 = spendingTx(funding_a, funding_b, capacity2) + val newCapacity = publicChannelCapacity - 100_000.sat - 1000.sat + val spliceTx2 = spendingTx(funding_a, funding_b, newCapacity) router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx2) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => assert(w.txId == spliceTx2.txid) @@ -1185,30 +1212,7 @@ class RouterSpec extends BaseRouterSpec { // The splice of channel ab is announced. val spliceScid = RealShortChannelId(BlockHeight(450000), 1, 0) val spliceAnn = channelAnnouncement(spliceScid, priv_a, priv_b, priv_funding_a, priv_funding_b) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) - peerConnection.expectNoMessage(100 millis) - assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) - watcher.send(router, ValidateResult(spliceAnn, Right(spliceTx2, UtxoStatus.Unspent))) - peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) - peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) - assert(peerConnection.sender() == router) - - // And the graph should be updated too. - val sender = TestProbe() - sender.send(router, Router.GetRouterData) - val g = sender.expectMsgType[Data].graphWithBalances.graph - val edge_ab = g.getEdge(ChannelDesc(spliceScid, a, b)).get - val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get - assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) - assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) - assert(edge_ab.capacity == capacity2 && edge_ba.capacity == capacity2) - - // The channel update for the splice is confirmed and the channel is updated. - router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, spliceTx2) - eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacity2, None, None) :: Nil)) - eventListener.expectMsg(ChannelLost(scid_ab)) - peerConnection.expectNoMessage(100 millis) - eventListener.expectNoMessage(100 millis) + processSpliceChannelAnnouncement(fixture, scid_ab, spliceAnn, spliceTx2, newCapacity) // The router no longer tracks the parent scids. val probe = TestProbe() @@ -1220,49 +1224,64 @@ class RouterSpec extends BaseRouterSpec { }) } - test("update two existing channels with a batch splice") { fixture => + test("update multiple existing channels with a batch splice") { fixture => import fixture._ + // add second b-c channel + val scid_bc2 = RealShortChannelId(BlockHeight(420000), 7, 0) + addChannel(fixture.router, fixture.watcher, scid_bc2, priv_b, priv_c, priv_funding_b, priv_funding_c) + val eventListener = TestProbe() system.eventStream.subscribe(eventListener.ref, classOf[NetworkEvent]) - val peerConnection = TestProbe() - // Channel ab1 and ab2 will both be spent by the same splice tx. - val spliceTx1 = spendingTx(funding_a, funding_b, publicChannelCapacity - 100_000.sat) - val spliceTx2 = spendingTx(funding_b, funding_c, publicChannelCapacity + 50_000.sat) - val batchSpliceTx = batchSpendingTx(Seq(spliceTx1, spliceTx2)) + val newCapacity_ab = publicChannelCapacity - 100_000.sat + val newCapacity_bc = publicChannelCapacity + 50_000.sat + val spliceTx_ab = spendingTx(funding_a, funding_b, newCapacity_ab) + val spliceTx_bc = spendingTx(funding_b, funding_c, newCapacity_bc) + val spliceTx_bc2 = spendingTx(funding_b, funding_c) + val batchSpliceTx = batchSpendingTx(Seq(spliceTx_ab, spliceTx_bc, spliceTx_bc2)) - // Channel ab is spent by a splice tx1. - router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx1) + // Channel ab is spent by a splice tx. + router ! WatchExternalChannelSpentTriggered(scid_ab, spliceTx_ab) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => - assert(w.txId == spliceTx1.txid) + assert(w.txId == spliceTx_ab.txid) assert(w.minDepth == 12) } - // Channel bc is spent by a splice tx2. - router ! WatchExternalChannelSpentTriggered(scid_bc, spliceTx2) + // Channel bc is spent by a splice tx. + router ! WatchExternalChannelSpentTriggered(scid_bc, spliceTx_bc) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => - assert(w.txId == spliceTx2.txid) + assert(w.txId == spliceTx_bc.txid) + assert(w.minDepth == 12) + } + + // Channel bc2 is spent by a splice tx. + router ! WatchExternalChannelSpentTriggered(scid_bc2, spliceTx_bc2) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == spliceTx_bc2.txid) assert(w.minDepth == 12) } - // Channel ab is spent by the batch splice tx. + // Channels ab, bc and bc2 are all spent by the same batch splice tx. router ! WatchExternalChannelSpentTriggered(scid_ab, batchSpliceTx) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => assert(w.txId == batchSpliceTx.txid) assert(w.minDepth == 12) } - - // Channel bc is also spent by the batch splice tx. router ! WatchExternalChannelSpentTriggered(scid_bc, batchSpliceTx) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => assert(w.txId == batchSpliceTx.txid) assert(w.minDepth == 12) } + router ! WatchExternalChannelSpentTriggered(scid_bc2, batchSpliceTx) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx.txid) + assert(w.minDepth == 12) + } - // Channels ab and bc are also both spent by an RBF of the batch splice tx. - val capacityAB_RBF = publicChannelCapacity - 100_000.sat - 1000.sat - val batchSpliceTx_RBF = batchSpendingTx(Seq(spendingTx(funding_a, funding_b, capacityAB_RBF), spliceTx2)) + // Channels ab, bc and bc2 are also all spent by an RBF of the batch splice tx. + val newCapacity_ab_RBF = newCapacity_ab - 1000.sat + val batchSpliceTx_RBF = batchSpendingTx(Seq(spendingTx(funding_a, funding_b, newCapacity_ab_RBF), spliceTx_bc, spliceTx_bc2)) router ! WatchExternalChannelSpentTriggered(scid_ab, batchSpliceTx_RBF) inside(watcher.expectMsgType[WatchTxConfirmed]) { w => assert(w.txId == batchSpliceTx_RBF.txid) @@ -1273,69 +1292,99 @@ class RouterSpec extends BaseRouterSpec { assert(w.txId == batchSpliceTx_RBF.txid) assert(w.minDepth == 12) } + router ! WatchExternalChannelSpentTriggered(scid_bc2, batchSpliceTx_RBF) + inside(watcher.expectMsgType[WatchTxConfirmed]) { w => + assert(w.txId == batchSpliceTx_RBF.txid) + assert(w.minDepth == 12) + } - // The router tracks the possible spending txs for scid ab and scid bc. - awaitAssert({ - val probe = TestProbe() - probe.send(router, GetRouterData) - val routerData = probe.expectMsgType[Data] - assert(routerData.spentChannels(spliceTx1.txid) == Set(scid_ab)) - assert(routerData.spentChannels(spliceTx2.txid) == Set(scid_bc)) - assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_ab, scid_bc)) - assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_ab, scid_bc)) - }) + // The router tracks the possible spending txs for channels ab, bc and bc2. + val sender = TestProbe() + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + awaitAssert({ + assert(routerData.spentChannels(spliceTx_ab.txid) == Set(scid_ab)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(spliceTx_bc2.txid) == Set(scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_ab, scid_bc, scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_ab, scid_bc, scid_bc2)) + }) + } - // The splice of channel ab is announced. - val spliceScid = RealShortChannelId(BlockHeight(450000), 1, 0) - val spliceAnn = channelAnnouncement(spliceScid, priv_a, priv_b, priv_funding_a, priv_funding_b) - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, spliceAnn)) - peerConnection.expectNoMessage(100 millis) - assert(watcher.expectMsgType[ValidateRequest].ann == spliceAnn) - watcher.send(router, ValidateResult(spliceAnn, Right(batchSpliceTx_RBF, UtxoStatus.Unspent))) - peerConnection.expectMsg(TransportHandler.ReadAck(spliceAnn)) - peerConnection.expectMsg(GossipDecision.Accepted(spliceAnn)) - assert(peerConnection.sender() == router) + // The splice of channel ab is announced, verified and added to the graph; the parent channel is removed from the graph. + val spliceScid_ab = RealShortChannelId(BlockHeight(450000), 1, 0) + val spliceAnn_ab = channelAnnouncement(spliceScid_ab, priv_a, priv_b, priv_funding_a, priv_funding_b) + processSpliceChannelAnnouncement(fixture, scid_ab, spliceAnn_ab, batchSpliceTx_RBF, newCapacity_ab_RBF) + assert(watcher.expectMsgType[UnwatchTxConfirmed].txId == spliceTx_ab.txid) - // And the graph should be updated too. - val sender = TestProbe() - sender.send(router, Router.GetRouterData) - val routerData = sender.expectMsgType[Data] - val g = routerData.graphWithBalances.graph - val edge_ab = g.getEdge(ChannelDesc(spliceScid, a, b)).get - val edge_ba = g.getEdge(ChannelDesc(spliceScid, b, a)).get - assert(g.getEdge(ChannelDesc(scid_ab, a, b)).isEmpty) - assert(g.getEdge(ChannelDesc(scid_ab, b, a)).isEmpty) - assert(edge_ab.capacity == capacityAB_RBF && edge_ba.capacity == capacityAB_RBF) - - // The router still tracks the possible spending txs for scid bc, but not scid ab. - awaitAssert({ - val probe = TestProbe() - probe.send(router, GetRouterData) - val routerData = probe.expectMsgType[Data] - assert(!routerData.spentChannels.contains(spliceTx1.txid)) - assert(routerData.spentChannels(spliceTx2.txid) == Set(scid_bc)) - assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc)) - assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc)) - }) + // The router still tracks the possible spending txs for channels bc and bc2. + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + awaitAssert({ + assert(!routerData.spentChannels.contains(spliceTx_ab.txid)) + assert(routerData.spentChannels.contains(spliceTx_bc.txid)) + assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc, scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc, scid_bc2)) + }) + } - // The splice channel update for the scid ab splice confirms and the channel is updated. - router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, batchSpliceTx_RBF) - eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn, capacityAB_RBF, None, None) :: Nil)) + // The splice of channel bc is announced, verified and added to the graph; the parent channel is removed from the graph. + val spliceScid_bc = RealShortChannelId(BlockHeight(450000), 1, 1) + val spliceAnn_bc = channelAnnouncement(spliceScid_bc, priv_b, priv_c, priv_funding_b, priv_funding_c) + processSpliceChannelAnnouncement(fixture, scid_bc, spliceAnn_bc, batchSpliceTx_RBF, newCapacity_bc) + assert(watcher.expectMsgType[UnwatchTxConfirmed].txId == spliceTx_bc.txid) + + // The router still tracks the possible spending txs for channel bc or bc2 - either could be considered the parent of scid_bc. + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + awaitAssert({ + if (routerData.spentChannels.contains(spliceTx_bc.txid)) { + assert(!routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc)) + } + else { + assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc2.txid) == Set(scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc2)) + } + }) + } + + // Splice channel updates received for ab and bc add new channels to and remove the parent channels from the graph. + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn_ab, newCapacity_ab_RBF, None, None) :: Nil)) eventListener.expectMsg(ChannelLost(scid_ab)) - // No splice channel update for scid bc was received yet so it is removed. + eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn_bc, newCapacity_bc, None, None) :: Nil)) eventListener.expectMsg(ChannelLost(scid_bc)) - peerConnection.expectNoMessage(100 millis) + + // No splice channel update for channel bc2 was received before the batch splice tx confirms so channel bc2 is removed. + sender.send(router, GetRouterData) + val fundingTxId_bc2 = sender.expectMsgType[Data].channels(scid_bc2).fundingTxId + router ! WatchTxConfirmedTriggered(BlockHeight(0), 0, batchSpliceTx_RBF) + assert(watcher.expectMsgType[UnwatchExternalChannelSpent].txId == fundingTxId_bc2) + eventListener.expectMsg(ChannelLost(scid_bc2)) eventListener.expectNoMessage(100 millis) + // Alternative spending transactions in the mempool are now unspendable and need not be watched. + assert((1 to 2).map(_ => watcher.expectMsgType[UnwatchTxConfirmed].txId).toSet == Set(spliceTx_bc2.txid, batchSpliceTx.txid)) + watcher.expectNoMessage(100 millis) + // The router no longer tracks the parent scids. - awaitAssert({ - val probe = TestProbe() - probe.send(router, GetRouterData) - val routerData = probe.expectMsgType[Data] - assert(routerData.spentChannels.isEmpty) - assert(!routerData.channels.contains(scid_ab)) - assert(!routerData.channels.contains(scid_bc)) - }) + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + awaitAssert({ + assert(routerData.spentChannels.isEmpty) + assert(!routerData.channels.contains(scid_ab)) + assert(!routerData.channels.contains(scid_bc)) + assert(!routerData.channels.contains(scid_bc2)) + assert(routerData.channels.contains(spliceScid_ab)) + assert(routerData.channels.contains(spliceScid_bc)) + }) + } } } From 790ffb58e6aeb59c192bb4597e37bdbd4c097f71 Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Wed, 5 Feb 2025 11:50:12 +0100 Subject: [PATCH 3/5] Add more fixes suggested by @t-bast --- .../acinq/eclair/router/BaseRouterSpec.scala | 24 ++++++++++--------- .../fr/acinq/eclair/router/RouterSpec.scala | 9 ++++--- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala index 7090f6853f..9d21c490a4 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/BaseRouterSpec.scala @@ -232,39 +232,41 @@ abstract class BaseRouterSpec extends TestKitBaseClass with FixtureAnyFunSuiteLi } def addChannel(router: ActorRef, watcher: TestProbe, scid: RealShortChannelId, priv1: PrivateKey, priv2: PrivateKey, priv_funding1: PrivateKey, priv_funding2: PrivateKey): (ChannelAnnouncement, ChannelUpdate, ChannelUpdate) = { - val channelAnnoucement = channelAnnouncement(scid, priv1, priv2, priv_funding1, priv_funding2) + val ann = channelAnnouncement(scid, priv1, priv2, priv_funding1, priv_funding2) val pub1 = priv1.publicKey val pub2 = priv2.publicKey val update1 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv1, pub2, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum) val update2 = makeChannelUpdate(Block.RegtestGenesisBlock.hash, priv2, pub1, scid, CltvExpiryDelta(7), htlcMinimumMsat = 0 msat, feeBaseMsat = 10 msat, feeProportionalMillionths = 10, htlcMaximumMsat = htlcMaximum) val pub_funding1 = priv_funding1.publicKey val pub_funding2 = priv_funding2.publicKey - assert(ChannelDesc(update1, channelAnnoucement) == ChannelDesc(channelAnnoucement.shortChannelId, pub1, pub2)) + assert(ChannelDesc(update1, ann) == ChannelDesc(ann.shortChannelId, pub1, pub2)) val sender1 = TestProbe() val peerConnection = TestProbe() peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } - peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, channelAnnoucement)) + peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, ann)) peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update1)) peerConnection.send(router, PeerRoutingMessage(peerConnection.ref, remoteNodeId, update2)) - assert(watcher.expectMsgType[ValidateRequest].ann == channelAnnoucement) - watcher.send(router, ValidateResult(channelAnnoucement, Right((Transaction(version = 0, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(pub_funding1, pub_funding2)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) + assert(watcher.expectMsgType[ValidateRequest].ann == ann) + watcher.send(router, ValidateResult(ann, Right((Transaction(version = 2, txIn = Nil, txOut = TxOut(publicChannelCapacity, write(pay2wsh(Scripts.multiSig2of2(pub_funding1, pub_funding2)))) :: Nil, lockTime = 0), UtxoStatus.Unspent)))) assert(watcher.expectMsgType[WatchExternalChannelSpent].shortChannelId == scid) peerConnection.expectMsgAllOf( - GossipDecision.Accepted(channelAnnoucement), + GossipDecision.Accepted(ann), GossipDecision.Accepted(update1), GossipDecision.Accepted(update2) ) - peerConnection.expectNoMessage() + peerConnection.expectNoMessage(100 millis) awaitCond({ sender1.send(router, GetNodes) val nodes = sender1.expectMsgType[Iterable[NodeAnnouncement]] sender1.send(router, GetChannels) - val channels = sender1.expectMsgType[Iterable[ChannelAnnouncement]] + val channels = sender1.expectMsgType[Iterable[ChannelAnnouncement]].toSeq sender1.send(router, GetChannelUpdates) - val updates = sender1.expectMsgType[Iterable[ChannelUpdate]] - nodes.size == 8 && channels.size == 6 && updates.size == 14 + val updates = sender1.expectMsgType[Iterable[ChannelUpdate]].toSeq + nodes.exists(_.nodeId == pub1) && nodes.exists(_.nodeId == pub2) && + channels.contains(ann) && + updates.contains(update1) && updates.contains(update2) }, max = 10 seconds, interval = 1 second) - (channelAnnoucement, update1, update2) + (ann, update1, update2) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index 61808d1ade..d43d4f14da 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -1345,8 +1345,7 @@ class RouterSpec extends BaseRouterSpec { assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc)) assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc)) - } - else { + } else { assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) assert(routerData.spentChannels(spliceTx_bc2.txid) == Set(scid_bc2)) assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc2)) @@ -1370,7 +1369,11 @@ class RouterSpec extends BaseRouterSpec { eventListener.expectNoMessage(100 millis) // Alternative spending transactions in the mempool are now unspendable and need not be watched. - assert((1 to 2).map(_ => watcher.expectMsgType[UnwatchTxConfirmed].txId).toSet == Set(spliceTx_bc2.txid, batchSpliceTx.txid)) + val unwatchedTxs = Set( + watcher.expectMsgType[UnwatchTxConfirmed].txId, + watcher.expectMsgType[UnwatchTxConfirmed].txId, + ) + assert(unwatchedTxs == Set(spliceTx_bc2.txid, batchSpliceTx.txid)) watcher.expectNoMessage(100 millis) // The router no longer tracks the parent scids. From 840bc3a65eb92ffebdbbe6fcc767a539d3b238d1 Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Wed, 5 Feb 2025 15:02:28 +0100 Subject: [PATCH 4/5] Fix check that both initial local channel updates are received Previous check would succeed even if one or both updates were None. --- .../integration/basic/channel/GossipIntegrationSpec.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala index cc89874819..32b19ea443 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/integration/basic/channel/GossipIntegrationSpec.scala @@ -59,7 +59,8 @@ class GossipIntegrationSpec extends FixtureSpec with IntegrationPatience { // Wait for Alice to receive both initial local channel updates. inside(getRouterData(alice)) { routerData => val channel_ab = routerData.channels(scid_ab) - Seq(channel_ab.update_1_opt, channel_ab.update_2_opt).flatten.foreach(u => assert(u.shortChannelId == scid_ab)) + val receivedUpdates = Seq(channel_ab.update_1_opt, channel_ab.update_2_opt).flatten + assert(receivedUpdates.count(_.shortChannelId == scid_ab) == 2) } scid_ab } From b8aa05e716d3c4bc2abc4801a78fd1584634f901 Mon Sep 17 00:00:00 2001 From: Richard Myers Date: Wed, 5 Feb 2025 16:48:27 +0100 Subject: [PATCH 5/5] Fix test to get router data using awaitAssert properly If assert fails, should call getRouterData again for the next awaitAssert loop. --- .../fr/acinq/eclair/router/RouterSpec.scala | 54 +++++++++---------- 1 file changed, 27 insertions(+), 27 deletions(-) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala index d43d4f14da..2856f0f48c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RouterSpec.scala @@ -1300,16 +1300,16 @@ class RouterSpec extends BaseRouterSpec { // The router tracks the possible spending txs for channels ab, bc and bc2. val sender = TestProbe() - sender.send(router, GetRouterData) - inside(sender.expectMsgType[Data]) { routerData => - awaitAssert({ + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => assert(routerData.spentChannels(spliceTx_ab.txid) == Set(scid_ab)) assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) assert(routerData.spentChannels(spliceTx_bc2.txid) == Set(scid_bc2)) assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_ab, scid_bc, scid_bc2)) assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_ab, scid_bc, scid_bc2)) - }) - } + } + }) // The splice of channel ab is announced, verified and added to the graph; the parent channel is removed from the graph. val spliceScid_ab = RealShortChannelId(BlockHeight(450000), 1, 0) @@ -1318,17 +1318,17 @@ class RouterSpec extends BaseRouterSpec { assert(watcher.expectMsgType[UnwatchTxConfirmed].txId == spliceTx_ab.txid) // The router still tracks the possible spending txs for channels bc and bc2. - sender.send(router, GetRouterData) - inside(sender.expectMsgType[Data]) { routerData => - awaitAssert({ - assert(!routerData.spentChannels.contains(spliceTx_ab.txid)) - assert(routerData.spentChannels.contains(spliceTx_bc.txid)) - assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) - assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) - assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc, scid_bc2)) - assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc, scid_bc2)) - }) - } + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + assert(!routerData.spentChannels.contains(spliceTx_ab.txid)) + assert(routerData.spentChannels.contains(spliceTx_bc.txid)) + assert(routerData.spentChannels.contains(spliceTx_bc2.txid)) + assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) + assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc, scid_bc2)) + assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc, scid_bc2)) + } + }) // The splice of channel bc is announced, verified and added to the graph; the parent channel is removed from the graph. val spliceScid_bc = RealShortChannelId(BlockHeight(450000), 1, 1) @@ -1337,10 +1337,10 @@ class RouterSpec extends BaseRouterSpec { assert(watcher.expectMsgType[UnwatchTxConfirmed].txId == spliceTx_bc.txid) // The router still tracks the possible spending txs for channel bc or bc2 - either could be considered the parent of scid_bc. - sender.send(router, GetRouterData) - inside(sender.expectMsgType[Data]) { routerData => - awaitAssert({ - if (routerData.spentChannels.contains(spliceTx_bc.txid)) { + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => + if (routerData.spentChannels.contains(spliceTx_bc.txid)) { assert(!routerData.spentChannels.contains(spliceTx_bc2.txid)) assert(routerData.spentChannels(spliceTx_bc.txid) == Set(scid_bc)) assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc)) @@ -1351,8 +1351,8 @@ class RouterSpec extends BaseRouterSpec { assert(routerData.spentChannels(batchSpliceTx.txid) == Set(scid_bc2)) assert(routerData.spentChannels(batchSpliceTx_RBF.txid) == Set(scid_bc2)) } - }) - } + } + }) // Splice channel updates received for ab and bc add new channels to and remove the parent channels from the graph. eventListener.expectMsg(ChannelsDiscovered(SingleChannelDiscovered(spliceAnn_ab, newCapacity_ab_RBF, None, None) :: Nil)) @@ -1377,17 +1377,17 @@ class RouterSpec extends BaseRouterSpec { watcher.expectNoMessage(100 millis) // The router no longer tracks the parent scids. - sender.send(router, GetRouterData) - inside(sender.expectMsgType[Data]) { routerData => - awaitAssert({ + awaitAssert({ + sender.send(router, GetRouterData) + inside(sender.expectMsgType[Data]) { routerData => assert(routerData.spentChannels.isEmpty) assert(!routerData.channels.contains(scid_ab)) assert(!routerData.channels.contains(scid_bc)) assert(!routerData.channels.contains(scid_bc2)) assert(routerData.channels.contains(spliceScid_ab)) assert(routerData.channels.contains(spliceScid_bc)) - }) - } + } + }) } }