Skip to content

Commit

Permalink
feat: add hide monitor option to upload
Browse files Browse the repository at this point in the history
  • Loading branch information
ShuhaoQing committed Aug 23, 2024
1 parent f890986 commit 9378069
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 19 deletions.
2 changes: 1 addition & 1 deletion pkg/cmd/record/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func NewCreateCommand(cfgPath *string) *cobra.Command {
}

fmt.Println("Uploading thumbnail to pre-signed url...")
um, err := upload_utils.NewUploadManagerFromConfig(pm, proj, timeout, multiOpts)
um, err := upload_utils.NewUploadManagerFromConfig(pm, proj, timeout, true, multiOpts)
if err != nil {
log.Fatalf("unable to create upload manager: %v", err)
}
Expand Down
10 changes: 6 additions & 4 deletions pkg/cmd/record/upload.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
projectSlug = ""
multiOpts = &upload_utils.MultipartOpts{}
timeout time.Duration
hideMonitor = false
)

cmd := &cobra.Command{
Expand Down Expand Up @@ -82,7 +83,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
fmt.Printf("Uploading files to record: %s\n", recordName.RecordID)

// create minio client and upload manager first.
um, err := upload_utils.NewUploadManagerFromConfig(pm, proj, timeout, multiOpts)
um, err := upload_utils.NewUploadManagerFromConfig(pm, proj, timeout, hideMonitor, multiOpts)
if err != nil {
log.Fatalf("unable to create upload manager: %v", err)
}
Expand Down Expand Up @@ -125,6 +126,7 @@ func NewUploadCommand(cfgPath *string) *cobra.Command {
cmd.Flags().UintVarP(&multiOpts.Threads, "parallel", "P", 4, "upload number of parts in parallel")
cmd.Flags().StringVarP(&multiOpts.Size, "part-size", "s", "128Mib", "each part size")
cmd.Flags().DurationVar(&timeout, "response-timeout", 5*time.Minute, "server response time out")
cmd.Flags().BoolVar(&hideMonitor, "hide-monitor", false, "hide the upload status monitor")

return cmd
}
Expand All @@ -135,7 +137,7 @@ func generateUploadUrlBatches(fileClient api.FileInterface, filesGenerator <-cha
defer close(ret)
var files []*openv1alpha1resource.File
for f := range filesGenerator {
um.StatusMonitor.Send(upload_utils.AddFileMsg{
um.UpdateMonitor(upload_utils.AddFileMsg{
Name: f,
})
checksum, size, err := fs.CalSha256AndSize(f)
Expand All @@ -148,7 +150,7 @@ func generateUploadUrlBatches(fileClient api.FileInterface, filesGenerator <-cha
Size: size,
Sha256: checksum,
}
um.StatusMonitor.Send(upload_utils.UpdateStatusMsg{
um.UpdateMonitor(upload_utils.UpdateStatusMsg{
Name: f,
Total: size,
})
Expand All @@ -166,7 +168,7 @@ func generateUploadUrlBatches(fileClient api.FileInterface, filesGenerator <-cha
Filename: relativePath,
}.String())
if err == nil && getFileRes.Sha256 == checksum && getFileRes.Size == size {
um.StatusMonitor.Send(upload_utils.UpdateStatusMsg{
um.UpdateMonitor(upload_utils.UpdateStatusMsg{
Name: f,
Status: upload_utils.PreviouslyUploaded,
})
Expand Down
48 changes: 34 additions & 14 deletions pkg/cmd_utils/upload_utils/upload_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type UploadManager struct {
sync.WaitGroup
}

func NewUploadManagerFromConfig(pm *config.ProfileManager, proj *name.Project, timeout time.Duration, multiOpts *MultipartOpts) (*UploadManager, error) {
func NewUploadManagerFromConfig(pm *config.ProfileManager, proj *name.Project, timeout time.Duration, hideMonitor bool, multiOpts *MultipartOpts) (*UploadManager, error) {
generateSecurityTokenRes, err := pm.SecurityTokenCli().GenerateSecurityToken(context.Background(), proj.String())
if err != nil {
return nil, errors.Wrap(err, "unable to generate security token")
Expand All @@ -85,10 +85,10 @@ func NewUploadManagerFromConfig(pm *config.ProfileManager, proj *name.Project, t
if err != nil {
return nil, errors.Wrap(err, "unable to create minio client")
}
return NewUploadManager(mc, multiOpts)
return NewUploadManager(mc, hideMonitor, multiOpts), nil
}

func NewUploadManager(client *minio.Client, opts *MultipartOpts) (*UploadManager, error) {
func NewUploadManager(client *minio.Client, hideMonitor bool, opts *MultipartOpts) *UploadManager {
um := &UploadManager{
opts: opts,
client: client,
Expand All @@ -98,20 +98,34 @@ func NewUploadManager(client *minio.Client, opts *MultipartOpts) (*UploadManager
Errs: make(map[string]error),
}

if hideMonitor {
return um
}

// statusMonitorStartSignal is to ensure status monitor is ready before sending messages.
statusMonitorStartSignal := new(sync.WaitGroup)
um.statusMonitorDoneSignal.Add(1)
um.StatusMonitor = tea.NewProgram(NewUploadStatusMonitor(statusMonitorStartSignal), tea.WithFPS(10))
go um.runUploadStatusMonitor()
statusMonitorStartSignal.Wait()

return um, nil
return um
}

func (um *UploadManager) UpdateMonitor(msg interface{}) {
if um.StatusMonitor != nil {
um.StatusMonitor.Send(msg)
}
}

func (um *UploadManager) Debugf(format string, args ...interface{}) {
if um.isDebug {
msg := fmt.Sprintf(format, args...)
um.StatusMonitor.Printf("DEBUG: %s\n", msg)
if um.StatusMonitor != nil {
um.StatusMonitor.Printf("DEBUG: %s\n", msg)
} else {
log.Debugf(msg)
}
}
}

Expand All @@ -131,13 +145,15 @@ func (um *UploadManager) runUploadStatusMonitor() {
func (um *UploadManager) Wait() {
um.WaitGroup.Wait()
time.Sleep(1 * time.Second) // Buffer time for status monitor to finish receiving messages.
um.StatusMonitor.Quit()
um.statusMonitorDoneSignal.Wait()
if um.StatusMonitor != nil {
um.StatusMonitor.Quit()
um.statusMonitorDoneSignal.Wait()
}
}

// AddErr adds an error to the manager.
func (um *UploadManager) AddErr(path string, err error) {
um.StatusMonitor.Send(UpdateStatusMsg{
um.UpdateMonitor(UpdateStatusMsg{
Name: path,
Status: UploadFailed,
})
Expand Down Expand Up @@ -216,14 +232,14 @@ func (um *UploadManager) FPutObject(absPath string, bucket string, key string, u
absPath: absPath,
monitor: um.StatusMonitor,
}
um.StatusMonitor.Send(UpdateStatusMsg{Name: absPath, Status: UploadInProgress})
um.UpdateMonitor(UpdateStatusMsg{Name: absPath, Status: UploadInProgress})
_, err = um.client.FPutObject(context.Background(), bucket, key, absPath,
minio.PutObjectOptions{Progress: progress, UserTags: userTags})
}
if err != nil {
um.AddErr(absPath, err)
} else {
um.StatusMonitor.Send(UpdateStatusMsg{Name: absPath, Status: UploadCompleted})
um.UpdateMonitor(UpdateStatusMsg{Name: absPath, Status: UploadCompleted})
}
}()
}
Expand Down Expand Up @@ -290,7 +306,7 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
} else {
uploadedSize = 0
}
um.StatusMonitor.Send(UpdateStatusMsg{Name: filePath, Uploaded: uploadedSize, Status: UploadInProgress})
um.UpdateMonitor(UpdateStatusMsg{Name: filePath, Uploaded: uploadedSize, Status: UploadInProgress})
um.Debugf("Get uploaded size: %d by: %s", uploadedSize, uploadedSizeKey)

// Fetch uploaded parts
Expand Down Expand Up @@ -473,7 +489,7 @@ func (um *UploadManager) FMultipartPutObject(ctx context.Context, bucket string,
}
}

um.StatusMonitor.Send(UpdateStatusMsg{Name: filePath, Status: MultipartCompletionInProgress})
um.UpdateMonitor(UpdateStatusMsg{Name: filePath, Status: MultipartCompletionInProgress})

// Verify if we uploaded all the data.
if uploadedSize != fileSize {
Expand Down Expand Up @@ -507,7 +523,9 @@ type uploadProgressReader struct {
func (r *uploadProgressReader) Read(b []byte) (int, error) {
n := int64(len(b))
r.uploaded += n
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: n})
if r.monitor != nil {
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: n})
}
return int(n), nil
}

Expand All @@ -526,6 +544,8 @@ type uploadProgressSectionReader struct {

func (r *uploadProgressSectionReader) Read(b []byte) (int, error) {
n, err := r.SectionReader.Read(b)
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: int64(n), Status: UploadInProgress})
if r.monitor != nil {
r.monitor.Send(UpdateStatusMsg{Name: r.absPath, Uploaded: int64(n), Status: UploadInProgress})
}
return n, err
}

0 comments on commit 9378069

Please sign in to comment.