Skip to content

Commit

Permalink
Merge pull request #33 from tablelandnetwork/bcalza/uptretrieve
Browse files Browse the repository at this point in the history
updates retrieve command to read from cache
  • Loading branch information
brunocalza authored Jan 19, 2024
2 parents 4043f3b + 4c566d2 commit 0e2ce0e
Show file tree
Hide file tree
Showing 8 changed files with 293 additions and 77 deletions.
108 changes: 33 additions & 75 deletions cmd/vaults/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"encoding/json"
"errors"
"fmt"
"io"
"os"
"path"
"regexp"
Expand All @@ -16,13 +15,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/ethereum/go-ethereum/crypto"
"github.com/filecoin-project/lassie/pkg/lassie"
"github.com/filecoin-project/lassie/pkg/storage"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/storage/deferred"
trustlessutils "github.com/ipld/go-trustless-utils"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/olekukonko/tablewriter"
Expand Down Expand Up @@ -596,7 +589,9 @@ func newListEventsCommand() *cli.Command {
}

func newRetrieveCommand() *cli.Command {
var output string
var output, provider string
var cache bool
var timeout int64

return &cli.Command{
Name: "retrieve",
Expand All @@ -614,6 +609,33 @@ func newRetrieveCommand() *cli.Command {
DefaultText: "current directory",
Destination: &output,
},
&cli.StringFlag{
Name: "provider",
Aliases: []string{"p"},
Category: "OPTIONAL:",
Usage: "The provider's address and port (e.g., localhost:8080)",
DefaultText: DefaultProviderHost,
Destination: &provider,
Value: DefaultProviderHost,
},
&cli.BoolFlag{
Name: "cache",
Aliases: []string{"c"},
Category: "OPTIONAL:",
Usage: "Retrieves from cache by setting this flag",
DefaultText: "current directory",
Destination: &cache,
Value: true,
},
&cli.Int64Flag{
Name: "timeout",
Aliases: []string{"t"},
Category: "OPTIONAL:",
Usage: "Timeout for retrieval operation (seconds)",
DefaultText: "no timeout",
Destination: &timeout,
Value: 0,
},
},
Action: func(cCtx *cli.Context) error {
arg := cCtx.Args().Get(0)
Expand All @@ -626,73 +648,9 @@ func newRetrieveCommand() *cli.Command {
return errors.New("CID is invalid")
}

lassie, err := lassie.NewLassie(cCtx.Context)
if err != nil {
return fmt.Errorf("failed to create lassie instance: %s", err)
}

carOpts := []car.Option{
car.WriteAsCarV1(true),
car.StoreIdentityCIDs(false),
car.UseWholeCIDs(false),
}

var carWriter *deferred.DeferredCarWriter
var tmpFile *os.File

if output == "-" {
// Create a temporary file only for writing to stdout case
tmpFile, err = os.CreateTemp("", fmt.Sprintf("%s.car", arg))
if err != nil {
return fmt.Errorf("failed to create temporary file: %s", err)
}
defer func() {
_ = os.Remove(tmpFile.Name())
}()
carWriter = deferred.NewDeferredCarWriterForPath(tmpFile.Name(), []cid.Cid{rootCid}, carOpts...)
} else {
// Write to the provided path or current directory
if output == "" {
output = "." // Default to current directory
}
// Ensure path is a valid directory
info, err := os.Stat(output)
if err != nil {
return fmt.Errorf("failed to access output directory: %s", err)
}
if !info.IsDir() {
return fmt.Errorf("output path is not a directory: %s", output)
}
carPath := path.Join(output, fmt.Sprintf("%s.car", arg))
carWriter = deferred.NewDeferredCarWriterForPath(carPath, []cid.Cid{rootCid}, carOpts...)
}

defer func() {
_ = carWriter.Close()
}()
carStore := storage.NewCachingTempStore(
carWriter.BlockWriteOpener(), storage.NewDeferredStorageCar(os.TempDir(), rootCid),
)
defer func() {
_ = carStore.Close()
}()

request, err := types.NewRequestForPath(carStore, rootCid, "", trustlessutils.DagScopeAll, nil)
if err != nil {
return fmt.Errorf("failed to create request: %s", err)
}

if _, err := lassie.Fetch(cCtx.Context, request, []types.FetchOption{}...); err != nil {
return fmt.Errorf("failed to fetch: %s", err)
}

// Write to stdout only if the output flag is set to '-'
if output == "-" && tmpFile != nil {
_, _ = tmpFile.Seek(0, io.SeekStart)
_, err = io.Copy(os.Stdout, tmpFile)
if err != nil {
return fmt.Errorf("failed to write to stdout: %s", err)
}
retriever := app.NewRetriever(vaultsprovider.New(provider), cache, timeout)
if err := retriever.Retrieve(cCtx.Context, rootCid, output); err != nil {
return fmt.Errorf("failed to retrieve: %s", err)
}

return nil
Expand Down
138 changes: 138 additions & 0 deletions internal/app/retriever.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
package app

import (
"context"
"fmt"
"io"
"os"
"path"

"github.com/filecoin-project/lassie/pkg/lassie"
"github.com/filecoin-project/lassie/pkg/storage"
"github.com/filecoin-project/lassie/pkg/types"
"github.com/ipfs/go-cid"
"github.com/ipld/go-car/v2"
"github.com/ipld/go-car/v2/storage/deferred"
trustlessutils "github.com/ipld/go-trustless-utils"
)

type retriever interface {
retrieveStdout(context.Context, cid.Cid, int64) error
retrieveFile(context.Context, cid.Cid, string, int64) error
}

// Retriever is responsible for retrieving file from the network.
type Retriever struct {
store retriever
timeout int64
}

// NewRetriever creates a new Retriever.
func NewRetriever(provider VaultsProvider, cache bool, timeout int64) *Retriever {
if cache {
return &Retriever{
store: &cacheStore{
provider: provider,
},
timeout: timeout,
}
}

panic("cold store not implemented yet")
}

// Retrieve retrieves file from the network.
func (r *Retriever) Retrieve(ctx context.Context, c cid.Cid, output string) error {
if output == "-" {
return r.store.retrieveStdout(ctx, c, r.timeout)
}

return r.store.retrieveFile(ctx, c, output, r.timeout)
}

type cacheStore struct {
provider VaultsProvider
}

func (cs *cacheStore) retrieveStdout(ctx context.Context, cid cid.Cid, timeout int64) error {
if _, err := cs.provider.RetrieveEvent(ctx, RetrieveEventParams{
Timeout: timeout,
CID: cid,
}, os.Stdout); err != nil {
return fmt.Errorf("failed to retrieve to file: %s", err)
}

return nil
}

func (cs *cacheStore) retrieveFile(ctx context.Context, cid cid.Cid, output string, timeout int64) error {
// Write to the provided path or current directory
if output == "" {
output = "." // Default to current directory
}
// Ensure path is a valid directory
info, err := os.Stat(output)
if err != nil {
return fmt.Errorf("failed to access output directory: %s", err)
}
if !info.IsDir() {
return fmt.Errorf("output path is not a directory: %s", output)
}

f, err := os.OpenFile(path.Join(output, cid.String()), os.O_RDWR|os.O_CREATE, 0o666)
if err != nil {
return fmt.Errorf("failed to open tmp file: %s", err)
}
_, _ = f.Seek(0, io.SeekStart)

filename, err := cs.provider.RetrieveEvent(ctx, RetrieveEventParams{
Timeout: timeout,
CID: cid,
}, f)
if err != nil {
return fmt.Errorf("failed to retrieve to file: %s", err)
}

if err := os.Rename(f.Name(), path.Join(output, fmt.Sprintf("%s-%s", cid.String(), filename))); err != nil {
return fmt.Errorf("failed renaming the file: %s", err)
}

return nil
}

type coldStore struct{} // nolint

func (cs *coldStore) retrieve(ctx context.Context, c cid.Cid, path string) error { // nolint
lassie, err := lassie.NewLassie(ctx)
if err != nil {
return fmt.Errorf("failed to create lassie instance: %s", err)
}

carOpts := []car.Option{
car.WriteAsCarV1(true),
car.StoreIdentityCIDs(false),
car.UseWholeCIDs(false),
}

carWriter := deferred.NewDeferredCarWriterForPath(path, []cid.Cid{c}, carOpts...)
defer func() {
_ = carWriter.Close()
}()
carStore := storage.NewCachingTempStore(
carWriter.BlockWriteOpener(), storage.NewDeferredStorageCar(os.TempDir(), c),
)
defer func() {
_ = carStore.Close()
}()

request, err := types.NewRequestForPath(carStore, c, "", trustlessutils.DagScopeAll, nil)
if err != nil {
return fmt.Errorf("failed to create request: %s", err)
}

if _, err := lassie.Fetch(ctx, request, []types.FetchOption{}...); err != nil {
return fmt.Errorf("failed to fetch: %s", err)
}

return nil
}
46 changes: 46 additions & 0 deletions internal/app/retriever_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
package app

import (
"context"
"fmt"
"io"
"os"
"path"
"testing"

"github.com/ipfs/go-cid"
"github.com/stretchr/testify/require"
)

func TestRetrieverFileOutput(t *testing.T) {
retriever := NewRetriever(&vaultsProviderMock{}, true, 0)
output := t.TempDir()
cid := cid.Cid{}
err := retriever.Retrieve(context.Background(), cid, output)
require.NoError(t, err)

f, err := os.Open(path.Join(output, fmt.Sprintf("%s-%s", cid.String(), "sample.txt")))
require.NoError(t, err)

data, err := io.ReadAll(f)
require.NoError(t, err)

require.Equal(t, []byte("Hello"), data)
}

func TestRetrieverStdoutOutput(t *testing.T) {
old := os.Stdout
r, w, _ := os.Pipe()
os.Stdout = w // overwrite os.Stdout so we can read from it

retriever := NewRetriever(&vaultsProviderMock{}, true, 0)

err := retriever.Retrieve(context.Background(), cid.Cid{}, "-")
require.NoError(t, err)

_ = w.Close()
data, _ := io.ReadAll(r)
os.Stdout = old

require.Equal(t, []byte("Hello"), data)
}
7 changes: 7 additions & 0 deletions internal/app/streamer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,3 +282,10 @@ func (bp *vaultsProviderMock) WriteVaultEvent(
close(bp.uploaderInputs)
return nil
}

func (bp *vaultsProviderMock) RetrieveEvent(
_ context.Context, _ RetrieveEventParams, w io.Writer,
) (string, error) {
_, _ = w.Write([]byte("Hello"))
return "sample.txt", nil
}
1 change: 0 additions & 1 deletion internal/app/timestamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func ParseTimestamp(ts string) (Timestamp, error) {
if t, err := time.Parse(time.RFC3339, ts); err == nil {
return Timestamp{t.UTC()}, nil
}
fmt.Println(time.Parse(time.RFC3339, ts))

return Timestamp{}, fmt.Errorf("could not parse %s", ts)
}
8 changes: 8 additions & 0 deletions internal/app/uploader.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"io"
"log"
"os"
"strings"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
Expand Down Expand Up @@ -53,10 +54,17 @@ func (bu *VaultsUploader) Upload(
return fmt.Errorf("signing the file: %s", err)
}

filename := filepath
if strings.Contains(filepath, "/") {
parts := strings.Split(filepath, "/")
filename = parts[len(parts)-1]
}

params := WriteVaultEventParams{
Vault: Vault(fmt.Sprintf("%s.%s", bu.namespace, bu.relation)),
Timestamp: ts,
Content: f,
Filename: filename,
ProgressBar: progress,
Signature: hex.EncodeToString(signature),
Size: sz,
Expand Down
Loading

0 comments on commit 0e2ce0e

Please sign in to comment.