Skip to content

Commit

Permalink
Add ETH RPC API to watcher server (#535)
Browse files Browse the repository at this point in the history
* Add ETH RPC API to watcher server

* Add eth_call API handler

* Add error handling to eth_call handler

* Parse block tag in eth_call handler

* Add a flag to enable ETH RPC server

* Fix lint errors

* Update block tag parsing
  • Loading branch information
prathamesh0 authored Sep 13, 2024
1 parent ea5ff93 commit b46d881
Show file tree
Hide file tree
Showing 9 changed files with 241 additions and 6 deletions.
8 changes: 6 additions & 2 deletions packages/cli/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,8 @@ import {
readParty,
UpstreamConfig,
fillBlocks,
createGQLLogger
createGQLLogger,
createEthRPCHandlers
} from '@cerc-io/util';
import { TypeSource } from '@graphql-tools/utils';
import type {
Expand Down Expand Up @@ -285,6 +286,7 @@ export class ServerCmd {
const jobQueue = this._baseCmd.jobQueue;
const indexer = this._baseCmd.indexer;
const eventWatcher = this._baseCmd.eventWatcher;
const ethProvider = this._baseCmd.ethProvider;

assert(config);
assert(jobQueue);
Expand Down Expand Up @@ -317,9 +319,11 @@ export class ServerCmd {
const gqlLogger = createGQLLogger(config.server.gql.logDir);
const resolvers = await createResolvers(indexer, eventWatcher, gqlLogger);

const ethRPCHandlers = await createEthRPCHandlers(indexer, ethProvider);

// Create an Express app
const app: Application = express();
const server = await createAndStartServer(app, typeDefs, resolvers, config.server, paymentsManager);
const server = await createAndStartServer(app, typeDefs, resolvers, ethRPCHandlers, config.server, paymentsManager);

await startGQLMetricsServer(config);

Expand Down
8 changes: 7 additions & 1 deletion packages/graph-node/test/utils/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

import assert from 'assert';
import { DeepPartial, FindConditions, FindManyOptions } from 'typeorm';
import { providers } from 'ethers';
import { ethers } from 'ethers';

import {
IndexerInterface,
Expand All @@ -28,6 +28,8 @@ import { GetStorageAt, getStorageValue, MappingKey, StorageLayout } from '@cerc-
export class Indexer implements IndexerInterface {
_getStorageAt: GetStorageAt;
_storageLayoutMap: Map<string, StorageLayout> = new Map();
_contractMap: Map<string, ethers.utils.Interface> = new Map();

eventSignaturesMap: Map<string, string[]> = new Map();

constructor (ethClient: EthClient, storageLayoutMap?: Map<string, StorageLayout>) {
Expand All @@ -50,6 +52,10 @@ export class Indexer implements IndexerInterface {
return this._storageLayoutMap;
}

get contractMap (): Map<string, ethers.utils.Interface> {
return this._contractMap;
}

async init (): Promise<void> {
return undefined;
}
Expand Down
1 change: 1 addition & 0 deletions packages/util/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"it-length-prefixed": "^8.0.4",
"it-pipe": "^2.0.5",
"it-pushable": "^3.1.2",
"jayson": "^4.1.2",
"js-yaml": "^4.1.0",
"json-bigint": "^1.0.0",
"lodash": "^4.17.21",
Expand Down
3 changes: 3 additions & 0 deletions packages/util/src/config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,9 @@ export interface ServerConfig {
// Flag to specify whether RPC endpoint supports block hash as block tag parameter
// https://ethereum.org/en/developers/docs/apis/json-rpc/#default-block
rpcSupportsBlockHashParam: boolean;

// Enable ETH JSON RPC server at /rpc
enableEthRPCServer: boolean;
}

export interface FundingAmountsConfig {
Expand Down
134 changes: 134 additions & 0 deletions packages/util/src/eth-rpc-handlers.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
/* eslint-disable @typescript-eslint/no-unused-vars */
import { utils } from 'ethers';

import { JsonRpcProvider } from '@ethersproject/providers';

import { IndexerInterface } from './types';

const CODE_INVALID_PARAMS = -32602;
const CODE_INTERNAL_ERROR = -32603;
const CODE_SERVER_ERROR = -32000;

const ERROR_CONTRACT_MAP_NOT_SET = 'Contract map not set';
const ERROR_CONTRACT_ABI_NOT_FOUND = 'Contract ABI not found';
const ERROR_CONTRACT_INSUFFICIENT_PARAMS = 'Insufficient params';
const ERROR_CONTRACT_NOT_RECOGNIZED = 'Contract not recognized';
const ERROR_CONTRACT_METHOD_NOT_FOUND = 'Contract method not found';
const ERROR_METHOD_NOT_IMPLEMENTED = 'Method not implemented';
const ERROR_INVALID_BLOCK_TAG = 'Invalid block tag';
const ERROR_BLOCK_NOT_FOUND = 'Block not found';

const DEFAULT_BLOCK_TAG = 'latest';

class ErrorWithCode extends Error {
code: number;
constructor (code: number, message: string) {
super(message);
this.code = code;
}
}

export const createEthRPCHandlers = async (
indexer: IndexerInterface,
ethProvider: JsonRpcProvider
): Promise<any> => {
return {
eth_blockNumber: async (args: any, callback: any) => {
const syncStatus = await indexer.getSyncStatus();
const result = syncStatus ? `0x${syncStatus.latestProcessedBlockNumber.toString(16)}` : '0x';

callback(null, result);
},

eth_call: async (args: any, callback: any) => {
try {
if (!indexer.contractMap) {
throw new ErrorWithCode(CODE_INTERNAL_ERROR, ERROR_CONTRACT_MAP_NOT_SET);
}

if (args.length === 0) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_CONTRACT_INSUFFICIENT_PARAMS);
}

const { to, data } = args[0];
const blockTag = args.length > 1 ? args[1] : DEFAULT_BLOCK_TAG;

const blockHash = await parseBlockTag(indexer, ethProvider, blockTag);

const watchedContract = indexer.getWatchedContracts().find(contract => contract.address === to);
if (!watchedContract) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_CONTRACT_NOT_RECOGNIZED);
}

const contractInterface = indexer.contractMap.get(watchedContract.kind);
if (!contractInterface) {
throw new ErrorWithCode(CODE_INTERNAL_ERROR, ERROR_CONTRACT_ABI_NOT_FOUND);
}

// Slice out method signature from data
const functionSelector = data.slice(0, 10);

// Find the matching function from the ABI
const functionFragment = contractInterface.getFunction(functionSelector);
if (!functionFragment) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_CONTRACT_METHOD_NOT_FOUND);
}

// Decode the data based on the matched function
const decodedData = contractInterface.decodeFunctionData(functionFragment, data);

const functionName = functionFragment.name;
const indexerMethod = (indexer as any)[functionName].bind(indexer);
if (!indexerMethod) {
throw new ErrorWithCode(CODE_SERVER_ERROR, ERROR_METHOD_NOT_IMPLEMENTED);
}

const result = await indexerMethod(blockHash, to, ...decodedData);
const encodedResult = contractInterface.encodeFunctionResult(functionFragment, Array.isArray(result.value) ? result.value : [result.value]);

callback(null, encodedResult);
} catch (error: any) {
let callBackError;
if (error instanceof ErrorWithCode) {
callBackError = { code: error.code, message: error.message };
} else {
callBackError = { code: CODE_SERVER_ERROR, message: error.message };
}

callback(callBackError);
}
},

eth_getLogs: async (args: any, callback: any) => {
// TODO: Implement
}
};
};

const parseBlockTag = async (indexer: IndexerInterface, ethProvider: JsonRpcProvider, blockTag: string): Promise<string> => {
if (utils.isHexString(blockTag)) {
// Return value if hex string is of block hash length
if (utils.hexDataLength(blockTag) === 32) {
return blockTag;
}

// Treat hex value as a block number
const block = await ethProvider.getBlock(blockTag);
if (block === null) {
throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_BLOCK_NOT_FOUND);
}

return block.hash;
}

if (blockTag === DEFAULT_BLOCK_TAG) {
const syncStatus = await indexer.getSyncStatus();
if (!syncStatus) {
throw new ErrorWithCode(CODE_INTERNAL_ERROR, 'SyncStatus not found');
}

return syncStatus.latestProcessedBlockHash;
}

throw new ErrorWithCode(CODE_INVALID_PARAMS, ERROR_INVALID_BLOCK_TAG);
};
1 change: 1 addition & 0 deletions packages/util/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,3 +28,4 @@ export * from './eth';
export * from './consensus';
export * from './validate-config';
export * from './logger';
export * from './eth-rpc-handlers';
23 changes: 22 additions & 1 deletion packages/util/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import debug from 'debug';
import responseCachePlugin from 'apollo-server-plugin-response-cache';
import { InMemoryLRUCache } from '@apollo/utils.keyvaluecache';
import queue from 'express-queue';
import jayson from 'jayson';
import { json as jsonParser } from 'body-parser';

import { TypeSource } from '@graphql-tools/utils';
import { makeExecutableSchema } from '@graphql-tools/schema';
Expand All @@ -22,11 +24,13 @@ import { PaymentsManager, paymentsPlugin } from './payments';
const log = debug('vulcanize:server');

const DEFAULT_GQL_PATH = '/graphql';
const ETH_RPC_PATH = '/rpc';

export const createAndStartServer = async (
app: Application,
typeDefs: TypeSource,
resolvers: any,
ethRPCHandlers: any,
serverConfig: ServerConfig,
paymentsManager?: PaymentsManager
): Promise<ApolloServer> => {
Expand Down Expand Up @@ -98,8 +102,25 @@ export const createAndStartServer = async (
path: gqlPath
});

if (serverConfig.enableEthRPCServer) {
// Create a JSON-RPC server to handle ETH RPC calls
const rpcServer = jayson.Server(ethRPCHandlers);

// Mount the JSON-RPC server to ETH_RPC_PATH
app.use(
ETH_RPC_PATH,
jsonParser(),
// TODO: Handle GET requests as well to match Geth's behaviour
rpcServer.middleware()
);
}

httpServer.listen(port, host, () => {
log(`Server is listening on ${host}:${port}${server.graphqlPath}`);
log(`GQL server is listening on http://${host}:${port}${server.graphqlPath}`);

if (serverConfig.enableEthRPCServer) {
log(`ETH JSON RPC server is listening on http://${host}:${port}${ETH_RPC_PATH}`);
}
});

return server;
Expand Down
3 changes: 2 additions & 1 deletion packages/util/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
//

import { Connection, DeepPartial, EntityTarget, FindConditions, FindManyOptions, ObjectLiteral, QueryRunner } from 'typeorm';
import { Transaction } from 'ethers';
import { ethers, Transaction } from 'ethers';

import { MappingKey, StorageLayout } from '@cerc-io/solidity-mapper';

Expand Down Expand Up @@ -161,6 +161,7 @@ export interface IndexerInterface {
readonly serverConfig: ServerConfig
readonly upstreamConfig: UpstreamConfig
readonly storageLayoutMap: Map<string, StorageLayout>
readonly contractMap: Map<string, ethers.utils.Interface>
// eslint-disable-next-line no-use-before-define
readonly graphWatcher?: GraphWatcherInterface
init (): Promise<void>
Expand Down
Loading

0 comments on commit b46d881

Please sign in to comment.