Skip to content

Commit

Permalink
fix(pindexer): support big amounts
Browse files Browse the repository at this point in the history
Use I256 values in pindexer.
  • Loading branch information
zbuc authored and conorsch committed Jan 13, 2025
1 parent c90fa23 commit 31cac86
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 9 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions crates/bin/pindexer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ anyhow = {workspace = true}
clap = {workspace = true}
chrono = {workspace = true}
cometindex = {workspace = true}
ethnum = {workspace = true}
num-bigint = { version = "0.4" }
penumbra-shielded-pool = {workspace = true, default-features = false}
penumbra-stake = {workspace = true, default-features = false}
Expand Down
23 changes: 14 additions & 9 deletions crates/bin/pindexer/src/insights/mod.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use ethnum::I256;
use std::{collections::BTreeMap, iter};

use cometindex::{async_trait, index::EventBatch, AppView, ContextualizedEvent, PgTransaction};
Expand Down Expand Up @@ -150,22 +151,26 @@ async fn asset_flow(
dbtx: &mut PgTransaction<'_>,
asset_id: asset::Id,
height: u64,
flow: i128,
flow: I256,
refund: bool,
depositor_existed: DepositorExisted,
) -> anyhow::Result<()> {
let asset_pool: Option<(String, String, i32)> = sqlx::query_as("SELECT total_value, current_value, unique_depositors FROM insights_shielded_pool WHERE asset_id = $1 ORDER BY height DESC LIMIT 1").bind(asset_id.to_bytes()).fetch_optional(dbtx.as_mut()).await?;
let mut asset_pool = asset_pool
.map(|(t, c, u)| {
anyhow::Result::<(i128, i128, i32)>::Ok((
i128::from_str_radix(&t, 10)?,
i128::from_str_radix(&c, 10)?,
anyhow::Result::<(I256, I256, i32)>::Ok((
I256::from_str_radix(&t, 10)?,
I256::from_str_radix(&c, 10)?,
u,
))
})
.transpose()?
.unwrap_or((0i128, 0i128, 0i32));
asset_pool.0 += if refund { 0 } else { flow.max(0) };
.unwrap_or((I256::ZERO, I256::ZERO, 0i32));
asset_pool.0 += if refund {
I256::ZERO
} else {
flow.max(I256::ZERO)
};
asset_pool.1 += flow;
asset_pool.2 += match depositor_existed {
DepositorExisted::Yes => 0,
Expand Down Expand Up @@ -431,12 +436,12 @@ impl Component {
} else if let Ok(e) = EventInboundFungibleTokenTransfer::try_from_event(&event.event) {
if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
let existed = register_depositor(dbtx, e.value.asset_id, &e.sender).await?;
let flow = i128::try_from(e.value.amount.value())?;
let flow = I256::try_from(e.value.amount.value())?;
asset_flow(dbtx, e.value.asset_id, height, flow, false, existed).await?;
}
} else if let Ok(e) = EventOutboundFungibleTokenTransfer::try_from_event(&event.event) {
if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
let flow = i128::try_from(e.value.amount.value())?;
let flow = I256::try_from(e.value.amount.value())?;
// For outbound transfers, never increment unique count
asset_flow(
dbtx,
Expand All @@ -450,7 +455,7 @@ impl Component {
}
} else if let Ok(e) = EventOutboundFungibleTokenRefund::try_from_event(&event.event) {
if e.value.asset_id != *STAKING_TOKEN_ASSET_ID {
let flow = i128::try_from(e.value.amount.value())?;
let flow = I256::try_from(e.value.amount.value())?;
// For outbound transfers, never increment unique count.
asset_flow(
dbtx,
Expand Down

0 comments on commit 31cac86

Please sign in to comment.