Reject high-cost requests instead of creating more OS threads when overloaded #639

Open · wants to merge 1 commit into master
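The approach in this PR: gate disk writes behind a weighted semaphore and shed load instead of letting blocked writes pile up OS threads (goroutines stuck in filesystem syscalls each occupy a thread, and the Go runtime aborts once it hits its default limit of 10,000 threads). A minimal sketch of the pattern, with illustrative names and an arbitrary limit rather than bazel-remote's actual code:

```go
package main

import (
	"errors"
	"fmt"
	"io"
	"os"
	"strings"

	"golang.org/x/sync/semaphore"
)

// errOverloaded is returned instead of blocking when too many writes are in flight.
var errOverloaded = errors.New("disk overloaded, please retry later")

// diskWriteSem bounds concurrent filesystem writes. The weight of 512 is
// illustrative; a real limit would be tuned to the machine.
var diskWriteSem = semaphore.NewWeighted(512)

// writeBlob is a hypothetical write path guarded by the semaphore: if no
// slot is free, the request is rejected immediately rather than queued.
func writeBlob(path string, r io.Reader) error {
	if !diskWriteSem.TryAcquire(1) {
		return errOverloaded
	}
	defer diskWriteSem.Release(1)

	f, err := os.Create(path)
	if err != nil {
		return err
	}
	defer f.Close()

	_, err = io.Copy(f, r)
	return err
}

func main() {
	if err := writeBlob("/tmp/example-blob", strings.NewReader("hello")); err != nil {
		fmt.Println("write rejected or failed:", err)
	}
}
```

The diff below wires this pattern into the disk cache's Put and removeFile paths and maps the new error to HTTP 503 and gRPC RESOURCE_EXHAUSTED.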
20 changes: 16 additions & 4 deletions cache/disk/disk.go
@@ -77,8 +77,9 @@ type diskCache struct {
accessLogger *log.Logger
containsQueue chan proxyCheck

// Limit the number of simultaneous file removals.
fileRemovalSem *semaphore.Weighted
// Limit the number of simultaneous file removals and filesystem write
// operations (apart from atime updates, which we hope are fast).
diskWaitSem *semaphore.Weighted

mu sync.Mutex
lru SizedLRU
@@ -103,6 +104,11 @@ func badReqErr(format string, a ...interface{}) *cache.Error {
}
}

var ErrOverloaded = &cache.Error{
Code: http.StatusServiceUnavailable, // Too many requests/disk overloaded.
Text: "Too many requests/disk overloaded (please try again later)",
}

// Non-test users must call this to expose metrics.
func (c *diskCache) RegisterMetrics() {
c.lru.RegisterMetrics()
@@ -167,11 +173,11 @@ func (c *diskCache) getElementPath(key Key, value lruItem) string {
}

func (c *diskCache) removeFile(f string) {
if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil {
if err := c.diskWaitSem.Acquire(context.Background(), 1); err != nil {
log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f)
return
}
defer c.fileRemovalSem.Release(1)
defer c.diskWaitSem.Release(1)

err := os.Remove(f)
if err != nil {
@@ -240,6 +246,12 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string,
return nil
}

if !c.diskWaitSem.TryAcquire(1) {
// We are probably overloaded, and want to avoid hitting Go's default 10,000 max OS thread limit.
return ErrOverloaded
}
defer c.diskWaitSem.Release(1)

key := cache.LookupKey(kind, hash)

var tf *os.File // Tempfile.
6 changes: 5 additions & 1 deletion cache/disk/load.go
@@ -70,7 +70,11 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) {
maxBlobSize: math.MaxInt64,
maxProxyBlobSize: math.MaxInt64,

fileRemovalSem: semaphore.NewWeighted(semaphoreWeight),
// Acquire 1 of these before starting filesystem writes/deletes, or
// reject the write if acquisition fails (each blocked filesystem
// operation can pin an OS thread, and we don't want to hit Go's
// default 10,000 OS thread limit).
diskWaitSem: semaphore.NewWeighted(semaphoreWeight),

gaugeCacheAge: prometheus.NewGauge(prometheus.GaugeOpts{
Name: "bazel_remote_disk_cache_longest_item_idle_time_seconds",
4 changes: 4 additions & 0 deletions server/grpc.go
@@ -238,6 +238,10 @@ func gRPCErrCode(err error, dflt codes.Code) codes.Code {
return codes.OK
}

if err == disk.ErrOverloaded {
return codes.ResourceExhausted
}

cerr, ok := err.(*cache.Error)
if ok && cerr.Code == http.StatusBadRequest {
return codes.InvalidArgument
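gRPCErrCode now maps the disk layer's overload sentinel to codes.ResourceExhausted, so clients can distinguish a transient "try again later" condition from an internal failure. The diff compares with ==, which works because ErrOverloaded is always returned unwrapped; below is a hedged sketch of the same mapping using errors.Is, which would also tolerate wrapped errors (names are illustrative, not bazel-remote's actual types):

```go
package main

import (
	"errors"
	"fmt"

	"google.golang.org/grpc/codes"
)

// errOverloaded stands in for disk.ErrOverloaded in this sketch.
var errOverloaded = errors.New("too many requests, disk overloaded")

// errCode maps a cache-layer error onto a gRPC status code. errors.Is keeps
// the mapping intact even if an intermediate layer wraps the sentinel.
func errCode(err error, dflt codes.Code) codes.Code {
	if err == nil {
		return codes.OK
	}
	if errors.Is(err, errOverloaded) {
		return codes.ResourceExhausted
	}
	return dflt
}

func main() {
	wrapped := fmt.Errorf("put failed: %w", errOverloaded)
	fmt.Println(errCode(wrapped, codes.Internal)) // Prints: ResourceExhausted
}
```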
12 changes: 9 additions & 3 deletions server/grpc_ac.go
@@ -171,6 +171,12 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte
return nil // Not inlined, nothing to do.
}

if *digest != nil && (*digest).SizeBytes <= 0 {
// Unexpected corner case?
*slice = []byte{}
return nil
}

if *digest == nil {
hash := sha256.Sum256(*slice)
*digest = &pb.Digest{
@@ -183,10 +189,10 @@
if !found {
err := s.cache.Put(ctx, cache.CAS, (*digest).Hash, (*digest).SizeBytes,
bytes.NewReader(*slice))
if err != nil && err != io.EOF {
return err
if err == nil || err == io.EOF {
s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
}
s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash)
// De-inlining failed (possibly because the disk is overloaded); that's OK though.
}

*slice = []byte{}
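The rewritten maybeInline treats the CAS upload as best-effort: when the client did not ask for inlining, the blob is offloaded to the CAS if possible, but an overloaded disk no longer fails the whole ActionCache request. A sketch of that best-effort pattern under assumed names (blobStore and bestEffortPut are illustrative, not the server's real types):

```go
package acexample

import (
	"bytes"
	"context"
	"io"
	"log"
)

// blobStore is a stand-in for the subset of the disk cache used here.
type blobStore interface {
	Put(ctx context.Context, hash string, size int64, r io.Reader) error
}

// bestEffortPut tries to persist an inlined blob to the CAS but never fails
// the surrounding request: overload and other storage errors are only logged.
func bestEffortPut(ctx context.Context, cas blobStore, hash string, data []byte) {
	err := cas.Put(ctx, hash, int64(len(data)), bytes.NewReader(data))
	if err == nil || err == io.EOF {
		log.Printf("CAS PUT %s OK", hash)
		return
	}
	// Possibly overloaded; the ActionResult is still returned to the client.
	log.Printf("CAS PUT %s skipped: %v", hash, err)
}
```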
35 changes: 24 additions & 11 deletions server/grpc_asset.go
@@ -6,6 +6,7 @@ import (
"crypto/sha256"
"encoding/base64"
"encoding/hex"
"fmt"
"io"
"net/http"
"net/url"
@@ -19,13 +20,21 @@ import (
pb "github.com/buchgr/bazel-remote/genproto/build/bazel/remote/execution/v2"

"github.com/buchgr/bazel-remote/cache"
"github.com/buchgr/bazel-remote/cache/disk"
)

// FetchServer implementation

var errNilFetchBlobRequest = grpc_status.Error(codes.InvalidArgument,
"expected a non-nil *FetchBlobRequest")

var resourceExhaustedResponse = asset.FetchBlobResponse{
Status: &status.Status{
Code: int32(codes.ResourceExhausted),
Message: "Storage appears to be falling behind",
},
}

func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) (*asset.FetchBlobResponse, error) {

var sha256Str string
@@ -114,8 +123,8 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
// See if we can download one of the URIs.

for _, uri := range req.GetUris() {
ok, actualHash, size := s.fetchItem(ctx, uri, sha256Str)
if ok {
actualHash, size, err := s.fetchItem(ctx, uri, sha256Str)
if err == nil {
return &asset.FetchBlobResponse{
Status: &status.Status{Code: int32(codes.OK)},
BlobDigest: &pb.Digest{
@@ -126,6 +135,10 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
}, nil
}

if err == disk.ErrOverloaded {
return &resourceExhaustedResponse, nil
}

// Not a simple file. Not yet handled...
}

@@ -134,29 +147,29 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest)
}, nil
}

func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (bool, string, int64) {
func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (string, int64, error) {
u, err := url.Parse(uri)
if err != nil {
s.errorLogger.Printf("unable to parse URI: %s err: %v", uri, err)
return false, "", int64(-1)
return "", int64(-1), err
}

if u.Scheme != "http" && u.Scheme != "https" {
s.errorLogger.Printf("unsupported URI: %s", uri)
return false, "", int64(-1)
return "", int64(-1), fmt.Errorf("Unknown URL scheme: %q", u.Scheme)
}

resp, err := http.Get(uri)
if err != nil {
s.errorLogger.Printf("failed to get URI: %s err: %v", uri, err)
return false, "", int64(-1)
return "", int64(-1), err
}
defer resp.Body.Close()
rc := resp.Body

s.accessLogger.Printf("GRPC ASSET FETCH %s %s", uri, resp.Status)
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
return false, "", int64(-1)
return "", int64(-1), fmt.Errorf("Unsuccessful HTTP status code: %d", resp.StatusCode)
}

expectedSize := resp.ContentLength
@@ -166,7 +179,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
data, err := io.ReadAll(resp.Body)
if err != nil {
s.errorLogger.Printf("failed to read data: %v", uri)
return false, "", int64(-1)
return "", int64(-1), err
}

expectedSize = int64(len(data))
@@ -176,7 +189,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
if expectedHash != "" && hashStr != expectedHash {
s.errorLogger.Printf("URI data has hash %s, expected %s",
hashStr, expectedHash)
return false, "", int64(-1)
return "", int64(-1), fmt.Errorf("URI data has hash %s, expected %s", hashStr, expectedHash)
}

expectedHash = hashStr
@@ -186,10 +199,10 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str
err = s.cache.Put(ctx, cache.CAS, expectedHash, expectedSize, rc)
if err != nil && err != io.EOF {
s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err)
return false, "", int64(-1)
return "", int64(-1), err
}

return true, expectedHash, expectedSize
return expectedHash, expectedSize, nil
}

func (s *grpcServer) FetchDirectory(context.Context, *asset.FetchDirectoryRequest) (*asset.FetchDirectoryResponse, error) {
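Note that the Remote Asset API reports per-fetch outcomes inside FetchBlobResponse.Status (a google.rpc.Status) rather than as an RPC error, which is why the overload case returns resourceExhaustedResponse with a nil error. A client therefore has to inspect the embedded status; a sketch, assuming the upstream remote-apis Go bindings (the import path and helper name are assumptions):

```go
package assetclient

import (
	"context"
	"fmt"

	// Import path assumed; bazel-remote vendors its own generated copy.
	asset "github.com/bazelbuild/remote-apis/build/bazel/remote/asset/v1"
	"google.golang.org/grpc/codes"
)

// checkFetch inspects the status embedded in the response, not the RPC error.
func checkFetch(ctx context.Context, client asset.FetchClient, req *asset.FetchBlobRequest) error {
	resp, err := client.FetchBlob(ctx, req)
	if err != nil {
		return err // Transport-level failure.
	}
	switch codes.Code(resp.GetStatus().GetCode()) {
	case codes.OK:
		fmt.Printf("fetched blob %s (%d bytes)\n",
			resp.GetBlobDigest().GetHash(), resp.GetBlobDigest().GetSizeBytes())
		return nil
	case codes.ResourceExhausted:
		return fmt.Errorf("cache storage is overloaded, retry later")
	default:
		return fmt.Errorf("fetch failed: %s", resp.GetStatus().GetMessage())
	}
}
```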
3 changes: 2 additions & 1 deletion server/grpc_bytestream.go
@@ -544,6 +544,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error {
}
return nil
}

if err == nil {
// Unexpected early return. Should not happen.
msg := fmt.Sprintf("GRPC BYTESTREAM WRITE INTERNAL ERROR %s", resourceName)
@@ -553,7 +554,7 @@

msg := fmt.Sprintf("GRPC BYTESTREAM WRITE CACHE ERROR: %s %v", resourceName, err)
s.accessLogger.Printf(msg)
return status.Error(codes.Internal, msg)
return status.Error(gRPCErrCode(err, codes.Internal), msg)
}

select {
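With the bytestream handler now passing cache errors through gRPCErrCode, an overloaded server surfaces as RESOURCE_EXHAUSTED on the Write RPC instead of INTERNAL, which well-behaved clients treat as retriable. A hedged sketch of a client-side retry loop with backoff (illustrative only; real clients such as Bazel ship their own retry policies):

```go
package retryexample

import (
	"context"
	"time"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// retryOnOverload retries op while the server sheds load with
// ResourceExhausted, doubling the backoff between attempts.
func retryOnOverload(ctx context.Context, attempts int, op func(context.Context) error) error {
	backoff := 100 * time.Millisecond
	var err error
	for i := 0; i < attempts; i++ {
		err = op(ctx)
		if status.Code(err) != codes.ResourceExhausted {
			return err // Success, or an error that is not worth retrying here.
		}
		select {
		case <-time.After(backoff):
			backoff *= 2
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return err
}
```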