Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

13 changes: 11 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
[workspace]
members = ["payday_axum", "payday_core", "payday_node_lnd", "payday_postgres"]
resolver = "3"

members = [
"payday_axum",
"payday_core",
"payday",
"payday_node_lnd",
"payday_postgres",
]

[workspace.dependencies]
async-trait = "0.1.86"
Expand All @@ -15,5 +23,6 @@ sqlx = { version = "0.8", features = ["postgres", "json"] }
futures = "0.3.30"
lightning-invoice = { version = "0.33.1", features = ["serde"] }
tracing = "0.1"
tracing-subscriber = "0.3.18"
tracing-subscriber = "0.3.19"
thiserror = "2"
anyhow = "1"
20 changes: 20 additions & 0 deletions payday/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[package]
name = "payday"
version = "0.1.0"
edition = "2024"

[dependencies]
payday_core = { path = "../payday_core" }
payday_postgres = { path = "../payday_postgres" }
payday_node_lnd = { path = "../payday_node_lnd" }
tracing.workspace = true
tracing-subscriber.workspace = true
postgres-es.workspace = true
sqlx.workspace = true
tokio.workspace = true
async-trait.workspace = true
anyhow.workspace = true
bitcoin.workspace = true

[[example]]
name = "on_chain_monitor"
54 changes: 54 additions & 0 deletions payday/examples/on_chain_monitor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
use std::sync::Arc;

use bitcoin::Network;
use payday::{on_chain_processor::OnChainEventProcessor, on_chain_service::OnChainService};
use payday_core::aggregate::on_chain_aggregate::OnChainInvoice;
use payday_node_lnd::lnd::{LndConfig, LndPaymentEventStream};
use tracing::info;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
info!("Starting on-chain monitor");

// db connection
let pool = payday_postgres::create_postgres_pool(
"postgres://postgres:password@localhost:5432/default",
)
.await
.expect("DB connection");

// processes on chain transaction commands
let aggregate = payday_postgres::create_cqrs::<OnChainInvoice>(pool.clone(), vec![], ())
.await
.expect("Aggregate instance");

// contains the event handler
let service = OnChainService::new(aggregate);

let stream = Arc::new(LndPaymentEventStream::new(LndConfig {
node_id: "node1".to_string(),
address: "https://localhost:10008".to_string(),
cert_path: "tls.cert".to_string(),
macaroon_file: "admin.macaroon".to_string(),
network: Network::Signet,
}));
let stream2 = Arc::new(LndPaymentEventStream::new(LndConfig {
node_id: "node2".to_string(),
address: "https://localhost:10009".to_string(),
cert_path: "tls2.cert".to_string(),
macaroon_file: "admin2.macaroon".to_string(),
network: Network::Signet,
}));

// consumes on chain events from all nodes
let on_chain_processor =
OnChainEventProcessor::new(pool, vec![stream, stream2], Arc::new(service));

// start the event monitor
let all = on_chain_processor.start().await;
all.join_all().await;

info!("On-chain monitor finished");
Ok(())
}
2 changes: 2 additions & 0 deletions payday/src/lib.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
pub mod on_chain_processor;
pub mod on_chain_service;
77 changes: 77 additions & 0 deletions payday/src/on_chain_processor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
use std::sync::Arc;

use payday_core::api::on_chain_api::{
OnChainTransactionEventHandler, OnChainTransactionEventProcessorApi,
OnChainTransactionStreamApi,
};
use payday_core::persistence::offset::OffsetStoreApi;
use payday_core::processor::on_chain_processor::OnChainTransactionProcessor;
use payday_postgres::offset::OffsetStore;
use sqlx::{Pool, Postgres};
use tokio::task::{JoinError, JoinSet};
use tracing::{error, info};

#[allow(dead_code)]
pub struct OnChainEventProcessor {
pool: Pool<Postgres>,
nodes: Vec<Arc<dyn OnChainTransactionStreamApi>>,
handler: Arc<dyn OnChainTransactionEventHandler>,
}

impl OnChainEventProcessor {
pub fn new(
pool: Pool<Postgres>,
nodes: Vec<Arc<dyn OnChainTransactionStreamApi>>,
handler: Arc<dyn OnChainTransactionEventHandler>,
) -> Self {
Self {
pool,
nodes,
handler,
}
}

pub async fn start(&self) -> JoinSet<Result<(), JoinError>> {
let (snd, mut rcv) = tokio::sync::mpsc::channel(100);
let mut join_set = JoinSet::new();

let offset_store = OffsetStore::new(
self.pool.clone(),
Some("payday.offsets".to_string()),
Some("on_chain".to_string()),
);

for node in &self.nodes {
let start_height: Option<u64> = offset_store
.get_offset(&node.node_id())
.await
.ok()
.map(|o| o.offset);
if let Ok(join) = node
.subscribe_on_chain_transactions(snd.clone(), start_height)
.await
{
join_set.spawn(join);
} else {
error!(
"Failed to subscribe to on chain transactions for node {}",
node.node_id()
);
}
}

let processor =
OnChainTransactionProcessor::new(Box::new(offset_store), self.handler.clone());
let handle = tokio::spawn(async move {
while let Some(event) = rcv.recv().await {
info!("Received event: {:?}", event);
if let Err(err) = processor.process_event(event).await {
error!("Failed to process on chain event: {:?}", err);
}
}
});

join_set.spawn(handle);
join_set
}
}
34 changes: 34 additions & 0 deletions payday/src/on_chain_service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
use async_trait::async_trait;
use payday_core::{
Result,
aggregate::on_chain_aggregate::{OnChainCommand, OnChainInvoice},
api::on_chain_api::{OnChainTransactionEvent, OnChainTransactionEventHandler},
persistence::cqrs::Cqrs,
};
use postgres_es::PostgresEventRepository;
use tracing::{error, info};

