Skip to content

Commit

Permalink
feat: optimize multiple upload
Browse files Browse the repository at this point in the history
  • Loading branch information
juchaosong committed Aug 20, 2024
1 parent 75ff34d commit e786109
Show file tree
Hide file tree
Showing 6 changed files with 152 additions and 32 deletions.
5 changes: 4 additions & 1 deletion pkg/cmd/record/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
projectSlug = ""
labelDisplayNames []string
thumbnail = ""
multiOpts = &upload_utils.MultipartOpts{}
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -96,7 +97,7 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
log.Fatalf("unable to create minio client: %v", err)
}

um, err := upload_utils.NewUploadManager(mc)
um, err := upload_utils.NewUploadManager(mc, multiOpts)
if err != nil {
log.Fatalf("Failed to create upload manager: %v", err)
}
Expand All @@ -114,6 +115,8 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
cmd.Flags().StringSliceVarP(&labelDisplayNames, "labels", "l", []string{}, "labels of the record.")
cmd.Flags().StringVarP(&projectSlug, "project", "p", "", "the slug of the working project")
cmd.Flags().StringVarP(&thumbnail, "thumbnail", "i", "", "thumbnail path of the record.")
cmd.Flags().UintVarP(&multiOpts.Threads, "parallel", "P", 4, "upload number of parts in parallel")
cmd.Flags().StringVarP(&multiOpts.Size, "part-size", "s", "128Mib", "each part size")

return cmd
}
5 changes: 4 additions & 1 deletion pkg/cmd/record/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
isRecursive = false
includeHidden = false
projectSlug = ""
multiOpts = &upload_utils.MultipartOpts{}
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -94,7 +95,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
if err != nil {
log.Fatalf("unable to create minio client: %v", err)
}
um, err := upload_utils.NewUploadManager(mc)
um, err := upload_utils.NewUploadManager(mc, multiOpts)
if err != nil {
log.Fatalf("unable to create upload manager: %v", err)
}
Expand Down Expand Up @@ -141,6 +142,8 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
cmd.Flags().BoolVarP(&isRecursive, "recursive", "R", false, "upload files in the current directory recursively")
cmd.Flags().BoolVarP(&includeHidden, "include-hidden", "H", false, "include hidden files (\"dot\" files) in the upload")
cmd.Flags().StringVarP(&projectSlug, "project", "p", "", "the slug of the working project")
cmd.Flags().UintVarP(&multiOpts.Threads, "parallel", "P", 4, "upload number of parts in parallel")
cmd.Flags().StringVarP(&multiOpts.Size, "part-size", "s", "128Mib", "each part size")

return cmd
}
Expand Down
13 changes: 12 additions & 1 deletion pkg/cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,13 +32,23 @@ import (
)

func NewCommand() *cobra.Command {
cfgPath := ""
var (
cfgPath string
logLevel string
)

cmd := &cobra.Command{
Use: constants.CLIName,
Short: "",
Version: cocli.GetVersion(),
PersistentPreRun: func(cmd *cobra.Command, args []string) {
log.SetLevel(log.DebugLevel)
level, err := log.ParseLevel(logLevel)
if err != nil {
log.Fatalf("Log level is invalid, should one of trace|debug|info|warn|error.")
}
log.SetLevel(level)

// check if cfgPath exists and isFile
cfgPathInfo, err := os.Stat(cfgPath)

Expand Down Expand Up @@ -90,6 +100,7 @@ func NewCommand() *cobra.Command {
}

cmd.PersistentFlags().StringVar(&cfgPath, "config", constants.DefaultConfigPath, "config file path")
cmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "log level, one of: trace|debug|info|warn|error")

cmd.AddCommand(NewCompletionCommand())
cmd.AddCommand(action.NewRootCommand(&cfgPath))
Expand Down
47 changes: 47 additions & 0 deletions pkg/cmd_utils/upload_utils/heap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
package upload_utils

import (
"container/heap"
)

// An IntHeap is a min-heap of ints. It implements heap.Interface;
// callers must go through heap.Push/heap.Pop (not the Push/Pop
// methods directly) to keep the heap invariant, or use the
// NewHeap/Peek/Remove helpers below.
type IntHeap []int

// NewHeap heapifies s in place and returns it as an *IntHeap.
// The caller must not reuse s afterwards.
func NewHeap(s []int) *IntHeap {
	h := IntHeap(s)
	heap.Init(&h)
	return &h
}

func (h IntHeap) Len() int           { return len(h) }
func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }
func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }

