Skip to content

Commit

Permalink
GRPC proxy subscription streams (#722)
Browse files Browse the repository at this point in the history
  • Loading branch information
FZambia authored Oct 9, 2023
1 parent 7c69467 commit 97735b4
Show file tree
Hide file tree
Showing 32 changed files with 1,275 additions and 245 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b
github.com/FZambia/tarantool v0.3.1
github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e
github.com/centrifugal/centrifuge v0.30.2
github.com/centrifugal/centrifuge v0.30.3-0.20231009162654-6036b763c9aa
github.com/centrifugal/protocol v0.10.0
github.com/cristalhq/jwt/v5 v5.1.0
github.com/gobwas/glob v0.2.3
Expand Down Expand Up @@ -38,7 +38,7 @@ require (
go.opentelemetry.io/otel/trace v1.19.0
go.uber.org/automaxprocs v1.5.3
golang.org/x/crypto v0.13.0
golang.org/x/sync v0.3.0
golang.org/x/sync v0.4.0
golang.org/x/time v0.3.0
google.golang.org/grpc v1.58.2
google.golang.org/protobuf v1.31.0
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM=
github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE=
github.com/centrifugal/centrifuge v0.30.2 h1:Ru/6lPpOMdqTTDRG/gr74HPUBRq7h574WDVwA47lv0U=
github.com/centrifugal/centrifuge v0.30.2/go.mod h1:6aIBCCjAnwpqr6SfkkU36Z0B9s/IZK+Z5faippXPXbI=
github.com/centrifugal/centrifuge v0.30.3-0.20231009162654-6036b763c9aa h1:EFaapglrL6ebSlWBfzhBqBPwVfUq1kPQWjPNPgD4qv0=
github.com/centrifugal/centrifuge v0.30.3-0.20231009162654-6036b763c9aa/go.mod h1:ojcxqaWUcxfQ9t5dVNwOe2/N66AdQMwrYbjymTfkLvs=
github.com/centrifugal/protocol v0.10.0 h1:Lac48ATVjVjirYPTHxbSMmiQXXajx7dhARKHy1UOL+A=
github.com/centrifugal/protocol v0.10.0/go.mod h1:Tq5I1mBpLHkLxNM9gfb3Gth+sTE2kKU5hH3cVgmVs9s=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
Expand Down Expand Up @@ -246,8 +246,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E=
golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ=
golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
2 changes: 2 additions & 0 deletions internal/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ type Client interface {
Context() context.Context
Transport() centrifuge.TransportInfo
AcquireStorage() (map[string]any, func(map[string]any))
Unsubscribe(ch string, unsubscribe ...centrifuge.Unsubscribe)
WritePublication(channel string, publication *centrifuge.Publication, sp centrifuge.StreamPosition) error
}
74 changes: 64 additions & 10 deletions internal/client/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,13 @@ type RPCExtensionFunc func(c Client, e centrifuge.RPCEvent) (centrifuge.RPCReply
// ProxyMap is a structure which contains all configured and already initialized
// proxies which can be used from inside client event handlers.
type ProxyMap struct {
ConnectProxy proxy.ConnectProxy
RefreshProxy proxy.RefreshProxy
RpcProxies map[string]proxy.RPCProxy
PublishProxies map[string]proxy.PublishProxy
SubscribeProxies map[string]proxy.SubscribeProxy
SubRefreshProxies map[string]proxy.SubRefreshProxy
ConnectProxy proxy.ConnectProxy
RefreshProxy proxy.RefreshProxy
RpcProxies map[string]proxy.RPCProxy
PublishProxies map[string]proxy.PublishProxy
SubscribeProxies map[string]proxy.SubscribeProxy
SubRefreshProxies map[string]proxy.SubRefreshProxy
SubscribeStreamProxies map[string]*proxy.SubscribeStreamProxy
}

// Handler ...
Expand Down Expand Up @@ -114,6 +115,14 @@ func (h *Handler) Setup() error {
}).Handle(h.node)
}

var proxySubscribeStreamHandler proxy.SubscribeStreamHandlerFunc
if len(h.proxyMap.SubscribeStreamProxies) > 0 {
proxySubscribeStreamHandler = proxy.NewSubscribeStreamHandler(proxy.SubscribeStreamHandlerConfig{
Proxies: h.proxyMap.SubscribeStreamProxies,
GranularProxyMode: h.granularProxyMode,
}).Handle(h.node)
}

var subRefreshProxyHandler proxy.SubRefreshHandlerFunc
if len(h.proxyMap.SubRefreshProxies) > 0 {
subRefreshProxyHandler = proxy.NewSubRefreshHandler(proxy.SubRefreshHandlerConfig{
Expand Down Expand Up @@ -187,7 +196,7 @@ func (h *Handler) Setup() error {

client.OnSubscribe(func(event centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) {
h.runConcurrentlyIfNeeded(client.Context(), concurrency, semaphore, func() {
reply, _, err := h.OnSubscribe(client, event, subscribeProxyHandler)
reply, _, err := h.OnSubscribe(client, event, subscribeProxyHandler, proxySubscribeStreamHandler)
cb(reply, err)
})
})
Expand Down Expand Up @@ -226,6 +235,20 @@ func (h *Handler) Setup() error {
cb(reply, err)
})
})

client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) {
if len(h.proxyMap.SubscribeStreamProxies) > 0 {
storage, release := client.AcquireStorage()
streamCancelKey := "stream_cancel_" + e.Channel
cancelFunc, ok := storage[streamCancelKey].(func())
if ok {
cancelFunc()
delete(storage, streamCancelKey)
delete(storage, "stream_publisher_"+e.Channel)
}
release(storage)
}
})
})
return nil
}
Expand Down Expand Up @@ -278,9 +301,7 @@ func (h *Handler) OnClientConnecting(
)

