Reject high-cost requests instead of creating more OS threads when overloaded

We have been using a file removal semaphore with weight 5,000 (half of Go's
default 10,000 maximum OS threads, beyond which Go will crash), in an attempt
to avoid crashing when the filesystem/storage layer can't keep up with our
requirements.
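
For context, the pre-existing pattern looks roughly like the following standalone sketch (illustrative names and paths, not the actual bazel-remote code): each blocking file removal first takes a slot from a weighted semaphore sized to half of Go's default OS-thread limit, so a slow disk throttles deletions instead of spawning unbounded threads.

package main

import (
	"context"
	"log"
	"os"

	"golang.org/x/sync/semaphore"
)

// Half of Go's default limit of 10,000 OS threads (see runtime/debug.SetMaxThreads).
var diskSem = semaphore.NewWeighted(5000)

// removeFileThrottled is a hypothetical helper: each in-flight os.Remove can
// pin an OS thread while it blocks in the kernel, so we cap how many run at once.
func removeFileThrottled(path string) {
	if err := diskSem.Acquire(context.Background(), 1); err != nil {
		log.Printf("failed to acquire semaphore: %v, unable to remove %s", err, path)
		return
	}
	defer diskSem.Release(1)

	if err := os.Remove(path); err != nil {
		log.Printf("failed to remove %s: %v", path, err)
	}
}

func main() {
	removeFileThrottled("/tmp/example-blob") // illustrative path
}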

This change renames that semaphore to `diskWaitSem` and also uses it for
disk-write operations. When the semaphore cannot be acquired for a disk write,
we return an HTTP 503 (Service Unavailable) or gRPC RESOURCE_EXHAUSTED error
to the client.
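
A minimal standalone sketch of the new write path (putBlob and the variable names are illustrative; the real logic lives in cache/disk): writes use a non-blocking TryAcquire and fail fast with a sentinel error when no slot is free, instead of queueing up more blocked OS threads.

package main

import (
	"errors"
	"fmt"
	"os"

	"golang.org/x/sync/semaphore"
)

// errOverloaded plays the role of the new disk.ErrOverloaded sentinel.
var errOverloaded = errors.New("too many requests/disk overloaded (please try again later)")

var diskWaitSem = semaphore.NewWeighted(5000)

func putBlob(path string, data []byte) error {
	if !diskWaitSem.TryAcquire(1) {
		// Reject rather than block: a stalled write would pin yet another OS thread.
		return errOverloaded
	}
	defer diskWaitSem.Release(1)

	return os.WriteFile(path, data, 0o644)
}

func main() {
	if err := putBlob("/tmp/example-blob", []byte("payload")); err != nil {
		fmt.Println("put rejected:", err)
	}
}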

Relates to buchgr#638
mostynb authored and ulrfa committed Sep 1, 2023
1 parent b899a9d commit 1bc4fc9
Showing 6 changed files with 60 additions and 20 deletions.
20 changes: 16 additions & 4 deletions cache/disk/disk.go
@@ -77,8 +77,9 @@ type diskCache struct {
 	accessLogger *log.Logger
 	containsQueue chan proxyCheck
 
-	// Limit the number of simultaneous file removals.
-	fileRemovalSem *semaphore.Weighted
+	// Limit the number of simultaneous file removals and filesystem write
+	// operations (apart from atime updates, which we hope are fast).
+	diskWaitSem *semaphore.Weighted
 
 	mu sync.Mutex
 	lru SizedLRU
@@ -103,6 +104,11 @@ func badReqErr(format string, a ...interface{}) *cache.Error {
 	}
 }
 
+var ErrOverloaded = &cache.Error{
+	Code: http.StatusServiceUnavailable, // Too many requests/disk overloaded.
+	Text: "Too many requests/disk overloaded (please try again later)",
+}
+
 // Non-test users must call this to expose metrics.
 func (c *diskCache) RegisterMetrics() {
 	c.lru.RegisterMetrics()
@@ -167,11 +173,11 @@ func (c *diskCache) getElementPath(key Key, value lruItem) string {
 }
 
 func (c *diskCache) removeFile(f string) {
-	if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil {
+	if err := c.diskWaitSem.Acquire(context.Background(), 1); err != nil {
 		log.Printf("ERROR: failed to acquire semaphore: %v, unable to remove %s", err, f)
 		return
 	}
-	defer c.fileRemovalSem.Release(1)
+	defer c.diskWaitSem.Release(1)
 
 	err := os.Remove(f)
 	if err != nil {
@@ -240,6 +246,12 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
 		return nil
 	}
 
+	if !c.diskWaitSem.TryAcquire(1) {
+		// We are probably overloaded, and want to avoid hitting Go's default 10,000 max OS thread limit.
+		return ErrOverloaded
+	}
+	defer c.diskWaitSem.Release(1)
+
 	key := cache.LookupKey(kind, hash)
 
 	var tf *os.File // Tempfile.
6 changes: 5 additions & 1 deletion cache/disk/load.go
@@ -70,7 +70,11 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
 		maxBlobSize: math.MaxInt64,
 		maxProxyBlobSize: math.MaxInt64,
 
-		fileRemovalSem: semaphore.NewWeighted(semaphoreWeight),
+		// Acquire 1 of these before starting filesystem writes/deletes, or
+		// reject filesystem writes upon failure (since this will create a
+		// new OS thread and we don't want to hit Go's default 10,000 OS
+		// thread limit).
+		diskWaitSem: semaphore.NewWeighted(semaphoreWeight),
 
 		gaugeCacheAge: prometheus.NewGauge(prometheus.GaugeOpts{
 			Name: "bazel_remote_disk_cache_longest_item_idle_time_seconds",
4 changes: 4 additions & 0 deletions server/grpc.go
@@ -238,6 +238,10 @@ func gRPCErrCode(err error, dflt codes.Code) codes.Code {
 		return codes.OK
 	}
 
+	if err == disk.ErrOverloaded {
+		return codes.ResourceExhausted
+	}
+
 	cerr, ok := err.(*cache.Error)
 	if ok && cerr.Code == http.StatusBadRequest {
 		return codes.InvalidArgument
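
The mapping itself is small. A standalone sketch of the idea (grpcErrCode and writeHTTPError are illustrative stand-ins for the real helpers in server/): the overload sentinel becomes RESOURCE_EXHAUSTED on gRPC and 503 on HTTP, while every other error keeps its previous mapping.

package main

import (
	"errors"
	"fmt"
	"net/http"

	"google.golang.org/grpc/codes"
)

var errOverloaded = errors.New("disk overloaded") // stand-in for disk.ErrOverloaded

// grpcErrCode maps the overload sentinel to RESOURCE_EXHAUSTED and leaves
// everything else at the caller-provided default.
func grpcErrCode(err error, dflt codes.Code) codes.Code {
	if err == nil {
		return codes.OK
	}
	if errors.Is(err, errOverloaded) {
		return codes.ResourceExhausted
	}
	return dflt
}

// writeHTTPError is the HTTP-side equivalent: 503 Service Unavailable for overload.
func writeHTTPError(w http.ResponseWriter, err error) {
	if errors.Is(err, errOverloaded) {
		http.Error(w, err.Error(), http.StatusServiceUnavailable)
		return
	}
	http.Error(w, err.Error(), http.StatusInternalServerError)
}

func main() {
	fmt.Println(grpcErrCode(errOverloaded, codes.Internal)) // ResourceExhausted
}
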
12 changes: 9 additions & 3 deletions server/grpc_ac.go
@@ -171,6 +171,12 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte
 		return nil // Not inlined, nothing to do.
 	}
 
+	if (*digest).SizeBytes <= 0 {
+		// Unexpected corner case?
+		*slice = []byte{}
+		return nil
+	}
+
 	if *digest == nil {
 		hash := sha256.Sum256(*slice)
 		*digest = &pb.Digest{
@@ -183,10 +189,10 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte
 	if !found {
 		err := s.cache.Put(ctx, cache.CAS, (*digest).Hash, (*digest).SizeBytes,
 			bytes.NewReader(*slice))
-		if err != nil && err != io.EOF {
-			return err
+		if err == nil || err == io.EOF {
+			s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
 		}
-		s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
+		// De-inline failed (possibly due to "resource overload"), that's OK though.
 	}
 
 	*slice = []byte{}
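
Storing the inlined blob back into the CAS is now best effort: the blob is already inlined in the response, so a rejected write (for example under disk overload) no longer fails the whole request. A standalone sketch of the pattern (storeInCAS and maybeStoreInline are stand-ins for s.cache.Put and maybeInline):

package main

import (
	"errors"
	"io"
	"log"
)

var errOverloaded = errors.New("disk overloaded") // stand-in sentinel

// storeInCAS stands in for s.cache.Put; under load it may be rejected.
func storeInCAS(data []byte) error {
	return errOverloaded
}

func maybeStoreInline(data []byte) {
	err := storeInCAS(data)
	if err == nil || err == io.EOF {
		log.Printf("CAS PUT OK (%d bytes)", len(data))
	}
	// A failed de-inline (possibly due to overload) is tolerated: the caller
	// already has the inlined bytes, so the response is unaffected.
}

func main() {
	maybeStoreInline([]byte("inlined blob"))
}
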
35 changes: 24 additions & 11 deletions server/grpc_asset.go
@@ -6,6 +6,7 @@ import (
 	"crypto/sha256"
 	"encoding/base64"
 	"encoding/hex"
+	"fmt"
 	"io"
 	"net/http"
 	"net/url"
@@ -19,13 +20,21 @@ import (
 	pb "github.com/buchgr/bazel-remote/genproto/build/bazel/remote/execution/v2"
 
 	"github.com/buchgr/bazel-remote/cache"
+	"github.com/buchgr/bazel-remote/cache/disk"
 )
 
 // FetchServer implementation
 
 var errNilFetchBlobRequest = grpc_status.Error(codes.InvalidArgument,
 	"expected a non-nil *FetchBlobRequest")
 
+var resourceExhaustedResponse = asset.FetchBlobResponse{
+	Status: &status.Status{
+		Code:    int32(codes.ResourceExhausted),
+		Message: "Storage appears to be falling behind",
+	},
+}
+
 func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) (*asset.FetchBlobResponse, error) {
 
 	var sha256Str string
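
Unlike the other services, the Remote Asset protocol reports per-request outcomes inside the response body (a google.rpc.Status) rather than as a gRPC error, which is why overload is expressed here as a canned response value. A simplified standalone sketch, using a local stand-in for asset.FetchBlobResponse:

package main

import (
	"fmt"

	rpcstatus "google.golang.org/genproto/googleapis/rpc/status"
	"google.golang.org/grpc/codes"
)

// fetchBlobResponse is a local stand-in for asset.FetchBlobResponse.
type fetchBlobResponse struct {
	Status *rpcstatus.Status
}

var resourceExhaustedResponse = fetchBlobResponse{
	Status: &rpcstatus.Status{
		Code:    int32(codes.ResourceExhausted),
		Message: "Storage appears to be falling behind",
	},
}

func main() {
	st := resourceExhaustedResponse.Status
	fmt.Println(codes.Code(st.Code), "-", st.Message)
}
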
@@ -114,8 +123,8 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 	// See if we can download one of the URIs.
 
 	for _, uri := range req.GetUris() {
-		ok, actualHash, size := s.fetchItem(ctx, uri, sha256Str)
-		if ok {
+		actualHash, size, err := s.fetchItem(ctx, uri, sha256Str)
+		if err == nil {
 			return &asset.FetchBlobResponse{
 				Status: &status.Status{Code: int32(codes.OK)},
 				BlobDigest: &pb.Digest{
@@ -126,6 +135,10 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 			}, nil
 		}
 
+		if err == disk.ErrOverloaded {
+			return &resourceExhaustedResponse, nil
+		}
+
 		// Not a simple file. Not yet handled...
 	}
 
@@ -134,29 +147,29 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
 	}, nil
 }
 
-func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (bool, string, int64) {
+func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (string, int64, error) {
 	u, err := url.Parse(uri)
 	if err != nil {
 		s.errorLogger.Printf("unable to parse URI: %s err: %v", uri, err)
-		return false, "", int64(-1)
+		return "", int64(-1), err
 	}
 
 	if u.Scheme != "http" && u.Scheme != "https" {
 		s.errorLogger.Printf("unsupported URI: %s", uri)
-		return false, "", int64(-1)
+		return "", int64(-1), fmt.Errorf("Unknown URL scheme: %q", u.Scheme)
 	}
 
 	resp, err := http.Get(uri)
 	if err != nil {
 		s.errorLogger.Printf("failed to get URI: %s err: %v", uri, err)
-		return false, "", int64(-1)
+		return "", int64(-1), err
 	}
 	defer resp.Body.Close()
 	rc := resp.Body
 
 	s.accessLogger.Printf("GRPC ASSET FETCH %s %s", uri, resp.Status)
 	if resp.StatusCode < 200 || resp.StatusCode >= 300 {
-		return false, "", int64(-1)
+		return "", int64(-1), fmt.Errorf("Unsuccessful HTTP status code: %d", resp.StatusCode)
 	}
 
 	expectedSize := resp.ContentLength
@@ -166,7 +179,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
 		data, err := io.ReadAll(resp.Body)
 		if err != nil {
 			s.errorLogger.Printf("failed to read data: %v", uri)
-			return false, "", int64(-1)
+			return "", int64(-1), err
 		}
 
 		expectedSize = int64(len(data))
@@ -176,7 +189,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
 		if expectedHash != "" && hashStr != expectedHash {
 			s.errorLogger.Printf("URI data has hash %s, expected %s",
 				hashStr, expectedHash)
-			return false, "", int64(-1)
+			return "", int64(-1), fmt.Errorf("URI data has hash %s, expected %s", hashStr, expectedHash)
 		}
 
 		expectedHash = hashStr
@@ -186,10 +199,10 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
 	err = s.cache.Put(ctx, cache.CAS, expectedHash, expectedSize, rc)
 	if err != nil && err != io.EOF {
 		s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err)
-		return false, "", int64(-1)
+		return "", int64(-1), err
 	}
 
-	return true, expectedHash, expectedSize
+	return expectedHash, expectedSize, nil
 }
 
 func (s *grpcServer) FetchDirectory(context.Context, *asset.FetchDirectoryRequest) (*asset.FetchDirectoryResponse, error) {
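
Returning (string, int64, error) instead of (bool, string, int64) lets the caller tell "this URI didn't work" apart from "storage is overloaded". A standalone caller-side sketch (fetchItem, fetchFirst and the sentinel are illustrative stand-ins):

package main

import (
	"errors"
	"fmt"
)

var errOverloaded = errors.New("disk overloaded") // stand-in for disk.ErrOverloaded

// fetchItem stands in for the real method: hash and size on success, or an
// error explaining why this URI could not be used.
func fetchItem(uri string) (string, int64, error) {
	return "", -1, errOverloaded
}

func fetchFirst(uris []string) (string, int64, error) {
	for _, uri := range uris {
		hash, size, err := fetchItem(uri)
		if err == nil {
			return hash, size, nil
		}
		if errors.Is(err, errOverloaded) {
			// Storage is falling behind; trying more URIs would only add load.
			return "", -1, err
		}
		// Any other failure: fall through and try the next URI.
	}
	return "", -1, errors.New("no usable URI")
}

func main() {
	_, _, err := fetchFirst([]string{"https://example.com/blob"})
	fmt.Println(err)
}
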
3 changes: 2 additions & 1 deletion server/grpc_bytestream.go
@@ -544,6 +544,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
 			}
 			return nil
 		}
+
 		if err == nil {
 			// Unexpected early return. Should not happen.
 			msg := fmt.Sprintf("GRPC BYTESTREAM WRITE INTERNAL ERROR %s", resourceName)
@@ -553,7 +554,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
 
 		msg := fmt.Sprintf("GRPC BYTESTREAM WRITE CACHE ERROR: %s %v", resourceName, err)
 		s.accessLogger.Printf(msg)
-		return status.Error(codes.Internal, msg)
+		return status.Error(gRPCErrCode(err, codes.Internal), msg)
 	}
 
 	select {
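
With the status code now derived from the underlying cache error, an overloaded server surfaces to ByteStream clients as RESOURCE_EXHAUSTED instead of INTERNAL, which clients conventionally treat as retryable. A client-side sketch (no real connection; the backoff policy is an assumption, not part of this commit):

package main

import (
	"fmt"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// shouldRetry treats RESOURCE_EXHAUSTED as retryable with exponential backoff;
// INTERNAL (the previous catch-all) would typically not be retried.
func shouldRetry(err error, attempt int) (bool, time.Duration) {
	if status.Code(err) == codes.ResourceExhausted {
		return true, time.Duration(1<<attempt) * 100 * time.Millisecond
	}
	return false, 0
}

func main() {
	err := status.Error(codes.ResourceExhausted, "disk overloaded")
	retry, wait := shouldRetry(err, 2)
	fmt.Println(retry, wait) // true 400ms
}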
