Skip to content

Commit

Permalink
Process template events at last when following subgraph events order (#439)
Browse files Browse the repository at this point in the history

* Process template events at last when following subgraph events order

* Update mock test indexer
  • Loading branch information
prathamesh0 authored Oct 26, 2023
1 parent b63a93d commit 445d5a9
Show file tree
Hide file tree
Showing 7 changed files with 156 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/cli/src/index-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ export class IndexBlockCmd {
assert(indexer);
assert(database);

await indexBlock(indexer, config.jobQueue.eventsInBatch, this._argv);
await indexBlock(indexer, config.jobQueue.eventsInBatch, config.jobQueue.subgraphEventsOrder, this._argv);

await database.close();
}
Expand Down
8 changes: 8 additions & 0 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.saveEventEntity(dbEvent);
}

/**
 * Persist a batch of events; delegates to the shared base indexer.
 */
async saveEvents (dbEvents: Event[]): Promise<void> {
  await this._baseIndexer.saveEvents(dbEvents);
}

/**
 * Fetch events for a block, optionally filtered by contract address and event name.
 */
async getEventsByFilter (blockHash: string, contract?: string, name?: string): Promise<Array<Event>> {
  const events = await this._baseIndexer.getEventsByFilter(blockHash, contract, name);
  return events;
}
Expand All @@ -572,6 +576,10 @@ export class Indexer implements IndexerInterface {
return this._baseIndexer.isWatchedContract(address);
}

/**
 * List every contract currently watched by the base indexer.
 */
getWatchedContracts (): Contract[] {
  const contracts = this._baseIndexer.getWatchedContracts();
  return contracts;
}

/**
 * Look up watched contracts of the given kind via the base indexer.
 */
getContractsByKind (kind: string): Contract[] {
  const matches = this._baseIndexer.getContractsByKind(kind);
  return matches;
}
Expand Down
8 changes: 8 additions & 0 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,10 @@ export class Indexer implements IndexerInterface {
return dbEvent;
}

/**
 * Test stub: accepts a batch of events and discards it — nothing is persisted.
 */
async saveEvents (dbEvents: EventInterface[]): Promise<void> {
  // Only check that a batch was actually passed in.
  assert(dbEvents);
}

/**
 * Test stub: pretends to process an event; performs no real work.
 */
async processEvent (event: EventInterface): Promise<void> {
  // Only check that an event was actually passed in.
  assert(event);
}
Expand All @@ -201,6 +205,10 @@ export class Indexer implements IndexerInterface {
return undefined;
}

/**
 * Test stub: the mock indexer watches no contracts.
 */
getWatchedContracts (): ContractInterface[] {
  const noContracts: ContractInterface[] = [];
  return noContracts;
}

/**
 * Test stub: accepts a watch-contract request without storing anything.
 */
async watchContract (address: string, kind: string, checkpoint: boolean, startingBlock: number): Promise<void> {
  // Intentionally empty — the mock does not track watched contracts.
}
Expand Down
149 changes: 134 additions & 15 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -255,12 +255,37 @@ export const _fetchBatchBlocks = async (
* @param block
* @param eventsInBatch
*/
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<void> => {
/**
 * Process all events of a block, then mark the block complete and persist progress.
 * The `subgraphEventsOrder` flag selects between plain index-order processing and
 * subgraph-order processing (template-contract events handled last).
 * @param indexer
 * @param block
 * @param eventsInBatch
 * @param subgraphEventsOrder
 */
export const processBatchEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number, subgraphEventsOrder: boolean): Promise<void> => {
  // Pick the event-ordering strategy requested by configuration.
  const { dbBlock, dbEvents } = subgraphEventsOrder
    ? await processEventsInSubgraphOrder(indexer, block, eventsInBatch)
    : await processEvents(indexer, block, eventsInBatch);

  // Run the post-events hook only when the block is not already marked complete.
  if (indexer.processBlockAfterEvents && !dbBlock.isComplete) {
    await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber);
  }

  dbBlock.isComplete = true;

  // Persist block progress and any reparsed events in parallel.
  console.time('time:common#processBatchEvents-updateBlockProgress-saveEvents');
  await Promise.all([
    indexer.updateBlockProgress(dbBlock, dbBlock.lastProcessedEventIndex),
    indexer.saveEvents(dbEvents)
  ]);
  console.timeEnd('time:common#processBatchEvents-updateBlockProgress-saveEvents');
};

