Skip to content

Commit

Permalink
Send back execution metadata with messages
Browse files Browse the repository at this point in the history
  • Loading branch information
peterebden committed Jun 15, 2024
1 parent b6ffca3 commit d3fb9c5
Show file tree
Hide file tree
Showing 4 changed files with 11 additions and 10 deletions.
2 changes: 1 addition & 1 deletion elan/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -14,5 +14,5 @@ go_binary(
sh_cmd(
name = "run_local",
srcs = [":elan"],
cmd = f"mkdir -p plz-out/elan && exec $(out_location :elan) --host {CONFIG.LOCAL_HOST} --port 7777 -s file://\\\\$PWD/plz-out/elan --log_file plz-out/log/elan.log --admin_host 127.0.0.1 --admin_port 9992 --token_file grpcutil/token.txt --known_blob_cache_size 20M",
cmd = f"mkdir -p plz-out/elan && TMPDIR='plz-out/elan' exec $(out_location :elan) --host {CONFIG.LOCAL_HOST} --port 7777 -s file://\\\\$PWD/plz-out/elan --log_file plz-out/log/elan.log --admin_host 127.0.0.1 --admin_port 9992 --token_file grpcutil/token.txt --known_blob_cache_size 20M",
)
6 changes: 3 additions & 3 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
// If we're allowed to check the cache, see if this one has already been done.
// A well-behaved client will likely have done this itself but we should make sure again.
if !req.SkipCacheLookup {
if err := stream.Send(common.BuildOperation(pb.ExecutionStage_CACHE_CHECK, req.ActionDigest, nil)); err != nil {
if err := stream.Send(common.BuildOperation(pb.ExecutionStage_CACHE_CHECK, req.ActionDigest, nil, nil)); err != nil {
log.Warningf("Failed to forward to stream: %s", err)
}
if ar, err := s.client.CheckActionCache(context.Background(), req.ActionDigest); err != nil {
Expand All @@ -320,7 +320,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
Result: ar,
CachedResult: true,
Status: &rpcstatus.Status{Code: int32(codes.OK)},
}))
}, nil))
}
}

Expand All @@ -331,7 +331,7 @@ func (s *server) Execute(req *pb.ExecuteRequest, stream pb.Execution_ExecuteServ
}
// Dispatch a pre-emptive response message to let our colleagues know we've queued it.
// We will also receive & forward this message.
b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil)
b := common.MarshalOperation(pb.ExecutionStage_QUEUED, req.ActionDigest, nil, nil)
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
preResponseStartTime := time.Now()
Expand Down
11 changes: 6 additions & 5 deletions mettle/common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,10 +173,11 @@ func CheckPath(path string) error {
}

// BuildOperation constructs a longrunning.Operation proto for a task. response may be nil.
func BuildOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse) *longrunning.Operation {
func BuildOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse, metadata *pb.ExecutedActionMetadata) *longrunning.Operation {
any, _ := ptypes.MarshalAny(&pb.ExecuteOperationMetadata{
Stage: stage,
ActionDigest: actionDigest,
Stage: stage,
ActionDigest: actionDigest,
PartialExecutionMetadata: metadata,
})
op := &longrunning.Operation{
Name: actionDigest.Hash,
Expand All @@ -191,7 +192,7 @@ func BuildOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, resp
}

// MarshalOperation is like BuildOperation but gives you back the serialised proto.
func MarshalOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse) []byte {
b, _ := proto.Marshal(BuildOperation(stage, actionDigest, response))
func MarshalOperation(stage pb.ExecutionStage_Value, actionDigest *pb.Digest, response *pb.ExecuteResponse, metadata *pb.ExecutedActionMetadata) []byte {
b, _ := proto.Marshal(BuildOperation(stage, actionDigest, response, metadata))
return b
}
2 changes: 1 addition & 1 deletion mettle/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -1065,7 +1065,7 @@ func (w *worker) collectOutputs(ar *pb.ActionResult, cmd *pb.Command) error {
// update sends an update on the response channel
func (w *worker) update(stage pb.ExecutionStage_Value, response *pb.ExecuteResponse) error {
w.Report(true, stage == pb.ExecutionStage_EXECUTING, true, stage.String())
body := common.MarshalOperation(stage, w.actionDigest, response)
body := common.MarshalOperation(stage, w.actionDigest, response, w.metadata)
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
defer cancel()
return common.PublishWithOrderingKey(ctx, w.responses, body, w.actionDigest.Hash, w.name)
Expand Down

0 comments on commit d3fb9c5

Please sign in to comment.