diff --git a/Cargo.lock b/Cargo.lock
index ab81288..83089d7 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2282,20 +2282,6 @@ version = "1.0.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "17359afc20d7ab31fdb42bb844c8b3bb1dabd7dcf7e68428492da7f16966fcef"
-[[package]]
-name = "payday_btc"
-version = "0.1.0"
-dependencies = [
- "async-trait",
- "bitcoin",
- "cqrs-es",
- "payday_core",
- "serde",
- "serde_json",
- "tokio",
- "tokio-stream",
-]
-
[[package]]
name = "payday_core"
version = "0.1.0"
@@ -2320,7 +2306,6 @@ dependencies = [
"cqrs-es",
"fedimint-tonic-lnd",
"lightning-invoice",
- "payday_btc",
"payday_core",
"tokio",
"tokio-stream",
@@ -2332,7 +2317,6 @@ version = "0.1.0"
dependencies = [
"async-trait",
"cqrs-es",
- "payday_btc",
"payday_core",
"postgres-es",
"serde",
@@ -2346,7 +2330,6 @@ name = "payday_rs"
version = "0.1.0"
dependencies = [
"bitcoin",
- "payday_btc",
"payday_core",
"payday_node_lnd",
"payday_postgres",
diff --git a/Cargo.toml b/Cargo.toml
index 441243e..87fb3a6 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -6,7 +6,6 @@ edition = "2021"
[dependencies]
payday_core = { path = "./payday_core" }
payday_node_lnd = { path = "./payday_node_lnd" }
-payday_btc = { path = "./payday_btc" }
payday_surrealdb = { path = "./payday_surrealdb" }
payday_postgres = { path = "./payday_postgres" }
tokio.workspace = true
@@ -17,7 +16,6 @@ tokio-stream.workspace = true
[workspace]
members = [
- "payday_btc",
"payday_core",
"payday_node_lnd",
"payday_postgres",
@@ -31,7 +29,8 @@ serde = { version = "1.0.203", features = ["derive"] }
serde_json = "1.0.118"
tokio-stream = "0.1.15"
chrono = "0.4"
-cqrs-es = "0.4.11"
+cqrs-es = "0.4.12"
+postgres-es = "0.4.12"
tokio = { version = "1.38.0", features = ["full"] }
sqlx = { version = "0.8", features = ["postgres", "json"] }
futures = "0.3.30"
diff --git a/payday_btc/Cargo.toml b/payday_btc/Cargo.toml
deleted file mode 100644
index 751b9dd..0000000
--- a/payday_btc/Cargo.toml
+++ /dev/null
@@ -1,14 +0,0 @@
-[package]
-name = "payday_btc"
-version = "0.1.0"
-edition = "2021"
-
-[dependencies]
-payday_core = { path = "../payday_core" }
-async-trait = { workspace = true }
-bitcoin = { workspace = true }
-serde = { workspace = true }
-serde_json = { workspace = true }
-cqrs-es = { workspace = true }
-tokio = { workspace = true }
-tokio-stream = { workspace = true }
diff --git a/payday_btc/src/lib.rs b/payday_btc/src/lib.rs
index 41a2315..e98b493 100644
--- a/payday_btc/src/lib.rs
+++ b/payday_btc/src/lib.rs
@@ -5,9 +5,3 @@ use std::str::FromStr;
use bitcoin::{Address, Network};
use payday_core::Result;
-
-/// Given a Bitcoin address string and a network, parses and validates the address.
-/// Returns a checked address result.
-pub fn to_address(addr: &str, network: Network) -> Result
{
- Ok(Address::from_str(addr)?.require_network(network)?)
-}
diff --git a/payday_core/src/aggregate/lightning_aggregate.rs b/payday_core/src/aggregate/lightning_aggregate.rs
new file mode 100644
index 0000000..14fbfac
--- /dev/null
+++ b/payday_core/src/aggregate/lightning_aggregate.rs
@@ -0,0 +1,162 @@
+use crate::{
+ api::lightining_api::LightningTransactionEvent,
+ payment::{
+ amount::Amount,
+ currency::Currency,
+ invoice::{Error, InvoiceId},
+ },
+};
+use async_trait::async_trait;
+use cqrs_es::{Aggregate, DomainEvent};
+use lightning_invoice::Bolt11Invoice;
+use serde::{Deserialize, Serialize};
+
+#[derive(Debug, Clone, Serialize, Deserialize, Default)]
+pub struct LightningInvoice {
+ pub invoice_id: InvoiceId,
+ pub node_id: String,
+ pub r_hash: String,
+ pub invoice: String,
+ pub amount: Amount,
+ pub received_amount: Amount,
+ pub overpaid: bool,
+ pub paid: bool,
+}
+
+#[async_trait]
+pub trait LightningInvoiceService: Send + Sync {}
+
+pub struct LightningInvoiceServices {}
+
+#[allow(clippy::large_enum_variant)]
+#[derive(Debug, Deserialize)]
+pub enum LightningInvoiceCommand {
+ CreateInvoice {
+ invoice_id: InvoiceId,
+ node_id: String,
+ amount: Amount,
+ invoice: Bolt11Invoice,
+ },
+ SettleInvoice {
+ received_amount: Amount,
+ },
+}
+
+impl From for LightningInvoiceCommand {
+ fn from(event: LightningTransactionEvent) -> Self {
+ match event {
+ LightningTransactionEvent::Settled(tx) => LightningInvoiceCommand::SettleInvoice {
+ received_amount: tx.amount_paid,
+ },
+ }
+ }
+}
+
+#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
+pub enum LightningInvoiceEvent {
+ InvoiceCreated {
+ invoice_id: InvoiceId,
+ node_id: String,
+ r_hash: String,
+ amount: Amount,
+ invoice: String,
+ },
+ InvoiceSettled {
+ received_amount: Amount,
+ overpaid: bool,
+ paid: bool,
+ },
+}
+
+impl DomainEvent for LightningInvoiceEvent {
+ fn event_type(&self) -> String {
+ let event_type = match self {
+ LightningInvoiceEvent::InvoiceCreated { .. } => "LightningInvoiceCreated",
+ LightningInvoiceEvent::InvoiceSettled { .. } => "LightningInvoiceSettled",
+ };
+ event_type.to_string()
+ }
+
+ fn event_version(&self) -> String {
+ "1.0.0".to_string()
+ }
+}
+
+#[async_trait]
+impl Aggregate for LightningInvoice {
+ type Command = LightningInvoiceCommand;
+ type Event = LightningInvoiceEvent;
+ type Error = Error;
+ type Services = ();
+
+ fn aggregate_type() -> String {
+ "LightningInvoice".to_string()
+ }
+
+ async fn handle(
+ &self,
+ command: Self::Command,
+ _service: &Self::Services,
+ ) -> Result, Self::Error> {
+ match command {
+ LightningInvoiceCommand::CreateInvoice {
+ invoice_id,
+ node_id,
+ amount,
+ invoice,
+ } => {
+ if amount.currency != Currency::Btc {
+ return Err(Error::InvalidCurrency(
+ amount.currency.to_string(),
+ Currency::Btc.to_string(),
+ ));
+ }
+
+ let r_hash = invoice.payment_hash().to_string();
+ let invoice = invoice.to_string();
+
+ Ok(vec![LightningInvoiceEvent::InvoiceCreated {
+ invoice_id,
+ node_id,
+ r_hash,
+ amount,
+ invoice,
+ }])
+ }
+ LightningInvoiceCommand::SettleInvoice { received_amount } => {
+ Ok(vec![LightningInvoiceEvent::InvoiceSettled {
+ received_amount,
+ overpaid: received_amount.amount > self.amount.amount,
+ paid: received_amount.amount >= self.amount.amount,
+ }])
+ }
+ }
+ }
+
+ fn apply(&mut self, event: Self::Event) {
+ match event {
+ LightningInvoiceEvent::InvoiceCreated {
+ invoice_id,
+ node_id,
+ r_hash,
+ amount,
+ invoice,
+ } => {
+ self.invoice_id = invoice_id;
+ self.node_id = node_id;
+ self.r_hash = r_hash;
+ self.amount = amount;
+ self.invoice = invoice;
+ }
+ LightningInvoiceEvent::InvoiceSettled {
+ received_amount,
+ overpaid,
+ paid,
+ } => {
+ self.received_amount = received_amount;
+ self.overpaid = overpaid;
+ self.paid = paid;
+ }
+ }
+ }
+}
diff --git a/payday_core/src/aggregate/mod.rs b/payday_core/src/aggregate/mod.rs
new file mode 100644
index 0000000..b331823
--- /dev/null
+++ b/payday_core/src/aggregate/mod.rs
@@ -0,0 +1,2 @@
+pub mod lightning_aggregate;
+pub mod on_chain_aggregate;
diff --git a/payday_btc/src/on_chain_aggregate.rs b/payday_core/src/aggregate/on_chain_aggregate.rs
similarity index 95%
rename from payday_btc/src/on_chain_aggregate.rs
rename to payday_core/src/aggregate/on_chain_aggregate.rs
index c17aae9..f70860c 100644
--- a/payday_btc/src/on_chain_aggregate.rs
+++ b/payday_core/src/aggregate/on_chain_aggregate.rs
@@ -1,13 +1,17 @@
+use crate::{
+ api::on_chain_api::OnChainTransactionEvent,
+ payment::{
+ amount::Amount,
+ currency::Currency,
+ invoice::{Error, InvoiceId},
+ },
+};
use async_trait::async_trait;
use cqrs_es::{Aggregate, DomainEvent};
-use payday_core::api::on_chain_api::OnChainTransactionEvent;
-use payday_core::payment::amount::Amount;
-use payday_core::payment::currency::Currency;
-use payday_core::payment::invoice::{Error, InvoiceId};
use serde::{Deserialize, Serialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
-pub struct BtcOnChainInvoice {
+pub struct OnChainInvoice {
pub invoice_id: InvoiceId,
pub node_id: String,
pub address: String,
@@ -20,7 +24,7 @@ pub struct BtcOnChainInvoice {
pub paid: bool,
}
-impl Default for BtcOnChainInvoice {
+impl Default for OnChainInvoice {
fn default() -> Self {
Self {
invoice_id: "".to_string(),
@@ -143,14 +147,14 @@ impl DomainEvent for OnChainInvoiceEvent {
}
#[async_trait]
-impl Aggregate for BtcOnChainInvoice {
+impl Aggregate for OnChainInvoice {
type Command = OnChainInvoiceCommand;
type Event = OnChainInvoiceEvent;
type Error = Error;
type Services = ();
fn aggregate_type() -> String {
- "BtcOnChainInvoice".to_string()
+ "OnChainInvoice".to_string()
}
async fn handle(
@@ -242,12 +246,12 @@ impl Aggregate for BtcOnChainInvoice {
#[cfg(test)]
mod aggregate_tests {
+ use crate::payment::currency::Currency;
use cqrs_es::test::TestFramework;
- use payday_core::payment::currency::Currency;
use super::*;
- type OnChainInvoiceTestFramework = TestFramework;
+ type OnChainInvoiceTestFramework = TestFramework;
#[test]
fn test_create_invoice() {
diff --git a/payday_core/src/api/mod.rs b/payday_core/src/api/mod.rs
index 9684297..b99f1a8 100644
--- a/payday_core/src/api/mod.rs
+++ b/payday_core/src/api/mod.rs
@@ -1,2 +1,3 @@
pub mod lightining_api;
pub mod on_chain_api;
+pub mod payment_event;
diff --git a/payday_core/src/api/payment_event.rs b/payday_core/src/api/payment_event.rs
new file mode 100644
index 0000000..0557982
--- /dev/null
+++ b/payday_core/src/api/payment_event.rs
@@ -0,0 +1,49 @@
+use crate::payment::amount::Amount;
+use crate::payment::invoice::InvoiceId;
+use crate::Result;
+
+pub trait PaymentEventPublisherApi {
+ /// Publishes a transaction event.
+ fn publish_received_event(&self, event: PaymentReceivedEvent) -> Result<()>;
+}
+
+pub enum PaymentReceivedEvent {
+ /// A payment for an invoice has been received but not confirmed yet.
+ OnChainUnconfirmed(OnChainPaymentEvent),
+ /// A payment for an invoice has been received and has been confirmed.
+ OnChainConfirmed(OnChainPaymentEvent),
+ /// We received a payment where there is no invoice in the system.
+ OnChainUnexpected(OnChainUexpectedPaymentEvent),
+ /// We received a payment for an invoice that has already been paid.
+ OnChainUsedAddress(OnChainUsedAddressPaymentEvent),
+}
+
+pub struct OnChainData {
+ pub node_id: String,
+ pub address: String,
+ pub confirmations: u64,
+ pub transaction_id: Option,
+}
+
+pub struct OnChainPaymentEvent {
+ pub invoice_id: InvoiceId,
+ pub invoice_amount: Amount,
+ pub received_amount: Amount,
+ pub underpayment: bool,
+ pub overpayment: bool,
+ pub on_chain_data: OnChainData,
+ pub paid: bool,
+}
+
+pub struct OnChainUexpectedPaymentEvent {
+ pub received_amount: Amount,
+ pub on_chain_data: OnChainData,
+ pub paid: bool,
+}
+
+pub struct OnChainUsedAddressPaymentEvent {
+ pub received_amount: Amount,
+ pub on_chain_data: OnChainData,
+ pub original_invoice_id: InvoiceId,
+ pub paid: bool,
+}
diff --git a/payday_core/src/lib.rs b/payday_core/src/lib.rs
index 4c3fe17..653299e 100644
--- a/payday_core/src/lib.rs
+++ b/payday_core/src/lib.rs
@@ -1,15 +1,12 @@
-use std::pin::Pin;
-
-use tokio_stream::Stream;
-
pub use error::Error;
+pub mod aggregate;
pub mod api;
pub mod date;
pub mod error;
pub mod events;
pub mod payment;
pub mod persistence;
+pub mod processor;
pub type Result = std::result::Result;
-pub type PaydayStream = Pin>>;
diff --git a/payday_core/src/processor/mod.rs b/payday_core/src/processor/mod.rs
new file mode 100644
index 0000000..c878fca
--- /dev/null
+++ b/payday_core/src/processor/mod.rs
@@ -0,0 +1 @@
+pub mod on_chain_processor;
diff --git a/payday_btc/src/on_chain_processor.rs b/payday_core/src/processor/on_chain_processor.rs
similarity index 99%
rename from payday_btc/src/on_chain_processor.rs
rename to payday_core/src/processor/on_chain_processor.rs
index 77a3f5c..c212148 100644
--- a/payday_btc/src/on_chain_processor.rs
+++ b/payday_core/src/processor/on_chain_processor.rs
@@ -1,7 +1,6 @@
use std::sync::Arc;
-use async_trait::async_trait;
-use payday_core::{
+use crate::{
api::on_chain_api::{
OnChainTransactionEvent, OnChainTransactionEventHandler,
OnChainTransactionEventProcessorApi,
@@ -9,6 +8,7 @@ use payday_core::{
persistence::block_height::BlockHeightStoreApi,
Result,
};
+use async_trait::async_trait;
use tokio::sync::Mutex;
pub struct OnChainTransactionProcessor {
diff --git a/payday_node_lnd/Cargo.toml b/payday_node_lnd/Cargo.toml
index 58fe6d6..a4ce534 100644
--- a/payday_node_lnd/Cargo.toml
+++ b/payday_node_lnd/Cargo.toml
@@ -5,7 +5,6 @@ edition = "2021"
[dependencies]
payday_core = { path = "../payday_core" }
-payday_btc = { path = "../payday_btc" }
fedimint-tonic-lnd = "0.2.0"
async-trait.workspace = true
bitcoin.workspace = true
diff --git a/payday_node_lnd/src/lib.rs b/payday_node_lnd/src/lib.rs
index 06921fd..588439b 100644
--- a/payday_node_lnd/src/lib.rs
+++ b/payday_node_lnd/src/lib.rs
@@ -1,2 +1,13 @@
pub mod lnd;
pub mod wrapper;
+
+use std::str::FromStr;
+
+use bitcoin::{Address, Network};
+use payday_core::Result;
+
+/// Given a Bitcoin address string and a network, parses and validates the address.
+/// Returns a checked address result.
+pub fn to_address(addr: &str, network: Network) -> Result {
+ Ok(Address::from_str(addr)?.require_network(network)?)
+}
diff --git a/payday_node_lnd/src/lnd.rs b/payday_node_lnd/src/lnd.rs
index a9968ea..f6be075 100644
--- a/payday_node_lnd/src/lnd.rs
+++ b/payday_node_lnd/src/lnd.rs
@@ -3,12 +3,12 @@ use std::collections::HashMap;
use async_trait::async_trait;
use bitcoin::{hex::DisplayHex, Address, Network};
+use crate::to_address;
use fedimint_tonic_lnd::{
lnrpc::{GetTransactionsRequest, InvoiceSubscription, Transaction},
Client,
};
use lightning_invoice::Bolt11Invoice;
-use payday_btc::to_address;
use payday_core::{
api::{
lightining_api::{
diff --git a/payday_node_lnd/src/wrapper.rs b/payday_node_lnd/src/wrapper.rs
index 4c1c3e3..1c2ebbc 100644
--- a/payday_node_lnd/src/wrapper.rs
+++ b/payday_node_lnd/src/wrapper.rs
@@ -6,6 +6,7 @@
//! operations needed for invoicing.
use std::{collections::HashMap, sync::Arc, time::Duration};
+use crate::to_address;
use bitcoin::{hex::DisplayHex, Address, Amount, Network, PublicKey};
use fedimint_tonic_lnd::{
lnrpc::{
@@ -16,8 +17,7 @@ use fedimint_tonic_lnd::{
Client,
};
use lightning_invoice::Bolt11Invoice;
-use payday_btc::to_address;
-use payday_core::{api::lightining_api::LnInvoice, Error, PaydayStream, Result};
+use payday_core::{api::lightining_api::LnInvoice, Error, Result};
use tokio::sync::{Mutex, MutexGuard};
use tokio_stream::StreamExt;
@@ -296,21 +296,6 @@ impl LndRpcWrapper {
Ok(result)
}
- /// Get a stream of onchain transactions relevant to the wallet. As LND RPC does not handle
- /// the request arguments, we do not provide any on this method to avoid confusion.
- pub async fn subscribe_transactions(&self) -> Result> {
- let mut lnd = self.client().await;
- let stream = lnd
- .lightning()
- .subscribe_transactions(GetTransactionsRequest::default())
- .await
- .map_err(|e| Error::NodeApiError(e.to_string()))?
- .into_inner()
- .filter(|tx| tx.is_ok())
- .map(|tx| tx.unwrap());
- Ok(Box::pin(stream))
- }
-
/// Get a list of onchain transactions between the given start and end heights.
pub async fn get_transactions(
&self,
diff --git a/payday_postgres/Cargo.toml b/payday_postgres/Cargo.toml
index c1e56db..0bd9bcb 100644
--- a/payday_postgres/Cargo.toml
+++ b/payday_postgres/Cargo.toml
@@ -5,7 +5,6 @@ edition = "2021"
[dependencies]
payday_core = { path = "../payday_core" }
-payday_btc = { path = "../payday_btc" }
async-trait = { workspace = true }
cqrs-es = { workspace = true }
sqlx = { workspace = true }
diff --git a/payday_postgres/src/btc_onchain.rs b/payday_postgres/src/btc_onchain.rs
index e49d9f6..fbb5103 100644
--- a/payday_postgres/src/btc_onchain.rs
+++ b/payday_postgres/src/btc_onchain.rs
@@ -1,6 +1,7 @@
// use async_trait::async_trait;
-// use payday_btc::on_chain_aggregate::{BtcOnChainInvoice, OnChainInvoiceCommand};
// use payday_core::{
+// aggregate::on_chain_aggregate::{BtcOnChainInvoice, OnChainInvoiceCommand},
+// api::on_chain_api::OnChainInvoiceApi,
// payment::{
// amount::Amount,
// currency::Currency,
@@ -10,108 +11,108 @@
// };
// use postgres_es::PostgresCqrs;
// use serde_json::Value;
-
-//pub struct OnChainProcessor {
-// name: String,
-// supported_payment_type: PaymentType,
-// on_chain_api: Box,
-// tx_stream: Box,
-// cqrs: PostgresCqrs,
-//}
//
-//impl OnChainProcessor {
-// pub fn new(
-// name: String,
-// supported_payment_type: PaymentType,
-// on_chain_api: Box,
-// tx_stream: Box,
-// cqrs: PostgresCqrs,
-// ) -> Self {
-// Self {
-// name,
-// supported_payment_type,
-// on_chain_api,
-// tx_stream,
-// cqrs,
-// }
-// }
-//}
+// pub struct OnChainProcessor {
+// name: String,
+// supported_payment_type: PaymentType,
+// on_chain_api: Box,
+// tx_stream: Box,
+// cqrs: PostgresCqrs,
+// }
//
-//#[async_trait]
-//impl PaymentProcessorApi for OnChainProcessor {
-// fn name(&self) -> String {
-// self.name.to_owned()
-// }
+// impl OnChainProcessor {
+// pub fn new(
+// name: String,
+// supported_payment_type: PaymentType,
+// on_chain_api: Box,
+// tx_stream: Box,
+// cqrs: PostgresCqrs,
+// ) -> Self {
+// Self {
+// name,
+// supported_payment_type,
+// on_chain_api,
+// tx_stream,
+// cqrs,
+// }
+// }
+// }
//
-// fn supported_payment_type(&self) -> PaymentType {
-// self.supported_payment_type.to_owned()
-// }
+// #[async_trait]
+// impl PaymentProcessorApi for OnChainProcessor {
+// fn name(&self) -> String {
+// self.name.to_owned()
+// }
//
-// async fn create_invoice(
-// &self,
-// invoice_id: InvoiceId,
-// amount: Amount,
-// _memo: Option,
-// ) -> Result {
-// let address = self.on_chain_api.new_address().await?;
-// self.cqrs
-// .execute(
-// &address.to_string(),
-// OnChainInvoiceCommand::CreateInvoice {
-// invoice_id: invoice_id.to_string(),
-// amount,
-// address: address.to_string(),
-// },
-// )
-// .await
-// .map_err(|e| PaydayError::DbError(e.to_string()))?;
-// Ok(Invoice {
-// service_name: self.name(),
-// invoice_id,
-// amount,
-// payment_type: self.supported_payment_type(),
-// payment_info: Value::String(address.to_string()),
-// })
-// }
+// fn supported_payment_type(&self) -> PaymentType {
+// self.supported_payment_type.to_owned()
+// }
//
-// async fn process_payment_events(&self) -> Result<()> {
-// let mut subscriber = self.tx_stream.subscribe_events()?;
-// while let Some(event) = subscriber.recv().await {
-// let (aggregate_id, command) = match event {
-// OnChainTransactionEvent::ReceivedConfirmed(tx) => (
-// tx.address,
-// OnChainInvoiceCommand::SetConfirmed {
-// confirmations: tx.confirmations as u64,
-// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
-// transaction_id: tx.tx_id.to_owned(),
-// },
-// ),
-// OnChainTransactionEvent::ReceivedUnconfirmed(tx) => (
-// tx.address,
-// OnChainInvoiceCommand::SetPending {
-// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
-// },
-// ),
-// OnChainTransactionEvent::SentConfirmed(tx) => (
-// tx.address,
-// OnChainInvoiceCommand::SetConfirmed {
-// confirmations: tx.confirmations as u64,
-// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
-// transaction_id: tx.tx_id.to_owned(),
-// },
-// ),
-// OnChainTransactionEvent::SentUnconfirmed(tx) => (
-// tx.address,
-// OnChainInvoiceCommand::SetPending {
-// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
-// },
-// ),
-// };
-// self.cqrs
-// .execute(&aggregate_id.to_string(), command)
-// .await
-// .map_err(|e| PaydayError::DbError(e.to_string()))?;
-// }
-// Ok(())
-// }
-//}
+// async fn create_invoice(
+// &self,
+// invoice_id: InvoiceId,
+// amount: Amount,
+// _memo: Option,
+// ) -> Result {
+// let address = self.on_chain_api.new_address().await?;
+// self.cqrs
+// .execute(
+// &address.to_string(),
+// OnChainInvoiceCommand::CreateInvoice {
+// invoice_id: invoice_id.to_string(),
+// amount,
+// address: address.to_string(),
+// },
+// )
+// .await
+// .map_err(|e| Error::DbError(e.to_string()))?;
+// Ok(Invoice {
+// service_name: self.name(),
+// invoice_id,
+// amount,
+// payment_type: self.supported_payment_type(),
+// payment_info: Value::String(address.to_string()),
+// })
+// }
+//
+// async fn process_payment_events(&self) -> Result<()> {
+// let mut subscriber = self.tx_stream.subscribe_events()?;
+// while let Some(event) = subscriber.recv().await {
+// let (aggregate_id, command) = match event {
+// OnChainTransactionEvent::ReceivedConfirmed(tx) => (
+// tx.address,
+// OnChainInvoiceCommand::SetConfirmed {
+// confirmations: tx.confirmations as u64,
+// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
+// transaction_id: tx.tx_id.to_owned(),
+// },
+// ),
+// OnChainTransactionEvent::ReceivedUnconfirmed(tx) => (
+// tx.address,
+// OnChainInvoiceCommand::SetPending {
+// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
+// },
+// ),
+// OnChainTransactionEvent::SentConfirmed(tx) => (
+// tx.address,
+// OnChainInvoiceCommand::SetConfirmed {
+// confirmations: tx.confirmations as u64,
+// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
+// transaction_id: tx.tx_id.to_owned(),
+// },
+// ),
+// OnChainTransactionEvent::SentUnconfirmed(tx) => (
+// tx.address,
+// OnChainInvoiceCommand::SetPending {
+// amount: Amount::new(Currency::Btc, tx.amount.to_sat()),
+// },
+// ),
+// };
+// self.cqrs
+// .execute(&aggregate_id.to_string(), command)
+// .await
+// .map_err(|e| Error::DbError(e.to_string()))?;
+// }
+// Ok(())
+// }
+// }