Skip to content

Commit

Permalink
eth tx compatible
Browse files Browse the repository at this point in the history
  • Loading branch information
bufrr committed Nov 18, 2024
1 parent f5145b7 commit e5032a8
Show file tree
Hide file tree
Showing 4 changed files with 139 additions and 10 deletions.
2 changes: 1 addition & 1 deletion benchmark/fabfile.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ def local(ctx, debug=True):
'nodes': 4,
'workers': 1,
'rate': 50_000,
'tx_size': 512,
'tx_size': 200,
'duration': 20,
}
node_params = {
Expand Down
11 changes: 11 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,17 @@ primary = { path = "../primary" }
worker = { path = "../worker" }
consensus = { path = "../consensus" }

alloy = { version = "0.6.1", features = ["full"] }
alloy-signer = "0.6.1"
alloy-rpc-client = "0.6.1"
alloy-primitives = "0.8.11"
alloy-signer-local = { version = "0.6.1",features = ["mnemonic"] }
eyre = "0.6.12"
hex = "0.4.3"
serde = { version = "1.0", features = ["derive"] }
rocksdb = "0.22.0"
serde_json = "1.0.132"

[features]
benchmark = ["worker/benchmark", "primary/benchmark", "consensus/benchmark"]

Expand Down
41 changes: 36 additions & 5 deletions node/src/benchmark_client.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
// Copyright(C) Facebook, Inc. and its affiliates.

use anyhow::{Context, Result};
use bytes::BufMut as _;
use bytes::BytesMut;
use bytes::{BufMut as _, Bytes};
use clap::{crate_name, crate_version, App, AppSettings};
use env_logger::Env;
use futures::future::join_all;
Expand All @@ -13,6 +14,21 @@ use tokio::net::TcpStream;
use tokio::time::{interval, sleep, Duration, Instant};
use tokio_util::codec::{Framed, LengthDelimitedCodec};

use alloy::primitives::{
utils::{format_units, parse_units},
Address,
};
use alloy::transports::http::reqwest::Url;
use alloy::{
network::{EthereumWallet, TransactionBuilder},
primitives::U256,
providers::{Provider, ProviderBuilder, WalletProvider},
rpc::types::TransactionRequest,
};
use alloy_primitives::address;
use alloy_signer::Signer;
use alloy_signer_local::{coins_bip39::English, MnemonicBuilder};

#[tokio::main]
async fn main() -> Result<()> {
let matches = App::new(crate_name!())
Expand Down Expand Up @@ -102,10 +118,24 @@ impl Client {
let burst = self.rate / PRECISION;
let mut tx = BytesMut::with_capacity(self.size);
let mut counter = 0;
let mut r = rand::thread_rng().gen();
//let mut r = rand::thread_rng().gen();
let mut transport = Framed::new(stream, LengthDelimitedCodec::new());
let interval = interval(Duration::from_millis(BURST_DURATION));
tokio::pin!(interval);
let value = parse_units("1", "wei")?;
let txx = TransactionRequest::default()
.with_to(address!("BBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBBB"))
.with_nonce(1000)
.with_chain_id(1337)
.with_value(value.into())
.with_gas_limit(21_000)
.with_max_priority_fee_per_gas(1_000_000_000)
.with_max_fee_per_gas(20_000_000_000);

let ss = serde_json::to_string(&txx)?;
// let bb = Bytes::from(ss.clone());
let bb = ss.as_bytes();
info!("tx len: {}", ss.len());

// NOTE: This log entry is used to compute performance.
info!("Start sending transactions");
Expand All @@ -122,9 +152,10 @@ impl Client {
tx.put_u8(0u8); // Sample txs start with 0.
tx.put_u64(counter); // This counter identifies the tx.
} else {
r += 1;
tx.put_u8(1u8); // Standard txs start with 1.
tx.put_u64(r); // Ensures all clients send different txs.
// r += 1;
// tx.put_u8(1u8); // Standard txs start with 1.
// tx.put_u64(r); // Ensures all clients send different txs.
tx.put_slice(bb);
};

tx.resize(self.size, 0u8);
Expand Down
95 changes: 91 additions & 4 deletions node/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
// Copyright(C) Facebook, Inc. and its affiliates.

use std::time::Duration;
use alloy_primitives::private::serde;
use anyhow::{Context, Result};
use clap::{crate_name, crate_version, App, AppSettings, ArgMatches, SubCommand};
use config::Export as _;
use config::Import as _;
use config::{Committee, KeyPair, Parameters, WorkerId};
use consensus::Consensus;
use crypto::Digest;
use env_logger::Env;
use log::info;
use primary::{Certificate, Primary};
use store::Store;
use tokio::sync::mpsc::{channel, Receiver};
use tokio::time::sleep;
use worker::Worker;

/// The default channel capacity.
Expand Down Expand Up @@ -127,15 +133,96 @@ async fn run(matches: &ArgMatches<'_>) -> Result<()> {
}

// Analyze the consensus' output.
analyze(rx_output).await;
analyze(rx_output, store_path.parse()?).await;

// If this expression is reached, the program ends and all other tasks terminate.
unreachable!();
}

/// Receives an ordered list of certificates and apply any application-specific logic.
async fn analyze(mut rx_output: Receiver<Certificate>) {
while let Some(_certificate) = rx_output.recv().await {
// NOTE: Here goes the application logic.
async fn analyze(mut rx_output: Receiver<Certificate>, store_path: String) {
loop {
tokio::select! {
Some(certificate) = rx_output.recv() => {
info!("rx_output len: {}", rx_output.len());
let store_path = store_path.clone();
tokio::spawn(async move {
handle_cert(certificate, store_path).await.expect("handle_cert panic");
});
},
else => {
println!("loop exit!!!");
break;
},
}
}
}
async fn reconstruct_batch(
digest: Digest,
worker_id: u32,
store_path: String,
) -> eyre::Result<Vec<u8>> {
let max_attempts = 3;
let backoff_ms = 500;
let db_path = format!("{}-{}", store_path, worker_id);
// Open the database to each worker
let db = rocksdb::DB::open_for_read_only(&rocksdb::Options::default(), db_path, true)?;

for attempt in 0..max_attempts {
// Query the db
let key = digest.to_vec();
match db.get(&key)? {
Some(res) => return Ok(res),
None if attempt < max_attempts - 1 => {
println!(
"digest {} not found, retrying in {}ms",
digest,
backoff_ms * (attempt + 1)
);
sleep(Duration::from_millis(backoff_ms * (attempt + 1))).await;
continue;
}
None => eyre::bail!(
"digest {} not found after {} attempts",
digest,
max_attempts
),
}
}
unreachable!()
}
async fn handle_cert(certificate: Certificate, store_path: String) -> eyre::Result<()> {
// Reconstruct batches from certificate
let futures: Vec<_> = certificate
.header
.payload
.into_iter()
.map(|(digest, worker_id)| reconstruct_batch(digest, worker_id, store_path.clone()))
.collect();

let batches = futures::future::join_all(futures).await;

for batch in batches {
let batch = batch?;
process_batch(batch).await?;
}
Ok(())
}

async fn process_batch(batch: Vec<u8>) -> eyre::Result<()> {
// Deserialize and process the batch
match bincode::deserialize(&batch) {
Ok(WorkerMessage::Batch(txs)) => {
info!("txs len: {}", txs.len());
Ok(())
}
_ => eyre::bail!("Unrecognized message format"),
}
}

pub type Transaction = Vec<u8>;
pub type Batch = Vec<Transaction>;
#[derive(serde::Deserialize)]
pub enum WorkerMessage {
Batch(Batch),
}

0 comments on commit e5032a8

Please sign in to comment.