2 changes: 1 addition & 1 deletion chain/substreams/src/trigger.rs
@@ -237,7 +237,7 @@ where
logger,
);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(key, entity, block.number)?;
}
ParsedChanges::Delete(entity_key) => {
let entity_type = entity_key.entity_type.cheap_clone();
5 changes: 4 additions & 1 deletion core/src/subgraph/runner.rs
@@ -1603,6 +1603,7 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
@@ -1617,11 +1618,12 @@
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi)
entity_cache.set(key, poi, block)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
@@ -1657,6 +1659,7 @@
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)?;
}

29 changes: 26 additions & 3 deletions graph/src/components/store/entity_cache.rs
@@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError};

pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;

// Number of VIDs that are reserved outside of the ones generated here.
// Currently only 1 is used, for PoIs, but let's reserve a few more.
const RESERVED_VIDS: u32 = 100;

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
@@ -105,6 +109,10 @@ pub struct EntityCache {
/// generated IDs, the `EntityCache` needs to be newly instantiated for
/// each block
seq: u32,

    // Sequence number of the next VID value for this block. The value written
    // to the database consists of a block number and this sequence number.
pub vid_seq: u32,
}

impl Debug for EntityCache {
@@ -132,6 +140,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

@@ -152,6 +161,7 @@
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

@@ -278,7 +288,7 @@ impl EntityCache {
) -> Result<Option<Entity>, anyhow::Error> {
match op {
EntityOp::Update(entity) | EntityOp::Overwrite(entity)
if query.matches(key, entity) =>
if query.matches(key, &entity) =>
{
Ok(Some(entity.clone()))
}
@@ -349,10 +359,23 @@ impl EntityCache {
/// with existing data. The entity will be validated against the
/// subgraph schema, and any errors will result in an `Err` being
/// returned.
pub fn set(&mut self, key: EntityKey, entity: Entity) -> Result<(), anyhow::Error> {
pub fn set(
&mut self,
key: EntityKey,
entity: Entity,
block: BlockNumber,
) -> Result<(), anyhow::Error> {
        // check the validity of derived fields
let is_valid = entity.validate(&key).is_ok();

// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let mut entity = entity;
let old_vid = entity.set_vid(vid).expect("the vid should be set");
// Make sure that there was no VID previously set for this entity.
assert!(old_vid.is_none());

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
@@ -489,7 +512,7 @@ impl EntityCache {
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
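The new `set` encodes each VID as the block number in the high 32 bits plus a per-block sequence counter (starting at `RESERVED_VIDS`) in the low 32 bits, so VIDs stay strictly increasing in insertion order within and across blocks. A minimal sketch of the encoding and its inverse; `encode_vid` and `decode_vid` are illustrative helpers, not functions from this PR:

```rust
// Sketch of the VID scheme used by `EntityCache::set` above.
const RESERVED_VIDS: u32 = 100;

fn encode_vid(block: i32, seq: u32) -> i64 {
    // Block number in the high 32 bits, per-block sequence in the low 32 bits.
    ((block as i64) << 32) + seq as i64
}

fn decode_vid(vid: i64) -> (i32, u32) {
    ((vid >> 32) as i32, (vid & 0xffff_ffff) as u32)
}

fn main() {
    // The first user entity in block 7 gets the first VID after the
    // reserved range: 7 * 2^32 + 100.
    let vid = encode_vid(7, RESERVED_VIDS);
    assert_eq!(vid, 30_064_771_172);
    assert_eq!(decode_vid(vid), (7, 100));
}
```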
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/proof_of_indexing/online.rs
@@ -242,6 +242,10 @@ impl ProofOfIndexing {
pub fn take(self) -> HashMap<Id, BlockEventStream> {
self.per_causality_region
}

pub fn get_block(&self) -> BlockNumber {
self.block_number
}
}

pub struct ProofOfIndexingFinisher {
23 changes: 23 additions & 0 deletions graph/src/data/store/mod.rs
@@ -910,6 +910,29 @@ impl Entity {
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
}

    /// Return the VID of this entity; panics if the VID is missing or of a
    /// type other than i64.
pub fn vid(&self) -> i64 {
self.get("vid")
.expect("the vid is set")
.as_int8()
.expect("the vid is set to a valid value")
}

/// Sets the VID of the entity. The previous one is returned.
pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> {
self.0.insert("vid", value.into())
}

/// Sets the VID if it's not already set. Should be used only for tests.
#[cfg(debug_assertions)]
pub fn set_vid_if_empty(&mut self) {
let vid = self.get("vid");
if vid.is_none() {
let _ = self.set_vid(100).expect("the vid should be set");
}
}

/// Merges an entity update `update` into this entity.
///
/// If a key exists in both entities, the value from `update` is chosen.
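These accessors pair with the `entity!` macro used in the tests further down; a small usage sketch, where the import paths and schema setup are assumed from `runtime/test/src/test.rs` in this same PR rather than a verified public API:

```rust
use graph::entity;
use graph::schema::InputSchema;

fn main() {
    // Schema string mirrors the one in runtime/test/src/test.rs.
    let schema = InputSchema::raw(
        "type Thing @entity { id: String!, value: String! }",
        "doesntmatter",
    );
    let mut thing = entity! { schema => id: "one", value: "eins" };

    // `set_vid` returns the previously stored value, which must be `None`
    // for an entity that has not been assigned a VID yet.
    let old = thing
        .set_vid((7i64 << 32) + 100)
        .expect("the vid should be set");
    assert!(old.is_none());
    assert_eq!(thing.vid(), (7i64 << 32) + 100);
}
```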
4 changes: 3 additions & 1 deletion graph/src/data/subgraph/api_version.rs
@@ -55,10 +55,12 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0);
pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0);

// Enables subgraphs as datasource
// Change the way the VID field is generated. It used to be auto-increment. Now it is
// based on the block number and the sequence of the entities within a block.
pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0);

// The latest spec version available
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0;
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_3_0;

pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);

3 changes: 3 additions & 0 deletions graph/src/schema/input/mod.rs
@@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
const POI_DIGEST: &str = "digest";
/// The name of the PoI attribute for storing the block time
const POI_BLOCK_TIME: &str = "blockTime";
const VID_FIELD: &str = "vid";

pub mod kw {
pub const ENTITY: &str = "entity";
@@ -1597,6 +1598,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
pool.intern(POI_DIGEST);
pool.intern(POI_BLOCK_TIME);

pool.intern(VID_FIELD);

for definition in &document.definitions {
match definition {
s::Definition::TypeDefinition(typedef) => match typedef {
12 changes: 6 additions & 6 deletions runtime/test/src/test.rs
@@ -476,13 +476,13 @@ async fn test_ipfs_block() {
// The user_data value we use with calls to ipfs_map
const USER_DATA: &str = "user_data";

fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }";
lazy_static! {
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
}
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA };
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid };
let key = THING_TYPE.parse_key(id).unwrap();
(
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
@@ -552,8 +552,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) {
let subgraph_id = "ipfsMap";

// Try it with two valid objects
let (str1, thing1) = make_thing("one", "eins");
let (str2, thing2) = make_thing("two", "zwei");
let (str1, thing1) = make_thing("one", "eins", 100);
let (str2, thing2) = make_thing("two", "zwei", 100);
let ops = run_ipfs_map(
ipfs.clone(),
subgraph_id,
@@ -1022,8 +1022,8 @@ async fn test_entity_store(api_version: Version) {

let schema = store.input_schema(&deployment.hash).unwrap();

let alex = entity! { schema => id: "alex", name: "Alex" };
let steve = entity! { schema => id: "steve", name: "Steve" };
let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64 };
let steve = entity! { schema => id: "steve", name: "Steve", vid: 1i64 };
let user_type = schema.entity_type("User").unwrap();
test_store::insert_entities(
&deployment,
2 changes: 1 addition & 1 deletion runtime/wasm/src/host_exports.rs
@@ -350,7 +350,7 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state.entity_cache.set(key, entity)?;
state.entity_cache.set(key, entity, block)?;

Ok(())
}
12 changes: 10 additions & 2 deletions store/postgres/src/relational/ddl.rs
@@ -116,19 +116,26 @@ impl Table {
Ok(cols)
}

let vid_type = if !self.object.is_object_type() {
"bigserial"
} else {
"bigint"
};

if self.immutable {
writeln!(
out,
"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block} int not null,\n\
{cols},
unique({id})
);",
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block = BLOCK_COLUMN,
id = self.primary_key().name
)
@@ -137,13 +144,14 @@
out,
r#"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block_range} int4range not null,
{cols}
);"#,
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block_range = BLOCK_RANGE_COLUMN
)?;

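The `vid_type` branch means regular entity tables now declare a plain `bigint` primary key, with graph-node supplying the value, while non-object types such as the PoI table keep the auto-incrementing `bigserial`. A sketch of the selection and the resulting DDL shapes; the table names in the comments are illustrative, not taken from the PR:

```rust
// Mirrors the `vid_type` selection in the DDL generation above.
fn vid_type(is_object_type: bool) -> &'static str {
    if is_object_type {
        // Regular entities: VID is computed as (block << 32) + seq and
        // written explicitly by the indexer.
        "bigint"
    } else {
        // Non-object types (e.g. the PoI table): the database keeps
        // assigning VIDs itself.
        "bigserial"
    }
}

fn main() {
    // e.g. create table sgd1."thing" (vid bigint primary key, ...)
    assert_eq!(vid_type(true), "bigint");
    // e.g. create table sgd1."poi2$" (vid bigserial primary key, ...)
    assert_eq!(vid_type(false), "bigserial");
}
```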