From 56a0a76e7b5397d640795d3bf5e84cf3969c35e7 Mon Sep 17 00:00:00 2001 From: tompro Date: Tue, 25 Feb 2025 19:08:52 +0100 Subject: [PATCH 1/2] Monitor --- Cargo.lock | 13 ++++ Cargo.toml | 8 ++- payday_axum/src/routes.rs | 1 + payday_core/src/api/lightining_api.rs | 11 +++- payday_core/src/api/on_chain_api.rs | 16 ++++- payday_core/src/persistence/offset.rs | 4 +- .../src/processor/lightning_processor.rs | 20 +++--- .../src/processor/on_chain_processor.rs | 27 ++++---- payday_monitor/Cargo.toml | 13 ++++ payday_monitor/src/lib.rs | 1 + payday_monitor/src/on_chain_handler.rs | 20 ++++++ payday_node_lnd/src/lnd.rs | 3 + payday_postgres/Cargo.toml | 14 ++-- payday_postgres/src/offset.rs | 65 ++++++++++--------- 14 files changed, 143 insertions(+), 73 deletions(-) create mode 100644 payday_monitor/Cargo.toml create mode 100644 payday_monitor/src/lib.rs create mode 100644 payday_monitor/src/on_chain_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 6107f71..19078ae 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1813,6 +1813,19 @@ dependencies = [ "tokio-stream", ] +[[package]] +name = "payday_monitor" +version = "0.1.0" +dependencies = [ + "payday_core", + "payday_node_lnd", + "payday_postgres", + "postgres-es", + "sqlx", + "tokio", + "tracing", +] + [[package]] name = "payday_node_lnd" version = "0.1.0" diff --git a/Cargo.toml b/Cargo.toml index 94c5d5b..b4b1029 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,5 +1,11 @@ [workspace] -members = ["payday_axum", "payday_core", "payday_node_lnd", "payday_postgres"] +members = [ + "payday_axum", + "payday_core", + "payday_monitor", + "payday_node_lnd", + "payday_postgres", +] [workspace.dependencies] async-trait = "0.1.86" diff --git a/payday_axum/src/routes.rs b/payday_axum/src/routes.rs index e69de29..8b13789 100644 --- a/payday_axum/src/routes.rs +++ b/payday_axum/src/routes.rs @@ -0,0 +1 @@ + diff --git a/payday_core/src/api/lightining_api.rs b/payday_core/src/api/lightining_api.rs index 4273099..2162039 100644 --- a/payday_core/src/api/lightining_api.rs +++ b/payday_core/src/api/lightining_api.rs @@ -51,9 +51,8 @@ pub trait LightningTransactionStreamApi: Send + Sync { #[async_trait] pub trait LightningTransactionEventProcessorApi: Send + Sync { - fn node_id(&self) -> String; - async fn get_offset(&self) -> Result; - async fn set_offset(&self, settle_index: u64) -> Result<()>; + async fn get_offset(&self, id: &str) -> Result; + async fn set_offset(&self, id: &str, settle_index: u64) -> Result<()>; async fn process_event(&self, event: LightningTransactionEvent) -> Result<()>; } @@ -103,6 +102,12 @@ impl LightningTransactionEvent { LightningTransactionEvent::Settled(tx) => Some(tx.settle_index), } } + + pub fn node_id(&self) -> String { + match self { + LightningTransactionEvent::Settled(tx) => tx.node_id.to_owned(), + } + } } #[derive(Debug, Clone, PartialEq)] diff --git a/payday_core/src/api/on_chain_api.rs b/payday_core/src/api/on_chain_api.rs index 3308d4c..f90339a 100644 --- a/payday_core/src/api/on_chain_api.rs +++ b/payday_core/src/api/on_chain_api.rs @@ -58,9 +58,8 @@ pub trait OnChainTransactionApi: Send + Sync { #[async_trait] pub trait OnChainTransactionEventProcessorApi: Send + Sync { - fn node_id(&self) -> String; - async fn get_offset(&self) -> Result; - async fn set_block_height(&self, block_height: u64) -> Result<()>; + async fn get_offset(&self, id: &str) -> Result; + async fn set_block_height(&self, id: &str, block_height: u64) -> Result<()>; async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()>; } @@ -71,6 
+70,8 @@ pub trait OnChainTransactionEventHandler: Send + Sync { #[async_trait] pub trait OnChainTransactionStreamApi: Send + Sync { + fn node_id(&self) -> String; + async fn subscribe_on_chain_transactions( &self, sender: Sender, @@ -108,6 +109,15 @@ impl OnChainTransactionEvent { _ => None, } } + + pub fn node_id(&self) -> String { + match self { + OnChainTransactionEvent::ReceivedUnconfirmed(tx) => tx.node_id.to_owned(), + OnChainTransactionEvent::ReceivedConfirmed(tx) => tx.node_id.to_owned(), + OnChainTransactionEvent::SentUnconfirmed(tx) => tx.node_id.to_owned(), + OnChainTransactionEvent::SentConfirmed(tx) => tx.node_id.to_owned(), + } + } } #[derive(Debug, Clone)] diff --git a/payday_core/src/persistence/offset.rs b/payday_core/src/persistence/offset.rs index 03277c4..57f35b2 100644 --- a/payday_core/src/persistence/offset.rs +++ b/payday_core/src/persistence/offset.rs @@ -5,8 +5,8 @@ use crate::Result; #[async_trait] pub trait OffsetStoreApi: Send + Sync { - async fn get_offset(&self) -> Result; - async fn set_offset(&self, offset: u64) -> Result<()>; + async fn get_offset(&self, id: &str) -> Result; + async fn set_offset(&self, id: &str, offset: u64) -> Result<()>; } #[derive(Debug, Clone, Serialize, Deserialize)] diff --git a/payday_core/src/processor/lightning_processor.rs b/payday_core/src/processor/lightning_processor.rs index f98e29b..e8f93e0 100644 --- a/payday_core/src/processor/lightning_processor.rs +++ b/payday_core/src/processor/lightning_processor.rs @@ -9,19 +9,16 @@ use crate::{ use async_trait::async_trait; pub struct LightningTransactionProcessor { - node_id: String, settle_index_store: Box, handler: Box, } impl LightningTransactionProcessor { pub fn new( - node_id: &str, settle_index_store: Box, handler: Box, ) -> Self { Self { - node_id: node_id.to_string(), settle_index_store, handler, } @@ -30,22 +27,23 @@ impl LightningTransactionProcessor { #[async_trait] impl LightningTransactionEventProcessorApi for LightningTransactionProcessor { - fn node_id(&self) -> String { - self.node_id.to_string() - } - async fn get_offset(&self) -> Result { - self.settle_index_store.get_offset().await.map(|o| o.offset) + async fn get_offset(&self, id: &str) -> Result { + self.settle_index_store + .get_offset(id) + .await + .map(|o| o.offset) } - async fn set_offset(&self, block_height: u64) -> Result<()> { - self.settle_index_store.set_offset(block_height).await + async fn set_offset(&self, id: &str, block_height: u64) -> Result<()> { + self.settle_index_store.set_offset(id, block_height).await } async fn process_event(&self, event: LightningTransactionEvent) -> Result<()> { let index = event.settle_index(); + let node_id = event.node_id(); self.handler.process_event(event).await?; if let Some(idx) = index { - self.set_offset(idx).await?; + self.set_offset(&node_id, idx).await?; } Ok(()) } diff --git a/payday_core/src/processor/on_chain_processor.rs b/payday_core/src/processor/on_chain_processor.rs index 32b4112..c14ff7a 100644 --- a/payday_core/src/processor/on_chain_processor.rs +++ b/payday_core/src/processor/on_chain_processor.rs @@ -1,3 +1,5 @@ +use std::sync::Arc; + use crate::{ api::on_chain_api::{ OnChainTransactionEvent, OnChainTransactionEventHandler, @@ -9,19 +11,16 @@ use crate::{ use async_trait::async_trait; pub struct OnChainTransactionProcessor { - node_id: String, block_height_store: Box, - handler: Box, + handler: Arc, } impl OnChainTransactionProcessor { pub fn new( - node_id: &str, block_height_store: Box, - handler: Box, + handler: Arc, ) -> Self { Self { - 
node_id: node_id.to_string(), block_height_store, handler, } @@ -30,23 +29,23 @@ impl OnChainTransactionProcessor { #[async_trait] impl OnChainTransactionEventProcessorApi for OnChainTransactionProcessor { - fn node_id(&self) -> String { - self.node_id.to_string() - } - - async fn get_offset(&self) -> Result { - self.block_height_store.get_offset().await.map(|o| o.offset) + async fn get_offset(&self, id: &str) -> Result { + self.block_height_store + .get_offset(id) + .await + .map(|o| o.offset) } - async fn set_block_height(&self, block_height: u64) -> Result<()> { - self.block_height_store.set_offset(block_height).await + async fn set_block_height(&self, id: &str, block_height: u64) -> Result<()> { + self.block_height_store.set_offset(id, block_height).await } async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()> { let block_height = event.block_height(); + let node_id = event.node_id(); self.handler.process_event(event).await?; if let Some(bh) = block_height { - self.set_block_height(bh as u64).await?; + self.set_block_height(&node_id, bh as u64).await?; } Ok(()) } diff --git a/payday_monitor/Cargo.toml b/payday_monitor/Cargo.toml new file mode 100644 index 0000000..55a988a --- /dev/null +++ b/payday_monitor/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "payday_monitor" +version = "0.1.0" +edition = "2024" + +[dependencies] +payday_core = { path = "../payday_core" } +payday_postgres = { path = "../payday_postgres" } +payday_node_lnd = { path = "../payday_node_lnd" } +tracing.workspace = true +postgres-es.workspace = true +sqlx.workspace = true +tokio.workspace = true diff --git a/payday_monitor/src/lib.rs b/payday_monitor/src/lib.rs new file mode 100644 index 0000000..b00a67f --- /dev/null +++ b/payday_monitor/src/lib.rs @@ -0,0 +1 @@ +pub mod on_chain_handler; diff --git a/payday_monitor/src/on_chain_handler.rs b/payday_monitor/src/on_chain_handler.rs new file mode 100644 index 0000000..c4d0b79 --- /dev/null +++ b/payday_monitor/src/on_chain_handler.rs @@ -0,0 +1,20 @@ +use std::sync::Arc; + +use payday_core::api::on_chain_api::{OnChainTransactionEventHandler, OnChainTransactionStreamApi}; + +#[allow(dead_code)] +pub struct OnChainEventHandler { + nodes: Vec>, + handler: Arc, +} + +impl OnChainEventHandler { + pub fn new( + nodes: Vec>, + handler: Arc, + ) -> Self { + Self { nodes, handler } + } + + pub async fn process_events(&self) {} +} diff --git a/payday_node_lnd/src/lnd.rs b/payday_node_lnd/src/lnd.rs index 125c8fa..c0130f7 100644 --- a/payday_node_lnd/src/lnd.rs +++ b/payday_node_lnd/src/lnd.rs @@ -230,6 +230,9 @@ impl LndPaymentEventStream { #[async_trait] impl OnChainTransactionStreamApi for LndPaymentEventStream { + fn node_id(&self) -> String { + self.config.node_id.to_owned() + } async fn subscribe_on_chain_transactions( &self, sender: Sender, diff --git a/payday_postgres/Cargo.toml b/payday_postgres/Cargo.toml index eb046bd..e3045b5 100644 --- a/payday_postgres/Cargo.toml +++ b/payday_postgres/Cargo.toml @@ -5,13 +5,13 @@ edition = "2021" [dependencies] payday_core = { path = "../payday_core" } -async-trait = { workspace = true } -cqrs-es = { workspace = true } -sqlx = { workspace = true } -serde = { workspace = true } -serde_json = { workspace = true } -tokio = { workspace = true } -postgres-es = { version = "0.4.11" } +async-trait.workspace = true +cqrs-es.workspace = true +postgres-es.workspace = true +sqlx.workspace = true +serde.workspace = true +serde_json.workspace = true +tokio.workspace = true [dev-dependencies] testcontainers = { version = 
"0.23" } diff --git a/payday_postgres/src/offset.rs b/payday_postgres/src/offset.rs index c5bd3fa..944d275 100644 --- a/payday_postgres/src/offset.rs +++ b/payday_postgres/src/offset.rs @@ -1,3 +1,5 @@ +use std::collections::HashMap; + use async_trait::async_trait; use payday_core::{ persistence::offset::{Offset, OffsetStoreApi}, @@ -8,36 +10,34 @@ use tokio::sync::Mutex; pub struct OffsetStore { db: Pool, - id: String, - current_offset: Box>>, + current_offset: Mutex>, } impl OffsetStore { - pub fn new(db: Pool, id: &str) -> Self { + pub fn new(db: Pool) -> Self { Self { db, - id: id.to_string(), - current_offset: Box::new(Mutex::new(None)), + current_offset: Mutex::new(HashMap::new()), } } - async fn get_cached(&self) -> Option { + async fn get_cached(&self, id: &str) -> Option { let cached = self.current_offset.lock().await; - *cached + cached.get(id).copied() } - async fn set_cached(&self, offset: u64) { + async fn set_cached(&self, id: &str, offset: u64) { let mut cached = self.current_offset.lock().await; - *cached = Some(offset); + cached.insert(id.to_owned(), offset); } - async fn get_offset_internal(&self) -> Result> { - let cached = self.get_cached().await; + async fn get_offset_internal(&self, id: &str) -> Result> { + let cached = self.get_cached(id).await; if let Some(cached) = cached { return Ok(Some(cached)); } let res: Option = sqlx::query("SELECT current_offset FROM offsets WHERE id = $1") - .bind(&self.id) + .bind(id) .fetch_optional(&self.db) .await .map_err(|e| Error::DbError(e.to_string()))? @@ -45,53 +45,53 @@ impl OffsetStore { match res.and_then(|r| u64::try_from(r).ok()) { Some(offset) => { - self.set_cached(offset).await; + self.set_cached(id, offset).await; Ok(Some(offset)) } _ => Ok(None), } } - async fn set_offset_internal(&self, offset: u64) -> Result<()> { - let existing: Option = self.get_offset_internal().await?; + async fn set_offset_internal(&self, id: &str, offset: u64) -> Result<()> { + let existing: Option = self.get_offset_internal(id).await?; if existing.is_some() { sqlx::query("UPDATE offsets SET current_offset = $1 WHERE id = $2") .bind(offset as i64) - .bind(&self.id) + .bind(id) .execute(&self.db) .await .map_err(|e| Error::DbError(e.to_string()))?; } else { sqlx::query("INSERT INTO offsets (id, current_offset) VALUES ($1, $2)") - .bind(&self.id) + .bind(id) .bind(offset as i64) .execute(&self.db) .await .map_err(|e| Error::DbError(e.to_string()))?; } - self.set_cached(offset).await; + self.set_cached(id, offset).await; Ok(()) } } #[async_trait] impl OffsetStoreApi for OffsetStore { - async fn get_offset(&self) -> Result { - let offset: Option = self.get_offset_internal().await?; + async fn get_offset(&self, id: &str) -> Result { + let offset: Option = self.get_offset_internal(id).await?; match offset { Some(offset) => Ok(Offset { - id: self.id.to_owned(), + id: id.to_owned(), offset, }), None => Ok(Offset { - id: self.id.to_owned(), + id: id.to_owned(), offset: 0, }), } } - async fn set_offset(&self, offset: u64) -> Result<()> { - self.set_offset_internal(offset).await + async fn set_offset(&self, id: &str, offset: u64) -> Result<()> { + self.set_offset_internal(id, offset).await } } @@ -103,9 +103,9 @@ mod tests { #[tokio::test] async fn test_get_set_offset_non_existant() { let db = get_postgres_pool().await; - let store = OffsetStore::new(db, "test_get_set_offset_non_existant"); + let store = OffsetStore::new(db); let result = store - .get_offset() + .get_offset("test_get_set_offset_non_existant") .await .expect("Query executed successfully"); 
assert!(result.offset == 0); @@ -113,19 +113,20 @@ mod tests { #[tokio::test] async fn test_get_set_offset() { + let id = "test_get_set_offset"; let db = get_postgres_pool().await; - let store = OffsetStore::new(db, "test_get_set_offset"); + let store = OffsetStore::new(db); store - .set_offset(10) + .set_offset(id, 10) .await .expect("Query executed successfully"); - assert!(store.current_offset.lock().await.is_some()); - assert!(store.get_cached().await.is_some()); - assert!(store.get_cached().await.unwrap().eq(&10)); + assert!(store.current_offset.lock().await.get(id).is_some()); + assert!(store.get_cached(id).await.is_some()); + assert!(store.get_cached(id).await.unwrap().eq(&10)); let result = store - .get_offset() + .get_offset(id) .await .expect("Query executed successfully"); assert!(result.offset == 10); From 023b656a717fe83cee99bd251991e16fea5cd0f7 Mon Sep 17 00:00:00 2001 From: tompro Date: Wed, 26 Feb 2025 16:02:06 +0100 Subject: [PATCH 2/2] On chain monitor service --- Cargo.lock | 29 ++++--- Cargo.toml | 7 +- {payday_monitor => payday}/Cargo.toml | 9 ++- payday/examples/on_chain_monitor.rs | 54 +++++++++++++ payday/src/lib.rs | 2 + payday/src/on_chain_processor.rs | 77 +++++++++++++++++++ payday/src/on_chain_service.rs | 34 ++++++++ payday_core/Cargo.toml | 1 + payday_core/src/api/on_chain_api.rs | 2 +- payday_core/src/error.rs | 28 ++++++- .../src/processor/on_chain_processor.rs | 16 ++-- payday_monitor/src/lib.rs | 1 - payday_monitor/src/on_chain_handler.rs | 20 ----- payday_node_lnd/src/lnd.rs | 10 ++- payday_node_lnd/src/wrapper.rs | 22 +++--- payday_postgres/src/lib.rs | 14 ++-- payday_postgres/src/offset.rs | 59 +++++++++----- 17 files changed, 294 insertions(+), 91 deletions(-) rename {payday_monitor => payday}/Cargo.toml (64%) create mode 100644 payday/examples/on_chain_monitor.rs create mode 100644 payday/src/lib.rs create mode 100644 payday/src/on_chain_processor.rs create mode 100644 payday/src/on_chain_service.rs delete mode 100644 payday_monitor/src/lib.rs delete mode 100644 payday_monitor/src/on_chain_handler.rs diff --git a/Cargo.lock b/Cargo.lock index 19078ae..1bbb4b1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1785,6 +1785,23 @@ dependencies = [ "syn 2.0.98", ] +[[package]] +name = "payday" +version = "0.1.0" +dependencies = [ + "anyhow", + "async-trait", + "bitcoin", + "payday_core", + "payday_node_lnd", + "payday_postgres", + "postgres-es", + "sqlx", + "tokio", + "tracing", + "tracing-subscriber", +] + [[package]] name = "payday_axum" version = "0.1.0" @@ -1811,18 +1828,6 @@ dependencies = [ "serde_json", "tokio", "tokio-stream", -] - -[[package]] -name = "payday_monitor" -version = "0.1.0" -dependencies = [ - "payday_core", - "payday_node_lnd", - "payday_postgres", - "postgres-es", - "sqlx", - "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index b4b1029..4bcfcd6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,8 +1,10 @@ [workspace] +resolver = "3" + members = [ "payday_axum", "payday_core", - "payday_monitor", + "payday", "payday_node_lnd", "payday_postgres", ] @@ -21,5 +23,6 @@ sqlx = { version = "0.8", features = ["postgres", "json"] } futures = "0.3.30" lightning-invoice = { version = "0.33.1", features = ["serde"] } tracing = "0.1" -tracing-subscriber = "0.3.18" +tracing-subscriber = "0.3.19" thiserror = "2" +anyhow = "1" diff --git a/payday_monitor/Cargo.toml b/payday/Cargo.toml similarity index 64% rename from payday_monitor/Cargo.toml rename to payday/Cargo.toml index 55a988a..d57c61f 100644 --- a/payday_monitor/Cargo.toml +++ 
b/payday/Cargo.toml @@ -1,5 +1,5 @@ [package] -name = "payday_monitor" +name = "payday" version = "0.1.0" edition = "2024" @@ -8,6 +8,13 @@ payday_core = { path = "../payday_core" } payday_postgres = { path = "../payday_postgres" } payday_node_lnd = { path = "../payday_node_lnd" } tracing.workspace = true +tracing-subscriber.workspace = true postgres-es.workspace = true sqlx.workspace = true tokio.workspace = true +async-trait.workspace = true +anyhow.workspace = true +bitcoin.workspace = true + +[[example]] +name = "on_chain_monitor" diff --git a/payday/examples/on_chain_monitor.rs b/payday/examples/on_chain_monitor.rs new file mode 100644 index 0000000..d00609f --- /dev/null +++ b/payday/examples/on_chain_monitor.rs @@ -0,0 +1,54 @@ +use std::sync::Arc; + +use bitcoin::Network; +use payday::{on_chain_processor::OnChainEventProcessor, on_chain_service::OnChainService}; +use payday_core::aggregate::on_chain_aggregate::OnChainInvoice; +use payday_node_lnd::lnd::{LndConfig, LndPaymentEventStream}; +use tracing::info; + +#[tokio::main] +async fn main() -> Result<(), Box> { + tracing_subscriber::fmt::init(); + info!("Starting on-chain monitor"); + + // db connection + let pool = payday_postgres::create_postgres_pool( + "postgres://postgres:password@localhost:5432/default", + ) + .await + .expect("DB connection"); + + // processes on chain transaction commands + let aggregate = payday_postgres::create_cqrs::(pool.clone(), vec![], ()) + .await + .expect("Aggregate instance"); + + // contains the event handler + let service = OnChainService::new(aggregate); + + let stream = Arc::new(LndPaymentEventStream::new(LndConfig { + node_id: "node1".to_string(), + address: "https://localhost:10008".to_string(), + cert_path: "tls.cert".to_string(), + macaroon_file: "admin.macaroon".to_string(), + network: Network::Signet, + })); + let stream2 = Arc::new(LndPaymentEventStream::new(LndConfig { + node_id: "node2".to_string(), + address: "https://localhost:10009".to_string(), + cert_path: "tls2.cert".to_string(), + macaroon_file: "admin2.macaroon".to_string(), + network: Network::Signet, + })); + + // consumes on chain events from all nodes + let on_chain_processor = + OnChainEventProcessor::new(pool, vec![stream, stream2], Arc::new(service)); + + // start the event monitor + let all = on_chain_processor.start().await; + all.join_all().await; + + info!("On-chain monitor finished"); + Ok(()) +} diff --git a/payday/src/lib.rs b/payday/src/lib.rs new file mode 100644 index 0000000..bcca7a8 --- /dev/null +++ b/payday/src/lib.rs @@ -0,0 +1,2 @@ +pub mod on_chain_processor; +pub mod on_chain_service; diff --git a/payday/src/on_chain_processor.rs b/payday/src/on_chain_processor.rs new file mode 100644 index 0000000..198ea64 --- /dev/null +++ b/payday/src/on_chain_processor.rs @@ -0,0 +1,77 @@ +use std::sync::Arc; + +use payday_core::api::on_chain_api::{ + OnChainTransactionEventHandler, OnChainTransactionEventProcessorApi, + OnChainTransactionStreamApi, +}; +use payday_core::persistence::offset::OffsetStoreApi; +use payday_core::processor::on_chain_processor::OnChainTransactionProcessor; +use payday_postgres::offset::OffsetStore; +use sqlx::{Pool, Postgres}; +use tokio::task::{JoinError, JoinSet}; +use tracing::{error, info}; + +#[allow(dead_code)] +pub struct OnChainEventProcessor { + pool: Pool, + nodes: Vec>, + handler: Arc, +} + +impl OnChainEventProcessor { + pub fn new( + pool: Pool, + nodes: Vec>, + handler: Arc, + ) -> Self { + Self { + pool, + nodes, + handler, + } + } + + pub async fn start(&self) -> 
JoinSet> { + let (snd, mut rcv) = tokio::sync::mpsc::channel(100); + let mut join_set = JoinSet::new(); + + let offset_store = OffsetStore::new( + self.pool.clone(), + Some("payday.offsets".to_string()), + Some("on_chain".to_string()), + ); + + for node in &self.nodes { + let start_height: Option = offset_store + .get_offset(&node.node_id()) + .await + .ok() + .map(|o| o.offset); + if let Ok(join) = node + .subscribe_on_chain_transactions(snd.clone(), start_height) + .await + { + join_set.spawn(join); + } else { + error!( + "Failed to subscribe to on chain transactions for node {}", + node.node_id() + ); + } + } + + let processor = + OnChainTransactionProcessor::new(Box::new(offset_store), self.handler.clone()); + let handle = tokio::spawn(async move { + while let Some(event) = rcv.recv().await { + info!("Received event: {:?}", event); + if let Err(err) = processor.process_event(event).await { + error!("Failed to process on chain event: {:?}", err); + } + } + }); + + join_set.spawn(handle); + join_set + } +} diff --git a/payday/src/on_chain_service.rs b/payday/src/on_chain_service.rs new file mode 100644 index 0000000..d8ae24a --- /dev/null +++ b/payday/src/on_chain_service.rs @@ -0,0 +1,34 @@ +use async_trait::async_trait; +use payday_core::{ + Result, + aggregate::on_chain_aggregate::{OnChainCommand, OnChainInvoice}, + api::on_chain_api::{OnChainTransactionEvent, OnChainTransactionEventHandler}, + persistence::cqrs::Cqrs, +}; +use postgres_es::PostgresEventRepository; +use tracing::{error, info}; + +pub struct OnChainService { + aggregate: Cqrs, +} + +impl OnChainService { + pub fn new(aggregate: Cqrs) -> Self { + Self { aggregate } + } +} + +#[async_trait] +impl OnChainTransactionEventHandler for OnChainService { + async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()> { + info!("Received on-chain event: {:?}", event); + let command: OnChainCommand = event.into(); + info!("Executing on-chain command: {:?}", command); + if let Err(res) = self.aggregate.execute(&command.id, command.command).await { + error!("Failed to execute on-chain command with: {:?}", res); + } else { + info!("Successfully executed on-chain command"); + } + Ok(()) + } +} diff --git a/payday_core/Cargo.toml b/payday_core/Cargo.toml index 2e46546..a3816e8 100644 --- a/payday_core/Cargo.toml +++ b/payday_core/Cargo.toml @@ -13,3 +13,4 @@ tokio.workspace = true tokio-stream.workspace = true chrono.workspace = true lightning-invoice.workspace = true +tracing.workspace = true diff --git a/payday_core/src/api/on_chain_api.rs b/payday_core/src/api/on_chain_api.rs index f90339a..062fb02 100644 --- a/payday_core/src/api/on_chain_api.rs +++ b/payday_core/src/api/on_chain_api.rs @@ -75,7 +75,7 @@ pub trait OnChainTransactionStreamApi: Send + Sync { async fn subscribe_on_chain_transactions( &self, sender: Sender, - start_height: Option, + start_height: Option, ) -> Result>; } diff --git a/payday_core/src/error.rs b/payday_core/src/error.rs index 8b85b49..ec48825 100644 --- a/payday_core/src/error.rs +++ b/payday_core/src/error.rs @@ -1,21 +1,26 @@ use bitcoin::address::ParseError; use bitcoin::amount::ParseAmountError; use bitcoin::network::ParseNetworkError; +use cqrs_es::AggregateError; + +use crate::payment; #[derive(Debug)] pub enum Error { - NodeConnectError(String), - NodeApiError(String), + NodeConnect(String), + NodeApi(String), LightningPaymentFailed(String), InvalidInvoiceState(String), InvalidLightningInvoice(String), PublicKey(String), - DbError(String), + Db(String), InvalidBitcoinAddress(String), 
InvalidBitcoinNetwork(String), InvalidBitcoinAmount(String), - EventError(String), + Event(String), InvalidPaymentType(String), + Payment(String), + PaymentProcessing(String), } impl From for Error { @@ -46,3 +51,18 @@ impl From for Error { Error::InvalidLightningInvoice(value.to_string()) } } + +impl From for Error { + fn from(value: payment::Error) -> Self { + Error::Payment(value.to_string()) + } +} + +impl From> for Error { + fn from(value: AggregateError) -> Self { + match value { + AggregateError::UserError(e) => Error::Payment(e.to_string()), + _ => Error::PaymentProcessing(value.to_string()), + } + } +} diff --git a/payday_core/src/processor/on_chain_processor.rs b/payday_core/src/processor/on_chain_processor.rs index c14ff7a..6c0c71c 100644 --- a/payday_core/src/processor/on_chain_processor.rs +++ b/payday_core/src/processor/on_chain_processor.rs @@ -9,6 +9,7 @@ use crate::{ Result, }; use async_trait::async_trait; +use tracing::info; pub struct OnChainTransactionProcessor { block_height_store: Box, @@ -43,6 +44,11 @@ impl OnChainTransactionEventProcessorApi for OnChainTransactionProcessor { async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()> { let block_height = event.block_height(); let node_id = event.node_id(); + info!( + "Processing on-chain event from node: {} with block_height {}", + node_id, + block_height.unwrap_or(0) + ); self.handler.process_event(event).await?; if let Some(bh) = block_height { self.set_block_height(&node_id, bh as u64).await?; @@ -50,13 +56,3 @@ impl OnChainTransactionEventProcessorApi for OnChainTransactionProcessor { Ok(()) } } - -pub struct OnChainTransactionPrintHandler; - -#[async_trait] -impl OnChainTransactionEventHandler for OnChainTransactionPrintHandler { - async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()> { - println!("OnChainEventTransactionEvent: {:?}", event); - Ok(()) - } -} diff --git a/payday_monitor/src/lib.rs b/payday_monitor/src/lib.rs deleted file mode 100644 index b00a67f..0000000 --- a/payday_monitor/src/lib.rs +++ /dev/null @@ -1 +0,0 @@ -pub mod on_chain_handler; diff --git a/payday_monitor/src/on_chain_handler.rs b/payday_monitor/src/on_chain_handler.rs deleted file mode 100644 index c4d0b79..0000000 --- a/payday_monitor/src/on_chain_handler.rs +++ /dev/null @@ -1,20 +0,0 @@ -use std::sync::Arc; - -use payday_core::api::on_chain_api::{OnChainTransactionEventHandler, OnChainTransactionStreamApi}; - -#[allow(dead_code)] -pub struct OnChainEventHandler { - nodes: Vec>, - handler: Arc, -} - -impl OnChainEventHandler { - pub fn new( - nodes: Vec>, - handler: Arc, - ) -> Self { - Self { nodes, handler } - } - - pub async fn process_events(&self) {} -} diff --git a/payday_node_lnd/src/lnd.rs b/payday_node_lnd/src/lnd.rs index c0130f7..8992e38 100644 --- a/payday_node_lnd/src/lnd.rs +++ b/payday_node_lnd/src/lnd.rs @@ -221,9 +221,11 @@ impl LndPaymentEventStream { } /// does fetch potential missing events from the current start_height - async fn start_subscription(&self, start_height: i32) -> Result> { + async fn start_subscription(&self, start_height: u64) -> Result> { let lnd = Lnd::new(self.config.clone()).await?; - let events = lnd.get_onchain_transactions(start_height, -1).await?; + let events = lnd + .get_onchain_transactions(start_height as i32, -1) + .await?; Ok(events) } } @@ -236,7 +238,7 @@ impl OnChainTransactionStreamApi for LndPaymentEventStream { async fn subscribe_on_chain_transactions( &self, sender: Sender, - start_height: Option, + start_height: Option, ) -> Result> 
{ let config = self.config.clone(); let start_events = self @@ -277,7 +279,7 @@ impl OnChainTransactionStreamApi for LndPaymentEventStream { if let Ok(events) = to_on_chain_events(&event, config.network, &config.node_id) { for event in events { if let Err(e) = sender.send(event).await { - println!("Failed to send on chain transaction event: {:?}", e); + println!("Failed to send on chain transaction event: {}", e); } } } diff --git a/payday_node_lnd/src/wrapper.rs b/payday_node_lnd/src/wrapper.rs index c6fc3de..01e6943 100644 --- a/payday_node_lnd/src/wrapper.rs +++ b/payday_node_lnd/src/wrapper.rs @@ -39,13 +39,13 @@ impl LndRpcWrapper { config.macaroon_file.to_string(), ) .await - .map_err(|e| Error::NodeConnectError(e.to_string()))?; + .map_err(|e| Error::NodeConnect(e.to_string()))?; let network_info = lnd .lightning() .get_info(GetInfoRequest {}) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner() .chains .first() @@ -78,7 +78,7 @@ impl LndRpcWrapper { .lightning() .wallet_balance(WalletBalanceRequest {}) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner()) } @@ -88,7 +88,7 @@ impl LndRpcWrapper { .lightning() .channel_balance(ChannelBalanceRequest {}) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner()) } @@ -110,7 +110,7 @@ impl LndRpcWrapper { ..Default::default() }) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner() .address; let address = to_address(&addr, self.config.network)?; @@ -137,7 +137,7 @@ impl LndRpcWrapper { ..Default::default() }) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner() .txid; @@ -164,7 +164,7 @@ impl LndRpcWrapper { ..Default::default() }) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner() .txid; @@ -187,7 +187,7 @@ impl LndRpcWrapper { ..Default::default() }) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner() .sat_per_vbyte; @@ -211,7 +211,7 @@ impl LndRpcWrapper { ..Default::default() }) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? .into_inner(); Ok(LnInvoice { @@ -232,7 +232,7 @@ impl LndRpcWrapper { .router() .send_payment_v2(request) .await - .map_err(|e| Error::NodeApiError(e.to_string()))?; + .map_err(|e| Error::NodeApi(e.to_string()))?; // subscribe until the first non-inflight payment is received match result @@ -311,7 +311,7 @@ impl LndRpcWrapper { ..Default::default() }) .await - .map_err(|e| Error::NodeApiError(e.to_string()))? + .map_err(|e| Error::NodeApi(e.to_string()))? 
.into_inner() .transactions) } diff --git a/payday_postgres/src/lib.rs b/payday_postgres/src/lib.rs index e90191c..cae508a 100644 --- a/payday_postgres/src/lib.rs +++ b/payday_postgres/src/lib.rs @@ -1,15 +1,15 @@ pub mod btc_onchain; pub mod offset; -use cqrs_es::{Aggregate, Query}; +use cqrs_es::{persist::PersistedEventStore, Aggregate, CqrsFramework, Query}; use payday_core::{persistence::cqrs::Cqrs, Error, Result}; -use postgres_es::{postgres_cqrs, PostgresEventRepository}; +use postgres_es::PostgresEventRepository; use sqlx::{Pool, Postgres}; pub async fn create_postgres_pool(connection_string: &str) -> Result> { let pool = sqlx::PgPool::connect(connection_string) .await - .map_err(|e| Error::DbError(e.to_string()))?; + .map_err(|e| Error::Db(e.to_string()))?; Ok(pool) } @@ -18,7 +18,7 @@ pub async fn init_tables(pool: Pool) -> Result<()> { sqlx::raw_sql(sql) .execute(&pool) .await - .map_err(|e| Error::DbError(e.to_string()))?; + .map_err(|e| Error::Db(e.to_string()))?; Ok(()) } @@ -30,8 +30,10 @@ pub async fn create_cqrs( where A: Aggregate, { - let cqrs = postgres_cqrs(pool, queries, services); - Ok(cqrs) + //let cqrs = postgres_cqrs(pool, queries, services); + let repo = PostgresEventRepository::new(pool).with_tables("payday.events", "payday.snapshots"); + let store = PersistedEventStore::new_event_store(repo); + Ok(CqrsFramework::new(store, queries, services)) } #[cfg(test)] diff --git a/payday_postgres/src/offset.rs b/payday_postgres/src/offset.rs index 944d275..b853c98 100644 --- a/payday_postgres/src/offset.rs +++ b/payday_postgres/src/offset.rs @@ -11,24 +11,52 @@ use tokio::sync::Mutex; pub struct OffsetStore { db: Pool, current_offset: Mutex>, + prefix: Option, + table_name: String, } impl OffsetStore { - pub fn new(db: Pool) -> Self { + pub fn new(db: Pool, table_name: Option, id_prefix: Option) -> Self { Self { db, current_offset: Mutex::new(HashMap::new()), + prefix: id_prefix, + table_name: table_name.unwrap_or("offsets".to_owned()), + } + } + + fn with_prefix(&self, id: &str) -> String { + match &self.prefix { + Some(prefix) => format!("{}:{}", prefix, id), + None => id.to_owned(), } } async fn get_cached(&self, id: &str) -> Option { let cached = self.current_offset.lock().await; - cached.get(id).copied() + cached.get(&self.with_prefix(id)).copied() } async fn set_cached(&self, id: &str, offset: u64) { + if offset <= self.get_cached(id).await.unwrap_or(0) { + return; + } let mut cached = self.current_offset.lock().await; - cached.insert(id.to_owned(), offset); + cached.insert(self.with_prefix(id), offset); + } + + fn select_query(&self) -> String { + format!( + "SELECT current_offset FROM {} WHERE id = $1", + self.table_name + ) + } + + fn upsert_query(&self) -> String { + format!( + "INSERT INTO {} (id, current_offset) VALUES ($1, $2) ON CONFLICT (id) DO UPDATE SET current_offset = $2", + self.table_name + ) } async fn get_offset_internal(&self, id: &str) -> Result> { @@ -36,11 +64,11 @@ impl OffsetStore { if let Some(cached) = cached { return Ok(Some(cached)); } - let res: Option = sqlx::query("SELECT current_offset FROM offsets WHERE id = $1") - .bind(id) + let res: Option = sqlx::query(self.select_query().as_str()) + .bind(self.with_prefix(id)) .fetch_optional(&self.db) .await - .map_err(|e| Error::DbError(e.to_string()))? + .map_err(|e| Error::Db(e.to_string()))? 
.map(|r| r.get("current_offset")); match res.and_then(|r| u64::try_from(r).ok()) { @@ -54,20 +82,13 @@ impl OffsetStore { async fn set_offset_internal(&self, id: &str, offset: u64) -> Result<()> { let existing: Option = self.get_offset_internal(id).await?; - if existing.is_some() { - sqlx::query("UPDATE offsets SET current_offset = $1 WHERE id = $2") - .bind(offset as i64) - .bind(id) - .execute(&self.db) - .await - .map_err(|e| Error::DbError(e.to_string()))?; - } else { - sqlx::query("INSERT INTO offsets (id, current_offset) VALUES ($1, $2)") - .bind(id) + if existing.is_none_or(|v| v <= offset) { + sqlx::query(self.upsert_query().as_str()) + .bind(self.with_prefix(id)) .bind(offset as i64) .execute(&self.db) .await - .map_err(|e| Error::DbError(e.to_string()))?; + .map_err(|e| Error::Db(e.to_string()))?; } self.set_cached(id, offset).await; Ok(()) @@ -103,7 +124,7 @@ mod tests { #[tokio::test] async fn test_get_set_offset_non_existant() { let db = get_postgres_pool().await; - let store = OffsetStore::new(db); + let store = OffsetStore::new(db, None, None); let result = store .get_offset("test_get_set_offset_non_existant") .await @@ -115,7 +136,7 @@ mod tests { async fn test_get_set_offset() { let id = "test_get_set_offset"; let db = get_postgres_pool().await; - let store = OffsetStore::new(db); + let store = OffsetStore::new(db, None, None); store .set_offset(id, 10) .await