Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Merge pull request #1982 from LiskHQ/1980-indexer-queries-for-block-h…
Browse files Browse the repository at this point in the history
…eights-in-future

Indexer queries for block heights in future
  • Loading branch information
sameersubudhi authored Dec 11, 2023
2 parents 8a5ac48 + b4313e1 commit f209f34
Show file tree
Hide file tree
Showing 43 changed files with 131 additions and 107 deletions.
21 changes: 12 additions & 9 deletions docs/api/version3.md
Original file line number Diff line number Diff line change
Expand Up @@ -337,6 +337,12 @@ _Supports pagination._
"fee": "5166000",
"minFee": "165000",
"size": 166,
"block": {
"id": "ebb1ba587a1e8385a2aac1317edcb872c05b2b07df6560fabd0f0d23d7d6a0df",
"height": 122721,
"timestamp": 1678989430,
"isFinal": true
},
"sender": {
"address": "lskyvvam5rxyvbvofxbdfcupxetzmqxu22phm4yuo",
"publicKey": "475697e34ae02b394721020d38677a072dbd5c03d61c1c8fdd6563eb66160fa3",
Expand All @@ -348,21 +354,18 @@ _Supports pagination._
"recipientAddress": "lskezo8pcrbsoceuuu64rpc8w2qkont2ec3n772yu",
"data": ""
},
"block": {
"id": "ebb1ba587a1e8385a2aac1317edcb872c05b2b07df6560fabd0f0d23d7d6a0df",
"height": 122721,
"timestamp": 1678989430,
"isFinal": true
},
"signatures": [
"48425002226745847e155cf5480478c2336a43bb178439e9058cc2b50e26335cf7c8360b6c6a49793d7ae8d087bc746cab9618655e6a0adba4694cce2015b50f"
],
"executionStatus": "successful",
"index": 0,
"meta": {
"recipient": {
"address": "lskezo8pcrbsoceuuu64rpc8w2qkont2ec3n772yu",
"publicKey": null,
"name": null
}
},
"executionStatus": "successful",
"index": 0
}
},
],
"meta": {
Expand Down
2 changes: 1 addition & 1 deletion framework/src/queue.js
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ const queueInstance = (
});

queue.on('failed', (job, err) => {
logger.warn(`${job.name} job failed with error: ${err.message}.`);
logger.warn(`${job.name} job failed with error: ${err.message}`);
logger.debug(`${job.name} job failed with error:\n${err.stack}`);
});

Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-app-registry/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ app.addEvents(path.join(__dirname, 'events'));

