Skip to content

Commit

Permalink
Implement failover for RPC endpoints in watcher (#506)
Browse files Browse the repository at this point in the history
* Handle RPC endpoint server errors and switch failover endpoints

* Add config maxNewBlockRetries for switching to failover endpoint

* Upgrade package versions

* Move unknown events removal after event processing for historical sync

* Rename doFailOverEndpoints to switchClients
  • Loading branch information
nikugogoi authored May 9, 2024
1 parent 6d837dc commit c9696c3
Show file tree
Hide file tree
Showing 25 changed files with 195 additions and 62 deletions.
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"packages": [
"packages/*"
],
"version": "0.2.85",
"version": "0.2.86",
"npmClient": "yarn",
"useWorkspaces": true,
"command": {
Expand Down
2 changes: 1 addition & 1 deletion packages/cache/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/cache",
"version": "0.2.85",
"version": "0.2.86",
"description": "Generic object cache",
"main": "dist/index.js",
"scripts": {
Expand Down
12 changes: 6 additions & 6 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/cli",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"scripts": {
Expand All @@ -15,13 +15,13 @@
},
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.85",
"@cerc-io/rpc-eth-client": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/peer": "^0.2.86",
"@cerc-io/rpc-eth-client": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1",
"@ipld/dag-cbor": "^8.0.0",
Expand Down
36 changes: 33 additions & 3 deletions packages/cli/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { errors } from 'ethers';
import debug from 'debug';

import { JsonRpcProvider } from '@ethersproject/providers';
import {
Expand All @@ -20,10 +22,14 @@ import {
GraphWatcherInterface,
startMetricsServer,
Config,
UpstreamConfig
UpstreamConfig,
NEW_BLOCK_MAX_RETRIES_ERROR
} from '@cerc-io/util';

import { BaseCmd } from './base';
import { initClients } from './utils/index';

const log = debug('vulcanize:job-runner');

interface Arguments {
configFile: string;
Expand All @@ -33,6 +39,10 @@ export class JobRunnerCmd {
_argv?: Arguments;
_baseCmd: BaseCmd;

_currentEndpointIndex = {
rpcProviderEndpoint: 0
};

constructor () {
this._baseCmd = new BaseCmd();
}
Expand Down Expand Up @@ -110,7 +120,27 @@ export class JobRunnerCmd {
await indexer.addContracts();
}

const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);
const jobRunner = new JobRunner(
config.jobQueue,
indexer,
jobQueue,
async (error: any) => {
// Check if it is a server error or timeout from ethers.js
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
++this._currentEndpointIndex.rpcProviderEndpoint;

if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) {
this._currentEndpointIndex.rpcProviderEndpoint = 0;
}

const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex);
indexer.switchClients({ ethClient, ethProvider });
log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`);
}
});

// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
await jobRunner.jobQueue.deleteAllJobs('completed');
Expand All @@ -121,7 +151,7 @@ export class JobRunnerCmd {
await startJobRunner(jobRunner);
jobRunner.handleShutdown();

await startMetricsServer(config, indexer);
await startMetricsServer(config, indexer, this._currentEndpointIndex);
}

_getArgv (): any {
Expand Down
12 changes: 7 additions & 5 deletions packages/cli/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function readPeerId (filePath: string): PeerIdObj {
return JSON.parse(peerIdJson);
}

export const initClients = async (config: Config): Promise<{
export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{
ethClient: EthClient,
ethProvider: providers.JsonRpcProvider
}> => {
Expand All @@ -32,30 +32,32 @@ export const initClients = async (config: Config): Promise<{
assert(dbConfig, 'Missing database config');
assert(upstreamConfig, 'Missing upstream config');

const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, rpcClient = false }, cache: cacheConfig } = upstreamConfig;
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig;

assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint');
assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints');
assert(rpcProviderEndpoints.length, 'No endpoints configured in ethServer.rpcProviderEndpoints');

const cache = await getCache(cacheConfig);

let ethClient: EthClient;

if (rpcClient) {
ethClient = new RpcEthClient({
rpcEndpoint: rpcProviderEndpoint,
rpcEndpoint: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint],
cache
});
} else {
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');

// TODO: Implement failover for GQL endpoint
ethClient = new GqlEthClient({
gqlEndpoint: gqlApiEndpoint,
cache
});
}

const ethProvider = getCustomProvider({
url: rpcProviderEndpoint,
url: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint],
allowGzip: true
});

Expand Down
4 changes: 2 additions & 2 deletions packages/codegen/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/codegen",
"version": "0.2.85",
"version": "0.2.86",
"description": "Code generator",
"private": true,
"main": "index.js",
Expand All @@ -20,7 +20,7 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/util": "^0.2.85",
"@cerc-io/util": "^0.2.86",
"@graphql-tools/load-files": "^6.5.2",
"@npmcli/package-json": "^5.0.0",
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",
Expand Down
7 changes: 6 additions & 1 deletion packages/codegen/src/templates/config-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
rpcProviderEndpoints = [
"http://127.0.0.1:8081"
]

# Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
rpcClient = false
Expand Down Expand Up @@ -100,3 +102,6 @@
# Max block range of historical processing after which it waits for completion of events processing
# If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead = 10000

# Max number of retries to fetch new block after which watcher will failover to other RPC endpoints
maxNewBlockRetries = 3
9 changes: 9 additions & 0 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ export class Indexer implements IndexerInterface {
await this._baseIndexer.fetchStateStatus();
}

switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: BaseProvider }): void {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._baseIndexer.switchClients({ ethClient, ethProvider });
{{#if (subgraphPath)}}
this._graphWatcher.switchClients({ ethClient, ethProvider });
{{/if}}
}

{{#if (subgraphPath)}}
async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
return this._baseIndexer.getMetaData(block);
Expand Down
10 changes: 5 additions & 5 deletions packages/codegen/src/templates/package-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/solidity-mapper": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cli": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/solidity-mapper": "^0.2.86",
"@cerc-io/util": "^0.2.86",
{{#if (subgraphPath)}}
"@cerc-io/graph-node": "^0.2.85",
"@cerc-io/graph-node": "^0.2.86",
{{/if}}
"@ethersproject/providers": "^5.4.4",
"debug": "^4.3.1",
Expand Down
10 changes: 5 additions & 5 deletions packages/graph-node/package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "@cerc-io/graph-node",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {
"@cerc-io/solidity-mapper": "^0.2.85",
"@cerc-io/solidity-mapper": "^0.2.86",
"@ethersproject/providers": "^5.4.4",
"@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2",
Expand Down Expand Up @@ -51,9 +51,9 @@
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
"@cerc-io/cache": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"@types/json-diff": "^0.5.2",
"@types/yargs": "^17.0.0",
"bn.js": "^4.11.9",
Expand Down
5 changes: 5 additions & 0 deletions packages/graph-node/src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ export class GraphWatcher {
this.fillEventSignatureMap();
}

async switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }) {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
}

fillEventSignatureMap () {
this._dataSources.forEach(contract => {
if (contract.kind === 'ethereum/contract' && contract.mapping.kind === 'ethereum/events') {
Expand Down
5 changes: 5 additions & 0 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import { providers } from 'ethers';

import {
IndexerInterface,
Expand Down Expand Up @@ -338,4 +339,8 @@ export class Indexer implements IndexerInterface {
async getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> {
return [];
}

switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void {
return undefined;
}
}
6 changes: 3 additions & 3 deletions packages/ipld-eth-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/ipld-eth-client",
"version": "0.2.85",
"version": "0.2.86",
"description": "IPLD ETH Client",
"main": "dist/index.js",
"scripts": {
Expand All @@ -20,8 +20,8 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"cross-fetch": "^3.1.4",
"debug": "^4.3.1",
"ethers": "^5.4.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/peer/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/peer",
"version": "0.2.85",
"version": "0.2.86",
"description": "libp2p module",
"main": "dist/index.js",
"exports": "./dist/index.js",
Expand Down
8 changes: 4 additions & 4 deletions packages/rpc-eth-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/rpc-eth-client",
"version": "0.2.85",
"version": "0.2.86",
"description": "RPC ETH Client",
"main": "dist/index.js",
"scripts": {
Expand All @@ -19,9 +19,9 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/cache": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"chai": "^4.3.4",
"ethers": "^5.4.4",
"left-pad": "^1.3.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/solidity-mapper/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/solidity-mapper",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/test/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/test",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"private": true,
Expand Down
2 changes: 1 addition & 1 deletion packages/tracing-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/tracing-client",
"version": "0.2.85",
"version": "0.2.86",
"description": "ETH VM tracing client",
"main": "dist/index.js",
"scripts": {
Expand Down
8 changes: 4 additions & 4 deletions packages/util/package.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"name": "@cerc-io/util",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"dependencies": {
"@apollo/utils.keyvaluecache": "^1.0.1",
"@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.85",
"@cerc-io/solidity-mapper": "^0.2.85",
"@cerc-io/peer": "^0.2.86",
"@cerc-io/solidity-mapper": "^0.2.86",
"@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
"@ethersproject/properties": "^5.7.0",
"@ethersproject/providers": "^5.4.4",
Expand Down Expand Up @@ -52,7 +52,7 @@
"yargs": "^17.0.1"
},
"devDependencies": {
"@cerc-io/cache": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@nomiclabs/hardhat-waffle": "^2.0.1",
"@types/bunyan": "^1.8.8",
"@types/express": "^4.17.14",
Expand Down
Loading

0 comments on commit c9696c3

Please sign in to comment.