From 8508f0bd6bb30b1eebeb1f0e725e23f6aee0ea81 Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Sun, 6 Apr 2025 17:10:08 +0300 Subject: [PATCH 1/2] file system spool (queue) utility --- README.md | 9 +++ lfs/filesystem.go | 2 +- spool/spool.go | 163 ++++++++++++++++++++++++++++++++++++++++++++ spool/spool_test.go | 47 +++++++++++++ 4 files changed, 220 insertions(+), 1 deletion(-) create mode 100644 spool/spool.go create mode 100644 spool/spool_test.go diff --git a/README.md b/README.md index 4abdadd..0c58027 100644 --- a/README.md +++ b/README.md @@ -85,6 +85,7 @@ go get -u github.com/fogfish/stream - [Presigned Urls](#presigned-urls) - [Error handling](#error-handling) - [Local file system](#local-file-system) + - [Spool - File System Queue](#spool---file-system-queue) - [How To Contribute](#how-to-contribute) - [commit message](#commit-message) - [bugs](#bugs) @@ -342,6 +343,14 @@ import "github.com/fogfish/stream/lfs" fs, err := lfs.New("/path/to/root") ``` +### Spool - File System Queue + +The library implements a convinient utility to implement Linux-like `spool` over the mounted file systems. The spooling is transparently done either over local file system or AWS S3. + +```go + +``` + ## How To Contribute The library is [MIT](LICENSE) licensed and accepts contributions via GitHub pull requests: diff --git a/lfs/filesystem.go b/lfs/filesystem.go index 04e27ef..ff576be 100644 --- a/lfs/filesystem.go +++ b/lfs/filesystem.go @@ -101,7 +101,7 @@ func (fsys *FileSystem) osCreate(ctx, path string) (stream.File, error) { type nopCanceler struct{ *os.File } // Cancel effect of file i/o -func (nopCanceler) Cancel() error { return fmt.Errorf("not supported") } +func (f nopCanceler) Cancel() error { return f.Close() } // To open the file for reading use `Open` function giving the absolute path // starting with `/`, the returned file descriptor is a composite of diff --git a/spool/spool.go b/spool/spool.go new file mode 100644 index 0000000..33a90d0 --- /dev/null +++ b/spool/spool.go @@ -0,0 +1,163 @@ +// +// Copyright (C) 2020 - 2025 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the MIT license. See the LICENSE file for details. +// https://github.com/fogfish/stream +// + +package spool + +import ( + "bytes" + "context" + "io" + "io/fs" + "log/slog" + + "github.com/fogfish/stream" +) + +// A FileSystem provides access to a hierarchical file system. +// The abstraction support I/O to local file system or AWS S3. +// Use it with https://github.com/fogfish/stream +type FileSystem interface { + fs.FS + Create(path string, attr *struct{}) (File, error) + Remove(path string) error +} + +// File provides I/O access to individual object on the file system. +type File = interface { + io.Writer + io.Closer + Stat() (fs.FileInfo, error) + Cancel() error +} + +const ( + // spool is immutable, consumed tasks remains in spool. + Immutable = iota + // spool is mutable, consumed tasks are removed + Mutable +) + +type Spool struct { + reader FileSystem + writer FileSystem + mode int +} + +func New(reader, writer FileSystem, mode int) *Spool { + return &Spool{ + reader: reader, + writer: writer, + mode: mode, + } +} + +// Write new file to spool +func (spool *Spool) Write(path string, r io.Reader) error { + return spool.write(spool.reader, path, r) +} + +func (spool *Spool) WriteFile(path string, b []byte) error { + return spool.Write(path, bytes.NewBuffer(b)) +} + +// Apply the spool function over each file in the reader filesystem, producing +// results to writer file system. +func (spool *Spool) ForEach( + ctx context.Context, + dir string, + f func(context.Context, string, io.Reader) (io.ReadCloser, error), +) error { + return fs.WalkDir(spool.reader, dir, + func(path string, d fs.DirEntry, err error) error { + if err != nil { + return err + } + + if d.IsDir() { + return nil + } + + fd, err := spool.reader.Open(path) + if err != nil { + return err + } + defer fd.Close() + + dd, err := f(ctx, path, fd) + if err != nil { + return err + } + if dd == nil { + return nil + } + defer dd.Close() + + err = spool.write(spool.writer, path, dd) + if err != nil { + return err + } + + if spool.mode == Mutable { + err = spool.reader.Remove(path) + if err != nil { + slog.Warn("unable to remove the file", "err", err) + } + } + + return nil + }, + ) +} + +// Apply the spool function over each file in the reader filesystem, producing +// results to writer file system. It is a variant of [ForEach] that used bytes slices. +func (spool *Spool) ForEachFile( + ctx context.Context, + dir string, + f func(context.Context, string, []byte) ([]byte, error), +) error { + return spool.ForEach(ctx, dir, + func(ctx context.Context, path string, r io.Reader) (io.ReadCloser, error) { + in, err := io.ReadAll(r) + if err != nil { + return nil, err + } + + b, err := f(ctx, path, in) + if err != nil { + return nil, err + } + if len(b) == 0 { + return nil, nil + } + + return io.NopCloser(bytes.NewBuffer(b)), nil + }, + ) +} + +func (spool *Spool) write(fs stream.CreateFS[struct{}], path string, r io.Reader) error { + fd, err := fs.Create(path, nil) + if err != nil { + return err + } + + _, err = io.Copy(fd, r) + if err != nil { + fd.Cancel() + return err + } + + err = fd.Close() + if err != nil { + fd.Cancel() + return err + } + + return nil +} diff --git a/spool/spool_test.go b/spool/spool_test.go new file mode 100644 index 0000000..c74184d --- /dev/null +++ b/spool/spool_test.go @@ -0,0 +1,47 @@ +// +// Copyright (C) 2020 - 2025 Dmitry Kolesnikov +// +// This file may be modified and distributed under the terms +// of the MIT license. See the LICENSE file for details. +// https://github.com/fogfish/stream +// + +package spool_test + +import ( + "context" + "os" + "testing" + + "github.com/fogfish/it/v2" + "github.com/fogfish/stream/lfs" + "github.com/fogfish/stream/spool" +) + +func TestSpool(t *testing.T) { + in, err := lfs.NewTempFS(os.TempDir(), "in") + it.Then(t).Must(it.Nil(err)) + + to, err := lfs.NewTempFS(os.TempDir(), "to") + it.Then(t).Must(it.Nil(err)) + + qq := spool.New(in, to, spool.Mutable) + + seq := []string{"/a", "/b", "/c", "/d", "/e", "/f"} + for _, txt := range seq { + err := qq.WriteFile(txt, []byte(txt)) + it.Then(t).Must(it.Nil(err)) + } + + dat := []string{} + qq.ForEachFile(context.Background(), "/", + func(ctx context.Context, path string, b []byte) ([]byte, error) { + dat = append(dat, path) + return b, nil + }, + ) + + it.Then(t).Should( + it.Seq(seq).Equal(dat...), + ) +} From e141efca82cf04fade1c1af065a2eaa7ab72ad86 Mon Sep 17 00:00:00 2001 From: Dmitry Kolesnikov Date: Sun, 6 Apr 2025 17:13:52 +0300 Subject: [PATCH 2/2] add spool example --- README.md | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/README.md b/README.md index 0c58027..9718331 100644 --- a/README.md +++ b/README.md @@ -348,7 +348,17 @@ fs, err := lfs.New("/path/to/root") The library implements a convinient utility to implement Linux-like `spool` over the mounted file systems. The spooling is transparently done either over local file system or AWS S3. ```go +import "github.com/fogfish/stream/spool" +s, err := lfs.New("/path/to/source") +t, err := lfs.New("/path/to/target") + +q := spool.New(s, t, spool.Mutable) +q.ForEachFile(context.Background(), "/", + func(ctx context.Context, path string, b []byte) ([]byte, error) { + return b, nil + }, +) ``` ## How To Contribute