diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index ca52073ab06..da596c44cf8 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -4,9 +4,10 @@ use graph::{ store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore}, subgraph::ProofOfIndexingVersion, }, - data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion}, + data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion, SPEC_VERSION_1_3_0}, data_source::DataSourceTemplate, prelude::BlockNumber, + semver::Version, }; use std::collections::BTreeSet; use std::sync::Arc; @@ -28,6 +29,7 @@ pub struct IndexingInputs { pub static_filters: bool, pub poi_version: ProofOfIndexingVersion, pub network: String, + pub spec_version: Version, /// Whether to instrument trigger processing and log additional, /// possibly expensive and noisy, information @@ -53,6 +55,7 @@ impl IndexingInputs { static_filters, poi_version, network, + spec_version, instrument, } = self; IndexingInputs { @@ -72,7 +75,14 @@ impl IndexingInputs { static_filters: *static_filters, poi_version: *poi_version, network: network.clone(), + spec_version: spec_version.clone(), instrument: *instrument, } } + + // Whether to use strict vid order for the subgraph + // This is true for all subgraphs with spec version 1.3.0 or greater + pub fn strict_vid_order(&self) -> bool { + self.spec_version >= SPEC_VERSION_1_3_0 + } } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 2d54a90417c..a6d27e439eb 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -542,6 +542,7 @@ impl SubgraphInstanceManager { static_filters: self.static_filters, poi_version, network: network.to_string(), + spec_version: manifest.spec_version.clone(), instrument, }; diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 1e0fa2d4c8e..04cf6fee455 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -382,6 +382,7 @@ where let mut block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), + self.inputs.strict_vid_order(), ); let _section = self @@ -795,6 +796,7 @@ where let block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), + self.inputs.strict_vid_order(), ); self.ctx @@ -1156,7 +1158,11 @@ where // Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to // get causality region isolation. let schema = ReadStore::input_schema(&self.inputs.store); - let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new()); + let mut block_state = BlockState::new( + EmptyStore::new(schema), + LfuCache::new(), + self.inputs.strict_vid_order(), + ); // PoI ignores offchain events. // See also: poi-ignores-offchain diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 1a002789d8f..e5f8d25b5b0 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -113,6 +113,9 @@ pub struct EntityCache { // Sequence number of the next VID value for this block. The value written // in the database consist of a block number and this SEQ number. pub vid_seq: u32, + + /// The spec version of the subgraph being processed + pub strict_vid_order: bool, } impl Debug for EntityCache { @@ -141,6 +144,7 @@ impl EntityCache { store, seq: 0, vid_seq: RESERVED_VIDS, + strict_vid_order: false, } } @@ -152,7 +156,11 @@ impl EntityCache { self.schema.make_entity(iter) } - pub fn with_current(store: Arc, current: EntityLfuCache) -> EntityCache { + pub fn with_current( + store: Arc, + current: EntityLfuCache, + strict_vid_order: bool, + ) -> EntityCache { EntityCache { current, updates: HashMap::new(), @@ -162,6 +170,7 @@ impl EntityCache { store, seq: 0, vid_seq: RESERVED_VIDS, + strict_vid_order, } } @@ -213,16 +222,8 @@ impl EntityCache { // always creates it in a new style. debug_assert!(match scope { GetScope::Store => { - // Release build will never call this function and hence it's OK - // when that implementation is not correct. fn remove_vid(entity: Option>) -> Option { - entity.map(|e| { - #[allow(unused_mut)] - let mut entity = (*e).clone(); - #[cfg(debug_assertions)] - entity.remove("vid"); - entity - }) + entity.map(|e| e.clone_no_vid()) } remove_vid(entity.clone()) == remove_vid(self.store.get(key).unwrap().map(Arc::new)) } @@ -397,19 +398,23 @@ impl EntityCache { *write_capacity_remaining -= weight; } - // The next VID is based on a block number and a sequence within the block - let vid = ((block as i64) << 32) + self.vid_seq as i64; - self.vid_seq += 1; + let is_object = key.entity_type.is_object_type(); + let mut entity = entity; - let old_vid = entity.set_vid(vid).expect("the vid should be set"); - // Make sure that there was no VID previously set for this entity. - if let Some(ovid) = old_vid { - bail!( - "VID: {} of entity: {} with ID: {} was already present when set in EntityCache", - ovid, - key.entity_type, - entity.id() - ); + if self.strict_vid_order && is_object { + // The next VID is based on a block number and a sequence within the block + let vid = ((block as i64) << 32) + self.vid_seq as i64; + self.vid_seq += 1; + let old_vid = entity.set_vid(vid).expect("the vid should be set"); + // Make sure that there was no VID previously set for this entity. + if let Some(ovid) = old_vid { + bail!( + "VID: {} of entity: {} with ID: {} was already present when set in EntityCache", + ovid, + key.entity_type, + entity.id() + ); + } } self.entity_op(key.clone(), EntityOp::Update(entity)); @@ -534,7 +539,7 @@ impl EntityCache { .map_err(|e| key.unknown_attribute(e))?; let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); - if current != data { + if current.clone_no_vid() != data.clone_no_vid() { Some(Overwrite { key, data, @@ -549,7 +554,7 @@ impl EntityCache { (Some(current), EntityOp::Overwrite(data)) => { let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); - if current != data { + if current.clone_no_vid() != data.clone_no_vid() { Some(Overwrite { key, data, diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 11b473a878d..bed406646c3 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -87,9 +87,9 @@ pub struct BlockState { } impl BlockState { - pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache) -> Self { + pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache, strict_vid_order: bool) -> Self { BlockState { - entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache), + entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache, strict_vid_order), deterministic_errors: Vec::new(), created_data_sources: Vec::new(), persisted_data_sources: Vec::new(), diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index e0912a16485..9eaaf352d0c 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -930,6 +930,13 @@ impl Entity { self.0.insert(VID_FIELD, value.into()) } + /// Clone entity and remove the VID. + pub fn clone_no_vid(&self) -> Entity { + let mut c = self.clone(); + c.0.remove(VID_FIELD); + c + } + /// Sets the VID if it's not already set. Should be used only for tests. #[cfg(debug_assertions)] pub fn set_vid_if_empty(&mut self) { diff --git a/graph/tests/subgraph_datasource_tests.rs b/graph/tests/subgraph_datasource_tests.rs index 2c357bf37cd..d13df6be6dd 100644 --- a/graph/tests/subgraph_datasource_tests.rs +++ b/graph/tests/subgraph_datasource_tests.rs @@ -105,10 +105,10 @@ async fn test_triggers_adapter_with_entities() { let id = DeploymentHash::new("test_deployment").unwrap(); let schema = InputSchema::parse_latest( r#" - type User @entity { + type User @entity { id: String! name: String! - age: Int + age: Int } type Post @entity { id: String! diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 7641dd06d8b..245f46d1f61 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -126,6 +126,7 @@ pub fn mock_context( )) .unwrap(), Default::default(), + false, // TODO: Get this as a param ), proof_of_indexing: None, host_fns: Arc::new(Vec::new()), diff --git a/runtime/wasm/src/mapping.rs b/runtime/wasm/src/mapping.rs index 8086051961a..6a5f95438b2 100644 --- a/runtime/wasm/src/mapping.rs +++ b/runtime/wasm/src/mapping.rs @@ -214,7 +214,11 @@ impl MappingContext { host_exports: self.host_exports.cheap_clone(), block_ptr: self.block_ptr.cheap_clone(), timestamp: self.timestamp, - state: BlockState::new(self.state.entity_cache.store.clone(), Default::default()), + state: BlockState::new( + self.state.entity_cache.store.clone(), + Default::default(), + self.state.entity_cache.strict_vid_order, + ), proof_of_indexing: self.proof_of_indexing.cheap_clone(), host_fns: self.host_fns.cheap_clone(), debug_fork: self.debug_fork.cheap_clone(), diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 03cbf244c23..a7a25f38a58 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -133,7 +133,11 @@ impl WasmInstanceData { std::mem::replace( state, - BlockState::new(state.entity_cache.store.cheap_clone(), LfuCache::default()), + BlockState::new( + state.entity_cache.store.cheap_clone(), + LfuCache::default(), + state.entity_cache.strict_vid_order, + ), ) } } diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index f4b55e89150..b04c22cdeb7 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2257,7 +2257,11 @@ impl<'a> InsertRow<'a> { } let br_value = BlockRangeValue::new(table, row.block, row.end); let causality_region = row.causality_region; - let vid = row.entity.vid(); + let vid = if table.object.has_vid_seq() { + row.entity.vid() + } else { + 0 + }; Ok(Self { values, br_value,