Skip to content

Commit

Permalink
use object declarations iterator for Root.Objects()
Browse files Browse the repository at this point in the history
  • Loading branch information
srerickson committed Nov 14, 2024
1 parent bd98599 commit f0757be
Show file tree
Hide file tree
Showing 5 changed files with 98 additions and 70 deletions.
57 changes: 12 additions & 45 deletions examples/listobjects/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,65 +33,32 @@ func main() {
logger.Error("missing required storage root URI")
os.Exit(1)
}
fsys, dir, err := parseStoreConn(ctx, storeConn)
if err != nil {
logger.Error("can't parse storage root argument", "err", err)
os.Exit(1)
}
if err := listObjects2(ctx, fsys, dir, numgos, logger); err != nil {
if err := listObjects(ctx, storeConn, numgos, logger); err != nil {
logger.Error("exit with errors", "err", err)
os.Exit(1)
}
}

func listObjects2(ctx context.Context, fsys ocfl.FS, dir string, numgos int, log *slog.Logger) (err error) {
allFiles, walkErrFn := ocfl.WalkFiles(ctx, fsys, dir)
defer func() {
err = walkErrFn()
}()
decls := allFiles.Filter(func(f *ocfl.FileRef) bool { return f.Namaste().IsObject() })
for obj, err := range decls.OpenObjectsBatch(ctx, numgos) {
func listObjects(ctx context.Context, storeConn string, numgos int, log *slog.Logger) (err error) {
fsys, dir, err := parseStoreConn(ctx, storeConn)
if err != nil {
return fmt.Errorf("can't parse storage root argument: %w", err)
}
root, err := ocfl.NewRoot(ctx, fsys, dir)
if err != nil {
return nil
}
for obj, err := range root.ObjectsBatch(ctx, numgos) {
if err != nil {
log.Error(err.Error())
continue
}
id := obj.Inventory().ID()
fmt.Println(id)
}
return
return nil
}

// func listObjects(ctx context.Context, fsys ocfl.FS, dir string, gos int, _ *slog.Logger) error {
// objectDirs := func(yield func(string) bool) {
// for dir, err := range ocfl.ObjectPaths(ctx, fsys, dir) {
// if err != nil {
// break
// }
// if !yield(dir) {
// break
// }
// }
// }
// getID := func(dir string) (ocfl.ReadInventory, error) {
// obj, err := ocfl.NewObject(ctx, fsys, dir)
// if err != nil {
// return nil, err
// }
// return obj.Inventory(), nil
// }
// var err error
// resultIter := pipeline.Results(objectDirs, getID, gos)
// resultIter(func(r pipeline.Result[string, ocfl.ReadInventory]) bool {
// if r.Err != nil {
// err = r.Err
// return false
// }
// fmt.Println(r.In, r.Out.ID())
// return true
// })
// return err
// }

func parseStoreConn(ctx context.Context, name string) (ocfl.FS, string, error) {
//if we were using s3-based backend:
rl, err := url.Parse(name)
Expand Down
2 changes: 1 addition & 1 deletion namaste.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func FindNamaste(items []fs.DirEntry) (Namaste, error) {
}
}

// Name returns the filename for d (0=TYPE_VERSION) or an empty string if d is
// Name returns the filename for n ('0=TYPE_VERSION') or an empty string if n is
// empty
func (n Namaste) Name() string {
if n.Type == "" || n.Version.Empty() {
Expand Down
2 changes: 1 addition & 1 deletion objectstate.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ func (state ObjectState) Empty() bool {
return state.Flags == 0 && len(state.VersionDirs) == 0 && len(state.Invalid) == 0
}

// Namaste return the ObjectState's Namaste value, which may be a zero value.
// Namaste returns state's Namaste value, which may be a zero value.
func (state ObjectState) Namaste() Namaste {
var n Namaste
if state.HasNamaste() {
Expand Down
83 changes: 65 additions & 18 deletions root.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ type Root struct {
initArgs *initRootArgs
}

// NewRoot returns a new *Root for working with the OCFL storage root at
// NewRoot returns a new *[Root] for working with the OCFL storage root at
// directory dir in fsys. It can be used to initialize new storage roots if the
// InitRoot option is used, fsys is an ocfl.WriteFS, and dir is a non-existing
// [InitRoot] option is used, fsys is an ocfl.WriteFS, and dir is a non-existing
// or empty directory.
func NewRoot(ctx context.Context, fsys FS, dir string, opts ...RootOption) (*Root, error) {
r := &Root{fs: fsys, dir: dir}
Expand Down Expand Up @@ -137,26 +137,48 @@ func (r *Root) ResolveID(id string) (string, error) {
return objPath, nil
}

// WalkObjectDirs returns an iterator that walks r's directory structure,
// yielding paths and directory entries for all objects. If an error is
// encountered, iteration terminates. The terminating error is accessed with the
// returned error function.
func (r *Root) WalkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) {
var walkErr error
dirs := func(yield func(string, []fs.DirEntry) bool) {
for dir, err := range r.walkDirs(ctx) {
if err != nil {
walkErr = err
break
// ObjectDeclarations returns an iterator that yields all OCFL object
// declaration files in r. If an error occurs during iteration, it is returned
// by the error function.
func (r *Root) ObjectDeclarations(ctx context.Context) (FileSeq, func() error) {
allFiles, errFn := WalkFiles(ctx, r.fs, r.dir)
decls := allFiles.Filter(func(f *FileRef) bool { return f.Namaste().IsObject() })
return decls, errFn
}

// Objects returns an iterator that yields objects or an error for every object
// declaration file in the root.
func (r *Root) Objects(ctx context.Context) iter.Seq2[*Object, error] {
return func(yield func(*Object, error) bool) {
decls, listErr := r.ObjectDeclarations(ctx)
for obj, err := range decls.OpenObjects(ctx) {
if !yield(obj, err) {
return
}
if ParseObjectDir(dir.entries).HasNamaste() {
if !yield(dir.path, dir.entries) {
break
}
}
if err := listErr(); err != nil {
yield(nil, err)
}
}
}

// ObjectsBatch returns an iterator that uses [FileSeq.OpenObjectsBatch] to open
// objects in the root in numgos separate goroutines, yielding the results
func (r *Root) ObjectsBatch(ctx context.Context, numgos int) iter.Seq2[*Object, error] {
return func(yield func(*Object, error) bool) {
allFiles, listErr := WalkFiles(ctx, r.fs, r.dir)
objs := allFiles.
Filter(func(f *FileRef) bool { return f.Namaste().IsObject() }).
OpenObjectsBatch(ctx, numgos)
for obj, err := range objs {
if !yield(obj, err) {
return
}
}
if err := listErr(); err != nil {
yield(nil, err)
}
}
return dirs, func() error { return walkErr }
}

// Path returns the root's dir relative to its FS
Expand Down Expand Up @@ -345,6 +367,7 @@ func writeExtensionConfig(ctx context.Context, fsys WriteFS, root string, config
return nil
}

// RootOption is used to configure the behavior of [NewRoot]()
type RootOption func(*Root)

type initRootArgs struct {
Expand All @@ -365,6 +388,30 @@ func InitRoot(spec Spec, layoutDesc string, extensions ...extension.Extension) R
}
}

// TODO: export a function for iterating of object root's directory entries.
//
// WalkObjectDirs returns an iterator that walks r's directory structure,
// yielding paths and directory entries for all objects. If an error is
// encountered, iteration terminates. The terminating error is accessed with the
// returned error function.
func (r *Root) walkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) {
var walkErr error
dirs := func(yield func(string, []fs.DirEntry) bool) {
for dir, err := range r.walkDirs(ctx) {
if err != nil {
walkErr = err
break
}
if ParseObjectDir(dir.entries).HasNamaste() {
if !yield(dir.path, dir.entries) {
break
}
}
}
}
return dirs, func() error { return walkErr }
}

func (root *Root) walkDirs(ctx context.Context) iter.Seq2[*rootDirRef, error] {
return func(yield func(*rootDirRef, error) bool) {
root.walkDir(ctx, &rootDirRef{path: "."}, yield)
Expand Down
24 changes: 19 additions & 5 deletions root_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,28 @@ func TestRoot(t *testing.T) {
root, err := ocfl.NewRoot(ctx, fsys, dir)
be.NilErr(t, err)
count := 0
dirs, walkErr := root.WalkObjectDirs(ctx)
for dir := range dirs {
valid := root.ValidateObjectDir(ctx, dir)
be.NilErr(t, valid.Err())
for obj, err := range root.Objects(ctx) {
be.NilErr(t, err)
count++
be.True(t, obj.Exists())
}
be.Equal(t, 3, count)
})
})

t.Run("ObjectsBatch", func(t *testing.T) {
t.Run("simple-root", func(t *testing.T) {
fsys := ocfl.DirFS(storeFixturePath)
dir := `1.0/good-stores/simple-root`
root, err := ocfl.NewRoot(ctx, fsys, dir)
be.NilErr(t, err)
count := 0
for obj, err := range root.ObjectsBatch(ctx, 2) {
be.NilErr(t, err)
count++
be.True(t, obj.Exists())
}
be.Equal(t, 3, count)
be.NilErr(t, walkErr())
})
})

Expand Down

0 comments on commit f0757be

Please sign in to comment.