diff --git a/cmd/lassie/fetch.go b/cmd/lassie/fetch.go index 5b71caa4..51184a77 100644 --- a/cmd/lassie/fetch.go +++ b/cmd/lassie/fetch.go @@ -46,6 +46,19 @@ var fetchFlags = []cli.Flag{ return nil }, }, + &cli.StringFlag{ + Name: "entity-bytes", + Usage: "describes the byte range to consider when selecting the blocks from a sharded file." + + " Valid values should be of the form from:to, where from and to are byte offsets and to may be '*'", + DefaultText: "defaults to the entire file, 0:*", + Action: func(cctx *cli.Context, v string) error { + if _, err := types.ParseByteRange(v); err != nil { + return fmt.Errorf("invalid entity-bytes parameter, must be of the form from:to," + + " where from and to are byte offsets and to may be '*'") + } + return nil + }, + }, FlagIPNIEndpoint, FlagEventRecorderAuth, FlagEventRecorderInstanceId, @@ -78,6 +91,7 @@ func fetchAction(cctx *cli.Context) error { dataWriter := cctx.App.Writer dagScope := cctx.String("dag-scope") + entityBytes := cctx.String("entity-bytes") tempDir := cctx.String("tempdir") progress := cctx.Bool("progress") @@ -111,6 +125,7 @@ func fetchAction(cctx *cli.Context) error { rootCid, path, dagScope, + entityBytes, tempDir, progress, outfile, @@ -188,6 +203,7 @@ type fetchRunFunc func( rootCid cid.Cid, path string, dagScope string, + entityBytes string, tempDir string, progress bool, outfile string, @@ -207,6 +223,7 @@ func defaultFetchRun( rootCid cid.Cid, path string, dagScope string, + entityBytes string, tempDir string, progress bool, outfile string, @@ -258,7 +275,12 @@ func defaultFetchRun( } }, false) - request, err := types.NewRequestForPath(carStore, rootCid, path, types.DagScope(dagScope)) + byteRange, _ := types.ParseByteRange(entityBytes) + var br *types.ByteRange + if !byteRange.IsDefault() { + br = &byteRange + } + request, err := types.NewRequestForPath(carStore, rootCid, path, types.DagScope(dagScope), br) if err != nil { return err } diff --git a/cmd/lassie/fetch_test.go b/cmd/lassie/fetch_test.go index 2ca6f5aa..15ff0f85 100644 --- a/cmd/lassie/fetch_test.go +++ b/cmd/lassie/fetch_test.go @@ -28,11 +28,12 @@ func TestFetchCommandFlags(t *testing.T) { { name: "with default args", args: []string{"fetch", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4"}, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { // fetch specific params require.Equal(t, "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", rootCid.String()) require.Equal(t, "", path) require.Equal(t, string(types.DagScopeAll), dagScope) + require.Empty(t, entityBytes) require.Equal(t, false, progress) require.Equal(t, "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4.car", outfile) @@ -70,7 +71,7 @@ func TestFetchCommandFlags(t *testing.T) { "fetch", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4/birb.mp4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, "/birb.mp4", path) return nil }, @@ -83,7 +84,7 @@ func TestFetchCommandFlags(t *testing.T) { "entity", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, string(types.DagScopeEntity), dagScope) return nil }, @@ -96,11 +97,50 @@ func TestFetchCommandFlags(t *testing.T) { "block", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, string(types.DagScopeBlock), dagScope) return nil }, }, + { + name: "with entity-bytes 0:*", + args: []string{ + "fetch", + "--entity-bytes", + "0:*", + "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", + }, + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { + require.Equal(t, "0:*", entityBytes) + return nil + }, + }, + { + name: "with entity-bytes 0:10", + args: []string{ + "fetch", + "--entity-bytes", + "0:10", + "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", + }, + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { + require.Equal(t, "0:10", entityBytes) + return nil + }, + }, + { + name: "with entity-bytes 1000:20000", + args: []string{ + "fetch", + "--entity-bytes", + "1000:20000", + "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", + }, + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { + require.Equal(t, "1000:20000", entityBytes) + return nil + }, + }, { name: "with progress", args: []string{ @@ -108,7 +148,7 @@ func TestFetchCommandFlags(t *testing.T) { "--progress", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, true, progress) return nil }, @@ -121,7 +161,7 @@ func TestFetchCommandFlags(t *testing.T) { "myfile", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, "myfile", outfile) return nil }, @@ -134,7 +174,7 @@ func TestFetchCommandFlags(t *testing.T) { "/ip4/127.0.0.1/tcp/5000/p2p/12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.IsType(t, &retriever.DirectCandidateFinder{}, lCfg.Finder, "finder should be a DirectCandidateFinder when providers are specified") return nil }, @@ -147,7 +187,7 @@ func TestFetchCommandFlags(t *testing.T) { "https://cid.contact", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.IsType(t, &indexerlookup.IndexerCandidateFinder{}, lCfg.Finder, "finder should be an IndexerCandidateFinder when providing an ipni endpoint") return nil }, @@ -170,7 +210,7 @@ func TestFetchCommandFlags(t *testing.T) { "/mytmpdir", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, "/mytmpdir", tempDir) return nil }, @@ -183,7 +223,7 @@ func TestFetchCommandFlags(t *testing.T) { "30s", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, 30*time.Second, lCfg.ProviderTimeout) return nil }, @@ -196,7 +236,7 @@ func TestFetchCommandFlags(t *testing.T) { "30s", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, 30*time.Second, lCfg.GlobalTimeout) return nil }, @@ -209,7 +249,7 @@ func TestFetchCommandFlags(t *testing.T) { "bitswap,graphsync", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, []multicodec.Code{multicodec.TransportBitswap, multicodec.TransportGraphsyncFilecoinv1}, lCfg.Protocols) return nil }, @@ -222,7 +262,7 @@ func TestFetchCommandFlags(t *testing.T) { "12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4,12D3KooWPNbkEgjdBNeaCGpsgCrPRETe4uBZf1ShFXStobdN18ys", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { p1, err := peer.Decode("12D3KooWBSTEYMLSu5FnQjshEVah9LFGEZoQt26eacCEVYfedWA4") require.NoError(t, err) p2, err := peer.Decode("12D3KooWPNbkEgjdBNeaCGpsgCrPRETe4uBZf1ShFXStobdN18ys") @@ -241,7 +281,7 @@ func TestFetchCommandFlags(t *testing.T) { "10", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, 10, lCfg.BitswapConcurrency) return nil }, @@ -254,7 +294,7 @@ func TestFetchCommandFlags(t *testing.T) { "https://myeventrecorder.com/v1/retrieval-events", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, "https://myeventrecorder.com/v1/retrieval-events", erCfg.EndpointURL) return nil }, @@ -267,7 +307,7 @@ func TestFetchCommandFlags(t *testing.T) { "secret", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, "secret", erCfg.EndpointAuthorization) return nil }, @@ -280,7 +320,7 @@ func TestFetchCommandFlags(t *testing.T) { "myinstanceid", "bafybeic56z3yccnla3cutmvqsn5zy3g24muupcsjtoyp3pu5pm5amurjx4", }, - assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { + assertRun: func(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { require.Equal(t, "myinstanceid", erCfg.InstanceID) return nil }, @@ -312,6 +352,6 @@ func TestFetchCommandFlags(t *testing.T) { } } -func noopRun(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, tempDir string, progress bool, outfile string) error { +func noopRun(ctx context.Context, lCfg *l.LassieConfig, erCfg *a.EventRecorderConfig, msgWriter io.Writer, dataWriter io.Writer, rootCid cid.Cid, path string, dagScope string, entityBytes string, tempDir string, progress bool, outfile string) error { return nil } diff --git a/go.mod b/go.mod index e647c2d4..dd5d30c4 100644 --- a/go.mod +++ b/go.mod @@ -22,10 +22,11 @@ require ( github.com/ipfs/go-ipfs-exchange-interface v0.2.0 github.com/ipfs/go-ipld-format v0.5.0 github.com/ipfs/go-log/v2 v2.5.1 - github.com/ipfs/go-unixfsnode v1.7.1 + github.com/ipfs/go-unixfsnode v1.7.2 github.com/ipld/go-car/v2 v2.10.1 github.com/ipld/go-codec-dagpb v1.6.0 - github.com/ipld/go-ipld-prime v0.20.1-0.20230329011551-5056175565b0 + github.com/ipld/go-ipld-prime v0.20.1-0.20230707090759-349deb22a1fd + github.com/ipld/ipld/specs v0.0.0-20230705075038-29da2e853cdb github.com/ipni/go-libipni v0.0.8-0.20230425184153-86a1fcb7f7ff github.com/libp2p/go-libp2p v0.27.1 github.com/libp2p/go-libp2p-routing-helpers v0.7.0 @@ -144,6 +145,7 @@ require ( github.com/raulk/go-watchdog v1.3.0 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + github.com/warpfork/go-testmark v0.12.1 // indirect github.com/whyrusleeping/cbor v0.0.0-20171005072247-63513f603b11 // indirect github.com/whyrusleeping/cbor-gen v0.0.0-20230126041949-52956bd4c9aa // indirect github.com/whyrusleeping/chunker v0.0.0-20181014151217-fe64bd25879f // indirect diff --git a/go.sum b/go.sum index 88cf9a7c..0b808ac8 100644 --- a/go.sum +++ b/go.sum @@ -126,7 +126,7 @@ github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= github.com/francoispqt/gojay v1.2.13 h1:d2m3sFjloqoIUQU3TsHBgj6qg/BVGlTBeHDUmyJnXKk= github.com/francoispqt/gojay v1.2.13/go.mod h1:ehT5mTG4ua4581f1++1WLG0vPdaA9HaiDsoyrBGkyDY= -github.com/frankban/quicktest v1.14.4 h1:g2rn0vABPOOXmZUj+vbmUp0lPoXEMuhTpIluN0XL9UY= +github.com/frankban/quicktest v1.14.5 h1:dfYrrRyLtiqT9GyKXgdh+k4inNeTvmGbuSgZ3lx3GhA= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= @@ -329,16 +329,18 @@ github.com/ipfs/go-metrics-interface v0.0.1/go.mod h1:6s6euYU4zowdslK0GKHmqaIZ3j github.com/ipfs/go-peertaskqueue v0.8.1 h1:YhxAs1+wxb5jk7RvS0LHdyiILpNmRIRnZVztekOF0pg= github.com/ipfs/go-peertaskqueue v0.8.1/go.mod h1:Oxxd3eaK279FxeydSPPVGHzbwVeHjatZ2GA8XD+KbPU= github.com/ipfs/go-unixfs v0.4.5 h1:wj8JhxvV1G6CD7swACwSKYa+NgtdWC1RUit+gFnymDU= -github.com/ipfs/go-unixfsnode v1.7.1 h1:RRxO2b6CSr5UQ/kxnGzaChTjp5LWTdf3Y4n8ANZgB/s= -github.com/ipfs/go-unixfsnode v1.7.1/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= +github.com/ipfs/go-unixfsnode v1.7.2 h1:460jNXtoBO7AJ5RnrNBVY9/ytZwUZOviDhcRxuLpAvA= +github.com/ipfs/go-unixfsnode v1.7.2/go.mod h1:PVfoyZkX1B34qzT3vJO4nsLUpRCyhnMuHBznRcXirlk= github.com/ipfs/go-verifcid v0.0.2 h1:XPnUv0XmdH+ZIhLGKg6U2vaPaRDXb9urMyNVCE7uvTs= github.com/ipld/go-car/v2 v2.10.1 h1:MRDqkONNW9WRhB79u+Z3U5b+NoN7lYA5B8n8qI3+BoI= github.com/ipld/go-car/v2 v2.10.1/go.mod h1:sQEkXVM3csejlb1kCCb+vQ/pWBKX9QtvsrysMQjOgOg= github.com/ipld/go-codec-dagpb v1.6.0 h1:9nYazfyu9B1p3NAgfVdpRco3Fs2nFC72DqVsMj6rOcc= github.com/ipld/go-codec-dagpb v1.6.0/go.mod h1:ANzFhfP2uMJxRBr8CE+WQWs5UsNa0pYtmKZ+agnUw9s= -github.com/ipld/go-ipld-prime v0.20.1-0.20230329011551-5056175565b0 h1:iJTl9tx5DEsnKpppX5PmfdoQ3ITuBmkh3yyEpHWY2SI= -github.com/ipld/go-ipld-prime v0.20.1-0.20230329011551-5056175565b0/go.mod h1:wmOtdy70ajP48iZITH8uLsGJVMqA4EJM61/bSfYYGhs= +github.com/ipld/go-ipld-prime v0.20.1-0.20230707090759-349deb22a1fd h1:/MI1u/0eL1gW3lFxMOs1sX/+bSCQgjQbK+yTJkvXMrc= +github.com/ipld/go-ipld-prime v0.20.1-0.20230707090759-349deb22a1fd/go.mod h1:PRQpXNcJypaPiiSdarsrJABPkYrBvafwDl0B9HjujZ8= github.com/ipld/go-ipld-prime/storage/bsadapter v0.0.0-20230102063945-1a409dc236dd h1:gMlw/MhNr2Wtp5RwGdsW23cs+yCuj9k2ON7i9MiJlRo= +github.com/ipld/ipld/specs v0.0.0-20230705075038-29da2e853cdb h1:SDDUglxqma8Zl5BwGb8VPnkXXR48t7pcPnh9x8Mfp9I= +github.com/ipld/ipld/specs v0.0.0-20230705075038-29da2e853cdb/go.mod h1:eB2ZYKBGUJFuxTCO8YC0jJu41iquw6AvHhpP7N2yw8k= github.com/ipni/go-libipni v0.0.8-0.20230425184153-86a1fcb7f7ff h1:xbKrIvnpQkbF8iHPk/HGcegsypCDpcXWHhzBCLyCWf8= github.com/ipni/go-libipni v0.0.8-0.20230425184153-86a1fcb7f7ff/go.mod h1:paYP9U4N3/vOzGCuN9kU972vtvw9JUcQjOKyiCFGwRk= github.com/ipsn/go-secp256k1 v0.0.0-20180726113642-9d62b9f0bc52 h1:QG4CGBqCeuBo6aZlGAamSkxWdgWfZGeE49eUOWJPA4c= @@ -616,7 +618,9 @@ github.com/urfave/cli/v2 v2.24.4 h1:0gyJJEBYtCV87zI/x2nZCPyDxD51K6xM8SkwjHFCNEU= github.com/urfave/cli/v2 v2.24.4/go.mod h1:GHupkWPMM0M/sj1a2b4wUrWBPzazNrIjouW6fmdJLxc= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= github.com/viant/toolbox v0.24.0/go.mod h1:OxMCG57V0PXuIP2HNQrtJf2CjqdmbrOx5EkMILuUhzM= -github.com/warpfork/go-testmark v0.11.0 h1:J6LnV8KpceDvo7spaNU4+DauH2n1x+6RaO2rJrmpQ9U= +github.com/warpfork/go-fsx v0.3.0/go.mod h1:oTACCMj+Zle+vgVa5SAhGAh7WksYpLgGUCKEAVc+xPg= +github.com/warpfork/go-testmark v0.12.1 h1:rMgCpJfwy1sJ50x0M0NgyphxYYPMOODIJHhsXyEHU0s= +github.com/warpfork/go-testmark v0.12.1/go.mod h1:kHwy7wfvGSPh1rQJYKayD4AbtNaeyZdcGi9tNJTaa5Y= github.com/warpfork/go-wish v0.0.0-20180510122957-5ad1f5abf436/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20190328234359-8b3e70f8e830/go.mod h1:x6AKhvSSexNrVSrViXSHUEbICjmGXhtgABaHIySUSGw= github.com/warpfork/go-wish v0.0.0-20220906213052-39a1cc7a02d0 h1:GDDkbFiaK8jsSDJfjId/PEGEShv6ugrt4kYsC5UIDaQ= diff --git a/pkg/internal/itest/direct_fetch_test.go b/pkg/internal/itest/direct_fetch_test.go index c15d4551..a03fa60e 100644 --- a/pkg/internal/itest/direct_fetch_test.go +++ b/pkg/internal/itest/direct_fetch_test.go @@ -116,7 +116,7 @@ func TestDirectFetch(t *testing.T) { }() outCar, err := storage.NewReadableWritable(outFile, []cid.Cid{srcData1.Root}, carv2.WriteAsCarV1(true)) req.NoError(err) - request, err := types.NewRequestForPath(outCar, srcData1.Root, "", types.DagScopeAll) + request, err := types.NewRequestForPath(outCar, srcData1.Root, "", types.DagScopeAll, nil) req.NoError(err) _, err = lassie.Fetch(ctx, request, func(types.RetrievalEvent) {}) req.NoError(err) diff --git a/pkg/internal/itest/testpeer/backedstore.go b/pkg/internal/itest/testpeer/backedstore.go new file mode 100644 index 00000000..1b62863d --- /dev/null +++ b/pkg/internal/itest/testpeer/backedstore.go @@ -0,0 +1,94 @@ +package testpeer + +import ( + "bytes" + "context" + "errors" + "io" + + "github.com/ipfs/boxo/blockstore" + blocks "github.com/ipfs/go-block-format" + "github.com/ipfs/go-cid" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" +) + +var _ blockstore.Blockstore = (*BackedStore)(nil) +var _ blockstore.Blockstore = (*linkSystemBlockstore)(nil) + +type BackedStore struct { + blockstore.Blockstore +} + +func (bs *BackedStore) UseLinkSystem(lsys linking.LinkSystem) { + bs.Blockstore = &linkSystemBlockstore{lsys} +} + +type linkSystemBlockstore struct { + lsys linking.LinkSystem +} + +func (lsbs *linkSystemBlockstore) DeleteBlock(ctx context.Context, c cid.Cid) error { + return errors.New("not supported") +} + +func (lsbs *linkSystemBlockstore) Has(ctx context.Context, c cid.Cid) (bool, error) { + _, err := lsbs.lsys.StorageReadOpener(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}) + if err != nil { + return false, err + } + return true, nil +} + +func (lsbs *linkSystemBlockstore) Get(ctx context.Context, c cid.Cid) (blocks.Block, error) { + rdr, err := lsbs.lsys.StorageReadOpener(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}) + if err != nil { + return nil, err + } + var buf bytes.Buffer + _, err = io.Copy(&buf, rdr) + if err != nil { + return nil, err + } + return blocks.NewBlockWithCid(buf.Bytes(), c) +} + +func (lsbs *linkSystemBlockstore) GetSize(ctx context.Context, c cid.Cid) (int, error) { + rdr, err := lsbs.lsys.StorageReadOpener(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: c}) + if err != nil { + return 0, err + } + i, err := io.Copy(io.Discard, rdr) + if err != nil { + return 0, err + } + return int(i), nil +} + +func (lsbs *linkSystemBlockstore) Put(ctx context.Context, blk blocks.Block) error { + w, wc, err := lsbs.lsys.StorageWriteOpener(linking.LinkContext{Ctx: ctx}) + if err != nil { + return err + } + if _, err = io.Copy(w, bytes.NewReader(blk.RawData())); err != nil { + return err + } + return wc(cidlink.Link{Cid: blk.Cid()}) +} + +func (lsbs *linkSystemBlockstore) PutMany(ctx context.Context, blks []blocks.Block) error { + for _, blk := range blks { + if err := lsbs.Put(ctx, blk); err != nil { + return err + } + } + return nil +} + +func (lsbs *linkSystemBlockstore) AllKeysChan(ctx context.Context) (<-chan cid.Cid, error) { + return nil, errors.New("not supported") +} + +func (lsbs *linkSystemBlockstore) HashOnRead(enabled bool) { + lsbs.lsys.TrustedStorage = !enabled +} diff --git a/pkg/internal/itest/testpeer/generator.go b/pkg/internal/itest/testpeer/generator.go index 42d52d52..5a74bd88 100644 --- a/pkg/internal/itest/testpeer/generator.go +++ b/pkg/internal/itest/testpeer/generator.go @@ -1,13 +1,11 @@ package testpeer import ( - "bytes" "context" "errors" "fmt" "io" "net" - "net/http" "strings" "testing" "time" @@ -16,7 +14,6 @@ import ( dtimpl "github.com/filecoin-project/go-data-transfer/v2/impl" dtnet "github.com/filecoin-project/go-data-transfer/v2/network" gstransport "github.com/filecoin-project/go-data-transfer/v2/transport/graphsync" - "github.com/filecoin-project/lassie/pkg/types" bsnet "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/bitswap/server" "github.com/ipfs/go-cid" @@ -29,17 +26,9 @@ import ( blockstore "github.com/ipfs/go-ipfs-blockstore" delay "github.com/ipfs/go-ipfs-delay" "github.com/ipfs/go-log/v2" - "github.com/ipfs/go-unixfsnode" - "github.com/ipld/go-car/v2" - "github.com/ipld/go-car/v2/storage" - dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime" - "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" - "github.com/ipld/go-ipld-prime/node/basicnode" - "github.com/ipld/go-ipld-prime/traversal" - "github.com/ipld/go-ipld-prime/traversal/selector" routinghelpers "github.com/libp2p/go-libp2p-routing-helpers" tnet "github.com/libp2p/go-libp2p-testing/net" p2ptestutil "github.com/libp2p/go-libp2p-testing/netutil" @@ -166,7 +155,7 @@ type TestPeer struct { BitswapNetwork bsnet.BitSwapNetwork DatatransferServer datatransfer.Manager HttpServer *TestPeerHttpServer - blockstore blockstore.Blockstore + blockstore *BackedStore Host host.Host blockstoreDelay delay.D LinkSystem *linking.LinkSystem @@ -175,7 +164,7 @@ type TestPeer struct { } // Blockstore returns the block store for this test instance -func (i *TestPeer) Blockstore() blockstore.Blockstore { +func (i *TestPeer) Blockstore() *BackedStore { return i.blockstore } @@ -287,7 +276,8 @@ func newTestPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity) (Test panic(err.Error()) } - dstore := ds_sync.MutexWrap(ds.NewMapDatastore()) + baseStore := ds.NewMapDatastore() + dstore := ds_sync.MutexWrap(baseStore) dstoreDelayed := delayed.New(dstore, bsdelay) bstore, err := blockstore.CachedBlockstore(ctx, @@ -296,11 +286,12 @@ func newTestPeer(ctx context.Context, mn mocknet.Mocknet, p tnet.Identity) (Test if err != nil { return TestPeer{}, nil, err } - lsys := storeutil.LinkSystemForBlockstore(bstore) + backedStore := &BackedStore{bstore} + lsys := storeutil.LinkSystemForBlockstore(backedStore) tp := TestPeer{ Host: client, ID: p.ID(), - blockstore: bstore, + blockstore: backedStore, blockstoreDelay: bsdelay, LinkSystem: &lsys, Cids: make(map[cid.Cid]struct{}), @@ -342,126 +333,6 @@ func StartAndWaitForReady(ctx context.Context, manager datatransfer.Manager) err } } -func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.ResponseWriter, *http.Request) { - return func(res http.ResponseWriter, req *http.Request) { - urlPath := strings.Split(req.URL.Path, "/")[1:] - - // validate CID path parameter - cidStr := urlPath[1] - rootCid, err := cid.Parse(cidStr) - if err != nil { - http.Error(res, fmt.Sprintf("Failed to parse CID path parameter: %s", cidStr), http.StatusBadRequest) - return - } - - // Grab unixfs path if it exists - unixfsPath := "" - if len(urlPath) > 2 { - unixfsPath = "/" + strings.Join(urlPath[2:], "/") - } - - acceptTypes := strings.Split(req.Header.Get("Accept"), ",") - includeDupes := false - for _, acceptType := range acceptTypes { - typeParts := strings.Split(acceptType, ";") - if typeParts[0] == "application/vnd.ipld.car" { - for _, nextPart := range typeParts[1:] { - pair := strings.Split(nextPart, "=") - if len(pair) == 2 { - attr := strings.TrimSpace(pair[0]) - value := strings.TrimSpace(pair[1]) - if attr == "dups" && value == "y" { - includeDupes = true - } - } - } - } - } - - // We're always providing the dag-scope parameter, so add a failure case if we stop - // providing it in the future - if !req.URL.Query().Has("dag-scope") { - http.Error(res, "Missing dag-scope parameter", http.StatusBadRequest) - return - } - - // Parse car scope and use it to get selector - var dagScope types.DagScope - switch req.URL.Query().Get("dag-scope") { - case "all": - dagScope = types.DagScopeAll - case "entity": - dagScope = types.DagScopeEntity - case "block": - dagScope = types.DagScopeBlock - default: - http.Error(res, fmt.Sprintf("Invalid dag-scope parameter: %s", req.URL.Query().Get("dag-scope")), http.StatusBadRequest) - return - } - - selNode := unixfsnode.UnixFSPathSelectorBuilder(unixfsPath, dagScope.TerminalSelectorSpec(), false) - sel, err := selector.CompileSelector(selNode) - if err != nil { - http.Error(res, fmt.Sprintf("Failed to compile selector from dag-scope: %v", err), http.StatusInternalServerError) - return - } - - // Write to response writer - carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(includeDupes)) - if err != nil { - http.Error(res, fmt.Sprintf("Failed to create car writer: %v", err), http.StatusInternalServerError) - return - } - - // Extend the StorageReadOpener func to write to the carWriter - originalSRO := lsys.StorageReadOpener - lsys.StorageReadOpener = func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { - r, err := originalSRO(lc, lnk) - if err != nil { - return nil, err - } - byts, err := io.ReadAll(r) - if err != nil { - return nil, err - } - err = carWriter.Put(ctx, lnk.(cidlink.Link).Cid.KeyString(), byts) - if err != nil { - return nil, err - } - - return bytes.NewReader(byts), nil - } - - protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser) - lnk := cidlink.Link{Cid: rootCid} - lnkCtx := linking.LinkContext{} - proto, err := protoChooser(lnk, lnkCtx) - if err != nil { - http.Error(res, fmt.Sprintf("Failed to choose prototype node: %s", cidStr), http.StatusBadRequest) - return - } - - rootNode, err := lsys.Load(lnkCtx, lnk, proto) - if err != nil { - http.Error(res, fmt.Sprintf("Failed to load root cid into link system: %v", err), http.StatusInternalServerError) - return - } - - cfg := &traversal.Config{ - Ctx: ctx, - LinkSystem: lsys, - LinkTargetNodePrototypeChooser: protoChooser, - } - progress := traversal.Progress{Cfg: cfg} - - err = progress.WalkAdv(rootNode, sel, visitNoop) - if err != nil { - // if we loaded the first block, we can't write headers any more - return - } - } -} - // RandTestPeerIdentity is a wrapper around // github.com/libp2p/go-libp2p-testing/netutil/RandTestBogusIdentity that // ensures the returned identity has an available port. The identity generated @@ -485,5 +356,3 @@ func RandTestPeerIdentity() (tnet.Identity, error) { } return nil, errors.New("failed to find an available port") } - -func visitNoop(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } diff --git a/pkg/internal/itest/testpeer/peerhttpserver.go b/pkg/internal/itest/testpeer/peerhttpserver.go index 54588c2e..a439d408 100644 --- a/pkg/internal/itest/testpeer/peerhttpserver.go +++ b/pkg/internal/itest/testpeer/peerhttpserver.go @@ -1,11 +1,26 @@ package testpeer import ( + "bytes" "context" "fmt" + "io" "net" "net/http" + "strings" + "github.com/filecoin-project/lassie/pkg/types" + "github.com/ipfs/go-cid" + "github.com/ipfs/go-unixfsnode" + "github.com/ipld/go-car/v2" + "github.com/ipld/go-car/v2/storage" + dagpb "github.com/ipld/go-codec-dagpb" + "github.com/ipld/go-ipld-prime/datamodel" + "github.com/ipld/go-ipld-prime/linking" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" + "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" servertiming "github.com/mitchellh/go-server-timing" ) @@ -65,3 +80,128 @@ func (s *TestPeerHttpServer) Close() error { s.cancel() return s.server.Shutdown(context.Background()) } + +func MockIpfsHandler(ctx context.Context, lsys linking.LinkSystem) func(http.ResponseWriter, *http.Request) { + return func(res http.ResponseWriter, req *http.Request) { + urlPath := strings.Split(req.URL.Path, "/")[1:] + + // validate CID path parameter + cidStr := urlPath[1] + rootCid, err := cid.Parse(cidStr) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to parse CID path parameter: %s", cidStr), http.StatusBadRequest) + return + } + + // Grab unixfs path if it exists + unixfsPath := "" + if len(urlPath) > 2 { + unixfsPath = "/" + strings.Join(urlPath[2:], "/") + } + + acceptTypes := strings.Split(req.Header.Get("Accept"), ",") + includeDupes := false + for _, acceptType := range acceptTypes { + typeParts := strings.Split(acceptType, ";") + if typeParts[0] == "application/vnd.ipld.car" { + for _, nextPart := range typeParts[1:] { + pair := strings.Split(nextPart, "=") + if len(pair) == 2 { + attr := strings.TrimSpace(pair[0]) + value := strings.TrimSpace(pair[1]) + if attr == "dups" && value == "y" { + includeDupes = true + } + } + } + } + } + + // We're always providing the dag-scope parameter, so add a failure case if we stop + // providing it in the future + if !req.URL.Query().Has("dag-scope") { + http.Error(res, "Missing dag-scope parameter", http.StatusBadRequest) + return + } + + // Parse car scope and use it to get selector + var dagScope types.DagScope + switch req.URL.Query().Get("dag-scope") { + case "all": + dagScope = types.DagScopeAll + case "entity": + dagScope = types.DagScopeEntity + case "block": + dagScope = types.DagScopeBlock + default: + http.Error(res, fmt.Sprintf("Invalid dag-scope parameter: %s", req.URL.Query().Get("dag-scope")), http.StatusBadRequest) + return + } + var byteRange *types.ByteRange + if req.URL.Query().Get("entity-bytes") != "" { + br, err := types.ParseByteRange(req.URL.Query().Get("entity-bytes")) + if err != nil { + http.Error(res, fmt.Sprintf("Invalid entity-bytes parameter: %s", req.URL.Query().Get("entity-bytes")), http.StatusBadRequest) + return + } + byteRange = &br + } + + sel, err := selector.CompileSelector(types.PathScopeSelector(unixfsPath, dagScope, byteRange)) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to compile selector from dag-scope: %v", err), http.StatusInternalServerError) + return + } + + // Write to response writer + carWriter, err := storage.NewWritable(res, []cid.Cid{rootCid}, car.WriteAsCarV1(true), car.AllowDuplicatePuts(includeDupes)) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to create car writer: %v", err), http.StatusInternalServerError) + return + } + + // Extend the StorageReadOpener func to write to the carWriter + originalSRO := lsys.StorageReadOpener + lsys.StorageReadOpener = func(lc linking.LinkContext, lnk datamodel.Link) (io.Reader, error) { + r, err := originalSRO(lc, lnk) + if err != nil { + return nil, err + } + byts, err := io.ReadAll(r) + if err != nil { + return nil, err + } + err = carWriter.Put(ctx, lnk.(cidlink.Link).Cid.KeyString(), byts) + if err != nil { + return nil, err + } + + return bytes.NewReader(byts), nil + } + + protoChooser := dagpb.AddSupportToChooser(basicnode.Chooser) + lnk := cidlink.Link{Cid: rootCid} + lnkCtx := linking.LinkContext{} + proto, err := protoChooser(lnk, lnkCtx) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to choose prototype node: %s", cidStr), http.StatusBadRequest) + return + } + + rootNode, err := lsys.Load(lnkCtx, lnk, proto) + if err != nil { + http.Error(res, fmt.Sprintf("Failed to load root cid into link system: %v", err), http.StatusInternalServerError) + return + } + + cfg := &traversal.Config{ + Ctx: ctx, + LinkSystem: lsys, + LinkTargetNodePrototypeChooser: protoChooser, + } + progress := traversal.Progress{Cfg: cfg} + + _ = progress.WalkMatching(rootNode, sel, unixfsnode.BytesConsumingMatcher) + // if we loaded the first block, we can't write headers any more so don't bother + } +} diff --git a/pkg/internal/itest/trustless_fetch_test.go b/pkg/internal/itest/trustless_fetch_test.go new file mode 100644 index 00000000..aa534a22 --- /dev/null +++ b/pkg/internal/itest/trustless_fetch_test.go @@ -0,0 +1,149 @@ +package itest + +import ( + "context" + "fmt" + "io" + "net/http" + "strings" + "sync" + "testing" + "time" + + datatransfer "github.com/filecoin-project/go-data-transfer/v2" + "github.com/filecoin-project/lassie/pkg/internal/itest/mocknet" + "github.com/filecoin-project/lassie/pkg/lassie" + httpserver "github.com/filecoin-project/lassie/pkg/server/http" + "github.com/google/uuid" + "github.com/ipfs/go-unixfsnode" + "github.com/ipld/go-car/v2" + cidlink "github.com/ipld/go-ipld-prime/linking/cid" + trustlesspathing "github.com/ipld/ipld/specs/pkg-go/trustless-pathing" + "github.com/stretchr/testify/require" +) + +func TestTrustlessUnixfsFetch(t *testing.T) { + req := require.New(t) + + testCases, err := trustlesspathing.Unixfs20mVarietyCases() + req.NoError(err) + storage, closer, err := trustlesspathing.Unixfs20mVarietyReadableStorage() + req.NoError(err) + defer closer.Close() + + lsys := cidlink.DefaultLinkSystem() + lsys.TrustedStorage = true + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + lsys.SetReadStorage(storage) + + for _, tc := range testCases { + for _, proto := range []string{"http", "graphsync", "bitswap"} { + t.Run(tc.Name+"/"+proto, func(t *testing.T) { + req := require.New(t) + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + t.Logf("query=%s, blocks=%d", tc.AsQuery(), len(tc.ExpectedCids)) + + mrn := mocknet.NewMockRetrievalNet(ctx, t) + switch proto { + case "http": + mrn.AddHttpPeers(1) + case "graphsync": + mrn.AddGraphsyncPeers(1) + case "bitswap": + mrn.AddBitswapPeers(1) + } + require.NoError(t, mrn.MN.LinkAll()) + var finishedChan chan []datatransfer.Event + if proto == "graphsync" { + finishedChan = mocknet.SetupRetrieval(t, mrn.Remotes[0]) + } + + mrn.Remotes[0].Blockstore().UseLinkSystem(lsys) + mrn.Remotes[0].Cids[tc.Root] = struct{}{} + + lassie, err := lassie.NewLassie( + ctx, + lassie.WithProviderTimeout(20*time.Second), + lassie.WithHost(mrn.Self), + lassie.WithFinder(mrn.Finder), + ) + req.NoError(err) + cfg := httpserver.HttpServerConfig{Address: "127.0.0.1", Port: 0, TempDir: t.TempDir()} + httpServer, err := httpserver.NewHttpServer(ctx, lassie, cfg) + req.NoError(err) + serverError := make(chan error, 1) + go func() { + serverError <- httpServer.Start() + }() + responseChan := make(chan *http.Response, 1) + go func() { + // Make a request for our CID and read the complete CAR bytes + addr := fmt.Sprintf("http://%s%s", httpServer.Addr(), tc.AsQuery()) + getReq, err := http.NewRequest("GET", addr, nil) + req.NoError(err) + getReq.Header.Add("Accept", "application/vnd.ipld.car") + t.Log("Fetching", getReq.URL.String()) + resp, err := http.DefaultClient.Do(getReq) + req.NoError(err) + responseChan <- resp + }() + var resp *http.Response + select { + case resp = <-responseChan: + case <-ctx.Done(): + req.FailNow("Did not receive responses") + } + if finishedChan != nil { + // for graphsync + var wg sync.WaitGroup + wg.Add(1) + go func() { + mocknet.WaitForFinish(ctx, t, finishedChan, 1*time.Second) + wg.Done() + }() + wg.Wait() + } + if resp.StatusCode != http.StatusOK { + body, err := io.ReadAll(resp.Body) + req.NoError(err) + req.Failf("200 response code not received", "got code: %d, body: %s", resp.StatusCode, string(body)) + } + req.Equal(fmt.Sprintf(`attachment; filename="%s.car"`, tc.Root.String()), resp.Header.Get("Content-Disposition")) + req.Equal("none", resp.Header.Get("Accept-Ranges")) + req.Equal("public, max-age=29030400, immutable", resp.Header.Get("Cache-Control")) + req.Equal("application/vnd.ipld.car; version=1", resp.Header.Get("Content-Type")) + req.Equal("nosniff", resp.Header.Get("X-Content-Type-Options")) + etagStart := fmt.Sprintf(`"%s.car.`, tc.Root.String()) + etagGot := resp.Header.Get("ETag") + req.True(strings.HasPrefix(etagGot, etagStart), "ETag should start with [%s], got [%s]", etagStart, etagGot) + req.Equal(`"`, etagGot[len(etagGot)-1:], "ETag should end with a quote") + req.Equal(fmt.Sprintf("/ipfs/%s%s", tc.Root.String(), tc.Path), resp.Header.Get("X-Ipfs-Path")) + requestId := resp.Header.Get("X-Trace-Id") + require.NotEmpty(t, requestId) + _, err = uuid.Parse(requestId) + req.NoError(err) + + rdr, err := car.NewBlockReader(resp.Body) + req.NoError(err) + req.Len(rdr.Roots, 1) + req.Equal(tc.Root.String(), rdr.Roots[0].String()) + for ii := 0; ; ii++ { + blk, err := rdr.Next() + if err == io.EOF { + if ii != len(tc.ExpectedCids) { + req.FailNowf("unexpected EOF", "expected %d blocks, got %d", len(tc.ExpectedCids), ii) + } + break + } + req.NoError(err) + if ii >= len(tc.ExpectedCids) { + req.FailNowf("unexpected block", "got block %d, expected %d", ii, len(tc.ExpectedCids)) + } + req.Equal(tc.ExpectedCids[ii].String(), blk.Cid().String(), "unexpected block #%d", ii) + } + }) + } + } +} diff --git a/pkg/internal/testutil/toblocks.go b/pkg/internal/testutil/toblocks.go index bbc43927..81061f8e 100644 --- a/pkg/internal/testutil/toblocks.go +++ b/pkg/internal/testutil/toblocks.go @@ -53,8 +53,7 @@ func ToBlocks(t *testing.T, lsys linking.LinkSystem, root cid.Cid, selNode datam LinkTargetNodePrototypeChooser: dagpb.AddSupportToChooser(basicnode.Chooser), }, } - vf := func(p traversal.Progress, n datamodel.Node, vr traversal.VisitReason) error { return nil } - err = prog.WalkAdv(rootNode, sel, vf) + err = prog.WalkMatching(rootNode, sel, unixfsnode.BytesConsumingMatcher) require.NoError(t, err) return traversedBlocks diff --git a/pkg/retriever/bitswapretriever.go b/pkg/retriever/bitswapretriever.go index f7224dba..bdd2fcf0 100644 --- a/pkg/retriever/bitswapretriever.go +++ b/pkg/retriever/bitswapretriever.go @@ -18,6 +18,7 @@ import ( "github.com/ipfs/boxo/bitswap/network" "github.com/ipfs/boxo/blockservice" "github.com/ipfs/go-cid" + "github.com/ipfs/go-unixfsnode" dagpb "github.com/ipld/go-codec-dagpb" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" @@ -295,10 +296,6 @@ func loaderForSession(retrievalID types.RetrievalID, inProgressCids InProgressCi } } -func noopVisitor(prog traversal.Progress, n datamodel.Node, reason traversal.VisitReason) error { - return nil -} - func easyTraverse( ctx context.Context, root datamodel.Link, @@ -340,7 +337,8 @@ func easyTraverse( if err != nil { return err } - if err := progress.WalkAdv(node, compiledSelector, noopVisitor); err != nil { + + if err := progress.WalkMatching(node, compiledSelector, unixfsnode.BytesConsumingMatcher); err != nil { return err } return ecr.Error diff --git a/pkg/server/http/ipfs.go b/pkg/server/http/ipfs.go index f805c6a1..e5e8738d 100644 --- a/pkg/server/http/ipfs.go +++ b/pkg/server/http/ipfs.go @@ -82,6 +82,12 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response return } + byteRange, err := ParseByteRange(req) + if err != nil { + errorResponse(res, statusLogger, http.StatusBadRequest, err) + return + } + protocols, err := parseProtocols(req) if err != nil { errorResponse(res, statusLogger, http.StatusBadRequest, err) @@ -123,7 +129,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response tempStore := storage.NewDeferredStorageCar(cfg.TempDir) var carWriter storage.DeferredWriter if includeDupes { - carWriter = storage.NewDuplicateAdderCarForStream(req.Context(), rootCid, path.String(), dagScope, tempStore, res) + carWriter = storage.NewDuplicateAdderCarForStream(req.Context(), rootCid, path.String(), dagScope, byteRange, tempStore, res) } else { carWriter = storage.NewDeferredCarWriterForStream(rootCid, res) } @@ -135,7 +141,7 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response }() var store types.ReadableWritableStorage = carStore - request, err := types.NewRequestForPath(store, rootCid, path.String(), dagScope) + request, err := types.NewRequestForPath(store, rootCid, path.String(), dagScope, byteRange) if err != nil { errorResponse(res, statusLogger, http.StatusInternalServerError, fmt.Errorf("failed to create request: %w", err)) return @@ -185,7 +191,22 @@ func ipfsHandler(lassie *lassie.Lassie, cfg HttpServerConfig) func(http.Response } // servertiming metrics - logger.Debugw("fetching CID", "retrievalId", retrievalId, "CID", rootCid.String(), "path", path.String(), "dagScope", dagScope) + logger.Debugw("fetching CID", + "retrievalId", + retrievalId, + "CID", + rootCid.String(), + "path", + path.String(), + "dagScope", + dagScope, + "byteRange", + byteRange, + "includeDupes", + includeDupes, + "blockLimit", + blockLimit, + ) stats, err := lassie.Fetch(req.Context(), request, servertimingsSubscriber(req)) // force all blocks to flush diff --git a/pkg/server/http/util.go b/pkg/server/http/util.go index 13be0574..537b1823 100644 --- a/pkg/server/http/util.go +++ b/pkg/server/http/util.go @@ -14,16 +14,7 @@ import ( // parameter is not one of the supported values. func ParseScope(req *http.Request) (types.DagScope, error) { if req.URL.Query().Has("dag-scope") { - switch req.URL.Query().Get("dag-scope") { - case "all": - return types.DagScopeAll, nil - case "entity": - return types.DagScopeEntity, nil - case "block": - return types.DagScopeBlock, nil - default: - return types.DagScopeAll, errors.New("invalid dag-scope parameter") - } + return types.ParseDagScope(req.URL.Query().Get("dag-scope")) } // check for legacy param name -- to do -- delete once we confirm this isn't used any more if req.URL.Query().Has("car-scope") { @@ -41,6 +32,20 @@ func ParseScope(req *http.Request) (types.DagScope, error) { return types.DagScopeAll, nil } +// ParseByteRange returns the entity-bytes query parameter if one is set in the +// query string or nil if one is not set. An error is returned if an +// entity-bytes query string is not a valid byte range. +func ParseByteRange(req *http.Request) (*types.ByteRange, error) { + if req.URL.Query().Has("entity-bytes") { + br, err := types.ParseByteRange(req.URL.Query().Get("entity-bytes")) + if err != nil { + return nil, err + } + return &br, nil + } + return nil, nil +} + // ParseFilename returns the filename query parameter or an error if the filename // extension is not ".car". Lassie only supports returning CAR data. // See https://specs.ipfs.tech/http-gateways/path-gateway/#filename-request-query-parameter diff --git a/pkg/storage/duplicateaddercar.go b/pkg/storage/duplicateaddercar.go index df11ef17..f5b6a164 100644 --- a/pkg/storage/duplicateaddercar.go +++ b/pkg/storage/duplicateaddercar.go @@ -24,13 +24,23 @@ type DuplicateAdderCar struct { root cid.Cid path string scope types.DagScope + bytes *types.ByteRange store *DeferredStorageCar blockStream *blockStream streamCompletion chan error streamCompletionLk sync.Mutex } -func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path string, scope types.DagScope, store *DeferredStorageCar, outStream io.Writer) *DuplicateAdderCar { +func NewDuplicateAdderCarForStream( + ctx context.Context, + root cid.Cid, + path string, + scope types.DagScope, + bytes *types.ByteRange, + store *DeferredStorageCar, + outStream io.Writer, +) *DuplicateAdderCar { + blockStream := &blockStream{ctx: ctx, seen: make(map[cid.Cid]struct{})} blockStream.blockBuffer = list.New() blockStream.cond = sync.NewCond(&blockStream.mu) @@ -43,6 +53,7 @@ func NewDuplicateAdderCarForStream(ctx context.Context, root cid.Cid, path strin root: root, path: path, scope: scope, + bytes: bytes, store: store, blockStream: blockStream, } @@ -53,7 +64,7 @@ func (da *DuplicateAdderCar) addDupes() { defer func() { da.streamCompletion <- err }() - sel := types.PathScopeSelector(da.path, da.scope) + sel := types.PathScopeSelector(da.path, da.scope, da.bytes) // we're going to do a verified car where we add dupes back in cfg := verifiedcar.Config{ diff --git a/pkg/storage/duplicateaddercar_test.go b/pkg/storage/duplicateaddercar_test.go index 3061ef16..38e57627 100644 --- a/pkg/storage/duplicateaddercar_test.go +++ b/pkg/storage/duplicateaddercar_test.go @@ -19,7 +19,6 @@ import ( ) func TestDuplicateAdderCar(t *testing.T) { - setupStore := &testutil.CorrectedMemStore{ParentStore: &memstore.Store{ Bag: make(map[string][]byte), }} @@ -34,7 +33,7 @@ func TestDuplicateAdderCar(t *testing.T) { store := storage.NewDeferredStorageCar("") ctx := context.Background() - carWriter := storage.NewDuplicateAdderCarForStream(ctx, unixfsFileWithDups.Root, "", types.DagScopeAll, store, buf) + carWriter := storage.NewDuplicateAdderCarForStream(ctx, unixfsFileWithDups.Root, "", types.DagScopeAll, nil, store, buf) cachingTempStore := storage.NewCachingTempStore(carWriter.BlockWriteOpener(), store) // write the root block, containing sharding metadata diff --git a/pkg/types/request.go b/pkg/types/request.go index 20dcedb7..682d4f3e 100644 --- a/pkg/types/request.go +++ b/pkg/types/request.go @@ -3,6 +3,7 @@ package types import ( "errors" "fmt" + "math" "strconv" "strings" @@ -13,7 +14,10 @@ import ( "github.com/ipld/go-ipld-prime" "github.com/ipld/go-ipld-prime/datamodel" cidlink "github.com/ipld/go-ipld-prime/linking/cid" + "github.com/ipld/go-ipld-prime/node/basicnode" ipldstorage "github.com/ipld/go-ipld-prime/storage" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multicodec" ) @@ -72,6 +76,10 @@ type RetrievalRequest struct { // is not set, Scope and Path will be used to construct a selector. Scope DagScope + // Bytes is the optional byte range within the DAG to fetch. If not set + // the default byte range will fetch the entire file. + Bytes *ByteRange + // Duplicates is a flag that indicates whether duplicate blocks should be // stored into the LinkSystem where they occur in the traversal. Duplicates bool @@ -101,7 +109,14 @@ type RetrievalRequest struct { // and writing and it is explicitly set to be trusted (i.e. it will not // check CIDs match bytes). If the storage is not truested, // request.LinkSystem.TrustedStore should be set to false after this call. -func NewRequestForPath(store ipldstorage.WritableStorage, cid cid.Cid, path string, dagScope DagScope) (RetrievalRequest, error) { +func NewRequestForPath( + store ipldstorage.WritableStorage, + cid cid.Cid, + path string, + dagScope DagScope, + byteRange *ByteRange, +) (RetrievalRequest, error) { + retrievalId, err := NewRetrievalID() if err != nil { return RetrievalRequest{}, err @@ -120,13 +135,48 @@ func NewRequestForPath(store ipldstorage.WritableStorage, cid cid.Cid, path stri Cid: cid, Path: path, Scope: dagScope, + Bytes: byteRange, LinkSystem: linkSystem, }, nil } -func PathScopeSelector(path string, scope DagScope) ipld.Node { +// PathScopeSelector generates a selector for the given path, scope and byte +// range. Use DefaultByteRange() for the default byte range value if none is +// specified. +func PathScopeSelector(path string, scope DagScope, bytes *ByteRange) ipld.Node { // Turn the path / scope into a selector - return unixfsnode.UnixFSPathSelectorBuilder(path, scope.TerminalSelectorSpec(), false) + terminal := scope.TerminalSelectorSpec() + if !bytes.IsDefault() { + var to int64 = math.MaxInt64 + if bytes.To != nil && *bytes.To > 0 { + to = *bytes.To + 1 // selector is exclusive, so increment the end + } + // TODO: negative ranges are not currently supported, we fall-back to matching entire file + if bytes.From >= 0 && to >= 0 { + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + // if we reach a terminal and it's not a file, then we need to fall-back to the default + // selector for the given scope. We do this with a union of the original terminal. + if scope == DagScopeEntity { + // entity is a special case which we can't just union with our matcher because it + // has its own matcher in it which we need to replace with the subset matcher. + terminal = ssb.ExploreInterpretAs("unixfs", + ssb.ExploreUnion( + ssb.MatcherSubset(bytes.From, to), + ssb.ExploreRecursive( + selector.RecursionLimitDepth(1), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + ), + ), + ) + } else { + terminal = ssb.ExploreUnion( + ssb.ExploreInterpretAs("unixfs", ssb.MatcherSubset(bytes.From, to)), + terminal, + ) + } + } + } + return unixfsnode.UnixFSPathSelectorBuilder(path, terminal, false) } // GetSelector will safely return a selector for this request. If none has been @@ -135,7 +185,7 @@ func (r RetrievalRequest) GetSelector() ipld.Node { if r.Selector != nil { // custom selector return r.Selector } - return PathScopeSelector(r.Path, r.Scope) + return PathScopeSelector(r.Path, r.Scope, r.Bytes) } // GetUrlPath returns a URL path and query string valid with the Trusted HTTP @@ -155,11 +205,15 @@ func (r RetrievalRequest) GetUrlPath() (string, error) { if legacyScope == string(DagScopeEntity) { legacyScope = "file" } + byteRange := "" + if !r.Bytes.IsDefault() { + byteRange = "&entity-bytes=" + r.Bytes.String() + } path := r.Path if path != "" { path = "/" + path } - return fmt.Sprintf("%s?dag-scope=%s&car-scope=%s", path, scope, legacyScope), nil + return fmt.Sprintf("%s?dag-scope=%s&car-scope=%s%s", path, scope, legacyScope, byteRange), nil } // GetSupportedProtocols will safely return the supported protocols for a specific request. @@ -183,23 +237,31 @@ func (r RetrievalRequest) GetSupportedProtocols(allSupportedProtocols []multicod } func (r RetrievalRequest) Etag() string { - // https://github.com/ipfs/boxo/pull/303/commits/f61f95481041406df46a1781b1daab34b6605650#r1213918777 + // similar, but extended form of: + // https://github.com/ipfs/boxo/blob/a91e44dbdbd4c36a5b25a1b9df6ee237aa4442d2/gateway/handler_car.go#L167-L184 sb := strings.Builder{} sb.WriteString("/ipfs/") sb.WriteString(r.Cid.String()) if r.Path != "" { - sb.WriteString("/") + sb.WriteRune('/') sb.WriteString(datamodel.ParsePath(r.Path).String()) } if r.Scope != DagScopeAll { - sb.WriteString(".") + sb.WriteRune('.') sb.WriteString(string(r.Scope)) } + if !r.Bytes.IsDefault() { + sb.WriteRune('.') + sb.WriteString(strconv.FormatInt(r.Bytes.From, 10)) + if r.Bytes.To != nil { + sb.WriteRune('.') + sb.WriteString(strconv.FormatInt(*r.Bytes.To, 10)) + } + } if r.Duplicates { sb.WriteString(".dups") } sb.WriteString(".dfs") - // range bytes would go here: `.from.to` suffix := strconv.FormatUint(xxhash.Sum64([]byte(sb.String())), 32) return `"` + r.Cid.String() + ".car." + suffix + `"` } diff --git a/pkg/types/request_test.go b/pkg/types/request_test.go index 174cf7ae..554e6b3a 100644 --- a/pkg/types/request_test.go +++ b/pkg/types/request_test.go @@ -22,6 +22,7 @@ func TestEtag(t *testing.T) { cid cid.Cid path string scope types.DagScope + bytes *types.ByteRange dups bool expected string }{ @@ -115,14 +116,56 @@ func TestEtag(t *testing.T) { scope: types.DagScopeAll, expected: `"bafyrgqhai26anf3i7pips7q22coa4sz2fr4gk4q4sqdtymvvjyginfzaqewveaeqdh524nsktaq43j65v22xxrybrtertmcfxufdam3da3hbk.car.9lumqv26cg30t"`, }, + { + cid: cid.MustParse("QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK"), + scope: types.DagScopeAll, + bytes: &types.ByteRange{From: 0}, // default, not included + expected: `"QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK.car.58mf8vcmd2eo8"`, + }, + { + cid: cid.MustParse("QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK"), + scope: types.DagScopeAll, + bytes: &types.ByteRange{From: 10}, + expected: `"QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK.car.560ditjelh0u2"`, + }, + { + cid: cid.MustParse("QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK"), + scope: types.DagScopeAll, + bytes: &types.ByteRange{From: 0, To: ptr(200)}, + expected: `"QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK.car.faqf14andvfmb"`, + }, + { + cid: cid.MustParse("QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK"), + scope: types.DagScopeAll, + bytes: &types.ByteRange{From: 100, To: ptr(200)}, + expected: `"QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK.car.bvebrb14stt94"`, + }, + { + cid: cid.MustParse("QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK"), + scope: types.DagScopeEntity, + bytes: &types.ByteRange{From: 100, To: ptr(200)}, + expected: `"QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK.car.bq3u6t9t877t3"`, + }, + { + cid: cid.MustParse("QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK"), + scope: types.DagScopeEntity, + dups: true, + bytes: &types.ByteRange{From: 100, To: ptr(200)}, + expected: `"QmVXsSVjwxMsCwKRCUxEkGb4f4B98gXVy3ih3v4otvcURK.car.fhf498an52uqb"`, + }, } for _, tc := range testCases { - t.Run(fmt.Sprintf("%s:%s:%s:%v", tc.cid.String(), tc.path, tc.scope, tc.dups), func(t *testing.T) { + br := "" + if tc.bytes != nil { + br = ":" + tc.bytes.String() + } + t.Run(fmt.Sprintf("%s:%s:%s:%v%s", tc.cid.String(), tc.path, tc.scope, tc.dups, br), func(t *testing.T) { rr := types.RetrievalRequest{ Cid: tc.cid, Path: tc.path, Scope: tc.scope, + Bytes: tc.bytes, Duplicates: tc.dups, } actual := rr.Etag() @@ -132,3 +175,7 @@ func TestEtag(t *testing.T) { }) } } + +func ptr(i int64) *int64 { + return &i +} diff --git a/pkg/types/types.go b/pkg/types/types.go index 383ce1bf..fb8dd3bd 100644 --- a/pkg/types/types.go +++ b/pkg/types/types.go @@ -5,6 +5,8 @@ import ( "errors" "fmt" "net/url" + "strconv" + "strings" "time" "github.com/filecoin-project/go-state-types/abi" @@ -271,7 +273,7 @@ func (ds DagScope) TerminalSelectorSpec() builder.SelectorSpec { case DagScopeAll: return unixfsnode.ExploreAllRecursivelySelector case DagScopeEntity: - return unixfsnode.MatchUnixFSPreloadSelector // file + return unixfsnode.MatchUnixFSEntitySelector case DagScopeBlock: return matcherSelector case DagScope(""): @@ -280,6 +282,76 @@ func (ds DagScope) TerminalSelectorSpec() builder.SelectorSpec { panic(fmt.Sprintf("unknown DagScope: [%s]", string(ds))) } +func ParseDagScope(s string) (DagScope, error) { + switch s { + case "all": + return DagScopeAll, nil + case "entity": + return DagScopeEntity, nil + case "block": + return DagScopeBlock, nil + default: + return DagScopeAll, errors.New("invalid dag-scope") + } +} + func (ds DagScope) AcceptHeader() string { return "application/vnd.ipld.car;version=1;order=dfs;dups=y" } + +// ByteRange represents a range of bytes in a file. The default value is 0 to +// the end of the file, [0:*]. +// The range is inclusive at both ends, so the case of From==To selects a single +// byte. +// Where the end is * or beyond the end of the file, the end of the file is +// selected. +type ByteRange struct { + From int64 + To *int64 +} + +// IsDefault is roughly equivalent to the range matching [0:*] +func (br *ByteRange) IsDefault() bool { + return br == nil || br.From == 0 && br.To == nil +} + +func (br *ByteRange) String() string { + if br.IsDefault() { + return "0:*" + } + to := "*" // default to end of file + if br.To != nil { + to = strconv.FormatInt(*br.To, 10) + } + return fmt.Sprintf("%d:%s", br.From, to) +} + +func ParseByteRange(s string) (ByteRange, error) { + br := ByteRange{} + if s == "" { + return br, nil + } + parts := strings.Split(s, ":") + if len(parts) != 2 { + return br, fmt.Errorf("invalid entity-bytes: %s", s) + } + var err error + br.From, err = strconv.ParseInt(parts[0], 10, 64) + if err != nil { + return br, err + } + if br.From < 0 { + return br, fmt.Errorf("invalid entity-bytes: %s", s) + } + if parts[1] != "*" { + to, err := strconv.ParseInt(parts[1], 10, 64) + if err != nil { + return br, err + } + if to < 0 { + return br, fmt.Errorf("invalid entity-bytes: %s", s) + } + br.To = &to + } + return br, nil +} diff --git a/pkg/verifiedcar/verifiedcar.go b/pkg/verifiedcar/verifiedcar.go index 03a4c4e6..d3bf7ef6 100644 --- a/pkg/verifiedcar/verifiedcar.go +++ b/pkg/verifiedcar/verifiedcar.go @@ -55,8 +55,6 @@ type Config struct { MaxBlocks uint64 // set a budget for the traversal } -func visitNoop(p traversal.Progress, n datamodel.Node, r traversal.VisitReason) error { return nil } - // Verify reads a CAR from the provided reader, verifies the contents are // strictly what is specified by this Config and writes the blocks to the // provided BlockWriteOpener. It returns the number of blocks and bytes @@ -120,24 +118,19 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l NodeBudget: math.MaxInt64, } } - lc := linking.LinkContext{Ctx: ctx} - lnk := cidlink.Link{Cid: cfg.Root} - proto, err := protoChooser(lnk, lc) - if err != nil { - return 0, 0, err - } - rootNode, err := lsys.Load(lc, lnk, proto) + + rootNode, err := loadNode(ctx, cfg.Root, lsys) if err != nil { - return 0, 0, err + return 0, 0, fmt.Errorf("failed to load root node: %w", err) } - if err := progress.WalkAdv(rootNode, sel, visitNoop); err != nil { + if err := progress.WalkMatching(rootNode, sel, unixfsnode.BytesConsumingMatcher); err != nil { return 0, 0, traversalError(err) } if nbls.Error != nil { // capture any errors not bubbled up through the traversal, i.e. see // https://github.com/ipld/go-ipld-prime/pull/524 - return 0, 0, nbls.Error + return 0, 0, fmt.Errorf("block load failed during traversal: %w", nbls.Error) } // make sure we don't have any extraneous data beyond what the traversal needs @@ -152,6 +145,20 @@ func (cfg Config) VerifyBlockStream(ctx context.Context, cbr BlockReader, lsys l return bt.blocks, bt.bytes, nil } +func loadNode(ctx context.Context, rootCid cid.Cid, lsys linking.LinkSystem) (datamodel.Node, error) { + lnk := cidlink.Link{Cid: rootCid} + lnkCtx := linking.LinkContext{Ctx: ctx} + proto, err := protoChooser(lnk, lnkCtx) + if err != nil { + return nil, fmt.Errorf("failed to choose prototype for CID %s: %w", rootCid.String(), err) + } + rootNode, err := lsys.Load(lnkCtx, lnk, proto) + if err != nil { + return nil, fmt.Errorf("failed to load root CID: %w", err) + } + return rootNode, nil +} + type NextBlockLinkSystem struct { Error error } diff --git a/pkg/verifiedcar/verifiedcar_test.go b/pkg/verifiedcar/verifiedcar_test.go index 40698131..a48660a6 100644 --- a/pkg/verifiedcar/verifiedcar_test.go +++ b/pkg/verifiedcar/verifiedcar_test.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "errors" + "fmt" "io" "math/rand" "os" @@ -11,6 +12,7 @@ import ( "time" "github.com/filecoin-project/lassie/pkg/internal/testutil" + "github.com/filecoin-project/lassie/pkg/types" "github.com/filecoin-project/lassie/pkg/verifiedcar" blocks "github.com/ipfs/go-block-format" "github.com/ipfs/go-cid" @@ -19,16 +21,112 @@ import ( unixfs "github.com/ipfs/go-unixfsnode/testutil" "github.com/ipld/go-car/v2" "github.com/ipld/go-car/v2/storage" + "github.com/ipld/go-ipld-prime" + "github.com/ipld/go-ipld-prime/codec/dagjson" "github.com/ipld/go-ipld-prime/datamodel" "github.com/ipld/go-ipld-prime/linking" cidlink "github.com/ipld/go-ipld-prime/linking/cid" "github.com/ipld/go-ipld-prime/node/basicnode" "github.com/ipld/go-ipld-prime/storage/memstore" "github.com/ipld/go-ipld-prime/traversal" + "github.com/ipld/go-ipld-prime/traversal/selector" + "github.com/ipld/go-ipld-prime/traversal/selector/builder" selectorparse "github.com/ipld/go-ipld-prime/traversal/selector/parse" + trustlesspathing "github.com/ipld/ipld/specs/pkg-go/trustless-pathing" "github.com/stretchr/testify/require" ) +func TestUnixfs20mVariety(t *testing.T) { + req := require.New(t) + + testCases, err := trustlesspathing.Unixfs20mVarietyCases() + req.NoError(err) + storage, closer, err := trustlesspathing.Unixfs20mVarietyReadableStorage() + req.NoError(err) + defer closer.Close() + + lsys := cidlink.DefaultLinkSystem() + lsys.TrustedStorage = true + unixfsnode.AddUnixFSReificationToLinkSystem(&lsys) + lsys.SetReadStorage(storage) + + for _, tc := range testCases { + t.Run(tc.Name, func(t *testing.T) { + req := require.New(t) + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + t.Logf("query=%s, blocks=%d", tc.AsQuery(), len(tc.ExpectedCids)) + + // tc.ExpectedCids is in the order we expect to see them in a properly + // formed trustless CAR for the given query. So we build our list of + // expected blocks in that order and feed it through makeCarStream to + // produce the expected CAR. + expectedBlocks := make([]expectedBlock, len(tc.ExpectedCids)) + for ii, ec := range tc.ExpectedCids { + byt, err := lsys.LoadRaw(linking.LinkContext{Ctx: ctx}, cidlink.Link{Cid: ec}) + req.NoError(err) + blk, err := blocks.NewBlockWithCid(byt, ec) + req.NoError(err) + expectedBlocks[ii] = expectedBlock{blk, false} + } + + carStream, errorCh := makeCarStream(t, ctx, []cid.Cid{tc.Root}, expectedBlocks, false, false, false, nil) + + lsys := cidlink.DefaultLinkSystem() + var writeCounter int + lsys.StorageWriteOpener = func(lc linking.LinkContext) (io.Writer, linking.BlockWriteCommitter, error) { + var buf bytes.Buffer + return &buf, func(l datamodel.Link) error { + req.Equal(expectedBlocks[writeCounter].Cid().String(), l.(cidlink.Link).Cid.String(), "block %d", writeCounter) + req.Equal(expectedBlocks[writeCounter].RawData(), buf.Bytes(), "block %d", writeCounter) + writeCounter++ + return nil + }, nil + } + lsys.StorageReadOpener = func(lc linking.LinkContext, l datamodel.Link) (io.Reader, error) { + return nil, fmt.Errorf("unexpected read of %s", l.String()) + } + + // Run the verifier over the CAR stream to see if we end up with + // the same query. + scope, err := types.ParseDagScope(tc.Scope) + req.NoError(err) + var byteRange *types.ByteRange + if tc.ByteRange != "" { + br, err := types.ParseByteRange(tc.ByteRange) + req.NoError(err) + byteRange = &br + } + cfg := verifiedcar.Config{ + Root: tc.Root, + Selector: types.PathScopeSelector(tc.Path, scope, byteRange), + } + { + selBytes, _ := ipld.Encode(cfg.Selector, dagjson.Encode) + t.Logf("selector=%s, entity-bytes=%s", string(selBytes), tc.ByteRange) + } + blockCount, byteCount, err := cfg.VerifyCar(ctx, carStream, lsys) + + req.NoError(err) + req.Equal(count(expectedBlocks), blockCount) + req.Equal(sizeOf(expectedBlocks), byteCount) + req.Equal(int(count(expectedBlocks)), writeCounter) + + select { + case err := <-errorCh: + req.NoError(err) + default: + } + + // Make sure we consumed the entire stream. + byt, err := io.ReadAll(carStream) + req.NoError(err) + req.Equal(0, len(byt)) + }) + } +} + func TestVerifiedCar(t *testing.T) { ctx := context.Background() @@ -63,6 +161,17 @@ func TestVerifiedCar(t *testing.T) { unixfsFile := testutil.GenerateNoDupes(func() unixfs.DirEntry { return unixfs.GenerateFile(t, &lsys, rndReader, 4<<20) }) unixfsFileBlocks := testutil.ToBlocks(t, lsys, unixfsFile.Root, allSelector) + unixfsFileRange0_1048576Blocks := unixfsFileBlocks[0:6] + ssb := builder.NewSelectorSpecBuilder(basicnode.Prototype.Any) + ss := ssb.ExploreInterpretAs("unixfs", ssb.MatcherSubset(0, 1<<20)) + unixfsFileRange0_1048576Selector := ss.Node() + + // need the root plus the byte range of 1M->2M, which happens to include the + // block of the 0->1M range because of overlapping data + unixfsFileRange1048576_2097152Blocks := append(append([]blocks.Block{}, unixfsFileBlocks[0]), unixfsFileBlocks[5:10]...) + ss = ssb.ExploreInterpretAs("unixfs", ssb.MatcherSubset(1<<20, 2<<20)) + unixfsFileRange1048576_2097152Selector := ss.Node() + unixfsFileWithDups := unixfs.GenerateFile(t, &lsys, testutil.ZeroReader{}, 4<<20) unixfsFileWithDupsBlocks := testutil.ToBlocks(t, lsys, unixfsFileWithDups.Root, allSelector) var unixfsDir unixfs.DirEntry @@ -80,15 +189,20 @@ func TestVerifiedCar(t *testing.T) { }) unixfsShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, allSelector) - unixfsPreloadSelector := unixfsnode.MatchUnixFSPreloadSelector.Node() + unixfsPreloadSelector := unixfsnode.MatchUnixFSEntitySelector.Node() unixfsPreloadDirBlocks := testutil.ToBlocks(t, lsys, unixfsDir.Root, unixfsPreloadSelector) unixfsPreloadShardedDirBlocks := testutil.ToBlocks(t, lsys, unixfsShardedDir.Root, unixfsPreloadSelector) - unixfsDirSubsetSelector := unixfsnode.UnixFSPathSelectorBuilder(unixfsDir.Children[1].Path, unixfsnode.MatchUnixFSPreloadSelector, false) + unixfsDirSubsetSelector := unixfsnode.UnixFSPathSelectorBuilder(unixfsDir.Children[1].Path, unixfsnode.MatchUnixFSEntitySelector, false) unixfsWrappedPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.ExploreAllRecursivelySelector, false) - unixfsWrappedPreloadPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.MatchUnixFSPreloadSelector, false) + unixfsWrappedPreloadPathSelector := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, unixfsnode.MatchUnixFSEntitySelector, false) + preloadSubst := ssb.ExploreInterpretAs("unixfs", ssb.ExploreRecursive( + selector.RecursionLimitDepth(1), + ssb.ExploreAll(ssb.ExploreRecursiveEdge()), + )) + unixfsWrappedPreloadPathSelectorSubst := unixfsnode.UnixFSPathSelectorBuilder(wrapPath, preloadSubst, false) unixfsWrappedFile := testutil.GenerateNoDupes(func() unixfs.DirEntry { return unixfs.WrapContent(t, rndReader, &lsys, unixfsFile, wrapPath, false) }) unixfsWrappedFileBlocks := testutil.ToBlocks(t, lsys, unixfsWrappedFile.Root, allSelector) @@ -115,6 +229,7 @@ func TestVerifiedCar(t *testing.T) { mismatchedCidBlk, _ := blocks.NewBlockWithCid(extraneousByts, allBlocks[99].Cid()) testCases := []struct { name string + skip bool blocks []expectedBlock roots []cid.Cid carv2 bool @@ -436,7 +551,7 @@ func TestVerifiedCar(t *testing.T) { roots: []cid.Cid{unixfsExclusiveWrappedShardedDir.Root}, cfg: verifiedcar.Config{ Root: unixfsExclusiveWrappedShardedDir.Root, - Selector: unixfsWrappedPreloadPathSelector, + Selector: unixfsWrappedPreloadPathSelectorSubst, }, }, { @@ -514,11 +629,32 @@ func TestVerifiedCar(t *testing.T) { Selector: allSelector, }, }, + { + name: "unixfs: large sharded file byte range [0:1M]", + blocks: consumedBlocks(unixfsFileRange0_1048576Blocks), + roots: []cid.Cid{unixfsFile.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFile.Root, + Selector: unixfsFileRange0_1048576Selector, + }, + }, + { + name: "unixfs: large sharded file byte range [1M:2M]", + blocks: consumedBlocks(unixfsFileRange1048576_2097152Blocks), + roots: []cid.Cid{unixfsFile.Root}, + cfg: verifiedcar.Config{ + Root: unixfsFile.Root, + Selector: unixfsFileRange1048576_2097152Selector, + }, + }, } for _, testCase := range testCases { testCase := testCase t.Run(testCase.name, func(t *testing.T) { + if testCase.skip { + t.Skip() + } t.Parallel() ctx, cancel := context.WithTimeout(ctx, 2*time.Second) @@ -556,12 +692,18 @@ func TestVerifiedCar(t *testing.T) { }, nil } - carStream := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.expectErr != "", testCase.incomingHasDups, testCase.streamErr) + carStream, errorCh := makeCarStream(t, ctx, testCase.roots, testCase.blocks, testCase.carv2, testCase.expectErr != "", testCase.incomingHasDups, testCase.streamErr) blockCount, byteCount, err := testCase.cfg.VerifyCar(ctx, carStream, lsys) // read the rest of data io.ReadAll(carStream) + select { + case err := <-errorCh: + req.NoError(err) + default: + } + if testCase.expectErr != "" { req.ErrorContains(err, testCase.expectErr) req.Equal(uint64(0), blockCount) @@ -585,13 +727,12 @@ func makeCarStream( expectErrors bool, allowDuplicatePuts bool, streamError error, -) io.Reader { +) (io.Reader, chan error) { r, w := io.Pipe() + errorCh := make(chan error, 1) go func() { - req := require.New(t) - var carW io.Writer = w var v2f *os.File @@ -600,7 +741,10 @@ func makeCarStream( // can't create a streaming v2 var err error v2f, err = os.CreateTemp(t.TempDir(), "carv2") - req.NoError(err) + if err != nil { + errorCh <- err + return + } t.Cleanup(func() { v2f.Close() os.Remove(v2f.Name()) @@ -609,8 +753,8 @@ func makeCarStream( } carWriter, err := storage.NewWritable(carW, roots, car.WriteAsCarV1(!carv2), car.AllowDuplicatePuts(allowDuplicatePuts)) - req.NoError(err) if err != nil { + errorCh <- err return } for ii, block := range blocks { @@ -619,14 +763,18 @@ func makeCarStream( return } err := carWriter.Put(ctx, block.Cid().KeyString(), block.RawData()) - if !expectErrors { - req.NoError(err) + if !expectErrors && err != nil { + errorCh <- err + return } if ctx.Err() != nil { return } } - req.NoError(carWriter.Finalize()) + if err := carWriter.Finalize(); err != nil { + errorCh <- err + return + } if carv2 { v2f.Seek(0, io.SeekStart) @@ -635,7 +783,9 @@ func makeCarStream( io.Copy(w, v2f) } - req.NoError(w.Close()) + if err := w.Close(); err != nil { + errorCh <- err + } }() go func() { @@ -645,7 +795,7 @@ func makeCarStream( } }() - return r + return r, errorCh } type expectedBlock struct {