diff --git a/client/common/constants.go b/client/common/constants.go
index aae8f10..6f9bc64 100644
--- a/client/common/constants.go
+++ b/client/common/constants.go
@@ -1,7 +1,12 @@
 package common
 
 import (
+	"fmt"
+	"log"
 	"os"
+	"os/exec"
+	"strconv"
+	"strings"
 	"time"
 )
@@ -14,7 +19,7 @@ const (
 	MB
 	// GB is gigabytes
 	GB
-	// TB is terrabytes
+	// TB is terabytes
 	TB
 )
 const (
@@ -71,12 +76,12 @@ const (
 	HeaderContentType   = "Content-Type"
 	MIMEApplicationJSON = "application/json"
 
-	// FileSizeLimit is the maximun single file size for non-multipart upload (5GB)
+	// FileSizeLimit is the maximum single file size for non-multipart upload (5GB)
 	FileSizeLimit = 5 * GB
-	// MultipartFileSizeLimit is the maximun single file size for multipart upload (5TB)
+	// MultipartFileSizeLimit is the maximum single file size for multipart upload (5TB)
 	MultipartFileSizeLimit = 5 * TB
 
-	MinMultipartChunkSize = 5 * MB
+	MinMultipartChunkSize = 10 * MB
 
 	// MaxRetryCount is the maximum retry number per record
 	MaxRetryCount = 5
@@ -85,5 +90,47 @@ const (
 	MaxMultipartParts    = 10000
 	MaxConcurrentUploads = 10
 	MaxRetries           = 5
-	MinChunkSize         = 5 * 1024 * 1024
 )
+
+var (
+	// MinChunkSize is configurable via git config and initialized in init()
+	MinChunkSize int64
+)
+
+func init() {
+	v, err := GetLfsCustomTransferInt("lfs.customtransfer.drs.multipart-min-chunk-size", 10)
+	if err != nil {
+		log.Printf("Warning: Could not read git config for multipart-min-chunk-size, using default (10 MB): %v\n", err)
+		MinChunkSize = int64(10) * MB
+		return
+	}
+
+	MinChunkSize = int64(v) * MB
+}
+
+func GetLfsCustomTransferInt(key string, defaultValue int64) (int64, error) {
+	defaultText := strconv.FormatInt(defaultValue, 10)
+	// TODO cache or get all the configs at once?
+	cmd := exec.Command("git", "config", "--get", "--default", defaultText, key)
+	output, err := cmd.Output()
+	if err != nil {
+		return defaultValue, fmt.Errorf("error reading git config %s: %v", key, err)
+	}
+
+	value := strings.TrimSpace(string(output))
+
+	parsed, err := strconv.ParseInt(value, 10, 64)
+	if err != nil {
+		return defaultValue, fmt.Errorf("invalid int value for %s: %q", key, value)
+	}
+
+	if parsed < 0 {
+		return defaultValue, fmt.Errorf("invalid negative int value for %s: %d", key, parsed)
+	}
+
+	if parsed == 0 || parsed > 500 {
+		return defaultValue, fmt.Errorf("invalid int value for %s: %d. Must be between 1 and 500", key, parsed)
+	}
+
+	return parsed, nil
+}
diff --git a/client/common/constants_test.go b/client/common/constants_test.go
new file mode 100644
index 0000000..8eed0e0
--- /dev/null
+++ b/client/common/constants_test.go
@@ -0,0 +1,110 @@
+package common
+
+import (
+	"os"
+	"os/exec"
+	"path/filepath"
+	"testing"
+)
+
+func TestGetLfsCustomTransferInt(t *testing.T) {
+	configDir := t.TempDir()
+	configPath := filepath.Join(configDir, "gitconfig")
+
+	setConfig := func(t *testing.T, key, value string) {
+		t.Helper()
+		cmd := exec.Command("git", "config", "--file", configPath, key, value)
+		if err := cmd.Run(); err != nil {
+			t.Fatalf("set git config %s=%s: %v", key, value, err)
+		}
+	}
+
+	setEnv := func(t *testing.T) {
+		t.Helper()
+		t.Setenv("GIT_CONFIG_GLOBAL", configPath)
+		t.Setenv("GIT_CONFIG_SYSTEM", os.DevNull)
+		t.Setenv("GIT_CONFIG_NOSYSTEM", "1")
+	}
+
+	const key = "lfs.customtransfer.drs.multipart-min-chunk-size"
+
+	tests := []struct {
+		name       string
+		value      string
+		defaultVal int64
+		want       int64
+		wantErr    bool
+		setValue   bool
+	}{
+		{
+			name:       "missing uses default",
+			defaultVal: 10,
+			want:       10,
+			wantErr:    false,
+			setValue:   false,
+		},
+		{
+			name:       "valid value",
+			value:      "25",
+			defaultVal: 10,
+			want:       25,
+			wantErr:    false,
+			setValue:   true,
+		},
+		{
+			name:       "negative value",
+			value:      "-3",
+			defaultVal: 10,
+			want:       10,
+			wantErr:    true,
+			setValue:   true,
+		},
+		{
+			name:       "zero value",
+			value:      "0",
+			defaultVal: 10,
+			want:       10,
+			wantErr:    true,
+			setValue:   true,
+		},
+		{
+			name:       "over max",
+			value:      "501",
+			defaultVal: 10,
+			want:       10,
+			wantErr:    true,
+			setValue:   true,
+		},
+		{
+			name:       "non-integer",
+			value:      "abc",
+			defaultVal: 10,
+			want:       10,
+			wantErr:    true,
+			setValue:   true,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if err := os.WriteFile(configPath, nil, 0o600); err != nil {
+				t.Fatalf("reset git config: %v", err)
+			}
+			if tt.setValue {
+				setConfig(t, key, tt.value)
+			}
+			setEnv(t)
+
+			got, err := GetLfsCustomTransferInt(key, tt.defaultVal)
+			if tt.wantErr && err == nil {
+				t.Fatalf("expected error, got nil")
+			}
+			if !tt.wantErr && err != nil {
+				t.Fatalf("unexpected error: %v", err)
+			}
+			if got != tt.want {
+				t.Fatalf("value = %d, want %d", got, tt.want)
+			}
+		})
+	}
+}
diff --git a/client/common/progress.go b/client/common/progress.go
new file mode 100644
index 0000000..c743e7c
--- /dev/null
+++ b/client/common/progress.go
@@ -0,0 +1,12 @@
+package common
+
+// ProgressEvent matches the Git LFS custom transfer progress payload.
+type ProgressEvent struct {
+	Event          string `json:"event"`
+	Oid            string `json:"oid"`
+	BytesSoFar     int64  `json:"bytesSoFar"`
+	BytesSinceLast int64  `json:"bytesSinceLast"`
+}
+
+// ProgressCallback emits transfer progress updates.
+type ProgressCallback func(ProgressEvent) error
diff --git a/client/common/types.go b/client/common/types.go
index 5a0ac8d..617bd38 100644
--- a/client/common/types.go
+++ b/client/common/types.go
@@ -15,8 +15,10 @@ type FileUploadRequestObject struct {
 	Filename     string
 	FileMetadata FileMetadata
 	GUID         string
+	OID          string
 	PresignedURL string
 	Bucket       string `json:"bucket,omitempty"`
+	Progress     ProgressCallback
 }
 
 // FileDownloadResponseObject defines a object for file download
@@ -24,12 +26,14 @@ type FileDownloadResponseObject struct {
 	DownloadPath string
 	Filename     string
 	GUID         string
+	OID          string
 	URL          string
 	Range        int64
 	Overwrite    bool
 	Skip         bool
 	Response     *http.Response
 	Writer       io.Writer
+	Progress     ProgressCallback
 }
 
 // FileMetadata defines the metadata accepted by the new object management API, Shepherd
diff --git a/client/download/batch.go b/client/download/batch.go
index de86659..be46051 100644
--- a/client/download/batch.go
+++ b/client/download/batch.go
@@ -40,7 +40,18 @@ func downloadFiles(
 	// Scoreboard: maxRetries = 0 for now (no retry logic yet)
 	sb := logs.NewSB(0, logger)
 
-	p := mpb.New(mpb.WithOutput(os.Stdout))
+	useProgressBars := true
+	for _, fdr := range files {
+		if fdr.Progress != nil {
+			useProgressBars = false
+			break
+		}
+	}
+
+	var p *mpb.Progress
+	if useProgressBars {
+		p = mpb.New(mpb.WithOutput(os.Stdout))
+	}
 
 	var eg errgroup.Group
 	eg.SetLimit(numParallel)
@@ -101,29 +112,46 @@ func downloadFiles(
 			// Progress bar for this file
 			total := fdr.Response.ContentLength + fdr.Range
 
-			bar := p.AddBar(total,
-				mpb.PrependDecorators(
-					decor.Name(truncateFilename(fdr.Filename, 40)+" "),
-					decor.CountersKibiByte("% .1f / % .1f"),
-				),
-				mpb.AppendDecorators(
-					decor.Percentage(),
-					decor.AverageSpeed(decor.SizeB1024(0), "% .1f"),
-				),
-			)
+			var writer io.Writer = file
+			var bar *mpb.Bar
+			var tracker *progressWriter
+
+			if useProgressBars {
+				bar = p.AddBar(total,
+					mpb.PrependDecorators(
+						decor.Name(truncateFilename(fdr.Filename, 40)+" "),
+						decor.CountersKibiByte("% .1f / % .1f"),
+					),
+					mpb.AppendDecorators(
+						decor.Percentage(),
+						decor.AverageSpeed(decor.SizeB1024(0), "% .1f"),
+					),
+				)
+
+				if fdr.Range > 0 {
+					bar.SetCurrent(fdr.Range)
+				}
 
-			if fdr.Range > 0 {
-				bar.SetCurrent(fdr.Range)
+				writer = bar.ProxyWriter(file)
+			} else if fdr.Progress != nil {
+				tracker = newProgressWriter(file, fdr.Progress, resolveDownloadOID(*fdr), total)
+				writer = tracker
 			}
 
-			writer := bar.ProxyWriter(file)
-
 			_, copyErr := io.Copy(writer, fdr.Response.Body)
 			_ = fdr.Response.Body.Close()
 			_ = file.Close()
 
+			if tracker != nil {
+				if finalizeErr := tracker.Finalize(); finalizeErr != nil && copyErr == nil {
+					copyErr = finalizeErr
+				}
+			}
+
 			if copyErr != nil {
-				bar.Abort(true)
+				if bar != nil {
+					bar.Abort(true)
+				}
 				err = fmt.Errorf("download failed for %s: %w", fdr.Filename, copyErr)
 				return err
 			}
@@ -134,7 +162,9 @@ func downloadFiles(
 	// Wait for all downloads
 	_ = eg.Wait()
 
-	p.Wait()
+	if p != nil {
+		p.Wait()
+	}
 
 	// Combine errors
 	var combinedError error
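Since ProgressEvent mirrors the git-lfs custom transfer "progress" payload, the natural consumer is a callback that streams events as newline-delimited JSON, the framing an LFS transfer agent writes to stdout. A minimal sketch, not part of this diff; the helper name and mutex wiring are illustrative:

```go
package main

import (
	"encoding/json"
	"os"
	"sync"

	"github.com/calypr/data-client/client/common"
)

// newStdoutProgress returns a callback that prints each event as one JSON
// object per line. The mutex keeps concurrent workers from interleaving lines.
func newStdoutProgress() common.ProgressCallback {
	var mu sync.Mutex
	enc := json.NewEncoder(os.Stdout)
	return func(ev common.ProgressEvent) error {
		mu.Lock()
		defer mu.Unlock()
		return enc.Encode(ev) // Encode appends the trailing newline
	}
}

func main() {
	cb := newStdoutProgress()
	// Prints: {"event":"progress","oid":"demo-oid","bytesSoFar":1024,"bytesSinceLast":1024}
	_ = cb(common.ProgressEvent{Event: "progress", Oid: "demo-oid", BytesSoFar: 1024, BytesSinceLast: 1024})
}
```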
diff --git a/client/download/progress_writer.go b/client/download/progress_writer.go
new file mode 100644
index 0000000..9ed8ab0
--- /dev/null
+++ b/client/download/progress_writer.go
@@ -0,0 +1,68 @@
+package download
+
+import (
+	"io"
+
+	"github.com/calypr/data-client/client/common"
+)
+
+type progressWriter struct {
+	writer     io.Writer
+	onProgress common.ProgressCallback
+	oid        string
+	total      int64
+	bytesSoFar int64
+}
+
+func newProgressWriter(writer io.Writer, onProgress common.ProgressCallback, oid string, total int64) *progressWriter {
+	return &progressWriter{
+		writer:     writer,
+		onProgress: onProgress,
+		oid:        oid,
+		total:      total,
+	}
+}
+
+func (pw *progressWriter) Write(p []byte) (int, error) {
+	n, err := pw.writer.Write(p)
+	if n > 0 && pw.onProgress != nil {
+		delta := int64(n)
+		pw.bytesSoFar += delta
+		if progressErr := pw.onProgress(common.ProgressEvent{
+			Event:          "progress",
+			Oid:            pw.oid,
+			BytesSoFar:     pw.bytesSoFar,
+			BytesSinceLast: delta,
+		}); progressErr != nil {
+			return n, progressErr
+		}
+	}
+	return n, err
+}
+
+func (pw *progressWriter) Finalize() error {
+	if pw.onProgress == nil {
+		return nil
+	}
+	if pw.total == 0 || pw.bytesSoFar >= pw.total {
+		return nil
+	}
+	delta := pw.total - pw.bytesSoFar
+	pw.bytesSoFar = pw.total
+	return pw.onProgress(common.ProgressEvent{
+		Event:          "progress",
+		Oid:            pw.oid,
+		BytesSoFar:     pw.bytesSoFar,
+		BytesSinceLast: delta,
+	})
+}
+
+func resolveDownloadOID(fdr common.FileDownloadResponseObject) string {
+	if fdr.OID != "" {
+		return fdr.OID
+	}
+	if fdr.GUID != "" {
+		return fdr.GUID
+	}
+	return fdr.Filename
+}
diff --git a/client/download/progress_writer_test.go b/client/download/progress_writer_test.go
new file mode 100644
index 0000000..8d573c8
--- /dev/null
+++ b/client/download/progress_writer_test.go
@@ -0,0 +1,46 @@
+package download
+
+import (
+	"bytes"
+	"io"
+	"testing"
+
+	"github.com/calypr/data-client/client/common"
+)
+
+func TestProgressWriterFinalizes(t *testing.T) {
+	payload := bytes.Repeat([]byte("b"), 20)
+	var events []common.ProgressEvent
+
+	writer := newProgressWriter(io.Discard, func(event common.ProgressEvent) error {
+		events = append(events, event)
+		return nil
+	}, "oid-456", int64(len(payload)))
+
+	if _, err := writer.Write(payload); err != nil {
+		t.Fatalf("write failed: %v", err)
+	}
+	if err := writer.Finalize(); err != nil {
+		t.Fatalf("finalize failed: %v", err)
+	}
+
+	if len(events) == 0 {
+		t.Fatal("expected progress events, got none")
+	}
+
+	var total int64
+	for _, event := range events {
+		if event.Event != "progress" {
+			t.Fatalf("unexpected event type: %s", event.Event)
+		}
+		total += event.BytesSinceLast
+	}
+
+	last := events[len(events)-1]
+	if last.BytesSoFar != int64(len(payload)) {
+		t.Fatalf("expected final bytesSoFar %d, got %d", len(payload), last.BytesSoFar)
+	}
+	if total != int64(len(payload)) {
+		t.Fatalf("expected bytesSinceLast sum %d, got %d", len(payload), total)
+	}
+}
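The tests above imply the intended call pattern: wrap the destination, copy, then always call Finalize so BytesSoFar reaches the expected total even when the source under-delivers. A sketch of that pattern, assuming it lives alongside progress_writer.go in the download package; the function name is illustrative:

```go
package download

import (
	"io"
	"log"

	"github.com/calypr/data-client/client/common"
)

// copyWithProgress is a hypothetical helper showing the wrap/copy/finalize
// sequence used by batch.go and transfer.go in this diff.
func copyWithProgress(dst io.Writer, src io.Reader, oid string, total int64) error {
	pw := newProgressWriter(dst, func(ev common.ProgressEvent) error {
		log.Printf("%s: %d/%d bytes", ev.Oid, ev.BytesSoFar, total)
		return nil
	}, oid, total)

	_, copyErr := io.Copy(pw, src)
	// Finalize tops BytesSoFar up to total on short reads; keep the copy
	// error if there was one, otherwise surface the callback error.
	if finalizeErr := pw.Finalize(); finalizeErr != nil && copyErr == nil {
		copyErr = finalizeErr
	}
	return copyErr
}
```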
diff --git a/client/download/transfer.go b/client/download/transfer.go
new file mode 100644
index 0000000..e54ddab
--- /dev/null
+++ b/client/download/transfer.go
@@ -0,0 +1,97 @@
+package download
+
+import (
+	"context"
+	"fmt"
+	"io"
+	"os"
+	"path/filepath"
+	"strings"
+
+	"github.com/calypr/data-client/client/client"
+	"github.com/calypr/data-client/client/common"
+)
+
+// DownloadSingleWithProgress downloads a single object while emitting progress events.
+func DownloadSingleWithProgress(
+	ctx context.Context,
+	g3i client.Gen3Interface,
+	guid string,
+	downloadPath string,
+	protocol string,
+	oid string,
+	progress common.ProgressCallback,
+) error {
+	var err error
+	downloadPath, err = common.ParseRootPath(downloadPath)
+	if err != nil {
+		return fmt.Errorf("invalid download path: %w", err)
+	}
+	if !strings.HasSuffix(downloadPath, "/") {
+		downloadPath += "/"
+	}
+
+	renamed := make([]RenamedOrSkippedFileInfo, 0)
+	info, err := AskGen3ForFileInfo(ctx, g3i, guid, protocol, downloadPath, "original", false, &renamed)
+	if err != nil {
+		return err
+	}
+
+	fdr := common.FileDownloadResponseObject{
+		DownloadPath: downloadPath,
+		Filename:     info.Name,
+		GUID:         guid,
+		OID:          oid,
+		Progress:     progress,
+	}
+
+	protocolText := ""
+	if protocol != "" {
+		protocolText = "?protocol=" + protocol
+	}
+	if err := GetDownloadResponse(ctx, g3i, &fdr, protocolText); err != nil {
+		return err
+	}
+
+	fullPath := filepath.Join(fdr.DownloadPath, fdr.Filename)
+	if dir := filepath.Dir(fullPath); dir != "." {
+		if err = os.MkdirAll(dir, 0766); err != nil {
+			_ = fdr.Response.Body.Close()
+			return fmt.Errorf("mkdir for %s: %w", fullPath, err)
+		}
+	}
+
+	flags := os.O_CREATE | os.O_WRONLY
+	if fdr.Range > 0 {
+		flags |= os.O_APPEND
+	} else if fdr.Overwrite {
+		flags |= os.O_TRUNC
+	}
+
+	file, err := os.OpenFile(fullPath, flags, 0666)
+	if err != nil {
+		_ = fdr.Response.Body.Close()
+		return fmt.Errorf("open local file %s: %w", fullPath, err)
+	}
+
+	total := fdr.Response.ContentLength + fdr.Range
+	var writer io.Writer = file
+	var tracker *progressWriter
+	if fdr.Progress != nil {
+		tracker = newProgressWriter(file, fdr.Progress, resolveDownloadOID(fdr), total)
+		writer = tracker
+	}
+
+	_, copyErr := io.Copy(writer, fdr.Response.Body)
+	_ = fdr.Response.Body.Close()
+	_ = file.Close()
+	if tracker != nil {
+		if finalizeErr := tracker.Finalize(); finalizeErr != nil && copyErr == nil {
+			copyErr = finalizeErr
+		}
+	}
+	if copyErr != nil {
+		return fmt.Errorf("download failed for %s: %w", fdr.Filename, copyErr)
+	}
+	return nil
+}
diff --git a/client/download/transfer_test.go b/client/download/transfer_test.go
new file mode 100644
index 0000000..7c702dc
--- /dev/null
+++ b/client/download/transfer_test.go
@@ -0,0 +1,165 @@
+package download
+
+import (
+	"bytes"
+	"context"
+	"errors"
+	"io"
+	"net/http"
+	"net/url"
+	"os"
+	"path/filepath"
+	"strings"
+	"testing"
+
+	"github.com/calypr/data-client/client/api"
+	"github.com/calypr/data-client/client/common"
+	"github.com/calypr/data-client/client/conf"
+	"github.com/calypr/data-client/client/logs"
+	"github.com/calypr/data-client/client/request"
+)
+
+type fakeGen3Download struct {
+	cred   *conf.Credential
+	logger *logs.TeeLogger
+	doFunc func(context.Context, *request.RequestBuilder) (*http.Response, error)
+}
+
+func (f *fakeGen3Download) GetCredential() *conf.Credential { return f.cred }
+func (f *fakeGen3Download) Logger() *logs.TeeLogger         { return f.logger }
+func (f *fakeGen3Download) New(method, url string) *request.RequestBuilder {
+	return &request.RequestBuilder{Method: method, Url: url}
+}
+func (f *fakeGen3Download) Do(ctx context.Context, req *request.RequestBuilder) (*http.Response, error) {
+	return f.doFunc(ctx, req)
+}
+func (f *fakeGen3Download) CheckPrivileges(context.Context) (map[string]any, error) {
+	return nil, nil
+}
+func (f *fakeGen3Download) CheckForShepherdAPI(context.Context) (bool, error) { return false, nil }
+func (f *fakeGen3Download) DeleteRecord(context.Context, string) (string, error) {
+	return "", nil
+}
+func (f *fakeGen3Download) GetDownloadPresignedUrl(context.Context, string, string) (string, error) {
+	return "https://download.example.com/object", nil
+}
+func (f *fakeGen3Download) ParseFenceURLResponse(resp *http.Response) (api.FenceResponse, error) {
+	return (&api.Functions{}).ParseFenceURLResponse(resp)
+}
+func (f *fakeGen3Download) ExportCredential(context.Context, *conf.Credential) error { return nil }
+func (f *fakeGen3Download) NewAccessToken(context.Context) error                     { return nil }
+
+func TestDownloadSingleWithProgressEmitsEvents(t *testing.T) {
+	payload := bytes.Repeat([]byte("d"), 64)
+	downloadDir := t.TempDir()
+	downloadPath := downloadDir + string(os.PathSeparator)
+
+	var events []common.ProgressEvent
+	progress := func(event common.ProgressEvent) error {
+		events = append(events, event)
+		return nil
+	}
+
+	fake := &fakeGen3Download{
+		cred:   &conf.Credential{APIEndpoint: "https://example.com", AccessToken: "token"},
+		logger: logs.NewTeeLogger("", "", io.Discard),
+		doFunc: func(_ context.Context, req *request.RequestBuilder) (*http.Response, error) {
+			switch {
+			case strings.Contains(req.Url, common.IndexdIndexEndpoint):
+				return newDownloadJSONResponse(req.Url, `{"file_name":"payload.bin","size":64}`), nil
+			case strings.HasPrefix(req.Url, "https://download.example.com/"):
+				return newDownloadResponse(req.Url, payload, http.StatusOK), nil
+			default:
+				return nil, errors.New("unexpected request url: " + req.Url)
+			}
+		},
+	}
+
+	err := DownloadSingleWithProgress(context.Background(), fake, "guid-123", downloadPath, "", "oid-123", progress)
+	if err != nil {
+		t.Fatalf("download failed: %v", err)
+	}
+
+	if len(events) == 0 {
+		t.Fatal("expected progress events")
+	}
+	for i := 1; i < len(events); i++ {
+		if events[i].BytesSoFar < events[i-1].BytesSoFar {
+			t.Fatalf("bytesSoFar not monotonic: %d then %d", events[i-1].BytesSoFar, events[i].BytesSoFar)
+		}
+	}
+	last := events[len(events)-1]
+	if last.BytesSoFar != int64(len(payload)) {
+		t.Fatalf("expected final bytesSoFar %d, got %d", len(payload), last.BytesSoFar)
+	}
+	fullPath := filepath.Join(downloadPath, "payload.bin")
+	if _, err := os.Stat(fullPath); err != nil {
+		t.Fatalf("expected file to exist: %v", err)
+	}
+}
+
+func TestDownloadSingleWithProgressFinalizeOnError(t *testing.T) {
+	downloadDir := t.TempDir()
+	downloadPath := downloadDir + string(os.PathSeparator)
+
+	var events []common.ProgressEvent
+	progress := func(event common.ProgressEvent) error {
+		events = append(events, event)
+		return nil
+	}
+
+	fake := &fakeGen3Download{
+		cred:   &conf.Credential{APIEndpoint: "https://example.com", AccessToken: "token"},
+		logger: logs.NewTeeLogger("", "", io.Discard),
+		doFunc: func(_ context.Context, req *request.RequestBuilder) (*http.Response, error) {
+			switch {
+			case strings.Contains(req.Url, common.IndexdIndexEndpoint):
+				return newDownloadJSONResponse(req.Url, `{"file_name":"payload.bin","size":64}`), nil
+			case strings.HasPrefix(req.Url, "https://download.example.com/"):
+				return newDownloadResponse(req.Url, []byte("short"), http.StatusOK), nil
+			default:
+				return nil, errors.New("unexpected request url: " + req.Url)
+			}
+		},
+	}
+
+	err := DownloadSingleWithProgress(context.Background(), fake, "guid-123", downloadPath, "", "oid-123", progress)
+	if err == nil {
+		t.Fatal("expected download error")
+	}
+
+	if len(events) == 0 {
+		t.Fatal("expected progress events")
+	}
+	last := events[len(events)-1]
+	if last.BytesSoFar != 64 {
+		t.Fatalf("expected finalize bytesSoFar 64, got %d", last.BytesSoFar)
+	}
+}
+
+func newDownloadJSONResponse(rawURL, body string) *http.Response {
+	parsedURL, err := url.Parse(rawURL)
+	if err != nil {
+		parsedURL = &url.URL{}
+	}
+	return &http.Response{
+		StatusCode: http.StatusOK,
+		Body:       io.NopCloser(strings.NewReader(body)),
+		Request:    &http.Request{URL: parsedURL},
+		Header:     make(http.Header),
+	}
+}
+
+func newDownloadResponse(rawURL string, payload []byte, status int) *http.Response {
+	parsedURL, err := url.Parse(rawURL)
+	if err != nil {
+		parsedURL = &url.URL{}
+	}
+	return &http.Response{
+		StatusCode:    status,
+		Body:          io.NopCloser(bytes.NewReader(payload)),
+		ContentLength: int64(len(payload)),
+		Request:       &http.Request{URL: parsedURL},
+		Header:        make(http.Header),
+	}
+}
diff --git a/client/upload/multipart.go b/client/upload/multipart.go
index be97d69..df8d0cd 100644
--- a/client/upload/multipart.go
+++ b/client/upload/multipart.go
@@ -12,6 +12,7 @@ import (
 	"sort"
 	"strings"
 	"sync"
+	"sync/atomic"
 
 	client "github.com/calypr/data-client/client/client"
 	"github.com/calypr/data-client/client/common"
@@ -60,18 +61,8 @@ func MultipartUpload(ctx context.Context, g3 client.Gen3Interface, req common.Fi
 	key := fmt.Sprintf("%s/%s", finalGUID, req.Filename)
 	g3.Logger().Printf("Initialized Upload: ID=%s, Key=%s\n", uploadID, key)
 
-	optimalChunkSize := func(fSize int64) int64 {
-		if fSize <= 512*common.MB {
-			return 32 * common.MB
-		}
-		chunkSize := fSize / common.MaxMultipartParts
-		if chunkSize < common.MinChunkSize {
-			chunkSize = common.MinChunkSize
-		}
-		return ((chunkSize + common.MB - 1) / common.MB) * common.MB
-	}
+	chunkSize := OptimalChunkSize(fileSize)
 
-	chunkSize := optimalChunkSize(fileSize)
 	numChunks := int((fileSize + chunkSize - 1) / chunkSize)
 
 	chunks := make(chan int, numChunks)
@@ -85,6 +76,7 @@
 		mu           sync.Mutex
 		parts        []MultipartPartObject
 		uploadErrors []error
+		totalBytes   int64 // Atomic counter for monotonically increasing BytesSoFar
 	)
 
 	// 3. Worker logic
@@ -128,6 +120,18 @@
 			if bar != nil {
 				bar.IncrInt64(size)
 			}
+			if req.Progress != nil {
+				currentTotal := atomic.AddInt64(&totalBytes, size)
+				err = req.Progress(common.ProgressEvent{
+					Event:          "progress",
+					Oid:            req.OID,
+					BytesSinceLast: size,
+					BytesSoFar:     currentTotal,
+				})
+				if err != nil {
+					g3.Logger().Printf("progress callback error: %v", err)
+				}
+			}
 			mu.Unlock()
 		}
 	}
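The numChunks ceiling division above interacts with S3's 10,000-part cap, which the codebase already encodes as common.MaxMultipartParts. A guard one could bolt on before launching workers; a sketch only, the function is not part of this diff:

```go
package upload

import (
	"fmt"

	"github.com/calypr/data-client/client/common"
)

// validatePartCount (hypothetical) mirrors the ceiling division
// numChunks := (fileSize + chunkSize - 1) / chunkSize and rejects plans
// that would exceed the S3 multipart part limit.
func validatePartCount(fileSize, chunkSize int64) (int64, error) {
	if chunkSize <= 0 {
		return 0, fmt.Errorf("chunk size must be positive, got %d", chunkSize)
	}
	numChunks := (fileSize + chunkSize - 1) / chunkSize // ceiling division
	if numChunks > common.MaxMultipartParts {
		return 0, fmt.Errorf("%d parts exceeds the %d-part limit; increase the chunk size",
			numChunks, common.MaxMultipartParts)
	}
	return numChunks, nil
}
```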
AccessToken: "token", + }, + logger: logger, + doFunc: func(_ context.Context, req *request.RequestBuilder) (*http.Response, error) { + switch { + case strings.Contains(req.Url, common.FenceDataMultipartInitEndpoint): + return newJSONResponse(req.Url, `{"uploadId":"upload-123","guid":"guid-123"}`), nil + case strings.Contains(req.Url, common.FenceDataMultipartUploadEndpoint): + return newJSONResponse(req.Url, fmt.Sprintf(`{"presigned_url":"%s"}`, server.URL)), nil + case strings.Contains(req.Url, common.FenceDataMultipartCompleteEndpoint): + return newJSONResponse(req.Url, `{}`), nil + default: + return nil, fmt.Errorf("unexpected request url: %s", req.Url) + } + }, + } + + requestObject := common.FileUploadRequestObject{ + FilePath: file.Name(), + Filename: "multipart.bin", + GUID: "guid-123", + OID: "oid-123", + Bucket: "bucket", + Progress: progress, + } + + if err := MultipartUpload(ctx, fake, requestObject, file, false); err != nil { + t.Fatalf("multipart upload failed: %v", err) + } + + mu.Lock() + defer mu.Unlock() + if len(events) == 0 { + t.Fatal("expected progress events") + } + for i := 1; i < len(events); i++ { + if events[i].BytesSoFar < events[i-1].BytesSoFar { + t.Fatalf("bytesSoFar not monotonic: %d then %d", events[i-1].BytesSoFar, events[i].BytesSoFar) + } + } + last := events[len(events)-1] + if last.BytesSoFar != fileSize { + t.Fatalf("expected final bytesSoFar %d, got %d", fileSize, last.BytesSoFar) + } +} + +func newJSONResponse(rawURL, body string) *http.Response { + parsedURL, err := url.Parse(rawURL) + if err != nil { + parsedURL = &url.URL{} + } + return &http.Response{ + StatusCode: http.StatusOK, + Body: io.NopCloser(bytes.NewBufferString(body)), + Request: &http.Request{URL: parsedURL}, + Header: make(http.Header), + } +} diff --git a/client/upload/progress_reader.go b/client/upload/progress_reader.go new file mode 100644 index 0000000..8262ee5 --- /dev/null +++ b/client/upload/progress_reader.go @@ -0,0 +1,68 @@ +package upload + +import ( + "io" + + "github.com/calypr/data-client/client/common" +) + +type progressReader struct { + reader io.Reader + onProgress common.ProgressCallback + oid string + total int64 + bytesSoFar int64 +} + +func newProgressReader(reader io.Reader, onProgress common.ProgressCallback, oid string, total int64) *progressReader { + return &progressReader{ + reader: reader, + onProgress: onProgress, + oid: oid, + total: total, + } +} + +func resolveUploadOID(req common.FileUploadRequestObject) string { + if req.OID != "" { + return req.OID + } + if req.GUID != "" { + return req.GUID + } + return req.Filename +} + +func (pr *progressReader) Read(p []byte) (int, error) { + n, err := pr.reader.Read(p) + if n > 0 && pr.onProgress != nil { + delta := int64(n) + pr.bytesSoFar += delta + if progressErr := pr.onProgress(common.ProgressEvent{ + Event: "progress", + Oid: pr.oid, + BytesSoFar: pr.bytesSoFar, + BytesSinceLast: delta, + }); progressErr != nil { + return n, progressErr + } + } + return n, err +} + +func (pr *progressReader) Finalize() error { + if pr.onProgress == nil { + return nil + } + if pr.total == 0 || pr.bytesSoFar >= pr.total { + return nil + } + delta := pr.total - pr.bytesSoFar + pr.bytesSoFar = pr.total + return pr.onProgress(common.ProgressEvent{ + Event: "progress", + Oid: pr.oid, + BytesSoFar: pr.bytesSoFar, + BytesSinceLast: delta, + }) +} diff --git a/client/upload/progress_reader_test.go b/client/upload/progress_reader_test.go new file mode 100644 index 0000000..de77d8e --- /dev/null +++ 
b/client/upload/progress_reader_test.go @@ -0,0 +1,46 @@ +package upload + +import ( + "bytes" + "io" + "testing" + + "github.com/calypr/data-client/client/common" +) + +func TestProgressReaderFinalizes(t *testing.T) { + payload := bytes.Repeat([]byte("a"), 16) + var events []common.ProgressEvent + + reader := newProgressReader(bytes.NewReader(payload), func(event common.ProgressEvent) error { + events = append(events, event) + return nil + }, "oid-123", int64(len(payload))) + + if _, err := io.Copy(io.Discard, reader); err != nil { + t.Fatalf("copy failed: %v", err) + } + if err := reader.Finalize(); err != nil { + t.Fatalf("finalize failed: %v", err) + } + + if len(events) == 0 { + t.Fatal("expected progress events, got none") + } + + var total int64 + for _, event := range events { + if event.Event != "progress" { + t.Fatalf("unexpected event type: %s", event.Event) + } + total += event.BytesSinceLast + } + + last := events[len(events)-1] + if last.BytesSoFar != int64(len(payload)) { + t.Fatalf("expected final bytesSoFar %d, got %d", len(payload), last.BytesSoFar) + } + if total != int64(len(payload)) { + t.Fatalf("expected bytesSinceLast sum %d, got %d", len(payload), total) + } +} diff --git a/client/upload/singleFile.go b/client/upload/singleFile.go index cc9a430..32c4194 100644 --- a/client/upload/singleFile.go +++ b/client/upload/singleFile.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "io" "os" "path/filepath" @@ -12,7 +13,7 @@ import ( "github.com/calypr/data-client/client/logs" ) -func UploadSingle(ctx context.Context, profile string, guid string, filePath string, bucketName string, enableLogs bool) error { +func UploadSingle(ctx context.Context, profile string, guid string, oid string, filePath string, bucketName string, enableLogs bool, progressCallback common.ProgressCallback) error { logger, closer := logs.New(profile, logs.WithSucceededLog(), logs.WithFailedLog()) if enableLogs { @@ -67,9 +68,17 @@ func UploadSingle(ctx context.Context, profile string, guid string, filePath str } fileSize := fi.Size() - furObject := common.FileUploadRequestObject{FilePath: filePath, Filename: filename, GUID: guid, Bucket: bucketName} + furObject := common.FileUploadRequestObject{ + FilePath: filePath, + Filename: filename, + GUID: guid, + OID: oid, + Bucket: bucketName, + Progress: progressCallback, + } furObject, err = generateUploadRequest(ctx, g3i, furObject, file, nil) + if err != nil { if enableLogs { sb := g3i.Logger().Scoreboard() @@ -81,7 +90,19 @@ func UploadSingle(ctx context.Context, profile string, guid string, filePath str return fmt.Errorf("[ERROR] Error occurred during request generation for file %s: %s\n", filePath, err.Error()) } - _, err = uploadPart(ctx, furObject.PresignedURL, file, fileSize) + var reader io.Reader = file + var progressTracker *progressReader + if furObject.Progress != nil { + progressTracker = newProgressReader(file, furObject.Progress, resolveUploadOID(furObject), fileSize) + reader = progressTracker + } + + _, err = uploadPart(ctx, furObject.PresignedURL, reader, fileSize) + if progressTracker != nil { + if finalizeErr := progressTracker.Finalize(); finalizeErr != nil && err == nil { + err = finalizeErr + } + } if err != nil { if enableLogs { g3i.Logger().Scoreboard().IncrementSB(1) // Increment failure diff --git a/client/upload/upload.go b/client/upload/upload.go index b786164..14fc894 100644 --- a/client/upload/upload.go +++ b/client/upload/upload.go @@ -38,7 +38,7 @@ func Upload(ctx context.Context, g3 client.Gen3Interface, req 
common.FileUploadR // Use Single-Part if file is smaller than 5GB (or your defined limit) if fileSize < 5*common.GB { g3.Logger().Printf("File size %d bytes (< 5GB), performing single-part upload\n", fileSize) - UploadSingle(ctx, g3.GetCredential().Profile, req.GUID, req.FilePath, req.Bucket, true) + UploadSingle(ctx, g3.GetCredential().Profile, req.GUID, req.GUID, req.FilePath, req.Bucket, true, nil) } g3.Logger().Printf("File size %d bytes (>= 5GB), performing multipart upload\n", fileSize) return MultipartUpload(ctx, g3, req, file, showProgress) diff --git a/client/upload/utils.go b/client/upload/utils.go index 2dbfa85..c26f3fc 100644 --- a/client/upload/utils.go +++ b/client/upload/utils.go @@ -131,3 +131,59 @@ func FormatSize(size int64) string { return fmt.Sprintf("%.1f"+unitMap[unitSize], float64(size)/float64(unitSize)) } + +// OptimalChunkSize returns a recommended chunk size for the given fileSize (in bytes). +// - <= 100 MB: return fileSize (use single PUT) +// - >100 MB and <= 1 GB: 10 MB +// - >1 GB and <= 10 GB: scaled between 25 MB and 128 MB +// - >10 GB and <= 100 GB: 256 MB +// - >100 GB: scaled between 512 MB and 1024 MB (1 GB) +// See: +// https://cloud.switch.ch/-/documentation/s3/multipart-uploads/#best-practices +func OptimalChunkSize(fileSize int64) int64 { + if fileSize <= 0 { + return 1 * common.MB + } + + switch { + case fileSize <= 100*common.MB: + // Single PUT: return whole file size + return fileSize + + case fileSize <= 1*common.GB: + return 10 * common.MB + + case fileSize <= 10*common.GB: + return scaleLinear(fileSize, 1*common.GB, 10*common.GB, 25*common.MB, 128*common.MB) + + case fileSize <= 100*common.GB: + return 256 * common.MB + + default: + // Scale for very large files; cap scaling at 1 TB for ratio purposes + return scaleLinear(fileSize, 100*common.GB, 1000*common.GB, 512*common.MB, 1024*common.MB) + } +} + +// scaleLinear scales size in [minSize, maxSize] to chunk in [minChunk, maxChunk] (linear). +// Result is rounded down to nearest MB and clamped to [minChunk, maxChunk]. 
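To make the scaled range concrete, the 5 GB case from the tests below can be worked by hand with the same arithmetic scaleLinear uses. A standalone check, not part of the diff:

```go
package main

import "fmt"

func main() {
	const mb = int64(1) << 20
	const gb = int64(1) << 30

	size, minSize, maxSize := 5*gb, 1*gb, 10*gb
	minChunk, maxChunk := 25*mb, 128*mb

	// ratio = (5120 - 1024) / (10240 - 1024) MB ≈ 0.444
	ratio := float64(size-minSize) / float64(maxSize-minSize)
	// 25 MB + 0.444 * 103 MB ≈ 70.78 MB, floored to the nearest whole MB
	chunk := int64(float64(minChunk)+ratio*float64(maxChunk-minChunk)) / mb * mb

	parts := (size + chunk - 1) / chunk // ceiling division
	fmt.Printf("chunk=%d MB parts=%d\n", chunk/mb, parts) // chunk=70 MB parts=74
}
```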
diff --git a/client/upload/utils_test.go b/client/upload/utils_test.go
new file mode 100644
index 0000000..8681096
--- /dev/null
+++ b/client/upload/utils_test.go
@@ -0,0 +1,124 @@
+package upload
+
+import (
+	"testing"
+
+	"github.com/calypr/data-client/client/common"
+)
+
+func TestOptimalChunkSize(t *testing.T) {
+	tests := []struct {
+		name          string
+		fileSize      int64
+		wantChunkSize int64
+		wantParts     int64
+	}{
+		{
+			name:          "0 bytes",
+			fileSize:      0,
+			wantChunkSize: 1 * common.MB,
+			wantParts:     0,
+		},
+		{
+			name:          "1MB",
+			fileSize:      1 * common.MB,
+			wantChunkSize: 1 * common.MB,
+			wantParts:     1,
+		},
+		{
+			name:          "100MB",
+			fileSize:      100 * common.MB,
+			wantChunkSize: 100 * common.MB,
+			wantParts:     1,
+		},
+		{
+			name:          "100MB+1B",
+			fileSize:      100*common.MB + 1,
+			wantChunkSize: 10 * common.MB,
+			wantParts:     11,
+		},
+		{
+			name:          "500MB",
+			fileSize:      500 * common.MB,
+			wantChunkSize: 10 * common.MB,
+			wantParts:     50,
+		},
+		{
+			name:          "1GB",
+			fileSize:      1 * common.GB,
+			wantChunkSize: 10 * common.MB,
+			wantParts:     103,
+		},
+		{
+			name:          "1GB+1B",
+			fileSize:      1*common.GB + 1,
+			wantChunkSize: 25 * common.MB,
+			wantParts:     41,
+		},
+		{
+			name:          "5GB",
+			fileSize:      5 * common.GB,
+			wantChunkSize: 70 * common.MB,
+			wantParts:     74,
+		},
+		{
+			name:          "10GB",
+			fileSize:      10 * common.GB,
+			wantChunkSize: 128 * common.MB,
+			wantParts:     80,
+		},
+		{
+			name:          "10GB+1B",
+			fileSize:      10*common.GB + 1,
+			wantChunkSize: 256 * common.MB,
+			wantParts:     41,
+		},
+		{
+			name:          "50GB",
+			fileSize:      50 * common.GB,
+			wantChunkSize: 256 * common.MB,
+			wantParts:     200,
+		},
+		{
+			name:          "100GB",
+			fileSize:      100 * common.GB,
+			wantChunkSize: 256 * common.MB,
+			wantParts:     400,
+		},
+		{
+			name:          "100GB+1B",
+			fileSize:      100*common.GB + 1,
+			wantChunkSize: 512 * common.MB,
+			wantParts:     201,
+		},
+		{
+			name:          "500GB",
+			fileSize:      500 * common.GB,
+			wantChunkSize: 739 * common.MB,
+			wantParts:     693,
+		},
+		{
+			name:          "1TB",
+			fileSize:      1 * common.TB,
+			wantChunkSize: 1 * common.GB,
+			wantParts:     1024,
+		},
+	}
+
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			chunkSize := OptimalChunkSize(tt.fileSize)
+			if chunkSize != tt.wantChunkSize {
+				t.Fatalf("chunk size = %d, want %d", chunkSize, tt.wantChunkSize)
+			}
+
+			parts := int64(0)
+			if tt.fileSize > 0 && chunkSize > 0 {
+				parts = (tt.fileSize + chunkSize - 1) / chunkSize
+			}
+			if parts != tt.wantParts {
+				t.Fatalf("parts = %d, want %d", parts, tt.wantParts)
+			}
+		})
+	}
+}
diff --git a/cmd/upload-multiple.go b/cmd/upload-multiple.go
index 13f91d7..66fef2e 100644
--- a/cmd/upload-multiple.go
+++ b/cmd/upload-multiple.go
@@ -38,6 +38,7 @@ Options to run multipart uploads for large files and parallel batch uploading ar
 			fmt.Printf("Notice: this command uploads to pre-existing GUIDs from a manifest.\nIf you want to upload new files (new GUIDs generated automatically), use \"./data-client upload\" instead.\n\n")
 
 			ctx := context.Background()
+			noopProgress := func(common.ProgressEvent) error { return nil }
 
 			logger, closer := logs.New(profile, logs.WithSucceededLog(), logs.WithFailedLog(), logs.WithScoreboard())
 			defer closer()
@@ -94,6 +95,7 @@ Options to run multipart uploads for large files and parallel batch uploading ar
 				// GUID comes from manifest → override
 				fur.GUID = obj.ObjectID
 				fur.Bucket = bucketName
+				fur.Progress = noopProgress
 
 				logger.Println("\t" + localFilePath + " → GUID " + obj.ObjectID)
 				requests = append(requests, fur)
@@ -124,7 +126,7 @@ Options to run multipart uploads for large files and parallel batch uploading ar
 				}
 			} else {
 				for _, req := range single {
-					upload.UploadSingle(ctx, profileConfig.Profile, req.GUID, req.FilePath, req.Bucket, true)
+					upload.UploadSingle(ctx, profileConfig.Profile, req.GUID, req.GUID, req.FilePath, req.Bucket, true, noopProgress)
 				}
 			}
diff --git a/cmd/upload-single.go b/cmd/upload-single.go
index d8a8b53..0270e36 100644
--- a/cmd/upload-single.go
+++ b/cmd/upload-single.go
@@ -5,6 +5,7 @@ import (
 	"context"
 	"log"
 
+	"github.com/calypr/data-client/client/common"
 	"github.com/calypr/data-client/client/upload"
 	"github.com/spf13/cobra"
 )
@@ -20,7 +21,8 @@ func init() {
 		Long:    `Gets a presigned URL for which to upload a file associated with a GUID and then uploads the specified file.`,
 		Example: `./data-client upload-single --profile= --guid=f6923cf3-xxxx-xxxx-xxxx-14ab3f84f9d6 --file=`,
 		Run: func(cmd *cobra.Command, args []string) {
-			err := upload.UploadSingle(context.Background(), profile, guid, filePath, bucketName, true)
+			noopProgress := func(common.ProgressEvent) error { return nil }
+			err := upload.UploadSingle(context.Background(), profile, guid, guid, filePath, bucketName, true, noopProgress)
 			if err != nil {
 				log.Fatalln(err.Error())
 			}
diff --git a/docs/optimal-chunk-size.md b/docs/optimal-chunk-size.md
new file mode 100644
index 0000000..86019ba
--- /dev/null
+++ b/docs/optimal-chunk-size.md
@@ -0,0 +1,152 @@
+
+# Engineering note — Optimal Chunk Size Calculation for Multipart Uploads
+
+## OLD:
+
+    optimalChunkSize determines the ideal chunk/part size for multipart upload based on file size.
+    The chunk size (also known as "message size" or "part size") affects upload performance and
+    must comply with S3 constraints.
+
+    Calculation logic:
+    - For files ≤ 512 MB: Returns 32 MB chunks for optimal performance
+    - For files > 512 MB: Calculates fileSize/maxMultipartParts, with minimum of 5 MB
+    - Enforces minimum of 5 MB (S3 requirement for all parts except the last)
+    - Rounds up to nearest MB for alignment
+
+    This results in:
+    - Files ≤ 512 MB: 32 MB chunks
+    - Files 512 MB - ~49 GB: 5 MB chunks (minimum enforced)
+      The ~49 GB threshold (10,000 parts × 5 MB) is where files exceed S3's
+      10,000 part limit when using the minimum chunk size
+    - Files > ~49 GB: Dynamically calculated to stay under 10,000 parts
+
+    Examples:
+    - 100 MB file → 32 MB chunks (4 parts)
+    - 1 GB file → 5 MB chunks (~205 parts)
+    - 10 GB file → 5 MB chunks (~2,048 parts)
+    - 50 GB file → 6 MB chunks (~8,534 parts)
+    - 100 GB file → 11 MB chunks (~9,310 parts)
+    - 1 TB file → 105 MB chunks (~9,987 parts)
+
+## NEW
+
+OptimalChunkSize determines the ideal chunk/part size for multipart upload based on file size.
+The chunk size (also known as "message size" or "part size") affects upload performance and
+must comply with S3 constraints.
+
+Calculation logic:
+- For files ≤ 100 MB: Returns the file size itself (single PUT, no multipart)
+- For files > 100 MB and ≤ 1 GB: Returns 10 MB chunks
+- For files > 1 GB and ≤ 10 GB: Scales linearly between 25 MB and 128 MB
+- For files > 10 GB and ≤ 100 GB: Returns 256 MB chunks
+- For files > 100 GB: Scales linearly between 512 MB and 1024 MB (capped at 1 TB for ratio purposes)
+- All chunk sizes are rounded down to the nearest MB
+- Minimum chunk size is 1 MB (for zero or negative input)
+
+This results in:
+- Files ≤ 100 MB: Single PUT upload
+- Files 100 MB - 1 GB: 10 MB chunks
+- Files 1 GB - 10 GB: 25-128 MB chunks (scaled)
+- Files 10 GB - 100 GB: 256 MB chunks
+- Files > 100 GB: 512-1024 MB chunks (scaled)
+
+Examples:
+- 100 MB file → 100 MB chunk (1 part, single PUT)
+- 500 MB file → 10 MB chunks (50 parts)
+- 1 GB file → 10 MB chunks (103 parts)
+- 5 GB file → 70 MB chunks (74 parts, scaled)
+- 10 GB file → 128 MB chunks (80 parts)
+- 50 GB file → 256 MB chunks (200 parts)
+- 100 GB file → 256 MB chunks (400 parts)
+- 500 GB file → 739 MB chunks (693 parts, scaled)
+- 1 TB file → 1024 MB chunks (1024 parts)
+
+### Testing
+
+```bash
+go test ./client/upload -run '^TestOptimalChunkSize$' -v
+```
+
+Purpose
+- Validate `OptimalChunkSize` behavior and return values (chunk size and number of parts) across thresholds, boundaries, and scaled ranges.
+
+Key behavior to assert
+1. Input type and units: sizes are `int64` bytes; tests should use `common.MB` / `common.GB` constants.
+2. Parts calculation: `parts = ceil(fileSize / chunk)`; `fileSize == 0` returns `parts == 0`.
+3. Scaling: scaled ranges are linear, rounded **down** to the nearest MB and clamped to range.
+4. Minimum chunk clamp: result is at least `1 MB`.
+5. Boundary semantics: implementation uses `<=` and some ranges start at `X + 1` — include exact, -1 and +1 byte checks.
+
+Parameterized test cases (file size ⇒ expected chunk ⇒ expected parts)
+1. `0` bytes
+   - chunk: `1 MB` (fallback)
+   - parts: `0`
+
+2. `1 MB`
+   - chunk: `1 MB` (<= 100 MB)
+   - parts: `1`
+
+3. `100 MB`
+   - chunk: `100 MB` (<= 100 MB)
+   - parts: `1`
+
+4. `100 MB + 1 B`
+   - chunk: `10 MB` (> 100 MB, <= 1 GB)
+   - parts: ceil((100 MB + 1 B) / 10 MB) = `11`
+
+5. `500 MB`
+   - chunk: `10 MB`
+   - parts: `50`
+
+6. `1 GB` (1024 MB)
+   - chunk: `10 MB` (<= 1 GB)
+   - parts: ceil(1024 / 10) = `103`
+
+7. `1 GB + 1 B`
+   - chunk: `25 MB` (start of 1 GB - 10 GB scaled range)
+   - parts: ceil((1024 MB + 1 B) / 25 MB) = `41`
+
+8. `5 GB` (5120 MB)
+   - chunk: linear between `25 MB` and `128 MB` → ≈ `70 MB` (rounded down)
+   - parts: ceil(5120 / 70) = `74`
+
+9. `10 GB` (10240 MB)
+   - chunk: `128 MB` (end of 1 GB - 10 GB scaled range)
+   - parts: `80`
+
+10. `10 GB + 1 B`
+    - chunk: `256 MB` (> 10 GB, <= 100 GB fixed)
+    - parts: ceil((10240 MB + 1 B) / 256 MB) = `41`
+
+11. `50 GB` (51200 MB)
+    - chunk: `256 MB`
+    - parts: `200`
+
+12. `100 GB` (102400 MB)
+    - chunk: `256 MB`
+    - parts: `400`
+
+13. `100 GB + 1 B`
+    - chunk: `512 MB` (start of > 100 GB scaled range)
+    - parts: ceil((102400 MB + 1 B) / 512 MB) = `201`
+
+14. `500 GB` (512000 MB)
+    - chunk: linear between `512 MB` and `1024 MB` → ≈ `739 MB` (rounded down)
+    - parts: ceil(512000 / 739) = `693`
+
+15. `1 TB` (1024 GB = 1,048,576 MB) — note: use project units consistently
+    - chunk: `1024 MB` (max of scaled range)
+    - parts: 1,048,576 / 1024 = `1024`
+
+Test design notes (concise)
+1. Use table-driven subtests in `client/upload/utils_test.go`. Include fields: name, `fileSize int64`, `wantChunk int64`, `wantParts int64`.
+2. For scaled cases assert: MB alignment, clamped to min/max, and exact `wantParts`. Use integer arithmetic for parts.
+3. Add explicit boundary triples for each threshold: exact, -1 byte, +1 byte.
+4. Include negative and zero cases to verify fallback behavior.
+5. Keep tests deterministic and fast (no external deps).
+
+Execution
+- Run from repo root: `go test ./client/upload -v`
+- Run single test: `go test ./client/upload -run '^TestOptimalChunkSize$' -v`
\ No newline at end of file
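For reference, wiring a real callback through the reworked single-file API looks like this; the profile, GUID, and paths are placeholders, and the guid is reused as the oid exactly as cmd/upload-single.go does:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/calypr/data-client/client/common"
	"github.com/calypr/data-client/client/upload"
)

func main() {
	progress := func(ev common.ProgressEvent) error {
		fmt.Printf("%s: %d bytes so far (+%d)\n", ev.Oid, ev.BytesSoFar, ev.BytesSinceLast)
		return nil
	}

	// Signature: ctx, profile, guid, oid, filePath, bucketName, enableLogs, progressCallback.
	err := upload.UploadSingle(context.Background(), "myprofile",
		"f6923cf3-xxxx-xxxx-xxxx-14ab3f84f9d6", "f6923cf3-xxxx-xxxx-xxxx-14ab3f84f9d6",
		"/path/to/file.bin", "my-bucket", true, progress)
	if err != nil {
		log.Fatalln(err)
	}
}
```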