
WIP
thomash-acinq committed Jan 27, 2025
1 parent d6d495b commit 8ff8186
Showing 13 changed files with 128 additions and 112 deletions.
2 changes: 2 additions & 0 deletions docs/release-notes/eclair-vnext.md
@@ -24,6 +24,8 @@ There are many organisations that package Java runtimes and development kits, fo

### Miscellaneous improvements and bug fixes

#### Gossip sync limits

On reconnection, eclair now only synchronizes its routing table with a small number of top peers (ranked by total channel capacity) instead of synchronizing with every peer.
If you already use `sync-whitelist`, note that the default behavior has changed: you must set `router.sync.peer-limit = 0` to keep preventing synchronization with nodes outside your whitelist.
The setting itself has also been renamed: use `router.sync.whitelist` instead of `sync-whitelist`.
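For example, a node operator who wants to keep the previous whitelist-only behavior could use something like the following (illustrative `eclair.conf` snippet, not taken from this commit; the node id is a placeholder):

```conf
eclair.router.sync {
  // never select sync peers automatically
  peer-limit = 0
  // only synchronize the routing table with these nodes
  whitelist = ["<node-id-of-a-trusted-peer>"]
}
```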
@@ -656,9 +656,9 @@ object NodeParams extends Logging {
watchSpentWindow = watchSpentWindow,
channelExcludeDuration = FiniteDuration(config.getDuration("router.channel-exclude-duration").getSeconds, TimeUnit.SECONDS),
routerBroadcastInterval = FiniteDuration(config.getDuration("router.broadcast-interval").getSeconds, TimeUnit.SECONDS),
encodingType = EncodingType.UNCOMPRESSED,
syncConf = Router.SyncConf(
requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements"),
encodingType = EncodingType.UNCOMPRESSED,
channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size"),
channelQueryChunkSize = config.getInt("router.sync.channel-query-chunk-size"),
peerLimit = config.getInt("router.sync.peer-limit"),
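As a rough, standalone illustration of how a `router.sync` block like the one parsed above can be read with the Typesafe Config library (the values are arbitrary examples, not eclair's defaults):

```scala
import com.typesafe.config.ConfigFactory

object SyncConfParsingSketch extends App {
  // arbitrary example values, not eclair's actual defaults
  val config = ConfigFactory.parseString(
    """
      |router.sync {
      |  request-node-announcements = true
      |  channel-range-chunk-size = 2700
      |  channel-query-chunk-size = 100
      |  peer-limit = 5
      |}
      |""".stripMargin)

  val requestNodeAnnouncements = config.getBoolean("router.sync.request-node-announcements")
  val channelRangeChunkSize = config.getInt("router.sync.channel-range-chunk-size")
  val peerLimit = config.getInt("router.sync.peer-limit")
  println(s"requestNodeAnnouncements=$requestNodeAnnouncements chunkSize=$channelRangeChunkSize peerLimit=$peerLimit")
}
```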
2 changes: 1 addition & 1 deletion eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
@@ -475,7 +475,7 @@ class Peer(val nodeParams: NodeParams,
log.info(s"channel id switch: previousId=$temporaryChannelId nextId=$channelId")
// we have our first channel with that peer: let's sync our routing table
if (!d.channels.keys.exists(_.isInstanceOf[FinalChannelId])) {
d.peerConnection ! PeerConnection.DoSync(isReconnection = false)
d.peerConnection ! PeerConnection.DoSync(replacePrevious = false)
}
// NB: we keep the temporary channel id because the switch is not always acknowledged at this point (see https://github.com/lightningnetwork/lightning-rfc/pull/151)
// we won't clean it up, but we won't remember the temporary id on channel termination
21 changes: 13 additions & 8 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/PeerConnection.scala
@@ -103,7 +103,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}

when(BEFORE_INIT) {
case Event(InitializeConnection(peer, chainHash, localFeatures, fundingRates_opt), d: BeforeInitData) =>
case Event(InitializeConnection(peer, chainHash, localFeatures, doSync, fundingRates_opt), d: BeforeInitData) =>
d.transport ! TransportHandler.Listener(self)
Metrics.PeerConnectionsConnecting.withTag(Tags.ConnectionState, Tags.ConnectionStates.Initializing).increment()
log.debug(s"using features=$localFeatures")
@@ -120,7 +120,7 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.transport ! localInit
startSingleTimer(INIT_TIMER, InitTimeout, conf.initTimeout)
unstashAll() // unstash remote init if it already arrived
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, d.isPersistent)
goto(INITIALIZING) using InitializingData(chainHash, d.pendingAuth, d.remoteNodeId, d.transport, peer, localInit, doSync, d.isPersistent)

case Event(_: protocol.Init, _) =>
log.debug("stashing remote init")
@@ -160,7 +160,11 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
d.peer ! ConnectionReady(self, d.remoteNodeId, d.pendingAuth.address, d.pendingAuth.outgoing, d.localInit, remoteInit)
d.pendingAuth.origin_opt.foreach(_ ! ConnectionResult.Connected(self, d.peer))

self ! DoSync(isReconnection = true)
if (d.doSync) {
self ! DoSync(replacePrevious = true)
} else {
log.info("not syncing with this peer")
}

// we will delay all rebroadcasts with this value in order to prevent herd effects (each peer has a different delay)
val rebroadcastDelay = Random.nextInt(conf.maxRebroadcastDelay.toSeconds.toInt).seconds
@@ -395,12 +399,13 @@ class PeerConnection(keyPair: KeyPair, conf: PeerConnection.Conf, switchboard: A
}
stay() using d.copy(behavior = behavior1)

case Event(DoSync(isReconnection), d: ConnectedData) =>
case Event(DoSync(replacePrevious), d: ConnectedData) =>
val canUseChannelRangeQueries = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueries)
val canUseChannelRangeQueriesEx = Features.canUseFeature(d.localInit.features, d.remoteInit.features, Features.ChannelRangeQueriesExtended)
if (canUseChannelRangeQueries || canUseChannelRangeQueriesEx) {
val flags_opt = if (canUseChannelRangeQueriesEx) Some(QueryChannelRangeTlv.QueryFlags(QueryChannelRangeTlv.QueryFlags.WANT_ALL)) else None
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, isReconnection, flags_opt)
log.debug(s"sending sync channel range query with flags_opt=$flags_opt replacePrevious=$replacePrevious")
router ! SendChannelQuery(d.chainHash, d.remoteNodeId, self, replacePrevious, flags_opt)
}
stay()

@@ -531,7 +536,7 @@ object PeerConnection {
case object SendPing
case object KillIdle
case object ResumeAnnouncements
case class DoSync(isReconnection: Boolean) extends RemoteTypes
case class DoSync(replacePrevious: Boolean) extends RemoteTypes
// @formatter:on

val IGNORE_NETWORK_ANNOUNCEMENTS_PERIOD: FiniteDuration = 5 minutes
@@ -558,7 +563,7 @@ object PeerConnection {
case object Nothing extends Data
case class AuthenticatingData(pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport
case class BeforeInitData(remoteNodeId: PublicKey, pendingAuth: PendingAuth, transport: ActorRef, isPersistent: Boolean) extends Data with HasTransport
case class InitializingData(chainHash: BlockHash, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, isPersistent: Boolean) extends Data with HasTransport
case class InitializingData(chainHash: BlockHash, pendingAuth: PendingAuth, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, doSync: Boolean, isPersistent: Boolean) extends Data with HasTransport
case class ConnectedData(chainHash: BlockHash, remoteNodeId: PublicKey, transport: ActorRef, peer: ActorRef, localInit: protocol.Init, remoteInit: protocol.Init, rebroadcastDelay: FiniteDuration, gossipTimestampFilter: Option[GossipTimestampFilter] = None, behavior: Behavior = Behavior(), expectedPong_opt: Option[ExpectedPong] = None, isPersistent: Boolean) extends Data with HasTransport

case class ExpectedPong(ping: Ping, timestamp: TimestampMilli = TimestampMilli.now())
@@ -575,7 +580,7 @@ object PeerConnection {
def outgoing: Boolean = remoteNodeId_opt.isDefined // if this is an outgoing connection, we know the node id in advance
}
case class Authenticated(peerConnection: ActorRef, remoteNodeId: PublicKey, outgoing: Boolean) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], fundingRates_opt: Option[LiquidityAds.WillFundRates]) extends RemoteTypes
case class InitializeConnection(peer: ActorRef, chainHash: BlockHash, features: Features[InitFeature], doSync: Boolean, fundingRates_opt: Option[LiquidityAds.WillFundRates]) extends RemoteTypes
case class ConnectionReady(peerConnection: ActorRef, remoteNodeId: PublicKey, address: NodeAddress, outgoing: Boolean, localInit: protocol.Init, remoteInit: protocol.Init) extends RemoteTypes

sealed trait ConnectionResult extends RemoteTypes
20 changes: 14 additions & 6 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Switchboard.scala
@@ -20,7 +20,7 @@ import akka.actor.typed.receptionist.{Receptionist, ServiceKey}
import akka.actor.typed.scaladsl.Behaviors
import akka.actor.typed.scaladsl.adapter.{ClassicActorContextOps, ClassicActorRefOps, ClassicActorSystemOps, TypedActorRefOps}
import akka.actor.{Actor, ActorContext, ActorLogging, ActorRef, OneForOneStrategy, Props, Stash, Status, SupervisorStrategy, typed}
import fr.acinq.bitcoin.scalacompat.ByteVector32
import fr.acinq.bitcoin.scalacompat.{ByteVector32, Satoshi}
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.blockchain.OnchainPubkeyCache
import fr.acinq.eclair.channel.Helpers.Closing
@@ -69,12 +69,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
}
log.info("restoring {} peer(s) with {} channel(s) and {} peers with pending on-the-fly funding", peersWithChannels.size, channels.size, (peersWithOnTheFlyFunding.keySet -- peersWithChannels.keySet).size)
unstashAll()
context.become(normal(peersWithChannels.keySet))
val peerCapacities = channels.map {
case channelData: ChannelDataWithoutCommitments => (channelData.remoteNodeId, 0L)
case channelData: ChannelDataWithCommitments => (channelData.remoteNodeId, channelData.commitments.capacity.toLong)
}.groupMapReduce[PublicKey, Long](_._1)(_._2)(_ + _)
val topCapacityPeers = peerCapacities.toSeq.sortWith { case ((_, c1), (_, c2)) => c1 > c2 }.take(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet
context.become(normal(peersWithChannels.keySet, topCapacityPeers ++ nodeParams.routerConf.syncConf.whitelist))
case _ =>
stash()
}

def normal(peersWithChannels: Set[PublicKey]): Receive = {
def normal(peersWithChannels: Set[PublicKey], syncWhitelist: Set[PublicKey]): Receive = {

case Peer.Connect(publicKey, _, _, _) if publicKey == nodeParams.nodeId =>
sender() ! Status.Failure(new RuntimeException("cannot open connection with oneself"))
@@ -110,14 +115,17 @@ class Switchboard(nodeParams: NodeParams, peerFactory: Switchboard.PeerFactory)
val peer = createOrGetPeer(authenticated.remoteNodeId, offlineChannels = Set.empty, pendingOnTheFlyFunding = Map.empty)
val features = nodeParams.initFeaturesFor(authenticated.remoteNodeId)
val hasChannels = peersWithChannels.contains(authenticated.remoteNodeId)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, nodeParams.liquidityAdsConfig.rates_opt)
val doSync = syncWhitelist.contains(authenticated.remoteNodeId)
authenticated.peerConnection ! PeerConnection.InitializeConnection(peer, nodeParams.chainHash, features, doSync, nodeParams.liquidityAdsConfig.rates_opt)
if (!hasChannels && !authenticated.outgoing) {
incomingConnectionsTracker ! TrackIncomingConnection(authenticated.remoteNodeId)
}

case ChannelIdAssigned(_, remoteNodeId, _, _) => context.become(normal(peersWithChannels + remoteNodeId))
case ChannelIdAssigned(_, remoteNodeId, _, _) =>
val syncWhitelist1 = if (syncWhitelist.size < nodeParams.routerConf.syncConf.peerLimit) syncWhitelist + remoteNodeId else syncWhitelist
context.become(normal(peersWithChannels + remoteNodeId, syncWhitelist1))

case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId))
case LastChannelClosed(_, remoteNodeId) => context.become(normal(peersWithChannels - remoteNodeId, syncWhitelist))

case GetPeers => sender() ! context.children.filterNot(_ == incomingConnectionsTracker.toClassic)

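The peer-ranking step added above can be illustrated with a standalone sketch (simplified stand-in types instead of eclair's persisted channel data): capacities are summed per remote node with `groupMapReduce`, and the `peer-limit` peers with the largest aggregate capacity are kept; the configured `router.sync.whitelist` is then added on top, and `ChannelIdAssigned` can grow the set until the limit is reached.

```scala
object TopCapacityPeersSketch extends App {
  // simplified stand-in for persisted channel data: (remote node id, capacity in sat)
  final case class ChannelInfo(remoteNodeId: String, capacitySat: Long)

  val channels = Seq(
    ChannelInfo("alice", 500000L),
    ChannelInfo("alice", 200000L),
    ChannelInfo("bob", 1000000L),
    ChannelInfo("carol", 50000L)
  )
  val peerLimit = 2

  // total capacity per peer: Map(alice -> 700000, bob -> 1000000, carol -> 50000)
  val peerCapacities: Map[String, Long] =
    channels.groupMapReduce(_.remoteNodeId)(_.capacitySat)(_ + _)

  // keep the `peerLimit` peers with the largest aggregate capacity
  val topCapacityPeers: Set[String] =
    peerCapacities.toSeq
      .sortWith { case ((_, c1), (_, c2)) => c1 > c2 }
      .take(peerLimit)
      .map(_._1)
      .toSet

  println(topCapacityPeers) // Set(bob, alice)
}
```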
@@ -99,6 +99,9 @@ object EclairInternalsSerializer {

val syncConfCodec: Codec[Router.SyncConf] = (
("requestNodeAnnouncements" | bool(8)) ::
("encodingType" | discriminated[EncodingType].by(uint8)
.typecase(0, provide(EncodingType.UNCOMPRESSED))
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
("channelRangeChunkSize" | int32) ::
("channelQueryChunkSize" | int32) ::
("peerLimit" | int32) ::
@@ -108,9 +111,6 @@ object EclairInternalsSerializer {
("watchSpentWindow" | finiteDurationCodec) ::
("channelExcludeDuration" | finiteDurationCodec) ::
("routerBroadcastInterval" | finiteDurationCodec) ::
("encodingType" | discriminated[EncodingType].by(uint8)
.typecase(0, provide(EncodingType.UNCOMPRESSED))
.typecase(1, provide(EncodingType.COMPRESSED_ZLIB))) ::
("syncConf" | syncConfCodec) ::
("pathFindingExperimentConf" | pathFindingExperimentConfCodec) ::
("messageRouteParams" | messageRouteParamsCodec) ::
@@ -161,6 +161,7 @@ object EclairInternalsSerializer {
("peer" | actorRefCodec(system)) ::
("chainHash" | blockHash) ::
("features" | variableSizeBytes(uint16, initFeaturesCodec)) ::
("doSync" | bool(8)) ::
("fundingRates" | optional(bool(8), LiquidityAds.Codecs.willFundRates))).as[PeerConnection.InitializeConnection]

def connectionReadyCodec(system: ExtendedActorSystem): Codec[PeerConnection.ConnectionReady] = (
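For reference, a minimal round-trip of the one-byte `EncodingType` discriminator that now lives in `syncConfCodec` (standalone sketch that re-declares a stand-in `EncodingType`; eclair's real type is defined in the wire protocol):

```scala
import scodec.Codec
import scodec.codecs._

object EncodingTypeCodecSketch extends App {
  // stand-in for the protocol's EncodingType
  sealed trait EncodingType
  case object UNCOMPRESSED extends EncodingType
  case object COMPRESSED_ZLIB extends EncodingType

  // same shape as the discriminated codec above: one byte, 0 or 1
  val encodingTypeCodec: Codec[EncodingType] =
    discriminated[EncodingType].by(uint8)
      .typecase(0, provide(UNCOMPRESSED))
      .typecase(1, provide(COMPRESSED_ZLIB))

  val bits = encodingTypeCodec.encode(COMPRESSED_ZLIB).require
  println(bits.toHex)                                   // 01
  println(encodingTypeCodec.decode(bits).require.value) // COMPRESSED_ZLIB
}
```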
20 changes: 4 additions & 16 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Router.scala
@@ -86,16 +86,6 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
context.system.eventStream.publish(ChannelUpdatesReceived(channels.values.flatMap(pc => pc.update_1_opt ++ pc.update_2_opt ++ Nil)))
context.system.eventStream.publish(NodesDiscovered(nodes))

val peerCapacities = channels.values.map(pc =>
if (pc.nodeId1 == nodeParams.nodeId) {
Some((pc.nodeId2, pc.capacity))
} else if (pc.nodeId2 == nodeParams.nodeId) {
Some((pc.nodeId1, pc.capacity))
} else {
None
}).flatten.groupMapReduce(_._1)(_._2)(_ + _)
val topCapacityPeers = peerCapacities.toSeq.sortWith { case ((_, c1), (_, c2)) => c1 > c2 }.take(nodeParams.routerConf.syncConf.peerLimit).map(_._1).toSet

// watch the funding tx of all these channels
// note: some of them may already have been spent, in that case we will receive the watch event immediately
(channels.values ++ pruned.values).foreach { pc =>
@@ -124,8 +114,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
excludedChannels = Map.empty,
graphWithBalances = GraphWithBalanceEstimates(graph, nodeParams.routerConf.balanceEstimateHalfLife),
sync = Map.empty,
spentChannels = Map.empty,
topCapacityPeers = topCapacityPeers)
spentChannels = Map.empty)
startWith(NORMAL, data)
}

@@ -308,7 +297,7 @@ class Router(val nodeParams: NodeParams, watcher: typed.ActorRef[ZmqWatcher.Comm
stay() using Validation.handleAvailableBalanceChanged(d, e)

case Event(s: SendChannelQuery, d) =>
stay() using Sync.handleSendChannelQuery(nodeParams.routerConf.syncConf, d, s)
stay() using Sync.handleSendChannelQuery(d, s)

case Event(PeerRoutingMessage(peerConnection, remoteNodeId, q: QueryChannelRange), d) =>
Sync.handleQueryChannelRange(d.channels, nodeParams.routerConf, RemoteGossip(peerConnection, remoteNodeId), q)
@@ -380,6 +369,7 @@ object Router {
}

case class SyncConf(requestNodeAnnouncements: Boolean,
encodingType: EncodingType,
channelRangeChunkSize: Int,
channelQueryChunkSize: Int,
peerLimit: Int,
@@ -391,7 +381,6 @@ object Router {
case class RouterConf(watchSpentWindow: FiniteDuration,
channelExcludeDuration: FiniteDuration,
routerBroadcastInterval: FiniteDuration,
encodingType: EncodingType,
syncConf: SyncConf,
pathFindingExperimentConf: PathFindingExperimentConf,
messageRouteParams: MessageRouteParams,
@@ -715,7 +704,7 @@ object Router {
// @formatter:on

// @formatter:off
case class SendChannelQuery(chainHash: BlockHash, remoteNodeId: PublicKey, to: ActorRef, isReconnection: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case class SendChannelQuery(chainHash: BlockHash, remoteNodeId: PublicKey, to: ActorRef, replacePrevious: Boolean, flags_opt: Option[QueryChannelRangeTlv]) extends RemoteTypes
case object GetRoutingState
case class RoutingState(channels: Iterable[PublicChannel], nodes: Iterable[NodeAnnouncement])
case object GetRoutingStateStreaming extends RemoteTypes
@@ -784,7 +773,6 @@ object Router {
graphWithBalances: GraphWithBalanceEstimates,
sync: Map[PublicKey, Syncing], // keep tracks of channel range queries sent to each peer. If there is an entry in the map, it means that there is an ongoing query for which we have not yet received an 'end' message
spentChannels: Map[TxId, RealShortChannelId], // transactions that spend funding txs that are not yet deeply buried
topCapacityPeers: Set[PublicKey],
) {
def resolve(scid: ShortChannelId): Option[KnownChannel] = {
// let's assume this is a real scid
19 changes: 6 additions & 13 deletions eclair-core/src/main/scala/fr/acinq/eclair/router/Sync.scala
@@ -42,21 +42,13 @@ object Sync {
// block almost never exceeds 2800 so this should very rarely be limiting
val MAXIMUM_CHUNK_SIZE = 2700

def handleSendChannelQuery(conf: SyncConf, d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
// TODO: check that s.remoteNodeId is eligible for sync
def handleSendChannelQuery(d: Data, s: SendChannelQuery)(implicit ctx: ActorContext, log: LoggingAdapter): Data = {
implicit val sender: ActorRef = ctx.self // necessary to preserve origin when sending messages to other actors
// we currently send query_channel_range when:
// * we just reconnected to a peer with whom we have channels
// * we just (re)connected to a peer with whom we have channels
// * we validate our first channel with a peer
val shouldSync = if (s.isReconnection) {
conf.whitelist.contains(s.remoteNodeId) || d.topCapacityPeers.contains(s.remoteNodeId)
} else if (d.sync.contains(s.remoteNodeId)) { // we must ensure we don't send a new query_channel_range while another query is still in progress
log.debug("not sending query_channel_range: sync already in progress")
false
} else {
true
}
if (shouldSync) {
// we must ensure we don't send a new query_channel_range while another query is still in progress
if (s.replacePrevious || !d.sync.contains(s.remoteNodeId)) {
// ask for everything
val query = QueryChannelRange(s.chainHash, firstBlock = BlockHeight(0), numberOfBlocks = Int.MaxValue.toLong, TlvStream(s.flags_opt.toSet))
log.debug("sending query_channel_range={}", query)
@@ -72,6 +64,7 @@ object Sync {
// reset our sync state for this peer: we create an entry to ensure we reject duplicate queries and unsolicited reply_channel_range
d.copy(sync = d.sync + (s.remoteNodeId -> Syncing(Nil, 0)))
} else {
log.debug("not sending query_channel_range: sync already in progress")
d
}
}
@@ -88,7 +81,7 @@ object Sync {
Metrics.QueryChannelRange.Replies.withoutTags().record(chunks.size)
chunks.zipWithIndex.foreach { case (chunk, i) =>
val syncComplete = i == chunks.size - 1
val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.encodingType, q.queryFlags_opt, channels)
val reply = buildReplyChannelRange(chunk, syncComplete, q.chainHash, routerConf.syncConf.encodingType, q.queryFlags_opt, channels)
origin.peerConnection ! reply
Metrics.ReplyChannelRange.Blocks.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.numberOfBlocks)
Metrics.ReplyChannelRange.ShortChannelIds.withTag(Tags.Direction, Tags.Directions.Outgoing).record(reply.shortChannelIds.array.size)
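The net effect of the simplified guard in `handleSendChannelQuery` can be summarized with a small sketch (assumed boolean inputs, not eclair code): peer eligibility is now decided upstream by the Switchboard, so the router only checks whether an in-progress sync should be replaced.

```scala
object SyncGuardSketch extends App {
  // mirrors `s.replacePrevious || !d.sync.contains(s.remoteNodeId)` above
  def shouldSendQuery(replacePrevious: Boolean, syncInProgress: Boolean): Boolean =
    replacePrevious || !syncInProgress

  assert(shouldSendQuery(replacePrevious = true, syncInProgress = true))   // reconnection: restart the previous sync
  assert(shouldSendQuery(replacePrevious = false, syncInProgress = false)) // first channel with a peer: start a sync
  assert(!shouldSendQuery(replacePrevious = false, syncInProgress = true)) // duplicate request while a sync is running: ignored
}
```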