From e786109307a95ca53a2f2a5e27cb6f416dd4aee7 Mon Sep 17 00:00:00 2001
From: Song Juchao
Date: Tue, 20 Aug 2024 11:21:32 +0800
Subject: [PATCH] feat: optimize multipart upload

---
 pkg/cmd/record/create.go                     |  5 +-
 pkg/cmd/record/upload.go                     |  5 +-
 pkg/cmd/root.go                              | 13 ++-
 pkg/cmd_utils/upload_utils/heap.go           | 47 +++++++++++
 pkg/cmd_utils/upload_utils/opt.go            | 29 +++++++
 pkg/cmd_utils/upload_utils/upload_manager.go | 85 +++++++++++++-------
 6 files changed, 152 insertions(+), 32 deletions(-)
 create mode 100644 pkg/cmd_utils/upload_utils/heap.go
 create mode 100644 pkg/cmd_utils/upload_utils/opt.go

diff --git a/pkg/cmd/record/create.go b/pkg/cmd/record/create.go
index c9e6801..d043459 100644
--- a/pkg/cmd/record/create.go
+++ b/pkg/cmd/record/create.go
@@ -36,6 +36,7 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
 		projectSlug       = ""
 		labelDisplayNames []string
 		thumbnail         = ""
+		multiOpts         = &upload_utils.MultipartOpts{}
 	)
 
 	cmd := &cobra.Command{
@@ -96,7 +97,7 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
 				log.Fatalf("unable to create minio client: %v", err)
 			}
 
-			um, err := upload_utils.NewUploadManager(mc)
+			um, err := upload_utils.NewUploadManager(mc, multiOpts)
 			if err != nil {
 				log.Fatalf("Failed to create upload manager: %v", err)
 			}
@@ -114,6 +115,8 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
 	cmd.Flags().StringSliceVarP(&labelDisplayNames, "labels", "l", []string{}, "labels of the record.")
 	cmd.Flags().StringVarP(&projectSlug, "project", "p", "", "the slug of the working project")
 	cmd.Flags().StringVarP(&thumbnail, "thumbnail", "i", "", "thumbnail path of the record.")
+	cmd.Flags().UintVarP(&multiOpts.Threads, "parallel", "P", 4, "number of parts to upload in parallel")
+	cmd.Flags().StringVarP(&multiOpts.Size, "part-size", "s", "128MiB", "size of each upload part (e.g. 128MiB)")
 
 	return cmd
 }
diff --git a/pkg/cmd/record/upload.go b/pkg/cmd/record/upload.go
index ebfbec6..e005d73 100644
--- a/pkg/cmd/record/upload.go
+++ b/pkg/cmd/record/upload.go
@@ -44,6 +44,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
 		isRecursive   = false
 		includeHidden = false
 		projectSlug   = ""
+		multiOpts     = &upload_utils.MultipartOpts{}
 	)
 
 	cmd := &cobra.Command{
@@ -94,7 +95,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
 			if err != nil {
 				log.Fatalf("unable to create minio client: %v", err)
 			}
-			um, err := upload_utils.NewUploadManager(mc)
+			um, err := upload_utils.NewUploadManager(mc, multiOpts)
 			if err != nil {
 				log.Fatalf("unable to create upload manager: %v", err)
 			}
@@ -141,6 +142,8 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
 	cmd.Flags().BoolVarP(&isRecursive, "recursive", "R", false, "upload files in the current directory recursively")
 	cmd.Flags().BoolVarP(&includeHidden, "include-hidden", "H", false, "include hidden files (\"dot\" files) in the upload")
 	cmd.Flags().StringVarP(&projectSlug, "project", "p", "", "the slug of the working project")
+	cmd.Flags().UintVarP(&multiOpts.Threads, "parallel", "P", 4, "number of parts to upload in parallel")
+	cmd.Flags().StringVarP(&multiOpts.Size, "part-size", "s", "128MiB", "size of each upload part (e.g. 128MiB)")
 
 	return cmd
 }
diff --git a/pkg/cmd/root.go b/pkg/cmd/root.go
index 9aecae5..5b7f9fd 100644
--- a/pkg/cmd/root.go
+++ b/pkg/cmd/root.go
@@ -32,13 +32,23 @@ import (
 )
 
 func NewCommand() *cobra.Command {
-	cfgPath := ""
+	var (
+		cfgPath  string
+		logLevel string
+	)
 	cmd := &cobra.Command{
 		Use:              constants.CLIName,
 		Short:            "",
 		Version:          cocli.GetVersion(),
 		PersistentPreRun: func(cmd *cobra.Command, args []string) {
+			log.SetLevel(log.DebugLevel)
+			level, err := log.ParseLevel(logLevel)
+			if err != nil {
+				log.Fatalf("Log level is invalid, should be one of trace|debug|info|warn|error.")
+			}
+			log.SetLevel(level)
+
 			// check if cfgPath exists and isFile
 			cfgPathInfo, err := os.Stat(cfgPath)
 
@@ -90,6 +100,7 @@ func NewCommand() *cobra.Command {
 	}
 
 	cmd.PersistentFlags().StringVar(&cfgPath, "config", constants.DefaultConfigPath, "config file path")
+	cmd.PersistentFlags().StringVar(&logLevel, "log-level", "info", "log level, one of: trace|debug|info|warn|error")
 
 	cmd.AddCommand(NewCompletionCommand())
 	cmd.AddCommand(action.NewRootCommand(&cfgPath))
diff --git a/pkg/cmd_utils/upload_utils/heap.go b/pkg/cmd_utils/upload_utils/heap.go
new file mode 100644
index 0000000..c2c685a
--- /dev/null
+++ b/pkg/cmd_utils/upload_utils/heap.go
@@ -0,0 +1,47 @@
+package upload_utils
+
+import (
+	"container/heap"
+)
+
+// An IntHeap is a min-heap of ints.
+type IntHeap []int
+
+func NewHeap(s []int) *IntHeap {
+	h := IntHeap(s)
+	heap.Init(&h)
+	return &h
+}
+
+func (h IntHeap) Len() int           { return len(h) }
+func (h IntHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h IntHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *IntHeap) Push(x any) {
+	*h = append(*h, x.(int))
+}
+
+func (h *IntHeap) Pop() any {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+func (h *IntHeap) Peek() int {
+	return (*h)[0]
+}
+
+func (h *IntHeap) Remove(x any) {
+	toRemove := x.(int)
+	for i := 0; i < len(*h); i++ {
+		n := (*h)[i]
+		if n == toRemove {
+			(*h)[i], (*h)[len(*h)-1] = (*h)[len(*h)-1], (*h)[i]
+			(*h) = (*h)[:len(*h)-1]
+			i--
+		}
+	}
+	heap.Init(h)
+}
diff --git a/pkg/cmd_utils/upload_utils/opt.go b/pkg/cmd_utils/upload_utils/opt.go
new file mode 100644
index 0000000..c6f1760
--- /dev/null
+++ b/pkg/cmd_utils/upload_utils/opt.go
@@ -0,0 +1,29 @@
+package upload_utils
+
+import (
+	"github.com/dustin/go-humanize"
+	"github.com/pkg/errors"
+)
+
+var (
+	defaultPartSize = uint64(1024 * 1024 * 128) // 128MiB
+)
+
+type MultipartOpts struct {
+	Threads uint
+	Size    string
+}
+
+func (opt *MultipartOpts) Valid() error {
+	if _, err := opt.partSize(); err != nil {
+		return errors.Wrap(err, "parse part size")
+	}
+	return nil
+}
+
+func (opt *MultipartOpts) partSize() (uint64, error) {
+	if len(opt.Size) == 0 {
+		return defaultPartSize, nil
+	}
+	return humanize.ParseBytes(opt.Size)
+}
diff --git a/pkg/cmd_utils/upload_utils/upload_manager.go b/pkg/cmd_utils/upload_utils/upload_manager.go
index 8f4ca4a..694e4ed 100644
--- a/pkg/cmd_utils/upload_utils/upload_manager.go
+++ b/pkg/cmd_utils/upload_utils/upload_manager.go
@@ -40,8 +40,8 @@ const (
 	uploadIdKeyTemplate     = "STORE-KEY-UPLOAD-ID-%s"
 	uploadedSizeKeyTemplate = "STORE-KEY-UPLOADED-SIZE-%s"
 	partsKeyTemplate        = "STORE-KEY-PARTS-%s"
-	minPartSize             = 1024 * 1024 * 16         // 16MiB
-	maxSinglePutObjectSize  = 1024 * 1024 * 1024 * 500 // 5GiB
+	maxSinglePutObjectSize  = 1024 * 1024 * 1024 * 500 // 500GiB
+	defaultWindowSize       = 1024 * 1024 * 1024       // 1GiB
 	uploadDBRelativePath    = ".cocli.uploader.db"
 )
 
@@ -55,6 +55,7 @@ type FileInfo struct {
 
 // UploadManager is a manager for uploading files through minio client.
 // Note that it's user's responsibility to check the Errs field after Wait() to see if there's any error.
 type UploadManager struct {
+	opts               *MultipartOpts
 	db                 *leveldb.DB
 	client             *minio.Client
 	uploadProgressChan chan UpdateStatusMsg
@@ -65,7 +66,7 @@ type UploadManager struct {
 	sync.WaitGroup
 }
 
-func NewUploadManager(client *minio.Client) (*UploadManager, error) {
+func NewUploadManager(client *minio.Client, opts *MultipartOpts) (*UploadManager, error) {
 	// init db
 	homeDir, err := os.UserHomeDir()
 	if err != nil {
@@ -78,6 +79,7 @@ func NewUploadManager(client *minio.Client) (*UploadManager, error) {
 	}
 
 	um := &UploadManager{
+		opts:               opts,
 		uploadProgressChan: make(chan UpdateStatusMsg, 10),
 		db:                 uploadDB,
 		client:             client,
@@ -143,12 +145,17 @@ func (um *UploadManager) FPutObject(absPath string, bucket string, key string, u
 	um.Add(1)
 	go func() {
 		defer um.Done()
-		um.client.TraceOn(log.StandardLogger().WriterLevel(log.DebugLevel))
+		um.client.TraceOn(log.StandardLogger().WriterLevel(log.TraceLevel))
 
-		var err error
-		if fileInfo.Size > int64(minPartSize) {
+		size, err := um.opts.partSize()
+		if err != nil {
+			um.AddErr(absPath, err)
+			return
+		}
+
+		if fileInfo.Size > int64(size) {
 			err = um.FMultipartPutObject(context.Background(), bucket, key,
-				absPath, fileInfo.Size, minio.PutObjectOptions{UserTags: userTags})
+				absPath, fileInfo.Size, minio.PutObjectOptions{UserTags: userTags, PartSize: size, NumThreads: um.opts.Threads})
 		} else {
 			progress := newUploadProgressReader(absPath, fileInfo.Size, um.uploadProgressChan)
 			um.StatusMonitor.Send(UpdateStatusMsg{Name: absPath, Status: UploadInProgress})
@@ -247,38 +254,33 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
 		}
 	}
-	if opts.PartSize == 0 {
-		opts.PartSize = minPartSize
-	}
-
 	// Calculate the optimal parts info for a given size.
 	totalPartsCount, partSize, lastPartSize, err := minio.OptimalPartInfo(fileSize, opts.PartSize)
 	if err != nil {
 		return errors.Wrap(err, "Optimal part info failed")
 	}
+	log.Debugf("Total parts: %v, part size: %v, last part size: %v", totalPartsCount, partSize, lastPartSize)
 
 	// Declare a channel that sends the next part number to be uploaded.
-	uploadPartsCh := make(chan int)
+	uploadPartsCh := make(chan int, opts.NumThreads)
 
 	// Declare a channel that sends back the response of a part upload.
-	uploadedPartsCh := make(chan uploadedPartRes)
+	uploadedPartsCh := make(chan uploadedPartRes, opts.NumThreads)
 
 	// Used for readability, lastPartNumber is always totalPartsCount.
 	lastPartNumber := totalPartsCount
 
-	// Send each part number to the channel to be processed.
-	go func() {
-		defer close(uploadPartsCh)
-		for p := 1; p <= totalPartsCount; p++ {
-			if slices.Contains(partNumbers, p) {
-				log.Debugf("Part: %d already uploaded", p)
-				continue
-			}
-			log.Debugf("Part: %d need to upload", p)
-			uploadPartsCh <- p
-		}
-	}()
-	if opts.NumThreads == 0 {
-		opts.NumThreads = 4
+	curPart := 1
+	uploadingParts := NewHeap(make([]int, 0, opts.NumThreads))
+	for curPart <= totalPartsCount && uploadingParts.Len() < int(opts.NumThreads) {
+		if slices.Contains(partNumbers, curPart) {
+			log.Debugf("Part: %d already uploaded", curPart)
+			curPart++
+			continue
+		}
+		log.Debugf("Part: %d need to upload", curPart)
+		uploadingParts.Push(curPart)
+		uploadPartsCh <- curPart
+		curPart++
 	}
 
 	// Get reader of the file to be uploaded.
@@ -329,9 +331,8 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
 		}()
 	}
 
-	// Gather the responses as they occur and update any progress bar
-	numToUpload := totalPartsCount - len(partNumbers)
-	for m := 1; m <= numToUpload; m++ {
+upload:
+	for uploadingParts.Len() > 0 {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case uploadRes := <-uploadedPartsCh:
 			if uploadRes.Error != nil {
 				return uploadRes.Error
 			}
+			// Update the uploadedSize.
 			uploadedSize += uploadRes.Part.Size
 			parts = append(parts, minio.CompletePart{
@@ -363,6 +365,31 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
 				log.Errorf("Store uploaded parts err: %v", err)
 			}
 			um.uploadProgressChan <- UpdateStatusMsg{Name: filePath, Uploaded: uploadedSize}
+
+			uploadingParts.Remove(uploadRes.Part.PartNumber)
+			if curPart > totalPartsCount {
+				if uploadingParts.Len() > 0 {
+					continue
+				} else {
+					close(uploadPartsCh)
+					break upload
+				}
+			} else {
+				windowSize := defaultWindowSize
+				// Make sure the window is at least one part wide.
+				if windowSize < int(opts.PartSize) {
+					windowSize = int(opts.PartSize)
+				}
+				for ; curPart <= totalPartsCount && uploadingParts.Len() < int(opts.NumThreads) && (uploadingParts.Len() == 0 || (curPart-uploadingParts.Peek())*int(opts.PartSize) <= windowSize); curPart++ {
+					if slices.Contains(partNumbers, curPart) {
+						log.Debugf("Part: %d already uploaded", curPart)
+						continue
+					}
+					log.Debugf("Part: %d need to upload", curPart)
+					uploadingParts.Push(curPart)
+					uploadPartsCh <- curPart
+				}
+			}
 		}
 	}
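
Reviewer note (not part of the patch): the core of this change is the windowed part scheduler above. New parts are handed to the upload workers only while they stay within roughly defaultWindowSize bytes of the slowest in-flight part, which the IntHeap from heap.go tracks as its minimum; this keeps the in-flight parts clustered together instead of letting fast workers run arbitrarily far ahead. The sketch below is a minimal, self-contained illustration of that refill logic using only the IntHeap API added in this patch. The file name and test are hypothetical, it assumes placement in the same upload_utils package, and the constants merely mirror the patch defaults (128MiB part size, 1GiB window).

// heap_window_test.go -- illustrative sketch only, not part of this patch.
package upload_utils

import "testing"

// TestWindowedRefill mirrors the refill loop in FMultipartPutObject:
// a new part may start only while it stays within windowSize bytes of
// the slowest in-flight part, i.e. the minimum tracked by IntHeap.
func TestWindowedRefill(t *testing.T) {
	const (
		totalParts = 20
		numThreads = 4
		partSize   = 128 << 20 // 128MiB, the CLI default part size
		windowSize = 1 << 30   // 1GiB, mirrors defaultWindowSize
	)

	inFlight := NewHeap(make([]int, 0, numThreads))
	curPart := 1

	// Initial fill: schedule up to numThreads parts, as FMultipartPutObject does.
	for curPart <= totalParts && inFlight.Len() < numThreads {
		inFlight.Push(curPart)
		curPart++
	}

	// Simulate part 2 finishing while part 1 is still uploading.
	inFlight.Remove(2)

	// Refill with the same shape as the patched loop; the Len() == 0
	// short-circuit keeps Peek() off an empty heap.
	for ; curPart <= totalParts && inFlight.Len() < numThreads &&
		(inFlight.Len() == 0 || (curPart-inFlight.Peek())*partSize <= windowSize); curPart++ {
		inFlight.Push(curPart)
	}

	// Part 1 is still the slowest in-flight part, and exactly one new
	// part (part 5) was admitted before the thread limit was reached again.
	if got := inFlight.Peek(); got != 1 {
		t.Fatalf("slowest in-flight part = %d, want 1", got)
	}
	if curPart != 6 {
		t.Fatalf("next part to schedule = %d, want 6", curPart)
	}
}

Anchoring the window to the heap minimum means the scheduler throttles itself whenever one part is slow, which is presumably the intended trade-off between parallelism and keeping progress (and the locally persisted part records) roughly contiguous.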