Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 75 additions & 1 deletion payday_core/src/api/lightining_api.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use crate::{payment::amount::Amount, Result};
use std::fmt::Display;

use crate::{payment::amount::Amount, Error, Result};
use async_trait::async_trait;
use lightning_invoice::Bolt11Invoice;
use tokio::{sync::mpsc::Sender, task::JoinHandle};

use super::on_chain_api::OnChainBalance;

Expand Down Expand Up @@ -33,6 +36,19 @@ pub trait LightningPaymentApi: Send + Sync {
async fn pay_to_node_pub_key(&self, pub_key: String, amount: Amount) -> Result<()>;
}

/// Allows consuming Lightning transaction events from a Lightning node.
#[async_trait]
pub trait LightningTransactionStreamApi: Send + Sync {
/// Subscribe to Lightning transaction events. The receiver of the channel will get
/// LightningTransaction events. The subscription will resume the node event stream
/// from the given settle_index. At the moment only settled transactions are populated.
async fn subscribe_lightning_transactions(
&self,
sender: Sender<LightningTransactionEvent>,
settle_index: Option<u64>,
) -> Result<JoinHandle<()>>;
}

#[derive(Debug, Clone)]
pub struct LnInvoice {
pub invoice: String,
Expand All @@ -52,3 +68,61 @@ pub struct NodeBalance {
pub onchain: OnChainBalance,
pub channel: ChannelBalance,
}

#[derive(Debug, Clone)]
pub struct LightningTransaction {
pub node_id: String,
pub r_hash: String,
pub invoice: String,
pub amount: Amount,
pub amount_paid: Amount,
pub settle_index: u64,
}

#[derive(Debug, Clone)]
pub enum LightningTransactionEvent {
Settled(LightningTransaction),
}

#[derive(Debug, Clone, PartialEq)]
pub enum InvoiceState {
OPEN,
SETTLED,
CANCELED,
ACCEPTED,
}

impl TryFrom<i32> for InvoiceState {
type Error = Error;
fn try_from(value: i32) -> Result<Self> {
match value {
0 => Ok(InvoiceState::OPEN),
1 => Ok(InvoiceState::SETTLED),
2 => Ok(InvoiceState::CANCELED),
3 => Ok(InvoiceState::ACCEPTED),
_ => Err(Error::InvalidInvoiceState(format!(
"Invalid invoice state: {}",
value
))),
}
}
}

// In this direction we don't need to check for invalid values.
#[allow(clippy::from_over_into)]
impl Into<i32> for InvoiceState {
fn into(self) -> i32 {
match self {
InvoiceState::OPEN => 0,
InvoiceState::SETTLED => 1,
InvoiceState::CANCELED => 2,
InvoiceState::ACCEPTED => 3,
}
}
}

impl Display for InvoiceState {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{:?}", self)
}
}
5 changes: 0 additions & 5 deletions payday_core/src/api/on_chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ pub trait OnChainTransactionApi: Send + Sync {
) -> Result<Vec<OnChainTransactionEvent>>;
}

#[async_trait]
pub trait OnChainStreamApi: Send + Sync {
async fn process_events(&self) -> Result<JoinHandle<()>>;
}

