Proposal: PubSub Subscription Streaming #52
# PubSub Subscription Streaming

* Author(s): @joshvanl
* State: Ready for Implementation
* Updated: 2024-03-05

## Overview

This is a design proposal to implement a new Dapr runtime gRPC and HTTP API for subscription streaming.
This new gRPC & HTTP API will allow an application to subscribe to a PubSub topic and receive messages through this RPC.
Applications will be able to dynamically subscribe to and unsubscribe from topics, and receive messages without opening a port to receive incoming traffic from Dapr.

## Background

Dapr supports applications subscribing to PubSub topic events.
These subscriptions can be configured either:
- `programmatically`, by returning the subscription config from the app channel server once the app is reported healthy, or
- `declaratively`, via Subscription YAML manifests in Self-Hosted or Kubernetes mode.

Today, it is not possible to dynamically update the subscription list without restarting Daprd, though hot reloading for Subscription manifests is [planned](https://github.com/dapr/dapr/issues/7139).
It is common for users to want to dynamically subscribe to and unsubscribe from topics inside their applications based on runtime conditions.
In cases where Dapr is not running as a sidecar, users often do not want to open a public port or create a tunnel in order to receive PubSub messages from Dapr.

A streaming Subscription API will allow applications to dynamically subscribe to PubSub topics and receive messages without opening a port to receive incoming traffic from Dapr.

## Expectations and alternatives

This proposal outlines the gRPC & HTTP streaming API for subscribing to PubSub topics.
This proposal does _not_ address any hot-reloading functionality for the existing programmatic or declarative subscription configuration.
Using a gRPC streaming API is the most natural fit for this feature, as it allows for first-class, long-lived, bi-directional connections to Dapr to receive messages.
A supplementary WebSocket-based HTTP API is useful for applications which do not have a gRPC client available, or where HTTP WebSockets are preferred.
These messages are typed RPCs, giving the best UX in each SDK.
Once implemented in the runtime, this feature will need to be implemented in all Dapr SDKs.

## Solution

### gRPC

Rough gRPC PoC implementation: https://github.com/dapr/dapr/commit/ed40c95d11b78ab9a36a4a8f755cf89336ae5a05

The Dapr runtime gRPC server will implement the following new RPC and messages:

```proto
service Dapr {
  // SubscribeTopicEvents subscribes to a PubSub topic and receives topic events
  // from it.
  rpc SubscribeTopicEvents(stream SubscribeTopicEventsRequest) returns (stream TopicEventRequest) {}
  // Review comment on the RPC above: "I was more looking for a name which
  // covers both messages."
}

// SubscribeTopicEventsRequest is a message containing the details for
// subscribing to a topic via streaming.
// The first message must always be the initial request. All subsequent
// messages must be event responses.
message SubscribeTopicEventsRequest {
  oneof subscribe_topic_events_request_type {
    SubscribeTopicEventsSubscribeRequest request = 1;
    SubscribeTopicEventsResponse event_response = 2;
  }
}

// SubscribeTopicEventsSubscribeRequest is the initial message containing the
// details for subscribing to a topic via streaming.
message SubscribeTopicEventsSubscribeRequest {
  // The name of the pubsub component
  string pubsub_name = 1 [json_name = "pubsubName"];

  // The pubsub topic
  string topic = 2 [json_name = "topic"];

  // The metadata passed to the pubsub component
  //
  // metadata property:
  // - key : the key of the message.
  map<string, string> metadata = 3 [json_name = "metadata"];

  // dead_letter_topic is the topic to which messages that fail to be processed
  // are sent.
  optional string dead_letter_topic = 4 [json_name = "deadLetterTopic"];
}

// SubscribeTopicEventsResponse is a message containing the result of a
// subscription to a topic.
message SubscribeTopicEventsResponse {
  // id is the unique identifier for the subscription request.
  string id = 1 [json_name = "id"];

  // status is the result of the subscription request.
  TopicEventResponse status = 2 [json_name = "status"];
}
```

When an application wishes to subscribe to a topic, it will initiate a `SubscribeTopicEvents` stream and `Send` the initial request: a `SubscribeTopicEventsRequest` wrapping a `SubscribeTopicEventsSubscribeRequest` that contains the options for the subscription.
Daprd will then set up the machinery to add this gRPC stream to the set of subscribers.
The request contains no route or path matching configuration, as all events will be sent on this stream.
Subscription gRPC streams have the highest priority when Daprd determines which subscriber a message should be sent to.
Only a single PubSub Topic pair may be subscribed to at a time with this API.
If the first message sent to the server is not the initial request, the RPC will return an error.
If any subsequent message is not a `SubscribeTopicEventsResponse`, the RPC will return an error.

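The ordering rules above can be sketched as a small check over the sequence of client messages. This is only an illustration of the rule, not Daprd's implementation: `msgKind`, `validateOrder`, and the two error values are hypothetical names standing in for the generated `SubscribeTopicEventsRequest` oneof cases and whatever validation the runtime actually performs.

```go
package main

import (
	"errors"
	"fmt"
)

// msgKind is a hypothetical stand-in for the two oneof cases of
// SubscribeTopicEventsRequest.
type msgKind int

const (
	kindInitialRequest msgKind = iota // SubscribeTopicEventsSubscribeRequest
	kindEventResponse                 // SubscribeTopicEventsResponse
)

// Hypothetical errors; the proposal only states that the RPC returns an error.
var (
	errFirstNotInitial  = errors.New("first message must be the initial request")
	errNotEventResponse = errors.New("subsequent messages must be event responses")
)

// validateOrder returns an error for the first message that breaks the
// ordering rule, or nil if the whole sequence is valid.
func validateOrder(msgs []msgKind) error {
	for i, k := range msgs {
		switch {
		case i == 0 && k != kindInitialRequest:
			return errFirstNotInitial
		case i > 0 && k != kindEventResponse:
			return errNotEventResponse
		}
	}
	return nil
}

func main() {
	// A valid stream: one initial request, then event responses.
	fmt.Println(validateOrder([]msgKind{kindInitialRequest, kindEventResponse}))
	// An invalid stream: an event response sent before subscribing.
	fmt.Println(validateOrder([]msgKind{kindEventResponse}))
	// An invalid stream: a second initial request on the same stream.
	fmt.Println(validateOrder([]msgKind{kindInitialRequest, kindInitialRequest}))
}
```
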
When a message is published to the topic, Daprd will send a `TopicEventRequest` message on the stream containing the message payload and metadata.
After the application has processed the message, it will send the server a `SubscribeTopicEventsResponse` containing the `id` of the message and the `status` of the message processing.
Since multiple messages can be sent to and processed by the application at the same time, the server uses the event `id` to track the status of each individual event.

> Review comment: How's the event ID computed?

> Reply: I believe either from the pubsub itself, or a random UUID here.

> Reply: Isn't that for cloud events only?

> Reply: We can generate a unique id per message the same way bulk pub/sub does today

Event responses will follow the same timeout resiliency policies that currently exist for subscriptions.

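One way to picture the per-event tracking is a map from event `id` to a pending result, resolved when the matching response arrives. The sketch below is an assumption about how such bookkeeping could look, not the runtime's actual code: the `inflight` type and its methods are hypothetical, `status` stands in for the `TopicEventResponse` status enum, and timeouts are deliberately left out.

```go
package main

import (
	"fmt"
	"sync"
)

// status is a hypothetical stand-in for the TopicEventResponse status enum.
type status string

// inflight tracks events that were sent to the application and are still
// waiting for a response, keyed by event id.
type inflight struct {
	mu      sync.Mutex
	pending map[string]chan status
}

func newInflight() *inflight {
	return &inflight{pending: make(map[string]chan status)}
}

// add registers an event id and returns a channel that will receive the
// status once the application responds.
func (f *inflight) add(id string) <-chan status {
	ch := make(chan status, 1) // buffered so resolve never blocks
	f.mu.Lock()
	f.pending[id] = ch
	f.mu.Unlock()
	return ch
}

// resolve delivers the status for id. It reports false when the id is
// unknown or has already been resolved.
func (f *inflight) resolve(id string, s status) bool {
	f.mu.Lock()
	ch, ok := f.pending[id]
	delete(f.pending, id)
	f.mu.Unlock()
	if ok {
		ch <- s
	}
	return ok
}

func main() {
	f := newInflight()
	first, second := f.add("evt-1"), f.add("evt-2")

	// Responses may arrive in a different order than events were sent.
	f.resolve("evt-2", "RETRY")
	f.resolve("evt-1", "SUCCESS")

	fmt.Println(<-first, <-second)             // SUCCESS RETRY
	fmt.Println(f.resolve("evt-3", "SUCCESS")) // false
}
```
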
Client code:

```go
// Assumes `client` is a Dapr runtime gRPC client and `rtv1` is the generated
// runtime v1 proto package. Errors abort via log.Fatalf for brevity.

// Open the bi-directional stream.
stream, err := client.SubscribeTopicEvents(ctx)
if err != nil {
	log.Fatalf("open stream: %v", err)
}

// The first message must be the initial subscribe request.
if err := stream.Send(&rtv1.SubscribeTopicEventsRequest{
	SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequest_Request{
		Request: &rtv1.SubscribeTopicEventsSubscribeRequest{
			PubsubName: "mypub", Topic: "a",
		},
	},
}); err != nil {
	log.Fatalf("send subscribe request: %v", err)
}

// Publish a message to the topic we just subscribed to.
if _, err := client.PublishEvent(ctx, &rtv1.PublishEventRequest{
	PubsubName: "mypub", Topic: "a",
	Data:            []byte(`{"status": "completed"}`),
	DataContentType: "application/json",
}); err != nil {
	log.Fatalf("publish event: %v", err)
}

// Receive the event, then report its processing status using the event id.
event, err := stream.Recv()
if err != nil {
	log.Fatalf("receive event: %v", err)
}
if err := stream.Send(&rtv1.SubscribeTopicEventsRequest{
	SubscribeTopicEventsRequestType: &rtv1.SubscribeTopicEventsRequest_EventResponse{
		EventResponse: &rtv1.SubscribeTopicEventsResponse{
			Id:     event.Id,
			Status: &rtv1.TopicEventResponse{Status: rtv1.TopicEventResponse_SUCCESS},
		},
	},
}); err != nil {
	log.Fatalf("send event response: %v", err)
}

// Closing the send side ends the subscription.
_ = stream.CloseSend()
```

### HTTP (WebSockets)

> Review comment: WebSockets make things a lot more complicated, including things like authz. We support HTTP/2 now so we should just use server-sent events (since we don't need bi-di streaming). It's much simpler to implement and use, and doesn't introduce a lot more dependencies on the server.

> Reply: We can't use unidirectional streaming because the client needs to send back information to the server (Dapr), which is the status for each message received. From the auth perspective this wouldn't be different from how an app authenticates to Dapr today (via an access token, and unauthenticated by default) and even in browser to server scenarios a client access token is used.

> Reply: The send-back can be done with regular HTTP requests. When using HTTP/2, they use the same TCP socket, so this wouldn't be an issue. I would recommend avoiding adding another protocol as it does make a lot of things more complicated.

> Reply: The auth remains token based auth, but it doesn't have to pass through a header in this case. The initial handshake contains the token as described in the link you posted. This seems like a standard way for client side auth and is also used by the Azure Websockets (PubSub) service, as one example. Re: web browser, it'd actually be great to make this HTTP path usable from web browsers, where http/2 doesn't have the same support as websockets. This will provide a clearer usage pattern where backend services can connect over gRPC while web services and/or any client that doesn't have gRPC or http/2 support can use websockets. The only issue we face with websockets is the separate socket issue. If the same port cannot be shared with the existing HTTP server and the Websockets server then this is likely a non-starter.

> Reply: Correct, but that's part of the application protocol we need to define, and it does add complexity.
>
> Support for HTTP/2 in browser is just as much as WebSockets. In any case, HTTP/2 is a nice-to-have for long-lived connections but not a requirement (using HTTP/1 would be fine, but would require one separate TCP socket).
>
> Separate socket, not port. So the long-lived connection requires its own TCP socket between the app and Dapr (1 TCP socket for the WebSockets long-lived stream, and then each request from the app will have its own). While in the case of HTTP/2 there's multiplexing being used, so there is at most 1 TCP socket being used (the long-lived stream can share the same socket as every request going to Dapr). This does improve efficiency. But it doesn't require a separate port - the same Go server can serve HTTP/1, HTTP/2, and WebSockets (but this only over HTTP/1 and not HTTP/2, per issue above)

> Reply: Ok, so if we don't need a separate port and there's a path forward for HTTP/2 support for WebSockets then I'm comfortable using WebSockets HTTP 1.1 to start with

> Reply: The issue for adding WebSockets over HTTP/2 was opened in 2021 and looks like it's not being worked on. It seems that it is not a priority (understandably, as WebSockets is not as popular as it once was). I think enabling a new communication protocol should be something not done lightly, as we'll be on the hook to support that for a very long time. There's nothing inherently wrong with HTTP/2, which is already supported and we have all tooling and test in place.

> Reply: The user experience that comes out of having a full-duplex bidi stream with the receive message/reply to server commands encapsulated within WebSockets is preferable to me than what the experience would be like with SSE, moreover the latter being limited to a text format.
> We can always change to HTTP2/SSE if we're seeing a reason to do so before moving the API to stable

> Reply: Although WebSocket is not that popular, it is still very useful in full-duplex bidi stream case. I think it is a good start and we can iterate it based on users' response and so on.

> Reply: I am open to looking at the maintainability aspect of Websockets and we could also potentially ping for the interest in the duplex stream from the community.

Along with a gRPC-based streaming API, a WebSocket-based HTTP equivalent API will be implemented.
Much like the gRPC API, the WebSocket API will follow an initial request-response handshake, followed by a stream of messages to the client, with the client sending back status responses indexed by message ID.
The same proto types as used in the gRPC API (but encoded as JSON blobs) will be used for the HTTP API.

The server WebSocket implementation will be based on the [gorilla/websocket](https://github.com/gorilla/websocket) package, as this seems well used, understood and maintained.

The HTTP streaming API will be available at the following endpoint.
As the pubsub and topic information is in the request body, no request configuration is given in the URL.

```
GET: /v1.0/subscribe
```

> Review comment: can we have a request response sample for this endpoint?

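As a rough idea of what the JSON frames sent by the client might look like, the sketch below marshals hand-written Go structs that mirror the proto messages and their `json_name` values. The exact JSON shape is not fixed by this proposal: the struct types, the `request` and `eventResponse` wrapper keys, the string form of `status`, and the `evt-1` id are all assumptions made for illustration.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// subscribeRequest mirrors SubscribeTopicEventsSubscribeRequest (assumed shape).
type subscribeRequest struct {
	PubsubName      string            `json:"pubsubName"`
	Topic           string            `json:"topic"`
	Metadata        map[string]string `json:"metadata,omitempty"`
	DeadLetterTopic *string           `json:"deadLetterTopic,omitempty"`
}

// topicEventResponse mirrors TopicEventResponse, with the enum as a string.
type topicEventResponse struct {
	Status string `json:"status"`
}

// eventResponse mirrors SubscribeTopicEventsResponse (assumed shape).
type eventResponse struct {
	ID     string             `json:"id"`
	Status topicEventResponse `json:"status"`
}

// clientFrame mirrors SubscribeTopicEventsRequest: exactly one field is set.
type clientFrame struct {
	Request       *subscribeRequest `json:"request,omitempty"`
	EventResponse *eventResponse    `json:"eventResponse,omitempty"`
}

// encode renders a frame as the JSON text that would be written to the socket.
func encode(f clientFrame) string {
	b, err := json.Marshal(f)
	if err != nil {
		panic(err)
	}
	return string(b)
}

func main() {
	// First frame: the initial subscribe request.
	fmt.Println(encode(clientFrame{
		Request: &subscribeRequest{PubsubName: "mypub", Topic: "a"},
	}))
	// {"request":{"pubsubName":"mypub","topic":"a"}}

	// Later frames: one status response per received event id.
	fmt.Println(encode(clientFrame{
		EventResponse: &eventResponse{ID: "evt-1", Status: topicEventResponse{Status: "SUCCESS"}},
	}))
	// {"eventResponse":{"id":"evt-1","status":{"status":"SUCCESS"}}}
}
```
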
## Completion Checklist

- [ ] gRPC server implementation in daprd
- [ ] API documentation
- [ ] SDK implementations
  - [ ] .NET
  - [ ] Java
  - [ ] Go
  - [ ] Python
  - [ ] JavaScript

> Review comment (on lines +190 to +195): Please can I propose that this proposal tracks the Rust-SDK too :)

> Review comment: Later on, would we want to use the same RPC for bulk subscribe as well?
> If so, should the `TopicEventRequest` stream be wrapped by another message so that we can extend it to Bulk Subscribe events as well?

> Reply: Yes, we can wrap the `TopicEventRequest` for doing bulk messages, though I don't necessarily see the use case for this.