Skip to content
This repository has been archived by the owner on Jun 11, 2024. It is now read-only.

Commit

Permalink
Improve indexing process (#2047)
Browse files Browse the repository at this point in the history
* 🐎 Improve indexing speed

- Replace token_getBalances invocation with token_getBalance

* 🔨 Include mainnet and testnet genesis blocks within connector
  • Loading branch information
sameersubudhi authored Apr 9, 2024
1 parent 0e21e4c commit 0c575fe
Show file tree
Hide file tree
Showing 8 changed files with 145 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/CONTRIBUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ _Before_ submitting a pull request, please make sure the following is done:
Airbnb with the
[lisk extension](https://github.com/LiskHQ/eslint-config-lisk-base).
1. Format your code using [Prettier](https://prettier.io/). This can be performed manually
with `npm run format`.
with `yarn format`.
1. Submit a pull request via GitHub. Include issue numbers in the PR title, at
the end with: `Description - Closes #IssueNumber`.
1. Check that Jenkins CI tests pass (pull request turns green). First time
Expand Down
8 changes: 7 additions & 1 deletion services/blockchain-connector/.dockerignore
Original file line number Diff line number Diff line change
Expand Up @@ -67,4 +67,10 @@ db_data
benchmark

# Data download directory
data
data/*
!data/00000000
data/00000000/*
!data/00000000/genesis_block.json
!data/01000000
data/01000000/*
!data/01000000/genesis_block.json
8 changes: 7 additions & 1 deletion services/blockchain-connector/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -62,4 +62,10 @@ xunit-report.xml
db_data

# Data download directory
data
data/*
!data/00000000
data/00000000/*
!data/00000000/genesis_block.json
!data/01000000
data/01000000/*
!data/01000000/genesis_block.json
47 changes: 47 additions & 0 deletions services/blockchain-connector/data/00000000/genesis_block.json

Large diffs are not rendered by default.

47 changes: 47 additions & 0 deletions services/blockchain-connector/data/01000000/genesis_block.json

Large diffs are not rendered by default.

7 changes: 6 additions & 1 deletion services/blockchain-connector/shared/sdk/blocksUtils.js
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,12 @@ const getGenesisBlockFromFS = async () => {
}

const block = await new Promise((resolve, reject) => {
readStream.pipe(parseStream.on('data', data => resolve(data)));
readStream.pipe(
parseStream.on('data', data => {
logger.info('Successfully read the genesis block from the FS.');
return resolve(data);
}),
);
parseStream.on('error', err => reject(err));
});

Expand Down
45 changes: 28 additions & 17 deletions services/blockchain-indexer/shared/indexer/accountBalanceIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,10 @@ const {
} = require('lisk-service-framework');

const config = require('../../config');
const accountBalancesTableSchema = require('../database/schema/accountBalances');

const { MODULE } = require('../constants');
const { getTokenBalances } = require('../dataService');
const accountBalancesTableSchema = require('../database/schema/accountBalances');

const logger = Logger();

Expand All @@ -34,9 +35,11 @@ const MYSQL_ENDPOINT = config.endpoints.mysql;

const getAccountBalancesTable = () => getTableInstance(accountBalancesTableSchema, MYSQL_ENDPOINT);

const updateAccountBalances = async address => {
const updateAccountBalances = async addressTokenID => {
const accountBalancesTable = await getAccountBalancesTable();
const { data: balanceInfos } = await getTokenBalances({ address });
const [address, cTokenID] = addressTokenID.split('_');
const params = cTokenID ? { address, tokenID: cTokenID } : { address };
const { data: balanceInfos } = await getTokenBalances(params);

const updatedTokenBalances = balanceInfos.map(balanceInfo => {
const { tokenID, availableBalance, lockedBalances } = balanceInfo;
Expand All @@ -56,14 +59,14 @@ const updateAccountBalances = async address => {
await accountBalancesTable.upsert(updatedTokenBalances);
};

const scheduleAddressesBalanceUpdate = async addresses => {
if (addresses.length) {
redis.sadd(config.set.accountBalanceUpdate.name, addresses);
const scheduleAddressesBalanceUpdate = async addressTokens => {
if (addressTokens.length) {
redis.sadd(config.set.accountBalanceUpdate.name, addressTokens);
}
};

const getAddressesFromTokenEvents = events => {
const addressesToUpdate = [];
const getAddressesFromTokenEvents = async events => {
const addressTokensToUpdate = [];
const tokenModuleEvents = events.filter(event => event.module === MODULE.TOKEN);

// eslint-disable-next-line no-restricted-syntax
Expand All @@ -74,33 +77,41 @@ const getAddressesFromTokenEvents = events => {
for (const key of eventDataKeys) {
if (key.toLowerCase().includes('address')) {
const address = eventData[key];
addressesToUpdate.push(address);
const tokenID = eventData.tokenID || eventData.messageFeeTokenID;
if (tokenID) {
addressTokensToUpdate.push(`${address}_${tokenID}`);
} else {
addressTokensToUpdate.push(address);
}
}
}
}

return addressesToUpdate;
return addressTokensToUpdate;
};

const triggerAccountsBalanceUpdate = async () => {
const addresses = await redis.spop(
const addressTokenEntries = await redis.spop(
config.set.accountBalanceUpdate.name,
config.set.accountBalanceUpdate.batchSize,
);

const numAddressesScheduled = addresses.length;
const numAddressesScheduled = addressTokenEntries.length;
try {
// eslint-disable-next-line no-restricted-syntax
while (addresses.length) {
const address = addresses.shift();
await updateAccountBalances(address);
while (addressTokenEntries.length) {
const addressToken = addressTokenEntries.shift();
await updateAccountBalances(addressToken).catch(err => {
addressTokenEntries.push(addressToken);
throw err;
});
}
logger.info(`Successfully updated account balances for ${numAddressesScheduled} account(s).`);
} catch (err) {
// Reschedule accounts balance update on error for remaining addresses
await scheduleAddressesBalanceUpdate(addresses);
await scheduleAddressesBalanceUpdate(addressTokenEntries);

const numPending = addresses.length;
const numPending = addressTokenEntries.length;
const numSuccess = numAddressesScheduled - numPending;
logger.info(
`Successfully updated account balances for ${numSuccess} account(s). Rescheduling updates for ${numPending} account(s).`,
Expand Down
4 changes: 2 additions & 2 deletions services/blockchain-indexer/shared/indexer/blockchainIndex.js
Original file line number Diff line number Diff line change
Expand Up @@ -354,7 +354,7 @@ const indexBlock = async job => {
await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx);

// Get addresses to schedule account balance updates from token module events
addressesToUpdateBalance = getAddressesFromTokenEvents(events);
addressesToUpdateBalance = await getAddressesFromTokenEvents(events);
}

const blockToIndex = {
Expand Down Expand Up @@ -624,7 +624,7 @@ const deleteIndexedBlocks = async job => {
await updateTotalLockedAmounts(tokenIDLockedAmountChangeMap, dbTrx);

// Get addresses to schedule account balance updates from token module events
addressesToUpdateBalance = getAddressesFromTokenEvents(events);
addressesToUpdateBalance = await getAddressesFromTokenEvents(events);
}

// Invalidate cached events for this block. Must be done after processing all event related calculations
Expand Down

0 comments on commit 0c575fe

Please sign in to comment.