From 432ba562ea74dfebef26623a2fcca1ba802b4685 Mon Sep 17 00:00:00 2001 From: incrypto32 Date: Fri, 31 Jan 2025 18:41:02 +0400 Subject: [PATCH] Subgraph Composition: Fix Entity Ordering with different Idtyp --- store/postgres/src/relational.rs | 9 +- store/postgres/src/relational_queries.rs | 35 +------- store/test-store/tests/postgres/writable.rs | 95 +++++++++++++++++++++ 3 files changed, 106 insertions(+), 33 deletions(-) diff --git a/store/postgres/src/relational.rs b/store/postgres/src/relational.rs index 00e9b83871c..de7e6895083 100644 --- a/store/postgres/src/relational.rs +++ b/store/postgres/src/relational.rs @@ -603,6 +603,13 @@ impl Layout { Ok((ewt, block)) }; + fn compare_entity_data_ext(a: &EntityDataExt, b: &EntityDataExt) -> std::cmp::Ordering { + a.block_number + .cmp(&b.block_number) + .then_with(|| a.entity.cmp(&b.entity)) + .then_with(|| a.id.cmp(&b.id)) + } + // The algorithm is a similar to merge sort algorithm and it relays on the fact that both vectors // are ordered by (block_number, entity_type, entity_id). It advances simultaneously entities from // both lower_vec and upper_vec and tries to match entities that have entries in both vectors for @@ -616,7 +623,7 @@ impl Layout { while lower_now.is_some() || upper_now.is_some() { let (ewt, block) = match (lower_now, upper_now) { (Some(lower), Some(upper)) => { - match lower.cmp(&upper) { + match compare_entity_data_ext(lower, upper) { std::cmp::Ordering::Greater => { // we have upper bound at this block, but no lower bounds at the same block so it's deletion let (ewt, block) = transform(upper, EntityOperationKind::Delete)?; diff --git a/store/postgres/src/relational_queries.rs b/store/postgres/src/relational_queries.rs index 769fcacb20c..f007a8a620e 100644 --- a/store/postgres/src/relational_queries.rs +++ b/store/postgres/src/relational_queries.rs @@ -27,7 +27,6 @@ use graph::schema::{EntityType, FulltextAlgorithm, FulltextConfig, InputSchema}; use graph::{components::store::AttributeNames, data::store::scalar}; use inflector::Inflector; use itertools::Itertools; -use std::cmp::Ordering; use std::collections::{BTreeMap, BTreeSet, HashSet}; use std::convert::TryFrom; use std::fmt::{self, Display}; @@ -541,7 +540,7 @@ impl EntityData { } } -#[derive(QueryableByName, Clone, Debug, Default, Eq)] +#[derive(QueryableByName, Clone, Debug, Default)] pub struct EntityDataExt { #[diesel(sql_type = Text)] pub entity: String, @@ -549,40 +548,12 @@ pub struct EntityDataExt { pub data: serde_json::Value, #[diesel(sql_type = Integer)] pub block_number: i32, - #[diesel(sql_type = Text)] - pub id: String, + #[diesel(sql_type = Binary)] + pub id: Vec, #[diesel(sql_type = BigInt)] pub vid: i64, } -impl Ord for EntityDataExt { - fn cmp(&self, other: &Self) -> Ordering { - let ord = self.block_number.cmp(&other.block_number); - if ord != Ordering::Equal { - ord - } else { - let ord = self.entity.cmp(&other.entity); - if ord != Ordering::Equal { - ord - } else { - self.id.cmp(&other.id) - } - } - } -} - -impl PartialOrd for EntityDataExt { - fn partial_cmp(&self, other: &Self) -> Option { - Some(self.cmp(other)) - } -} - -impl PartialEq for EntityDataExt { - fn eq(&self, other: &Self) -> bool { - self.cmp(other) == Ordering::Equal - } -} - /// The equivalent of `graph::data::store::Value` but in a form that does /// not require further transformation during `walk_ast`. This form takes /// the idiosyncrasies of how we serialize values into account (e.g., that diff --git a/store/test-store/tests/postgres/writable.rs b/store/test-store/tests/postgres/writable.rs index c47498204e6..2e3e138d567 100644 --- a/store/test-store/tests/postgres/writable.rs +++ b/store/test-store/tests/postgres/writable.rs @@ -28,6 +28,32 @@ const SCHEMA_GQL: &str = " id: ID!, count: Int!, } + type BytesId @entity { + id: Bytes!, + value: String! + } + type Int8Id @entity { + id: Int8!, + value: String! + } + type StringId @entity { + id: String!, + value: String! + } + type PoolCreated @entity(immutable: true) { + id: Bytes!, + token0: Bytes!, + token1: Bytes!, + fee: Int!, + tickSpacing: Int!, + pool: Bytes!, + blockNumber: BigInt!, + blockTimestamp: BigInt!, + transactionHash: Bytes!, + transactionFrom: Bytes!, + transactionGasPrice: BigInt!, + logIndex: BigInt! + } "; const COUNTER: &str = "Counter"; @@ -407,3 +433,72 @@ fn read_immutable_only_range_test() { assert_eq!(e.len(), 4); }) } + +#[test] +fn read_range_pool_created_test() { + run_test(|store, writable, sourceable, deployment| async move { + let result_entities = vec![ + format!("(1, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369621), blockTimestamp: BigInt(1620243254), fee: Int(500), id: Bytes(0xff80818283848586), logIndex: BigInt(0), pool: Bytes(0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8), tickSpacing: Int(10), token0: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000000), vid: Int8(1) }}, vid: 1 }}])"), + format!("(2, [EntitySourceOperation {{ entity_op: Create, entity_type: EntityType(PoolCreated), entity: Entity {{ blockNumber: BigInt(12369622), blockTimestamp: BigInt(1620243255), fee: Int(3000), id: Bytes(0xff90919293949596), logIndex: BigInt(1), pool: Bytes(0x4585fe77225b41b697c938b018e2ac67ac5a20c0), tickSpacing: Int(60), token0: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), token1: Bytes(0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2), transactionFrom: Bytes(0x2260fac5e5542a773aa44fbcfedf7c193bc2c599), transactionGasPrice: BigInt(100000000000), transactionHash: Bytes(0x12340000000000000000000000000000000000000000000000000000000000000000000000000001), vid: Int8(2) }}, vid: 2 }}])"), + ]; + + // Rest of the test remains the same + let subgraph_store = store.subgraph_store(); + writable.deployment_synced(block_pointer(0)).unwrap(); + + let pool_created_type = TEST_SUBGRAPH_SCHEMA.entity_type("PoolCreated").unwrap(); + let entity_types = vec![pool_created_type.clone()]; + + for count in (1..=2).map(|x| x as i64) { + let id = if count == 1 { + "0xff80818283848586" + } else { + "0xff90919293949596" + }; + + let data = entity! { TEST_SUBGRAPH_SCHEMA => + id: id, + token0: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" }, + token1: "0xc02aaa39b223fe8d0a0e5c4f27ead9083c756cc2", + fee: if count == 1 { 500 } else { 3000 }, + tickSpacing: if count == 1 { 10 } else { 60 }, + pool: if count == 1 { "0x8ad599c3a0ff1de082011efddc58f1908eb6e6d8" } else { "0x4585fe77225b41b697c938b018e2ac67ac5a20c0" }, + blockNumber: 12369621 + count - 1, + blockTimestamp: 1620243254 + count - 1, + transactionHash: format!("0x1234{:0>76}", if count == 1 { "0" } else { "1" }), + transactionFrom: if count == 1 { "0xa0b86991c6218b36c1d19d4a2e9eb0ce3606eb48" } else { "0x2260fac5e5542a773aa44fbcfedf7c193bc2c599" }, + transactionGasPrice: 100000000000i64, + logIndex: count - 1, + vid: count + }; + + let key = pool_created_type.parse_key(id).unwrap(); + let op = EntityOperation::Set { + key: key.clone(), + data, + }; + + transact_entity_operations( + &subgraph_store, + &deployment, + block_pointer(count as u8), + vec![op], + ) + .await + .unwrap(); + } + writable.flush().await.unwrap(); + writable.deployment_synced(block_pointer(0)).unwrap(); + + let br: Range = 0..18; + let e: BTreeMap> = sourceable + .get_range(entity_types.clone(), CausalityRegion::ONCHAIN, br.clone()) + .unwrap(); + assert_eq!(e.len(), 2); + for en in &e { + let index = *en.0 - 1; + let a = result_entities[index as usize].clone(); + assert_eq!(a, format!("{:?}", en)); + } + }) +}