Skip to content

Commit

Permalink
feat: add SSE streaming via /events
Browse files Browse the repository at this point in the history
  • Loading branch information
teleivo committed Jun 19, 2024
1 parent d66665c commit adfa48e
Show file tree
Hide file tree
Showing 16 changed files with 1,169 additions and 58 deletions.
1 change: 1 addition & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ GIN_MODE=debug

RABBITMQ_HOST=rabbitmq
RABBITMQ_PORT=5672
RABBITMQ_STREAM_PORT=5552
RABBITMQ_USERNAME=guest
RABBITMQ_PASSWORD=guest

Expand Down
36 changes: 32 additions & 4 deletions cmd/serve/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,18 @@ import (
"fmt"
"log/slog"
"os"
"time"

"github.com/go-mail/mail"
"github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream"

"github.com/aws/aws-sdk-go-v2/aws"
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
"github.com/aws/aws-sdk-go-v2/service/s3"
"github.com/dhis2-sre/im-manager/internal/log"
"github.com/dhis2-sre/im-manager/internal/middleware"
"github.com/dhis2-sre/im-manager/pkg/database"
"github.com/dhis2-sre/im-manager/pkg/event"
"github.com/dhis2-sre/im-manager/pkg/group"
"github.com/dhis2-sre/im-manager/pkg/model"
"github.com/dhis2-sre/im-manager/pkg/token"
Expand Down Expand Up @@ -72,7 +75,7 @@ func run() (err error) {
logger := slog.New(log.New(slog.NewJSONHandler(os.Stdout, nil)))
db, err := storage.NewDatabase(logger, cfg.Postgresql)
if err != nil {
return err
return fmt.Errorf("failed to setup DB: %v", err)
}

userRepository := user.NewRepository(db)
Expand Down Expand Up @@ -125,13 +128,13 @@ func run() (err error) {

host := hostname()
consumer, err := rabbitmq.NewConsumer(
cfg.RabbitMqURL.GetUrl(),
cfg.RabbitMqURL.GetURI(),
rabbitmq.WithConnectionName(host),
rabbitmq.WithConsumerTagPrefix(host),
rabbitmq.WithLogger(logger.WithGroup("rabbitmq")),
)
if err != nil {
return err
return fmt.Errorf("failed to setup RabbitMQ consumer: %v", err)
}
defer consumer.Close()

Expand All @@ -153,7 +156,7 @@ func run() (err error) {

s3Config, err := newS3Config(cfg.S3Region, s3Endpoint)
if err != nil {
return err
return fmt.Errorf("failed to setup S3 config: %v", err)
}

s3AWSClient := s3.NewFromConfig(s3Config, func(o *s3.Options) {
Expand All @@ -173,6 +176,30 @@ func run() (err error) {

integrationHandler := integration.NewHandler(dockerHubClient, cfg.InstanceService.Host, cfg.DatabaseManagerService.Host)

logger.Info("Connecting with RabbitMQ stream client", "host", cfg.RabbitMqURL.Host, "port", cfg.RabbitMqURL.StreamPort)
env, err := stream.NewEnvironment(
stream.NewEnvironmentOptions().
SetHost(cfg.RabbitMqURL.Host).
SetPort(cfg.RabbitMqURL.StreamPort).
SetUser(cfg.RabbitMqURL.Username).
SetPassword(cfg.RabbitMqURL.Password).
SetAddressResolver(stream.AddressResolver{Host: cfg.RabbitMqURL.Host, Port: cfg.RabbitMqURL.StreamPort}),
)
if err != nil {
return fmt.Errorf("failed to connect with RabbitMQ stream client: %v", err)
}
logger.Info("Connected with RabbitMQ stream client", "host", cfg.RabbitMqURL.Host, "port", cfg.RabbitMqURL.StreamPort)
streamName := "events"
err = env.DeclareStream(streamName,
stream.NewStreamOptions().
SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)).
SetMaxAge(1*time.Hour).
SetMaxLengthBytes(stream.ByteCapacity{}.GB(1)))
if err != nil {
return fmt.Errorf("failed to declare RabbitMQ stream %q: %v", streamName, err)
}
eventHandler := event.NewHandler(logger, env, streamName)

err = user.CreateUser(cfg.AdminUser.Email, cfg.AdminUser.Password, userService, groupService, model.AdministratorGroupName, "admin")
if err != nil {
return err
Expand Down Expand Up @@ -201,6 +228,7 @@ func run() (err error) {
integration.Routes(r, authentication, integrationHandler)
database.Routes(r, authentication.TokenAuthentication, databaseHandler)
instance.Routes(r, authentication.TokenAuthentication, instanceHandler)
event.Routes(r, authentication.TokenAuthentication, eventHandler)

logger.Info("Listening and serving HTTP")
if err := r.Run(); err != nil {
Expand Down
12 changes: 9 additions & 3 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,16 @@ services:
- prod

rabbitmq:
image: rabbitmq:3-management-alpine
image: bitnami/rabbitmq:3.13
ports:
- "5672:5672"
- "15672:15672"
- "127.0.0.1:5672:5672" # regular AMQP
- "127.0.0.1:15672:15672" # management UI
environment:
RABBITMQ_USERNAME: ${RABBITMQ_USERNAME}
RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD}
RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS: true
RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT: "100MB"
RABBITMQ_PLUGINS: "rabbitmq_management,rabbitmq_management_agent,rabbitmq_stream,rabbitmq_stream_management"
healthcheck:
# https://www.rabbitmq.com/docs/monitoring#stage-3
test: rabbitmq-diagnostics -q check_running && rabbitmq-diagnostics -q check_local_alarms
Expand Down
34 changes: 28 additions & 6 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ require (
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.0
github.com/aws/aws-sdk-go-v2/service/s3 v1.56.0
github.com/dhis2-sre/rabbitmq-client v0.3.1
github.com/docker/go-connections v0.5.0
github.com/dominikbraun/graph v0.23.0
github.com/gin-contrib/cors v1.7.2
github.com/gin-contrib/sse v0.1.0
github.com/gin-gonic/gin v1.10.0
github.com/go-mail/mail v2.3.1+incompatible
github.com/go-openapi/runtime v0.28.0
Expand All @@ -25,7 +27,9 @@ require (
github.com/orandin/slog-gorm v1.3.2
github.com/orlangure/gnomock v0.30.0
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4
github.com/stretchr/testify v1.9.0
github.com/testcontainers/testcontainers-go v0.30.0
go.mozilla.org/sops/v3 v3.7.3
golang.org/x/crypto v0.24.0
golang.org/x/exp v0.0.0-20231108232855-2478ac86f678
Expand All @@ -39,7 +43,9 @@ require (
require (
cloud.google.com/go/compute v1.24.0 // indirect
cloud.google.com/go/compute/metadata v0.2.3 // indirect
dario.cat/mergo v1.0.0 // indirect
github.com/Azure/azure-sdk-for-go v63.3.0+incompatible // indirect
github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.26 // indirect
github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect
Expand All @@ -51,6 +57,7 @@ require (
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/Microsoft/go-winio v0.6.1 // indirect
github.com/Microsoft/hcsshim v0.11.4 // indirect
github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c // indirect
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect
github.com/aws/aws-sdk-go v1.47.13 // indirect
Expand All @@ -73,26 +80,29 @@ require (
github.com/bytedance/sonic v1.11.6 // indirect
github.com/bytedance/sonic/loader v0.1.1 // indirect
github.com/cenkalti/backoff/v3 v3.2.2 // indirect
github.com/cenkalti/backoff/v4 v4.3.0 // indirect
github.com/cloudflare/circl v1.3.7 // indirect
github.com/cloudwego/base64x v0.1.4 // indirect
github.com/cloudwego/iasm v0.2.0 // indirect
github.com/containerd/containerd v1.7.12 // indirect
github.com/containerd/log v0.1.0 // indirect
github.com/cpuguy83/dockercfg v0.3.1 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect
github.com/dimchansky/utfbom v1.1.1 // indirect
github.com/distribution/reference v0.5.0 // indirect
github.com/docker/docker v25.0.5+incompatible // indirect
github.com/docker/go-connections v0.5.0 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emicklei/go-restful/v3 v3.11.0 // indirect
github.com/evanphx/json-patch v5.7.0+incompatible // indirect
github.com/fatih/color v1.16.0 // indirect
github.com/felixge/httpsnoop v1.0.4 // indirect
github.com/gabriel-vasile/mimetype v1.4.3 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
github.com/go-errors/errors v1.5.1 // indirect
github.com/go-jose/go-jose/v3 v3.0.3 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
github.com/go-ole/go-ole v1.3.0 // indirect
github.com/go-openapi/analysis v0.23.0 // indirect
github.com/go-openapi/errors v0.22.0 // indirect
github.com/go-openapi/jsonpointer v0.21.0 // indirect
Expand All @@ -110,10 +120,10 @@ require (
github.com/golang-jwt/jwt/v4 v4.3.0 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/btree v1.1.2 // indirect
github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect
github.com/google/gofuzz v1.2.0 // indirect
github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect
github.com/google/s2a-go v0.1.7 // indirect
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect
github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect
Expand Down Expand Up @@ -153,36 +163,49 @@ require (
github.com/lestrrat-go/iter v1.0.2 // indirect
github.com/lestrrat-go/option v1.0.1 // indirect
github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect
github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect
github.com/magiconair/properties v1.8.7 // indirect
github.com/mailru/easyjson v0.7.7 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/go-homedir v1.1.0 // indirect
github.com/mitchellh/go-wordwrap v1.0.1 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/patternmatcher v0.6.0 // indirect
github.com/moby/spdystream v0.2.0 // indirect
github.com/moby/sys/sequential v0.5.0 // indirect
github.com/moby/sys/user v0.1.0 // indirect
github.com/moby/term v0.5.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect
github.com/morikuni/aec v1.0.0 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/nxadm/tail v1.4.8 // indirect
github.com/oklog/ulid v1.3.1 // indirect
github.com/onsi/ginkgo/v2 v2.13.0 // indirect
github.com/onsi/gomega v1.28.0 // indirect
github.com/opencontainers/go-digest v1.0.0 // indirect
github.com/opencontainers/image-spec v1.1.0 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/peterbourgon/diskv v2.0.1+incompatible // indirect
github.com/pierrec/lz4 v2.6.1+incompatible // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect
github.com/ryanuber/go-glob v1.0.0 // indirect
github.com/segmentio/asm v1.2.0 // indirect
github.com/shirou/gopsutil/v3 v3.23.12 // indirect
github.com/shoenig/go-m1cpu v0.1.6 // indirect
github.com/sirupsen/logrus v1.9.3 // indirect
github.com/spaolacci/murmur3 v1.1.0 // indirect
github.com/spf13/cobra v1.8.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/objx v0.5.2 // indirect
github.com/tklauser/go-sysconf v0.3.12 // indirect
github.com/tklauser/numcpus v0.6.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
github.com/xlab/treeprint v1.2.0 // indirect
github.com/yusufpapurcu/wmi v1.2.3 // indirect
go.mongodb.org/mongo-driver v1.14.0 // indirect
go.mozilla.org/gopgagent v0.0.0-20170926210634-4d7ea76ff71a // indirect
go.opencensus.io v0.24.0 // indirect
Expand Down Expand Up @@ -218,7 +241,6 @@ require (
gopkg.in/urfave/cli.v1 v1.20.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
gotest.tools/v3 v3.5.0 // indirect
k8s.io/cli-runtime v0.28.3 // indirect
k8s.io/klog/v2 v2.110.1 // indirect
k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect
Expand Down
Loading

0 comments on commit adfa48e

Please sign in to comment.