Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improving file eviction performance #696

Draft
wants to merge 4 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 35 additions & 13 deletions cache/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,9 @@ type diskCache struct {
accessLogger *log.Logger
containsQueue chan proxyCheck

// Limit the number of simultaneous file removals.
fileRemovalSem *semaphore.Weighted
// Limit the number of simultaneous file removals and filesystem write
// operations (apart from atime updates, which we hope are fast).
diskWaitSem *semaphore.Weighted

mu sync.Mutex
lru SizedLRU
Expand All @@ -103,6 +104,11 @@ func badReqErr(format string, a ...interface{}) *cache.Error {
}
}

// ErrOverloaded is returned when a request cannot be accepted because the
// cache is out of disk space, either due to requests that are too large or
// too many concurrent requests. Callers may retry later.
var ErrOverloaded = &cache.Error{
Code: http.StatusInsufficientStorage,
Text: "Out of disk space, due to too large or too many concurrent cache requests. Please try again later.",
}

// Non-test users must call this to expose metrics.
func (c *diskCache) RegisterMetrics() {
c.lru.RegisterMetrics()
Expand All @@ -114,6 +120,21 @@ func (c *diskCache) RegisterMetrics() {
// but since the updater func must lock the cache mu, it was deemed
// necessary to have greater control of when to get the cache age
go c.pollCacheAge()

go c.shiftMetricPeriodContinuously()
}

// Shift to new period for metrics every 30 seconds. A period of
// 30 seconds should give margin to catch all peaks (with for example
// a 10 second scrape interval) even in cases of delayed or missed
// scrapes from prometheus.
// shiftMetricPeriodContinuously rotates the LRU's metric period once
// immediately and then every 30 seconds, holding the cache mutex for
// each rotation. It never returns, so the ticker is intentionally not
// stopped.
func (c *diskCache) shiftMetricPeriodContinuously() {
	ticker := time.NewTicker(30 * time.Second)
	for {
		c.mu.Lock()
		c.lru.shiftToNextMetricPeriod()
		c.mu.Unlock()
		// Block until the next 30-second tick before rotating again.
		<-ticker.C
	}
}

// Update metric every minute with the idle time of the least recently used item in the cache
Expand Down Expand Up @@ -167,12 +188,6 @@ func (c *diskCache) getElementPath(key Key, value lruItem) string {
}

func (c *diskCache) removeFile(f string) {
if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil {
log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f)
return
}
defer c.fileRemovalSem.Release(1)

err := os.Remove(f)
if err != nil {
log.Printf("ERROR: failed to remove evicted cache file: %s", f)
Expand Down Expand Up @@ -240,6 +255,17 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
return nil
}

// Put requests are processed using blocking file syscalls, which
// consume one operating system thread per put request. Throttling
// the Put requests with a semaphore to avoid requiring too many
// operating system threads. Get requests do not seem to consume any
// significant amount of OS threads and are therefore not throttled.
if err := c.diskWaitSem.Acquire(context.Background(), 1); err != nil {
log.Printf("ERROR: failed to aquire semaphore: %v", err)
return internalErr(err)
}
defer c.diskWaitSem.Release(1)

key := cache.LookupKey(kind, hash)

var tf *os.File // Tempfile.
Expand Down Expand Up @@ -282,11 +308,7 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
}
if !ok {
c.mu.Unlock()
return &cache.Error{
Code: http.StatusInsufficientStorage,
Text: fmt.Sprintf("The item (%d) + reserved space is larger than the cache's maximum size (%d).",
size, c.lru.MaxSize()),
}
return ErrOverloaded
}
c.mu.Unlock()
unreserve = true
Expand Down
68 changes: 51 additions & 17 deletions cache/disk/load.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,18 +43,14 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
}

// Go defaults to a limit of 10,000 operating system threads.
// We probably don't need half of those for file removals at
// any given point in time, unless the disk/fs can't keep up.
// I suppose it's better to slow down processing than to crash
// when hitting the 10k limit or to run out of disk space.
// Violating that limit would result in a crash and therefore we use
// a semaphore to throttle amount of concurrently running blocking
// file syscalls. A semaphore weight of 5,000 should give plenty of
// margin. The weight should not be set too low because the
// average latency could increase if a few slow clients could block
// all other clients.
semaphoreWeight := int64(5000)

