diff --git a/pkg/util/util.go b/pkg/util/util.go index 87b2f1ca4..9035cc5e9 100644 --- a/pkg/util/util.go +++ b/pkg/util/util.go @@ -310,7 +310,13 @@ func RandStringRunes(n int) string { } func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte, int64) (int, error)) (output string, err error) { - lock := sync.Mutex{} + benchTypeInList := strings.Split(benchType, "-") + if len(benchTypeInList) != 3 || + (benchTypeInList[0] != "seq" && benchTypeInList[0] != "rand") || + (benchTypeInList[1] != "iops" && benchTypeInList[1] != "bandwidth" && benchTypeInList[1] != "latency") || + (benchTypeInList[2] != "read" && benchTypeInList[2] != "write") { + return "", fmt.Errorf("invalid bench type %s", benchType) + } if thread != 1 && strings.Contains(benchType, "-latency-") { logrus.Warnf("Using single thread for latency related benchmark") @@ -322,41 +328,92 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte blockSize = 1 << 20 // 1MB } - blockBytes := []byte(RandStringRunes(blockSize)) + var duration time.Duration + + // Prepare data before read + if benchTypeInList[2] == "read" { + // Typically 4-thread write is enough + if _, err := dataIOWithMultipleThread(false, 4, 1<<20, size, writeAt); err != nil { + return "", err + } + + if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, readAt); err != nil { + return "", err + } + } + + if benchTypeInList[2] == "write" { + if duration, err = dataIOWithMultipleThread(benchTypeInList[0] == "rand", thread, blockSize, size, writeAt); err != nil { + return "", err + } + } + + switch benchTypeInList[1] { + case "iops": + res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000) + output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread) + case "bandwidth": + res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10)) + output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread) + case "latency": + res := float64(duration) / 1000 / (float64(size) / float64(blockSize)) + output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread) + } + return output, nil +} + +func dataIOWithMultipleThread(isRandomIO bool, thread, blockSize int, size int64, ioAt func([]byte, int64) (int, error)) (duration time.Duration, err error) { + lock := sync.Mutex{} + chunkSize := int(math.Ceil(float64(size) / float64(thread))) chunkBlocks := int(math.Ceil(float64(chunkSize) / float64(blockSize))) + var sequenceList []int + if isRandomIO { + sequenceList = make([]int, chunkBlocks) + for i := 0; i < chunkBlocks; i++ { + sequenceList[i] = i + } + rand.Shuffle(chunkBlocks, func(i, j int) { sequenceList[i], sequenceList[j] = sequenceList[j], sequenceList[i] }) + } + + if chunkSize < blockSize { + return 0, fmt.Errorf("the io thread count is too much so that each thread cannot operate a single block") + } wg := sync.WaitGroup{} wg.Add(thread) + startTime := time.Now() + defer func() { + duration = time.Since(startTime) + }() + for i := 0; i < thread; i++ { idx := i go func() { defer wg.Done() + // Ignore this randomly generate data if the ioAt is readAt + blockBytes := []byte(RandStringRunes(blockSize)) + start := int64(idx) * int64(chunkSize) end := int64(idx+1) * int64(chunkSize) offset := start for cnt := 0; cnt < chunkBlocks; cnt++ { - if strings.HasPrefix(benchType, "seq-") { - offset = start + int64(cnt*blockSize) - if offset+int64(blockSize) > end { - blockBytes = blockBytes[:end-offset] - } - } else if strings.HasPrefix(benchType, "rand-") { - offset = start + int64(rand.Intn(cnt)*blockSize) + if isRandomIO { + offset = start + int64(sequenceList[cnt]*blockSize) if offset+int64(blockSize) > end { offset -= int64(blockSize) } } else { - lock.Lock() - err = fmt.Errorf("invalid bench type %s", benchType) - lock.Unlock() - return + offset = start + int64(cnt*blockSize) + if offset+int64(blockSize) > end { + blockBytes = blockBytes[:end-offset] + } } - if _, writeErr := writeAt(blockBytes, offset); writeErr != nil { + if _, ioErr := ioAt(blockBytes, offset); ioErr != nil { lock.Lock() - err = writeErr + err = ioErr lock.Unlock() return } @@ -365,27 +422,5 @@ func Bench(benchType string, thread int, size int64, writeAt, readAt func([]byte } wg.Wait() - if err != nil { - return "", err - } - - duration := time.Since(startTime) - switch benchType { - case "seq-iops-write": - fallthrough - case "rand-iops-write": - res := int(float64(size) / float64(blockSize) / float64(duration) * 1000000000) - output = fmt.Sprintf("instance %s %v/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread) - case "seq-bandwidth-write": - fallthrough - case "rand-bandwidth-write": - res := int(float64(size) / float64(duration) * 1000000000 / float64(1<<10)) - output = fmt.Sprintf("instance %s %vKB/s, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread) - case "seq-latency-write": - fallthrough - case "rand-latency-write": - res := float64(duration) / 1000 / (float64(size) / float64(blockSize)) - output = fmt.Sprintf("instance %s %.2fus, size %v, duration %vs, thread count %v", benchType, res, size, duration.Seconds(), thread) - } - return output, nil + return }