diff --git a/internal/proxy/subscribe_stream_handler.go b/internal/proxy/subscribe_stream_handler.go index e4231c7402..ad66c94be9 100644 --- a/internal/proxy/subscribe_stream_handler.go +++ b/internal/proxy/subscribe_stream_handler.go @@ -67,11 +67,16 @@ func NewSubscribeStreamHandler(c SubscribeStreamHandlerConfig) *SubscribeStreamH type StreamPublishFunc func(data []byte) error // SubscribeStreamHandlerFunc ... -type SubscribeStreamHandlerFunc func(Client, bool, centrifuge.SubscribeEvent, rule.ChannelOptions, PerCallData) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) +type SubscribeStreamHandlerFunc func( + Client, bool, centrifuge.SubscribeEvent, rule.ChannelOptions, PerCallData, +) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) // Handle ... func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHandlerFunc { - return func(client Client, bidi bool, e centrifuge.SubscribeEvent, chOpts rule.ChannelOptions, pcd PerCallData) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) { + return func( + client Client, bidi bool, e centrifuge.SubscribeEvent, + chOpts rule.ChannelOptions, pcd PerCallData, + ) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) { started := time.Now() var p *SubscribeStreamProxy @@ -118,38 +123,43 @@ func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHa subscriptionReady := make(chan struct{}) var subscriptionReadyOnce sync.Once - subscribeRep, publishFunc, cancelFunc, err := p.SubscribeStream(client.Context(), bidi, req, func(pub *proxyproto.Publication, err error) { - subscriptionReadyOnce.Do(func() { + subscribeRep, publishFunc, cancelFunc, err := p.SubscribeStream( + client.Context(), + bidi, + req, + func(pub *proxyproto.Publication, err error) { + subscriptionReadyOnce.Do(func() { + select { + case <-subscriptionReady: + case <-client.Context().Done(): + return + } + }) select { - case <-subscriptionReady: case <-client.Context().Done(): return + default: } - }) - select { - case <-client.Context().Done(): - return - default: - } - if err != nil { - if errors.Is(err, io.EOF) { + if err != nil { + if errors.Is(err, io.EOF) { + client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{ + Code: centrifuge.UnsubscribeCodeServer, + Reason: "server unsubscribe", + }) + return + } client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{ - Code: centrifuge.UnsubscribeCodeServer, - Reason: "server unsubscribe", + Code: centrifuge.UnsubscribeCodeInsufficient, + Reason: "insufficient state", }) return } - client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{ - Code: centrifuge.UnsubscribeCodeInsufficient, - Reason: "insufficient state", - }) - return - } - _ = client.WritePublication(e.Channel, ¢rifuge.Publication{ - Data: pub.Data, - Tags: pub.Tags, - }, centrifuge.StreamPosition{}) - }) + _ = client.WritePublication(e.Channel, ¢rifuge.Publication{ + Data: pub.Data, + Tags: pub.Tags, + }, centrifuge.StreamPosition{}) + }, + ) duration := time.Since(started).Seconds()