subscriptions := make(map[string]centrifuge.SubscribeOptions)

ruleConfig := h.ruleContainer.Config()

var processClientChannels bool

storage := map[string]any{}
Expand Down Expand Up @@ -603,7 +624,7 @@ type SubscribeExtra struct {
}

// OnSubscribe ...
func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribeProxyHandler proxy.SubscribeHandlerFunc) (centrifuge.SubscribeReply, SubscribeExtra, error) {
func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribeProxyHandler proxy.SubscribeHandlerFunc, subscribeStreamHandlerFunc proxy.SubscribeStreamHandlerFunc) (centrifuge.SubscribeReply, SubscribeExtra, error) {
ruleConfig := h.ruleContainer.Config()

if e.Channel == "" {
Expand Down Expand Up @@ -688,6 +709,22 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr
r.ClientSideRefresh = false
}
return r, SubscribeExtra{}, err
} else if (chOpts.ProxySubscribeStream || chOpts.SubscribeStreamProxyName != "") && !isUserLimitedChannel {
if subscribeStreamHandlerFunc == nil {
h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "stream proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()}))
return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable
}
r, publishFunc, cancelFunc, err := subscribeStreamHandlerFunc(c, chOpts.ProxySubscribeStreamBidirectional, e, chOpts, getPerCallData(c))
if chOpts.ProxySubscribeStreamBidirectional {
storage, release := c.AcquireStorage()
storage["stream_cancel_"+e.Channel] = cancelFunc
storage["stream_publisher_"+e.Channel] = publishFunc
release(storage)
}
if chOpts.ProxySubRefresh || chOpts.SubRefreshProxyName != "" {
r.ClientSideRefresh = false
}
return r, SubscribeExtra{}, err
} else if chOpts.SubscribeForClient && (c.UserID() != "" || chOpts.SubscribeForAnonymous) && !isUserLimitedChannel {
allowed = true
options.Source = subsource.ClientAllowed
Expand Down Expand Up @@ -742,6 +779,23 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan
return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable
}
return publishProxyHandler(c, e, chOpts, getPerCallData(c))
} else if chOpts.ProxySubscribeStream {
if !chOpts.ProxySubscribeStreamBidirectional {
return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable
}
storage, release := c.AcquireStorage()
publisher, ok := storage["stream_publisher_"+e.Channel].(proxy.StreamPublishFunc)
release(storage)
if ok {
err = publisher(e.Data)
if err != nil {
return centrifuge.PublishReply{}, err
}
return centrifuge.PublishReply{
Result: &centrifuge.PublishResult{},
}, nil
}
return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable
} else if chOpts.PublishForClient && (c.UserID() != "" || chOpts.PublishForAnonymous) {
allowed = true
} else if chOpts.PublishForSubscriber && c.IsSubscribed(e.Channel) && (c.UserID() != "" || chOpts.PublishForAnonymous) {
Expand Down
36 changes: 18 additions & 18 deletions internal/client/handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -437,7 +437,7 @@ func TestClientSubscribeChannel(t *testing.T) {

_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "non_existing_namespace:test1",
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorUnknownChannel, err)
}

