Skip to content

Commit

Permalink
groot/{internal/rcompress,riofs,rtree}: expose API to customize key-c…
Browse files Browse the repository at this point in the history
…ompression

Signed-off-by: Sebastien Binet <binet@cern.ch>
  • Loading branch information
sbinet committed Oct 22, 2024
1 parent 7c6853a commit 419b4d8
Show file tree
Hide file tree
Showing 8 changed files with 172 additions and 11 deletions.
8 changes: 8 additions & 0 deletions groot/internal/rcompress/rcompress.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ type Settings struct {
Lvl int
}

// SettingsFrom create a Settings value from the provided compression
// configuration (compression algorithm and compression level), using
// ROOT's encoding.
func SettingsFrom(compr int32) Settings {
alg, lvl := rootCompressAlgLvl(compr)
return Settings{Alg: alg, Lvl: lvl}
}

// DefaultSettings is the default compression algorithm and level used
// in ROOT files and trees.
var DefaultSettings = Settings{Alg: ZLIB, Lvl: flate.BestSpeed}
Expand Down
2 changes: 1 addition & 1 deletion groot/riofs/dir.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,7 +464,7 @@ func (dir *tdirectoryFile) Put(name string, obj root.Object) error {
dir.addStreamer(si)
}

key, err := newKeyFrom(dir, name, title, rdict.GoName2Cxx(typename), obj, dir.file)
key, err := newKeyFrom(dir, name, title, rdict.GoName2Cxx(typename), obj, dir.file, nil) // FIXME(sbinet): wire in key-opt ?
if err != nil {
return fmt.Errorf("riofs: could not create key %q for object %T: %w", name, obj, err)
}
Expand Down
2 changes: 1 addition & 1 deletion groot/riofs/file.go
Original file line number Diff line number Diff line change
Expand Up @@ -574,7 +574,7 @@ func (f *File) writeStreamerInfo() error {
return fmt.Errorf("riofs: could not write StreamerInfo list: %w", err)
}

key, err = newKeyFromBuf(&f.dir, "StreamerInfo", sinfos.Title(), sinfos.Class(), 1, buf.Bytes(), f)
key, err = newKeyFromBuf(&f.dir, "StreamerInfo", sinfos.Title(), sinfos.Class(), 1, buf.Bytes(), f, nil)
if err != nil {
return fmt.Errorf("riofs: could not create StreamerInfo key: %w", err)
}
Expand Down
59 changes: 53 additions & 6 deletions groot/riofs/key.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,27 @@ import (
"go-hep.org/x/hep/groot/rvers"
)

// KeyOption configures how a Key may be created.
type KeyOption func(cfg *keyCfg) error

// WithKeyCompression configures a Key to use the provided compression scheme.
func WithKeyCompression(compression int32) KeyOption {
return func(cfg *keyCfg) error {
cfg.compr = rcompress.SettingsFrom(compression)
return nil
}
}

type keyCfg struct {
compr rcompress.Settings
}

func newKeyCfg() *keyCfg {
return &keyCfg{
compr: rcompress.Settings{Alg: rcompress.Inherit},
}
}

