From e75ee0e9d70dc41213e719fefbcec321dc8ffc27 Mon Sep 17 00:00:00 2001 From: rkonfj Date: Wed, 17 May 2023 16:34:13 +0800 Subject: [PATCH] cmd `serve`: pooling copy buffer --- cmd/serve/cmd.go | 10 ++++++++++ server/server.go | 17 ++++++++++++++--- 2 files changed, 24 insertions(+), 3 deletions(-) diff --git a/cmd/serve/cmd.go b/cmd/serve/cmd.go index 21e5845..cac9ff2 100644 --- a/cmd/serve/cmd.go +++ b/cmd/serve/cmd.go @@ -1,6 +1,7 @@ package serve import ( + "github.com/dustin/go-humanize" "github.com/rkonfj/toh/server" "github.com/spf13/cobra" ) @@ -15,6 +16,7 @@ func init() { RunE: startAction, } Cmd.Flags().String("acl", "acl.json", "file containing access control rules") + Cmd.Flags().String("copy-buf", "16Ki", "buffer size for copying network data") Cmd.Flags().StringP("listen", "l", "localhost:9986", "http server listen address") } @@ -37,5 +39,13 @@ func processServerOptions(cmd *cobra.Command) (options server.Options, err error return } options.ACL, err = cmd.Flags().GetString("acl") + if err != nil { + return + } + copyBuf, err := cmd.Flags().GetString("copy-buf") + if err != nil { + return + } + options.Buf, err = humanize.ParseBytes(copyBuf) return } diff --git a/server/server.go b/server/server.go index 6c46f6a..6cf9ad4 100644 --- a/server/server.go +++ b/server/server.go @@ -3,6 +3,7 @@ package server import ( "context" "io" + "math" "net" "net/http" "sync" @@ -16,11 +17,13 @@ type TohServer struct { options Options acl *ACL trafficEventChan chan *TrafficEvent + bufPool *sync.Pool } type Options struct { Listen string ACL string + Buf uint64 } func NewTohServer(options Options) (*TohServer, error) { @@ -31,7 +34,11 @@ func NewTohServer(options Options) (*TohServer, error) { return &TohServer{ options: options, acl: acl, - trafficEventChan: make(chan *TrafficEvent, 4096), + trafficEventChan: make(chan *TrafficEvent, 2048), + bufPool: &sync.Pool{New: func() any { + buf := make([]byte, int(math.Max(float64(options.Buf), 512))) + return &buf + }}, }, nil } @@ -91,11 +98,15 @@ func (s *TohServer) pipe(wsConn *websocket.Conn, netConn net.Conn) (lbc, rbc int wg.Add(1) go func() { defer wg.Done() - lbc, _ = io.Copy(netConn, RWWS(wsConn)) + buf := s.bufPool.Get().(*[]byte) + defer s.bufPool.Put(buf) + lbc, _ = io.CopyBuffer(netConn, RWWS(wsConn), *buf) logrus.Debugf("ws conn closed, close remote conn(%s) now", netConn.RemoteAddr().String()) netConn.Close() }() - rbc, _ = io.Copy(RWWS(wsConn), netConn) + buf := s.bufPool.Get().(*[]byte) + defer s.bufPool.Put(buf) + rbc, _ = io.CopyBuffer(RWWS(wsConn), netConn, *buf) logrus.Debugf("remote conn(%s) closed, close ws conn now", netConn.RemoteAddr().String()) wsConn.Close(websocket.StatusBadGateway, "remote close") wg.Wait()