export const processEvents = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
const dbEvents: EventInterface[] = [];
let page = 0;

// Check if block processing is complete.
while (block.numProcessedEvents < block.numEvents) {
console.time('time:common#processBacthEvents-fetching_events_batch');
console.time('time:common#processEvents-fetching_events_batch');

// Fetch events in batches
const events = await indexer.getBlockEvents(
Expand All @@ -274,16 +299,16 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
}
);

console.timeEnd('time:common#processBacthEvents-fetching_events_batch');
console.timeEnd('time:common#processEvents-fetching_events_batch');

if (events.length) {
log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
}

console.time('time:common#processBatchEvents-processing_events_batch');
console.time('time:common#processEvents-processing_events_batch');

// Process events in loop
for (let event of events) {
for (const event of events) {
// Skipping check for order of events processing since logIndex in FEVM is not index of log in block
// Check was introduced to avoid reprocessing block events incase of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
// if (event.index <= block.lastProcessedEventIndex) {
Expand All @@ -308,28 +333,122 @@ export const processBatchEvents = async (indexer: IndexerInterface, block: Block
...logObj,
eventSignature
});
event = await indexer.saveEventEntity(event);

// Save updated event to the db
dbEvents.push(event);
}

await indexer.processEvent(event);
}

block = await indexer.updateBlockProgress(block, event.index);
block.lastProcessedEventIndex = event.index;
block.numProcessedEvents++;
}

console.timeEnd('time:common#processBatchEvents-processing_events_batch');
console.timeEnd('time:common#processEvents-processing_events_batch');
}

