Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove goroutine #293

Merged
merged 2 commits into from
Mar 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 11 additions & 7 deletions ChangeLog
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
Version 11.9.3
--------------
* Avoid some cases of potential reordering of streamed events

Version 11.9.2
--------------
* Only clear J.current if we reuse an old job

Version 11.9.1
--------------
* Delete done jobs after resuption time has passed
* Delete any job after 2x expiry
* Don't set lastUpdate on streaming events
* Delete any job after 2x expiry
* Don't set lastUpdate on streaming events
* Add warning logs when deleting jobs with listeners

Version 11.9.0
Expand All @@ -19,7 +23,7 @@ Version 11.8.5

Version 11.8.4
--------------
* delete in memory jobs periodically in one routine
* delete in memory jobs periodically in one routine

Version 11.7.4
--------------
Expand Down Expand Up @@ -64,19 +68,19 @@ Version 11.6.0

Version 11.5.0
--------------
* Update gocloud to v0.34.0 to support setting max batchsize on topics
* Update gocloud to v0.34.0 to support setting max batchsize on topics

Version 11.4.2
--------------
* Fix bug in api server
* Fix bug in api server

Version 11.4.2
--------------
* Fix bug in api server
* Fix bug in api server

Version 11.4.0
--------------
* Add api server pubsub opts
* Add api server pubsub opts
* Add metrics for failed publish requests
* Add no execution in progress metric
* Add preresponse publish duration metric
Expand Down
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
11.9.2
11.9.3
30 changes: 11 additions & 19 deletions mettle/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -385,13 +385,16 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution

// streamEvents streams a series of events back to the client.
func (s *server) streamEvents(digest *pb.Digest, ch <-chan *longrunning.Operation, stream pb.Execution_ExecuteServer) error {
defer s.stopStream(digest, ch)
for op := range ch {
op.Name = digest.Hash
if err := stream.Send(op); err != nil {
log.Warning("Failed to forward event for %s: %s", digest.Hash, err)
s.stopStream(digest, ch)
return err
}
if op.Done {
break
}
}
log.Debug("Completed stream for %s", digest.Hash)
return nil
Expand Down Expand Up @@ -443,8 +446,6 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin
// be no further update and no point for the receiver to keep waiting).
if created || j.Current == nil || !j.Done {
j.Streams = append(j.Streams, ch)
} else {
close(ch)
}
return ch, created
}
Expand Down Expand Up @@ -546,22 +547,13 @@ func (s *server) process(msg *pubsub.Message) {
j.Current = op
j.LastUpdate = time.Now()
for _, stream := range j.Streams {
// Invoke this in a goroutine so we do not block.
go func(ch chan<- *longrunning.Operation) {
defer func() {
recover() // Avoid any chance of panicking from a 'send on closed channel'
}()
log.Debug("Dispatching update for %s", key)
ch <- &longrunning.Operation{
Name: op.Name,
Metadata: op.Metadata,
Done: op.Done,
Result: op.Result,
}
if op.Done {
close(ch)
}
}(stream)
log.Debug("Dispatching update for %s", key)
stream <- &longrunning.Operation{
Name: op.Name,
Metadata: op.Metadata,
Done: op.Done,
Result: op.Result,
}
}
if op.Done {
if !j.StartTime.IsZero() {
Expand Down
Loading