From f0757bead3e20db5f47422b02620d20287cf673c Mon Sep 17 00:00:00 2001 From: Seth Erickson Date: Thu, 14 Nov 2024 23:40:48 +0000 Subject: [PATCH] use object declarations iterator for Root.Objects() --- examples/listobjects/main.go | 57 ++++++------------------- namaste.go | 2 +- objectstate.go | 2 +- root.go | 83 ++++++++++++++++++++++++++++-------- root_test.go | 24 ++++++++--- 5 files changed, 98 insertions(+), 70 deletions(-) diff --git a/examples/listobjects/main.go b/examples/listobjects/main.go index df9be78b..eec80849 100644 --- a/examples/listobjects/main.go +++ b/examples/listobjects/main.go @@ -33,24 +33,22 @@ func main() { logger.Error("missing required storage root URI") os.Exit(1) } - fsys, dir, err := parseStoreConn(ctx, storeConn) - if err != nil { - logger.Error("can't parse storage root argument", "err", err) - os.Exit(1) - } - if err := listObjects2(ctx, fsys, dir, numgos, logger); err != nil { + if err := listObjects(ctx, storeConn, numgos, logger); err != nil { logger.Error("exit with errors", "err", err) os.Exit(1) } } -func listObjects2(ctx context.Context, fsys ocfl.FS, dir string, numgos int, log *slog.Logger) (err error) { - allFiles, walkErrFn := ocfl.WalkFiles(ctx, fsys, dir) - defer func() { - err = walkErrFn() - }() - decls := allFiles.Filter(func(f *ocfl.FileRef) bool { return f.Namaste().IsObject() }) - for obj, err := range decls.OpenObjectsBatch(ctx, numgos) { +func listObjects(ctx context.Context, storeConn string, numgos int, log *slog.Logger) (err error) { + fsys, dir, err := parseStoreConn(ctx, storeConn) + if err != nil { + return fmt.Errorf("can't parse storage root argument: %w", err) + } + root, err := ocfl.NewRoot(ctx, fsys, dir) + if err != nil { + return nil + } + for obj, err := range root.ObjectsBatch(ctx, numgos) { if err != nil { log.Error(err.Error()) continue @@ -58,40 +56,9 @@ func listObjects2(ctx context.Context, fsys ocfl.FS, dir string, numgos int, log id := obj.Inventory().ID() fmt.Println(id) } - return + 
return nil } -// func listObjects(ctx context.Context, fsys ocfl.FS, dir string, gos int, _ *slog.Logger) error { -// objectDirs := func(yield func(string) bool) { -// for dir, err := range ocfl.ObjectPaths(ctx, fsys, dir) { -// if err != nil { -// break -// } -// if !yield(dir) { -// break -// } -// } -// } -// getID := func(dir string) (ocfl.ReadInventory, error) { -// obj, err := ocfl.NewObject(ctx, fsys, dir) -// if err != nil { -// return nil, err -// } -// return obj.Inventory(), nil -// } -// var err error -// resultIter := pipeline.Results(objectDirs, getID, gos) -// resultIter(func(r pipeline.Result[string, ocfl.ReadInventory]) bool { -// if r.Err != nil { -// err = r.Err -// return false -// } -// fmt.Println(r.In, r.Out.ID()) -// return true -// }) -// return err -// } - func parseStoreConn(ctx context.Context, name string) (ocfl.FS, string, error) { //if we were using s3-based backend: rl, err := url.Parse(name) diff --git a/namaste.go b/namaste.go index 941d82cf..73e53324 100644 --- a/namaste.go +++ b/namaste.go @@ -51,7 +51,7 @@ func FindNamaste(items []fs.DirEntry) (Namaste, error) { } } -// Name returns the filename for d (0=TYPE_VERSION) or an empty string if d is +// Name returns the filename for n ('0=TYPE_VERSION') or an empty string if n is // empty func (n Namaste) Name() string { if n.Type == "" || n.Version.Empty() { diff --git a/objectstate.go b/objectstate.go index 620e23ed..c8ce982c 100644 --- a/objectstate.go +++ b/objectstate.go @@ -139,7 +139,7 @@ func (state ObjectState) Empty() bool { return state.Flags == 0 && len(state.VersionDirs) == 0 && len(state.Invalid) == 0 } -// Namaste return the ObjectState's Namaste value, which may be a zero value. +// Namaste returns state's Namaste value, which may be a zero value. 
func (state ObjectState) Namaste() Namaste { var n Namaste if state.HasNamaste() { diff --git a/root.go b/root.go index a4cd254b..711b36b0 100644 --- a/root.go +++ b/root.go @@ -37,9 +37,9 @@ type Root struct { initArgs *initRootArgs } -// NewRoot returns a new *Root for working with the OCFL storage root at +// NewRoot returns a new *[Root] for working with the OCFL storage root at // directory dir in fsys. It can be used to initialize new storage roots if the -// InitRoot option is used, fsys is an ocfl.WriteFS, and dir is a non-existing +// [InitRoot] option is used, fsys is an ocfl.WriteFS, and dir is a non-existing // or empty directory. func NewRoot(ctx context.Context, fsys FS, dir string, opts ...RootOption) (*Root, error) { r := &Root{fs: fsys, dir: dir} @@ -137,26 +137,48 @@ func (r *Root) ResolveID(id string) (string, error) { return objPath, nil } -// WalkObjectDirs returns an iterator that walks r's directory structure, -// yielding paths and directory entries for all objects. If an error is -// encountered, iteration terminates. The terminating error is accessed with the -// returned error function. -func (r *Root) WalkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) { - var walkErr error - dirs := func(yield func(string, []fs.DirEntry) bool) { - for dir, err := range r.walkDirs(ctx) { - if err != nil { - walkErr = err - break +// ObjectDeclarations returns an iterator that yields all OCFL object +// declaration files in r. If an error occurs during iteration, it is returned +// by the error function. +func (r *Root) ObjectDeclarations(ctx context.Context) (FileSeq, func() error) { + allFiles, errFn := WalkFiles(ctx, r.fs, r.dir) + decls := allFiles.Filter(func(f *FileRef) bool { return f.Namaste().IsObject() }) + return decls, errFn +} + +// Objects returns an iterator that yields objects or an error for every object +// declaration file in the root. 
+func (r *Root) Objects(ctx context.Context) iter.Seq2[*Object, error] { + return func(yield func(*Object, error) bool) { + decls, listErr := r.ObjectDeclarations(ctx) + for obj, err := range decls.OpenObjects(ctx) { + if !yield(obj, err) { + return } - if ParseObjectDir(dir.entries).HasNamaste() { - if !yield(dir.path, dir.entries) { - break - } + } + if err := listErr(); err != nil { + yield(nil, err) + } + } +} + +// ObjectsBatch returns an iterator that uses [FileSeq.OpenObjectsBatch] to open +// objects in the root in numgos separate goroutines, yielding the results. +func (r *Root) ObjectsBatch(ctx context.Context, numgos int) iter.Seq2[*Object, error] { + return func(yield func(*Object, error) bool) { + allFiles, listErr := WalkFiles(ctx, r.fs, r.dir) + objs := allFiles. + Filter(func(f *FileRef) bool { return f.Namaste().IsObject() }). + OpenObjectsBatch(ctx, numgos) + for obj, err := range objs { + if !yield(obj, err) { + return } } + if err := listErr(); err != nil { + yield(nil, err) + } } - return dirs, func() error { return walkErr } } // Path returns the root's dir relative to its FS @@ -345,6 +367,7 @@ func writeExtensionConfig(ctx context.Context, fsys WriteFS, root string, config return nil } +// RootOption is used to configure the behavior of [NewRoot]() type RootOption func(*Root) type initRootArgs struct { @@ -365,6 +388,30 @@ func InitRoot(spec Spec, layoutDesc string, extensions ...extension.Extension) R } } +// TODO: export a function for iterating over an object root's directory entries. +// +// walkObjectDirs returns an iterator that walks r's directory structure, +// yielding paths and directory entries for all objects. If an error is +// encountered, iteration terminates. The terminating error is accessed with the +// returned error function. 
+func (r *Root) walkObjectDirs(ctx context.Context) (iter.Seq2[string, []fs.DirEntry], func() error) { + var walkErr error + dirs := func(yield func(string, []fs.DirEntry) bool) { + for dir, err := range r.walkDirs(ctx) { + if err != nil { + walkErr = err + break + } + if ParseObjectDir(dir.entries).HasNamaste() { + if !yield(dir.path, dir.entries) { + break + } + } + } + } + return dirs, func() error { return walkErr } +} + func (root *Root) walkDirs(ctx context.Context) iter.Seq2[*rootDirRef, error] { return func(yield func(*rootDirRef, error) bool) { root.walkDir(ctx, &rootDirRef{path: "."}, yield) diff --git a/root_test.go b/root_test.go index 58754ab6..d7ee4015 100644 --- a/root_test.go +++ b/root_test.go @@ -84,14 +84,28 @@ func TestRoot(t *testing.T) { root, err := ocfl.NewRoot(ctx, fsys, dir) be.NilErr(t, err) count := 0 - dirs, walkErr := root.WalkObjectDirs(ctx) - for dir := range dirs { - valid := root.ValidateObjectDir(ctx, dir) - be.NilErr(t, valid.Err()) + for obj, err := range root.Objects(ctx) { + be.NilErr(t, err) count++ + be.True(t, obj.Exists()) + } + be.Equal(t, 3, count) + }) + }) + + t.Run("ObjectsBatch", func(t *testing.T) { + t.Run("simple-root", func(t *testing.T) { + fsys := ocfl.DirFS(storeFixturePath) + dir := `1.0/good-stores/simple-root` + root, err := ocfl.NewRoot(ctx, fsys, dir) + be.NilErr(t, err) + count := 0 + for obj, err := range root.ObjectsBatch(ctx, 2) { + be.NilErr(t, err) + count++ + be.True(t, obj.Exists()) } be.Equal(t, 3, count) - be.NilErr(t, walkErr()) }) })