Skip to content

Commit

Permalink
Propagate pubsub subscriber errors (#185)
Browse files Browse the repository at this point in the history
* Found a blocking issue on pubsub transport if a subscriber had an error.

Signed-off-by: Scott Nichols <nicholss@google.com>

* make sure cancel is called

Signed-off-by: Scott Nichols <nicholss@google.com>

* go mod.

Signed-off-by: Scott Nichols <nicholss@google.com>
  • Loading branch information
n3wscott authored Sep 11, 2019
1 parent e0af3d9 commit 5bacd6c
Showing 1 changed file with 10 additions and 5 deletions.
15 changes: 10 additions & 5 deletions pkg/cloudevents/transport/pubsub/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,7 @@ func (t *Transport) StartReceiver(ctx context.Context) error {
}

cctx, cancel := context.WithCancel(ctx)
defer cancel()
n := len(t.subscriptions)

// Make the channels for quit and errors.
Expand All @@ -280,22 +281,26 @@ func (t *Transport) StartReceiver(ctx context.Context) error {
})
}

// Block for parent context to finish.
<-ctx.Done()
cancel()

// Collect errors and done calls until we have n of them.
errs := []string(nil)
for success := 0; success < n; success++ {
var err error
select {
case err = <-errc:
case <-ctx.Done(): // Block for parent context to finish.
success--
case err = <-errc: // Collect errors
case <-quit:
}
if cancel != nil {
// Stop all other subscriptions.
cancel()
cancel = nil
}
if err != nil {
errs = append(errs, err.Error())
}
}

close(quit)
close(errc)

Expand Down

0 comments on commit 5bacd6c

Please sign in to comment.