Skip to content

Commit

Permalink
feat(indexer): add fast near in indexers (#512)
Browse files Browse the repository at this point in the history
  • Loading branch information
cherrybarry authored Sep 30, 2024
1 parent c91f2c6 commit 3f439ef
Show file tree
Hide file tree
Showing 32 changed files with 279 additions and 82 deletions.
2 changes: 1 addition & 1 deletion apps/indexer-balance/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ Balance indexer collects the info about native NEAR token balance changes 📊 (

```
DATABASE_URL=
RPC_URL=
NETWORK=mainnet
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
Expand All @@ -16,5 +15,6 @@ DATABASE_CA=
DATABASE_CERT=
DATABASE_KEY=
BALANCE_START_BLOCK=
BALANCE_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR
SENTRY_DSN=
```
1 change: 1 addition & 0 deletions apps/indexer-balance/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
"nb-lake": "*",
"nb-logger": "*",
"nb-near": "*",
"nb-neardata": "*",
"nb-tsconfig": "*",
"nb-types": "*",
"nb-utils": "*",
Expand Down
14 changes: 10 additions & 4 deletions apps/indexer-balance/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ import { cleanEnv, num, str, url } from 'envalid';
import { types } from 'nb-lake';
import { Network } from 'nb-types';

import { DataSource } from '#types/enum';
import { Config } from '#types/types';

const env = cleanEnv(process.env, {
BALANCE_DATA_SOURCE: str({
choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE],
default: DataSource.NEAR_LAKE,
}),
BALANCE_START_BLOCK: num({ default: 0 }),
DATABASE_CA: str({ default: '' }),
DATABASE_CERT: str({ default: '' }),
Expand All @@ -14,11 +19,11 @@ const env = cleanEnv(process.env, {
NETWORK: str({
choices: [Network.MAINNET, Network.TESTNET],
}),
RPC_URL: str(),
S3_ENDPOINT: url({ default: '' }),
SENTRY_DSN: str({ default: '' }),
});

const genesisHeight = env.NETWORK === Network.MAINNET ? 9_820_210 : 42_376_888;
let s3Endpoint: null | types.EndpointConfig = null;
const s3BucketName =
env.NETWORK === Network.MAINNET
Expand All @@ -37,15 +42,16 @@ if (env.S3_ENDPOINT) {

const config: Config = {
cacheExpiry: 60 * 60, // cache expiry time in seconds (1 hour)
dataSource: env.BALANCE_DATA_SOURCE,
dbCa: env.DATABASE_CA,
dbCert: env.DATABASE_CERT,
dbKey: env.DATABASE_KEY,
dbUrl: env.DATABASE_URL,
delta: 100, // start from blocks earlier on sync interuption
delta: 1_000, // start from blocks earlier on sync interuption
genesisHeight,
insertLimit: 1_000, // records to insert into the db at a time
network: env.NETWORK,
preloadSize: 10, // blocks to preload in nearlake
rpcUrl: env.RPC_URL,
preloadSize: 100, // blocks to preload in nearlake
s3BucketName,
s3Endpoint,
s3RegionName: 'eu-central-1',
Expand Down
5 changes: 4 additions & 1 deletion apps/indexer-balance/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import { syncData } from '#services/stream';

(async () => {
try {
logger.info({ network: config.network }, 'initializing balance indexer...');
logger.info(
{ data_source: config.dataSource, network: config.network },
'initializing balance indexer...',
);
logger.info('syncing balance data...');
await syncData();
} catch (error) {
Expand Down
5 changes: 0 additions & 5 deletions apps/indexer-balance/src/libs/near.ts

This file was deleted.

47 changes: 39 additions & 8 deletions apps/indexer-balance/src/services/stream.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import { stream, types } from 'nb-lake';
import { logger } from 'nb-logger';
import { streamBlock } from 'nb-neardata';

import config from '#config';
import knex from '#libs/knex';
import sentry from '#libs/sentry';
import { storeBalance } from '#services/balance';
import { DataSource } from '#types/enum';

const fetchBlocks = async (block: number, limit: number) => {
try {
Expand Down Expand Up @@ -40,16 +42,45 @@ export const syncData = async () => {
const settings = await knex('settings').where({ key: balanceKey }).first();
const latestBlock = settings?.value?.sync;

if (!lakeConfig.startBlockHeight && latestBlock) {
const next = +latestBlock - config.delta;
if (config.dataSource === DataSource.FAST_NEAR) {
let startBlockHeight = config.startBlockHeight;

logger.info(`last synced block: ${latestBlock}`);
logger.info(`syncing from block: ${next}`);
lakeConfig.startBlockHeight = next;
}
if (!startBlockHeight && latestBlock) {
const next = +latestBlock - config.delta / 2;
startBlockHeight = next;
logger.info(`last synced block: ${latestBlock}`);
logger.info(`syncing from block: ${next}`);
}

const stream = streamBlock({
limit: config.preloadSize / 2,
network: config.network,
start: startBlockHeight || config.genesisHeight,
});

for await (const message of stream) {
await onMessage(message);
}

for await (const message of stream(lakeConfig)) {
await onMessage(message);
stream.on('end', () => {
logger.error('stream ended');
process.exit();
});
stream.on('error', (error: Error) => {
logger.error(error);
process.exit();
});
} else {
if (!lakeConfig.startBlockHeight && latestBlock) {
const next = +latestBlock - config.delta;
lakeConfig.startBlockHeight = next;
logger.info(`last synced block: ${latestBlock}`);
logger.info(`syncing from block: ${next}`);
}

for await (const message of stream(lakeConfig)) {
await onMessage(message);
}
}
};

Expand Down
4 changes: 4 additions & 0 deletions apps/indexer-balance/src/types/enum.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum DataSource {
FAST_NEAR = 'FAST_NEAR',
NEAR_LAKE = 'NEAR_LAKE',
}
3 changes: 2 additions & 1 deletion apps/indexer-balance/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@ import { Network, StateChangeCauseView, StateChangeValueView } from 'nb-types';

export type Config = {
cacheExpiry: number;
dataSource: string;
dbCa: string;
dbCert: string;
dbKey: string;
dbUrl: string;
delta: number;
genesisHeight: number;
insertLimit: number;
network: Network;
preloadSize: number;
rpcUrl: string;
s3BucketName: string;
s3Endpoint: null | types.EndpointConfig;
s3RegionName: string;
Expand Down
1 change: 1 addition & 0 deletions apps/indexer-base/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ DATABASE_CERT=
DATABASE_KEY=
REDIS_URL=
BASE_START_BLOCK=
BASE_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR
SENTRY_DSN=
```
1 change: 1 addition & 0 deletions apps/indexer-base/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"nb-json": "*",
"nb-knex": "*",
"nb-logger": "*",
"nb-neardata": "*",
"nb-redis": "*",
"nb-tsconfig": "*",
"nb-types": "*",
Expand Down
6 changes: 6 additions & 0 deletions apps/indexer-base/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,14 @@ import { types } from 'near-lake-framework';

import { Network } from 'nb-types';

import { DataSource } from '#types/enum';
import { Config } from '#types/types';

const env = cleanEnv(process.env, {
BASE_DATA_SOURCE: str({
choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE],
default: DataSource.NEAR_LAKE,
}),
BASE_START_BLOCK: num({ default: 0 }),
DATABASE_CA: str({ default: '' }),
DATABASE_CERT: str({ default: '' }),
Expand Down Expand Up @@ -46,6 +51,7 @@ if (env.S3_ENDPOINT) {

const config: Config = {
cacheExpiry: 5 * 60, // cache expiry time in seconds (5 min)
dataSource: env.BASE_DATA_SOURCE,
dbCa: env.DATABASE_CA,
dbCert: env.DATABASE_CERT,
dbKey: env.DATABASE_KEY,
Expand Down
5 changes: 4 additions & 1 deletion apps/indexer-base/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,10 @@ import { syncData } from '#services/stream';

(async () => {
try {
logger.info({ network: config.network }, 'initializing base indexer...');
logger.info(
{ data_source: config.dataSource, network: config.network },
'initializing base indexer...',
);
logger.info('syncing genesis data...');
await syncGenesis();
logger.info('syncing blockchain data...');
Expand Down
47 changes: 39 additions & 8 deletions apps/indexer-base/src/services/stream.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { stream, types } from 'near-lake-framework';

import { logger } from 'nb-logger';
import { streamBlock } from 'nb-neardata';

import config from '#config';
import knex from '#libs/knex';
Expand All @@ -13,6 +14,7 @@ import { storeChunks } from '#services/chunk';
import { storeExecutionOutcomes } from '#services/executionOutcome';
import { storeReceipts } from '#services/receipt';
import { storeTransactions } from '#services/transaction';
import { DataSource } from '#types/enum';

const lakeConfig: types.LakeConfig = {
blocksPreloadPoolSize: config.preloadSize,
Expand All @@ -29,16 +31,45 @@ if (config.s3Endpoint) {
export const syncData = async () => {
const block = await knex('blocks').orderBy('block_height', 'desc').first();

if (!lakeConfig.startBlockHeight && block) {
const next = +block.block_height - config.delta;
if (config.dataSource === DataSource.FAST_NEAR) {
let startBlockHeight = config.startBlockHeight;

logger.info(`last synced block: ${block.block_height}`);
logger.info(`syncing from block: ${next}`);
lakeConfig.startBlockHeight = next;
}
if (!startBlockHeight && block) {
const next = +block.block_height - config.delta / 2;
startBlockHeight = next;
logger.info(`last synced block: ${block.block_height}`);
logger.info(`syncing from block: ${next}`);
}

const stream = streamBlock({
limit: config.preloadSize / 2,
network: config.network,
start: startBlockHeight || config.genesisHeight,
});

for await (const message of stream) {
await onMessage(message);
}

stream.on('end', () => {
logger.error('stream ended');
process.exit();
});
stream.on('error', (error: Error) => {
logger.error(error);
process.exit();
});
} else {
if (!lakeConfig.startBlockHeight && block) {
const next = +block.block_height - config.delta;
lakeConfig.startBlockHeight = next;
logger.info(`last synced block: ${block.block_height}`);
logger.info(`syncing from block: ${next}`);
}

for await (const message of stream(lakeConfig)) {
await onMessage(message);
for await (const message of stream(lakeConfig)) {
await onMessage(message);
}
}
};

Expand Down
4 changes: 4 additions & 0 deletions apps/indexer-base/src/types/enum.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
export enum DataSource {
FAST_NEAR = 'FAST_NEAR',
NEAR_LAKE = 'NEAR_LAKE',
}
1 change: 1 addition & 0 deletions apps/indexer-base/src/types/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import { AccessKeyPermissionKind, ActionKind, Network } from 'nb-types';

export interface Config {
cacheExpiry: number;
dataSource: string;
dbCa: string;
dbCert: string;
dbKey: string;
Expand Down
2 changes: 1 addition & 1 deletion apps/indexer-dex/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,11 @@ NETWORK=mainnet
S3_ENDPOINT=
AWS_ACCESS_KEY_ID=
AWS_SECRET_ACCESS_KEY=
DATA_SOURCE= # NEAR_LAKE | FAST_NEAR
# Optional
DATABASE_CA=
DATABASE_CERT=
DATABASE_KEY=
DEX_DATA_SOURCE= # NEAR_LAKE | FAST_NEAR
SENTRY_DSN=
```
12 changes: 7 additions & 5 deletions apps/indexer-dex/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,21 +7,22 @@ import { DataSource } from '#types/enum';
import { Config } from '#types/types';

const env = cleanEnv(process.env, {
DATA_SOURCE: str({
choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE],
default: DataSource.NEAR_LAKE,
}),
DATABASE_CA: str({ default: '' }),
DATABASE_CERT: str({ default: '' }),
DATABASE_KEY: str({ default: '' }),
DATABASE_URL: str(),
DEX_DATA_SOURCE: str({
choices: [DataSource.FAST_NEAR, DataSource.NEAR_LAKE],
default: DataSource.NEAR_LAKE,
}),
NETWORK: str({
choices: [Network.MAINNET, Network.TESTNET],
}),
S3_ENDPOINT: url({ default: '' }),
SENTRY_DSN: str({ default: '' }),
});

const genesisHeight = env.NETWORK === Network.MAINNET ? 9_820_210 : 42_376_888;
let s3Endpoint: null | types.EndpointConfig = null;
const s3BucketName =
env.NETWORK === Network.MAINNET
Expand All @@ -39,12 +40,13 @@ if (env.S3_ENDPOINT) {
}

const config: Config = {
dataSource: env.DATA_SOURCE,
dataSource: env.DEX_DATA_SOURCE,
dbCa: env.DATABASE_CA,
dbCert: env.DATABASE_CERT,
dbKey: env.DATABASE_KEY,
dbUrl: env.DATABASE_URL,
delta: 500,
genesisHeight,
NEAR_TOKEN: 'wrap.near',
network: env.NETWORK,
preloadSize: 50,
Expand Down
5 changes: 4 additions & 1 deletion apps/indexer-dex/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,10 @@ import { syncData } from '#services/stream';
(async () => {
try {
if (config.network === Network.MAINNET) {
logger.info({ network: config.network }, 'initializing dex indexer...');
logger.info(
{ data_source: config.dataSource, network: config.network },
'initializing dex indexer...',
);
logger.info('syncing dex data...');
await syncData();
}
Expand Down
5 changes: 5 additions & 0 deletions apps/indexer-dex/src/libs/big.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
import Big from 'big.js';

Big.DP = 24;

export default Big;
Loading

0 comments on commit 3f439ef

Please sign in to comment.