Skip to content

Commit

Permalink
Teach Purity about instance names and start transition away from spec…
Browse files Browse the repository at this point in the history
…ial casing instance name
  • Loading branch information
Garbett1 committed Apr 20, 2024
1 parent 6c38863 commit ec04920
Show file tree
Hide file tree
Showing 4 changed files with 25 additions and 2 deletions.
14 changes: 13 additions & 1 deletion elan/rpc/rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"crypto/sha256"
"encoding/hex"
"fmt"
"google.golang.org/grpc/metadata"
"io"
"io/ioutil"
"net"
Expand Down Expand Up @@ -240,6 +241,16 @@ func (s *server) GetCapabilities(ctx context.Context, req *pb.GetCapabilitiesReq
}

func (s *server) GetActionResult(ctx context.Context, req *pb.GetActionResultRequest) (*pb.ActionResult, error) {
md, _ := metadata.FromIncomingContext(ctx)
callers := md.Get("mettle-caller-name")

caller := ""
if len(callers) > 0 && callers[0] != "" {
caller = callers[0]
} else {
// Be paranoid and assume that a client may not have updated, and set the caller to the instance name
caller = req.InstanceName
}
ctx, cancel := context.WithTimeout(ctx, timeout)
defer cancel()
ar := &pb.ActionResult{}
Expand All @@ -256,7 +267,8 @@ func (s *server) GetActionResult(ctx context.Context, req *pb.GetActionResultReq
ar.StdoutRaw = b
}
}
if s.isFileStorage && req.InstanceName != "purity-gc" {

if s.isFileStorage && caller != "purity-gc" {
now := time.Now()
if err := os.Chtimes(path.Join(s.storageRoot, s.key("ac", req.ActionDigest)), now, now); err != nil {
log.Warning("Failed to change times on file: %s", err)
Expand Down
4 changes: 4 additions & 0 deletions proto/purity/purity.proto
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ service GC {
message ListRequest {
// The prefix of blobs to list. Should be exactly two hex characters.
string prefix = 1;
// The instance name for the CAS server.
string instance_name = 2;
}

message ListResponse {
Expand Down Expand Up @@ -53,6 +55,8 @@ message DeleteRequest {
// False gives the server an option to 'soft' delete them (however it may
// interpret that).
bool hard = 4;
// The instance name for the CAS server.
string instance_name = 5;
}

message DeleteResponse {}
7 changes: 7 additions & 0 deletions purity/gc/gc.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/hex"
"fmt"
"google.golang.org/grpc/metadata"
"path"
"sort"
"sync"
Expand Down Expand Up @@ -124,6 +125,8 @@ type Action struct {
InputSize, OutputSize int
}

const name = "purity-gc"

type collector struct {
client *client.Client
gcclient ppb.GCClient
Expand Down Expand Up @@ -187,6 +190,7 @@ func (c *collector) LoadAllBlobs() error {
for j := 0; j < 16; j++ {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Minute)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name))
resp, err := c.gcclient.List(ctx, &ppb.ListRequest{
Prefix: hex.EncodeToString([]byte{byte(i*16 + j)}),
})
Expand Down Expand Up @@ -475,6 +479,7 @@ func (c *collector) ReplicateBlobs(rf int) error {
if err := c.replicateBlobs("blobs", blobs, func(dg *pb.Digest) error {
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Minute)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name))
blob, _, err := c.client.ReadBlob(ctx, digest.NewFromProtoUnvalidated(dg))
if err != nil {
return err
Expand All @@ -487,6 +492,7 @@ func (c *collector) ReplicateBlobs(rf int) error {
return c.replicateBlobs("action results", ars, func(dg *pb.Digest) error {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name))
ar, err := c.client.GetActionResult(ctx, &pb.GetActionResultRequest{
InstanceName: c.client.InstanceName,
ActionDigest: dg,
Expand Down Expand Up @@ -616,6 +622,7 @@ func (c *collector) BlobUsage() ([]Blob, error) {
func (c *collector) inputDirs(ar *ppb.ActionResult) (*pb.Action, []*pb.Directory, error) {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs("mettle-caller-name", name))
action := &pb.Action{}
blob, present := c.allBlobs[ar.Hash]
if !present {
Expand Down
2 changes: 1 addition & 1 deletion purity/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ var opts = struct {
Logging cli.LoggingOpts `group:"Options controlling logging output"`
GC struct {
URL string `short:"u" long:"url" required:"true" description:"URL for the storage server"`
InstanceName string `short:"i" long:"instance_name" default:"purity-gc" description:"Name of this execution instance"`
InstanceName string `short:"i" long:"instance_name" default:"mettle" description:"Name of this execution instance"`
TokenFile string `long:"token_file" description:"File containing token to authenticate gRPC requests with"`
TLS bool `long:"tls" description:"Use TLS for communicating with the storage server"`
} `group:"Options controlling GC settings"`
Expand Down

0 comments on commit ec04920

Please sign in to comment.