diff --git a/cmd/geth/main.go b/cmd/geth/main.go index d120d6599..197bbcd23 100644 --- a/cmd/geth/main.go +++ b/cmd/geth/main.go @@ -185,6 +185,7 @@ var ( utils.RPCGlobalGasCapFlag, utils.RPCGlobalTxFeeCapFlag, utils.AllowUnprotectedTxs, + utils.BatchRequestLimit, } metricsFlags = []cli.Flag{ diff --git a/cmd/geth/usage.go b/cmd/geth/usage.go index 52b4a1606..702a34b34 100644 --- a/cmd/geth/usage.go +++ b/cmd/geth/usage.go @@ -148,6 +148,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{ utils.WSApiFlag, utils.WSPathPrefixFlag, utils.WSAllowedOriginsFlag, + utils.BatchRequestLimit, utils.GraphQLEnabledFlag, utils.GraphQLCORSDomainFlag, utils.GraphQLVirtualHostsFlag, diff --git a/cmd/utils/flags.go b/cmd/utils/flags.go index 643e169cd..172067ce4 100644 --- a/cmd/utils/flags.go +++ b/cmd/utils/flags.go @@ -621,7 +621,11 @@ var ( Name: "rpc.allow-unprotected-txs", Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC", } - + BatchRequestLimit = cli.IntFlag{ //nolint:typecheck + Name: "rpc.batch-request-limit", + Usage: "Maximum number of requests in a batch(only supported in http)", + Value: node.DefaultConfig.BatchRequestLimit, + } // Network Settings MaxPeersFlag = cli.IntFlag{ Name: "maxpeers", @@ -995,6 +999,10 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) { if ctx.GlobalIsSet(AllowUnprotectedTxs.Name) { cfg.AllowUnprotectedTxs = ctx.GlobalBool(AllowUnprotectedTxs.Name) } + if ctx.IsSet(BatchRequestLimit.Name) { + cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name) + } + } // setGraphQL creates the GraphQL listener interface string from the set diff --git a/node/api.go b/node/api.go index be20b89d9..a03c2a26d 100644 --- a/node/api.go +++ b/node/api.go @@ -185,6 +185,9 @@ func (api *privateAdminAPI) StartHTTP(host *string, port *int, cors *string, api CorsAllowedOrigins: api.node.config.HTTPCors, Vhosts: api.node.config.HTTPVirtualHosts, Modules: api.node.config.HTTPModules, + rpcEndpointConfig: rpcEndpointConfig{ + batchItemLimit: api.node.config.BatchRequestLimit, + }, } if cors != nil { config.CorsAllowedOrigins = nil diff --git a/node/config.go b/node/config.go index 26f00cd67..eb898c5fc 100644 --- a/node/config.go +++ b/node/config.go @@ -190,6 +190,10 @@ type Config struct { // AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC. AllowUnprotectedTxs bool `toml:",omitempty"` + + // BatchRequestLimit is the maximum number of requests in a batch. + BatchRequestLimit int `toml:",omitempty"` + } // IPCEndpoint resolves an IPC endpoint based on a configured value, taking into diff --git a/node/defaults.go b/node/defaults.go index c685dde5d..1ffda8dea 100644 --- a/node/defaults.go +++ b/node/defaults.go @@ -45,6 +45,7 @@ var DefaultConfig = Config{ HTTPTimeouts: rpc.DefaultHTTPTimeouts, WSPort: DefaultWSPort, WSModules: []string{"net", "web3"}, + BatchRequestLimit: 1000, GraphQLVirtualHosts: []string{"localhost"}, P2P: p2p.Config{ ListenAddr: ":30303", diff --git a/node/node.go b/node/node.go index ceab1c909..5ad5af021 100644 --- a/node/node.go +++ b/node/node.go @@ -96,9 +96,11 @@ func New(conf *Config) (*Node, error) { return nil, errors.New(`Config.Name cannot end in ".ipc"`) } + server := rpc.NewServer() + server.SetBatchLimits(conf.BatchRequestLimit) node := &Node{ config: conf, - inprocHandler: rpc.NewServer(), + inprocHandler: server, eventmux: new(event.TypeMux), log: conf.Logger, stop: make(chan struct{}), @@ -350,6 +352,10 @@ func (n *Node) startRPC() error { } } + rpcConfig := rpcEndpointConfig{ + batchItemLimit: n.config.BatchRequestLimit, + } + // Configure HTTP. if n.config.HTTPHost != "" { config := httpConfig{ @@ -357,6 +363,7 @@ func (n *Node) startRPC() error { Vhosts: n.config.HTTPVirtualHosts, Modules: n.config.HTTPModules, prefix: n.config.HTTPPathPrefix, + rpcEndpointConfig: rpcConfig, } if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil { return err diff --git a/node/rpcstack.go b/node/rpcstack.go index 2c55a070b..c641e10e4 100644 --- a/node/rpcstack.go +++ b/node/rpcstack.go @@ -40,6 +40,7 @@ type httpConfig struct { CorsAllowedOrigins []string Vhosts []string prefix string // path prefix on which to mount http handler + rpcEndpointConfig } // wsConfig is the JSON-RPC/Websocket configuration @@ -49,6 +50,10 @@ type wsConfig struct { prefix string // path prefix on which to mount ws handler } +type rpcEndpointConfig struct { + batchItemLimit int +} + type rpcHandler struct { http.Handler server *rpc.Server @@ -280,6 +285,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error { // Create RPC server and handler. srv := rpc.NewServer() + srv.SetBatchLimits(config.batchItemLimit) if err := RegisterApis(apis, config.Modules, srv, false); err != nil { return err } diff --git a/rpc/client.go b/rpc/client.go index 198ce6357..77f144634 100644 --- a/rpc/client.go +++ b/rpc/client.go @@ -83,6 +83,9 @@ type Client struct { // This function, if non-nil, is called when the connection is lost. reconnectFunc reconnectFunc + // config fields + batchItemLimit int + // writeConn is used for writing to the connection on the caller's goroutine. It should // only be accessed outside of dispatch, with the write lock held. The write lock is // taken by sending on reqInit and released by sending on reqSent. @@ -111,7 +114,7 @@ type clientConn struct { func (c *Client) newClientConn(conn ServerCodec) *clientConn { ctx := context.WithValue(context.Background(), clientContextKey{}, c) - handler := newHandler(ctx, conn, c.idgen, c.services) + handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit) return &clientConn{conn, handler} } diff --git a/rpc/errors.go b/rpc/errors.go index 4c06a745f..e58f6458c 100644 --- a/rpc/errors.go +++ b/rpc/errors.go @@ -58,6 +58,8 @@ var ( const defaultErrorCode = -32000 +const errMsgBatchTooLarge = "batch too large" + type methodNotFoundError struct{ method string } func (e *methodNotFoundError) ErrorCode() int { return -32601 } diff --git a/rpc/handler.go b/rpc/handler.go index 85f774a1b..884d26db4 100644 --- a/rpc/handler.go +++ b/rpc/handler.go @@ -50,17 +50,18 @@ import ( // } // type handler struct { - reg *serviceRegistry - unsubscribeCb *callback - idgen func() ID // subscription ID generator - respWait map[string]*requestOp // active client requests - clientSubs map[string]*ClientSubscription // active client subscriptions - callWG sync.WaitGroup // pending call goroutines - rootCtx context.Context // canceled by close() - cancelRoot func() // cancel function for rootCtx - conn jsonWriter // where responses will be sent - log log.Logger - allowSubscribe bool + reg *serviceRegistry + unsubscribeCb *callback + idgen func() ID // subscription ID generator + respWait map[string]*requestOp // active client requests + clientSubs map[string]*ClientSubscription // active client subscriptions + callWG sync.WaitGroup // pending call goroutines + rootCtx context.Context // canceled by close() + cancelRoot func() // cancel function for rootCtx + conn jsonWriter // where responses will be sent + log log.Logger + allowSubscribe bool + batchRequestLimit int subLock sync.Mutex serverSubs map[ID]*Subscription @@ -71,19 +72,20 @@ type callProc struct { notifiers []*Notifier } -func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler { +func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit int) *handler { rootCtx, cancelRoot := context.WithCancel(connCtx) h := &handler{ - reg: reg, - idgen: idgen, - conn: conn, - respWait: make(map[string]*requestOp), - clientSubs: make(map[string]*ClientSubscription), - rootCtx: rootCtx, - cancelRoot: cancelRoot, - allowSubscribe: true, - serverSubs: make(map[ID]*Subscription), - log: log.Root(), + reg: reg, + idgen: idgen, + conn: conn, + respWait: make(map[string]*requestOp), + clientSubs: make(map[string]*ClientSubscription), + rootCtx: rootCtx, + cancelRoot: cancelRoot, + allowSubscribe: true, + serverSubs: make(map[ID]*Subscription), + log: log.Root(), + batchRequestLimit: batchRequestLimit, } if conn.remoteAddr() != "" { h.log = h.log.New("conn", conn.remoteAddr()) @@ -102,6 +104,14 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { return } + // Apply limit on total number of requests. + if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit { + h.startCallProc(func(cp *callProc) { + h.respondWithBatchTooLarge(cp, msgs) + }) + return + } + // Handle non-call messages first: calls := make([]*jsonrpcMessage, 0, len(msgs)) for _, msg := range msgs { @@ -129,6 +139,19 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) { } }) } +func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage) { + resp := errorMessage(&invalidRequestError{errMsgBatchTooLarge}) + // Find the first call and add its "id" field to the error. + // This is the best we can do, given that the protocol doesn't have a way + // of reporting an error for the entire batch. + for _, msg := range batch { + if msg.isCall() { + resp.ID = msg.ID + break + } + } + h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp}) +} // handleMsg handles a single message. func (h *handler) handleMsg(msg *jsonrpcMessage) { diff --git a/rpc/server.go b/rpc/server.go index 64e078a7f..6294e201e 100644 --- a/rpc/server.go +++ b/rpc/server.go @@ -42,10 +42,11 @@ const ( // Server is an RPC server. type Server struct { - services serviceRegistry - idgen func() ID - run int32 - codecs mapset.Set + services serviceRegistry + idgen func() ID + run int32 + codecs mapset.Set + batchItemLimit int } // NewServer creates a new server instance with no registered handlers. @@ -58,6 +59,15 @@ func NewServer() *Server { return server } +// SetBatchLimits sets limits applied to batch requests. There are two limits: 'itemLimit' +// is the maximum number of items in a batch. +// +// This method should be called before processing any requests via ServeCodec, ServeHTTP, +// ServeListener etc. +func (s *Server) SetBatchLimits(itemLimit int) { + s.batchItemLimit = itemLimit +} + // RegisterName creates a service for the given receiver type under the given name. When no // methods on the given receiver match the criteria to be either a RPC method or a // subscription an error is returned. Otherwise a new service is created and added to the @@ -97,7 +107,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) { return } - h := newHandler(ctx, codec, s.idgen, &s.services) + h := newHandler(ctx, codec, s.idgen, &s.services, s.batchItemLimit) h.allowSubscribe = false defer h.close(io.EOF, nil)