Skip to content

Commit

Permalink
[cesium] - revamped file controller
Browse files Browse the repository at this point in the history
  • Loading branch information
LeonLiur committed Aug 1, 2024
1 parent b09a2bc commit 1824bb7
Show file tree
Hide file tree
Showing 4 changed files with 78 additions and 48 deletions.
8 changes: 4 additions & 4 deletions cesium/internal/domain/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func NewErrRangeNotFound(tr telem.TimeRange) error {
type DB struct {
Config
idx *index
files *fileController
fc *fileController
closed *atomic.Bool
}

Expand Down Expand Up @@ -161,7 +161,7 @@ func Open(configs ...Config) (*DB, error) {
return &DB{
Config: cfg,
idx: idx,
files: controller,
fc: controller,
closed: &atomic.Bool{},
}, nil
}
Expand All @@ -179,7 +179,7 @@ func (db *DB) NewIterator(cfg IteratorConfig) *Iterator {
}

func (db *DB) newReader(ctx context.Context, ptr pointer) (*Reader, error) {
internal, err := db.files.acquireReader(ctx, ptr.fileKey)
internal, err := db.fc.acquireReader(ctx, ptr.fileKey)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func (db *DB) Close() error {
}

w := errors.NewCatcher(errors.WithAggregation())
w.Exec(db.files.close)
w.Exec(db.fc.close)
w.Exec(db.idx.close)
return w.Error()
}
39 changes: 34 additions & 5 deletions cesium/internal/domain/delete.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,21 +181,35 @@ func (db *DB) GarbageCollect(ctx context.Context) error {
return errDBClosed
}

_, err := db.files.gcWriters()
_, err := db.fc.gcWriters()
if err != nil {
return span.Error(err)
}

// There also cannot be any readers open on the file, since any iterators that
// acquire those readers will be symlinked to the old file, causing them to read
// bad data since the new pointers no longer correspond to the old file.
_, err = db.files.gcReaders()
//
// WE ARE BLOCKING ALL READ OPERATIONS ON THE FILE DURING THE ENTIRE DURATION OF GC:
// this is a behaviour that we would ideally change in the future to reduce downtime,
// but for now, this is what we have implemented.
// The challenge lies in the two files present during GC: a copy of the file is made
// alongside the original. However, existing file handles will still point to the
// original file instead of the new one, reading data incoherent with what is stored
// in the index. There are many solutions to this:
// 1. Add a lock on readers before each read operation, and swap the underlying
// file handle for each reader under a lock.
// 2. Use a one-file GC system where no duplicate file is created.
// 3. Wait during GC until all file handles are closed, then swap the file under
// a lock on the file to disallow additional readers from being created. (This might
// be problematic since some readers may never get closed).
_, err = db.fc.gcReaders()
if err != nil {
return span.Error(err)
}

for fileKey := uint16(1); fileKey <= uint16(db.files.counter.Value()); fileKey++ {
if db.files.hasWriter(fileKey) || db.files.hasReader(fileKey) {
for fileKey := uint16(1); fileKey <= uint16(db.fc.counter.Value()); fileKey++ {
if db.fc.hasWriter(fileKey) {
continue
}
s, err := db.FS.Stat(fileKeyToName(fileKey))
Expand Down Expand Up @@ -233,6 +247,21 @@ func (db *DB) garbageCollectFile(key uint16, size int64) error {
offsetDeltaMap = make(map[telem.TimeRange]uint32)
)

db.fc.readers.RLock()
defer db.fc.readers.RUnlock()
rs, ok := db.fc.readers.files[key]
// It is ok if there is no reader entry for the file: this means that no reader has
// been created yet. We can also be sure that no new reader will be created while we
// hold the fc.readers mutex, since that prevents the readers map from being modified.
if ok {
rs.RLock()
defer rs.RUnlock()
// If there's any open file handles on the file, we cannot garbage collect.
if len(rs.open) > 0 {
return nil
}
}

// Find all pointers using the file: there cannot be more pointers using the file
// during GC since the file must be already full – however, there can be less due
// to deletion.
Expand Down Expand Up @@ -321,7 +350,7 @@ func (db *DB) garbageCollectFile(key uint16, size int64) error {
}
db.idx.mu.Unlock()

if err = db.files.rejuvenate(key); err != nil {
if err = db.fc.rejuvenate(key); err != nil {
return err
}

Expand Down
75 changes: 38 additions & 37 deletions cesium/internal/domain/file_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,13 @@ func newErrEntityInUse(entity string, fileKey uint16) error {
return errors.Newf("%s for file %d is in use and cannot be closed", entity, fileKey)
}

// fileReaders tracks the set of open readers on a single file. The embedded
// RWMutex guards the open slice: callers must hold it (read or write as
// appropriate) before inspecting or modifying the list of readers.
type fileReaders struct {
sync.RWMutex
// open holds the readers currently open on the file.
open []controlledReader
}

type fileController struct {
Config
writers struct {
Expand All @@ -43,7 +50,7 @@ type fileController struct {
}
readers struct {
sync.RWMutex
open map[uint16][]controlledReader
files map[uint16]*fileReaders
}
release chan struct{}
counter *xio.Int32Counter
Expand All @@ -67,7 +74,7 @@ func openFileController(cfg Config) (*fileController, error) {
counterFile: counterF,
}
fc.writers.open = make(map[uint16]controlledWriter, cfg.MaxDescriptors)
fc.readers.open = make(map[uint16][]controlledReader)
fc.readers.files = make(map[uint16]*fileReaders)
fc.release = make(chan struct{}, cfg.MaxDescriptors)

fc.writers.unopened, err = fc.scanUnopenedFiles()
Expand Down Expand Up @@ -253,13 +260,16 @@ func (fc *fileController) acquireReader(ctx context.Context, key uint16) (*contr
defer span.End()

fc.readers.RLock()
if rs, ok := fc.readers.open[key]; ok {
for _, r := range rs {
if f, ok := fc.readers.files[key]; ok {
f.RLock()
for _, r := range f.open {
if r.tryAcquire() {
f.RUnlock()
fc.readers.RUnlock()
return &r, nil
}
}
f.RUnlock()
}
fc.readers.RUnlock()

Expand Down Expand Up @@ -302,28 +312,38 @@ func (fc *fileController) newReader(ctx context.Context, key uint16) (*controlle
controllerEntry: newPoolEntry(key, fc.release),
}
fc.readers.Lock()
fc.readers.open[key] = append(fc.readers.open[key], r)
f, ok := fc.readers.files[key]
if !ok {
fc.readers.files[key] = &fileReaders{open: []controlledReader{r}}
} else {
f.Lock()
fc.readers.files[key].open = append(fc.readers.files[key].open, r)
f.Unlock()
}
fc.readers.Unlock()
return &r, err
}

func (fc *fileController) gcReaders() (successful bool, err error) {
fc.readers.Lock()
defer fc.readers.Unlock()
for k, v := range fc.readers.open {
for i, r := range v {
for k, f := range fc.readers.files {
f.Lock()
for i, r := range f.open {
if r.tryAcquire() {
err = r.HardClose()
if err != nil {
f.Unlock()
return false, err
}
fc.readers.open[k] = append(v[:i], v[i+1:]...)
fc.readers.files[k].open = append(f.open[:i], f.open[i+1:]...)
successful = true
}
}
if len(fc.readers.open[k]) == 0 {
delete(fc.readers.open, k)
if len(fc.readers.files[k].open) == 0 {
delete(fc.readers.files, k)
}
f.Unlock()
}
return successful, nil
}
Expand Down Expand Up @@ -359,17 +379,6 @@ func (fc *fileController) hasWriter(fileKey uint16) bool {
return ok
}

// hasReader reports whether at least one reader is currently open on the file
// identified by fileKey.
func (fc *fileController) hasReader(fileKey uint16) bool {
	fc.readers.RLock()
	defer fc.readers.RUnlock()
	readers, found := fc.readers.open[fileKey]
	return found && len(readers) > 0
}

// rejuvenate adds a file key to the unopened writers set. If there is an open writer
// for it, it is removed.
// rejuvenate is called after a file is garbage collected.
Expand All @@ -387,18 +396,6 @@ func (fc *fileController) rejuvenate(fileKey uint16) error {
delete(fc.writers.open, fileKey)
}

if rs, ok := fc.readers.open[fileKey]; ok {
for _, r := range rs {
if !r.tryAcquire() {
return newErrEntityInUse("writer", fileKey)
}
if err := r.ReaderAtCloser.Close(); err != nil {
return err
}
}
delete(fc.readers.open, fileKey)
}

s, err := fc.FS.Stat(fileKeyToName(fileKey))
if err != nil {
return err
Expand All @@ -417,8 +414,10 @@ func (fc *fileController) atDescriptorLimit() bool {
fc.writers.RUnlock()
}()
readerCount := 0
for _, r := range fc.readers.open {
readerCount += len(r)
for _, f := range fc.readers.files {
f.RLock()
readerCount += len(f.open)
f.RUnlock()
}
return readerCount+len(fc.writers.open) >= fc.MaxDescriptors
}
Expand All @@ -439,15 +438,17 @@ func (fc *fileController) close() error {
return w.HardClose()
})
}
for _, v := range fc.readers.open {
for _, r := range v {
for _, f := range fc.readers.files {
f.Lock()
for _, r := range f.open {
c.Exec(func() error {
if !r.tryAcquire() {
return newErrEntityInUse("reader", r.fileKey)
}
return r.HardClose()
})
}
f.Unlock()
}
c.Exec(fc.counterFile.Close)
return c.Error()
Expand Down
4 changes: 2 additions & 2 deletions cesium/internal/domain/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,15 +157,15 @@ func (db *DB) NewWriter(ctx context.Context, cfg WriterConfig) (*Writer, error)
"cannot open writer because there is already data in the writer's time range",
)
}
key, size, internal, err := db.files.acquireWriter(ctx)
key, size, internal, err := db.fc.acquireWriter(ctx)
if err != nil {
return nil, err
}
w := &Writer{
WriterConfig: cfg,
Instrumentation: db.Instrumentation.Child("writer"),
fileKey: key,
fc: db.files,
fc: db.fc,
fileSize: telem.Size(size),
internal: internal,
idx: db.idx,
Expand Down

0 comments on commit 1824bb7

Please sign in to comment.