2,496 changes: 414 additions & 2,082 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 1 addition & 7 deletions Cargo.toml
@@ -6,7 +6,6 @@ edition = "2021"
[dependencies]
payday_core = { path = "./payday_core" }
payday_node_lnd = { path = "./payday_node_lnd" }
-payday_surrealdb = { path = "./payday_surrealdb" }
payday_postgres = { path = "./payday_postgres" }
tokio.workspace = true
bitcoin.workspace = true
@@ -15,12 +14,7 @@ serde_json.workspace = true
tokio-stream.workspace = true

[workspace]
-members = [
-    "payday_core",
-    "payday_node_lnd",
-    "payday_postgres",
-    "payday_surrealdb",
-]
+members = ["payday_core", "payday_node_lnd", "payday_postgres"]

[workspace.dependencies]
async-trait = "0.1.86"
2 changes: 1 addition & 1 deletion payday_core/src/api/on_chain_api.rs
@@ -59,7 +59,7 @@ pub trait OnChainTransactionApi: Send + Sync {
#[async_trait]
pub trait OnChainTransactionEventProcessorApi: Send + Sync {
    fn node_id(&self) -> String;
-    async fn get_block_height(&self) -> Result<i32>;
+    async fn get_offset(&self) -> Result<i32>;
    async fn set_block_height(&self, block_height: i32) -> Result<()>;
    async fn process_event(&self, event: OnChainTransactionEvent) -> Result<()>;
}
16 changes: 0 additions & 16 deletions payday_core/src/persistence/block_height.rs

This file was deleted.

2 changes: 1 addition & 1 deletion payday_core/src/persistence/mod.rs
@@ -1,2 +1,2 @@
-pub mod block_height;
pub mod cqrs;
+pub mod offset;
16 changes: 16 additions & 0 deletions payday_core/src/persistence/offset.rs
@@ -0,0 +1,16 @@
use async_trait::async_trait;
use serde::{Deserialize, Serialize};

use crate::Result;

#[async_trait]
pub trait OffsetStoreApi: Send + Sync {
    async fn get_offset(&self) -> Result<Offset>;
    async fn set_offset(&self, offset: u64) -> Result<()>;
}

#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Offset {
    pub id: String,
    pub offset: u64,
}
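The `OffsetStoreApi` trait replaces the old `BlockHeightStoreApi`; note that the stream id now lives on the store and its `Offset` record rather than being passed per call. For orientation, a minimal in-memory implementation could look like the following sketch (the `InMemoryOffsetStore` name is hypothetical, not part of this PR):

// A minimal in-memory OffsetStoreApi, e.g. for tests. The type name is
// hypothetical; this PR's actual implementations live in the store crates.
use std::sync::Arc;

use async_trait::async_trait;
use payday_core::{
    persistence::offset::{Offset, OffsetStoreApi},
    Result,
};
use tokio::sync::Mutex;

pub struct InMemoryOffsetStore {
    id: String,
    offset: Arc<Mutex<u64>>,
}

impl InMemoryOffsetStore {
    pub fn new(id: &str) -> Self {
        Self {
            id: id.to_string(),
            offset: Arc::new(Mutex::new(0)),
        }
    }
}

#[async_trait]
impl OffsetStoreApi for InMemoryOffsetStore {
    // Return the current offset together with the stream id.
    async fn get_offset(&self) -> Result<Offset> {
        let offset = self.offset.lock().await;
        Ok(Offset {
            id: self.id.clone(),
            offset: *offset,
        })
    }

    // Overwrite the stored offset unconditionally; monotonicity is
    // enforced by the caller (see OnChainTransactionProcessor below).
    async fn set_offset(&self, offset: u64) -> Result<()> {
        *self.offset.lock().await = offset;
        Ok(())
    }
}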
16 changes: 6 additions & 10 deletions payday_core/src/processor/on_chain_processor.rs
@@ -5,23 +5,23 @@ use crate::{
        OnChainTransactionEvent, OnChainTransactionEventHandler,
        OnChainTransactionEventProcessorApi,
    },
-    persistence::block_height::BlockHeightStoreApi,
+    persistence::offset::OffsetStoreApi,
    Result,
};
use async_trait::async_trait;
use tokio::sync::Mutex;

pub struct OnChainTransactionProcessor {
    node_id: String,
-    block_height_store: Box<dyn BlockHeightStoreApi>,
+    block_height_store: Box<dyn OffsetStoreApi>,
    handler: Box<dyn OnChainTransactionEventHandler>,
    current_block_height: Arc<Mutex<i32>>,
}

impl OnChainTransactionProcessor {
    pub fn new(
        node_id: &str,
-        block_height_store: Box<dyn BlockHeightStoreApi>,
+        block_height_store: Box<dyn OffsetStoreApi>,
        handler: Box<dyn OnChainTransactionEventHandler>,
    ) -> Self {
        Self {
@@ -38,22 +38,18 @@ impl OnChainTransactionEventProcessorApi for OnChainTransactionProcessor {
    fn node_id(&self) -> String {
        self.node_id.to_string()
    }
-    async fn get_block_height(&self) -> Result<i32> {
+    async fn get_offset(&self) -> Result<i32> {
        let mut current_block_height = self.current_block_height.lock().await;
        if *current_block_height < 0 {
-            *current_block_height = self
-                .block_height_store
-                .get_block_height(&self.node_id)
-                .await?
-                .block_height as i32;
+            *current_block_height = self.block_height_store.get_offset().await?.offset as i32;
        }
        Ok(*current_block_height)
    }
    async fn set_block_height(&self, block_height: i32) -> Result<()> {
        let mut current_block_height = self.current_block_height.lock().await;
        if *current_block_height < block_height {
            self.block_height_store
-                .set_block_height(&self.node_id, block_height as u64)
+                .set_offset(block_height as u64)
                .await?;
            *current_block_height = block_height;
        }
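The net effect of these changes: the processor caches the offset in memory, lazily loads it from the store on first read (a negative cached value marks it as unloaded), and only persists strictly increasing block heights. A hypothetical wiring sketch, assuming a store and handler are built elsewhere (module paths are assumptions, not taken from this PR):

// Hypothetical wiring of the renamed APIs; module paths and the store/handler
// values are assumed, not taken from this PR.
use payday_core::{
    api::on_chain_api::{
        OnChainTransactionEventHandler, OnChainTransactionEventProcessorApi,
    },
    persistence::offset::OffsetStoreApi,
    processor::on_chain_processor::OnChainTransactionProcessor,
    Result,
};

async fn run_processor(
    store: Box<dyn OffsetStoreApi>,
    handler: Box<dyn OnChainTransactionEventHandler>,
) -> Result<()> {
    let processor = OnChainTransactionProcessor::new("lnd-node-1", store, handler);
    // First read loads the persisted offset; later reads hit the cache.
    let offset = processor.get_offset().await?;
    // Only strictly increasing heights are persisted, so replayed events
    // do not cause redundant writes.
    processor.set_block_height(offset + 1).await?;
    Ok(())
}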
4 changes: 4 additions & 0 deletions payday_postgres/Cargo.toml
@@ -12,3 +12,7 @@ serde = { workspace = true }
serde_json = { workspace = true }
tokio = { workspace = true }
postgres-es = { version = "0.4.11" }

[dev-dependencies]
testcontainers = { version = "0.23" }
testcontainers-modules = { version = "0.11.5", features = ["postgres"] }
30 changes: 30 additions & 0 deletions payday_postgres/db/migrations/init.sql
@@ -0,0 +1,30 @@
-- a single table is used for all events in the cqrs system
CREATE TABLE IF NOT EXISTS events
(
    aggregate_type text NOT NULL,
    aggregate_id   text NOT NULL,
    sequence       bigint CHECK (sequence >= 0) NOT NULL,
    event_type     text NOT NULL,
    event_version  text NOT NULL,
    payload        json NOT NULL,
    metadata       json NOT NULL,
    PRIMARY KEY (aggregate_type, aggregate_id, sequence)
);

-- this table is only needed if snapshotting is employed
CREATE TABLE IF NOT EXISTS snapshots
(
    aggregate_type   text NOT NULL,
    aggregate_id     text NOT NULL,
    last_sequence    bigint CHECK (last_sequence >= 0) NOT NULL,
    current_snapshot bigint CHECK (current_snapshot >= 0) NOT NULL,
    payload          json NOT NULL,
    PRIMARY KEY (aggregate_type, aggregate_id, last_sequence)
);

-- stores the current offset of each event stream
CREATE TABLE IF NOT EXISTS offsets
(
    id             text NOT NULL PRIMARY KEY,
    current_offset bigint NOT NULL
);
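The payday_postgres offset store itself is not rendered in this view; presumably it reads and upserts rows in this `offsets` table keyed by stream id. A sketch of such an upsert with sqlx (the function shape is illustrative only):

// Illustrative upsert against the offsets table; this PR's actual
// payday_postgres offset module is not shown in this diff view.
async fn set_offset(
    pool: &sqlx::Pool<sqlx::Postgres>,
    id: &str,
    offset: i64, // matches the bigint column
) -> Result<(), sqlx::Error> {
    sqlx::query(
        "INSERT INTO offsets (id, current_offset) VALUES ($1, $2)
         ON CONFLICT (id) DO UPDATE SET current_offset = EXCLUDED.current_offset",
    )
    .bind(id)
    .bind(offset)
    .execute(pool)
    .await?;
    Ok(())
}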
65 changes: 0 additions & 65 deletions payday_postgres/src/block_height.rs

This file was deleted.

34 changes: 34 additions & 0 deletions payday_postgres/src/db/migrations/setup.sql
@@ -0,0 +1,34 @@
-- This is just the basic schema. For production, apply it through a proper,
-- reviewed migration script or process. Here it is mainly used to bootstrap
-- tables for test runs.

-- a single table is used for all events in the cqrs system
CREATE TABLE IF NOT EXISTS events
(
    aggregate_type text NOT NULL,
    aggregate_id   text NOT NULL,
    sequence       bigint CHECK (sequence >= 0) NOT NULL,
    event_type     text NOT NULL,
    event_version  text NOT NULL,
    payload        json NOT NULL,
    metadata       json NOT NULL,
    PRIMARY KEY (aggregate_type, aggregate_id, sequence)
);

-- this table is only needed if snapshotting is employed
CREATE TABLE IF NOT EXISTS snapshots
(
    aggregate_type   text NOT NULL,
    aggregate_id     text NOT NULL,
    last_sequence    bigint CHECK (last_sequence >= 0) NOT NULL,
    current_snapshot bigint CHECK (current_snapshot >= 0) NOT NULL,
    payload          json NOT NULL,
    PRIMARY KEY (aggregate_type, aggregate_id, last_sequence)
);

-- stores the current offset of each event stream
CREATE TABLE IF NOT EXISTS offsets
(
    id             text NOT NULL PRIMARY KEY,
    current_offset bigint NOT NULL
);
62 changes: 61 additions & 1 deletion payday_postgres/src/lib.rs
@@ -1,5 +1,5 @@
-pub mod block_height;
pub mod btc_onchain;
+pub mod offset;

use cqrs_es::{Aggregate, Query};
use payday_core::{persistence::cqrs::Cqrs, Error, Result};
@@ -13,6 +13,15 @@ pub async fn create_postgres_pool(connection_string: &str) -> Result<Pool<Postgres>>
    Ok(pool)
}

pub async fn init_tables(pool: Pool<Postgres>) -> Result<()> {
    let sql = include_str!("../db/migrations/init.sql");
    sqlx::raw_sql(sql)
        .execute(&pool)
        .await
        .map_err(|e| Error::DbError(e.to_string()))?;
    Ok(())
}

pub async fn create_cqrs<A>(
pool: Pool<Postgres>,
queries: Vec<Box<dyn Query<A>>>,
@@ -24,3 +33,54 @@ where
    let cqrs = postgres_cqrs(pool, queries, services);
    Ok(cqrs)
}

#[cfg(test)]
mod test_utils {
    use sqlx::{Pool, Postgres};
    use testcontainers::ContainerAsync;
    use testcontainers_modules::testcontainers::runners::AsyncRunner;
    use tokio::sync::OnceCell;

    static POSTGRES_CONTAINER: OnceCell<
        ContainerAsync<testcontainers_modules::postgres::Postgres>,
    > = OnceCell::const_new();

    async fn get_postgres_container(
    ) -> &'static ContainerAsync<testcontainers_modules::postgres::Postgres> {
        POSTGRES_CONTAINER
            .get_or_init(|| async {
                testcontainers_modules::postgres::Postgres::default()
                    .start()
                    .await
                    .expect("unable to start postgres container")
            })
            .await
    }

    static POSTGRES_POOL: OnceCell<Pool<Postgres>> = OnceCell::const_new();

    /// Creates a static Postgres pool that is shared by all tests, so keep in
    /// mind that the database might contain state left behind by other tests.
    pub async fn get_postgres_pool() -> Pool<Postgres> {
        let pool = POSTGRES_POOL
            .get_or_init(|| async {
                let container = get_postgres_container().await;
                let connection_string = format!(
                    "postgres://postgres:postgres@127.0.0.1:{}/postgres",
                    container
                        .get_host_port_ipv4(5432)
                        .await
                        .expect("unable to get postgres test port")
                );
                let pool = super::create_postgres_pool(&connection_string)
                    .await
                    .expect("unable to create postgres pool");
                super::init_tables(pool.clone())
                    .await
                    .expect("unable to init tables");
                pool
            })
            .await;
        pool.clone()
    }
}
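With this helper, every test in the crate shares one container and one pool. A hypothetical test against the `offsets` table might look like (test name and values are illustrative, not from this PR):

// Hypothetical test; relies on the tables created by init_tables via
// get_postgres_pool, and on tokio's test macro being available.
#[cfg(test)]
mod offset_tests {
    use super::test_utils::get_postgres_pool;

    #[tokio::test]
    async fn stores_and_reads_an_offset() {
        let pool = get_postgres_pool().await;
        sqlx::query(
            "INSERT INTO offsets (id, current_offset) VALUES ($1, $2)
             ON CONFLICT (id) DO UPDATE SET current_offset = EXCLUDED.current_offset",
        )
        .bind("test-node")
        .bind(42i64)
        .execute(&pool)
        .await
        .expect("insert failed");

        let (offset,): (i64,) =
            sqlx::query_as("SELECT current_offset FROM offsets WHERE id = $1")
                .bind("test-node")
                .fetch_one(&pool)
                .await
                .expect("select failed");
        assert_eq!(offset, 42);
    }
}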