Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1926 from LiskHQ/1923-add-failsafe-for-blockchain…
Browse files Browse the repository at this point in the history
…-fork

Add failsafe for blockchain fork
  • Loading branch information
sameersubudhi authored Nov 17, 2023
2 parents bc47bcc + 80c14c5 commit 5284ad8
Showing 1 changed file with 82 additions and 38 deletions.
120 changes: 82 additions & 38 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -137,9 +137,10 @@ const indexBlock = async job => {
let blockHeightToIndex = blockHeightFromJobData;
let addressesToUpdateBalance = [];
let dbTrx;
let block;
let blockToIndexFromNode;

try {
const genesisHeight = await getGenesisHeight();
const blocksTable = await getBlocksTable();

const [lastIndexedBlock = {}] = await blocksTable.find(
Expand All @@ -164,9 +165,51 @@ const indexBlock = async job => {
where: { height: blockHeightToIndex },
limit: 1,
},
['height'],
['id', 'height'],
);

let prevBlockInDB = {};
if (blockHeightToIndex > genesisHeight + 1) {
[prevBlockInDB] = await blocksTable.find(
{
where: { height: blockHeightToIndex - 1 },
limit: 1,
},
['id', 'height'],
);
}

// Get block from node
blockToIndexFromNode = await getBlockByHeight(blockHeightToIndex);
if (!validateBlock(blockToIndexFromNode)) {
throw new Error(
`Invalid block ${blockToIndexFromNode.id} at height ${blockToIndexFromNode.height}.`,
);
}

// If current index block is incorrectly indexed then schedule for deletion
if (Object.keys(currentBlockInDB).length && blockToIndexFromNode.id !== currentBlockInDB.id) {
// eslint-disable-next-line no-use-before-define
await scheduleBlockDeletion(currentBlockInDB);
// eslint-disable-next-line no-use-before-define
await addHeightToIndexBlocksQueue(currentBlockInDB.height);

return;
}

// Incase prev block is incorrect schedule that for deletion
if (
Object.keys(prevBlockInDB).length &&
prevBlockInDB.id !== blockToIndexFromNode.previousBlockID
) {
// eslint-disable-next-line no-use-before-define
await scheduleBlockDeletion(prevBlockInDB);
// eslint-disable-next-line no-use-before-define
await addHeightToIndexBlocksQueue(prevBlockInDB.height);

return;
}

// If current block is already indexed, then index the highest indexed block height + 1
if (Object.keys(currentBlockInDB).length) {
// Skip indexing if the blockchain is fully indexed.
Expand All @@ -176,41 +219,36 @@ const indexBlock = async job => {
blockHeightToIndex = lastIndexedHeight + 1;
}

block = await getBlockByHeight(blockHeightToIndex);
if (!validateBlock(block)) {
throw new Error(`Invalid block ${block.id} at height ${block.height}.`);
}

// Create DB transaction. Queries from here sees a snapshot of the database
const connection = await getDBConnection(MYSQL_ENDPOINT);
dbTrx = await startDBTransaction(connection);
logger.debug(
`Created new MySQL transaction to index block ${block.id} at height ${block.height}.`,
`Created new MySQL transaction to index block ${blockToIndexFromNode.id} at height ${blockToIndexFromNode.height}.`,
);

let blockReward = BigInt('0');

if (block.height === (await getGenesisHeight())) {
if (blockToIndexFromNode.height === genesisHeight) {
await indexGenesisBlockAssets(dbTrx);
}

const events = await getEventsByHeight(block.height);
cacheEventsByBlockID(block.id, events);
const events = await getEventsByHeight(blockToIndexFromNode.height);
cacheEventsByBlockID(blockToIndexFromNode.id, events);

if (block.transactions.length) {
const { transactions, assets, ...blockHeader } = block;
if (blockToIndexFromNode.transactions.length) {
const { transactions, assets, ...blockHeader } = blockToIndexFromNode;

const transactionsTable = await getTransactionsTable();
await BluebirdPromise.map(
block.transactions,
blockToIndexFromNode.transactions,
async (tx, index) => {
// Apply default transformations and index with minimal information by default
tx.index = index;
tx.moduleCommand = `${tx.module}:${tx.command}`;
tx.blockID = block.id;
tx.height = block.height;
tx.blockID = blockToIndexFromNode.id;
tx.height = blockToIndexFromNode.height;
tx.senderAddress = getLisk32AddressFromPublicKey(tx.senderPublicKey);
tx.timestamp = block.timestamp;
tx.timestamp = blockToIndexFromNode.timestamp;
tx.executionStatus = getTransactionExecutionStatus(tx, events);

// Store address -> publicKey mapping
Expand All @@ -221,7 +259,7 @@ const indexBlock = async job => {
// Invoke 'applyTransaction' to execute command specific processing logic
await applyTransaction(blockHeader, tx, events, dbTrx);
},
{ concurrency: block.transactions.length },
{ concurrency: blockToIndexFromNode.transactions.length },
);
}

Expand All @@ -230,14 +268,14 @@ const indexBlock = async job => {
const numRowsAffected = await validatorsTable.increment(
{
increment: { generatedBlocks: 1 },
where: { address: block.generatorAddress },
where: { address: blockToIndexFromNode.generatorAddress },
},
dbTrx,
);
if (numRowsAffected === 0) {
await validatorsTable.upsert(
{
address: block.generatorAddress,
address: blockToIndexFromNode.generatorAddress,
generatedBlocks: 1,
},
dbTrx,
Expand All @@ -248,7 +286,7 @@ const indexBlock = async job => {
const eventsTable = await getEventsTable();
const eventTopicsTable = await getEventTopicsTable();

const { eventsInfo, eventTopicsInfo } = getEventsInfoToIndex(block, events);
const { eventsInfo, eventTopicsInfo } = getEventsInfoToIndex(blockToIndexFromNode, events);
await eventsTable.upsert(eventsInfo, dbTrx);
await eventTopicsTable.upsert(eventTopicsInfo, dbTrx);

Expand All @@ -263,41 +301,41 @@ const indexBlock = async job => {

if (blockReward !== BigInt('0')) {
const commissionAmount = await calcCommissionAmount(
block.generatorAddress,
block.height,
blockToIndexFromNode.generatorAddress,
blockToIndexFromNode.height,
blockReward,
);
const selfStakeReward = await calcSelfStakeReward(
block.generatorAddress,
blockToIndexFromNode.generatorAddress,
blockReward,
commissionAmount,
);

logger.trace(
`Increasing commission for validator ${block.generatorAddress} by ${commissionAmount}.`,
`Increasing commission for validator ${blockToIndexFromNode.generatorAddress} by ${commissionAmount}.`,
);
await validatorsTable.increment(
{
increment: { totalCommission: BigInt(commissionAmount) },
where: { address: block.generatorAddress },
where: { address: blockToIndexFromNode.generatorAddress },
},
dbTrx,
);
logger.debug(
`Increased commission for validator ${block.generatorAddress} by ${commissionAmount}.`,
`Increased commission for validator ${blockToIndexFromNode.generatorAddress} by ${commissionAmount}.`,
);
logger.trace(
`Increasing self-stake rewards for validator ${block.generatorAddress} by ${selfStakeReward}.`,
`Increasing self-stake rewards for validator ${blockToIndexFromNode.generatorAddress} by ${selfStakeReward}.`,
);
await validatorsTable.increment(
{
increment: { totalSelfStakeRewards: BigInt(selfStakeReward) },
where: { address: block.generatorAddress },
where: { address: blockToIndexFromNode.generatorAddress },
},
dbTrx,
);
logger.debug(
`Increased self-stake rewards for validator ${block.generatorAddress} by ${selfStakeReward}.`,
`Increased self-stake rewards for validator ${blockToIndexFromNode.generatorAddress} by ${selfStakeReward}.`,
);
}
}
Expand Down Expand Up @@ -327,32 +365,37 @@ const indexBlock = async job => {
}

const blockToIndex = {
...block,
assetsModules: block.assets.map(asset => asset.module),
...blockToIndexFromNode,
assetsModules: blockToIndexFromNode.assets.map(asset => asset.module),
numberOfEvents: events.length,
reward: blockReward,
};

await blocksTable.upsert(blockToIndex, dbTrx);
await commitDBTransaction(dbTrx);
logger.debug(
`Committed MySQL transaction to index block ${block.id} at height ${block.height}.`,
`Committed MySQL transaction to index block ${blockToIndexFromNode.id} at height ${blockToIndexFromNode.height}.`,
);

// Add safety check to ensure that the DB transaction is actually committed
await waitForIt(
checkBlockHeightIndexStatusInDB.bind(null, block.height, DB_STATUS.COMMIT),
checkBlockHeightIndexStatusInDB.bind(null, blockToIndexFromNode.height, DB_STATUS.COMMIT),
config.db.durabilityVerifyFrequency,
);

// Only schedule address balance updates if the block is indexed successfully
await scheduleAddressesBalanceUpdate(addressesToUpdateBalance);
logger.info(`Successfully indexed block ${block.id} at height ${block.height}.`);
logger.info(
`Successfully indexed block ${blockToIndexFromNode.id} at height ${blockToIndexFromNode.height}.`,
);
} catch (error) {
// Block may not have been initialized when error occurred
const failedBlockInfo = {
id: typeof block === 'undefined' ? undefined : block.id,
height: typeof block === 'undefined' ? blockHeightToIndex : block.height,
id: typeof blockToIndexFromNode === 'undefined' ? undefined : blockToIndexFromNode.id,
height:
typeof blockToIndexFromNode === 'undefined'
? blockHeightToIndex
: blockToIndexFromNode.height,
};

// Processing may fail before a transaction is created
Expand All @@ -364,7 +407,7 @@ const indexBlock = async job => {

// Add safety check to ensure that the DB transaction is rolled back successfully
await waitForIt(
checkBlockHeightIndexStatusInDB.bind(null, block.height, DB_STATUS.ROLLBACK),
checkBlockHeightIndexStatusInDB.bind(null, blockToIndexFromNode.height, DB_STATUS.ROLLBACK),
config.db.durabilityVerifyFrequency,
);
}
Expand Down Expand Up @@ -617,6 +660,7 @@ const deleteIndexedBlocks = async job => {
logger.warn(
`Deleting block(s) with ID(s): ${blockIDs} failed due to: ${error.message}. Will retry.`,
);
logger.warn(error.stack);
throw error;
}
};
Expand Down

0 comments on commit 5284ad8

Please sign in to comment.