Skip to content

Commit

Permalink
Check if realyer funds are enough to process submitted transaction
Browse files Browse the repository at this point in the history
(estimates).
  • Loading branch information
piohei committed Sep 2, 2024
1 parent 49dd452 commit 2588866
Show file tree
Hide file tree
Showing 10 changed files with 163 additions and 15 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
.idea/
target/
.env
.env
23 changes: 23 additions & 0 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ use crate::types::wrappers::h256::H256Wrapper;
use crate::types::{
NetworkInfo, RelayerInfo, RelayerUpdate, TransactionPriority, TxStatus,
};
use crate::types::wrappers::hex_u256::HexU256;

Check warning on line 18 in src/db.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/db.rs

Check warning on line 18 in src/db.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/db.rs

pub mod data;

Expand Down Expand Up @@ -271,6 +272,28 @@ impl Database {
Ok(tx_count as usize)
}

#[instrument(skip(self), level = "debug")]
pub async fn get_relayer_pending_txs_gas_limit_sum(
&self,
relayer_id: &str,
) -> eyre::Result<U256> {
let gas_limits: Vec<(HexU256,)> = sqlx::query_as(
r#"
SELECT t.gas_limit
FROM transactions t
LEFT JOIN sent_transactions s ON (t.id = s.tx_id)
WHERE t.relayer_id = $1
AND (s.tx_id IS NULL OR s.status = $2)

Check warning on line 286 in src/db.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/db.rs

Check warning on line 286 in src/db.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/db.rs
"#,
)
.bind(relayer_id)
.bind(TxStatus::Pending)
.fetch_all(&self.pool)
.await?;

Ok(gas_limits.into_iter().fold(U256::zero(), |acc, (v,)| acc + v.0))
}

#[instrument(skip(self), level = "debug")]
pub async fn create_transaction(
&self,
Expand Down
28 changes: 27 additions & 1 deletion src/server.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use std::net::SocketAddr;

Check warning on line 1 in src/server.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/server.rs

Check warning on line 1 in src/server.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/server.rs
use std::sync::Arc;

use ethers::middleware::Middleware;
use ethers::providers::{Http, Provider};
use ethers::signers::Signer;
use eyre::ContextCompat;
use poem::http::StatusCode;
Expand Down Expand Up @@ -251,6 +252,31 @@ impl RelayerApi {
));
}

let relayer_queued_tx_gas_limit_sum = app
.db
.get_relayer_pending_txs_gas_limit_sum(api_token.relayer_id())

Check warning on line 257 in src/server.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/server.rs

Check warning on line 257 in src/server.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/server.rs
.await?;

let block_fees = app.db.get_latest_block_fees_by_chain_id(relayer.chain_id).await?;
if let Some(block_fees) = block_fees {
let gas_limit = relayer_queued_tx_gas_limit_sum + req.gas_limit.0;
let estimated_transactions_cost = block_fees.gas_price * gas_limit;

Check warning on line 264 in src/server.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/server.rs

Check warning on line 264 in src/server.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/server.rs
// TODO: Cache?
let http_provider: Provider<Http> = app.http_provider(relayer.chain_id).await?;

let balance = http_provider.get_balance(relayer.address.0, None).await.map_err(|err| {
eyre::eyre!("Error checking balance: {}", err)
})?;

if balance < estimated_transactions_cost {
return Err(poem::error::Error::from_string(
"Relayer funds are insufficient for transaction to be mined.".to_string(),
StatusCode::UNPROCESSABLE_ENTITY,
));
}
}

let res = app
.db
.create_transaction(
Expand Down
8 changes: 8 additions & 0 deletions src/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,14 @@ impl Service {

Ok(())
}

Check warning on line 84 in src/service.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/service.rs

Check warning on line 84 in src/service.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/service.rs

pub async fn is_estimates_ready_for_chain(&self, chain_id: u64) -> bool {
let res = self._app.db.get_latest_block_fees_by_chain_id(chain_id).await;
match res {
Ok(res) => res.is_some(),
Err(_) => false,
}
}
}

