Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions graph/src/data/store/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -735,6 +735,8 @@ where
lazy_static! {
/// The name of the id attribute, `"id"`
pub static ref ID: Word = Word::from("id");
/// The name of the vid attribute, `"vid"`
pub static ref VID: Word = Word::from("vid");
}

/// An entity is represented as a map of attribute names to values.
Expand Down
7 changes: 3 additions & 4 deletions graph/src/schema/entity_type.rs
Original file line number Diff line number Diff line change
Expand Up @@ -151,10 +151,9 @@ impl EntityType {
self.schema.is_object_type(self.atom)
}

// Changes the way the VID field is generated. It used to be autoincrement. Now its
// based on block number and the order of the entities in a block. The latter
// represents the write order across all entity types in the subgraph.
pub fn strict_vid_order(&self) -> bool {
/// Whether the table for this entity type uses a sequence for the `vid` or whether
/// `graph-node` sets them explicitly. See also [`InputSchema.strict_vid_order()`]
pub fn has_vid_seq(&self) -> bool {
// Currently the agregations entities don't have VIDs in insertion order
self.schema.strict_vid_order() && self.is_object_type()
}
Expand Down
4 changes: 4 additions & 0 deletions graph/src/schema/input/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1589,6 +1589,10 @@ impl InputSchema {
Some(EntityType::new(self.cheap_clone(), obj_type.name))
}

/// How the values for the VID field are generated.
/// When this is `false`, this subgraph uses the old way of autoincrementing `vid` in the database.
/// When it is `true`, `graph-node` sets the `vid` explicitly to a number based on block number
/// and the order in which entities are written, and comparing by `vid` will order entities by that order.
pub fn strict_vid_order(&self) -> bool {
self.inner.spec_version >= SPEC_VERSION_1_3_0
}
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/relational/ddl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ impl Table {
Ok(cols)
}

let vid_type = if self.object.strict_vid_order() {
let vid_type = if self.object.has_vid_seq() {
"bigint"
} else {
"bigserial"
Expand Down
11 changes: 7 additions & 4 deletions store/postgres/src/relational/dsl.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use diesel::sql_types::{
use diesel::{AppearsOnTable, Expression, QueryDsl, QueryResult, SelectableExpression};
use diesel_dynamic_schema::DynamicSelectClause;
use graph::components::store::{AttributeNames, BlockNumber, StoreError, BLOCK_NUMBER_MAX};
use graph::data::store::{Id, IdType, ID};
use graph::data::store::{Id, IdType, ID, VID};
use graph::data_source::CausalityRegion;
use graph::prelude::{lazy_static, ENV_VARS};

Expand Down Expand Up @@ -256,12 +256,14 @@ impl<'a> Table<'a> {
match column_names {
AttributeNames::All => {
cols.extend(self.meta.columns.iter());
cols.push(&*VID_COL);
}
AttributeNames::Select(names) => {
let pk = self.meta.primary_key();
cols.push(pk);
let mut names: Vec<_> = names.iter().filter(|name| *name != &*ID).collect();
let mut names: Vec<_> = names
.iter()
.filter(|name| *name != &*ID && *name != &*VID)
.collect();
names.sort();
for name in names {
let column = self.meta.column_for_field(&name)?;
Expand All @@ -283,8 +285,9 @@ impl<'a> Table<'a> {
}
}

cols.push(&*VID_COL);

if T::WITH_SYSTEM_COLUMNS {
cols.push(&*VID_COL);
if self.meta.immutable {
cols.push(&*BLOCK_COL);
} else {
Expand Down
2 changes: 1 addition & 1 deletion store/postgres/src/relational/prune.rs
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ impl TablePair {

// Make sure the vid sequence continues from where it was in case
// that we use autoincrementing order of the DB
if !self.src.object.strict_vid_order() {
if !self.src.object.has_vid_seq() {
writeln!(
query,
"select setval('{dst_nsp}.{vid_seq}', nextval('{src_nsp}.{vid_seq}'));"
Expand Down
14 changes: 7 additions & 7 deletions store/postgres/src/relational_queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2331,7 +2331,7 @@ impl<'a> InsertQuery<'a> {
if table.has_causality_region {
count += 1;
}
if table.object.strict_vid_order() {
if table.object.has_vid_seq() {
count += 1;
}
for column in table.columns.iter() {
Expand All @@ -2355,7 +2355,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
let out = &mut out;
out.unsafe_to_cache_prepared();

let strict_vid_order = self.table.object.strict_vid_order();
let has_vid_seq = self.table.object.has_vid_seq();

// Construct a query
// insert into schema.table(column, ...)
Expand All @@ -2382,7 +2382,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(CAUSALITY_REGION_COLUMN);
};

if strict_vid_order {
if has_vid_seq {
out.push_sql(", vid");
}
out.push_sql(") values\n");
Expand All @@ -2402,7 +2402,7 @@ impl<'a> QueryFragment<Pg> for InsertQuery<'a> {
out.push_sql(", ");
out.push_bind_param::<Integer, _>(&row.causality_region)?;
};
if strict_vid_order {
if has_vid_seq {
out.push_sql(", ");
out.push_bind_param::<BigInt, _>(&row.vid)?;
}
Expand Down Expand Up @@ -4805,7 +4805,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
fn walk_ast<'b>(&'b self, mut out: AstPass<'_, 'b, Pg>) -> QueryResult<()> {
out.unsafe_to_cache_prepared();

let strict_vid_order = self.src.object.strict_vid_order();
let has_vid_seq = self.src.object.has_vid_seq();

// Construct a query
// insert into {dst}({columns})
Expand All @@ -4827,7 +4827,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
out.push_sql(", ");
out.push_sql(CAUSALITY_REGION_COLUMN);
};
if strict_vid_order {
if has_vid_seq {
out.push_sql(", vid");
}

Expand Down Expand Up @@ -4895,7 +4895,7 @@ impl<'a> QueryFragment<Pg> for CopyEntityBatchQuery<'a> {
));
}
}
if strict_vid_order {
if has_vid_seq {
out.push_sql(", vid");
}

Expand Down
Loading