Skip to content

Commit c81369a

Browse files
committed
graph: order subgraph triggers by source_idx first
1 parent 043c3df commit c81369a

File tree

7 files changed

+39
-12
lines changed

7 files changed

+39
-12
lines changed

core/src/subgraph/runner.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -146,6 +146,7 @@ where
146146
.iter()
147147
.map(|handler| handler.entity.clone())
148148
.collect(),
149+
manifest_idx: ds.manifest_idx,
149150
})
150151
.collect::<Vec<_>>();
151152

graph/src/blockchain/block_stream.rs

Lines changed: 14 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -357,11 +357,10 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
357357
SubgraphTriggerScanRange::Range(from, to) => {
358358
let hash_to_entities = self.fetch_entities_for_filters(filters, from, to).await?;
359359

360-
let block_numbers: BTreeSet<BlockNumber> = hash_to_entities
360+
let block_numbers = hash_to_entities
361361
.iter()
362-
.flat_map(|(_, entities)| entities.keys().copied())
363-
.chain(std::iter::once(to))
364-
.collect();
362+
.flat_map(|(_, entities, _)| entities.keys().copied())
363+
.collect::<BTreeSet<_>>();
365364

366365
let blocks = self
367366
.adapter
@@ -384,6 +383,7 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
384383
Vec<(
385384
DeploymentHash,
386385
BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
386+
u32,
387387
)>,
388388
Error,
389389
> {
@@ -399,7 +399,7 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
399399
async move {
400400
let entities =
401401
get_entities_for_range(&store, filter, &schema, from, to).await?;
402-
Ok::<_, Error>((filter.subgraph.clone(), entities))
402+
Ok::<_, Error>((filter.subgraph.clone(), entities, filter.manifest_idx))
403403
}
404404
})
405405
})
@@ -416,12 +416,14 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
416416
fn create_subgraph_trigger_from_entities(
417417
subgraph: &DeploymentHash,
418418
entities: Vec<EntitySourceOperation>,
419+
manifest_idx: u32,
419420
) -> Vec<subgraph::TriggerData> {
420421
entities
421422
.into_iter()
422423
.map(|entity| subgraph::TriggerData {
423424
source: subgraph.clone(),
424425
entity,
426+
source_idx: manifest_idx,
425427
})
426428
.collect()
427429
}
@@ -432,6 +434,7 @@ async fn create_subgraph_triggers<C: Blockchain>(
432434
subgraph_data: Vec<(
433435
DeploymentHash,
434436
BTreeMap<BlockNumber, Vec<EntitySourceOperation>>,
437+
u32,
435438
)>,
436439
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
437440
let logger_clone = logger.cheap_clone();
@@ -441,10 +444,13 @@ async fn create_subgraph_triggers<C: Blockchain>(
441444
let block_number = block.number();
442445
let mut all_trigger_data = Vec::new();
443446

444-
for (hash, entities) in subgraph_data.iter() {
447+
for (hash, entities, manifest_idx) in subgraph_data.iter() {
445448
if let Some(block_entities) = entities.get(&block_number) {
446-
let trigger_data =
447-
create_subgraph_trigger_from_entities(hash, block_entities.clone());
449+
let trigger_data = create_subgraph_trigger_from_entities(
450+
hash,
451+
block_entities.clone(),
452+
*manifest_idx,
453+
);
448454
all_trigger_data.extend(trigger_data);
449455
}
450456
}

graph/src/blockchain/mod.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,6 +266,7 @@ pub struct SubgraphFilter {
266266
pub subgraph: DeploymentHash,
267267
pub start_block: BlockNumber,
268268
pub entities: Vec<String>,
269+
pub manifest_idx: u32,
269270
}
270271

271272
impl<C: Blockchain> TriggerFilterWrapper<C> {
@@ -467,7 +468,7 @@ where
467468
(Trigger::Chain(data1), Trigger::Chain(data2)) => data1.cmp(data2),
468469
(Trigger::Subgraph(_), Trigger::Chain(_)) => std::cmp::Ordering::Greater,
469470
(Trigger::Chain(_), Trigger::Subgraph(_)) => std::cmp::Ordering::Less,
470-
(Trigger::Subgraph(t1), Trigger::Subgraph(t2)) => t1.entity.vid.cmp(&t2.entity.vid),
471+
(Trigger::Subgraph(t1), Trigger::Subgraph(t2)) => t1.cmp(t2),
471472
}
472473
}
473474
}

graph/src/data_source/subgraph.rs

Lines changed: 18 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -354,18 +354,34 @@ pub struct MappingEntityTrigger {
354354
pub struct TriggerData {
355355
pub source: DeploymentHash,
356356
pub entity: EntitySourceOperation,
357+
pub source_idx: u32,
357358
}
358359

359360
impl TriggerData {
360-
pub fn new(source: DeploymentHash, entity: EntitySourceOperation) -> Self {
361-
Self { source, entity }
361+
pub fn new(source: DeploymentHash, entity: EntitySourceOperation, source_idx: u32) -> Self {
362+
Self { source, entity, source_idx }
362363
}
363364

364365
pub fn entity_type(&self) -> &str {
365366
self.entity.entity_type.as_str()
366367
}
367368
}
368369

370+
impl Ord for TriggerData {
371+
fn cmp(&self, other: &Self) -> std::cmp::Ordering {
372+
match self.source_idx.cmp(&other.source_idx) {
373+
std::cmp::Ordering::Equal => self.entity.vid.cmp(&other.entity.vid),
374+
ord => ord,
375+
}
376+
}
377+
}
378+
379+
impl PartialOrd for TriggerData {
380+
fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
381+
Some(self.cmp(other))
382+
}
383+
}
384+
369385
impl fmt::Debug for TriggerData {
370386
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
371387
write!(

graph/tests/subgraph_datasource_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -190,6 +190,7 @@ async fn test_triggers_adapter_with_entities() {
190190
subgraph: id,
191191
start_block: 0,
192192
entities: vec!["User".to_string()], // Only monitoring User entities
193+
manifest_idx: 0,
193194
};
194195

195196
let logger = Logger::root(slog::Discard, slog::o!());

tests/src/fixture/ethereum.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,6 +169,7 @@ pub fn push_test_subgraph_trigger(
169169
entity_type: EntityType,
170170
entity_op: EntityOperationKind,
171171
vid: i64,
172+
source_idx: u32,
172173
) {
173174
let entity = EntitySourceOperation {
174175
entity: entity,
@@ -179,7 +180,7 @@ pub fn push_test_subgraph_trigger(
179180

180181
block
181182
.trigger_data
182-
.push(Trigger::Subgraph(subgraph::TriggerData { source, entity }));
183+
.push(Trigger::Subgraph(subgraph::TriggerData { source, entity, source_idx }));
183184
}
184185

185186
pub fn push_test_command(

tests/tests/runner_tests.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1122,6 +1122,7 @@ async fn subgraph_data_sources() {
11221122
entity_type,
11231123
EntityOperationKind::Create,
11241124
1,
1125+
0,
11251126
);
11261127

11271128
let block_2 = empty_block(block_1.ptr(), test_ptr(2));

0 commit comments

Comments
 (0)