Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement failover for RPC endpoints in watcher #506

Merged
merged 5 commits into from
May 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lerna.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"packages": [
"packages/*"
],
"version": "0.2.85",
"version": "0.2.86",
"npmClient": "yarn",
"useWorkspaces": true,
"command": {
Expand Down
2 changes: 1 addition & 1 deletion packages/cache/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/cache",
"version": "0.2.85",
"version": "0.2.86",
"description": "Generic object cache",
"main": "dist/index.js",
"scripts": {
Expand Down
12 changes: 6 additions & 6 deletions packages/cli/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/cli",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"scripts": {
Expand All @@ -15,13 +15,13 @@
},
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/libp2p": "^0.42.2-laconic-0.1.4",
"@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.85",
"@cerc-io/rpc-eth-client": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/peer": "^0.2.86",
"@cerc-io/rpc-eth-client": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"@ethersproject/providers": "^5.4.4",
"@graphql-tools/utils": "^9.1.1",
"@ipld/dag-cbor": "^8.0.0",
Expand Down
36 changes: 33 additions & 3 deletions packages/cli/src/job-runner.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import { hideBin } from 'yargs/helpers';
import 'reflect-metadata';
import assert from 'assert';
import { ConnectionOptions } from 'typeorm';
import { errors } from 'ethers';
import debug from 'debug';

import { JsonRpcProvider } from '@ethersproject/providers';
import {
Expand All @@ -20,10 +22,14 @@ import {
GraphWatcherInterface,
startMetricsServer,
Config,
UpstreamConfig
UpstreamConfig,
NEW_BLOCK_MAX_RETRIES_ERROR
} from '@cerc-io/util';

import { BaseCmd } from './base';
import { initClients } from './utils/index';

const log = debug('vulcanize:job-runner');

interface Arguments {
configFile: string;
Expand All @@ -33,6 +39,10 @@ export class JobRunnerCmd {
_argv?: Arguments;
_baseCmd: BaseCmd;

_currentEndpointIndex = {
rpcProviderEndpoint: 0
};

constructor () {
this._baseCmd = new BaseCmd();
}
Expand Down Expand Up @@ -110,7 +120,27 @@ export class JobRunnerCmd {
await indexer.addContracts();
}

const jobRunner = new JobRunner(config.jobQueue, indexer, jobQueue);
const jobRunner = new JobRunner(
config.jobQueue,
indexer,
jobQueue,
async (error: any) => {
// Check if it is a server error or timeout from ethers.js
// https://docs.ethers.org/v5/api/utils/logger/#errors--server-error
// https://docs.ethers.org/v5/api/utils/logger/#errors--timeout
if (error.code === errors.SERVER_ERROR || error.code === errors.TIMEOUT || error.message === NEW_BLOCK_MAX_RETRIES_ERROR) {
const oldRpcEndpoint = config.upstream.ethServer.rpcProviderEndpoints[this._currentEndpointIndex.rpcProviderEndpoint];
++this._currentEndpointIndex.rpcProviderEndpoint;

if (this._currentEndpointIndex.rpcProviderEndpoint === config.upstream.ethServer.rpcProviderEndpoints.length) {
this._currentEndpointIndex.rpcProviderEndpoint = 0;
}

const { ethClient, ethProvider } = await initClients(config, this._currentEndpointIndex);
indexer.switchClients({ ethClient, ethProvider });
log(`RPC endpoint ${oldRpcEndpoint} is not working; failing over to new RPC endpoint ${ethProvider.connection.url}`);
}
});

// Delete all active and pending (before completed) jobs to start job-runner without old queued jobs
await jobRunner.jobQueue.deleteAllJobs('completed');
Expand All @@ -121,7 +151,7 @@ export class JobRunnerCmd {
await startJobRunner(jobRunner);
jobRunner.handleShutdown();

await startMetricsServer(config, indexer);
await startMetricsServer(config, indexer, this._currentEndpointIndex);
}

_getArgv (): any {
Expand Down
12 changes: 7 additions & 5 deletions packages/cli/src/utils/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ export function readPeerId (filePath: string): PeerIdObj {
return JSON.parse(peerIdJson);
}

export const initClients = async (config: Config): Promise<{
export const initClients = async (config: Config, endpointIndexes = { rpcProviderEndpoint: 0 }): Promise<{
ethClient: EthClient,
ethProvider: providers.JsonRpcProvider
}> => {
Expand All @@ -32,30 +32,32 @@ export const initClients = async (config: Config): Promise<{
assert(dbConfig, 'Missing database config');
assert(upstreamConfig, 'Missing upstream config');

const { ethServer: { gqlApiEndpoint, rpcProviderEndpoint, rpcClient = false }, cache: cacheConfig } = upstreamConfig;
const { ethServer: { gqlApiEndpoint, rpcProviderEndpoints, rpcClient = false }, cache: cacheConfig } = upstreamConfig;

assert(rpcProviderEndpoint, 'Missing upstream ethServer.rpcProviderEndpoint');
assert(rpcProviderEndpoints, 'Missing upstream ethServer.rpcProviderEndpoints');
assert(rpcProviderEndpoints.length, 'No endpoints configured in ethServer.rpcProviderEndpoints');

const cache = await getCache(cacheConfig);

let ethClient: EthClient;

if (rpcClient) {
ethClient = new RpcEthClient({
rpcEndpoint: rpcProviderEndpoint,
rpcEndpoint: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint],
cache
});
} else {
assert(gqlApiEndpoint, 'Missing upstream ethServer.gqlApiEndpoint');

// TODO: Implement failover for GQL endpoint
ethClient = new GqlEthClient({
gqlEndpoint: gqlApiEndpoint,
cache
});
}

const ethProvider = getCustomProvider({
url: rpcProviderEndpoint,
url: rpcProviderEndpoints[endpointIndexes.rpcProviderEndpoint],
allowGzip: true
});

Expand Down
4 changes: 2 additions & 2 deletions packages/codegen/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/codegen",
"version": "0.2.85",
"version": "0.2.86",
"description": "Code generator",
"private": true,
"main": "index.js",
Expand All @@ -20,7 +20,7 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/util": "^0.2.85",
"@cerc-io/util": "^0.2.86",
"@graphql-tools/load-files": "^6.5.2",
"@npmcli/package-json": "^5.0.0",
"@poanet/solidity-flattener": "https://github.com/vulcanize/solidity-flattener.git",
Expand Down
7 changes: 6 additions & 1 deletion packages/codegen/src/templates/config-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,9 @@
[upstream]
[upstream.ethServer]
gqlApiEndpoint = "http://127.0.0.1:8082/graphql"
rpcProviderEndpoint = "http://127.0.0.1:8081"
rpcProviderEndpoints = [
"http://127.0.0.1:8081"
]

# Boolean flag to specify if rpc-eth-client should be used for RPC endpoint instead of ipld-eth-client (ipld-eth-server GQL client)
rpcClient = false
Expand Down Expand Up @@ -100,3 +102,6 @@
# Max block range of historical processing after which it waits for completion of events processing
# If set to -1 historical processing does not wait for events processing and completes till latest canonical block
historicalMaxFetchAhead = 10000

# Max number of retries to fetch new block after which watcher will failover to other RPC endpoints
maxNewBlockRetries = 3
9 changes: 9 additions & 0 deletions packages/codegen/src/templates/indexer-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,15 @@ export class Indexer implements IndexerInterface {
await this._baseIndexer.fetchStateStatus();
}

switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: BaseProvider }): void {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
this._baseIndexer.switchClients({ ethClient, ethProvider });
{{#if (subgraphPath)}}
this._graphWatcher.switchClients({ ethClient, ethProvider });
{{/if}}
}

{{#if (subgraphPath)}}
async getMetaData (block: BlockHeight): Promise<ResultMeta | null> {
return this._baseIndexer.getMetaData(block);
Expand Down
10 changes: 5 additions & 5 deletions packages/codegen/src/templates/package-template.handlebars
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/cli": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/solidity-mapper": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cli": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/solidity-mapper": "^0.2.86",
"@cerc-io/util": "^0.2.86",
{{#if (subgraphPath)}}
"@cerc-io/graph-node": "^0.2.85",
"@cerc-io/graph-node": "^0.2.86",
{{/if}}
"@ethersproject/providers": "^5.4.4",
"debug": "^4.3.1",
Expand Down
10 changes: 5 additions & 5 deletions packages/graph-node/package.json
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
{
"name": "@cerc-io/graph-node",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {
"@cerc-io/solidity-mapper": "^0.2.85",
"@cerc-io/solidity-mapper": "^0.2.86",
"@ethersproject/providers": "^5.4.4",
"@graphprotocol/graph-ts": "^0.22.0",
"@nomiclabs/hardhat-ethers": "^2.0.2",
Expand Down Expand Up @@ -51,9 +51,9 @@
"dependencies": {
"@apollo/client": "^3.3.19",
"@cerc-io/assemblyscript": "0.19.10-watcher-ts-0.1.2",
"@cerc-io/cache": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"@types/json-diff": "^0.5.2",
"@types/yargs": "^17.0.0",
"bn.js": "^4.11.9",
Expand Down
5 changes: 5 additions & 0 deletions packages/graph-node/src/watcher.ts
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,11 @@ export class GraphWatcher {
this.fillEventSignatureMap();
}

async switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }) {
this._ethClient = ethClient;
this._ethProvider = ethProvider;
}

fillEventSignatureMap () {
this._dataSources.forEach(contract => {
if (contract.kind === 'ethereum/contract' && contract.mapping.kind === 'ethereum/events') {
Expand Down
5 changes: 5 additions & 0 deletions packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import { providers } from 'ethers';

import {
IndexerInterface,
Expand Down Expand Up @@ -338,4 +339,8 @@ export class Indexer implements IndexerInterface {
async getFullTransactions (txHashList: string[]): Promise<EthFullTransaction[]> {
return [];
}

switchClients ({ ethClient, ethProvider }: { ethClient: EthClient, ethProvider: providers.BaseProvider }): void {
return undefined;
}
}
6 changes: 3 additions & 3 deletions packages/ipld-eth-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/ipld-eth-client",
"version": "0.2.85",
"version": "0.2.86",
"description": "IPLD ETH Client",
"main": "dist/index.js",
"scripts": {
Expand All @@ -20,8 +20,8 @@
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@apollo/client": "^3.7.1",
"@cerc-io/cache": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"cross-fetch": "^3.1.4",
"debug": "^4.3.1",
"ethers": "^5.4.4",
Expand Down
2 changes: 1 addition & 1 deletion packages/peer/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/peer",
"version": "0.2.85",
"version": "0.2.86",
"description": "libp2p module",
"main": "dist/index.js",
"exports": "./dist/index.js",
Expand Down
8 changes: 4 additions & 4 deletions packages/rpc-eth-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/rpc-eth-client",
"version": "0.2.85",
"version": "0.2.86",
"description": "RPC ETH Client",
"main": "dist/index.js",
"scripts": {
Expand All @@ -19,9 +19,9 @@
},
"homepage": "https://github.com/cerc-io/watcher-ts#readme",
"dependencies": {
"@cerc-io/cache": "^0.2.85",
"@cerc-io/ipld-eth-client": "^0.2.85",
"@cerc-io/util": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@cerc-io/ipld-eth-client": "^0.2.86",
"@cerc-io/util": "^0.2.86",
"chai": "^4.3.4",
"ethers": "^5.4.4",
"left-pad": "^1.3.0",
Expand Down
2 changes: 1 addition & 1 deletion packages/solidity-mapper/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/solidity-mapper",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"devDependencies": {
Expand Down
2 changes: 1 addition & 1 deletion packages/test/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/test",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"private": true,
Expand Down
2 changes: 1 addition & 1 deletion packages/tracing-client/package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@cerc-io/tracing-client",
"version": "0.2.85",
"version": "0.2.86",
"description": "ETH VM tracing client",
"main": "dist/index.js",
"scripts": {
Expand Down
8 changes: 4 additions & 4 deletions packages/util/package.json
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
{
"name": "@cerc-io/util",
"version": "0.2.85",
"version": "0.2.86",
"main": "dist/index.js",
"license": "AGPL-3.0",
"dependencies": {
"@apollo/utils.keyvaluecache": "^1.0.1",
"@cerc-io/nitro-node": "^0.1.15",
"@cerc-io/peer": "^0.2.85",
"@cerc-io/solidity-mapper": "^0.2.85",
"@cerc-io/peer": "^0.2.86",
"@cerc-io/solidity-mapper": "^0.2.86",
"@cerc-io/ts-channel": "1.0.3-ts-nitro-0.1.1",
"@ethersproject/properties": "^5.7.0",
"@ethersproject/providers": "^5.4.4",
Expand Down Expand Up @@ -52,7 +52,7 @@
"yargs": "^17.0.1"
},
"devDependencies": {
"@cerc-io/cache": "^0.2.85",
"@cerc-io/cache": "^0.2.86",
"@nomiclabs/hardhat-waffle": "^2.0.1",
"@types/bunyan": "^1.8.8",
"@types/express": "^4.17.14",
Expand Down
Loading
Loading