From 445d5a92938737d0f54cdc7e94d3e109f6422a1a Mon Sep 17 00:00:00 2001 From: prathamesh0 <42446521+prathamesh0@users.noreply.github.com> Date: Thu, 26 Oct 2023 12:05:29 +0530 Subject: [PATCH] Process template events at last when following subgraph events order (#439) * Process template events at last when following subgraph events order * Update mock test indexer --- packages/cli/src/index-block.ts | 2 +- .../src/templates/indexer-template.handlebars | 8 + packages/graph-node/test/utils/indexer.ts | 8 + packages/util/src/common.ts | 149 ++++++++++++++++-- packages/util/src/index-block.ts | 3 +- packages/util/src/job-runner.ts | 2 +- packages/util/src/types.ts | 2 + 7 files changed, 156 insertions(+), 18 deletions(-) diff --git a/packages/cli/src/index-block.ts b/packages/cli/src/index-block.ts index c5d083149..a0c95fd74 100644 --- a/packages/cli/src/index-block.ts +++ b/packages/cli/src/index-block.ts @@ -95,7 +95,7 @@ export class IndexBlockCmd { assert(indexer); assert(database); - await indexBlock(indexer, config.jobQueue.eventsInBatch, this._argv); + await indexBlock(indexer, config.jobQueue.eventsInBatch, config.jobQueue.subgraphEventsOrder, this._argv); await database.close(); } diff --git a/packages/codegen/src/templates/indexer-template.handlebars b/packages/codegen/src/templates/indexer-template.handlebars index e573651f0..17512faa7 100644 --- a/packages/codegen/src/templates/indexer-template.handlebars +++ b/packages/codegen/src/templates/indexer-template.handlebars @@ -564,6 +564,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.saveEventEntity(dbEvent); } + async saveEvents (dbEvents: Event[]): Promise { + return this._baseIndexer.saveEvents(dbEvents); + } + async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise> { return this._baseIndexer.getEventsByFilter(blockHash, contract, name); } @@ -572,6 +576,10 @@ export class Indexer implements IndexerInterface { return this._baseIndexer.isWatchedContract(address); } + getWatchedContracts (): Contract[] { + return this._baseIndexer.getWatchedContracts(); + } + getContractsByKind (kind: string): Contract[] { return this._baseIndexer.getContractsByKind(kind); } diff --git a/packages/graph-node/test/utils/indexer.ts b/packages/graph-node/test/utils/indexer.ts index 17f80474e..9992e9639 100644 --- a/packages/graph-node/test/utils/indexer.ts +++ b/packages/graph-node/test/utils/indexer.ts @@ -177,6 +177,10 @@ export class Indexer implements IndexerInterface { return dbEvent; } + async saveEvents (dbEvents: EventInterface[]): Promise { + assert(dbEvents); + } + async processEvent (event: EventInterface): Promise { assert(event); } @@ -201,6 +205,10 @@ export class Indexer implements IndexerInterface { return undefined; } + getWatchedContracts (): ContractInterface[] { + return []; + } + async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise { return undefined; } diff --git a/packages/util/src/common.ts b/packages/util/src/common.ts index f780c33c4..0a430ee25 100644 --- a/packages/util/src/common.ts +++ b/packages/util/src/common.ts @@ -255,12 +255,37 @@ export const _fetchBatchBlocks = async ( * @param block * @param eventsInBatch */ -export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise => { +export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise => { + let dbBlock: BlockProgressInterface, dbEvents: EventInterface[]; + if (subgraphEventsOrder) { + ({ dbBlock, dbEvents } = await processEventsInSubgraphOrder(indexer, block, eventsInBatch)); + } else { + ({ dbBlock, dbEvents } = await processEvents(indexer, block, eventsInBatch)); + } + + if (indexer.processBlockAfterEvents) { + if (!dbBlock.isComplete) { + await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber); + } + } + + dbBlock.isComplete = true; + + console.time('time:common#processBatchEvents-updateBlockProgress-saveEvents'); + await Promise.all([ + indexer.updateBlockProgress(dbBlock, dbBlock.lastProcessedEventIndex), + indexer.saveEvents(dbEvents) + ]); + console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents'); +}; + +export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { + const dbEvents: EventInterface[] = []; let page = 0; // Check if block processing is complete. while (block.numProcessedEvents < block.numEvents) { - console.time('time:common#processBacthEvents-fetching_events_batch'); + console.time('time:common#processEvents-fetching_events_batch'); // Fetch events in batches const events = await indexer.getBlockEvents( @@ -274,16 +299,16 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block } ); - console.timeEnd('time:common#processBacthEvents-fetching_events_batch'); + console.timeEnd('time:common#processEvents-fetching_events_batch'); if (events.length) { log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); } - console.time('time:common#processBatchEvents-processing_events_batch'); + console.time('time:common#processEvents-processing_events_batch'); // Process events in loop - for (let event of events) { + for (const event of events) { // Skipping check for order of events processing since logIndex in FEVM is not index of log in block // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log // if (event.index <= block.lastProcessedEventIndex) { @@ -308,28 +333,122 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block ...logObj, eventSignature }); - event = await indexer.saveEventEntity(event); + + // Save updated event to the db + dbEvents.push(event); } await indexer.processEvent(event); } - block = await indexer.updateBlockProgress(block, event.index); + block.lastProcessedEventIndex = event.index; + block.numProcessedEvents++; } - console.timeEnd('time:common#processBatchEvents-processing_events_batch'); + console.timeEnd('time:common#processEvents-processing_events_batch'); } - if (indexer.processBlockAfterEvents) { - if (!block.isComplete) { - await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber); + return { dbBlock: block, dbEvents }; +}; + +export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => { + // Create list of initially watched contracts + const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address); + const unwatchedContractEvents: EventInterface[] = []; + + const dbEvents: EventInterface[] = []; + let page = 0; + + // Check if we are out of events. + let numFetchedEvents = 0; + while (numFetchedEvents < block.numEvents) { + console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch'); + + // Fetch events in batches + const events = await indexer.getBlockEvents( + block.blockHash, + {}, + { + skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH), + limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH, + orderBy: 'index', + orderDirection: OrderDirection.asc + } + ); + numFetchedEvents += events.length; + + console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch'); + + if (events.length) { + log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`); + } + + console.time('time:common#processEventsInSubgraphOrder-processing_events_batch'); + + // First process events for initially watched contracts + const watchedContractEvents: EventInterface[] = []; + events.forEach(event => { + if (initiallyWatchedContracts.includes(event.contract)) { + watchedContractEvents.push(event); + } else { + unwatchedContractEvents.push(event); + } + }); + + // Process known events in a loop + for (const event of watchedContractEvents) { + // Skipping check for order of events processing since logIndex in FEVM is not index of log in block + // Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log + // if (event.index <= block.lastProcessedEventIndex) { + // throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`); + // } + + await indexer.processEvent(event); + + block.lastProcessedEventIndex = event.index; + block.numProcessedEvents++; } + + console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch'); } - block.isComplete = true; - console.time('time:common#processBatchEvents-updateBlockProgress'); - await indexer.updateBlockProgress(block, block.lastProcessedEventIndex); - console.timeEnd('time:common#processBatchEvents-updateBlockProgress'); + console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events'); + + // At last, process all the events of newly watched contracts + for (const event of unwatchedContractEvents) { + const watchedContract = indexer.isWatchedContract(event.contract); + + if (watchedContract) { + // We might not have parsed this event yet. This can happen if the contract was added + // as a result of a previous event in the same block. + if (event.eventName === UNKNOWN_EVENT_NAME) { + const logObj = JSON.parse(event.extraInfo); + + assert(indexer.parseEventNameAndArgs); + assert(typeof watchedContract !== 'boolean'); + const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj); + + event.eventName = eventName; + event.eventInfo = JSONbigNative.stringify(eventInfo); + event.extraInfo = JSONbigNative.stringify({ + ...logObj, + eventSignature + }); + + // Save updated event to the db + dbEvents.push(event); + } + + await indexer.processEvent(event); + } + + block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, event.index); + block.numProcessedEvents++; + } + + console.timeEnd('time:common#processEventsInSubgraphOrder-processing_unwatched_events'); + + return { dbBlock: block, dbEvents }; }; /** diff --git a/packages/util/src/index-block.ts b/packages/util/src/index-block.ts index de593732b..8969c504e 100644 --- a/packages/util/src/index-block.ts +++ b/packages/util/src/index-block.ts @@ -10,6 +10,7 @@ import { processBatchEvents } from './common'; export const indexBlock = async ( indexer: IndexerInterface, eventsInBatch: number, + subgraphEventsOrder: boolean, argv: { block: number, } @@ -44,6 +45,6 @@ export const indexBlock = async ( assert(indexer.processBlock); await indexer.processBlock(blockProgress); - await processBatchEvents(indexer, blockProgress, eventsInBatch); + await processBatchEvents(indexer, blockProgress, eventsInBatch, subgraphEventsOrder); } }; diff --git a/packages/util/src/job-runner.ts b/packages/util/src/job-runner.ts index 4beccc442..c476faf81 100644 --- a/packages/util/src/job-runner.ts +++ b/packages/util/src/job-runner.ts @@ -461,7 +461,7 @@ export class JobRunner { const { block } = prefetchedBlock; console.time('time:job-runner#_processEvents-events'); - await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch); + await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder); console.timeEnd('time:job-runner#_processEvents-events'); // Update metrics diff --git a/packages/util/src/types.ts b/packages/util/src/types.ts index 604af5575..84f331e83 100644 --- a/packages/util/src/types.ts +++ b/packages/util/src/types.ts @@ -106,9 +106,11 @@ export interface IndexerInterface { updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise saveEventEntity (dbEvent: EventInterface): Promise + saveEvents (dbEvents: EventInterface[]): Promise processEvent (event: EventInterface): Promise parseEventNameAndArgs?: (kind: string, logObj: any) => any isWatchedContract: (address: string) => ContractInterface | undefined; + getWatchedContracts: () => ContractInterface[] getContractsByKind?: (kind: string) => ContractInterface[] addContracts?: () => Promise cacheContract: (contract: ContractInterface) => void;