1 change: 1 addition & 0 deletions chain/substreams/src/trigger.rs
@@ -240,6 +240,7 @@ where
state.entity_cache.set(
key,
entity,
block.number,
Some(&mut state.write_capacity_remaining),
)?;
}
5 changes: 4 additions & 1 deletion core/src/subgraph/runner.rs
@@ -1670,6 +1670,7 @@ async fn update_proof_of_indexing(
key: EntityKey,
digest: Bytes,
block_time: BlockTime,
block: BlockNumber,
) -> Result<(), Error> {
let digest_name = entity_cache.schema.poi_digest();
let mut data = vec![
@@ -1684,11 +1685,12 @@
data.push((entity_cache.schema.poi_block_time(), block_time));
}
let poi = entity_cache.make_entity(data)?;
entity_cache.set(key, poi, None)
entity_cache.set(key, poi, block, None)
}

let _section_guard = stopwatch.start_section("update_proof_of_indexing");

let block_number = proof_of_indexing.get_block();
let mut proof_of_indexing = proof_of_indexing.take();

for (causality_region, stream) in proof_of_indexing.drain() {
@@ -1724,6 +1726,7 @@
entity_key,
updated_proof_of_indexing,
block_time,
block_number,
)?;
}

31 changes: 28 additions & 3 deletions graph/src/components/store/entity_cache.rs
@@ -1,4 +1,4 @@
use anyhow::anyhow;
use anyhow::{anyhow, bail};
use std::borrow::Borrow;
use std::collections::HashMap;
use std::fmt::{self, Debug};
@@ -17,6 +17,10 @@ use super::{BlockNumber, DerivedEntityQuery, LoadRelatedRequest, StoreError};

pub type EntityLfuCache = LfuCache<EntityKey, Option<Arc<Entity>>>;

// Number of VIDs that are reserved outside of the ones generated here.
// Currently none are used, but let's reserve a few just in case.
const RESERVED_VIDS: u32 = 100;

/// The scope in which the `EntityCache` should perform a `get` operation
pub enum GetScope {
/// Get from all previously stored entities in the store
@@ -105,6 +109,10 @@ pub struct EntityCache {
/// generated IDs, the `EntityCache` needs to be newly instantiated for
/// each block
seq: u32,

// Sequence number of the next VID value for this block. The value written
// to the database consists of a block number and this sequence number.
pub vid_seq: u32,
}

impl Debug for EntityCache {
@@ -132,6 +140,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

@@ -152,6 +161,7 @@ impl EntityCache {
schema: store.input_schema(),
store,
seq: 0,
vid_seq: RESERVED_VIDS,
}
}

@@ -353,14 +363,14 @@ impl EntityCache {
&mut self,
key: EntityKey,
entity: Entity,
block: BlockNumber,
write_capacity_remaining: Option<&mut usize>,
) -> Result<(), anyhow::Error> {
// check that the derived fields are valid
let is_valid = entity.validate(&key).is_ok();

if let Some(write_capacity_remaining) = write_capacity_remaining {
let weight = entity.weight();

if !self.current.contains_key(&key) && weight > *write_capacity_remaining {
return Err(anyhow!(
"exceeded block write limit when writing entity `{}`",
*write_capacity_remaining -= weight;
}

// The next VID is based on a block number and a sequence within the block
let vid = ((block as i64) << 32) + self.vid_seq as i64;
self.vid_seq += 1;
let mut entity = entity;
let old_vid = entity.set_vid(vid).expect("the vid should be set");
// Make sure that there was no VID previously set for this entity.
if let Some(ovid) = old_vid {
bail!(
"VID: {} of entity: {} with ID: {} was already present when set in EntityCache",
ovid,
key.entity_type,
entity.id()
);
}

self.entity_op(key.clone(), EntityOp::Update(entity));

// The updates we were given are not valid by themselves; force a
@@ -507,7 +532,7 @@ impl EntityCache {
// Entity was removed and then updated, so it will be overwritten
(Some(current), EntityOp::Overwrite(data)) => {
let data = Arc::new(data);
self.current.insert(key.clone(), Some(data.clone()));
self.current.insert(key.clone(), Some(data.cheap_clone()));
if current != data {
Some(Overwrite {
key,
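As a rough illustration of the scheme in EntityCache::set above: the new VID packs the block number into the upper 32 bits and a per-block sequence (starting at RESERVED_VIDS) into the lower 32 bits, so VIDs sort first by block and then by write order within the block. A minimal sketch, assuming nothing beyond the formula shown in the diff; the helper names are illustrative and not part of the PR:

    // Illustrative only: mirrors the composition done in EntityCache::set.
    const RESERVED_VIDS: u32 = 100;

    fn compose_vid(block: i32, seq: u32) -> i64 {
        ((block as i64) << 32) + seq as i64
    }

    fn split_vid(vid: i64) -> (i32, u32) {
        ((vid >> 32) as i32, (vid & 0xffff_ffff) as u32)
    }

    fn main() {
        // The first two entities written in block 7 get consecutive VIDs just
        // past the reserved range, preserving write order within the block.
        let first = compose_vid(7, RESERVED_VIDS);
        let second = compose_vid(7, RESERVED_VIDS + 1);
        assert!(first < second);
        assert_eq!(split_vid(first), (7, 100));
        // Every VID from block 8 sorts after every VID from block 7.
        assert!(compose_vid(8, RESERVED_VIDS) > second);
    }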
4 changes: 4 additions & 0 deletions graph/src/components/subgraph/proof_of_indexing/online.rs
@@ -242,6 +242,10 @@ impl ProofOfIndexing {
pub fn take(self) -> HashMap<Id, BlockEventStream> {
self.per_causality_region
}

pub fn get_block(&self) -> BlockNumber {
self.block_number
}
}

pub struct ProofOfIndexingFinisher {
25 changes: 24 additions & 1 deletion graph/src/data/store/mod.rs
@@ -3,7 +3,7 @@ use crate::{
derive::CacheWeight,
prelude::{lazy_static, q, r, s, CacheWeight, QueryExecutionError},
runtime::gas::{Gas, GasSizeOf},
schema::{EntityKey, EntityType},
schema::{input::VID_FIELD, EntityKey, EntityType},
util::intern::{self, AtomPool},
util::intern::{Error as InternError, NullValue, Object},
};
@@ -910,6 +910,29 @@ impl Entity {
Id::try_from(self.get("id").unwrap().clone()).expect("the id is set to a valid value")
}

/// Returns the VID of this entity. Panics if the VID is missing or is not of
/// type i64.
pub fn vid(&self) -> i64 {
self.get(VID_FIELD)
.expect("the vid must be set")
.as_int8()
.expect("the vid must be set to a valid value")
}

/// Sets the VID of the entity. The previous one is returned.
pub fn set_vid(&mut self, value: i64) -> Result<Option<Value>, InternError> {
self.0.insert(VID_FIELD, value.into())
}
Review thread on set_vid:

Collaborator: One additional thought: it would be great if we could guarantee that an entity is always constructed with the vid set, similar to how that's done for the id. I haven't checked how much work that is, but it would make the various expects sprinkled around here easier to accept.

Contributor (author): I like the suggestion but would like to postpone it together with the testing request above for a later stage. WDYT?

Contributor (author): The test for the VID order is implemented now: 166c909

Collaborator: I am fine with postponing the enforcement of having a vid.

/// Sets the VID if it's not already set. Should be used only for tests.
#[cfg(debug_assertions)]
pub fn set_vid_if_empty(&mut self) {
let vid = self.get(VID_FIELD);
if vid.is_none() {
let _ = self.set_vid(100).expect("the vid should be set");
}
}

/// Merges an entity update `update` into this entity.
///
/// If a key exists in both entities, the value from `update` is chosen.
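A sketch of how the new accessors behave, modeled on the fixtures in runtime/test/src/test.rs further down in this diff; InputSchema::raw and the entity! macro are assumed to be usable here as they are in that test crate, and the schema text is made up for the example:

    use graph::entity;
    use graph::schema::InputSchema;

    fn vid_roundtrip() {
        // Schema and values are illustrative, mirroring the test fixtures.
        let schema = InputSchema::raw(
            "type Thing @entity { id: String!, value: String! }",
            "doesntmatter",
        );
        // Build an entity without a vid, then set one explicitly.
        let mut thing = entity! { schema => id: "one", value: "eins" };
        let previous = thing.set_vid((7i64 << 32) + 100).expect("vid can be stored");
        assert!(previous.is_none()); // no vid had been set before
        assert_eq!(thing.vid(), (7i64 << 32) + 100);
    }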
7 changes: 5 additions & 2 deletions graph/src/data/subgraph/api_version.rs
@@ -54,11 +54,14 @@ pub const SPEC_VERSION_1_1_0: Version = Version::new(1, 1, 0);
// Enables eth call declarations and indexed arguments(topics) filtering in manifest
pub const SPEC_VERSION_1_2_0: Version = Version::new(1, 2, 0);

// Enables subgraphs as datasource
// Enables subgraphs as data sources.
// Changes the way the VID field is generated. It used to be auto-incremented. Now it is
// based on the block number and the order of the entities within a block. The latter
// represents the write order across all entity types in the subgraph.
pub const SPEC_VERSION_1_3_0: Version = Version::new(1, 3, 0);

// The latest spec version available
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_2_0;
pub const LATEST_VERSION: &Version = &SPEC_VERSION_1_3_0;

pub const MIN_SPEC_VERSION: Version = Version::new(0, 0, 2);

3 changes: 3 additions & 0 deletions graph/src/schema/input/mod.rs
@@ -35,6 +35,7 @@ pub(crate) const POI_OBJECT: &str = "Poi$";
const POI_DIGEST: &str = "digest";
/// The name of the PoI attribute for storing the block time
const POI_BLOCK_TIME: &str = "blockTime";
pub(crate) const VID_FIELD: &str = "vid";

pub mod kw {
pub const ENTITY: &str = "entity";
@@ -1597,6 +1598,8 @@ fn atom_pool(document: &s::Document) -> AtomPool {
pool.intern(POI_DIGEST);
pool.intern(POI_BLOCK_TIME);

pool.intern(VID_FIELD);

for definition in &document.definitions {
match definition {
s::Definition::TypeDefinition(typedef) => match typedef {
2 changes: 1 addition & 1 deletion graph/src/schema/mod.rs
@@ -21,7 +21,7 @@ pub mod ast;
mod entity_key;
mod entity_type;
mod fulltext;
mod input;
pub(crate) mod input;

pub use api::{is_introspection_field, APISchemaError, INTROSPECTION_QUERY_TYPE};

12 changes: 6 additions & 6 deletions runtime/test/src/test.rs
@@ -477,13 +477,13 @@ async fn test_ipfs_block() {
// The user_data value we use with calls to ipfs_map
const USER_DATA: &str = "user_data";

fn make_thing(id: &str, value: &str) -> (String, EntityModification) {
fn make_thing(id: &str, value: &str, vid: i64) -> (String, EntityModification) {
const DOCUMENT: &str = " type Thing @entity { id: String!, value: String!, extra: String }";
lazy_static! {
static ref SCHEMA: InputSchema = InputSchema::raw(DOCUMENT, "doesntmatter");
static ref THING_TYPE: EntityType = SCHEMA.entity_type("Thing").unwrap();
}
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA };
let data = entity! { SCHEMA => id: id, value: value, extra: USER_DATA, vid: vid };
let key = THING_TYPE.parse_key(id).unwrap();
(
format!("{{ \"id\": \"{}\", \"value\": \"{}\"}}", id, value),
@@ -553,8 +553,8 @@ async fn test_ipfs_map(api_version: Version, json_error_msg: &str) {
let subgraph_id = "ipfsMap";

// Try it with two valid objects
let (str1, thing1) = make_thing("one", "eins");
let (str2, thing2) = make_thing("two", "zwei");
let (str1, thing1) = make_thing("one", "eins", 100);
let (str2, thing2) = make_thing("two", "zwei", 100);
let ops = run_ipfs_map(
subgraph_id,
format!("{}\n{}", str1, str2),
@@ -1001,8 +1001,8 @@ async fn test_entity_store(api_version: Version) {

let schema = store.input_schema(&deployment.hash).unwrap();

let alex = entity! { schema => id: "alex", name: "Alex" };
let steve = entity! { schema => id: "steve", name: "Steve" };
let alex = entity! { schema => id: "alex", name: "Alex", vid: 0i64 };
let steve = entity! { schema => id: "steve", name: "Steve", vid: 1i64 };
let user_type = schema.entity_type("User").unwrap();
test_store::insert_entities(
&deployment,
9 changes: 6 additions & 3 deletions runtime/wasm/src/host_exports.rs
@@ -350,9 +350,12 @@ impl HostExports {

state.metrics.track_entity_write(&entity_type, &entity);

state
.entity_cache
.set(key, entity, Some(&mut state.write_capacity_remaining))?;
state.entity_cache.set(
key,
entity,
block,
Some(&mut state.write_capacity_remaining),
)?;

Ok(())
}
13 changes: 11 additions & 2 deletions store/postgres/src/relational/ddl.rs
@@ -116,19 +116,27 @@ impl Table {
Ok(cols)
}

// Currently the aggregation entities don't have VIDs in insertion order
let vid_type = if self.object.is_object_type() {
"bigint"
} else {
"bigserial"
};

if self.immutable {
writeln!(
out,
"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block} int not null,\n\
{cols},
unique({id})
);",
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block = BLOCK_COLUMN,
id = self.primary_key().name
)
out,
r#"
create table {qname} (
{vid} bigserial primary key,
{vid} {vid_type} primary key,
{block_range} int4range not null,
{cols}
);"#,
qname = self.qualified_name,
cols = columns_ddl(self)?,
vid = VID_COLUMN,
vid_type = vid_type,
block_range = BLOCK_RANGE_COLUMN
)?;

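For illustration, with this change the immutable branch above now emits DDL roughly like the following for a hypothetical Thing entity on spec version 1.3.0; the schema name and user columns are made up, and the point is that vid becomes a plain bigint supplied by graph-node instead of a bigserial. Aggregation tables keep the bigserial definition, since their rows are not written in insertion order:

    create table sgd0815."thing" (
        vid bigint primary key,   -- assigned by graph-node from block number and write order
        block$ int not null,
        id text not null,
        value text not null,
        unique(id)
    );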