Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: emit cbor filter #788

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 20 additions & 0 deletions examples/emit_cbor/daemon.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
[source]
type = "N2N"
peers = ["preprod-node.play.dev.cardano.org:3001"]

[chain]
type = "preprod"

[intersect]
type = "Point"
value = [57829508, "f7b3b3f98d1fb76cb53956cf4703abac966f7fc14d965ac8d2f5a107da683637"]

[[filters]]
type = "EmitCbor"

[sink]
type = "Stdout"

[cursor]
type = "File"
path = "cursor"
42 changes: 42 additions & 0 deletions examples/postgresql/cursor
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
[
[
60485013,
"043018656b71c25c2a0338fe7000a53dedc704c5688e0c90a6093f08178a8b36"
],
[
60485009,
"80669929ff82d23e5dd097d1554541bc678d53322df6c7bfec13c62645c68737"
],
[
60485003,
"0f2f548b4360e3553923cc0fd96b9e10ec5142bc43f93e0a0a5509be7fd4aa20"
],
[
60484990,
"4fc80c10ec326b1d1229f3284c8b5d73c442524a95da91727f1f5684b8e58484"
],
[
60484982,
"f6c8a7b2b69b47da827ff255cf92e282493c70e64587ab233426b5b604939ca5"
],
[
60484942,
"15989755ba2c52d302b197771c076491ea15422a83f08f66c90e25ed1a73bb78"
],
[
60484936,
"d6e485277c441e6868fd742c6a3e57a4ca1de30b4e8c619624c054352f0b83a2"
],
[
60484917,
"40844b925abd2004c70694eebd6da02a6ec4c3eca51aba32314364d52679a75b"
],
[
60484876,
"43ae8400494cff3fdde99a7ca780d5b4a216116b44f4b39cc4f24f735eb70570"
],
[
60484869,
"4595260047133e4da97d2939c4edf6087621e6b72380c2179b35d6df35520ab4"
]
]
24 changes: 17 additions & 7 deletions examples/postgresql/daemon.toml
Original file line number Diff line number Diff line change
@@ -1,17 +1,27 @@
[source]
type = "N2N"
peers = ["relays-new.cardano-mainnet.iohk.io:3001"]
peers = ["preprod-node.play.dev.cardano.org:3001"]

[chain]
type = "preprod"

[intersect]
type = "Point"
value = [114220807, "01822b10afde3d09bd5e72758857b669ddadcdaad6776a3dc8ee902c3ace1d7e"]
value = [60484568, "874858059dc6c871105b80b38238cf84a91e617b2afde53904407cb2a3299e9d"]

[[filters]]
type = "SplitBlock"
type = "EmitCbor"

[sink]
type = "SqlDb"
connection = "postgres://postgres:example@localhost:5432/postgres"
apply_template = "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', decode('{{record.hex}}', 'hex'));"
undo_template = "DELETE FROM txs WHERE slot = {{point.slot}}"
reset_template = "DELETE FROM txs WHERE slot > {{point.slot}}"
connection = "postgres://postgres:test1234@localhost:15432/oura_test"
apply_cbor_block_template = "INSERT INTO blocks (slot, cbor) VALUES ('{{point.slot}}', decode('{{record.hex}}', 'hex'));"
undo_cbor_block_template = "DELETE FROM blocks WHERE slot = {{point.slot}}"
apply_cbor_tx_template = "INSERT INTO txs (slot, cbor) VALUES ('{{point.slot}}', decode('{{record.hex}}', 'hex'));"
undo_cbor_tx_template = "DELETE FROM txs WHERE slot = {{point.slot}}"
reset_cbor_block_template = "DELETE FROM blocks WHERE slot > {{point.slot}};"
reset_cbor_tx_template = "DELETE FROM txs WHERE slot > {{point.slot}};"

[cursor]
type = "File"
path = "cursor"
13 changes: 12 additions & 1 deletion examples/postgresql/init.sql
Original file line number Diff line number Diff line change
@@ -1,6 +1,17 @@
-- Table for storing CBOR blocks
CREATE TABLE blocks (
slot INTEGER NOT NULL,
cbor BYTEA
);

-- Index for the blocks table
CREATE INDEX idx_blocks_slot ON blocks(slot);

