From 7d5619dbf2cbf90c046a4a1c0cb86e5c32471616 Mon Sep 17 00:00:00 2001 From: tompro Date: Thu, 20 Feb 2025 15:00:00 +0100 Subject: [PATCH 1/2] Cleanup --- payday_core/src/lib.rs | 5 ----- payday_node_lnd/src/wrapper.rs | 17 +---------------- 2 files changed, 1 insertion(+), 21 deletions(-) diff --git a/payday_core/src/lib.rs b/payday_core/src/lib.rs index 4c3fe17..bb336a1 100644 --- a/payday_core/src/lib.rs +++ b/payday_core/src/lib.rs @@ -1,7 +1,3 @@ -use std::pin::Pin; - -use tokio_stream::Stream; - pub use error::Error; pub mod api; @@ -12,4 +8,3 @@ pub mod payment; pub mod persistence; pub type Result = std::result::Result; -pub type PaydayStream = Pin>>; diff --git a/payday_node_lnd/src/wrapper.rs b/payday_node_lnd/src/wrapper.rs index 4c1c3e3..216ebf3 100644 --- a/payday_node_lnd/src/wrapper.rs +++ b/payday_node_lnd/src/wrapper.rs @@ -17,7 +17,7 @@ use fedimint_tonic_lnd::{ }; use lightning_invoice::Bolt11Invoice; use payday_btc::to_address; -use payday_core::{api::lightining_api::LnInvoice, Error, PaydayStream, Result}; +use payday_core::{api::lightining_api::LnInvoice, Error, Result}; use tokio::sync::{Mutex, MutexGuard}; use tokio_stream::StreamExt; @@ -296,21 +296,6 @@ impl LndRpcWrapper { Ok(result) } - /// Get a stream of onchain transactions relevant to the wallet. As LND RPC does not handle - /// the request arguments, we do not provide any on this method to avoid confusion. - pub async fn subscribe_transactions(&self) -> Result> { - let mut lnd = self.client().await; - let stream = lnd - .lightning() - .subscribe_transactions(GetTransactionsRequest::default()) - .await - .map_err(|e| Error::NodeApiError(e.to_string()))? - .into_inner() - .filter(|tx| tx.is_ok()) - .map(|tx| tx.unwrap()); - Ok(Box::pin(stream)) - } - /// Get a list of onchain transactions between the given start and end heights. pub async fn get_transactions( &self, From b3afa2496b3f1c722884cbef8b6355e327338b95 Mon Sep 17 00:00:00 2001 From: tompro Date: Thu, 20 Feb 2025 19:04:47 +0100 Subject: [PATCH 2/2] Refactor and lightning aggregate --- Cargo.lock | 17 -- Cargo.toml | 5 +- payday_btc/Cargo.toml | 14 -- payday_btc/src/lib.rs | 6 - .../src/aggregate/lightning_aggregate.rs | 162 ++++++++++++++ payday_core/src/aggregate/mod.rs | 2 + .../src/aggregate}/on_chain_aggregate.rs | 24 ++- payday_core/src/api/mod.rs | 1 + payday_core/src/api/payment_event.rs | 49 +++++ payday_core/src/lib.rs | 2 + payday_core/src/processor/mod.rs | 1 + .../src/processor}/on_chain_processor.rs | 4 +- payday_node_lnd/Cargo.toml | 1 - payday_node_lnd/src/lib.rs | 11 + payday_node_lnd/src/lnd.rs | 2 +- payday_node_lnd/src/wrapper.rs | 2 +- payday_postgres/Cargo.toml | 1 - payday_postgres/src/btc_onchain.rs | 203 +++++++++--------- 18 files changed, 350 insertions(+), 157 deletions(-) delete mode 100644 payday_btc/Cargo.toml create mode 100644 payday_core/src/aggregate/lightning_aggregate.rs create mode 100644 payday_core/src/aggregate/mod.rs rename {payday_btc/src => payday_core/src/aggregate}/on_chain_aggregate.rs (95%) create mode 100644 payday_core/src/api/payment_event.rs create mode 100644 payday_core/src/processor/mod.rs rename {payday_btc/src => payday_core/src/processor}/on_chain_processor.rs (99%) diff --git a/Cargo.lock b/Cargo.lock index ab81288..83089d7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2282,20 +2282,6 @@ version = "1.0.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "17359afc20d7ab31fdb42bb844c8b3bb1dabd7dcf7e68428492da7f16966fcef" -[[package]] -name = "payday_btc" -version = "0.1.0" -dependencies = [ - "async-trait", - "bitcoin", - "cqrs-es", - "payday_core", - "serde", - "serde_json", - "tokio", - "tokio-stream", -] - [[package]] name = "payday_core" version = "0.1.0" @@ -2320,7 +2306,6 @@ dependencies = [ "cqrs-es", "fedimint-tonic-lnd", "lightning-invoice", - "payday_btc", "payday_core", "tokio", "tokio-stream", @@ -2332,7 +2317,6 @@ version = "0.1.0" dependencies = [ "async-trait", "cqrs-es", - "payday_btc", "payday_core", "postgres-es", "serde", @@ -2346,7 +2330,6 @@ name = "payday_rs" version = "0.1.0" dependencies = [ "bitcoin", - "payday_btc", "payday_core", "payday_node_lnd", "payday_postgres", diff --git a/Cargo.toml b/Cargo.toml index 441243e..87fb3a6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -6,7 +6,6 @@ edition = "2021" [dependencies] payday_core = { path = "./payday_core" } payday_node_lnd = { path = "./payday_node_lnd" } -payday_btc = { path = "./payday_btc" } payday_surrealdb = { path = "./payday_surrealdb" } payday_postgres = { path = "./payday_postgres" } tokio.workspace = true @@ -17,7 +16,6 @@ tokio-stream.workspace = true [workspace] members = [ - "payday_btc", "payday_core", "payday_node_lnd", "payday_postgres", @@ -31,7 +29,8 @@ serde = { version = "1.0.203", features = ["derive"] } serde_json = "1.0.118" tokio-stream = "0.1.15" chrono = "0.4" -cqrs-es = "0.4.11" +cqrs-es = "0.4.12" +postgres-es = "0.4.12" tokio = { version = "1.38.0", features = ["full"] } sqlx = { version = "0.8", features = ["postgres", "json"] } futures = "0.3.30" diff --git a/payday_btc/Cargo.toml b/payday_btc/Cargo.toml deleted file mode 100644 index 751b9dd..0000000 --- a/payday_btc/Cargo.toml +++ /dev/null @@ -1,14 +0,0 @@ -[package] -name = "payday_btc" -version = "0.1.0" -edition = "2021" - -[dependencies] -payday_core = { path = "../payday_core" } -async-trait = { workspace = true } -bitcoin = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -cqrs-es = { workspace = true } -tokio = { workspace = true } -tokio-stream = { workspace = true } diff --git a/payday_btc/src/lib.rs b/payday_btc/src/lib.rs index 41a2315..e98b493 100644 --- a/payday_btc/src/lib.rs +++ b/payday_btc/src/lib.rs @@ -5,9 +5,3 @@ use std::str::FromStr; use bitcoin::{Address, Network}; use payday_core::Result; - -/// Given a Bitcoin address string and a network, parses and validates the address. -/// Returns a checked address result. -pub fn to_address(addr: &str, network: Network) -> Result
{ - Ok(Address::from_str(addr)?.require_network(network)?) -} diff --git a/payday_core/src/aggregate/lightning_aggregate.rs b/payday_core/src/aggregate/lightning_aggregate.rs new file mode 100644 index 0000000..14fbfac --- /dev/null +++ b/payday_core/src/aggregate/lightning_aggregate.rs @@ -0,0 +1,162 @@ +use crate::{ + api::lightining_api::LightningTransactionEvent, + payment::{ + amount::Amount, + currency::Currency, + invoice::{Error, InvoiceId}, + }, +}; +use async_trait::async_trait; +use cqrs_es::{Aggregate, DomainEvent}; +use lightning_invoice::Bolt11Invoice; +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Clone, Serialize, Deserialize, Default)] +pub struct LightningInvoice { + pub invoice_id: InvoiceId, + pub node_id: String, + pub r_hash: String, + pub invoice: String, + pub amount: Amount, + pub received_amount: Amount, + pub overpaid: bool, + pub paid: bool, +} + +#[async_trait] +pub trait LightningInvoiceService: Send + Sync {} + +pub struct LightningInvoiceServices {} + +#[allow(clippy::large_enum_variant)] +#[derive(Debug, Deserialize)] +pub enum LightningInvoiceCommand { + CreateInvoice { + invoice_id: InvoiceId, + node_id: String, + amount: Amount, + invoice: Bolt11Invoice, + }, + SettleInvoice { + received_amount: Amount, + }, +} + +impl From for LightningInvoiceCommand { + fn from(event: LightningTransactionEvent) -> Self { + match event { + LightningTransactionEvent::Settled(tx) => LightningInvoiceCommand::SettleInvoice { + received_amount: tx.amount_paid, + }, + } + } +} + +#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)] +pub enum LightningInvoiceEvent { + InvoiceCreated { + invoice_id: InvoiceId, + node_id: String, + r_hash: String, + amount: Amount, + invoice: String, + }, + InvoiceSettled { + received_amount: Amount, + overpaid: bool, + paid: bool, + }, +} + +impl DomainEvent for LightningInvoiceEvent { + fn event_type(&self) -> String { + let event_type = match self { + LightningInvoiceEvent::InvoiceCreated { .. } => "LightningInvoiceCreated", + LightningInvoiceEvent::InvoiceSettled { .. } => "LightningInvoiceSettled", + }; + event_type.to_string() + } + + fn event_version(&self) -> String { + "1.0.0".to_string() + } +} + +#[async_trait] +impl Aggregate for LightningInvoice { + type Command = LightningInvoiceCommand; + type Event = LightningInvoiceEvent; + type Error = Error; + type Services = (); + + fn aggregate_type() -> String { + "LightningInvoice".to_string() + } + + async fn handle( + &self, + command: Self::Command, + _service: &Self::Services, + ) -> Result, Self::Error> { + match command { + LightningInvoiceCommand::CreateInvoice { + invoice_id, + node_id, + amount, + invoice, + } => { + if amount.currency != Currency::Btc { + return Err(Error::InvalidCurrency( + amount.currency.to_string(), + Currency::Btc.to_string(), + )); + } + + let r_hash = invoice.payment_hash().to_string(); + let invoice = invoice.to_string(); + + Ok(vec![LightningInvoiceEvent::InvoiceCreated { + invoice_id, + node_id, + r_hash, + amount, + invoice, + }]) + } + LightningInvoiceCommand::SettleInvoice { received_amount } => { + Ok(vec![LightningInvoiceEvent::InvoiceSettled { + received_amount, + overpaid: received_amount.amount > self.amount.amount, + paid: received_amount.amount >= self.amount.amount, + }]) + } + } + } + + fn apply(&mut self, event: Self::Event) { + match event { + LightningInvoiceEvent::InvoiceCreated { + invoice_id, + node_id, + r_hash, + amount, + invoice, + } => { + self.invoice_id = invoice_id; + self.node_id = node_id; + self.r_hash = r_hash; + self.amount = amount; + self.invoice = invoice; + } + LightningInvoiceEvent::InvoiceSettled { + received_amount, + overpaid, + paid, + } => { + self.received_amount = received_amount; + self.overpaid = overpaid; + self.paid = paid; + } + } + } +} diff --git a/payday_core/src/aggregate/mod.rs b/payday_core/src/aggregate/mod.rs new file mode 100644 index 0000000..b331823 --- /dev/null +++ b/payday_core/src/aggregate/mod.rs @@ -0,0 +1,2 @@ +pub mod lightning_aggregate; +pub mod on_chain_aggregate; diff --git a/payday_btc/src/on_chain_aggregate.rs b/payday_core/src/aggregate/on_chain_aggregate.rs similarity index 95% rename from payday_btc/src/on_chain_aggregate.rs rename to payday_core/src/aggregate/on_chain_aggregate.rs index c17aae9..f70860c 100644 --- a/payday_btc/src/on_chain_aggregate.rs +++ b/payday_core/src/aggregate/on_chain_aggregate.rs @@ -1,13 +1,17 @@ +use crate::{ + api::on_chain_api::OnChainTransactionEvent, + payment::{ + amount::Amount, + currency::Currency, + invoice::{Error, InvoiceId}, + }, +}; use async_trait::async_trait; use cqrs_es::{Aggregate, DomainEvent}; -use payday_core::api::on_chain_api::OnChainTransactionEvent; -use payday_core::payment::amount::Amount; -use payday_core::payment::currency::Currency; -use payday_core::payment::invoice::{Error, InvoiceId}; use serde::{Deserialize, Serialize}; #[derive(Debug, Clone, Serialize, Deserialize)] -pub struct BtcOnChainInvoice { +pub struct OnChainInvoice { pub invoice_id: InvoiceId, pub node_id: String, pub address: String, @@ -20,7 +24,7 @@ pub struct BtcOnChainInvoice { pub paid: bool, } -impl Default for BtcOnChainInvoice { +impl Default for OnChainInvoice { fn default() -> Self { Self { invoice_id: "".to_string(), @@ -143,14 +147,14 @@ impl DomainEvent for OnChainInvoiceEvent { } #[async_trait] -impl Aggregate for BtcOnChainInvoice { +impl Aggregate for OnChainInvoice { type Command = OnChainInvoiceCommand; type Event = OnChainInvoiceEvent; type Error = Error; type Services = (); fn aggregate_type() -> String { - "BtcOnChainInvoice".to_string() + "OnChainInvoice".to_string() } async fn handle( @@ -242,12 +246,12 @@ impl Aggregate for BtcOnChainInvoice { #[cfg(test)] mod aggregate_tests { + use crate::payment::currency::Currency; use cqrs_es::test::TestFramework; - use payday_core::payment::currency::Currency; use super::*; - type OnChainInvoiceTestFramework = TestFramework; + type OnChainInvoiceTestFramework = TestFramework; #[test] fn test_create_invoice() { diff --git a/payday_core/src/api/mod.rs b/payday_core/src/api/mod.rs index 9684297..b99f1a8 100644 --- a/payday_core/src/api/mod.rs +++ b/payday_core/src/api/mod.rs @@ -1,2 +1,3 @@ pub mod lightining_api; pub mod on_chain_api; +pub mod payment_event; diff --git a/payday_core/src/api/payment_event.rs b/payday_core/src/api/payment_event.rs new file mode 100644 index 0000000..0557982 --- /dev/null +++ b/payday_core/src/api/payment_event.rs @@ -0,0 +1,49 @@ +use crate::payment::amount::Amount; +use crate::payment::invoice::InvoiceId; +use crate::Result; + +pub trait PaymentEventPublisherApi { + /// Publishes a transaction event. + fn publish_received_event(&self, event: PaymentReceivedEvent) -> Result<()>; +} + +pub enum PaymentReceivedEvent { + /// A payment for an invoice has been received but not confirmed yet. + OnChainUnconfirmed(OnChainPaymentEvent), + /// A payment for an invoice has been received and has been confirmed. + OnChainConfirmed(OnChainPaymentEvent), + /// We received a payment where there is no invoice in the system. + OnChainUnexpected(OnChainUexpectedPaymentEvent), + /// We received a payment for an invoice that has already been paid. + OnChainUsedAddress(OnChainUsedAddressPaymentEvent), +} + +pub struct OnChainData { + pub node_id: String, + pub address: String, + pub confirmations: u64, + pub transaction_id: Option, +} + +pub struct OnChainPaymentEvent { + pub invoice_id: InvoiceId, + pub invoice_amount: Amount, + pub received_amount: Amount, + pub underpayment: bool, + pub overpayment: bool, + pub on_chain_data: OnChainData, + pub paid: bool, +} + +pub struct OnChainUexpectedPaymentEvent { + pub received_amount: Amount, + pub on_chain_data: OnChainData, + pub paid: bool, +} + +pub struct OnChainUsedAddressPaymentEvent { + pub received_amount: Amount, + pub on_chain_data: OnChainData, + pub original_invoice_id: InvoiceId, + pub paid: bool, +} diff --git a/payday_core/src/lib.rs b/payday_core/src/lib.rs index bb336a1..653299e 100644 --- a/payday_core/src/lib.rs +++ b/payday_core/src/lib.rs @@ -1,10 +1,12 @@ pub use error::Error; +pub mod aggregate; pub mod api; pub mod date; pub mod error; pub mod events; pub mod payment; pub mod persistence; +pub mod processor; pub type Result = std::result::Result; diff --git a/payday_core/src/processor/mod.rs b/payday_core/src/processor/mod.rs new file mode 100644 index 0000000..c878fca --- /dev/null +++ b/payday_core/src/processor/mod.rs @@ -0,0 +1 @@ +pub mod on_chain_processor; diff --git a/payday_btc/src/on_chain_processor.rs b/payday_core/src/processor/on_chain_processor.rs similarity index 99% rename from payday_btc/src/on_chain_processor.rs rename to payday_core/src/processor/on_chain_processor.rs index 77a3f5c..c212148 100644 --- a/payday_btc/src/on_chain_processor.rs +++ b/payday_core/src/processor/on_chain_processor.rs @@ -1,7 +1,6 @@ use std::sync::Arc; -use async_trait::async_trait; -use payday_core::{ +use crate::{ api::on_chain_api::{ OnChainTransactionEvent, OnChainTransactionEventHandler, OnChainTransactionEventProcessorApi, @@ -9,6 +8,7 @@ use payday_core::{ persistence::block_height::BlockHeightStoreApi, Result, }; +use async_trait::async_trait; use tokio::sync::Mutex; pub struct OnChainTransactionProcessor { diff --git a/payday_node_lnd/Cargo.toml b/payday_node_lnd/Cargo.toml index 58fe6d6..a4ce534 100644 --- a/payday_node_lnd/Cargo.toml +++ b/payday_node_lnd/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [dependencies] payday_core = { path = "../payday_core" } -payday_btc = { path = "../payday_btc" } fedimint-tonic-lnd = "0.2.0" async-trait.workspace = true bitcoin.workspace = true diff --git a/payday_node_lnd/src/lib.rs b/payday_node_lnd/src/lib.rs index 06921fd..588439b 100644 --- a/payday_node_lnd/src/lib.rs +++ b/payday_node_lnd/src/lib.rs @@ -1,2 +1,13 @@ pub mod lnd; pub mod wrapper; + +use std::str::FromStr; + +use bitcoin::{Address, Network}; +use payday_core::Result; + +/// Given a Bitcoin address string and a network, parses and validates the address. +/// Returns a checked address result. +pub fn to_address(addr: &str, network: Network) -> Result
{ + Ok(Address::from_str(addr)?.require_network(network)?) +} diff --git a/payday_node_lnd/src/lnd.rs b/payday_node_lnd/src/lnd.rs index a9968ea..f6be075 100644 --- a/payday_node_lnd/src/lnd.rs +++ b/payday_node_lnd/src/lnd.rs @@ -3,12 +3,12 @@ use std::collections::HashMap; use async_trait::async_trait; use bitcoin::{hex::DisplayHex, Address, Network}; +use crate::to_address; use fedimint_tonic_lnd::{ lnrpc::{GetTransactionsRequest, InvoiceSubscription, Transaction}, Client, }; use lightning_invoice::Bolt11Invoice; -use payday_btc::to_address; use payday_core::{ api::{ lightining_api::{ diff --git a/payday_node_lnd/src/wrapper.rs b/payday_node_lnd/src/wrapper.rs index 216ebf3..1c2ebbc 100644 --- a/payday_node_lnd/src/wrapper.rs +++ b/payday_node_lnd/src/wrapper.rs @@ -6,6 +6,7 @@ //! operations needed for invoicing. use std::{collections::HashMap, sync::Arc, time::Duration}; +use crate::to_address; use bitcoin::{hex::DisplayHex, Address, Amount, Network, PublicKey}; use fedimint_tonic_lnd::{ lnrpc::{ @@ -16,7 +17,6 @@ use fedimint_tonic_lnd::{ Client, }; use lightning_invoice::Bolt11Invoice; -use payday_btc::to_address; use payday_core::{api::lightining_api::LnInvoice, Error, Result}; use tokio::sync::{Mutex, MutexGuard}; use tokio_stream::StreamExt; diff --git a/payday_postgres/Cargo.toml b/payday_postgres/Cargo.toml index c1e56db..0bd9bcb 100644 --- a/payday_postgres/Cargo.toml +++ b/payday_postgres/Cargo.toml @@ -5,7 +5,6 @@ edition = "2021" [dependencies] payday_core = { path = "../payday_core" } -payday_btc = { path = "../payday_btc" } async-trait = { workspace = true } cqrs-es = { workspace = true } sqlx = { workspace = true } diff --git a/payday_postgres/src/btc_onchain.rs b/payday_postgres/src/btc_onchain.rs index e49d9f6..fbb5103 100644 --- a/payday_postgres/src/btc_onchain.rs +++ b/payday_postgres/src/btc_onchain.rs @@ -1,6 +1,7 @@ // use async_trait::async_trait; -// use payday_btc::on_chain_aggregate::{BtcOnChainInvoice, OnChainInvoiceCommand}; // use payday_core::{ +// aggregate::on_chain_aggregate::{BtcOnChainInvoice, OnChainInvoiceCommand}, +// api::on_chain_api::OnChainInvoiceApi, // payment::{ // amount::Amount, // currency::Currency, @@ -10,108 +11,108 @@ // }; // use postgres_es::PostgresCqrs; // use serde_json::Value; - -//pub struct OnChainProcessor { -// name: String, -// supported_payment_type: PaymentType, -// on_chain_api: Box, -// tx_stream: Box, -// cqrs: PostgresCqrs, -//} // -//impl OnChainProcessor { -// pub fn new( -// name: String, -// supported_payment_type: PaymentType, -// on_chain_api: Box, -// tx_stream: Box, -// cqrs: PostgresCqrs, -// ) -> Self { -// Self { -// name, -// supported_payment_type, -// on_chain_api, -// tx_stream, -// cqrs, -// } -// } -//} +// pub struct OnChainProcessor { +// name: String, +// supported_payment_type: PaymentType, +// on_chain_api: Box, +// tx_stream: Box, +// cqrs: PostgresCqrs, +// } // -//#[async_trait] -//impl PaymentProcessorApi for OnChainProcessor { -// fn name(&self) -> String { -// self.name.to_owned() -// } +// impl OnChainProcessor { +// pub fn new( +// name: String, +// supported_payment_type: PaymentType, +// on_chain_api: Box, +// tx_stream: Box, +// cqrs: PostgresCqrs, +// ) -> Self { +// Self { +// name, +// supported_payment_type, +// on_chain_api, +// tx_stream, +// cqrs, +// } +// } +// } // -// fn supported_payment_type(&self) -> PaymentType { -// self.supported_payment_type.to_owned() -// } +// #[async_trait] +// impl PaymentProcessorApi for OnChainProcessor { +// fn name(&self) -> String { +// self.name.to_owned() +// } // -// async fn create_invoice( -// &self, -// invoice_id: InvoiceId, -// amount: Amount, -// _memo: Option, -// ) -> Result { -// let address = self.on_chain_api.new_address().await?; -// self.cqrs -// .execute( -// &address.to_string(), -// OnChainInvoiceCommand::CreateInvoice { -// invoice_id: invoice_id.to_string(), -// amount, -// address: address.to_string(), -// }, -// ) -// .await -// .map_err(|e| PaydayError::DbError(e.to_string()))?; -// Ok(Invoice { -// service_name: self.name(), -// invoice_id, -// amount, -// payment_type: self.supported_payment_type(), -// payment_info: Value::String(address.to_string()), -// }) -// } +// fn supported_payment_type(&self) -> PaymentType { +// self.supported_payment_type.to_owned() +// } // -// async fn process_payment_events(&self) -> Result<()> { -// let mut subscriber = self.tx_stream.subscribe_events()?; -// while let Some(event) = subscriber.recv().await { -// let (aggregate_id, command) = match event { -// OnChainTransactionEvent::ReceivedConfirmed(tx) => ( -// tx.address, -// OnChainInvoiceCommand::SetConfirmed { -// confirmations: tx.confirmations as u64, -// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), -// transaction_id: tx.tx_id.to_owned(), -// }, -// ), -// OnChainTransactionEvent::ReceivedUnconfirmed(tx) => ( -// tx.address, -// OnChainInvoiceCommand::SetPending { -// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), -// }, -// ), -// OnChainTransactionEvent::SentConfirmed(tx) => ( -// tx.address, -// OnChainInvoiceCommand::SetConfirmed { -// confirmations: tx.confirmations as u64, -// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), -// transaction_id: tx.tx_id.to_owned(), -// }, -// ), -// OnChainTransactionEvent::SentUnconfirmed(tx) => ( -// tx.address, -// OnChainInvoiceCommand::SetPending { -// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), -// }, -// ), -// }; -// self.cqrs -// .execute(&aggregate_id.to_string(), command) -// .await -// .map_err(|e| PaydayError::DbError(e.to_string()))?; -// } -// Ok(()) -// } -//} +// async fn create_invoice( +// &self, +// invoice_id: InvoiceId, +// amount: Amount, +// _memo: Option, +// ) -> Result { +// let address = self.on_chain_api.new_address().await?; +// self.cqrs +// .execute( +// &address.to_string(), +// OnChainInvoiceCommand::CreateInvoice { +// invoice_id: invoice_id.to_string(), +// amount, +// address: address.to_string(), +// }, +// ) +// .await +// .map_err(|e| Error::DbError(e.to_string()))?; +// Ok(Invoice { +// service_name: self.name(), +// invoice_id, +// amount, +// payment_type: self.supported_payment_type(), +// payment_info: Value::String(address.to_string()), +// }) +// } +// +// async fn process_payment_events(&self) -> Result<()> { +// let mut subscriber = self.tx_stream.subscribe_events()?; +// while let Some(event) = subscriber.recv().await { +// let (aggregate_id, command) = match event { +// OnChainTransactionEvent::ReceivedConfirmed(tx) => ( +// tx.address, +// OnChainInvoiceCommand::SetConfirmed { +// confirmations: tx.confirmations as u64, +// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), +// transaction_id: tx.tx_id.to_owned(), +// }, +// ), +// OnChainTransactionEvent::ReceivedUnconfirmed(tx) => ( +// tx.address, +// OnChainInvoiceCommand::SetPending { +// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), +// }, +// ), +// OnChainTransactionEvent::SentConfirmed(tx) => ( +// tx.address, +// OnChainInvoiceCommand::SetConfirmed { +// confirmations: tx.confirmations as u64, +// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), +// transaction_id: tx.tx_id.to_owned(), +// }, +// ), +// OnChainTransactionEvent::SentUnconfirmed(tx) => ( +// tx.address, +// OnChainInvoiceCommand::SetPending { +// amount: Amount::new(Currency::Btc, tx.amount.to_sat()), +// }, +// ), +// }; +// self.cqrs +// .execute(&aggregate_id.to_string(), command) +// .await +// .map_err(|e| Error::DbError(e.to_string()))?; +// } +// Ok(()) +// } +// }