Skip to content

Commit

Permalink
Merge pull request #76 from bladehan1/batchLimitInhttp
Browse files Browse the repository at this point in the history
Add batch limit in http
  • Loading branch information
bladehan1 authored Mar 7, 2024
2 parents b48bfb3 + e283013 commit c9984a2
Show file tree
Hide file tree
Showing 12 changed files with 99 additions and 30 deletions.
1 change: 1 addition & 0 deletions cmd/geth/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,7 @@ var (
utils.RPCGlobalGasCapFlag,
utils.RPCGlobalTxFeeCapFlag,
utils.AllowUnprotectedTxs,
utils.BatchRequestLimit,
}

metricsFlags = []cli.Flag{
Expand Down
1 change: 1 addition & 0 deletions cmd/geth/usage.go
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ var AppHelpFlagGroups = []flags.FlagGroup{
utils.WSApiFlag,
utils.WSPathPrefixFlag,
utils.WSAllowedOriginsFlag,
utils.BatchRequestLimit,
utils.GraphQLEnabledFlag,
utils.GraphQLCORSDomainFlag,
utils.GraphQLVirtualHostsFlag,
Expand Down
10 changes: 9 additions & 1 deletion cmd/utils/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -621,7 +621,11 @@ var (
Name: "rpc.allow-unprotected-txs",
Usage: "Allow for unprotected (non EIP155 signed) transactions to be submitted via RPC",
}

BatchRequestLimit = cli.IntFlag{ //nolint:typecheck
Name: "rpc.batch-request-limit",
Usage: "Maximum number of requests in a batch(only supported in http)",
Value: node.DefaultConfig.BatchRequestLimit,
}
// Network Settings
MaxPeersFlag = cli.IntFlag{
Name: "maxpeers",
Expand Down Expand Up @@ -995,6 +999,10 @@ func setHTTP(ctx *cli.Context, cfg *node.Config) {
if ctx.GlobalIsSet(AllowUnprotectedTxs.Name) {
cfg.AllowUnprotectedTxs = ctx.GlobalBool(AllowUnprotectedTxs.Name)
}
if ctx.IsSet(BatchRequestLimit.Name) {
cfg.BatchRequestLimit = ctx.Int(BatchRequestLimit.Name)
}

}

// setGraphQL creates the GraphQL listener interface string from the set
Expand Down
3 changes: 3 additions & 0 deletions node/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,9 @@ func (api *privateAdminAPI) StartHTTP(host *string, port *int, cors *string, api
CorsAllowedOrigins: api.node.config.HTTPCors,
Vhosts: api.node.config.HTTPVirtualHosts,
Modules: api.node.config.HTTPModules,
rpcEndpointConfig: rpcEndpointConfig{
batchItemLimit: api.node.config.BatchRequestLimit,
},
}
if cors != nil {
config.CorsAllowedOrigins = nil
Expand Down
4 changes: 4 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,10 @@ type Config struct {

// AllowUnprotectedTxs allows non EIP-155 protected transactions to be send over RPC.
AllowUnprotectedTxs bool `toml:",omitempty"`

// BatchRequestLimit is the maximum number of requests in a batch.
BatchRequestLimit int `toml:",omitempty"`

}

// IPCEndpoint resolves an IPC endpoint based on a configured value, taking into
Expand Down
1 change: 1 addition & 0 deletions node/defaults.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ var DefaultConfig = Config{
HTTPTimeouts: rpc.DefaultHTTPTimeouts,
WSPort: DefaultWSPort,
WSModules: []string{"net", "web3"},
BatchRequestLimit: 1000,
GraphQLVirtualHosts: []string{"localhost"},
P2P: p2p.Config{
ListenAddr: ":30303",
Expand Down
9 changes: 8 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,11 @@ func New(conf *Config) (*Node, error) {
return nil, errors.New(`Config.Name cannot end in ".ipc"`)
}

server := rpc.NewServer()
server.SetBatchLimits(conf.BatchRequestLimit)
node := &Node{
config: conf,
inprocHandler: rpc.NewServer(),
inprocHandler: server,
eventmux: new(event.TypeMux),
log: conf.Logger,
stop: make(chan struct{}),
Expand Down Expand Up @@ -350,13 +352,18 @@ func (n *Node) startRPC() error {
}
}

rpcConfig := rpcEndpointConfig{
batchItemLimit: n.config.BatchRequestLimit,
}

// Configure HTTP.
if n.config.HTTPHost != "" {
config := httpConfig{
CorsAllowedOrigins: n.config.HTTPCors,
Vhosts: n.config.HTTPVirtualHosts,
Modules: n.config.HTTPModules,
prefix: n.config.HTTPPathPrefix,
rpcEndpointConfig: rpcConfig,
}
if err := n.http.setListenAddr(n.config.HTTPHost, n.config.HTTPPort); err != nil {
return err
Expand Down
6 changes: 6 additions & 0 deletions node/rpcstack.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ type httpConfig struct {
CorsAllowedOrigins []string
Vhosts []string
prefix string // path prefix on which to mount http handler
rpcEndpointConfig
}

// wsConfig is the JSON-RPC/Websocket configuration
Expand All @@ -49,6 +50,10 @@ type wsConfig struct {
prefix string // path prefix on which to mount ws handler
}

type rpcEndpointConfig struct {
batchItemLimit int
}

type rpcHandler struct {
http.Handler
server *rpc.Server
Expand Down Expand Up @@ -280,6 +285,7 @@ func (h *httpServer) enableRPC(apis []rpc.API, config httpConfig) error {

// Create RPC server and handler.
srv := rpc.NewServer()
srv.SetBatchLimits(config.batchItemLimit)
if err := RegisterApis(apis, config.Modules, srv, false); err != nil {
return err
}
Expand Down
5 changes: 4 additions & 1 deletion rpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,9 @@ type Client struct {
// This function, if non-nil, is called when the connection is lost.
reconnectFunc reconnectFunc

// config fields
batchItemLimit int

// writeConn is used for writing to the connection on the caller's goroutine. It should
// only be accessed outside of dispatch, with the write lock held. The write lock is
// taken by sending on reqInit and released by sending on reqSent.
Expand Down Expand Up @@ -111,7 +114,7 @@ type clientConn struct {

func (c *Client) newClientConn(conn ServerCodec) *clientConn {
ctx := context.WithValue(context.Background(), clientContextKey{}, c)
handler := newHandler(ctx, conn, c.idgen, c.services)
handler := newHandler(ctx, conn, c.idgen, c.services, c.batchItemLimit)
return &clientConn{conn, handler}
}

Expand Down
2 changes: 2 additions & 0 deletions rpc/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ var (

const defaultErrorCode = -32000

const errMsgBatchTooLarge = "batch too large"

type methodNotFoundError struct{ method string }

func (e *methodNotFoundError) ErrorCode() int { return -32601 }
Expand Down
67 changes: 45 additions & 22 deletions rpc/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,17 +50,18 @@ import (
// }
//
type handler struct {
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
reg *serviceRegistry
unsubscribeCb *callback
idgen func() ID // subscription ID generator
respWait map[string]*requestOp // active client requests
clientSubs map[string]*ClientSubscription // active client subscriptions
callWG sync.WaitGroup // pending call goroutines
rootCtx context.Context // canceled by close()
cancelRoot func() // cancel function for rootCtx
conn jsonWriter // where responses will be sent
log log.Logger
allowSubscribe bool
batchRequestLimit int

subLock sync.Mutex
serverSubs map[ID]*Subscription
Expand All @@ -71,19 +72,20 @@ type callProc struct {
notifiers []*Notifier
}

func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry) *handler {
func newHandler(connCtx context.Context, conn jsonWriter, idgen func() ID, reg *serviceRegistry, batchRequestLimit int) *handler {
rootCtx, cancelRoot := context.WithCancel(connCtx)
h := &handler{
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
reg: reg,
idgen: idgen,
conn: conn,
respWait: make(map[string]*requestOp),
clientSubs: make(map[string]*ClientSubscription),
rootCtx: rootCtx,
cancelRoot: cancelRoot,
allowSubscribe: true,
serverSubs: make(map[ID]*Subscription),
log: log.Root(),
batchRequestLimit: batchRequestLimit,
}
if conn.remoteAddr() != "" {
h.log = h.log.New("conn", conn.remoteAddr())
Expand All @@ -102,6 +104,14 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
return
}

// Apply limit on total number of requests.
if h.batchRequestLimit != 0 && len(msgs) > h.batchRequestLimit {
h.startCallProc(func(cp *callProc) {
h.respondWithBatchTooLarge(cp, msgs)
})
return
}

// Handle non-call messages first:
calls := make([]*jsonrpcMessage, 0, len(msgs))
for _, msg := range msgs {
Expand Down Expand Up @@ -129,6 +139,19 @@ func (h *handler) handleBatch(msgs []*jsonrpcMessage) {
}
})
}
func (h *handler) respondWithBatchTooLarge(cp *callProc, batch []*jsonrpcMessage) {
resp := errorMessage(&invalidRequestError{errMsgBatchTooLarge})
// Find the first call and add its "id" field to the error.
// This is the best we can do, given that the protocol doesn't have a way
// of reporting an error for the entire batch.
for _, msg := range batch {
if msg.isCall() {
resp.ID = msg.ID
break
}
}
h.conn.writeJSON(cp.ctx, []*jsonrpcMessage{resp})
}

// handleMsg handles a single message.
func (h *handler) handleMsg(msg *jsonrpcMessage) {
Expand Down
20 changes: 15 additions & 5 deletions rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,11 @@ const (

// Server is an RPC server.
type Server struct {
services serviceRegistry
idgen func() ID
run int32
codecs mapset.Set
services serviceRegistry
idgen func() ID
run int32
codecs mapset.Set
batchItemLimit int
}

// NewServer creates a new server instance with no registered handlers.
Expand All @@ -58,6 +59,15 @@ func NewServer() *Server {
return server
}

// SetBatchLimits sets limits applied to batch requests. There are two limits: 'itemLimit'
// is the maximum number of items in a batch.
//
// This method should be called before processing any requests via ServeCodec, ServeHTTP,
// ServeListener etc.
func (s *Server) SetBatchLimits(itemLimit int) {
s.batchItemLimit = itemLimit
}

// RegisterName creates a service for the given receiver type under the given name. When no
// methods on the given receiver match the criteria to be either a RPC method or a
// subscription an error is returned. Otherwise a new service is created and added to the
Expand Down Expand Up @@ -97,7 +107,7 @@ func (s *Server) serveSingleRequest(ctx context.Context, codec ServerCodec) {
return
}

h := newHandler(ctx, codec, s.idgen, &s.services)
h := newHandler(ctx, codec, s.idgen, &s.services, s.batchItemLimit)
h.allowSubscribe = false
defer h.close(io.EOF, nil)

Expand Down

0 comments on commit c9984a2

Please sign in to comment.