-- Table for storing CBOR transactions
CREATE TABLE txs (
slot INTEGER NOT NULL,
cbor BYTEA
);

CREATE INDEX idx_txs_slot ON txs(slot);
-- Index for the txs table
CREATE INDEX idx_txs_slot ON txs(slot);
74 changes: 74 additions & 0 deletions src/filters/emit_cbor.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
//! A filter that emits CBOR blocks and parses them to emit CBOR transactions

use gasket::framework::*;
use serde::Deserialize;
use std::borrow::Cow;

use pallas::ledger::traverse as trv;

use crate::framework::*;

type CborBlock<'a> = Cow<'a, [u8]>;
type CborTx<'a> = Cow<'a, [u8]>;

fn map_block_to_tx(cbor: CborBlock) -> Result<Vec<CborTx>, WorkerError> {
let block = trv::MultiEraBlock::decode(cbor.as_ref()).or_panic()?;

let txs: Vec<_> = block
.txs()
.iter()
.map(|tx| tx.encode())
.map(Cow::Owned)
.collect();

Ok(txs)
}

#[derive(Default, Stage)]
#[stage(name = "filter-emit-cbor", unit = "ChainEvent", worker = "Worker")]
pub struct Stage {
pub input: FilterInputPort,
pub output: FilterOutputPort,

#[metric]
ops_count: gasket::metrics::Counter,
}

#[derive(Default)]
pub struct Worker;

impl From<&Stage> for Worker {
fn from(_: &Stage) -> Self {
Self
}
}

gasket::impl_splitter!(|_worker: Worker, stage: Stage, unit: ChainEvent| => {
let output = unit.clone().try_map_record_to_many(|r| match r {
Record::CborBlock(cbor) => {
let txs = map_block_to_tx(Cow::Borrowed(&cbor))?
.into_iter()
.map(|tx| Record::CborTx(tx.into()))
.collect::<Vec<_>>();

let mut records = vec![Record::CborBlock(cbor.to_vec())];
records.extend(txs);

Ok(records)
}
x => Ok(vec![x]),
})?;

stage.ops_count.inc(1);

output
});

#[derive(Default, Deserialize)]
pub struct Config {}

impl Config {
pub fn bootstrapper(self, _ctx: &Context) -> Result<Stage, Error> {
Ok(Stage::default())
}
}
7 changes: 7 additions & 0 deletions src/filters/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use serde::Deserialize;

use crate::framework::*;

