Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

96 add correctness check #102

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion glados-audit/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,12 @@ ethportal-api = { git = "https://github.com/ethereum/trin" }
glados-core = { path = "../glados-core" }
migration = { path = "../migration" }
rand = "0.8.5"
reqwest = "0.11.16"
sea-orm = "0.10.3"
serde_json = "1.0.95"
thiserror = "1.0.40"
tokio = "1.21.2"
tracing = "0.1.37"
trin-utils = {git = "https://github.com/ethereum/trin" }
url = "2.3.1"
web3 = "0.18.0"

50 changes: 46 additions & 4 deletions glados-audit/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,18 +8,32 @@ const DEFAULT_DB_URL: &str = "sqlite::memory:";
#[derive(Parser, Debug, Eq, PartialEq)]
#[command(author, version, about, long_about = None)]
pub struct Args {
/// Database connection URL, such as SQLite or PostgreSQL.
#[arg(short, long, default_value = DEFAULT_DB_URL)]
pub database_url: String,

/// IPC Path for connection to Portal node.
#[arg(short, long, requires = "transport")]
pub ipc_path: Option<PathBuf>,

/// HTTP URL for connection to Portal node.
#[arg(short = 'u', long, requires = "transport")]
pub http_url: Option<Url>,

/// Portal node connection mode.
#[arg(short, long)]
pub transport: TransportType,

#[arg(short, long, default_value = "4", help = "number of auditing threads")]
pub concurrency: u8,
#[arg(short, long, action(ArgAction::Append), value_enum, default_value = None, help = "Specific strategy to use. Default is to use all available strategies. May be passed multiple times for multiple strategies (--strategy latest --strategy random). Duplicates are permitted (--strategy random --strategy random).")]

/// Specific strategy to use. Default is to use all available strategies.
/// May be passed multiple times for multiple strategies
/// (--strategy latest --strategy random).
/// Duplicates are permitted (--strategy random --strategy random).
#[arg(short, long, action(ArgAction::Append), value_enum, default_value = None)]
pub strategy: Option<Vec<SelectionStrategy>>,

#[arg(
short,
long,
Expand Down Expand Up @@ -48,21 +62,36 @@ pub struct Args {
help = "relative weight of the 'random' strategy"
)]
pub random_strategy_weight: u8,

/// Non-portal Ethereum execution node for validating data against.
#[arg(long, default_value = "http")]
pub trusted_provider: TrustedProvider,

/// HTTP URL for connection to non-portal Ethereum execution node.
#[arg(long, requires = "trusted_provider")]
pub provider_http_url: Option<Url>,

/// Pandaops URL for connection to non-portal Ethereum execution node.
#[arg(long, requires = "trusted_provider")]
pub provider_pandaops: Option<String>,
}