if (indexer.processBlockAfterEvents) {
if (!block.isComplete) {
await indexer.processBlockAfterEvents(block.blockHash, block.blockNumber);
return { dbBlock: block, dbEvents };
};

/**
 * Process a block's events in subgraph order: events of contracts that were
 * already watched when the block started are processed first (in index order);
 * events of contracts that only became watched during this block (e.g. created
 * from data-source templates) are processed at the end.
 * Mutates the progress fields on `block`; persisting the block and the returned
 * `dbEvents` is left to the caller (see processBatchEvents).
 * @param indexer
 * @param block
 * @param eventsInBatch page size for fetching events (falls back to DEFAULT_EVENTS_IN_BATCH)
 * @returns the (mutated) block and any events whose parsed fields need saving
 */
export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, block: BlockProgressInterface, eventsInBatch: number): Promise<{ dbBlock: BlockProgressInterface, dbEvents: EventInterface[] }> => {
  // Create list of initially watched contracts
  const initiallyWatchedContracts: string[] = indexer.getWatchedContracts().map(contract => contract.address);
  const unwatchedContractEvents: EventInterface[] = [];

  const dbEvents: EventInterface[] = [];
  let page = 0;

  // Check if we are out of events.
  // NOTE(review): if a page ever comes back empty while numFetchedEvents < block.numEvents
  // this loop would not terminate — confirm block.numEvents always matches stored events.
  let numFetchedEvents = 0;
  while (numFetchedEvents < block.numEvents) {
    console.time('time:common#processEventsInSubgraphOrder-fetching_events_batch');

    // Fetch events in batches
    const events = await indexer.getBlockEvents(
      block.blockHash,
      {},
      {
        skip: (page++) * (eventsInBatch || DEFAULT_EVENTS_IN_BATCH),
        limit: eventsInBatch || DEFAULT_EVENTS_IN_BATCH,
        orderBy: 'index',
        orderDirection: OrderDirection.asc
      }
    );
    numFetchedEvents += events.length;

    console.timeEnd('time:common#processEventsInSubgraphOrder-fetching_events_batch');

    if (events.length) {
      log(`Processing events batch from index ${events[0].index} to ${events[0].index + events.length - 1}`);
    }

    console.time('time:common#processEventsInSubgraphOrder-processing_events_batch');

    // First process events for initially watched contracts
    const watchedContractEvents: EventInterface[] = [];
    events.forEach(event => {
      if (initiallyWatchedContracts.includes(event.contract)) {
        watchedContractEvents.push(event);
      } else {
        unwatchedContractEvents.push(event);
      }
    });

    // Process known events in a loop
    for (const event of watchedContractEvents) {
      // Skipping check for order of events processing since logIndex in FEVM is not index of log in block
      // Check was introduced to avoid reprocessing block events in case of restarts. But currently on restarts, unprocessed block is removed and reprocessed from first event log
      // if (event.index <= block.lastProcessedEventIndex) {
      //   throw new Error(`Events received out of order for block number ${block.blockNumber} hash ${block.blockHash}, got event index ${eventIndex} and lastProcessedEventIndex ${block.lastProcessedEventIndex}, aborting`);
      // }

      await indexer.processEvent(event);

      block.lastProcessedEventIndex = event.index;
      block.numProcessedEvents++;
    }

    console.timeEnd('time:common#processEventsInSubgraphOrder-processing_events_batch');
  }

  console.time('time:common#processEventsInSubgraphOrder-processing_unwatched_events');

  // At last, process all the events of newly watched contracts
  for (const event of unwatchedContractEvents) {
    const watchedContract = indexer.isWatchedContract(event.contract);

    if (watchedContract) {
      // We might not have parsed this event yet. This can happen if the contract was added
      // as a result of a previous event in the same block.
      if (event.eventName === UNKNOWN_EVENT_NAME) {
        const logObj = JSON.parse(event.extraInfo);

        assert(indexer.parseEventNameAndArgs);
        assert(typeof watchedContract !== 'boolean');
        const { eventName, eventInfo, eventSignature } = indexer.parseEventNameAndArgs(watchedContract.kind, logObj);

        event.eventName = eventName;
        event.eventInfo = JSONbigNative.stringify(eventInfo);
        event.extraInfo = JSONbigNative.stringify({
          ...logObj,
          eventSignature
        });

        // Save updated event to the db
        dbEvents.push(event);
      }

      await indexer.processEvent(event);
    }

    // NOTE(review): progress advances even for events of still-unwatched contracts,
    // keeping counters monotonic — confirm this matches the intended semantics.
    block.lastProcessedEventIndex = Math.max(block.lastProcessedEventIndex + 1, event.index);
    block.numProcessedEvents++;
  }

  console.timeEnd('time:common#processEventsInSubgraphOrder-processing_unwatched_events');

  return { dbBlock: block, dbEvents };
};

/**
Expand Down
3 changes: 2 additions & 1 deletion packages/util/src/index-block.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { processBatchEvents } from './common';
export const indexBlock = async (
indexer: IndexerInterface,
eventsInBatch: number,
subgraphEventsOrder: boolean,
argv: {
block: number,
}
Expand Down Expand Up @@ -44,6 +45,6 @@ export const indexBlock = async (
assert(indexer.processBlock);
await indexer.processBlock(blockProgress);

await processBatchEvents(indexer, blockProgress, eventsInBatch);
await processBatchEvents(indexer, blockProgress, eventsInBatch, subgraphEventsOrder);
}
};
2 changes: 1 addition & 1 deletion packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -461,7 +461,7 @@ export class JobRunner {
const { block } = prefetchedBlock;

console.time('time:job-runner#_processEvents-events');
await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch);
await processBatchEvents(this._indexer, block, this._jobQueueConfig.eventsInBatch, this._jobQueueConfig.subgraphEventsOrder);
console.timeEnd('time:job-runner#_processEvents-events');

// Update metrics
Expand Down
2 changes: 2 additions & 0 deletions packages/util/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,11 @@ export interface IndexerInterface {
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
saveEvents (dbEvents: EventInterface[]): Promise<void>
processEvent (event: EventInterface): Promise<void>
parseEventNameAndArgs?: (kind: string, logObj: any) => any
isWatchedContract: (address: string) => ContractInterface | undefined;
getWatchedContracts: () => ContractInterface[]
getContractsByKind?: (kind: string) => ContractInterface[]
addContracts?: () => Promise<void>
cacheContract: (contract: ContractInterface) => void;
Expand Down

0 comments on commit 445d5a9

Please sign in to comment.