Skip to content

Commit

Permalink
Implement iops write bench for controller and replica
Browse files Browse the repository at this point in the history
Signed-off-by: Shuo Wu <shuo.wu@suse.com>
  • Loading branch information
shuo-wu committed Dec 4, 2023
1 parent 10934d6 commit 95e4930
Show file tree
Hide file tree
Showing 6 changed files with 115 additions and 3 deletions.
11 changes: 11 additions & 0 deletions pkg/controller/control.go
Original file line number Diff line number Diff line change
Expand Up @@ -1173,3 +1173,14 @@ func (c *Controller) GetLatestMetics() *ptypes.Metrics {
func getAverageLatency(totalLatency, iops uint64) uint64 {
return totalLatency / iops
}

func (c *Controller) Bench(benchType string, thread int, size int64) (output string, err error) {
if size%4096 != 0 {
return "", fmt.Errorf("failed to bench volume engine with size %v, because it is not multiple of volume block size 4096", size)
}
if size > c.size {
return "", fmt.Errorf("failed to bench volume engine with size %v, because it is greater than the engine size %v", size, c.size)
}

return util.Bench(benchType, thread, size, c.WriteAt, c.ReadAt)
}
11 changes: 10 additions & 1 deletion pkg/controller/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,6 +179,15 @@ func (cs *ControllerServer) VolumeUnmapMarkSnapChainRemovedSet(ctx context.Conte
return cs.getVolume(), nil
}

func (cs *ControllerServer) VolumeBench(ctx context.Context, req *ptypes.VolumeBenchRequest) (*ptypes.VolumeBenchResponse, error) {
output, err := cs.c.Bench(req.BenchType, int(req.Thread), req.Size)
if err != nil {
return nil, err
}

return &ptypes.VolumeBenchResponse{Output: output}, nil
}

func (cs *ControllerServer) ReplicaList(ctx context.Context, req *empty.Empty) (*ptypes.ReplicaListReply, error) {
return &ptypes.ReplicaListReply{
Replicas: cs.listControllerReplica(),
Expand Down Expand Up @@ -234,7 +243,7 @@ func (cs *ControllerServer) ReplicaVerifyRebuild(ctx context.Context, req *ptype
}

func (cs *ControllerServer) JournalList(ctx context.Context, req *ptypes.JournalListRequest) (*empty.Empty, error) {
//ListJournal flushes operation journal (replica read/write, ping, etc.) accumulated since previous flush
// ListJournal flushes operation journal (replica read/write, ping, etc.) accumulated since previous flush
journal.PrintLimited(int(req.Limit))
return &empty.Empty{}, nil
}
Expand Down
14 changes: 12 additions & 2 deletions pkg/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"encoding/json"
"fmt"
"github.com/rancher/go-fibmap"
"io"
"os"
"path"
Expand All @@ -14,8 +15,6 @@ import (
"sync"
"syscall"

"github.com/rancher/go-fibmap"

"github.com/pkg/errors"
"github.com/sirupsen/logrus"

Expand Down Expand Up @@ -1215,6 +1214,17 @@ func (r *Replica) Expand(size int64) (err error) {
return nil
}

func (r *Replica) Bench(benchType string, thread int, size int64) (output string, err error) {
if size%diskutil.VolumeSectorSize != 0 {
return "", fmt.Errorf("failed to bench volume replica with size %v, because it is not multiple of volume sector size %v", size, diskutil.VolumeSectorSize)
}
if size > r.info.Size {
return "", fmt.Errorf("failed to bench volume replica with size %v, because it is greater than the replica size %v", size, r.info.Size)
}

return util.Bench(benchType, thread, size, r.WriteAt, r.ReadAt)
}

func (r *Replica) WriteAt(buf []byte, offset int64) (int, error) {
if r.readOnly {
return 0, fmt.Errorf("cannot write on read-only replica")
Expand Down
9 changes: 9 additions & 0 deletions pkg/replica/rpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,15 @@ func (rs *ReplicaServer) ReplicaExpand(ctx context.Context, req *ptypes.ReplicaE
return &ptypes.ReplicaExpandResponse{Replica: rs.getReplica()}, nil
}

func (rs *ReplicaServer) ReplicaBench(ctx context.Context, req *ptypes.ReplicaBenchRequest) (*ptypes.ReplicaBenchResponse, error) {
output, err := rs.s.Bench(req.BenchType, int(req.Thread), req.Size)
if err != nil {
return nil, err
}

return &ptypes.ReplicaBenchResponse{Output: output}, nil
}

func (rs *ReplicaServer) DiskRemove(ctx context.Context, req *ptypes.DiskRemoveRequest) (*ptypes.DiskRemoveResponse, error) {
if err := rs.s.RemoveDiffDisk(req.Name, req.Force); err != nil {
return nil, err
Expand Down
13 changes: 13 additions & 0 deletions pkg/replica/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,19 @@ func (s *Server) Expand(size int64) error {
return s.r.Expand(size)
}

func (s *Server) Bench(benchType string, thread int, size int64) (string, error) {
s.Lock()
if s.r == nil {
s.Unlock()
return "", nil
}
s.Unlock()

logrus.Infof("Replica server starts to bench %s with %v thread, test size %v", benchType, thread, size)

return s.r.Bench(benchType, thread, size)
}

func (s *Server) RemoveDiffDisk(name string, force bool) error {
s.Lock()
defer s.Unlock()
Expand Down
60 changes: 60 additions & 0 deletions pkg/util/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package util
import (
"fmt"
"io"
"math"
"math/rand"
"net"
"net/http"
"net/url"
Expand All @@ -13,6 +15,7 @@ import (
"runtime"
"strconv"
"strings"
"sync"
"syscall"
"time"

Expand Down Expand Up @@ -316,3 +319,60 @@ func GetAddresses(volumeName, address string, dataServerProtocol types.DataServe
return "", "", "", -1, fmt.Errorf("unsupported protocol: %v", dataServerProtocol)
}
}

var letterRunes = []rune("abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ")

func RandStringRunes(n int) string {
b := make([]rune, n)
for i := range b {
b[i] = letterRunes[rand.Intn(len(letterRunes))]
}
return string(b)
}

func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte, int64) (int, error)) (output string, err error) {
lock := sync.Mutex{}

blockSize := 4096 // 4KB

blockBytes := []byte(RandStringRunes(blockSize))
ChunkSize := int(math.Ceil(float64(size) / float64(thread)))

wg := sync.WaitGroup{}
wg.Add(thread)
startTime := time.Now()
for i := 0; i < thread; i++ {
idx := i
go func() {
defer wg.Done()

start := int64(idx) * int64(ChunkSize)
end := int64(idx+1) * int64(ChunkSize)
for offset := start; offset < end; offset += int64(blockSize) {
if offset+int64(blockSize) > end {
blockBytes = blockBytes[:end-offset]
}

if _, writeErr := writeAt(blockBytes, offset); writeErr != nil {
lock.Lock()
err = writeErr
lock.Unlock()
return
}
}
}()
}
wg.Wait()

if err != nil {
return "", err
}

duration := time.Since(startTime)
switch benchType {
case "iops-write":
res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000)
output = fmt.Sprintf("instance iops write %v/s, size %v, duration %vs, thread count %v", res, size, duration.Seconds(), thread)
}
return output, nil
}

0 comments on commit 95e4930

Please sign in to comment.