From 3f439ef875f7a47f169520e0f0bf7dcb8055edfa Mon Sep 17 00:00:00 2001 From: Cherry Bary <84170175+cherrybarry@users.noreply.github.com> Date: Mon, 30 Sep 2024 16:46:28 +0530 Subject: [PATCH] feat(indexer): add fast near in indexers (#512) --- apps/indexer-balance/README.md | 2 +- apps/indexer-balance/package.json | 1 + apps/indexer-balance/src/config.ts | 14 +++- apps/indexer-balance/src/index.ts | 5 +- apps/indexer-balance/src/libs/near.ts | 5 -- apps/indexer-balance/src/services/stream.ts | 47 +++++++++-- apps/indexer-balance/src/types/enum.ts | 4 + apps/indexer-balance/src/types/types.ts | 3 +- apps/indexer-base/README.md | 1 + apps/indexer-base/package.json | 1 + apps/indexer-base/src/config.ts | 6 ++ apps/indexer-base/src/index.ts | 5 +- apps/indexer-base/src/services/stream.ts | 47 +++++++++-- apps/indexer-base/src/types/enum.ts | 4 + apps/indexer-base/src/types/types.ts | 1 + apps/indexer-dex/README.md | 2 +- apps/indexer-dex/src/config.ts | 12 +-- apps/indexer-dex/src/index.ts | 5 +- apps/indexer-dex/src/libs/big.ts | 5 ++ apps/indexer-dex/src/libs/utils.ts | 3 +- apps/indexer-dex/src/services/stream.ts | 13 ++- apps/indexer-dex/src/types/types.ts | 1 + apps/indexer-events/README.md | 2 +- apps/indexer-events/package.json | 1 + apps/indexer-events/src/config.ts | 14 +++- apps/indexer-events/src/index.ts | 5 +- apps/indexer-events/src/libs/near.ts | 5 -- apps/indexer-events/src/services/stream.ts | 47 +++++++++-- apps/indexer-events/src/types/enum.ts | 4 + apps/indexer-events/src/types/types.ts | 3 +- packages/nb-neardata/package.json | 3 +- packages/nb-neardata/src/index.ts | 90 ++++++++++++++++----- 32 files changed, 279 insertions(+), 82 deletions(-) delete mode 100644 apps/indexer-balance/src/libs/near.ts create mode 100644 apps/indexer-balance/src/types/enum.ts create mode 100644 apps/indexer-base/src/types/enum.ts create mode 100644 apps/indexer-dex/src/libs/big.ts delete mode 100644 apps/indexer-events/src/libs/near.ts create mode 100644 apps/indexer-events/src/types/enum.ts diff --git a/apps/indexer-balance/README.md b/apps/indexer-balance/README.md index 38a6e472..93649bcb 100644 --- a/apps/indexer-balance/README.md +++ b/apps/indexer-balance/README.md @@ -6,7 +6,6 @@ Balance indexer collects the info about native NEAR token balance changes 📊 ( ``` DATABASE_URL= -RPC_URL= NETWORK=mainnet AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= @@ -16,5 +15,6 @@ DATABASE_CA= DATABASE_CERT= DATABASE_KEY= BALANCE_START_BLOCK= +BALANCE_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR SENTRY_DSN= ``` diff --git a/apps/indexer-balance/package.json b/apps/indexer-balance/package.json index 8f73cf9f..c003f494 100644 --- a/apps/indexer-balance/package.json +++ b/apps/indexer-balance/package.json @@ -29,6 +29,7 @@ "nb-lake": "*", "nb-logger": "*", "nb-near": "*", + "nb-neardata": "*", "nb-tsconfig": "*", "nb-types": "*", "nb-utils": "*", diff --git a/apps/indexer-balance/src/config.ts b/apps/indexer-balance/src/config.ts index 8837800f..8f7f021b 100644 --- a/apps/indexer-balance/src/config.ts +++ b/apps/indexer-balance/src/config.ts @@ -3,9 +3,14 @@ import { cleanEnv, num, str, url } from 'envalid'; import { types } from 'nb-lake'; import { Network } from 'nb-types'; +import { DataSource } from '#types/enum'; import { Config } from '#types/types'; const env = cleanEnv(process.env, { + BALANCE_DATA_SOURCE: str({ + choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE], + default: DataSource.NEAR_LAKE, + }), BALANCE_START_BLOCK: num({ default: 0 }), DATABASE_CA: str({ default: '' }), DATABASE_CERT: str({ default: '' }), @@ -14,11 +19,11 @@ const env = cleanEnv(process.env, { NETWORK: str({ choices: [Network.MAINNET, Network.TESTNET], }), - RPC_URL: str(), S3_ENDPOINT: url({ default: '' }), SENTRY_DSN: str({ default: '' }), }); +const genesisHeight = env.NETWORK === Network.MAINNET ? 9_820_210 : 42_376_888; let s3Endpoint: null | types.EndpointConfig = null; const s3BucketName = env.NETWORK === Network.MAINNET @@ -37,15 +42,16 @@ if (env.S3_ENDPOINT) { const config: Config = { cacheExpiry: 60 * 60, // cache expiry time in seconds (1 hour) + dataSource: env.BALANCE_DATA_SOURCE, dbCa: env.DATABASE_CA, dbCert: env.DATABASE_CERT, dbKey: env.DATABASE_KEY, dbUrl: env.DATABASE_URL, - delta: 100, // start from blocks earlier on sync interuption + delta: 1_000, // start from blocks earlier on sync interuption + genesisHeight, insertLimit: 1_000, // records to insert into the db at a time network: env.NETWORK, - preloadSize: 10, // blocks to preload in nearlake - rpcUrl: env.RPC_URL, + preloadSize: 100, // blocks to preload in nearlake s3BucketName, s3Endpoint, s3RegionName: 'eu-central-1', diff --git a/apps/indexer-balance/src/index.ts b/apps/indexer-balance/src/index.ts index 460a6733..272c9950 100644 --- a/apps/indexer-balance/src/index.ts +++ b/apps/indexer-balance/src/index.ts @@ -7,7 +7,10 @@ import { syncData } from '#services/stream'; (async () => { try { - logger.info({ network: config.network }, 'initializing balance indexer...'); + logger.info( + { data_source: config.dataSource, network: config.network }, + 'initializing balance indexer...', + ); logger.info('syncing balance data...'); await syncData(); } catch (error) { diff --git a/apps/indexer-balance/src/libs/near.ts b/apps/indexer-balance/src/libs/near.ts deleted file mode 100644 index cf3bbdac..00000000 --- a/apps/indexer-balance/src/libs/near.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { RPC } from 'nb-near'; - -import config from '#config'; - -export default new RPC(config.rpcUrl); diff --git a/apps/indexer-balance/src/services/stream.ts b/apps/indexer-balance/src/services/stream.ts index 0fe04bca..03124e61 100644 --- a/apps/indexer-balance/src/services/stream.ts +++ b/apps/indexer-balance/src/services/stream.ts @@ -1,10 +1,12 @@ import { stream, types } from 'nb-lake'; import { logger } from 'nb-logger'; +import { streamBlock } from 'nb-neardata'; import config from '#config'; import knex from '#libs/knex'; import sentry from '#libs/sentry'; import { storeBalance } from '#services/balance'; +import { DataSource } from '#types/enum'; const fetchBlocks = async (block: number, limit: number) => { try { @@ -40,16 +42,45 @@ export const syncData = async () => { const settings = await knex('settings').where({ key: balanceKey }).first(); const latestBlock = settings?.value?.sync; - if (!lakeConfig.startBlockHeight && latestBlock) { - const next = +latestBlock - config.delta; + if (config.dataSource === DataSource.FAST_NEAR) { + let startBlockHeight = config.startBlockHeight; - logger.info(`last synced block: ${latestBlock}`); - logger.info(`syncing from block: ${next}`); - lakeConfig.startBlockHeight = next; - } + if (!startBlockHeight && latestBlock) { + const next = +latestBlock - config.delta / 2; + startBlockHeight = next; + logger.info(`last synced block: ${latestBlock}`); + logger.info(`syncing from block: ${next}`); + } + + const stream = streamBlock({ + limit: config.preloadSize / 2, + network: config.network, + start: startBlockHeight || config.genesisHeight, + }); + + for await (const message of stream) { + await onMessage(message); + } - for await (const message of stream(lakeConfig)) { - await onMessage(message); + stream.on('end', () => { + logger.error('stream ended'); + process.exit(); + }); + stream.on('error', (error: Error) => { + logger.error(error); + process.exit(); + }); + } else { + if (!lakeConfig.startBlockHeight && latestBlock) { + const next = +latestBlock - config.delta; + lakeConfig.startBlockHeight = next; + logger.info(`last synced block: ${latestBlock}`); + logger.info(`syncing from block: ${next}`); + } + + for await (const message of stream(lakeConfig)) { + await onMessage(message); + } } }; diff --git a/apps/indexer-balance/src/types/enum.ts b/apps/indexer-balance/src/types/enum.ts new file mode 100644 index 00000000..57352d41 --- /dev/null +++ b/apps/indexer-balance/src/types/enum.ts @@ -0,0 +1,4 @@ +export enum DataSource { + FAST_NEAR = 'FAST_NEAR', + NEAR_LAKE = 'NEAR_LAKE', +} diff --git a/apps/indexer-balance/src/types/types.ts b/apps/indexer-balance/src/types/types.ts index de25a088..fd9543ec 100644 --- a/apps/indexer-balance/src/types/types.ts +++ b/apps/indexer-balance/src/types/types.ts @@ -3,15 +3,16 @@ import { Network, StateChangeCauseView, StateChangeValueView } from 'nb-types'; export type Config = { cacheExpiry: number; + dataSource: string; dbCa: string; dbCert: string; dbKey: string; dbUrl: string; delta: number; + genesisHeight: number; insertLimit: number; network: Network; preloadSize: number; - rpcUrl: string; s3BucketName: string; s3Endpoint: null | types.EndpointConfig; s3RegionName: string; diff --git a/apps/indexer-base/README.md b/apps/indexer-base/README.md index d94cc5ca..783d090b 100644 --- a/apps/indexer-base/README.md +++ b/apps/indexer-base/README.md @@ -19,5 +19,6 @@ DATABASE_CERT= DATABASE_KEY= REDIS_URL= BASE_START_BLOCK= +BASE_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR SENTRY_DSN= ``` diff --git a/apps/indexer-base/package.json b/apps/indexer-base/package.json index 4af92310..ae86d994 100644 --- a/apps/indexer-base/package.json +++ b/apps/indexer-base/package.json @@ -33,6 +33,7 @@ "nb-json": "*", "nb-knex": "*", "nb-logger": "*", + "nb-neardata": "*", "nb-redis": "*", "nb-tsconfig": "*", "nb-types": "*", diff --git a/apps/indexer-base/src/config.ts b/apps/indexer-base/src/config.ts index 43c76cba..faad6f37 100644 --- a/apps/indexer-base/src/config.ts +++ b/apps/indexer-base/src/config.ts @@ -3,9 +3,14 @@ import { types } from 'near-lake-framework'; import { Network } from 'nb-types'; +import { DataSource } from '#types/enum'; import { Config } from '#types/types'; const env = cleanEnv(process.env, { + BASE_DATA_SOURCE: str({ + choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE], + default: DataSource.NEAR_LAKE, + }), BASE_START_BLOCK: num({ default: 0 }), DATABASE_CA: str({ default: '' }), DATABASE_CERT: str({ default: '' }), @@ -46,6 +51,7 @@ if (env.S3_ENDPOINT) { const config: Config = { cacheExpiry: 5 * 60, // cache expiry time in seconds (5 min) + dataSource: env.BASE_DATA_SOURCE, dbCa: env.DATABASE_CA, dbCert: env.DATABASE_CERT, dbKey: env.DATABASE_KEY, diff --git a/apps/indexer-base/src/index.ts b/apps/indexer-base/src/index.ts index f5fd2b71..e72b2468 100644 --- a/apps/indexer-base/src/index.ts +++ b/apps/indexer-base/src/index.ts @@ -8,7 +8,10 @@ import { syncData } from '#services/stream'; (async () => { try { - logger.info({ network: config.network }, 'initializing base indexer...'); + logger.info( + { data_source: config.dataSource, network: config.network }, + 'initializing base indexer...', + ); logger.info('syncing genesis data...'); await syncGenesis(); logger.info('syncing blockchain data...'); diff --git a/apps/indexer-base/src/services/stream.ts b/apps/indexer-base/src/services/stream.ts index e1ff588f..af63ac8f 100644 --- a/apps/indexer-base/src/services/stream.ts +++ b/apps/indexer-base/src/services/stream.ts @@ -1,6 +1,7 @@ import { stream, types } from 'near-lake-framework'; import { logger } from 'nb-logger'; +import { streamBlock } from 'nb-neardata'; import config from '#config'; import knex from '#libs/knex'; @@ -13,6 +14,7 @@ import { storeChunks } from '#services/chunk'; import { storeExecutionOutcomes } from '#services/executionOutcome'; import { storeReceipts } from '#services/receipt'; import { storeTransactions } from '#services/transaction'; +import { DataSource } from '#types/enum'; const lakeConfig: types.LakeConfig = { blocksPreloadPoolSize: config.preloadSize, @@ -29,16 +31,45 @@ if (config.s3Endpoint) { export const syncData = async () => { const block = await knex('blocks').orderBy('block_height', 'desc').first(); - if (!lakeConfig.startBlockHeight && block) { - const next = +block.block_height - config.delta; + if (config.dataSource === DataSource.FAST_NEAR) { + let startBlockHeight = config.startBlockHeight; - logger.info(`last synced block: ${block.block_height}`); - logger.info(`syncing from block: ${next}`); - lakeConfig.startBlockHeight = next; - } + if (!startBlockHeight && block) { + const next = +block.block_height - config.delta / 2; + startBlockHeight = next; + logger.info(`last synced block: ${block.block_height}`); + logger.info(`syncing from block: ${next}`); + } + + const stream = streamBlock({ + limit: config.preloadSize / 2, + network: config.network, + start: startBlockHeight || config.genesisHeight, + }); + + for await (const message of stream) { + await onMessage(message); + } + + stream.on('end', () => { + logger.error('stream ended'); + process.exit(); + }); + stream.on('error', (error: Error) => { + logger.error(error); + process.exit(); + }); + } else { + if (!lakeConfig.startBlockHeight && block) { + const next = +block.block_height - config.delta; + lakeConfig.startBlockHeight = next; + logger.info(`last synced block: ${block.block_height}`); + logger.info(`syncing from block: ${next}`); + } - for await (const message of stream(lakeConfig)) { - await onMessage(message); + for await (const message of stream(lakeConfig)) { + await onMessage(message); + } } }; diff --git a/apps/indexer-base/src/types/enum.ts b/apps/indexer-base/src/types/enum.ts new file mode 100644 index 00000000..57352d41 --- /dev/null +++ b/apps/indexer-base/src/types/enum.ts @@ -0,0 +1,4 @@ +export enum DataSource { + FAST_NEAR = 'FAST_NEAR', + NEAR_LAKE = 'NEAR_LAKE', +} diff --git a/apps/indexer-base/src/types/types.ts b/apps/indexer-base/src/types/types.ts index b28a9d8c..d9153e01 100644 --- a/apps/indexer-base/src/types/types.ts +++ b/apps/indexer-base/src/types/types.ts @@ -4,6 +4,7 @@ import { AccessKeyPermissionKind, ActionKind, Network } from 'nb-types'; export interface Config { cacheExpiry: number; + dataSource: string; dbCa: string; dbCert: string; dbKey: string; diff --git a/apps/indexer-dex/README.md b/apps/indexer-dex/README.md index 428db7b9..1cb5b602 100644 --- a/apps/indexer-dex/README.md +++ b/apps/indexer-dex/README.md @@ -10,11 +10,11 @@ NETWORK=mainnet S3_ENDPOINT= AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= -DATA_SOURCE= # NEAR_LAKE | FAST_NEAR # Optional DATABASE_CA= DATABASE_CERT= DATABASE_KEY= +DEX_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR SENTRY_DSN= ``` diff --git a/apps/indexer-dex/src/config.ts b/apps/indexer-dex/src/config.ts index d77c3c51..31acb967 100644 --- a/apps/indexer-dex/src/config.ts +++ b/apps/indexer-dex/src/config.ts @@ -7,14 +7,14 @@ import { DataSource } from '#types/enum'; import { Config } from '#types/types'; const env = cleanEnv(process.env, { - DATA_SOURCE: str({ - choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE], - default: DataSource.NEAR_LAKE, - }), DATABASE_CA: str({ default: '' }), DATABASE_CERT: str({ default: '' }), DATABASE_KEY: str({ default: '' }), DATABASE_URL: str(), + DEX_DATA_SOURCE: str({ + choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE], + default: DataSource.NEAR_LAKE, + }), NETWORK: str({ choices: [Network.MAINNET, Network.TESTNET], }), @@ -22,6 +22,7 @@ const env = cleanEnv(process.env, { SENTRY_DSN: str({ default: '' }), }); +const genesisHeight = env.NETWORK === Network.MAINNET ? 9_820_210 : 42_376_888; let s3Endpoint: null | types.EndpointConfig = null; const s3BucketName = env.NETWORK === Network.MAINNET @@ -39,12 +40,13 @@ if (env.S3_ENDPOINT) { } const config: Config = { - dataSource: env.DATA_SOURCE, + dataSource: env.DEX_DATA_SOURCE, dbCa: env.DATABASE_CA, dbCert: env.DATABASE_CERT, dbKey: env.DATABASE_KEY, dbUrl: env.DATABASE_URL, delta: 500, + genesisHeight, NEAR_TOKEN: 'wrap.near', network: env.NETWORK, preloadSize: 50, diff --git a/apps/indexer-dex/src/index.ts b/apps/indexer-dex/src/index.ts index 706acbe2..43b779e9 100644 --- a/apps/indexer-dex/src/index.ts +++ b/apps/indexer-dex/src/index.ts @@ -9,7 +9,10 @@ import { syncData } from '#services/stream'; (async () => { try { if (config.network === Network.MAINNET) { - logger.info({ network: config.network }, 'initializing dex indexer...'); + logger.info( + { data_source: config.dataSource, network: config.network }, + 'initializing dex indexer...', + ); logger.info('syncing dex data...'); await syncData(); } diff --git a/apps/indexer-dex/src/libs/big.ts b/apps/indexer-dex/src/libs/big.ts new file mode 100644 index 00000000..2056a758 --- /dev/null +++ b/apps/indexer-dex/src/libs/big.ts @@ -0,0 +1,5 @@ +import Big from 'big.js'; + +Big.DP = 24; + +export default Big; diff --git a/apps/indexer-dex/src/libs/utils.ts b/apps/indexer-dex/src/libs/utils.ts index fb73321a..1a53c2ab 100644 --- a/apps/indexer-dex/src/libs/utils.ts +++ b/apps/indexer-dex/src/libs/utils.ts @@ -1,7 +1,5 @@ import { createRequire } from 'module'; -import Big from 'big.js'; - import { ExecutionStatus } from 'nb-neardata'; import { DexEventType, DexPairs } from 'nb-types'; @@ -9,6 +7,7 @@ import config from '#config'; import { DexEventIndex } from '#types/enum'; import { DexPairMeta, SwapPair } from '#types/types'; +import Big from './big.js'; import knex from './knex.js'; const require = createRequire(import.meta.url); diff --git a/apps/indexer-dex/src/services/stream.ts b/apps/indexer-dex/src/services/stream.ts index 1f393fed..29db7fb0 100644 --- a/apps/indexer-dex/src/services/stream.ts +++ b/apps/indexer-dex/src/services/stream.ts @@ -55,12 +55,19 @@ export const syncData = async () => { } if (config.dataSource === DataSource.FAST_NEAR) { - const stream = streamBlock(startBlockHeight, config.preloadSize); + const stream = streamBlock({ + network: config.network, + start: startBlockHeight, + }); - stream.on('data', async (message: types.StreamerMessage) => { + for await (const message of stream) { await onMessage(message); - }); + } + stream.on('end', () => { + logger.error('stream ended'); + process.exit(); + }); stream.on('error', (error: Error) => { logger.error(error); process.exit(); diff --git a/apps/indexer-dex/src/types/types.ts b/apps/indexer-dex/src/types/types.ts index 441ea863..9f440fe7 100644 --- a/apps/indexer-dex/src/types/types.ts +++ b/apps/indexer-dex/src/types/types.ts @@ -10,6 +10,7 @@ export type Config = { dbKey: string; dbUrl: string; delta: number; + genesisHeight: number; NEAR_TOKEN: string; network: Network; preloadSize: number; diff --git a/apps/indexer-events/README.md b/apps/indexer-events/README.md index cb01e2ad..da167fde 100644 --- a/apps/indexer-events/README.md +++ b/apps/indexer-events/README.md @@ -8,7 +8,6 @@ You can find the code for legacy token implementations in the `src/services/cont ``` DATABASE_URL= -RPC_URL= NETWORK=mainnet AWS_ACCESS_KEY_ID= AWS_SECRET_ACCESS_KEY= @@ -18,5 +17,6 @@ DATABASE_CA= DATABASE_CERT= DATABASE_KEY= EVENTS_START_BLOCK= +EVENTS_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR SENTRY_DSN= ``` diff --git a/apps/indexer-events/package.json b/apps/indexer-events/package.json index b1b494e5..5f854e36 100644 --- a/apps/indexer-events/package.json +++ b/apps/indexer-events/package.json @@ -32,6 +32,7 @@ "nb-lake": "*", "nb-logger": "*", "nb-near": "*", + "nb-neardata": "*", "nb-tsconfig": "*", "nb-types": "*", "nb-utils": "*", diff --git a/apps/indexer-events/src/config.ts b/apps/indexer-events/src/config.ts index 5826c2f6..ebb8f43c 100644 --- a/apps/indexer-events/src/config.ts +++ b/apps/indexer-events/src/config.ts @@ -3,6 +3,7 @@ import { types } from 'near-lake-framework'; import { Network } from 'nb-types'; +import { DataSource } from '#types/enum'; import { Config } from '#types/types'; const env = cleanEnv(process.env, { @@ -10,15 +11,19 @@ const env = cleanEnv(process.env, { DATABASE_CERT: str({ default: '' }), DATABASE_KEY: str({ default: '' }), DATABASE_URL: str(), + EVENTS_DATA_SOURCE: str({ + choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE], + default: DataSource.NEAR_LAKE, + }), EVENTS_START_BLOCK: num({ default: 0 }), NETWORK: str({ choices: [Network.MAINNET, Network.TESTNET], }), - RPC_URL: str(), S3_ENDPOINT: url({ default: '' }), SENTRY_DSN: str({ default: '' }), }); +const genesisHeight = env.NETWORK === Network.MAINNET ? 9_820_210 : 42_376_888; let s3Endpoint: null | types.EndpointConfig = null; const s3BucketName = env.NETWORK === Network.MAINNET @@ -37,15 +42,16 @@ if (env.S3_ENDPOINT) { const config: Config = { cacheExpiry: 60 * 60, // cache expiry time in seconds (1 hour) + dataSource: env.EVENTS_DATA_SOURCE, dbCa: env.DATABASE_CA, dbCert: env.DATABASE_CERT, dbKey: env.DATABASE_KEY, dbUrl: env.DATABASE_URL, - delta: 100, // start from blocks earlier on sync interuption + delta: 1_000, // start from blocks earlier on sync interuption + genesisHeight, insertLimit: 1_000, // records to insert into the db at a time network: env.NETWORK, - preloadSize: 10, // blocks to preload in nearlake - rpcUrl: env.RPC_URL, + preloadSize: 100, // blocks to preload in nearlake s3BucketName, s3Endpoint, s3RegionName: 'eu-central-1', diff --git a/apps/indexer-events/src/index.ts b/apps/indexer-events/src/index.ts index 6ede2277..c4c462b7 100644 --- a/apps/indexer-events/src/index.ts +++ b/apps/indexer-events/src/index.ts @@ -7,7 +7,10 @@ import { syncData } from '#services/stream'; (async () => { try { - logger.info({ network: config.network }, 'initializing events indexer...'); + logger.info( + { data_source: config.dataSource, network: config.network }, + 'initializing events indexer...', + ); logger.info('syncing events data...'); await syncData(); } catch (error) { diff --git a/apps/indexer-events/src/libs/near.ts b/apps/indexer-events/src/libs/near.ts deleted file mode 100644 index cf3bbdac..00000000 --- a/apps/indexer-events/src/libs/near.ts +++ /dev/null @@ -1,5 +0,0 @@ -import { RPC } from 'nb-near'; - -import config from '#config'; - -export default new RPC(config.rpcUrl); diff --git a/apps/indexer-events/src/services/stream.ts b/apps/indexer-events/src/services/stream.ts index 9318aacc..edd0fcf3 100644 --- a/apps/indexer-events/src/services/stream.ts +++ b/apps/indexer-events/src/services/stream.ts @@ -1,10 +1,12 @@ import { stream, types } from 'nb-lake'; import { logger } from 'nb-logger'; +import { streamBlock } from 'nb-neardata'; import config from '#config'; import knex from '#libs/knex'; import sentry from '#libs/sentry'; import { storeEvents } from '#services/events'; +import { DataSource } from '#types/enum'; const fetchBlocks = async (block: number, limit: number) => { try { @@ -40,16 +42,45 @@ export const syncData = async () => { const settings = await knex('settings').where({ key: eventsKey }).first(); const latestBlock = settings?.value?.sync; - if (!lakeConfig.startBlockHeight && latestBlock) { - const next = +latestBlock - config.delta; + if (config.dataSource === DataSource.FAST_NEAR) { + let startBlockHeight = config.startBlockHeight; - logger.info(`last synced block: ${latestBlock}`); - logger.info(`syncing from block: ${next}`); - lakeConfig.startBlockHeight = next; - } + if (!startBlockHeight && latestBlock) { + const next = +latestBlock - config.delta / 2; + startBlockHeight = next; + logger.info(`last synced block: ${latestBlock}`); + logger.info(`syncing from block: ${next}`); + } + + const stream = streamBlock({ + limit: config.preloadSize / 2, + network: config.network, + start: startBlockHeight || config.genesisHeight, + }); + + for await (const message of stream) { + await onMessage(message); + } - for await (const message of stream(lakeConfig)) { - await onMessage(message); + stream.on('end', () => { + logger.error('stream ended'); + process.exit(); + }); + stream.on('error', (error: Error) => { + logger.error(error); + process.exit(); + }); + } else { + if (!lakeConfig.startBlockHeight && latestBlock) { + const next = +latestBlock - config.delta; + lakeConfig.startBlockHeight = next; + logger.info(`last synced block: ${latestBlock}`); + logger.info(`syncing from block: ${next}`); + } + + for await (const message of stream(lakeConfig)) { + await onMessage(message); + } } }; diff --git a/apps/indexer-events/src/types/enum.ts b/apps/indexer-events/src/types/enum.ts new file mode 100644 index 00000000..57352d41 --- /dev/null +++ b/apps/indexer-events/src/types/enum.ts @@ -0,0 +1,4 @@ +export enum DataSource { + FAST_NEAR = 'FAST_NEAR', + NEAR_LAKE = 'NEAR_LAKE', +} diff --git a/apps/indexer-events/src/types/types.ts b/apps/indexer-events/src/types/types.ts index 00ed6657..a538a4ff 100644 --- a/apps/indexer-events/src/types/types.ts +++ b/apps/indexer-events/src/types/types.ts @@ -4,15 +4,16 @@ import { EventCause, Network } from 'nb-types'; export type Config = { cacheExpiry: number; + dataSource: string; dbCa: string; dbCert: string; dbKey: string; dbUrl: string; delta: number; + genesisHeight: number; insertLimit: number; network: Network; preloadSize: number; - rpcUrl: string; s3BucketName: string; s3Endpoint: null | types.EndpointConfig; s3RegionName: string; diff --git a/packages/nb-neardata/package.json b/packages/nb-neardata/package.json index 73df128a..654a03d8 100644 --- a/packages/nb-neardata/package.json +++ b/packages/nb-neardata/package.json @@ -1,6 +1,6 @@ { "name": "nb-neardata", - "version": "0.1.1", + "version": "0.1.2", "author": "NearBlocks", "license": "Business Source License 1.1", "type": "module", @@ -15,6 +15,7 @@ "eslint-config-custom-node": "*", "nb-logger": "*", "nb-tsconfig": "*", + "nb-types": "*", "nb-utils": "*", "typescript": "~5.2" } diff --git a/packages/nb-neardata/src/index.ts b/packages/nb-neardata/src/index.ts index a20113ba..c4bebe4e 100644 --- a/packages/nb-neardata/src/index.ts +++ b/packages/nb-neardata/src/index.ts @@ -3,6 +3,7 @@ import { Readable } from 'stream'; import axios from 'axios'; import { logger } from 'nb-logger'; +import { Network } from 'nb-types'; import { retry } from 'nb-utils'; import { Message } from './type.js'; @@ -11,15 +12,28 @@ export * from './type.js'; interface BlockReadable extends Readable { block?: number; + final?: number; + last?: number; } +export type BlockStreamConfig = { + limit?: number; + network: Network; + start: number; +}; + interface CamelCaseObject { [key: string]: unknown; } const delay = 700; const retries = 20; -const endpoint = 'https://mainnet.neardata.xyz/v0/block'; + +const endpoint = (network: string) => { + return network === Network.MAINNET + ? 'https://mainnet.neardata.xyz/v0' + : 'https://testnet.neardata.xyz/v0'; +}; const camelCase = (str: string): string => { return str.replace(/_([a-z])/g, (g) => g[1].toUpperCase()); @@ -43,10 +57,10 @@ export const camelCaseKeys = (obj: T): T => { return newObj as T; }; -const fetch = async (block: number) => { +const fetch = async (network: string, block: number): Promise => { return await retry( async () => { - const response = await axios.get(`${endpoint}/${block}`, { + const response = await axios.get(`${endpoint(network)}/block/${block}`, { timeout: 5000, }); @@ -56,33 +70,69 @@ const fetch = async (block: number) => { ); }; -export const streamBlock = (start: number, concurrency = 10) => { - const highWaterMark = concurrency * 2; +const fetchFinal = async (network: string): Promise => { + return await retry( + async () => { + const response = await axios.get( + `${endpoint(network)}/last_block/final`, + { + timeout: 5000, + }, + ); + + return response.data; + }, + { delay, retries }, + ); +}; + +export const streamBlock = (config: BlockStreamConfig) => { + const network = config.network; + const start = config.start; + const limit = config.limit ?? 10; const readable = new Readable({ - highWaterMark, + highWaterMark: limit * 2 + 1, objectMode: true, async read(this: BlockReadable) { - const block = this.block || start; - const promises: Promise[] = []; + try { + // eslint-disable-next-line no-constant-condition + whileLoop: while (true) { + const block = this.block || start; + const promises: Promise[] = []; + const hasLast = !this.last || block - this.last > 1000; + const hasFinal = !this.final || this.final - block > limit + 1; + + if (hasLast && hasFinal) { + this.final = (await fetchFinal(network)).block.header.height; + this.last = block; + } - for (let i = 0; i < concurrency; i++) { - promises.push(fetch(block + i)); - } + const concurrency = hasFinal ? limit : 1; - try { - const results = await Promise.all(promises); + for (let i = 0; i < concurrency; i++) { + promises.push(fetch(network, block + i)); + } - for (const result of results) { - if (result) { - if (!this.push(camelCaseKeys(result))) { - logger.warn(`paused: ${block}`); - this.pause(); + const results = await Promise.all(promises); + + for (const result of results) { + if (!result && concurrency === 1) { + this.block = block + 1; + continue whileLoop; + } + + if (result) { + if (!this.push(camelCaseKeys(result))) { + logger.warn(`paused: ${block}`); + this.pause(); + } } } - } - this.block = block + concurrency; + this.block = block + concurrency; + break whileLoop; + } } catch (error) { this.emit('error', error); this.push(null);