Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track object upload status through a progress bar #697

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ require (
github.com/spf13/cobra v1.8.1
github.com/spf13/pflag v1.0.5
github.com/stretchr/testify v1.10.0
github.com/vbauerster/mpb/v8 v8.8.3
go.uber.org/mock v0.5.0
gopkg.in/yaml.v2 v2.4.0
k8s.io/klog/v2 v2.130.1
k8s.io/utils v0.0.0-20230726121419-3b25d923346b
)

require (
github.com/VividCortex/ewma v1.2.0 // indirect
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/atotto/clipboard v0.1.4 // indirect
github.com/aymanbagabas/go-osc52/v2 v2.0.1 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,10 @@ github.com/MakeNowJust/heredoc v1.0.0/go.mod h1:mG5amYoWBHf8vpLOuehzbGGw0EHxpZZ6
github.com/OneOfOne/xxhash v1.2.2/go.mod h1:HSdplMjZKSmBqAxg5vPj2TmRDmfkzw+cTzAElWljhcU=
github.com/OpenPeeDeeP/depguard v1.0.1/go.mod h1:xsIw86fROiiwelg+jB2uM9PiKihMMmUx/1V+TNhjQvM=
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d h1:licZJFw2RwpHMqeKTCYkitsPqHNxTmd4SNR5r94FGM8=
github.com/acarl005/stripansi v0.0.0-20180116102854-5a71ef0e047d/go.mod h1:asat636LX7Bqt5lYEZ27JNDcqxfjdBQuJ/MM4CN/Lzo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0=
github.com/apparentlymart/go-cidr v1.1.0 h1:2mAhrMoF+nhXqxTzSZMUzDHkLjmIHC+Zzn4tdgBZjnU=
Expand Down Expand Up @@ -354,6 +358,8 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC
github.com/valyala/fasthttp v1.2.0/go.mod h1:4vX61m6KN+xDduDNwXrhIAVZaZaZiQ1luJk8LWSxF3s=
github.com/valyala/quicktemplate v1.2.0/go.mod h1:EH+4AkTd43SvgIbQHYu59/cJyxDoOVRUAfrukLPuGJ4=
github.com/valyala/tcplisten v0.0.0-20161114210144-ceec8f93295a/go.mod h1:v3UYOV9WzVtRmSR+PDvWpU/qWl4Wa5LApYYX4ZtKbio=
github.com/vbauerster/mpb/v8 v8.8.3 h1:dTOByGoqwaTJYPubhVz3lO5O6MK553XVgUo33LdnNsQ=
github.com/vbauerster/mpb/v8 v8.8.3/go.mod h1:JfCCrtcMsJwP6ZwMn9e5LMnNyp3TVNpUWWkN+nd4EWk=
github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU=
github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand Down
103 changes: 89 additions & 14 deletions pkg/client/s3client.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package client
import (
"errors"
"fmt"
"io"
"os"
"regexp"
"sync"
Expand All @@ -30,6 +31,9 @@ import (
"github.com/IBM/ibm-cos-sdk-go/service/s3"
"github.com/IBM/ibm-cos-sdk-go/service/s3/s3manager"
"github.com/IBM/platform-services-go-sdk/resourcecontrollerv2"
"github.com/vbauerster/mpb/v8"
"github.com/vbauerster/mpb/v8/decor"

"github.com/ppc64le-cloud/pvsadm/pkg"
"k8s.io/klog/v2"
"k8s.io/utils/ptr"
Expand Down Expand Up @@ -235,11 +239,37 @@ func (c *S3Client) CopyObjectToBucket(srcBucketName string, destBucketName strin
}

type CustomReader struct {
fp *os.File
size int64
read int64
signMap map[int64]struct{}
mux sync.Mutex
fp *os.File
size int64
read int64
mux sync.Mutex
progresstracker *ProgressTracker
signMap map[int64]struct{}
}

type ProgressTracker struct {
progress *mpb.Progress
bar *mpb.Bar
isBarSet bool
counter *formattedCounter
}

type formattedCounter struct {
read *int64
total int64
}

func (f *formattedCounter) Decor(stat decor.Statistics) (string, int) {
str := fmt.Sprintf("%s/%s", formatBytes(*f.read), formatBytes(f.total))
return str, len(str)
}

func (f *formattedCounter) Format(string) (string, int) {
return "", 0
}

func (f *formattedCounter) Sync() (chan int, bool) {
return nil, false
}

func (r *CustomReader) Read(p []byte) (int, error) {
Expand All @@ -249,28 +279,51 @@ func (r *CustomReader) Read(p []byte) (int, error) {
func (r *CustomReader) ReadAt(p []byte, off int64) (int, error) {
n, err := r.fp.ReadAt(p, off)
if err != nil {
if err == io.EOF {
return n, nil
}
return n, err
}
r.mux.Lock()
priyanshikhetwani marked this conversation as resolved.
Show resolved Hide resolved
if _, ok := r.signMap[off]; ok {
r.read += int64(n)
progress := int(float32(r.read*100) / float32(r.size))
fmt.Printf("\rUploading: Total read(bytes):%d progress:%d%%", r.read, progress)
r.progresstracker.counter.read = &r.read
} else {
r.signMap[off] = struct{}{}
}
r.progresstracker.bar.SetCurrent(r.read)
r.mux.Unlock()
return n, nil
}

// Format the bytes to a human-readable string
func formatBytes(size int64) string {
const (
KB = 1024
MB = KB * 1024
GB = MB * 1024
)

switch {
case size >= GB:
return fmt.Sprintf("%.2f GB", float64(size)/GB)
case size >= MB:
return fmt.Sprintf("%.2f MB", float64(size)/MB)
case size >= KB:
return fmt.Sprintf("%.2f KB", float64(size)/KB)
default:
return fmt.Sprintf("%d Bytes", size)
}
}

func (r *CustomReader) Seek(offset int64, whence int) (int64, error) {
return r.fp.Seek(offset, whence)
}

// To upload a object to S3 bucket
func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
klog.Infof("uploading the file %s", fileName)
//Read the content of the file
klog.Infof("Uploading the file %s", fileName)
// Read the content of the file
file, err := os.Open(fileName)
if err != nil {
return fmt.Errorf("err opening file %s, err: %s", fileName, err)
Expand All @@ -280,13 +333,33 @@ func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
fileInfo, err := file.Stat()
if err != nil {
return fmt.Errorf("failed to stat file %v, err: %v", fileName, err)

}

// Create the custom reader for file reading and progress tracking
reader := &CustomReader{
fp: file,
size: fileInfo.Size(),
signMap: map[int64]struct{}{},
}

// Initialize progress tracker
progressTracker := &ProgressTracker{
progress: mpb.New(),
counter: &formattedCounter{read: new(int64), total: reader.size},
}

bar := progressTracker.progress.AddBar(reader.size,
mpb.PrependDecorators(
decor.Name("Uploading: ", decor.WC{W: 15}),
progressTracker.counter,
),
mpb.AppendDecorators(
decor.Percentage(),
),
)
reader.progresstracker = progressTracker
reader.progresstracker.bar = bar

// Create an uploader with S3 client
uploader := s3manager.NewUploaderWithClient(c.S3Session, func(u *s3manager.Uploader) {
u.PartSize = 64 * 1024 * 1024
Expand All @@ -299,13 +372,15 @@ func (c *S3Client) UploadObject(fileName, objectName, bucketName string) error {
Body: reader,
}

// Perform an upload.
// Perform an upload
startTime := time.Now()
result, err := uploader.Upload(upParams)
if err != nil {
return err
return fmt.Errorf("upload failed: %v", err)
}
fmt.Println()
klog.Infof("Upload completed successfully in %s seconds to location %s", time.Since(startTime).Round(time.Second), result.Location)

progressTracker.progress.Wait()

klog.Infof("Upload completed successfully in %s to location %s", time.Since(startTime).Round(time.Second), result.Location)
return nil
}