From 2634e7e6219dfc15bcb0651254ec84af2db79c16 Mon Sep 17 00:00:00 2001 From: Theo Butler Date: Mon, 18 Nov 2024 12:31:47 -0500 Subject: [PATCH] revert "feat: remove query rewrites (#975)" This reverts commit faadab370713fc14be82814dcebc2fde3a640a29. --- Cargo.lock | 100 +++++++++++++++- Cargo.toml | 1 + src/block_constraints.rs | 207 ++++++++++++++++++++++----------- src/chain.rs | 73 +++++------- src/chains.rs | 91 +++++++++++++-- src/client_query.rs | 197 +++++++++++++------------------ src/indexer_client.rs | 61 ++++++++-- src/network/subgraph_client.rs | 5 +- src/reports.rs | 7 +- 9 files changed, 486 insertions(+), 256 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 07aab15de..ff0361c8f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1364,6 +1364,22 @@ version = "0.8.7" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "773648b94d0e5d620f64f280777445740e61fe701025087ec8b57f45c791888b" +[[package]] +name = "cost-model" +version = "0.1.0" +source = "git+https://github.com/graphprotocol/agora?rev=e9530de#e9530de5f782d68ed409e2a18c62ec532db23737" +dependencies = [ + "firestorm", + "fraction", + "graphql 0.2.0", + "itertools 0.13.0", + "lazy_static", + "nom", + "num-bigint", + "num-traits", + "serde_json", +] + [[package]] name = "cpufeatures" version = "0.2.15" @@ -2210,6 +2226,16 @@ dependencies = [ "percent-encoding", ] +[[package]] +name = "fraction" +version = "0.15.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f158e3ff0a1b334408dc9fb811cd99b446986f4d8b741bb08f9df1604085ae7" +dependencies = [ + "lazy_static", + "num", +] + [[package]] name = "fs2" version = "0.4.3" @@ -2406,11 +2432,12 @@ dependencies = [ "anyhow", "assert_matches", "axum", + "cost-model", "custom_debug", "ethers", "faster-hex", "futures", - "graphql", + "graphql 0.3.0", "headers", "hex", "hickory-resolver", @@ -2454,6 +2481,16 @@ dependencies = [ "url", ] +[[package]] +name = "graphql" +version = "0.2.0" +source = "git+https://github.com/edgeandnode/toolshed?tag=graphql-v0.2.0#2df5ee975656027d4a7fbf591baf6a29dfbe0ee6" +dependencies = [ + "firestorm", + "graphql-parser", + "serde", +] + [[package]] name = "graphql" version = "0.3.0" @@ -3467,6 +3504,12 @@ version = "0.3.17" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" +[[package]] +name = "minimal-lexical" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "68354c5c6bd36d73ff3feceb05efa59b6acb7626617f4962be322a825e61f79a" + [[package]] name = "miniz_oxide" version = "0.8.0" @@ -3511,6 +3554,16 @@ version = "1.0.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "650eef8c711430f1a879fdd01d4745a7deea475becfb90269c06775983bbf086" +[[package]] +name = "nom" +version = "7.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d273983c5a657a70a3e8f2a01329822f3b8c8172b73826411a55751e404a0a4a" +dependencies = [ + "memchr", + "minimal-lexical", +] + [[package]] name = "nu-ansi-term" version = "0.46.0" @@ -3521,6 +3574,20 @@ dependencies = [ "winapi", ] +[[package]] +name = "num" +version = "0.4.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35bd024e8b2ff75562e5f34e7f4905839deb4b22955ef5e73d2fea1b9813cb23" +dependencies = [ + "num-bigint", + "num-complex", + "num-integer", + "num-iter", + "num-rational", + "num-traits", +] + [[package]] name = "num-bigint" version = "0.4.6" @@ 
-3531,6 +3598,15 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "num-complex"
+version = "0.4.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "73f88a1307638156682bada9d7604135552957b7818057dcef22705b4d509495"
+dependencies = [
+ "num-traits",
+]
+
 [[package]]
 name = "num-conv"
 version = "0.1.0"
@@ -3546,6 +3622,28 @@ dependencies = [
  "num-traits",
 ]
 
+[[package]]
+name = "num-iter"
+version = "0.1.45"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1429034a0490724d0075ebb2bc9e875d6503c3cf69e235a8941aa757d83ef5bf"
+dependencies = [
+ "autocfg",
+ "num-integer",
+ "num-traits",
+]
+
+[[package]]
+name = "num-rational"
+version = "0.4.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f83d14da390562dca69fc84082e73e548e1ad308d24accdedd2720017cb37824"
+dependencies = [
+ "num-bigint",
+ "num-integer",
+ "num-traits",
+]
+
 [[package]]
 name = "num-traits"
 version = "0.2.19"
diff --git a/Cargo.toml b/Cargo.toml
index 0258dfea1..454d4aee9 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -21,6 +21,7 @@ axum = { version = "0.7.7", default-features = false, features = [
     "tokio",
     "http1",
 ] }
+cost-model = { git = "https://github.com/graphprotocol/agora", rev = "e9530de" }
 custom_debug = "0.6.1"
 ethers = "2.0.14"
 faster-hex = "0.10.0"
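The pinned `cost-model` revision above provides `cost_model::Context`, the pre-parsed request representation that the restored code below passes around. As a minimal sketch of the surface this patch relies on — only the pieces actually exercised below, namely `Context::new` and the public `operations`/`variables` fields; illustrative, not part of the diff:

    // Sketch only. Assumes the cost-model revision pinned above.
    use cost_model::Context;

    fn main() {
        let query = r#"query($n: Int) { tokens(block: { number_gte: $n }) { id } }"#;
        let variables = r#"{ "n": 100 }"#;
        // Parse the client request once; block-constraint resolution and the
        // probe rewrite in src/block_constraints.rs both reuse this context.
        let context = Context::new(query, variables).expect("valid GraphQL request");
        assert_eq!(context.operations.len(), 1);
    }
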
diff --git a/src/block_constraints.rs b/src/block_constraints.rs
index 6757f4a4c..0388dc008 100644
--- a/src/block_constraints.rs
+++ b/src/block_constraints.rs
@@ -1,15 +1,16 @@
-use std::collections::{BTreeMap, BTreeSet, HashMap};
+use std::{
+    collections::{BTreeMap, BTreeSet},
+    fmt::Write as _,
+};
 
 use anyhow::{anyhow, bail};
+use cost_model::Context;
 use graphql::{
-    graphql_parser::{
-        self,
-        query::{Definition, OperationDefinition, Selection, Text, Value, VariableDefinition},
-    },
-    IntoStaticValue as _, QueryVariables, StaticValue,
+    graphql_parser::query::{OperationDefinition, Selection, SelectionSet, Text, Value},
+    IntoStaticValue as _, StaticValue,
 };
 use itertools::Itertools as _;
-use serde::Deserialize;
+use serde_json::{self, json};
 use thegraph_core::{BlockHash, BlockNumber};
 
 use crate::{blocks::BlockConstraint, chain::Chain, errors::Error};
@@ -26,10 +27,10 @@ pub struct BlockRequirements {
 
 pub fn resolve_block_requirements(
     chain: &Chain,
-    request_body: &str,
+    context: &Context,
     manifest_min_block: BlockNumber,
 ) -> Result<BlockRequirements, Error> {
-    let constraints = block_constraints(request_body).unwrap_or_default();
+    let constraints = block_constraints(context).unwrap_or_default();
 
     let latest = constraints.iter().any(|c| match c {
         BlockConstraint::Unconstrained | BlockConstraint::NumberGTE(_) => true,
@@ -75,25 +76,11 @@ pub fn resolve_block_requirements(
     })
 }
 
-fn block_constraints(request_body: &str) -> Result<BTreeSet<BlockConstraint>, Error> {
-    #[derive(Deserialize)]
-    struct ClientRequest {
-        query: String,
-        #[serde(default)]
-        variables: Option<QueryVariables>,
-    }
-    let request: ClientRequest =
-        serde_json::from_str(request_body).map_err(|err| Error::BadQuery(err.into()))?;
-    let document =
-        graphql_parser::parse_query(&request.query).map_err(|err| Error::BadQuery(err.into()))?;
-
+fn block_constraints(context: &Context) -> Result<BTreeSet<BlockConstraint>, Error> {
     let mut constraints = BTreeSet::new();
-    let vars = &request.variables.map(|vars| vars.0).unwrap_or_default();
-    let operations = document.definitions.iter().filter_map(|def| match def {
-        Definition::Operation(op) => Some(op),
-        Definition::Fragment(_) => None,
-    });
-    for operation in operations {
+    let vars = &context.variables;
+    // ba6c90f1-3baf-45be-ac1c-f60733404436
+    for operation in &context.operations {
         let (selection_set, defaults) = match operation {
             OperationDefinition::SelectionSet(selection_set) => {
                 (selection_set, BTreeMap::default())
@@ -103,8 +90,8 @@ fn block_constraints(request_body: &str) -> Result<BTreeSet<BlockConstraint>, Error> {
             let defaults: BTreeMap<String, StaticValue> = query
                 .variable_definitions
                 .iter()
-                .filter(|d| !vars.contains_key(d.name))
-                .filter_map(|d: &VariableDefinition<'_, &'_ str>| {
+                .filter(|d| !vars.0.contains_key(d.name))
+                .filter_map(|d| {
                     Some((d.name.to_string(), d.default_value.as_ref()?.to_graphql()))
                 })
                 .collect();
@@ -139,14 +126,92 @@ fn block_constraints(request_body: &str) -> Result<BTreeSet<BlockConstraint>, Error> {
     Ok(constraints)
 }
 
-fn field_constraint<'c>(
-    vars: &HashMap<String, StaticValue>,
+pub fn rewrite_query<'q>(ctx: &Context<'q>) -> String {
+    let mut buf: String = Default::default();
+    for fragment in &ctx.fragments {
+        write!(&mut buf, "{}", fragment).unwrap();
+    }
+    if contains_introspection(ctx) {
+        for operation in &ctx.operations {
+            write!(&mut buf, "{}", operation).unwrap();
+        }
+    } else {
+        let serialize_selection_set =
+            |buf: &mut String, selection_set: &SelectionSet<'q, &'q str>| {
+                buf.push_str("{\n");
+                for selection in &selection_set.items {
+                    match selection {
+                        Selection::Field(field) => {
+                            write!(buf, " {}", field).unwrap();
+                        }
+                        Selection::FragmentSpread(spread) => {
+                            write!(buf, " {}", spread).unwrap();
+                        }
+                        Selection::InlineFragment(fragment) => {
+                            write!(buf, " {}", fragment).unwrap();
+                        }
+                    };
+                }
+                buf.push_str(" _gateway_probe_: _meta { block { hash number timestamp } }\n}\n");
+            };
+        let serialize_operation =
+            |buf: &mut String, operation: &OperationDefinition<'q, &'q str>| {
+                match operation {
+                    OperationDefinition::SelectionSet(selection_set) => {
+                        serialize_selection_set(buf, selection_set);
+                    }
+                    OperationDefinition::Query(query) => {
+                        buf.push_str("query");
+                        if let Some(name) = query.name {
+                            write!(buf, " {name}").unwrap();
+                        }
+                        if !query.variable_definitions.is_empty() {
+                            write!(buf, "({}", query.variable_definitions[0]).unwrap();
+                            for var in &query.variable_definitions[1..] {
+                                write!(buf, ", {var}").unwrap();
+                            }
+                            buf.push(')');
+                        }
+                        debug_assert!(query.directives.is_empty());
+                        buf.push(' ');
+                        serialize_selection_set(buf, &query.selection_set);
+                    }
+                    OperationDefinition::Mutation(_) | OperationDefinition::Subscription(_) => (),
+                };
+            };
+        for operation in &ctx.operations {
+            serialize_operation(&mut buf, operation);
+        }
+    }
+
+    serde_json::to_string(&json!({ "query": buf, "variables": ctx.variables })).unwrap()
+}
+
+fn contains_introspection(ctx: &Context<'_>) -> bool {
+    fn selection_set_has_introspection<'q>(s: &SelectionSet<'q, &'q str>) -> bool {
+        s.items.iter().any(|selection| match selection {
+            Selection::Field(f) => f.name.starts_with("__"), // only check top level
+            Selection::InlineFragment(_) | Selection::FragmentSpread(_) => false,
+        })
+    }
+    ctx.operations.iter().any(|op| match op {
+        OperationDefinition::Query(q) => selection_set_has_introspection(&q.selection_set),
+        OperationDefinition::SelectionSet(s) => selection_set_has_introspection(s),
+        OperationDefinition::Mutation(_) | OperationDefinition::Subscription(_) => false,
+    })
+}
+
+fn field_constraint<'c, T: Text<'c>>(
+    vars: &cost_model::QueryVariables,
     defaults: &BTreeMap<String, StaticValue>,
-    field: &Value<'c, &'c str>,
+    field: &Value<'c, T>,
 ) -> anyhow::Result<BlockConstraint> {
     match field {
         Value::Object(fields) => parse_constraint(vars, defaults, fields),
-        Value::Variable(name) => match vars.get(*name).or_else(|| defaults.get(*name)) {
+        Value::Variable(name) => match vars
+            .get(name.as_ref())
+            .or_else(|| defaults.get(name.as_ref()))
+        {
             None => Ok(BlockConstraint::Unconstrained),
             Some(Value::Object(fields)) => parse_constraint(vars, defaults, fields),
             _ => Err(anyhow!("malformed block constraint")),
@@ -156,7 +221,7 @@ fn field_constraint<'c>(
 }
 
 fn parse_constraint<'c, T: Text<'c>>(
-    vars: &HashMap<String, StaticValue>,
+    vars: &cost_model::QueryVariables,
     defaults: &BTreeMap<String, StaticValue>,
     fields: &BTreeMap<T::Value, Value<'c, T>>,
 ) -> anyhow::Result<BlockConstraint> {
@@ -186,7 +251,7 @@ fn parse_constraint<'c, T: Text<'c>>(
 
 fn parse_hash<'c, T: Text<'c>>(
     hash: &Value<'c, T>,
-    vars: &HashMap<String, StaticValue>,
+    variables: &cost_model::QueryVariables,
     defaults: &BTreeMap<String, StaticValue>,
 ) -> anyhow::Result<Option<BlockHash>> {
     match hash {
         Value::String(hash) => hash
             .parse()
             .map(Some)
             .map_err(|err| anyhow!("malformed block hash: {err}")),
-        Value::Variable(name) => match vars
+        Value::Variable(name) => match variables
             .get(name.as_ref())
             .or_else(|| defaults.get(name.as_ref()))
         {
@@ -210,12 +275,12 @@ fn parse_hash<'c, T: Text<'c>>(
 
 fn parse_number<'c, T: Text<'c>>(
     number: &Value<'c, T>,
-    vars: &HashMap<String, StaticValue>,
+    variables: &cost_model::QueryVariables,
     defaults: &BTreeMap<String, StaticValue>,
 ) -> anyhow::Result<Option<BlockNumber>> {
     let n = match number {
         Value::Int(n) => n,
-        Value::Variable(name) => match vars
+        Value::Variable(name) => match variables
             .get(name.as_ref())
             .or_else(|| defaults.get(name.as_ref()))
         {
@@ -231,73 +296,77 @@ fn parse_number<'c, T: Text<'c>>(
 
 #[cfg(test)]
 mod tests {
-    use super::{block_constraints, BlockConstraint};
+    use std::iter::FromIterator as _;
+
+    use alloy_primitives::hex;
+
+    use super::*;
 
     #[test]
     fn tests() {
         use BlockConstraint::*;
 
+        let hash: BlockHash =
+            hex!("0000000000000000000000000000000000000000000000000000000000054321").into();
         let tests = [
-            (r#"{"query":"{ a }"}"#, Ok(vec![Unconstrained])),
-            (r#"{"query":"{ a(abc:true) }"}"#, Ok(vec![Unconstrained])),
+            ("{ a }", Ok(vec![Unconstrained])),
+            ("{ a(abc:true) }", Ok(vec![Unconstrained])),
+            ("{ a(block:{number:10}) }", Ok(vec![Number(10)])),
             (
-                r#"{"query":"{ a(block:{number:10}) }"}"#,
-                Ok(vec![Number(10)]),
-            ),
-            (
-                r#"{"query":"{ a(block:{number:10,number_gte:11}) }"}"#,
+                "{ a(block:{number:10,number_gte:11}) }",
                 Err("bad query: conflicting block constraints"),
             ),
             (
-                r#"{"query":"{ a(block:{number:1}) b(block:{number:2}) }"}"#,
+                "{ a(block:{number:1}) b(block:{number:2}) }",
                 Ok(vec![Number(1), Number(2)]),
             ),
             (
-                r#"{"query":"{ a(block:{hash:\"0x0000000000000000000000000000000000000000000000000000000000054321\"}) }"}"#,
-                Ok(vec![Hash(
-                    "0x0000000000000000000000000000000000000000000000000000000000054321"
-                        .parse()
-                        .unwrap(),
-                )]),
+                &format!("{{ a(block:{{hash:{:?}}})}}", hash.to_string()),
+                Ok(vec![Hash(hash)]),
             ),
             (
-                r#"{"query":"{ a(block:{number_gte:1}) b }"}"#,
+                "{ a(block:{number_gte:1}) b }",
                 Ok(vec![NumberGTE(1), Unconstrained]),
             ),
             (
-                r#"{"query":"query($n: Int = 1) { a(block:{number_gte:$n}) }"}"#,
+                "query($n: Int = 1) { a(block:{number_gte:$n}) }",
                 Ok(vec![NumberGTE(1)]),
             ),
             (
-                r#"{"query":"query($n: Int) { a(block:{number_gte:$n}) }"}"#,
+                "query($n: Int) { a(block:{number_gte:$n}) }",
                 Ok(vec![Unconstrained]),
             ),
             (
-                r#"{"query":"query($h: String) { a(block:{hash:$h}) }"}"#,
+                "query($h: String) { a(block:{hash:$h}) }",
                 Ok(vec![Unconstrained]),
             ),
             (
-                r#"{"query":"query($b: Block_height) { a(block:$b) }"}"#,
+                "query($b: Block_height) { a(block:$b) }",
                 Ok(vec![Unconstrained]),
             ),
             (
-                r#"{"query":"query($b: Block_height = {number_gte:0}) { a(block:$b) }"}"#,
+                "query($b: Block_height = {number_gte:0}) { a(block:$b) }",
                 Ok(vec![NumberGTE(0)]),
             ),
-            (
-                r#"{"query":"query($b: Block_height) { a(block:$b) }","variables":{"b":{"number_gte":0}}}"#,
-                Ok(vec![NumberGTE(0)]),
-            ),
-            (
-                r#"{"query":"query($b: Int) { a(block:{number:$b}) }","variables":{"b":0}}"#,
-                Ok(vec![Number(0)]),
-            ),
         ];
         for (query, expected) in tests {
-            let constraints = block_constraints(query).map_err(|e| e.to_string());
+            let context = Context::new(query, "").unwrap();
+            let constraints = block_constraints(&context).map_err(|e| e.to_string());
             let expected = expected
-                .map(|v| v.iter().cloned().collect())
+                .map(|v| BTreeSet::from_iter(v.iter().cloned()))
                 .map_err(ToString::to_string);
             assert_eq!(constraints, expected);
         }
     }
+
+    #[test]
+    fn query_contains_introspection() {
+        let examples = [
+            "{ __schema { queryType { name } } }",
+            "{ __type(name:\"Droid\") { name description } }",
+        ];
+        for example in examples {
+            let context = Context::new(example, "").unwrap();
+            assert!(super::contains_introspection(&context));
+        }
+    }
 }
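For orientation (illustrative, not part of the diff): the restored `rewrite_query` appends a `_gateway_probe_` selection to every top-level selection set and re-wraps the result as a JSON request body, so each paid indexer query doubles as a chain-head sample. A check along these lines would fit the tests module above (which imports `super::*`):

    #[test]
    fn rewrite_appends_probe() {
        let ctx = Context::new("{ tokens(first:10) { id } }", "").unwrap();
        let body = rewrite_query(&ctx);
        // The indexer answers the client's selections plus a _meta probe.
        assert!(body.contains("_gateway_probe_: _meta { block { hash number timestamp } }"));
    }
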
diff --git a/src/chain.rs b/src/chain.rs
index bf3283520..8e77c805c 100644
--- a/src/chain.rs
+++ b/src/chain.rs
@@ -1,34 +1,19 @@
 use std::{
     collections::{BTreeMap, BTreeSet},
     iter,
-    time::Duration,
 };
 
 use thegraph_core::{BlockHash, IndexerId};
-use tokio::time::Instant;
 
 use crate::blocks::Block;
 
 #[derive(Default)]
-pub struct Chain {
-    blocks: BTreeMap<Block, BTreeSet<IndexerId>>,
-    update: parking_lot::Mutex<Option<Instant>>,
-}
+pub struct Chain(BTreeMap<Block, BTreeSet<IndexerId>>);
 
 const MAX_LEN: usize = 512;
 const DEFAULT_BLOCKS_PER_MINUTE: u64 = 6;
-const UPDATE_INTERVAL: Duration = Duration::from_secs(1);
 
 impl Chain {
-    pub fn update_ticket(&self) -> Option<()> {
-        let mut update = self.update.try_lock()?;
-        if matches!(*update, Some(t) if t.elapsed() < UPDATE_INTERVAL) {
-            return None;
-        }
-        *update = Some(Instant::now());
-        Some(())
-    }
-
     pub fn latest(&self) -> Option<&Block> {
         self.consensus_blocks().next()
     }
@@ -53,39 +38,33 @@ impl Chain {
         (bps * 60.0) as u64
     }
 
-    pub fn insert(&mut self, block: Block, indexer: IndexerId) {
-        tracing::trace!(%indexer, ?block);
-        if !self.should_insert(&block, &indexer) {
-            return;
-        }
-        if self.blocks.len() >= MAX_LEN {
-            self.evict();
-        }
-        self.blocks.entry(block).or_default().insert(indexer);
-    }
-
-    fn should_insert(&self, block: &Block, indexer: &IndexerId) -> bool {
+    pub fn should_insert(&self, block: &Block, indexer: &IndexerId) -> bool {
         let redundant = self
-            .blocks
+            .0
             .get(block)
             .map(|indexers| indexers.contains(indexer))
             .unwrap_or(false);
-        let lowest_block = self
-            .blocks
-            .first_key_value()
-            .map(|(b, _)| b.number)
-            .unwrap_or(0);
-        let has_space = (self.blocks.len() < MAX_LEN) || (block.number > lowest_block);
+        let lowest_block = self.0.first_key_value().map(|(b, _)| b.number).unwrap_or(0);
+        let has_space = (self.0.len() < MAX_LEN) || (block.number > lowest_block);
         !redundant && has_space
     }
 
+    pub fn insert(&mut self, block: Block, indexer: IndexerId) {
+        tracing::trace!(%indexer, ?block);
+        debug_assert!(self.should_insert(&block, &indexer));
+        if self.0.len() >= MAX_LEN {
+            self.evict();
+        }
+        self.0.entry(block).or_default().insert(indexer);
+    }
+
     /// Remove all entries associated with the lowest block number.
     fn evict(&mut self) {
-        let min_block = match self.blocks.pop_first() {
+        let min_block = match self.0.pop_first() {
             Some((min_block, _)) => min_block,
             None => return,
         };
 
-        while let Some(entry) = self.blocks.first_entry() {
+        while let Some(entry) = self.0.first_entry() {
             debug_assert!(entry.key().number >= min_block.number);
             if entry.key().number > min_block.number {
                 break;
@@ -121,7 +100,7 @@ impl Chain {
             }
         }
         ConsensusBlocks {
-            blocks: self.blocks.iter().rev().peekable(),
+            blocks: self.0.iter().rev().peekable(),
         }
     }
 }
@@ -129,7 +108,7 @@ impl Chain {
 #[cfg(test)]
 mod tests {
     use alloy_primitives::U256;
-    use itertools::Itertools as _;
+    use itertools::Itertools;
     use rand::{
         rngs::SmallRng, seq::SliceRandom as _, thread_rng, Rng as _, RngCore as _, SeedableRng,
     };
@@ -158,30 +137,32 @@ mod tests {
                 timestamp,
             };
             let indexer = *indexers.choose(&mut rng).unwrap();
-            chain.insert(block, indexer);
+            if chain.should_insert(&block, &indexer) {
+                chain.insert(block, indexer);
+            }
         }
 
-        // println!("{:#?}", chain.blocks);
+        // println!("{:#?}", chain.0);
         // println!("{:#?}", chain.consensus_blocks().collect::<Vec<_>>());
 
-        assert!(chain.blocks.len() <= MAX_LEN, "chain len above max");
-        assert!(chain.consensus_blocks().count() <= chain.blocks.len());
+        assert!(chain.0.len() <= MAX_LEN, "chain len above max");
+        assert!(chain.consensus_blocks().count() <= chain.0.len());
         assert!(chain.blocks_per_minute() > 0);
 
-        let blocks = || chain.blocks.keys();
+        let blocks = || chain.0.keys();
         assert!(
             blocks().tuple_windows().all(|(a, b)| a.number <= b.number),
             "chain block numbers not monotonic, check ord impl"
         );
 
        for block in chain.consensus_blocks() {
            let max_fork_indexers = chain
-                .blocks
+                .0
                .iter()
                .filter(|(block, _)| (block != block) && (block.number == block.number))
                .map(|(_, indexers)| indexers.len())
                .max()
                .unwrap_or(0);
            assert!(
-                chain.blocks.get(block).unwrap().len() > max_fork_indexers,
+                chain.0.get(block).unwrap().len() > max_fork_indexers,
                "consensus block without majority consensus"
            );
        }
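The split above makes `should_insert` public while `insert` only debug-asserts it, so callers are expected to filter before inserting. The consumer in src/chains.rs (next diff) does this in two phases; the shape of that pattern, as a simplified stand-in using plain integers rather than the gateway's `Block`/`IndexerId` types:

    // Sketch only: filter under a shared read lock, then re-validate under the
    // write lock, since another writer may have run between the two guards.
    use parking_lot::RwLock;
    use std::collections::BTreeSet;

    fn insert_new(set: &RwLock<BTreeSet<u64>>, candidates: Vec<u64>) {
        let fresh: Vec<u64> = {
            let reader = set.read();
            candidates.into_iter().filter(|c| !reader.contains(c)).collect()
        };
        let mut writer = set.write();
        for c in fresh {
            if !writer.contains(&c) { // re-check: state may have changed
                writer.insert(c);
            }
        }
    }

    fn main() {
        let set = RwLock::new(BTreeSet::new());
        insert_new(&set, vec![1, 2, 2, 3]);
        assert_eq!(set.read().len(), 3);
    }
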
diff --git a/src/chains.rs b/src/chains.rs
index 636602e05..096e9eb30 100644
--- a/src/chains.rs
+++ b/src/chains.rs
@@ -1,11 +1,36 @@
-use std::collections::{BTreeMap, HashMap};
+use std::{
+    collections::{BTreeMap, HashMap},
+    time::Duration,
+};
 
-use parking_lot::RwLock;
+use parking_lot::{RwLock, RwLockReadGuard};
+use thegraph_core::IndexerId;
+use tokio::{
+    select, spawn,
+    sync::mpsc,
+    time::{interval, MissedTickBehavior},
+};
 
-use crate::chain::Chain;
+use crate::{blocks::Block, chain::Chain, metrics::METRICS};
+
+#[derive(Clone)]
+pub struct ChainReader {
+    tx: mpsc::UnboundedSender<Msg>,
+    chain: &'static RwLock<Chain>,
+}
+
+impl ChainReader {
+    pub fn read(&self) -> RwLockReadGuard<'_, Chain> {
+        self.chain.read()
+    }
+
+    pub fn notify(&self, block: Block, indexer: IndexerId) {
+        let _ = self.tx.send(Msg { block, indexer });
+    }
+}
 
 pub struct Chains {
-    data: RwLock<HashMap<String, &'static RwLock<Chain>>>,
+    data: RwLock<HashMap<String, ChainReader>>,
     aliases: BTreeMap<String, String>,
 }
 
@@ -17,20 +42,68 @@ impl Chains {
         }
     }
 
-    pub fn chain(&self, name: &str) -> &'static RwLock<Chain> {
+    pub fn chain(&self, name: &str) -> ChainReader {
         let name = self.aliases.get(name).map(|a| a.as_str()).unwrap_or(name);
         {
             let reader = self.data.read();
             if let Some(chain) = reader.get(name) {
-                return chain;
+                return chain.clone();
             }
         }
         {
             let mut writer = self.data.write();
-            if !writer.contains_key(name) {
-                writer.insert(name.to_string(), Box::leak(Default::default()));
+            writer
+                .entry(name.to_string())
+                .or_insert_with(|| Actor::spawn(name.to_string()))
+                .clone()
+        }
+    }
+}
+
+struct Msg {
+    block: Block,
+    indexer: IndexerId,
+}
+
+struct Actor;
+
+impl Actor {
+    pub fn spawn(chain_name: String) -> ChainReader {
+        let chain: &'static RwLock<Chain> = Box::leak(Box::default());
+        let (tx, mut rx) = mpsc::unbounded_channel();
+        spawn(async move {
+            let mut msgs: Vec<Msg> = Default::default();
+            let mut timer = interval(Duration::from_secs(1));
+            timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
+            loop {
+                select! {
+                    _ = rx.recv_many(&mut msgs, 32) => Self::handle_msgs(chain, &mut msgs),
+                    _ = timer.tick() => {
+                        let blocks_per_minute = chain.read().blocks_per_minute();
+                        METRICS
+                            .blocks_per_minute
+                            .with_label_values(&[&chain_name])
+                            .set(blocks_per_minute as i64);
+                    },
+                }
+            }
+        });
+        ChainReader { tx, chain }
+    }
+
+    fn handle_msgs(chain: &RwLock<Chain>, msgs: &mut Vec<Msg>) {
+        {
+            let reader = chain.read();
+            msgs.retain(|Msg { block, indexer }| reader.should_insert(block, indexer));
+        }
+        {
+            let mut writer = chain.write();
+            for Msg { block, indexer } in msgs.drain(..) {
+                if writer.should_insert(&block, &indexer) {
+                    writer.insert(block, indexer);
+                }
             }
         }
-        self.data.read().get(name).unwrap()
+        debug_assert!(msgs.is_empty());
    }
}
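`Actor::spawn` above folds two concerns into one task: draining block notifications in batches of up to 32, and refreshing a per-chain gauge once per second. A self-contained sketch of that select-loop shape (stand-in `u64` messages; `recv_many` is the same tokio API the diff uses):

    // Sketch only, assuming a tokio runtime with the macros feature.
    use tokio::{select, sync::mpsc, time::{interval, sleep, Duration, MissedTickBehavior}};

    #[tokio::main]
    async fn main() {
        let (tx, mut rx) = mpsc::unbounded_channel::<u64>();
        tokio::spawn(async move {
            let mut batch: Vec<u64> = Vec::new();
            let mut timer = interval(Duration::from_secs(1));
            timer.set_missed_tick_behavior(MissedTickBehavior::Delay);
            loop {
                select! {
                    // drain up to 32 queued block updates per wakeup
                    _ = rx.recv_many(&mut batch, 32) => {
                        for number in batch.drain(..) {
                            println!("block {number}");
                        }
                    }
                    // periodic work, e.g. refreshing a blocks-per-minute gauge
                    _ = timer.tick() => println!("tick"),
                }
            }
        });
        tx.send(42).unwrap();
        sleep(Duration::from_millis(50)).await; // give the actor time to run
    }
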
diff --git a/src/client_query.rs b/src/client_query.rs
index e965411e4..4ef31ed39 100644
--- a/src/client_query.rs
+++ b/src/client_query.rs
@@ -4,19 +4,22 @@ use std::{
     time::{Duration, Instant},
 };
 
-use alloy_sol_types::Eip712Domain;
 use anyhow::anyhow;
 use axum::{
+    body::Bytes,
     extract::{Path, State},
     http::{Response, StatusCode},
     Extension,
 };
+use cost_model::Context as AgoraContext;
 use custom_debug::CustomDebug;
 use headers::ContentType;
 use indexer_selection::{ArrayVec, Candidate, Normalized};
 use ordered_float::NotNan;
+use prost::bytes::Buf;
 use rand::{thread_rng, Rng as _};
 use serde::Deserialize;
+use serde_json::value::RawValue;
 use thegraph_core::{AllocationId, BlockNumber, DeploymentId, IndexerId};
 use tokio::sync::mpsc;
 use tracing::{info_span, Instrument as _};
@@ -25,17 +28,16 @@ use url::Url;
 
 use self::{attestation_header::GraphAttestation, context::Context, query_selector::QuerySelector};
 use crate::{
     auth::AuthSettings,
-    block_constraints::{resolve_block_requirements, BlockRequirements},
-    blocks::Block,
+    block_constraints::{resolve_block_requirements, rewrite_query, BlockRequirements},
     budgets::USD,
     errors::{Error, IndexerError, IndexerErrors, MissingBlockError, UnavailableReason},
     http_ext::HttpBuilderExt as _,
-    indexer_client::{IndexerAuth, IndexerClient, IndexerResponse},
+    indexer_client::{IndexerAuth, IndexerResponse},
     indexing_performance,
     metrics::{with_metric, METRICS},
     middleware::RequestId,
     network::{self, DeploymentError, Indexing, IndexingId, ResolvedSubgraphInfo, SubgraphError},
-    receipts::{Receipt, ReceiptStatus},
+    receipts::ReceiptStatus,
     reports,
 };
 
@@ -45,12 +47,18 @@ mod query_selector;
 
 const SELECTION_LIMIT: usize = 3;
 
+#[derive(Debug, Deserialize)]
+pub struct QueryBody {
+    pub query: String,
+    pub variables: Option<Box<RawValue>>,
+}
+
 pub async fn handle_query(
     State(ctx): State<Context>,
     Extension(auth): Extension<AuthSettings>,
     Extension(RequestId(request_id)): Extension<RequestId>,
     selector: QuerySelector,
-    body: String,
+    payload: Bytes,
 ) -> Result<Response<String>, Error> {
     let start_time = Instant::now();
 
@@ -58,6 +66,9 @@ pub async fn handle_query(
     // Check if the query selector is authorized by the auth token and
     // resolve the subgraph deployments for the query.
     let subgraph = resolve_subgraph_info(&ctx, &auth, selector).await?;
 
+    let client_request: QueryBody =
+        serde_json::from_reader(payload.reader()).map_err(|err| Error::BadQuery(err.into()))?;
+
     // Calculate the budget for the query
     let grt_per_usd = *ctx.grt_per_usd.borrow();
     let one_grt = NotNan::new(1e18).unwrap();
@@ -78,7 +89,14 @@ pub async fn handle_query(
     let (tx, mut rx) = mpsc::channel(1);
     tokio::spawn(
         run_indexer_queries(
-            ctx, request_id, auth, start_time, subgraph, budget, body, tx,
+            ctx,
+            request_id,
+            auth,
+            start_time,
+            subgraph,
+            budget,
+            client_request,
+            tx,
         )
         .in_current_span(),
     );
@@ -96,7 +114,7 @@ pub async fn handle_query(
 
     result.map(
         |IndexerResponse {
-             response,
+             client_response,
              attestation,
             ..
         }| {
@@ -104,7 +122,7 @@ pub async fn handle_query(
                 .status(StatusCode::OK)
                 .header_typed(ContentType::json())
                 .header_typed(GraphAttestation(attestation))
-                .body(response)
+                .body(client_response)
                 .unwrap()
         },
     )
@@ -167,15 +185,34 @@ async fn run_indexer_queries(
     start_time: Instant,
     subgraph: ResolvedSubgraphInfo,
     budget: u128,
-    client_request: String,
+    client_request: QueryBody,
     client_response: mpsc::Sender<Result<IndexerResponse, Error>>,
 ) {
     let one_grt = NotNan::new(1e18).unwrap();
     let grt_per_usd = *ctx.grt_per_usd.borrow();
 
+    // Create the Agora context from the query and variables
+    let variables = client_request
+        .variables
+        .as_ref()
+        .map(ToString::to_string)
+        .unwrap_or_default();
+    // We handle these errors here, instead of `handle_query`, because the agora context is tied to
+    // the lifetime of the query body which may need to extend past the client response. Even if
+    // it doesn't, it is relatively difficult to convince the compiler of that.
+    let agora_context = match AgoraContext::new(&client_request.query, &variables) {
+        Ok(agora_context) => agora_context,
+        Err(err) => {
+            client_response
+                .try_send(Err(Error::BadQuery(anyhow!("{err}"))))
+                .unwrap();
+            return;
+        }
+    };
+
     // Get the chain information for the resolved subgraph
-    let (chain_head, blocks_per_minute, block_requirements, chain_update) = {
-        let chain = ctx.chains.chain(&subgraph.chain);
+    let chain = ctx.chains.chain(&subgraph.chain);
+    let (chain_head, blocks_per_minute, block_requirements) = {
         let chain_reader = chain.read();
 
         // Get the chain head block number. Try to get it from the chain head tracker service, if it
@@ -190,26 +227,18 @@ async fn run_indexer_queries(
         // Get the estimated blocks per minute for the chain
         let blocks_per_minute = chain_reader.blocks_per_minute();
 
-        let block_requirements = match resolve_block_requirements(
-            &chain_reader,
-            &client_request,
-            subgraph.start_block,
-        ) {
-            Ok(block_requirements) => block_requirements,
-            Err(err) => {
-                client_response
-                    .try_send(Err(Error::BadQuery(anyhow!("{err}"))))
-                    .unwrap();
-                return;
-            }
-        };
+        let block_requirements =
+            match resolve_block_requirements(&chain_reader, &agora_context, subgraph.start_block) {
+                Ok(block_requirements) => block_requirements,
+                Err(err) => {
+                    client_response
+                        .try_send(Err(Error::BadQuery(anyhow!("{err}"))))
+                        .unwrap();
+                    return;
+                }
+            };
 
-        (
-            chain_head,
-            blocks_per_minute,
-            block_requirements,
-            chain_reader.update_ticket().is_some(),
-        )
+        (chain_head, blocks_per_minute, block_requirements)
     };
     tracing::debug!(chain_head, blocks_per_minute, ?block_requirements);
 
@@ -228,15 +257,16 @@ async fn run_indexer_queries(
     indexer_errors.extend(errors);
 
     if tracing::enabled!(tracing::Level::TRACE) {
-        tracing::trace!(client_request);
+        tracing::trace!(client_query = client_request.query, variables);
         tracing::trace!(?candidates);
     } else if tracing::enabled!(tracing::Level::DEBUG) && thread_rng().gen_bool(0.01) {
         // Log candidates at a low rate to avoid log bloat
-        tracing::debug!(client_request);
+        tracing::debug!(client_query = client_request.query, variables);
         tracing::debug!(?candidates);
     }
 
-    let client_request_bytes = client_request.len() as u32;
+    let client_request_bytes = client_request.query.len() as u32;
+    let indexer_query = rewrite_query(&agora_context);
     let mut indexer_requests: Vec<reports::IndexerRequest> = Default::default();
     let mut client_response_time: Option<Duration> = None;
     let mut client_response_bytes: Option<u32> = None;
@@ -282,7 +312,7 @@ async fn run_indexer_queries(
             let blocks_behind = blocks_behind(seconds_behind, blocks_per_minute);
             let indexer_client = ctx.indexer_client.clone();
-            let client_request = client_request.clone();
+            let indexer_query = indexer_query.clone();
             let tx = tx.clone();
             tokio::spawn(
                 async move {
@@ -291,7 +321,7 @@ async fn run_indexer_queries(
                     let deployment_url = url.join(&format!("subgraphs/id/{}", deployment)).unwrap();
                     let auth = IndexerAuth::Paid(&receipt, ctx.attestation_domain);
                     let result = indexer_client
-                        .query_indexer(deployment_url, auth, &client_request)
+                        .query_indexer(deployment_url, auth, &indexer_query)
                         .in_current_span()
                         .await;
                     let response_time_ms = start_time.elapsed().as_millis() as u16;
@@ -306,7 +336,7 @@ async fn run_indexer_queries(
                         response_time_ms,
                         seconds_behind,
                        blocks_behind,
-                        request: client_request,
+                        request: indexer_query,
                    };
                    tx.try_send(report).unwrap();
                }
@@ -320,7 +350,7 @@ async fn run_indexer_queries(
            Ok(response) if client_response_time.is_none() => {
                let _ = client_response.try_send(Ok(response.clone()));
                client_response_time = Some(start_time.elapsed());
-                client_response_bytes = Some(response.response.len() as u32);
+                client_response_bytes = Some(response.client_response.len() as u32);
            }
            Ok(_) => (),
            Err(err) => {
@@ -372,15 +402,11 @@ async fn run_indexer_queries(
        .map(|i| i.receipt.grt_value() as f64 * 1e-18)
        .sum();
    let total_fees_usd = USD(NotNan::new(total_fees_grt / *grt_per_usd).unwrap());
-
-    if chain_update {
-        let _ = ctx.budgeter.feedback.send(USD(total_fees_usd.0 * 2.0));
-    } else {
-        let _ = ctx.budgeter.feedback.send(total_fees_usd);
-    }
+    let _ = ctx.budgeter.feedback.send(total_fees_usd);
 
    for indexer_request in &indexer_requests {
        let latest_block = match &indexer_request.result {
+            Ok(response) => response.probe_block.as_ref().map(|b| b.number),
            Err(IndexerError::Unavailable(UnavailableReason::MissingBlock(err))) => err.latest,
            _ => None,
        };
@@ -392,8 +418,13 @@ async fn run_indexer_queries(
            latest_block,
        );
 
-        if chain_update && indexer_request.result.is_ok() {
-            update_chain(&ctx, indexer_request);
+        if let Some(block) = indexer_request
+            .result
+            .as_ref()
+            .ok()
+            .and_then(|r| r.probe_block.clone())
+        {
+            chain.notify(block, indexer_request.indexer);
        }
 
        let deployment = indexer_request.deployment.to_string();
@@ -459,6 +490,7 @@ struct CandidateMetadata {
 
 /// Given a list of indexings, build a list of candidates that are within the required block range
 /// and have the required performance.
+#[allow(clippy::too_many_arguments)]
 fn build_candidates_list(
     ctx: &Context,
     budget: u128,
@@ -794,13 +826,13 @@ pub async fn handle_indexer_query(
         user: auth.user,
         grt_per_usd,
         request_bytes: indexer_request.request.len() as u32,
-        response_bytes: result.as_ref().map(|r| r.response.len() as u32).ok(),
+        response_bytes: result.as_ref().map(|r| r.client_response.len() as u32).ok(),
         indexer_requests: vec![indexer_request],
     });
 
     result.map(
         |IndexerResponse {
-             response,
+             client_response,
             attestation,
             ..
         }| {
@@ -808,79 +840,12 @@ pub async fn handle_indexer_query(
                 .status(StatusCode::OK)
                 .header_typed(ContentType::json())
                 .header_typed(GraphAttestation(attestation))
-                .body(response)
+                .body(client_response)
                 .unwrap()
         },
     )
 }
 
-fn update_chain(ctx: &Context, indexer_request: &reports::IndexerRequest) {
-    let allocation = indexer_request.receipt.allocation().into();
-    let receipt = match &indexer_request.receipt {
-        Receipt::Legacy(fee, _) => ctx.receipt_signer.create_legacy_receipt(allocation, *fee),
-        Receipt::Tap(r) => ctx
-            .receipt_signer
-            .create_receipt(allocation, r.message.value),
-    };
-    let attestation_domain = ctx.attestation_domain.clone();
-    let indexer_client = ctx.indexer_client.clone();
-    let deployment_url = indexer_request
-        .url
-        .parse::<Url>()
-        .unwrap()
-        .join(&format!("subgraphs/id/{}", indexer_request.deployment))
-        .unwrap();
-    let subgraph_chain = indexer_request.subgraph_chain.clone();
-    let chains = ctx.chains;
-    let indexer = indexer_request.indexer;
-    tokio::spawn(async move {
-        match update_chain_inner(indexer_client, deployment_url, receipt, &attestation_domain).await
-        {
-            Ok(block) => {
-                tracing::debug!(chain = subgraph_chain, ?block);
-                chains.chain(&subgraph_chain).write().insert(block, indexer);
-                METRICS
-                    .blocks_per_minute
-                    .with_label_values(&[&subgraph_chain])
-                    .set(chains.chain(&subgraph_chain).read().blocks_per_minute() as i64);
-            }
-            Err(err) => {
-                tracing::debug!(chain_update_err = format!("{err:#}"));
-            }
-        };
-    });
-}
-
-async fn update_chain_inner(
-    indexer_client: IndexerClient,
-    deployment_url: Url,
-    receipt: anyhow::Result<Receipt>,
-    attestation_domain: &Eip712Domain,
-) -> anyhow::Result<Block> {
-    let query = r#"{"query":"{meta:_meta{block{number,hash,timestamp}}}"}"#;
-    let receipt = receipt?;
-    let auth = IndexerAuth::Paid(&receipt, attestation_domain);
-    let response = indexer_client
-        .query_indexer(deployment_url, auth, query)
-        .await?;
-    if !response.errors.is_empty() {
-        anyhow::bail!(anyhow!("indexer errors: {:?}", response.errors));
-    }
-    #[derive(Debug, Deserialize)]
-    pub struct QueryResponse {
-        data: QueryData,
-    }
-    #[derive(Debug, Deserialize)]
-    pub struct QueryData {
-        meta: Meta,
-    }
-    #[derive(Debug, Deserialize)]
-    pub struct Meta {
-        block: Block,
-    }
-    let response: QueryResponse = serde_json::from_str(&response.response)?;
-    Ok(response.data.meta.block)
-}
-
 #[cfg(test)]
 mod tests {
     mod require_req_auth {
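The next diff implements the receiving side of the probe: `rewrite_response` splits the injected `_gateway_probe_` object out of the indexer's JSON and re-serializes the rest for the client. The core serde mechanics, reduced to a runnable sketch (the `Data` type is a stand-in; the real code nests this inside the response's `data` field and maps the probe into a `Block`):

    // Sketch only, using serde_json directly.
    use serde::{Deserialize, Serialize};

    #[derive(Deserialize, Serialize)]
    struct Data {
        // Captured on the way in, dropped on the way out.
        #[serde(rename = "_gateway_probe_", skip_serializing)]
        probe: Option<serde_json::Value>,
        // Everything else passes through untouched.
        #[serde(flatten)]
        rest: serde_json::Value,
    }

    fn main() {
        let raw = r#"{"tokens":[{"id":"0x1"}],"_gateway_probe_":{"block":{"number":1,"hash":"0x0","timestamp":2}}}"#;
        let mut data: Data = serde_json::from_str(raw).unwrap();
        let probe = data.probe.take(); // becomes the chain's block sample
        println!("probe: {probe:?}");
        // The client sees only its original selection set:
        println!("client: {}", serde_json::to_string(&data).unwrap());
    }
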
diff --git a/src/indexer_client.rs b/src/indexer_client.rs
index edd69b408..5bf44807d 100644
--- a/src/indexer_client.rs
+++ b/src/indexer_client.rs
@@ -1,11 +1,15 @@
 use alloy_sol_types::Eip712Domain;
 use reqwest::header::AUTHORIZATION;
 use serde::{Deserialize, Serialize};
-use thegraph_core::attestation::{self, Attestation};
+use thegraph_core::{
+    attestation::{self, Attestation},
+    BlockHash, BlockNumber,
+};
 use thegraph_graphql_http::http::response::Error as GQLError;
 use url::Url;
 
 use crate::{
+    blocks::Block,
     errors::{
         IndexerError::{self, *},
         MissingBlockError, UnavailableReason,
@@ -16,9 +20,11 @@ use crate::{
 
 #[derive(Clone, Debug)]
 pub struct IndexerResponse {
-    pub response: String,
+    pub original_response: String,
     pub attestation: Option<Attestation>,
+    pub client_response: String,
     pub errors: Vec<String>,
+    pub probe_block: Option<Block>,
 }
 
 #[derive(Clone)]
@@ -78,10 +84,11 @@ impl IndexerClient {
             return Err(BadResponse(err));
         }
 
-        let response = payload
+        let original_response = payload
             .graphql_response
             .ok_or_else(|| BadResponse("missing response".into()))?;
-        let errors = parse_indexer_errors(&response)?;
+        let (client_response, errors, probe_block) = rewrite_response(&original_response)?;
+        let errors: Vec<String> = errors.into_iter().map(|err| err.message).collect();
 
         errors
             .iter()
@@ -103,7 +110,7 @@ impl IndexerClient {
                 attestation,
                 &allocation,
                 query,
-                &response,
+                &original_response,
             ) {
                 return Err(BadResponse(format!("bad attestation: {err}")));
             }
@@ -127,16 +134,21 @@ impl IndexerClient {
         }
 
         Ok(IndexerResponse {
-            response,
+            original_response,
             attestation: payload.attestation,
+            client_response,
             errors,
+            probe_block,
         })
     }
 }
 
-fn parse_indexer_errors(response: &str) -> Result<Vec<String>, IndexerError> {
+fn rewrite_response(
+    response: &str,
+) -> Result<(String, Vec<GQLError>, Option<Block>), IndexerError> {
     #[derive(Deserialize, Serialize)]
-    struct ResponseErrors {
+    struct Response {
+        data: Option<ProbedData>,
         #[serde(default)]
         #[serde(skip_serializing_if = "Vec::is_empty")]
         errors: Vec<GQLError>,
@@ -145,7 +157,24 @@ fn rewrite_response(
         #[serde(default)]
         #[serde(skip_serializing_if = "Option::is_none")]
         error: Option<String>,
     }
+    #[derive(Deserialize, Serialize)]
+    struct ProbedData {
+        #[serde(rename = "_gateway_probe_", skip_serializing)]
+        probe: Option<Meta>,
+        #[serde(flatten)]
+        data: serde_json::Value,
+    }
+    #[derive(Deserialize)]
+    struct Meta {
+        block: MaybeBlock,
+    }
+    #[derive(Deserialize)]
+    struct MaybeBlock {
+        number: BlockNumber,
+        hash: BlockHash,
+        timestamp: Option<u64>,
+    }
-    let mut payload: ResponseErrors =
+    let mut payload: Response =
         serde_json::from_str(response).map_err(|err| BadResponse(err.to_string()))?;
 
     if let Some(err) = payload.error.take() {
@@ -162,7 +191,19 @@ fn rewrite_response(
         err.message.shrink_to_fit();
     }
 
-    Ok(payload.errors.into_iter().map(|e| e.message).collect())
+    let block = payload
+        .data
+        .as_mut()
+        .and_then(|data| data.probe.take())
+        .and_then(|meta| {
+            Some(Block {
+                number: meta.block.number,
+                hash: meta.block.hash,
+                timestamp: meta.block.timestamp?,
+            })
+        });
+    let client_response = serde_json::to_string(&payload).unwrap();
+    Ok((client_response, payload.errors, block))
 }
 
 fn check_block_error(err: &str) -> Result<(), MissingBlockError> {
diff --git a/src/network/subgraph_client.rs b/src/network/subgraph_client.rs
index 5e76fcaf7..98b52093a 100644
--- a/src/network/subgraph_client.rs
+++ b/src/network/subgraph_client.rs
@@ -240,11 +240,12 @@ impl Client {
             )
             .await?;
         tracing::trace!(
-            response = response.response,
+            response.original_response,
+            response.client_response,
             ?response.errors,
         );
         let response: QueryResponse =
-            serde_json::from_str(&response.response).context("parse body")?;
+            serde_json::from_str(&response.client_response).context("parse body")?;
         if !response.errors.is_empty() {
             bail!("{:?}", response.errors);
         }
diff --git a/src/reports.rs b/src/reports.rs
index c09daf57a..308949339 100644
--- a/src/reports.rs
+++ b/src/reports.rs
@@ -50,6 +50,7 @@ pub struct Topics {
 }
 
 impl Reporter {
+    #[allow(clippy::too_many_arguments)]
     pub fn create(
         tap_signer: Address,
         graph_env: String,
@@ -147,15 +148,15 @@ impl Reporter {
         self.write_buf.clear();
 
         for indexer_request in client_request.indexer_requests {
-            if let Some((response, attestation)) = indexer_request
+            if let Some((original_response, attestation)) = indexer_request
                 .result
                 .ok()
-                .and_then(|r| Some((r.response, r.attestation?)))
+                .and_then(|r| Some((r.original_response, r.attestation?)))
             {
                 const MAX_PAYLOAD_BYTES: usize = 10_000;
                 AttestationProtobuf {
                     request: Some(indexer_request.request).filter(|r| r.len() <= MAX_PAYLOAD_BYTES),
-                    response: Some(response).filter(|r| r.len() <= MAX_PAYLOAD_BYTES),
+                    response: Some(original_response).filter(|r| r.len() <= MAX_PAYLOAD_BYTES),
                     allocation: indexer_request.receipt.allocation().0 .0.into(),
                     subgraph_deployment: attestation.deployment.0.into(),
                     request_cid: attestation.request_cid.0.into(),