diff --git a/Cargo.lock b/Cargo.lock index 8acba659..2d35b698 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2055,8 +2055,10 @@ dependencies = [ "glados-core", "migration", "rand 0.8.5", + "reqwest", "sea-orm", "serde_json", + "thiserror", "tokio", "tracing", "trin-utils", @@ -3917,9 +3919,9 @@ dependencies = [ [[package]] name = "reqwest" -version = "0.11.15" +version = "0.11.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0ba30cc2c0cd02af1222ed216ba659cdb2f879dfe3181852fe7c50b1d0005949" +checksum = "27b71749df584b7f4cac2c426c127a7c785a5106cc98f7a8feb044115f0fa254" dependencies = [ "base64 0.21.0", "bytes", diff --git a/glados-audit/Cargo.toml b/glados-audit/Cargo.toml index c6aa8581..fa8be7b1 100644 --- a/glados-audit/Cargo.toml +++ b/glados-audit/Cargo.toml @@ -19,11 +19,12 @@ ethportal-api = { git = "https://github.com/ethereum/trin" } glados-core = { path = "../glados-core" } migration = { path = "../migration" } rand = "0.8.5" +reqwest = "0.11.16" sea-orm = "0.10.3" serde_json = "1.0.95" +thiserror = "1.0.40" tokio = "1.21.2" tracing = "0.1.37" trin-utils = {git = "https://github.com/ethereum/trin" } url = "2.3.1" web3 = "0.18.0" - diff --git a/glados-audit/src/cli.rs b/glados-audit/src/cli.rs index 4709d95a..54411d1c 100644 --- a/glados-audit/src/cli.rs +++ b/glados-audit/src/cli.rs @@ -8,18 +8,32 @@ const DEFAULT_DB_URL: &str = "sqlite::memory:"; #[derive(Parser, Debug, Eq, PartialEq)] #[command(author, version, about, long_about = None)] pub struct Args { + /// Database connection URL, such as SQLite or PostgreSQL. #[arg(short, long, default_value = DEFAULT_DB_URL)] pub database_url: String, + + /// IPC Path for connection to Portal node. #[arg(short, long, requires = "transport")] pub ipc_path: Option, + + /// HTTP URL for connection to Portal node. #[arg(short = 'u', long, requires = "transport")] pub http_url: Option, + + /// Portal node connection mode. #[arg(short, long)] pub transport: TransportType, + #[arg(short, long, default_value = "4", help = "number of auditing threads")] pub concurrency: u8, - #[arg(short, long, action(ArgAction::Append), value_enum, default_value = None, help = "Specific strategy to use. Default is to use all available strategies. May be passed multiple times for multiple strategies (--strategy latest --strategy random). Duplicates are permitted (--strategy random --strategy random).")] + + /// Specific strategy to use. Default is to use all available strategies. + /// May be passed multiple times for multiple strategies + /// (--strategy latest --strategy random). + /// Duplicates are permitted (--strategy random --strategy random). + #[arg(short, long, action(ArgAction::Append), value_enum, default_value = None)] pub strategy: Option>, + #[arg( short, long, @@ -48,14 +62,24 @@ pub struct Args { help = "relative weight of the 'random' strategy" )] pub random_strategy_weight: u8, + + /// Non-portal Ethereum execution node for validating data against. + #[arg(long, default_value = "http")] + pub trusted_provider: TrustedProvider, + + /// HTTP URL for connection to non-portal Ethereum execution node. + #[arg(long, requires = "trusted_provider")] + pub provider_http_url: Option, + + /// Pandaops URL for connection to non-portal Ethereum execution node. + #[arg(long, requires = "trusted_provider")] + pub provider_pandaops: Option, } impl Default for Args { fn default() -> Self { Self { database_url: DEFAULT_DB_URL.to_string(), - ipc_path: Default::default(), - http_url: Default::default(), transport: TransportType::HTTP, concurrency: 4, latest_strategy_weight: 1, @@ -63,6 +87,11 @@ impl Default for Args { oldest_strategy_weight: 1, random_strategy_weight: 1, strategy: None, + ipc_path: None, + http_url: None, + trusted_provider: TrustedProvider::HTTP, + provider_http_url: None, + provider_pandaops: None, } } } @@ -168,9 +197,22 @@ mod test { /// Used by a user to specify the intended form of transport /// to connect to a Portal node. -#[derive(Debug, Clone, Eq, PartialEq, ValueEnum)] +#[derive(Debug, Default, Clone, Eq, PartialEq, ValueEnum)] #[clap(rename_all = "snake_case")] pub enum TransportType { + #[default] + HTTP, IPC, +} + +/// Used by a user to specify the intended form of transport +/// to connect to a Portal node. +#[derive(Debug, Default, Clone, Eq, PartialEq, ValueEnum)] +#[clap(rename_all = "snake_case")] +pub enum TrustedProvider { + #[default] + /// An HTTP-based provider. HTTP, + /// A Trin operations provider. + Pandaops, } diff --git a/glados-audit/src/lib.rs b/glados-audit/src/lib.rs index 7915d35f..f9803ae3 100644 --- a/glados-audit/src/lib.rs +++ b/glados-audit/src/lib.rs @@ -1,28 +1,32 @@ use std::{ collections::HashMap, + env, sync::{ atomic::{AtomicU8, Ordering}, Arc, }, }; -use anyhow::{bail, Result}; +use anyhow::{anyhow, bail}; use clap::Parser; use cli::Args; +use entity::{ + content, + content_audit::{self, SelectionStrategy}, + execution_metadata, +}; use ethportal_api::{HistoryContentKey, OverlayContentKey}; +use glados_core::jsonrpc::{PortalClient, TransportConfig}; +use reqwest::header::{HeaderMap, HeaderValue}; use sea_orm::DatabaseConnection; use tokio::{ sync::mpsc::{self, Receiver}, time::{sleep, Duration}, }; use tracing::{debug, error, info}; - -use entity::{ - content, - content_audit::{self, SelectionStrategy}, - execution_metadata, -}; -use glados_core::jsonrpc::{PortalClient, TransportConfig}; +use url::Url; +use validation::Provider; +use web3::{transports::Http, Web3}; use crate::{ cli::TransportType, selection::start_audit_selection_task, validation::content_is_valid, @@ -45,10 +49,13 @@ pub struct AuditConfig { pub weights: HashMap, /// Number requests to a Portal node active at the same time. pub concurrency: u8, + /// An Ethereum execution node to validate content received from + /// the Portal node against. + pub trusted_provider: Provider, } impl AuditConfig { - pub fn from_args() -> Result { + pub fn from_args() -> anyhow::Result { let args = Args::parse(); let transport: TransportConfig = match args.transport { TransportType::IPC => match args.ipc_path { @@ -85,12 +92,51 @@ impl AuditConfig { }; weights.insert(strat.clone(), weight); } + let trusted_provider: Provider = match args.trusted_provider { + cli::TrustedProvider::HTTP => { + match args.provider_http_url{ + Some(url) => { + let transport = Http::new(url.as_str())?; + let w3 = Web3::new(transport); + Provider::Http(w3) + }, + None => bail!("The '--provider-http-url' flag is required if 'http' is selected for the '--trusted-provider'"), + } + }, + cli::TrustedProvider::Pandaops => { + match args.provider_pandaops { + Some(provider_url) => { + let mut headers = HeaderMap::new(); + let client_id = env::var("PANDAOPS_CLIENT_ID") + .map_err(|_| anyhow!("PANDAOPS_CLIENT_ID env var not set."))?; + let client_id = HeaderValue::from_str(&client_id); + let client_secret = env::var("PANDAOPS_CLIENT_SECRET") + .map_err(|_| anyhow!("PANDAOPS_CLIENT_SECRET env var not set."))?; + let client_secret = HeaderValue::from_str(&client_secret); + headers.insert("CF-Access-Client-Id", client_id?); + headers.insert("CF-Access-Client-Secret", client_secret?); + + let client = reqwest::Client::builder() + .default_headers(headers) + .build()?; + let url = Url::parse(&provider_url)?; + let transport = Http::with_client(client, url); + let w3 = Web3::new(transport); + Provider::Pandaops(w3) + }, + None => bail!("The '--provider-pandaops' flag is required if 'pandaops' is selected for the '--trusted-provider'"), + } + } + } + ; + Ok(AuditConfig { database_url: args.database_url, transport, strategies, weights, concurrency: args.concurrency, + trusted_provider, }) } } @@ -111,10 +157,10 @@ pub struct TaskChannel { pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) { let mut task_channels: Vec = vec![]; - for strategy in config.strategies { + for strategy in &config.strategies { // Each strategy sends tasks to a separate channel. let (tx, rx) = mpsc::channel::(100); - let Some(weight) = config.weights.get(&strategy) else { + let Some(weight) = config.weights.get(strategy) else { error!(strategy=?strategy, "no weight for strategy"); return }; @@ -135,12 +181,7 @@ pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) { let (collation_tx, collation_rx) = mpsc::channel::(100); tokio::spawn(start_collation(collation_tx, task_channels)); // Perform collated audit tasks. - tokio::spawn(perform_content_audits( - config.transport, - config.concurrency, - collation_rx, - conn, - )); + tokio::spawn(perform_content_audits(config, collation_rx, conn)); debug!("setting up CTRL+C listener"); tokio::signal::ctrl_c() .await @@ -173,19 +214,18 @@ async fn start_collation( } async fn perform_content_audits( - transport: TransportConfig, - concurrency: u8, + config: AuditConfig, mut rx: mpsc::Receiver, conn: DatabaseConnection, ) { let active_threads = Arc::new(AtomicU8::new(0)); loop { let active_count = active_threads.load(Ordering::Relaxed); - if active_count >= concurrency { + if active_count >= config.concurrency { // Each audit is performed in new thread if enough concurrency is available. debug!( active.threads = active_count, - max.threads = concurrency, + max.threads = config.concurrency, "Waiting for responses on all audit threads... Sleeping..." ); sleep(Duration::from_millis(5000)).await; @@ -194,7 +234,7 @@ async fn perform_content_audits( debug!( active.threads = active_count, - max.threads = concurrency, + max.threads = config.concurrency, "Checking Rx channel for audits" ); @@ -204,7 +244,7 @@ async fn perform_content_audits( tokio::spawn(perform_single_audit( active_threads.clone(), task, - transport.clone(), + config.clone(), conn.clone(), )) } @@ -222,10 +262,10 @@ async fn perform_content_audits( async fn perform_single_audit( active_threads: Arc, task: AuditTask, - transport: TransportConfig, + config: AuditConfig, conn: DatabaseConnection, ) { - let client = match PortalClient::from_config(&transport) { + let client = match PortalClient::from_config(&config.transport) { Ok(c) => c, Err(e) => { error!( @@ -252,9 +292,27 @@ async fn perform_single_audit( } }; - // If content was absent audit result is 'fail'. + // If content was absent or invalid the audit result is 'fail'. let audit_result = match content_response { - Some(content_bytes) => content_is_valid(&task.content_key, &content_bytes.raw), + Some(content_bytes) => { + match content_is_valid( + &config.trusted_provider, + &task.content_key, + &content_bytes.raw, + ) + .await + { + Ok(res) => res, + Err(e) => { + error!( + content.key=?task.content_key.to_hex(), + err=?e, + "Problem requesting validation from Trusted provider node."); + active_threads.fetch_sub(1, Ordering::Relaxed); + return; + } + } + } None => false, }; diff --git a/glados-audit/src/validation.rs b/glados-audit/src/validation.rs index 0c3707c3..92f5f3cc 100644 --- a/glados-audit/src/validation.rs +++ b/glados-audit/src/validation.rs @@ -1,57 +1,155 @@ -use ethportal_api::{ - BlockHeaderKey, ContentValue, HistoryContentKey, HistoryContentValue, OverlayContentKey, -}; +use anyhow::{anyhow, Context, Result}; +use ethportal_api::{ContentValue, HistoryContentKey, HistoryContentValue, OverlayContentKey}; +use thiserror::Error; use tracing::warn; use trin_utils::bytes::hex_encode; +use web3::{ + transports::Http, + types::{Block, BlockId, H256}, + Web3, +}; + +/// A connection to a trusted Ethereum execution node. +#[derive(Clone, Debug)] +pub enum Provider { + Http(Web3), + Pandaops(Web3), +} + +#[derive(Error, Debug)] +pub enum ValidationError { + #[error("unable to perform validation {0}")] + Infeasible(anyhow::Error), + #[error("content different to expected {0}")] + Invalid(#[from] anyhow::Error), +} + +impl Provider { + pub async fn get_block_header(&self, block_hash: &[u8; 32]) -> Result>> { + match self { + Provider::Http(web3) => { + let block_hash = BlockId::Hash(block_hash.into()); + Ok(web3.eth().block(block_hash).await?) + } + Provider::Pandaops(web3) => { + let block_hash = BlockId::Hash(block_hash.into()); + Ok(web3.eth().block(block_hash).await?) + } + } + } +} /// Checks that content bytes correspond to a correctly formatted /// content value. -pub fn content_is_valid(content_key: &HistoryContentKey, content_bytes: &[u8]) -> bool { +/// +/// Errors are logged. +pub async fn content_is_valid( + provider: &Provider, + content_key: &HistoryContentKey, + content_bytes: &[u8], +) -> Result { // check deserialization is valid let content: HistoryContentValue = match HistoryContentValue::decode(content_bytes) { Ok(c) => c, Err(e) => { warn!(content.value=hex_encode(content_bytes), err=?e, "could not deserialize content bytes"); - return false; + return Ok(false); } }; // check nature of content is valid + match check_content_correctness(content_key, content, provider).await { + Ok(_) => Ok(true), + Err(ValidationError::Invalid(e)) => { + warn!(content.value=hex_encode(content_bytes), err=?e, "content from portal node + is different to content from trusted provider"); + Ok(false) + } + Err(ValidationError::Infeasible(e)) => Err(ValidationError::Infeasible(e)), + } +} + +/// Returns an error if the content is different from that received from a trusted provider. +async fn check_content_correctness( + content_key: &HistoryContentKey, + content: HistoryContentValue, + provider: &Provider, +) -> Result<(), ValidationError> { match content { HistoryContentValue::BlockHeaderWithProof(h) => { // Reconstruct the key using the block header contents (RLP then hash). - let computed_hash = h.header.hash(); - let computed_key = HistoryContentKey::BlockHeaderWithProof(BlockHeaderKey { - block_hash: computed_hash.into(), - }); - match content_key == &computed_key { - true => true, - false => { - warn!( - content.key = content_key.to_hex(), - content.value = hex_encode(content_bytes), - "computed header hash did not match expected" - ); - false - } + let computed = h.header.hash().to_fixed_bytes(); + let trusted = block_hash_from_key(content_key)?; + if computed != trusted { + return Err(ValidationError::Invalid(anyhow!( + "computed header hash {} did not match expected {}", + hex_encode(computed), + hex_encode(trusted) + ))); } } HistoryContentValue::BlockBody(b) => { // Reconstruct the key using the block body contents. - let _computed_tx_root = b.transactions_root(); - let _computed_uncles_root = b.uncles_root(); - warn!("Need to call trusted provider to check block body correctness."); - true + let computed_tx_root = b.transactions_root()?; + let computed_uncles_root = b.uncles_root()?; + let block_hash = block_hash_from_key(content_key)?; + let header = fetch_block_header(&block_hash, provider).await?; + + if header.transactions_root != computed_tx_root { + return Err(ValidationError::Invalid(anyhow!( + "computed transactions root {} different from trusted provider {}", + hex_encode(computed_tx_root), + hex_encode(header.transactions_root) + ))); + }; + if header.uncles_hash != computed_uncles_root { + return Err(ValidationError::Invalid(anyhow!( + "computed uncles root {} different from trusted provider {}", + hex_encode(computed_uncles_root), + hex_encode(header.uncles_hash) + ))); + }; } HistoryContentValue::Receipts(r) => { // Reconstruct the key using the block body contents. - let _computed_receipts_root = r.root(); - warn!("Need to call trusted provider to check receipts correctness."); - true + let computed_receipts_root = r.root()?; + let block_hash = block_hash_from_key(content_key)?; + let header = fetch_block_header(&block_hash, provider).await?; + if header.receipts_root != computed_receipts_root { + return Err(ValidationError::Invalid(anyhow!( + "computed receipts root {} different from trusted provider {}", + hex_encode(computed_receipts_root), + hex_encode(header.receipts_root) + ))); + } } HistoryContentValue::EpochAccumulator(_e) => { - warn!("Need to check epoch master accumulator for correctness."); - true + warn!("epoch master accumulator structural check passed, but correctness check unimplemented.") } } + Ok(()) +} + +/// Calls trusted provider to get block header for given content key. +async fn fetch_block_header( + block_hash: &[u8; 32], + provider: &Provider, +) -> Result, ValidationError> { + let header = provider + .get_block_header(block_hash) + .await + .with_context(|| "unable to retrieve block header from trusted provider") + .map_err(ValidationError::Infeasible)?; + header + .ok_or_else(|| anyhow!("no block header available from trusted provider for validation")) + .map_err(ValidationError::Infeasible) +} + +// Removes the selector from the content key bytes to obtain the block hash. +fn block_hash_from_key(content_key: &HistoryContentKey) -> anyhow::Result<[u8; 32]> { + let key_bytes = content_key.to_bytes(); + let (_selector, block_hash_slice) = key_bytes.split_at(1); + block_hash_slice + .try_into() + .with_context(|| "unable to derive block hash from content key") }