Make elan read from redis (and a bunch of other improvements to redis). (#307)

* Make elan read from redis.
* Move redis flags to a common package.
* Add redis flags to elan.
* Implement redis.Limiter and use that with the clients.
* Allow setting redis MaxSize in flags/environment.
* Bump version.
* Add redis.DefaultMaxSize.
* Rename flags to opts.
fische authored May 2, 2024
1 parent 947353e commit f31b091
Showing 17 changed files with 256 additions and 134 deletions.
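The shared flags mentioned in the commit message now live in a common //redis package, which is not among the files shown below. As rough orientation, here is a hypothetical Go sketch of what its Opts type could look like, reconstructed purely from the call sites in this diff (opts.Redis.Clients(), opts.Redis.MaxSize, rediscommon.DefaultMaxSize); every field name, flag name, and the DefaultMaxSize value are assumptions, not the committed code:

package redis // hypothetical sketch of the common //redis package

import "github.com/go-redis/redis/v8"

// DefaultMaxSize is the fallback cap on blob sizes stored in Redis.
// The committed value is not visible in this diff; this one is a placeholder.
const DefaultMaxSize = 200 * 1024

// Opts groups the Redis flags shared between the servers. Only Clients()
// and MaxSize are visible in this diff; the rest is guesswork.
type Opts struct {
    URL     string `long:"url" description:"host:port of the primary Redis server"`
    ReadURL string `long:"read_url" description:"host:port of a read replica, if any"`
    MaxSize int64  `long:"max_size" description:"Max size of blobs to store in Redis"`
}

// Clients returns a primary client and a read client; a real implementation
// would presumably return nil clients when no URL is configured, since the
// server code below guards every Redis access with a nil check.
func (o Opts) Clients() (primary, read *redis.Client) {
    if o.URL == "" {
        return nil, nil
    }
    primary = redis.NewClient(&redis.Options{Addr: o.URL})
    read = primary
    if o.ReadURL != "" {
        read = redis.NewClient(&redis.Options{Addr: o.ReadURL})
    }
    return primary, read
}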
7 changes: 7 additions & 0 deletions ChangeLog
@@ -1,3 +1,10 @@
+Version 11.10.0
+--------------
+* Move redis flags to a common package.
+* Implement redis.Limiter and use that with the redis clients.
+* Allow setting redis MaxSize in flags/environment.
+* Make elan read from redis.
+
 Version 11.9.12
 --------------
 * Make BatchDownload honour maxSize when uploading to redis.
2 changes: 1 addition & 1 deletion VERSION
@@ -1 +1 @@
-11.9.12
+11.10.0
1 change: 1 addition & 0 deletions elan/BUILD
@@ -7,6 +7,7 @@ go_binary(
         "//cli",
         "//elan/rpc",
         "//grpcutil",
+        "//redis",
     ],
 )

5 changes: 4 additions & 1 deletion elan/main.go
@@ -7,6 +7,7 @@ import (
     "github.com/thought-machine/please-servers/cli"
     "github.com/thought-machine/please-servers/elan/rpc"
     "github.com/thought-machine/please-servers/grpcutil"
+    "github.com/thought-machine/please-servers/redis"
 )
 
 var opts = struct {
@@ -18,6 +19,7 @@ var opts = struct {
     DirCacheSize       int64          `long:"dir_cache_size" default:"10240" description:"Number of directory entries to cache for GetTree"`
     KnownBlobCacheSize flags.ByteSize `long:"known_blob_cache_size" description:"Max size of known blob cache (in approximate bytes)"`
     Admin              cli.AdminOpts  `group:"Options controlling HTTP admin server" namespace:"admin"`
+    Redis              redis.Opts     `group:"Options controlling connection to Redis" namespace:"redis"`
 }{
     Usage: `
 Elan is an implementation of the content-addressable storage and action cache services
@@ -33,6 +35,7 @@ modes are intended for testing only.
 
 func main() {
     _, info := cli.ParseFlagsOrDie("Elan", &opts, &opts.Logging)
+    _, readRedis := opts.Redis.Clients()
     go cli.ServeAdmin("elan", opts.Admin, info)
-    rpc.ServeForever(opts.GRPC, opts.Storage, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize))
+    rpc.ServeForever(opts.GRPC, opts.Storage, opts.Parallelism, opts.DirCacheSize, int64(opts.KnownBlobCacheSize), readRedis, opts.Redis.MaxSize)
 }
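Note that main() discards the first client returned by Clients(): this change only wires up reading from Redis, so just the read client plus the configured MaxSize are passed through to rpc.ServeForever.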
2 changes: 2 additions & 0 deletions elan/rpc/BUILD
@@ -16,6 +16,7 @@ go_library(
         "///third_party/go/github.com_bazelbuild_remote-apis//build/bazel/remote/execution/v2",
         "///third_party/go/github.com_bazelbuild_remote-apis//build/bazel/semver",
         "///third_party/go/github.com_dgraph-io_ristretto//:ristretto",
+        "///third_party/go/github.com_go-redis_redis_v8//:v8",
         "///third_party/go/github.com_golang_protobuf//proto",
         "///third_party/go/github.com_hashicorp_go-multierror//:go-multierror",
         "///third_party/go/github.com_klauspost_compress//zstd",
@@ -34,6 +35,7 @@ go_library(
         "///third_party/go/google.golang.org_grpc//health/grpc_health_v1",
         "//grpcutil",
         "//proto/purity",
+        "//redis",
         "//rexclient",
         "//third_party/go:grpc",
     ],
2 changes: 1 addition & 1 deletion elan/rpc/client.go
@@ -19,7 +19,7 @@ func New(url string, tls bool, tokenFile string) (Client, error) {
     // We can't use url.Parse here because it tends to put too much into the scheme (e.g. example.org:8080 -> scheme:example.org)
     if strings.Contains(url, "://") {
         return &elanClient{
-            s:       createServer(url, 8, 10240, 10*1024*1024),
+            s:       createServer(url, 8, 10240, 10*1024*1024, nil, 0),
             timeout: 3 * time.Minute,
         }, nil
     }
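The nil, 0 arguments keep Redis disabled for clients constructed via New(): every Redis code path in rpc.go below checks for a nil read client, and a non-positive max size is bumped to rediscommon.DefaultMaxSize in createServer anyway.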
2 changes: 1 addition & 1 deletion elan/rpc/compression_test.go
@@ -141,7 +141,7 @@ func testMain(m *testing.M) int {
     lis, s := startServer(grpcutil.Opts{
         Host: "127.0.0.1",
         Port: 7777,
-    }, storage, 5, 1000, 1000)
+    }, storage, 5, 1000, 1000, nil, 0)
     go grpcutil.ServeForever(lis, s)
     defer s.Stop()
 
2 changes: 1 addition & 1 deletion elan/rpc/eclient.go
@@ -49,7 +49,7 @@ func (e *elanClient) WriteBlob(b []byte) (*pb.Digest, error) {
     key := e.s.compressedKey("cas", dg, compressed)
     ctx, cancel := context.WithTimeout(context.Background(), e.timeout)
     defer cancel()
-    if e.s.blobExists(ctx, key) {
+    if e.s.blobExists(ctx, CASPrefix, dg, compressed, false) {
         return dg, nil
     }
     err := e.s.bucket.WriteAll(ctx, key, b)
63 changes: 54 additions & 9 deletions elan/rpc/rpc.go
@@ -30,6 +30,7 @@ import (
     pb "github.com/bazelbuild/remote-apis/build/bazel/remote/execution/v2"
     "github.com/bazelbuild/remote-apis/build/bazel/semver"
     "github.com/dgraph-io/ristretto"
+    "github.com/go-redis/redis/v8"
     "github.com/golang/protobuf/proto"
     "github.com/hashicorp/go-multierror"
     "github.com/klauspost/compress/zstd"
@@ -47,11 +48,14 @@ import (
 
     "github.com/thought-machine/please-servers/grpcutil"
     ppb "github.com/thought-machine/please-servers/proto/purity"
+    rediscommon "github.com/thought-machine/please-servers/redis"
     "github.com/thought-machine/please-servers/rexclient"
 )
 
 const timeout = 2 * time.Minute
 
+const CASPrefix = "cas"
+
 var log = logging.MustGetLogger()
 
 // emptyHash is the sha256 hash of the empty file.
@@ -143,14 +147,17 @@ func init() {
 }
 
 // ServeForever serves on the given port until terminated.
-func ServeForever(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) {
-    lis, s := startServer(opts, storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize)
+func ServeForever(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, redisMaxSize int64) {
+    lis, s := startServer(opts, storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize, readRedis, redisMaxSize)
     grpcutil.ServeForever(lis, s)
 }
 
-func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) *server {
+func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, redisMaxSize int64) *server {
     dec, _ := zstd.NewReader(nil)
     enc, _ := zstd.NewWriter(nil, zstd.WithEncoderLevel(zstd.SpeedFastest))
+    if redisMaxSize <= 0 {
+        redisMaxSize = rediscommon.DefaultMaxSize
+    }
     return &server{
         bytestreamRe: regexp.MustCompile("(?:uploads/[0-9a-f-]+/)?(blobs|compressed-blobs/zstd)/([0-9a-f]+)/([0-9]+)"),
         storageRoot:  strings.TrimPrefix(storage, "file://"),
@@ -161,11 +168,13 @@ func createServer(storage string, parallelism int, maxDirCacheSize, maxKnownBlob
         knownBlobCache: mustCache(maxKnownBlobCacheSize),
         compressor:     enc,
         decompressor:   dec,
+        readRedis:      readRedis,
+        redisMaxSize:   redisMaxSize,
     }
 }
 
-func startServer(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64) (net.Listener, *grpc.Server) {
-    srv := createServer(storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize)
+func startServer(opts grpcutil.Opts, storage string, parallelism int, maxDirCacheSize, maxKnownBlobCacheSize int64, readRedis *redis.Client, redisMaxSize int64) (net.Listener, *grpc.Server) {
+    srv := createServer(storage, parallelism, maxDirCacheSize, maxKnownBlobCacheSize, readRedis, redisMaxSize)
     lis, s := grpcutil.NewServer(opts)
     pb.RegisterCapabilitiesServer(s, srv)
     pb.RegisterActionCacheServer(s, srv)
@@ -200,6 +209,8 @@ type server struct {
     dirCache, knownBlobCache *ristretto.Cache
     compressor               *zstd.Encoder
     decompressor             *zstd.Decoder
+    readRedis                *redis.Client
+    redisMaxSize             int64
 }
 
 func (s *server) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesRequest) (*pb.ServerCapabilities, error) {
@@ -299,7 +310,7 @@ func (s *server) FindMissingBlobs(ctx context.Context, req *pb.FindMissingBlobsR
             continue // Ignore the empty blob.
         }
         go func(d *pb.Digest) {
-            if !s.blobExists(ctx, s.key("cas", d)) && !s.blobExists(ctx, s.compressedKey("cas", d, true)) {
+            if !s.blobExists(ctx, "cas", d, false, true) && !s.blobExists(ctx, "cas", d, true, false) {
                 mutex.Lock()
                 resp.MissingBlobDigests = append(resp.MissingBlobDigests, d)
                 mutex.Unlock()
@@ -312,7 +323,17 @@
 }
 
 // blobExists returns true if this blob exists in the underlying storage.
-func (s *server) blobExists(ctx context.Context, key string) bool {
+func (s *server) blobExists(ctx context.Context, prefix string, digest *pb.Digest, compressed, redis bool) bool {
+    if redis && s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.redisMaxSize {
+        exists, err := s.readRedis.Exists(ctx, digest.Hash).Result()
+        if err != nil {
+            log.Warningf("Failed to check blob in Redis: %v", err)
+        } else if exists > 0 {
+            return true
+        }
+    }
+
+    key := s.compressedKey(prefix, digest, compressed)
     if s.knownBlobCache != nil {
         if _, present := s.knownBlobCache.Get(key); present {
             knownBlobCacheHits.Inc()
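Two details in the hunks above: FindMissingBlobs probes both the uncompressed and compressed keys but passes redis=true only for the uncompressed one, presumably because Redis stores blobs keyed by bare hash, so a second lookup for the compressed variant would hit the same key; and go-redis's Exists returns the number of keys found, hence the exists > 0 check. On any Redis error, blobExists falls through to the known-blob cache and bucket storage.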
@@ -371,7 +392,7 @@ func (s *server) BatchUpdateBlobs(ctx context.Context, req *pb.BatchUpdateBlobsR
                 rr.Status.Code = int32(codes.InvalidArgument)
                 rr.Status.Message = fmt.Sprintf("Blob sizes do not match (%d / %d)", len(r.Data), r.Digest.SizeBytes)
                 blobSizeMismatches.Inc()
-            } else if s.blobExists(ctx, s.compressedKey("cas", r.Digest, compressed)) {
+            } else if s.blobExists(ctx, "cas", r.Digest, compressed, true) {
                 log.Debug("Blob %s already exists remotely", r.Digest.Hash)
             } else if err := s.writeAll(ctx, r.Digest, r.Data, compressed); err != nil {
                 log.Errorf("Error writing blob %s: %s", r.Digest, err)
@@ -506,6 +527,20 @@ func (s *server) readCompressed(ctx context.Context, prefix string, digest *pb.D
     if s.isEmpty(digest) {
         return ioutil.NopCloser(bytes.NewReader(nil)), compressed, nil
     }
+    if s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.redisMaxSize {
+        // NOTE: we could use GETRANGE here, but given it's a bit more expensive on the redis
+        // side and redisMaxSize is quite small, we get the whole blob
+        blob, err := s.readRedis.Get(ctx, digest.Hash).Bytes()
+        if err != nil {
+            log.Warningf("Failed to get blob in Redis: %v", err)
+        } else if blob != nil && limit == 0 {
+            return ioutil.NopCloser(bytes.NewReader(nil)), false, nil
+        } else if blob != nil && limit > 0 {
+            return io.NopCloser(bytes.NewReader(blob[offset : offset+limit])), false, nil
+        } else if blob != nil && limit < 0 {
+            return io.NopCloser(bytes.NewReader(blob[offset:])), false, nil
+        }
+    }
     r, err := s.readBlob(ctx, s.compressedKey(prefix, digest, compressed), offset, limit)
     if err == nil {
         blobsServed.WithLabelValues(batchLabel(false, true), compressorLabel(compressed), compressorLabel(compressed)).Inc()
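Two subtleties in this read path: a limit of 0 returns an empty reader while a negative limit reads to the end of the blob; and go-redis reports a missing key as the sentinel error redis.Nil rather than a nil slice, so an ordinary cache miss takes the Warningf branch before falling back to bucket storage. A minimal sketch of a helper that separates misses from real errors (illustrative only, not part of this commit; log is the file's package-level logger):

func readFromRedis(ctx context.Context, client *redis.Client, hash string) ([]byte, bool) {
    blob, err := client.Get(ctx, hash).Bytes()
    if err == redis.Nil {
        return nil, false // plain cache miss: fall back to bucket storage quietly
    } else if err != nil {
        log.Warningf("Failed to get blob from Redis: %v", err) // a real error is worth logging
        return nil, false
    }
    return blob, true
}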
@@ -585,6 +620,16 @@ func (s *server) readAllBlobBatched(ctx context.Context, prefix string, digest *
     } else if s.isEmpty(digest) {
         return nil, false, nil
     }
+
+    if s.readRedis != nil && prefix == CASPrefix && digest.SizeBytes < s.redisMaxSize {
+        blob, err := s.readRedis.Get(ctx, digest.Hash).Bytes()
+        if err != nil {
+            log.Warningf("Failed to get blob in Redis: %v", err)
+        } else if blob != nil {
+            return blob, false, nil
+        }
+    }
+
     // TODO(peterebden): Is it worth trying to cache any knowledge of where to go first for blobs?
     // Or just guessing based on blob size?
     if allowCompression {
@@ -642,7 +687,7 @@ func (s *server) compressedKey(prefix string, digest *pb.Digest, compressed bool
 
 func (s *server) writeBlob(ctx context.Context, prefix string, digest *pb.Digest, r io.Reader, compressed bool) error {
     key := s.compressedKey(prefix, digest, compressed)
-    if s.isEmpty(digest) || s.blobExists(ctx, key) {
+    if s.isEmpty(digest) || s.blobExists(ctx, prefix, digest, compressed, true) {
         // Read and discard entire content; there is no need to update.
         // There seems to be no way for the server to signal the caller to abort in this way, so
         // this seems like the most compatible way.
2 changes: 1 addition & 1 deletion mettle/BUILD
@@ -3,14 +3,14 @@ go_binary(
     srcs = ["main.go"],
     visibility = ["//package:all"],
     deps = [
-        "///third_party/go/github.com_go-redis_redis_v8//:v8",
         "///third_party/go/github.com_peterebden_go-cli-init_v4//flags",
         "///third_party/go/github.com_peterebden_go-cli-init_v4//logging",
        "//cli",
        "//grpcutil",
        "//mettle/api",
        "//mettle/common",
        "//mettle/worker",
+       "//redis",
     ],
 )
 
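mettle's BUILD swaps its direct go-redis dependency for the shared //redis package, matching the "Move redis flags to a common package" item in the commit message; the corresponding mettle/main.go change is presumably among the changed files not shown on this page.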