Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 52 additions & 5 deletions client/common/constants.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
package common

import (
"fmt"
"log"
"os"
"os/exec"
"strconv"
"strings"
"time"
)

Expand All @@ -14,7 +19,7 @@ const (
MB
// GB is gigabytes
GB
// TB is terrabytes
// TB is terabytes
TB
)
const (
Expand Down Expand Up @@ -71,12 +76,12 @@ const (
HeaderContentType = "Content-Type"
MIMEApplicationJSON = "application/json"

// FileSizeLimit is the maximun single file size for non-multipart upload (5GB)
// FileSizeLimit is the maximum single file size for non-multipart upload (5GB)
FileSizeLimit = 5 * GB

// MultipartFileSizeLimit is the maximun single file size for multipart upload (5TB)
// MultipartFileSizeLimit is the maximum single file size for multipart upload (5TB)
MultipartFileSizeLimit = 5 * TB
MinMultipartChunkSize = 5 * MB
MinMultipartChunkSize = 10 * MB

// MaxRetryCount is the maximum retry number per record
MaxRetryCount = 5
Expand All @@ -85,5 +90,47 @@ const (
MaxMultipartParts = 10000
MaxConcurrentUploads = 10
MaxRetries = 5
MinChunkSize = 5 * 1024 * 1024
)

var (
// MinChunkSize is configurable via git config and initialized in init().
// init() reads the lfs.customtransfer.drs.multipart-min-chunk-size setting
// (a whole number of megabytes) and stores it here multiplied by MB. When the
// setting cannot be read or is rejected, init() falls back to 10 MB.
MinChunkSize int64
)

// init populates MinChunkSize from the
// lfs.customtransfer.drs.multipart-min-chunk-size git config setting, which is
// expressed in megabytes. If the setting cannot be read or is rejected, a
// warning is logged and MinChunkSize falls back to 10 MB.
func init() {
	const (
		configKey        = "lfs.customtransfer.drs.multipart-min-chunk-size"
		fallbackMB int64 = 10
	)

	sizeMB, err := GetLfsCustomTransferInt(configKey, fallbackMB)
	if err == nil {
		// The configured value is in megabytes; store it scaled by MB.
		MinChunkSize = sizeMB * MB
		return
	}

	log.Printf("Warning: Could not read git config for multipart-min-chunk-size, using default (10 MB): %v\n", err)
	MinChunkSize = fallbackMB * MB
}

// GetLfsCustomTransferInt reads an integer setting from git config by running
// `git config --get --default <defaultValue> <key>`.
//
// The value must be a base-10 integer between 1 and 500 inclusive. On success
// the parsed value is returned with a nil error. On any failure (git cannot be
// executed or exits with an error, the value is not an integer, or it is out
// of range) defaultValue is returned together with a non-nil error, so callers
// can report the problem and continue with the default.
//
// When the key is unset, git echoes defaultValue back, so defaultValue itself
// is subject to the same 1..500 validation.
//
// The error produced by running git is wrapped with %w, which lets callers
// inspect the cause with errors.Is / errors.As (for example exec.ErrNotFound
// when git is not installed, or *exec.ExitError for a non-zero exit).
func GetLfsCustomTransferInt(key string, defaultValue int64) (int64, error) {
	const (
		minAllowed = 1
		maxAllowed = 500
	)

	defaultText := strconv.FormatInt(defaultValue, 10)
	// TODO cache or get all the configs at once?
	cmd := exec.Command("git", "config", "--get", "--default", defaultText, key)
	output, err := cmd.Output()
	if err != nil {
		return defaultValue, fmt.Errorf("error reading git config %s: %w", key, err)
	}

	// git terminates the value with a newline; strip surrounding whitespace
	// before parsing.
	value := strings.TrimSpace(string(output))

	parsed, err := strconv.ParseInt(value, 10, 64)
	if err != nil {
		return defaultValue, fmt.Errorf("invalid int value for %s: %q", key, value)
	}

	switch {
	case parsed < 0:
		// Negative values get their own message to make the mistake obvious.
		return defaultValue, fmt.Errorf("invalid negative int value for %s: %d", key, parsed)
	case parsed < minAllowed || parsed > maxAllowed:
		return defaultValue, fmt.Errorf("invalid int value for %s: %d. Must be between 1 and 500", key, parsed)
	}

	return parsed, nil
}
110 changes: 110 additions & 0 deletions client/common/constants_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
package common

import (
"os"
"os/exec"
"path/filepath"
"testing"
)

// TestGetLfsCustomTransferInt runs GetLfsCustomTransferInt against a scratch
// git config file. Each case starts from an empty file, optionally writes the
// key, points git's global config at that file (with system config disabled),
// and then checks the returned value and whether an error was reported.
func TestGetLfsCustomTransferInt(t *testing.T) {
	const key = "lfs.customtransfer.drs.multipart-min-chunk-size"

	scratchDir := t.TempDir()
	cfgFile := filepath.Join(scratchDir, "gitconfig")

	type scenario struct {
		name       string
		value      string
		setValue   bool
		defaultVal int64
		want       int64
		wantErr    bool
	}

	scenarios := []scenario{
		{name: "missing uses default", defaultVal: 10, want: 10},
		{name: "valid value", value: "25", setValue: true, defaultVal: 10, want: 25},
		{name: "negative value", value: "-3", setValue: true, defaultVal: 10, want: 10, wantErr: true},
		{name: "zero value", value: "0", setValue: true, defaultVal: 10, want: 10, wantErr: true},
		{name: "over max", value: "501", setValue: true, defaultVal: 10, want: 10, wantErr: true},
		{name: "non-integer", value: "abc", setValue: true, defaultVal: 10, want: 10, wantErr: true},
	}

	for _, sc := range scenarios {
		t.Run(sc.name, func(t *testing.T) {
			// Truncate the config file so no state leaks between cases.
			if err := os.WriteFile(cfgFile, nil, 0o600); err != nil {
				t.Fatalf("reset git config: %v", err)
			}

			if sc.setValue {
				write := exec.Command("git", "config", "--file", cfgFile, key, sc.value)
				if err := write.Run(); err != nil {
					t.Fatalf("set git config %s=%s: %v", key, sc.value, err)
				}
			}

			// Make the scratch file the only global config git will consult
			// and turn off the system-wide config.
			t.Setenv("GIT_CONFIG_GLOBAL", cfgFile)
			t.Setenv("GIT_CONFIG_SYSTEM", os.DevNull)
			t.Setenv("GIT_CONFIG_NOSYSTEM", "1")

			got, err := GetLfsCustomTransferInt(key, sc.defaultVal)
			switch {
			case sc.wantErr && err == nil:
				t.Fatalf("expected error, got nil")
			case !sc.wantErr && err != nil:
				t.Fatalf("unexpected error: %v", err)
			}
			if got != sc.want {
				t.Fatalf("value = %d, want %d", got, sc.want)
			}
		})
	}
}
12 changes: 12 additions & 0 deletions client/common/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
package common

// ProgressEvent matches the Git LFS custom transfer progress payload.
// It is the value passed to a ProgressCallback each time a transfer advances.
type ProgressEvent struct {
// Event is the message type; the download progressWriter sets it to "progress".
Event string `json:"event"`
// Oid identifies the object the update refers to.
Oid string `json:"oid"`
// BytesSoFar is the cumulative number of bytes reported for the object.
BytesSoFar int64 `json:"bytesSoFar"`
// BytesSinceLast is the number of bytes added since the previous event.
BytesSinceLast int64 `json:"bytesSinceLast"`
}

// ProgressCallback emits transfer progress updates.
// It receives one ProgressEvent per update. A non-nil error is returned by
// the download progressWriter from its Write and Finalize methods.
type ProgressCallback func(ProgressEvent) error
4 changes: 4 additions & 0 deletions client/common/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,21 +15,25 @@ type FileUploadRequestObject struct {
Filename string
FileMetadata FileMetadata
GUID string
OID string
PresignedURL string
Bucket string `json:"bucket,omitempty"`
Progress ProgressCallback
}

// FileDownloadResponseObject defines an object for file download.
type FileDownloadResponseObject struct {
DownloadPath string
Filename string
GUID string
// OID is the preferred identifier in progress events; when empty, GUID and
// then Filename are used instead.
OID string
URL string
// Range is added to the response's ContentLength to get the expected total
// size, and is the progress bar's starting position when greater than zero.
// NOTE(review): presumably the byte offset of a resumed download — confirm.
Range int64
Overwrite bool
Skip bool
Response *http.Response
Writer io.Writer
// Progress, when non-nil, receives progress events for this file. If any file
// in a batch has a non-nil Progress, terminal progress bars are disabled for
// the whole batch.
Progress ProgressCallback
}

// FileMetadata defines the metadata accepted by the new object management API, Shepherd
Expand Down
64 changes: 47 additions & 17 deletions client/download/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,18 @@ func downloadFiles(
// Scoreboard: maxRetries = 0 for now (no retry logic yet)
sb := logs.NewSB(0, logger)

p := mpb.New(mpb.WithOutput(os.Stdout))
useProgressBars := true
for _, fdr := range files {
if fdr.Progress != nil {
useProgressBars = false
break
}
}

var p *mpb.Progress
if useProgressBars {
p = mpb.New(mpb.WithOutput(os.Stdout))
}

var eg errgroup.Group
eg.SetLimit(numParallel)
Expand Down Expand Up @@ -101,29 +112,46 @@ func downloadFiles(

// Progress bar for this file
total := fdr.Response.ContentLength + fdr.Range
bar := p.AddBar(total,
mpb.PrependDecorators(
decor.Name(truncateFilename(fdr.Filename, 40)+" "),
decor.CountersKibiByte("% .1f / % .1f"),
),
mpb.AppendDecorators(
decor.Percentage(),
decor.AverageSpeed(decor.SizeB1024(0), "% .1f"),
),
)
var writer io.Writer = file
var bar *mpb.Bar
var tracker *progressWriter

if useProgressBars {
bar = p.AddBar(total,
mpb.PrependDecorators(
decor.Name(truncateFilename(fdr.Filename, 40)+" "),
decor.CountersKibiByte("% .1f / % .1f"),
),
mpb.AppendDecorators(
decor.Percentage(),
decor.AverageSpeed(decor.SizeB1024(0), "% .1f"),
),
)

if fdr.Range > 0 {
bar.SetCurrent(fdr.Range)
}

if fdr.Range > 0 {
bar.SetCurrent(fdr.Range)
writer = bar.ProxyWriter(file)
} else if fdr.Progress != nil {
tracker = newProgressWriter(file, fdr.Progress, resolveDownloadOID(*fdr), total)
writer = tracker
}

writer := bar.ProxyWriter(file)

_, copyErr := io.Copy(writer, fdr.Response.Body)
_ = fdr.Response.Body.Close()
_ = file.Close()

if tracker != nil {
if finalizeErr := tracker.Finalize(); finalizeErr != nil && copyErr == nil {
copyErr = finalizeErr
}
}

if copyErr != nil {
bar.Abort(true)
if bar != nil {
bar.Abort(true)
}
err = fmt.Errorf("download failed for %s: %w", fdr.Filename, copyErr)
return err
}
Expand All @@ -134,7 +162,9 @@ func downloadFiles(

// Wait for all downloads
_ = eg.Wait()
p.Wait()
if p != nil {
p.Wait()
}

// Combine errors
var combinedError error
Expand Down
68 changes: 68 additions & 0 deletions client/download/progress_writer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
package download

import (
"io"

"github.com/calypr/data-client/client/common"
)

// progressWriter wraps an io.Writer and reports the bytes written through it
// to a common.ProgressCallback.
type progressWriter struct {
// writer is the destination that receives the data.
writer io.Writer
// onProgress is called after each write of at least one byte; it may be nil,
// in which case nothing is reported or counted.
onProgress common.ProgressCallback
// oid is copied into the Oid field of every emitted event.
oid string
// total is the expected final byte count. Finalize uses it to report any
// remaining bytes; zero disables that final report.
total int64
// bytesSoFar is the running count of bytes reported so far.
bytesSoFar int64
}

// newProgressWriter builds a progressWriter that forwards data to writer and
// reports progress for oid through onProgress. total is the expected final
// byte count; the running byte counter starts at zero.
func newProgressWriter(writer io.Writer, onProgress common.ProgressCallback, oid string, total int64) *progressWriter {
	pw := new(progressWriter)
	pw.writer = writer
	pw.onProgress = onProgress
	pw.oid = oid
	pw.total = total
	return pw
}

// Write forwards p to the wrapped writer and, when at least one byte was
// written and a callback is set, emits a "progress" event carrying the updated
// running total and the size of this write. If the callback fails, its error
// is returned in place of the underlying write error.
func (pw *progressWriter) Write(p []byte) (int, error) {
	written, writeErr := pw.writer.Write(p)

	// Nothing to report: no bytes went through, or nobody is listening.
	if written <= 0 || pw.onProgress == nil {
		return written, writeErr
	}

	chunk := int64(written)
	pw.bytesSoFar += chunk

	event := common.ProgressEvent{
		Event:          "progress",
		Oid:            pw.oid,
		BytesSoFar:     pw.bytesSoFar,
		BytesSinceLast: chunk,
	}
	if cbErr := pw.onProgress(event); cbErr != nil {
		return written, cbErr
	}
	return written, writeErr
}

// Finalize emits one last "progress" event that brings the reported byte count
// up to total. It does nothing when no callback is set, when total is zero, or
// when the reported count has already reached total. The callback's error, if
// any, is returned.
func (pw *progressWriter) Finalize() error {
	shortfall := pw.onProgress != nil && pw.total != 0 && pw.bytesSoFar < pw.total
	if !shortfall {
		return nil
	}

	remaining := pw.total - pw.bytesSoFar
	pw.bytesSoFar = pw.total

	event := common.ProgressEvent{
		Event:          "progress",
		Oid:            pw.oid,
		BytesSoFar:     pw.bytesSoFar,
		BytesSinceLast: remaining,
	}
	return pw.onProgress(event)
}

// resolveDownloadOID picks the identifier to report in progress events for a
// download: the OID if set, otherwise the GUID, otherwise the filename.
func resolveDownloadOID(fdr common.FileDownloadResponseObject) string {
	switch {
	case fdr.OID != "":
		return fdr.OID
	case fdr.GUID != "":
		return fdr.GUID
	default:
		return fdr.Filename
	}
}
Loading