Expand Down Expand Up @@ -475,7 +475,7 @@ func TestClientSubscribeChannelNoPermission(t *testing.T) {

_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "test1",
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorPermissionDenied, err)
}

Expand Down Expand Up @@ -514,7 +514,7 @@ func TestClientSubscribeChannelUserLimitedError(t *testing.T) {

_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "test#13",
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorPermissionDenied, err)
}

Expand Down Expand Up @@ -553,7 +553,7 @@ func TestClientSubscribeChannelUserLimitedOK(t *testing.T) {

_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "test#12",
}, nil)
}, nil, nil)
require.NoError(t, err)
}

Expand Down Expand Up @@ -592,25 +592,25 @@ func TestClientSubscribeWithToken(t *testing.T) {
_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: "",
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorPermissionDenied, err)

_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: "invalid",
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorPermissionDenied, err)

_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHS("12", "$test1", 123),
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorTokenExpired, err)

reply, _, err := h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHS("12", "$test1", 0),
}, nil)
}, nil, nil)
require.NoError(t, err)
require.Zero(t, reply.Options.ExpireAt)
}
Expand Down Expand Up @@ -650,7 +650,7 @@ func TestClientSubscribeWithTokenAnonymous(t *testing.T) {
_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHS("", "$test1", 0),
}, nil)
}, nil, nil)
require.NoError(t, err)
}

Expand Down Expand Up @@ -690,7 +690,7 @@ func TestClientSideSubRefresh(t *testing.T) {
reply, _, err := h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHS("12", "$test1", time.Now().Unix()+10),
}, nil)
}, nil, nil)
require.NoError(t, err)
require.True(t, reply.Options.ExpireAt > 0)

Expand Down Expand Up @@ -758,13 +758,13 @@ func TestClientSideSubRefresh_SeparateSubTokenConfig(t *testing.T) {
_, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHS("12", "$test1", time.Now().Unix()+10),
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorPermissionDenied, err)

reply, _, err := h.OnSubscribe(client, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHSWithSecret("12", "$test1", time.Now().Unix()+10, "new_secret"),
}, nil)
}, nil, nil)
require.NoError(t, err)
require.True(t, reply.Options.ExpireAt > 0)

Expand Down Expand Up @@ -795,7 +795,7 @@ func TestClientSubscribePrivateChannelWithExpiringToken(t *testing.T) {
_, _, err = h.OnSubscribe(&centrifuge.Client{}, centrifuge.SubscribeEvent{
Channel: "$test1",
Token: getSubscribeTokenHS("", "$test1", 10),
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorTokenExpired, err)
}

Expand All @@ -812,7 +812,7 @@ func TestClientSubscribePermissionDeniedForAnonymous(t *testing.T) {

_, _, err = h.OnSubscribe(&centrifuge.Client{}, centrifuge.SubscribeEvent{
Channel: "test1",
}, nil)
}, nil, nil)
require.Equal(t, centrifuge.ErrorPermissionDenied, err)
}