async fn initialize_predefined_values(
Expand Down
18 changes: 9 additions & 9 deletions src/tasks/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,15 +108,15 @@ pub async fn backfill_to_block(
rpc: &Provider<Http>,

Check warning on line 108 in src/tasks/index.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/tasks/index.rs

Check warning on line 108 in src/tasks/index.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/src/tasks/index.rs
latest_block: Block<H256>,
) -> eyre::Result<()> {
// Get the latest block from the db
let Some(latest_db_block_number) =
app.db.get_latest_block_number(chain_id).await?
else {
tracing::info!(chain_id, "No latest block");
return Ok(());
};

let next_block_number: u64 = latest_db_block_number + 1;
let next_block_number: u64 =
if let Some(latest_db_block_number) =
app.db.get_latest_block_number(chain_id).await? {
latest_db_block_number + 1
}
else {
tracing::info!(chain_id, "No latest block");
0
};

// Get the first block from the stream and backfill any missing blocks
let latest_block_number = latest_block
Expand Down
2 changes: 1 addition & 1 deletion tests/common/anvil_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ impl AnvilBuilder {
tokio::time::sleep(Duration::from_secs(block_time)).await;

// We need to seed some transactions so we can get fee estimates on the first block
middleware
let tx = middleware

Check failure on line 53 in tests/common/anvil_builder.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused variable: `tx`

Check failure on line 53 in tests/common/anvil_builder.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused variable: `tx`
.send_transaction(
Eip1559TransactionRequest {
to: Some(DEFAULT_ANVIL_ACCOUNT.into()),
Expand Down
14 changes: 14 additions & 0 deletions tests/common/service_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use std::time::Duration;

use ethers::utils::AnvilInstance;
use sqlx::Pool;

Check failure on line 5 in tests/common/service_builder.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused import: `sqlx::Pool`

Check failure on line 5 in tests/common/service_builder.rs

View workflow job for this annotation

GitHub Actions / cargo test

unused import: `sqlx::Pool`
use tx_sitter::api_key::ApiKey;
use tx_sitter::client::TxSitterClient;
use tx_sitter::config::{
Expand Down Expand Up @@ -94,6 +95,19 @@ impl ServiceBuilder {
let client =
TxSitterClient::new(format!("http://{}", service.local_addr()));

// Awaits for estimates to be ready

Check warning on line 98 in tests/common/service_builder.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/tests/common/service_builder.rs

Check warning on line 98 in tests/common/service_builder.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/tests/common/service_builder.rs

Check warning on line 98 in tests/common/service_builder.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/tests/common/service_builder.rs

Check warning on line 98 in tests/common/service_builder.rs

View workflow job for this annotation

GitHub Actions / cargo fmt

Diff in /home/runner/work/tx-sitter-monolith/tx-sitter-monolith/tests/common/service_builder.rs
let mut is_estimates_ready = false;
for _ in 0..30 {
if service.is_estimates_ready_for_chain(DEFAULT_ANVIL_CHAIN_ID).await {
is_estimates_ready = true;
break;
}
tokio::time::sleep(Duration::from_secs(1)).await;
}
if !is_estimates_ready {
eyre::bail!("Estimates were not ready!");
}

Ok((service, client))
}
}
2 changes: 1 addition & 1 deletion tests/escalation.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ mod common;
use crate::common::prelude::*;

const ESCALATION_INTERVAL: Duration = Duration::from_secs(2);
const ANVIL_BLOCK_TIME: u64 = 6;
const ANVIL_BLOCK_TIME: u64 = 10;

#[tokio::test]
async fn escalation() -> eyre::Result<()> {
Expand Down
25 changes: 23 additions & 2 deletions tests/send_too_many_txs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,29 @@ async fn send_too_many_txs() -> eyre::Result<()> {
})
.await?;

let provider = setup_provider(anvil.endpoint()).await?;
let init_value: U256 = parse_units("1", "ether")?.into();

// Send some funds to created relayer
client
.send_tx(
&api_key,
&SendTxRequest {
to: secondary_relayer_address.clone(),
value: init_value.into(),
data: None,
gas_limit: U256::from(21_000).into(),
priority: TransactionPriority::Regular,
tx_id: None,
blobs: None,
},
)
.await?;

tracing::info!("Waiting for secondary relayer initial balance");
await_balance(&provider, init_value, secondary_relayer_address.0)
.await?;

let CreateApiKeyResponse {
api_key: secondary_api_key,
} = client.create_relayer_api_key(&secondary_relayer_id).await?;
Expand All @@ -42,8 +65,6 @@ async fn send_too_many_txs() -> eyre::Result<()> {
)
.await?;

let provider = setup_provider(anvil.endpoint()).await?;

// Send a transaction
let value: U256 = parse_units("0.01", "ether")?.into();

Expand Down
55 changes: 55 additions & 0 deletions tests/send_when_insufficient_funds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
mod common;

use tx_sitter::client::ClientError;
use crate::common::prelude::*;

const ESCALATION_INTERVAL: Duration = Duration::from_secs(2);
const ANVIL_BLOCK_TIME: u64 = 6;

#[tokio::test]
async fn send_when_insufficient_funds() -> eyre::Result<()> {
setup_tracing();

let (db_url, _db_container) = setup_db().await?;
let anvil = AnvilBuilder::default()
.block_time(ANVIL_BLOCK_TIME)
.spawn()
.await?;

let (_service, client) = ServiceBuilder::default()
.escalation_interval(ESCALATION_INTERVAL)
.build(&anvil, &db_url)
.await?;

let CreateApiKeyResponse { api_key } =
client.create_relayer_api_key(DEFAULT_RELAYER_ID).await?;

// Send a transaction
let value: U256 = parse_units("1", "ether")?.into();
for _ in 0..10 {
let tx = client.send_tx(&api_key,
&SendTxRequest {
to: ARBITRARY_ADDRESS.into(),
value: value.into(),
gas_limit: U256::from_dec_str("1000000000000")?.into(),
..Default::default()
},
).await;

match tx {
Err(err) => {
match err {
ClientError::TxSitter(status_code, message) => {
assert_eq!(status_code, reqwest::StatusCode::UNPROCESSABLE_ENTITY);
assert_eq!(message, "Relayer funds are insufficient for transaction to be mined.");
return Ok(());
}
_ => {}
}
}
_ => {}
}
}

eyre::bail!("Should return error response with information about insufficient funds.")
}

0 comments on commit 2588866

Please sign in to comment.