diff --git a/ChangeLog b/ChangeLog index f3dc3dcc..e5c3541f 100644 --- a/ChangeLog +++ b/ChangeLog @@ -1,3 +1,7 @@ +Version 11.9.3 +-------------- + * Avoid some cases of potential reordering of streamed events + Version 11.9.2 -------------- * Only clear J.current if we reuse an old job @@ -5,8 +9,8 @@ Version 11.9.2 Version 11.9.1 -------------- * Delete done jobs after resuption time has passed - * Delete any job after 2x expiry - * Don't set lastUpdate on streaming events + * Delete any job after 2x expiry + * Don't set lastUpdate on streaming events * Add warning logs when deleting jobs with listeners Version 11.9.0 @@ -19,7 +23,7 @@ Version 11.8.5 Version 11.8.4 -------------- - * delete in memory jobs periodically in one routine + * delete in memory jobs periodically in one routine Version 11.7.4 -------------- @@ -64,19 +68,19 @@ Version 11.6.0 Version 11.5.0 -------------- - * Update gocloud to v0.34.0 to support setting max batchsize on topics + * Update gocloud to v0.34.0 to support setting max batchsize on topics Version 11.4.2 -------------- - * Fix bug in api server + * Fix bug in api server Version 11.4.2 -------------- - * Fix bug in api server + * Fix bug in api server Version 11.4.0 -------------- - * Add api server pubsub opts + * Add api server pubsub opts * Add metrics for failed publish requests * Add no execution in progress metric * Add preresponse publish duration metric diff --git a/VERSION b/VERSION index 22d790d1..31ae9dea 100644 --- a/VERSION +++ b/VERSION @@ -1 +1 @@ -11.9.2 +11.9.3 diff --git a/mettle/api/api.go b/mettle/api/api.go index 6290d15a..16012374 100644 --- a/mettle/api/api.go +++ b/mettle/api/api.go @@ -385,13 +385,16 @@ func (s *server) WaitExecution(req *pb.WaitExecutionRequest, stream pb.Execution // streamEvents streams a series of events back to the client. func (s *server) streamEvents(digest *pb.Digest, ch <-chan *longrunning.Operation, stream pb.Execution_ExecuteServer) error { + defer s.stopStream(digest, ch) for op := range ch { op.Name = digest.Hash if err := stream.Send(op); err != nil { log.Warning("Failed to forward event for %s: %s", digest.Hash, err) - s.stopStream(digest, ch) return err } + if op.Done { + break + } } log.Debug("Completed stream for %s", digest.Hash) return nil @@ -443,8 +446,6 @@ func (s *server) eventStream(digest *pb.Digest, create bool) (<-chan *longrunnin // be no further update and no point for the receiver to keep waiting). if created || j.Current == nil || !j.Done { j.Streams = append(j.Streams, ch) - } else { - close(ch) } return ch, created } @@ -546,22 +547,13 @@ func (s *server) process(msg *pubsub.Message) { j.Current = op j.LastUpdate = time.Now() for _, stream := range j.Streams { - // Invoke this in a goroutine so we do not block. - go func(ch chan<- *longrunning.Operation) { - defer func() { - recover() // Avoid any chance of panicking from a 'send on closed channel' - }() - log.Debug("Dispatching update for %s", key) - ch <- &longrunning.Operation{ - Name: op.Name, - Metadata: op.Metadata, - Done: op.Done, - Result: op.Result, - } - if op.Done { - close(ch) - } - }(stream) + log.Debug("Dispatching update for %s", key) + stream <- &longrunning.Operation{ + Name: op.Name, + Metadata: op.Metadata, + Done: op.Done, + Result: op.Result, + } } if op.Done { if !j.StartTime.IsZero() {