Skip to content

Commit

Permalink
Update full processor to show its status on the UI for each job
Browse files Browse the repository at this point in the history
  • Loading branch information
wjrjerome committed Dec 22, 2023
1 parent 4ba8c0b commit 70ff66e
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 103 deletions.
144 changes: 47 additions & 97 deletions src/processors/full.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import bunyan from "bunyan";
import { config } from "../config";
import { MainnetService, SmartContractData } from "../service/mainnet";
import { SubnetBlockInfo, SubnetService } from "../service/subnet";
import { Cache } from "../service/cache";
import { chunkBy, sleep } from "../utils";
import { ForkingError } from "../errors/forkingError";
import { BaseProcessor } from "./base";
Expand All @@ -13,15 +12,13 @@ const REPEAT_JOB_OPT = { jobId: NAME, repeat: { cron: config.cronJob.jobExpressi
export class Full extends BaseProcessor {
private mainnetService: MainnetService;
private subnetService: SubnetService;
cache: Cache;
logger: bunyan;

constructor(logger: bunyan) {
super(NAME);
this.logger = logger;
this.mainnetService = new MainnetService(config.mainnet, logger);
this.subnetService = new SubnetService(config.subnet, logger);
this.cache = this.cache = new Cache(logger);
}

getQueue() {
Expand All @@ -48,73 +45,30 @@ export class Full extends BaseProcessor {

// Reset and start the state sync until success
async reset() {
try {
// Stop and remove repeatable jobs
await this.queue.removeRepeatable(NAME, REPEAT_JOB_OPT.repeat);
// Clean timer
this.cache.cleanCache();
// Pull latest confirmed tx from mainnet
const smartContractData = await this.mainnetService.getLastAudittedBlock();
// Pull latest confirm block from subnet
const lastestSubnetCommittedBlock =
await this.subnetService.getLastCommittedBlockInfo();
const { shouldProcess, from } = await this.shouldProcessSync(
smartContractData,
lastestSubnetCommittedBlock
);

if (shouldProcess) {
await this.submitTxs(
from,
lastestSubnetCommittedBlock.subnetBlockNumber
);
// Store subnet block into cache
this.cache.setLastSubmittedSubnetHeader(lastestSubnetCommittedBlock);
}
// Keep the "jobId: NAME" and its repeat configuration here so that bull won't create a new repeated job each time we run this code.
await this.queue.add({}, REPEAT_JOB_OPT);
} catch (error) {
this.logger.error(
`Error while bootstraping, system will go into sleep mode for ${
config.reBootstrapWaitingTime / 1000 / 60
} minutes before re-processing!, message: ${error?.message}`
);
await sleep(config.reBootstrapWaitingTime);
this.reset();
}
await this.queue.add({}, REPEAT_JOB_OPT);
}

async processEvent() {
// Pull subnet's latest committed block
const lastSubmittedSubnetBlock = await this.getLastSubmittedSubnetHeader();
const lastCommittedBlockInfo = await this.subnetService.getLastCommittedBlockInfo();
if (
lastCommittedBlockInfo.subnetBlockNumber <=
lastSubmittedSubnetBlock.subnetBlockNumber
) {
const msg = `Already on the latest, nothing to subnet, Subnet latest: ${lastCommittedBlockInfo.subnetBlockNumber}, smart contract latest: ${lastSubmittedSubnetBlock.subnetBlockNumber}`;
this.logger.info(msg);
return msg;
}
await this.submitTxs(
lastSubmittedSubnetBlock.subnetBlockNumber,
lastCommittedBlockInfo.subnetBlockNumber
// Pull latest confirmed tx from mainnet
const smartContractData = await this.mainnetService.getLastAuditedBlock();
// Pull latest confirmed block from subnet
const lastestSubnetCommittedBlock =
await this.subnetService.getLastCommittedBlockInfo();

const { shouldProcess, from, msg } = await this.shouldProcessSync(
smartContractData,
lastestSubnetCommittedBlock
);
this.cache.setLastSubmittedSubnetHeader(lastCommittedBlockInfo);
return `Successfully submitted subnet header up till ${lastCommittedBlockInfo.subnetBlockNumber} into parent chain`;

if (shouldProcess) {
await this.submitTxs(
from,
lastestSubnetCommittedBlock.subnetBlockNumber
);
return `Completed sync from ${from} to ${lastestSubnetCommittedBlock.subnetBlockNumber}`;
}
return msg;
};


async getLastSubmittedSubnetHeader(): Promise<SubnetBlockInfo> {
const lastSubmittedSubnetBlock = this.cache.getLastSubmittedSubnetHeader();
if (lastSubmittedSubnetBlock) return lastSubmittedSubnetBlock;
// Else, our cache don't have such data
const smartContractData = await this.mainnetService.getLastAudittedBlock();
return await this.subnetService.getCommittedBlockInfoByNum(
smartContractData.smartContractHeight
);
}


// "from" is exclusive, we submit blocks "from + 1" till "to"
private async submitTxs(from: number, to: number): Promise<void> {
Expand All @@ -140,7 +94,7 @@ export class Full extends BaseProcessor {
private async shouldProcessSync(
smartContractData: SmartContractData,
lastestSubnetCommittedBlock: SubnetBlockInfo
): Promise<{ shouldProcess: boolean; from?: number }> {
): Promise<{ shouldProcess: boolean; msg?: string, from?: number }> {
const { subnetBlockHash, subnetBlockNumber } = lastestSubnetCommittedBlock;
const {
smartContractHash,
Expand All @@ -163,10 +117,8 @@ export class Full extends BaseProcessor {
subnetBlockHash
);
}
this.logger.info(
"Smart contract is ahead of subnet, nothing needs to be done, just wait"
);
return { shouldProcess: false };
const msg = "Smart contract is ahead of subnet, nothing needs to be done, just wait";
return { shouldProcess: false, msg };
} else if (subnetBlockNumber == smartContractCommittedHeight) {
if (smartContractCommittedHash != subnetBlockHash) {
this.logger.error(
Expand All @@ -178,10 +130,7 @@ export class Full extends BaseProcessor {
subnetBlockHash
);
}
this.logger.info(
"Smart contract committed and subnet are already in sync, nothing needs to be done, waiting for new blocks"
);
return { shouldProcess: false };
return { shouldProcess: false, msg: "Smart contract committed and subnet are already in sync, nothing needs to be done, waiting for new blocks" };
} else {
// Check the committed
const auditedCommittedBlockInfoInSubnet =
Expand All @@ -204,12 +153,9 @@ export class Full extends BaseProcessor {
// Verification for committed blocks are completed! We need to check where we shall start sync based on the last audited block (smartContractHash and height) in mainnet
if (smartContractHash == subnetBlockHash) {
// Same block height and hash
this.logger.info(
"Smart contract latest and subnet are already in sync, nothing needs to be done, waiting for new blocks"
);
return { shouldProcess: false };
return { shouldProcess: false, msg: "Smart contract latest and subnet are already in sync, nothing needs to be done, waiting for new blocks" };
} else if (subnetBlockNumber < smartContractHeight) {
// This is when subnet is behind the mainnet latest auditted
// This is when subnet is behind the mainnet latest audited
const subnetHashInSmartContract =
await this.mainnetService.getBlockHashByNumber(subnetBlockNumber);
if (subnetHashInSmartContract != subnetBlockHash) {
Expand All @@ -220,25 +166,26 @@ export class Full extends BaseProcessor {
return {
shouldProcess: true,
from: divergingHeight,
msg: `Forking happened but not yet committed on mainnet, we will need to recursively submit subnet headers from diverging point of ${divergingHeight}`
};
}
this.logger.warn(
"Subnet is behind mainnet latest auditted blocks! This usually means there is another relayer on a different node who is ahead of this relayer in terms of mining and submitting txs. OR there gonna be forking soon!"
);
return { shouldProcess: false };
return {
shouldProcess: false,
msg: "Subnet is behind mainnet latest audited blocks! This usually means there is another relayer on a different node who is ahead of this relayer in terms of mining and submitting txs. OR there gonna be forking soon!"
};
}
// Below is the case where subnet is ahead of mainnet and we need to do some more checks before submit txs
const audittedBlockInfoInSubnet =
const auditedBlockInfoInSubnet =
await this.subnetService.getCommittedBlockInfoByNum(
smartContractHeight
);
if (audittedBlockInfoInSubnet.subnetBlockHash != smartContractHash) {
if (auditedBlockInfoInSubnet.subnetBlockHash != smartContractHash) {
const { divergingHeight } = await this.findDivergingPoint(
smartContractHeight
);
return {
shouldProcess: true,
from: divergingHeight,
from: divergingHeight
};
}
// Everything seems normal, we will just submit txs from this point onwards.
Expand All @@ -249,24 +196,27 @@ export class Full extends BaseProcessor {
}
}


// Find the point where after this "divering block", chain start to split (fork)
private async findDivergingPoint(
heightToSearchFrom: number
): Promise<{ divergingHeight: number; divergingHash: string }> {
const mainnetHash = await this.mainnetService.getBlockHashByNumber(
heightToSearchFrom
);
const subnetBlockInfo = await this.subnetService.getCommittedBlockInfoByNum(
heightToSearchFrom
);
if (mainnetHash != subnetBlockInfo.subnetBlockHash) {
return this.findDivergingPoint(heightToSearchFrom - 1);
let currentHeight = heightToSearchFrom;
let mainnetHash: string;
let subnetBlockInfo: SubnetBlockInfo;

while (currentHeight > 0) {
mainnetHash = await this.mainnetService.getBlockHashByNumber(currentHeight);
subnetBlockInfo = await this.subnetService.getCommittedBlockInfoByNum(currentHeight);

if (mainnetHash != subnetBlockInfo.subnetBlockHash) {
currentHeight--;
} else {
break;
}
}
return {
divergingHash: mainnetHash,
divergingHeight: heightToSearchFrom,
divergingHeight: currentHeight
};
}

}
4 changes: 2 additions & 2 deletions src/processors/lite.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export class Lite extends BaseProcessor {

private async processEvent() {
// Pull latest confirmed tx from mainnet
const latestBlock = await this.liteMainnetService.getLastAudittedBlock();
const latestBlock = await this.liteMainnetService.getLastAuditedBlock();
// Pull latest confirm block from subnet
const lastestSubnetCommittedBlock =
await this.subnetService.getLastCommittedBlockInfo();
Expand Down Expand Up @@ -125,7 +125,7 @@ export class Lite extends BaseProcessor {
await this.liteMainnetService.submitTxs(results);
}

const last = await this.liteMainnetService.getLastAudittedBlock();
const last = await this.liteMainnetService.getLastAuditedBlock();
scCommittedHeight = last.smartContractCommittedHeight;
scHash = last.smartContractHash;
}
Expand Down
8 changes: 4 additions & 4 deletions src/service/mainnet/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ export class MainnetService {
/*
A method to fetch the last subnet block that has been stored/audited in mainnet XDC
**/
async getLastAudittedBlock(): Promise<SmartContractData> {
async getLastAuditedBlock(): Promise<SmartContractData> {
try {
const result = await this.smartContractInstance.methods
.getLatestBlocks()
Expand All @@ -72,7 +72,7 @@ export class MainnetService {
latestSmComittedHash,
latestSmHeight
);
throw new Error("Unable to get last auditted block informations");
throw new Error("Unable to get last audited block informations");
}
return {
smartContractHash: latestBlockHash,
Expand Down Expand Up @@ -182,7 +182,7 @@ export class LiteMainnetService {
/*
A method to fetch the last subnet block that has been stored/audited in mainnet XDC
**/
async getLastAudittedBlock(): Promise<SmartContractData> {
async getLastAuditedBlock(): Promise<SmartContractData> {
try {
const result = await this.liteSmartContractInstance.methods
.getLatestBlocks()
Expand All @@ -202,7 +202,7 @@ export class LiteMainnetService {
latestSmComittedHash,
latestSmHeight
);
throw new Error("Unable to get last auditted block informations");
throw new Error("Unable to get last audited block informations");
}
return {
smartContractHash: latestBlockHash,
Expand Down

0 comments on commit 70ff66e

Please sign in to comment.