#[async_trait]
pub trait OnChainTransactionEventProcessorApi: Send + Sync {
fn node_id(&self) -> String;
Expand Down
1 change: 1 addition & 0 deletions payday_core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ pub enum Error {
NodeConnectError(String),
NodeApiError(String),
LightningPaymentFailed(String),
InvalidInvoiceState(String),
PublicKey(String),
DbError(String),
InvalidBitcoinAddress(String),
Expand Down
180 changes: 121 additions & 59 deletions payday_node_lnd/src/lnd.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use std::collections::HashMap;

use async_trait::async_trait;
use bitcoin::{Address, Network};
use bitcoin::{hex::DisplayHex, Address, Network};

use fedimint_tonic_lnd::{
lnrpc::{GetTransactionsRequest, Transaction},
lnrpc::{GetTransactionsRequest, InvoiceSubscription, Transaction},
Client,
};
use lightning_invoice::Bolt11Invoice;
Expand All @@ -13,6 +13,7 @@ use payday_core::{
api::{
lightining_api::{
ChannelBalance, GetLightningBalanceApi, LightningInvoiceApi, LightningPaymentApi,
LightningTransaction, LightningTransactionEvent, LightningTransactionStreamApi,
LnInvoice, NodeBalance,
},
on_chain_api::{
Expand All @@ -29,6 +30,9 @@ use tokio_stream::StreamExt;

use crate::wrapper::LndRpcWrapper;

// The numeric state that LND indicates a settled invoice with.
const LND_SETTLED: i32 = 1;

#[derive(Clone)]
pub struct Lnd {
config: LndConfig,
Expand Down Expand Up @@ -207,65 +211,11 @@ pub struct LndConfig {
pub network: Network,
}

/// Converts a satoshi amount to an Amount
fn to_amount(sats: i64) -> bitcoin::Amount {
if sats < 0 {
bitcoin::Amount::ZERO
} else {
bitcoin::Amount::from_sat(sats.unsigned_abs())
}
}

/// Converts a Transaction to a list of OnChainTransactionEvents.
fn to_on_chain_events(
tx: &Transaction,
chain: Network,
node_id: &str,
) -> Result<Vec<OnChainTransactionEvent>> {
let received = tx.amount > 0;
let confirmed = tx.num_confirmations > 0;

let res = tx
.output_details
.iter()
.filter(|d| {
if received {
d.is_our_address
} else {
!d.is_our_address
}
})
.flat_map(|d| {
let address = to_address(&d.address, chain);
if let Ok(address) = address {
let payload = OnChainTransaction {
tx_id: tx.tx_hash.to_owned(),
block_height: tx.block_height,
node_id: node_id.to_owned(),
confirmations: tx.num_confirmations,
amount: bitcoin::Amount::from_sat(tx.amount.unsigned_abs()),
address,
};

match (confirmed, received) {
(true, true) => Some(OnChainTransactionEvent::ReceivedConfirmed(payload)),
(true, false) => Some(OnChainTransactionEvent::SentConfirmed(payload)),
(false, true) => Some(OnChainTransactionEvent::ReceivedUnconfirmed(payload)),
(false, false) => Some(OnChainTransactionEvent::SentUnconfirmed(payload)),
}
} else {
None
}
})
.collect();
Ok(res)
}

pub struct LndOnChainPaymentEventStream {
pub struct LndPaymentEventStream {
config: LndConfig,
}

impl LndOnChainPaymentEventStream {
impl LndPaymentEventStream {
pub fn new(config: LndConfig) -> Self {
Self { config }
}
Expand All @@ -279,7 +229,7 @@ impl LndOnChainPaymentEventStream {
}

#[async_trait]
impl OnChainTransactionStreamApi for LndOnChainPaymentEventStream {
impl OnChainTransactionStreamApi for LndPaymentEventStream {
async fn subscribe_on_chain_transactions(
&self,
sender: Sender<OnChainTransactionEvent>,
Expand Down Expand Up @@ -333,3 +283,115 @@ impl OnChainTransactionStreamApi for LndOnChainPaymentEventStream {
Ok(handle)
}
}

#[async_trait]
impl LightningTransactionStreamApi for LndPaymentEventStream {
async fn subscribe_lightning_transactions(
&self,
sender: Sender<LightningTransactionEvent>,
settle_index: Option<u64>,
) -> Result<JoinHandle<()>> {
let config = self.config.clone();

let handle = tokio::spawn(async move {
let sender = sender.clone();
let mut lnd: Client = fedimint_tonic_lnd::connect(
config.address.to_string(),
config.cert_path.to_string(),
config.macaroon_file.to_string(),
)
.await
.expect("Failed to connect to LND lightning transaction stream");

let mut stream = lnd
.lightning()
.subscribe_invoices(InvoiceSubscription {
settle_index: settle_index.unwrap_or_default(),
..Default::default()
})
.await
.expect("Failed to subscribe to LND lightning transaction events")
.into_inner()
.filter_map(|tx| tx.ok());

while let Some(event) = stream.next().await {
if event.state == LND_SETTLED {
if let Ok(event) = to_lightning_event(event, &config.node_id) {
if let Err(e) = sender.send(event).await {
println!("Failed to send lightning transaction event: {:?}", e);
}
}
}
}
});
Ok(handle)
}
}

fn to_lightning_event(
event: fedimint_tonic_lnd::lnrpc::Invoice,
node_id: &str,
) -> Result<LightningTransactionEvent> {
Ok(LightningTransactionEvent::Settled(LightningTransaction {
node_id: node_id.to_owned(),
r_hash: event.r_hash.to_lower_hex_string(),
invoice: event.payment_request.to_owned(),
amount: Amount::sats(event.value as u64),
amount_paid: Amount::sats(event.amt_paid_sat as u64),
settle_index: event.settle_index,
}))
}

/// Converts a satoshi amount to an Amount
fn to_amount(sats: i64) -> bitcoin::Amount {
if sats < 0 {
bitcoin::Amount::ZERO
} else {
bitcoin::Amount::from_sat(sats.unsigned_abs())
}
}

/// Converts a Transaction to a list of OnChainTransactionEvents.
fn to_on_chain_events(
tx: &Transaction,
chain: Network,
node_id: &str,
) -> Result<Vec<OnChainTransactionEvent>> {
let received = tx.amount > 0;
let confirmed = tx.num_confirmations > 0;

let res = tx
.output_details
.iter()
.filter(|d| {
if received {
d.is_our_address
} else {
!d.is_our_address
}
})
.flat_map(|d| {
let address = to_address(&d.address, chain);
if let Ok(address) = address {
let payload = OnChainTransaction {
tx_id: tx.tx_hash.to_owned(),
block_height: tx.block_height,
node_id: node_id.to_owned(),
confirmations: tx.num_confirmations,
amount: bitcoin::Amount::from_sat(tx.amount.unsigned_abs()),
address,
};

match (confirmed, received) {
(true, true) => Some(OnChainTransactionEvent::ReceivedConfirmed(payload)),
(true, false) => Some(OnChainTransactionEvent::SentConfirmed(payload)),
(false, true) => Some(OnChainTransactionEvent::ReceivedUnconfirmed(payload)),
(false, false) => Some(OnChainTransactionEvent::SentUnconfirmed(payload)),
}
} else {
None
}
})
.collect();
Ok(res)
}
13 changes: 7 additions & 6 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use bitcoin::Network;

use payday_core::api::on_chain_api::OnChainTransactionStreamApi;
use payday_core::api::lightining_api::LightningTransactionStreamApi;
use payday_core::Result;
use payday_node_lnd::lnd::{LndConfig, LndOnChainPaymentEventStream};
use payday_node_lnd::lnd::{LndConfig, LndPaymentEventStream};
use serde::{Deserialize, Serialize};
use tokio::task::JoinSet;

Expand Down Expand Up @@ -63,20 +63,21 @@ async fn main() -> Result<()> {
// println!("LND2 invoice: {:?}", ln_invoice2);

let (tx, mut rx) = tokio::sync::mpsc::channel(100);
let transactions_1 = LndOnChainPaymentEventStream::new(lnd_config.clone());
let transactions_2 = LndOnChainPaymentEventStream::new(lnd_config2.clone());
let transactions_1 = LndPaymentEventStream::new(lnd_config.clone());
let transactions_2 = LndPaymentEventStream::new(lnd_config2.clone());
let handles = vec![
transactions_1
.subscribe_on_chain_transactions(tx.clone(), Some(1868219))
.subscribe_lightning_transactions(tx.clone(), Some(1))
.await
.unwrap(),
transactions_2
.subscribe_on_chain_transactions(tx, Some(1868219))
.subscribe_lightning_transactions(tx, Some(1))
.await
.unwrap(),
];
let set = JoinSet::from_iter(handles);

println!("Subscribed to transactions");
while let Some(event) = rx.recv().await {
println!("Event: {:?}", event);
}
Expand Down