Skip to content

Commit

Permalink
Merge pull request #34 from tablelandnetwork/bcalza/coldstorage
Browse files Browse the repository at this point in the history
adds retrieval from cold storage back
  • Loading branch information
brunocalza authored Feb 9, 2024
2 parents 0e2ce0e + de49236 commit 3429c2b
Show file tree
Hide file tree
Showing 5 changed files with 89 additions and 26 deletions.
12 changes: 1 addition & 11 deletions cmd/vaults/commands.go
Original file line number Diff line number Diff line change
Expand Up @@ -590,7 +590,6 @@ func newListEventsCommand() *cli.Command {

func newRetrieveCommand() *cli.Command {
var output, provider string
var cache bool
var timeout int64

return &cli.Command{
Expand Down Expand Up @@ -618,15 +617,6 @@ func newRetrieveCommand() *cli.Command {
Destination: &provider,
Value: DefaultProviderHost,
},
&cli.BoolFlag{
Name: "cache",
Aliases: []string{"c"},
Category: "OPTIONAL:",
Usage: "Retrieves from cache by setting this flag",
DefaultText: "current directory",
Destination: &cache,
Value: true,
},
&cli.Int64Flag{
Name: "timeout",
Aliases: []string{"t"},
Expand All @@ -648,7 +638,7 @@ func newRetrieveCommand() *cli.Command {
return errors.New("CID is invalid")
}

retriever := app.NewRetriever(vaultsprovider.New(provider), cache, timeout)
retriever := app.NewRetriever(vaultsprovider.New(provider), timeout)
if err := retriever.Retrieve(cCtx.Context, rootCid, output); err != nil {
return fmt.Errorf("failed to retrieve: %s", err)
}
Expand Down
93 changes: 81 additions & 12 deletions internal/app/retriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,15 @@ type Retriever struct {
}

// NewRetriever creates a new Retriever.
func NewRetriever(provider VaultsProvider, cache bool, timeout int64) *Retriever {
if cache {
return &Retriever{
store: &cacheStore{
func NewRetriever(provider VaultsProvider, timeout int64) *Retriever {
return &Retriever{
store: &coldStore{
retriever: &cacheStore{
provider: provider,
},
timeout: timeout,
}
},
timeout: timeout,
}

panic("cold store not implemented yet")
}

// Retrieve retrieves file from the network.
Expand Down Expand Up @@ -100,9 +98,68 @@ func (cs *cacheStore) retrieveFile(ctx context.Context, cid cid.Cid, output stri
return nil
}

type coldStore struct{} // nolint
type coldStore struct {
retriever retriever
}

func (cs *coldStore) retrieveFile(ctx context.Context, c cid.Cid, output string, timeout int64) error {
// try cache first. no matter the error try cold store
err := cs.retriever.retrieveFile(ctx, c, output, timeout)
if err == nil {
return nil
}

lassie, err := lassie.NewLassie(ctx)
if err != nil {
return fmt.Errorf("failed to create lassie instance: %s", err)
}

carOpts := []car.Option{
car.WriteAsCarV1(true),
car.StoreIdentityCIDs(false),
car.UseWholeCIDs(false),
}

if output == "" {
output = "." // Default to current directory
}
// Ensure path is a valid directory
info, err := os.Stat(output)
if err != nil {
return fmt.Errorf("failed to access output directory: %s", err)
}
if !info.IsDir() {
return fmt.Errorf("output path is not a directory: %s", output)
}
carPath := path.Join(output, fmt.Sprintf("%s.car", c.String()))
carWriter := deferred.NewDeferredCarWriterForPath(carPath, []cid.Cid{c}, carOpts...)

carStore := storage.NewCachingTempStore(
carWriter.BlockWriteOpener(), storage.NewDeferredStorageCar(os.TempDir(), c),
)
defer func() {
_ = carStore.Close()
}()

request, err := types.NewRequestForPath(carStore, c, "", trustlessutils.DagScopeAll, nil)
if err != nil {
return fmt.Errorf("failed to create request: %s", err)
}

if _, err := lassie.Fetch(ctx, request, []types.FetchOption{}...); err != nil {
return fmt.Errorf("failed to fetch: %s", err)
}

return nil
}

func (cs *coldStore) retrieveStdout(ctx context.Context, c cid.Cid, timeout int64) error {
// try cache first. no matter the error try cold store
err := cs.retriever.retrieveStdout(ctx, c, timeout)
if err == nil {
return nil
}

func (cs *coldStore) retrieve(ctx context.Context, c cid.Cid, path string) error { // nolint
lassie, err := lassie.NewLassie(ctx)
if err != nil {
return fmt.Errorf("failed to create lassie instance: %s", err)
Expand All @@ -114,10 +171,16 @@ func (cs *coldStore) retrieve(ctx context.Context, c cid.Cid, path string) error
car.UseWholeCIDs(false),
}

carWriter := deferred.NewDeferredCarWriterForPath(path, []cid.Cid{c}, carOpts...)
// Create a temporary file only for writing to stdout case
tmpFile, err := os.CreateTemp("", fmt.Sprintf("%s.car", c.String()))
if err != nil {
return fmt.Errorf("failed to create temporary file: %s", err)
}
defer func() {
_ = carWriter.Close()
_ = os.Remove(tmpFile.Name())
}()
carWriter := deferred.NewDeferredCarWriterForPath(tmpFile.Name(), []cid.Cid{c}, carOpts...)

carStore := storage.NewCachingTempStore(
carWriter.BlockWriteOpener(), storage.NewDeferredStorageCar(os.TempDir(), c),
)
Expand All @@ -134,5 +197,11 @@ func (cs *coldStore) retrieve(ctx context.Context, c cid.Cid, path string) error
return fmt.Errorf("failed to fetch: %s", err)
}

_, _ = tmpFile.Seek(0, io.SeekStart)
_, err = io.Copy(os.Stdout, tmpFile)
if err != nil {
return fmt.Errorf("failed to write to stdout: %s", err)
}

return nil
}
4 changes: 2 additions & 2 deletions internal/app/retriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
)

func TestRetrieverFileOutput(t *testing.T) {
retriever := NewRetriever(&vaultsProviderMock{}, true, 0)
retriever := NewRetriever(&vaultsProviderMock{}, 0)
output := t.TempDir()
cid := cid.Cid{}
err := retriever.Retrieve(context.Background(), cid, output)
Expand All @@ -33,7 +33,7 @@ func TestRetrieverStdoutOutput(t *testing.T) {
r, w, _ := os.Pipe()
os.Stdout = w // overwrite os.Stdout so we can read from it

retriever := NewRetriever(&vaultsProviderMock{}, true, 0)
retriever := NewRetriever(&vaultsProviderMock{}, 0)

err := retriever.Retrieve(context.Background(), cid.Cid{}, "-")
require.NoError(t, err)
Expand Down
4 changes: 4 additions & 0 deletions internal/app/vaults_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package app

import (
"context"
"errors"
"io"

"github.com/ipfs/go-cid"
Expand Down Expand Up @@ -53,3 +54,6 @@ type RetrieveEventParams struct {
Timeout int64
CID cid.Cid
}

// ErrNotFoundInCache is an error when file is not found in cache.
var ErrNotFoundInCache = errors.New("not found in cache")
2 changes: 1 addition & 1 deletion pkg/vaultsprovider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func (bp *VaultsProvider) RetrieveEvent(
}()

if resp.StatusCode == http.StatusNotFound {
return "", errors.New("not found")
return "", app.ErrNotFoundInCache
}

re := regexp.MustCompile(`".+"`)
Expand Down

0 comments on commit 3429c2b

Please sign in to comment.