Skip to content

Commit

Permalink
cleanups
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia committed Oct 7, 2023
1 parent 24e54ea commit c359254
Showing 1 changed file with 36 additions and 26 deletions.
62 changes: 36 additions & 26 deletions internal/proxy/subscribe_stream_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,11 +67,16 @@ func NewSubscribeStreamHandler(c SubscribeStreamHandlerConfig) *SubscribeStreamH
type StreamPublishFunc func(data []byte) error

// SubscribeStreamHandlerFunc ...
type SubscribeStreamHandlerFunc func(Client, bool, centrifuge.SubscribeEvent, rule.ChannelOptions, PerCallData) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error)
type SubscribeStreamHandlerFunc func(
Client, bool, centrifuge.SubscribeEvent, rule.ChannelOptions, PerCallData,
) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error)

// Handle ...
func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHandlerFunc {
return func(client Client, bidi bool, e centrifuge.SubscribeEvent, chOpts rule.ChannelOptions, pcd PerCallData) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) {
return func(
client Client, bidi bool, e centrifuge.SubscribeEvent,
chOpts rule.ChannelOptions, pcd PerCallData,
) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) {
started := time.Now()

var p *SubscribeStreamProxy
Expand Down Expand Up @@ -118,38 +123,43 @@ func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHa
subscriptionReady := make(chan struct{})
var subscriptionReadyOnce sync.Once

subscribeRep, publishFunc, cancelFunc, err := p.SubscribeStream(client.Context(), bidi, req, func(pub *proxyproto.Publication, err error) {
subscriptionReadyOnce.Do(func() {
subscribeRep, publishFunc, cancelFunc, err := p.SubscribeStream(
client.Context(),
bidi,
req,
func(pub *proxyproto.Publication, err error) {
subscriptionReadyOnce.Do(func() {
select {
case <-subscriptionReady:
case <-client.Context().Done():
return
}
})
select {
case <-subscriptionReady:
case <-client.Context().Done():
return
default:
}
})
select {
case <-client.Context().Done():
return
default:
}
if err != nil {
if errors.Is(err, io.EOF) {
if err != nil {
if errors.Is(err, io.EOF) {
client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{
Code: centrifuge.UnsubscribeCodeServer,
Reason: "server unsubscribe",
})
return
}
client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{
Code: centrifuge.UnsubscribeCodeServer,
Reason: "server unsubscribe",
Code: centrifuge.UnsubscribeCodeInsufficient,
Reason: "insufficient state",
})
return
}
client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{
Code: centrifuge.UnsubscribeCodeInsufficient,
Reason: "insufficient state",
})
return
}
_ = client.WritePublication(e.Channel, &centrifuge.Publication{
Data: pub.Data,
Tags: pub.Tags,
}, centrifuge.StreamPosition{})
})
_ = client.WritePublication(e.Channel, &centrifuge.Publication{
Data: pub.Data,
Tags: pub.Tags,
}, centrifuge.StreamPosition{})
},
)

duration := time.Since(started).Seconds()

Expand Down

0 comments on commit c359254

Please sign in to comment.