Skip to content

Commit

Permalink
copyremote initial impl, fix for wavefs
Browse files Browse the repository at this point in the history
  • Loading branch information
esimkowitz committed Feb 2, 2025
1 parent bafd175 commit 8cb908f
Show file tree
Hide file tree
Showing 2 changed files with 95 additions and 10 deletions.
81 changes: 80 additions & 1 deletion pkg/remote/fileshare/s3fs/s3fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,15 @@
package s3fs

import (
"archive/tar"
"bytes"
"context"
"errors"
"fmt"
"io"
"io/fs"
"log"
"path"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -569,7 +571,51 @@ func (c S3Client) MoveInternal(ctx context.Context, srcConn, destConn *connparse
}

func (c S3Client) CopyRemote(ctx context.Context, srcConn, destConn *connparse.Connection, srcClient fstype.FileShareClient, opts *wshrpc.FileCopyOpts) error {
return errors.ErrUnsupported

destBucket := destConn.Host
destKey := destConn.Path
overwrite := opts != nil && opts.Overwrite
merge := opts != nil && opts.Merge
if destBucket == "" || destBucket == "/" {
return errors.Join(errors.ErrUnsupported, fmt.Errorf("destination bucket must be specified"))
}
entries, err := c.ListEntries(ctx, destConn, nil)
if err != nil {
return err
}
if len(entries) > 1 && !merge {
return errors.Join(errors.ErrUnsupported, fmt.Errorf("more than one entry in destination, use merge option to copy to existing directory"))
}
destPrefix := getPathPrefix(destConn)
readCtx, cancel := context.WithCancelCause(ctx)
defer cancel(nil)
ioch := srcClient.ReadTarStream(readCtx, srcConn, opts)
err = tarcopy.TarCopyDest(readCtx, cancel, ioch, func(next *tar.Header, reader *tar.Reader) error {
log.Printf("copying %v", next.Name)
if next.Typeflag == tar.TypeDir {
return nil
}
fileName, err := cleanPath(path.Join(destPrefix, next.Name))
if !overwrite {
for _, entry := range entries {
if entry.Name == fileName {
return fmt.Errorf("destination already exists: %v", fileName)
}
}
}
_, err = c.client.PutObject(ctx, &s3.PutObjectInput{
Bucket: aws.String(destBucket),
Key: aws.String(destKey + next.Name),
Body: reader,
ContentLength: aws.Int64(next.Size),
})
return err
})
if err != nil {
cancel(err)
return err
}
return nil
}

func (c S3Client) CopyInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error {
Expand Down Expand Up @@ -679,3 +725,36 @@ func getParentPath(conn *connparse.Connection) string {
}
return parentPath
}

func getPathPrefix(conn *connparse.Connection) string {
fullUri := conn.GetFullURI()
pathPrefix := fullUri
lastSlash := strings.LastIndex(fullUri, "/")
if lastSlash > 10 && lastSlash < len(fullUri)-1 {
pathPrefix = fullUri[:lastSlash+1]
}
return pathPrefix
}

func cleanPath(path string) (string, error) {
if path == "" {
return "", fmt.Errorf("path is empty")
}
if strings.HasPrefix(path, "/") {
path = path[1:]
}
if strings.HasPrefix(path, "~") || strings.HasPrefix(path, ".") || strings.HasPrefix(path, "..") {
return "", fmt.Errorf("s3 path cannot start with ~, ., or ..")
}
var newParts []string
for _, part := range strings.Split(path, "/") {
if part == ".." {
if len(newParts) > 0 {
newParts = newParts[:len(newParts)-1]
}
} else if part != "." {
newParts = append(newParts, part)
}
}
return strings.Join(newParts, "/"), nil
}
24 changes: 15 additions & 9 deletions pkg/remote/fileshare/wavefs/wavefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -443,27 +443,33 @@ func (c WaveClient) CopyRemote(ctx context.Context, srcConn, destConn *connparse
if zoneId == "" {
return fmt.Errorf("zoneid not found in connection")
}
overwrite := opts != nil && opts.Overwrite
merge := opts != nil && opts.Merge
destPrefix := getPathPrefix(destConn)
destPrefix = strings.TrimPrefix(destPrefix, destConn.GetSchemeAndHost()+"/")
log.Printf("CopyRemote: srcConn: %v, destConn: %v, destPrefix: %s\n", srcConn, destConn, destPrefix)
entries, err := c.ListEntries(ctx, srcConn, nil)
if err != nil {
return fmt.Errorf("error listing blockfiles: %w", err)
}
if len(entries) > 1 && !merge {
return fmt.Errorf("more than one entry at destination prefix, use merge flag to copy")
}
readCtx, cancel := context.WithCancelCause(ctx)
ioch := srcClient.ReadTarStream(readCtx, srcConn, opts)
err := tarcopy.TarCopyDest(readCtx, cancel, ioch, func(next *tar.Header, reader *tar.Reader) error {
err = tarcopy.TarCopyDest(readCtx, cancel, ioch, func(next *tar.Header, reader *tar.Reader) error {
if next.Typeflag == tar.TypeDir {
return nil
}
fileName, err := cleanPath(path.Join(destPrefix, next.Name))
if err != nil {
return fmt.Errorf("error cleaning path: %w", err)
}
_, err = filestore.WFS.Stat(ctx, zoneId, fileName)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
return fmt.Errorf("error getting blockfile info: %w", err)
}
err := filestore.WFS.MakeFile(ctx, zoneId, fileName, nil, wshrpc.FileOpts{})
if err != nil {
return fmt.Errorf("error making blockfile: %w", err)
if !overwrite {
for _, entry := range entries {
if entry.Name == fileName {
return fmt.Errorf("destination already exists: %v", fileName)
}
}
}
log.Printf("CopyRemote: writing file: %s; size: %d\n", fileName, next.Size)
Expand Down

0 comments on commit 8cb908f

Please sign in to comment.