Skip to content

Commit

Permalink
enable wsh file cross-remote copy/move (#1725)
Browse files Browse the repository at this point in the history
This adds the ability to stream `tar` archives over channels between
`wsh` instances. The main use cases for this are remote copy and move
operations.

It also completes the `wavefs` implementation of the FileShare interface
to allow copy/move interoperability between wavefiles and other storage
types.

The tar streaming functionality has been broken out into the new
`tarcopy` package for easy reuse.

New `fileshare` functions are added for `CopyInternal`, which allows
copying files internal to a filesystem to bypass the expensive interop
layer, and `MoveInternal`, which does the same for moving a file within
a filesystem. Copying between remotes is now handled by `CopyRemote`,
which accepts the source `FileShareClient` as a parameter. `wsh`
connections use the same implementation for `CopyInternal` and
`CopyRemote` as they need to request the channel on the remote
destination, since we don't offer a way to pass channels as a parameter
to a remote call.

This also adds a recursive `-r` flag to `wsh file rm` to allow for
deleting a directory and all its contents.

S3 support will be addressed in a future PR.

---------

Co-authored-by: sawka <mike@commandline.dev>
Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com>
Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
  • Loading branch information
4 people authored Jan 31, 2025
1 parent d224bd4 commit 902ff9b
Show file tree
Hide file tree
Showing 23 changed files with 790 additions and 316 deletions.
1 change: 1 addition & 0 deletions cmd/generatego/main-generatego.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ func GenerateWshClient() error {
"github.com/wavetermdev/waveterm/pkg/waveobj",
"github.com/wavetermdev/waveterm/pkg/wps",
"github.com/wavetermdev/waveterm/pkg/vdom",
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes",
})
wshDeclMap := wshrpc.GenerateWshCommandDeclMap()
for _, key := range utilfn.GetOrderedMapKeys(wshDeclMap) {
Expand Down
7 changes: 6 additions & 1 deletion cmd/wsh/cmd/wshcmd-file.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ func init() {
fileCmd.AddCommand(fileListCmd)
fileCmd.AddCommand(fileCatCmd)
fileCmd.AddCommand(fileWriteCmd)
fileRmCmd.Flags().BoolP("recursive", "r", false, "remove directories recursively")
fileCmd.AddCommand(fileRmCmd)
fileCmd.AddCommand(fileInfoCmd)
fileCmd.AddCommand(fileAppendCmd)
Expand Down Expand Up @@ -259,6 +260,10 @@ func fileRmRun(cmd *cobra.Command, args []string) error {
if err != nil {
return err
}
recursive, err := cmd.Flags().GetBool("recursive")
if err != nil {
return err
}
fileData := wshrpc.FileData{
Info: &wshrpc.FileInfo{
Path: path}}
Expand All @@ -272,7 +277,7 @@ func fileRmRun(cmd *cobra.Command, args []string) error {
return fmt.Errorf("getting file info: %w", err)
}

err = wshclient.FileDeleteCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
err = wshclient.FileDeleteCommand(RpcClient, wshrpc.CommandDeleteFileData{Path: path, Recursive: recursive}, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
if err != nil {
return fmt.Errorf("removing file: %w", err)
}
Expand Down
8 changes: 4 additions & 4 deletions frontend/app/store/wshclientapi.ts
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ class RpcApiType {
}

// command "filedelete" [call]
FileDeleteCommand(client: WshClient, data: FileData, opts?: RpcOpts): Promise<void> {
FileDeleteCommand(client: WshClient, data: CommandDeleteFileData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("filedelete", data, opts);
}

Expand Down Expand Up @@ -203,7 +203,7 @@ class RpcApiType {
}

// command "filestreamtar" [responsestream]
FileStreamTarCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<string, void, boolean> {
FileStreamTarCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<Packet, void, boolean> {
return client.wshRpcStream("filestreamtar", data, opts);
}

Expand Down Expand Up @@ -258,7 +258,7 @@ class RpcApiType {
}

// command "remotefiledelete" [call]
RemoteFileDeleteCommand(client: WshClient, data: string, opts?: RpcOpts): Promise<void> {
RemoteFileDeleteCommand(client: WshClient, data: CommandDeleteFileData, opts?: RpcOpts): Promise<void> {
return client.wshRpcCall("remotefiledelete", data, opts);
}

Expand Down Expand Up @@ -313,7 +313,7 @@ class RpcApiType {
}

// command "remotetarstream" [responsestream]
RemoteTarStreamCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<string, void, boolean> {
RemoteTarStreamCommand(client: WshClient, data: CommandRemoteStreamTarData, opts?: RpcOpts): AsyncGenerator<Packet, void, boolean> {
return client.wshRpcStream("remotetarstream", data, opts);
}

Expand Down
12 changes: 12 additions & 0 deletions frontend/types/gotypes.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,12 @@ declare global {
blockid: string;
};

// wshrpc.CommandDeleteFileData
type CommandDeleteFileData = {
path: string;
recursive: boolean;
};

// wshrpc.CommandDisposeData
type CommandDisposeData = {
routeid: string;
Expand Down Expand Up @@ -583,6 +589,12 @@ declare global {
// waveobj.ORef
type ORef = string;

// iochantypes.Packet
type Packet = {
Data: string;
Checksum: string;
};

// wshrpc.PathCommandData
type PathCommandData = {
pathtype: string;
Expand Down
29 changes: 7 additions & 22 deletions pkg/remote/awsconn/awsconn.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,28 +124,13 @@ func ParseProfiles() map[string]struct{} {
}

func ListBuckets(ctx context.Context, client *s3.Client) ([]types.Bucket, error) {
var err error
var output *s3.ListBucketsOutput
var buckets []types.Bucket
region := client.Options().Region
bucketPaginator := s3.NewListBucketsPaginator(client, &s3.ListBucketsInput{BucketRegion: &region})
for bucketPaginator.HasMorePages() {
output, err = bucketPaginator.NextPage(ctx)
log.Printf("output: %v", output)
if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) && apiErr.ErrorCode() == "AccessDenied" {
fmt.Println("You don't have permission to list buckets for this account.")
err = apiErr
} else {
return nil, fmt.Errorf("Couldn't list buckets for your account. Here's why: %v\n", err)
}
break
}
if output == nil {
break
output, err := client.ListBuckets(ctx, &s3.ListBucketsInput{})
if err != nil {
var apiErr smithy.APIError
if errors.As(err, &apiErr) {
return nil, fmt.Errorf("error listing buckets: %v", apiErr)
}
buckets = append(buckets, output.Buckets...)
return nil, fmt.Errorf("error listing buckets: %v", err)
}
return buckets, nil
return output.Buckets, nil
}
6 changes: 5 additions & 1 deletion pkg/remote/connparse/connparse.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ func (c *Connection) GetFullURI() string {
return c.Scheme + "://" + c.GetPathWithHost()
}

func (c *Connection) GetSchemeAndHost() string {
return c.Scheme + "://" + c.Host
}

func ParseURIAndReplaceCurrentHost(ctx context.Context, uri string) (*Connection, error) {
conn, err := ParseURI(uri)
if err != nil {
Expand Down Expand Up @@ -148,7 +152,7 @@ func ParseURI(uri string) (*Connection, error) {
}
if strings.HasPrefix(remotePath, "/~") {
remotePath = strings.TrimPrefix(remotePath, "/")
} else if len(remotePath) > 1 && !windowsDriveRegex.MatchString(remotePath) && !strings.HasPrefix(remotePath, "/") && !strings.HasPrefix(remotePath, "~") {
} else if len(remotePath) > 1 && !windowsDriveRegex.MatchString(remotePath) && !strings.HasPrefix(remotePath, "/") && !strings.HasPrefix(remotePath, "~") && !strings.HasPrefix(remotePath, "./") && !strings.HasPrefix(remotePath, "../") && !strings.HasPrefix(remotePath, ".\\") && !strings.HasPrefix(remotePath, "..\\") && remotePath != ".." {
remotePath = "/" + remotePath
}
}
Expand Down
48 changes: 48 additions & 0 deletions pkg/remote/connparse/connparse_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,54 @@ func TestParseURI_WSHCurrentPathShorthand(t *testing.T) {
}
}

func TestParseURI_WSHCurrentPath(t *testing.T) {
cstr := "./Documents/path/to/file"
c, err := connparse.ParseURI(cstr)
if err != nil {
t.Fatalf("failed to parse URI: %v", err)
}
expected := "./Documents/path/to/file"
if c.Path != expected {
t.Fatalf("expected path to be %q, got %q", expected, c.Path)
}
expected = "current"
if c.Host != expected {
t.Fatalf("expected host to be %q, got %q", expected, c.Host)
}
expected = "wsh"
if c.Scheme != expected {
t.Fatalf("expected scheme to be %q, got %q", expected, c.Scheme)
}
expected = "wsh://current/./Documents/path/to/file"
if c.GetFullURI() != expected {
t.Fatalf("expected full URI to be %q, got %q", expected, c.GetFullURI())
}
}

func TestParseURI_WSHCurrentPathWindows(t *testing.T) {
cstr := ".\\Documents\\path\\to\\file"
c, err := connparse.ParseURI(cstr)
if err != nil {
t.Fatalf("failed to parse URI: %v", err)
}
expected := ".\\Documents\\path\\to\\file"
if c.Path != expected {
t.Fatalf("expected path to be %q, got %q", expected, c.Path)
}
expected = "current"
if c.Host != expected {
t.Fatalf("expected host to be %q, got %q", expected, c.Host)
}
expected = "wsh"
if c.Scheme != expected {
t.Fatalf("expected scheme to be %q, got %q", expected, c.Scheme)
}
expected = "wsh://current/.\\Documents\\path\\to\\file"
if c.GetFullURI() != expected {
t.Fatalf("expected full URI to be %q, got %q", expected, c.GetFullURI())
}
}

func TestParseURI_WSHLocalShorthand(t *testing.T) {
t.Parallel()
cstr := "/~/path/to/file"
Expand Down
59 changes: 35 additions & 24 deletions pkg/remote/fileshare/fileshare.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,11 @@ import (
"fmt"
"log"

"github.com/wavetermdev/waveterm/pkg/remote/awsconn"
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fstype"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/s3fs"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wavefs"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshutil"
)
Expand All @@ -29,12 +28,12 @@ func CreateFileShareClient(ctx context.Context, connection string) (fstype.FileS
}
conntype := conn.GetType()
if conntype == connparse.ConnectionTypeS3 {
config, err := awsconn.GetConfig(ctx, connection)
if err != nil {
log.Printf("error getting aws config: %v", err)
return nil, nil
}
return s3fs.NewS3Client(config), conn
// config, err := awsconn.GetConfig(ctx, connection)
// if err != nil {
// log.Printf("error getting aws config: %v", err)
// return nil, nil
// }
return nil, nil
} else if conntype == connparse.ConnectionTypeWave {
return wavefs.NewWaveClient(), conn
} else if conntype == connparse.ConnectionTypeWsh {
Expand All @@ -61,10 +60,10 @@ func ReadStream(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrE
return client.ReadStream(ctx, conn, data)
}

func ReadTarStream(ctx context.Context, data wshrpc.CommandRemoteStreamTarData) <-chan wshrpc.RespOrErrorUnion[[]byte] {
func ReadTarStream(ctx context.Context, data wshrpc.CommandRemoteStreamTarData) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet] {
client, conn := CreateFileShareClient(ctx, data.Path)
if conn == nil || client == nil {
return wshutil.SendErrCh[[]byte](fmt.Errorf(ErrorParsingConnection, data.Path))
return wshutil.SendErrCh[iochantypes.Packet](fmt.Errorf(ErrorParsingConnection, data.Path))
}
return client.ReadTarStream(ctx, conn, data.Opts)
}
Expand Down Expand Up @@ -110,35 +109,47 @@ func Mkdir(ctx context.Context, path string) error {
}

func Move(ctx context.Context, data wshrpc.CommandFileCopyData) error {
srcConn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, data.SrcUri)
if err != nil {
return fmt.Errorf("error parsing source connection %s: %v", data.SrcUri, err)
srcClient, srcConn := CreateFileShareClient(ctx, data.SrcUri)
if srcConn == nil || srcClient == nil {
return fmt.Errorf("error creating fileshare client, could not parse source connection %s", data.SrcUri)
}
destClient, destConn := CreateFileShareClient(ctx, data.DestUri)
if destConn == nil || destClient == nil {
return fmt.Errorf("error creating fileshare client, could not parse connection %s or %s", data.SrcUri, data.DestUri)
return fmt.Errorf("error creating fileshare client, could not parse destination connection %s", data.DestUri)
}
if srcConn.Host != destConn.Host {
err := destClient.CopyRemote(ctx, srcConn, destConn, srcClient, data.Opts)
if err != nil {
return fmt.Errorf("cannot copy %q to %q: %w", data.SrcUri, data.DestUri, err)
}
return srcClient.Delete(ctx, srcConn, data.Opts.Recursive)
} else {
return srcClient.MoveInternal(ctx, srcConn, destConn, data.Opts)
}
return destClient.Move(ctx, srcConn, destConn, data.Opts)
}

func Copy(ctx context.Context, data wshrpc.CommandFileCopyData) error {
srcConn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, data.SrcUri)
if err != nil {
return fmt.Errorf("error parsing source connection %s: %v", data.SrcUri, err)
srcClient, srcConn := CreateFileShareClient(ctx, data.SrcUri)
if srcConn == nil || srcClient == nil {
return fmt.Errorf("error creating fileshare client, could not parse source connection %s", data.SrcUri)
}
destClient, destConn := CreateFileShareClient(ctx, data.DestUri)
if destConn == nil || destClient == nil {
return fmt.Errorf("error creating fileshare client, could not parse connection %s or %s", data.SrcUri, data.DestUri)
return fmt.Errorf("error creating fileshare client, could not parse destination connection %s", data.DestUri)
}
if srcConn.Host != destConn.Host {
return destClient.CopyRemote(ctx, srcConn, destConn, srcClient, data.Opts)
} else {
return srcClient.CopyInternal(ctx, srcConn, destConn, data.Opts)
}
return destClient.Copy(ctx, srcConn, destConn, data.Opts)
}

func Delete(ctx context.Context, path string) error {
client, conn := CreateFileShareClient(ctx, path)
func Delete(ctx context.Context, data wshrpc.CommandDeleteFileData) error {
client, conn := CreateFileShareClient(ctx, data.Path)
if conn == nil || client == nil {
return fmt.Errorf(ErrorParsingConnection, path)
return fmt.Errorf(ErrorParsingConnection, data.Path)
}
return client.Delete(ctx, conn)
return client.Delete(ctx, conn, data.Recursive)
}

func Join(ctx context.Context, path string, parts ...string) (string, error) {
Expand Down
15 changes: 9 additions & 6 deletions pkg/remote/fileshare/fstype/fstype.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"

"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/util/iochan/iochantypes"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
)

Expand All @@ -18,7 +19,7 @@ type FileShareClient interface {
// ReadStream returns a stream of file data at the given path. If it's a directory, then the list of entries
ReadStream(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData]
// ReadTarStream returns a stream of tar data at the given path
ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[[]byte]
ReadTarStream(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileCopyOpts) <-chan wshrpc.RespOrErrorUnion[iochantypes.Packet]
// ListEntries returns the list of entries at the given path, or nothing if the path is a file
ListEntries(ctx context.Context, conn *connparse.Connection, opts *wshrpc.FileListOpts) ([]*wshrpc.FileInfo, error)
// ListEntriesStream returns a stream of entries at the given path
Expand All @@ -29,12 +30,14 @@ type FileShareClient interface {
AppendFile(ctx context.Context, conn *connparse.Connection, data wshrpc.FileData) error
// Mkdir creates a directory at the given path
Mkdir(ctx context.Context, conn *connparse.Connection) error
// Move moves the file from srcConn to destConn
Move(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
// Copy copies the file from srcConn to destConn
Copy(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
// Move moves the file within the same connection
MoveInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
// Copy copies the file within the same connection
CopyInternal(ctx context.Context, srcConn, destConn *connparse.Connection, opts *wshrpc.FileCopyOpts) error
// CopyRemote copies the file between different connections
CopyRemote(ctx context.Context, srcConn, destConn *connparse.Connection, srcClient FileShareClient, opts *wshrpc.FileCopyOpts) error
// Delete deletes the entry at the given path
Delete(ctx context.Context, conn *connparse.Connection) error
Delete(ctx context.Context, conn *connparse.Connection, recursive bool) error
// Join joins the given parts to the connection path
Join(ctx context.Context, conn *connparse.Connection, parts ...string) (string, error)
// GetConnectionType returns the type of connection for the fileshare
Expand Down
Loading

0 comments on commit 902ff9b

Please sign in to comment.