// Push appends x. Part of heap.Interface; called by heap.Push,
// which performs the sift-up. Panics if x is not an int.
func (h *IntHeap) Push(x any) {
	*h = append(*h, x.(int))
}

// Pop removes and returns the last element. Part of heap.Interface;
// called by heap.Pop, which returns the minimum.
func (h *IntHeap) Pop() any {
	old := *h
	n := len(old)
	x := old[n-1]
	*h = old[0 : n-1]
	return x
}

// Peek returns the minimum element without removing it.
// Precondition: the heap is non-empty (panics otherwise).
func (h *IntHeap) Peek() int {
	return (*h)[0]
}

// Remove deletes every occurrence of x from the heap and restores
// the heap invariant. It is a no-op — and skips the O(n) re-heapify —
// when x is not present. Panics if x is not an int.
func (h *IntHeap) Remove(x any) {
	target := x.(int)
	// Filter in place, reusing the backing array.
	kept := (*h)[:0]
	for _, v := range *h {
		if v != target {
			kept = append(kept, v)
		}
	}
	if len(kept) == len(*h) {
		return // nothing removed; invariant untouched
	}
	*h = kept
	heap.Init(h)
}
29 changes: 29 additions & 0 deletions pkg/cmd_utils/upload_utils/opt.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package upload_utils

import (
"github.com/dustin/go-humanize"
"github.com/pkg/errors"
)

var (
	// defaultPartSize is the per-part chunk size used when no
	// explicit size is configured: 128 MiB.
	defaultPartSize = uint64(1024 * 1024 * 128)
)

// MultipartOpts carries the user-tunable multipart-upload settings.
type MultipartOpts struct {
	// Threads is the number of parts uploaded in parallel.
	Threads uint
	// Size is the human-readable per-part size (e.g. "128MiB");
	// empty means defaultPartSize.
	Size string
}

// Valid reports whether the configured part size parses as a byte
// count; it returns nil when Size is empty or well-formed.
func (opt *MultipartOpts) Valid() error {
	_, err := opt.partSize()
	// errors.Wrap returns nil when err is nil, so the happy path
	// still yields a nil error.
	return errors.Wrap(err, "parse part size")
}

// partSize converts Size to a byte count, falling back to
// defaultPartSize when Size is empty.
func (opt *MultipartOpts) partSize() (uint64, error) {
	if opt.Size == "" {
		return defaultPartSize, nil
	}
	return humanize.ParseBytes(opt.Size)
}
85 changes: 56 additions & 29 deletions pkg/cmd_utils/upload_utils/upload_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ const (
uploadIdKeyTemplate = "STORE-KEY-UPLOAD-ID-%s"
uploadedSizeKeyTemplate = "STORE-KEY-UPLOADED-SIZE-%s"
partsKeyTemplate = "STORE-KEY-PARTS-%s"
minPartSize = 1024 * 1024 * 16 // 16MiB
maxSinglePutObjectSize = 1024 * 1024 * 1024 * 500 // 5GiB
maxSinglePutObjectSize = 1024 * 1024 * 1024 * 500 // 500GiB
defaultWindowSize = 1024 * 1024 * 1024 // 1GiB
uploadDBRelativePath = ".cocli.uploader.db"
)

