From 97735b4f758a28635b8fe63a2b9424aab7c8d3f0 Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Mon, 9 Oct 2023 21:35:28 +0300 Subject: [PATCH] GRPC proxy subscription streams (#722) --- go.mod | 4 +- go.sum | 8 +- internal/client/client.go | 2 + internal/client/handler.go | 74 +++- internal/client/handler_test.go | 36 +- internal/proxy/client.go | 2 + internal/proxy/connect_grpc.go | 12 +- internal/proxy/connect_handler_test.go | 8 +- internal/proxy/connect_http.go | 10 +- internal/proxy/grpc.go | 7 +- internal/proxy/proxy.go | 16 +- internal/proxy/publish_grpc.go | 14 +- internal/proxy/publish_http.go | 12 +- internal/proxy/refresh_grpc.go | 14 +- internal/proxy/refresh_http.go | 12 +- internal/proxy/rpc_grpc.go | 14 +- internal/proxy/rpc_http.go | 12 +- internal/proxy/sub_refresh_grpc.go | 14 +- internal/proxy/sub_refresh_http.go | 12 +- internal/proxy/subscribe_grpc.go | 14 +- internal/proxy/subscribe_http.go | 12 +- internal/proxy/subscribe_stream_grpc.go | 48 +++ internal/proxy/subscribe_stream_handler.go | 353 ++++++++++++++++++ internal/proxy/unknown_keys.go | 2 +- internal/proxyproto/proxy.pb.go | 395 ++++++++++++++++++--- internal/proxyproto/proxy.proto | 41 +++ internal/proxyproto/proxy_grpc.pb.go | 162 ++++++++- internal/rule/namespace.go | 9 + internal/rule/rule.go | 3 + internal/subsource/source.go | 2 + internal/tools/test_helpers.go | 20 ++ main.go | 176 ++++++--- 32 files changed, 1275 insertions(+), 245 deletions(-) create mode 100644 internal/proxy/subscribe_stream_grpc.go create mode 100644 internal/proxy/subscribe_stream_handler.go diff --git a/go.mod b/go.mod index 25b3c0adcc..4b89d3bda0 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,7 @@ require ( github.com/FZambia/statik v0.1.2-0.20180217151304-b9f012bb2a1b github.com/FZambia/tarantool v0.3.1 github.com/FZambia/viper-lite v0.0.0-20220110144934-1899f66c7d0e - github.com/centrifugal/centrifuge v0.30.2 + github.com/centrifugal/centrifuge v0.30.3-0.20231009162654-6036b763c9aa github.com/centrifugal/protocol v0.10.0 github.com/cristalhq/jwt/v5 v5.1.0 github.com/gobwas/glob v0.2.3 @@ -38,7 +38,7 @@ require ( go.opentelemetry.io/otel/trace v1.19.0 go.uber.org/automaxprocs v1.5.3 golang.org/x/crypto v0.13.0 - golang.org/x/sync v0.3.0 + golang.org/x/sync v0.4.0 golang.org/x/time v0.3.0 google.golang.org/grpc v1.58.2 google.golang.org/protobuf v1.31.0 diff --git a/go.sum b/go.sum index 37c42911da..d3f55ccf3d 100644 --- a/go.sum +++ b/go.sum @@ -14,8 +14,8 @@ github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= github.com/cenkalti/backoff/v4 v4.2.1 h1:y4OZtCnogmCPw98Zjyt5a6+QwPLGkiQsYW5oUqylYbM= github.com/cenkalti/backoff/v4 v4.2.1/go.mod h1:Y3VNntkOUPxTVeUxJ/G5vcM//AlwfmyYozVcomhLiZE= -github.com/centrifugal/centrifuge v0.30.2 h1:Ru/6lPpOMdqTTDRG/gr74HPUBRq7h574WDVwA47lv0U= -github.com/centrifugal/centrifuge v0.30.2/go.mod h1:6aIBCCjAnwpqr6SfkkU36Z0B9s/IZK+Z5faippXPXbI= +github.com/centrifugal/centrifuge v0.30.3-0.20231009162654-6036b763c9aa h1:EFaapglrL6ebSlWBfzhBqBPwVfUq1kPQWjPNPgD4qv0= +github.com/centrifugal/centrifuge v0.30.3-0.20231009162654-6036b763c9aa/go.mod h1:ojcxqaWUcxfQ9t5dVNwOe2/N66AdQMwrYbjymTfkLvs= github.com/centrifugal/protocol v0.10.0 h1:Lac48ATVjVjirYPTHxbSMmiQXXajx7dhARKHy1UOL+A= github.com/centrifugal/protocol v0.10.0/go.mod h1:Tq5I1mBpLHkLxNM9gfb3Gth+sTE2kKU5hH3cVgmVs9s= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= @@ -246,8 +246,8 @@ golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20181221193216-37e7f081c4d4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.3.0 h1:ftCYgMx6zT/asHUrPw8BLLscYtGznsLAnjq5RH9P66E= -golang.org/x/sync v0.3.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= +golang.org/x/sync v0.4.0 h1:zxkM55ReGkDlKSM+Fu41A+zmbZuaPVbGMzvvdUPznYQ= +golang.org/x/sync v0.4.0/go.mod h1:FU7BRWz2tNW+3quACPkgCx/L+uEAv1htQ0V83Z9Rj+Y= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= diff --git a/internal/client/client.go b/internal/client/client.go index 9b17590172..15fd77e234 100644 --- a/internal/client/client.go +++ b/internal/client/client.go @@ -14,4 +14,6 @@ type Client interface { Context() context.Context Transport() centrifuge.TransportInfo AcquireStorage() (map[string]any, func(map[string]any)) + Unsubscribe(ch string, unsubscribe ...centrifuge.Unsubscribe) + WritePublication(channel string, publication *centrifuge.Publication, sp centrifuge.StreamPosition) error } diff --git a/internal/client/handler.go b/internal/client/handler.go index 999439c5e2..10c26aeebf 100644 --- a/internal/client/handler.go +++ b/internal/client/handler.go @@ -22,12 +22,13 @@ type RPCExtensionFunc func(c Client, e centrifuge.RPCEvent) (centrifuge.RPCReply // ProxyMap is a structure which contains all configured and already initialized // proxies which can be used from inside client event handlers. type ProxyMap struct { - ConnectProxy proxy.ConnectProxy - RefreshProxy proxy.RefreshProxy - RpcProxies map[string]proxy.RPCProxy - PublishProxies map[string]proxy.PublishProxy - SubscribeProxies map[string]proxy.SubscribeProxy - SubRefreshProxies map[string]proxy.SubRefreshProxy + ConnectProxy proxy.ConnectProxy + RefreshProxy proxy.RefreshProxy + RpcProxies map[string]proxy.RPCProxy + PublishProxies map[string]proxy.PublishProxy + SubscribeProxies map[string]proxy.SubscribeProxy + SubRefreshProxies map[string]proxy.SubRefreshProxy + SubscribeStreamProxies map[string]*proxy.SubscribeStreamProxy } // Handler ... @@ -114,6 +115,14 @@ func (h *Handler) Setup() error { }).Handle(h.node) } + var proxySubscribeStreamHandler proxy.SubscribeStreamHandlerFunc + if len(h.proxyMap.SubscribeStreamProxies) > 0 { + proxySubscribeStreamHandler = proxy.NewSubscribeStreamHandler(proxy.SubscribeStreamHandlerConfig{ + Proxies: h.proxyMap.SubscribeStreamProxies, + GranularProxyMode: h.granularProxyMode, + }).Handle(h.node) + } + var subRefreshProxyHandler proxy.SubRefreshHandlerFunc if len(h.proxyMap.SubRefreshProxies) > 0 { subRefreshProxyHandler = proxy.NewSubRefreshHandler(proxy.SubRefreshHandlerConfig{ @@ -187,7 +196,7 @@ func (h *Handler) Setup() error { client.OnSubscribe(func(event centrifuge.SubscribeEvent, cb centrifuge.SubscribeCallback) { h.runConcurrentlyIfNeeded(client.Context(), concurrency, semaphore, func() { - reply, _, err := h.OnSubscribe(client, event, subscribeProxyHandler) + reply, _, err := h.OnSubscribe(client, event, subscribeProxyHandler, proxySubscribeStreamHandler) cb(reply, err) }) }) @@ -226,6 +235,20 @@ func (h *Handler) Setup() error { cb(reply, err) }) }) + + client.OnUnsubscribe(func(e centrifuge.UnsubscribeEvent) { + if len(h.proxyMap.SubscribeStreamProxies) > 0 { + storage, release := client.AcquireStorage() + streamCancelKey := "stream_cancel_" + e.Channel + cancelFunc, ok := storage[streamCancelKey].(func()) + if ok { + cancelFunc() + delete(storage, streamCancelKey) + delete(storage, "stream_publisher_"+e.Channel) + } + release(storage) + } + }) }) return nil } @@ -278,9 +301,7 @@ func (h *Handler) OnClientConnecting( ) subscriptions := make(map[string]centrifuge.SubscribeOptions) - ruleConfig := h.ruleContainer.Config() - var processClientChannels bool storage := map[string]any{} @@ -603,7 +624,7 @@ type SubscribeExtra struct { } // OnSubscribe ... -func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribeProxyHandler proxy.SubscribeHandlerFunc) (centrifuge.SubscribeReply, SubscribeExtra, error) { +func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribeProxyHandler proxy.SubscribeHandlerFunc, subscribeStreamHandlerFunc proxy.SubscribeStreamHandlerFunc) (centrifuge.SubscribeReply, SubscribeExtra, error) { ruleConfig := h.ruleContainer.Config() if e.Channel == "" { @@ -688,6 +709,22 @@ func (h *Handler) OnSubscribe(c Client, e centrifuge.SubscribeEvent, subscribePr r.ClientSideRefresh = false } return r, SubscribeExtra{}, err + } else if (chOpts.ProxySubscribeStream || chOpts.SubscribeStreamProxyName != "") && !isUserLimitedChannel { + if subscribeStreamHandlerFunc == nil { + h.node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "stream proxy not enabled", map[string]any{"channel": e.Channel, "user": c.UserID(), "client": c.ID()})) + return centrifuge.SubscribeReply{}, SubscribeExtra{}, centrifuge.ErrorNotAvailable + } + r, publishFunc, cancelFunc, err := subscribeStreamHandlerFunc(c, chOpts.ProxySubscribeStreamBidirectional, e, chOpts, getPerCallData(c)) + if chOpts.ProxySubscribeStreamBidirectional { + storage, release := c.AcquireStorage() + storage["stream_cancel_"+e.Channel] = cancelFunc + storage["stream_publisher_"+e.Channel] = publishFunc + release(storage) + } + if chOpts.ProxySubRefresh || chOpts.SubRefreshProxyName != "" { + r.ClientSideRefresh = false + } + return r, SubscribeExtra{}, err } else if chOpts.SubscribeForClient && (c.UserID() != "" || chOpts.SubscribeForAnonymous) && !isUserLimitedChannel { allowed = true options.Source = subsource.ClientAllowed @@ -742,6 +779,23 @@ func (h *Handler) OnPublish(c Client, e centrifuge.PublishEvent, publishProxyHan return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable } return publishProxyHandler(c, e, chOpts, getPerCallData(c)) + } else if chOpts.ProxySubscribeStream { + if !chOpts.ProxySubscribeStreamBidirectional { + return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable + } + storage, release := c.AcquireStorage() + publisher, ok := storage["stream_publisher_"+e.Channel].(proxy.StreamPublishFunc) + release(storage) + if ok { + err = publisher(e.Data) + if err != nil { + return centrifuge.PublishReply{}, err + } + return centrifuge.PublishReply{ + Result: ¢rifuge.PublishResult{}, + }, nil + } + return centrifuge.PublishReply{}, centrifuge.ErrorNotAvailable } else if chOpts.PublishForClient && (c.UserID() != "" || chOpts.PublishForAnonymous) { allowed = true } else if chOpts.PublishForSubscriber && c.IsSubscribed(e.Channel) && (c.UserID() != "" || chOpts.PublishForAnonymous) { diff --git a/internal/client/handler_test.go b/internal/client/handler_test.go index 529ae173c2..a1e519ca69 100644 --- a/internal/client/handler_test.go +++ b/internal/client/handler_test.go @@ -437,7 +437,7 @@ func TestClientSubscribeChannel(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "non_existing_namespace:test1", - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorUnknownChannel, err) } @@ -475,7 +475,7 @@ func TestClientSubscribeChannelNoPermission(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "test1", - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorPermissionDenied, err) } @@ -514,7 +514,7 @@ func TestClientSubscribeChannelUserLimitedError(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "test#13", - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorPermissionDenied, err) } @@ -553,7 +553,7 @@ func TestClientSubscribeChannelUserLimitedOK(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "test#12", - }, nil) + }, nil, nil) require.NoError(t, err) } @@ -592,25 +592,25 @@ func TestClientSubscribeWithToken(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: "", - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorPermissionDenied, err) _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: "invalid", - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorPermissionDenied, err) _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHS("12", "$test1", 123), - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorTokenExpired, err) reply, _, err := h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHS("12", "$test1", 0), - }, nil) + }, nil, nil) require.NoError(t, err) require.Zero(t, reply.Options.ExpireAt) } @@ -650,7 +650,7 @@ func TestClientSubscribeWithTokenAnonymous(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHS("", "$test1", 0), - }, nil) + }, nil, nil) require.NoError(t, err) } @@ -690,7 +690,7 @@ func TestClientSideSubRefresh(t *testing.T) { reply, _, err := h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHS("12", "$test1", time.Now().Unix()+10), - }, nil) + }, nil, nil) require.NoError(t, err) require.True(t, reply.Options.ExpireAt > 0) @@ -758,13 +758,13 @@ func TestClientSideSubRefresh_SeparateSubTokenConfig(t *testing.T) { _, _, err = h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHS("12", "$test1", time.Now().Unix()+10), - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorPermissionDenied, err) reply, _, err := h.OnSubscribe(client, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHSWithSecret("12", "$test1", time.Now().Unix()+10, "new_secret"), - }, nil) + }, nil, nil) require.NoError(t, err) require.True(t, reply.Options.ExpireAt > 0) @@ -795,7 +795,7 @@ func TestClientSubscribePrivateChannelWithExpiringToken(t *testing.T) { _, _, err = h.OnSubscribe(¢rifuge.Client{}, centrifuge.SubscribeEvent{ Channel: "$test1", Token: getSubscribeTokenHS("", "$test1", 10), - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorTokenExpired, err) } @@ -812,7 +812,7 @@ func TestClientSubscribePermissionDeniedForAnonymous(t *testing.T) { _, _, err = h.OnSubscribe(¢rifuge.Client{}, centrifuge.SubscribeEvent{ Channel: "test1", - }, nil) + }, nil, nil) require.Equal(t, centrifuge.ErrorPermissionDenied, err) } @@ -1087,7 +1087,7 @@ func TestClientOnSubscribe_UserLimitedChannelDoesNotCallProxy(t *testing.T) { }, }, centrifuge.SubscribeEvent{ Channel: "user#42", - }, proxyFunc) + }, proxyFunc, nil) require.NoError(t, err) require.Equal(t, 0, numProxyCalls) @@ -1097,7 +1097,7 @@ func TestClientOnSubscribe_UserLimitedChannelDoesNotCallProxy(t *testing.T) { }, }, centrifuge.SubscribeEvent{ Channel: "user", - }, proxyFunc) + }, proxyFunc, nil) require.NoError(t, err) require.Equal(t, 1, numProxyCalls) } @@ -1130,7 +1130,7 @@ func TestClientOnSubscribe_UserLimitedChannelNotAllowedForAnotherUser(t *testing }, }, centrifuge.SubscribeEvent{ Channel: "user#42", - }, proxyFunc) + }, proxyFunc, nil) require.NoError(t, err) _, _, err = h.OnSubscribe(&tools.TestClientMock{ @@ -1142,6 +1142,6 @@ func TestClientOnSubscribe_UserLimitedChannelNotAllowedForAnotherUser(t *testing }, }, centrifuge.SubscribeEvent{ Channel: "user#42", - }, proxyFunc) + }, proxyFunc, nil) require.ErrorIs(t, err, centrifuge.ErrorPermissionDenied) } diff --git a/internal/proxy/client.go b/internal/proxy/client.go index 5cd709b2ec..12bf01828c 100644 --- a/internal/proxy/client.go +++ b/internal/proxy/client.go @@ -11,4 +11,6 @@ type Client interface { UserID() string Context() context.Context Transport() centrifuge.TransportInfo + Unsubscribe(ch string, unsubscribe ...centrifuge.Unsubscribe) + WritePublication(channel string, publication *centrifuge.Publication, sp centrifuge.StreamPosition) error } diff --git a/internal/proxy/connect_grpc.go b/internal/proxy/connect_grpc.go index 3abc66f3a6..07714371c0 100644 --- a/internal/proxy/connect_grpc.go +++ b/internal/proxy/connect_grpc.go @@ -12,14 +12,14 @@ import ( // GRPCConnectProxy ... type GRPCConnectProxy struct { - proxy Proxy + config Config client proxyproto.CentrifugoProxyClient } var _ ConnectProxy = (*GRPCConnectProxy)(nil) // NewGRPCConnectProxy ... -func NewGRPCConnectProxy(p Proxy) (*GRPCConnectProxy, error) { +func NewGRPCConnectProxy(p Config) (*GRPCConnectProxy, error) { host, err := getGrpcHost(p.Endpoint) if err != nil { return nil, fmt.Errorf("error getting grpc host: %v", err) @@ -35,7 +35,7 @@ func NewGRPCConnectProxy(p Proxy) (*GRPCConnectProxy, error) { return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) } return &GRPCConnectProxy{ - proxy: p, + config: p, client: proxyproto.NewCentrifugoProxyClient(conn), }, nil } @@ -47,12 +47,12 @@ func (p *GRPCConnectProxy) Protocol() string { // UseBase64 ... func (p *GRPCConnectProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // ProxyConnect proxies connect control to application backend. func (p *GRPCConnectProxy) ProxyConnect(ctx context.Context, req *proxyproto.ConnectRequest) (*proxyproto.ConnectResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout)) defer cancel() - return p.client.Connect(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec)) + return p.client.Connect(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) } diff --git a/internal/proxy/connect_handler_test.go b/internal/proxy/connect_handler_test.go index 057c142fb8..dcad8beaf5 100644 --- a/internal/proxy/connect_handler_test.go +++ b/internal/proxy/connect_handler_test.go @@ -22,8 +22,8 @@ type grpcConnHandleTestCase struct { connectProxyHandler *ConnectHandler } -func getTestGrpcProxy(commonProxyTestCase *tools.CommonGRPCProxyTestCase) Proxy { - return Proxy{ +func getTestGrpcProxy(commonProxyTestCase *tools.CommonGRPCProxyTestCase) Config { + return Config{ Endpoint: commonProxyTestCase.Listener.Addr().String(), Timeout: tools.Duration(5 * time.Second), testGrpcDialer: func(ctx context.Context, s string) (net.Conn, error) { @@ -32,8 +32,8 @@ func getTestGrpcProxy(commonProxyTestCase *tools.CommonGRPCProxyTestCase) Proxy } } -func getTestHttpProxy(commonProxyTestCase *tools.CommonHTTPProxyTestCase, endpoint string) Proxy { - return Proxy{ +func getTestHttpProxy(commonProxyTestCase *tools.CommonHTTPProxyTestCase, endpoint string) Config { + return Config{ Endpoint: commonProxyTestCase.Server.URL + endpoint, Timeout: tools.Duration(5 * time.Second), StaticHttpHeaders: map[string]string{ diff --git a/internal/proxy/connect_http.go b/internal/proxy/connect_http.go index 90e828d5e9..47ecbb56b1 100644 --- a/internal/proxy/connect_http.go +++ b/internal/proxy/connect_http.go @@ -9,16 +9,16 @@ import ( // HTTPConnectProxy ... type HTTPConnectProxy struct { - proxy Proxy + config Config httpCaller HTTPCaller } var _ ConnectProxy = (*HTTPConnectProxy)(nil) // NewHTTPConnectProxy ... -func NewHTTPConnectProxy(p Proxy) (*HTTPConnectProxy, error) { +func NewHTTPConnectProxy(p Config) (*HTTPConnectProxy, error) { return &HTTPConnectProxy{ - proxy: p, + config: p, httpCaller: NewHTTPCaller(proxyHTTPClient(time.Duration(p.Timeout))), }, nil } @@ -30,7 +30,7 @@ func (p *HTTPConnectProxy) Protocol() string { // UseBase64 ... func (p *HTTPConnectProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // ProxyConnect proxies connect control to application backend. @@ -39,7 +39,7 @@ func (p *HTTPConnectProxy) ProxyConnect(ctx context.Context, req *proxyproto.Con if err != nil { return nil, err } - respData, err := p.httpCaller.CallHTTP(ctx, p.proxy.Endpoint, httpRequestHeaders(ctx, p.proxy), data) + respData, err := p.httpCaller.CallHTTP(ctx, p.config.Endpoint, httpRequestHeaders(ctx, p.config), data) if err != nil { return nil, err } diff --git a/internal/proxy/grpc.go b/internal/proxy/grpc.go index 25247129f3..123b81261a 100644 --- a/internal/proxy/grpc.go +++ b/internal/proxy/grpc.go @@ -47,7 +47,7 @@ func getGrpcHost(endpoint string) (string, error) { return host, nil } -func getDialOpts(p Proxy) ([]grpc.DialOption, error) { +func getDialOpts(p Config) ([]grpc.DialOption, error) { var dialOpts []grpc.DialOption if p.GrpcCredentialsKey != "" { dialOpts = append(dialOpts, grpc.WithPerRPCCredentials(&rpcCredentials{ @@ -69,16 +69,15 @@ func getDialOpts(p Proxy) ([]grpc.DialOption, error) { dialOpts = append(dialOpts, grpc.WithContextDialer(p.testGrpcDialer)) } - dialOpts = append(dialOpts, grpc.WithBlock()) return dialOpts, nil } -func grpcRequestContext(ctx context.Context, proxy Proxy) context.Context { +func grpcRequestContext(ctx context.Context, proxy Config) context.Context { md := requestMetadata(ctx, proxy.HttpHeaders, proxy.GrpcMetadata) return metadata.NewOutgoingContext(ctx, md) } -func httpRequestHeaders(ctx context.Context, proxy Proxy) http.Header { +func httpRequestHeaders(ctx context.Context, proxy Config) http.Header { return requestHeaders(ctx, proxy.HttpHeaders, proxy.GrpcMetadata, proxy.StaticHttpHeaders) } diff --git a/internal/proxy/proxy.go b/internal/proxy/proxy.go index 5a1107f8f8..2f43b11bed 100644 --- a/internal/proxy/proxy.go +++ b/internal/proxy/proxy.go @@ -9,8 +9,8 @@ import ( "github.com/centrifugal/centrifugo/v5/internal/tools" ) -// Proxy model. -type Proxy struct { +// Config for proxy. +type Config struct { // Name is a unique name of proxy to reference. Name string `mapstructure:"name" json:"name"` // Endpoint - HTTP address or GRPC service endpoint. @@ -57,42 +57,42 @@ func isHttpEndpoint(endpoint string) bool { return strings.HasPrefix(endpoint, "http://") || strings.HasPrefix(endpoint, "https://") } -func GetConnectProxy(p Proxy) (ConnectProxy, error) { +func GetConnectProxy(p Config) (ConnectProxy, error) { if isHttpEndpoint(p.Endpoint) { return NewHTTPConnectProxy(p) } return NewGRPCConnectProxy(p) } -func GetRefreshProxy(p Proxy) (RefreshProxy, error) { +func GetRefreshProxy(p Config) (RefreshProxy, error) { if isHttpEndpoint(p.Endpoint) { return NewHTTPRefreshProxy(p) } return NewGRPCRefreshProxy(p) } -func GetRpcProxy(p Proxy) (RPCProxy, error) { +func GetRpcProxy(p Config) (RPCProxy, error) { if isHttpEndpoint(p.Endpoint) { return NewHTTPRPCProxy(p) } return NewGRPCRPCProxy(p) } -func GetSubRefreshProxy(p Proxy) (SubRefreshProxy, error) { +func GetSubRefreshProxy(p Config) (SubRefreshProxy, error) { if isHttpEndpoint(p.Endpoint) { return NewHTTPSubRefreshProxy(p) } return NewGRPCSubRefreshProxy(p) } -func GetPublishProxy(p Proxy) (PublishProxy, error) { +func GetPublishProxy(p Config) (PublishProxy, error) { if isHttpEndpoint(p.Endpoint) { return NewHTTPPublishProxy(p) } return NewGRPCPublishProxy(p) } -func GetSubscribeProxy(p Proxy) (SubscribeProxy, error) { +func GetSubscribeProxy(p Config) (SubscribeProxy, error) { if isHttpEndpoint(p.Endpoint) { return NewHTTPSubscribeProxy(p) } diff --git a/internal/proxy/publish_grpc.go b/internal/proxy/publish_grpc.go index 9c12d403c5..83e9b4b38c 100644 --- a/internal/proxy/publish_grpc.go +++ b/internal/proxy/publish_grpc.go @@ -12,14 +12,14 @@ import ( // GRPCPublishProxy ... type GRPCPublishProxy struct { - proxy Proxy + config Config client proxyproto.CentrifugoProxyClient } var _ PublishProxy = (*GRPCPublishProxy)(nil) // NewGRPCPublishProxy ... -func NewGRPCPublishProxy(p Proxy) (*GRPCPublishProxy, error) { +func NewGRPCPublishProxy(p Config) (*GRPCPublishProxy, error) { host, err := getGrpcHost(p.Endpoint) if err != nil { return nil, fmt.Errorf("error getting grpc host: %v", err) @@ -35,16 +35,16 @@ func NewGRPCPublishProxy(p Proxy) (*GRPCPublishProxy, error) { return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) } return &GRPCPublishProxy{ - proxy: p, + config: p, client: proxyproto.NewCentrifugoProxyClient(conn), }, nil } // ProxyPublish proxies Publish to application backend. func (p *GRPCPublishProxy) ProxyPublish(ctx context.Context, req *proxyproto.PublishRequest) (*proxyproto.PublishResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout)) defer cancel() - return p.client.Publish(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec)) + return p.client.Publish(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) } // Protocol ... @@ -54,10 +54,10 @@ func (p *GRPCPublishProxy) Protocol() string { // UseBase64 ... func (p *GRPCPublishProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *GRPCPublishProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/publish_http.go b/internal/proxy/publish_http.go index 1eef99cab6..0a27e6cedc 100644 --- a/internal/proxy/publish_http.go +++ b/internal/proxy/publish_http.go @@ -22,17 +22,17 @@ type PublishRequestHTTP struct { // HTTPPublishProxy ... type HTTPPublishProxy struct { - proxy Proxy + config Config httpCaller HTTPCaller } var _ PublishProxy = (*HTTPPublishProxy)(nil) // NewHTTPPublishProxy ... -func NewHTTPPublishProxy(p Proxy) (*HTTPPublishProxy, error) { +func NewHTTPPublishProxy(p Config) (*HTTPPublishProxy, error) { return &HTTPPublishProxy{ httpCaller: NewHTTPCaller(proxyHTTPClient(time.Duration(p.Timeout))), - proxy: p, + config: p, }, nil } @@ -42,7 +42,7 @@ func (p *HTTPPublishProxy) ProxyPublish(ctx context.Context, req *proxyproto.Pub if err != nil { return nil, err } - respData, err := p.httpCaller.CallHTTP(ctx, p.proxy.Endpoint, httpRequestHeaders(ctx, p.proxy), data) + respData, err := p.httpCaller.CallHTTP(ctx, p.config.Endpoint, httpRequestHeaders(ctx, p.config), data) if err != nil { return nil, err } @@ -56,10 +56,10 @@ func (p *HTTPPublishProxy) Protocol() string { // UseBase64 ... func (p *HTTPPublishProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *HTTPPublishProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/refresh_grpc.go b/internal/proxy/refresh_grpc.go index d0e0b5e59b..001fa74507 100644 --- a/internal/proxy/refresh_grpc.go +++ b/internal/proxy/refresh_grpc.go @@ -12,14 +12,14 @@ import ( // GRPCRefreshProxy ... type GRPCRefreshProxy struct { - proxy Proxy + config Config client proxyproto.CentrifugoProxyClient } var _ RefreshProxy = (*GRPCRefreshProxy)(nil) // NewGRPCRefreshProxy ... -func NewGRPCRefreshProxy(p Proxy) (*GRPCRefreshProxy, error) { +func NewGRPCRefreshProxy(p Config) (*GRPCRefreshProxy, error) { host, err := getGrpcHost(p.Endpoint) if err != nil { return nil, fmt.Errorf("error getting grpc host: %v", err) @@ -35,16 +35,16 @@ func NewGRPCRefreshProxy(p Proxy) (*GRPCRefreshProxy, error) { return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) } return &GRPCRefreshProxy{ - proxy: p, + config: p, client: proxyproto.NewCentrifugoProxyClient(conn), }, nil } // ProxyRefresh proxies refresh to application backend. func (p *GRPCRefreshProxy) ProxyRefresh(ctx context.Context, req *proxyproto.RefreshRequest) (*proxyproto.RefreshResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout)) defer cancel() - return p.client.Refresh(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec)) + return p.client.Refresh(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) } // Protocol ... @@ -54,10 +54,10 @@ func (p *GRPCRefreshProxy) Protocol() string { // UseBase64 ... func (p *GRPCRefreshProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *GRPCRefreshProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/refresh_http.go b/internal/proxy/refresh_http.go index 953a97e1d3..b5741e6721 100644 --- a/internal/proxy/refresh_http.go +++ b/internal/proxy/refresh_http.go @@ -16,16 +16,16 @@ type RefreshRequestHTTP struct { // HTTPRefreshProxy ... type HTTPRefreshProxy struct { - proxy Proxy + config Config httpCaller HTTPCaller } var _ RefreshProxy = (*HTTPRefreshProxy)(nil) // NewHTTPRefreshProxy ... -func NewHTTPRefreshProxy(p Proxy) (*HTTPRefreshProxy, error) { +func NewHTTPRefreshProxy(p Config) (*HTTPRefreshProxy, error) { return &HTTPRefreshProxy{ - proxy: p, + config: p, httpCaller: NewHTTPCaller(proxyHTTPClient(time.Duration(p.Timeout))), }, nil } @@ -36,7 +36,7 @@ func (p *HTTPRefreshProxy) ProxyRefresh(ctx context.Context, req *proxyproto.Ref if err != nil { return nil, err } - respData, err := p.httpCaller.CallHTTP(ctx, p.proxy.Endpoint, httpRequestHeaders(ctx, p.proxy), data) + respData, err := p.httpCaller.CallHTTP(ctx, p.config.Endpoint, httpRequestHeaders(ctx, p.config), data) if err != nil { return nil, err } @@ -50,10 +50,10 @@ func (p *HTTPRefreshProxy) Protocol() string { // UseBase64 ... func (p *HTTPRefreshProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *HTTPRefreshProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/rpc_grpc.go b/internal/proxy/rpc_grpc.go index f05b2f2924..5e55808efd 100644 --- a/internal/proxy/rpc_grpc.go +++ b/internal/proxy/rpc_grpc.go @@ -12,14 +12,14 @@ import ( // GRPCRPCProxy ... type GRPCRPCProxy struct { - proxy Proxy + config Config client proxyproto.CentrifugoProxyClient } var _ RPCProxy = (*GRPCRPCProxy)(nil) // NewGRPCRPCProxy ... -func NewGRPCRPCProxy(p Proxy) (*GRPCRPCProxy, error) { +func NewGRPCRPCProxy(p Config) (*GRPCRPCProxy, error) { host, err := getGrpcHost(p.Endpoint) if err != nil { return nil, fmt.Errorf("error getting grpc host: %v", err) @@ -35,16 +35,16 @@ func NewGRPCRPCProxy(p Proxy) (*GRPCRPCProxy, error) { return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) } return &GRPCRPCProxy{ - proxy: p, + config: p, client: proxyproto.NewCentrifugoProxyClient(conn), }, nil } // ProxyRPC ... func (p *GRPCRPCProxy) ProxyRPC(ctx context.Context, req *proxyproto.RPCRequest) (*proxyproto.RPCResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout)) defer cancel() - return p.client.RPC(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec)) + return p.client.RPC(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) } // Protocol ... @@ -54,10 +54,10 @@ func (p *GRPCRPCProxy) Protocol() string { // UseBase64 ... func (p *GRPCRPCProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *GRPCRPCProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/rpc_http.go b/internal/proxy/rpc_http.go index 76496f6436..d78a37ed58 100644 --- a/internal/proxy/rpc_http.go +++ b/internal/proxy/rpc_http.go @@ -9,16 +9,16 @@ import ( // HTTPRPCProxy ... type HTTPRPCProxy struct { - proxy Proxy + config Config httpCaller HTTPCaller } var _ RPCProxy = (*HTTPRPCProxy)(nil) // NewHTTPRPCProxy ... -func NewHTTPRPCProxy(p Proxy) (*HTTPRPCProxy, error) { +func NewHTTPRPCProxy(p Config) (*HTTPRPCProxy, error) { return &HTTPRPCProxy{ - proxy: p, + config: p, httpCaller: NewHTTPCaller(proxyHTTPClient(time.Duration(p.Timeout))), }, nil } @@ -29,7 +29,7 @@ func (p *HTTPRPCProxy) ProxyRPC(ctx context.Context, req *proxyproto.RPCRequest) if err != nil { return nil, err } - respData, err := p.httpCaller.CallHTTP(ctx, p.proxy.Endpoint, httpRequestHeaders(ctx, p.proxy), data) + respData, err := p.httpCaller.CallHTTP(ctx, p.config.Endpoint, httpRequestHeaders(ctx, p.config), data) if err != nil { return nil, err } @@ -43,10 +43,10 @@ func (p *HTTPRPCProxy) Protocol() string { // UseBase64 ... func (p *HTTPRPCProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *HTTPRPCProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/sub_refresh_grpc.go b/internal/proxy/sub_refresh_grpc.go index bbfa127c01..4cf15ddfa2 100644 --- a/internal/proxy/sub_refresh_grpc.go +++ b/internal/proxy/sub_refresh_grpc.go @@ -12,14 +12,14 @@ import ( // GRPCSubRefreshProxy ... type GRPCSubRefreshProxy struct { - proxy Proxy + config Config client proxyproto.CentrifugoProxyClient } var _ SubRefreshProxy = (*GRPCSubRefreshProxy)(nil) // NewGRPCSubRefreshProxy ... -func NewGRPCSubRefreshProxy(p Proxy) (*GRPCSubRefreshProxy, error) { +func NewGRPCSubRefreshProxy(p Config) (*GRPCSubRefreshProxy, error) { host, err := getGrpcHost(p.Endpoint) if err != nil { return nil, fmt.Errorf("error getting grpc host: %v", err) @@ -35,16 +35,16 @@ func NewGRPCSubRefreshProxy(p Proxy) (*GRPCSubRefreshProxy, error) { return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) } return &GRPCSubRefreshProxy{ - proxy: p, + config: p, client: proxyproto.NewCentrifugoProxyClient(conn), }, nil } // ProxySubRefresh proxies refresh to application backend. func (p *GRPCSubRefreshProxy) ProxySubRefresh(ctx context.Context, req *proxyproto.SubRefreshRequest) (*proxyproto.SubRefreshResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout)) defer cancel() - return p.client.SubRefresh(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec)) + return p.client.SubRefresh(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) } // Protocol ... @@ -54,10 +54,10 @@ func (p *GRPCSubRefreshProxy) Protocol() string { // UseBase64 ... func (p *GRPCSubRefreshProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *GRPCSubRefreshProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/sub_refresh_http.go b/internal/proxy/sub_refresh_http.go index 23593c216e..4de1640c07 100644 --- a/internal/proxy/sub_refresh_http.go +++ b/internal/proxy/sub_refresh_http.go @@ -17,16 +17,16 @@ type SubRefreshRequestHTTP struct { // HTTPSubRefreshProxy ... type HTTPSubRefreshProxy struct { - proxy Proxy + config Config httpCaller HTTPCaller } var _ SubRefreshProxy = (*HTTPSubRefreshProxy)(nil) // NewHTTPSubRefreshProxy ... -func NewHTTPSubRefreshProxy(p Proxy) (*HTTPSubRefreshProxy, error) { +func NewHTTPSubRefreshProxy(p Config) (*HTTPSubRefreshProxy, error) { return &HTTPSubRefreshProxy{ - proxy: p, + config: p, httpCaller: NewHTTPCaller(proxyHTTPClient(time.Duration(p.Timeout))), }, nil } @@ -37,7 +37,7 @@ func (p *HTTPSubRefreshProxy) ProxySubRefresh(ctx context.Context, req *proxypro if err != nil { return nil, err } - respData, err := p.httpCaller.CallHTTP(ctx, p.proxy.Endpoint, httpRequestHeaders(ctx, p.proxy), data) + respData, err := p.httpCaller.CallHTTP(ctx, p.config.Endpoint, httpRequestHeaders(ctx, p.config), data) if err != nil { return nil, err } @@ -51,10 +51,10 @@ func (p *HTTPSubRefreshProxy) Protocol() string { // UseBase64 ... func (p *HTTPSubRefreshProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *HTTPSubRefreshProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/subscribe_grpc.go b/internal/proxy/subscribe_grpc.go index aa49f4c9f8..f2c64c9f07 100644 --- a/internal/proxy/subscribe_grpc.go +++ b/internal/proxy/subscribe_grpc.go @@ -12,14 +12,14 @@ import ( // GRPCSubscribeProxy ... type GRPCSubscribeProxy struct { - proxy Proxy + config Config client proxyproto.CentrifugoProxyClient } var _ SubscribeProxy = (*GRPCSubscribeProxy)(nil) // NewGRPCSubscribeProxy ... -func NewGRPCSubscribeProxy(p Proxy) (*GRPCSubscribeProxy, error) { +func NewGRPCSubscribeProxy(p Config) (*GRPCSubscribeProxy, error) { host, err := getGrpcHost(p.Endpoint) if err != nil { return nil, fmt.Errorf("error getting grpc host: %v", err) @@ -35,16 +35,16 @@ func NewGRPCSubscribeProxy(p Proxy) (*GRPCSubscribeProxy, error) { return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) } return &GRPCSubscribeProxy{ - proxy: p, + config: p, client: proxyproto.NewCentrifugoProxyClient(conn), }, nil } // ProxySubscribe proxies Subscribe to application backend. func (p *GRPCSubscribeProxy) ProxySubscribe(ctx context.Context, req *proxyproto.SubscribeRequest) (*proxyproto.SubscribeResponse, error) { - ctx, cancel := context.WithTimeout(ctx, time.Duration(p.proxy.Timeout)) + ctx, cancel := context.WithTimeout(ctx, time.Duration(p.config.Timeout)) defer cancel() - return p.client.Subscribe(grpcRequestContext(ctx, p.proxy), req, grpc.ForceCodec(grpcCodec)) + return p.client.Subscribe(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) } // Protocol ... @@ -54,10 +54,10 @@ func (p *GRPCSubscribeProxy) Protocol() string { // UseBase64 ... func (p *GRPCSubscribeProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *GRPCSubscribeProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/subscribe_http.go b/internal/proxy/subscribe_http.go index 1e1d2ccd04..6cf0276f27 100644 --- a/internal/proxy/subscribe_http.go +++ b/internal/proxy/subscribe_http.go @@ -9,16 +9,16 @@ import ( // HTTPSubscribeProxy ... type HTTPSubscribeProxy struct { - proxy Proxy + config Config httpCaller HTTPCaller } var _ SubscribeProxy = (*HTTPSubscribeProxy)(nil) // NewHTTPSubscribeProxy ... -func NewHTTPSubscribeProxy(p Proxy) (*HTTPSubscribeProxy, error) { +func NewHTTPSubscribeProxy(p Config) (*HTTPSubscribeProxy, error) { return &HTTPSubscribeProxy{ - proxy: p, + config: p, httpCaller: NewHTTPCaller(proxyHTTPClient(time.Duration(p.Timeout))), }, nil } @@ -29,7 +29,7 @@ func (p *HTTPSubscribeProxy) ProxySubscribe(ctx context.Context, req *proxyproto if err != nil { return nil, err } - respData, err := p.httpCaller.CallHTTP(ctx, p.proxy.Endpoint, httpRequestHeaders(ctx, p.proxy), data) + respData, err := p.httpCaller.CallHTTP(ctx, p.config.Endpoint, httpRequestHeaders(ctx, p.config), data) if err != nil { return nil, err } @@ -43,10 +43,10 @@ func (p *HTTPSubscribeProxy) Protocol() string { // UseBase64 ... func (p *HTTPSubscribeProxy) UseBase64() bool { - return p.proxy.BinaryEncoding + return p.config.BinaryEncoding } // IncludeMeta ... func (p *HTTPSubscribeProxy) IncludeMeta() bool { - return p.proxy.IncludeConnectionMeta + return p.config.IncludeConnectionMeta } diff --git a/internal/proxy/subscribe_stream_grpc.go b/internal/proxy/subscribe_stream_grpc.go new file mode 100644 index 0000000000..c9bb203109 --- /dev/null +++ b/internal/proxy/subscribe_stream_grpc.go @@ -0,0 +1,48 @@ +package proxy + +import ( + "context" + "fmt" + "time" + + "github.com/centrifugal/centrifugo/v5/internal/proxyproto" + + "google.golang.org/grpc" +) + +type SubscribeStreamProxy struct { + config Config + client proxyproto.CentrifugoProxyClient +} + +func NewSubscribeStreamProxy(p Config) (*SubscribeStreamProxy, error) { + host, err := getGrpcHost(p.Endpoint) + if err != nil { + return nil, fmt.Errorf("error getting grpc host: %v", err) + } + ctx, cancel := context.WithTimeout(context.Background(), time.Duration(p.Timeout)) + defer cancel() + dialOpts, err := getDialOpts(p) + if err != nil { + return nil, fmt.Errorf("error creating GRPC dial options: %v", err) + } + conn, err := grpc.DialContext(ctx, host, dialOpts...) + if err != nil { + return nil, fmt.Errorf("error connecting to GRPC proxy server: %v", err) + } + + return &SubscribeStreamProxy{ + config: p, + client: proxyproto.NewCentrifugoProxyClient(conn), + }, nil +} + +// SubscribeUnidirectional ... +func (p *SubscribeStreamProxy) SubscribeUnidirectional(ctx context.Context, req *proxyproto.SubscribeRequest) (proxyproto.CentrifugoProxy_SubscribeUnidirectionalClient, error) { + return p.client.SubscribeUnidirectional(grpcRequestContext(ctx, p.config), req, grpc.ForceCodec(grpcCodec)) +} + +// SubscribeBidirectional ... +func (p *SubscribeStreamProxy) SubscribeBidirectional(ctx context.Context) (proxyproto.CentrifugoProxy_SubscribeBidirectionalClient, error) { + return p.client.SubscribeBidirectional(grpcRequestContext(ctx, p.config), grpc.ForceCodec(grpcCodec)) +} diff --git a/internal/proxy/subscribe_stream_handler.go b/internal/proxy/subscribe_stream_handler.go new file mode 100644 index 0000000000..ad66c94be9 --- /dev/null +++ b/internal/proxy/subscribe_stream_handler.go @@ -0,0 +1,353 @@ +package proxy + +import ( + "context" + "encoding/base64" + "errors" + "io" + "sync" + "time" + + "github.com/centrifugal/centrifugo/v5/internal/proxyproto" + "github.com/centrifugal/centrifugo/v5/internal/rule" + "github.com/centrifugal/centrifugo/v5/internal/subsource" + + "github.com/centrifugal/centrifuge" + "github.com/prometheus/client_golang/prometheus" +) + +// SubscribeStreamHandlerConfig ... +type SubscribeStreamHandlerConfig struct { + Proxies map[string]*SubscribeStreamProxy + GranularProxyMode bool +} + +// SubscribeStreamHandler ... +type SubscribeStreamHandler struct { + config SubscribeStreamHandlerConfig + + summary prometheus.Observer + histogram prometheus.Observer + errors prometheus.Counter + granularSummary map[string]prometheus.Observer + granularHistogram map[string]prometheus.Observer + granularErrors map[string]prometheus.Counter +} + +// NewSubscribeStreamHandler ... +func NewSubscribeStreamHandler(c SubscribeStreamHandlerConfig) *SubscribeStreamHandler { + h := &SubscribeStreamHandler{ + config: c, + } + if h.config.GranularProxyMode { + summary := map[string]prometheus.Observer{} + histogram := map[string]prometheus.Observer{} + errCounters := map[string]prometheus.Counter{} + for k := range c.Proxies { + name := k + if name == "" { + name = "__default__" + } + summary[name] = granularProxyCallDurationSummary.WithLabelValues("subscribe_stream", name) + histogram[name] = granularProxyCallDurationHistogram.WithLabelValues("subscribe_stream", name) + errCounters[name] = granularProxyCallErrorCount.WithLabelValues("subscribe_stream", name) + } + h.granularSummary = summary + h.granularHistogram = histogram + h.granularErrors = errCounters + } else { + h.summary = proxyCallDurationSummary.WithLabelValues("grpc", "subscribe_stream") + h.histogram = proxyCallDurationHistogram.WithLabelValues("grpc", "subscribe_stream") + h.errors = proxyCallErrorCount.WithLabelValues("grpc", "subscribe_stream") + } + return h +} + +// StreamPublishFunc ... +type StreamPublishFunc func(data []byte) error + +// SubscribeStreamHandlerFunc ... +type SubscribeStreamHandlerFunc func( + Client, bool, centrifuge.SubscribeEvent, rule.ChannelOptions, PerCallData, +) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) + +// Handle ... +func (h *SubscribeStreamHandler) Handle(node *centrifuge.Node) SubscribeStreamHandlerFunc { + return func( + client Client, bidi bool, e centrifuge.SubscribeEvent, + chOpts rule.ChannelOptions, pcd PerCallData, + ) (centrifuge.SubscribeReply, StreamPublishFunc, func(), error) { + started := time.Now() + + var p *SubscribeStreamProxy + var summary prometheus.Observer + var histogram prometheus.Observer + var errCounter prometheus.Counter + + if h.config.GranularProxyMode { + proxyName := chOpts.SubscribeStreamProxyName + if proxyName == "" { + node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelInfo, "subscribe stream proxy not configured for a channel", map[string]any{"channel": e.Channel})) + return centrifuge.SubscribeReply{}, nil, nil, centrifuge.ErrorNotAvailable + } + p = h.config.Proxies[proxyName] + summary = h.granularSummary[proxyName] + histogram = h.granularHistogram[proxyName] + errCounter = h.granularErrors[proxyName] + } else { + p = h.config.Proxies[""] + summary = h.summary + histogram = h.histogram + errCounter = h.errors + } + + req := &proxyproto.SubscribeRequest{ + Client: client.ID(), + Protocol: string(client.Transport().Protocol()), + Transport: client.Transport().Name(), + Encoding: getEncoding(p.config.BinaryEncoding), + + User: client.UserID(), + Channel: e.Channel, + Token: e.Token, + } + if !p.config.BinaryEncoding { + req.Data = e.Data + } else { + req.B64Data = base64.StdEncoding.EncodeToString(e.Data) + } + if p.config.IncludeConnectionMeta && pcd.Meta != nil { + req.Meta = proxyproto.Raw(pcd.Meta) + } + + subscriptionReady := make(chan struct{}) + var subscriptionReadyOnce sync.Once + + subscribeRep, publishFunc, cancelFunc, err := p.SubscribeStream( + client.Context(), + bidi, + req, + func(pub *proxyproto.Publication, err error) { + subscriptionReadyOnce.Do(func() { + select { + case <-subscriptionReady: + case <-client.Context().Done(): + return + } + }) + select { + case <-client.Context().Done(): + return + default: + } + if err != nil { + if errors.Is(err, io.EOF) { + client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{ + Code: centrifuge.UnsubscribeCodeServer, + Reason: "server unsubscribe", + }) + return + } + client.Unsubscribe(e.Channel, centrifuge.Unsubscribe{ + Code: centrifuge.UnsubscribeCodeInsufficient, + Reason: "insufficient state", + }) + return + } + _ = client.WritePublication(e.Channel, ¢rifuge.Publication{ + Data: pub.Data, + Tags: pub.Tags, + }, centrifuge.StreamPosition{}) + }, + ) + + duration := time.Since(started).Seconds() + + if err != nil { + select { + case <-client.Context().Done(): + // Client connection already closed. + return centrifuge.SubscribeReply{}, nil, nil, centrifuge.DisconnectConnectionClosed + default: + } + summary.Observe(duration) + histogram.Observe(duration) + errCounter.Inc() + node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error from subscribe stream proxy", map[string]any{"error": err.Error()})) + //proxyCallErrorCount.WithLabelValues(proxyName, "subscribe", "internal").Inc() + return centrifuge.SubscribeReply{}, nil, nil, err + } + + summary.Observe(duration) + histogram.Observe(duration) + + if subscribeRep.Disconnect != nil { + //proxyCallErrorCount.WithLabelValues(proxyName, "subscribe", "disconnect_"+strconv.FormatUint(uint64(subscribeRep.Disconnect.Code), 10)).Inc() + return centrifuge.SubscribeReply{}, nil, nil, ¢rifuge.Disconnect{ + Code: subscribeRep.Disconnect.Code, + Reason: subscribeRep.Disconnect.Reason, + } + } + if subscribeRep.Error != nil { + //proxyCallErrorCount.WithLabelValues(proxyName, "subscribe", "error_"+strconv.FormatUint(uint64(subscribeRep.Error.Code), 10)).Inc() + return centrifuge.SubscribeReply{}, nil, nil, ¢rifuge.Error{ + Code: subscribeRep.Error.Code, + Message: subscribeRep.Error.Message, + } + } + + presence := chOpts.Presence + joinLeave := chOpts.JoinLeave + pushJoinLeave := chOpts.ForcePushJoinLeave + + var info []byte + var data []byte + var expireAt int64 + + if subscribeRep.Result != nil { + if subscribeRep.Result.B64Info != "" { + decodedInfo, err := base64.StdEncoding.DecodeString(subscribeRep.Result.B64Info) + if err != nil { + node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 info", map[string]any{"client": client.ID(), "error": err.Error()})) + return centrifuge.SubscribeReply{}, nil, nil, centrifuge.ErrorInternal + } + info = decodedInfo + } else { + info = subscribeRep.Result.Info + } + if subscribeRep.Result.B64Data != "" { + decodedData, err := base64.StdEncoding.DecodeString(subscribeRep.Result.B64Data) + if err != nil { + node.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error decoding base64 data", map[string]any{"client": client.ID(), "error": err.Error()})) + return centrifuge.SubscribeReply{}, nil, nil, centrifuge.ErrorInternal + } + data = decodedData + } else { + data = subscribeRep.Result.Data + } + + result := subscribeRep.Result + + if result.Override != nil && result.Override.Presence != nil { + presence = result.Override.Presence.Value + } + if result.Override != nil && result.Override.JoinLeave != nil { + joinLeave = result.Override.JoinLeave.Value + } + if result.Override != nil && result.Override.ForcePushJoinLeave != nil { + pushJoinLeave = result.Override.ForcePushJoinLeave.Value + } + + expireAt = result.ExpireAt + } + + return centrifuge.SubscribeReply{ + Options: centrifuge.SubscribeOptions{ + ExpireAt: expireAt, + ChannelInfo: info, + EmitPresence: presence, + EmitJoinLeave: joinLeave, + PushJoinLeave: pushJoinLeave, + EnableRecovery: false, // Not used for subscribe stream proxy. + EnablePositioning: false, // Not used for subscribe stream proxy. + Data: data, + Source: subsource.StreamProxy, + HistoryMetaTTL: 0, // Not used for subscribe stream proxy. + }, + ClientSideRefresh: true, + SubscriptionReady: subscriptionReady, + }, publishFunc, cancelFunc, nil + } +} + +type OnPublication func(pub *proxyproto.Publication, err error) + +type ChannelStreamReader interface { + Recv() (*proxyproto.StreamSubscribeResponse, error) +} + +// SubscribeStream ... +func (p *SubscribeStreamProxy) SubscribeStream( + ctx context.Context, + bidi bool, + sr *proxyproto.SubscribeRequest, + pubFunc OnPublication, +) (*proxyproto.SubscribeResponse, StreamPublishFunc, func(), error) { + ctx, cancel := context.WithCancel(ctx) + + var stream ChannelStreamReader + + var publishFunc StreamPublishFunc + + if bidi { + bidiStream, err := p.SubscribeBidirectional(ctx) + if err != nil { + cancel() + return nil, nil, nil, err + } + err = bidiStream.Send(&proxyproto.StreamSubscribeRequest{ + SubscribeRequest: sr, + }) + if err != nil { + cancel() + return nil, nil, nil, err + } + stream = bidiStream.(ChannelStreamReader) + publishFunc = func(data []byte) error { + return bidiStream.Send(&proxyproto.StreamSubscribeRequest{ + Publication: &proxyproto.Publication{ + Data: data, + }, + }) + } + } else { + var err error + stream, err = p.SubscribeUnidirectional(ctx, sr) + if err != nil { + cancel() + return nil, nil, nil, err + } + } + + firstMessageReceived := make(chan struct{}) + + go func() { + select { + case <-ctx.Done(): + cancel() + return + case <-time.After(time.Duration(p.config.Timeout)): + cancel() + return + case <-firstMessageReceived: + } + }() + + resp, err := stream.Recv() + if err != nil { + cancel() + return nil, nil, nil, err + } + close(firstMessageReceived) + + go func() { + for { + pubResp, err := stream.Recv() + if err != nil { + cancel() + pubFunc(nil, err) + return + } + pub := pubResp.GetPublication() + if pub != nil { + // TODO: better handling of unexpected nil publication. + pubFunc(pub, nil) + } + } + }() + + if resp.SubscribeResponse == nil { + return nil, nil, nil, errors.New("no subscribe response in first message from stream proxy") + } + return resp.SubscribeResponse, publishFunc, cancel, nil +} diff --git a/internal/proxy/unknown_keys.go b/internal/proxy/unknown_keys.go index 3ff5dbffe1..0c982df96f 100644 --- a/internal/proxy/unknown_keys.go +++ b/internal/proxy/unknown_keys.go @@ -17,7 +17,7 @@ func WarnUnknownProxyKeys(jsonProxies []byte) { return } for _, jsonMap := range jsonMaps { - var data Proxy + var data Config unknownKeys := tools.FindUnknownKeys(jsonMap, data) for _, key := range unknownKeys { if key == "name" { diff --git a/internal/proxyproto/proxy.pb.go b/internal/proxyproto/proxy.pb.go index 0e64070a80..8d388f82c2 100644 --- a/internal/proxyproto/proxy.pb.go +++ b/internal/proxyproto/proxy.pb.go @@ -1963,6 +1963,180 @@ func (x *SubRefreshResponse) GetDisconnect() *Disconnect { return nil } +// Publication is an event to be sent to a client. +// We intentionally make it use the same Protobuf numbers for fields as our client protocol +// Publication - for now only for consistency. +type Publication struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data Raw `protobuf:"bytes,4,opt,name=data,proto3" json:"data,omitempty"` + Tags map[string]string `protobuf:"bytes,7,rep,name=tags,proto3" json:"tags,omitempty" protobuf_key:"bytes,1,opt,name=key,proto3" protobuf_val:"bytes,2,opt,name=value,proto3"` +} + +func (x *Publication) Reset() { + *x = Publication{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[25] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *Publication) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*Publication) ProtoMessage() {} + +func (x *Publication) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[25] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use Publication.ProtoReflect.Descriptor instead. +func (*Publication) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{25} +} + +func (x *Publication) GetData() []byte { + if x != nil { + return x.Data + } + return nil +} + +func (x *Publication) GetTags() map[string]string { + if x != nil { + return x.Tags + } + return nil +} + +type StreamSubscribeRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // Centrifugo always sends this within the first message upon user subscription request. + // It's always not set in the following StreamRequest messages from Centrifugo. + SubscribeRequest *SubscribeRequest `protobuf:"bytes,1,opt,name=subscribe_request,json=subscribeRequest,proto3" json:"subscribe_request,omitempty"` + // Publication may be set when client publishes to the on-demand stream. If you are using + // bidirectional stream then Centrifugo assumes publications from client-side are allowed. + Publication *Publication `protobuf:"bytes,2,opt,name=publication,proto3" json:"publication,omitempty"` +} + +func (x *StreamSubscribeRequest) Reset() { + *x = StreamSubscribeRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[26] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamSubscribeRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamSubscribeRequest) ProtoMessage() {} + +func (x *StreamSubscribeRequest) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[26] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamSubscribeRequest.ProtoReflect.Descriptor instead. +func (*StreamSubscribeRequest) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{26} +} + +func (x *StreamSubscribeRequest) GetSubscribeRequest() *SubscribeRequest { + if x != nil { + return x.SubscribeRequest + } + return nil +} + +func (x *StreamSubscribeRequest) GetPublication() *Publication { + if x != nil { + return x.Publication + } + return nil +} + +type StreamSubscribeResponse struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + // SubscribeResponse may optionally be set in the first message from backend to Centrifugo. + SubscribeResponse *SubscribeResponse `protobuf:"bytes,1,opt,name=subscribe_response,json=subscribeResponse,proto3" json:"subscribe_response,omitempty"` + // Publication goes to client. Can't be set in the first message from backend to Centrifugo. + Publication *Publication `protobuf:"bytes,2,opt,name=publication,proto3" json:"publication,omitempty"` +} + +func (x *StreamSubscribeResponse) Reset() { + *x = StreamSubscribeResponse{} + if protoimpl.UnsafeEnabled { + mi := &file_proxy_proto_msgTypes[27] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *StreamSubscribeResponse) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*StreamSubscribeResponse) ProtoMessage() {} + +func (x *StreamSubscribeResponse) ProtoReflect() protoreflect.Message { + mi := &file_proxy_proto_msgTypes[27] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use StreamSubscribeResponse.ProtoReflect.Descriptor instead. +func (*StreamSubscribeResponse) Descriptor() ([]byte, []int) { + return file_proxy_proto_rawDescGZIP(), []int{27} +} + +func (x *StreamSubscribeResponse) GetSubscribeResponse() *SubscribeResponse { + if x != nil { + return x.SubscribeResponse + } + return nil +} + +func (x *StreamSubscribeResponse) GetPublication() *Publication { + if x != nil { + return x.Publication + } + return nil +} + var File_proxy_proto protoreflect.FileDescriptor var file_proxy_proto_rawDesc = []byte{ @@ -2271,49 +2445,103 @@ var file_proxy_proto_rawDesc = []byte{ 0x20, 0x01, 0x28, 0x0b, 0x32, 0x28, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x44, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x0a, - 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x32, 0x84, 0x05, 0x0a, 0x0f, 0x43, - 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, 0x12, 0x66, - 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x2c, 0x2e, 0x63, 0x65, 0x6e, 0x74, - 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, - 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, - 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, - 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x52, 0x65, - 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x07, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, - 0x68, 0x12, 0x2c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, - 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, - 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x2d, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, - 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x52, - 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x6c, - 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x2e, 0x2e, 0x63, 0x65, + 0x64, 0x69, 0x73, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x22, 0xc1, 0x01, 0x0a, 0x0b, 0x50, + 0x75, 0x62, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x12, 0x12, 0x0a, 0x04, 0x64, 0x61, + 0x74, 0x61, 0x18, 0x04, 0x20, 0x01, 0x28, 0x0c, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, 0x12, 0x47, + 0x0a, 0x04, 0x74, 0x61, 0x67, 0x73, 0x18, 0x07, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x33, 0x2e, 0x63, + 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, + 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x50, 0x75, 0x62, 0x6c, + 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x2e, 0x54, 0x61, 0x67, 0x73, 0x45, 0x6e, 0x74, 0x72, + 0x79, 0x52, 0x04, 0x74, 0x61, 0x67, 0x73, 0x1a, 0x37, 0x0a, 0x09, 0x54, 0x61, 0x67, 0x73, 0x45, + 0x6e, 0x74, 0x72, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x6b, 0x65, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, + 0x09, 0x52, 0x03, 0x6b, 0x65, 0x79, 0x12, 0x14, 0x0a, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x18, + 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x76, 0x61, 0x6c, 0x75, 0x65, 0x3a, 0x02, 0x38, 0x01, + 0x4a, 0x04, 0x08, 0x01, 0x10, 0x02, 0x4a, 0x04, 0x08, 0x02, 0x10, 0x03, 0x4a, 0x04, 0x08, 0x03, + 0x10, 0x04, 0x4a, 0x04, 0x08, 0x05, 0x10, 0x06, 0x4a, 0x04, 0x08, 0x06, 0x10, 0x07, 0x22, 0xc2, + 0x01, 0x0a, 0x16, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x5b, 0x0a, 0x11, 0x73, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5f, 0x72, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x18, 0x01, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2e, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, + 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, + 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, + 0x75, 0x65, 0x73, 0x74, 0x52, 0x10, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x4b, 0x0a, 0x0b, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, + 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2f, 0x2e, 0x63, 0x65, + 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, + 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x0b, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x61, 0x74, + 0x69, 0x6f, 0x6e, 0x22, 0xc6, 0x01, 0x0a, 0x17, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x5e, 0x0a, 0x12, 0x73, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x5f, 0x72, 0x65, 0x73, + 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x2f, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x75, 0x62, 0x73, 0x63, - 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x07, - 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x2c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, + 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x52, 0x11, 0x73, 0x75, + 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, + 0x4b, 0x0a, 0x0b, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x0b, 0x32, 0x29, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, + 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, + 0x6f, 0x78, 0x79, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, + 0x0b, 0x70, 0x75, 0x62, 0x6c, 0x69, 0x63, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x32, 0x95, 0x07, 0x0a, + 0x0f, 0x43, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x50, 0x72, 0x6f, 0x78, 0x79, + 0x12, 0x66, 0x0a, 0x07, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x12, 0x2c, 0x2e, 0x63, 0x65, + 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, + 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, + 0x63, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, + 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, + 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, 0x0a, 0x07, 0x52, 0x65, 0x66, 0x72, + 0x65, 0x73, 0x68, 0x12, 0x2c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, + 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, + 0x78, 0x79, 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, + 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, + 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x12, 0x6c, 0x0a, 0x09, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x12, 0x2e, 0x2e, + 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2f, 0x2e, + 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x66, + 0x0a, 0x07, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x12, 0x2c, 0x2e, 0x63, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, + 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x2d, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, - 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, - 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x50, 0x75, 0x62, 0x6c, 0x69, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, - 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5a, 0x0a, 0x03, 0x52, 0x50, 0x43, 0x12, 0x28, 0x2e, 0x63, 0x65, - 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, - 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x52, 0x50, 0x43, 0x52, 0x65, - 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, - 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, - 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x52, 0x50, 0x43, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, - 0x12, 0x6f, 0x0a, 0x0a, 0x53, 0x75, 0x62, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x12, 0x2f, - 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, - 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x75, - 0x62, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, - 0x30, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x5a, 0x0a, 0x03, 0x52, 0x50, 0x43, 0x12, 0x28, 0x2e, + 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x52, 0x50, 0x43, + 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x29, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, + 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, + 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x52, 0x50, 0x43, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, + 0x73, 0x65, 0x12, 0x6f, 0x0a, 0x0a, 0x53, 0x75, 0x62, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, + 0x12, 0x2f, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, + 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, + 0x53, 0x75, 0x62, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, + 0x74, 0x1a, 0x30, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, + 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, + 0x2e, 0x53, 0x75, 0x62, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, + 0x6e, 0x73, 0x65, 0x12, 0x82, 0x01, 0x0a, 0x17, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, + 0x65, 0x55, 0x6e, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x61, 0x6c, 0x12, + 0x2e, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, - 0x75, 0x62, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, - 0x65, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x70, 0x72, 0x6f, - 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, + 0x35, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, + 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, + 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x30, 0x01, 0x12, 0x89, 0x01, 0x0a, 0x16, 0x53, 0x75, 0x62, + 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x42, 0x69, 0x64, 0x69, 0x72, 0x65, 0x63, 0x74, 0x69, 0x6f, + 0x6e, 0x61, 0x6c, 0x12, 0x34, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, + 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, + 0x78, 0x79, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, + 0x62, 0x65, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x35, 0x2e, 0x63, 0x65, 0x6e, 0x74, + 0x72, 0x69, 0x66, 0x75, 0x67, 0x61, 0x6c, 0x2e, 0x63, 0x65, 0x6e, 0x74, 0x72, 0x69, 0x66, 0x75, + 0x67, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x78, 0x79, 0x2e, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x53, + 0x75, 0x62, 0x73, 0x63, 0x72, 0x69, 0x62, 0x65, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, + 0x28, 0x01, 0x30, 0x01, 0x42, 0x0f, 0x5a, 0x0d, 0x2e, 0x2f, 0x3b, 0x70, 0x72, 0x6f, 0x78, 0x79, + 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( @@ -2328,7 +2556,7 @@ func file_proxy_proto_rawDescGZIP() []byte { return file_proxy_proto_rawDescData } -var file_proxy_proto_msgTypes = make([]protoimpl.MessageInfo, 26) +var file_proxy_proto_msgTypes = make([]protoimpl.MessageInfo, 30) var file_proxy_proto_goTypes = []interface{}{ (*Disconnect)(nil), // 0: centrifugal.centrifugo.proxy.Disconnect (*Error)(nil), // 1: centrifugal.centrifugo.proxy.Error @@ -2355,11 +2583,15 @@ var file_proxy_proto_goTypes = []interface{}{ (*SubRefreshRequest)(nil), // 22: centrifugal.centrifugo.proxy.SubRefreshRequest (*SubRefreshResult)(nil), // 23: centrifugal.centrifugo.proxy.SubRefreshResult (*SubRefreshResponse)(nil), // 24: centrifugal.centrifugo.proxy.SubRefreshResponse - nil, // 25: centrifugal.centrifugo.proxy.ConnectResult.SubsEntry + (*Publication)(nil), // 25: centrifugal.centrifugo.proxy.Publication + (*StreamSubscribeRequest)(nil), // 26: centrifugal.centrifugo.proxy.StreamSubscribeRequest + (*StreamSubscribeResponse)(nil), // 27: centrifugal.centrifugo.proxy.StreamSubscribeResponse + nil, // 28: centrifugal.centrifugo.proxy.ConnectResult.SubsEntry + nil, // 29: centrifugal.centrifugo.proxy.Publication.TagsEntry } var file_proxy_proto_depIdxs = []int32{ 13, // 0: centrifugal.centrifugo.proxy.SubscribeOptions.override:type_name -> centrifugal.centrifugo.proxy.SubscribeOptionOverride - 25, // 1: centrifugal.centrifugo.proxy.ConnectResult.subs:type_name -> centrifugal.centrifugo.proxy.ConnectResult.SubsEntry + 28, // 1: centrifugal.centrifugo.proxy.ConnectResult.subs:type_name -> centrifugal.centrifugo.proxy.ConnectResult.SubsEntry 5, // 2: centrifugal.centrifugo.proxy.ConnectResult.caps:type_name -> centrifugal.centrifugo.proxy.ChannelsCapability 4, // 3: centrifugal.centrifugo.proxy.ConnectResponse.result:type_name -> centrifugal.centrifugo.proxy.ConnectResult 1, // 4: centrifugal.centrifugo.proxy.ConnectResponse.error:type_name -> centrifugal.centrifugo.proxy.Error @@ -2386,24 +2618,33 @@ var file_proxy_proto_depIdxs = []int32{ 23, // 25: centrifugal.centrifugo.proxy.SubRefreshResponse.result:type_name -> centrifugal.centrifugo.proxy.SubRefreshResult 1, // 26: centrifugal.centrifugo.proxy.SubRefreshResponse.error:type_name -> centrifugal.centrifugo.proxy.Error 0, // 27: centrifugal.centrifugo.proxy.SubRefreshResponse.disconnect:type_name -> centrifugal.centrifugo.proxy.Disconnect - 3, // 28: centrifugal.centrifugo.proxy.ConnectResult.SubsEntry.value:type_name -> centrifugal.centrifugo.proxy.SubscribeOptions - 2, // 29: centrifugal.centrifugo.proxy.CentrifugoProxy.Connect:input_type -> centrifugal.centrifugo.proxy.ConnectRequest - 7, // 30: centrifugal.centrifugo.proxy.CentrifugoProxy.Refresh:input_type -> centrifugal.centrifugo.proxy.RefreshRequest - 10, // 31: centrifugal.centrifugo.proxy.CentrifugoProxy.Subscribe:input_type -> centrifugal.centrifugo.proxy.SubscribeRequest - 16, // 32: centrifugal.centrifugo.proxy.CentrifugoProxy.Publish:input_type -> centrifugal.centrifugo.proxy.PublishRequest - 19, // 33: centrifugal.centrifugo.proxy.CentrifugoProxy.RPC:input_type -> centrifugal.centrifugo.proxy.RPCRequest - 22, // 34: centrifugal.centrifugo.proxy.CentrifugoProxy.SubRefresh:input_type -> centrifugal.centrifugo.proxy.SubRefreshRequest - 6, // 35: centrifugal.centrifugo.proxy.CentrifugoProxy.Connect:output_type -> centrifugal.centrifugo.proxy.ConnectResponse - 9, // 36: centrifugal.centrifugo.proxy.CentrifugoProxy.Refresh:output_type -> centrifugal.centrifugo.proxy.RefreshResponse - 15, // 37: centrifugal.centrifugo.proxy.CentrifugoProxy.Subscribe:output_type -> centrifugal.centrifugo.proxy.SubscribeResponse - 18, // 38: centrifugal.centrifugo.proxy.CentrifugoProxy.Publish:output_type -> centrifugal.centrifugo.proxy.PublishResponse - 21, // 39: centrifugal.centrifugo.proxy.CentrifugoProxy.RPC:output_type -> centrifugal.centrifugo.proxy.RPCResponse - 24, // 40: centrifugal.centrifugo.proxy.CentrifugoProxy.SubRefresh:output_type -> centrifugal.centrifugo.proxy.SubRefreshResponse - 35, // [35:41] is the sub-list for method output_type - 29, // [29:35] is the sub-list for method input_type - 29, // [29:29] is the sub-list for extension type_name - 29, // [29:29] is the sub-list for extension extendee - 0, // [0:29] is the sub-list for field type_name + 29, // 28: centrifugal.centrifugo.proxy.Publication.tags:type_name -> centrifugal.centrifugo.proxy.Publication.TagsEntry + 10, // 29: centrifugal.centrifugo.proxy.StreamSubscribeRequest.subscribe_request:type_name -> centrifugal.centrifugo.proxy.SubscribeRequest + 25, // 30: centrifugal.centrifugo.proxy.StreamSubscribeRequest.publication:type_name -> centrifugal.centrifugo.proxy.Publication + 15, // 31: centrifugal.centrifugo.proxy.StreamSubscribeResponse.subscribe_response:type_name -> centrifugal.centrifugo.proxy.SubscribeResponse + 25, // 32: centrifugal.centrifugo.proxy.StreamSubscribeResponse.publication:type_name -> centrifugal.centrifugo.proxy.Publication + 3, // 33: centrifugal.centrifugo.proxy.ConnectResult.SubsEntry.value:type_name -> centrifugal.centrifugo.proxy.SubscribeOptions + 2, // 34: centrifugal.centrifugo.proxy.CentrifugoProxy.Connect:input_type -> centrifugal.centrifugo.proxy.ConnectRequest + 7, // 35: centrifugal.centrifugo.proxy.CentrifugoProxy.Refresh:input_type -> centrifugal.centrifugo.proxy.RefreshRequest + 10, // 36: centrifugal.centrifugo.proxy.CentrifugoProxy.Subscribe:input_type -> centrifugal.centrifugo.proxy.SubscribeRequest + 16, // 37: centrifugal.centrifugo.proxy.CentrifugoProxy.Publish:input_type -> centrifugal.centrifugo.proxy.PublishRequest + 19, // 38: centrifugal.centrifugo.proxy.CentrifugoProxy.RPC:input_type -> centrifugal.centrifugo.proxy.RPCRequest + 22, // 39: centrifugal.centrifugo.proxy.CentrifugoProxy.SubRefresh:input_type -> centrifugal.centrifugo.proxy.SubRefreshRequest + 10, // 40: centrifugal.centrifugo.proxy.CentrifugoProxy.SubscribeUnidirectional:input_type -> centrifugal.centrifugo.proxy.SubscribeRequest + 26, // 41: centrifugal.centrifugo.proxy.CentrifugoProxy.SubscribeBidirectional:input_type -> centrifugal.centrifugo.proxy.StreamSubscribeRequest + 6, // 42: centrifugal.centrifugo.proxy.CentrifugoProxy.Connect:output_type -> centrifugal.centrifugo.proxy.ConnectResponse + 9, // 43: centrifugal.centrifugo.proxy.CentrifugoProxy.Refresh:output_type -> centrifugal.centrifugo.proxy.RefreshResponse + 15, // 44: centrifugal.centrifugo.proxy.CentrifugoProxy.Subscribe:output_type -> centrifugal.centrifugo.proxy.SubscribeResponse + 18, // 45: centrifugal.centrifugo.proxy.CentrifugoProxy.Publish:output_type -> centrifugal.centrifugo.proxy.PublishResponse + 21, // 46: centrifugal.centrifugo.proxy.CentrifugoProxy.RPC:output_type -> centrifugal.centrifugo.proxy.RPCResponse + 24, // 47: centrifugal.centrifugo.proxy.CentrifugoProxy.SubRefresh:output_type -> centrifugal.centrifugo.proxy.SubRefreshResponse + 27, // 48: centrifugal.centrifugo.proxy.CentrifugoProxy.SubscribeUnidirectional:output_type -> centrifugal.centrifugo.proxy.StreamSubscribeResponse + 27, // 49: centrifugal.centrifugo.proxy.CentrifugoProxy.SubscribeBidirectional:output_type -> centrifugal.centrifugo.proxy.StreamSubscribeResponse + 42, // [42:50] is the sub-list for method output_type + 34, // [34:42] is the sub-list for method input_type + 34, // [34:34] is the sub-list for extension type_name + 34, // [34:34] is the sub-list for extension extendee + 0, // [0:34] is the sub-list for field type_name } func init() { file_proxy_proto_init() } @@ -2712,6 +2953,42 @@ func file_proxy_proto_init() { return nil } } + file_proxy_proto_msgTypes[25].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*Publication); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[26].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamSubscribeRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_proxy_proto_msgTypes[27].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*StreamSubscribeResponse); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } } type x struct{} out := protoimpl.TypeBuilder{ @@ -2719,7 +2996,7 @@ func file_proxy_proto_init() { GoPackagePath: reflect.TypeOf(x{}).PkgPath(), RawDescriptor: file_proxy_proto_rawDesc, NumEnums: 0, - NumMessages: 26, + NumMessages: 30, NumExtensions: 0, NumServices: 1, }, diff --git a/internal/proxyproto/proxy.proto b/internal/proxyproto/proxy.proto index bf8d021393..3f4a8b4ec9 100644 --- a/internal/proxyproto/proxy.proto +++ b/internal/proxyproto/proxy.proto @@ -5,12 +5,28 @@ package centrifugal.centrifugo.proxy; option go_package = "./;proxyproto"; service CentrifugoProxy { + // Connect to proxy connection authentication and communicate initial state. rpc Connect(ConnectRequest) returns (ConnectResponse); + // Refresh to proxy decision about connection expiration to the app backend. rpc Refresh(RefreshRequest) returns (RefreshResponse); + // Subscribe to proxy subscription attempts to channels. rpc Subscribe(SubscribeRequest) returns (SubscribeResponse); + // Publish to proxy publication attempts to channels. rpc Publish(PublishRequest) returns (PublishResponse); + // RPC to execute custom logic on the backend over request through the real-time connection. rpc RPC(RPCRequest) returns (RPCResponse); + // SubRefresh to proxy decision about subscription expiration to the app backend. rpc SubRefresh(SubRefreshRequest) returns (SubRefreshResponse); + // SubscribeUnidirectional is an EXPERIMENTAL method which allows handling unidirectional + // subscription streams. Stream starts with SubscribeRequest similar to Subscribe rpc, + // then expects StreamSubscribeResponse with SubscribeResponse as first message, and + // StreamSubscribeResponse with Publication afterwards. + rpc SubscribeUnidirectional(SubscribeRequest) returns (stream StreamSubscribeResponse); + // SubscribeBidirectional is an EXPERIMENTAL method which allows handling bidirectional + // subscription streams. Stream receives StreamSubscribeRequest. First StreamSubscribeRequest + // always contains SubscribeRequest, then StreamSubscribeRequest will contain data published + // by client. Reverse direction works the same way as in SubscribeUnidirectional. + rpc SubscribeBidirectional(stream StreamSubscribeRequest) returns (stream StreamSubscribeResponse); } message Disconnect { @@ -215,3 +231,28 @@ message SubRefreshResponse { Error error = 2; Disconnect disconnect = 3; } + +// Publication is an event to be sent to a client. +// We intentionally make it use the same Protobuf numbers for fields as our client protocol +// Publication - for now only for consistency. +message Publication { + reserved 1, 2, 3, 5, 6; + bytes data = 4; + map tags = 7; +} + +message StreamSubscribeRequest { + // Centrifugo always sends this within the first message upon user subscription request. + // It's always not set in the following StreamRequest messages from Centrifugo. + SubscribeRequest subscribe_request = 1; + // Publication may be set when client publishes to the on-demand stream. If you are using + // bidirectional stream then Centrifugo assumes publications from client-side are allowed. + Publication publication = 2; +} + +message StreamSubscribeResponse { + // SubscribeResponse may optionally be set in the first message from backend to Centrifugo. + SubscribeResponse subscribe_response = 1; + // Publication goes to client. Can't be set in the first message from backend to Centrifugo. + Publication publication = 2; +} diff --git a/internal/proxyproto/proxy_grpc.pb.go b/internal/proxyproto/proxy_grpc.pb.go index 3bf8db2a8c..f9cde80351 100644 --- a/internal/proxyproto/proxy_grpc.pb.go +++ b/internal/proxyproto/proxy_grpc.pb.go @@ -18,12 +18,28 @@ const _ = grpc.SupportPackageIsVersion7 // // For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. type CentrifugoProxyClient interface { + // Connect to proxy connection authentication and communicate initial state. Connect(ctx context.Context, in *ConnectRequest, opts ...grpc.CallOption) (*ConnectResponse, error) + // Refresh to proxy decision about connection expiration to the app backend. Refresh(ctx context.Context, in *RefreshRequest, opts ...grpc.CallOption) (*RefreshResponse, error) + // Subscribe to proxy subscription attempts to channels. Subscribe(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (*SubscribeResponse, error) + // Publish to proxy publication attempts to channels. Publish(ctx context.Context, in *PublishRequest, opts ...grpc.CallOption) (*PublishResponse, error) + // RPC to execute custom logic on the backend over request through the real-time connection. RPC(ctx context.Context, in *RPCRequest, opts ...grpc.CallOption) (*RPCResponse, error) + // SubRefresh to proxy decision about subscription expiration to the app backend. SubRefresh(ctx context.Context, in *SubRefreshRequest, opts ...grpc.CallOption) (*SubRefreshResponse, error) + // SubscribeUnidirectional is an EXPERIMENTAL method which allows handling unidirectional + // subscription streams. Stream starts with SubscribeRequest similar to Subscribe rpc, + // then expects StreamSubscribeResponse with SubscribeResponse as first message, and + // StreamSubscribeResponse with Publication afterwards. + SubscribeUnidirectional(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (CentrifugoProxy_SubscribeUnidirectionalClient, error) + // SubscribeBidirectional is an EXPERIMENTAL method which allows handling bidirectional + // subscription streams. Stream receives StreamSubscribeRequest. First StreamSubscribeRequest + // always contains SubscribeRequest, then StreamSubscribeRequest will contain data published + // by client. Reverse direction works the same way as in SubscribeUnidirectional. + SubscribeBidirectional(ctx context.Context, opts ...grpc.CallOption) (CentrifugoProxy_SubscribeBidirectionalClient, error) } type centrifugoProxyClient struct { @@ -88,16 +104,95 @@ func (c *centrifugoProxyClient) SubRefresh(ctx context.Context, in *SubRefreshRe return out, nil } +func (c *centrifugoProxyClient) SubscribeUnidirectional(ctx context.Context, in *SubscribeRequest, opts ...grpc.CallOption) (CentrifugoProxy_SubscribeUnidirectionalClient, error) { + stream, err := c.cc.NewStream(ctx, &CentrifugoProxy_ServiceDesc.Streams[0], "/centrifugal.centrifugo.proxy.CentrifugoProxy/SubscribeUnidirectional", opts...) + if err != nil { + return nil, err + } + x := ¢rifugoProxySubscribeUnidirectionalClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type CentrifugoProxy_SubscribeUnidirectionalClient interface { + Recv() (*StreamSubscribeResponse, error) + grpc.ClientStream +} + +type centrifugoProxySubscribeUnidirectionalClient struct { + grpc.ClientStream +} + +func (x *centrifugoProxySubscribeUnidirectionalClient) Recv() (*StreamSubscribeResponse, error) { + m := new(StreamSubscribeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *centrifugoProxyClient) SubscribeBidirectional(ctx context.Context, opts ...grpc.CallOption) (CentrifugoProxy_SubscribeBidirectionalClient, error) { + stream, err := c.cc.NewStream(ctx, &CentrifugoProxy_ServiceDesc.Streams[1], "/centrifugal.centrifugo.proxy.CentrifugoProxy/SubscribeBidirectional", opts...) + if err != nil { + return nil, err + } + x := ¢rifugoProxySubscribeBidirectionalClient{stream} + return x, nil +} + +type CentrifugoProxy_SubscribeBidirectionalClient interface { + Send(*StreamSubscribeRequest) error + Recv() (*StreamSubscribeResponse, error) + grpc.ClientStream +} + +type centrifugoProxySubscribeBidirectionalClient struct { + grpc.ClientStream +} + +func (x *centrifugoProxySubscribeBidirectionalClient) Send(m *StreamSubscribeRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *centrifugoProxySubscribeBidirectionalClient) Recv() (*StreamSubscribeResponse, error) { + m := new(StreamSubscribeResponse) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // CentrifugoProxyServer is the server API for CentrifugoProxy service. // All implementations must embed UnimplementedCentrifugoProxyServer // for forward compatibility type CentrifugoProxyServer interface { + // Connect to proxy connection authentication and communicate initial state. Connect(context.Context, *ConnectRequest) (*ConnectResponse, error) + // Refresh to proxy decision about connection expiration to the app backend. Refresh(context.Context, *RefreshRequest) (*RefreshResponse, error) + // Subscribe to proxy subscription attempts to channels. Subscribe(context.Context, *SubscribeRequest) (*SubscribeResponse, error) + // Publish to proxy publication attempts to channels. Publish(context.Context, *PublishRequest) (*PublishResponse, error) + // RPC to execute custom logic on the backend over request through the real-time connection. RPC(context.Context, *RPCRequest) (*RPCResponse, error) + // SubRefresh to proxy decision about subscription expiration to the app backend. SubRefresh(context.Context, *SubRefreshRequest) (*SubRefreshResponse, error) + // SubscribeUnidirectional is an EXPERIMENTAL method which allows handling unidirectional + // subscription streams. Stream starts with SubscribeRequest similar to Subscribe rpc, + // then expects StreamSubscribeResponse with SubscribeResponse as first message, and + // StreamSubscribeResponse with Publication afterwards. + SubscribeUnidirectional(*SubscribeRequest, CentrifugoProxy_SubscribeUnidirectionalServer) error + // SubscribeBidirectional is an EXPERIMENTAL method which allows handling bidirectional + // subscription streams. Stream receives StreamSubscribeRequest. First StreamSubscribeRequest + // always contains SubscribeRequest, then StreamSubscribeRequest will contain data published + // by client. Reverse direction works the same way as in SubscribeUnidirectional. + SubscribeBidirectional(CentrifugoProxy_SubscribeBidirectionalServer) error mustEmbedUnimplementedCentrifugoProxyServer() } @@ -123,6 +218,12 @@ func (UnimplementedCentrifugoProxyServer) RPC(context.Context, *RPCRequest) (*RP func (UnimplementedCentrifugoProxyServer) SubRefresh(context.Context, *SubRefreshRequest) (*SubRefreshResponse, error) { return nil, status.Errorf(codes.Unimplemented, "method SubRefresh not implemented") } +func (UnimplementedCentrifugoProxyServer) SubscribeUnidirectional(*SubscribeRequest, CentrifugoProxy_SubscribeUnidirectionalServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeUnidirectional not implemented") +} +func (UnimplementedCentrifugoProxyServer) SubscribeBidirectional(CentrifugoProxy_SubscribeBidirectionalServer) error { + return status.Errorf(codes.Unimplemented, "method SubscribeBidirectional not implemented") +} func (UnimplementedCentrifugoProxyServer) mustEmbedUnimplementedCentrifugoProxyServer() {} // UnsafeCentrifugoProxyServer may be embedded to opt out of forward compatibility for this service. @@ -244,6 +345,53 @@ func _CentrifugoProxy_SubRefresh_Handler(srv interface{}, ctx context.Context, d return interceptor(ctx, in, info, handler) } +func _CentrifugoProxy_SubscribeUnidirectional_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(SubscribeRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(CentrifugoProxyServer).SubscribeUnidirectional(m, ¢rifugoProxySubscribeUnidirectionalServer{stream}) +} + +type CentrifugoProxy_SubscribeUnidirectionalServer interface { + Send(*StreamSubscribeResponse) error + grpc.ServerStream +} + +type centrifugoProxySubscribeUnidirectionalServer struct { + grpc.ServerStream +} + +func (x *centrifugoProxySubscribeUnidirectionalServer) Send(m *StreamSubscribeResponse) error { + return x.ServerStream.SendMsg(m) +} + +func _CentrifugoProxy_SubscribeBidirectional_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(CentrifugoProxyServer).SubscribeBidirectional(¢rifugoProxySubscribeBidirectionalServer{stream}) +} + +type CentrifugoProxy_SubscribeBidirectionalServer interface { + Send(*StreamSubscribeResponse) error + Recv() (*StreamSubscribeRequest, error) + grpc.ServerStream +} + +type centrifugoProxySubscribeBidirectionalServer struct { + grpc.ServerStream +} + +func (x *centrifugoProxySubscribeBidirectionalServer) Send(m *StreamSubscribeResponse) error { + return x.ServerStream.SendMsg(m) +} + +func (x *centrifugoProxySubscribeBidirectionalServer) Recv() (*StreamSubscribeRequest, error) { + m := new(StreamSubscribeRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + // CentrifugoProxy_ServiceDesc is the grpc.ServiceDesc for CentrifugoProxy service. // It's only intended for direct use with grpc.RegisterService, // and not to be introspected or modified (even as a copy) @@ -276,6 +424,18 @@ var CentrifugoProxy_ServiceDesc = grpc.ServiceDesc{ Handler: _CentrifugoProxy_SubRefresh_Handler, }, }, - Streams: []grpc.StreamDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "SubscribeUnidirectional", + Handler: _CentrifugoProxy_SubscribeUnidirectional_Handler, + ServerStreams: true, + }, + { + StreamName: "SubscribeBidirectional", + Handler: _CentrifugoProxy_SubscribeBidirectional_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, Metadata: "proxy.proto", } diff --git a/internal/rule/namespace.go b/internal/rule/namespace.go index 9a51cc8e6f..addefe7d1f 100644 --- a/internal/rule/namespace.go +++ b/internal/rule/namespace.go @@ -132,5 +132,14 @@ type ChannelOptions struct { // SubRefreshProxyName of proxy to use for sub refresh operations in namespace. SubRefreshProxyName string `mapstructure:"sub_refresh_proxy_name" json:"sub_refresh_proxy_name"` + // ProxySubscribeStream enables using subscription stream proxy for the namespace. + ProxySubscribeStream bool `mapstructure:"proxy_subscribe_stream" json:"proxy_subscribe_stream"` + + // ProxySubscribeStreamBidirectional enables using bidirectional stream proxy for the namespace. + ProxySubscribeStreamBidirectional bool `mapstructure:"proxy_subscribe_stream_bidirectional" json:"proxy_subscribe_stream_bidirectional"` + + // SubscribeStreamProxyName of proxy to use for subscribe stream operations in namespace. + SubscribeStreamProxyName string `mapstructure:"subscribe_stream_proxy_name" json:"subscribe_stream_proxy_name"` + Compiled } diff --git a/internal/rule/rule.go b/internal/rule/rule.go index 6873067a72..f480580037 100644 --- a/internal/rule/rule.go +++ b/internal/rule/rule.go @@ -139,6 +139,9 @@ func ValidateChannelOptions(c ChannelOptions) error { return fmt.Errorf("invalid channel regex %s: %w", c.ChannelRegex, err) } } + if (c.ProxySubscribeStream || c.SubscribeStreamProxyName != "") && (c.ProxySubscribe || c.ProxyPublish || c.ProxySubRefresh) { + return fmt.Errorf("can't use subscribe stream proxy together with subscribe, publish or sub refresh proxies") + } return nil } diff --git a/internal/subsource/source.go b/internal/subsource/source.go index f3931ce608..e07b020ff5 100644 --- a/internal/subsource/source.go +++ b/internal/subsource/source.go @@ -15,4 +15,6 @@ const ( UserPersonal = 9 ServerAPI = 10 ConnectionCap = 11 + Expression = 12 + StreamProxy = 13 ) diff --git a/internal/tools/test_helpers.go b/internal/tools/test_helpers.go index 50eded23e4..eebe4587fe 100644 --- a/internal/tools/test_helpers.go +++ b/internal/tools/test_helpers.go @@ -215,6 +215,21 @@ type TestClientMock struct { storage map[string]any } +func (m *TestClientMock) Send(bytes []byte) error { + //TODO implement me + panic("implement me") +} + +func (m *TestClientMock) Disconnect(disconnect ...centrifuge.Disconnect) { + //TODO implement me + panic("implement me") +} + +func (m *TestClientMock) WritePublication(channel string, publication *centrifuge.Publication, sp centrifuge.StreamPosition) error { + //TODO implement me + panic("implement me") +} + func (m *TestClientMock) ID() string { if m.IDFunc != nil { return m.IDFunc() @@ -260,3 +275,8 @@ func (m *TestClientMock) AcquireStorage() (map[string]any, func(map[string]any)) m.storageMu.Unlock() } } + +func (m *TestClientMock) Unsubscribe(ch string, unsubscribe ...centrifuge.Unsubscribe) { + //TODO implement me + panic("implement me") +} diff --git a/main.go b/main.go index d4c8d97c70..ba1e214073 100644 --- a/main.go +++ b/main.go @@ -253,18 +253,22 @@ var defaults = map[string]any{ "refresh_proxy_name": "", "rpc_proxy_name": "", - "proxy_connect_endpoint": "", - "proxy_refresh_endpoint": "", - "proxy_subscribe_endpoint": "", - "proxy_publish_endpoint": "", - "proxy_sub_refresh_endpoint": "", - "proxy_rpc_endpoint": "", - "proxy_connect_timeout": time.Second, - "proxy_rpc_timeout": time.Second, - "proxy_refresh_timeout": time.Second, - "proxy_subscribe_timeout": time.Second, - "proxy_publish_timeout": time.Second, - "proxy_sub_refresh_timeout": time.Second, + "proxy_connect_endpoint": "", + "proxy_refresh_endpoint": "", + "proxy_subscribe_endpoint": "", + "proxy_publish_endpoint": "", + "proxy_sub_refresh_endpoint": "", + "proxy_rpc_endpoint": "", + "proxy_subscribe_stream_endpoint": "", + + "proxy_connect_timeout": time.Second, + "proxy_rpc_timeout": time.Second, + "proxy_refresh_timeout": time.Second, + "proxy_subscribe_timeout": time.Second, + "proxy_publish_timeout": time.Second, + "proxy_sub_refresh_timeout": time.Second, + "proxy_subscribe_stream_timeout": time.Second, + "proxy_grpc_metadata": []string{}, "proxy_http_headers": []string{}, "proxy_static_http_headers": map[string]string{}, @@ -530,9 +534,9 @@ func main() { proxyMap, proxyEnabled = proxyMapConfig() } - nodeConfig := nodeConfig(build.Version) + nodeCfg := nodeConfig(build.Version) - node, err := centrifuge.New(nodeConfig) + node, err := centrifuge.New(nodeCfg) if err != nil { log.Fatal().Msgf("error creating Centrifuge Node: %v", err) } @@ -747,7 +751,7 @@ func main() { exporter = graphite.New(graphite.Config{ Address: net.JoinHostPort(viper.GetString("graphite_host"), strconv.Itoa(viper.GetInt("graphite_port"))), Gatherer: prometheus.DefaultGatherer, - Prefix: strings.TrimSuffix(viper.GetString("graphite_prefix"), ".") + "." + graphite.PreparePathComponent(nodeConfig.Name), + Prefix: strings.TrimSuffix(viper.GetString("graphite_prefix"), ".") + "." + graphite.PreparePathComponent(nodeCfg.Name), Interval: GetDuration("graphite_interval"), Tags: viper.GetBool("graphite_tags"), }) @@ -1593,7 +1597,6 @@ func ruleConfig() rule.Config { cfg.AllowPositioning = v.GetBool("allow_positioning") cfg.AllowRecovery = v.GetBool("allow_recovery") cfg.ForceRecovery = v.GetBool("force_recovery") - cfg.AllowRecovery = v.GetBool("allow_recovery") cfg.SubscribeForAnonymous = v.GetBool("allow_subscribe_for_anonymous") cfg.SubscribeForClient = v.GetBool("allow_subscribe_for_client") cfg.PublishForAnonymous = v.GetBool("allow_publish_for_anonymous") @@ -1613,6 +1616,8 @@ func ruleConfig() rule.Config { cfg.SubscribeProxyName = v.GetString("subscribe_proxy_name") cfg.PublishProxyName = v.GetString("publish_proxy_name") cfg.SubRefreshProxyName = v.GetString("sub_refresh_proxy_name") + cfg.ProxySubscribeStream = v.GetBool("proxy_stream_subscribe") + cfg.ProxySubscribeStreamBidirectional = v.GetBool("proxy_subscribe_stream_bidirectional") cfg.Namespaces = namespacesFromConfig(v) @@ -1721,31 +1726,34 @@ func GetDuration(key string, secondsPrecision ...bool) time.Duration { func proxyMapConfig() (*client.ProxyMap, bool) { v := viper.GetViper() + proxyMap := &client.ProxyMap{ - SubscribeProxies: map[string]proxy.SubscribeProxy{}, - PublishProxies: map[string]proxy.PublishProxy{}, - RpcProxies: map[string]proxy.RPCProxy{}, - SubRefreshProxies: map[string]proxy.SubRefreshProxy{}, + SubscribeProxies: map[string]proxy.SubscribeProxy{}, + PublishProxies: map[string]proxy.PublishProxy{}, + RpcProxies: map[string]proxy.RPCProxy{}, + SubRefreshProxies: map[string]proxy.SubRefreshProxy{}, + SubscribeStreamProxies: map[string]*proxy.SubscribeStreamProxy{}, + } + + proxyConfig := proxy.Config{ + BinaryEncoding: v.GetBool("proxy_binary_encoding"), + IncludeConnectionMeta: v.GetBool("proxy_include_connection_meta"), + GrpcCertFile: v.GetString("proxy_grpc_cert_file"), + GrpcCredentialsKey: v.GetString("proxy_grpc_credentials_key"), + GrpcCredentialsValue: v.GetString("proxy_grpc_credentials_value"), + GrpcMetadata: v.GetStringSlice("proxy_grpc_metadata"), } - p := proxy.Proxy{} - p.GrpcMetadata = v.GetStringSlice("proxy_grpc_metadata") - p.HttpHeaders = v.GetStringSlice("proxy_http_headers") - for i, header := range p.HttpHeaders { - p.HttpHeaders[i] = strings.ToLower(header) + proxyConfig.HttpHeaders = v.GetStringSlice("proxy_http_headers") + for i, header := range proxyConfig.HttpHeaders { + proxyConfig.HttpHeaders[i] = strings.ToLower(header) } staticHttpHeaders, err := tools.MapStringString(v, "proxy_static_http_headers") if err != nil { log.Fatal().Err(err).Msg("malformed configuration for proxy_static_http_headers") } - p.StaticHttpHeaders = staticHttpHeaders - - p.BinaryEncoding = v.GetBool("proxy_binary_encoding") - p.IncludeConnectionMeta = v.GetBool("proxy_include_connection_meta") - p.GrpcCertFile = v.GetString("proxy_grpc_cert_file") - p.GrpcCredentialsKey = v.GetString("proxy_grpc_credentials_key") - p.GrpcCredentialsValue = v.GetString("proxy_grpc_credentials_value") + proxyConfig.StaticHttpHeaders = staticHttpHeaders connectEndpoint := v.GetString("proxy_connect_endpoint") connectTimeout := GetDuration("proxy_connect_timeout") @@ -1759,12 +1767,17 @@ func proxyMapConfig() (*client.ProxyMap, bool) { publishTimeout := GetDuration("proxy_publish_timeout") subRefreshEndpoint := v.GetString("proxy_sub_refresh_endpoint") subRefreshTimeout := GetDuration("proxy_sub_refresh_timeout") + proxyStreamSubscribeEndpoint := v.GetString("proxy_subscribe_stream_endpoint") + if strings.HasPrefix(proxyStreamSubscribeEndpoint, "http") { + log.Fatal().Msg("error creating subscribe stream proxy: only GRPC endpoints supported") + } + proxyStreamSubscribeTimeout := GetDuration("proxy_subscribe_stream_timeout") if connectEndpoint != "" { - p.Endpoint = connectEndpoint - p.Timeout = tools.Duration(connectTimeout) + proxyConfig.Endpoint = connectEndpoint + proxyConfig.Timeout = tools.Duration(connectTimeout) var err error - proxyMap.ConnectProxy, err = proxy.GetConnectProxy(p) + proxyMap.ConnectProxy, err = proxy.GetConnectProxy(proxyConfig) if err != nil { log.Fatal().Msgf("error creating connect proxy: %v", err) } @@ -1772,10 +1785,10 @@ func proxyMapConfig() (*client.ProxyMap, bool) { } if refreshEndpoint != "" { - p.Endpoint = refreshEndpoint - p.Timeout = tools.Duration(refreshTimeout) + proxyConfig.Endpoint = refreshEndpoint + proxyConfig.Timeout = tools.Duration(refreshTimeout) var err error - proxyMap.RefreshProxy, err = proxy.GetRefreshProxy(p) + proxyMap.RefreshProxy, err = proxy.GetRefreshProxy(proxyConfig) if err != nil { log.Fatal().Msgf("error creating refresh proxy: %v", err) } @@ -1783,9 +1796,9 @@ func proxyMapConfig() (*client.ProxyMap, bool) { } if subscribeEndpoint != "" { - p.Endpoint = subscribeEndpoint - p.Timeout = tools.Duration(subscribeTimeout) - sp, err := proxy.GetSubscribeProxy(p) + proxyConfig.Endpoint = subscribeEndpoint + proxyConfig.Timeout = tools.Duration(subscribeTimeout) + sp, err := proxy.GetSubscribeProxy(proxyConfig) if err != nil { log.Fatal().Msgf("error creating subscribe proxy: %v", err) } @@ -1794,9 +1807,9 @@ func proxyMapConfig() (*client.ProxyMap, bool) { } if publishEndpoint != "" { - p.Endpoint = publishEndpoint - p.Timeout = tools.Duration(publishTimeout) - pp, err := proxy.GetPublishProxy(p) + proxyConfig.Endpoint = publishEndpoint + proxyConfig.Timeout = tools.Duration(publishTimeout) + pp, err := proxy.GetPublishProxy(proxyConfig) if err != nil { log.Fatal().Msgf("error creating publish proxy: %v", err) } @@ -1805,9 +1818,9 @@ func proxyMapConfig() (*client.ProxyMap, bool) { } if rpcEndpoint != "" { - p.Endpoint = rpcEndpoint - p.Timeout = tools.Duration(rpcTimeout) - rp, err := proxy.GetRpcProxy(p) + proxyConfig.Endpoint = rpcEndpoint + proxyConfig.Timeout = tools.Duration(rpcTimeout) + rp, err := proxy.GetRpcProxy(proxyConfig) if err != nil { log.Fatal().Msgf("error creating rpc proxy: %v", err) } @@ -1816,9 +1829,9 @@ func proxyMapConfig() (*client.ProxyMap, bool) { } if subRefreshEndpoint != "" { - p.Endpoint = subRefreshEndpoint - p.Timeout = tools.Duration(subRefreshTimeout) - srp, err := proxy.GetSubRefreshProxy(p) + proxyConfig.Endpoint = subRefreshEndpoint + proxyConfig.Timeout = tools.Duration(subRefreshTimeout) + srp, err := proxy.GetSubRefreshProxy(proxyConfig) if err != nil { log.Fatal().Msgf("error creating sub refresh proxy: %v", err) } @@ -1826,21 +1839,34 @@ func proxyMapConfig() (*client.ProxyMap, bool) { log.Info().Str("endpoint", subRefreshEndpoint).Msg("sub refresh proxy enabled") } + if proxyStreamSubscribeEndpoint != "" { + proxyConfig.Endpoint = proxyStreamSubscribeEndpoint + proxyConfig.Timeout = tools.Duration(proxyStreamSubscribeTimeout) + streamProxy, err := proxy.NewSubscribeStreamProxy(proxyConfig) + if err != nil { + log.Fatal().Msgf("error creating subscribe stream proxy: %v", err) + } + proxyMap.SubscribeStreamProxies[""] = streamProxy + log.Info().Str("endpoint", proxyStreamSubscribeEndpoint).Msg("subscribe stream proxy enabled") + } + proxyEnabled := connectEndpoint != "" || refreshEndpoint != "" || - rpcEndpoint != "" || subscribeEndpoint != "" || publishEndpoint != "" || subRefreshEndpoint != "" + rpcEndpoint != "" || subscribeEndpoint != "" || publishEndpoint != "" || + subRefreshEndpoint != "" || proxyStreamSubscribeEndpoint != "" return proxyMap, proxyEnabled } func granularProxyMapConfig(ruleConfig rule.Config) (*client.ProxyMap, bool) { proxyMap := &client.ProxyMap{ - RpcProxies: map[string]proxy.RPCProxy{}, - PublishProxies: map[string]proxy.PublishProxy{}, - SubscribeProxies: map[string]proxy.SubscribeProxy{}, - SubRefreshProxies: map[string]proxy.SubRefreshProxy{}, + RpcProxies: map[string]proxy.RPCProxy{}, + PublishProxies: map[string]proxy.PublishProxy{}, + SubscribeProxies: map[string]proxy.SubscribeProxy{}, + SubRefreshProxies: map[string]proxy.SubRefreshProxy{}, + SubscribeStreamProxies: map[string]*proxy.SubscribeStreamProxy{}, } proxyList := granularProxiesFromConfig(viper.GetViper()) - proxies := make(map[string]proxy.Proxy) + proxies := make(map[string]proxy.Config) for _, p := range proxyList { for i, header := range p.HttpHeaders { p.HttpHeaders[i] = strings.ToLower(header) @@ -1918,10 +1944,28 @@ func granularProxyMapConfig(ruleConfig rule.Config) (*client.ProxyMap, bool) { proxyEnabled = true } + subscribeStreamProxyName := ruleConfig.SubscribeStreamProxyName + if subscribeStreamProxyName != "" { + p, ok := proxies[subscribeStreamProxyName] + if !ok { + log.Fatal().Msgf("subscribe stream proxy not found: %s", subscribeStreamProxyName) + } + if strings.HasPrefix(p.Endpoint, "http") { + log.Fatal().Msgf("error creating subscribe stream proxy %s only GRPC endpoints supported", subscribeStreamProxyName) + } + sp, err := proxy.NewSubscribeStreamProxy(p) + if err != nil { + log.Fatal().Msgf("error creating subscribe proxy: %v", err) + } + proxyMap.SubscribeStreamProxies[subscribeProxyName] = sp + proxyEnabled = true + } + for _, ns := range ruleConfig.Namespaces { subscribeProxyName := ns.SubscribeProxyName publishProxyName := ns.PublishProxyName subRefreshProxyName := ns.SubRefreshProxyName + subscribeStreamProxyName := ns.SubscribeStreamProxyName if subscribeProxyName != "" { p, ok := proxies[subscribeProxyName] @@ -1961,6 +2005,22 @@ func granularProxyMapConfig(ruleConfig rule.Config) (*client.ProxyMap, bool) { proxyMap.SubRefreshProxies[subRefreshProxyName] = srp proxyEnabled = true } + + if subscribeStreamProxyName != "" { + p, ok := proxies[subscribeStreamProxyName] + if !ok { + log.Fatal().Msgf("subscribe stream proxy not found: %s", subscribeStreamProxyName) + } + if strings.HasPrefix(p.Endpoint, "http") { + log.Fatal().Msgf("error creating subscribe stream proxy %s only GRPC endpoints supported", subscribeStreamProxyName) + } + ssp, err := proxy.NewSubscribeStreamProxy(p) + if err != nil { + log.Fatal().Msgf("error creating subscribe stream proxy: %v", err) + } + proxyMap.SubscribeStreamProxies[subscribeStreamProxyName] = ssp + proxyEnabled = true + } } rpcProxyName := ruleConfig.RpcProxyName @@ -1999,8 +2059,8 @@ func granularProxyMapConfig(ruleConfig rule.Config) (*client.ProxyMap, bool) { var proxyNamePattern = "^[-a-zA-Z0-9_.]{2,}$" var proxyNameRe = regexp.MustCompile(proxyNamePattern) -func granularProxiesFromConfig(v *viper.Viper) []proxy.Proxy { - var proxies []proxy.Proxy +func granularProxiesFromConfig(v *viper.Viper) []proxy.Config { + var proxies []proxy.Config if !v.IsSet("proxies") { return proxies }