Skip to content

Commit

Permalink
Don't set lastUpdate when streaming events
Browse files Browse the repository at this point in the history
  • Loading branch information
Hamishpk committed Mar 7, 2024
1 parent fa2b0e1 commit bc18de3
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 7 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.9.1
--------------
* Don't set lastUpdate on streaming events

Version 11.9.0
--------------
* Revert job deletion change to match previous behaviour
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.0
11.9.1
9 changes: 3 additions & 6 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,10 +385,6 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution
func (s *server) streamEvents(digest *pb.Digest, ch <-chan *longrunning.Operation, stream pb.Execution_ExecuteServer) error {
for op := range ch {
op.Name = digest.Hash
s.mutex.Lock()
j := s.jobs[digest.Hash]
j.LastUpdate = time.Now()
s.mutex.Unlock()
if err := stream.Send(op); err != nil {
log.Warning("Failed to forward event for %s: %s", digest.Hash, err)
s.stopStream(digest, ch)
Expand Down Expand Up @@ -584,7 +580,7 @@ func (s *server) periodicallyDeleteJobs() {
s.mutex.Lock()
startTime := time.Now()
for digest, job := range s.jobs {
if shouldDeleteJob(job) {
if shouldDeleteJob(job, digest) {
delete(s.jobs, digest)
}
}
Expand All @@ -593,7 +589,7 @@ func (s *server) periodicallyDeleteJobs() {
}
}

func shouldDeleteJob(j *job) bool {
func shouldDeleteJob(j *job, digest string) bool {
timeSinceLastUpdate := time.Since(j.LastUpdate)
if j.Done && len(j.Streams) == 0 && timeSinceLastUpdate > retentionTime {
return true
Expand All @@ -602,6 +598,7 @@ func shouldDeleteJob(j *job) bool {
return true
}
if !j.Done && timeSinceLastUpdate > 2*expiryTime {
log.Warning("Deleting job with %d listeners action: %s", len(j.Streams), digest)
return true
}
return false
Expand Down

0 comments on commit bc18de3

Please sign in to comment.