diff --git a/payday_core/src/api/lightining_api.rs b/payday_core/src/api/lightining_api.rs index 413dadc..d856a0f 100644 --- a/payday_core/src/api/lightining_api.rs +++ b/payday_core/src/api/lightining_api.rs @@ -1,6 +1,9 @@ -use crate::{payment::amount::Amount, Result}; +use std::fmt::Display; + +use crate::{payment::amount::Amount, Error, Result}; use async_trait::async_trait; use lightning_invoice::Bolt11Invoice; +use tokio::{sync::mpsc::Sender, task::JoinHandle}; use super::on_chain_api::OnChainBalance; @@ -33,6 +36,19 @@ pub trait LightningPaymentApi: Send + Sync { async fn pay_to_node_pub_key(&self, pub_key: String, amount: Amount) -> Result<()>; } +/// Allows consuming Lightning transaction events from a Lightning node. +#[async_trait] +pub trait LightningTransactionStreamApi: Send + Sync { + /// Subscribe to Lightning transaction events. The receiver of the channel will get + /// LightningTransaction events. The subscription will resume the node event stream + /// from the given settle_index. At the moment only settled transactions are populated. + async fn subscribe_lightning_transactions( + &self, + sender: Sender, + settle_index: Option, + ) -> Result>; +} + #[derive(Debug, Clone)] pub struct LnInvoice { pub invoice: String, @@ -52,3 +68,61 @@ pub struct NodeBalance { pub onchain: OnChainBalance, pub channel: ChannelBalance, } + +#[derive(Debug, Clone)] +pub struct LightningTransaction { + pub node_id: String, + pub r_hash: String, + pub invoice: String, + pub amount: Amount, + pub amount_paid: Amount, + pub settle_index: u64, +} + +#[derive(Debug, Clone)] +pub enum LightningTransactionEvent { + Settled(LightningTransaction), +} + +#[derive(Debug, Clone, PartialEq)] +pub enum InvoiceState { + OPEN, + SETTLED, + CANCELED, + ACCEPTED, +} + +impl TryFrom for InvoiceState { + type Error = Error; + fn try_from(value: i32) -> Result { + match value { + 0 => Ok(InvoiceState::OPEN), + 1 => Ok(InvoiceState::SETTLED), + 2 => Ok(InvoiceState::CANCELED), + 3 => Ok(InvoiceState::ACCEPTED), + _ => Err(Error::InvalidInvoiceState(format!( + "Invalid invoice state: {}", + value + ))), + } + } +} + +// In this direction we don't need to check for invalid values. +#[allow(clippy::from_over_into)] +impl Into for InvoiceState { + fn into(self) -> i32 { + match self { + InvoiceState::OPEN => 0, + InvoiceState::SETTLED => 1, + InvoiceState::CANCELED => 2, + InvoiceState::ACCEPTED => 3, + } + } +} + +impl Display for InvoiceState { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + write!(f, "{:?}", self) + } +} diff --git a/payday_core/src/api/on_chain_api.rs b/payday_core/src/api/on_chain_api.rs index 19da07b..0148d75 100644 --- a/payday_core/src/api/on_chain_api.rs +++ b/payday_core/src/api/on_chain_api.rs @@ -56,11 +56,6 @@ pub trait OnChainTransactionApi: Send + Sync { ) -> Result>; } -#[async_trait] -pub trait OnChainStreamApi: Send + Sync { - async fn process_events(&self) -> Result>; -} - #[async_trait] pub trait OnChainTransactionEventProcessorApi: Send + Sync { fn node_id(&self) -> String; diff --git a/payday_core/src/error.rs b/payday_core/src/error.rs index 1f7f135..cdee99f 100644 --- a/payday_core/src/error.rs +++ b/payday_core/src/error.rs @@ -9,6 +9,7 @@ pub enum Error { NodeConnectError(String), NodeApiError(String), LightningPaymentFailed(String), + InvalidInvoiceState(String), PublicKey(String), DbError(String), InvalidBitcoinAddress(String), diff --git a/payday_node_lnd/src/lnd.rs b/payday_node_lnd/src/lnd.rs index 162a061..a9968ea 100644 --- a/payday_node_lnd/src/lnd.rs +++ b/payday_node_lnd/src/lnd.rs @@ -1,10 +1,10 @@ use std::collections::HashMap; use async_trait::async_trait; -use bitcoin::{Address, Network}; +use bitcoin::{hex::DisplayHex, Address, Network}; use fedimint_tonic_lnd::{ - lnrpc::{GetTransactionsRequest, Transaction}, + lnrpc::{GetTransactionsRequest, InvoiceSubscription, Transaction}, Client, }; use lightning_invoice::Bolt11Invoice; @@ -13,6 +13,7 @@ use payday_core::{ api::{ lightining_api::{ ChannelBalance, GetLightningBalanceApi, LightningInvoiceApi, LightningPaymentApi, + LightningTransaction, LightningTransactionEvent, LightningTransactionStreamApi, LnInvoice, NodeBalance, }, on_chain_api::{ @@ -29,6 +30,9 @@ use tokio_stream::StreamExt; use crate::wrapper::LndRpcWrapper; +// The numeric state that LND indicates a settled invoice with. +const LND_SETTLED: i32 = 1; + #[derive(Clone)] pub struct Lnd { config: LndConfig, @@ -207,65 +211,11 @@ pub struct LndConfig { pub network: Network, } -/// Converts a satoshi amount to an Amount -fn to_amount(sats: i64) -> bitcoin::Amount { - if sats < 0 { - bitcoin::Amount::ZERO - } else { - bitcoin::Amount::from_sat(sats.unsigned_abs()) - } -} - -/// Converts a Transaction to a list of OnChainTransactionEvents. -fn to_on_chain_events( - tx: &Transaction, - chain: Network, - node_id: &str, -) -> Result> { - let received = tx.amount > 0; - let confirmed = tx.num_confirmations > 0; - - let res = tx - .output_details - .iter() - .filter(|d| { - if received { - d.is_our_address - } else { - !d.is_our_address - } - }) - .flat_map(|d| { - let address = to_address(&d.address, chain); - if let Ok(address) = address { - let payload = OnChainTransaction { - tx_id: tx.tx_hash.to_owned(), - block_height: tx.block_height, - node_id: node_id.to_owned(), - confirmations: tx.num_confirmations, - amount: bitcoin::Amount::from_sat(tx.amount.unsigned_abs()), - address, - }; - - match (confirmed, received) { - (true, true) => Some(OnChainTransactionEvent::ReceivedConfirmed(payload)), - (true, false) => Some(OnChainTransactionEvent::SentConfirmed(payload)), - (false, true) => Some(OnChainTransactionEvent::ReceivedUnconfirmed(payload)), - (false, false) => Some(OnChainTransactionEvent::SentUnconfirmed(payload)), - } - } else { - None - } - }) - .collect(); - Ok(res) -} - -pub struct LndOnChainPaymentEventStream { +pub struct LndPaymentEventStream { config: LndConfig, } -impl LndOnChainPaymentEventStream { +impl LndPaymentEventStream { pub fn new(config: LndConfig) -> Self { Self { config } } @@ -279,7 +229,7 @@ impl LndOnChainPaymentEventStream { } #[async_trait] -impl OnChainTransactionStreamApi for LndOnChainPaymentEventStream { +impl OnChainTransactionStreamApi for LndPaymentEventStream { async fn subscribe_on_chain_transactions( &self, sender: Sender, @@ -333,3 +283,115 @@ impl OnChainTransactionStreamApi for LndOnChainPaymentEventStream { Ok(handle) } } + +#[async_trait] +impl LightningTransactionStreamApi for LndPaymentEventStream { + async fn subscribe_lightning_transactions( + &self, + sender: Sender, + settle_index: Option, + ) -> Result> { + let config = self.config.clone(); + + let handle = tokio::spawn(async move { + let sender = sender.clone(); + let mut lnd: Client = fedimint_tonic_lnd::connect( + config.address.to_string(), + config.cert_path.to_string(), + config.macaroon_file.to_string(), + ) + .await + .expect("Failed to connect to LND lightning transaction stream"); + + let mut stream = lnd + .lightning() + .subscribe_invoices(InvoiceSubscription { + settle_index: settle_index.unwrap_or_default(), + ..Default::default() + }) + .await + .expect("Failed to subscribe to LND lightning transaction events") + .into_inner() + .filter_map(|tx| tx.ok()); + + while let Some(event) = stream.next().await { + if event.state == LND_SETTLED { + if let Ok(event) = to_lightning_event(event, &config.node_id) { + if let Err(e) = sender.send(event).await { + println!("Failed to send lightning transaction event: {:?}", e); + } + } + } + } + }); + Ok(handle) + } +} + +fn to_lightning_event( + event: fedimint_tonic_lnd::lnrpc::Invoice, + node_id: &str, +) -> Result { + Ok(LightningTransactionEvent::Settled(LightningTransaction { + node_id: node_id.to_owned(), + r_hash: event.r_hash.to_lower_hex_string(), + invoice: event.payment_request.to_owned(), + amount: Amount::sats(event.value as u64), + amount_paid: Amount::sats(event.amt_paid_sat as u64), + settle_index: event.settle_index, + })) +} + +/// Converts a satoshi amount to an Amount +fn to_amount(sats: i64) -> bitcoin::Amount { + if sats < 0 { + bitcoin::Amount::ZERO + } else { + bitcoin::Amount::from_sat(sats.unsigned_abs()) + } +} + +/// Converts a Transaction to a list of OnChainTransactionEvents. +fn to_on_chain_events( + tx: &Transaction, + chain: Network, + node_id: &str, +) -> Result> { + let received = tx.amount > 0; + let confirmed = tx.num_confirmations > 0; + + let res = tx + .output_details + .iter() + .filter(|d| { + if received { + d.is_our_address + } else { + !d.is_our_address + } + }) + .flat_map(|d| { + let address = to_address(&d.address, chain); + if let Ok(address) = address { + let payload = OnChainTransaction { + tx_id: tx.tx_hash.to_owned(), + block_height: tx.block_height, + node_id: node_id.to_owned(), + confirmations: tx.num_confirmations, + amount: bitcoin::Amount::from_sat(tx.amount.unsigned_abs()), + address, + }; + + match (confirmed, received) { + (true, true) => Some(OnChainTransactionEvent::ReceivedConfirmed(payload)), + (true, false) => Some(OnChainTransactionEvent::SentConfirmed(payload)), + (false, true) => Some(OnChainTransactionEvent::ReceivedUnconfirmed(payload)), + (false, false) => Some(OnChainTransactionEvent::SentUnconfirmed(payload)), + } + } else { + None + } + }) + .collect(); + Ok(res) +} diff --git a/src/main.rs b/src/main.rs index 2c3c092..d0dff71 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,8 +1,8 @@ use bitcoin::Network; -use payday_core::api::on_chain_api::OnChainTransactionStreamApi; +use payday_core::api::lightining_api::LightningTransactionStreamApi; use payday_core::Result; -use payday_node_lnd::lnd::{LndConfig, LndOnChainPaymentEventStream}; +use payday_node_lnd::lnd::{LndConfig, LndPaymentEventStream}; use serde::{Deserialize, Serialize}; use tokio::task::JoinSet; @@ -63,20 +63,21 @@ async fn main() -> Result<()> { // println!("LND2 invoice: {:?}", ln_invoice2); let (tx, mut rx) = tokio::sync::mpsc::channel(100); - let transactions_1 = LndOnChainPaymentEventStream::new(lnd_config.clone()); - let transactions_2 = LndOnChainPaymentEventStream::new(lnd_config2.clone()); + let transactions_1 = LndPaymentEventStream::new(lnd_config.clone()); + let transactions_2 = LndPaymentEventStream::new(lnd_config2.clone()); let handles = vec![ transactions_1 - .subscribe_on_chain_transactions(tx.clone(), Some(1868219)) + .subscribe_lightning_transactions(tx.clone(), Some(1)) .await .unwrap(), transactions_2 - .subscribe_on_chain_transactions(tx, Some(1868219)) + .subscribe_lightning_transactions(tx, Some(1)) .await .unwrap(), ]; let set = JoinSet::from_iter(handles); + println!("Subscribed to transactions"); while let Some(event) = rx.recv().await { println!("Event: {:?}", event); }