From 1824bb7083fbb805406663e5f4abead209f96591 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?L=C3=A9o=20Liu?= Date: Wed, 31 Jul 2024 21:27:44 -0400 Subject: [PATCH] [cesium] - revamped file controller --- cesium/internal/domain/db.go | 8 +-- cesium/internal/domain/delete.go | 39 ++++++++++-- cesium/internal/domain/file_controller.go | 75 ++++++++++++----------- cesium/internal/domain/writer.go | 4 +- 4 files changed, 78 insertions(+), 48 deletions(-) diff --git a/cesium/internal/domain/db.go b/cesium/internal/domain/db.go index 785256aa2b..fa7004b280 100644 --- a/cesium/internal/domain/db.go +++ b/cesium/internal/domain/db.go @@ -71,7 +71,7 @@ func NewErrRangeNotFound(tr telem.TimeRange) error { type DB struct { Config idx *index - files *fileController + fc *fileController closed *atomic.Bool } @@ -161,7 +161,7 @@ func Open(configs ...Config) (*DB, error) { return &DB{ Config: cfg, idx: idx, - files: controller, + fc: controller, closed: &atomic.Bool{}, }, nil } @@ -179,7 +179,7 @@ func (db *DB) NewIterator(cfg IteratorConfig) *Iterator { } func (db *DB) newReader(ctx context.Context, ptr pointer) (*Reader, error) { - internal, err := db.files.acquireReader(ctx, ptr.fileKey) + internal, err := db.fc.acquireReader(ctx, ptr.fileKey) if err != nil { return nil, err } @@ -211,7 +211,7 @@ func (db *DB) Close() error { } w := errors.NewCatcher(errors.WithAggregation()) - w.Exec(db.files.close) + w.Exec(db.fc.close) w.Exec(db.idx.close) return w.Error() } diff --git a/cesium/internal/domain/delete.go b/cesium/internal/domain/delete.go index 859b482fce..2a5cb5dbcf 100644 --- a/cesium/internal/domain/delete.go +++ b/cesium/internal/domain/delete.go @@ -181,7 +181,7 @@ func (db *DB) GarbageCollect(ctx context.Context) error { return errDBClosed } - _, err := db.files.gcWriters() + _, err := db.fc.gcWriters() if err != nil { return span.Error(err) } @@ -189,13 +189,27 @@ func (db *DB) GarbageCollect(ctx context.Context) error { // There also cannot be any readers open on the file, since any iterators that // acquire those readers will be symlinked to the old file, causing them to read // bad data since the new pointers no longer correspond to the old file. - _, err = db.files.gcReaders() + // + // WE ARE BLOCKING ALL READ OPERATIONS ON THE FILE DURING THE ENTIRE DURATION OF GC: + // this is a behaviour that we ideally change in the future to reduce downtime, but + // for now, this is what we implemented. + // The challenge is with the two files during GC: one copy file is made and an + // original file is made. However, existing file handles will point to the original + // file instead of the new file, reading incoherent data with what's stored in the + // index. There are many solutions to this: + // 1. Add a lock on readers before each read operation, and swap the underlying + // file handle for each reader under a lock. + // 2. Use a one-file GC system where no duplicate file is created. + // 3. Wait during GC until all file handles are closed, then swap the file under + // a lock on the file to disallow additional readers from being created. (This might + // be problematic since some readers may never get closed). + _, err = db.fc.gcReaders() if err != nil { return span.Error(err) } - for fileKey := uint16(1); fileKey <= uint16(db.files.counter.Value()); fileKey++ { - if db.files.hasWriter(fileKey) || db.files.hasReader(fileKey) { + for fileKey := uint16(1); fileKey <= uint16(db.fc.counter.Value()); fileKey++ { + if db.fc.hasWriter(fileKey) { continue } s, err := db.FS.Stat(fileKeyToName(fileKey)) @@ -233,6 +247,21 @@ func (db *DB) garbageCollectFile(key uint16, size int64) error { offsetDeltaMap = make(map[telem.TimeRange]uint32) ) + db.fc.readers.RLock() + defer db.fc.readers.RUnlock() + rs, ok := db.fc.readers.files[key] + // It's ok if there is no reader entry for the file, this means that no reader has + // been created. And we can be sure that no writer will be created since we hold + // the fc.readers mutex as well, preventing the readers map from being modified. + if ok { + rs.RLock() + defer rs.RUnlock() + // If there's any open file handles on the file, we cannot garbage collect. + if len(rs.open) > 0 { + return nil + } + } + // Find all pointers using the file: there cannot be more pointers using the file // during GC since the file must be already full – however, there can be less due // to deletion. @@ -321,7 +350,7 @@ func (db *DB) garbageCollectFile(key uint16, size int64) error { } db.idx.mu.Unlock() - if err = db.files.rejuvenate(key); err != nil { + if err = db.fc.rejuvenate(key); err != nil { return err } diff --git a/cesium/internal/domain/file_controller.go b/cesium/internal/domain/file_controller.go index cc096fad55..2b21c1ed08 100644 --- a/cesium/internal/domain/file_controller.go +++ b/cesium/internal/domain/file_controller.go @@ -32,6 +32,13 @@ func newErrEntityInUse(entity string, fileKey uint16) error { return errors.Newf("%s for file %d is in use and cannot be closed", entity, fileKey) } +// fileReaders represents readers on a file. It provides a mutex lock to prevent any +// modifications to the list of readers. +type fileReaders struct { + sync.RWMutex + open []controlledReader +} + type fileController struct { Config writers struct { @@ -43,7 +50,7 @@ type fileController struct { } readers struct { sync.RWMutex - open map[uint16][]controlledReader + files map[uint16]*fileReaders } release chan struct{} counter *xio.Int32Counter @@ -67,7 +74,7 @@ func openFileController(cfg Config) (*fileController, error) { counterFile: counterF, } fc.writers.open = make(map[uint16]controlledWriter, cfg.MaxDescriptors) - fc.readers.open = make(map[uint16][]controlledReader) + fc.readers.files = make(map[uint16]*fileReaders) fc.release = make(chan struct{}, cfg.MaxDescriptors) fc.writers.unopened, err = fc.scanUnopenedFiles() @@ -253,13 +260,16 @@ func (fc *fileController) acquireReader(ctx context.Context, key uint16) (*contr defer span.End() fc.readers.RLock() - if rs, ok := fc.readers.open[key]; ok { - for _, r := range rs { + if f, ok := fc.readers.files[key]; ok { + f.RLock() + for _, r := range f.open { if r.tryAcquire() { + f.RUnlock() fc.readers.RUnlock() return &r, nil } } + f.RUnlock() } fc.readers.RUnlock() @@ -302,7 +312,14 @@ func (fc *fileController) newReader(ctx context.Context, key uint16) (*controlle controllerEntry: newPoolEntry(key, fc.release), } fc.readers.Lock() - fc.readers.open[key] = append(fc.readers.open[key], r) + f, ok := fc.readers.files[key] + if !ok { + fc.readers.files[key] = &fileReaders{open: []controlledReader{r}} + } else { + f.Lock() + fc.readers.files[key].open = append(fc.readers.files[key].open, r) + f.Unlock() + } fc.readers.Unlock() return &r, err } @@ -310,20 +327,23 @@ func (fc *fileController) newReader(ctx context.Context, key uint16) (*controlle func (fc *fileController) gcReaders() (successful bool, err error) { fc.readers.Lock() defer fc.readers.Unlock() - for k, v := range fc.readers.open { - for i, r := range v { + for k, f := range fc.readers.files { + f.Lock() + for i, r := range f.open { if r.tryAcquire() { err = r.HardClose() if err != nil { + f.Unlock() return false, err } - fc.readers.open[k] = append(v[:i], v[i+1:]...) + fc.readers.files[k].open = append(f.open[:i], f.open[i+1:]...) successful = true } } - if len(fc.readers.open[k]) == 0 { - delete(fc.readers.open, k) + if len(fc.readers.files[k].open) == 0 { + delete(fc.readers.files, k) } + f.Unlock() } return successful, nil } @@ -359,17 +379,6 @@ func (fc *fileController) hasWriter(fileKey uint16) bool { return ok } -func (fc *fileController) hasReader(fileKey uint16) bool { - fc.readers.RLock() - defer fc.readers.RUnlock() - - rs, ok := fc.readers.open[fileKey] - if !ok { - return false - } - return len(rs) != 0 -} - // rejuvenate adds a file key to the unopened writers set. If there is an open writer // for it, it is removed. // rejuvenate is called after a file is garbage collected. @@ -387,18 +396,6 @@ func (fc *fileController) rejuvenate(fileKey uint16) error { delete(fc.writers.open, fileKey) } - if rs, ok := fc.readers.open[fileKey]; ok { - for _, r := range rs { - if !r.tryAcquire() { - return newErrEntityInUse("writer", fileKey) - } - if err := r.ReaderAtCloser.Close(); err != nil { - return err - } - } - delete(fc.readers.open, fileKey) - } - s, err := fc.FS.Stat(fileKeyToName(fileKey)) if err != nil { return err @@ -417,8 +414,10 @@ func (fc *fileController) atDescriptorLimit() bool { fc.writers.RUnlock() }() readerCount := 0 - for _, r := range fc.readers.open { - readerCount += len(r) + for _, f := range fc.readers.files { + f.RLock() + readerCount += len(f.open) + f.RUnlock() } return readerCount+len(fc.writers.open) >= fc.MaxDescriptors } @@ -439,8 +438,9 @@ func (fc *fileController) close() error { return w.HardClose() }) } - for _, v := range fc.readers.open { - for _, r := range v { + for _, f := range fc.readers.files { + f.Lock() + for _, r := range f.open { c.Exec(func() error { if !r.tryAcquire() { return newErrEntityInUse("reader", r.fileKey) @@ -448,6 +448,7 @@ func (fc *fileController) close() error { return r.HardClose() }) } + f.Unlock() } c.Exec(fc.counterFile.Close) return c.Error() diff --git a/cesium/internal/domain/writer.go b/cesium/internal/domain/writer.go index 794d510e8b..68b37b498d 100644 --- a/cesium/internal/domain/writer.go +++ b/cesium/internal/domain/writer.go @@ -157,7 +157,7 @@ func (db *DB) NewWriter(ctx context.Context, cfg WriterConfig) (*Writer, error) "cannot open writer because there is already data in the writer's time range", ) } - key, size, internal, err := db.files.acquireWriter(ctx) + key, size, internal, err := db.fc.acquireWriter(ctx) if err != nil { return nil, err } @@ -165,7 +165,7 @@ func (db *DB) NewWriter(ctx context.Context, cfg WriterConfig) (*Writer, error) WriterConfig: cfg, Instrumentation: db.Instrumentation.Child("writer"), fileKey: key, - fc: db.files, + fc: db.fc, fileSize: telem.Size(size), internal: internal, idx: db.idx,