Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Fetch missing txs #8

Closed
wants to merge 10 commits into from
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
target/
90 changes: 45 additions & 45 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,54 +8,50 @@ default-run = "tx-sitter"
members = ["crates/*"]

[dependencies]
async-trait = "0.1.74"
# Third Party
## AWS
aws-config = { version = "1.0.1" }
aws-sdk-kms = "1.3.0"
aws-smithy-types = "1.0.2"
aws-smithy-runtime-api = "1.0.2"
aws-types = "1.0.1"
aws-credential-types = { version = "1.0.1", features = [
"hardcoded-credentials",
] }

## Other
serde = "1.0.136"
aws-sdk-kms = "1.3.0"
aws-smithy-runtime-api = "1.0.2"
aws-smithy-types = "1.0.2"
aws-types = "1.0.1"
axum = { version = "0.6.20", features = ["headers"] }
thiserror = "1.0.50"
headers = "0.3.9"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.27"
dotenv = "0.15.0"
base64 = "0.21.5"
bigdecimal = "0.4.2"
chrono = "0.4"
clap = { version = "4.3.0", features = ["env", "derive"] }
config = "0.13.3"
dotenv = "0.15.0"
ethers = { version = "2.0.11", features = ["ws"] }
eyre = "0.6.5"
futures = "0.3"
headers = "0.3.9"
hex = "0.4.3"
hex-literal = "0.4.1"
humantime = "2.1.0"
humantime-serde = "1.1.1"
hyper = "0.14.27"
itertools = "0.12.0"
metrics = "0.21.1"
num-bigint = "0.4.4"
# telemetry-batteries = { path = "../telemetry-batteries" }

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
rand = "0.8.5"
reqwest = { version = "0.11.13", default-features = false, features = [
"rustls-tls",
] }

