Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add batch limit in http #76

Merged
merged 2 commits into from
Mar 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ var (
utils.RPCGlobalGasCapFlag,
utils.RPCGlobalTxFeeCapFlag,
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
}

metricsFlags = []cli.Flag{
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.WSApiFlag,
utils.WSPathPrefixFlag,
utils.WSAllowedOriginsFlag,
utils.BatchRequestLimit,
utils.GraphQLEnabledFlag,
utils.GraphQLCORSDomainFlag,
utils.GraphQLVirtualHostsFlag,
Expand Down
10 changes: 9 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,11 @@ var (
Name: "rpc.allow-unprotected-txs",
Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC",
}

BatchRequestLimit = cli.IntFlag{ //nolint:typecheck
Name: "rpc.batch-request-limit",
Usage: "Maximum number of requests in a batch(only supported in http)",
Value: node.DefaultConfig.BatchRequestLimit,
}
// Network Settings
MaxPeersFlag = cli.IntFlag{
Name: "maxpeers",
Expand Down Expand Up @@ -995,6 +999,10 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(AllowUnprotectedTxs.Name) {
cfg.AllowUnprotectedTxs = ctx.GlobalBool(AllowUnprotectedTxs.Name)
}
if ctx.IsSet(BatchRequestLimit.Name) {
cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name)
}

}

// setGraphQL creates the GraphQL listener interface string from the set
Expand Down
3 changes: 3 additions & 0 deletions node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (api *privateAdminAPI) StartHTTP(host *string, port *int, cors *string, api
CorsAllowedOrigins: api.node.config.HTTPCors,
Vhosts: api.node.config.HTTPVirtualHosts,
Modules: api.node.config.HTTPModules,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
},
}
if cors != nil {
config.CorsAllowedOrigins = nil
Expand Down
4 changes: 4 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ type Config struct {

// AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC.
AllowUnprotectedTxs bool `toml:",omitempty"`

// BatchRequestLimit is the maximum number of requests in a batch.
BatchRequestLimit int `toml:",omitempty"`

}

// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
Expand Down
1 change: 1 addition & 0 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var DefaultConfig = Config{
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
BatchRequestLimit: 1000,
GraphQLVirtualHosts: []string{"localhost"},
P2P: p2p.Config{
ListenAddr: ":30303",
Expand Down
9 changes: 8 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ func New(conf *Config) (*Node, error) {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}

server := rpc.NewServer()
server.SetBatchLimits(conf.BatchRequestLimit)
node := &Node{
config: conf,
inprocHandler: rpc.NewServer(),
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
Expand Down Expand Up @@ -350,13 +352,18 @@ func (n *Node) startRPC() error {
}
}

rpcConfig := rpcEndpointConfig{
batchItemLimit: n.config.BatchRequestLimit,
}

// Configure HTTP.
if n.config.HTTPHost != "" {
config := httpConfig{
CorsAllowedOrigins: n.config.HTTPCors,
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
rpcEndpointConfig: rpcConfig,
}
if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type httpConfig struct {
CorsAllowedOrigins []string
Vhosts []string
prefix string // path prefix on which to mount http handler
rpcEndpointConfig
}

// wsConfig is the JSON-RPC/Websocket configuration
Expand All @@ -49,6 +50,10 @@ type wsConfig struct {
prefix string // path prefix on which to mount ws handler
}

type rpcEndpointConfig struct {
batchItemLimit int
}

type rpcHandler struct {
http.Handler
server *rpc.Server
Expand Down Expand Up @@ -280,6 +285,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {

// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit)
if err := RegisterApis(apis, config.Modules, srv, false); err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Client struct {
// This function, if non-nil, is called when the connection is lost.
reconnectFunc reconnectFunc

// config fields
batchItemLimit int

// writeConn is used for writing to the connection on the caller's goroutine. It should
// only be accessed outside of dispatch, with the write lock held. The write lock is
// taken by sending on reqInit and released by sending on reqSent.
Expand Down Expand Up @@ -111,7 +114,7 @@ type clientConn struct {

func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.WithValue(context.Background(), clientContextKey{}, c)
handler := newHandler(ctx, conn, c.idgen, c.services)
handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit)
return &clientConn{conn, handler}
}

Expand Down
2 changes: 2 additions & 0 deletions rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var (

const defaultErrorCode = -32000

const errMsgBatchTooLarge = "batch too large"

type methodNotFoundError struct{ method string }

func (e *methodNotFoundError) ErrorCode() int { return -32601 }
Expand Down
67 changes: 45 additions & 22 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,18 @@ import (
// }
//
type handler struct {
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
batchRequestLimit int

subLock sync.Mutex
serverSubs map[ID]*Subscription
Expand All @@ -71,19 +72,20 @@ type callProc struct {
notifiers []*Notifier
}

func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit int) *handler {
rootCtx, cancelRoot := context.WithCancel(connCtx)
h := &handler{
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
batchRequestLimit: batchRequestLimit,
}
if conn.remoteAddr() != "" {
h.log = h.log.New("conn", conn.remoteAddr())
Expand All @@ -102,6 +104,14 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
return
}

// Apply limit on total number of requests.
if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit {
h.startCallProc(func(cp *callProc) {
h.respondWithBatchTooLarge(cp, msgs)
})
return
}

// Handle non-call messages first:
calls := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range msgs {
Expand Down Expand Up @@ -129,6 +139,19 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
})
}
func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage) {
resp := errorMessage(&invalidRequestError{errMsgBatchTooLarge})
// Find the first call and add its "id" field to the error.
// This is the best we can do, given that the protocol doesn't have a way
// of reporting an error for the entire batch.
for _, msg := range batch {
if msg.isCall() {
resp.ID = msg.ID
break
}
}
h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp})
}

// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
Expand Down
20 changes: 15 additions & 5 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ const (

// Server is an RPC server.
type Server struct {
services serviceRegistry
idgen func() ID
run int32
codecs mapset.Set
services serviceRegistry
idgen func() ID
run int32
codecs mapset.Set
batchItemLimit int
}

// NewServer creates a new server instance with no registered handlers.
Expand All @@ -58,6 +59,15 @@ func NewServer() *Server {
return server
}

// SetBatchLimits sets limits applied to batch requests. There are two limits: 'itemLimit'
// is the maximum number of items in a batch.
//
// This method should be called before processing any requests via ServeCodec, ServeHTTP,
// ServeListener etc.
func (s *Server) SetBatchLimits(itemLimit int) {
s.batchItemLimit = itemLimit
}

// RegisterName creates a service for the given receiver type under the given name. When no
// methods on the given receiver match the criteria to be either a RPC method or a
// subscription an error is returned. Otherwise a new service is created and added to the
Expand Down Expand Up @@ -97,7 +107,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
return
}

h := newHandler(ctx, codec, s.idgen, &s.services)
h := newHandler(ctx, codec, s.idgen, &s.services, s.batchItemLimit)
h.allowSubscribe = false
defer h.close(io.EOF, nil)

Expand Down
Loading