Skip to content

Commit

Permalink
move wallets outside of Peer
Browse files Browse the repository at this point in the history
  • Loading branch information
pm47 committed Jan 18, 2024
1 parent 35ca52a commit e8640e4
Show file tree
Hide file tree
Showing 6 changed files with 140 additions and 62 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package fr.acinq.lightning.blockchain.electrum
import fr.acinq.bitcoin.*
import fr.acinq.lightning.SwapInParams
import fr.acinq.lightning.blockchain.electrum.WalletState.Companion.index
import fr.acinq.lightning.utils.MDCLogger
import fr.acinq.lightning.blockchain.electrum.WalletState.Companion.index
import fr.acinq.lightning.utils.sum
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand All @@ -13,8 +15,6 @@ import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.launch
import org.kodein.log.Logger
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger

data class WalletState(val addresses: Map<String, AddressState>, val parentTxs: Map<TxId, Transaction>) {
/** Electrum sends parent txs separately from utxo outpoints, this boolean indicates when the wallet is consistent */
Expand Down Expand Up @@ -119,23 +119,9 @@ class ElectrumMiniWallet(
val chainHash: BlockHash,
private val client: IElectrumClient,
private val scope: CoroutineScope,
loggerFactory: LoggerFactory,
private val name: String = ""
private val logger: Logger
) : CoroutineScope by scope {

private val logger = loggerFactory.newLogger(this::class)
private fun Logger.mdcinfo(msgCreator: () -> String) {
log(
level = Logger.Level.INFO,
meta = mapOf(
"wallet" to name,
"utxos" to walletStateFlow.value.utxos.size,
"balance" to walletStateFlow.value.totalBalance
),
msgCreator = msgCreator
)
}

// state flow with the current balance
private val _walletStateFlow = MutableStateFlow(WalletState(emptyMap(), emptyMap()))
val walletStateFlow get() = _walletStateFlow.asStateFlow()
Expand Down Expand Up @@ -185,10 +171,10 @@ class ElectrumMiniWallet(
val newUtxos = unspents.minus((_walletStateFlow.value.addresses[bitcoinAddress]?.unspent ?: emptyList()).toSet())
// request new parent txs
val parentTxs = newUtxos.mapNotNull { utxo -> client.getTx(utxo.txid) }
parentTxs.forEach { tx -> logger.mdcinfo { "received parent transaction with txid=${tx.txid}" } }
parentTxs.forEach { tx -> logger.info { "received parent transaction with txid=${tx.txid}" } }
val nextAddressState = WalletState.Companion.AddressState(addressMeta, unspents)
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to nextAddressState), parentTxs = this.parentTxs + parentTxs.associateBy { it.txid })
logger.mdcinfo { "${unspents.size} utxo(s) for address=$bitcoinAddress index=${addressMeta.index ?: "n/a"} balance=${nextWalletState.totalBalance}" }
logger.info { "${unspents.size} utxo(s) for address=$bitcoinAddress index=${addressMeta.index ?: "n/a"} balance=${nextWalletState.totalBalance}" }
unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
when (val generator = addressGenerator) {
null -> {}
Expand Down Expand Up @@ -239,10 +225,10 @@ class ElectrumMiniWallet(
client.notifications.collect { mailbox.send(WalletCommand.Companion.ElectrumNotification(it)) }
}
launch {
mailbox.consumeAsFlow().collect { it ->
mailbox.consumeAsFlow().collect {
when (it) {
is WalletCommand.Companion.ElectrumConnected -> {
logger.mdcinfo { "electrum connected" }
logger.info { "electrum connected" }
scriptHashes.forEach { (scriptHash, address) -> subscribe(scriptHash, address) }
}
is WalletCommand.Companion.ElectrumNotification -> {
Expand All @@ -253,7 +239,7 @@ class ElectrumMiniWallet(
is WalletCommand.Companion.AddAddress -> {
computeScriptHash(it.bitcoinAddress)?.let { scriptHash ->
if (!scriptHashes.containsKey(scriptHash)) {
logger.mdcinfo { "adding new address=${it.bitcoinAddress} index=${it.meta.index ?: "n/a"}" }
logger.info { "adding new address=${it.bitcoinAddress} index=${it.meta.index ?: "n/a"}" }
scriptHashes = scriptHashes + (scriptHash to it.bitcoinAddress)
addressMetas = addressMetas + (it.bitcoinAddress to it.meta)
subscribe(scriptHash, it.bitcoinAddress)
Expand All @@ -262,7 +248,7 @@ class ElectrumMiniWallet(
}
is WalletCommand.Companion.AddAddressGenerator -> {
if (addressGenerator == null) {
logger.mdcinfo { "adding new address generator" }
logger.info { "adding new address generator" }
addressGenerator = it.generator to 0
mailbox.send(WalletCommand.Companion.GenerateAddress(it.generator.window))
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.lightning.NodeParams
import fr.acinq.lightning.crypto.KeyManager
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.distinctUntilChangedBy
import kotlinx.coroutines.launch
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger

class FinalWallet(
chain: NodeParams.Chain,
finalWalletKeys: KeyManager.Bip84OnChainKeys,
electrum: IElectrumClient,
scope: CoroutineScope,
loggerFactory: LoggerFactory
) {
private val logger = loggerFactory.newLogger(this::class)

val wallet = ElectrumMiniWallet(chain.chainHash, electrum, scope, logger)
val finalAddress: String = finalWalletKeys.address(addressIndex = 0L).also { wallet.addAddress(it) }

init {
scope.launch {
wallet.walletStateFlow
.distinctUntilChangedBy { it.totalBalance }
.collect { wallet ->
logger.info { "${wallet.totalBalance} available on final wallet with ${wallet.utxos.size} utxos" }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.lightning.NodeParams
import fr.acinq.lightning.blockchain.electrum.WalletState.Companion.index
import fr.acinq.lightning.crypto.KeyManager
import fr.acinq.lightning.utils.MDCLogger
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger

class SwapInWallet(
chain: NodeParams.Chain,
swapInKeys: KeyManager.SwapInOnChainKeys,
electrum: IElectrumClient,
addressGenerationWindow: Int,
scope: CoroutineScope,
loggerFactory: LoggerFactory
) {
private val logger = loggerFactory.newLogger(this::class)

val wallet = ElectrumMiniWallet(chain.chainHash, electrum, scope, logger)

val legacySwapInAddress: String = swapInKeys.legacySwapInProtocol.address(chain)
.also { wallet.addAddress(it) }
val swapInAddressFlow = MutableStateFlow<Pair<String, Int>?>(null)
.also { wallet.addAddressGenerator(generator = { index -> swapInKeys.getSwapInProtocol(index).address(chain) }, window = addressGenerationWindow) }

init {
scope.launch {
// address rotation
wallet.walletStateFlow.map { it ->
// take the first unused address with the lowest index
it.addresses
.filter { it.value.unspent.isEmpty() }
.mapNotNull { (key, value) -> value.meta.index?.let { key to it } }
.minByOrNull { it.second }
}
.filterNotNull()
.distinctUntilChanged()
.collect { (address, index) ->
logger.info { "setting current swap-in address=$address index=$index" }
swapInAddressFlow.emit(address to index)
}
}
}

}
30 changes: 3 additions & 27 deletions src/commonMain/kotlin/fr/acinq/lightning/io/Peer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -189,14 +189,8 @@ class Peer(
}
}

val finalWallet = ElectrumMiniWallet(nodeParams.chainHash, watcher.client, scope, nodeParams.loggerFactory, name = "final")
val finalAddress: String = nodeParams.keyManager.finalOnChainWallet.address(addressIndex = 0L).also { finalWallet.addAddress(it) }

val swapInWallet = ElectrumMiniWallet(nodeParams.chainHash, watcher.client, scope, nodeParams.loggerFactory, name = "swap-in")
val legacySwapInAddress: String = nodeParams.keyManager.swapInOnChainWallet.legacySwapInProtocol.address(nodeParams.chain)
.also { swapInWallet.addAddress(it) }
val swapInAddressFlow = MutableStateFlow<String?>(null)
.also { swapInWallet.addAddressGenerator({ index -> nodeParams.keyManager.swapInOnChainWallet.getSwapInProtocol(index).address(nodeParams.chain) }, 20) }
val finalWallet = FinalWallet(nodeParams.chain, nodeParams.keyManager.finalOnChainWallet, watcher.client, scope, nodeParams.loggerFactory)
val swapInWallet = SwapInWallet(nodeParams.chain, nodeParams.keyManager.swapInOnChainWallet, watcher.client, addressGenerationWindow = 20, scope, nodeParams.loggerFactory)

private var swapInJob: Job? = null

Expand All @@ -222,24 +216,6 @@ class Peer(
input.send(WrappedChannelCommand(it.channelId, ChannelCommand.WatchReceived(it)))
}
}
launch {
finalWallet.walletStateFlow
.distinctUntilChangedBy { it.totalBalance }
.collect { wallet ->
logger.info { "${wallet.totalBalance} available on final wallet with ${wallet.utxos.size} utxos" }
}
}
launch {
// address rotation
swapInWallet.walletStateFlow
.onEach { it ->
// take the first unused address with the lowest index
swapInAddressFlow.value = it.addresses
.filter { it.value.unspent.isEmpty() && it.value.meta is WalletState.Companion.AddressMeta.Derived }
.minByOrNull { (it.value.meta as WalletState.Companion.AddressMeta.Derived).index }
?.key
}
}
launch {
// we don't restore closed channels
val bootChannels = db.channels.listLocalChannels().filterNot { it is Closed || it is LegacyWaitForFundingConfirmed }
Expand Down Expand Up @@ -467,7 +443,7 @@ class Peer(
logger.info { "waiting for peer to be ready" }
waitForPeerReady()
swapInJob = launch {
swapInWallet.walletStateFlow
swapInWallet.wallet.walletStateFlow
.filter { it.consistent }
.combine(currentTipFlow.filterNotNull()) { walletState, currentTip -> Pair(walletState, currentTip.first) }
.combine(swapInFeeratesFlow.filterNotNull()) { (walletState, currentTip), feerate -> Triple(walletState, currentTip, feerate) }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,20 @@ import fr.acinq.lightning.utils.sat
import kotlinx.coroutines.flow.filter
import kotlinx.coroutines.flow.first
import org.kodein.log.LoggerFactory
import org.kodein.log.newLogger
import kotlin.test.Test
import kotlin.test.assertContains
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds

class ElectrumMiniWalletTest : LightningTestSuite() {

val logger = LoggerFactory.default.newLogger(this::class)

@Test
fun `single address with no utxos`() = runSuspendTest(timeout = 15.seconds) {
val client = connectToMainnetServer()
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, LoggerFactory.default)
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, logger)
wallet.addAddress("bc1qyjmhaptq78vh5j7tnzu7ujayd8sftjahphxppz")

val walletState = wallet.walletStateFlow
Expand All @@ -36,7 +39,7 @@ class ElectrumMiniWalletTest : LightningTestSuite() {
@Test
fun `single address with existing utxos`() = runSuspendTest(timeout = 15.seconds) {
val client = connectToMainnetServer()
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, LoggerFactory.default)
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, logger)
wallet.addAddress("14xb2HATmkBzrHf4CR2hZczEtjYpTh92d2")

val walletState = wallet.walletStateFlow
Expand Down Expand Up @@ -104,7 +107,7 @@ class ElectrumMiniWalletTest : LightningTestSuite() {
@Test
fun `multiple addresses`() = runSuspendTest(timeout = 15.seconds) {
val client = connectToMainnetServer()
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, LoggerFactory.default)
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, logger)
wallet.addAddress("16MmJT8VqW465GEyckWae547jKVfMB14P8")
wallet.addAddress("14xb2HATmkBzrHf4CR2hZczEtjYpTh92d2")
wallet.addAddress("1NHFyu1uJ1UoDjtPjqZ4Et3wNCyMGCJ1qV")
Expand Down Expand Up @@ -157,15 +160,15 @@ class ElectrumMiniWalletTest : LightningTestSuite() {
@Test
fun `multiple addresses with generator`() = runSuspendTest(timeout = 15.seconds) {
val client = connectToMainnetServer()
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, LoggerFactory.default)
val wallet = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, logger)
wallet.addAddressGenerator(
{
when (it) {
0 -> "16MmJT8VqW465GEyckWae547jKVfMB14P8"
1 -> "14xb2HATmkBzrHf4CR2hZczEtjYpTh92d2"
2 -> "1NHFyu1uJ1UoDjtPjqZ4Et3wNCyMGCJ1qV"
else -> Bitcoin.addressFromPublicKeyScript(Block.LivenetGenesisBlock.hash, Script.pay2pkh(randomKey().publicKey())).result!!
}
when (it) {
0 -> "16MmJT8VqW465GEyckWae547jKVfMB14P8"
1 -> "14xb2HATmkBzrHf4CR2hZczEtjYpTh92d2"
2 -> "1NHFyu1uJ1UoDjtPjqZ4Et3wNCyMGCJ1qV"
else -> Bitcoin.addressFromPublicKeyScript(Block.LivenetGenesisBlock.hash, Script.pay2pkh(randomKey().publicKey())).result!!
}
},
10
)
Expand Down Expand Up @@ -218,8 +221,8 @@ class ElectrumMiniWalletTest : LightningTestSuite() {
@Test
fun `parallel wallets`() = runSuspendTest(timeout = 15.seconds) {
val client = connectToMainnetServer()
val wallet1 = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, LoggerFactory.default, name = "addr-16MmJT")
val wallet2 = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, LoggerFactory.default, name = "addr-14xb2H")
val wallet1 = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, logger)
val wallet2 = ElectrumMiniWallet(Block.LivenetGenesisBlock.hash, client, this, logger)
wallet1.addAddress("16MmJT8VqW465GEyckWae547jKVfMB14P8")
wallet2.addAddress("14xb2HATmkBzrHf4CR2hZczEtjYpTh92d2")

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.MnemonicCode
import fr.acinq.lightning.NodeParams
import fr.acinq.lightning.crypto.LocalKeyManager
import fr.acinq.lightning.tests.TestConstants
import fr.acinq.lightning.tests.utils.LightningTestSuite
import fr.acinq.lightning.tests.utils.runSuspendTest
import fr.acinq.lightning.utils.toByteVector
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.first
import org.kodein.log.LoggerFactory
import kotlin.test.Test
import kotlin.test.assertEquals
import kotlin.time.Duration.Companion.seconds

class SwapInWalletTestsCommon : LightningTestSuite() {

@Test
fun `swap-in wallet test`() = runSuspendTest(timeout = 15000.seconds) {
val mnemonics = "abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon abandon about".split(" ")
val keyManager = LocalKeyManager(MnemonicCode.toSeed(mnemonics, "").toByteVector(), NodeParams.Chain.Testnet, TestConstants.aliceSwapInServerXpub)
val client = connectToTestnetServer()
val wallet = SwapInWallet(NodeParams.Chain.Testnet, keyManager.swapInOnChainWallet, client, addressGenerationWindow = 3, this, LoggerFactory.default)

// addresses 0 to 5 have funds on them, the current address is the 6th
assertEquals(6, wallet.swapInAddressFlow.filterNotNull().first().second)
}
}

0 comments on commit e8640e4

Please sign in to comment.