diff --git a/go.mod b/go.mod index a8699f6a..6be6702e 100644 --- a/go.mod +++ b/go.mod @@ -21,6 +21,7 @@ require ( github.com/spf13/cobra v1.8.1 github.com/spf13/pflag v1.0.5 github.com/stretchr/testify v1.10.0 + github.com/vbauerster/mpb/v8 v8.8.3 go.uber.org/mock v0.5.0 gopkg.in/yaml.v2 v2.4.0 k8s.io/klog/v2 v2.130.1 @@ -28,6 +29,8 @@ require ( ) require ( + github.com/VividCortex/ewma v1.2.0 // indirect + github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/atotto/clipboard v0.1.4 // indirect github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect diff --git a/go.sum b/go.sum index 0c1f5c48..4700cd8a 100644 --- a/go.sum +++ b/go.sum @@ -13,6 +13,10 @@ github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6 github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU= github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM= github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= +github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow= +github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8= +github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU= @@ -354,6 +358,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s= github.com/valyala/quicktemplate v1.2.0/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4= github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio= +github.com/vbauerster/mpb/v8 v8.8.3 h1:dTOByGoqwaTJYPubhVz3lO5O6MK553XVgUo33LdnNsQ= +github.com/vbauerster/mpb/v8 v8.8.3/go.mod h1:JfCCrtcMsJwP6ZwMn9e5LMnNyp3TVNpUWWkN+nd4EWk= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/pkg/client/s3client.go b/pkg/client/s3client.go index 7783e62a..5bc7772d 100644 --- a/pkg/client/s3client.go +++ b/pkg/client/s3client.go @@ -17,6 +17,7 @@ package client import ( "errors" "fmt" + "io" "os" "regexp" "sync" @@ -30,6 +31,9 @@ import ( "github.com/IBM/ibm-cos-sdk-go/service/s3" "github.com/IBM/ibm-cos-sdk-go/service/s3/s3manager" "github.com/IBM/platform-services-go-sdk/resourcecontrollerv2" + "github.com/vbauerster/mpb/v8" + "github.com/vbauerster/mpb/v8/decor" + "github.com/ppc64le-cloud/pvsadm/pkg" "k8s.io/klog/v2" "k8s.io/utils/ptr" @@ -235,11 +239,37 @@ func (c *S3Client) CopyObjectToBucket(srcBucketName string, destBucketName strin } type CustomReader struct { - fp *os.File - size int64 - read int64 - signMap map[int64]struct{} - mux sync.Mutex + fp *os.File + size int64 + read int64 + mux sync.Mutex + progresstracker *ProgressTracker + signMap map[int64]struct{} +} + +type ProgressTracker struct { + progress *mpb.Progress + bar *mpb.Bar + isBarSet bool + counter *formattedCounter +} + +type formattedCounter struct { + read *int64 + total int64 +} + +func (f *formattedCounter) Decor(stat decor.Statistics) (string, int) { + str := fmt.Sprintf("%s/%s", formatBytes(*f.read), formatBytes(f.total)) + return str, len(str) +} + +func (f *formattedCounter) Format(string) (string, int) { + return "", 0 +} + +func (f *formattedCounter) Sync() (chan int, bool) { + return nil, false } func (r *CustomReader) Read(p []byte) (int, error) { @@ -249,28 +279,51 @@ func (r *CustomReader) Read(p []byte) (int, error) { func (r *CustomReader) ReadAt(p []byte, off int64) (int, error) { n, err := r.fp.ReadAt(p, off) if err != nil { + if err == io.EOF { + return n, nil + } return n, err } r.mux.Lock() if _, ok := r.signMap[off]; ok { r.read += int64(n) - progress := int(float32(r.read*100) / float32(r.size)) - fmt.Printf("\rUploading: Total read(bytes):%d progress:%d%%", r.read, progress) + r.progresstracker.counter.read = &r.read } else { r.signMap[off] = struct{}{} } + r.progresstracker.bar.SetCurrent(r.read) r.mux.Unlock() return n, nil } +// Format the bytes to a human-readable string +func formatBytes(size int64) string { + const ( + KB = 1024 + MB = KB * 1024 + GB = MB * 1024 + ) + + switch { + case size >= GB: + return fmt.Sprintf("%.2f GB", float64(size)/GB) + case size >= MB: + return fmt.Sprintf("%.2f MB", float64(size)/MB) + case size >= KB: + return fmt.Sprintf("%.2f KB", float64(size)/KB) + default: + return fmt.Sprintf("%d Bytes", size) + } +} + func (r *CustomReader) Seek(offset int64, whence int) (int64, error) { return r.fp.Seek(offset, whence) } // To upload a object to S3 bucket func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error { - klog.Infof("uploading the file %s", fileName) - //Read the content of the file + klog.Infof("Uploading the file %s", fileName) + // Read the content of the file file, err := os.Open(fileName) if err != nil { return fmt.Errorf("err opening file %s, err: %s", fileName, err) @@ -280,13 +333,33 @@ func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error { fileInfo, err := file.Stat() if err != nil { return fmt.Errorf("failed to stat file %v, err: %v", fileName, err) - } + + // Create the custom reader for file reading and progress tracking reader := &CustomReader{ fp: file, size: fileInfo.Size(), signMap: map[int64]struct{}{}, } + + // Initialize progress tracker + progressTracker := &ProgressTracker{ + progress: mpb.New(), + counter: &formattedCounter{read: new(int64), total: reader.size}, + } + + bar := progressTracker.progress.AddBar(reader.size, + mpb.PrependDecorators( + decor.Name("Uploading: ", decor.WC{W: 15}), + progressTracker.counter, + ), + mpb.AppendDecorators( + decor.Percentage(), + ), + ) + reader.progresstracker = progressTracker + reader.progresstracker.bar = bar + // Create an uploader with S3 client uploader := s3manager.NewUploaderWithClient(c.S3Session, func(u *s3manager.Uploader) { u.PartSize = 64 * 1024 * 1024 @@ -299,13 +372,15 @@ func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error { Body: reader, } - // Perform an upload. + // Perform an upload startTime := time.Now() result, err := uploader.Upload(upParams) if err != nil { - return err + return fmt.Errorf("upload failed: %v", err) } - fmt.Println() - klog.Infof("Upload completed successfully in %s seconds to location %s", time.Since(startTime).Round(time.Second), result.Location) + + progressTracker.progress.Wait() + + klog.Infof("Upload completed successfully in %s to location %s", time.Since(startTime).Round(time.Second), result.Location) return nil }