Expand All @@ -55,6 +55,7 @@ type FileInfo struct {
// UploadManager is a manager for uploading files through minio client.
// Note that it's user's responsibility to check the Errs field after Wait() to see if there's any error.
type UploadManager struct {
opts *MultipartOpts
db *leveldb.DB
client *minio.Client
uploadProgressChan chan UpdateStatusMsg
Expand All @@ -65,7 +66,7 @@ type UploadManager struct {
sync.WaitGroup
}

func NewUploadManager(client *minio.Client) (*UploadManager, error) {
func NewUploadManager(client *minio.Client, opts *MultipartOpts) (*UploadManager, error) {
// init db
homeDir, err := os.UserHomeDir()
if err != nil {
Expand All @@ -78,6 +79,7 @@ func NewUploadManager(client *minio.Client) (*UploadManager, error) {
}

um := &UploadManager{
opts: opts,
uploadProgressChan: make(chan UpdateStatusMsg, 10),
db: uploadDB,
client: client,
Expand Down Expand Up @@ -143,12 +145,17 @@ func (um *UploadManager) FPutObject(absPath string, bucket string, key string, u
um.Add(1)
go func() {
defer um.Done()
um.client.TraceOn(log.StandardLogger().WriterLevel(log.DebugLevel))
um.client.TraceOn(log.StandardLogger().WriterLevel(log.TraceLevel))

var err error
if fileInfo.Size > int64(minPartSize) {
size, err := um.opts.partSize()
if err != nil {
um.AddErr(absPath, err)
return
}

if fileInfo.Size > int64(size) {
err = um.FMultipartPutObject(context.Background(), bucket, key,
absPath, fileInfo.Size, minio.PutObjectOptions{UserTags: userTags})
absPath, fileInfo.Size, minio.PutObjectOptions{UserTags: userTags, PartSize: size, NumThreads: um.opts.Threads})
} else {
progress := newUploadProgressReader(absPath, fileInfo.Size, um.uploadProgressChan)
um.StatusMonitor.Send(UpdateStatusMsg{Name: absPath, Status: UploadInProgress})
Expand Down Expand Up @@ -247,38 +254,33 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
}
}

if opts.PartSize == 0 {
opts.PartSize = minPartSize
}

// Calculate the optimal parts info for a given size.
totalPartsCount, partSize, lastPartSize, err := minio.OptimalPartInfo(fileSize, opts.PartSize)
if err != nil {
return errors.Wrap(err, "Optimal part info failed")
}
log.Debugf("Total part: %v, part size: %v, last part size: %v", totalPartsCount, partSize, lastPartSize)

// Declare a channel that sends the next part number to be uploaded.
uploadPartsCh := make(chan int)
uploadPartsCh := make(chan int, opts.NumThreads)
// Declare a channel that sends back the response of a part upload.
uploadedPartsCh := make(chan uploadedPartRes)
uploadedPartsCh := make(chan uploadedPartRes, opts.NumThreads)
// Used for readability, lastPartNumber is always totalPartsCount.
lastPartNumber := totalPartsCount

// Send each part number to the channel to be processed.
go func() {
defer close(uploadPartsCh)
for p := 1; p <= totalPartsCount; p++ {
if slices.Contains(partNumbers, p) {
log.Debugf("Part: %d already uploaded", p)
continue
}
log.Debugf("Part: %d need to upload", p)
uploadPartsCh <- p
curPart := 1
uploadingParts := NewHeap(make([]int, 0, opts.NumThreads))
for curPart <= totalPartsCount && uploadingParts.Len() < int(opts.NumThreads) {
if slices.Contains(partNumbers, curPart) {
log.Debugf("Part: %d already uploaded", curPart)
curPart++
continue
}
}()

if opts.NumThreads == 0 {
opts.NumThreads = 4
log.Debugf("Part: %d need to upload", curPart)
uploadingParts.Push(curPart)
uploadPartsCh <- curPart
curPart++
}

// Get reader of the file to be uploaded.
Expand Down Expand Up @@ -329,16 +331,16 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
}()
}

// Gather the responses as they occur and update any progress bar
numToUpload := totalPartsCount - len(partNumbers)
for m := 1; m <= numToUpload; m++ {
upload:
for {
select {
case <-ctx.Done():
return ctx.Err()
case uploadRes := <-uploadedPartsCh:
if uploadRes.Error != nil {
return uploadRes.Error
}

// Update the uploadedSize.
uploadedSize += uploadRes.Part.Size
parts = append(parts, minio.CompletePart{
Expand All @@ -363,6 +365,31 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
log.Errorf("Store uploaded parts err: %v", err)
}
um.uploadProgressChan <- UpdateStatusMsg{Name: filePath, Uploaded: uploadedSize}

uploadingParts.Remove(uploadRes.Part.PartNumber)
if curPart > totalPartsCount {
if uploadingParts.Len() > 0 {
continue
} else {
close(uploadPartsCh)
break upload
}
} else {
windowSize := defaultWindowSize
// Make sure at least one part is uploading.
if windowSize < int(opts.PartSize) {
windowSize = int(opts.PartSize)
}
for ; curPart <= totalPartsCount && uploadingParts.Len() < int(opts.NumThreads) && (curPart-uploadingParts.Peek())*int(opts.PartSize) <= windowSize; curPart++ {
if slices.Contains(partNumbers, curPart) {
log.Debugf("Part: %d already uploaded", curPart)
continue
}
log.Debugf("Part: %d need to upload", curPart)
uploadingParts.Push(curPart)
uploadPartsCh <- curPart
}
}
}
}

Expand Down

0 comments on commit e786109

Please sign in to comment.