if strings.HasPrefix(runtime.GOOS, "darwin") {
// Mac seems to fail to create os threads when removing
// lots of files, so allow fewer than linux.
semaphoreWeight = 3000
}
log.Printf("Limiting concurrent file removals to %d\n", semaphoreWeight)
log.Printf("Limiting concurrent disk waiting requests to %d\n", semaphoreWeight)

zi, err := zstdimpl.Get("go")
if err != nil {
Expand All @@ -70,7 +66,11 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
maxBlobSize: math.MaxInt64,
maxProxyBlobSize: math.MaxInt64,

fileRemovalSem: semaphore.NewWeighted(semaphoreWeight),
// Acquire 1 of these before starting filesystem writes/deletes, or
// reject filesystem writes upon failure (since this will create a
// new OS thread and we don't want to hit Go's default 10,000 OS
// thread limit).
diskWaitSem: semaphore.NewWeighted(semaphoreWeight),

gaugeCacheAge: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_longest_item_idle_time_seconds",
Expand Down Expand Up @@ -112,7 +112,7 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
if err != nil {
return nil, fmt.Errorf("Attempting to migrate the old directory structure failed: %w", err)
}
err = c.loadExistingFiles(maxSizeBytes)
err = c.loadExistingFiles(maxSizeBytes, cc)
if err != nil {
return nil, fmt.Errorf("Loading of existing cache entries failed due to error: %w", err)
}
Expand Down Expand Up @@ -537,7 +537,7 @@ func (c *diskCache) scanDir() (scanResult, error) {
// loadExistingFiles lists all files in the cache directory, and adds them to the
// LRU index so that they can be served. Files are sorted by access time first,
// so that the eviction behavior is preserved across server restarts.
func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
func (c *diskCache) loadExistingFiles(maxSizeBytes int64, cc CacheConfig) error {
log.Printf("Loading existing files in %s.\n", c.dir)

result, err := c.scanDir()
Expand All @@ -550,18 +550,40 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
sort.Sort(result)

// The eviction callback deletes the file from disk.
// This function is only called while the lock is held
// This function is only called while the lock is not held
// by the current goroutine.
onEvict := func(key Key, value lruItem) {
f := c.getElementPath(key, value)
// Run in a goroutine so we can release the lock sooner.
go c.removeFile(f)
c.removeFile(f)
}

log.Println("Building LRU index.")

c.lru = NewSizedLRU(maxSizeBytes, onEvict, len(result.item))

log.Printf("Will evict at max_size: %.2f GB", bytesToGigaBytes(maxSizeBytes))

if cc.diskSizeLimit > 0 {
// Only set and print if optional limit is enabled.
c.lru.diskSizeLimit = cc.diskSizeLimit
log.Printf("Will reject at disk_size_limit: %.2f GB",
bytesToGigaBytes(c.lru.diskSizeLimit))
}

// Start one single goroutine running in background, continuously
// waiting for files to be removed and removing them. Benchmarks on
// Linux with XFS file system have surprisingly shown that removal
// sequentially with a single goroutine is much faster than starting
// separate go routines for each file and removing them in parallel
// despite SSDs with high IOPS performance. Benchmarks have also shown
// that the single background goroutine is still slightly faster even
// if the parallel goroutines would be serialized with a semaphore.
// Sequentially evicting all files helps ensure that Go’s default
// limit of 10,000 operating system threads is not violated. Otherwise,
// the number of concurrent removals could explode when a large new
// file suddenly evicts thousands of old small files.
go c.lru.performQueuedEvictionsContinuously()

for i := 0; i < len(result.item); i++ {
ok := c.lru.Add(result.metadata[i].lookupKey, *result.item[i])
if !ok {
Expand All @@ -572,7 +594,19 @@ func (c *diskCache) loadExistingFiles(maxSizeBytes int64) error {
}
}

// Printing progress gives awareness about slow operations.
// And waiting for evictions to complete before accepting client
// connections reduces the risk of confusing overload errors at runtime.
log.Println("Waiting for evictions...")
for c.lru.queuedEvictionsSize.Load() > 0 {
time.Sleep(200 * time.Millisecond)
}

log.Println("Finished loading disk cache files.")

return nil
}

// bytesToGigaBytes converts a byte count into binary gigabytes (GiB),
// i.e. units of 1024^3 bytes, as a float64.
func bytesToGigaBytes(bytes int64) float64 {
	const bytesPerGiB = 1024.0 * 1024.0 * 1024.0
	return float64(bytes) / bytesPerGiB
}
Loading