diff --git a/docs/server/channels.md b/docs/server/channels.md index 3a94a3a0a..b9c20b3fe 100644 --- a/docs/server/channels.md +++ b/docs/server/channels.md @@ -430,6 +430,10 @@ Centrifugo uses Go language [regexp](https://pkg.go.dev/regexp) package for regu `proxy_sub_refresh` (boolean, default `false`) – turns on sub refresh proxy, more info in [proxy chapter](proxy.md) +### proxy_subscribe_stream + +`proxy_subscribe_stream` (boolean, default `false`) - turns on subscribe stream proxy, see [subscription streams](./proxy_streams.md) + ### subscribe_proxy_name `subscribe_proxy_name` (string, default `""`) – turns on subscribe proxy when [granular proxy mode](proxy.md#granular-proxy-mode) is used. Note that `proxy_subscribe` option defined above is ignored in granular proxy mode. @@ -442,6 +446,10 @@ Centrifugo uses Go language [regexp](https://pkg.go.dev/regexp) package for regu `sub_refresh_proxy_name` (string, default `""`) – turns on sub refresh proxy when [granular proxy mode](proxy.md#granular-proxy-mode) is used. Note that `proxy_sub_refresh` option defined above is ignored in granular proxy mode. +### subscribe_stream_proxy_name + +`subscribe_stream_proxy_name` (string, default `""`) – turns on subscribe stream proxy when [granular proxy mode](proxy_streams.md#granular-proxy-mode) is used. Note that `proxy_subscribe_stream` option defined above is ignored in granular proxy mode. + ## Channel config examples Let's look at how to set some of these options in a config. In this example we turning on presence, history features, forcing publication recovery. Also allowing all client connections (including anonymous users) to subscribe to channels and call publish, history, presence APIs if subscribed. diff --git a/docs/server/proxy.md b/docs/server/proxy.md index 154fdedeb..15778eac9 100644 --- a/docs/server/proxy.md +++ b/docs/server/proxy.md @@ -13,6 +13,7 @@ The list of events that can be proxied: * `sub_refresh` - called when a client subscription is going to expire, so it's possible to prolong it or just let it expire. Can also be used just as a periodical subscription liveness callback from Centrifugo to app backend. Works for bidirectional and unidirectional transports. * `publish` - called when a client tries to publish into a channel, so it's possible to check permissions and optionally modify publication data. Works for bidirectional transports only. * `rpc` - called when a client sends RPC, you can do whatever logic you need based on a client-provided RPC method and data. Works for bidirectional transports only. +* and starting from Centrifugo v5.1.0 we have experimental [proxy subscription streams](./proxy_streams.md) which are a bit special, so we describe them in a special doc chapter. At the moment Centrifugo can proxy these events over two protocols: diff --git a/docs/server/proxy_streams.md b/docs/server/proxy_streams.md index 09593744b..cc0430da6 100644 --- a/docs/server/proxy_streams.md +++ b/docs/server/proxy_streams.md @@ -1,50 +1,48 @@ --- id: proxy_streams -sidebar_label: Proxy streams -title: Proxy streams +sidebar_label: Proxy subscription streams +title: Proxy subscription streams draft: true --- :::caution Experimental -This is an experimental feature. We appreciate your feedback to make sure it's useful and solves real-world problems before marking it as stable and commit to the API. +This is an experimental extension of Centrifugo [proxy](./proxy.md). We appreciate your feedback to make sure it's useful and solves real-world problems before marking it as stable and commit to the API. ::: -Proxy streams allow pushing data towards client channel subscription (and optionally connection itself) directly and individually from your application backend over the unidirectional [GRPC](https://grpc.io/) stream. Additionally, bidirectional GRPC streams may be utilized to stream data in both directions. +Proxy subscription streams (available since Centrifugo v5.1.0) allow pushing data towards client channel subscription directly and individually from your application backend over the unidirectional [GRPC](https://grpc.io/) stream. Additionally, bidirectional GRPC streams may be utilized to stream data in both directions. -The stream is established between Centrifugo and your application backend as soon as user subscribes to a channel (or connects to Centrifugo). The scheme may be useful if you want to generate individual streams and these streams should only work for a time while client is subscribed to a channel (or connected to Centrifugo). +The stream is established between Centrifugo and your application backend as soon as user subscribes to a channel. Subscription streams may be useful if you want to generate individual streams and these streams should only work for a time while client is subscribed to a channel. In this case Centrifugo plays a role of WebSocket-to-GRPC streaming proxy – keeping numerous real-time connections from your application's clients and establishing GRPC streams to the backend, multiplexing them using a pool of HTTP/2 (transport used by GRPC) connections: ![](/img/on_demand_stream_connections.png) -BTW, our bidirectional WebSocket fallbacks (HTTP-streaming and SSE) and experimental WebTransport work with proxy streams too. So it's possible to say that Centrifugo may be also Webtransport-to-GRPC proxy or SSE-to-GRPC proxy. +Our bidirectional WebSocket fallbacks (HTTP-streaming and SSE) and experimental WebTransport work with proxy subscription streams too. So it's possible to say that Centrifugo may be also Webtransport-to-GRPC proxy or SSE-to-GRPC proxy. ### Scalability concerns -Using proxy streams increases resource usage on both Centrifugo and app backend sides because it involves more moving parts such as goroutines, additional buffers, connections, etc. +Using proxy subscription streams increases resource usage on both Centrifugo and app backend sides because it involves more moving parts such as goroutines, additional buffers, connections, etc. -The feature is quite niche actually. Read carefully the motivation for proxy streams described in this doc. If you don't really need proxy streams – prefer using Centrifugo usual approach by always publishing messages to channels over [Centrifugo publish API](./server_api.md#publish) whenever an event happens. This is efficient and Centrifugo just drops messages in case of no active subscribers in a channel. I.e. follow our [idiomatic guidelines](./../getting-started/design.md). +The feature is quite niche. Read carefully the motivation described in this doc. If you don't really need proxy streams – prefer using Centrifugo usual approach by always publishing messages to channels over [Centrifugo publish API](./server_api.md#publish) whenever an event happens. This is efficient and Centrifugo just drops messages in case of no active subscribers in a channel. I.e. follow our [idiomatic guidelines](./../getting-started/design.md). :::tip -Use proxy streams only when really needed. +Use proxy subscription streams only when really needed. Specifically, proxy subscription stream may be very useful to stream data for a limited time upon some user action in the app. ::: -Proxy streams should scale well horizontally with adding more servers. But scaling GRPC is more involved and using GRPC streams results into more resources utilized than with the common Centrifugo approach, so make sure the resource consumption is sufficient for your system by performing load tests with your expected load profile. +At the same time proxy subscription streams should scale well horizontally with adding more servers. But scaling GRPC is more involved and using GRPC streams results into more resources utilized than with the common Centrifugo approach, so make sure the resource consumption is sufficient for your system by performing load tests with your expected load profile. The thing is that sometimes proxy streams is the only way to achieve the desired behaviour – at that point they shine even though require more resources and developer effort. Also, not every use case involves tens of thousands of subscriptions/connections to worry about – be realistic about your practical situation. -## Subscription streams +### Motivation and design -Let's start from subscription streams as they seem to add more value for Centrifugo users. Here is a diagram which shows the sequence of events happening when using subscription streams: +Here is a diagram which shows the sequence of events happening when using subscription streams: ![](/img/proxy_subscribe.png) -### Motivation and design - Subscription streams generally solve a task of integrating with third-party streaming providers or external process, possibly with custom filtering. They come into play when it's not feasible to continuously stream all data to various channels, and when you need to deallocate resources on the backend side as soon as stream is not needed anymore. Subscription streams may be also considered as streaming requests – an isolated way to stream something from the backend to the client or from the client to the backend. @@ -79,6 +77,8 @@ Don't forget that Centrifugo namespace system is very flexible – so you can al ### Unidirectional subscription streams +From the configuration point of view subscription streams may be enabled for channel namespace just as additional type of [proxy](./proxy.md). The important difference is that **only GRPC endpoints may be used** - as we are using GRPC streaming RPCs for this functionality. + You can configure subscription streams for channels very similar to how [subscribe proxy](../server/proxy.md#subscribe-proxy) is configured. First, configure subscribe stream proxy, pointing it to the backend which implements our proxy stream GRPC service contract: @@ -86,24 +86,24 @@ First, configure subscribe stream proxy, pointing it to the backend which implem ```json title="config.json" { ... - "proxy_stream_subscribe_endpoint": "grpc://localhost:12000", - "proxy_stream_subscribe_timeout": "3s" + "proxy_subscribe_stream_endpoint": "grpc://localhost:12000", + "proxy_subscribe_stream_timeout": "3s" } ``` -Only `grpc://` endpoints are supported since we are heavily relying on GRPC streaming ecosystem here. In this case `proxy_stream_subscribe_timeout` defines a time how long Centrifugo waits for a first message from a stream which contains subscription details to transfer to a client. +Only `grpc://` endpoints are supported since we are heavily relying on GRPC streaming ecosystem here. In this case `proxy_subscribe_stream_timeout` defines a time how long Centrifugo waits for a first message from a stream which contains subscription details to transfer to a client. Then you can enable subscription streams for channels on a namespace level: ```json title="config.json" { ... - "proxy_stream_subscribe_endpoint": "grpc://localhost:12000", - "proxy_stream_subscribe_timeout": "3s", + "proxy_subscribe_stream_endpoint": "grpc://localhost:12000", + "proxy_subscribe_stream_timeout": "3s", "namespaces": [ { "name": "streams", - "proxy_stream_subscribe": true + "proxy_subscribe_stream": true } ] } @@ -118,19 +118,15 @@ You can not use subscribe, publish, sub_refresh proxy configurations together wi That's it on Centrifugo side. Now on the app backend you should implement GRPC service according to the following definitions: ```php -// CentrifugoProxyStream allows proxying Centrifugo connections and channel subscriptions -// to the application backend in form of unidirectional or bidirectional streams. This way -// it's possible to achieve on-demand streaming when data is only exchanged while client is -// connected or subscribed. -service CentrifugoProxyStream { +service CentrifugoProxy { ... // SubscribeUnidirectional allows handling unidirectional subscription streams. - rpc SubscribeUnidirectional(SubscribeRequest) returns (stream ChannelResponse); + rpc SubscribeUnidirectional(SubscribeRequest) returns (stream StreamSubscribeResponse); ... } ``` -Just follow GRPC tutorials for your programming language to generate server stubs from our Protobuf schema. +GRPC service definitions can be found in the Centrifugo repository: [proxy.proto](https://github.com/centrifugal/centrifugo/blob/master/internal/proxyproto/proxy.proto) - same as [we described before](./proxy.md#grpc-proxy), probably you already have a service which implements some methods from it. If you don't – just follow [GRPC tutorials](https://grpc.io/docs/languages/) for your programming language to generate server stubs from our Protobuf schema – and you are ready to describe stream logic. Here we are looking at unidirectional subscription stream – so the next thing to do is to implement streaming handler on the application backend side which contains stream business logic, i.e. implement `SubscribeUnidirectional` streaming rpc handler. A basic example of such handler in Go may look like this (error handling skipped for brevity): @@ -145,24 +141,24 @@ import ( "strconv" "time" - pb "example/proxystreamproto" + pb "example/proxyproto" "google.golang.org/grpc" ) type streamServer struct { - pb.UnimplementedCentrifugoProxyStreamServer + pb.UnimplementedCentrifugoProxyServer } func (s *streamerServer) SubscribeUnidirectional( req *pb.SubscribeRequest, - stream pb.CentrifugoProxyStream_SubscribeUnidirectionalServer, + stream pb.CentrifugoProxy_SubscribeUnidirectionalServer, ) error { started := time.Now() fmt.Println("unidirectional subscribe called with request", req) defer func() { fmt.Println("unidirectional subscribe finished, elapsed", time.Since(started)) }() - _ = stream.Send(&pb.ChannelResponse{ + _ = stream.Send(&pb.StreamSubscribeResponse{ SubscribeResponse: &pb.SubscribeResponse{}, }) // Now publish data to a stream every 1 second. @@ -173,14 +169,14 @@ func (s *streamerServer) SubscribeUnidirectional( case <-time.After(1000 * time.Millisecond): } pub := &pb.Publication{Data: []byte(`{"input": "` + strconv.Itoa(i) + `"}`)} - _ = stream.Send(&pb.ChannelResponse{Publication: pub}) + _ = stream.Send(&pb.StreamSubscribeResponse{Publication: pub}) } } func main() { lis, _ := net.Listen("tcp", ":12000") s := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32)) - pb.RegisterCentrifugoProxyStreamServer(s, &streamServer{}) + pb.RegisterCentrifugoProxyServer(s, &streamServer{}) _ = s.Serve(lis) } ``` @@ -191,7 +187,7 @@ Note we have increased `grpc.MaxConcurrentStreams` for server to handle more sim ::: -Centrifugo has some rules about messages in streams. Upon stream establishement Centrifugo expects backend to send first message from a stream - this is a `ChannelResponse` with `SubscribeResponse` in it. Centrifugo waits for this message before replying to the client's subscription command. This way we can communicate initial state with a client and make sure streaming is properly established with all permission checks passed. After sending initial message you can send events (publications) as they appear in your system. +Centrifugo has some rules about messages in streams. Upon stream establishement Centrifugo expects backend to send first message from a stream - this is a `StreamSubscribeResponse` with `SubscribeResponse` in it. Centrifugo waits for this message before replying to the client's subscription command. This way we can communicate initial state with a client and make sure streaming is properly established with all permission checks passed. After sending initial message you can send events (publications) as they appear in your system. Now everything should be ready to test it out from the client side: just subscribe to a channel where stream proxy is on with our SDK – and you will see your stream handler called and data streamed from it to a client. For example, with our Javascript SDK: @@ -219,18 +215,18 @@ In addition to unidirectional streams, Centrifugo supports bidirectional streams In terms of general design bidirectional streams behave similar to unidirectional streams as described above. -When enabling subscription streams, Centrifugo uses unidirectional GRPC streams by default – as those should fit most of the use cases proxy subscription streams were introduced for. To tell Centrifugo use bidirectional streaming add `proxy_stream_subscribe_bidirectional` flag to the namespace configuration: +When enabling subscription streams, Centrifugo uses unidirectional GRPC streams by default – as those should fit most of the use cases proxy subscription streams were introduced for. To tell Centrifugo use bidirectional streaming add `proxy_subscribe_stream_bidirectional` flag to the namespace configuration: ```json title="config.json" { ... - "proxy_stream_subscribe_endpoint": "grpc://localhost:12000", - "proxy_stream_subscribe_timeout": "3s", + "proxy_subscribe_stream_endpoint": "grpc://localhost:12000", + "proxy_subscribe_stream_timeout": "3s", "namespaces": [ { "name": "streams", - "proxy_stream_subscribe": true, - "proxy_stream_subscribe_bidirectional": true + "proxy_subscribe_stream": true, + "proxy_subscribe_stream_bidirectional": true } ] } @@ -239,25 +235,21 @@ When enabling subscription streams, Centrifugo uses unidirectional GRPC streams On the backend you need to implement the following streaming handler: ```php -// CentrifugoProxyStream allows proxying Centrifugo connections and channel subscriptions -// to the application backend in form of unidirectional or bidirectional streams. This way -// it's possible to achieve on-demand streaming when data is only exchanged while client is -// connected or subscribed. -service CentrifugoProxyStream { +service CentrifugoProxy { ... // SubscribeBidirectional allows handling bidirectional subscription streams. - rpc SubscribeBidirectional(stream ChannelRequest) returns (stream ChannelResponse); + rpc SubscribeBidirectional(stream StreamSubscribeRequest) returns (stream StreamSubscribeResponse); ... } ``` -The first `ChannelRequest` message in stream will contain `SubscribeRequest` and Centrifugo expects `ChannelResponse` with `SubscribeResponse` from the backend – just like in unidirectional case described above. +The first `StreamSubscribeRequest` message in stream will contain `SubscribeRequest` and Centrifugo expects `StreamSubscribeResponse` with `SubscribeResponse` from the backend – just like in unidirectional case described above. An example of such handler in Go language which echoes back all publications from client (error handling skipped for brevity): ```go func (s *streamerServer) SubscribeBidirectional( - stream pb.CentrifugoProxyStream_SubscribeBidirectionalServer, + stream pb.CentrifugoProxy_SubscribeBidirectionalServer, ) error { started := time.Now() fmt.Println("bidirectional subscribe called") @@ -267,7 +259,7 @@ func (s *streamerServer) SubscribeBidirectional( // First message always contains SubscribeRequest. req, _ := stream.Recv() fmt.Println("subscribe request received", req.SubscribeRequest) - _ = stream.Send(&pb.ChannelResponse{ + _ = stream.Send(&pb.StreamSubscribeResponse{ SubscribeResponse: &pb.SubscribeResponse{}, }) // The following messages contain publications from client. @@ -277,138 +269,42 @@ func (s *streamerServer) SubscribeBidirectional( fmt.Println("data from client", string(data)) var cd clientData pub := &pb.Publication{Data: data} - _ = stream.Send(&pb.ChannelResponse{Publication: pub}) + _ = stream.Send(&pb.StreamSubscribeResponse{Publication: pub}) } } ``` -## Connection streams - -![](/img/proxy_connect.png) - -### Motivation and design - -That's actually an approach which allows Centrifugo to offer a support for `disconnect` hooks in some form. As you know we do not want to add disconnect hooks to a [standard Centrifugo proxy](./proxy.md) because the disconnect events [are hard to be delivered reliably and in non-racy way](../faq/index.md#why-centrifugo-does-not-have-disconnect-hooks). And we suggest working around that based on periodic pings. Connection streams slightly change our concerns since the stream is canceled as soon as client goes away or Centrifugo node restarted and your backend has a chance to notice that no matter what – since a persistent link in form of GRPC stream will be closed. So we are delegating the responsibility to reliably handle disconnections from Centrifugo to the backend – making the behavior more predictable. - -In terms of general behavior connection streams mostly match subscription streams: - -* when client disconnects – stream to the backend is closed by Centrifugo -* if connection between Centrifugo and backend is lost – client is disconnected with `insufficient state` reason and will reconnect soon automatically -* if stream is cleanly finished by the backend - client will be disconnected with advice to not reconnect -* in bidirectional case client is able to stream data to the backend using `.send(data)` method of our SDKs - -### Unidirectional connections streams - -When the following is configured: - -```json title="config.json" -{ - ... - "proxy_stream_connect_endpoint": "grpc://localhost:12000", - "proxy_stream_connect_timeout": "3s" -} -``` - -– then Centrifugo will start stream to the backend whenever client connects to Centrifugo. - -On the backend you need to have a streaming handler which implement `ConnectUnidirectional` GRPC contract: - -```php -service CentrifugoProxyStream { - ... - // ConnectUnidirectional allows handling unidirectional connection streams. - rpc ConnectUnidirectional(ConnectRequest) returns (stream Response); - ... -} -``` - -Note, that just like in subscription stream case Centrifugo expects first message from the backend in the stream to be a `ConnectResponse` to communicate authentication info and initial state of the client. - -Here is an example of unidirectional connect stream which authenticates connection and will be closed as soon as client is disconnected: - -```go -func (s *streamerServer) ConnectUnidirectional( - req *pb.SubscribeRequest, - stream pb.CentrifugoProxyStream_SubscribeUnidirectionalServer, -) error { - started := time.Now() - fmt.Println("unidirectional connect called with request", req) - defer func() { - fmt.Println("unidirectional connect finished, elapsed", time.Since(started)) - }() - _ = stream.Send(&pb.Response{ - ConnectResponse: &pb.ConnectResponse{ - Result: &pb.ConnectResult{ - User: "test", - }, - }, - }) - <-stream.Context().Done(): - return stream.Context().Err() -} -``` - -You can also send data to the client at any point by sending `Response` with `Message`. This data will me available on the client side in `message` callback. We are showing an example below when describing bidirectional connection streams. +## Granular proxy mode -### Bidirectional connection streams +[Granular proxy mode](./proxy.md#granular-proxy-mode) works with subscription streams in the same manner as for other Centrifugo proxy types. -Same thing but allows client to stream data to the backend by calling `.send(data)` API of our bidirectional SDKs. Can be enabled by adding `proxy_stream_connect_bidirectional` flag to the configuration: +Here is an example how you can define different subscribe stream proxies for different namespaces: -```json title="config.json" +```json title=config.json { ... - "proxy_stream_connect_endpoint": "grpc://localhost:12000", - "proxy_stream_connect_timeout": "3s", - "proxy_stream_connect_bidirectional": true -} -``` - -On the backend you need to have a streaming handler which implement `ConnectBidirectional` GRPC contract: - -```php -service CentrifugoProxyStream { - ... - rpc ConnectBidirectional(stream Request) returns (stream Response); - ... -} -``` - -The first message in stream from Centrifugo to the backend always contains `ConnectRequest`. Then Centrifugo expects first message from the backend in the stream to be a `ConnectResponse` to communicate initial state for the client. All the following messages are bidirectional data message exchange. - -Here is an example of possible streaming handler - this is a simple echo server which authenticates connection and then sends all the messages received from a client back: - -```go -func (s *streamerServer) ConnectBidirectional( - stream pb.CentrifugoProxyStream_ConnectBidirectionalServer, -) error { - started := time.Now() - fmt.Println("bidirectional connect called") - defer func() { - fmt.Println("bidirectional connect finished, elapsed", time.Since(started)) - }() - // First message always contains SubscribeRequest. - req, _ := stream.Recv() - fmt.Println("connect request received", req.ConnectRequest) - _ = stream.Send(&pb.Response{ - ConnectResponse: &pb.ConnectResponse{ - Result: &pb.ConnectResult{ - User: "test", - }, - }, - }) - // The following messages contain publications from client. - for { - req, _ = stream.Recv() - data := req.Message.Data - fmt.Println("message from client", string(data)) - var cd clientData - _ = json.Unmarshal(data, &cd) - msg := &pb.Message{Data: []byte(`{"input": "` + cd.Input + `"}`)} - _ = stream.Send(&pb.Response{Message: msg}) - } + "granular_proxy_mode": true, + "proxies": [ + { + "name": "stream_1", + "endpoint": "grpc://localhost:3000", + "timeout": "500ms", + }, + { + "name": "stream_2", + "endpoint": "grpc://localhost:3001", + "timeout": "500ms", + } + ], + "namespaces": [ + { + "name": "ns1", + "subscribe_stream_proxy_name": "stream_1" + }, + { + "name": "ns2", + "subscribe_stream_proxy_name": "stream_2" + } + ] } ``` - -## Granular stream proxy mode - -At this point we are not providing a way to configure stream proxies in a granular mode similar to what we have for [connection event proxies](../server/proxy.md#granular-proxy-mode). Please reach us out if you need this feature. diff --git a/src/sidebars.js b/src/sidebars.js index bdd0a67b7..6c56b45bb 100644 --- a/src/sidebars.js +++ b/src/sidebars.js @@ -22,7 +22,7 @@ module.exports = { "server/history_and_recovery", "server/presence", "server/proxy", - // "server/proxy_streams", + "server/proxy_streams", "server/admin_web", "server/observability", "server/infra_tuning",