Skip to content

Commit

Permalink
fix: fr32: Make UnpadReader be a correct Reader
Browse files Browse the repository at this point in the history
  • Loading branch information
magik6k committed Sep 19, 2024
1 parent 3937142 commit 2883d4f
Show file tree
Hide file tree
Showing 2 changed files with 173 additions and 0 deletions.
40 changes: 40 additions & 0 deletions storage/sealer/fr32/readers.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"io"
"math/bits"

pool "github.com/libp2p/go-buffer-pool"
"golang.org/x/xerrors"

"github.com/filecoin-project/go-state-types/abi"
Expand All @@ -14,6 +15,8 @@ type unpadReader struct {

left uint64
work []byte

stash []byte
}

func BufSize(sz abi.PaddedPieceSize) int {
Expand All @@ -31,6 +34,10 @@ func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Re
return nil, xerrors.Errorf("bad piece size: %w", err)
}

if abi.PaddedPieceSize(len(buf)).Validate() != nil {
return nil, xerrors.Errorf("bad buffer size")
}

return &unpadReader{
src: src,

Expand All @@ -40,6 +47,39 @@ func NewUnpadReaderBuf(src io.Reader, sz abi.PaddedPieceSize, buf []byte) (io.Re
}

func (r *unpadReader) Read(out []byte) (int, error) {
idealReadSize := abi.PaddedPieceSize(len(r.work)).Unpadded()

var err error
var rn int
if len(r.stash) == 0 && len(out) < int(idealReadSize) {
r.stash = pool.Get(int(idealReadSize))

rn, err = r.readInner(r.stash)
r.stash = r.stash[:rn]
}

if len(r.stash) > 0 {
n := copy(out, r.stash)
r.stash = r.stash[n:]

if len(r.stash) == 0 {
pool.Put(r.stash)
r.stash = nil
}

if err == io.EOF && rn > n {
err = nil
}

return n, err
}

return r.readInner(out)
}

// readInner reads from the underlying reader into the provided buffer.
// It requires that out[] is padded(power-of-two).unpadded()-sized, ideally quite large.
func (r *unpadReader) readInner(out []byte) (int, error) {
if r.left == 0 {
return 0, io.EOF
}
Expand Down
133 changes: 133 additions & 0 deletions storage/sealer/fr32/readers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package fr32_test
import (
"bufio"
"bytes"
"crypto/rand"
"fmt"
"io"
"testing"

Expand Down Expand Up @@ -34,3 +36,134 @@ func TestUnpadReader(t *testing.T) {

require.Equal(t, raw, readered)
}

func TestPadWriterUnpadReader(t *testing.T) {
testCases := []struct {
name string
unpadSize abi.UnpaddedPieceSize
readSizes []int
}{
{
name: "2K with aligned reads",
unpadSize: 2 * 127 * 8, // 2K unpadded
readSizes: []int{127, 127 * 4},
},
{
name: "Small piece, various read sizes",
unpadSize: 2 * 127 * 8, // 1016 bytes unpadded
readSizes: []int{1, 63, 127, 128, 255, 1016},
},
{
name: "Medium piece, various read sizes",
unpadSize: 127 * 1024, // 128KB unpadded
readSizes: []int{1, 127, 128, 255, 1024, 4096, 65536},
},
{
name: "Large piece, various read sizes",
unpadSize: 127 * 1024 * 1024, // 128MB unpadded
readSizes: []int{1, 127, 128, 255, 1024, 4096, 65536, 10 << 20, 11 << 20, 11<<20 + 134},
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Generate random unpadded data
unpadded := make([]byte, tc.unpadSize)
rand.Read(unpadded)

Check failure on line 72 in storage/sealer/fr32/readers_test.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

Error return value of `rand.Read` is not checked (errcheck)

// Create a buffer to store padded data
paddedBuf := new(bytes.Buffer)

// Create and use PadWriter
padWriter := fr32.NewPadWriter(paddedBuf)
written, err := padWriter.Write(unpadded)
require.NoError(t, err)
require.Equal(t, int(tc.unpadSize), written)
require.NoError(t, padWriter.Close())

// Create UnpadReader
paddedSize := tc.unpadSize.Padded()
unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
require.NoError(t, err)

offset := int64(0)
for _, readSize := range tc.readSizes {
t.Run(fmt.Sprintf("ReadSize_%d_Offset_%d", readSize, offset), func(t *testing.T) {
// Seek to offset
require.NoError(t, err)

// Read data
readBuf := make([]byte, readSize)
n, err := io.ReadFull(unpadReader, readBuf)
require.NoError(t, err)
require.Equal(t, readSize, n)

// Compare with original unpadded data
expected := unpadded[offset : offset+int64(len(readBuf))]
require.Equal(t, expected, readBuf)
offset += int64(n)
})
}
})
}
}

func TestUnpadReaderSmallReads(t *testing.T) {
unpadSize := abi.UnpaddedPieceSize(127 * 1024) // 128KB unpadded
unpadded := make([]byte, unpadSize)
rand.Read(unpadded)

Check failure on line 114 in storage/sealer/fr32/readers_test.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

Error return value of `rand.Read` is not checked (errcheck)

paddedBuf := new(bytes.Buffer)
padWriter := fr32.NewPadWriter(paddedBuf)
_, err := padWriter.Write(unpadded)
require.NoError(t, err)
require.NoError(t, padWriter.Close())

paddedSize := unpadSize.Padded()
unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
require.NoError(t, err)

result := make([]byte, 0, unpadSize)
smallBuf := make([]byte, 1) // Read one byte at a time

for {
n, err := unpadReader.Read(smallBuf)
if err == io.EOF {
break
}
require.NoError(t, err)
result = append(result, smallBuf[:n]...)
}

require.Equal(t, unpadded, result)
}

func TestUnpadReaderLargeReads(t *testing.T) {
unpadSize := abi.UnpaddedPieceSize(127 * 1024 * 1024) // 128MB unpadded
unpadded := make([]byte, unpadSize)
rand.Read(unpadded)

Check failure on line 144 in storage/sealer/fr32/readers_test.go

View workflow job for this annotation

GitHub Actions / Check (lint-all)

Error return value of `rand.Read` is not checked (errcheck)

paddedBuf := new(bytes.Buffer)
padWriter := fr32.NewPadWriter(paddedBuf)
_, err := padWriter.Write(unpadded)
require.NoError(t, err)
require.NoError(t, padWriter.Close())

paddedSize := unpadSize.Padded()
unpadReader, err := fr32.NewUnpadReader(bytes.NewReader(paddedBuf.Bytes()), paddedSize)
require.NoError(t, err)

largeBuf := make([]byte, 10*1024*1024) // 10MB buffer
result := make([]byte, 0, unpadSize)

for {
n, err := unpadReader.Read(largeBuf)
if err == io.EOF {
break
}
require.NoError(t, err)
result = append(result, largeBuf[:n]...)
}

require.Equal(t, unpadded, result)
}

0 comments on commit 2883d4f

Please sign in to comment.