Skip to content

Commit

Permalink
Rotate taproot swap-in addresses (#584)
Browse files Browse the repository at this point in the history
Addresses are generated one by one, as soon as we detect that the last generated address has been used. There is no lookahead.

Scanning 'synchronous' (not really synchronous, but we rely the least possible on the mailbox). This makes the `window` disappear because we are scanning addresses one by one.

Some cleanup by extracting wallets outside of `Peer`.

---------

Co-authored-by: sstone <fabrice.drouin@acinq.fr>
  • Loading branch information
pm47 and sstone authored Feb 19, 2024
1 parent 2fe01d5 commit 506003f
Show file tree
Hide file tree
Showing 19 changed files with 518 additions and 127 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -306,6 +306,7 @@ if (currentOs.isLinux) {
filter.excludeTestsMatching("*IntegrationTest")
filter.excludeTestsMatching("*ElectrumClientTest")
filter.excludeTestsMatching("*ElectrumMiniWalletTest")
filter.excludeTestsMatching("*SwapInWalletTestsCommon")
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
package fr.acinq.lightning.blockchain.electrum

import co.touchlab.kermit.Logger
import fr.acinq.bitcoin.*
import fr.acinq.lightning.SwapInParams
import fr.acinq.lightning.logging.*
import fr.acinq.lightning.logging.debug
import fr.acinq.lightning.logging.info
import fr.acinq.lightning.utils.sat
import fr.acinq.lightning.utils.sum
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
Expand All @@ -13,25 +16,41 @@ import kotlinx.coroutines.flow.consumeAsFlow
import kotlinx.coroutines.flow.filterIsInstance
import kotlinx.coroutines.launch

data class WalletState(val addresses: Map<String, List<Utxo>>) {
val utxos: List<Utxo> = addresses.flatMap { it.value }
data class WalletState(val addresses: Map<String, AddressState>) {
val utxos: List<Utxo> = addresses.flatMap { it.value.utxos }
val totalBalance = utxos.map { it.amount }.sum()
val lastDerivedAddress: Pair<String, AddressMeta.Derived>? = addresses
.mapNotNull { entry -> (entry.value.meta as? AddressMeta.Derived)?.let { entry.key to it } }
.maxByOrNull { it.second.index }

fun withoutReservedUtxos(reserved: Set<OutPoint>): WalletState {
return copy(addresses = addresses.mapValues {
it.value.filter { item -> !reserved.contains(item.outPoint) }
it.value.copy(utxos = it.value.utxos.filter { item -> !reserved.contains(item.outPoint) })
})
}

fun withConfirmations(currentBlockHeight: Int, swapInParams: SwapInParams): WalletWithConfirmations = WalletWithConfirmations(
swapInParams = swapInParams, currentBlockHeight = currentBlockHeight, all = utxos,
)

data class Utxo(val txId: TxId, val outputIndex: Int, val blockHeight: Long, val previousTx: Transaction) {
data class Utxo(val txId: TxId, val outputIndex: Int, val blockHeight: Long, val previousTx: Transaction, val addressMeta: AddressMeta) {
val outPoint = OutPoint(previousTx, outputIndex.toLong())
val amount = previousTx.txOut[outputIndex].amount
}

data class AddressState(val meta: AddressMeta, val alreadyUsed: Boolean, val utxos: List<Utxo>)

sealed class AddressMeta {
data object Single : AddressMeta()
data class Derived(val index: Int) : AddressMeta()
}

val AddressMeta.indexOrNull: Int?
get() = when (this) {
is AddressMeta.Single -> null
is AddressMeta.Derived -> this.index
}

/**
* The utxos of a wallet may be discriminated against their number of confirmations. Typically, this is used in the
* context of a funding, which should happen only after a given depth.
Expand Down Expand Up @@ -82,6 +101,9 @@ private sealed interface WalletCommand {
data object ElectrumConnected : WalletCommand
data class ElectrumNotification(val msg: ElectrumResponse) : WalletCommand
data class AddAddress(val bitcoinAddress: String) : WalletCommand
data class AddAddressGenerator(val generator: AddressGenerator) : WalletCommand

class AddressGenerator(val generateAddress: (Int) -> String)
}
}

Expand All @@ -92,23 +114,19 @@ class ElectrumMiniWallet(
val chainHash: BlockHash,
private val client: IElectrumClient,
private val scope: CoroutineScope,
loggerFactory: LoggerFactory,
private val name: String = ""
private val logger: Logger
) : CoroutineScope by scope {

private val logger = MDCLogger(loggerFactory.newLogger(this::class))
private fun mdc(): Map<String, Any> {
return mapOf(
"wallet" to name,
"utxos" to walletStateFlow.value.utxos.size,
"balance" to walletStateFlow.value.totalBalance
)
}

// state flow with the current balance
private val _walletStateFlow = MutableStateFlow(WalletState(emptyMap()))
val walletStateFlow get() = _walletStateFlow.asStateFlow()

// generator, if used
private var addressGenerator: WalletCommand.Companion.AddressGenerator? = null

// all current meta associated to each address
private var addressMetas: Map<String, WalletState.AddressMeta> = emptyMap()

// all currently watched script hashes and their corresponding bitcoin address
private var scriptHashes: Map<ByteVector32, String> = emptyMap()

Expand All @@ -121,6 +139,12 @@ class ElectrumMiniWallet(
}
}

fun addAddressGenerator(generator: (Int) -> String) {
launch {
mailbox.send(WalletCommand.Companion.AddAddressGenerator(WalletCommand.Companion.AddressGenerator(generator)))
}
}

/** This function should only be used in tests, to test the wallet notification flow. */
fun setWalletState(walletState: WalletState) {
launch {
Expand All @@ -131,25 +155,31 @@ class ElectrumMiniWallet(
private val job: Job

init {

suspend fun WalletState.processSubscriptionResponse(msg: ScriptHashSubscriptionResponse): WalletState {
val bitcoinAddress = scriptHashes[msg.scriptHash]
val addressMeta = bitcoinAddress?.let { addressMetas[it] }
return when {
bitcoinAddress == null -> {
// this should never happen
logger.error { "received subscription response for script hash ${msg.scriptHash} that does not match any address" }
bitcoinAddress == null || addressMeta == null -> {
// this will happen because multiple wallets may be sharing the same Electrum connection (e.g. swap-in and final wallet)
logger.debug { "received subscription response for script hash ${msg.scriptHash} that does not match any address" }
this
}
msg.status == null -> this.copy(addresses = this.addresses + (bitcoinAddress to listOf()))
msg.status == null -> {
logger.info { "address=$bitcoinAddress index=${addressMeta.indexOrNull ?: "n/a"} utxos=(unused)" }
this.copy(addresses = this.addresses + (bitcoinAddress to WalletState.AddressState(addressMeta, alreadyUsed = false, utxos = listOf())))
}
else -> {
val unspents = client.getScriptHashUnspents(msg.scriptHash)
val previouslysKnownTxs = (_walletStateFlow.value.addresses[bitcoinAddress] ?: emptyList()).map { it.txId to it.previousTx }.toMap()
val previouslysKnownTxs = (this.addresses[bitcoinAddress]?.utxos ?: emptyList()).associate { it.txId to it.previousTx }
val utxos = unspents
.mapNotNull { item -> (previouslysKnownTxs[item.txid] ?: client.getTx(item.txid))?.let { item to it } } // only retrieve txs from electrum if necessary and ignore the utxo if the parent tx cannot be retrieved
.map { (item, previousTx) -> WalletState.Utxo(item.txid, item.outputIndex, item.blockHeight, previousTx) }
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to utxos))
logger.info(mdc()) { "${unspents.size} utxo(s) for address=$bitcoinAddress balance=${nextWalletState.totalBalance}" }
.map { (item, previousTx) -> WalletState.Utxo(item.txid, item.outputIndex, item.blockHeight, previousTx, addressMeta) }
val nextAddressState = WalletState.AddressState(addressMeta, alreadyUsed = true, utxos)
val nextWalletState = this.copy(addresses = this.addresses + (bitcoinAddress to nextAddressState))
logger.info { "address=$bitcoinAddress index=${addressMeta.indexOrNull ?: "n/a"} utxos=${unspents.size} amount=${unspents.sumOf { it.value }.sat}" }
unspents.forEach { logger.debug { "utxo=${it.outPoint.txid}:${it.outPoint.index} amount=${it.value} sat" } }
nextWalletState
return nextWalletState
}
}
}
Expand All @@ -159,11 +189,10 @@ class ElectrumMiniWallet(
* Depending on the status of the electrum connection, the subscription may or may not be sent to a server.
* It is the responsibility of the caller to resubscribe on reconnection.
*/
suspend fun subscribe(scriptHash: ByteVector32, bitcoinAddress: String) {
kotlin.runCatching { client.startScriptHashSubscription(scriptHash) }.map { response ->
logger.info { "subscribed to address=$bitcoinAddress scriptHash=$scriptHash" }
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(response)
}
suspend fun WalletState.subscribe(scriptHash: ByteVector32, bitcoinAddress: String): WalletState {
val response = client.startScriptHashSubscription(scriptHash)
logger.debug { "subscribed to address=$bitcoinAddress scriptHash=$scriptHash" }
return processSubscriptionResponse(response)
}

fun computeScriptHash(bitcoinAddress: String): ByteVector32? {
Expand All @@ -172,6 +201,30 @@ class ElectrumMiniWallet(
.right
}

suspend fun WalletState.addAddress(bitcoinAddress: String, meta: WalletState.AddressMeta): WalletState {
return computeScriptHash(bitcoinAddress)?.let { scriptHash ->
if (!scriptHashes.containsKey(scriptHash)) {
logger.debug { "adding new address=${bitcoinAddress} index=${meta.indexOrNull ?: "n/a"}" }
scriptHashes = scriptHashes + (scriptHash to bitcoinAddress)
addressMetas = addressMetas + (bitcoinAddress to meta)
subscribe(scriptHash, bitcoinAddress)
} else this
} ?: this
}

suspend fun WalletState.addAddress(generator: WalletCommand.Companion.AddressGenerator, addressIndex: Int): WalletState {
return this.addAddress(generator.generateAddress(addressIndex), WalletState.AddressMeta.Derived(addressIndex))
}

suspend fun WalletState.maybeGenerateNext(generator: WalletCommand.Companion.AddressGenerator): WalletState {
val lastDerivedAddressState = this.lastDerivedAddress?.let { this.addresses[it.first] }
return when {
lastDerivedAddressState == null -> this.addAddress(generator, 0).maybeGenerateNext(generator) // there is no existing derived address: initialization
lastDerivedAddressState.alreadyUsed -> this.addAddress(generator, lastDerivedAddressState.meta.indexOrNull!! + 1).maybeGenerateNext(generator) // most recent derived address is used, need to generate a new one
else -> this // nothing to do
}
}

job = launch {
launch {
// listen to connection events
Expand All @@ -185,23 +238,33 @@ class ElectrumMiniWallet(
mailbox.consumeAsFlow().collect {
when (it) {
is WalletCommand.Companion.ElectrumConnected -> {
logger.info(mdc()) { "electrum connected" }
scriptHashes.forEach { (scriptHash, address) -> subscribe(scriptHash, address) }
logger.info { "electrum connected" }
val walletState1 = scriptHashes
.toList()
.fold(_walletStateFlow.value) { walletState, (scriptHash, address) ->
walletState.subscribe(scriptHash, address)
}
val walletState2 = addressGenerator?.let { gen -> walletState1.maybeGenerateNext(gen) } ?: walletState1
_walletStateFlow.value = walletState2

}

is WalletCommand.Companion.ElectrumNotification -> {
if (it.msg is ScriptHashSubscriptionResponse) {
_walletStateFlow.value = _walletStateFlow.value.processSubscriptionResponse(it.msg)
val walletState1 = _walletStateFlow.value.processSubscriptionResponse(it.msg)
val walletState2 = addressGenerator?.let { gen -> walletState1.maybeGenerateNext(gen) } ?: walletState1
_walletStateFlow.value = walletState2
}
}

is WalletCommand.Companion.AddAddress -> {
computeScriptHash(it.bitcoinAddress)?.let { scriptHash ->
if (!scriptHashes.containsKey(scriptHash)) {
logger.info(mdc()) { "adding new address=${it.bitcoinAddress}" }
scriptHashes = scriptHashes + (scriptHash to it.bitcoinAddress)
subscribe(scriptHash, it.bitcoinAddress)
}
_walletStateFlow.value = _walletStateFlow.value.addAddress(it.bitcoinAddress, WalletState.AddressMeta.Single)
}
is WalletCommand.Companion.AddAddressGenerator -> {
if (addressGenerator == null) {
logger.info { "adding new address generator" }
addressGenerator = it.generator
_walletStateFlow.value = _walletStateFlow.value.maybeGenerateNext(it.generator)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.Bitcoin
import fr.acinq.lightning.crypto.KeyManager
import fr.acinq.lightning.logging.LoggerFactory
import fr.acinq.lightning.logging.info
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.distinctUntilChangedBy
import kotlinx.coroutines.launch

class FinalWallet(
chain: Bitcoin.Chain,
finalWalletKeys: KeyManager.Bip84OnChainKeys,
electrum: IElectrumClient,
scope: CoroutineScope,
loggerFactory: LoggerFactory
) {
private val logger = loggerFactory.newLogger(this::class)

val wallet = ElectrumMiniWallet(chain.chainHash, electrum, scope, logger)
val finalAddress: String = finalWalletKeys.address(addressIndex = 0L).also { wallet.addAddress(it) }

init {
scope.launch {
wallet.walletStateFlow
.distinctUntilChangedBy { it.totalBalance }
.collect { wallet ->
logger.info { "${wallet.totalBalance} available on final wallet with ${wallet.utxos.size} utxos" }
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package fr.acinq.lightning.blockchain.electrum

import fr.acinq.bitcoin.Bitcoin
import fr.acinq.lightning.crypto.KeyManager
import fr.acinq.lightning.logging.LoggerFactory
import fr.acinq.lightning.logging.info
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.flow.MutableStateFlow
import kotlinx.coroutines.flow.distinctUntilChanged
import kotlinx.coroutines.flow.filterNotNull
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.launch

class SwapInWallet(
chain: Bitcoin.Chain,
swapInKeys: KeyManager.SwapInOnChainKeys,
electrum: IElectrumClient,
scope: CoroutineScope,
loggerFactory: LoggerFactory
) {
private val logger = loggerFactory.newLogger(this::class)

val wallet = ElectrumMiniWallet(chain.chainHash, electrum, scope, logger)

val legacySwapInAddress: String = swapInKeys.legacySwapInProtocol.address(chain)
.also { wallet.addAddress(it) }
val swapInAddressFlow = MutableStateFlow<Pair<String, Int>?>(null)
.also { wallet.addAddressGenerator(generator = { index -> swapInKeys.getSwapInProtocol(index).address(chain) }) }

init {
scope.launch {
// address rotation
wallet.walletStateFlow
.map { it.lastDerivedAddress }
.filterNotNull()
.distinctUntilChanged()
.collect { (address, derived) ->
logger.info { "setting current swap-in address=$address index=${derived.index}" }
swapInAddressFlow.emit(address to derived.index)
}
}
}

}
Loading

0 comments on commit 506003f

Please sign in to comment.