9 changes: 6 additions & 3 deletions dir.go
@@ -11,6 +11,7 @@ package stream
 import (
 	"context"
 	"errors"
+	"strings"
 
 	"io/fs"
 
@@ -63,7 +64,7 @@ func (dd *dd[T]) readAll() ([]fs.DirEntry, error) {
 	req := &s3.ListObjectsV2Input{
 		Bucket:  aws.String(dd.fs.bucket),
 		MaxKeys: aws.Int32(dd.fs.lslimit),
-		Prefix:  dd.s3Key(),
+		Prefix:  dd.s3Key(dd.fs.root),
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), dd.fs.timeout)
@@ -80,7 +81,9 @@ func (dd *dd[T]) readAll() ([]fs.DirEntry, error) {
 	}
 
 	for _, el := range val.Contents {
-		seq = append(seq, dd.objectToDirEntry(el))
+		if !strings.HasSuffix(*el.Key, "/") {
+			seq = append(seq, dd.objectToDirEntry(el))
+		}
 	}
 
 	cnt := int(aws.ToInt32(val.KeyCount))
@@ -97,7 +100,7 @@ func (dd *dd[T]) objectToDirEntry(t types.Object) fs.DirEntry {
 	// It is assumed by fs.FS implementations (e.g. WalkDir) and also requires
 	// Name to be basename. It is not convenient for S3 where file system is flat.
 	path := aws.ToString(t.Key)
-	path = path[len(dd.path)-1:]
+	path = path[len(dd.fs.root+dd.path)-1:]
 
 	// ETag
 	// ObjectStorageClass
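
The new check in readAll drops S3 "directory placeholder" objects — keys ending in "/", typically created by the S3 console or other tools — before they are converted into fs.DirEntry values. A minimal, self-contained sketch of that filter (the sample keys are illustrative, not taken from this PR):

```go
package main

import (
	"fmt"
	"strings"
)

func main() {
	// Keys as ListObjectsV2 might return them: real objects plus a
	// zero-byte "folder" placeholder whose key ends with "/".
	keys := []string{"prefix/a.txt", "prefix/sub/", "prefix/sub/b.txt"}

	entries := make([]string, 0, len(keys))
	for _, key := range keys {
		if !strings.HasSuffix(key, "/") { // same condition as in readAll above
			entries = append(entries, key)
		}
	}

	fmt.Println(entries) // [prefix/a.txt prefix/sub/b.txt]
}
```
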
4 changes: 2 additions & 2 deletions examples/s3fs-walk/s3fs.go
@@ -52,7 +52,7 @@ func walk() error {
 func read(s3fs fs.FS, path string) error {
 	fd, err := s3fs.Open(path)
 	if err != nil {
-		return err
+		return fmt.Errorf("open %s has failed %w", path, err)
 	}
 	defer fd.Close()
 
@@ -63,7 +63,7 @@ func read(s3fs fs.FS, path string) error {
 
 	fi, err := fd.Stat()
 	if err != nil {
-		return err
+		return fmt.Errorf("stat %s has failed %w", path, err)
 	}
 
 	output(fi, buf)
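
Wrapping with %w (instead of returning the bare error) keeps the original cause inspectable by callers. A small, generic illustration of that property, unrelated to any specific S3 error type:

```go
package main

import (
	"errors"
	"fmt"
	"io/fs"
)

func main() {
	cause := fs.ErrNotExist
	wrapped := fmt.Errorf("open %s has failed %w", "/a.txt", cause)

	// errors.Is still sees the wrapped cause, so callers can branch on it.
	fmt.Println(errors.Is(wrapped, fs.ErrNotExist)) // true
}
```
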
19 changes: 10 additions & 9 deletions file.go
@@ -44,14 +44,15 @@ func (f info[T]) Sys() any { return f.attr }
 func (f info[T]) Type() fs.FileMode { return f.mode.Type() }
 func (f info[T]) Info() (fs.FileInfo, error) { return f, nil }
 
-func (f info[T]) s3Key() *string { return s3Key(f.path) }
+func (f info[T]) s3Key(root string) *string { return s3Key(root, f.path) }
 
-func s3Key(path string) *string {
-	if path[0] == '/' {
-		return aws.String(path[1:])
+func s3Key(root, path string) *string {
+	file := root + path
+	if file[0] == '/' {
+		return aws.String(file[1:])
 	}
 
-	return aws.String(path)
+	return aws.String(file)
 }
 
 //------------------------------------------------------------------------------
@@ -107,7 +108,7 @@ func (fd *reader[T]) Stat() (fs.FileInfo, error) {
 func (fd *reader[T]) lazyOpen() error {
 	req := &s3.GetObjectInput{
 		Bucket: aws.String(fd.fs.bucket),
-		Key:    fd.s3Key(),
+		Key:    fd.s3Key(fd.fs.root),
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), fd.fs.timeout)
@@ -136,7 +137,7 @@ func (fd *reader[T]) lazyOpen() error {
 
 	fd.fs.codec.DecodeGetOutput(val, fd.info.attr)
 	if fd.fs.signer != nil && fd.fs.codec.s != nil {
-		if url, err := fd.fs.preSignGetUrl(fd.s3Key()); err == nil {
+		if url, err := fd.fs.preSignGetUrl(fd.s3Key(fd.fs.root)); err == nil {
 			fd.fs.codec.s.Put(fd.info.attr, url)
 		}
 	}
@@ -215,7 +216,7 @@ func (fd *writer[T]) lazyOpen() {
 
 	req := &s3.PutObjectInput{
 		Bucket:   aws.String(fd.fs.bucket),
-		Key:      fd.s3Key(),
+		Key:      fd.s3Key(fd.fs.root),
 		Body:     fd.r,
 		Metadata: make(map[string]string),
 	}
@@ -239,7 +240,7 @@ func (fd *writer[T]) preSignPutUrl() (string, error) {
 
 	req := &s3.PutObjectInput{
 		Bucket:   aws.String(fd.fs.bucket),
-		Key:      fd.s3Key(),
+		Key:      fd.s3Key(fd.fs.root),
 		Metadata: make(map[string]string),
 	}
 	fd.fs.codec.EncodePutInput(fd.attr, req)
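
The reworked s3Key helper concatenates the file-system root with the path and strips a single leading slash, so a mount root such as "/prefix" ends up prefixing every object key. A standalone sketch of the same logic, returning a plain string instead of the aws.String pointer used above:

```go
package main

import "fmt"

// s3Key mirrors the updated helper: join root and path, then drop the
// leading "/" so the result is a valid S3 object key.
func s3Key(root, path string) string {
	file := root + path
	if file[0] == '/' {
		return file[1:]
	}
	return file
}

func main() {
	fmt.Println(s3Key("", "/a/b.txt"))        // a/b.txt
	fmt.Println(s3Key("/prefix", "/a/b.txt")) // prefix/a/b.txt
}
```
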
23 changes: 15 additions & 8 deletions filesystem.go
@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"
 	"io/fs"
+	"path/filepath"
 	"regexp"
 	"strings"
 	"time"
@@ -31,6 +32,7 @@ import (
 type FileSystem[T any] struct {
 	Opts
 	bucket string
+	root   string
 	codec  *codec[T]
 }
 
@@ -52,9 +54,14 @@ func New[T any](bucket string, opt ...Option) (*FileSystem[T], error) {
 	}
 
 	fsys := FileSystem[T]{
-		Opts:   optsDefault(),
-		bucket: bucket,
-		codec:  newCodec[T](),
+		Opts:  optsDefault(),
+		codec: newCodec[T](),
 	}
 
+	seq := strings.SplitN(bucket, "/", 2)
+	fsys.bucket, fsys.root = seq[0], ""
+	if len(seq) > 1 {
+		fsys.root = filepath.Join("/", seq[1])
+	}
+
 	if err := opts.Apply(&fsys.Opts, opt); err != nil {
@@ -126,7 +133,7 @@ func (fsys *FileSystem[T]) Stat(path string) (fs.FileInfo, error) {
 
 	req := &s3.HeadObjectInput{
 		Bucket: aws.String(fsys.bucket),
-		Key:    info.s3Key(),
+		Key:    info.s3Key(fsys.root),
 	}
 
 	val, err := fsys.api.HeadObject(ctx, req)
@@ -149,7 +156,7 @@ func (fsys *FileSystem[T]) Stat(path string) (fs.FileInfo, error) {
 	fsys.codec.DecodeHeadOutput(val, info.attr)
 
 	if fsys.signer != nil && fsys.codec.s != nil {
-		if url, err := fsys.preSignGetUrl(info.s3Key()); err == nil {
+		if url, err := fsys.preSignGetUrl(info.s3Key(fsys.root)); err == nil {
 			fsys.codec.s.Put(info.attr, url)
 		}
 	}
@@ -255,7 +262,7 @@ func (fsys *FileSystem[T]) Remove(path string) error {
 
 	req := &s3.DeleteObjectInput{
 		Bucket: &fsys.bucket,
-		Key:    s3Key(path),
+		Key:    s3Key(fsys.root, path),
 	}
 
 	_, err := fsys.api.DeleteObject(ctx, req)
@@ -290,7 +297,7 @@ func (fsys *FileSystem[T]) Copy(source, target string) error {
 
 	req := &s3.CopyObjectInput{
 		Bucket:     &fsys.bucket,
-		Key:        s3Key(source),
+		Key:        s3Key(fsys.root, source),
 		CopySource: aws.String(target[5:]),
 	}
 
@@ -316,7 +323,7 @@ func (fsys *FileSystem[T]) Wait(path string, timeout time.Duration) error {
 
 	req := &s3.HeadObjectInput{
 		Bucket: aws.String(fsys.bucket),
-		Key:    s3Key(path),
+		Key:    s3Key(fsys.root, path),
 	}
 
 	err := waiter.Wait(context.Background(), req, timeout)
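
With this change, New treats its bucket argument as a mount point: the first segment is the S3 bucket, and the optional remainder becomes the root prefix applied to every key. A small sketch of that parsing step in isolation (parseMount is an illustrative name, not part of the package):

```go
package main

import (
	"fmt"
	"path/filepath"
	"strings"
)

// parseMount mirrors the constructor logic above: "bucket" or "bucket/some/prefix".
func parseMount(mount string) (bucket, root string) {
	seq := strings.SplitN(mount, "/", 2)
	bucket, root = seq[0], ""
	if len(seq) > 1 {
		root = filepath.Join("/", seq[1])
	}
	return bucket, root
}

func main() {
	for _, mnt := range []string{"test", "test/a", "test/a/b"} {
		bucket, root := parseMount(mnt)
		fmt.Printf("%s -> bucket=%q root=%q\n", mnt, bucket, root)
		// test -> bucket="test" root=""
		// test/a -> bucket="test" root="/a"
		// test/a/b -> bucket="test" root="/a/b"
	}
}
```
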
20 changes: 11 additions & 9 deletions filesystem_test.go
@@ -213,15 +213,17 @@ var (
 )
 
 func TestNew(t *testing.T) {
-	s3fs, err := stream.NewFS("test",
-		stream.WithS3(s3GetObject),
-		stream.WithS3Upload(s3PutObject),
-		stream.WithS3Signer(s3PresignGetObject),
-		stream.WithIOTimeout(5*time.Second),
-		stream.WithPreSignUrlTTL(20*time.Second),
-		stream.WithListingLimit(1000),
-	)
-	it.Then(t).Should(it.Nil(err)).ShouldNot(it.Nil(s3fs))
+	for _, mnt := range []string{"test", "test/a", "test/a/b"} {
+		s3fs, err := stream.NewFS(mnt,
+			stream.WithS3(s3GetObject),
+			stream.WithS3Upload(s3PutObject),
+			stream.WithS3Signer(s3PresignGetObject),
+			stream.WithIOTimeout(5*time.Second),
+			stream.WithPreSignUrlTTL(20*time.Second),
+			stream.WithListingLimit(1000),
+		)
+		it.Then(t).Should(it.Nil(err)).ShouldNot(it.Nil(s3fs))
+	}
 }
 
 func TestReadWrite(t *testing.T) {
59 changes: 45 additions & 14 deletions spool/spool.go
@@ -13,8 +13,8 @@ import (
 	"context"
 	"io"
 	"io/fs"
-	"log/slog"
 
+	"github.com/fogfish/opts"
 	"github.com/fogfish/stream"
 )
 
@@ -36,24 +36,47 @@ type File = interface {
 }
 
 const (
+	immutable = iota
+	mutable
+)
+
+const (
+	strict = iota
+	skiperror
+)
+
+var (
+	withMutability = opts.ForName[Spool, int]("mutable")
+
 	// spool is immutable, consumed tasks remains in spool.
-	Immutable = iota
+	IsImmutable = withMutability(immutable)
 	// spool is mutable, consumed tasks are removed
-	Mutable
+	IsMutable = withMutability(mutable)
+
+	withStrict = opts.ForName[Spool, int]("strict")
+
+	// spool fails on error
+	WithStrict = withStrict(strict)
+	// spool skips error, continue as warning
+	WithSkipError = withStrict(skiperror)
 )
 
 type Spool struct {
-	reader FileSystem
-	writer FileSystem
-	mode   int
+	reader  FileSystem
+	writer  FileSystem
+	mutable int
+	strict  int
 }
 
-func New(reader, writer FileSystem, mode int) *Spool {
-	return &Spool{
+func New(reader, writer FileSystem, opt ...opts.Option[Spool]) *Spool {
+	s := &Spool{
 		reader: reader,
 		writer: writer,
-		mode:   mode,
 	}
+
+	opts.Apply(s, opt)
+
+	return s
 }
 
 // Write new file to spool
@@ -65,6 +88,14 @@ func (spool *Spool) WriteFile(path string, b []byte) error {
 	return spool.Write(path, bytes.NewBuffer(b))
 }
 
+func (spool *Spool) iserr(err error) error {
+	if spool.strict == skiperror {
+		return nil
+	}
+
+	return err
+}
+
 // Apply the spool function over each file in the reader filesystem, producing
 // results to writer file system.
 func (spool *Spool) ForEach(
@@ -84,13 +115,13 @@
 
 		fd, err := spool.reader.Open(path)
 		if err != nil {
-			return err
+			return spool.iserr(err)
 		}
 		defer fd.Close()
 
 		dd, err := f(ctx, path, fd)
 		if err != nil {
-			return err
+			return spool.iserr(err)
 		}
 		if dd == nil {
 			return nil
@@ -99,13 +130,13 @@
 
 		err = spool.write(spool.writer, path, dd)
 		if err != nil {
-			return err
+			return spool.iserr(err)
 		}
 
-		if spool.mode == Mutable {
+		if spool.mutable == mutable {
 			err = spool.reader.Remove(path)
 			if err != nil {
-				slog.Warn("unable to remove the file", "err", err)
+				return spool.iserr(err)
 			}
 		}
 
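
spool.New no longer takes a positional mode flag; behaviour is configured with functional options: IsImmutable/IsMutable control whether consumed files are removed from the reader spool, and WithStrict/WithSkipError control error handling, with immutable, strict behaviour as the zero-value default. A hedged construction sketch, assuming the package import path github.com/fogfish/stream/spool and that FileSystem is the exported file-system interface used above; the reader and writer file systems are created elsewhere (for example the temporary file systems used in spool_test.go):

```go
package example

import (
	"github.com/fogfish/stream/spool"
)

// newSpool shows the updated constructor surface; the reader and writer
// file systems are supplied by the caller.
func newSpool(reader, writer spool.FileSystem) *spool.Spool {
	return spool.New(reader, writer,
		spool.IsMutable,     // consumed files are removed from the reader spool
		spool.WithSkipError, // per-file errors are skipped instead of aborting ForEach
	)
}
```
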
2 changes: 1 addition & 1 deletion spool/spool_test.go
@@ -25,7 +25,7 @@ func TestSpool(t *testing.T) {
 	to, err := lfs.NewTempFS(os.TempDir(), "to")
 	it.Then(t).Must(it.Nil(err))
 
-	qq := spool.New(in, to, spool.Mutable)
+	qq := spool.New(in, to)
 
 	seq := []string{"/a", "/b", "/c", "/d", "/e", "/f"}
 	for _, txt := range seq {