diff --git a/cache/disk/disk.go b/cache/disk/disk.go index c955a07f1..fdc507832 100644 --- a/cache/disk/disk.go +++ b/cache/disk/disk.go @@ -77,8 +77,9 @@ type diskCache struct { accessLogger *log.Logger containsQueue chan proxyCheck - // Limit the number of simultaneous file removals. - fileRemovalSem *semaphore.Weighted + // Limit the number of simultaneous file removals and filesystem write + // operations (apart from atime updates, which we hope are fast). + diskWaitSem *semaphore.Weighted mu sync.Mutex lru SizedLRU @@ -103,6 +104,11 @@ func badReqErr(format string, a ...interface{}) *cache.Error { } } +var ErrOverloaded = &cache.Error{ + Code: http.StatusServiceUnavailable, // Too many requests/disk overloaded. + Text: "Too many requests/disk overloaded (please try again later)", +} + // Non-test users must call this to expose metrics. func (c *diskCache) RegisterMetrics() { c.lru.RegisterMetrics() @@ -167,11 +173,11 @@ func (c *diskCache) getElementPath(key Key, value lruItem) string { } func (c *diskCache) removeFile(f string) { - if err := c.fileRemovalSem.Acquire(context.Background(), 1); err != nil { + if err := c.diskWaitSem.Acquire(context.Background(), 1); err != nil { log.Printf("ERROR: failed to aquire semaphore: %v, unable to remove %s", err, f) return } - defer c.fileRemovalSem.Release(1) + defer c.diskWaitSem.Release(1) err := os.Remove(f) if err != nil { @@ -240,6 +246,12 @@ func (c *diskCache) Put(ctx context.Context, kind cache.EntryKind, hash string, return nil } + if !c.diskWaitSem.TryAcquire(1) { + // We are probably overloaded, and want to avoid hitting Go's default 10,000 max OS thread limit. + return ErrOverloaded + } + defer c.diskWaitSem.Release(1) + key := cache.LookupKey(kind, hash) var tf *os.File // Tempfile. diff --git a/cache/disk/load.go b/cache/disk/load.go index 0ffb4652a..f0682ad70 100644 --- a/cache/disk/load.go +++ b/cache/disk/load.go @@ -70,7 +70,11 @@ func New(dir string, maxSizeBytes int64, opts ...Option) (Cache, error) { maxBlobSize: math.MaxInt64, maxProxyBlobSize: math.MaxInt64, - fileRemovalSem: semaphore.NewWeighted(semaphoreWeight), + // Acquire 1 of these before starting filesystem writes/deletes, or + // reject filesystem writes upon failure (since this will create a + // new OS thread and we don't want to hit Go's default 10,000 OS + // thread limit. + diskWaitSem: semaphore.NewWeighted(semaphoreWeight), gaugeCacheAge: prometheus.NewGauge(prometheus.GaugeOpts{ Name: "bazel_remote_disk_cache_longest_item_idle_time_seconds", diff --git a/server/grpc.go b/server/grpc.go index 42eefb262..1b5fa3f41 100644 --- a/server/grpc.go +++ b/server/grpc.go @@ -238,6 +238,10 @@ func gRPCErrCode(err error, dflt codes.Code) codes.Code { return codes.OK } + if err == disk.ErrOverloaded { + return codes.ResourceExhausted + } + cerr, ok := err.(*cache.Error) if ok && cerr.Code == http.StatusBadRequest { return codes.InvalidArgument diff --git a/server/grpc_ac.go b/server/grpc_ac.go index a5256522a..0dea2a375 100644 --- a/server/grpc_ac.go +++ b/server/grpc_ac.go @@ -171,6 +171,12 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte return nil // Not inlined, nothing to do. } + if (*digest).SizeBytes <= 0 { + // Unexpected corner case? + *slice = []byte{} + return nil + } + if *digest == nil { hash := sha256.Sum256(*slice) *digest = &pb.Digest{ @@ -183,10 +189,10 @@ func (s *grpcServer) maybeInline(ctx context.Context, inline bool, slice *[]byte if !found { err := s.cache.Put(ctx, cache.CAS, (*digest).Hash, (*digest).SizeBytes, bytes.NewReader(*slice)) - if err != nil && err != io.EOF { - return err + if err == nil || err == io.EOF { + s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash) } - s.accessLogger.Printf("GRPC CAS PUT %s OK", (*digest).Hash) + // De-inline failed (possibly due to "resource overload"), that's OK though. } *slice = []byte{} diff --git a/server/grpc_asset.go b/server/grpc_asset.go index 9a51436e6..ec187454c 100644 --- a/server/grpc_asset.go +++ b/server/grpc_asset.go @@ -6,6 +6,7 @@ import ( "crypto/sha256" "encoding/base64" "encoding/hex" + "fmt" "io" "net/http" "net/url" @@ -19,6 +20,7 @@ import ( pb "github.com/buchgr/bazel-remote/genproto/build/bazel/remote/execution/v2" "github.com/buchgr/bazel-remote/cache" + "github.com/buchgr/bazel-remote/cache/disk" ) // FetchServer implementation @@ -26,6 +28,13 @@ import ( var errNilFetchBlobRequest = grpc_status.Error(codes.InvalidArgument, "expected a non-nil *FetchBlobRequest") +var resourceExhaustedResponse = asset.FetchBlobResponse{ + Status: &status.Status{ + Code: int32(codes.ResourceExhausted), + Message: "Storage appears to be falling behind", + }, +} + func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) (*asset.FetchBlobResponse, error) { var sha256Str string @@ -114,8 +123,8 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) // See if we can download one of the URIs. for _, uri := range req.GetUris() { - ok, actualHash, size := s.fetchItem(ctx, uri, sha256Str) - if ok { + actualHash, size, err := s.fetchItem(ctx, uri, sha256Str) + if err == nil { return &asset.FetchBlobResponse{ Status: &status.Status{Code: int32(codes.OK)}, BlobDigest: &pb.Digest{ @@ -126,6 +135,10 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) }, nil } + if err == disk.ErrOverloaded { + return &resourceExhaustedResponse, nil + } + // Not a simple file. Not yet handled... } @@ -134,29 +147,29 @@ func (s *grpcServer) FetchBlob(ctx context.Context, req *asset.FetchBlobRequest) }, nil } -func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (bool, string, int64) { +func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash string) (string, int64, error) { u, err := url.Parse(uri) if err != nil { s.errorLogger.Printf("unable to parse URI: %s err: %v", uri, err) - return false, "", int64(-1) + return "", int64(-1), err } if u.Scheme != "http" && u.Scheme != "https" { s.errorLogger.Printf("unsupported URI: %s", uri) - return false, "", int64(-1) + return "", int64(-1), fmt.Errorf("Unknown URL scheme: %q", u.Scheme) } resp, err := http.Get(uri) if err != nil { s.errorLogger.Printf("failed to get URI: %s err: %v", uri, err) - return false, "", int64(-1) + return "", int64(-1), err } defer resp.Body.Close() rc := resp.Body s.accessLogger.Printf("GRPC ASSET FETCH %s %s", uri, resp.Status) if resp.StatusCode < 200 || resp.StatusCode >= 300 { - return false, "", int64(-1) + return "", int64(-1), fmt.Errorf("Unsuccessful HTTP status code: %d", resp.StatusCode) } expectedSize := resp.ContentLength @@ -166,7 +179,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str data, err := io.ReadAll(resp.Body) if err != nil { s.errorLogger.Printf("failed to read data: %v", uri) - return false, "", int64(-1) + return "", int64(-1), err } expectedSize = int64(len(data)) @@ -176,7 +189,7 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str if expectedHash != "" && hashStr != expectedHash { s.errorLogger.Printf("URI data has hash %s, expected %s", hashStr, expectedHash) - return false, "", int64(-1) + return "", int64(-1), fmt.Errorf("URI data has hash %s, expected %s", hashStr, expectedHash) } expectedHash = hashStr @@ -186,10 +199,10 @@ func (s *grpcServer) fetchItem(ctx context.Context, uri string, expectedHash str err = s.cache.Put(ctx, cache.CAS, expectedHash, expectedSize, rc) if err != nil && err != io.EOF { s.errorLogger.Printf("failed to Put %s: %v", expectedHash, err) - return false, "", int64(-1) + return "", int64(-1), err } - return true, expectedHash, expectedSize + return expectedHash, expectedSize, nil } func (s *grpcServer) FetchDirectory(context.Context, *asset.FetchDirectoryRequest) (*asset.FetchDirectoryResponse, error) { diff --git a/server/grpc_bytestream.go b/server/grpc_bytestream.go index d9b3d8e49..4951eee0b 100644 --- a/server/grpc_bytestream.go +++ b/server/grpc_bytestream.go @@ -544,6 +544,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { } return nil } + if err == nil { // Unexpected early return. Should not happen. msg := fmt.Sprintf("GRPC BYTESTREAM WRITE INTERNAL ERROR %s", resourceName) @@ -553,7 +554,7 @@ func (s *grpcServer) Write(srv bytestream.ByteStream_WriteServer) error { msg := fmt.Sprintf("GRPC BYTESTREAM WRITE CACHE ERROR: %s %v", resourceName, err) s.accessLogger.Printf(msg) - return status.Error(codes.Internal, msg) + return status.Error(gRPCErrCode(err, codes.Internal), msg) } select {