pub struct OnChainService {
aggregate: Cqrs<OnChainInvoice, PostgresEventRepository>,
}

impl OnChainService {
pub fn new(aggregate: Cqrs<OnChainInvoice, PostgresEventRepository>) -> Self {
Self { aggregate }
}
}

#[async_trait]
impl OnChainTransactionEventHandler for OnChainService {
async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()> {
info!("Received on-chain event: {:?}", event);
let command: OnChainCommand = event.into();
info!("Executing on-chain command: {:?}", command);
if let Err(res) = self.aggregate.execute(&command.id, command.command).await {
error!("Failed to execute on-chain command with: {:?}", res);
} else {
info!("Successfully executed on-chain command");
}
Ok(())
}
}
1 change: 1 addition & 0 deletions payday_axum/src/routes.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@

1 change: 1 addition & 0 deletions payday_core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,4 @@ tokio.workspace = true
tokio-stream.workspace = true
chrono.workspace = true
lightning-invoice.workspace = true
tracing.workspace = true
11 changes: 8 additions & 3 deletions payday_core/src/api/lightining_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,9 +51,8 @@ pub trait LightningTransactionStreamApi: Send + Sync {

#[async_trait]
pub trait LightningTransactionEventProcessorApi: Send + Sync {
fn node_id(&self) -> String;
async fn get_offset(&self) -> Result<u64>;
async fn set_offset(&self, settle_index: u64) -> Result<()>;
async fn get_offset(&self, id: &str) -> Result<u64>;
async fn set_offset(&self, id: &str, settle_index: u64) -> Result<()>;
async fn process_event(&self, event: LightningTransactionEvent) -> Result<()>;
}

Expand Down Expand Up @@ -103,6 +102,12 @@ impl LightningTransactionEvent {
LightningTransactionEvent::Settled(tx) => Some(tx.settle_index),
}
}

pub fn node_id(&self) -> String {
match self {
LightningTransactionEvent::Settled(tx) => tx.node_id.to_owned(),
}
}
}

#[derive(Debug, Clone, PartialEq)]
Expand Down
18 changes: 14 additions & 4 deletions payday_core/src/api/on_chain_api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,8 @@ pub trait OnChainTransactionApi: Send + Sync {

#[async_trait]
pub trait OnChainTransactionEventProcessorApi: Send + Sync {
fn node_id(&self) -> String;
async fn get_offset(&self) -> Result<u64>;
async fn set_block_height(&self, block_height: u64) -> Result<()>;
async fn get_offset(&self, id: &str) -> Result<u64>;
async fn set_block_height(&self, id: &str, block_height: u64) -> Result<()>;
async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()>;
}

Expand All @@ -71,10 +70,12 @@ pub trait OnChainTransactionEventHandler: Send + Sync {

#[async_trait]
pub trait OnChainTransactionStreamApi: Send + Sync {
fn node_id(&self) -> String;

async fn subscribe_on_chain_transactions(
&self,
sender: Sender<OnChainTransactionEvent>,
start_height: Option<i32>,
start_height: Option<u64>,
) -> Result<JoinHandle<()>>;
}

Expand Down Expand Up @@ -108,6 +109,15 @@ impl OnChainTransactionEvent {
_ => None,
}
}

pub fn node_id(&self) -> String {
match self {
OnChainTransactionEvent::ReceivedUnconfirmed(tx) => tx.node_id.to_owned(),
OnChainTransactionEvent::ReceivedConfirmed(tx) => tx.node_id.to_owned(),
OnChainTransactionEvent::SentUnconfirmed(tx) => tx.node_id.to_owned(),
OnChainTransactionEvent::SentConfirmed(tx) => tx.node_id.to_owned(),
}
}
}

#[derive(Debug, Clone)]
Expand Down
28 changes: 24 additions & 4 deletions payday_core/src/error.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,26 @@
use bitcoin::address::ParseError;
use bitcoin::amount::ParseAmountError;
use bitcoin::network::ParseNetworkError;
use cqrs_es::AggregateError;

use crate::payment;

#[derive(Debug)]
pub enum Error {
NodeConnectError(String),
NodeApiError(String),
NodeConnect(String),
NodeApi(String),
LightningPaymentFailed(String),
InvalidInvoiceState(String),
InvalidLightningInvoice(String),
PublicKey(String),
DbError(String),
Db(String),
InvalidBitcoinAddress(String),
InvalidBitcoinNetwork(String),
InvalidBitcoinAmount(String),
EventError(String),
Event(String),
InvalidPaymentType(String),
Payment(String),
PaymentProcessing(String),
}

impl From<ParseNetworkError> for Error {
Expand Down Expand Up @@ -46,3 +51,18 @@ impl From<lightning_invoice::ParseOrSemanticError> for Error {
Error::InvalidLightningInvoice(value.to_string())
}
}

impl From<payment::Error> for Error {
fn from(value: payment::Error) -> Self {
Error::Payment(value.to_string())
}
}

impl From<AggregateError<payment::Error>> for Error {
fn from(value: AggregateError<payment::Error>) -> Self {
match value {
AggregateError::UserError(e) => Error::Payment(e.to_string()),
_ => Error::PaymentProcessing(value.to_string()),
}
}
}
4 changes: 2 additions & 2 deletions payday_core/src/persistence/offset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ use crate::Result;

#[async_trait]
pub trait OffsetStoreApi: Send + Sync {
async fn get_offset(&self) -> Result<Offset>;
async fn set_offset(&self, offset: u64) -> Result<()>;
async fn get_offset(&self, id: &str) -> Result<Offset>;
async fn set_offset(&self, id: &str, offset: u64) -> Result<()>;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
Expand Down
Loading