diff --git a/storage/sealer/fr32/readers.go b/storage/sealer/fr32/readers.go
index 163c520aa4d..cf14f1e98e8 100644
--- a/storage/sealer/fr32/readers.go
+++ b/storage/sealer/fr32/readers.go
@@ -4,6 +4,7 @@ import (
 	"io"
 	"math/bits"
 
+	pool "github.com/libp2p/go-buffer-pool"
 	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-state-types/abi"
@@ -14,6 +15,11 @@ type unpadReader struct {
 
 	left uint64
 	work []byte
+
+	// stash is the not-yet-delivered remainder of a buffered read. stashBuf is
+	// the pooled slice backing it, kept so that exact slice can go back to the pool.
+	stash    []byte
+	stashBuf []byte
 }
 
 func BufSize(sz abi.PaddedPieceSize) int {
@@ -31,6 +37,10 @@ func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Re
 		return nil, xerrors.Errorf("bad piece size: %w", err)
 	}
 
+	if err := abi.PaddedPieceSize(len(buf)).Validate(); err != nil {
+		return nil, xerrors.Errorf("bad buffer size: %w", err)
+	}
+
 	return &unpadReader{
 		src: src,
 
@@ -40,6 +50,64 @@ func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Re
 }
 
+// Read implements io.Reader. Requests smaller than the ideal chunk size are
+// served from a pooled stash buffer filled by a single full-sized readInner
+// call; larger requests go to readInner directly once the stash is empty.
 func (r *unpadReader) Read(out []byte) (int, error) {
+	idealReadSize := abi.PaddedPieceSize(len(r.work)).Unpadded()
+
+	var err error
+	var rn int
+	if len(r.stash) == 0 && len(out) < int(idealReadSize) {
+		// out is too small to hand to readInner directly, so read a full-sized
+		// chunk into a pooled buffer and serve it out over successive calls.
+		r.stashBuf = pool.Get(int(idealReadSize))
+
+		rn, err = r.readInner(r.stashBuf)
+		r.stash = r.stashBuf[:rn]
+	}
+
+	if len(r.stash) > 0 {
+		n := copy(out, r.stash)
+		r.stash = r.stash[n:]
+
+		if len(r.stash) == 0 {
+			r.releaseStash()
+		}
+
+		// Hold back EOF while buffered bytes remain undelivered.
+		if err == io.EOF && rn > n {
+			err = nil
+		}
+
+		return n, err
+	}
+
+	if r.stashBuf != nil {
+		// The buffered read produced no data: give the buffer back and report
+		// its error rather than discarding it and reading again.
+		r.releaseStash()
+		if err != nil {
+			return 0, err
+		}
+	}
+
+	return r.readInner(out)
+}
+
+// releaseStash hands the pooled stash buffer, if any, back to the pool and
+// clears the stash. The slice obtained from pool.Get is returned, not the
+// advanced r.stash view, whose start and capacity no longer match it.
+func (r *unpadReader) releaseStash() {
+	if r.stashBuf != nil {
+		pool.Put(r.stashBuf)
+	}
+	r.stash = nil
+	r.stashBuf = nil
+}
+
+// readInner reads from the underlying reader into the provided buffer.
+// It requires that out[] is padded(power-of-two).unpadded()-sized, ideally quite large.
+func (r *unpadReader) readInner(out []byte) (int, error) {
 	if r.left == 0 {
 		return 0, io.EOF
 	}
diff --git a/storage/sealer/fr32/readers_test.go b/storage/sealer/fr32/readers_test.go
index f84b9d67a2a..a4493518137 100644
--- a/storage/sealer/fr32/readers_test.go
+++ b/storage/sealer/fr32/readers_test.go
@@ -3,6 +3,8 @@ package fr32_test
 import (
 	"bufio"
 	"bytes"
+	"crypto/rand"
+	"fmt"
 	"io"
 	"testing"
 
@@ -34,3 +36,137 @@ func TestUnpadReader(t *testing.T) {
 
 	require.Equal(t, raw, readered)
 }
+
+func TestPadWriterUnpadReader(t *testing.T) {
+	testCases := []struct {
+		name      string
+		unpadSize abi.UnpaddedPieceSize
+		readSizes []int
+	}{
+		{
+			name:      "2K with aligned reads",
+			unpadSize: 2 * 127 * 8, // 2032 bytes unpadded (2KiB padded)
+			readSizes: []int{127, 127 * 4},
+		},
+		{
+			name:      "Small piece, various read sizes",
+			unpadSize: 2 * 127 * 8, // 2032 bytes unpadded (2KiB padded)
+			readSizes: []int{1, 63, 127, 128, 255, 1016},
+		},
+		{
+			name:      "Medium piece, various read sizes",
+			unpadSize: 127 * 1024, // 127KiB unpadded (128KiB padded)
+			readSizes: []int{1, 127, 128, 255, 1024, 4096, 65536},
+		},
+		{
+			name:      "Large piece, various read sizes",
+			unpadSize: 127 * 1024 * 1024, // 127MiB unpadded (128MiB padded)
+			readSizes: []int{1, 127, 128, 255, 1024, 4096, 65536, 10 << 20, 11 << 20, 11<<20 + 134},
+		},
+	}
+
+	for _, tc := range testCases {
+		t.Run(tc.name, func(t *testing.T) {
+			// Generate random unpadded data
+			unpadded := make([]byte, tc.unpadSize)
+			_, err := rand.Read(unpadded)
+			require.NoError(t, err)
+
+			// Create a buffer to store padded data
+			paddedBuf := new(bytes.Buffer)
+
+			// Create and use PadWriter
+			padWriter := fr32.NewPadWriter(paddedBuf)
+			written, err := padWriter.Write(unpadded)
+			require.NoError(t, err)
+			require.Equal(t, int(tc.unpadSize), written)
+			require.NoError(t, padWriter.Close())
+
+			// Create UnpadReader
+			paddedSize := tc.unpadSize.Padded()
+			unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
+			require.NoError(t, err)
+
+			offset := int64(0)
+			for _, readSize := range tc.readSizes {
+				t.Run(fmt.Sprintf("ReadSize_%d_Offset_%d", readSize, offset), func(t *testing.T) {
+					// Reads are sequential: each one continues at offset, where the
+					// previous read stopped.
+					readBuf := make([]byte, readSize)
+					n, err := io.ReadFull(unpadReader, readBuf)
+					require.NoError(t, err)
+					require.Equal(t, readSize, n)
+
+					// Compare with original unpadded data
+					expected := unpadded[offset : offset+int64(len(readBuf))]
+					require.Equal(t, expected, readBuf)
+					offset += int64(n)
+				})
+			}
+		})
+	}
+}
+
+func TestUnpadReaderSmallReads(t *testing.T) {
+	unpadSize := abi.UnpaddedPieceSize(127 * 1024) // 127KiB unpadded (128KiB padded)
+	unpadded := make([]byte, unpadSize)
+	_, err := rand.Read(unpadded)
+	require.NoError(t, err)
+
+	paddedBuf := new(bytes.Buffer)
+	padWriter := fr32.NewPadWriter(paddedBuf)
+	_, err = padWriter.Write(unpadded)
+	require.NoError(t, err)
+	require.NoError(t, padWriter.Close())
+
+	paddedSize := unpadSize.Padded()
+	unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
+	require.NoError(t, err)
+
+	result := make([]byte, 0, unpadSize)
+	smallBuf := make([]byte, 1) // Read one byte at a time
+
+	for {
+		n, err := unpadReader.Read(smallBuf)
+		// Keep any bytes returned alongside an error before inspecting it.
+		result = append(result, smallBuf[:n]...)
+		if err == io.EOF {
+			break
+		}
+		require.NoError(t, err)
+	}
+
+	require.Equal(t, unpadded, result)
+}
+
+func TestUnpadReaderLargeReads(t *testing.T) {
+	unpadSize := abi.UnpaddedPieceSize(127 * 1024 * 1024) // 127MiB unpadded (128MiB padded)
+	unpadded := make([]byte, unpadSize)
+	_, err := rand.Read(unpadded)
+	require.NoError(t, err)
+
+	paddedBuf := new(bytes.Buffer)
+	padWriter := fr32.NewPadWriter(paddedBuf)
+	_, err = padWriter.Write(unpadded)
+	require.NoError(t, err)
+	require.NoError(t, padWriter.Close())
+
+	paddedSize := unpadSize.Padded()
+	unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
+	require.NoError(t, err)
+
+	largeBuf := make([]byte, 10*1024*1024) // 10MB buffer
+	result := make([]byte, 0, unpadSize)
+
+	for {
+		n, err := unpadReader.Read(largeBuf)
+		// Keep any bytes returned alongside an error before inspecting it.
+		result = append(result, largeBuf[:n]...)
+		if err == io.EOF {
+			break
+		}
+		require.NoError(t, err)
+	}
+
+	require.Equal(t, unpadded, result)
+}