// noKeyError is the error returned when a riofs.Key could not be found.
type noKeyError struct {
key string
Expand Down Expand Up @@ -122,20 +143,28 @@ func newKey(dir *tdirectoryFile, name, title, class string, objlen int32, f *Fil
// NewKey creates a new key from the provided serialized object buffer.
// NewKey puts the key and its payload at the end of the provided file f.
// Depending on the file configuration, NewKey may compress the provided object buffer.
func NewKey(dir Directory, name, title, class string, cycle int16, obj []byte, f *File) (Key, error) {
func NewKey(dir Directory, name, title, class string, cycle int16, obj []byte, f *File, kopts ...KeyOption) (Key, error) {
var d *tdirectoryFile
if dir != nil {
d = dir.(*tdirectoryFile)
}
return newKeyFromBuf(d, name, title, class, cycle, obj, f)
return newKeyFromBuf(d, name, title, class, cycle, obj, f, kopts)
}

func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, f *File) (Key, error) {
func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object, f *File, kopts []KeyOption) (Key, error) {
var err error
if dir == nil {
dir = &f.dir
}

kcfg := newKeyCfg()
for _, opt := range kopts {
err = opt(kcfg)
if err != nil {
return Key{}, fmt.Errorf("riofs: could not setup Key option: %w", err)
}
}

keylen := keylenFor(name, title, class, dir, f.end)

buf := rbytes.NewWBuffer(nil, nil, uint32(keylen), dir.file)
Expand Down Expand Up @@ -171,7 +200,12 @@ func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object,
k.rvers += 1000
}

k.buf, err = rcompress.Compress(nil, buf.Bytes(), k.f.compression)
compress := k.f.compression
if kcfg.compr.Alg != rcompress.Inherit {
compress = kcfg.compr.Compression()
}

k.buf, err = rcompress.Compress(nil, buf.Bytes(), compress)
if err != nil {
return k, fmt.Errorf("riofs: could not compress object %T for key %q: %w", obj, name, err)
}
Expand All @@ -185,12 +219,20 @@ func newKeyFrom(dir *tdirectoryFile, name, title, class string, obj root.Object,
return k, nil
}

func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, buf []byte, f *File) (Key, error) {
func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16, buf []byte, f *File, kopts []KeyOption) (Key, error) {
var err error
if dir == nil {
dir = &f.dir
}

kcfg := newKeyCfg()
for _, opt := range kopts {
err = opt(kcfg)
if err != nil {
return Key{}, fmt.Errorf("riofs: could not setup Key option: %w", err)
}
}

keylen := keylenFor(name, title, class, dir, f.end)
objlen := int32(len(buf))
k := Key{
Expand All @@ -212,7 +254,12 @@ func newKeyFromBuf(dir *tdirectoryFile, name, title, class string, cycle int16,
k.rvers += 1000
}

k.buf, err = rcompress.Compress(nil, buf, k.f.compression)
compress := k.f.compression
if kcfg.compr.Alg != rcompress.Inherit {
compress = kcfg.compr.Compression()
}

k.buf, err = rcompress.Compress(nil, buf, compress)
if err != nil {
return k, fmt.Errorf("riofs: could not compress object %s for key %q: %w", class, name, err)
}
Expand Down
4 changes: 2 additions & 2 deletions groot/rtree/basket.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (b *Basket) grow(n int) {
b.offsets = append(b.offsets, make([]int32, delta)...)
}

func (b *Basket) writeFile(f *riofs.File) (totBytes int64, zipBytes int64, err error) {
func (b *Basket) writeFile(f *riofs.File, compr int32) (totBytes int64, zipBytes int64, err error) {
header := b.header
b.header = true
defer func() {
Expand All @@ -332,7 +332,7 @@ func (b *Basket) writeFile(f *riofs.File) (totBytes int64, zipBytes int64, err e
b.wbuf.WriteArrayI32(b.offsets[:b.nevbuf])
b.wbuf.WriteI32(0)
}
b.key, err = riofs.NewKey(nil, b.key.Name(), b.key.Title(), b.Class(), int16(b.key.Cycle()), b.wbuf.Bytes(), f)
b.key, err = riofs.NewKey(nil, b.key.Name(), b.key.Title(), b.Class(), int16(b.key.Cycle()), b.wbuf.Bytes(), f, riofs.WithKeyCompression(compr))
if err != nil {
return 0, 0, fmt.Errorf("rtree: could not create basket-key: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion groot/rtree/branch.go
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ func (b *tbranch) flush() error {
}

f := b.tree.getFile()
totBytes, zipBytes, err := b.ctx.bk.writeFile(f)
totBytes, zipBytes, err := b.ctx.bk.writeFile(f, int32(b.compress))
if err != nil {
return fmt.Errorf("could not marshal basket[%d] (branch=%q): %w", b.writeBasket, b.Name(), err)
}
Expand Down
8 changes: 8 additions & 0 deletions groot/rtree/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,14 @@ func WithZlib(level int) WriteOption {
}
}

// WithZstd configures a ROOT tree to use zstd as a compression mechanism.
func WithZstd(level int) WriteOption {
return func(opt *wopt) error {
opt.compress = rcompress.Settings{Alg: rcompress.ZSTD, Lvl: level}.Compression()
return nil
}
}

// WithBasketSize configures a ROOT tree to use 'size' (in bytes) as a basket buffer size.
// if size is <= 0, the default buffer size is used (DefaultBasketSize).
func WithBasketSize(size int) WriteOption {
Expand Down
98 changes: 98 additions & 0 deletions groot/rtree/writer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,15 @@
package rtree

import (
"compress/flate"
"fmt"
"os"
"path/filepath"
"reflect"
"sync"
"testing"

"go-hep.org/x/hep/groot/internal/rcompress"
"go-hep.org/x/hep/groot/rbase"
"go-hep.org/x/hep/groot/riofs"
"golang.org/x/exp/rand"
Expand Down Expand Up @@ -244,3 +246,99 @@ func TestWriteThisStreamers(t *testing.T) {
}
}
}

func TestWriterWithCompression(t *testing.T) {
tmp, err := os.MkdirTemp("", "groot-rtree-")
if err != nil {
t.Fatal(err)
}
defer func() {
_ = os.RemoveAll(tmp)
}()

for _, tc := range []struct {
wopt WriteOption
want rcompress.Kind
}{
// {WithoutCompression(flate.BestCompression), rcompress.},
{WithLZ4(flate.BestCompression), rcompress.LZ4},
{WithLZMA(flate.BestCompression), rcompress.LZMA},
{WithZlib(flate.BestCompression), rcompress.ZLIB},
{WithZstd(flate.BestCompression), rcompress.ZSTD},
} {
t.Run("alg-"+tc.want.String(), func(t *testing.T) {
fname := filepath.Join(tmp, "groot-alg-"+tc.want.String())
f, err := riofs.Create(fname)
if err != nil {
t.Fatalf("could not create file %q: %v", fname, err)
}
defer f.Close()

var (
evt struct {
N int32
Sli []float64 `groot:"Sli[N]"`
}
wvars = WriteVarsFromStruct(&evt)
)
w, err := NewWriter(f, "tree", wvars, tc.wopt)
if err != nil {
t.Errorf("could not create tree writer: %+v", err)
return
}
defer w.Close()

rng := rand.New(rand.NewSource(1234))
for i := 0; i < 100; i++ {
evt.N = rng.Int31n(10) + 1
evt.Sli = evt.Sli[:0]
for j := 0; j < int(evt.N); j++ {
evt.Sli = append(evt.Sli, rng.Float64())
}
_, err = w.Write()
if err != nil {
t.Errorf("could not write event %d: %+v", i, err)
return
}
}

err = w.Close()
if err != nil {
t.Errorf("could not close tree writer: %+v", err)
return
}

err = f.Close()
if err != nil {
t.Errorf("could not close root file: %+v", err)
return
}

{
f, err := riofs.Open(fname)
if err != nil {
t.Fatalf("could not open ROOT file %q: %v", fname, err)
}
defer f.Close()

tree, err := riofs.Get[Tree](f, "tree")
if err != nil {
t.Fatalf("could not open tree: %v", err)
}
bname := "Sli"
b := tree.Branch(bname)
if b == nil {
t.Fatalf("could not retrieve branch %q", bname)
}
bb, ok := b.(*tbranch)
if !ok {
t.Fatalf("unexpected type for branch %q: %T", bname, b)
}
xset := rcompress.SettingsFrom(int32(bb.compress))
if got, want := xset.Alg, tc.want; got != want {
t.Fatalf("invalid compression algorithm for branch %q: got=%v, want=%v", b.Name(), got, want)
}
}
})
}
}

0 comments on commit 419b4d8

Please sign in to comment.