Skip to content

Commit

Permalink
cometindex: make transactions available in event batches (#5097)
Browse files Browse the repository at this point in the history
The raw ABCI event database already has all the transaction data, and
this PR amends our processing pipeline in cometindex to present blocks
containing both a list of events, each of which may directly have all of
this transaction data (along with a transaction hash), and a list of all
the transactions themselves.

This also refactors things to be a bit more opaque, giving us more
control over the internal representation of the batch of events.

The `ContextualizedEvent` struct is probably a little over-engineered,
but avoids needlessly copying the transaction data around. Ultimately
the performance of indexing is tied to data throughput, so we should be
mindful of needless copying in the architecture.

This should help with the draft in #5081, avoiding the need for creating
a bespoke event.

I tested this by setting up a little indexer just parsing out the tx
data and serializing the JSON, and

## Checklist before requesting a review

- [x] I have added guiding text to explain how a reviewer should test
these changes.

- [x] If this code contains consensus-breaking changes, I have added the
"consensus-breaking" label. Otherwise, I declare my belief that there
are not consensus-breaking changes, for the following reason:

  > Indexing code changes only
  • Loading branch information
cronokirby authored Feb 18, 2025
1 parent eb79827 commit fbb124d
Show file tree
Hide file tree
Showing 14 changed files with 215 additions and 130 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 8 additions & 8 deletions crates/bin/pindexer/src/dex_ex/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -796,9 +796,9 @@ impl Events {

pub fn extract(block: &BlockEvents) -> anyhow::Result<Self> {
let mut out = Self::new();
out.height = block.height as i32;
out.height = block.height() as i32;

for event in &block.events {
for event in block.events() {
if let Ok(e) = EventCandlestickData::try_from_event(&event.event) {
let candle = Candle::from_candlestick_data(&e.stick);
out.with_candle(e.pair, candle);
Expand All @@ -818,7 +818,7 @@ impl Events {
},
false,
);
if let Some(tx_hash) = event.tx_hash {
if let Some(tx_hash) = event.tx_hash() {
out.position_open_txs.insert(e.position_id, tx_hash);
}
// A newly opened position might be executed against in this block,
Expand All @@ -838,7 +838,7 @@ impl Events {
},
true,
);
if let Some(tx_hash) = event.tx_hash {
if let Some(tx_hash) = event.tx_hash() {
out.position_withdrawal_txs.insert(e.position_id, tx_hash);
}
out.position_withdrawals.push(e);
Expand Down Expand Up @@ -873,7 +873,7 @@ impl Events {
} else if let Ok(e) = EventQueuePositionClose::try_from_event(&event.event) {
// The position close event is emitted by the dex module at EOB,
// so we need to track it with the tx hash of the closure tx.
if let Some(tx_hash) = event.tx_hash {
if let Some(tx_hash) = event.tx_hash() {
out.position_close_txs.insert(e.position_id, tx_hash);
}
} else if let Ok(e) = EventBatchSwap::try_from_event(&event.event) {
Expand Down Expand Up @@ -1429,19 +1429,19 @@ impl AppView for Component {
let mut events = Events::extract(&block)?;
let time = events
.time
.expect(&format!("no block root event at height {}", block.height));
.expect(&format!("no block root event at height {}", block.height()));
last_time = Some(time);

// Load any missing positions before processing events
events.load_positions(dbtx).await?;

// This is where we are going to build the block summary for the DEX.
self.record_block_summary(dbtx, time, block.height as i32, &events)
self.record_block_summary(dbtx, time, block.height() as i32, &events)
.await?;

// Record batch swap execution traces.
for event in &events.batch_swaps {
self.record_batch_swap_traces(dbtx, time, block.height as i32, event)
self.record_batch_swap_traces(dbtx, time, block.height() as i32, event)
.await?;
}

Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/governance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ impl GovernanceProposals {
async fn index_event(
&self,
dbtx: &mut PgTransaction<'_>,
event: &ContextualizedEvent,
event: ContextualizedEvent<'_>,
) -> Result<(), anyhow::Error> {
match event.event.kind.as_str() {
EVENT_PROPOSAL_SUBMIT => {
Expand Down
4 changes: 2 additions & 2 deletions crates/bin/pindexer/src/ibc/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ enum Event {
},
}

impl TryFrom<&ContextualizedEvent> for Event {
impl TryFrom<ContextualizedEvent<'_>> for Event {
type Error = anyhow::Error;

fn try_from(event: &ContextualizedEvent) -> Result<Self, Self::Error> {
fn try_from(event: ContextualizedEvent<'_>) -> Result<Self, Self::Error> {
match EventKind::try_from(event.event.kind.as_str())? {
EventKind::InboundTransfer => {
let pe = pb::EventInboundFungibleTokenTransfer::from_event(&event.event)?;
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/insights/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,7 +290,7 @@ impl Component {
async fn index_event(
&self,
dbtx: &mut PgTransaction<'_>,
event: &ContextualizedEvent,
event: ContextualizedEvent<'_>,
) -> Result<(), anyhow::Error> {
let height = event.block_height;
if let Ok(e) = EventUndelegate::try_from_event(&event.event) {
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/stake/delegation_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl AppView for DelegationTxs {
.bind(ik.to_string())
.bind(amount.value() as i64)
.bind(event.block_height as i64)
.bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?)
.bind(event.tx_hash().ok_or_else(|| anyhow!("missing tx hash in event"))?)
.execute(dbtx.as_mut())
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/stake/slashings.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ impl Slashings {
async fn index_event(
&self,
dbtx: &mut PgTransaction<'_>,
event: &ContextualizedEvent,
event: ContextualizedEvent<'_>,
) -> Result<(), anyhow::Error> {
let pe = match pb::EventSlashingPenaltyApplied::from_event(event.as_ref()) {
Ok(pe) => pe,
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/stake/undelegation_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl AppView for UndelegationTxs {
.bind(ik.to_string())
.bind(amount.value() as i64)
.bind(event.block_height as i64)
.bind(event.tx_hash.ok_or_else(|| anyhow!("missing tx hash in event"))?)
.bind(event.tx_hash().ok_or_else(|| anyhow!("missing tx hash in event"))?)
.execute(dbtx.as_mut())
.await?;
}
Expand Down
2 changes: 1 addition & 1 deletion crates/bin/pindexer/src/stake/validator_set.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ impl ValidatorSet {
async fn index_event(
&self,
dbtx: &mut PgTransaction<'_>,
event: &ContextualizedEvent,
event: ContextualizedEvent<'_>,
) -> Result<(), anyhow::Error> {
match event.event.kind.as_str() {
"penumbra.core.component.stake.v1.EventValidatorDefinitionUpload" => {
Expand Down
4 changes: 2 additions & 2 deletions crates/bin/pindexer/src/supply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -650,10 +650,10 @@ impl Event {
}
}

impl<'a> TryFrom<&'a ContextualizedEvent> for Event {
impl TryFrom<ContextualizedEvent<'_>> for Event {
type Error = anyhow::Error;

fn try_from(event: &'a ContextualizedEvent) -> Result<Self, Self::Error> {
fn try_from(event: ContextualizedEvent<'_>) -> Result<Self, Self::Error> {
match event.event.kind.as_str() {
// undelegation
x if x == Event::NAMES[0] => {
Expand Down
2 changes: 2 additions & 0 deletions crates/util/cometindex/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ tracing = {workspace = true}
tracing-subscriber = {workspace = true}
sqlx = {workspace = true, features = ["postgres", "json", "runtime-tokio", "tls-rustls"] }
async-trait = {workspace = true}
prost = {workspace = true}
tendermint = {workspace = true}
tendermint-proto = {workspace = true}
serde_json = {workspace = true}
futures = {workspace = true}
hex = {workspace = true}
Expand Down
20 changes: 15 additions & 5 deletions crates/util/cometindex/src/contextualized.rs
Original file line number Diff line number Diff line change
@@ -1,18 +1,28 @@
use tendermint::abci::Event;

#[derive(Clone, Debug)]
pub struct ContextualizedEvent {
pub event: Event,
#[derive(Clone, Copy, Debug)]
pub struct ContextualizedEvent<'block> {
pub event: &'block Event,
pub block_height: u64,
pub tx_hash: Option<[u8; 32]>,
pub tx: Option<([u8; 32], &'block [u8])>,
/// The rowid of the event in the local database.
///
/// Note that this is a purely local identifier and won't be the same across
/// different event databases.
pub local_rowid: i64,
}

impl AsRef<Event> for ContextualizedEvent {
impl<'block> ContextualizedEvent<'block> {
pub fn tx_hash(&self) -> Option<[u8; 32]> {
self.tx.map(|x| x.0)
}

pub fn tx_data(&self) -> Option<&'block [u8]> {
self.tx.map(|x| x.1)
}
}

impl<'tx> AsRef<Event> for ContextualizedEvent<'tx> {
fn as_ref(&self) -> &Event {
&self.event
}
Expand Down
82 changes: 75 additions & 7 deletions crates/util/cometindex/src/index.rs
Original file line number Diff line number Diff line change
@@ -1,20 +1,88 @@
use std::sync::Arc;
use std::{collections::BTreeMap, sync::Arc};

use async_trait::async_trait;
pub use sqlx::PgPool;
use sqlx::{Postgres, Transaction};
use tendermint::abci::Event;

use crate::ContextualizedEvent;

pub type PgTransaction<'a> = Transaction<'a, Postgres>;

#[derive(Clone, Copy, Debug)]
struct EventReference {
/// Which event in the block this is.
pub event_index: usize,
pub tx_hash: Option<[u8; 32]>,
pub local_rowid: i64,
}

/// Represents all of the events in a given block
#[derive(Clone, Debug)]
pub struct BlockEvents {
/// The height of this block.
pub height: u64,
/// The events contained in this block, in order.
pub events: Vec<ContextualizedEvent>,
height: u64,
event_refs: Vec<EventReference>,
events: Vec<Event>,
transactions: BTreeMap<[u8; 32], Vec<u8>>,
}

// The builder interface for our own crate.
impl BlockEvents {
pub(crate) fn new(height: u64) -> Self {
const EXPECTED_EVENTS: usize = 32;

Self {
height,
event_refs: Vec::with_capacity(EXPECTED_EVENTS),
events: Vec::with_capacity(EXPECTED_EVENTS),
transactions: BTreeMap::new(),
}
}

/// Register a transaction in this block.
pub(crate) fn push_tx(&mut self, hash: [u8; 32], data: Vec<u8>) {
self.transactions.insert(hash, data);
}

/// Register an event in this block.
pub(crate) fn push_event(&mut self, event: Event, tx_hash: Option<[u8; 32]>, local_rowid: i64) {
let event_index = self.events.len();
self.events.push(event);
self.event_refs.push(EventReference {
event_index,
tx_hash,
local_rowid,
});
}
}

impl BlockEvents {
pub fn height(&self) -> u64 {
self.height
}

fn contextualize(&self, event_ref: EventReference) -> ContextualizedEvent<'_> {
let event = &self.events[event_ref.event_index];
let tx = event_ref
.tx_hash
.and_then(|h| Some((h, self.transactions.get(&h)?.as_slice())));
ContextualizedEvent {
event,
block_height: self.height,
tx,
local_rowid: event_ref.local_rowid,
}
}

/// Iterate over the events in this block, in the order that they appear.
pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
self.event_refs.iter().map(|x| self.contextualize(*x))
}

/// Iterate over transactions (and their hashes) in the order they appear in the block.
pub fn transactions(&self) -> impl Iterator<Item = ([u8; 32], &'_ [u8])> {
self.transactions.iter().map(|x| (*x.0, x.1.as_slice()))
}
}

#[derive(Clone, Debug)]
Expand Down Expand Up @@ -71,8 +139,8 @@ impl EventBatch {
self.by_height.iter().skip(skip)
}

pub fn events(&self) -> impl Iterator<Item = &'_ ContextualizedEvent> {
self.events_by_block().flat_map(|x| x.events.iter())
pub fn events(&self) -> impl Iterator<Item = ContextualizedEvent<'_>> {
self.events_by_block().flat_map(|x| x.events())
}
}

Expand Down
Loading

0 comments on commit fbb124d

Please sign in to comment.