Skip to content

Commit

Permalink
Only clear J.current if we reuse an old job (#290)
Browse files Browse the repository at this point in the history
* Only clear J.current if we reuse an old job

* Bump version + changelog
  • Loading branch information
Hamishpk authored Mar 11, 2024
1 parent f4c5b3d commit 84c6394
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 11 deletions.
4 changes: 4 additions & 0 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
Version 11.9.2
--------------
* Only clear J.current if we reuse an old job

Version 11.9.1
--------------
* Delete done jobs after resuption time has passed
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.1
11.9.2
19 changes: 9 additions & 10 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -419,21 +419,18 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin
totalRequests.Inc()
created = true
} else if create && time.Since(j.LastUpdate) >= resumptionTime {
// In this path we think the job is too old to be relevant; we don't actually create
// a new job, but we tell the caller we did so it triggers a new execution.
// In this path we think the job is too old to be relevant; clear out any existing current info
// and tell the caller we created a new job..
j.Current = nil
j.StartTime = time.Now()
j.LastUpdate = time.Now()
j.Done = false
created = true
} else {
log.Debug("Resuming existing job for %s", digest.Hash)
}
ch := make(chan *longrunning.Operation, 100)
if created {
// This request is creating a new stream, clear out any existing current job info; it is now
// at best irrelevant and at worst outdated.
j.Current = nil
j.StartTime = time.Now()
j.LastUpdate = time.Now()
j.Done = false
} else if j.Current != nil {
if !created && j.Current != nil {
// This request is resuming an existing stream, give them an update on the latest thing to happen.
// This helps avoid 504s from taking too long to send response headers since it can be an arbitrary
// amount of time until we receive the next real update.
Expand Down Expand Up @@ -579,13 +576,15 @@ func (s *server) process(msg *pubsub.Message) {
func (s *server) periodicallyDeleteJobs() {
for range s.deleteJobsTicker.C {
s.mutex.Lock()
log.Debug("Starting clean")
startTime := time.Now()
for digest, job := range s.jobs {
if shouldDeleteJob(job, digest) {
delete(s.jobs, digest)
}
}
s.mutex.Unlock()
log.Debug("Finished clean")
deleteJobsDurations.Observe(time.Since(startTime).Seconds())
}
}
Expand Down

0 comments on commit 84c6394

Please sign in to comment.