Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 19 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ go get -u github.com/fogfish/stream
- [Presigned Urls](#presigned-urls)
- [Error handling](#error-handling)
- [Local file system](#local-file-system)
- [Spool - File System Queue](#spool---file-system-queue)
- [How To Contribute](#how-to-contribute)
- [commit message](#commit-message)
- [bugs](#bugs)
Expand Down Expand Up @@ -342,6 +343,24 @@ import "github.com/fogfish/stream/lfs"
fs, err := lfs.New("/path/to/root")
```

### Spool - File System Queue

The library implements a convinient utility to implement Linux-like `spool` over the mounted file systems. The spooling is transparently done either over local file system or AWS S3.

```go
import "github.com/fogfish/stream/spool"

s, err := lfs.New("/path/to/source")
t, err := lfs.New("/path/to/target")

q := spool.New(s, t, spool.Mutable)
q.ForEachFile(context.Background(), "/",
func(ctx context.Context, path string, b []byte) ([]byte, error) {
return b, nil
},
)
```

## How To Contribute

The library is [MIT](LICENSE) licensed and accepts contributions via GitHub pull requests:
Expand Down
2 changes: 1 addition & 1 deletion lfs/filesystem.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,7 @@ func (fsys *FileSystem) osCreate(ctx, path string) (stream.File, error) {
type nopCanceler struct{ *os.File }

// Cancel effect of file i/o
func (nopCanceler) Cancel() error { return fmt.Errorf("not supported") }
func (f nopCanceler) Cancel() error { return f.Close() }

// To open the file for reading use `Open` function giving the absolute path
// starting with `/`, the returned file descriptor is a composite of
Expand Down
163 changes: 163 additions & 0 deletions spool/spool.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,163 @@
//
// Copyright (C) 2020 - 2025 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/fogfish/stream
//

package spool

import (
"bytes"
"context"
"io"
"io/fs"
"log/slog"

"github.com/fogfish/stream"
)

// A FileSystem provides access to a hierarchical file system.
// The abstraction support I/O to local file system or AWS S3.
// Use it with https://github.com/fogfish/stream
type FileSystem interface {
fs.FS
Create(path string, attr *struct{}) (File, error)
Remove(path string) error
}

// File provides I/O access to individual object on the file system.
type File = interface {
io.Writer
io.Closer
Stat() (fs.FileInfo, error)
Cancel() error
}

const (
// spool is immutable, consumed tasks remains in spool.
Immutable = iota
// spool is mutable, consumed tasks are removed
Mutable
)

type Spool struct {
reader FileSystem
writer FileSystem
mode int
}

func New(reader, writer FileSystem, mode int) *Spool {
return &Spool{
reader: reader,
writer: writer,
mode: mode,
}
}

// Write new file to spool
func (spool *Spool) Write(path string, r io.Reader) error {
return spool.write(spool.reader, path, r)
}

func (spool *Spool) WriteFile(path string, b []byte) error {
return spool.Write(path, bytes.NewBuffer(b))
}

// Apply the spool function over each file in the reader filesystem, producing
// results to writer file system.
func (spool *Spool) ForEach(
ctx context.Context,
dir string,
f func(context.Context, string, io.Reader) (io.ReadCloser, error),
) error {
return fs.WalkDir(spool.reader, dir,
func(path string, d fs.DirEntry, err error) error {
if err != nil {
return err
}

if d.IsDir() {
return nil
}

fd, err := spool.reader.Open(path)
if err != nil {
return err
}
defer fd.Close()

dd, err := f(ctx, path, fd)
if err != nil {
return err
}
if dd == nil {
return nil
}
defer dd.Close()

err = spool.write(spool.writer, path, dd)
if err != nil {
return err
}

if spool.mode == Mutable {
err = spool.reader.Remove(path)
if err != nil {
slog.Warn("unable to remove the file", "err", err)
}
}

return nil
},
)
}

// Apply the spool function over each file in the reader filesystem, producing
// results to writer file system. It is a variant of [ForEach] that used bytes slices.
func (spool *Spool) ForEachFile(
ctx context.Context,
dir string,
f func(context.Context, string, []byte) ([]byte, error),
) error {
return spool.ForEach(ctx, dir,
func(ctx context.Context, path string, r io.Reader) (io.ReadCloser, error) {
in, err := io.ReadAll(r)
if err != nil {
return nil, err
}

b, err := f(ctx, path, in)
if err != nil {
return nil, err
}
if len(b) == 0 {
return nil, nil
}

return io.NopCloser(bytes.NewBuffer(b)), nil
},
)
}

func (spool *Spool) write(fs stream.CreateFS[struct{}], path string, r io.Reader) error {
fd, err := fs.Create(path, nil)
if err != nil {
return err
}

_, err = io.Copy(fd, r)
if err != nil {
fd.Cancel()
return err
}

err = fd.Close()
if err != nil {
fd.Cancel()
return err
}

return nil
}
47 changes: 47 additions & 0 deletions spool/spool_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
//
// Copyright (C) 2020 - 2025 Dmitry Kolesnikov
//
// This file may be modified and distributed under the terms
// of the MIT license. See the LICENSE file for details.
// https://github.com/fogfish/stream
//

package spool_test

import (
"context"
"os"
"testing"

"github.com/fogfish/it/v2"
"github.com/fogfish/stream/lfs"
"github.com/fogfish/stream/spool"
)

func TestSpool(t *testing.T) {
in, err := lfs.NewTempFS(os.TempDir(), "in")
it.Then(t).Must(it.Nil(err))

to, err := lfs.NewTempFS(os.TempDir(), "to")
it.Then(t).Must(it.Nil(err))

qq := spool.New(in, to, spool.Mutable)

seq := []string{"/a", "/b", "/c", "/d", "/e", "/f"}
for _, txt := range seq {
err := qq.WriteFile(txt, []byte(txt))
it.Then(t).Must(it.Nil(err))
}

dat := []string{}
qq.ForEachFile(context.Background(), "/",
func(ctx context.Context, path string, b []byte) ([]byte, error) {
dat = append(dat, path)
return b, nil
},
)

it.Then(t).Should(
it.Seq(seq).Equal(dat...),
)
}
Loading