From 8ff8186c61463eb4e224c10105e5765c80094557 Mon Sep 17 00:00:00 2001 From: Thomas HUET Date: Wed, 22 Jan 2025 14:13:28 +0100 Subject: [PATCH] WIP --- docs/release-notes/eclair-vnext.md | 2 + .../scala/fr/acinq/eclair/NodeParams.scala | 2 +- .../main/scala/fr/acinq/eclair/io/Peer.scala | 2 +- .../fr/acinq/eclair/io/PeerConnection.scala | 21 ++++--- .../fr/acinq/eclair/io/Switchboard.scala | 20 +++++-- .../remote/EclairInternalsSerializer.scala | 7 ++- .../scala/fr/acinq/eclair/router/Router.scala | 20 ++----- .../scala/fr/acinq/eclair/router/Sync.scala | 19 ++---- .../fr/acinq/eclair/EclairImplSpec.scala | 2 +- .../scala/fr/acinq/eclair/TestConstants.scala | 4 +- .../acinq/eclair/io/PeerConnectionSpec.scala | 24 ++++---- .../fr/acinq/eclair/io/SwitchboardSpec.scala | 58 ++++++++++++++++++ .../acinq/eclair/router/RoutingSyncSpec.scala | 59 +++---------------- 13 files changed, 128 insertions(+), 112 deletions(-) diff --git a/docs/release-notes/eclair-vnext.md b/docs/release-notes/eclair-vnext.md index bf757a5dfc..272d03f33a 100644 --- a/docs/release-notes/eclair-vnext.md +++ b/docs/release-notes/eclair-vnext.md @@ -24,6 +24,8 @@ There are many organisations that package Java runtimes and development kits, fo ### Miscellaneous improvements and bug fixes +#### Gossip sync limits + On reconnection, eclair now only synchronizes its routing table with a small number of top peers instead of synchronizing with every peer. If you already use `sync-whitelist`, the default behavior has been modified and you must set `router.sync.peer-limit = 0` to keep preventing any synchronization with other nodes. You must also use `router.sync.whitelist` instead of `sync-whitelist`. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index 9323ee1800..5d8d675e9e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -656,9 +656,9 @@ object NodeParams extends Logging { watchSpentWindow = watchSpentWindow, channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS), routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS), - encodingType = EncodingType.UNCOMPRESSED, syncConf = Router.SyncConf( requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"), + encodingType = EncodingType.UNCOMPRESSED, channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"), channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"), peerLimit = config.getInt("router.sync.peer-limit"), diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala index 2ced20094a..23210a5865 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala @@ -475,7 +475,7 @@ class Peer(val nodeParams: NodeParams, log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId") // we have our first channel with that peer: let's sync our routing table if (!d.channels.keys.exists(_.isInstanceOf[FinalChannelId])) { - d.peerConnection ! PeerConnection.DoSync(isReconnection = false) + d.peerConnection ! PeerConnection.DoSync(replacePrevious = false) } // NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151) // we won't clean it up, but we won't remember the temporary id on channel termination diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala index 57bac1d223..ffcf92cac7 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala @@ -103,7 +103,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A } when(BEFORE_INIT) { - case Event(InitializeConnection(peer, chainHash, localFeatures, fundingRates_opt), d: BeforeInitData) => + case Event(InitializeConnection(peer, chainHash, localFeatures, doSync, fundingRates_opt), d: BeforeInitData) => d.transport ! TransportHandler.Listener(self) Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment() log.debug(s"using features=$localFeatures") @@ -120,7 +120,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A d.transport ! localInit startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout) unstashAll() // unstash remote init if it already arrived - goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, d.isPersistent) + goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync, d.isPersistent) case Event(_: protocol.Init, _) => log.debug("stashing remote init") @@ -160,7 +160,11 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit) d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected(self, d.peer)) - self ! DoSync(isReconnection = true) + if (d.doSync) { + self ! DoSync(replacePrevious = true) + } else { + log.info("not syncing with this peer") + } // we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay) val rebroadcastDelay = Random.nextInt(conf.maxRebroadcastDelay.toSeconds.toInt).seconds @@ -395,12 +399,13 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A } stay() using d.copy(behavior = behavior1) - case Event(DoSync(isReconnection), d: ConnectedData) => + case Event(DoSync(replacePrevious), d: ConnectedData) => val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries) val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended) if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) { val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None - router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, isReconnection, flags_opt) + log.debug(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious") + router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt) } stay() @@ -531,7 +536,7 @@ object PeerConnection { case object SendPing case object KillIdle case object ResumeAnnouncements - case class DoSync(isReconnection: Boolean) extends RemoteTypes + case class DoSync(replacePrevious: Boolean) extends RemoteTypes // @formatter:on val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes @@ -558,7 +563,7 @@ object PeerConnection { case object Nothing extends Data case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport - case class InitializingData(chainHash: BlockHash, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, isPersistent: Boolean) extends Data with HasTransport + case class InitializingData(chainHash: BlockHash, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean, isPersistent: Boolean) extends Data with HasTransport case class ConnectedData(chainHash: BlockHash, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None, isPersistent: Boolean) extends Data with HasTransport case class ExpectedPong(ping: Ping, timestamp: TimestampMilli = TimestampMilli.now()) @@ -575,7 +580,7 @@ object PeerConnection { def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance } case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey, outgoing: Boolean) extends RemoteTypes - case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], fundingRates_opt: Option[LiquidityAds.WillFundRates]) extends RemoteTypes + case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], doSync: Boolean, fundingRates_opt: Option[LiquidityAds.WillFundRates]) extends RemoteTypes case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: NodeAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes sealed trait ConnectionResult extends RemoteTypes diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala index 600aa99ae3..43b1dea9cf 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala @@ -20,7 +20,7 @@ import akka.actor.typed.receptionist.{Receptionist, ServiceKey} import akka.actor.typed.scaladsl.Behaviors import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps, ClassicActorSystemOps, TypedActorRefOps} import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Stash, Status, SupervisorStrategy, typed} -import fr.acinq.bitcoin.scalacompat.ByteVector32 +import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi} import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey import fr.acinq.eclair.blockchain.OnchainPubkeyCache import fr.acinq.eclair.channel.Helpers.Closing @@ -69,12 +69,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory) } log.info("restoring {} peer(s) with {} channel(s) and {} peers with pending on-the-fly funding", peersWithChannels.size, channels.size, (peersWithOnTheFlyFunding.keySet -- peersWithChannels.keySet).size) unstashAll() - context.become(normal(peersWithChannels.keySet)) + val peerCapacities = channels.map { + case channelData: ChannelDataWithoutCommitments => (channelData.remoteNodeId, 0L) + case channelData: ChannelDataWithCommitments => (channelData.remoteNodeId, channelData.commitments.capacity.toLong) + }.groupMapReduce[PublicKey, Long](_._1)(_._2)(_ + _) + val topCapacityPeers = peerCapacities.toSeq.sortWith { case ((_, c1), (_, c2)) => c1 > c2 }.take(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet + context.become(normal(peersWithChannels.keySet, topCapacityPeers ++ nodeParams.routerConf.syncConf.whitelist)) case _ => stash() } - def normal(peersWithChannels: Set[PublicKey]): Receive = { + def normal(peersWithChannels: Set[PublicKey], syncWhitelist: Set[PublicKey]): Receive = { case Peer.Connect(publicKey, _, _, _) if publicKey == nodeParams.nodeId => sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself")) @@ -110,14 +115,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory) val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty, pendingOnTheFlyFunding = Map.empty) val features = nodeParams.initFeaturesFor(authenticated.remoteNodeId) val hasChannels = peersWithChannels.contains(authenticated.remoteNodeId) - authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, nodeParams.liquidityAdsConfig.rates_opt) + val doSync = syncWhitelist.contains(authenticated.remoteNodeId) + authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync, nodeParams.liquidityAdsConfig.rates_opt) if (!hasChannels && !authenticated.outgoing) { incomingConnectionsTracker ! TrackIncomingConnection(authenticated.remoteNodeId) } - case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId)) + case ChannelIdAssigned(_, remoteNodeId, _, _) => + val syncWhitelist1 = if (syncWhitelist.size < nodeParams.routerConf.syncConf.peerLimit) syncWhitelist + remoteNodeId else syncWhitelist + context.become(normal(peersWithChannels + remoteNodeId, syncWhitelist1)) - case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId)) + case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId, syncWhitelist)) case GetPeers => sender() ! context.children.filterNot(_ == incomingConnectionsTracker.toClassic) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala index 35d386f0a4..4538eed438 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/remote/EclairInternalsSerializer.scala @@ -99,6 +99,9 @@ object EclairInternalsSerializer { val syncConfCodec: Codec[Router.SyncConf] = ( ("requestNodeAnnouncements" | bool(8)) :: + ("encodingType" | discriminated[EncodingType].by(uint8) + .typecase(0, provide(EncodingType.UNCOMPRESSED)) + .typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) :: ("channelRangeChunkSize" | int32) :: ("channelQueryChunkSize" | int32) :: ("peerLimit" | int32) :: @@ -108,9 +111,6 @@ object EclairInternalsSerializer { ("watchSpentWindow" | finiteDurationCodec) :: ("channelExcludeDuration" | finiteDurationCodec) :: ("routerBroadcastInterval" | finiteDurationCodec) :: - ("encodingType" | discriminated[EncodingType].by(uint8) - .typecase(0, provide(EncodingType.UNCOMPRESSED)) - .typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) :: ("syncConf" | syncConfCodec) :: ("pathFindingExperimentConf" | pathFindingExperimentConfCodec) :: ("messageRouteParams" | messageRouteParamsCodec) :: @@ -161,6 +161,7 @@ object EclairInternalsSerializer { ("peer" | actorRefCodec(system)) :: ("chainHash" | blockHash) :: ("features" | variableSizeBytes(uint16, initFeaturesCodec)) :: + ("doSync" | bool(8)) :: ("fundingRates" | optional(bool(8), LiquidityAds.Codecs.willFundRates))).as[PeerConnection.InitializeConnection] def connectionReadyCodec(system: ExtendedActorSystem): Codec[PeerConnection.ConnectionReady] = ( diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala index fab2d95df9..3e9a72deb0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala @@ -86,16 +86,6 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm context.system.eventStream.publish(ChannelUpdatesReceived(channels.values.flatMap(pc => pc.update_1_opt ++ pc.update_2_opt ++ Nil))) context.system.eventStream.publish(NodesDiscovered(nodes)) - val peerCapacities = channels.values.map(pc => - if (pc.nodeId1 == nodeParams.nodeId) { - Some((pc.nodeId2, pc.capacity)) - } else if (pc.nodeId2 == nodeParams.nodeId) { - Some((pc.nodeId1, pc.capacity)) - } else { - None - }).flatten.groupMapReduce(_._1)(_._2)(_ + _) - val topCapacityPeers = peerCapacities.toSeq.sortWith { case ((_, c1), (_, c2)) => c1 > c2 }.take(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet - // watch the funding tx of all these channels // note: some of them may already have been spent, in that case we will receive the watch event immediately (channels.values ++ pruned.values).foreach { pc => @@ -124,8 +114,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm excludedChannels = Map.empty, graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife), sync = Map.empty, - spentChannels = Map.empty, - topCapacityPeers = topCapacityPeers) + spentChannels = Map.empty) startWith(NORMAL, data) } @@ -308,7 +297,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm stay() using Validation.handleAvailableBalanceChanged(d, e) case Event(s: SendChannelQuery, d) => - stay() using Sync.handleSendChannelQuery(nodeParams.routerConf.syncConf, d, s) + stay() using Sync.handleSendChannelQuery(d, s) case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) => Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q) @@ -380,6 +369,7 @@ object Router { } case class SyncConf(requestNodeAnnouncements: Boolean, + encodingType: EncodingType, channelRangeChunkSize: Int, channelQueryChunkSize: Int, peerLimit: Int, @@ -391,7 +381,6 @@ object Router { case class RouterConf(watchSpentWindow: FiniteDuration, channelExcludeDuration: FiniteDuration, routerBroadcastInterval: FiniteDuration, - encodingType: EncodingType, syncConf: SyncConf, pathFindingExperimentConf: PathFindingExperimentConf, messageRouteParams: MessageRouteParams, @@ -715,7 +704,7 @@ object Router { // @formatter:on // @formatter:off - case class SendChannelQuery(chainHash: BlockHash, remoteNodeId: PublicKey, to: ActorRef, isReconnection: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes + case class SendChannelQuery(chainHash: BlockHash, remoteNodeId: PublicKey, to: ActorRef, replacePrevious: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes case object GetRoutingState case class RoutingState(channels: Iterable[PublicChannel], nodes: Iterable[NodeAnnouncement]) case object GetRoutingStateStreaming extends RemoteTypes @@ -784,7 +773,6 @@ object Router { graphWithBalances: GraphWithBalanceEstimates, sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried - topCapacityPeers: Set[PublicKey], ) { def resolve(scid: ShortChannelId): Option[KnownChannel] = { // let's assume this is a real scid diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala index 88b997fc9d..569d9b7795 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala @@ -42,21 +42,13 @@ object Sync { // block almost never exceeds 2800 so this should very rarely be limiting val MAXIMUM_CHUNK_SIZE = 2700 - def handleSendChannelQuery(conf: SyncConf, d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { - // TODO: check that s.remoteNodeId is eligible for sync + def handleSendChannelQuery(d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = { implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors // we currently send query_channel_range when: - // * we just reconnected to a peer with whom we have channels + // * we just (re)connected to a peer with whom we have channels // * we validate our first channel with a peer - val shouldSync = if (s.isReconnection) { - conf.whitelist.contains(s.remoteNodeId) || d.topCapacityPeers.contains(s.remoteNodeId) - } else if (d.sync.contains(s.remoteNodeId)) { // we must ensure we don't send a new query_channel_range while another query is still in progress - log.debug("not sending query_channel_range: sync already in progress") - false - } else { - true - } - if (shouldSync) { + // we must ensure we don't send a new query_channel_range while another query is still in progress + if (s.replacePrevious || !d.sync.contains(s.remoteNodeId)) { // ask for everything val query = QueryChannelRange(s.chainHash, firstBlock = BlockHeight(0), numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toSet)) log.debug("sending query_channel_range={}", query) @@ -72,6 +64,7 @@ object Sync { // reset our sync state for this peer: we create an entry to ensure we reject duplicate queries and unsolicited reply_channel_range d.copy(sync = d.sync + (s.remoteNodeId -> Syncing(Nil, 0))) } else { + log.debug("not sending query_channel_range: sync already in progress") d } } @@ -88,7 +81,7 @@ object Sync { Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size) chunks.zipWithIndex.foreach { case (chunk, i) => val syncComplete = i == chunks.size - 1 - val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels) + val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.syncConf.encodingType, q.queryFlags_opt, channels) origin.peerConnection ! reply Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks) Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala index 2bc26a6be5..4d0a729e0a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/EclairImplSpec.scala @@ -375,7 +375,7 @@ class EclairImplSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike with I shortIds4.real_opt.get.toLong -> channelId4, ) val g = GraphWithBalanceEstimates(DirectedGraph(Nil), 1 hour) - val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty, Set.empty) + val routerData = Router.Data(Map.empty, publicChannels, SortedMap.empty, Router.Stash(Map.empty, Map.empty), Router.Rebroadcast(Map.empty, Map.empty, Map.empty), Map.empty, privateChannels, scidMapping, Map.empty, g, Map.empty, Map.empty) eclair.findRoute(c, 250_000 msat, None) val routeRequest1 = router.expectMsgType[RouteRequest] diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index ba83ab339a..c122dd6a98 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -196,9 +196,9 @@ object TestConstants { watchSpentWindow = 1 second, channelExcludeDuration = 60 seconds, routerBroadcastInterval = 1 day, // "disables" rebroadcast - encodingType = EncodingType.COMPRESSED_ZLIB, syncConf = Router.SyncConf( requestNodeAnnouncements = true, + encodingType = EncodingType.COMPRESSED_ZLIB, channelRangeChunkSize = 20, channelQueryChunkSize = 5, peerLimit = 10, @@ -378,9 +378,9 @@ object TestConstants { watchSpentWindow = 1 second, channelExcludeDuration = 60 seconds, routerBroadcastInterval = 1 day, // "disables" rebroadcast - encodingType = EncodingType.UNCOMPRESSED, syncConf = Router.SyncConf( requestNodeAnnouncements = true, + encodingType = EncodingType.UNCOMPRESSED, channelRangeChunkSize = 20, channelQueryChunkSize = 5, peerLimit = 20, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala index 43ab2007cd..fda08fff11 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/PeerConnectionSpec.scala @@ -68,19 +68,23 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi withFixture(test.toNoArgTest(FixtureParam(aliceParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer))) } - def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), isPersistent: Boolean = true): Unit = { + def connect(aliceParams: NodeParams, remoteNodeId: PublicKey, switchboard: TestProbe, router: TestProbe, connection: TestProbe, transport: TestProbe, peerConnection: TestFSMRef[PeerConnection.State, PeerConnection.Data, PeerConnection], peer: TestProbe, remoteInit: protocol.Init = protocol.Init(Bob.nodeParams.features.initFeatures()), doSync: Boolean = false, isPersistent: Boolean = true): Unit = { // let's simulate a connection val probe = TestProbe() probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = None, transport_opt = Some(transport.ref), isPersistent = isPersistent)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId, outgoing = true)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, aliceParams.chainHash, aliceParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, aliceParams.chainHash, aliceParams.features.initFeatures(), doSync, None)) transport.expectMsgType[TransportHandler.Listener] val localInit = transport.expectMsgType[protocol.Init] assert(localInit.networks == List(Block.RegtestGenesisBlock.hash)) transport.send(peerConnection, remoteInit) transport.expectMsgType[TransportHandler.ReadAck] - router.expectMsgType[SendChannelQuery] + if (doSync) { + router.expectMsgType[SendChannelQuery] + } else { + router.expectNoMessage(1 second) + } peer.expectMsg(PeerConnection.ConnectionReady(peerConnection, remoteNodeId, address, outgoing = true, localInit, remoteInit)) assert(peerConnection.stateName == PeerConnection.CONNECTED) } @@ -98,7 +102,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi probe.send(peerConnection, incomingConnection) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) switchboard.expectMsg(PeerConnection.Authenticated(peerConnection, remoteNodeId, outgoing = false)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = false, None)) transport.expectMsgType[TransportHandler.Listener] val localInit = transport.expectMsgType[protocol.Init] assert(localInit.remoteAddress_opt == Some(fakeIPAddress)) @@ -130,7 +134,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi probe.watch(peerConnection) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None)) probe.expectTerminated(peerConnection, nodeParams.peerConnectionConf.initTimeout / transport.testKitSettings.TestTimeFactor + 1.second) // we don't want dilated time here origin.expectMsg(PeerConnection.ConnectionResult.InitializationFailed("initialization timed out")) peer.expectMsg(ConnectionDown(peerConnection)) @@ -143,7 +147,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None)) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[protocol.Init] transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"0000 00050100000000".bits).require.value) @@ -160,7 +164,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None)) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[protocol.Init] transport.send(peerConnection, LightningMessageCodecs.initCodec.decode(hex"00050100000000 0000".bits).require.value) @@ -177,7 +181,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None)) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[protocol.Init] // remote activated MPP but forgot payment secret @@ -195,7 +199,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi probe.watch(transport.ref) probe.send(peerConnection, PeerConnection.PendingAuth(connection.ref, Some(remoteNodeId), address, origin_opt = Some(origin.ref), transport_opt = Some(transport.ref), isPersistent = true)) transport.send(peerConnection, TransportHandler.HandshakeCompleted(remoteNodeId)) - probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), None)) + probe.send(peerConnection, PeerConnection.InitializeConnection(peer.ref, nodeParams.chainHash, nodeParams.features.initFeatures(), doSync = true, None)) transport.expectMsgType[TransportHandler.Listener] transport.expectMsgType[protocol.Init] transport.send(peerConnection, protocol.Init(Bob.nodeParams.features.initFeatures(), TlvStream(InitTlv.Networks(Block.LivenetGenesisBlock.hash :: Block.SignetGenesisBlock.hash :: Nil)))) @@ -208,7 +212,7 @@ class PeerConnectionSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike wi test("sync when requested") { f => import f._ val remoteInit = protocol.Init(Features(ChannelRangeQueries -> Optional, VariableLengthOnion -> Mandatory, PaymentSecret -> Mandatory, StaticRemoteKey -> Mandatory)) - connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, remoteInit) + connect(nodeParams, remoteNodeId, switchboard, router, connection, transport, peerConnection, peer, remoteInit, doSync = true) } test("reply to ping") { f => diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala index 368a336137..10725dc2dc 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/io/SwitchboardSpec.scala @@ -104,6 +104,64 @@ class SwitchboardSpec extends TestKitBaseClass with AnyFunSuiteLike { peer.expectMsg(Peer.Disconnect(remoteNodeId)) } + def sendFeatures(nodeParams: NodeParams, channels: Seq[PersistentChannelData], remoteNodeId: PublicKey, expectedFeatures: Features[InitFeature], expectedSync: Boolean): Unit = { + val (probe, peer, peerConnection) = (TestProbe(), TestProbe(), TestProbe()) + val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer))) + switchboard ! Switchboard.Init(channels) + switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId, outgoing = true) + val initConnection = peerConnection.expectMsgType[PeerConnection.InitializeConnection] + assert(initConnection.chainHash == nodeParams.chainHash) + assert(initConnection.features == expectedFeatures) + assert(initConnection.doSync == expectedSync) + } + + test("sync if no whitelist is defined and peer has channels") { + val nodeParams = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set.empty))) + val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteNodeId + sendFeatures(nodeParams, List(ChannelCodecsSpec.normal), remoteNodeId, nodeParams.features.initFeatures(), expectedSync = true) + } + + test("sync if no whitelist is defined and peer creates a channel") { + val nodeParams = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set.empty))) + val (probe, peer, peerConnection) = (TestProbe(), TestProbe(), TestProbe()) + val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteNodeId + val switchboard = TestActorRef(new Switchboard(nodeParams, FakePeerFactory(probe, peer))) + switchboard ! Switchboard.Init(Nil) + + // We have a channel with our peer, so we trigger a sync when connecting. + switchboard ! ChannelIdAssigned(TestProbe().ref, remoteNodeId, randomBytes32(), randomBytes32()) + switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId, outgoing = true) + val initConnection1 = peerConnection.expectMsgType[PeerConnection.InitializeConnection] + assert(initConnection1.chainHash == nodeParams.chainHash) + assert(initConnection1.features == nodeParams.features.initFeatures()) + assert(initConnection1.doSync) + + // We don't have channels with our peer, so we won't trigger a sync when connecting. + switchboard ! LastChannelClosed(peer.ref, remoteNodeId) + switchboard ! PeerConnection.Authenticated(peerConnection.ref, remoteNodeId, outgoing = true) + val initConnection2 = peerConnection.expectMsgType[PeerConnection.InitializeConnection] + assert(initConnection2.chainHash == nodeParams.chainHash) + assert(initConnection2.features == nodeParams.features.initFeatures()) + assert(!initConnection2.doSync) + } + + test("don't sync if no whitelist is defined and peer does not have channels") { + val nodeParams = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set.empty))) + sendFeatures(nodeParams, Nil, randomKey().publicKey, nodeParams.features.initFeatures(), expectedSync = false) + } + + test("sync if whitelist contains peer") { + val remoteNodeId = randomKey().publicKey + val nodeParams = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(remoteNodeId, randomKey().publicKey, randomKey().publicKey)))) + sendFeatures(nodeParams, Nil, remoteNodeId, nodeParams.features.initFeatures(), expectedSync = true) + } + + test("don't sync if whitelist doesn't contain peer") { + val nodeParams = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(randomKey().publicKey, randomKey().publicKey, randomKey().publicKey)))) + val remoteNodeId = ChannelCodecsSpec.normal.commitments.remoteNodeId + sendFeatures(nodeParams, List(ChannelCodecsSpec.normal), remoteNodeId, nodeParams.features.initFeatures(), expectedSync = false) + } + test("get peer info") { val (probe, peer) = (TestProbe(), TestProbe()) val switchboard = TestActorRef(new Switchboard(Alice.nodeParams, FakePeerFactory(probe, peer))) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala index 5778ada219..67dec0da40 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/router/RoutingSyncSpec.scala @@ -20,14 +20,12 @@ import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.actor.{Actor, Props} import akka.testkit.{TestFSMRef, TestProbe} import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} -import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, ByteVector64, Satoshi, Script, Transaction, TxId, TxIn, TxOut} +import fr.acinq.bitcoin.scalacompat.{Block, ByteVector32, Satoshi, Script, Transaction, TxId, TxIn, TxOut} import fr.acinq.eclair.TestConstants.{Alice, Bob} import fr.acinq.eclair.RealShortChannelId -import fr.acinq.eclair.TestDatabases.sqliteInMemory import fr.acinq.eclair._ import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher.{UtxoStatus, ValidateRequest, ValidateResult} import fr.acinq.eclair.crypto.TransportHandler -import fr.acinq.eclair.db.Databases import fr.acinq.eclair.io.Peer.PeerRoutingMessage import fr.acinq.eclair.router.Announcements.{makeChannelUpdate, makeNodeAnnouncement} import fr.acinq.eclair.router.BaseRouterSpec.channelAnnouncement @@ -86,7 +84,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle } val srcId = src.underlyingActor.nodeParams.nodeId val tgtId = tgt.underlyingActor.nodeParams.nodeId - sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, isReconnection = true, extendedQueryFlags_opt)) + sender.send(src, SendChannelQuery(src.underlyingActor.nodeParams.chainHash, tgtId, pipe.ref, replacePrevious = true, extendedQueryFlags_opt)) // src sends a query_channel_range to bob val qcr = pipe.expectMsgType[QueryChannelRange] pipe.send(tgt, PeerRoutingMessage(pipe.ref, srcId, qcr)) @@ -136,7 +134,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle test("sync with standard channel queries") { val watcher = system.actorOf(Props(new YesWatcher())) - val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(Bob.nodeParams.nodeId)))), watcher)) + val alice = TestFSMRef(new Router(Alice.nodeParams, watcher)) val bob = TestFSMRef(new Router(Bob.nodeParams, watcher)) val charlieId = randomKey().publicKey val sender = TestProbe() @@ -185,7 +183,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle def syncWithExtendedQueries(requestNodeAnnouncements: Boolean): Unit = { val watcher = system.actorOf(Props(new YesWatcher())) - val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(requestNodeAnnouncements = requestNodeAnnouncements, whitelist = Set(Bob.nodeParams.nodeId)))), watcher)) + val alice = TestFSMRef(new Router(Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(requestNodeAnnouncements = requestNodeAnnouncements))), watcher)) val bob = TestFSMRef(new Router(Bob.nodeParams, watcher)) val charlieId = randomKey().publicKey val sender = TestProbe() @@ -253,7 +251,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle } test("reset sync state on reconnection") { - val params = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(Bob.nodeParams.nodeId)))) + val params = TestConstants.Alice.nodeParams val router = TestFSMRef(new Router(params, TestProbe().ref)) val peerConnection = TestProbe() peerConnection.ignoreMsg { case _: TransportHandler.ReadAck => true } @@ -263,13 +261,13 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle assert(!router.stateData.sync.contains(remoteNodeId)) // ask router to send a channel range query - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, isReconnection = true, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None)) val QueryChannelRange(chainHash, firstBlockNum, numberOfBlocks, _) = sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] assert(router.stateData.sync.get(remoteNodeId).contains(Syncing(Nil, 0))) // ask router to send another channel range query - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, isReconnection = false, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = false, None)) sender.expectNoMessage(100 millis) // it's a duplicate and should be ignored assert(router.stateData.sync.get(remoteNodeId).contains(Syncing(Nil, 0))) @@ -286,7 +284,7 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle assert(sync.totalQueries == 1) // simulate a re-connection - sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, isReconnection = true, None)) + sender.send(router, SendChannelQuery(params.chainHash, remoteNodeId, sender.ref, replacePrevious = true, None)) sender.expectMsgType[QueryChannelRange] sender.expectMsgType[GossipTimestampFilter] assert(router.stateData.sync.get(remoteNodeId).contains(Syncing(Nil, 0))) @@ -330,47 +328,6 @@ class RoutingSyncSpec extends TestKitBaseClass with AnyFunSuiteLike with Paralle .updated(nodeIdB, sync2(nodeIdB).copy(remainingQueries = List(req))) assert(syncProgress(sync3) == SyncProgress(0.875D)) } - - test("do not send channel query to unknown peer") { - val watcher = system.actorOf(Props(new YesWatcher())) - val alice = TestFSMRef(new Router(Alice.nodeParams, watcher)) - val peerConnection = TestProbe() - alice ! SendChannelQuery(Alice.nodeParams.chainHash, Bob.nodeParams.nodeId, peerConnection.ref, isReconnection = true, None) - peerConnection.expectNoMessage() - } - - test("send channel query to peer with top capacity but not to second best") { - // Adding a channel with Bob and a larger one with Carol - val carol = randomKey().publicKey - val connection = sqliteInMemory() - val dbs = Databases.SqliteDatabases(connection, connection, connection, jdbcUrlFile_opt = None) - val sig = ByteVector64.Zeroes - val txid = TxId.fromValidHex("0001" * 16) - val cid1 = RealShortChannelId(Alice.nodeParams.currentBlockHeight, 1, 1) - val ann1 = Announcements.makeChannelAnnouncement(Block.RegtestGenesisBlock.hash, cid1, Alice.nodeParams.nodeId, Bob.nodeParams.nodeId, randomKey().publicKey, randomKey().publicKey, sig, sig, sig, sig) - dbs.network.addChannel(ann1, txid, Satoshi(123)) - val cid2 = RealShortChannelId(Alice.nodeParams.currentBlockHeight, 1, 2) - val ann2 = Announcements.makeChannelAnnouncement(Block.RegtestGenesisBlock.hash, cid2, Alice.nodeParams.nodeId, carol, randomKey().publicKey, randomKey().publicKey, sig, sig, sig, sig) - dbs.network.addChannel(ann2, txid, Satoshi(4567)) - - val watcher = system.actorOf(Props(new YesWatcher())) - val onlyTopPeer = Alice.nodeParams.copy(db = dbs, routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(peerLimit = 1))) - val alice = TestFSMRef(new Router(onlyTopPeer, watcher)) - val peerConnection = TestProbe() - alice ! SendChannelQuery(Alice.nodeParams.chainHash, Bob.nodeParams.nodeId, peerConnection.ref, isReconnection = true, None) - peerConnection.expectNoMessage() - alice ! SendChannelQuery(Alice.nodeParams.chainHash, carol, peerConnection.ref, isReconnection = true, None) - peerConnection.expectMsgType[QueryChannelRange] - } - - test("send channel query to whitelisted peer") { - val watcher = system.actorOf(Props(new YesWatcher())) - val withWhitelistedBob = Alice.nodeParams.copy(routerConf = Alice.nodeParams.routerConf.copy(syncConf = Alice.nodeParams.routerConf.syncConf.copy(whitelist = Set(Bob.nodeParams.nodeId)))) - val alice = TestFSMRef(new Router(withWhitelistedBob, watcher)) - val peerConnection = TestProbe() - alice ! SendChannelQuery(Alice.nodeParams.chainHash, Bob.nodeParams.nodeId, peerConnection.ref, isReconnection = true, None) - peerConnection.expectMsgType[QueryChannelRange] - } } object RoutingSyncSpec {