diff --git a/eclair-core/src/main/resources/reference.conf b/eclair-core/src/main/resources/reference.conf index 89ae71d876..5113fe81d4 100644 --- a/eclair-core/src/main/resources/reference.conf +++ b/eclair-core/src/main/resources/reference.conf @@ -485,6 +485,16 @@ eclair { migrate-on-restart = false // migrate sqlite -> postgres on restart (only applies if sqlite is primary) compare-on-restart = false // compare sqlite and postgres dbs on restart (only applies if sqlite is primary) } + // During normal channel operation, we need to store information about past HTLCs to be able to punish our peer if + // they publish a revoked commitment. Once a channel closes or a splice transaction confirms, we can clean up past + // data (which reduces the size of our DB). Since there may be millions of rows to delete and we don't want to slow + // down the node, we delete those rows in batches at regular intervals. + revoked-htlc-info-cleaner { + // Number of rows to delete per batch: a higher value will clean up the DB faster, but may have a higher impact on performance. + batch-size = 50000 + // Frequency at which batches of rows are deleted: a lower value will clean up the DB faster, but may have a higher impact on performance. + interval = 15 minutes + } } file-backup { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala index d77aab4687..86cc13e091 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/NodeParams.scala @@ -86,7 +86,8 @@ case class NodeParams(nodeKeyManager: NodeKeyManager, blockchainWatchdogThreshold: Int, blockchainWatchdogSources: Seq[String], onionMessageConfig: OnionMessageConfig, - purgeInvoicesInterval: Option[FiniteDuration]) { + purgeInvoicesInterval: Option[FiniteDuration], + revokedHtlcInfoCleanerConfig: RevokedHtlcInfoCleaner.Config) { val privateKey: Crypto.PrivateKey = nodeKeyManager.nodeKey.privateKey val nodeId: PublicKey = nodeKeyManager.nodeId @@ -605,7 +606,11 @@ object NodeParams extends Logging { timeout = FiniteDuration(config.getDuration("onion-messages.reply-timeout").getSeconds, TimeUnit.SECONDS), maxAttempts = config.getInt("onion-messages.max-attempts"), ), - purgeInvoicesInterval = purgeInvoicesInterval + purgeInvoicesInterval = purgeInvoicesInterval, + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config( + batchSize = config.getInt("db.revoked-htlc-info-cleaner.batch-size"), + interval = FiniteDuration(config.getDuration("db.revoked-htlc-info-cleaner.interval").getSeconds, TimeUnit.SECONDS) + ) ) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala index ce2c020ebd..3126f23318 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/Commitments.scala @@ -1,7 +1,6 @@ package fr.acinq.eclair.channel import akka.event.LoggingAdapter -import com.softwaremill.quicklens.ModifyPimp import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.bitcoin.scalacompat.{ByteVector32, ByteVector64, Crypto, Satoshi, SatoshiLong, Script, Transaction, TxId} import fr.acinq.eclair.blockchain.fee.{FeeratePerByte, FeeratePerKw, FeeratesPerKw, OnChainFeeConf} @@ -266,12 +265,16 @@ case class NextRemoteCommit(sig: CommitSig, commit: RemoteCommit) /** * A minimal commitment for a given funding tx. * - * @param fundingTxIndex index of the funding tx in the life of the channel: - * - initial funding tx has index 0 - * - splice txs have index 1, 2, ... - * - commitments that share the same index are rbfed + * @param fundingTxIndex index of the funding tx in the life of the channel: + * - initial funding tx has index 0 + * - splice txs have index 1, 2, ... + * - commitments that share the same index are rbfed + * @param firstRemoteCommitIndex index of the first remote commitment we signed that spends the funding transaction. + * Once the funding transaction confirms, our peer won't be able to publish revoked + * commitments with lower commitment indices. */ case class Commitment(fundingTxIndex: Long, + firstRemoteCommitIndex: Long, remoteFundingPubKey: PublicKey, localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus, localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) { @@ -730,6 +733,7 @@ object Commitment { /** Subset of Commitments when we want to work with a single, specific commitment. */ case class FullCommitment(params: ChannelParams, changes: CommitmentChanges, fundingTxIndex: Long, + firstRemoteCommitIndex: Long, remoteFundingPubKey: PublicKey, localFundingStatus: LocalFundingStatus, remoteFundingStatus: RemoteFundingStatus, localCommit: LocalCommit, remoteCommit: RemoteCommit, nextRemoteCommit_opt: Option[NextRemoteCommit]) { @@ -739,7 +743,7 @@ case class FullCommitment(params: ChannelParams, changes: CommitmentChanges, val commitInput = localCommit.commitTxAndRemoteSig.commitTx.input val fundingTxId = commitInput.outPoint.txid val capacity = commitInput.txOut.amount - val commitment = Commitment(fundingTxIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt) + val commitment = Commitment(fundingTxIndex, firstRemoteCommitIndex, remoteFundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, nextRemoteCommit_opt) def localChannelReserve: Satoshi = commitment.localChannelReserve(params) @@ -803,7 +807,7 @@ case class Commitments(params: ChannelParams, lazy val availableBalanceForReceive: MilliSatoshi = active.map(_.availableBalanceForReceive(params, changes)).min // We always use the last commitment that was created, to make sure we never go back in time. - val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt) + val latest = FullCommitment(params, changes, active.head.fundingTxIndex, active.head.firstRemoteCommitIndex, active.head.remoteFundingPubKey, active.head.localFundingStatus, active.head.remoteFundingStatus, active.head.localCommit, active.head.remoteCommit, active.head.nextRemoteCommit_opt) val all: Seq[Commitment] = active ++ inactive diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala index 0740907597..024738c32c 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/ChannelOpenSingleFunded.scala @@ -19,7 +19,7 @@ package fr.acinq.eclair.channel.fsm import akka.actor.Status import akka.actor.typed.scaladsl.adapter.actorRefAdapter import akka.pattern.pipe -import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script, TxHash} +import fr.acinq.bitcoin.scalacompat.{SatoshiLong, Script} import fr.acinq.eclair.blockchain.OnChainWallet.MakeFundingTxResponse import fr.acinq.eclair.blockchain.bitcoind.ZmqWatcher._ import fr.acinq.eclair.channel.Helpers.Funding @@ -277,6 +277,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { ) val commitment = Commitment( fundingTxIndex = 0, + firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteFundingPubKey, localFundingStatus = SingleFundedUnconfirmedFundingTx(None), remoteFundingStatus = RemoteFundingStatus.NotLocked, @@ -323,6 +324,7 @@ trait ChannelOpenSingleFunded extends SingleFundingHandlers with ErrorHandlers { case Success(_) => val commitment = Commitment( fundingTxIndex = 0, + firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteFundingPubKey, localFundingStatus = SingleFundedUnconfirmedFundingTx(Some(fundingTx)), remoteFundingStatus = RemoteFundingStatus.NotLocked, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala index f846e44a06..e827205890 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fsm/CommonFundingHandlers.scala @@ -26,6 +26,7 @@ import fr.acinq.eclair.channel.Helpers.getRelayFees import fr.acinq.eclair.channel.LocalFundingStatus.{ConfirmedFundingTx, DualFundedUnconfirmedFundingTx, SingleFundedUnconfirmedFundingTx} import fr.acinq.eclair.channel._ import fr.acinq.eclair.channel.fsm.Channel.{ANNOUNCEMENTS_MINCONF, BroadcastChannelUpdate, PeriodicRefresh, REFRESH_CHANNEL_UPDATE_INTERVAL} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner import fr.acinq.eclair.wire.protocol.{AnnouncementSignatures, ChannelReady, ChannelReadyTlv, TlvStream} import scala.concurrent.duration.{DurationInt, FiniteDuration} @@ -83,6 +84,12 @@ trait CommonFundingHandlers extends CommonHandlers { } val fundingStatus = ConfirmedFundingTx(w.tx, d.commitments.localFundingSigs(w.tx.txid)) context.system.eventStream.publish(TransactionConfirmed(d.channelId, remoteNodeId, w.tx)) + // When a splice transaction confirms, it double-spends all the commitment transactions that only applied to the + // previous funding transaction. Our peer cannot publish the corresponding revoked commitments anymore, so we can + // clean-up the htlc data that we were storing for the matching penalty transactions. + d.commitments.all.find(_.fundingTxId == w.tx.txid).map(_.firstRemoteCommitIndex).foreach { + commitIndex => context.system.eventStream.publish(RevokedHtlcInfoCleaner.ForgetHtlcInfos(d.channelId, beforeCommitIndex = commitIndex)) + } d.commitments.updateLocalFundingStatus(w.tx.txid, fundingStatus).map { case (commitments1, commitment) => // First of all, we watch the funding tx that is now confirmed. diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala index 2d4afcbc94..d5b8f12c40 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/channel/fund/InteractiveTxBuilder.scala @@ -964,7 +964,7 @@ object InteractiveTxSigningSession { LocalCommit.fromCommitSig(nodeParams.channelKeyManager, channelParams, fundingTx.txId, fundingTxIndex, fundingParams.remoteFundingPubKey, commitInput, remoteCommitSig, localCommitIndex, unsignedLocalCommit.spec, localPerCommitmentPoint).map { signedLocalCommit => if (shouldSignFirst(fundingParams.isInitiator, channelParams, fundingTx.tx)) { val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fundingTx, nodeParams.currentBlockHeight, fundingParams) - val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) + val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) SendingSigs(fundingStatus, commitment, fundingTx.localSigs) } else { this.copy(localCommit = Right(signedLocalCommit)) @@ -989,7 +989,7 @@ object InteractiveTxSigningSession { case Right(fullySignedTx) => log.info("interactive-tx fully signed with {} local inputs, {} remote inputs, {} local outputs and {} remote outputs", fullySignedTx.tx.localInputs.length, fullySignedTx.tx.remoteInputs.length, fullySignedTx.tx.localOutputs.length, fullySignedTx.tx.remoteOutputs.length) val fundingStatus = LocalFundingStatus.DualFundedUnconfirmedFundingTx(fullySignedTx, nodeParams.currentBlockHeight, fundingParams) - val commitment = Commitment(fundingTxIndex, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) + val commitment = Commitment(fundingTxIndex, remoteCommit.index, fundingParams.remoteFundingPubKey, fundingStatus, RemoteFundingStatus.NotLocked, signedLocalCommit, remoteCommit, None) Right(SendingSigs(fundingStatus, commitment, fullySignedTx.localSigs)) } } diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala index c312c9786d..6f1f34ecce 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/ChannelsDb.scala @@ -18,9 +18,9 @@ package fr.acinq.eclair.db import fr.acinq.bitcoin.scalacompat.ByteVector32 import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampSecond} import fr.acinq.eclair.channel.PersistentChannelData import fr.acinq.eclair.db.DbEventHandler.ChannelEvent +import fr.acinq.eclair.{CltvExpiry, Paginated} trait ChannelsDb { @@ -30,8 +30,15 @@ trait ChannelsDb { def updateChannelMeta(channelId: ByteVector32, event: ChannelEvent.EventType): Unit + /** Mark a channel as closed, but keep it in the DB. */ def removeChannel(channelId: ByteVector32): Unit + /** Mark revoked HTLC information as obsolete. It will be removed from the DB once [[removeHtlcInfos]] is called. */ + def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit + + /** Remove up to batchSize obsolete revoked HTLC information. */ + def removeHtlcInfos(batchSize: Int): Unit + def listLocalChannels(): Seq[PersistentChannelData] def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala index 1b7eea5aab..e241db7a47 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DbEventHandler.scala @@ -16,6 +16,9 @@ package fr.acinq.eclair.db +import akka.actor.typed.SupervisorStrategy +import akka.actor.typed.scaladsl.Behaviors +import akka.actor.typed.scaladsl.adapter.ClassicActorContextOps import akka.actor.{Actor, DiagnosticActorLogging, Props} import akka.event.Logging.MDC import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey @@ -33,8 +36,10 @@ import fr.acinq.eclair.{Logs, NodeParams} */ class DbEventHandler(nodeParams: NodeParams) extends Actor with DiagnosticActorLogging { - val auditDb: AuditDb = nodeParams.db.audit - val channelsDb: ChannelsDb = nodeParams.db.channels + private val auditDb: AuditDb = nodeParams.db.audit + private val channelsDb: ChannelsDb = nodeParams.db.channels + + context.spawn(Behaviors.supervise(RevokedHtlcInfoCleaner(channelsDb, nodeParams.revokedHtlcInfoCleanerConfig)).onFailure(SupervisorStrategy.restart), name = "revoked-htlc-info-cleaner") context.system.eventStream.subscribe(self, classOf[PaymentSent]) context.system.eventStream.subscribe(self, classOf[PaymentFailed]) diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala index 17078e0d91..bf6eb6e669 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/DualDatabases.scala @@ -231,6 +231,16 @@ case class DualChannelsDb(primary: ChannelsDb, secondary: ChannelsDb) extends Ch primary.removeChannel(channelId) } + override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = { + runAsync(secondary.markHtlcInfosForRemoval(channelId, beforeCommitIndex)) + primary.markHtlcInfosForRemoval(channelId, beforeCommitIndex) + } + + override def removeHtlcInfos(batchSize: Int): Unit = { + runAsync(secondary.removeHtlcInfos(batchSize)) + primary.removeHtlcInfos(batchSize) + } + override def listLocalChannels(): Seq[PersistentChannelData] = { runAsync(secondary.listLocalChannels()) primary.listLocalChannels() diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala new file mode 100644 index 0000000000..98cc460914 --- /dev/null +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleaner.scala @@ -0,0 +1,59 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.typed.Behavior +import akka.actor.typed.eventstream.EventStream +import akka.actor.typed.scaladsl.Behaviors +import fr.acinq.bitcoin.scalacompat.ByteVector32 + +import scala.concurrent.duration.FiniteDuration + +/** + * When a channel is closed or a splice transaction confirms, we can remove the information about old HTLCs that was + * stored in the DB to punish revoked commitments. We potentially have millions of rows to delete per channel, and there + * is no rush to remove them. We don't want this to negatively impact active channels, so this actor deletes that data + * in small batches, at regular intervals. + */ +object RevokedHtlcInfoCleaner { + + // @formatter:off + sealed trait Command + case class ForgetHtlcInfos(channelId: ByteVector32, beforeCommitIndex: Long) extends Command + private case object DeleteBatch extends Command + // @formatter:on + + case class Config(batchSize: Int, interval: FiniteDuration) + + def apply(db: ChannelsDb, config: Config): Behavior[Command] = { + Behaviors.setup { context => + context.system.eventStream ! EventStream.Subscribe(context.self) + Behaviors.withTimers { timers => + timers.startTimerWithFixedDelay(DeleteBatch, config.interval) + Behaviors.receiveMessage { + case ForgetHtlcInfos(channelId, beforeCommitIndex) => + db.markHtlcInfosForRemoval(channelId, beforeCommitIndex) + Behaviors.same + case DeleteBatch => + db.removeHtlcInfos(config.batchSize) + Behaviors.same + } + } + } + } + +} diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala index a2f852befc..7a35e059d0 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/pg/PgChannelsDb.scala @@ -36,7 +36,7 @@ import javax.sql.DataSource object PgChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 8 + val CURRENT_VERSION = 9 } class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb with Logging { @@ -65,7 +65,7 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN last_connected_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + last_connected_timestamp * interval '1 millisecond'") statement.executeUpdate("ALTER TABLE local_channels ALTER COLUMN closed_timestamp SET DATA TYPE TIMESTAMP WITH TIME ZONE USING timestamp with time zone 'epoch' + closed_timestamp * interval '1 millisecond'") - statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") + statement.executeUpdate("ALTER TABLE htlc_infos ALTER COLUMN commitment_number SET DATA TYPE BIGINT USING commitment_number::BIGINT") } def migration45(statement: Statement): Unit = { @@ -115,12 +115,17 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") } + def migration89(statement: Statement): Unit = { + statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE SCHEMA IF NOT EXISTS local") statement.executeUpdate("CREATE TABLE local.channels (channel_id TEXT NOT NULL PRIMARY KEY, remote_node_id TEXT NOT NULL, data BYTEA NOT NULL, json JSONB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT FALSE, created_timestamp TIMESTAMP WITH TIME ZONE, last_payment_sent_timestamp TIMESTAMP WITH TIME ZONE, last_payment_received_timestamp TIMESTAMP WITH TIME ZONE, last_connected_timestamp TIMESTAMP WITH TIME ZONE, closed_timestamp TIMESTAMP WITH TIME ZONE)") statement.executeUpdate("CREATE TABLE local.htlc_infos (channel_id TEXT NOT NULL, commitment_number BIGINT NOT NULL, payment_hash TEXT NOT NULL, cltv_expiry BIGINT NOT NULL, FOREIGN KEY(channel_id) REFERENCES local.channels(channel_id))") + statement.executeUpdate("CREATE TABLE local.htlc_infos_to_remove (channel_id TEXT NOT NULL PRIMARY KEY, before_commitment_number BIGINT NOT NULL)") statement.executeUpdate("CREATE INDEX local_channels_type_idx ON local.channels ((json->>'type'))") statement.executeUpdate("CREATE INDEX local_channels_remote_node_id_idx ON local.channels(remote_node_id)") @@ -145,6 +150,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit if (v < 8) { migration78(statement) } + if (v < 9) { + migration89(statement) + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -225,10 +233,9 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit statement.executeUpdate() } - using(pg.prepareStatement("DELETE FROM local.htlc_infos WHERE channel_id=?")) { statement => - statement.setString(1, channelId.toHex) - statement.executeUpdate() - } + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + markHtlcInfosForRemoval(channelId, Long.MaxValue) using(pg.prepareStatement("UPDATE local.channels SET is_closed=TRUE, closed_timestamp=? WHERE channel_id=?")) { statement => statement.setTimestamp(1, Timestamp.from(Instant.now())) @@ -238,6 +245,46 @@ class PgChannelsDb(implicit ds: DataSource, lock: PgLock) extends ChannelsDb wit } } + override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Postgres) { + withLock { pg => + using(pg.prepareStatement("INSERT INTO local.htlc_infos_to_remove (channel_id, before_commitment_number) VALUES(?, ?) ON CONFLICT (channel_id) DO UPDATE SET before_commitment_number = EXCLUDED.before_commitment_number")) { statement => + statement.setString(1, channelId.toHex) + statement.setLong(2, beforeCommitIndex) + statement.executeUpdate() + } + } + } + + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Postgres) { + withLock { pg => + // Check if there are channels that need to be cleaned up. + val channelToCleanUp_opt = using(pg.prepareStatement("SELECT channel_id, before_commitment_number FROM local.htlc_infos_to_remove LIMIT 1")) { statement => + statement.executeQuery().map(rs => { + val channelId = ByteVector32(rs.getByteVector32FromHex("channel_id")) + val beforeCommitmentNumber = rs.getLong("before_commitment_number") + (channelId, beforeCommitmentNumber) + }).lastOption + } + // Remove a batch of HTLC information for that channel. + channelToCleanUp_opt.foreach { case (channelId, beforeCommitmentNumber) => + val deletedCount = using(pg.prepareStatement(s"DELETE FROM local.htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM local.htlc_infos WHERE channel_id=? AND commitment_number + statement.setString(1, channelId.toHex) + statement.setString(2, channelId.toHex) + statement.setLong(3, beforeCommitmentNumber) + statement.executeUpdate() + } + logger.info(s"deleted $deletedCount rows from htlc_infos for channelId=$channelId beforeCommitmentNumber=$beforeCommitmentNumber") + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(pg.prepareStatement("DELETE FROM local.htlc_infos_to_remove WHERE channel_id=?")) { statement => + statement.setString(1, channelId.toHex) + statement.executeUpdate() + } + } + } + } + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Postgres) { withLock { pg => using(pg.createStatement) { statement => diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala index ea07977280..e9f9f8d014 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/db/sqlite/SqliteChannelsDb.scala @@ -24,7 +24,7 @@ import fr.acinq.eclair.db.DbEventHandler.ChannelEvent import fr.acinq.eclair.db.Monitoring.Metrics.withMetrics import fr.acinq.eclair.db.Monitoring.Tags.DbBackends import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec -import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli, TimestampSecond} +import fr.acinq.eclair.{CltvExpiry, Paginated, TimestampMilli} import grizzled.slf4j.Logging import scodec.bits.BitVector @@ -32,7 +32,7 @@ import java.sql.{Connection, Statement} object SqliteChannelsDb { val DB_NAME = "channels" - val CURRENT_VERSION = 4 + val CURRENT_VERSION = 5 } class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { @@ -79,10 +79,15 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { )(logger) } + def migration45(): Unit = { + statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") + } + getVersion(statement, DB_NAME) match { case None => statement.executeUpdate("CREATE TABLE local_channels (channel_id BLOB NOT NULL PRIMARY KEY, data BLOB NOT NULL, is_closed BOOLEAN NOT NULL DEFAULT 0, created_timestamp INTEGER, last_payment_sent_timestamp INTEGER, last_payment_received_timestamp INTEGER, last_connected_timestamp INTEGER, closed_timestamp INTEGER)") statement.executeUpdate("CREATE TABLE htlc_infos (channel_id BLOB NOT NULL, commitment_number INTEGER NOT NULL, payment_hash BLOB NOT NULL, cltv_expiry INTEGER NOT NULL, FOREIGN KEY(channel_id) REFERENCES local_channels(channel_id))") + statement.executeUpdate("CREATE TABLE htlc_infos_to_remove (channel_id BLOB NOT NULL PRIMARY KEY, before_commitment_number INTEGER NOT NULL)") statement.executeUpdate("CREATE INDEX htlc_infos_idx ON htlc_infos(channel_id, commitment_number)") case Some(v@(1 | 2 | 3)) => logger.warn(s"migrating db $DB_NAME, found version=$v current=$CURRENT_VERSION") @@ -95,6 +100,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { if (v < 4) { migration34() } + if (v < 5) { + migration45() + } case Some(CURRENT_VERSION) => () // table is up-to-date, nothing to do case Some(unknownVersion) => throw new RuntimeException(s"Unknown version of DB $DB_NAME found, version=$unknownVersion") } @@ -152,10 +160,9 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { statement.executeUpdate() } - using(sqlite.prepareStatement("DELETE FROM htlc_infos WHERE channel_id=?")) { statement => - statement.setBytes(1, channelId.toArray) - statement.executeUpdate() - } + // The htlc_infos may contain millions of rows, which is very expensive to delete synchronously. + // We instead run an asynchronous job to clean up that data in small batches. + markHtlcInfosForRemoval(channelId, Long.MaxValue) using(sqlite.prepareStatement("UPDATE local_channels SET is_closed=1, closed_timestamp=? WHERE channel_id=?")) { statement => statement.setLong(1, TimestampMilli.now().toLong) @@ -164,6 +171,48 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { } } + override def markHtlcInfosForRemoval(channelId: ByteVector32, beforeCommitIndex: Long): Unit = withMetrics("channels/forget-htlc-infos", DbBackends.Sqlite) { + using(sqlite.prepareStatement("UPDATE htlc_infos_to_remove SET before_commitment_number=? WHERE channel_id=?")) { update => + update.setLong(1, beforeCommitIndex) + update.setBytes(2, channelId.toArray) + if (update.executeUpdate() == 0) { + using(sqlite.prepareStatement("INSERT INTO htlc_infos_to_remove VALUES (?, ?)")) { statement => + statement.setBytes(1, channelId.toArray) + statement.setLong(2, beforeCommitIndex) + statement.executeUpdate() + } + } + } + } + + override def removeHtlcInfos(batchSize: Int): Unit = withMetrics("channels/remove-htlc-infos", DbBackends.Sqlite) { + // Check if there are channels that need to be cleaned up. + val channelToCleanUp_opt = using(sqlite.prepareStatement("SELECT channel_id, before_commitment_number FROM htlc_infos_to_remove LIMIT 1")) { statement => + statement.executeQuery().map(rs => { + val channelId = ByteVector32(rs.getByteVector32("channel_id")) + val beforeCommitmentNumber = rs.getLong("before_commitment_number") + (channelId, beforeCommitmentNumber) + }).lastOption + } + // Remove a batch of HTLC information for that channel. + channelToCleanUp_opt.foreach { case (channelId, beforeCommitmentNumber) => + val deletedCount = using(sqlite.prepareStatement(s"DELETE FROM htlc_infos WHERE channel_id=? AND commitment_number IN (SELECT commitment_number FROM htlc_infos WHERE channel_id=? AND commitment_number + statement.setBytes(1, channelId.toArray) + statement.setBytes(2, channelId.toArray) + statement.setLong(3, beforeCommitmentNumber) + statement.executeUpdate() + } + logger.info(s"deleted $deletedCount rows from htlc_infos for channelId=$channelId beforeCommitmentNumber=$beforeCommitmentNumber") + // If we've deleted all HTLC information for that channel, we can now remove it from the DB. + if (deletedCount < batchSize) { + using(sqlite.prepareStatement("DELETE FROM htlc_infos_to_remove WHERE channel_id=?")) { statement => + statement.setBytes(1, channelId.toArray) + statement.executeUpdate() + } + } + } + } + override def listLocalChannels(): Seq[PersistentChannelData] = withMetrics("channels/list-local-channels", DbBackends.Sqlite) { using(sqlite.createStatement) { statement => statement.executeQuery("SELECT data FROM local_channels WHERE is_closed=0") @@ -175,21 +224,21 @@ class SqliteChannelsDb(val sqlite: Connection) extends ChannelsDb with Logging { override def listClosedChannels(remoteNodeId_opt: Option[PublicKey], paginated_opt: Option[Paginated]): Seq[PersistentChannelData] = withMetrics("channels/list-closed-channels", DbBackends.Sqlite) { val sql = "SELECT data FROM local_channels WHERE is_closed=1 ORDER BY closed_timestamp DESC" remoteNodeId_opt match { - case None => - using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => - statement.executeQuery().mapCodec(channelDataCodec).toSeq - } - case Some(nodeId) => - using(sqlite.prepareStatement(sql)) { statement => - val filtered = statement.executeQuery() - .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) - val limited = paginated_opt match { - case None => filtered - case Some(p) => filtered.slice(p.skip, p.skip + p.count) - } - limited.toSeq + case None => + using(sqlite.prepareStatement(limited(sql, paginated_opt))) { statement => + statement.executeQuery().mapCodec(channelDataCodec).toSeq + } + case Some(nodeId) => + using(sqlite.prepareStatement(sql)) { statement => + val filtered = statement.executeQuery() + .mapCodec(channelDataCodec).filter(_.remoteNodeId == nodeId) + val limited = paginated_opt match { + case None => filtered + case Some(p) => filtered.slice(p.skip, p.skip + p.count) } - } + limited.toSeq + } + } } override def addHtlcInfo(channelId: ByteVector32, commitmentNumber: Long, paymentHash: ByteVector32, cltvExpiry: CltvExpiry): Unit = withMetrics("channels/add-htlc-info", DbBackends.Sqlite) { diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala index 4d3d4bab63..e5fb015785 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version0/ChannelTypes0.scala @@ -239,6 +239,7 @@ private[channel] object ChannelTypes0 { } val commitment = Commitment( fundingTxIndex = 0, + firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteParams.fundingPubKey, // We set an empty funding tx, even if it may be confirmed already (and the channel fully operational). We could // have set a specific Unknown status, but it would have forced us to keep it forever. We will retrieve the diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala index 2720b5b440..ce14345e8e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version3/ChannelTypes3.scala @@ -45,7 +45,7 @@ private[channel] object ChannelTypes3 { def migrate(): channel.Commitments = channel.Commitments( ChannelParams(channelId, channelConfig, channelFeatures, localParams, remoteParams.migrate(), channelFlags), CommitmentChanges(localChanges, remoteChanges, localNextHtlcId, remoteNextHtlcId), - Seq(Commitment(fundingTxIndex = 0, remoteFundingPubKey = remoteParams.fundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, remoteNextCommitInfo.left.toOption.map(w => NextRemoteCommit(w.sent, w.nextRemoteCommit)))), + Seq(Commitment(fundingTxIndex = 0, firstRemoteCommitIndex = 0, remoteFundingPubKey = remoteParams.fundingPubKey, localFundingStatus, remoteFundingStatus, localCommit, remoteCommit, remoteNextCommitInfo.left.toOption.map(w => NextRemoteCommit(w.sent, w.nextRemoteCommit)))), inactive = Nil, remoteNextCommitInfo.fold(w => Left(WaitForRev(w.sentAfterLocalCommitIndex)), remotePerCommitmentPoint => Right(remotePerCommitmentPoint)), remotePerCommitmentSecrets, diff --git a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala index ffe5b5eee5..e8be2ded6e 100644 --- a/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala +++ b/eclair-core/src/main/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4.scala @@ -6,17 +6,16 @@ import fr.acinq.bitcoin.scalacompat.{OutPoint, ScriptWitness, Transaction, TxOut import fr.acinq.eclair.blockchain.fee.{ConfirmationPriority, ConfirmationTarget} import fr.acinq.eclair.channel.LocalFundingStatus._ import fr.acinq.eclair.channel._ +import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.{FullySignedSharedTransaction, PartiallySignedSharedTransaction} import fr.acinq.eclair.channel.fund.InteractiveTxSigningSession.UnsignedLocalCommit import fr.acinq.eclair.channel.fund.{InteractiveTxBuilder, InteractiveTxSigningSession} import fr.acinq.eclair.crypto.ShaChain -import fr.acinq.eclair.MilliSatoshiLong -import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.{FullySignedSharedTransaction, PartiallySignedSharedTransaction} import fr.acinq.eclair.transactions.Transactions._ import fr.acinq.eclair.transactions.{CommitmentSpec, DirectedHtlc, IncomingHtlc, OutgoingHtlc} import fr.acinq.eclair.wire.protocol.CommonCodecs._ import fr.acinq.eclair.wire.protocol.LightningMessageCodecs._ import fr.acinq.eclair.wire.protocol.{TxSignatures, UpdateAddHtlc, UpdateMessage} -import fr.acinq.eclair.{BlockHeight, FeatureSupport, Features, PermanentChannelFeature, channel} +import fr.acinq.eclair.{BlockHeight, FeatureSupport, Features, MilliSatoshiLong, PermanentChannelFeature, channel} import scodec.bits.{BitVector, ByteVector} import scodec.codecs._ import scodec.{Attempt, Codec} @@ -369,7 +368,13 @@ private[channel] object ChannelCodecs4 { ("sharedTx" | signedSharedTransactionCodec) :: ("createdAt" | blockHeight) :: ("fundingParams" | fundingParamsCodec)).as[DualFundedUnconfirmedFundingTx].xmap( - dfu => (dfu.sharedTx.tx.sharedInput_opt, dfu.fundingParams.sharedInput_opt) match { + dfu => fillSharedInputScript(dfu), + dfu => dfu + ) + + // When decoding interactive-tx from older codecs, we fill the shared input publicKeyScript if necessary. + private def fillSharedInputScript(dfu: DualFundedUnconfirmedFundingTx): DualFundedUnconfirmedFundingTx = { + (dfu.sharedTx.tx.sharedInput_opt, dfu.fundingParams.sharedInput_opt) match { case (Some(sharedTxInput), Some(sharedFundingParamsInput)) if sharedTxInput.publicKeyScript.isEmpty => val sharedTxInput1 = sharedTxInput.copy(publicKeyScript = sharedFundingParamsInput.info.txOut.publicKeyScript) val sharedTx1 = dfu.sharedTx.tx.copy(sharedInput_opt = Some(sharedTxInput1)) @@ -379,9 +384,8 @@ private[channel] object ChannelCodecs4 { } dfu1 case _ => dfu - }, - dfu => dfu - ) + } + } val fundingTxStatusCodec: Codec[LocalFundingStatus] = discriminated[LocalFundingStatus].by(uint8) .typecase(0x01, optional(bool8, txCodec).as[SingleFundedUnconfirmedFundingTx]) @@ -428,8 +432,19 @@ private[channel] object ChannelCodecs4 { ("sig" | lengthDelimited(commitSigCodec)) :: ("commit" | remoteCommitCodec(commitmentSpecCodec))).as[NextRemoteCommit] + private def commitmentCodecWithoutFirstRemoteCommitIndex(htlcs: Set[DirectedHtlc]): Codec[Commitment] = ( + ("fundingTxIndex" | uint32) :: + ("firstRemoteCommitIndex" | provide(0L)) :: + ("fundingPubKey" | publicKey) :: + ("fundingTxStatus" | fundingTxStatusCodec) :: + ("remoteFundingStatus" | remoteFundingStatusCodec) :: + ("localCommit" | localCommitCodec(minimalCommitmentSpecCodec(htlcs))) :: + ("remoteCommit" | remoteCommitCodec(minimalCommitmentSpecCodec(htlcs.map(_.opposite)))) :: + ("nextRemoteCommit_opt" | optional(bool8, nextRemoteCommitCodec(minimalCommitmentSpecCodec(htlcs.map(_.opposite)))))).as[Commitment] + private def commitmentCodec(htlcs: Set[DirectedHtlc]): Codec[Commitment] = ( ("fundingTxIndex" | uint32) :: + ("firstRemoteCommitIndex" | uint64overflow) :: ("fundingPubKey" | publicKey) :: ("fundingTxStatus" | fundingTxStatusCodec) :: ("remoteFundingStatus" | remoteFundingStatusCodec) :: @@ -490,6 +505,21 @@ private[channel] object ChannelCodecs4 { } } + val commitmentsCodecWithoutFirstRemoteCommitIndex: Codec[Commitments] = ( + ("params" | paramsCodec) :: + ("changes" | changesCodec) :: + (("htlcs" | setCodec(htlcCodec)) >>:~ { htlcs => + ("active" | listOfN(uint16, commitmentCodecWithoutFirstRemoteCommitIndex(htlcs))) :: + ("inactive" | listOfN(uint16, commitmentCodecWithoutFirstRemoteCommitIndex(htlcs))) :: + ("remoteNextCommitInfo" | either(bool8, waitForRevCodec, publicKey)) :: + ("remotePerCommitmentSecrets" | byteAligned(ShaChain.shaChainCodec)) :: + ("originChannels" | originsMapCodec) :: + ("remoteChannelData_opt" | optional(bool8, varsizebinarydata)) + })).as[EncodedCommitments].xmap( + encoded => encoded.toCommitments, + commitments => EncodedCommitments(commitments) + ) + val commitmentsCodec: Codec[Commitments] = ( ("params" | paramsCodec) :: ("changes" | changesCodec) :: @@ -505,6 +535,9 @@ private[channel] object ChannelCodecs4 { commitments => EncodedCommitments(commitments) ) + val versionedCommitmentsCodec: Codec[Commitments] = discriminated[Commitments].by(uint8) + .typecase(0x01, commitmentsCodec) + val closingFeeratesCodec: Codec[ClosingFeerates] = ( ("preferred" | feeratePerKw) :: ("min" | feeratePerKw) :: @@ -565,13 +598,23 @@ private[channel] object ChannelCodecs4 { .\(0x02) { case status: SpliceStatus.SpliceWaitingForSigs => status }(interactiveTxWaitingForSigsCodec.as[channel.SpliceStatus.SpliceWaitingForSigs]) val DATA_WAIT_FOR_FUNDING_CONFIRMED_00_Codec: Codec[DATA_WAIT_FOR_FUNDING_CONFIRMED] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("waitingSince" | blockHeight) :: + ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec))) :: + ("lastSent" | either(bool8, lengthDelimited(fundingCreatedCodec), lengthDelimited(fundingSignedCodec)))).as[DATA_WAIT_FOR_FUNDING_CONFIRMED] + + val DATA_WAIT_FOR_FUNDING_CONFIRMED_0a_Codec: Codec[DATA_WAIT_FOR_FUNDING_CONFIRMED] = ( + ("commitments" | versionedCommitmentsCodec) :: ("waitingSince" | blockHeight) :: ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec))) :: ("lastSent" | either(bool8, lengthDelimited(fundingCreatedCodec), lengthDelimited(fundingSignedCodec)))).as[DATA_WAIT_FOR_FUNDING_CONFIRMED] val DATA_WAIT_FOR_CHANNEL_READY_01_Codec: Codec[DATA_WAIT_FOR_CHANNEL_READY] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("shortIds" | shortids)).as[DATA_WAIT_FOR_CHANNEL_READY] + + val DATA_WAIT_FOR_CHANNEL_READY_0b_Codec: Codec[DATA_WAIT_FOR_CHANNEL_READY] = ( + ("commitments" | versionedCommitmentsCodec) :: ("shortIds" | shortids)).as[DATA_WAIT_FOR_CHANNEL_READY] val DATA_WAIT_FOR_DUAL_FUNDING_SIGNED_09_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED] = ( @@ -583,7 +626,16 @@ private[channel] object ChannelCodecs4 { ("remoteChannelData_opt" | optional(bool8, varsizebinarydata))).as[DATA_WAIT_FOR_DUAL_FUNDING_SIGNED] val DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED_02_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("localPushAmount" | millisatoshi) :: + ("remotePushAmount" | millisatoshi) :: + ("waitingSince" | blockHeight) :: + ("lastChecked" | blockHeight) :: + ("rbfStatus" | rbfStatusCodec) :: + ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec)))).as[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] + + val DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED_0c_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] = ( + ("commitments" | versionedCommitmentsCodec) :: ("localPushAmount" | millisatoshi) :: ("remotePushAmount" | millisatoshi) :: ("waitingSince" | blockHeight) :: @@ -592,11 +644,25 @@ private[channel] object ChannelCodecs4 { ("deferred" | optional(bool8, lengthDelimited(channelReadyCodec)))).as[DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED] val DATA_WAIT_FOR_DUAL_FUNDING_READY_03_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_READY] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("shortIds" | shortids)).as[DATA_WAIT_FOR_DUAL_FUNDING_READY] + + val DATA_WAIT_FOR_DUAL_FUNDING_READY_0d_Codec: Codec[DATA_WAIT_FOR_DUAL_FUNDING_READY] = ( + ("commitments" | versionedCommitmentsCodec) :: ("shortIds" | shortids)).as[DATA_WAIT_FOR_DUAL_FUNDING_READY] val DATA_NORMAL_04_Codec: Codec[DATA_NORMAL] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("shortids" | shortids) :: + ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: + ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: + ("localShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: + ("remoteShutdown" | optional(bool8, lengthDelimited(shutdownCodec))) :: + ("closingFeerates" | optional(bool8, closingFeeratesCodec)) :: + ("spliceStatus" | spliceStatusCodec)).as[DATA_NORMAL] + + val DATA_NORMAL_0e_Codec: Codec[DATA_NORMAL] = ( + ("commitments" | versionedCommitmentsCodec) :: ("shortids" | shortids) :: ("channelAnnouncement" | optional(bool8, lengthDelimited(channelAnnouncementCodec))) :: ("channelUpdate" | lengthDelimited(channelUpdateCodec)) :: @@ -606,20 +672,45 @@ private[channel] object ChannelCodecs4 { ("spliceStatus" | spliceStatusCodec)).as[DATA_NORMAL] val DATA_SHUTDOWN_05_Codec: Codec[DATA_SHUTDOWN] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("localShutdown" | lengthDelimited(shutdownCodec)) :: + ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: + ("closingFeerates" | optional(bool8, closingFeeratesCodec))).as[DATA_SHUTDOWN] + + val DATA_SHUTDOWN_0f_Codec: Codec[DATA_SHUTDOWN] = ( + ("commitments" | versionedCommitmentsCodec) :: ("localShutdown" | lengthDelimited(shutdownCodec)) :: ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: ("closingFeerates" | optional(bool8, closingFeeratesCodec))).as[DATA_SHUTDOWN] val DATA_NEGOTIATING_06_Codec: Codec[DATA_NEGOTIATING] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("localShutdown" | lengthDelimited(shutdownCodec)) :: + ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: + ("closingTxProposed" | listOfN(uint16, listOfN(uint16, lengthDelimited(closingTxProposedCodec)))) :: + ("bestUnpublishedClosingTx_opt" | optional(bool8, closingTxCodec))).as[DATA_NEGOTIATING] + + val DATA_NEGOTIATING_10_Codec: Codec[DATA_NEGOTIATING] = ( + ("commitments" | versionedCommitmentsCodec) :: ("localShutdown" | lengthDelimited(shutdownCodec)) :: ("remoteShutdown" | lengthDelimited(shutdownCodec)) :: ("closingTxProposed" | listOfN(uint16, listOfN(uint16, lengthDelimited(closingTxProposedCodec)))) :: ("bestUnpublishedClosingTx_opt" | optional(bool8, closingTxCodec))).as[DATA_NEGOTIATING] val DATA_CLOSING_07_Codec: Codec[DATA_CLOSING] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("waitingSince" | blockHeight) :: + ("finalScriptPubKey" | lengthDelimited(bytes)) :: + ("mutualCloseProposed" | listOfN(uint16, closingTxCodec)) :: + ("mutualClosePublished" | listOfN(uint16, closingTxCodec)) :: + ("localCommitPublished" | optional(bool8, localCommitPublishedCodec)) :: + ("remoteCommitPublished" | optional(bool8, remoteCommitPublishedCodec)) :: + ("nextRemoteCommitPublished" | optional(bool8, remoteCommitPublishedCodec)) :: + ("futureRemoteCommitPublished" | optional(bool8, remoteCommitPublishedCodec)) :: + ("revokedCommitPublished" | listOfN(uint16, revokedCommitPublishedCodec))).as[DATA_CLOSING] + + val DATA_CLOSING_11_Codec: Codec[DATA_CLOSING] = ( + ("commitments" | versionedCommitmentsCodec) :: ("waitingSince" | blockHeight) :: ("finalScriptPubKey" | lengthDelimited(bytes)) :: ("mutualCloseProposed" | listOfN(uint16, closingTxCodec)) :: @@ -631,12 +722,25 @@ private[channel] object ChannelCodecs4 { ("revokedCommitPublished" | listOfN(uint16, revokedCommitPublishedCodec))).as[DATA_CLOSING] val DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_08_Codec: Codec[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] = ( - ("commitments" | commitmentsCodec) :: + ("commitments" | commitmentsCodecWithoutFirstRemoteCommitIndex) :: + ("remoteChannelReestablish" | channelReestablishCodec)).as[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] + + val DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_12_Codec: Codec[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] = ( + ("commitments" | versionedCommitmentsCodec) :: ("remoteChannelReestablish" | channelReestablishCodec)).as[DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT] } // Order matters! val channelDataCodec: Codec[PersistentChannelData] = discriminated[PersistentChannelData].by(uint16) + .typecase(0x12, Codecs.DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_12_Codec) + .typecase(0x11, Codecs.DATA_CLOSING_11_Codec) + .typecase(0x10, Codecs.DATA_NEGOTIATING_10_Codec) + .typecase(0x0f, Codecs.DATA_SHUTDOWN_0f_Codec) + .typecase(0x0e, Codecs.DATA_NORMAL_0e_Codec) + .typecase(0x0d, Codecs.DATA_WAIT_FOR_DUAL_FUNDING_READY_0d_Codec) + .typecase(0x0c, Codecs.DATA_WAIT_FOR_DUAL_FUNDING_CONFIRMED_0c_Codec) + .typecase(0x0b, Codecs.DATA_WAIT_FOR_CHANNEL_READY_0b_Codec) + .typecase(0x0a, Codecs.DATA_WAIT_FOR_FUNDING_CONFIRMED_0a_Codec) .typecase(0x09, Codecs.DATA_WAIT_FOR_DUAL_FUNDING_SIGNED_09_Codec) .typecase(0x08, Codecs.DATA_WAIT_FOR_REMOTE_PUBLISH_FUTURE_COMMITMENT_08_Codec) .typecase(0x07, Codecs.DATA_CLOSING_07_Codec) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala index 56ba361d16..4389229109 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/TestConstants.scala @@ -23,6 +23,7 @@ import fr.acinq.eclair.blockchain.fee._ import fr.acinq.eclair.channel.fsm.Channel.{ChannelConf, RemoteRbfLimits, UnhandledExceptionStrategy} import fr.acinq.eclair.channel.{ChannelFlags, LocalParams} import fr.acinq.eclair.crypto.keymanager.{LocalChannelKeyManager, LocalNodeKeyManager} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner import fr.acinq.eclair.io.MessageRelay.RelayAll import fr.acinq.eclair.io.{OpenChannelInterceptor, PeerConnection} import fr.acinq.eclair.message.OnionMessages.OnionMessageConfig @@ -225,7 +226,8 @@ object TestConstants { timeout = 200 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( @@ -391,7 +393,8 @@ object TestConstants { timeout = 100 millis, maxAttempts = 2, ), - purgeInvoicesInterval = None + purgeInvoicesInterval = None, + revokedHtlcInfoCleanerConfig = RevokedHtlcInfoCleaner.Config(10, 100 millis) ) def channelParams: LocalParams = OpenChannelInterceptor.makeChannelParams( diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala index f3cfa2940b..467b0f5f96 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/CommitmentsSpec.scala @@ -496,7 +496,7 @@ object CommitmentsSpec { Commitments( ChannelParams(randomBytes32(), ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, ChannelFlags(announceChannel = announceChannel)), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 1, remoteNextHtlcId = 1), - List(Commitment(0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(randomKey().publicKey), ShaChain.init, @@ -515,7 +515,7 @@ object CommitmentsSpec { Commitments( ChannelParams(randomBytes32(), ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, ChannelFlags(announceChannel = announceChannel)), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 1, remoteNextHtlcId = 1), - List(Commitment(0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(randomKey().publicKey), ShaChain.init, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala index 11aae21ead..8dafe0e5bb 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/channel/states/e/NormalSplicesStateSpec.scala @@ -33,8 +33,9 @@ import fr.acinq.eclair.channel.fsm.Channel import fr.acinq.eclair.channel.fund.InteractiveTxBuilder.FullySignedSharedTransaction import fr.acinq.eclair.channel.publish.TxPublisher.{PublishFinalTx, PublishReplaceableTx, PublishTx, SetChannelId} import fr.acinq.eclair.channel.states.ChannelStateTestsBase.{FakeTxPublisherFactory, PimpTestFSM} -import fr.acinq.eclair.channel.states.ChannelStateTestsTags.{AnchorOutputsZeroFeeHtlcTxs, NoMaxHtlcValueInFlight, ZeroConf} +import fr.acinq.eclair.channel.states.ChannelStateTestsTags._ import fr.acinq.eclair.channel.states.{ChannelStateTestsBase, ChannelStateTestsTags} +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner.ForgetHtlcInfos import fr.acinq.eclair.payment.relay.Relayer import fr.acinq.eclair.testutils.PimpTestProbe.convert import fr.acinq.eclair.transactions.DirectedHtlc.{incoming, outgoing} @@ -57,7 +58,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik implicit val log: akka.event.LoggingAdapter = akka.event.NoLogging override def withFixture(test: OneArgTest): Outcome = { - val tags = test.tags + ChannelStateTestsTags.DualFunding + ChannelStateTestsTags.Splicing + val tags = test.tags + DualFunding + Splicing val setup = init(tags = tags) import setup._ reachNormal(setup, tags) @@ -275,10 +276,10 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik } test("recv CMD_SPLICE (splice-in, non dual-funded channel)") { () => - val f = init(tags = Set(ChannelStateTestsTags.DualFunding, ChannelStateTestsTags.Splicing)) + val f = init(tags = Set(DualFunding, Splicing)) import f._ - reachNormal(f, tags = Set(ChannelStateTestsTags.Splicing)) // we open a non dual-funded channel + reachNormal(f, tags = Set(Splicing)) // we open a non dual-funded channel alice2bob.ignoreMsg { case _: ChannelUpdate => true } bob2alice.ignoreMsg { case _: ChannelUpdate => true } awaitCond(alice.stateName == NORMAL && bob.stateName == NORMAL) @@ -303,7 +304,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(postSpliceState.commitments.latest.remoteChannelReserve == 15_000.sat) } - test("recv CMD_SPLICE (splice-in, local and remote commit index mismatch)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv CMD_SPLICE (splice-in, local and remote commit index mismatch)", Tag(Quiescence)) { f => import f._ // Alice and Bob asynchronously exchange HTLCs, which makes their commit indices diverge. @@ -378,7 +379,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik sender.expectMsgType[RES_FAILURE[_, _]] } - test("recv CMD_SPLICE (splice-out, would go below reserve, quiescent)", Tag(ChannelStateTestsTags.Quiescence), Tag(NoMaxHtlcValueInFlight)) { f => + test("recv CMD_SPLICE (splice-out, would go below reserve, quiescent)", Tag(Quiescence), Tag(NoMaxHtlcValueInFlight)) { f => import f._ setupHtlcs(f) @@ -471,7 +472,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testSpliceInAndOutCmd(f) } - test("recv CMD_SPLICE (splice-in + splice-out, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv CMD_SPLICE (splice-in + splice-out, quiescence)", Tag(Quiescence)) { f => testSpliceInAndOutCmd(f) } @@ -771,19 +772,27 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(bob.stateData.asInstanceOf[DATA_NORMAL].commitments.inactive.map(_.fundingTxIndex) == Seq.empty) } - test("emit post-splice events", Tag(NoMaxHtlcValueInFlight)) { f => + test("emit post-splice events", Tag(NoMaxHtlcValueInFlight), Tag(Quiescence)) { f => import f._ + // Alice and Bob asynchronously exchange HTLCs, which makes their commit indices diverge. + addHtlc(25_000_000 msat, alice, bob, alice2bob, bob2alice) + addHtlc(50_000_000 msat, bob, alice, bob2alice, alice2bob) + crossSign(alice, bob, alice2bob, bob2alice) + val initialState = alice.stateData.asInstanceOf[DATA_NORMAL] assert(initialState.commitments.latest.capacity == 1_500_000.sat) - assert(initialState.commitments.latest.localCommit.spec.toLocal == 800_000_000.msat) - assert(initialState.commitments.latest.localCommit.spec.toRemote == 700_000_000.msat) + assert(initialState.commitments.latest.localCommit.spec.toLocal == 775_000_000.msat) + assert(initialState.commitments.latest.localCommit.spec.toRemote == 650_000_000.msat) + assert(initialState.commitments.localCommitIndex != initialState.commitments.remoteCommitIndex) val aliceEvents = TestProbe() val bobEvents = TestProbe() + systemA.eventStream.subscribe(aliceEvents.ref, classOf[ForgetHtlcInfos]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[AvailableBalanceChanged]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[LocalChannelUpdate]) systemA.eventStream.subscribe(aliceEvents.ref, classOf[LocalChannelDown]) + systemB.eventStream.subscribe(bobEvents.ref, classOf[ForgetHtlcInfos]) systemB.eventStream.subscribe(bobEvents.ref, classOf[AvailableBalanceChanged]) systemB.eventStream.subscribe(bobEvents.ref, classOf[LocalChannelUpdate]) systemB.eventStream.subscribe(bobEvents.ref, classOf[LocalChannelDown]) @@ -797,14 +806,16 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik alice ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx1) alice2bob.expectMsgType[SpliceLocked] alice2bob.forward(bob) + aliceEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.remoteCommitIndex)) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) bob ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx1) bob2alice.expectMsgType[SpliceLocked] bob2alice.forward(alice) - aliceEvents.expectAvailableBalanceChanged(balance = 1_300_000_000.msat, capacity = 2_000_000.sat) - bobEvents.expectAvailableBalanceChanged(balance = 700_000_000.msat, capacity = 2_000_000.sat) + aliceEvents.expectAvailableBalanceChanged(balance = 1_275_000_000.msat, capacity = 2_000_000.sat) + bobEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.localCommitIndex)) + bobEvents.expectAvailableBalanceChanged(balance = 650_000_000.msat, capacity = 2_000_000.sat) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) @@ -812,13 +823,15 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2alice.expectMsgType[SpliceLocked] bob2alice.forward(alice) aliceEvents.expectNoMessage(100 millis) + bobEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.localCommitIndex)) bobEvents.expectNoMessage(100 millis) alice ! WatchFundingConfirmedTriggered(BlockHeight(400000), 42, fundingTx2) alice2bob.expectMsgType[SpliceLocked] alice2bob.forward(bob) - aliceEvents.expectAvailableBalanceChanged(balance = 1_800_000_000.msat, capacity = 2_500_000.sat) - bobEvents.expectAvailableBalanceChanged(balance = 700_000_000.msat, capacity = 2_500_000.sat) + aliceEvents.expectMsg(ForgetHtlcInfos(initialState.channelId, initialState.commitments.remoteCommitIndex)) + aliceEvents.expectAvailableBalanceChanged(balance = 1_775_000_000.msat, capacity = 2_500_000.sat) + bobEvents.expectAvailableBalanceChanged(balance = 650_000_000.msat, capacity = 2_500_000.sat) aliceEvents.expectNoMessage(100 millis) bobEvents.expectNoMessage(100 millis) } @@ -1124,7 +1137,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectCommitSigNotReceived(f) } - test("disconnect (commit_sig not received, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (commit_sig not received, quiescence)", Tag(Quiescence)) { f => testDisconnectCommitSigNotReceived(f) } @@ -1164,7 +1177,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectCommitSigReceivedByAlice(f) } - test("disconnect (commit_sig received by alice, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (commit_sig received by alice, quiescence)", Tag(Quiescence)) { f => testDisconnectCommitSigReceivedByAlice(f) } @@ -1206,7 +1219,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectTxSignaturesSentByBob(f) } - test("disconnect (tx_signatures sent by bob, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (tx_signatures sent by bob, quiescence)", Tag(Quiescence)) { f => testDisconnectTxSignaturesSentByBob(f) } @@ -1255,7 +1268,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testDisconnectTxSignaturesReceivedByAlice(f) } - test("disconnect (tx_signatures received by alice, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (tx_signatures received by alice, quiescence)", Tag(Quiescence)) { f => testDisconnectTxSignaturesReceivedByAlice(f) } @@ -1301,11 +1314,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = 0.sat) } - test("disconnect (tx_signatures received by alice, zero-conf)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("disconnect (tx_signatures received by alice, zero-conf)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testDisconnectTxSignaturesReceivedByAliceZeroConf(f) } - test("disconnect (tx_signatures received by alice, zero-conf, quiescence)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs), Tag(ChannelStateTestsTags.Quiescence)) { f => + test("disconnect (tx_signatures received by alice, zero-conf, quiescence)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs), Tag(Quiescence)) { f => testDisconnectTxSignaturesReceivedByAliceZeroConf(f) } @@ -1341,7 +1354,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik awaitCond(bob.stateData.asInstanceOf[DATA_NORMAL].spliceStatus == SpliceStatus.NoSplice) } - test("don't resend splice_locked when zero-conf channel confirms", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("don't resend splice_locked when zero-conf channel confirms", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => import f._ initiateSplice(f, spliceIn_opt = Some(SpliceIn(500_000 sat, pushAmount = 0 msat))) @@ -1538,7 +1551,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testForceCloseWithMultipleSplicesSimple(f) } - test("force-close with multiple splices (simple, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("force-close with multiple splices (simple, quiescence)", Tag(Quiescence)) { f => testForceCloseWithMultipleSplicesSimple(f) } @@ -1620,7 +1633,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testForceCloseWithMultipleSplicesPreviousActiveRemote(f) } - test("force-close with multiple splices (previous active remote, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("force-close with multiple splices (previous active remote, quiescence)", Tag(Quiescence)) { f => testForceCloseWithMultipleSplicesPreviousActiveRemote(f) } @@ -1698,7 +1711,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik testForceCloseWithMultipleSplicesPreviousActiveRevoked(f) } - test("force-close with multiple splices (previous active revoked, quiescent)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("force-close with multiple splices (previous active revoked, quiescent)", Tag(Quiescence)) { f => testForceCloseWithMultipleSplicesPreviousActiveRevoked(f) } @@ -1809,11 +1822,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RemoteClose])) } - test("force-close with multiple splices (inactive remote)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive remote)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRemote(f) } - test("force-close with multiple splices (inactive remote, quiescence)", Tag(ChannelStateTestsTags.Quiescence), Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive remote, quiescence)", Tag(Quiescence), Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRemote(f) } @@ -1928,11 +1941,11 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik assert(Helpers.Closing.isClosed(alice.stateData.asInstanceOf[DATA_CLOSING], None).exists(_.isInstanceOf[RevokedClose])) } - test("force-close with multiple splices (inactive revoked)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive revoked)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRevoked(f) } - test("force-close with multiple splices (inactive revoked, quiescence)", Tag(ChannelStateTestsTags.Quiescence), Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("force-close with multiple splices (inactive revoked, quiescence)", Tag(Quiescence), Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => testForceCloseWithMultipleSplicesInactiveRevoked(f) } @@ -1971,7 +1984,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2blockchain.expectNoMessage(100 millis) } - test("put back watches after restart (inactive)", Tag(ChannelStateTestsTags.ZeroConf), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("put back watches after restart (inactive)", Tag(ZeroConf), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => import f._ val fundingTx0 = alice.stateData.asInstanceOf[DATA_NORMAL].commitments.latest.localFundingStatus.signedTx_opt.get @@ -2030,7 +2043,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik bob2blockchain.expectNoMessage(100 millis) } - test("recv CMD_SPLICE (splice-in + splice-out) with pre and post splice htlcs", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv CMD_SPLICE (splice-in + splice-out) with pre and post splice htlcs", Tag(Quiescence)) { f => import f._ val htlcs = setupHtlcs(f) @@ -2067,7 +2080,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = 0.sat) } - test("recv CMD_SPLICE (splice-in + splice-out) with pending htlcs, resolved after splice locked", Tag(ChannelStateTestsTags.Quiescence), Tag(ChannelStateTestsTags.AnchorOutputsZeroFeeHtlcTxs)) { f => + test("recv CMD_SPLICE (splice-in + splice-out) with pending htlcs, resolved after splice locked", Tag(Quiescence), Tag(AnchorOutputsZeroFeeHtlcTxs)) { f => import f._ val htlcs = setupHtlcs(f) @@ -2086,7 +2099,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = 0.sat) } - test("recv multiple CMD_SPLICE (splice-in, splice-out, quiescence)", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv multiple CMD_SPLICE (splice-in, splice-out, quiescence)", Tag(Quiescence)) { f => val htlcs = setupHtlcs(f) initiateSplice(f, spliceIn_opt = Some(SpliceIn(500_000 sat))) @@ -2095,7 +2108,7 @@ class NormalSplicesStateSpec extends TestKitBaseClass with FixtureAnyFunSuiteLik resolveHtlcs(f, htlcs, spliceOutFee = spliceOutFee(f, capacity = 1_900_000.sat)) } - test("recv invalid htlc signatures during splice-in", Tag(ChannelStateTestsTags.Quiescence)) { f => + test("recv invalid htlc signatures during splice-in", Tag(Quiescence)) { f => import f._ val htlcs = setupHtlcs(f) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala index 25cd2ef44c..bc1e5c2a6a 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/ChannelsDbSpec.scala @@ -18,8 +18,7 @@ package fr.acinq.eclair.db import com.softwaremill.quicklens._ import fr.acinq.bitcoin.scalacompat.ByteVector32 -import fr.acinq.bitcoin.scalacompat.Crypto.PrivateKey -import fr.acinq.bitcoin.scalacompat.Crypto.PublicKey +import fr.acinq.bitcoin.scalacompat.Crypto.{PrivateKey, PublicKey} import fr.acinq.eclair.TestDatabases.{TestPgDatabases, TestSqliteDatabases, migrationCheck} import fr.acinq.eclair.channel.RealScidStatus import fr.acinq.eclair.db.ChannelsDbSpec.{getPgTimestamp, getTimestamp, testCases} @@ -31,7 +30,7 @@ import fr.acinq.eclair.db.sqlite.SqliteChannelsDb import fr.acinq.eclair.db.sqlite.SqliteUtils.ExtendedResultSet._ import fr.acinq.eclair.wire.internal.channel.ChannelCodecs.channelDataCodec import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec -import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, TimestampSecond, randomBytes32, randomKey} +import fr.acinq.eclair.{CltvExpiry, RealShortChannelId, TestDatabases, randomBytes32, randomKey} import org.scalatest.funsuite.AnyFunSuite import scodec.bits.ByteVector @@ -59,7 +58,6 @@ class ChannelsDbSpec extends AnyFunSuite { test("add/remove/list channels") { forAllDbs { dbs => val db = dbs.channels - dbs.pendingCommands // needed by db.removeChannel val channel1 = ChannelCodecsSpec.normal val channel2a = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) @@ -73,7 +71,7 @@ class ChannelsDbSpec extends AnyFunSuite { intercept[SQLException](db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1)) // no related channel - assert(db.listLocalChannels().toSet == Set.empty) + assert(db.listLocalChannels().isEmpty) db.addOrUpdateChannel(channel1) db.addOrUpdateChannel(channel1) assert(db.listLocalChannels() == List(channel1)) @@ -85,11 +83,11 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel1, channel2b)) assert(db.getChannel(channel2b.channelId).contains(channel2b)) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) + assert(db.listHtlcInfos(channel1.channelId, commitNumber).isEmpty) db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash1, cltvExpiry1) db.addHtlcInfo(channel1.channelId, commitNumber, paymentHash2, cltvExpiry2) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList.toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) - assert(db.listHtlcInfos(channel1.channelId, 43).toList == Nil) + assert(db.listHtlcInfos(channel1.channelId, commitNumber).toSet == Set((paymentHash1, cltvExpiry1), (paymentHash2, cltvExpiry2))) + assert(db.listHtlcInfos(channel1.channelId, commitNumber + 1).isEmpty) assert(db.listClosedChannels(None, None).isEmpty) db.removeChannel(channel1.channelId) @@ -97,11 +95,70 @@ class ChannelsDbSpec extends AnyFunSuite { assert(db.listLocalChannels() == List(channel2b)) assert(db.listClosedChannels(None, None) == List(channel1)) assert(db.listClosedChannels(Some(channel1.remoteNodeId), None) == List(channel1)) - assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None) == Nil) - assert(db.listHtlcInfos(channel1.channelId, commitNumber).toList == Nil) + assert(db.listClosedChannels(Some(PrivateKey(randomBytes32()).publicKey), None).isEmpty) + db.removeChannel(channel2b.channelId) assert(db.getChannel(channel2b.channelId).isEmpty) - assert(db.listLocalChannels() == Nil) + assert(db.listLocalChannels().isEmpty) + } + } + + test("remove htlc infos") { + forAllDbs { dbs => + val db = dbs.channels + + val channel1 = ChannelCodecsSpec.normal + val channel2 = ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(randomBytes32()) + db.addOrUpdateChannel(channel1) + db.addOrUpdateChannel(channel2) + + val commitNumberSplice1 = 50 + val commitNumberSplice2 = 75 + + // The first channel has one splice transaction and is then closed. + db.addHtlcInfo(channel1.channelId, 49, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.markHtlcInfosForRemoval(channel1.channelId, commitNumberSplice1) + db.addHtlcInfo(channel1.channelId, 51, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel1.channelId, 52, randomBytes32(), CltvExpiry(561)) + db.removeChannel(channel1.channelId) + + // The second channel has two splice transactions. + db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 48, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 49, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 50, randomBytes32(), CltvExpiry(561)) + db.markHtlcInfosForRemoval(channel2.channelId, commitNumberSplice1) + db.addHtlcInfo(channel2.channelId, 74, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 75, randomBytes32(), CltvExpiry(561)) + db.addHtlcInfo(channel2.channelId, 76, randomBytes32(), CltvExpiry(561)) + db.markHtlcInfosForRemoval(channel2.channelId, commitNumberSplice2) + + // We asynchronously clean-up the HTLC data from the DB in small batches. + val obsoleteHtlcInfo = Seq( + (channel1.channelId, 49), + (channel1.channelId, 50), + (channel1.channelId, 51), + (channel1.channelId, 52), + (channel2.channelId, 48), + (channel2.channelId, 49), + (channel2.channelId, 50), + (channel2.channelId, 74), + ) + db.removeHtlcInfos(10) // This should remove all the data for one of the two channels in one batch + assert(obsoleteHtlcInfo.flatMap { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber) }.size == 5) + db.removeHtlcInfos(3) // This should remove only part of the data for the remaining channel + assert(obsoleteHtlcInfo.flatMap { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber) }.size == 2) + db.removeHtlcInfos(3) // This should remove the rest of the data for the remaining channel + obsoleteHtlcInfo.foreach { case (channelId, commitNumber) => db.listHtlcInfos(channelId, commitNumber).isEmpty } + + // The remaining HTLC data shouldn't be removed. + assert(db.listHtlcInfos(channel2.channelId, 75).nonEmpty) + assert(db.listHtlcInfos(channel2.channelId, 76).nonEmpty) + db.removeHtlcInfos(10) // no-op + assert(db.listHtlcInfos(channel2.channelId, 75).nonEmpty) + assert(db.listHtlcInfos(channel2.channelId, 76).nonEmpty) } } diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala new file mode 100644 index 0000000000..2200e97534 --- /dev/null +++ b/eclair-core/src/test/scala/fr/acinq/eclair/db/RevokedHtlcInfoCleanerSpec.scala @@ -0,0 +1,84 @@ +/* + * Copyright 2023 ACINQ SAS + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package fr.acinq.eclair.db + +import akka.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit +import akka.actor.typed.eventstream.EventStream +import com.softwaremill.quicklens.ModifyPimp +import com.typesafe.config.ConfigFactory +import fr.acinq.eclair.TestDatabases.TestSqliteDatabases +import fr.acinq.eclair.db.RevokedHtlcInfoCleaner.ForgetHtlcInfos +import fr.acinq.eclair.wire.internal.channel.ChannelCodecsSpec +import fr.acinq.eclair.{CltvExpiry, randomBytes32} +import org.scalatest.funsuite.AnyFunSuiteLike + +import scala.concurrent.duration.DurationInt + +class RevokedHtlcInfoCleanerSpec extends ScalaTestWithActorTestKit(ConfigFactory.load("application")) with AnyFunSuiteLike { + + test("remove htlc info from closed channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 17, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 19, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 23, randomBytes32(), CltvExpiry(1729)) + channelsDb.removeChannel(channelId) + assert(channelsDb.listHtlcInfos(channelId, 17).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).nonEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).nonEmpty) + + val config = RevokedHtlcInfoCleaner.Config(batchSize = 1, interval = 10 millis) + testKit.spawn(RevokedHtlcInfoCleaner(channelsDb, config)) + + eventually { + assert(channelsDb.listHtlcInfos(channelId, 17).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 19).isEmpty) + assert(channelsDb.listHtlcInfos(channelId, 23).isEmpty) + } + } + + test("remove htlc info from spliced channels at regular intervals") { + val channelsDb = TestSqliteDatabases().channels + + val channelId = randomBytes32() + channelsDb.addOrUpdateChannel(ChannelCodecsSpec.normal.modify(_.commitments.params.channelId).setTo(channelId)) + channelsDb.addHtlcInfo(channelId, 1, randomBytes32(), CltvExpiry(561)) + channelsDb.addHtlcInfo(channelId, 2, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 2, randomBytes32(), CltvExpiry(1105)) + channelsDb.addHtlcInfo(channelId, 3, randomBytes32(), CltvExpiry(1729)) + channelsDb.addHtlcInfo(channelId, 3, randomBytes32(), CltvExpiry(1729)) + channelsDb.addHtlcInfo(channelId, 4, randomBytes32(), CltvExpiry(2465)) + (1 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).nonEmpty)) + + val config = RevokedHtlcInfoCleaner.Config(batchSize = 2, interval = 10 millis) + val htlcCleaner = testKit.spawn(RevokedHtlcInfoCleaner(channelsDb, config)) + + htlcCleaner ! ForgetHtlcInfos(channelId, beforeCommitIndex = 3) + eventually { + (1 to 2).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).isEmpty)) + } + (3 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).nonEmpty)) + + testKit.system.eventStream ! EventStream.Publish(ForgetHtlcInfos(channelId, beforeCommitIndex = 5)) + eventually { + (3 to 4).foreach(i => assert(channelsDb.listHtlcInfos(channelId, i).isEmpty)) + } + } + +} diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala index 90ff1ecde8..6ff72c66cd 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/json/JsonSerializersSpec.scala @@ -133,7 +133,7 @@ class JsonSerializersSpec extends TestKitBaseClass with AnyFunSuiteLike with Mat Commitments( ChannelParams(dummyBytes32, ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, ChannelFlags(announceChannel = true)), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 1, remoteNextHtlcId = 1), - List(Commitment(0, dummyPublicKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, dummyPublicKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(dummyPublicKey), ShaChain.init, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala index d3b7be71be..624d566df9 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/payment/PaymentPacketSpec.scala @@ -737,7 +737,7 @@ object PaymentPacketSpec { new Commitments( ChannelParams(channelId, ChannelConfig.standard, channelFeatures, localParams, remoteParams, channelFlags), CommitmentChanges(localChanges, remoteChanges, 0, 0), - List(Commitment(0, null, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), + List(Commitment(0, 0, null, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.Locked, localCommit, remoteCommit, None)), inactive = Nil, Right(randomKey().publicKey), ShaChain.init, diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala index cde6a80a7d..b6df964c9c 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/ChannelCodecsSpec.scala @@ -93,7 +93,7 @@ class ChannelCodecsSpec extends AnyFunSuite { // and re-encode it with the new codec val bin_new = ByteVector(channelDataCodec.encode(data_new).require.toByteVector.toArray) // data should now be encoded under the new format - assert(bin_new.startsWith(hex"040000")) + assert(bin_new.startsWith(hex"04000a")) // now let's decode it again val data_new2 = channelDataCodec.decode(bin_new.toBitVector).require.value // data should match perfectly @@ -324,7 +324,7 @@ object ChannelCodecsSpec { val commitments = Commitments( ChannelParams(channelId, ChannelConfig.standard, ChannelFeatures(), localParams, remoteParams, channelFlags), CommitmentChanges(LocalChanges(Nil, Nil, Nil), RemoteChanges(Nil, Nil, Nil), localNextHtlcId = 32, remoteNextHtlcId = 4), - Seq(Commitment(fundingTxIndex, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.NotLocked, localCommit, remoteCommit, None)), + Seq(Commitment(fundingTxIndex, 0, remoteFundingPubKey, LocalFundingStatus.SingleFundedUnconfirmedFundingTx(None), RemoteFundingStatus.NotLocked, localCommit, remoteCommit, None)), remoteNextCommitInfo = Right(randomKey().publicKey), remotePerCommitmentSecrets = ShaChain.init, originChannels = origins) diff --git a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala index 0eeb1bd1cc..a1706baef6 100644 --- a/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala +++ b/eclair-core/src/test/scala/fr/acinq/eclair/wire/internal/channel/version4/ChannelCodecs4Spec.scala @@ -182,7 +182,7 @@ class ChannelCodecs4Spec extends AnyFunSuite { PrivateKey(ByteVector.fromValidHex("02" * 32)).publicKey )), remoteFundingPubKey = PrivateKey(ByteVector.fromValidHex("01" * 32)).publicKey, - localOutputs = Nil, lockTime = 0, dustLimit = 330.sat, targetFeerate = FeeratePerKw(FeeratePerByte(3.sat)), requireConfirmedInputs = RequireConfirmedInputs(false, false)) + localOutputs = Nil, lockTime = 0, dustLimit = 330.sat, targetFeerate = FeeratePerKw(FeeratePerByte(3.sat)), requireConfirmedInputs = RequireConfirmedInputs(forLocal = false, forRemote = false)), ) assert(decoded == dualFundedUnconfirmedFundingTx) }