-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add SSE streaming via /events #791
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
bc1b14a
to
894fb7f
Compare
2f45396
to
81905c8
Compare
84a6baa
to
c0dba9f
Compare
9eb98f8
to
6a4d641
Compare
5f64987
to
687bf7b
Compare
adfa48e
to
659bfcb
Compare
07a6a0e
to
da30ecd
Compare
We originally used slogin but switched away from it due to some limitations. We forgot to get the request id from the context via our own function. It was thus not found in the context.
tonsV2
requested changes
Jun 20, 2024
its not needed
so use matchUnfiltered as the consumer should not get messages that have no group
the predicates do return false on error but it looks odd that we do not return
tonsV2
requested changes
Jun 21, 2024
tonsV2
approved these changes
Jun 21, 2024
radnov
approved these changes
Jun 21, 2024
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
🚀
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
This PR allows users to stream instance manager events via SSE. An instance manager event can be anything from "Your DB has been saved" to changes to a pod. This PR only implements the event consuming side. Another PR producing the first instance manager event will follow.
Use cases
These use cases are supported:
Note that in case C resume will only work if the EventSource got an event (id) before it lost connection. Only then can it send the HTTP header
Last-Event-ID
which allows us to resume. Otherwise, the user will get new messages only as in case A.Architecture
Clients can stream events via HTTP server-sent events. Our web UI will rely on EventSource to establish, maintain the connection and deliver the SSE events to callbacks.
The instance manager opens a consumer to a RabbitMQ stream for every user connecting to the HTTP /events endpoint. By default (if no HTTP header
Last-Event-ID
) is sent new events will be relayed from RabbitMQ to the user via SSE. If HTTP headerLast-Event-ID
is sent then message fromLast-Event-ID
+1 will be sent.Infrastructure Changes
We needed to do a couple of things to use RabbitMQ streaming with filtering
rabbitmq_stream
kubectl exec -n namespace rabbitmq -it -- rabbitmqctl enable_feature_flag all
Message Retention
Some napkin math 🔢 first:
At some point we want to push k8s status updates via SSE. I watched k8s pod events for a day on all namespaces and saw k8s event numbers of
Note: the numbers are just captured during one day. Activity on other days might be higher and/or grow over time as more users become active.
I sent
20974
messages with this RabbitMQ message data was (add the application headers of group and kind)This resulted in 12M total
RabbitMQ allows us to set retention policies for max segment size, max stream size and max age:
https://www.cloudamqp.com/blog/rabbitmq-streams-and-replay-features-part-3-limits-and-configurations-for-streams-in-rabbitmq.html
https://groups.google.com/g/rabbitmq-users/c/TQG_nE2m4GQ
We decided we want to keep messages for 1h. Our types of messages take up little space
20974 messages / 12MB
. Even storing12000 MB
worth of messages would not cause us harm on disk. These would make up~ 2.1 million
messages which if we would reach in 1h would highlight a much bigger problem. This and the fact that the max age and stream size would be combined in an and is why we only pick the max age retention.We picked
Testing
We have an integration test with 2 users streaming events. The users are in a shared group and one of them is in an exclusive group. We then test the routing logic of users only getting events they should see. We also test resuming when a connection is cancelled.
Docker issues
We switched to using the Docker image in our tests https://github.com/bitnami/containers/tree/main/bitnami/rabbitmq as this is what we use in the cluster.
https://www.rabbitmq.com/blog/2021/07/23/connecting-to-streams When connecting to a stream we pass a URI but the clients will then ask the RabbitMQ nodes for their host/port and use that to stream. This is configured via advertised_host/port in
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS
orrabbitmq.conf
. This config cannot be changed during runtime.Our tests run outside of Docker while RabbitMQ is in a container. The advertised_port is
5552
by default. If we rely on Docker picking a random port our Go tests will not be able to connect. We thus expose the fixed port5552
. We setadvertised_host
to localhost as our host is not able to resolve the Docker container name or IP (at least not without more ⛑️).The Bitnami image does not allow setting
RABBITMQ_SERVER_ADDITIONAL_ERL_ARGS
an env var so we need to mount a config file.Gnomock does not allow mounting a file 😓 Gnomock also uses the official RabbitMQ image. We thus have to use testcontainers for more control over the Docker container. We only use testcontainers for RabbitMQ.
Additional reading