Skip to content

Commit

Permalink
Address @thomash-acinq comments
Browse files Browse the repository at this point in the history
- Use separate function when updating features only
- Remove DB write before channel ready for incoming connection
  • Loading branch information
t-bast committed Jan 30, 2025
1 parent 5df3734 commit aacfecc
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 86 deletions.
15 changes: 10 additions & 5 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import fr.acinq.eclair.payment._
import fr.acinq.eclair.payment.relay.OnTheFlyFunding
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.router.Router
import fr.acinq.eclair.wire.protocol.{ChannelAnnouncement, ChannelUpdate, NodeAnnouncement, NodeInfo}
import fr.acinq.eclair.{CltvExpiry, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{CltvExpiry, Features, InitFeature, MilliSatoshi, Paginated, RealShortChannelId, ShortChannelId, TimestampMilli, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

Expand Down Expand Up @@ -264,9 +264,14 @@ case class DualPeersDb(primary: PeersDb, secondary: PeersDb) extends PeersDb {

private implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newSingleThreadExecutor(new ThreadFactoryBuilder().setNameFormat("db-peers").build()))

override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = {
runAsync(secondary.addOrUpdatePeer(nodeId, nodeInfo))
primary.addOrUpdatePeer(nodeId, nodeInfo)
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = {
runAsync(secondary.addOrUpdatePeer(nodeId, address, features))
primary.addOrUpdatePeer(nodeId, address, features)
}

override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = {
runAsync(secondary.addOrUpdatePeerFeatures(nodeId, features))
primary.addOrUpdatePeerFeatures(nodeId, features)
}

override def removePeer(nodeId: Crypto.PublicKey): Unit = {
Expand Down
8 changes: 6 additions & 2 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/PeersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,19 @@
package fr.acinq.eclair.db

import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol.{NodeAddress, NodeInfo}
import fr.acinq.eclair.{Features, InitFeature, TimestampSecond}
import scodec.bits.ByteVector

/** The PeersDb contains information about our direct peers, with whom we have or had channels. */
trait PeersDb {

def addOrUpdatePeer(nodeId: PublicKey, nodeInfo: NodeInfo): Unit
/** Update our DB with a verified address and features for the given peer. */
def addOrUpdatePeer(nodeId: PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit

/** Update our DB with the features for the given peer, without updating its address. */
def addOrUpdatePeerFeatures(nodeId: PublicKey, features: Features[InitFeature]): Unit

def removePeer(nodeId: PublicKey): Unit

Expand Down
63 changes: 32 additions & 31 deletions eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgPeersDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import fr.acinq.eclair.db.PeersDb
import fr.acinq.eclair.db.pg.PgUtils.PgLock
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, MilliSatoshi, TimestampSecond}
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.ByteVector

Expand Down Expand Up @@ -95,37 +95,38 @@ class PgPeersDb(implicit ds: DataSource, lock: PgLock) extends PeersDb with Logg
}
}

override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
withLock { pg =>
nodeInfo.address_opt match {
case Some(address) =>
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, ?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
case None =>
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, NULL, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, ?, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_address = EXCLUDED.node_address, node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
}
}

override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Postgres) {
withLock { pg =>
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(pg.prepareStatement(
"""
| INSERT INTO local.peers (node_id, node_address, node_features)
| VALUES (?, NULL, ?)
| ON CONFLICT (node_id)
| DO UPDATE SET node_features = EXCLUDED.node_features
|""".stripMargin)) { statement =>
statement.setString(1, nodeId.value.toHex)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,15 +18,15 @@ package fr.acinq.eclair.db.sqlite

import fr.acinq.bitcoin.scalacompat.Crypto
import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics
import fr.acinq.eclair.db.Monitoring.Tags.DbBackends
import fr.acinq.eclair.db.PeersDb
import fr.acinq.eclair.db.sqlite.SqliteUtils.{getVersion, setVersion, using}
import fr.acinq.eclair.payment.relay.Relayer.RelayFees
import fr.acinq.eclair.wire.protocol._
import fr.acinq.eclair.{Features, InitFeature, MilliSatoshi, TimestampSecond}
import grizzled.slf4j.Logging
import scodec.bits.{BitVector, ByteVector}
import scodec.bits.ByteVector

import java.sql.{Connection, Statement}

Expand Down Expand Up @@ -82,30 +82,35 @@ class SqlitePeersDb(val sqlite: Connection) extends PeersDb with Logging {
setVersion(statement, DB_NAME, CURRENT_VERSION)
}

override def addOrUpdatePeer(nodeId: Crypto.PublicKey, nodeInfo: NodeInfo): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(nodeInfo.features).require.toByteArray
val encodedAddress_opt = nodeInfo.address_opt.map(address => CommonCodecs.nodeaddress.encode(address).require.toByteArray)
val unknownPeer = encodedAddress_opt match {
case Some(encodedAddress) =>
using(sqlite.prepareStatement("UPDATE peers SET node_address=?, node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedAddress)
update.setBytes(2, encodedFeatures)
update.setBytes(3, nodeId.value.toArray)
update.executeUpdate() == 0
}
case None =>
using(sqlite.prepareStatement("UPDATE peers SET node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedFeatures)
update.setBytes(2, nodeId.value.toArray)
update.executeUpdate() == 0
override def addOrUpdatePeer(nodeId: Crypto.PublicKey, address: NodeAddress, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
val encodedAddress = CommonCodecs.nodeaddress.encode(address).require.toByteArray
using(sqlite.prepareStatement("UPDATE peers SET node_address=?, node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedAddress)
update.setBytes(2, encodedFeatures)
update.setBytes(3, nodeId.value.toArray)
if (update.executeUpdate() == 0) {
using(sqlite.prepareStatement("INSERT INTO peers VALUES (?, ?, ?)")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.setBytes(2, encodedAddress)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}
}
}
if (unknownPeer) {
using(sqlite.prepareStatement("INSERT INTO peers VALUES (?, ?, ?)")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.setBytes(2, encodedAddress_opt.orNull)
statement.setBytes(3, encodedFeatures)
statement.executeUpdate()
}

override def addOrUpdatePeerFeatures(nodeId: Crypto.PublicKey, features: Features[InitFeature]): Unit = withMetrics("peers/add-or-update", DbBackends.Sqlite) {
val encodedFeatures = CommonCodecs.initFeaturesCodec.encode(features).require.toByteArray
using(sqlite.prepareStatement("UPDATE peers SET node_features=? WHERE node_id=?")) { update =>
update.setBytes(1, encodedFeatures)
update.setBytes(2, nodeId.value.toArray)
if (update.executeUpdate() == 0) {
using(sqlite.prepareStatement("INSERT INTO peers VALUES (?, NULL, ?)")) { statement =>
statement.setBytes(1, nodeId.value.toArray)
statement.setBytes(2, encodedFeatures)
statement.executeUpdate()
}
}
}
}
Expand Down
14 changes: 4 additions & 10 deletions eclair-core/src/main/scala/fr/acinq/eclair/io/Peer.scala
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ class Peer(val nodeParams: NodeParams,
val remoteFeatures_opt = d.remoteFeatures_opt match {
case Some(remoteFeatures) if !remoteFeatures.written =>
// We have a channel, so we can write to the DB without any DoS risk.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(remoteFeatures.features, None))
nodeParams.db.peers.addOrUpdatePeerFeatures(remoteNodeId, remoteFeatures.features)
Some(remoteFeatures.copy(written = true))
case _ => d.remoteFeatures_opt
}
Expand Down Expand Up @@ -463,7 +463,7 @@ class Peer(val nodeParams: NodeParams,
}
if (!d.remoteFeaturesWritten) {
// We have a channel, so we can write to the DB without any DoS risk.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(d.remoteFeatures, None))
nodeParams.db.peers.addOrUpdatePeerFeatures(remoteNodeId, d.remoteFeatures)
}
stay() using d.copy(activeChannels = d.activeChannels + e.channelId, remoteFeaturesWritten = true)

Expand Down Expand Up @@ -836,14 +836,8 @@ class Peer(val nodeParams: NodeParams,
if (connectionReady.outgoing) {
// We store the node address and features upon successful outgoing connection, so we can reconnect later.
// The previous address is overwritten: we don't need it since the current one works.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, Some(connectionReady.address)))
} else if (channels.nonEmpty) {
// If this is an incoming connection, we only store the peer details in our DB if we have channels with them.
// Otherwise nodes could DoS by simply connecting to us to force us to store data in our DB.
// We don't update the remote address, we don't know if we would successfully connect using the current one.
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(connectionReady.remoteInit.features, None))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, connectionReady.address, connectionReady.remoteInit.features)
}
val remoteFeaturesWritten = connectionReady.outgoing || channels.nonEmpty

// If we have some data stored from our peer, we send it to them before doing anything else.
peerStorage.data.foreach(connectionReady.peerConnection ! PeerStorageRetrieval(_))
Expand All @@ -865,7 +859,7 @@ class Peer(val nodeParams: NodeParams,
connectionReady.peerConnection ! CurrentFeeCredit(nodeParams.chainHash, feeCredit.getOrElse(0 msat))
}

goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage, remoteFeaturesWritten)
goto(CONNECTED) using ConnectedData(connectionReady.address, connectionReady.peerConnection, connectionReady.localInit, connectionReady.remoteInit, channels, activeChannels, feerates, None, peerStorage, remoteFeaturesWritten = connectionReady.outgoing)
}

/**
Expand Down
20 changes: 10 additions & 10 deletions eclair-core/src/test/scala/fr/acinq/eclair/db/PeersDbSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,24 @@ class PeersDbSpec extends AnyFunSuite {
val peer3 = TestCase(randomKey().publicKey, NodeInfo(Features.empty, Some(Tor3("mrl2d3ilhctt2vw4qzvmz3etzjvpnc6dczliq5chrxetthgbuczuggyd", 4231))))

assert(db.listPeers().isEmpty)
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo)
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo.address_opt.get, peer1a.nodeInfo.features)
assert(db.getPeer(peer1a.nodeId).contains(peer1a.nodeInfo))
assert(db.getPeer(peer2a.nodeId).isEmpty)
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo) // duplicate is ignored
db.addOrUpdatePeer(peer1a.nodeId, peer1a.nodeInfo.address_opt.get, peer1a.nodeInfo.features) // duplicate is ignored
assert(db.listPeers().size == 1)
db.addOrUpdatePeer(peer2a.nodeId, peer2a.nodeInfo)
db.addOrUpdatePeer(peer3.nodeId, peer3.nodeInfo)
db.addOrUpdatePeerFeatures(peer2a.nodeId, peer2a.nodeInfo.features)
db.addOrUpdatePeer(peer3.nodeId, peer3.nodeInfo.address_opt.get, peer3.nodeInfo.features)
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1a, peer2a, peer3))
db.addOrUpdatePeer(peer2b.nodeId, peer2b.nodeInfo)
db.addOrUpdatePeer(peer2b.nodeId, peer2b.nodeInfo.address_opt.get, peer2b.nodeInfo.features)
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1a, peer2b, peer3))
db.removePeer(peer2b.nodeId)
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1a, peer3))
// The first peer updates its address without changing its features.
db.addOrUpdatePeer(peer1b.nodeId, peer1b.nodeInfo)
db.addOrUpdatePeer(peer1b.nodeId, peer1b.nodeInfo.address_opt.get, peer1b.nodeInfo.features)
assert(db.getPeer(peer1b.nodeId).contains(peer1b.nodeInfo))
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1b, peer3))
// The first peer updates its features without an address: the previous address should be kept.
db.addOrUpdatePeer(peer1c.nodeId, peer1c.nodeInfo.copy(address_opt = None))
db.addOrUpdatePeerFeatures(peer1c.nodeId, peer1c.nodeInfo.features)
assert(db.getPeer(peer1c.nodeId).contains(peer1c.nodeInfo))
assert(db.listPeers().map(p => TestCase(p._1, p._2)).toSet == Set(peer1c, peer3))
}
Expand All @@ -89,7 +89,7 @@ class PeersDbSpec extends AnyFunSuite {
implicit val ec: ExecutionContextExecutor = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(8))
val Success(peerAddress) = NodeAddress.fromParts("127.0.0.1", 42000)
val futures = for (_ <- 0 until 10000) yield {
Future(db.addOrUpdatePeer(randomKey().publicKey, NodeInfo(Features.empty, Some(peerAddress))))
Future(db.addOrUpdatePeer(randomKey().publicKey, peerAddress, Features.empty))
}
val res = Future.sequence(futures)
Await.result(res, 60 seconds)
Expand Down Expand Up @@ -187,7 +187,7 @@ class PeersDbSpec extends AnyFunSuite {
val postMigrationDb = new SqlitePeersDb(connection)
assert(postMigrationDb.listPeers() == peerInfos)
val updatedPeerInfo1 = peerInfos(peerId1).copy(features = Features(Features.DataLossProtect -> FeatureSupport.Mandatory))
postMigrationDb.addOrUpdatePeer(peerId1, updatedPeerInfo1.copy(address_opt = None))
postMigrationDb.addOrUpdatePeerFeatures(peerId1, updatedPeerInfo1.features)
assert(postMigrationDb.getPeer(peerId1).contains(updatedPeerInfo1))
}
)
Expand Down Expand Up @@ -215,7 +215,7 @@ class PeersDbSpec extends AnyFunSuite {
val postMigrationDb = dbs.peers
assert(postMigrationDb.listPeers() == peerInfos)
val updatedPeerInfo1 = peerInfos(peerId1).copy(features = Features(Features.ChannelType -> FeatureSupport.Optional))
postMigrationDb.addOrUpdatePeer(peerId1, updatedPeerInfo1.copy(address_opt = None))
postMigrationDb.addOrUpdatePeerFeatures(peerId1, updatedPeerInfo1.features)
assert(postMigrationDb.getPeer(peerId1).contains(updatedPeerInfo1))
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -862,7 +862,7 @@ class PeerSpec extends FixtureSpec {

// We have information about one of our peers in our DB.
val nodeInfo = NodeInfo(TestConstants.Bob.nodeParams.features.initFeatures(), None)
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, nodeInfo)
nodeParams.db.peers.addOrUpdatePeerFeatures(remoteNodeId, nodeInfo.features)

// We initialize ourselves after a restart, but our peer doesn't reconnect immediately to us.
switchboard.send(peer, Peer.Init(Set(ChannelCodecsSpec.normal), Map.empty))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike

val probe = TestProbe()
val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(Features.empty, Some(fakeIPAddress)))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress, Features.empty)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
probe.send(reconnectionTask, ReconnectionTask.TickReconnect)
Expand Down Expand Up @@ -161,7 +161,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
import f._

val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(Features.empty, Some(fakeIPAddress)))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress, Features.empty)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, connectingData: ReconnectionTask.ConnectingData) = monitor.expectMsgType[TransitionWithData]
Expand Down Expand Up @@ -189,7 +189,7 @@ class ReconnectionTaskSpec extends TestKitBaseClass with FixtureAnyFunSuiteLike
import f._

val peer = TestProbe()
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, NodeInfo(Features.empty, Some(fakeIPAddress)))
nodeParams.db.peers.addOrUpdatePeer(remoteNodeId, fakeIPAddress, Features.empty)
peer.send(reconnectionTask, Peer.Transition(PeerNothingData, PeerDisconnectedData))
val TransitionWithData(ReconnectionTask.IDLE, ReconnectionTask.WAITING, _, _) = monitor.expectMsgType[TransitionWithData]
val TransitionWithData(ReconnectionTask.WAITING, ReconnectionTask.CONNECTING, _, _: ReconnectionTask.ConnectingData) = monitor.expectMsgType[TransitionWithData]
Expand Down

0 comments on commit aacfecc

Please sign in to comment.