Expand Down Expand Up @@ -1087,7 +1087,7 @@ func TestClientOnSubscribe_UserLimitedChannelDoesNotCallProxy(t *testing.T) {
},
}, centrifuge.SubscribeEvent{
Channel: "user#42",
}, proxyFunc)
}, proxyFunc, nil)
require.NoError(t, err)
require.Equal(t, 0, numProxyCalls)

Expand All @@ -1097,7 +1097,7 @@ func TestClientOnSubscribe_UserLimitedChannelDoesNotCallProxy(t *testing.T) {
},
}, centrifuge.SubscribeEvent{
Channel: "user",
}, proxyFunc)
}, proxyFunc, nil)
require.NoError(t, err)
require.Equal(t, 1, numProxyCalls)
}
Expand Down Expand Up @@ -1130,7 +1130,7 @@ func TestClientOnSubscribe_UserLimitedChannelNotAllowedForAnotherUser(t *testing
},
}, centrifuge.SubscribeEvent{
Channel: "user#42",
}, proxyFunc)
}, proxyFunc, nil)
require.NoError(t, err)

_, _, err = h.OnSubscribe(&tools.TestClientMock{
Expand All @@ -1142,6 +1142,6 @@ func TestClientOnSubscribe_UserLimitedChannelNotAllowedForAnotherUser(t *testing
},
}, centrifuge.SubscribeEvent{
Channel: "user#42",
}, proxyFunc)
}, proxyFunc, nil)
require.ErrorIs(t, err, centrifuge.ErrorPermissionDenied)
}
2 changes: 2 additions & 0 deletions internal/proxy/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,4 +11,6 @@ type Client interface {
UserID() string
Context() context.Context
Transport() centrifuge.TransportInfo
Unsubscribe(ch string, unsubscribe ...centrifuge.Unsubscribe)
WritePublication(channel string, publication *centrifuge.Publication, sp centrifuge.StreamPosition) error
}
12 changes: 6 additions & 6 deletions internal/proxy/connect_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,14 @@ import (

// GRPCConnectProxy ...
type GRPCConnectProxy struct {
proxy Proxy
config Config
client proxyproto.CentrifugoProxyClient
}

var _ ConnectProxy = (*GRPCConnectProxy)(nil)

// NewGRPCConnectProxy ...
func NewGRPCConnectProxy(p Proxy) (*GRPCConnectProxy, error) {
func NewGRPCConnectProxy(p Config) (*GRPCConnectProxy, error) {
host, err := getGrpcHost(p.Endpoint)
if err != nil {
return nil, fmt.Errorf("error getting grpc host: %v", err)
Expand All @@ -35,7 +35,7 @@ func NewGRPCConnectProxy(p Proxy) (*GRPCConnectProxy, error) {
return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err)
}
return &GRPCConnectProxy{
proxy: p,
config: p,
client: proxyproto.NewCentrifugoProxyClient(conn),
}, nil
}
Expand All @@ -47,12 +47,12 @@ func (p *GRPCConnectProxy) Protocol() string {

// UseBase64 ...
func (p *GRPCConnectProxy) UseBase64() bool {
return p.proxy.BinaryEncoding
return p.config.BinaryEncoding
}

// ProxyConnect proxies connect control to application backend.
func (p *GRPCConnectProxy) ProxyConnect(ctx context.Context, req *proxyproto.ConnectRequest) (*proxyproto.ConnectResponse, error) {
ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout))
ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout))
defer cancel()
return p.client.Connect(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec))
return p.client.Connect(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec))
}
Loading

0 comments on commit 97735b4

Please sign in to comment.