From 4ee88d4437fea680fdca73a55561aa8379d8e684 Mon Sep 17 00:00:00 2001 From: Zoran Cvetkov Date: Thu, 20 Feb 2025 20:17:36 +0200 Subject: [PATCH] wrap entity with vids --- graph/src/components/store/entity_cache.rs | 56 ++++++++++++---------- graph/src/components/store/mod.rs | 4 +- graph/src/components/store/write.rs | 15 +++++- graph/src/data/store/mod.rs | 13 +++++ server/index-node/src/resolver.rs | 2 +- store/postgres/src/relational.rs | 5 +- 6 files changed, 63 insertions(+), 32 deletions(-) diff --git a/graph/src/components/store/entity_cache.rs b/graph/src/components/store/entity_cache.rs index 1a002789d8f..5f386b7bc48 100644 --- a/graph/src/components/store/entity_cache.rs +++ b/graph/src/components/store/entity_cache.rs @@ -7,7 +7,7 @@ use std::sync::Arc; use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::components::store::{self as s, Entity, EntityOperation}; -use crate::data::store::{EntityValidationError, Id, IdType, IntoEntityIterator}; +use crate::data::store::{EntityV, EntityValidationError, Id, IdType, IntoEntityIterator}; use crate::prelude::{CacheWeight, ENV_VARS}; use crate::schema::{EntityKey, InputSchema}; use crate::util::intern::Error as InternError; @@ -33,8 +33,8 @@ pub enum GetScope { #[derive(Debug, Clone)] enum EntityOp { Remove, - Update(Entity), - Overwrite(Entity), + Update(EntityV), + Overwrite(EntityV), } impl EntityOp { @@ -45,10 +45,10 @@ impl EntityOp { use EntityOp::*; match (self, entity) { (Remove, _) => Ok(None), - (Overwrite(new), _) | (Update(new), None) => Ok(Some(new)), + (Overwrite(new), _) | (Update(new), None) => Ok(Some(new.e)), (Update(updates), Some(entity)) => { let mut e = entity.borrow().clone(); - e.merge_remove_null_fields(updates)?; + e.merge_remove_null_fields(updates.e)?; Ok(Some(e)) } } @@ -69,7 +69,7 @@ impl EntityOp { match self { // This is how `Overwrite` is constructed, by accumulating `Update` onto `Remove`. Remove => *self = Overwrite(update), - Update(current) | Overwrite(current) => current.merge(update), + Update(current) | Overwrite(current) => current.e.merge(update.e), } } } @@ -304,9 +304,9 @@ impl EntityCache { ) -> Result, anyhow::Error> { match op { EntityOp::Update(entity) | EntityOp::Overwrite(entity) - if query.matches(key, entity) => + if query.matches(key, &entity.e) => { - Ok(Some(entity.clone())) + Ok(Some(entity.e.clone())) } EntityOp::Remove => Ok(None), _ => Ok(None), @@ -400,19 +400,19 @@ impl EntityCache { // The next VID is based on a block number and a sequence within the block let vid = ((block as i64) << 32) + self.vid_seq as i64; self.vid_seq += 1; - let mut entity = entity; - let old_vid = entity.set_vid(vid).expect("the vid should be set"); - // Make sure that there was no VID previously set for this entity. - if let Some(ovid) = old_vid { - bail!( - "VID: {} of entity: {} with ID: {} was already present when set in EntityCache", - ovid, - key.entity_type, - entity.id() - ); - } - - self.entity_op(key.clone(), EntityOp::Update(entity)); + // let mut entity = entity; + // let old_vid = entity.set_vid(vid).expect("the vid should be set"); + // // Make sure that there was no VID previously set for this entity. + // if let Some(ovid) = old_vid { + // bail!( + // "VID: {} of entity: {} with ID: {} was already present when set in EntityCache", + // ovid, + // key.entity_type, + // entity.id() + // ); + // } + + self.entity_op(key.clone(), EntityOp::Update(EntityV::new(entity, vid))); // The updates we were given are not valid by themselves; force a // lookup in the database and check again with an entity that merges @@ -517,20 +517,23 @@ impl EntityCache { // Entity was created (None, EntityOp::Update(mut updates)) | (None, EntityOp::Overwrite(mut updates)) => { - updates.remove_null_fields(); - let data = Arc::new(updates); + let vid = updates.vid; + updates.e.remove_null_fields(); + let data = Arc::new(updates.e.clone()); self.current.insert(key.clone(), Some(data.cheap_clone())); Some(Insert { key, data, block, end: None, + vid, }) } // Entity may have been changed (Some(current), EntityOp::Update(updates)) => { + let vid = updates.vid; let mut data = current.as_ref().clone(); - data.merge_remove_null_fields(updates) + data.merge_remove_null_fields(updates.e) .map_err(|e| key.unknown_attribute(e))?; let data = Arc::new(data); self.current.insert(key.clone(), Some(data.cheap_clone())); @@ -540,6 +543,7 @@ impl EntityCache { data, block, end: None, + vid, }) } else { None @@ -547,7 +551,8 @@ impl EntityCache { } // Entity was removed and then updated, so it will be overwritten (Some(current), EntityOp::Overwrite(data)) => { - let data = Arc::new(data); + let vid = data.vid; + let data = Arc::new(data.e.clone()); self.current.insert(key.clone(), Some(data.cheap_clone())); if current != data { Some(Overwrite { @@ -555,6 +560,7 @@ impl EntityCache { data, block, end: None, + vid, }) } else { None diff --git a/graph/src/components/store/mod.rs b/graph/src/components/store/mod.rs index 31b0e62cfae..9713b78c150 100644 --- a/graph/src/components/store/mod.rs +++ b/graph/src/components/store/mod.rs @@ -30,7 +30,7 @@ use crate::cheap_clone::CheapClone; use crate::components::store::write::EntityModification; use crate::constraint_violation; use crate::data::store::scalar::Bytes; -use crate::data::store::{Id, IdList, Value}; +use crate::data::store::{EntityV, Id, IdList, Value}; use crate::data::value::Word; use crate::data_source::CausalityRegion; use crate::derive::CheapClone; @@ -829,7 +829,7 @@ where pub enum EntityOperation { /// Locates the entity specified by `key` and sets its attributes according to the contents of /// `data`. If no entity exists with this key, creates a new entity. - Set { key: EntityKey, data: Entity }, + Set { key: EntityKey, data: EntityV }, /// Removes an entity with the specified key, if one exists. Remove { key: EntityKey }, diff --git a/graph/src/components/store/write.rs b/graph/src/components/store/write.rs index 721e3d80bc1..641af26ce53 100644 --- a/graph/src/components/store/write.rs +++ b/graph/src/components/store/write.rs @@ -45,6 +45,7 @@ pub enum EntityModification { data: Arc, block: BlockNumber, end: Option, + vid: i64, }, /// Update the entity by overwriting it Overwrite { @@ -52,6 +53,7 @@ pub enum EntityModification { data: Arc, block: BlockNumber, end: Option, + vid: i64, }, /// Remove the entity Remove { key: EntityKey, block: BlockNumber }, @@ -67,6 +69,7 @@ pub struct EntityWrite<'a> { // The end of the block range for which this write is valid. The value // of `end` itself is not included in the range pub end: Option, + vid: i64, } impl std::fmt::Display for EntityWrite<'_> { @@ -89,24 +92,28 @@ impl<'a> TryFrom<&'a EntityModification> for EntityWrite<'a> { data, block, end, + vid, } => Ok(EntityWrite { id: &key.entity_id, entity: data, causality_region: key.causality_region, block: *block, end: *end, + vid: *vid, }), EntityModification::Overwrite { key, data, block, end, + vid, } => Ok(EntityWrite { id: &key.entity_id, entity: &data, causality_region: key.causality_region, block: *block, end: *end, + vid: *vid, }), EntityModification::Remove { .. } => Err(()), @@ -213,11 +220,13 @@ impl EntityModification { data, block, end, + vid, } => Ok(Insert { key, data, block, end, + vid, }), Remove { key, .. } => { return Err(constraint_violation!( @@ -271,21 +280,23 @@ impl EntityModification { } impl EntityModification { - pub fn insert(key: EntityKey, data: Entity, block: BlockNumber) -> Self { + pub fn insert(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self { EntityModification::Insert { key, data: Arc::new(data), block, end: None, + vid, } } - pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber) -> Self { + pub fn overwrite(key: EntityKey, data: Entity, block: BlockNumber, vid: i64) -> Self { EntityModification::Overwrite { key, data: Arc::new(data), block, end: None, + vid, } } diff --git a/graph/src/data/store/mod.rs b/graph/src/data/store/mod.rs index e0912a16485..c771d00fc7e 100644 --- a/graph/src/data/store/mod.rs +++ b/graph/src/data/store/mod.rs @@ -1105,6 +1105,19 @@ impl std::fmt::Debug for Entity { } } +/// An entity wrapper that has VID too. +#[derive(Debug, Clone, CacheWeight, PartialEq, Eq, Serialize)] +pub struct EntityV { + pub e: Entity, + pub vid: i64, +} + +impl EntityV { + pub fn new(e: Entity, vid: i64) -> Self { + Self { e, vid } + } +} + /// An object that is returned from a query. It's a an `r::Value` which /// carries the attributes of the object (`__typename`, `id` etc.) and /// possibly a pointer to its parent if the query that constructed it is one diff --git a/server/index-node/src/resolver.rs b/server/index-node/src/resolver.rs index 6603d296509..84a4639586a 100644 --- a/server/index-node/src/resolver.rs +++ b/server/index-node/src/resolver.rs @@ -749,7 +749,7 @@ fn entity_changes_to_graphql(entity_changes: Vec) -> r::Value { .push(key.entity_id); } EntityOperation::Set { key, data } => { - updates.entry(key.entity_type).or_default().push(data); + updates.entry(key.entity_type).or_default().push(data.e); } } } diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index de7e6895083..c59af005b9b 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -69,7 +69,7 @@ use crate::{ }, }; use graph::components::store::{AttributeNames, DerivedEntityQuery}; -use graph::data::store::{Id, IdList, IdType, BYTES_SCALAR}; +use graph::data::store::{EntityV, Id, IdList, IdType, BYTES_SCALAR}; use graph::data::subgraph::schema::POI_TABLE; use graph::prelude::{ anyhow, info, BlockNumber, DeploymentHash, Entity, EntityChange, EntityOperation, Logger, @@ -731,9 +731,10 @@ impl Layout { let entity_id = data.id(); processed_entities.insert((entity_type.clone(), entity_id.clone())); + let vid = data.vid(); changes.push(EntityOperation::Set { key: entity_type.key_in(entity_id, CausalityRegion::from_entity(&data)), - data, + data: EntityV::new(data, vid), }); }