// Run the application
const reportErrorAndExitProcess = err => {
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}.`);
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}`);
logger.fatal(err.stack);
process.exit(1);
};
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-app-registry/jobs/deleteNonMetadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ module.exports = [
await removeEmptyDirectoriesAndNonMetaFiles(config.dataDir);
logger.info('Data directory has been successfully cleaned.');
} catch (err) {
logger.warn(`Cleaning data directory failed due to: ${err.message}.`);
logger.warn(`Cleaning data directory failed due to: ${err.message}`);
}
},
},
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-app-registry/jobs/metadata.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports = [
await syncWithRemoteRepo();
logger.info('Database has been successfully synchronized.');
} catch (err) {
logger.warn(`Refreshing blockchain application metadata failed due to: ${err.message}.`);
logger.warn(`Refreshing blockchain application metadata failed due to: ${err.message}`);
}
},
},
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-connector/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ nodeStatus.waitForNode().then(async () => {
await init();
})
.catch(err => {
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}.`);
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}`);
logger.fatal(err.stack);
process.exit(1);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ const chainNewBlockController = async cb => {
assets = block.assets;
} catch (err) {
logger.warn(
`Could not fetch block ${blockHeader.id} within chainNewBlockListener due to: ${err.message}.`,
`Could not fetch block ${blockHeader.id} within chainNewBlockListener due to: ${err.message}`,
);
logger.debug(err.stack);
}
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-connector/jobs/cacheCleanup.js
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ module.exports = [
logger.info('Cache has been successfully cleaned.');
}
} catch (err) {
logger.warn(`Cleaning cache failed due to: ${err.message}.`);
logger.warn(`Cleaning cache failed due to: ${err.message}`);
}
},
},
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-connector/shared/sdk/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ const cacheBlocksFromWaitlist = async () => {
await blocksCache.upsert({ id: block.header.id, timestamp: block.header.timestamp, block });
} catch (err) {
logger.warn(
`Caching block ${block.header.id} (height: ${block.header.height}) failed. Will re-attempt. Error:\n${err.message}.`,
`Caching block ${block.header.id} (height: ${block.header.height}) failed. Will re-attempt.\nError: ${err.message}`,
);
logger.debug(err.stack);
blockCacheWaitlist.splice(0, 0, block);
Expand Down
16 changes: 11 additions & 5 deletions services/blockchain-connector/shared/sdk/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -89,11 +89,17 @@ const checkIsClientAlive = async () =>
const instantiateClient = async (isForceReInstantiate = false) => {
try {
if (!isInstantiating || isForceReInstantiate) {
isInstantiating = true;

if (!(await checkIsClientAlive()) || isForceReInstantiate) {
isInstantiating = true;
instantiationBeginTime = Date.now();

if (clientCache) await clientCache.disconnect();
if (clientCache) {
clientCache.disconnect().catch(err => {
// Ensure failed disconnection doesn't impact the re-instantiation
logger.warn(`Client disconnection failed due to: ${err.message}`);
});
}

clientCache = config.isUseLiskIPCClient
? await createIPCClient(config.liskAppDataPath)
Expand All @@ -105,15 +111,15 @@ const instantiateClient = async (isForceReInstantiate = false) => {

// Inform listeners about the newly instantiated ApiClient
Signals.get('newApiClient').dispatch();

isInstantiating = false;
}

isInstantiating = false;
return clientCache;
}

if (Date.now() - instantiationBeginTime > MAX_INSTANTIATION_WAIT_TIME) {
// Waited too long, reset the flag to re-attempt client instantiation
logger.warn(
logger.debug(
`MAX_INSTANTIATION_WAIT_TIME of ${MAX_INSTANTIATION_WAIT_TIME}ms has expired. Resetting isInstantiating to false.`,
);
isInstantiating = false;
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-connector/shared/sdk/events.js
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ const events = [
let eventsCounter;

const logError = (method, err) => {
logger.warn(`Invocation for ${method} failed with error: ${err.message}.`);
logger.warn(`Invocation for ${method} failed with error: ${err.message}`);
logger.debug(err.stack);
};

Expand Down
6 changes: 1 addition & 5 deletions services/blockchain-connector/shared/sdk/formatter.js
Original file line number Diff line number Diff line change
Expand Up @@ -61,10 +61,6 @@ const formatTransaction = (transaction, additionalFee = 0) => {
const schemaCompliantTransaction = parseInputBySchema(transaction, txSchema);

// Calculate transaction min fee
const transactionParams = codec.decodeJSON(
txParamsSchema,
Buffer.from(transaction.params, 'hex'),
);
const schemaCompliantTransactionParams = codec.decode(
txParamsSchema,
Buffer.from(transaction.params, 'hex'),
Expand All @@ -90,7 +86,7 @@ const formatTransaction = (transaction, additionalFee = 0) => {

const formattedTransaction = {
...transaction,
params: transactionParams,
params: codec.decodeJSON(txParamsSchema, Buffer.from(transaction.params, 'hex')),
size: transactionSize,
minFee: transactionMinFee,
};
Expand Down
8 changes: 6 additions & 2 deletions services/blockchain-connector/shared/sdk/pos.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,13 @@ const getAllPosValidators = async isForceReload => {
try {
if (!allPosValidators || isForceReload) {
const response = await invokeEndpoint('pos_getAllValidators');
if (Array.isArray(response)) {
if (response && Array.isArray(response.validators)) {
allPosValidators = response;
logger.info(`Reloaded pos validators list. Validators count: ${allPosValidators.length}.`);
logger.info(
`Reloaded PoS validators list with ${allPosValidators.validators.length} entries.`,
);
} else {
return response;
}
}
return allPosValidators;
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-coordinator/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ app
await init();
})
.catch(err => {
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}.`);
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}`);
logger.fatal(err.stack);
process.exit(1);
});
2 changes: 2 additions & 0 deletions services/blockchain-coordinator/config.js
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@ config.queue = {
};

config.job = {
progressRefreshInterval: 30 * 1000, // millisecs

// Interval takes priority over schedule and must be greater than 0 to be valid
indexMissingBlocks: {
interval: Number(process.env.JOB_INTERVAL_INDEX_MISSING_BLOCKS) || 0,
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-coordinator/jobs/missingBlocks.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ module.exports = [
logger.debug('Attempting to schedule indexing for the missing blocks.');
await scheduleMissingBlocksIndexing();
} catch (err) {
logger.warn(`Failed to schedule missing blocks indexing due to: ${err.message}.`);
logger.warn(`Failed to schedule missing blocks indexing due to: ${err.message}`);
logger.trace(err.stack);
}
},
Expand Down
46 changes: 22 additions & 24 deletions services/blockchain-coordinator/shared/scheduler.js
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ const accountMessageQueue = new MessageQueue(
);

let intervalID;
const REFRESH_INTERVAL = 30000;
const REFRESH_INTERVAL = config.job.progressRefreshInterval;

const getInProgressJobCount = async queue => {
const jobCount = await queue.getJobCounts();
Expand Down Expand Up @@ -231,18 +231,18 @@ const scheduleMissingBlocksIndexing = async () => {
return;
}

const genesisHeight = await getGenesisHeight();
const currentHeight = await getCurrentHeight();

// Skip job scheduling when the jobCount is greater than the threshold
const jobCount = await getLiveIndexingJobCount();
if (jobCount > config.job.indexMissingBlocks.skipThreshold) {
logger.info(
`Skipping missing blocks job run. ${jobCount} indexing jobs already in the queue. Current threshold set at: ${config.job.indexMissingBlocks.skipThreshold}.`,
`Skipping missing blocks job run. ${jobCount} indexing jobs already in the queue. Current threshold: ${config.job.indexMissingBlocks.skipThreshold}.`,
);
return;
}

const genesisHeight = await getGenesisHeight();
const currentHeight = await getCurrentHeight();

// Missing blocks are being checked during regular interval
// By default they are checked from the blockchain's beginning
const lastVerifiedHeight = (await getIndexVerifiedHeight()) || genesisHeight;
Expand All @@ -267,17 +267,14 @@ const scheduleMissingBlocksIndexing = async () => {

if (Array.isArray(result)) {
missingBlocksByHeight.push(...result);

if (result.length === 0) {
const lastIndexVerifiedHeight = (await getIndexVerifiedHeight()) || genesisHeight;
if (batchEndHeight <= lastIndexVerifiedHeight + MAX_QUERY_RANGE) {
if (NUM_BATCHES > 1 && i < NUM_BATCHES - 1) {
logger.info(
`No missing blocks found in range ${batchStartHeight} - ${batchEndHeight}. Setting index verified height to ${batchEndHeight}.`,
);
}
}
}
} else {
logger.warn(
`getMissingBlocks returned '${typeof result}' type instead of an Array.\nresult: ${JSON.stringify(
result,
null,
'\t',
)}`,
);
}
}

Expand All @@ -286,24 +283,25 @@ const scheduleMissingBlocksIndexing = async () => {
if (indexStatus) {
const { chainLength, numBlocksIndexed, lastBlockHeight } = indexStatus.data;
const numStillMissingJobs = chainLength - numBlocksIndexed - missingBlocksByHeight.length;
if (numStillMissingJobs > 0 && numStillMissingJobs <= 10) {

if (numStillMissingJobs > 0 && numStillMissingJobs <= 100) {
missingBlocksByHeight.push(
...range(lastBlockHeight - numStillMissingJobs + 1, lastBlockHeight + 1),
);
}
}

if (missingBlocksByHeight.length === 0) {
logger.info(
`No missing blocks found in range ${blockIndexLowerRange} - ${blockIndexHigherRange}. Setting index verified height to ${blockIndexHigherRange}.`,
);
} else {
if (missingBlocksByHeight.length) {
// Schedule indexing for the missing blocks
await scheduleBlocksIndexing(missingBlocksByHeight);
logger.info('Successfully scheduled missing blocks indexing.');
} else {
logger.info(
`No missing blocks found in range ${blockIndexLowerRange} - ${blockIndexHigherRange}.`,
);
}
} catch (err) {
logger.warn(`Scheduling to index missing blocks failed due to: ${err.message}.`);
logger.warn(`Scheduling to index missing blocks failed due to: ${err.message}`);
logger.trace(err.stack);
}
};
Expand All @@ -315,7 +313,7 @@ const init = async () => {
await initIndexingScheduler();
await initEventsScheduler();
} catch (err) {
logger.error(`Unable to initialize coordinator due to: ${err.message}.`);
logger.error(`Unable to initialize coordinator due to: ${err.message}`);
logger.trace(err.stack);
throw err;
}
Expand Down
3 changes: 2 additions & 1 deletion services/blockchain-coordinator/shared/sources/indexer.js
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ const isGenesisBlockIndexed = async () => {

const getIndexStatus = async () => requestIndexer('index.status').catch(() => null);

const getMissingBlocks = async (from, to) => requestIndexer('getMissingBlocks', { from, to });
const getMissingBlocks = async (from, to) =>
requestIndexer('getMissingBlocks', { from, to }).catch(err => err);

const getIndexVerifiedHeight = async () =>
requestIndexer('getIndexVerifiedHeight').catch(() => null);
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-indexer/app.js
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ const defaultBrokerConfig = {

// Add routes, events & jobs
const reportErrorAndExitProcess = err => {
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}.`);
logger.fatal(`Failed to start service ${packageJson.name} due to: ${err.message}`);
logger.fatal(err.stack);
process.exit(1);
};
Expand Down
4 changes: 2 additions & 2 deletions services/blockchain-indexer/jobs/dataService/knownAccounts.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports = [
await reloadAccountKnowledge();
logger.info('Successfully initialized accounts knowledge.');
} catch (err) {
logger.warn(`Initializing accounts knowledge failed due to: ${err.message}.`);
logger.warn(`Initializing accounts knowledge failed due to: ${err.message}`);
}
},
controller: async () => {
Expand All @@ -40,7 +40,7 @@ module.exports = [
await reloadAccountKnowledge();
logger.info('Successfully reloaded accounts knowledge.');
} catch (err) {
logger.warn(`Reloading accounts knowledge failed due to: ${err.message}.`);
logger.warn(`Reloading accounts knowledge failed due to: ${err.message}`);
}
},
},
Expand Down
2 changes: 1 addition & 1 deletion services/blockchain-indexer/jobs/dataService/validators.js
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ module.exports = [
try {
await validateValidatorCache();
} catch (err) {
logger.warn(`Validating validators cache failed due to: ${err.message}.`);
logger.warn(`Validating validators cache failed due to: ${err.message}`);
logger.debug(err.stack);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module.exports = [
await deleteFinalizedCCUMetadata(finalizedHeight);
logger.info('Successfully deleted CCUs metadata until the finalized block height.');
} catch (err) {
logger.warn(`'Deleting CCUs metadata failed due to: ${err.message}.`);
logger.warn(`'Deleting CCUs metadata failed due to: ${err.message}`);
}
},
},
Expand Down
4 changes: 2 additions & 2 deletions services/blockchain-indexer/jobs/indexer/updateAccounts.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ module.exports = [
await triggerAccountUpdates();
logger.info('Triggered account updates successfully.');
} catch (err) {
logger.warn(`Triggering account updates failed due to: ${err.message}.`);
logger.warn(`Triggering account updates failed due to: ${err.message}`);
logger.trace(err.stack);
}
},
Expand All @@ -47,7 +47,7 @@ module.exports = [
await triggerAccountsBalanceUpdate();
logger.info('Triggered account balance updates successfully.');
} catch (err) {
logger.warn(`Triggering account balance updates failed due to: ${err.message}.`);
logger.warn(`Triggering account balance updates failed due to: ${err.message}`);
logger.trace(err.stack);
}
},
Expand Down
Loading

0 comments on commit f209f34

Please sign in to comment.