Skip to content

Commit 18607f3

Browse files
committed
graph: refactor TriggersAdapterWrapper.triggers_in_block to not rely on scan_triggers
1 parent 19e4ef1 commit 18607f3

File tree

1 file changed

+78
-52
lines changed

1 file changed

+78
-52
lines changed

graph/src/blockchain/block_stream.rs

Lines changed: 78 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,24 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
328328
source_subgraph_stores,
329329
}
330330
}
331+
332+
pub async fn blocks_with_subgraph_triggers(
333+
&self,
334+
logger: &Logger,
335+
subgraph_filter: &SubgraphFilter,
336+
range: SubgraphTriggerScanRange<C>,
337+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
338+
let store = self
339+
.source_subgraph_stores
340+
.get(&subgraph_filter.subgraph)
341+
.unwrap(); // TODO(krishna): Avoid unwrap
342+
343+
let schema = crate::components::store::ReadStore::input_schema(store);
344+
345+
let adapter = self.adapter.clone();
346+
347+
scan_subgraph_triggers::<C>(logger, store, &adapter, &schema, &subgraph_filter, range).await
348+
}
331349
}
332350

333351
fn create_subgraph_trigger_from_entity(
@@ -370,34 +388,60 @@ async fn create_subgraph_triggers<C: Blockchain>(
370388
Ok(blocks)
371389
}
372390

391+
pub enum SubgraphTriggerScanRange<C: Blockchain> {
392+
Single(C::Block),
393+
Range(BlockNumber, BlockNumber),
394+
}
395+
373396
async fn scan_subgraph_triggers<C: Blockchain>(
374397
logger: &Logger,
375398
store: &Arc<dyn WritableStore>,
376399
adapter: &Arc<dyn TriggersAdapter<C>>,
377400
schema: &InputSchema,
378401
filter: &SubgraphFilter,
402+
range: SubgraphTriggerScanRange<C>,
403+
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
404+
match range {
405+
SubgraphTriggerScanRange::Single(block) => {
406+
let entities =
407+
get_entities_for_range(store, filter, schema, block.number(), block.number())
408+
.await?;
409+
create_subgraph_triggers::<C>(logger.clone(), vec![block], filter, entities).await
410+
}
411+
SubgraphTriggerScanRange::Range(from, to) => {
412+
let entities = get_entities_for_range(store, filter, schema, from, to).await?;
413+
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
414+
// Ensure the 'to' block is included in the block_numbers
415+
block_numbers.insert(to);
416+
417+
let blocks = adapter
418+
.load_blocks_by_numbers(logger.clone(), block_numbers)
419+
.await?;
420+
421+
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
422+
}
423+
}
424+
}
425+
426+
async fn get_entities_for_range(
427+
store: &Arc<dyn WritableStore>,
428+
filter: &SubgraphFilter,
429+
schema: &InputSchema,
379430
from: BlockNumber,
380431
to: BlockNumber,
381-
) -> Result<Vec<BlockWithTriggers<C>>, Error> {
432+
) -> Result<BTreeMap<BlockNumber, Entity>, Error> {
382433
let entity_types: Vec<EntityType> = filter
383434
.entities
384435
.iter()
385436
.map(|e| schema.entity_type(e).unwrap())
386437
.collect();
387-
388-
let entity_type = entity_types.first().unwrap();
389-
let range = from as u32..to as u32;
390-
let entities = store.get_range(&entity_type, range)?;
391-
let mut block_numbers: HashSet<BlockNumber> = entities.keys().cloned().collect();
392-
393-
// Ensure the 'to' block is included in the block_numbers
394-
block_numbers.insert(to);
395-
396-
let blocks = adapter
397-
.load_blocks_by_numbers(logger.clone(), block_numbers)
398-
.await?;
399-
400-
create_subgraph_triggers::<C>(logger.clone(), blocks, filter, entities).await
438+
let mut entities = BTreeMap::new();
439+
for entity_type in entity_types {
440+
let range = from as u32..to as u32;
441+
let mut entities_for_type = store.get_range(&entity_type, range)?;
442+
entities.append(&mut entities_for_type);
443+
}
444+
Ok(entities)
401445
}
402446

403447
impl<C: Blockchain> TriggersAdapterWrapper<C> {
@@ -418,33 +462,13 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
418462
filter: &Arc<TriggerFilterWrapper<C>>,
419463
) -> Result<(Vec<BlockWithTriggers<C>>, BlockNumber), Error> {
420464
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
421-
let store = self
422-
.source_subgraph_stores
423-
.get(&subgraph_filter.subgraph)
424-
.unwrap(); // TODO(krishna): Avoid unwrap
425-
426-
let schema = crate::components::store::ReadStore::input_schema(store);
427-
428-
let adapter = self.adapter.clone();
429-
430-
let blocks_with_triggers = scan_subgraph_triggers::<C>(
431-
logger,
432-
store,
433-
&adapter,
434-
&schema,
435-
&subgraph_filter,
436-
from,
437-
to,
438-
)
439-
.await?;
440-
441-
debug!(
442-
logger,
443-
"Scanned subgraph triggers";
444-
"from" => from,
445-
"to" => to,
446-
"blocks_with_triggers" => blocks_with_triggers.len(),
447-
);
465+
let blocks_with_triggers = self
466+
.blocks_with_subgraph_triggers(
467+
logger,
468+
subgraph_filter,
469+
SubgraphTriggerScanRange::Range(from, to),
470+
)
471+
.await?;
448472

449473
return Ok((blocks_with_triggers, to));
450474
}
@@ -467,19 +491,21 @@ impl<C: Blockchain> TriggersAdapterWrapper<C> {
467491
"block_hash" => block.hash().hash_hex(),
468492
);
469493

470-
let block_number = block.number();
471-
472-
if filter.subgraph_filter.is_empty() {
473-
trace!(logger, "No subgraph filters, scanning triggers in block");
474-
return self
475-
.adapter
476-
.triggers_in_block(logger, block, &filter.chain_filter)
477-
.await;
494+
if let Some(subgraph_filter) = filter.subgraph_filter.first() {
495+
let blocks_with_triggers = self
496+
.blocks_with_subgraph_triggers(
497+
logger,
498+
subgraph_filter,
499+
SubgraphTriggerScanRange::Single(block),
500+
)
501+
.await?;
502+
503+
return Ok(blocks_with_triggers.into_iter().next().unwrap());
478504
}
479505

480-
self.scan_triggers(logger, block_number, block_number, filter)
506+
self.adapter
507+
.triggers_in_block(logger, block, &filter.chain_filter)
481508
.await
482-
.map(|(mut blocks, _)| blocks.pop().unwrap())
483509
}
484510

485511
pub async fn is_on_main_chain(&self, ptr: BlockPtr) -> Result<bool, Error> {

0 commit comments

Comments
 (0)