pub mod emit_cbor;
pub mod into_json;
pub mod legacy_v1;
pub mod noop;
Expand All @@ -20,6 +21,7 @@ pub enum Bootstrapper {
LegacyV1(legacy_v1::Stage),
ParseCbor(parse_cbor::Stage),
Select(select::Stage),
EmitCbor(emit_cbor::Stage),

#[cfg(feature = "wasm")]
WasmPlugin(wasm_plugin::Stage),
Expand All @@ -34,6 +36,7 @@ impl Bootstrapper {
Bootstrapper::LegacyV1(p) => &mut p.input,
Bootstrapper::ParseCbor(p) => &mut p.input,
Bootstrapper::Select(p) => &mut p.input,
Bootstrapper::EmitCbor(p) => &mut p.input,

#[cfg(feature = "wasm")]
Bootstrapper::WasmPlugin(p) => &mut p.input,
Expand All @@ -48,6 +51,7 @@ impl Bootstrapper {
Bootstrapper::LegacyV1(p) => &mut p.output,
Bootstrapper::ParseCbor(p) => &mut p.output,
Bootstrapper::Select(p) => &mut p.output,
Bootstrapper::EmitCbor(p) => &mut p.output,

#[cfg(feature = "wasm")]
Bootstrapper::WasmPlugin(p) => &mut p.output,
Expand All @@ -62,6 +66,7 @@ impl Bootstrapper {
Bootstrapper::LegacyV1(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::ParseCbor(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::Select(x) => gasket::runtime::spawn_stage(x, policy),
Bootstrapper::EmitCbor(x) => gasket::runtime::spawn_stage(x, policy),

#[cfg(feature = "wasm")]
Bootstrapper::WasmPlugin(x) => gasket::runtime::spawn_stage(x, policy),
Expand All @@ -78,6 +83,7 @@ pub enum Config {
LegacyV1(legacy_v1::Config),
ParseCbor(parse_cbor::Config),
Select(select::Config),
EmitCbor(emit_cbor::Config),

#[cfg(feature = "wasm")]
WasmPlugin(wasm_plugin::Config),
Expand All @@ -92,6 +98,7 @@ impl Config {
Config::LegacyV1(c) => Ok(Bootstrapper::LegacyV1(c.bootstrapper(ctx)?)),
Config::ParseCbor(c) => Ok(Bootstrapper::ParseCbor(c.bootstrapper(ctx)?)),
Config::Select(c) => Ok(Bootstrapper::Select(c.bootstrapper(ctx)?)),
Config::EmitCbor(c) => Ok(Bootstrapper::EmitCbor(c.bootstrapper(ctx)?)),

#[cfg(feature = "wasm")]
Config::WasmPlugin(c) => Ok(Bootstrapper::WasmPlugin(c.bootstrapper(ctx)?)),
Expand Down
57 changes: 43 additions & 14 deletions src/sinks/sql_db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,25 +43,39 @@ impl gasket::framework::Worker<Stage> for Worker {
async fn execute(&mut self, unit: &ChainEvent, stage: &mut Stage) -> Result<(), WorkerError> {
let point = unit.point().clone();

let template = match unit {
let templates = match unit {
ChainEvent::Apply(p, r) => {
let data = hbs_data(p.clone(), Some(r.clone()));
stage.templates.render("apply", &data)
let template = match r {
Record::CborBlock(_) => stage.templates.render("apply_cbor_block", &data),
Record::CborTx(_) => stage.templates.render("apply_cbor_tx", &data),
_ => stage.templates.render("apply", &data),
};
vec![template]
}
ChainEvent::Undo(p, r) => {
let data = hbs_data(p.clone(), Some(r.clone()));
stage.templates.render("undo", &data)
let template = match r {
Record::CborBlock(_) => stage.templates.render("undo_cbor_block", &data),
Record::CborTx(_) => stage.templates.render("undo_cbor_tx", &data),
_ => stage.templates.render("undo", &data),
};
vec![template]
}
ChainEvent::Reset(p) => {
let data = hbs_data(p.clone(), None);
stage.templates.render("reset", &data)
vec![
stage.templates.render("reset_cbor_block", &data),
stage.templates.render("reset_cbor_tx", &data),
]
}
};

let statement = template.or_panic()?;

let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
debug!(rows = result.rows_affected(), "sql statement executed");
for template in templates {
let statement = template.or_panic()?;
let result = sqlx::query(&statement).execute(&self.db).await.or_retry()?;
debug!(rows = result.rows_affected(), "sql statement executed");
}

stage.ops_count.inc(1);
stage.latest_block.set(point.slot_or_default() as i64);
Expand Down Expand Up @@ -91,9 +105,12 @@ pub struct Stage {
pub struct Config {
/// eg: sqlite::memory:
pub connection: String,
pub apply_template: String,
pub undo_template: String,
pub reset_template: String,
pub apply_cbor_block_template: String,
pub undo_cbor_block_template: String,
pub apply_cbor_tx_template: String,
pub undo_cbor_tx_template: String,
pub reset_cbor_block_template: String,
pub reset_cbor_tx_template: String,
}

impl Config {
Expand All @@ -103,15 +120,27 @@ impl Config {
let mut templates = handlebars::Handlebars::new();

templates
.register_template_string("apply", &self.apply_template)
.register_template_string("apply_cbor_block", &self.apply_cbor_block_template)
.map_err(Error::config)?;

templates
.register_template_string("undo_cbor_block", &self.undo_cbor_block_template)
.map_err(Error::config)?;

templates
.register_template_string("apply_cbor_tx", &self.apply_cbor_tx_template)
.map_err(Error::config)?;

templates
.register_template_string("undo_cbor_tx", &self.undo_cbor_tx_template)
.map_err(Error::config)?;

templates
.register_template_string("undo", &self.undo_template)
.register_template_string("reset_cbor_block", &self.reset_cbor_block_template)
.map_err(Error::config)?;

templates
.register_template_string("reset", &self.reset_template)
.register_template_string("reset_cbor_tx", &self.reset_cbor_tx_template)
.map_err(Error::config)?;

let stage = Stage {
Expand Down
Loading