impl Default for Args {
/// Builds the baseline `Args`: `DEFAULT_DB_URL` for the database, HTTP for both
/// the Portal transport and the trusted provider, a concurrency of 4, a weight
/// of 1 for each selection strategy, and `None` for every optional setting.
fn default() -> Self {
Self {
database_url: DEFAULT_DB_URL.to_string(),
ipc_path: Default::default(),
http_url: Default::default(),
transport: TransportType::HTTP,
concurrency: 4,
latest_strategy_weight: 1,
failed_strategy_weight: 1,
oldest_strategy_weight: 1,
random_strategy_weight: 1,
// `None` means no explicit strategy selection was made.
strategy: None,
ipc_path: None,
http_url: None,
trusted_provider: TrustedProvider::HTTP,
provider_http_url: None,
provider_pandaops: None,
}
}
}
Expand Down Expand Up @@ -168,9 +197,22 @@ mod test {

/// Used by a user to specify the intended form of transport
/// to connect to a Portal node.
#[derive(Debug, Clone, Eq, PartialEq, ValueEnum)]
#[derive(Debug, Default, Clone, Eq, PartialEq, ValueEnum)]
#[clap(rename_all = "snake_case")]
pub enum TransportType {
#[default]
// Connect over HTTP; the endpoint is supplied through `Args::http_url`.
HTTP,
// Connect over IPC; the path is supplied through `Args::ipc_path`.
IPC,
}

/// Used by a user to select the trusted, non-Portal Ethereum execution
/// provider that content received from the Portal node is validated against.
#[derive(Debug, Default, Clone, Eq, PartialEq, ValueEnum)]
#[clap(rename_all = "snake_case")]
pub enum TrustedProvider {
#[default]
/// An HTTP-based provider.
HTTP,
/// A Trin operations provider.
Pandaops,
}
112 changes: 85 additions & 27 deletions glados-audit/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,32 @@
use std::{
collections::HashMap,
env,
sync::{
atomic::{AtomicU8, Ordering},
Arc,
},
};

use anyhow::{bail, Result};
use anyhow::{anyhow, bail};
use clap::Parser;
use cli::Args;
use entity::{
content,
content_audit::{self, SelectionStrategy},
execution_metadata,
};
use ethportal_api::{HistoryContentKey, OverlayContentKey};
use glados_core::jsonrpc::{PortalClient, TransportConfig};
use reqwest::header::{HeaderMap, HeaderValue};
use sea_orm::DatabaseConnection;
use tokio::{
sync::mpsc::{self, Receiver},
time::{sleep, Duration},
};
use tracing::{debug, error, info};

use entity::{
content,
content_audit::{self, SelectionStrategy},
execution_metadata,
};
use glados_core::jsonrpc::{PortalClient, TransportConfig};
use url::Url;
use validation::Provider;
use web3::{transports::Http, Web3};

use crate::{
cli::TransportType, selection::start_audit_selection_task, validation::content_is_valid,
Expand All @@ -45,10 +49,13 @@ pub struct AuditConfig {
pub weights: HashMap<SelectionStrategy, u8>,
/// Number of requests to a Portal node active at the same time.
pub concurrency: u8,
/// An Ethereum execution node to validate content received from
/// the Portal node against.
pub trusted_provider: Provider,
}

impl AuditConfig {
pub fn from_args() -> Result<AuditConfig> {
pub fn from_args() -> anyhow::Result<AuditConfig> {
let args = Args::parse();
let transport: TransportConfig = match args.transport {
TransportType::IPC => match args.ipc_path {
Expand Down Expand Up @@ -85,12 +92,51 @@ impl AuditConfig {
};
weights.insert(strat.clone(), weight);
}
let trusted_provider: Provider = match args.trusted_provider {
cli::TrustedProvider::HTTP => {
match args.provider_http_url{
Some(url) => {
let transport = Http::new(url.as_str())?;
let w3 = Web3::new(transport);
Provider::Http(w3)
},
None => bail!("The '--provider-http-url' flag is required if 'http' is selected for the '--trusted-provider'"),
}
},
cli::TrustedProvider::Pandaops => {
match args.provider_pandaops {
Some(provider_url) => {
let mut headers = HeaderMap::new();
let client_id = env::var("PANDAOPS_CLIENT_ID")
.map_err(|_| anyhow!("PANDAOPS_CLIENT_ID env var not set."))?;
let client_id = HeaderValue::from_str(&client_id);
let client_secret = env::var("PANDAOPS_CLIENT_SECRET")
.map_err(|_| anyhow!("PANDAOPS_CLIENT_SECRET env var not set."))?;
let client_secret = HeaderValue::from_str(&client_secret);
headers.insert("CF-Access-Client-Id", client_id?);
headers.insert("CF-Access-Client-Secret", client_secret?);

let client = reqwest::Client::builder()
.default_headers(headers)
.build()?;
let url = Url::parse(&provider_url)?;
let transport = Http::with_client(client, url);
let w3 = Web3::new(transport);
Provider::Pandaops(w3)
},
None => bail!("The '--provider-pandaops' flag is required if 'pandaops' is selected for the '--trusted-provider'"),
}
}
}
;

Ok(AuditConfig {
database_url: args.database_url,
transport,
strategies,
weights,
concurrency: args.concurrency,
trusted_provider,
})
}
}
Expand All @@ -111,10 +157,10 @@ pub struct TaskChannel {

pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) {
let mut task_channels: Vec<TaskChannel> = vec![];
for strategy in config.strategies {
for strategy in &config.strategies {
// Each strategy sends tasks to a separate channel.
let (tx, rx) = mpsc::channel::<AuditTask>(100);
let Some(weight) = config.weights.get(&strategy) else {
let Some(weight) = config.weights.get(strategy) else {
error!(strategy=?strategy, "no weight for strategy");
return
};
Expand All @@ -135,12 +181,7 @@ pub async fn run_glados_audit(conn: DatabaseConnection, config: AuditConfig) {
let (collation_tx, collation_rx) = mpsc::channel::<AuditTask>(100);
tokio::spawn(start_collation(collation_tx, task_channels));
// Perform collated audit tasks.
tokio::spawn(perform_content_audits(
config.transport,
config.concurrency,
collation_rx,
conn,
));
tokio::spawn(perform_content_audits(config, collation_rx, conn));
debug!("setting up CTRL+C listener");
tokio::signal::ctrl_c()
.await
Expand Down Expand Up @@ -173,19 +214,18 @@ async fn start_collation(
}

async fn perform_content_audits(
transport: TransportConfig,
concurrency: u8,
config: AuditConfig,
mut rx: mpsc::Receiver<AuditTask>,
conn: DatabaseConnection,
) {
let active_threads = Arc::new(AtomicU8::new(0));
loop {
let active_count = active_threads.load(Ordering::Relaxed);
if active_count >= concurrency {
if active_count >= config.concurrency {
// Each audit is performed in a new thread if enough concurrency is available; at the limit, wait and re-check.
debug!(
active.threads = active_count,
max.threads = concurrency,
max.threads = config.concurrency,
"Waiting for responses on all audit threads... Sleeping..."
);
sleep(Duration::from_millis(5000)).await;
Expand All @@ -194,7 +234,7 @@ async fn perform_content_audits(

debug!(
active.threads = active_count,
max.threads = concurrency,
max.threads = config.concurrency,
"Checking Rx channel for audits"
);

Expand All @@ -204,7 +244,7 @@ async fn perform_content_audits(
tokio::spawn(perform_single_audit(
active_threads.clone(),
task,
transport.clone(),
config.clone(),
conn.clone(),
))
}
Expand All @@ -222,10 +262,10 @@ async fn perform_content_audits(
async fn perform_single_audit(
active_threads: Arc<AtomicU8>,
task: AuditTask,
transport: TransportConfig,
config: AuditConfig,
conn: DatabaseConnection,
) {
let client = match PortalClient::from_config(&transport) {
let client = match PortalClient::from_config(&config.transport) {
Ok(c) => c,
Err(e) => {
error!(
Expand All @@ -252,9 +292,27 @@ async fn perform_single_audit(
}
};

// If content was absent audit result is 'fail'.
// If content was absent or invalid the audit result is 'fail'.
let audit_result = match content_response {
Some(content_bytes) => content_is_valid(&task.content_key, &content_bytes.raw),
Some(content_bytes) => {
match content_is_valid(
&config.trusted_provider,
&task.content_key,
&content_bytes.raw,
)
.await
{
Ok(res) => res,
Err(e) => {
error!(
content.key=?task.content_key.to_hex(),
err=?e,
"Problem requesting validation from Trusted provider node.");
active_threads.fetch_sub(1, Ordering::Relaxed);
return;
}
}
}
None => false,
};

Expand Down
Loading