diff --git a/dir.go b/dir.go
index 42f22d6..f46f0ca 100644
--- a/dir.go
+++ b/dir.go
@@ -11,6 +11,7 @@ package stream
 import (
 	"context"
 	"errors"
+	"strings"
 
 	"io/fs"
@@ -63,7 +64,7 @@ func (dd *dd[T]) readAll() ([]fs.DirEntry, error) {
 	req := &s3.ListObjectsV2Input{
 		Bucket:  aws.String(dd.fs.bucket),
 		MaxKeys: aws.Int32(dd.fs.lslimit),
-		Prefix:  dd.s3Key(),
+		Prefix:  dd.s3Key(dd.fs.root),
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), dd.fs.timeout)
@@ -80,7 +81,9 @@ func (dd *dd[T]) readAll() ([]fs.DirEntry, error) {
 	}
 
 	for _, el := range val.Contents {
-		seq = append(seq, dd.objectToDirEntry(el))
+		if !strings.HasSuffix(*el.Key, "/") {
+			seq = append(seq, dd.objectToDirEntry(el))
+		}
 	}
 
 	cnt := int(aws.ToInt32(val.KeyCount))
@@ -97,7 +100,7 @@ func (dd *dd[T]) objectToDirEntry(t types.Object) fs.DirEntry {
 	// It is assumed by fs.FS implementations (e.g. WalkDir) and also requires
 	// Name to be basename. It is not convenient for S3 where file system is flat.
 	path := aws.ToString(t.Key)
-	path = path[len(dd.path)-1:]
+	path = path[len(dd.fs.root+dd.path)-1:]
 
 	// ETag
 	// ObjectStorageClass
diff --git a/examples/s3fs-walk/s3fs.go b/examples/s3fs-walk/s3fs.go
index 783a7fb..0de6487 100644
--- a/examples/s3fs-walk/s3fs.go
+++ b/examples/s3fs-walk/s3fs.go
@@ -52,7 +52,7 @@ func walk() error {
 func read(s3fs fs.FS, path string) error {
 	fd, err := s3fs.Open(path)
 	if err != nil {
-		return err
+		return fmt.Errorf("open %s failed: %w", path, err)
 	}
 	defer fd.Close()
@@ -63,7 +63,7 @@
 
 	fi, err := fd.Stat()
 	if err != nil {
-		return err
+		return fmt.Errorf("stat %s failed: %w", path, err)
 	}
 
 	output(fi, buf)
diff --git a/file.go b/file.go
index bb7189e..c9832b9 100644
--- a/file.go
+++ b/file.go
@@ -44,14 +44,15 @@ func (f info[T]) Sys() any { return f.attr }
 func (f info[T]) Type() fs.FileMode          { return f.mode.Type() }
 func (f info[T]) Info() (fs.FileInfo, error) { return f, nil }
 
-func (f info[T]) s3Key() *string { return s3Key(f.path) }
+func (f info[T]) s3Key(root string) *string { return s3Key(root, f.path) }
 
-func s3Key(path string) *string {
-	if path[0] == '/' {
-		return aws.String(path[1:])
+func s3Key(root, path string) *string {
+	file := root + path
+	if file[0] == '/' {
+		return aws.String(file[1:])
 	}
-	return aws.String(path)
+	return aws.String(file)
 }
 
 //------------------------------------------------------------------------------
@@ -107,7 +108,7 @@ func (fd *reader[T]) Stat() (fs.FileInfo, error) {
 func (fd *reader[T]) lazyOpen() error {
 	req := &s3.GetObjectInput{
 		Bucket: aws.String(fd.fs.bucket),
-		Key:    fd.s3Key(),
+		Key:    fd.s3Key(fd.fs.root),
 	}
 
 	ctx, cancel := context.WithTimeout(context.Background(), fd.fs.timeout)
@@ -136,7 +137,7 @@ func (fd *reader[T]) lazyOpen() error {
 	fd.fs.codec.DecodeGetOutput(val, fd.info.attr)
 
 	if fd.fs.signer != nil && fd.fs.codec.s != nil {
-		if url, err := fd.fs.preSignGetUrl(fd.s3Key()); err == nil {
+		if url, err := fd.fs.preSignGetUrl(fd.s3Key(fd.fs.root)); err == nil {
 			fd.fs.codec.s.Put(fd.info.attr, url)
 		}
 	}
@@ -215,7 +216,7 @@ func (fd *writer[T]) lazyOpen() {
 
 	req := &s3.PutObjectInput{
 		Bucket:   aws.String(fd.fs.bucket),
-		Key:      fd.s3Key(),
+		Key:      fd.s3Key(fd.fs.root),
 		Body:     fd.r,
 		Metadata: make(map[string]string),
 	}
@@ -239,7 +240,7 @@ func (fd *writer[T]) preSignPutUrl() (string, error) {
 
 	req := &s3.PutObjectInput{
 		Bucket:   aws.String(fd.fs.bucket),
-		Key:      fd.s3Key(),
+		Key:      fd.s3Key(fd.fs.root),
 		Metadata: make(map[string]string),
 	}
 	fd.fs.codec.EncodePutInput(fd.attr, req)
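Note: the reworked s3Key helper in file.go above now prepends the file system root to the request path before trimming the leading slash. A minimal sketch of that logic, using plain strings instead of the *string the AWS SDK expects (illustration only, not part of the patch):

    // s3Key mirrors the patched helper: S3 keys are flat and not rooted,
    // so the composed "<root><path>" string loses its leading '/'.
    func s3Key(root, path string) string {
        file := root + path
        if file[0] == '/' {
            return file[1:]
        }
        return file
    }

    s3Key("/a/b", "/file.txt") // "a/b/file.txt"
    s3Key("", "/file.txt")     // "file.txt"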
diff --git a/filesystem.go b/filesystem.go
index 57b9e82..fe794c4 100644
--- a/filesystem.go
+++ b/filesystem.go
@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"
 	"io/fs"
+	"path/filepath"
 	"regexp"
 	"strings"
 	"time"
@@ -31,6 +32,7 @@ import (
 
 type FileSystem[T any] struct {
 	Opts
 	bucket string
+	root   string
 	codec  *codec[T]
 }
@@ -52,9 +54,14 @@ func New[T any](bucket string, opt ...Option) (*FileSystem[T], error) {
 	}
 
 	fsys := FileSystem[T]{
-		Opts:   optsDefault(),
-		bucket: bucket,
-		codec:  newCodec[T](),
+		Opts:  optsDefault(),
+		codec: newCodec[T](),
+	}
+
+	seq := strings.SplitN(bucket, "/", 2)
+	fsys.bucket, fsys.root = seq[0], ""
+	if len(seq) > 1 {
+		fsys.root = filepath.Join("/", seq[1])
 	}
 
 	if err := opts.Apply(&fsys.Opts, opt); err != nil {
@@ -126,7 +133,7 @@ func (fsys *FileSystem[T]) Stat(path string) (fs.FileInfo, error) {
 
 	req := &s3.HeadObjectInput{
 		Bucket: aws.String(fsys.bucket),
-		Key:    info.s3Key(),
+		Key:    info.s3Key(fsys.root),
 	}
 
 	val, err := fsys.api.HeadObject(ctx, req)
@@ -149,7 +156,7 @@ func (fsys *FileSystem[T]) Stat(path string) (fs.FileInfo, error) {
 	fsys.codec.DecodeHeadOutput(val, info.attr)
 
 	if fsys.signer != nil && fsys.codec.s != nil {
-		if url, err := fsys.preSignGetUrl(info.s3Key()); err == nil {
+		if url, err := fsys.preSignGetUrl(info.s3Key(fsys.root)); err == nil {
 			fsys.codec.s.Put(info.attr, url)
 		}
 	}
@@ -255,7 +262,7 @@ func (fsys *FileSystem[T]) Remove(path string) error {
 
 	req := &s3.DeleteObjectInput{
 		Bucket: &fsys.bucket,
-		Key:    s3Key(path),
+		Key:    s3Key(fsys.root, path),
 	}
 
 	_, err := fsys.api.DeleteObject(ctx, req)
@@ -290,7 +297,7 @@ func (fsys *FileSystem[T]) Copy(source, target string) error {
 
 	req := &s3.CopyObjectInput{
 		Bucket:     &fsys.bucket,
-		Key:        s3Key(source),
+		Key:        s3Key(fsys.root, source),
 		CopySource: aws.String(target[5:]),
 	}
@@ -316,7 +323,7 @@ func (fsys *FileSystem[T]) Wait(path string, timeout time.Duration) error {
 
 	req := &s3.HeadObjectInput{
 		Bucket: aws.String(fsys.bucket),
-		Key:    s3Key(path),
+		Key:    s3Key(fsys.root, path),
 	}
 
 	err := waiter.Wait(context.Background(), req, timeout)
diff --git a/filesystem_test.go b/filesystem_test.go
index 88ebb84..ae14e30 100644
--- a/filesystem_test.go
+++ b/filesystem_test.go
@@ -213,15 +213,17 @@ var (
 )
 
 func TestNew(t *testing.T) {
-	s3fs, err := stream.NewFS("test",
-		stream.WithS3(s3GetObject),
-		stream.WithS3Upload(s3PutObject),
-		stream.WithS3Signer(s3PresignGetObject),
-		stream.WithIOTimeout(5*time.Second),
-		stream.WithPreSignUrlTTL(20*time.Second),
-		stream.WithListingLimit(1000),
-	)
-	it.Then(t).Should(it.Nil(err)).ShouldNot(it.Nil(s3fs))
+	for _, mnt := range []string{"test", "test/a", "test/a/b"} {
+		s3fs, err := stream.NewFS(mnt,
+			stream.WithS3(s3GetObject),
+			stream.WithS3Upload(s3PutObject),
+			stream.WithS3Signer(s3PresignGetObject),
+			stream.WithIOTimeout(5*time.Second),
+			stream.WithPreSignUrlTTL(20*time.Second),
+			stream.WithListingLimit(1000),
+		)
+		it.Then(t).Should(it.Nil(err)).ShouldNot(it.Nil(s3fs))
+	}
 }
 
 func TestReadWrite(t *testing.T) {
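The updated TestNew above exercises the new mount syntax, where the bucket argument may carry an optional root prefix. A usage sketch (option values mirror the test; the S3 client wiring is elided):

    // "test/a/b" is split into bucket "test" and root "/a/b", so that
    // s3fs.Open("/file") resolves to the S3 key "a/b/file".
    s3fs, err := stream.NewFS("test/a/b",
        stream.WithIOTimeout(5*time.Second),
        stream.WithListingLimit(1000),
    )
    if err != nil {
        return err
    }
    fd, err := s3fs.Open("/file")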
diff --git a/spool/spool.go b/spool/spool.go
index 33a90d0..b8c302c 100644
--- a/spool/spool.go
+++ b/spool/spool.go
@@ -13,8 +13,8 @@ import (
 	"context"
 	"io"
 	"io/fs"
-	"log/slog"
 
+	"github.com/fogfish/opts"
 	"github.com/fogfish/stream"
 )
@@ -36,24 +36,47 @@ type File = interface {
 }
 
 const (
+	immutable = iota
+	mutable
+)
+
+const (
+	strict = iota
+	skiperror
+)
+
+var (
+	withMutability = opts.ForName[Spool, int]("mutable")
+
 	// spool is immutable, consumed tasks remains in spool.
-	Immutable = iota
+	IsImmutable = withMutability(immutable)
 	// spool is mutable, consumed tasks are removed
-	Mutable
+	IsMutable = withMutability(mutable)
+
+	withStrict = opts.ForName[Spool, int]("strict")
+
+	// spool fails on error
+	WithStrict = withStrict(strict)
+	// spool skips errors and continues processing
+	WithSkipError = withStrict(skiperror)
 )
 
 type Spool struct {
-	reader FileSystem
-	writer FileSystem
-	mode   int
+	reader  FileSystem
+	writer  FileSystem
+	mutable int
+	strict  int
 }
 
-func New(reader, writer FileSystem, mode int) *Spool {
-	return &Spool{
+func New(reader, writer FileSystem, opt ...opts.Option[Spool]) *Spool {
+	s := &Spool{
 		reader: reader,
 		writer: writer,
-		mode:   mode,
 	}
+
+	opts.Apply(s, opt)
+
+	return s
 }
 
 // Write new file to spool
@@ -65,6 +88,14 @@ func (spool *Spool) WriteFile(path string, b []byte) error {
 	return spool.Write(path, bytes.NewBuffer(b))
 }
 
+func (spool *Spool) iserr(err error) error {
+	if spool.strict == skiperror {
+		return nil
+	}
+
+	return err
+}
+
 // Apply the spool function over each file in the reader filesystem, producing
 // results to writer file system.
 func (spool *Spool) ForEach(
@@ -84,13 +115,13 @@ func (spool *Spool) ForEach(
 
 		fd, err := spool.reader.Open(path)
 		if err != nil {
-			return err
+			return spool.iserr(err)
 		}
 		defer fd.Close()
 
 		dd, err := f(ctx, path, fd)
 		if err != nil {
-			return err
+			return spool.iserr(err)
 		}
 		if dd == nil {
 			return nil
@@ -99,13 +130,13 @@ func (spool *Spool) ForEach(
 
 		err = spool.write(spool.writer, path, dd)
 		if err != nil {
-			return err
+			return spool.iserr(err)
 		}
 
-		if spool.mode == Mutable {
+		if spool.mutable == mutable {
 			err = spool.reader.Remove(path)
 			if err != nil {
-				slog.Warn("unable to remove the file", "err", err)
+				return spool.iserr(err)
 			}
 		}
diff --git a/spool/spool_test.go b/spool/spool_test.go
index c74184d..ad503af 100644
--- a/spool/spool_test.go
+++ b/spool/spool_test.go
@@ -25,7 +25,7 @@ func TestSpool(t *testing.T) {
 	to, err := lfs.NewTempFS(os.TempDir(), "to")
 	it.Then(t).Must(it.Nil(err))
 
-	qq := spool.New(in, to, spool.Mutable)
+	qq := spool.New(in, to)
 
 	seq := []string{"/a", "/b", "/c", "/d", "/e", "/f"}
 	for _, txt := range seq {
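With the option-based constructor above, the former spool.New(in, to, spool.Mutable) call style translates as follows (a sketch reusing the in/to file systems from the test; the spool now defaults to immutable and strict unless configured otherwise):

    // consume tasks destructively and keep processing when a single file fails
    qq := spool.New(in, to, spool.IsMutable, spool.WithSkipError)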