From 04cd476ea2cbeac30c1e505b4f2d51ebb8941de3 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 17 Oct 2022 18:30:15 -0400 Subject: [PATCH] Add support for indexer subgraph pruning (#236) --- graph-gateway/src/indexer_status.rs | 7 + graph-gateway/src/main.rs | 1 + graph-gateway/src/query_engine.rs | 22 +-- graph-gateway/src/tests.rs | 155 +++++++++++---------- indexer-selection/src/lib.rs | 16 +-- indexer-selection/src/selection_factors.rs | 26 +++- indexer-selection/src/simulation.rs | 5 +- prelude/src/bytes.rs | 32 ++++- 8 files changed, 156 insertions(+), 108 deletions(-) diff --git a/graph-gateway/src/indexer_status.rs b/graph-gateway/src/indexer_status.rs index c72a6e61..8f6b07de 100644 --- a/graph-gateway/src/indexer_status.rs +++ b/graph-gateway/src/indexer_status.rs @@ -20,6 +20,7 @@ pub struct Data { pub struct IndexingStatus { pub network: String, pub block: BlockPointer, + pub min_block: Option, pub cost_model: Option>, } @@ -186,6 +187,7 @@ impl Actor { chains { network latestBlock { number hash } + earliestBlock { number hash } } } }"# }); @@ -248,6 +250,10 @@ impl Actor { number: block_status.number.parse().ok()?, hash: block_status.hash.clone(), }, + min_block: chain + .earliest_block + .as_ref() + .and_then(|b| b.number.parse::().ok()), cost_model, }; Some((indexing, status)) @@ -295,6 +301,7 @@ struct IndexingStatusResponse { struct ChainStatus { network: String, latest_block: Option, + earliest_block: Option, } #[derive(Deserialize)] diff --git a/graph-gateway/src/main.rs b/graph-gateway/src/main.rs index 52ccd00e..2bcebb8b 100644 --- a/graph-gateway/src/main.rs +++ b/graph-gateway/src/main.rs @@ -407,6 +407,7 @@ async fn write_indexer_inputs( reported_number: status.block.number, blocks_behind: latest.saturating_sub(status.block.number), behind_reported_block: false, + min_block: status.min_block, }), }, ); diff --git a/graph-gateway/src/query_engine.rs b/graph-gateway/src/query_engine.rs index cc17455b..e70a10fc 100644 --- a/graph-gateway/src/query_engine.rs +++ b/graph-gateway/src/query_engine.rs @@ -12,7 +12,7 @@ use crate::{ }; use futures::future::join_all; use indexer_selection::{ - self, actor::IndexerErrorObservation, Context, FreshnessRequirements, IndexerError, + self, actor::IndexerErrorObservation, BlockRequirements, Context, IndexerError, IndexerPreferences, IndexerScore, ScoringSample, Selection, SelectionError, UnresolvedBlock, }; use indexer_selection::{actor::Update, Indexing, UtilityConfig}; @@ -263,7 +263,7 @@ where .ok_or_else(|| NetworkNotSupported(subgraph.network.clone()))?; let block_constraints = block_constraints(&context).ok_or(MalformedQuery)?; - let block_requirements = join_all( + let resolved_blocks = join_all( block_constraints .iter() .filter_map(|constraint| constraint.clone().into_unresolved()) @@ -272,9 +272,10 @@ where .await .into_iter() .collect::, UnresolvedBlock>>()?; - - let freshness_requirements = FreshnessRequirements { - minimum_block: block_requirements.iter().map(|b| b.number).max(), + let min_block = resolved_blocks.iter().map(|b| b.number).min(); + let max_block = resolved_blocks.iter().map(|b| b.number).max(); + let block_requirements = BlockRequirements { + range: min_block.map(|min| (min, max_block.unwrap())), has_latest: block_constraints.iter().any(|c| match c { BlockConstraint::Unconstrained | BlockConstraint::NumberGTE(_) => true, BlockConstraint::Hash(_) | BlockConstraint::Number(_) => false, @@ -282,10 +283,9 @@ where }; // Reject queries for blocks before minimum start block of subgraph manifest. 
- match freshness_requirements.minimum_block { - Some(min_block) if min_block < subgraph.min_block => return Err(BlockBeforeMin), - _ => (), - }; + if matches!(min_block, Some(min_block) if min_block < subgraph.min_block) { + return Err(BlockBeforeMin); + } let query_count = context.operations.len().max(1) as u64; let api_key = query.api_key.as_ref().unwrap(); @@ -360,7 +360,7 @@ where latest_block.number, &deployment_indexers, budget, - &freshness_requirements, + &block_requirements, )?; drop(selection_timer); @@ -394,7 +394,7 @@ where let blocks_behind = selection.score.blocks_behind + (latest_unresolved / 2); let latest_query_block = block_cache.latest(blocks_behind).await?; let deterministic_query = - make_query_deterministic(context, &block_requirements, &latest_query_block) + make_query_deterministic(context, &resolved_blocks, &latest_query_block) .ok_or(MalformedQuery)?; let indexing = selection.indexing; diff --git a/graph-gateway/src/tests.rs b/graph-gateway/src/tests.rs index b2e3ee04..e7b5babf 100644 --- a/graph-gateway/src/tests.rs +++ b/graph-gateway/src/tests.rs @@ -1,4 +1,5 @@ use crate::{ + block_constraints::{block_constraints, BlockConstraint}, chains::{self, test::Provider, BlockCache}, fisherman_client::*, indexer_client::*, @@ -12,20 +13,18 @@ use async_trait::async_trait; use indexer_selection::{ actor::{IndexerUpdate, Update}, test_utils::{default_cost_model, test_allocation_id, TEST_KEY}, - BlockStatus, IndexerError, IndexerInfo, Indexing, IndexingStatus, SecretKey, Selection, - UnresolvedBlock, + BlockStatus, Context, IndexerError, IndexerInfo, Indexing, IndexingStatus, SecretKey, + Selection, UnresolvedBlock, }; use prelude::{ buffer_queue::{self, QueueWriter}, decimal, double_buffer, + rand::thread_rng, test_utils::*, *, }; use rand::{ - distributions, - rngs::{OsRng, SmallRng}, - seq::SliceRandom, - Rng, RngCore as _, SeedableRng, + distributions, rngs::SmallRng, seq::SliceRandom as _, Rng as _, RngCore as _, SeedableRng as _, }; use serde_json::json; use std::{ @@ -115,17 +114,18 @@ struct IndexerTopology { staked_grt: TokenAmount, allocated_grt: TokenAmount, blocks_behind: usize, + min_block: Option, fee: TokenAmount, indexer_err: bool, challenge_outcome: ChallengeOutcome, } impl IndexerTopology { - fn block(&self, blocks: usize) -> usize { + fn block(&self, blocks: usize) -> u64 { if blocks == 0 { return 0; } - blocks - self.blocks_behind(blocks) - 1 + (blocks - self.blocks_behind(blocks) - 1) as u64 } fn blocks_behind(&self, blocks: usize) -> usize { @@ -185,7 +185,16 @@ impl Topology { .choose(&mut self.rng) .unwrap() .clone(); - let query_body = if self.flip_coin(32) { "?" 
} else { BASIC_QUERY }; + let network = self.networks.get(&deployment.network).unwrap(); + let query_body = if self.rng.gen_bool(0.01) { + "?".to_string() + } else { + let constraints = match (self.rng.gen_bool(0.1), network.blocks.choose(&mut self.rng)) { + (true, Some(block)) => format!("(block:{{number:{}}})", block.number), + _ => "".to_string(), + }; + format!("{{ entities{} {{ id }} }}", constraints) + }; let mut query = Query::new("".into(), query_body.into(), None); query.api_key = Some(Arc::new(APIKey::default())); query.subgraph = Some(Ptr::new(SubgraphInfo { @@ -203,7 +212,7 @@ impl Topology { name: self.gen_str(log_2(*self.config.networks.end()).max(1)), blocks: Vec::new(), }; - let block_count = self.gen_len(self.config.blocks.clone(), 32); + let block_count = self.gen_len(self.config.blocks.clone()); for i in 0..block_count { network.blocks.push(self.gen_block(i as u64)); } @@ -215,7 +224,7 @@ impl Topology { id: self.gen_bytes().into(), deployments: Vec::new(), }; - for _ in 0..self.gen_len(self.config.deployments.clone(), 32) { + for _ in 0..self.gen_len(self.config.deployments.clone()) { subgraph.deployments.push(self.gen_deployment()); } subgraph @@ -235,7 +244,7 @@ impl Topology { indexings: Vec::new(), }; let indexers = self.indexers.keys().cloned().collect::>(); - for _ in 0..self.gen_len(self.config.indexings.clone(), 32) { + for _ in 0..self.gen_len(self.config.indexings.clone()) { match indexers.choose(&mut self.rng) { None => break, Some(id) if deployment.indexings.contains(id) => continue, @@ -246,13 +255,20 @@ impl Topology { } fn gen_indexer(&mut self) -> IndexerTopology { + let block_range = 0..=*self.config.blocks.end(); + let min_block = if self.rng.gen_bool(0.1) { + Some(self.rng.gen_range(block_range.clone()) as u64) + } else { + None + }; IndexerTopology { id: self.gen_bytes().into(), staked_grt: self.gen_amount(), allocated_grt: self.gen_amount(), - blocks_behind: self.rng.gen_range(0..=*self.config.blocks.end()), + blocks_behind: self.rng.gen_range(block_range), + min_block, fee: self.gen_amount(), - indexer_err: self.flip_coin(16), + indexer_err: self.rng.gen_bool(0.01), challenge_outcome: self.gen_challenge_outcome(), } } @@ -273,7 +289,7 @@ impl Topology { } fn gen_challenge_outcome(&mut self) -> ChallengeOutcome { - if self.flip_coin(16) { + if self.rng.gen_bool(0.1) { *[ ChallengeOutcome::FailedToProvideAttestation, ChallengeOutcome::DisagreeWithTrustedIndexer, @@ -305,19 +321,14 @@ impl Topology { bytes } - #[inline] - fn flip_coin(&mut self, fraction: u64) -> bool { - (self.rng.next_u64() % fraction) == 0 - } - /// Generate a length in the given range that should only be zero a small fraction of the time. /// This is done to ensure that generated test cases have a reasonable probability to have /// input components necessary to execute a complete query, while also covering the zero cases. 
- fn gen_len(&mut self, range: RangeInclusive, zero_fraction: u64) -> usize { + fn gen_len(&mut self, range: RangeInclusive) -> usize { if range.end() == &0 { return 0; } - if range.contains(&0) && self.flip_coin(zero_fraction) { + if range.contains(&0) && self.rng.gen_bool(0.05) { return 0; } else { loop { @@ -384,9 +395,10 @@ impl Topology { )])), cost_model: cost_model.clone(), block: Some(BlockStatus { - reported_number: indexer.block(network.blocks.len()) as u64, - blocks_behind: indexer.blocks_behind as u64, + reported_number: indexer.block(network.blocks.len()), + blocks_behind: indexer.blocks_behind(network.blocks.len()) as u64, behind_reported_block: false, + min_block: indexer.min_block, }), }; Some((deployment.id, update)) @@ -429,6 +441,8 @@ impl Topology { query: &Query, result: Result<(), QueryEngineError>, ) -> Result<(), Vec> { + use QueryEngineError::*; + let mut trace = Vec::new(); trace.push(format!("result: {:?}", result)); trace.push(format!("{:#?}", query)); @@ -445,8 +459,31 @@ impl Topology { .map(|id| self.indexers.get(id).unwrap()) .collect::>(); + if indexers.is_empty() { + return Self::expect_err(&mut trace, &result, NoIndexers); + } + + let context = match Context::new(&query.query, "") { + Ok(context) => context, + Err(_) => return Self::expect_err(&mut trace, &result, MalformedQuery), + }; + let required_block = match block_constraints(&context).unwrap().into_iter().next() { + None | Some(BlockConstraint::Unconstrained) => None, + Some(BlockConstraint::Number(n)) => Some(n), + Some(constraint) => unreachable!("unexpected constraint: {:?}", constraint), + }; + + let blocks = &self.networks.get(&subgraph.network).unwrap().blocks; + if blocks.is_empty() { + return Self::expect_err( + &mut trace, + &result, + MissingBlock(UnresolvedBlock::WithNumber(0)), + ); + } + // Valid indexers have the following properties: - fn valid_indexer(indexer: &IndexerTopology) -> bool { + let valid_indexer = |indexer: &IndexerTopology| -> bool { // no failure to indexing the subgraph !indexer.indexer_err // more than zero stake @@ -455,7 +492,11 @@ impl Topology { && (indexer.allocated_grt > TokenAmount::Zero) // fee <= budget && (indexer.fee <= TokenAmount::Enough) - } + // valid minimum block + && required_block.and_then(|required| Some(indexer.min_block? <= required)).unwrap_or(true) + // indexed required block + && required_block.map(|required| indexer.block(blocks.len()) >= required).unwrap_or(true) + }; let valid = indexers .iter() .cloned() @@ -464,8 +505,19 @@ impl Topology { trace.push(format!("valid indexers: {:#?}", valid)); if query.indexer_attempts.is_empty() { - return self - .check_no_attempts(&mut trace, query, &result, &subgraph, &indexers, &valid); + if !valid.is_empty() { + return Self::err_with( + &mut trace, + format!("expected no valid indexer, got {}", valid.len()), + ); + } + if matches!(result, Err(NoIndexerSelected) | Err(FeesTooHigh(_))) { + return Ok(()); + } + return Self::err_with( + &mut trace, + format!("expected no valid indexers, got {:?}", result), + ); } let mut failed_attempts = query.indexer_attempts.clone(); @@ -491,49 +543,6 @@ impl Topology { } } - fn check_no_attempts( - &self, - trace: &mut Vec, - query: &Query, - result: &Result<(), QueryEngineError>, - subgraph: &SubgraphInfo, - indexers: &[&IndexerTopology], - valid: &[&IndexerTopology], - ) -> Result<(), Vec> { - use QueryEngineError::*; - if indexers.is_empty() { - return Self::expect_err(trace, result, NoIndexers); - } - if query.query.as_ref() == "?" 
{ - return Self::expect_err(trace, result, MalformedQuery); - } - if self - .networks - .get(&subgraph.network) - .unwrap() - .blocks - .is_empty() - { - return Self::expect_err(trace, result, MissingBlock(UnresolvedBlock::WithNumber(0))); - } - - if !valid.is_empty() { - return Self::err_with( - trace, - format!("expected no valid indexer, got {}", valid.len()), - ); - } - - let high_fee_count = indexers - .iter() - .filter(|indexer| indexer.fee > TokenAmount::Enough) - .count(); - if high_fee_count > 0 { - return Self::expect_err(trace, result, QueryEngineError::FeesTooHigh(high_fee_count)); - } - Self::expect_err(trace, result, NoIndexerSelected) - } - fn check_failed_attempt( &self, trace: &mut Vec, @@ -631,7 +640,7 @@ impl IndexerInterface for TopologyIndexer { let matcher = Regex::new(r#"block: \{hash: \\"0x([[:xdigit:]]+)\\"}"#).unwrap(); for capture in matcher.captures_iter(&query) { let hash = capture.get(1).unwrap().as_str().parse::().unwrap(); - let number = blocks.iter().position(|block| block.hash == hash).unwrap(); + let number = blocks.iter().position(|block| block.hash == hash).unwrap() as u64; if number > indexer.block(blocks.len()) { return Err(IndexerError::Other(json!({ "errors": vec![json!({ @@ -701,7 +710,7 @@ async fn query_engine() { let seed = env::vars() .find(|(k, _)| k == "TEST_SEED") .and_then(|(_, v)| v.parse::().ok()) - .unwrap_or(OsRng.next_u64()); + .unwrap_or(thread_rng().next_u64()); tracing::info!(%seed); let rng = SmallRng::seed_from_u64(seed); for _ in 0..10 { diff --git a/indexer-selection/src/lib.rs b/indexer-selection/src/lib.rs index bc7eb27b..a3d20ead 100644 --- a/indexer-selection/src/lib.rs +++ b/indexer-selection/src/lib.rs @@ -46,7 +46,7 @@ pub enum SelectionError { #[derive(Debug, Clone, Eq, PartialEq)] pub enum BadIndexerReason { - BehindMinimumBlock, + MissingRequiredBlock, MissingIndexingStatus, QueryNotCosted, FeeTooHigh, @@ -108,9 +108,9 @@ pub struct UtilityConfig { } #[derive(Default, Debug, Eq, PartialEq)] -pub struct FreshnessRequirements { - /// If specified, the subgraph must have indexed up to at least this number. - pub minimum_block: Option, +pub struct BlockRequirements { + /// Range of blocks specified in the query + pub range: Option<(u64, u64)>, /// If true, the query has an unspecified block which means the query benefits from syncing as /// far in the future as possible. pub has_latest: bool, @@ -202,7 +202,7 @@ impl State { latest_block: u64, indexers: &[Address], budget: GRT, - freshness_requirements: &FreshnessRequirements, + requirements: &BlockRequirements, ) -> Result<(Vec, Option), SelectionError> { let mut scores = Vec::new(); let mut high_fee_count = 0; @@ -228,7 +228,7 @@ impl State { latest_block, budget, config, - freshness_requirements, + requirements, restricted, ); // TODO: these logs are currently required for data science. However, we would like to omit these in production and only use the sampled scoring logs. 
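// --- Editor's sketch (not part of the patch) ---
// The scoring path above now receives `requirements: &BlockRequirements` instead of the old
// `FreshnessRequirements`. As a minimal, hedged illustration of how a caller derives that value
// from the blocks resolved for a query (mirroring the query_engine.rs hunk earlier in this
// patch), here is a standalone restatement. The local struct only repeats the fields added by
// the patch; `requirements_from_blocks` and `resolved` are illustrative names.
#[derive(Default, Debug)]
pub struct BlockRequirements {
    /// Inclusive (min, max) range of block numbers referenced by the query, if any.
    pub range: Option<(u64, u64)>,
    /// True when the query also reads the latest block.
    pub has_latest: bool,
}

pub fn requirements_from_blocks(resolved: &[u64], has_latest: bool) -> BlockRequirements {
    let min = resolved.iter().copied().min();
    let max = resolved.iter().copied().max();
    BlockRequirements {
        // `min` and `max` are both Some exactly when `resolved` is non-empty,
        // so unwrapping `max` inside the map is safe.
        range: min.map(|min| (min, max.unwrap())),
        has_latest,
    }
}

#[test]
fn requirements_from_blocks_example() {
    let reqs = requirements_from_blocks(&[7, 3, 5], true);
    assert_eq!(reqs.range, Some((3, 7)));
    assert!(reqs.has_latest);
}
// --- end sketch ---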
@@ -313,7 +313,7 @@ impl State { latest_block: u64, budget: GRT, config: &UtilityConfig, - freshness_requirements: &FreshnessRequirements, + requirements: &BlockRequirements, restricted: bool, ) -> Result { let mut aggregator = UtilityAggregator::new(); @@ -357,7 +357,7 @@ impl State { aggregator.add(reputation); let data_freshness = selection_factors.expected_freshness_utility( - freshness_requirements, + requirements, config.data_freshness, latest_block, )?; diff --git a/indexer-selection/src/selection_factors.rs b/indexer-selection/src/selection_factors.rs index bbbddbbc..749e1fe0 100644 --- a/indexer-selection/src/selection_factors.rs +++ b/indexer-selection/src/selection_factors.rs @@ -1,6 +1,6 @@ use crate::{ decay::DecayBuffer, performance::*, price_efficiency::*, reputation::*, utility::*, - BadIndexerReason, Context, FreshnessRequirements, SelectionError, + BadIndexerReason, BlockRequirements, Context, SelectionError, }; use cost_model::CostModel; use prelude::*; @@ -30,6 +30,7 @@ pub struct BlockStatus { pub reported_number: u64, pub blocks_behind: u64, pub behind_reported_block: bool, + pub min_block: Option, } impl SelectionFactors { @@ -104,7 +105,7 @@ impl SelectionFactors { pub fn expected_freshness_utility( &self, - requirements: &FreshnessRequirements, + requirements: &BlockRequirements, utility_parameters: UtilityParameters, latest_block: u64, ) -> Result { @@ -114,11 +115,8 @@ impl SelectionFactors { .as_ref() .ok_or(BadIndexerReason::MissingIndexingStatus)?; // Check that the indexer has synced at least up to any minimum block required. - if let Some(minimum) = requirements.minimum_block { - let indexer_latest = latest_block.saturating_sub(status.blocks_behind); - if indexer_latest < minimum { - return Err(BadIndexerReason::BehindMinimumBlock.into()); - } + if !self.meets_requirements(requirements, latest_block) { + return Err(BadIndexerReason::MissingRequiredBlock.into()); } // Add utility if the latest block is requested. Otherwise, data freshness is not a utility, // but a binary of minimum block. Note that it can be both. 
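// --- Editor's sketch (not part of the patch) ---
// The freshness utility above now rejects an indexer with `MissingRequiredBlock` when the query's
// block range falls outside what the indexer retains. As a hedged, free-function restatement of
// the `meets_requirements` helper added in the next hunk, with a worked pruning example:
// `can_serve` and its parameter names are local to this sketch.
fn can_serve(
    min_block: Option<u64>,    // earliest block the indexer retains (None = not pruned)
    blocks_behind: u64,        // how far the indexer lags the chain head
    latest_block: u64,         // current chain head
    range: Option<(u64, u64)>, // (min, max) block numbers required by the query, if any
) -> bool {
    let (min, max) = match range {
        Some(range) => range,
        None => return true,
    };
    let earliest = min_block.unwrap_or(0);
    let expected_head = latest_block.saturating_sub(blocks_behind);
    (earliest <= min) && (max <= expected_head)
}

#[test]
fn pruned_indexer_example() {
    // Indexer pruned below block 100, fully synced to a chain head of 1_000.
    assert!(can_serve(Some(100), 0, 1_000, Some((250, 250))));
    // A query touching block 50 falls outside the retained range -> MissingRequiredBlock.
    assert!(!can_serve(Some(100), 0, 1_000, Some((50, 50))));
    // Unconstrained queries are unaffected by pruning.
    assert!(can_serve(Some(100), 0, 1_000, None));
}
// --- end sketch ---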
@@ -134,6 +132,20 @@ impl SelectionFactors { }) } + fn meets_requirements(&self, requirements: &BlockRequirements, latest_block: u64) -> bool { + let status = match self.status.block.as_ref() { + Some(status) => status, + None => return false, + }; + let (min, max) = match requirements.range { + Some(range) => range, + None => return true, + }; + let min_block = status.min_block.unwrap_or(0); + let expected_block_status = latest_block.saturating_sub(status.blocks_behind); + (min_block <= min) && (max <= expected_block_status) + } + pub fn total_allocation(&self) -> GRT { self.status .allocations diff --git a/indexer-selection/src/simulation.rs b/indexer-selection/src/simulation.rs index 0062c95c..5ea5b5bd 100644 --- a/indexer-selection/src/simulation.rs +++ b/indexer-selection/src/simulation.rs @@ -79,6 +79,7 @@ pub async fn simulate( .saturating_sub(characteristics.blocks_behind), blocks_behind: characteristics.blocks_behind, behind_reported_block: false, + min_block: None, }), }, ); @@ -99,8 +100,8 @@ pub async fn simulate( } let mut context = Context::new("{ a }", "").unwrap(); - let freshness_requirements = FreshnessRequirements { - minimum_block: None, + let freshness_requirements = BlockRequirements { + range: None, has_latest: true, }; let latest_block = blocks.last().unwrap().number; diff --git a/prelude/src/bytes.rs b/prelude/src/bytes.rs index 58f5b9ac..ea50a1f2 100644 --- a/prelude/src/bytes.rs +++ b/prelude/src/bytes.rs @@ -35,11 +35,19 @@ macro_rules! bytes_wrapper { ($vis:vis, $id:ident, $len:expr, "HexStr") => { bytes_wrapper!($vis, $id, $len); impl FromStr for $id { - type Err = hex::FromHexError; + type Err = anyhow::Error; fn from_str(s: &str) -> Result { - let mut bytes = [0u8; $len]; + use std::io::Write as _; + let offset = if s.starts_with("0x") {2} else {0}; - hex::decode_to_slice(&s[offset..], &mut bytes)?; + anyhow::ensure!(s[offset..].len() <= $len * 2, "input too long"); + let s = &s[offset..]; + + let mut input = std::io::Cursor::new(['0' as u8; $len * 2]); + input.set_position(($len * 2) - s.len() as u64); + input.write_all(s.as_bytes())?; + let mut bytes = [0u8; $len]; + hex::decode_to_slice(&input.into_inner(), &mut bytes)?; Ok(Self(bytes)) } } @@ -172,9 +180,19 @@ impl fmt::Display for InvalidIPFSHash { #[cfg(test)] mod tests { use super::*; + use crate::test_utils::bytes_from_id; + use rand::{thread_rng, Rng as _}; + + #[test] + fn parse_0_fill() { + for n in [0, 42, 0xefbeadde, thread_rng().gen()] { + let bytes = bytes_from_id(n); + assert_eq!(&hex::encode(&bytes).parse::().unwrap().0, &bytes); + } + } - #[tokio::test] - async fn subgraph_id_encoding() { + #[test] + fn subgraph_id_encoding() { let bytes = hex::decode("67486e65165b1474898247760a4b852d70d95782c6325960e5b6b4fd82fed1bd") .unwrap(); let v1 = "0xdeadbeef678b513255cea949017921c8c9f6ef82-1"; @@ -190,8 +208,8 @@ mod tests { assert_eq!(id1, id2); } - #[tokio::test] - async fn subgraph_deployment_id_encoding() { + #[test] + fn subgraph_deployment_id_encoding() { let ipfs_hash = "QmWmyoMoctfbAaiEs2G46gpeUmhqFRDW6KWo64y5r581Vz"; let mut bytes = [0u8; 32]; bytes.clone_from_slice(
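// --- Editor's sketch (not part of the patch; the diff above is truncated here) ---
// As a closing illustration of the prelude/src/bytes.rs change, this is a standalone sketch
// (an assumption, not the macro itself) of the left-padded hex parsing that the new FromStr
// impl performs. It uses the `hex` and `anyhow` crates already referenced by the patch;
// `parse_padded_hex` is an illustrative name.
fn parse_padded_hex<const N: usize>(s: &str) -> anyhow::Result<[u8; N]> {
    let s = s.strip_prefix("0x").unwrap_or(s);
    anyhow::ensure!(s.len() <= N * 2, "input too long");
    // Left-pad with ASCII '0' so short inputs such as "2a" decode as 0x00..2a,
    // matching the Cursor-based zero fill in the macro.
    let mut padded = vec![b'0'; N * 2];
    padded[N * 2 - s.len()..].copy_from_slice(s.as_bytes());
    let mut bytes = [0u8; N];
    hex::decode_to_slice(&padded, &mut bytes)?;
    Ok(bytes)
}

#[test]
fn parse_padded_hex_example() -> anyhow::Result<()> {
    assert_eq!(parse_padded_hex::<4>("0x2a")?, [0, 0, 0, 0x2a]);
    assert_eq!(parse_padded_hex::<4>("deadbeef")?, [0xde, 0xad, 0xbe, 0xef]);
    assert!(parse_padded_hex::<2>("deadbeef").is_err()); // input too long
    Ok(())
}
// --- end sketch ---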