Proposal: PubSub Subscription Streaming #52

Merged on Apr 8, 2024 (5 commits)
195 changes: 195 additions & 0 deletions 0013-RS-pubsub-subscription-streaming.md
@@ -0,0 +1,195 @@
# PubSub Subscription Streaming

* Author(s): @joshvanl
* State: Ready for Implementation
* Updated: 2024-03-05

## Overview

This is a design proposal to implement a new Dapr runtime gRPC and HTTP API for subscription streaming.
This new gRPC & HTTP API will allow an application to subscribe to a PubSub topic and receive messages through this RPC.
Applications will be able to dynamically subscribe to and unsubscribe from topics, and receive messages without opening a port to receive incoming traffic from Dapr.

## Background

Dapr supports applications subscribing to PubSub topic events.
These subscriptions can be configured either:
- `programmatically`, by returning the subscription config from the app channel server once the app reports healthy, or
- `declaratively` via Subscription yaml manifests in Self-Hosted or Kubernetes mode.

Today, it is not possible to dynamically update the subscription list without restarting Daprd, though hot reloading for Subscription manifests is [planned](https://github.com/dapr/dapr/issues/7139).
It is common for users to want to dynamically subscribe to and unsubscribe from topics inside their applications based on runtime conditions.
In the cases where Dapr is not running as a sidecar, users often do not want to open a public port or create a tunnel in order to receive PubSub messages from Dapr.

A streaming Subscription API will allow applications to dynamically subscribe to PubSub topics and receive messages without opening a port to receive incoming traffic from Dapr.

## Expectations and alternatives

This proposal outlines the gRPC & HTTP streaming API for subscribing to PubSub topics.
This proposal does _not_ address any hot-reloading functionality for the existing programmatic or declarative subscription configuration.
Using a gRPC streaming API is the most natural fit for this feature, as it allows for first-class, long-lived, bi-directional connections to Dapr to receive messages.
A supplementary WebSocket based HTTP API is useful for applications which do not have a gRPC client available, or where HTTP WebSockets are preferred.
These messages are typed RPCs, giving the best UX in each SDK.
Once implemented in the runtime, this feature will need to be supported in all Dapr SDKs.

## Solution

### gRPC

Rough gRPC PoC implementation: https://github.com/dapr/dapr/commit/ed40c95d11b78ab9a36a4a8f755cf89336ae5a05

The Dapr runtime gRPC server will implement the following new RPC and messages:

```proto
service Dapr {
  // SubscribeTopicEventsAlpha1 subscribes to a PubSub topic and receives topic
  // events from it.
  rpc SubscribeTopicEventsAlpha1(stream SubscribeTopicEventsRequestAlpha1) returns (stream TopicEventRequestAlpha1) {}
}

// SubscribeTopicEventsRequestAlpha1 is a message containing the details for
// subscribing to a topic via streaming.
// The first message must always be the initial request. All subsequent
// messages must be event responses.
message SubscribeTopicEventsRequestAlpha1 {
  oneof subscribe_topic_events_request_type {
    SubscribeTopicEventsSubscribeRequestAlpha1 request = 1;
    SubscribeTopicEventsResponseAlpha1 event_response = 2;
  }
}

// SubscribeTopicEventsSubscribeRequestAlpha1 is the initial message containing
// the details for subscribing to a topic via streaming.
message SubscribeTopicEventsSubscribeRequestAlpha1 {
  // The name of the pubsub component
  string pubsub_name = 1 [json_name = "pubsubName"];

  // The pubsub topic
  string topic = 2 [json_name = "topic"];

  // The metadata passed to the pubsub component
  //
  // metadata property:
  // - key : the key of the message.
  //
  // Map fields cannot be marked `optional` in proto3, so the label is omitted.
  map<string, string> metadata = 3 [json_name = "metadata"];

  // dead_letter_topic is the topic to which messages that fail to be processed
  // are sent.
  optional string dead_letter_topic = 4 [json_name = "deadLetterTopic"];

  // max_in_flight_messages is the maximum number of in-flight messages that
  // can be processed by the subscriber at any given time.
  // Default is no limit.
  // NOTE: the field type is missing in the proposal text; int32 is assumed here.
  optional int32 max_in_flight_messages = 5 [json_name = "maxInFlightMessages"];
}

// SubscribeTopicEventsResponseAlpha1 is a message containing the result of a
// subscription to a topic.
message SubscribeTopicEventsResponseAlpha1 {
  // id is the unique identifier for the subscription request.
  string id = 1 [json_name = "id"];

  // status is the result of the subscription request.
  TopicEventResponseAlpha1 status = 2 [json_name = "status"];
}
```

Review comment (Member, on the `status` field):
TopicEventResponse refers to a response on a published message. Am I right in reading this as it being used as a response for the subscription response too? If so, it seems a bit too tightly coupled in my opinion.

When an application wishes to subscribe to a topic it will initiate a stream with `SubscribeTopicEventsRequest`, and `Send` the initial request `SubscribeTopicEventsSubscribeRequest` containing the options for the subscription.
Daprd will then set up the machinery to add this gRPC stream to the set of subscribers.
The request contains no route or path matching configuration as all events will be sent on this stream.
Subscription gRPC streams have the highest priority when Daprd determines which subscriber a message should be sent to.
Only a single PubSub/topic pair may be subscribed to at any one time with this API.
If the first message sent to the server is not the initial request, the RPC will return an error.
If any subsequent messages are not `SubscribeTopicEventsResponse` messages, the RPC will return an error.
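
The ordering rule above (initial request first, event responses afterwards) can be sketched as a small validator. This is only an illustration: `StreamMessage`, `SubscribeRequest`, `EventResponse` and `StreamValidator` are hypothetical stand-ins for the generated proto types and for whatever Daprd does internally, not the actual implementation.

```go
package main

import (
	"errors"
	"fmt"
)

// SubscribeRequest and EventResponse are hypothetical stand-ins for the
// generated proto messages; only the oneof shape matters for this sketch.
type SubscribeRequest struct {
	PubsubName string
	Topic      string
}

type EventResponse struct {
	ID     string
	Status string
}

// StreamMessage mimics the oneof: exactly one field is expected to be set.
type StreamMessage struct {
	Request       *SubscribeRequest
	EventResponse *EventResponse
}

var (
	ErrExpectedInitialRequest = errors.New("first message must be the initial subscribe request")
	ErrExpectedEventResponse  = errors.New("subsequent messages must be event responses")
)

// StreamValidator remembers whether the initial request has been accepted.
type StreamValidator struct {
	subscribed bool
}

// Check returns an error if msg violates the ordering rule.
func (v *StreamValidator) Check(msg StreamMessage) error {
	if !v.subscribed {
		if msg.Request == nil {
			return ErrExpectedInitialRequest
		}
		v.subscribed = true
		return nil
	}
	if msg.Request != nil || msg.EventResponse == nil {
		return ErrExpectedEventResponse
	}
	return nil
}

func main() {
	v := &StreamValidator{}
	// Accepted: the initial request arrives first.
	fmt.Println(v.Check(StreamMessage{Request: &SubscribeRequest{PubsubName: "mypub", Topic: "a"}}))
	// Accepted: an event response follows.
	fmt.Println(v.Check(StreamMessage{EventResponse: &EventResponse{ID: "123", Status: "SUCCESS"}}))
	// Rejected: a second subscribe request on the same stream.
	fmt.Println(v.Check(StreamMessage{Request: &SubscribeRequest{PubsubName: "mypub", Topic: "b"}}))
}
```

Rejecting a second subscribe request on an open stream also reflects the single PubSub/topic pair restriction.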

When a message is published to the topic, Daprd will send a `TopicEventRequest` message on the stream containing the message payload and metadata.
After the application has processed the message, it will send to the server a `SubscribeTopicEventsResponse` containing the `id` of the message and the `status` of the message processing.
Since multiple messages can be sent and processed in the application at the same time, the event `id` is used by the server to track the status of each individual event.

Review comment (Contributor):
How's the event ID computed?

Review comment (Contributor Author):
I believe either from the pubsub itself, or a random UUID here.

Review comment (Contributor):
Isn't that for cloud events only?

Review comment (Member):
We can generate a unique id per message the same way bulk pub/sub does today

A topic event response will follow the same timeout resiliency policy that currently exists for subscriptions.

Client code:

```go
// rtv1 is the generated package github.com/dapr/dapr/pkg/proto/runtime/v1.
// Error handling is elided for brevity.

// Open the stream and send the initial subscribe request.
stream, _ := client.SubscribeTopicEventsAlpha1(ctx)
stream.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
	SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_Request{
		Request: &rtv1.SubscribeTopicEventsSubscribeRequestAlpha1{
			PubsubName: "mypub", Topic: "a",
		},
	},
})

// Publish an event to the subscribed topic.
client.PublishEvent(ctx, &rtv1.PublishEventRequest{
	PubsubName:      "mypub",
	Topic:           "a",
	Data:            []byte(`{"status": "completed"}`),
	DataContentType: "application/json",
})

// Receive the event on the stream and report its processing status by ID.
event, _ := stream.Recv()
stream.Send(&rtv1.SubscribeTopicEventsRequestAlpha1{
	SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequestAlpha1_EventResponse{
		EventResponse: &rtv1.SubscribeTopicEventsResponseAlpha1{
			Id:     event.Id,
			Status: &rtv1.TopicEventResponse{Status: rtv1.TopicEventResponse_SUCCESS},
		},
	},
})

// Close the client side of the stream to unsubscribe.
stream.CloseSend()
```
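
In a real application the receive/respond step would normally run in a loop. The sketch below shows that loop against a small `EventStream` interface with an in-memory fake, so it runs without a Dapr sidecar. `Event`, `Ack`, `EventStream`, `Consume` and `fakeStream` are hypothetical names; mapping a handler error to `RETRY` is an assumption based on the status values of the existing `TopicEventResponse` (`SUCCESS`, `RETRY`, `DROP`).

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// Event and Ack are hypothetical stand-ins for the topic event and the
// event response messages exchanged on the stream.
type Event struct {
	ID   string
	Data []byte
}

type Ack struct {
	ID     string
	Status string // "SUCCESS", "RETRY" or "DROP"
}

// EventStream abstracts the two stream operations the loop needs.
type EventStream interface {
	Recv() (*Event, error)
	Send(*Ack) error
}

// ErrProcessing is a sample handler failure used in the demo below.
var ErrProcessing = errors.New("processing failed")

// Consume reads events until the stream ends, acknowledging each one by ID.
func Consume(s EventStream, handle func(*Event) error) error {
	for {
		ev, err := s.Recv()
		if errors.Is(err, io.EOF) {
			return nil // stream closed cleanly
		}
		if err != nil {
			return err
		}
		status := "SUCCESS"
		if herr := handle(ev); herr != nil {
			status = "RETRY" // assumption: ask for redelivery on failure
		}
		if err := s.Send(&Ack{ID: ev.ID, Status: status}); err != nil {
			return err
		}
	}
}

// fakeStream is an in-memory EventStream used only for demonstration.
type fakeStream struct {
	events []*Event
	acks   []*Ack
}

func (f *fakeStream) Recv() (*Event, error) {
	if len(f.events) == 0 {
		return nil, io.EOF
	}
	ev := f.events[0]
	f.events = f.events[1:]
	return ev, nil
}

func (f *fakeStream) Send(a *Ack) error {
	f.acks = append(f.acks, a)
	return nil
}

func main() {
	s := &fakeStream{events: []*Event{
		{ID: "1", Data: []byte("ok")},
		{ID: "2", Data: []byte("bad")},
	}}
	err := Consume(s, func(ev *Event) error {
		if string(ev.Data) == "bad" {
			return ErrProcessing
		}
		return nil
	})
	fmt.Println("err:", err)
	for _, a := range s.acks {
		fmt.Println(a.ID, a.Status)
	}
}
```

Because responses are keyed by event ID, a handler could equally process events concurrently and send the responses out of order.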

### HTTP (WebSockets)

Review comment (Contributor):
WebSockets make things a lot more complicated, including things like authz. We support HTTP/2 now so we should just use server-sent events (since we don't need bi-di streaming). It's much simpler to implement and use, and doesn't introduce a lot more dependencies on the server.

Review comment (Member):
We can't use unidirectional streaming because the client needs to send back information to the server (Dapr), which is the status for each message received. From the auth perspective this wouldn't be different from how an app authenticates to Dapr today (via an access token, and unauthenticated by default) and even in browser to server scenarios a client access token is used.

Review comment (Contributor):
The send-back can be done with regular HTTP requests. When using HTTP/2, they use the same TCP socket, so this wouldn't be an issue.

I would recommend avoiding adding another protocol as it does make a lot of things more complicated.

  1. Go doesn't support WebSockets over HTTP/2, but only HTTP/1, so the app requires a separate socket (see golang/go#49918, "proposal: x/net/http2: support for WebSockets over HTTP/2").
  2. When connecting to a WS using a web browser, you can't customize the headers, so Dapr authentication wouldn't be usable (https://devcenter.heroku.com/articles/websocket-security#authentication-authorization). I understand that connecting directly to Dapr through a web browser is not officially supported, but some customers do that. Hence why I mentioned auth is more challenging.
  3. WebSockets also adds complexities when dealing with proxies (this may be a problem for some Diagrid services too presumably :) ), and local development requires tooling that is WebSockets-aware.

Review comment (Member):
The auth remains token based auth, but it doesn't have to pass through a header in this case. The initial handshake contains the token as described in the link you posted. This seems like a standard way for client side auth and is also used by the Azure Websockets (PubSub) service, as one example.

Re: web browser, it'd actually be great to make this HTTP path usable from web browsers, where http/2 doesn't have the same support as websockets. This will provide a clearer usage pattern where backend services can connect over gRPC while web services and/or any client that doesn't have gRPC or http/2 support can use websockets.

The only issue we face with websockets is the separate socket issue. If the same port cannot be shared with the existing HTTP server and the Websockets server then this is likely a non-starter.

Review comment (Contributor):
> The auth remains token based auth, but it doesn't have to pass through a header in this case. The initial handshake contains the token as described in the link you posted.

Correct, but that's part of the application protocol we need to define, and it does add complexity.

> it'd actually be great to make this HTTP path usable from web browsers, where http/2 doesn't have the same support as websockets

Support for HTTP/2 in browsers is just as broad as for WebSockets. In any case, HTTP/2 is a nice-to-have for long-lived connections but not a requirement (using HTTP/1 would be fine, but would require one separate TCP socket).

> The only issue we face with websockets is the separate socket issue. If the same port cannot be shared with the existing HTTP server and the Websockets server then this is likely a non-starter.

Separate socket, not port. So the long-lived connection requires its own TCP socket between the app and Dapr (1 TCP socket for the WebSockets long-lived stream, and then each request from the app will have its own). While in the case of HTTP/2 there's multiplexing being used, so there is at most 1 TCP socket being used (the long-lived stream can share the same socket as every request going to Dapr). This does improve efficiency. But it doesn't require a separate port - the same Go server can serve HTTP/1, HTTP/2, and WebSockets (but this only over HTTP/1 and not HTTP/2, per issue above)

Review comment (Member):
Ok, so if we don't need a separate port and there's a path forward for HTTP/2 support for WebSockets then I'm comfortable using WebSockets HTTP 1.1 to start with

Review comment (Contributor):
The issue for adding WebSockets over HTTP/2 was opened in 2021 and looks like it's not being worked on. It seems that it is not a priority (understandably, as WebSockets is not as popular as it once was).

I think enabling a new communication protocol is not something to be done lightly, as we'll be on the hook to support it for a very long time. There's nothing inherently wrong with HTTP/2, which is already supported and for which we have all tooling and tests in place.

Review comment (Member):
The user experience that comes out of having a full-duplex bidi stream with the receive message/reply to server commands encapsulated within WebSockets is preferable to me than what the experience would be like with SSE, moreover the latter being limited to a text format. We can always change to HTTP2/SSE if we're seeing a reason to do so before moving the API to stable

Review comment (Member):
Although WebSocket is not that popular, it is still very useful in the full-duplex bidi stream case. I think it is a good start and we can iterate on it based on users' responses.

Review comment (Contributor):
I am open to looking at the maintainability aspect of Websockets and we could also potentially ping for the interest in the duplex stream from the community.
But this proposal looks good for gRPC as a starter.


Along with a gRPC based streaming API, a WebSocket based HTTP equivalent API will be implemented.
Much like the gRPC API, the WebSocket based HTTP API will begin with an initial request-response handshake, followed by a stream of messages to the client, with the client replying with status responses indexed by message ID.
The same proto types as used in the gRPC API (encoded as JSON) will be used for the HTTP API.
The server WebSocket implementation will be based on the [gorilla/websocket](https://github.com/gorilla/websocket) package, as this seems well used, understood and maintained.

The HTTP streaming API will be available at the following endpoint.
As the pubsub and topic information is in the request body, no request configuration is given in the URL.

```
GET: /v1.0-alpha1/subscribe
```

```json
INITIAL_REQUEST (to server) = {
  "pubsubName": "mypub",
  "topic": "a",
  "metadata": {
    "key": "value"
  },
  "deadLetterTopic": "dead-letter-topic",
  "maxInFlightMessages": 10
}

TOPIC_EVENT_REQUEST (to application) = {
  "id": "123",
  "source": "asource",
  "type": "atype",
  "spec_version": "1.0",
  "data_content_type": "application/json",
  "data": "abc",
  "topic": "a",
  "pubsub_name": "mypub",
  "path": "/"
}

TOPIC_EVENT_RESPONSE (to server) = {
  "id": "123",
  "status": {
    "status": "SUCCESS"
  }
}
```
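
As a rough illustration of how a client might encode and decode these frames, the sketch below uses Go's `encoding/json` with struct tags taken from the JSON shapes above. The type names and the `ReplyTo` helper are hypothetical, only a subset of the event fields is decoded, and the WebSocket transport itself is left out.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// InitialRequest mirrors the INITIAL_REQUEST frame sent to the server.
type InitialRequest struct {
	PubsubName          string            `json:"pubsubName"`
	Topic               string            `json:"topic"`
	Metadata            map[string]string `json:"metadata,omitempty"`
	DeadLetterTopic     string            `json:"deadLetterTopic,omitempty"`
	MaxInFlightMessages int               `json:"maxInFlightMessages,omitempty"`
}

// TopicEvent decodes the subset of TOPIC_EVENT_REQUEST needed to reply.
type TopicEvent struct {
	ID    string          `json:"id"`
	Topic string          `json:"topic"`
	Data  json.RawMessage `json:"data"`
}

// StatusBody and EventResponse mirror the TOPIC_EVENT_RESPONSE frame.
type StatusBody struct {
	Status string `json:"status"`
}

type EventResponse struct {
	ID     string     `json:"id"`
	Status StatusBody `json:"status"`
}

// ReplyTo decodes a topic event frame and builds the matching response
// frame, echoing the event ID so the server can correlate it.
func ReplyTo(frame []byte, status string) ([]byte, error) {
	var ev TopicEvent
	if err := json.Unmarshal(frame, &ev); err != nil {
		return nil, err
	}
	return json.Marshal(EventResponse{ID: ev.ID, Status: StatusBody{Status: status}})
}

func main() {
	req, err := json.Marshal(InitialRequest{PubsubName: "mypub", Topic: "a", MaxInFlightMessages: 10})
	fmt.Println(string(req), err)

	resp, err := ReplyTo([]byte(`{"id":"123","topic":"a","data":"abc"}`), "SUCCESS")
	fmt.Println(string(resp), err)
}
```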

## Completion Checklist

- [ ] gRPC server implementation in daprd
- [ ] API documentation
- [ ] SDK implementations
  - [ ] .Net
  - [ ] Java
  - [ ] Go
  - [ ] Python
  - [ ] JavaScript

Review comment (Member, on lines +190 to +195), suggested change:

- [ ] SDK implementations
  - [ ] .Net
  - [ ] Java
  - [ ] Go
  - [ ] Python
  - [ ] JavaScript
  - [ ] Rust

Please can I propose that this proposal tracks the Rust-SDK too :)