From 3d0704156da33399348570e7e76c1b9c02be99c1 Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Wed, 2 Mar 2022 09:23:42 -0800 Subject: [PATCH 1/2] support pipelining, key prefix strip and domain socket --- cmd/basic/main.go | 19 ++++++-- protocol.go | 117 ++++++++++++++++++++++++++++++---------------- 2 files changed, 91 insertions(+), 45 deletions(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index ad36e90..f4d0954 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -3,12 +3,13 @@ package main import ( "flag" "fmt" - pcgr "github.com/dgryski/go-pcgr" - mct "github.com/dormando/mctester" "math/rand" "os" "runtime/pprof" "time" + + pcgr "github.com/dgryski/go-pcgr" + mct "github.com/feihu-stripe/mctester" ) var cpuprofile = flag.String("cpuprofile", "", "dump cpu profile to file") @@ -31,6 +32,10 @@ func main() { zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number") valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss") + pipelines := flag.Uint("pipeliness", 1, "(32bit unsigned) stack this many GET requests into the same syscall.") + server := flag.String("server", "127.0.0.1:11211", "ip and port to connect to") + socket := flag.String("socket", "", "domain socket to connect to") + stripKeyPrefix := flag.Bool("stripkeyprefix", false, "strip key prefix before comparing with response.") flag.Parse() @@ -53,7 +58,10 @@ func main() { */ bl := &BasicLoader{ - servers: []string{"127.0.0.1:11211"}, + servers: []string{*server}, + socket: *socket, + pipelines: *pipelines, + stripKeyPrefix: *stripKeyPrefix, desiredConnCount: *connCount, requestsPerSleep: *reqPerSleep, requestBundlesPerConn: *reqBundlePerConn, @@ -100,6 +108,9 @@ func main() { // - variances: how often to change item sizes type BasicLoader struct { servers []string + socket string + pipelines uint + stripKeyPrefix bool stopAfter time.Time desiredConnCount int requestsPerSleep int @@ -145,7 +156,7 @@ func (l *BasicLoader) Run() { func (l *BasicLoader) Worker(doneChan chan<- int) { // FIXME: selector. host := l.servers[0] - mc := mct.NewClient(host) + mc := mct.NewClient(host, l.socket, l.pipelines, l.keyPrefix, l.stripKeyPrefix) bundles := l.requestBundlesPerConn rs := pcgr.New(time.Now().UnixNano(), 0) diff --git a/protocol.go b/protocol.go index 4a48820..16bc87c 100644 --- a/protocol.go +++ b/protocol.go @@ -17,6 +17,7 @@ import ( "io" "net" "strconv" + "strings" "time" ) @@ -36,7 +37,14 @@ type mcConn struct { } func (c *Client) connectToMc() (*mcConn, error) { - conn, err := net.DialTimeout("tcp", c.Host, c.ConnectTimeout) + var conn net.Conn + var err error + if c.socket != "" { + conn, err = net.DialTimeout("unix", c.socket, c.ConnectTimeout) + } else { + conn, err = net.DialTimeout("tcp", c.Host, c.ConnectTimeout) + } + if err != nil { return nil, err } @@ -50,6 +58,7 @@ type Client struct { // read or write timeout NetTimeout time.Duration Host string + socket string cn *mcConn WBufSize int RBufSize int @@ -57,12 +66,20 @@ type Client struct { // binprot structure cache. binpkt *packet opaque uint32 // just for binprot? + + pipelines int + keyPrefix string + stripKeyPrefix bool } -func NewClient(host string) (client *Client) { +func NewClient(host string, socket string, pipelines uint, keyPrefix string, stripKeyPrefix bool) (client *Client) { client = &Client{ - Host: host, - binpkt: &packet{}, + Host: host, + socket: socket, + pipelines: int(pipelines), + keyPrefix: keyPrefix, + stripKeyPrefix: stripKeyPrefix, + binpkt: &packet{}, } //client.rs = rand.NewSource(time.Now().UnixNano()) return client @@ -319,57 +336,75 @@ func (c *Client) MetaDebug(key string) (err error) { ////////////////////////////////////////////// func (c *Client) Get(key string) (flags uint64, value []byte, code McCode, err error) { + pipelines := c.pipelines + // Expected key from response + respKey := key + if c.stripKeyPrefix { + respKey = strings.TrimPrefix(key, c.keyPrefix) + } + err = c.runNow(key, len(key)+6, func() error { b := c.cn.b - b.WriteString("get ") - b.WriteString(key) - b.WriteString("\r\n") - err = b.Flush() - - if err != nil { - return err + for i := 0; i < pipelines; i++ { + b.WriteString("get ") + b.WriteString(key) + b.WriteString("\r\n") } + err = b.Flush() - line, err := b.ReadBytes('\n') if err != nil { return err } - if bytes.Equal(line, []byte("END\r\n")) { - code = McMISS - } else { - parts := bytes.Split(line[:len(line)-2], []byte(" ")) - if !bytes.Equal(parts[0], []byte("VALUE")) { - // TODO: This should look for ERROR/SERVER_ERROR/etc - return ErrUnexpectedResponse - } - if len(parts) != 4 { - return ErrUnexpectedResponse - } - if !bytes.Equal(parts[1], []byte(key)) { - // FIXME: how do we embed the received vs expected in here? - // use the brand-new golang error wrapping thing? - return ErrKeyDoesNotMatch - } - flags, _ = ParseUint(parts[2]) - size, _ := ParseUint(parts[3]) - - value = make([]byte, size+2) - _, err := io.ReadFull(b, value) + for i := 0; i < pipelines; i++ { + line, err := b.ReadBytes('\n') if err != nil { return err } - if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) { - return ErrCorruptValue + if bytes.Equal(line, []byte("END\r\n")) { + code = McMISS + } else { + parts := bytes.Split(line[:len(line)-2], []byte(" ")) + if !bytes.Equal(parts[0], []byte("VALUE")) { + // TODO: This should look for ERROR/SERVER_ERROR/etc + fmt.Print("Unexpected Reponse: ", string(line), "\n") + continue + } + if len(parts) != 4 { + fmt.Print("Unexpected Reponse: ", "parts not 4", "\n") + continue + } + if !bytes.Equal(parts[1], []byte(respKey)) { + fmt.Print("Unmatched Key: ", string(parts[1]), " and ", respKey, "\n") + // FIXME: how do we embed the received vs expected in here? + // use the brand-new golang error wrapping thing? + continue + } + flags, _ = ParseUint(parts[2]) + size, _ := ParseUint(parts[3]) + + value = make([]byte, size+2) + _, err := io.ReadFull(b, value) + if err != nil { + fmt.Print("io ReadFull error, return", "\n") + return err + } + + if !bytes.Equal(value[len(value)-2:], []byte("\r\n")) { + fmt.Print("Unmatched Value", "\n") + continue + } + code = McHIT + value = value[:size] + + line, err = b.ReadBytes('\n') + if !bytes.Equal(line, []byte("END\r\n")) { + fmt.Print("Unmatched Reponse: ", string(line), " is not END\r\n") + continue + } } - code = McHIT - value = value[:size] - line, err = b.ReadBytes('\n') - if !bytes.Equal(line, []byte("END\r\n")) { - return ErrUnexpectedResponse - } } return nil From 450655f53c1f39971b08335531ffcbba22d6212a Mon Sep 17 00:00:00 2001 From: Fei Hu Date: Wed, 2 Mar 2022 09:31:15 -0800 Subject: [PATCH 2/2] fix typo --- cmd/basic/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/basic/main.go b/cmd/basic/main.go index f4d0954..c183171 100644 --- a/cmd/basic/main.go +++ b/cmd/basic/main.go @@ -32,7 +32,7 @@ func main() { zipfV := flag.Float64("zipfV", float64(*keySpace/2), "zipf V value (pull below this number") valueSize := flag.Uint("valuesize", 1000, "size of value (in bytes) to store on miss") clientFlags := flag.Uint("clientflags", 0, "(32bit unsigned) client flag bits to set on miss") - pipelines := flag.Uint("pipeliness", 1, "(32bit unsigned) stack this many GET requests into the same syscall.") + pipelines := flag.Uint("pipelines", 1, "(32bit unsigned) stack this many GET requests into the same syscall.") server := flag.String("server", "127.0.0.1:11211", "ip and port to connect to") socket := flag.String("socket", "", "domain socket to connect to") stripKeyPrefix := flag.Bool("stripkeyprefix", false, "strip key prefix before comparing with response.")