## Other
serde = "1.0.136"
serde_json = "1.0.91"
strum = { version = "0.25.0", features = ["derive"] }
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"std",
"fmt",
"json",
"ansi",
] }
tower-http = { version = "0.4.4", features = [ "trace", "auth" ] }
uuid = { version = "0.8", features = ["v4"] }
futures = "0.3"
chrono = "0.4"
rand = "0.8.5"
sha3 = "0.10.8"
config = "0.13.3"
toml = "0.8.8"
url = "2.4.1"
spki = "0.7.2"
sqlx = { version = "0.7.2", features = [
"time",
"chrono",
Expand All @@ -65,26 +61,30 @@ sqlx = { version = "0.7.2", features = [
"migrate",
"bigdecimal",
] }
metrics = "0.21.1"
num-bigint = "0.4.4"
bigdecimal = "0.4.2"
spki = "0.7.2"
async-trait = "0.1.74"
itertools = "0.12.0"
base64 = "0.21.5"
strum = { version = "0.25.0", features = ["derive"] }

# Company
telemetry-batteries = { git = "https://github.com/worldcoin/telemetry-batteries", branch = "dzejkop/unnest-fields" }
# telemetry-batteries = { path = "../telemetry-batteries" }

# Internal
postgres-docker-utils = { path = "crates/postgres-docker-utils" }
thiserror = "1.0.50"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
toml = "0.8.8"
tower-http = { version = "0.4.4", features = [ "trace", "auth" ] }
tracing = { version = "0.1", features = ["log"] }
tracing-subscriber = { version = "0.3", default-features = false, features = [
"env-filter",
"std",
"fmt",
"json",
"ansi",
] }
url = "2.4.1"
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
test-case = "3.1.0"
indoc = "2.0.3"
fake-rpc = { path = "crates/fake-rpc" }
indoc = "2.0.3"
test-case = "3.1.0"

[features]
default = [ "default-config" ]
default = ["default-config"]
default-config = []
3 changes: 3 additions & 0 deletions db/migrations/001_init.sql
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,9 @@ CREATE TABLE tx_hashes (
escalated BOOL NOT NULL DEFAULT FALSE
);

ALTER TABLE tx_hashes
ADD UNIQUE (tx_id);

-- Dynamic tx data & data used for escalations
CREATE TABLE sent_transactions (
tx_id VARCHAR(255) PRIMARY KEY REFERENCES transactions(id) ON DELETE CASCADE,
Expand Down
51 changes: 41 additions & 10 deletions src/db.rs
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ impl Database {
.await?)
}

pub async fn insert_tx_broadcast(
pub async fn insert_into_tx_hashes(
&self,
tx_id: &str,
tx_hash: H256,
Expand All @@ -246,21 +246,38 @@ impl Database {
initial_max_priority_fee_per_gas
.to_big_endian(&mut initial_max_priority_fee_per_gas_bytes);

let mut tx = self.pool.begin().await?;

sqlx::query(
r#"
INSERT INTO tx_hashes (tx_id, tx_hash, max_fee_per_gas, max_priority_fee_per_gas)
VALUES ($1, $2, $3, $4)
ON CONFLICT (tx_id) DO NOTHING
"#,
)
.bind(tx_id)
.bind(tx_hash.as_bytes())
.bind(initial_max_fee_per_gas_bytes)
.bind(initial_max_priority_fee_per_gas_bytes)
.execute(tx.as_mut())
.execute(&self.pool)
.await?;

Ok(())
}

pub async fn insert_into_sent_transactions(
&self,
tx_id: &str,
tx_hash: H256,
initial_max_fee_per_gas: U256,
initial_max_priority_fee_per_gas: U256,
) -> eyre::Result<()> {
let mut initial_max_fee_per_gas_bytes = [0u8; 32];
initial_max_fee_per_gas
.to_big_endian(&mut initial_max_fee_per_gas_bytes);

let mut initial_max_priority_fee_per_gas_bytes = [0u8; 32];
initial_max_priority_fee_per_gas
.to_big_endian(&mut initial_max_priority_fee_per_gas_bytes);

sqlx::query(
r#"
INSERT INTO sent_transactions (tx_id, initial_max_fee_per_gas, initial_max_priority_fee_per_gas, valid_tx_hash)
Expand All @@ -271,9 +288,7 @@ impl Database {
.bind(initial_max_fee_per_gas_bytes)
.bind(initial_max_priority_fee_per_gas_bytes)
.bind(tx_hash.as_bytes())
.execute(tx.as_mut()).await?;

tx.commit().await?;
.execute(&self.pool).await?;

Ok(())
}
Expand Down Expand Up @@ -1069,9 +1084,17 @@ mod tests {
let url =
format!("postgres://postgres:postgres@{db_socket_addr}/database");

let db = Database::new(&DatabaseConfig::connection_string(url)).await?;
for _ in 0..5 {
match Database::new(&DatabaseConfig::connection_string(&url)).await
{
Ok(db) => return Ok((db, db_container)),
Err(_) => {
tokio::time::sleep(Duration::from_secs(2)).await;
}
}
}

Ok((db, db_container))
Err(eyre::eyre!("Failed to connect to the database"))
}

async fn full_update(
Expand Down Expand Up @@ -1287,7 +1310,15 @@ mod tests {
let initial_max_fee_per_gas = U256::from(1);
let initial_max_priority_fee_per_gas = U256::from(1);

db.insert_tx_broadcast(
db.insert_into_tx_hashes(
tx_id,
tx_hash_1,
initial_max_fee_per_gas,
initial_max_priority_fee_per_gas,
)
.await?;

db.insert_into_sent_transactions(
tx_id,
tx_hash_1,
initial_max_fee_per_gas,
Expand Down
15 changes: 15 additions & 0 deletions src/keys/universal_signer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use ethers::core::types::transaction::eip2718::TypedTransaction;
use ethers::core::types::transaction::eip712::Eip712;
use ethers::core::types::{Address, Signature as EthSig};
use ethers::signers::{Signer, Wallet, WalletError};
use ethers::types::Bytes;
use thiserror::Error;

use crate::aws::ethers_signer::AwsSigner;
Expand All @@ -13,6 +14,20 @@ pub enum UniversalSigner {
Local(Wallet<SigningKey>),
}

impl UniversalSigner {
pub async fn raw_signed_tx(
&self,
tx: &TypedTransaction,
) -> eyre::Result<Bytes> {
let signature = match self {
Self::Aws(signer) => signer.sign_transaction(tx).await?,
Self::Local(signer) => signer.sign_transaction(tx).await?,
};

Ok(tx.rlp_signed(&signature))
}
}

#[derive(Debug, Error)]
pub enum UniversalError {
#[error("AWS Signer Error: {0}")]
Expand Down
Loading
Loading