// Patch summary:
//   - rcompress: adds SettingsFrom, which builds a Settings value from an
//     int32 compression configuration by calling rootCompressAlgLvl.
//   - riofs: adds the KeyOption type and the WithKeyCompression option.
//     NewKey now accepts variadic KeyOptions; newKeyFrom and newKeyFromBuf
//     take a []KeyOption, apply each option to a keyCfg whose default Alg is
//     rcompress.Inherit, and compress with the configured settings only when
//     Alg is not rcompress.Inherit (otherwise the file's compression is used).
//     The callers in Put and writeStreamerInfo pass nil options.
//   - rtree: Basket.writeFile takes a compression value and forwards it to
//     riofs.NewKey through riofs.WithKeyCompression; tbranch.flush passes
//     int32(b.compress). Adds the WithZstd write option and
//     TestWriterWithCompression, which checks the algorithm recorded on
//     branch "Sli" for LZ4, LZMA, ZLIB and ZSTD.
diff --git a/groot/internal/rcompress/rcompress.go b/groot/internal/rcompress/rcompress.go index e9b710ab2..75f26eca6 100644 --- a/groot/internal/rcompress/rcompress.go +++ b/groot/internal/rcompress/rcompress.go @@ -55,6 +55,14 @@ type Settings struct { Lvl int } +// SettingsFrom creates a Settings value from the provided compression +// configuration (compression algorithm and compression level), using +// ROOT's encoding. +func SettingsFrom(compr int32) Settings { + alg, lvl := rootCompressAlgLvl(compr) + return Settings{Alg: alg, Lvl: lvl} +} + // DefaultSettings is the default compression algorithm and level used // in ROOT files and trees. var DefaultSettings = Settings{Alg: ZLIB, Lvl: flate.BestSpeed} diff --git a/groot/riofs/dir.go b/groot/riofs/dir.go index 5d07c8645..ce22b937d 100644 --- a/groot/riofs/dir.go +++ b/groot/riofs/dir.go @@ -464,7 +464,7 @@ func (dir *tdirectoryFile) Put(name string, obj root.Object) error { dir.addStreamer(si) } - key, err := newKeyFrom(dir, name, title, rdict.GoName2Cxx(typename), obj, dir.file) + key, err := newKeyFrom(dir, name, title, rdict.GoName2Cxx(typename), obj, dir.file, nil) // FIXME(sbinet): wire in key-opt ? 
if err != nil { return fmt.Errorf("riofs: could not create key %q for object %T: %w", name, obj, err) } diff --git a/groot/riofs/file.go b/groot/riofs/file.go index 9547547e8..204b643ba 100644 --- a/groot/riofs/file.go +++ b/groot/riofs/file.go @@ -574,7 +574,7 @@ func (f *File) writeStreamerInfo() error { return fmt.Errorf("riofs: could not write StreamerInfo list: %w", err) } - key, err = newKeyFromBuf(&f.dir, "StreamerInfo", sinfos.Title(), sinfos.Class(), 1, buf.Bytes(), f) + key, err = newKeyFromBuf(&f.dir, "StreamerInfo", sinfos.Title(), sinfos.Class(), 1, buf.Bytes(), f, nil) if err != nil { return fmt.Errorf("riofs: could not create StreamerInfo key: %w", err) } diff --git a/groot/riofs/key.go b/groot/riofs/key.go index 048629b72..9babd6f6a 100644 --- a/groot/riofs/key.go +++ b/groot/riofs/key.go @@ -18,6 +18,27 @@ import ( "go-hep.org/x/hep/groot/rvers" ) +// KeyOption configures how a Key may be created. +type KeyOption func(cfg *keyCfg) error + +// WithKeyCompression configures a Key to use the provided compression scheme. +func WithKeyCompression(compression int32) KeyOption { + return func(cfg *keyCfg) error { + cfg.compr = rcompress.SettingsFrom(compression) + return nil + } +} + +type keyCfg struct { + compr rcompress.Settings +} + +func newKeyCfg() *keyCfg { + return &keyCfg{ + compr: rcompress.Settings{Alg: rcompress.Inherit}, + } +} + // noKeyError is the error returned when a riofs.Key could not be found. type noKeyError struct { key string @@ -122,20 +143,28 @@ func newKey(dir *tdirectoryFile, name, title, class string, objlen int32, f *Fil // NewKey creates a new key from the provided serialized object buffer. // NewKey puts the key and its payload at the end of the provided file f. // Depending on the file configuration, NewKey may compress the provided object buffer. 
-func NewKey(dir Directory, name, title, class string, cycle int16, obj []byte, f *File) (Key, error) { +func NewKey(dir Directory, name, title, class string, cycle int16, obj []byte, f *File, kopts ...KeyOption) (Key, error) { var d *tdirectoryFile if dir != nil { d = dir.(*tdirectoryFile) } - return newKeyFromBuf(d, name, title, class, cycle, obj, f) + return newKeyFromBuf(d, name, title, class, cycle, obj, f, kopts) } -func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, f *File) (Key, error) { +func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, f *File, kopts []KeyOption) (Key, error) { var err error if dir == nil { dir = &f.dir } + kcfg := newKeyCfg() + for _, opt := range kopts { + err = opt(kcfg) + if err != nil { + return Key{}, fmt.Errorf("riofs: could not setup Key option: %w", err) + } + } + keylen := keylenFor(name, title, class, dir, f.end) buf := rbytes.NewWBuffer(nil, nil, uint32(keylen), dir.file) @@ -171,7 +200,12 @@ func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, k.rvers += 1000 } - k.buf, err = rcompress.Compress(nil, buf.Bytes(), k.f.compression) + compress := k.f.compression + if kcfg.compr.Alg != rcompress.Inherit { + compress = kcfg.compr.Compression() + } + + k.buf, err = rcompress.Compress(nil, buf.Bytes(), compress) if err != nil { return k, fmt.Errorf("riofs: could not compress object %T for key %q: %w", obj, name, err) } @@ -185,12 +219,20 @@ func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, return k, nil } -func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, buf []byte, f *File) (Key, error) { +func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, buf []byte, f *File, kopts []KeyOption) (Key, error) { var err error if dir == nil { dir = &f.dir } + kcfg := newKeyCfg() + for _, opt := range kopts { + err = opt(kcfg) + if err != nil { + return Key{}, 
fmt.Errorf("riofs: could not setup Key option: %w", err) + } + } + keylen := keylenFor(name, title, class, dir, f.end) objlen := int32(len(buf)) k := Key{ @@ -212,7 +254,12 @@ func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, k.rvers += 1000 } - k.buf, err = rcompress.Compress(nil, buf, k.f.compression) + compress := k.f.compression + if kcfg.compr.Alg != rcompress.Inherit { + compress = kcfg.compr.Compression() + } + + k.buf, err = rcompress.Compress(nil, buf, compress) if err != nil { return k, fmt.Errorf("riofs: could not compress object %s for key %q: %w", class, name, err) } diff --git a/groot/rtree/basket.go b/groot/rtree/basket.go index 58d21028d..f5061dedc 100644 --- a/groot/rtree/basket.go +++ b/groot/rtree/basket.go @@ -306,7 +306,7 @@ func (b *Basket) grow(n int) { b.offsets = append(b.offsets, make([]int32, delta)...) } -func (b *Basket) writeFile(f *riofs.File) (totBytes int64, zipBytes int64, err error) { +func (b *Basket) writeFile(f *riofs.File, compr int32) (totBytes int64, zipBytes int64, err error) { header := b.header b.header = true defer func() { @@ -332,7 +332,7 @@ func (b *Basket) writeFile(f *riofs.File) (totBytes int64, zipBytes int64, err e b.wbuf.WriteArrayI32(b.offsets[:b.nevbuf]) b.wbuf.WriteI32(0) } - b.key, err = riofs.NewKey(nil, b.key.Name(), b.key.Title(), b.Class(), int16(b.key.Cycle()), b.wbuf.Bytes(), f) + b.key, err = riofs.NewKey(nil, b.key.Name(), b.key.Title(), b.Class(), int16(b.key.Cycle()), b.wbuf.Bytes(), f, riofs.WithKeyCompression(compr)) if err != nil { return 0, 0, fmt.Errorf("rtree: could not create basket-key: %w", err) } diff --git a/groot/rtree/branch.go b/groot/rtree/branch.go index c47fa1280..d3c7569b8 100644 --- a/groot/rtree/branch.go +++ b/groot/rtree/branch.go @@ -749,7 +749,7 @@ func (b *tbranch) flush() error { } f := b.tree.getFile() - totBytes, zipBytes, err := b.ctx.bk.writeFile(f) + totBytes, zipBytes, err := b.ctx.bk.writeFile(f, int32(b.compress)) if err != nil { 
return fmt.Errorf("could not marshal basket[%d] (branch=%q): %w", b.writeBasket, b.Name(), err) } diff --git a/groot/rtree/writer.go b/groot/rtree/writer.go index c11803bd5..5cfa33dee 100644 --- a/groot/rtree/writer.go +++ b/groot/rtree/writer.go @@ -72,6 +72,14 @@ func WithZlib(level int) WriteOption { } } +// WithZstd configures a ROOT tree to use zstd as a compression mechanism. +func WithZstd(level int) WriteOption { + return func(opt *wopt) error { + opt.compress = rcompress.Settings{Alg: rcompress.ZSTD, Lvl: level}.Compression() + return nil + } +} + // WithBasketSize configures a ROOT tree to use 'size' (in bytes) as a basket buffer size. // if size is <= 0, the default buffer size is used (DefaultBasketSize). func WithBasketSize(size int) WriteOption { diff --git a/groot/rtree/writer_test.go b/groot/rtree/writer_test.go index 32ecb2483..b8149d780 100644 --- a/groot/rtree/writer_test.go +++ b/groot/rtree/writer_test.go @@ -5,6 +5,7 @@ package rtree import ( + "compress/flate" "fmt" "os" "path/filepath" @@ -12,6 +13,7 @@ import ( "sync" "testing" + "go-hep.org/x/hep/groot/internal/rcompress" "go-hep.org/x/hep/groot/rbase" "go-hep.org/x/hep/groot/riofs" "golang.org/x/exp/rand" @@ -244,3 +246,99 @@ func TestWriteThisStreamers(t *testing.T) { } } } + +func TestWriterWithCompression(t *testing.T) { + tmp, err := os.MkdirTemp("", "groot-rtree-") + if err != nil { + t.Fatal(err) + } + defer func() { + _ = os.RemoveAll(tmp) + }() + + for _, tc := range []struct { + wopt WriteOption + want rcompress.Kind + }{ + // {WithoutCompression(flate.BestCompression), rcompress.}, + {WithLZ4(flate.BestCompression), rcompress.LZ4}, + {WithLZMA(flate.BestCompression), rcompress.LZMA}, + {WithZlib(flate.BestCompression), rcompress.ZLIB}, + {WithZstd(flate.BestCompression), rcompress.ZSTD}, + } { + t.Run("alg-"+tc.want.String(), func(t *testing.T) { + fname := filepath.Join(tmp, "groot-alg-"+tc.want.String()) + f, err := riofs.Create(fname) + if err != nil { + t.Fatalf("could not 
create file %q: %v", fname, err) + } + defer f.Close() + + var ( + evt struct { + N int32 + Sli []float64 `groot:"Sli[N]"` + } + wvars = WriteVarsFromStruct(&evt) + ) + w, err := NewWriter(f, "tree", wvars, tc.wopt) + if err != nil { + t.Errorf("could not create tree writer: %+v", err) + return + } + defer w.Close() + + rng := rand.New(rand.NewSource(1234)) + for i := 0; i < 100; i++ { + evt.N = rng.Int31n(10) + 1 + evt.Sli = evt.Sli[:0] + for j := 0; j < int(evt.N); j++ { + evt.Sli = append(evt.Sli, rng.Float64()) + } + _, err = w.Write() + if err != nil { + t.Errorf("could not write event %d: %+v", i, err) + return + } + } + + err = w.Close() + if err != nil { + t.Errorf("could not close tree writer: %+v", err) + return + } + + err = f.Close() + if err != nil { + t.Errorf("could not close root file: %+v", err) + return + } + + { + f, err := riofs.Open(fname) + if err != nil { + t.Fatalf("could not open ROOT file %q: %v", fname, err) + } + defer f.Close() + + tree, err := riofs.Get[Tree](f, "tree") + if err != nil { + t.Fatalf("could not open tree: %v", err) + } + bname := "Sli" + b := tree.Branch(bname) + if b == nil { + t.Fatalf("could not retrieve branch %q", bname) + } + bb, ok := b.(*tbranch) + if !ok { + t.Fatalf("unexpected type for branch %q: %T", bname, b) + } + xset := rcompress.SettingsFrom(int32(bb.compress)) + if got, want := xset.Alg, tc.want; got != want { + t.Fatalf("invalid compression algorithm for branch %q: got=%v, want=%v", b.Name(), got, want) + } + } + }) + } +}