Skip to content

Commit

Permalink
wsh file overhaul without cross-remote copy and S3 (#1790)
Browse files Browse the repository at this point in the history
  • Loading branch information
esimkowitz authored Jan 22, 2025
1 parent a5fa098 commit 11fec56
Show file tree
Hide file tree
Showing 61 changed files with 3,418 additions and 1,202 deletions.
1 change: 1 addition & 0 deletions cmd/generatego/main-generatego.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ func GenerateWshClient() error {
gogen.GenerateBoilerplate(&buf, "wshclient", []string{
"github.com/wavetermdev/waveterm/pkg/wshutil",
"github.com/wavetermdev/waveterm/pkg/wshrpc",
"github.com/wavetermdev/waveterm/pkg/wconfig",
"github.com/wavetermdev/waveterm/pkg/waveobj",
"github.com/wavetermdev/waveterm/pkg/wps",
"github.com/wavetermdev/waveterm/pkg/vdom",
Expand Down
2 changes: 2 additions & 0 deletions cmd/server/main-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/wavetermdev/waveterm/pkg/filestore"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote/conncontroller"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
"github.com/wavetermdev/waveterm/pkg/service"
"github.com/wavetermdev/waveterm/pkg/telemetry"
"github.com/wavetermdev/waveterm/pkg/util/shellutil"
Expand Down Expand Up @@ -175,6 +176,7 @@ func shutdownActivityUpdate() {

func createMainWshClient() {
rpc := wshserver.GetMainRpcClient()
wshfs.RpcClient = rpc
wshutil.DefaultRouter.RegisterRoute(wshutil.DefaultRoute, rpc, true)
wps.Broker.SetClient(wshutil.DefaultRouter)
localConnWsh := wshutil.MakeWshRpc(nil, nil, wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, &wshremote.ServerImpl{})
Expand Down
3 changes: 3 additions & 0 deletions cmd/wsh/cmd/wshcmd-connserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (

"github.com/spf13/cobra"
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
"github.com/wavetermdev/waveterm/pkg/util/packetparser"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
Expand Down Expand Up @@ -180,6 +181,7 @@ func serverRunRouter(jwtToken string) error {
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client
go runListener(unixListener, router)
// run the sysinfo loop
wshremote.RunSysInfoLoop(client, client.GetRpcContext().Conn)
Expand Down Expand Up @@ -224,6 +226,7 @@ func serverRunNormal(jwtToken string) error {
if err != nil {
return err
}
wshfs.RpcClient = RpcClient
WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn)
go wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn)
select {} // run forever
Expand Down
67 changes: 43 additions & 24 deletions cmd/wsh/cmd/wshcmd-file-util.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,9 @@ import (
"io/fs"
"strings"

"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
"github.com/wavetermdev/waveterm/pkg/util/wavefileutil"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
)
Expand All @@ -24,15 +27,11 @@ func convertNotFoundErr(err error) error {
return err
}

func ensureWaveFile(origName string, fileData wshrpc.CommandFileData) (*wshrpc.WaveFileInfo, error) {
func ensureFile(origName string, fileData wshrpc.FileData) (*wshrpc.FileInfo, error) {
info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
err = convertNotFoundErr(err)
if err == fs.ErrNotExist {
createData := wshrpc.CommandFileCreateData{
ZoneId: fileData.ZoneId,
FileName: fileData.FileName,
}
err = wshclient.FileCreateCommand(RpcClient, createData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
err = wshclient.FileCreateCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: DefaultFileTimeout})
if err != nil {
return nil, fmt.Errorf("creating file: %w", err)
}
Expand All @@ -48,7 +47,7 @@ func ensureWaveFile(origName string, fileData wshrpc.CommandFileData) (*wshrpc.W
return info, nil
}

func streamWriteToWaveFile(fileData wshrpc.CommandFileData, reader io.Reader) error {
func streamWriteToFile(fileData wshrpc.FileData, reader io.Reader) error {
// First truncate the file with an empty write
emptyWrite := fileData
emptyWrite.Data64 = ""
Expand Down Expand Up @@ -81,7 +80,7 @@ func streamWriteToWaveFile(fileData wshrpc.CommandFileData, reader io.Reader) er
appendData := fileData
appendData.Data64 = base64.StdEncoding.EncodeToString(chunk)

err = wshclient.FileAppendCommand(RpcClient, appendData, &wshrpc.RpcOpts{Timeout: fileTimeout})
err = wshclient.FileAppendCommand(RpcClient, appendData, &wshrpc.RpcOpts{Timeout: int64(fileTimeout)})
if err != nil {
return fmt.Errorf("appending chunk to file: %w", err)
}
Expand All @@ -90,7 +89,7 @@ func streamWriteToWaveFile(fileData wshrpc.CommandFileData, reader io.Reader) er
return nil
}

func streamReadFromWaveFile(fileData wshrpc.CommandFileData, size int64, writer io.Writer) error {
func streamReadFromFile(fileData wshrpc.FileData, size int64, writer io.Writer) error {
const chunkSize = 32 * 1024 // 32KB chunks
for offset := int64(0); offset < size; offset += chunkSize {
// Calculate the length of this chunk
Expand All @@ -100,19 +99,19 @@ func streamReadFromWaveFile(fileData wshrpc.CommandFileData, size int64, writer
}

// Set up the ReadAt request
fileData.At = &wshrpc.CommandFileDataAt{
fileData.At = &wshrpc.FileDataAt{
Offset: offset,
Size: int64(length),
Size: length,
}

// Read the chunk
content64, err := wshclient.FileReadCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
data, err := wshclient.FileReadCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: int64(fileTimeout)})
if err != nil {
return fmt.Errorf("reading chunk at offset %d: %w", offset, err)
}

// Decode and write the chunk
chunk, err := base64.StdEncoding.DecodeString(content64)
chunk, err := base64.StdEncoding.DecodeString(data.Data64)
if err != nil {
return fmt.Errorf("decoding chunk at offset %d: %w", offset, err)
}
Expand All @@ -127,7 +126,7 @@ func streamReadFromWaveFile(fileData wshrpc.CommandFileData, size int64, writer
}

type fileListResult struct {
info *wshrpc.WaveFileInfo
info *wshrpc.FileInfo
err error
}

Expand All @@ -139,9 +138,9 @@ func streamFileList(zoneId string, path string, recursive bool, filesOnly bool)
go func() {
defer close(resultChan)

fileData := wshrpc.CommandFileData{
ZoneId: zoneId,
FileName: path,
fileData := wshrpc.FileData{
Info: &wshrpc.FileInfo{
Path: fmt.Sprintf(wavefileutil.WaveFilePathPattern, zoneId, path)},
}

info, err := wshclient.FileInfoCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: 2000})
Expand Down Expand Up @@ -169,13 +168,12 @@ func streamFileList(zoneId string, path string, recursive bool, filesOnly bool)
foundAny := false

for {
listData := wshrpc.CommandFileListData{
ZoneId: zoneId,
Prefix: prefix,
All: recursive,
Offset: offset,
Limit: 100,
}
listData := wshrpc.FileListData{
Path: fmt.Sprintf(wavefileutil.WaveFilePathPattern, zoneId, prefix),
Opts: &wshrpc.FileListOpts{
All: recursive,
Offset: offset,
Limit: 100}}

files, err := wshclient.FileListCommand(RpcClient, listData, &wshrpc.RpcOpts{Timeout: 2000})
if err != nil {
Expand Down Expand Up @@ -210,3 +208,24 @@ func streamFileList(zoneId string, path string, recursive bool, filesOnly bool)

return resultChan, nil
}

func fixRelativePaths(path string) (string, error) {
conn, err := connparse.ParseURI(path)
if err != nil {
return "", err
}
if conn.Scheme == connparse.ConnectionTypeWsh {
if conn.Host == connparse.ConnHostCurrent {
conn.Host = RpcContext.Conn
fixedPath, err := fileutil.FixPath(conn.Path)
if err != nil {
return "", err
}
conn.Path = fixedPath
}
if conn.Host == "" {
conn.Host = wshrpc.LocalConnName
}
}
return conn.GetFullURI(), nil
}
Loading

0 comments on commit 11fec56

Please sign in to comment.