From 07e3d3b746cd392646402e5eb62794a2064ab10e Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 27 Feb 2025 13:46:31 +0400 Subject: [PATCH 1/4] Bring spec version into entity cache to set the new kind of vid's --- core/src/subgraph/inputs.rs | 4 ++++ core/src/subgraph/instance_manager.rs | 1 + core/src/subgraph/runner.rs | 8 +++++++- graph/src/components/store/entity_cache.rs | 12 +++++++++++- graph/src/components/subgraph/instance.rs | 6 ++++-- graph/tests/subgraph_datasource_tests.rs | 4 ++-- runtime/test/src/common.rs | 1 + runtime/wasm/src/mapping.rs | 6 +++++- runtime/wasm/src/module/context.rs | 6 +++++- 9 files changed, 40 insertions(+), 8 deletions(-) diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index ca52073ab06..e595f84d0d3 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -7,6 +7,7 @@ use graph::{ data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion}, data_source::DataSourceTemplate, prelude::BlockNumber, + semver::Version, }; use std::collections::BTreeSet; use std::sync::Arc; @@ -28,6 +29,7 @@ pub struct IndexingInputs { pub static_filters: bool, pub poi_version: ProofOfIndexingVersion, pub network: String, + pub spec_version: Version, /// Whether to instrument trigger processing and log additional, /// possibly expensive and noisy, information @@ -53,6 +55,7 @@ impl IndexingInputs { static_filters, poi_version, network, + spec_version, instrument, } = self; IndexingInputs { @@ -72,6 +75,7 @@ impl IndexingInputs { static_filters: *static_filters, poi_version: *poi_version, network: network.clone(), + spec_version: spec_version.clone(), instrument: *instrument, } } diff --git a/core/src/subgraph/instance_manager.rs b/core/src/subgraph/instance_manager.rs index 2d54a90417c..a6d27e439eb 100644 --- a/core/src/subgraph/instance_manager.rs +++ b/core/src/subgraph/instance_manager.rs @@ -542,6 +542,7 @@ impl SubgraphInstanceManager { static_filters: self.static_filters, poi_version, network: network.to_string(), + spec_version: manifest.spec_version.clone(), instrument, }; diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index 1e0fa2d4c8e..c033e483cd1 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -382,6 +382,7 @@ where let mut block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), + self.inputs.spec_version.clone(), ); let _section = self @@ -795,6 +796,7 @@ where let block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), + self.inputs.spec_version.clone(), ); self.ctx @@ -1156,7 +1158,11 @@ where // Using an `EmptyStore` and clearing the cache for each trigger is a makeshift way to // get causality region isolation. let schema = ReadStore::input_schema(&self.inputs.store); - let mut block_state = BlockState::new(EmptyStore::new(schema), LfuCache::new()); + let mut block_state = BlockState::new( + EmptyStore::new(schema), + LfuCache::new(), + self.inputs.spec_version.clone(), + ); // PoI ignores offchain events. // See also: poi-ignores-offchain diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 1a002789d8f..970bced03e4 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -12,6 +12,7 @@ use crate::prelude::{CacheWeight, ENV_VARS}; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; use crate::util::lfu_cache::{EvictStats, LfuCache}; +use semver::Version; use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError}; @@ -113,6 +114,9 @@ pub struct EntityCache { // Sequence number of the next VID value for this block. The value written // in the database consist of a block number and this SEQ number. pub vid_seq: u32, + + /// The spec version of the subgraph being processed + pub spec_version: Option, } impl Debug for EntityCache { @@ -141,6 +145,7 @@ impl EntityCache { store, seq: 0, vid_seq: RESERVED_VIDS, + spec_version: None, } } @@ -152,7 +157,11 @@ impl EntityCache { self.schema.make_entity(iter) } - pub fn with_current(store: Arc, current: EntityLfuCache) -> EntityCache { + pub fn with_current( + store: Arc, + current: EntityLfuCache, + spec_version: Version, + ) -> EntityCache { EntityCache { current, updates: HashMap::new(), @@ -162,6 +171,7 @@ impl EntityCache { store, seq: 0, vid_seq: RESERVED_VIDS, + spec_version: Some(spec_version), } } diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index 11b473a878d..bacf542f907 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -1,3 +1,5 @@ +use semver::Version; + use crate::{ blockchain::{Blockchain, DataSourceTemplate as _}, components::{ @@ -87,9 +89,9 @@ pub struct BlockState { } impl BlockState { - pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache) -> Self { + pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache, spec_version: Version) -> Self { BlockState { - entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache), + entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache, spec_version), deterministic_errors: Vec::new(), created_data_sources: Vec::new(), persisted_data_sources: Vec::new(), diff --git a/graph/tests/subgraph_datasource_tests.rs b/graph/tests/subgraph_datasource_tests.rs index 2c357bf37cd..d13df6be6dd 100644 --- a/graph/tests/subgraph_datasource_tests.rs +++ b/graph/tests/subgraph_datasource_tests.rs @@ -105,10 +105,10 @@ async fn test_triggers_adapter_with_entities() { let id = DeploymentHash::new("test_deployment").unwrap(); let schema = InputSchema::parse_latest( r#" - type User @entity { + type User @entity { id: String! name: String! - age: Int + age: Int } type Post @entity { id: String! diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index 7641dd06d8b..d0cb3a75555 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -126,6 +126,7 @@ pub fn mock_context( )) .unwrap(), Default::default(), + LATEST_VERSION.clone(), ), proof_of_indexing: None, host_fns: Arc::new(Vec::new()), diff --git a/runtime/wasm/src/mapping.rs b/runtime/wasm/src/mapping.rs index 8086051961a..ae1baf9cd37 100644 --- a/runtime/wasm/src/mapping.rs +++ b/runtime/wasm/src/mapping.rs @@ -214,7 +214,11 @@ impl MappingContext { host_exports: self.host_exports.cheap_clone(), block_ptr: self.block_ptr.cheap_clone(), timestamp: self.timestamp, - state: BlockState::new(self.state.entity_cache.store.clone(), Default::default()), + state: BlockState::new( + self.state.entity_cache.store.clone(), + Default::default(), + self.state.entity_cache.spec_version.clone().unwrap(), // TODO: handle this unwrap + ), proof_of_indexing: self.proof_of_indexing.cheap_clone(), host_fns: self.host_fns.cheap_clone(), debug_fork: self.debug_fork.cheap_clone(), diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 03cbf244c23..0ff64cfd67a 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -133,7 +133,11 @@ impl WasmInstanceData { std::mem::replace( state, - BlockState::new(state.entity_cache.store.cheap_clone(), LfuCache::default()), + BlockState::new( + state.entity_cache.store.cheap_clone(), + LfuCache::default(), + state.entity_cache.spec_version.clone().unwrap(), // TODO: handle this unwrap + ), ) } } From b107559b15c55a1f9d80f10d7562a07722e976bb Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Thu, 27 Feb 2025 14:14:24 +0400 Subject: [PATCH 2/4] use `strict_vid_order` bool instead of spec version in entity cache for vid generation --- core/src/subgraph/inputs.rs | 8 +++++++- core/src/subgraph/runner.rs | 6 +++--- graph/src/components/store/entity_cache.rs | 9 ++++----- graph/src/components/subgraph/instance.rs | 6 ++---- runtime/test/src/common.rs | 2 +- runtime/wasm/src/mapping.rs | 2 +- runtime/wasm/src/module/context.rs | 2 +- 7 files changed, 19 insertions(+), 16 deletions(-) diff --git a/core/src/subgraph/inputs.rs b/core/src/subgraph/inputs.rs index e595f84d0d3..da596c44cf8 100644 --- a/core/src/subgraph/inputs.rs +++ b/core/src/subgraph/inputs.rs @@ -4,7 +4,7 @@ use graph::{ store::{DeploymentLocator, SourceableStore, SubgraphFork, WritableStore}, subgraph::ProofOfIndexingVersion, }, - data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion}, + data::subgraph::{SubgraphFeature, UnifiedMappingApiVersion, SPEC_VERSION_1_3_0}, data_source::DataSourceTemplate, prelude::BlockNumber, semver::Version, @@ -79,4 +79,10 @@ impl IndexingInputs { instrument: *instrument, } } + + // Whether to use strict vid order for the subgraph + // This is true for all subgraphs with spec version 1.3.0 or greater + pub fn strict_vid_order(&self) -> bool { + self.spec_version >= SPEC_VERSION_1_3_0 + } } diff --git a/core/src/subgraph/runner.rs b/core/src/subgraph/runner.rs index c033e483cd1..04cf6fee455 100644 --- a/core/src/subgraph/runner.rs +++ b/core/src/subgraph/runner.rs @@ -382,7 +382,7 @@ where let mut block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), - self.inputs.spec_version.clone(), + self.inputs.strict_vid_order(), ); let _section = self @@ -796,7 +796,7 @@ where let block_state = BlockState::new( self.inputs.store.clone(), std::mem::take(&mut self.state.entity_lfu_cache), - self.inputs.spec_version.clone(), + self.inputs.strict_vid_order(), ); self.ctx @@ -1161,7 +1161,7 @@ where let mut block_state = BlockState::new( EmptyStore::new(schema), LfuCache::new(), - self.inputs.spec_version.clone(), + self.inputs.strict_vid_order(), ); // PoI ignores offchain events. diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 970bced03e4..f802d9e34b9 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -12,7 +12,6 @@ use crate::prelude::{CacheWeight, ENV_VARS}; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; use crate::util::lfu_cache::{EvictStats, LfuCache}; -use semver::Version; use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError}; @@ -116,7 +115,7 @@ pub struct EntityCache { pub vid_seq: u32, /// The spec version of the subgraph being processed - pub spec_version: Option, + pub strict_vid_order: bool, } impl Debug for EntityCache { @@ -145,7 +144,7 @@ impl EntityCache { store, seq: 0, vid_seq: RESERVED_VIDS, - spec_version: None, + strict_vid_order: false, } } @@ -160,7 +159,7 @@ impl EntityCache { pub fn with_current( store: Arc, current: EntityLfuCache, - spec_version: Version, + strict_vid_order: bool, ) -> EntityCache { EntityCache { current, @@ -171,7 +170,7 @@ impl EntityCache { store, seq: 0, vid_seq: RESERVED_VIDS, - spec_version: Some(spec_version), + strict_vid_order, } } diff --git a/graph/src/components/subgraph/instance.rs b/graph/src/components/subgraph/instance.rs index bacf542f907..bed406646c3 100644 --- a/graph/src/components/subgraph/instance.rs +++ b/graph/src/components/subgraph/instance.rs @@ -1,5 +1,3 @@ -use semver::Version; - use crate::{ blockchain::{Blockchain, DataSourceTemplate as _}, components::{ @@ -89,9 +87,9 @@ pub struct BlockState { } impl BlockState { - pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache, spec_version: Version) -> Self { + pub fn new(store: impl ReadStore, lfu_cache: EntityLfuCache, strict_vid_order: bool) -> Self { BlockState { - entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache, spec_version), + entity_cache: EntityCache::with_current(Arc::new(store), lfu_cache, strict_vid_order), deterministic_errors: Vec::new(), created_data_sources: Vec::new(), persisted_data_sources: Vec::new(), diff --git a/runtime/test/src/common.rs b/runtime/test/src/common.rs index d0cb3a75555..245f46d1f61 100644 --- a/runtime/test/src/common.rs +++ b/runtime/test/src/common.rs @@ -126,7 +126,7 @@ pub fn mock_context( )) .unwrap(), Default::default(), - LATEST_VERSION.clone(), + false, // TODO: Get this as a param ), proof_of_indexing: None, host_fns: Arc::new(Vec::new()), diff --git a/runtime/wasm/src/mapping.rs b/runtime/wasm/src/mapping.rs index ae1baf9cd37..6a5f95438b2 100644 --- a/runtime/wasm/src/mapping.rs +++ b/runtime/wasm/src/mapping.rs @@ -217,7 +217,7 @@ impl MappingContext { state: BlockState::new( self.state.entity_cache.store.clone(), Default::default(), - self.state.entity_cache.spec_version.clone().unwrap(), // TODO: handle this unwrap + self.state.entity_cache.strict_vid_order, ), proof_of_indexing: self.proof_of_indexing.cheap_clone(), host_fns: self.host_fns.cheap_clone(), diff --git a/runtime/wasm/src/module/context.rs b/runtime/wasm/src/module/context.rs index 0ff64cfd67a..a7a25f38a58 100644 --- a/runtime/wasm/src/module/context.rs +++ b/runtime/wasm/src/module/context.rs @@ -136,7 +136,7 @@ impl WasmInstanceData { BlockState::new( state.entity_cache.store.cheap_clone(), LfuCache::default(), - state.entity_cache.spec_version.clone().unwrap(), // TODO: handle this unwrap + state.entity_cache.strict_vid_order, ), ) } From 4f220d6216c30f582f4411e2fcbf49fb9d7635c1 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Thu, 27 Feb 2025 12:24:00 +0200 Subject: [PATCH 3/4] check does the entity need a vid --- graph/src/components/store/entity_cache.rs | 28 ++++++++++++---------- store/postgres/src/relational_queries.rs | 6 ++++- 2 files changed, 21 insertions(+), 13 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index f802d9e34b9..4ab4ec6b909 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -406,19 +406,23 @@ impl EntityCache { *write_capacity_remaining -= weight; } - // The next VID is based on a block number and a sequence within the block - let vid = ((block as i64) << 32) + self.vid_seq as i64; - self.vid_seq += 1; + let is_object = key.entity_type.is_object_type(); + let mut entity = entity; - let old_vid = entity.set_vid(vid).expect("the vid should be set"); - // Make sure that there was no VID previously set for this entity. - if let Some(ovid) = old_vid { - bail!( - "VID: {} of entity: {} with ID: {} was already present when set in EntityCache", - ovid, - key.entity_type, - entity.id() - ); + if self.strict_vid_order && is_object { + // The next VID is based on a block number and a sequence within the block + let vid = ((block as i64) << 32) + self.vid_seq as i64; + self.vid_seq += 1; + let old_vid = entity.set_vid(vid).expect("the vid should be set"); + // Make sure that there was no VID previously set for this entity. + if let Some(ovid) = old_vid { + bail!( + "VID: {} of entity: {} with ID: {} was already present when set in EntityCache", + ovid, + key.entity_type, + entity.id() + ); + } } self.entity_op(key.clone(), EntityOp::Update(entity)); diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index f4b55e89150..b04c22cdeb7 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -2257,7 +2257,11 @@ impl<'a> InsertRow<'a> { } let br_value = BlockRangeValue::new(table, row.block, row.end); let causality_region = row.causality_region; - let vid = row.entity.vid(); + let vid = if table.object.has_vid_seq() { + row.entity.vid() + } else { + 0 + }; Ok(Self { values, br_value, From fda7d7c2c3dab8b8b279b5a5fd3412c2f3a69065 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Thu, 27 Feb 2025 14:46:33 +0200 Subject: [PATCH 4/4] remove vid from comparison --- graph/src/components/store/entity_cache.rs | 14 +++----------- graph/src/data/store/mod.rs | 7 +++++++ 2 files changed, 10 insertions(+), 11 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 4ab4ec6b909..e5f8d25b5b0 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -222,16 +222,8 @@ impl EntityCache { // always creates it in a new style. debug_assert!(match scope { GetScope::Store => { - // Release build will never call this function and hence it's OK - // when that implementation is not correct. fn remove_vid(entity: Option>) -> Option { - entity.map(|e| { - #[allow(unused_mut)] - let mut entity = (*e).clone(); - #[cfg(debug_assertions)] - entity.remove("vid"); - entity - }) + entity.map(|e| e.clone_no_vid()) } remove_vid(entity.clone()) == remove_vid(self.store.get(key).unwrap().map(Arc::new)) } @@ -547,7 +539,7 @@ impl EntityCache { .map_err(|e| key.unknown_attribute(e))?; let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); - if current != data { + if current.clone_no_vid() != data.clone_no_vid() { Some(Overwrite { key, data, @@ -562,7 +554,7 @@ impl EntityCache { (Some(current), EntityOp::Overwrite(data)) => { let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); - if current != data { + if current.clone_no_vid() != data.clone_no_vid() { Some(Overwrite { key, data, diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index e0912a16485..9eaaf352d0c 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -930,6 +930,13 @@ impl Entity { self.0.insert(VID_FIELD, value.into()) } + /// Clone entity and remove the VID. + pub fn clone_no_vid(&self) -> Entity { + let mut c = self.clone(); + c.0.remove(VID_FIELD); + c + } + /// Sets the VID if it's not already set. Should be used only for tests. #[cfg(debug_assertions)] pub fn set_vid_if_empty(&mut self) {