Skip to content

Commit

Permalink
Avoid updating StateSyncStatus table when enableState flag is set t…
Browse files Browse the repository at this point in the history
…o false (#440)

* Handle zero hash canonical block incase of FEVM null block

* Fix json-bigint parse in processBatchEvents

* Avoid updating SyncStatus table when enableState is false
  • Loading branch information
nikugogoi authored Oct 26, 2023
1 parent 445d5a9 commit 43463af
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 38 deletions.
14 changes: 11 additions & 3 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import debug from 'debug';
{{#if queries}}
import JSONbig from 'json-bigint';
{{/if}}
import { ethers } from 'ethers';
import { ethers, constants } from 'ethers';
{{#if (subgraphPath)}}
import { SelectionNode } from 'graphql';
{{/if}}
Expand Down Expand Up @@ -493,7 +493,11 @@ export class Indexer implements IndexerInterface {
return this._db.getStateSyncStatus();
}

async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus> {
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatus | undefined> {
if (!this._serverConfig.enableState) {
return;
}

const dbTx = await this._db.createTransactionRunner();
let res;

Expand Down Expand Up @@ -527,10 +531,14 @@ export class Indexer implements IndexerInterface {
return res;
}

async getLatestCanonicalBlock (): Promise<BlockProgress> {
async getLatestCanonicalBlock (): Promise<BlockProgress | undefined> {
const syncStatus = await this.getSyncStatus();
assert(syncStatus);

if (syncStatus.latestCanonicalBlockHash === constants.HashZero) {
return;
}

const latestCanonicalBlock = await this.getBlockProgress(syncStatus.latestCanonicalBlockHash);
assert(latestCanonicalBlock);

Expand Down
8 changes: 4 additions & 4 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,16 @@ export class Indexer implements IndexerInterface {
return undefined;
}

async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> {
return {} as StateSyncStatusInterface;
async updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined> {
return undefined;
}

async updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface> {
return {} as StateSyncStatusInterface;
}

async getLatestCanonicalBlock (): Promise<BlockProgressInterface> {
return {} as BlockProgressInterface;
async getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined> {
return undefined;
}

isWatchedContract (address : string): ContractInterface | undefined {
Expand Down
9 changes: 4 additions & 5 deletions packages/util/src/common.ts
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ export const processEvents = async (indexer: IndexerInterface, block: BlockProgr
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
const logObj = JSONbigNative.parse(event.extraInfo);

assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
Expand Down Expand Up @@ -422,7 +422,7 @@ export const processEventsInSubgraphOrder = async (indexer: IndexerInterface, bl
// We might not have parsed this event yet. This can happen if the contract was added
// as a result of a previous event in the same block.
if (event.eventName === UNKNOWN_EVENT_NAME) {
const logObj = JSON.parse(event.extraInfo);
const logObj = JSONbigNative.parse(event.extraInfo);

assert(indexer.parseEventNameAndArgs);
assert(typeof watchedContract !== 'boolean');
Expand Down Expand Up @@ -481,12 +481,11 @@ export const createPruningJob = async (jobQueue: JobQueue, latestCanonicalBlockN
* @param blockHash
* @param blockNumber
*/
export const createHooksJob = async (jobQueue: JobQueue, blockHash: string, blockNumber: number): Promise<void> => {
export const createHooksJob = async (jobQueue: JobQueue, blockHash: string): Promise<void> => {
await jobQueue.pushJob(
QUEUE_HOOKS,
{
blockHash,
blockNumber
blockHash
}
);
};
Expand Down
57 changes: 33 additions & 24 deletions packages/util/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,12 @@ export class JobRunner {

// Create a hooks job for parent block of latestCanonicalBlock pruning for first block is skipped as it is assumed to be a canonical block.
const latestCanonicalBlock = await this._indexer.getLatestCanonicalBlock();
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash, latestCanonicalBlock.blockNumber - 1);

// Check if latestCanonicalBlock is undefined incase of null block in FEVM
if (latestCanonicalBlock) {
await createHooksJob(this.jobQueue, latestCanonicalBlock.parentHash);
}

break;
}

Expand Down Expand Up @@ -145,16 +150,19 @@ export class JobRunner {
}

async processHooks (job: any): Promise<void> {
const { data: { blockHash, blockNumber } } = job;
// Get the block and current stateSyncStatus.
const [blockProgress, stateSyncStatus] = await Promise.all([
this._indexer.getBlockProgress(job.data.blockHash),
this._indexer.getStateSyncStatus()
]);

// Get the current stateSyncStatus.
const stateSyncStatus = await this._indexer.getStateSyncStatus();
assert(blockProgress);
const { blockHash, blockNumber, parentHash } = blockProgress;

if (stateSyncStatus) {
if (stateSyncStatus.latestIndexedBlockNumber < (blockNumber - 1)) {
// Create hooks job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createHooksJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
await createHooksJob(this.jobQueue, parentHash);

const message = `State for blockNumber ${blockNumber - 1} not indexed yet, aborting`;
log(message);
Expand Down Expand Up @@ -186,32 +194,33 @@ export class JobRunner {

// Get the current stateSyncStatus.
const stateSyncStatus = await this._indexer.getStateSyncStatus();
assert(stateSyncStatus);

if (stateSyncStatus.latestCheckpointBlockNumber >= 0) {
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);
if (stateSyncStatus) {
if (stateSyncStatus.latestCheckpointBlockNumber >= 0) {
if (stateSyncStatus.latestCheckpointBlockNumber < (blockNumber - 1)) {
// Create a checkpoint job for parent block.
const [parentBlock] = await this._indexer.getBlocksAtHeight(blockNumber - 1, false);
await createCheckpointJob(this.jobQueue, parentBlock.blockHash, parentBlock.blockNumber);

const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);
const message = `Checkpoints for blockNumber ${blockNumber - 1} not processed yet, aborting`;
log(message);

throw new Error(message);
}
throw new Error(message);
}

if (stateSyncStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
log(`Checkpoints for blockNumber ${blockNumber} already processed`);
if (stateSyncStatus.latestCheckpointBlockNumber > (blockNumber - 1)) {
log(`Checkpoints for blockNumber ${blockNumber} already processed`);

return;
return;
}
}
}

// Process checkpoints for the given block.
await this._indexer.processCheckpoint(blockHash);
// Process checkpoints for the given block.
await this._indexer.processCheckpoint(blockHash);

// Update the stateSyncStatus.
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
// Update the stateSyncStatus.
await this._indexer.updateStateSyncStatusCheckpointBlock(blockNumber);
}

await this.jobQueue.markComplete(job);
}
Expand Down
4 changes: 2 additions & 2 deletions packages/util/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export interface IndexerInterface {
getStateSyncStatus (): Promise<StateSyncStatusInterface | undefined>
getBlocks (blockFilter: { blockHash?: string, blockNumber?: number }): Promise<any>
getBlocksAtHeight (height: number, isPruned: boolean): Promise<BlockProgressInterface[]>
getLatestCanonicalBlock (): Promise<BlockProgressInterface>
getLatestCanonicalBlock (): Promise<BlockProgressInterface | undefined>
getLatestStateIndexedBlock (): Promise<BlockProgressInterface>
getBlockEvents (blockHash: string, where: Where, queryOptions: QueryOptions): Promise<Array<EventInterface>>
getAncestorAtDepth (blockHash: string, depth: number): Promise<string>
Expand All @@ -102,7 +102,7 @@ export interface IndexerInterface {
updateSyncStatusChainHead (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusIndexedBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateSyncStatusCanonicalBlock (blockHash: string, blockNumber: number, force?: boolean): Promise<SyncStatusInterface>
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
updateStateSyncStatusIndexedBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface | undefined>
updateStateSyncStatusCheckpointBlock (blockNumber: number, force?: boolean): Promise<StateSyncStatusInterface>
markBlocksAsPruned (blocks: BlockProgressInterface[]): Promise<void>
saveEventEntity (dbEvent: EventInterface): Promise<EventInterface>
Expand Down

0 comments on commit 43463af

Please sign in to comment.