From 02860b10bfe2bda030eb6d4af5e3caeef93a2e93 Mon Sep 17 00:00:00 2001 From: teleivo Date: Wed, 19 Jun 2024 06:16:46 +0200 Subject: [PATCH 01/20] feat: add SSE streaming via /events --- .env.example | 1 + cmd/serve/main.go | 53 ++-- docker-compose.yml | 12 +- go.mod | 32 ++- go.sum | 54 +++- pkg/config/config.go | 21 +- pkg/event/docs.go | 9 + pkg/event/event_integration_test.go | 340 ++++++++++++++++++++++++ pkg/event/handler.go | 270 +++++++++++++++++++ pkg/event/handler_test.go | 125 +++++++++ pkg/event/router.go | 11 + pkg/instance/ttlDestroyConsumer_test.go | 4 +- pkg/inttest/rabbitmq.go | 281 ++++++++++++++++++-- scripts/instances/events.sh | 7 + skaffold.yaml | 10 +- swagger/swagger.yaml | 34 +++ 16 files changed, 1190 insertions(+), 74 deletions(-) create mode 100644 pkg/event/docs.go create mode 100644 pkg/event/event_integration_test.go create mode 100644 pkg/event/handler.go create mode 100644 pkg/event/handler_test.go create mode 100644 pkg/event/router.go create mode 100755 scripts/instances/events.sh diff --git a/.env.example b/.env.example index 14d4fe0e..3a7ff00c 100644 --- a/.env.example +++ b/.env.example @@ -40,6 +40,7 @@ GIN_MODE=debug RABBITMQ_HOST=rabbitmq RABBITMQ_PORT=5672 +RABBITMQ_STREAM_PORT=5552 RABBITMQ_USERNAME=guest RABBITMQ_PASSWORD=guest diff --git a/cmd/serve/main.go b/cmd/serve/main.go index e1238c07..da33cd05 100644 --- a/cmd/serve/main.go +++ b/cmd/serve/main.go @@ -27,8 +27,10 @@ import ( "fmt" "log/slog" "os" + "time" "github.com/go-mail/mail" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" @@ -36,6 +38,7 @@ import ( "github.com/dhis2-sre/im-manager/internal/log" "github.com/dhis2-sre/im-manager/internal/middleware" "github.com/dhis2-sre/im-manager/pkg/database" + "github.com/dhis2-sre/im-manager/pkg/event" "github.com/dhis2-sre/im-manager/pkg/group" "github.com/dhis2-sre/im-manager/pkg/model" "github.com/dhis2-sre/im-manager/pkg/token" @@ -72,7 +75,7 @@ func run() (err error) { logger := slog.New(log.New(slog.NewJSONHandler(os.Stdout, nil))) db, err := storage.NewDatabase(logger, cfg.Postgresql) if err != nil { - return err + return fmt.Errorf("failed to setup DB: %v", err) } userRepository := user.NewRepository(db) @@ -123,15 +126,18 @@ func run() (err error) { dockerHubClient := integration.NewDockerHubClient(cfg.DockerHub.Username, cfg.DockerHub.Password) - host := hostname() + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("failed to get hostname: %v", err) + } consumer, err := rabbitmq.NewConsumer( - cfg.RabbitMqURL.GetUrl(), - rabbitmq.WithConnectionName(host), - rabbitmq.WithConsumerTagPrefix(host), + cfg.RabbitMqURL.GetURI(), + rabbitmq.WithConnectionName(hostname), + rabbitmq.WithConsumerTagPrefix(hostname), rabbitmq.WithLogger(logger.WithGroup("rabbitmq")), ) if err != nil { - return err + return fmt.Errorf("failed to setup RabbitMQ consumer: %v", err) } defer consumer.Close() @@ -153,7 +159,7 @@ func run() (err error) { s3Config, err := newS3Config(cfg.S3Region, s3Endpoint) if err != nil { - return err + return fmt.Errorf("failed to setup S3 config: %v", err) } s3AWSClient := s3.NewFromConfig(s3Config, func(o *s3.Options) { @@ -173,6 +179,30 @@ func run() (err error) { integrationHandler := integration.NewHandler(dockerHubClient, cfg.InstanceService.Host, cfg.DatabaseManagerService.Host) + logger.Info("Connecting with RabbitMQ stream client", "host", cfg.RabbitMqURL.Host, "port", cfg.RabbitMqURL.StreamPort) + env, err := stream.NewEnvironment( + stream.NewEnvironmentOptions(). + SetHost(cfg.RabbitMqURL.Host). + SetPort(cfg.RabbitMqURL.StreamPort). + SetUser(cfg.RabbitMqURL.Username). + SetPassword(cfg.RabbitMqURL.Password). + SetAddressResolver(stream.AddressResolver{Host: cfg.RabbitMqURL.Host, Port: cfg.RabbitMqURL.StreamPort}), + ) + if err != nil { + return fmt.Errorf("failed to connect with RabbitMQ stream client: %v", err) + } + logger.Info("Connected with RabbitMQ stream client", "host", cfg.RabbitMqURL.Host, "port", cfg.RabbitMqURL.StreamPort) + streamName := "events" + err = env.DeclareStream(streamName, + stream.NewStreamOptions(). + SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)). + SetMaxAge(1*time.Hour). + SetMaxLengthBytes(stream.ByteCapacity{}.GB(1))) + if err != nil { + return fmt.Errorf("failed to declare RabbitMQ stream %q: %v", streamName, err) + } + eventHandler := event.NewHandler(logger, env, streamName) + err = user.CreateUser(cfg.AdminUser.Email, cfg.AdminUser.Password, userService, groupService, model.AdministratorGroupName, "admin") if err != nil { return err @@ -201,6 +231,7 @@ func run() (err error) { integration.Routes(r, authentication, integrationHandler) database.Routes(r, authentication.TokenAuthentication, databaseHandler) instance.Routes(r, authentication.TokenAuthentication, instanceHandler) + event.Routes(r, authentication.TokenAuthentication, eventHandler) logger.Info("Listening and serving HTTP") if err := r.Run(); err != nil { @@ -209,14 +240,6 @@ func run() (err error) { return nil } -func hostname() string { - hostname, err := os.Hostname() - if err != nil { - return "im-manager" - } - return hostname -} - type groupService interface { FindOrCreate(name string, hostname string, deployable bool) (*model.Group, error) } diff --git a/docker-compose.yml b/docker-compose.yml index 378f944e..6f399e09 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -75,10 +75,16 @@ services: - prod rabbitmq: - image: rabbitmq:3-management-alpine + image: bitnami/rabbitmq:3.13 ports: - - "5672:5672" - - "15672:15672" + - "127.0.0.1:5672:5672" # regular AMQP + - "127.0.0.1:15672:15672" # management UI + environment: + RABBITMQ_USERNAME: ${RABBITMQ_USERNAME} + RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD} + RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS: true + RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT: "100MB" + RABBITMQ_PLUGINS: "rabbitmq_management,rabbitmq_management_agent,rabbitmq_stream,rabbitmq_stream_management" healthcheck: # https://www.rabbitmq.com/docs/monitoring#stage-3 test: rabbitmq-diagnostics -q check_running && rabbitmq-diagnostics -q check_local_alarms diff --git a/go.mod b/go.mod index 63fb667d..1b7db9f2 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,10 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 github.com/dhis2-sre/rabbitmq-client v0.3.1 + github.com/docker/go-connections v0.5.0 github.com/dominikbraun/graph v0.23.0 github.com/gin-contrib/cors v1.7.2 + github.com/gin-contrib/sse v0.1.0 github.com/gin-gonic/gin v1.10.0 github.com/go-mail/mail v2.3.1+incompatible github.com/go-openapi/runtime v0.28.0 @@ -25,7 +27,9 @@ require ( github.com/orandin/slog-gorm v1.3.2 github.com/orlangure/gnomock v0.30.0 github.com/rabbitmq/amqp091-go v1.10.0 + github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4 github.com/stretchr/testify v1.9.0 + github.com/testcontainers/testcontainers-go v0.30.0 go.mozilla.org/sops/v3 v3.7.3 golang.org/x/crypto v0.24.0 golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 @@ -39,7 +43,9 @@ require ( require ( cloud.google.com/go/compute v1.24.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect + dario.cat/mergo v1.0.0 // indirect github.com/Azure/azure-sdk-for-go v63.3.0+incompatible // indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.26 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect @@ -51,6 +57,7 @@ require ( github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go v1.47.13 // indirect @@ -73,26 +80,29 @@ require ( github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cloudflare/circl v1.3.7 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/containerd/containerd v1.7.12 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v25.0.5+incompatible // indirect - github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.7.0+incompatible // indirect github.com/fatih/color v1.16.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-errors/errors v1.5.1 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-openapi/analysis v0.23.0 // indirect github.com/go-openapi/errors v0.22.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -110,6 +120,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.3.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -143,6 +154,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect @@ -151,16 +163,23 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect + github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect + github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/spdystream v0.2.0 // indirect + github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/user v0.1.0 // indirect + github.com/moby/term v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect + github.com/morikuni/aec v1.0.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oklog/ulid v1.3.1 // indirect @@ -168,17 +187,25 @@ require ( github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect + github.com/shirou/gopsutil/v3 v3.23.12 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xlab/treeprint v1.2.0 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.mozilla.org/gopgagent v0.0.0-20170926210634-4d7ea76ff71a // indirect go.opencensus.io v0.24.0 // indirect @@ -214,7 +241,6 @@ require ( gopkg.in/urfave/cli.v1 v1.20.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.5.0 // indirect k8s.io/cli-runtime v0.28.3 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect diff --git a/go.sum b/go.sum index 25714f41..b0023425 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/age v1.2.0 h1:vRDp7pUMaAJzXNIWJVAZnEf/Dyi4Vu4wI8S1LBzufhE= filippo.io/age v1.2.0/go.mod h1:JL9ew2lTN+Pyft4RiNGguFfOpewKwSHm5ayKD/A4004= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/azure-sdk-for-go v63.3.0+incompatible h1:INepVujzUrmArRZjDLHbtER+FkvCoEwyRCXGqOlmDII= github.com/Azure/azure-sdk-for-go v63.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= @@ -169,8 +171,8 @@ github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/containerd v1.7.12 h1:+KQsnv4VnzyxWcfO9mlxxELaoztsDEjOuCMPAuPqgU0= github.com/containerd/containerd v1.7.12/go.mod h1:/5OMpE1p0ylxtEUGY8kuCYkDRzJm9NO1TFMWjUpdevk= -github.com/containerd/continuity v0.2.2 h1:QSqfxcn8c+12slxwu00AtzXrsami0MJb/MQs9lOLHLA= -github.com/containerd/continuity v0.2.2/go.mod h1:pWygW9u7LtS1o4N/Tn0FoCFDIXZ7rxcMX7HX1Dmibvk= +github.com/containerd/continuity v0.4.2 h1:v3y/4Yz5jwnvqPKJJ+7Wf93fyWoCB3F5EclWG023MDM= +github.com/containerd/continuity v0.4.2/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -185,6 +187,8 @@ github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHf github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -228,9 +232,12 @@ github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4Nij github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -264,6 +271,7 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-mail/mail v2.3.1+incompatible h1:UzNOn0k5lpfVtO31cK3hn6I4VEVGhe3lX8AJBAxXExM= github.com/go-mail/mail v2.3.1+incompatible/go.mod h1:VPWjmmNyRsWXQZHVHT3g0YbIINUkSmuKOiLIDkWbL6M= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI= @@ -392,6 +400,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= @@ -406,6 +416,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -419,8 +430,8 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9RwLgmVFfReJN+KbQ8ExNEUUoQ= +github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= @@ -543,8 +554,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -579,6 +590,7 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -658,18 +670,18 @@ github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= -github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= -github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM= +github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= +github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= -github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= +github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= +github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -github.com/opencontainers/runc v1.1.0 h1:O9+X96OcDjkmmZyfaG996kV7yq8HsoU2h1XRRQcefG8= -github.com/opencontainers/runc v1.1.0/go.mod h1:Tj1hFw6eFWp/o33uxGf5yF2BX5yz2Z6iptFpuvbbKqc= +github.com/opencontainers/runc v1.1.5 h1:L44KXEpKmfWDcS02aeGm8QNTFXTo2D+8MYGDIJ/GDEs= +github.com/opencontainers/runc v1.1.5/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/orandin/slog-gorm v1.3.2 h1:C0lKDQPAx/pF+8K2HL7bdShPwOEJpPM0Bn80zTzxU1g= github.com/orandin/slog-gorm v1.3.2/go.mod h1:MoZ51+b7xE9lwGNPYEhxcUtRNrYzjdcKvA8QXQQGEPA= github.com/orlangure/gnomock v0.30.0 h1:WXq/3KTKRVYe9a3BXa5JMZCCrg2RwNAPB2bZHMxEntE= @@ -683,6 +695,8 @@ github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6 github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -690,6 +704,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -704,6 +719,8 @@ github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7z github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4 h1:fPyWyDvNF1jdO23H3PqRc5J9bFSvv1Fl9/oyu/AP31U= +github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4/go.mod h1:siR4Lc4SKNnKtxHMzG6exzS6uJOm49Ncynglv7En5kI= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -723,6 +740,8 @@ github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11 github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -735,6 +754,8 @@ github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hg github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -984,6 +1005,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1001,9 +1023,11 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1011,11 +1035,14 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -1207,6 +1234,7 @@ gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s= gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY= gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/config/config.go b/pkg/config/config.go index 71f6afb3..3ae77bc6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -79,10 +79,11 @@ func New() Config { Password: requireEnv("SMTP_PASSWORD"), }, RabbitMqURL: rabbitmq{ - Host: requireEnv("RABBITMQ_HOST"), - Port: requireEnvAsInt("RABBITMQ_PORT"), - Username: requireEnv("RABBITMQ_USERNAME"), - Password: requireEnv("RABBITMQ_PASSWORD"), + Host: requireEnv("RABBITMQ_HOST"), + Port: requireEnvAsInt("RABBITMQ_PORT"), + StreamPort: requireEnvAsInt("RABBITMQ_STREAM_PORT"), + Username: requireEnv("RABBITMQ_USERNAME"), + Password: requireEnv("RABBITMQ_PASSWORD"), }, Redis: Redis{ Host: requireEnv("REDIS_HOST"), @@ -142,10 +143,11 @@ type Postgresql struct { } type rabbitmq struct { - Host string - Port int - Username string - Password string + Host string + Port int + StreamPort int + Username string + Password string } type smtp struct { @@ -155,7 +157,8 @@ type smtp struct { Password string } -func (r rabbitmq) GetUrl() string { +// GetURI returns the AMQP URI for RabbitMQ. +func (r rabbitmq) GetURI() string { return fmt.Sprintf("amqp://%s:%s@%s:%d/", r.Username, r.Password, r.Host, r.Port) } diff --git a/pkg/event/docs.go b/pkg/event/docs.go new file mode 100644 index 00000000..249d1aba --- /dev/null +++ b/pkg/event/docs.go @@ -0,0 +1,9 @@ +package event + +import "github.com/gin-contrib/sse" + +// swagger:response Stream +type _ struct { + // in: body + _ sse.Event +} diff --git a/pkg/event/event_integration_test.go b/pkg/event/event_integration_test.go new file mode 100644 index 00000000..79a1bcf2 --- /dev/null +++ b/pkg/event/event_integration_test.go @@ -0,0 +1,340 @@ +package event_test + +import ( + "bufio" + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/dhis2-sre/im-manager/pkg/event" + "github.com/dhis2-sre/im-manager/pkg/inttest" + "github.com/dhis2-sre/im-manager/pkg/model" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventHandler(t *testing.T) { + t.Parallel() + + db := inttest.SetupDB(t) + + sharedGroup := model.Group{ + Name: "group1", + Hostname: "hostname1", + } + err := db.Create(&sharedGroup).Error + require.NoError(t, err) + + user1 := &model.User{ + Email: "user1@dhis2.org", + EmailToken: uuid.New(), + Groups: []model.Group{ + sharedGroup, + }, + } + err = db.Create(user1).Error + require.NoError(t, err) + + groupExclusiveToUser2 := model.Group{ + Name: "group2", + Hostname: "hostname2", + } + err = db.Create(&groupExclusiveToUser2).Error + require.NoError(t, err) + + user2 := &model.User{ + Email: "user2@dhis2.org", + EmailToken: uuid.New(), + Groups: []model.Group{ + sharedGroup, + groupExclusiveToUser2, + }, + } + err = db.Create(user2).Error + require.NoError(t, err) + + rabbitmq := inttest.SetupRabbitStream(t) + streamName := "test-event-handler" + err = rabbitmq.Environment.DeclareStream(streamName, + stream.NewStreamOptions(). + SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)). + SetMaxLengthBytes(stream.ByteCapacity{}.MB(20))) + require.NoError(t, err, "failed to declare RabbitMQ stream") + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + // this is only to allow testing using multiple users without bringing in all our auth stack + authenticator := func(ctx *gin.Context) { + userParam := ctx.Query("user") + userID, err := strconv.ParseUint(userParam, 10, 64) + require.NoErrorf(t, err, "failed to parse query param user=%q into user ID", userParam) + + if userID == uint64(user1.ID) { + ctx.Set("user", user1) + return + } else if userID == uint64(user2.ID) { + ctx.Set("user", user2) + return + } + + require.FailNow(t, "provide query param user=userID in the test") + } + client := inttest.SetupHTTPServer(t, func(engine *gin.Engine) { + eventHandler := event.NewHandler(logger, rabbitmq.Environment, streamName) + event.Routes(engine, authenticator, eventHandler) + }) + + eventEmitter := NewEventEmitter(t, rabbitmq.Environment, streamName) + defer eventEmitter.Close() + + t.Log("Sending messages before users are subscribed") + // users should only get the next published message after they subscribed so these messages + // should not be received by anyone + eventEmitter.emit(t, "db-update", sharedGroup.Name, user1) + eventEmitter.emit(t, "db-update", sharedGroup.Name, user2) + eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + eventEmitter.emit(t, "instance-update", sharedGroup.Name, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() + ctxUser1, cancelUser1 := context.WithCancelCause(ctx) + defer cancelUser1(nil) + user1Messages := streamEvents(t, ctxUser1, client, user1, nil) + + t.Log("Sending messages after user1 subscribed") + event5 := eventEmitter.emit(t, "db-update", sharedGroup.Name, user1) + eventEmitter.emit(t, "db-update", sharedGroup.Name, user2) + event7 := eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + event8 := eventEmitter.emit(t, "instance-update", sharedGroup.Name, nil) + + wantUser1Messages := []sseEvent{event5, event7, event8} + + t.Log("Waiting on messages for user1...") + var gotUser1Messages []sseEvent + for len(wantUser1Messages) != len(gotUser1Messages) { + select { + case <-ctx.Done(): + require.Fail(t, "Timed out waiting on messages.") + case msg := <-user1Messages: + gotUser1Messages = append(gotUser1Messages, msg) + } + } + require.EqualValuesf(t, wantUser1Messages, gotUser1Messages, "mismatch in expected messages for user %d", user1.ID) + + ctxUser2, cancelUser2 := context.WithTimeout(context.Background(), 50*time.Second) + defer cancelUser2() + user2Messages := streamEvents(t, ctxUser2, client, user2, nil) + + t.Log("Sending messages after user2 subscribed") + event9 := eventEmitter.emit(t, "db-update", sharedGroup.Name, user1) + event10 := eventEmitter.emit(t, "db-update", sharedGroup.Name, user2) + event11 := eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + event12 := eventEmitter.emit(t, "instance-update", groupExclusiveToUser2.Name, nil) + + wantUser1Messages = []sseEvent{event9, event11} + wantUser2Messages := []sseEvent{event10, event11, event12} + + t.Log("Waiting on messages for both users...") + gotUser1Messages = nil + var gotUser2Messages []sseEvent + for len(wantUser1Messages) != len(gotUser1Messages) || len(wantUser2Messages) != len(gotUser2Messages) { + select { + case <-ctxUser1.Done(): + case <-ctxUser2.Done(): + require.FailNow(t, "Timed out waiting on messages.") + case msg := <-user1Messages: + gotUser1Messages = append(gotUser1Messages, msg) + case msg := <-user2Messages: + gotUser2Messages = append(gotUser2Messages, msg) + } + } + require.EqualValuesf(t, wantUser1Messages, gotUser1Messages, "mismatch in expected messages for user %d", user1.ID) + require.EqualValuesf(t, wantUser2Messages, gotUser2Messages, "mismatch in expected messages for user %d", user2.ID) + + cancelUser1(errors.New("drop connection")) + <-user1Messages // wait for user1 to be unsubscribed before sending new messages to test Last-Event-ID + + t.Log("Sending messages after user1 dropped its connection") + event13 := eventEmitter.emit(t, "instance-update", groupExclusiveToUser2.Name, nil) + event14 := eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + event15 := eventEmitter.emit(t, "instance-update", sharedGroup.Name, nil) + + // When a SSE client disconnects it will send the HTTP header Last-Event-ID with the ID of the + // event it last received. We want to then send the event after that. + user1Messages = streamEvents(t, ctxUser2, client, user1, &event11.ID) + + wantUser1Messages = []sseEvent{event14, event15} + wantUser2Messages = []sseEvent{event13, event14, event15} + + t.Log("Waiting on messages for both users...") + gotUser1Messages, gotUser2Messages = nil, nil + for len(wantUser1Messages) != len(gotUser1Messages) || len(wantUser2Messages) != len(gotUser2Messages) { + select { + case <-ctxUser2.Done(): + require.FailNow(t, "Timed out waiting on messages.") + case msg := <-user1Messages: + gotUser1Messages = append(gotUser1Messages, msg) + case msg := <-user2Messages: + gotUser2Messages = append(gotUser2Messages, msg) + } + } + assert.EqualValuesf(t, wantUser1Messages, gotUser1Messages, "mismatch in expected messages for user %d", user1.ID) + assert.EqualValuesf(t, wantUser2Messages, gotUser2Messages, "mismatch in expected messages for user %d", user2.ID) +} + +func streamEvents(t *testing.T, ctx context.Context, client *inttest.HTTPClient, user *model.User, lastEventId *int64) <-chan sseEvent { + url := fmt.Sprintf("%s/events?user=%d", client.ServerURL, user.ID) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + require.NoErrorf(t, err, "User %d failed to create request to stream from %q", user.ID, url) + + if lastEventId != nil { + req.Header.Add("Last-Event-ID", strconv.FormatInt(*lastEventId, 10)) + t.Logf("User %d starts to stream from %q from Last-Event-ID %d", user.ID, url, *lastEventId) + } else { + t.Logf("User %d starts to stream from %q from next event", user.ID, url) + } + + resp, err := client.Client.Do(req) + require.NoErrorf(t, err, "User %d failed to stream from %q", user.ID, url) + require.Equal(t, http.StatusOK, resp.StatusCode, "User %d failed to stream from %q", user.ID, url) + + out := make(chan sseEvent) + go func() { + defer resp.Body.Close() + + sc := bufio.NewScanner(resp.Body) + var event sseEvent + var gotData bool + var newlineCount int + for sc.Scan() { + line := sc.Text() + field, fieldValue, found := strings.Cut(line, ":") + if found { + switch field { + case "id": + id, err := strconv.ParseInt(fieldValue, 10, 64) + require.NoError(t, err, "failed to parse SSE event ID in line %q", line) + event.ID = id + case "event": + event.Event = fieldValue + case "data": + event.Data = []byte(fieldValue) + gotData = true + newlineCount++ + } + } else if gotData { + newlineCount++ + } + + if newlineCount == 2 { // SSE event is done + out <- event + newlineCount = 0 + } + } + + close(out) + if !errors.Is(sc.Err(), context.Canceled) { + require.NoErrorf(t, sc.Err(), "error scanning event stream from %q for user %d", url, user.ID) + } + t.Logf("User %d stops to stream from %q due: %v", user.ID, url, context.Cause(ctx)) + }() + + return out +} + +// sseEvent is the struct we use to assert on received SSE events. +type sseEvent struct { + ID int64 + Event string + Data []byte +} + +// eventEmitter emits an event to RabbitMQ which can then be streamed via SSE from the event handler. +type eventEmitter struct { + producer *ha.ReliableProducer + eventCount int64 + eventPublished chan error +} + +func NewEventEmitter(t *testing.T, env *stream.Environment, streamName string) eventEmitter { + producerName := "test-event-handler" + eventPublished := make(chan error) + opts := stream.NewProducerOptions(). + SetProducerName(producerName). + SetClientProvidedName(producerName). + SetFilter( + // each message will get the group as filter key + stream.NewProducerFilter(func(message message.StreamMessage) string { + return fmt.Sprintf("%s", message.GetApplicationProperties()["group"]) + })) + producer, err := ha.NewReliableProducer(env, streamName, opts, func(messageStatus []*stream.ConfirmationStatus) { + go func() { + for _, msgStatus := range messageStatus { + if msgStatus.IsConfirmed() { + eventPublished <- nil + } else { + eventPublished <- fmt.Errorf("failed to publish message with publishing_id=%d", msgStatus.GetMessage().GetPublishingId()) + } + } + }() + }) + require.NoError(t, err, "failed to create RabbitMQ producer") + return eventEmitter{producer: producer, eventPublished: eventPublished} +} + +func (es *eventEmitter) Close() error { + return es.producer.Close() +} + +// emit emits an instance manager event and returns the SSE event we expect an eligible user to +// receive via the event handler. This is a blocking operation. An event has an event counter that +// is 1-indexed for human readability. This event counter is also used for deduplication in RabbitMQ +// by setting it as the publishing id. Since we assert on the SSE event clients should receive we +// also need to assert on the SSE id field. The event handler sets the id field to the RabbitMQ +// offset (0-indexed) so SSE clients can resume on re-connect. +func (es *eventEmitter) emit(t *testing.T, kind, group string, owner *model.User) sseEvent { + streamOffset := es.eventCount + es.eventCount++ + + data := []byte(strconv.FormatInt(es.eventCount, 10)) + message := amqp.NewMessage(data) + // set a publishing id for deduplication + message.SetPublishingId(es.eventCount) + // set properties used for filtering + message.ApplicationProperties = map[string]interface{}{"group": group} + if owner != nil { + message.ApplicationProperties["owner"] = strconv.Itoa(int(owner.ID)) + } + // set property that dictates the SSE event field + message.ApplicationProperties["kind"] = kind + err := es.producer.Send(message) + require.NoErrorf(t, err, "failed to send message to RabbitMQ stream of kind %q, group %q, user %v", kind, group, owner) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + select { + case <-ctx.Done(): + assert.FailNow(t, "Timed out waiting on sent message.") + case err := <-es.eventPublished: + require.NoError(t, err) + t.Logf("Sent event %d", es.eventCount) + return sseEvent{ + ID: streamOffset, + Event: kind, + Data: data, + } + } + return sseEvent{} +} diff --git a/pkg/event/handler.go b/pkg/event/handler.go new file mode 100644 index 00000000..7a93461f --- /dev/null +++ b/pkg/event/handler.go @@ -0,0 +1,270 @@ +package event + +import ( + "errors" + "fmt" + "log/slog" + "math/rand/v2" + "net/http" + "strconv" + + "github.com/dhis2-sre/im-manager/internal/handler" + "github.com/dhis2-sre/im-manager/pkg/model" + "github.com/gin-contrib/sse" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "golang.org/x/exp/maps" +) + +func NewHandler(logger *slog.Logger, env *stream.Environment, streamName string) Handler { + return Handler{ + logger: logger, + env: env, + streamName: streamName, + } +} + +type Handler struct { + logger *slog.Logger + env *stream.Environment + streamName string +} + +func (h Handler) StreamEvents(c *gin.Context) { + // swagger:route GET /events streamSSE + // + // Stream events + // + // Stream events... + // + // responses: + // 200: Stream + // 400: Error + // 403: Error + // 404: Error + // 415: Error + // + // security: + // oauth2: + user, err := handler.GetUserFromContext(c) + if err != nil { + _ = c.Error(err) + return + } + + consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) + logger := h.logger.WithGroup("consumer").With("name", consumerName) + + userGroups := userGroups(user) + if len(userGroups) == 0 { + _ = c.AbortWithError(403, errors.New("You cannot stream events as you are not part of a group. Ask an administrator for help.")) + return + } + + // check offset to return 400 before any other header in case of an error + offsetSpec, err := computeOffsetSpec(c) + if err != nil { + logger.Error("Failed to compute RabbitMQ offset spec", "error", err) + _ = c.AbortWithError(400, err) + return + } + retry := computeRetry() + logger = h.logger.With("retry", retry) + + filter := stream.NewConsumerFilter(maps.Keys(userGroups), true, postFilter(logger, user.ID, userGroups)) + opts := stream.NewConsumerOptions(). + SetConsumerName(consumerName). + SetClientProvidedName(consumerName). + SetManualCommit(). + SetOffset(offsetSpec). + SetFilter(filter) + sseEvents, messageHandler := createMessageHandler(c.Request.Context().Done(), logger, retry) + consumer, err := ha.NewReliableConsumer(h.env, h.streamName, opts, messageHandler) + if err != nil { + logger.Error("Failed to create RabbitMQ consumer", "error", err) + _ = c.AbortWithError(500, err) + return + } + defer consumer.Close() + + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("Transfer-Encoding", "chunked") + c.Writer.WriteHeader(http.StatusOK) + c.Writer.Flush() + logger.Info("Connection established for sending SSE events", "offsetSpec", offsetSpec) + + for { + select { + case <-c.Request.Context().Done(): + logger.Info("Request canceled, returning from handler") + return + case sseEvent := <-sseEvents: + c.Render(-1, sseEvent) + c.Writer.Flush() + } + } +} + +// computeOffsetSpec computes the offset from which the SSE client will stream RabbitMQ messages +// from. By default clients will receive the next message that is published to the events stream. +// This means they will not receive "old" events. SSE clients send a "Last-Event-ID" HTTP header on +// re-connect. The "Last-Event-ID" value is a RabbitMQ offset we send in the SSE id field. Clients +// can thus resume where they left off. +func computeOffsetSpec(c *gin.Context) (stream.OffsetSpecification, error) { + lastEventID := c.GetHeader("Last-Event-ID") + if lastEventID == "" { + return stream.OffsetSpecification{}.Next(), nil + } + + // "Last-Event-ID" header is sent when SSE clients re-connect + lastOffset, err := strconv.ParseInt(lastEventID, 10, 64) + if err != nil { + return stream.OffsetSpecification{}, fmt.Errorf("invalid %q value: %v", "Last-Event-ID", err) + } + + return stream.OffsetSpecification{}.Offset(lastOffset + 1), nil +} + +// computeRetry computes the SSE computeRetry value in milliseconds. It is composed of a base of 3 seconds with an +// additional jitter of up to 1 second. +func computeRetry() uint { + var base, maxJitter uint = 3_000, 1_001 + // math rand v2 has the better API and is good enough for computing the jitter + return base + rand.UintN(maxJitter) //nolint:gosec +} + +func userGroups(user *model.User) map[string]struct{} { + result := make(map[string]struct{}, len(user.Groups)) + for _, group := range user.Groups { + result[group.Name] = struct{}{} + } + return result +} + +// postFilter is a RabbitMQ stream post filter that is applied client side. This is necessary as the +// server side filter is probabilistic and can let false positives through. (see +// https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) The filter must be simple and fast. +func postFilter(logger *slog.Logger, userID uint, userGroupsMap map[string]struct{}) stream.PostFilter { + return func(message *amqp.Message) bool { + return isUserMessageOwner(logger, userID, message.ApplicationProperties) && isUserPartOfMessageGroup(logger, userGroupsMap, message.ApplicationProperties) + } +} + +// isUserMessageOwner determines if the user is allowed to receive the message. This function only +// considers the "owner" property of a message. Messages that have no owner can be read by the user. +// Messages that have an owner can only be read by the user if the "owner" property value can be +// parsed and matches the userID. +func isUserMessageOwner(logger *slog.Logger, userID uint, applicationProperties map[string]any) bool { + owner, ok := applicationProperties["owner"] + if !ok { + return true + } + + messageOwner, ok := owner.(string) + if !ok { + logger.Error("Failed to type assert RabbitMQ message application property to a string", "messageProperty", "owner", "messagePropertyValue", owner) + return false + } + + messageOwnerID, err := strconv.ParseUint(messageOwner, 10, 64) + if err != nil { + logger.Error("Failed to parse RabbitMQ message application property to a uint", "messageProperty", "owner", "messagePropertyValue", owner, "error", err) + return false + + } + + return messageOwnerID == uint64(userID) +} + +// isUserPartOfMessageGroup determines if the user is allowed to receive the message. This function +// only considers the "group" property of a message. Messages that have no group can be read by the +// user. Messages that have a group can only be read by the user if the "group" property value can +// be parsed and matches one of the userGroupsMap. +func isUserPartOfMessageGroup(logger *slog.Logger, userGroups map[string]struct{}, applicationProperties map[string]any) bool { + group, ok := applicationProperties["group"] + if !ok { + return true + } + + messageGroup, ok := group.(string) + if !ok { + logger.Error("Failed to type assert RabbitMQ message application property to a string", "messageProperty", "group", "messagePropertyValue", group) + return false + } + + _, ok = userGroups[messageGroup] + return ok +} + +// createMessageHandler returns stream.MessagesHandler that will transform RabbitMQ messages of +// instance manager events into SSE events. These SSE events are sent via the read-only channel +// returned. This is to avoid race conditions when writing the data out to the HTTP response writer. +// Only one Go routine should write to the HTTP response writer. The RabbitMQ stream client runs our +// stream.MessagesHandler in a separate Go routine. +func createMessageHandler(done <-chan struct{}, logger *slog.Logger, retry uint) (<-chan sse.Event, stream.MessagesHandler) { + out := make(chan sse.Event) + return out, func(consumerContext stream.ConsumerContext, message *amqp.Message) { + defer func() { + if r := recover(); r != nil { + logger.Error("RabbitMQ message handler panicked", "recover", r) + // We assume that we cannot recover from a panic in a message handler. We thus panic + // again. We do want to log any panic to be notfied. + panic(r) + } + }() + + select { + case <-done: + logger.Info("Request canceled, returning from messageHandler") + close(out) + return + default: + sseEvent, err := mapMessageToEvent(retry, consumerContext.Consumer.GetOffset(), message) + if err != nil { + logger.Error("Failed to map AMQP message", "error", err) + return + } + + select { + case <-done: + logger.Info("Request canceled, returning from messageHandler") + close(out) + return + case out <- sseEvent: + } + } + } +} + +// mapMessageToEvent maps an AMQP message of an instance manager event to an SSE event. No error is +// returned if the message could be processed and an SSE event should be sent. Do not send an SSE +// event when an error is returned. +func mapMessageToEvent(retry uint, offset int64, message *amqp.Message) (sse.Event, error) { + if len(message.Data) == 0 { + return sse.Event{}, errors.New("received no data") + } + + var event string + if kindProperty, ok := message.ApplicationProperties["kind"]; ok { + if kind, ok := kindProperty.(string); ok { + event = kind + } else { + return sse.Event{}, fmt.Errorf("type assertion of RabbitMQ message application property %q failed, value=%v", "type", kindProperty) + } + } + + sseEvent := sse.Event{ + Id: strconv.FormatInt(offset, 10), + Data: string(message.Data[0]), + Retry: retry, + } + if event != "" { // SSE named event + sseEvent.Event = event + } + return sseEvent, nil +} diff --git a/pkg/event/handler_test.go b/pkg/event/handler_test.go new file mode 100644 index 00000000..b78d997e --- /dev/null +++ b/pkg/event/handler_test.go @@ -0,0 +1,125 @@ +package event + +import ( + "log/slog" + "os" + "strconv" + "testing" + + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/stretchr/testify/assert" +) + +func TestEventPostFilter(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + var user1 uint = 1 + user1GroupsMap := map[string]struct{}{ + "group1": {}, + "group3": {}, + "group4": {}, + "group7": {}, + } + var nonMatchingUserID uint = 17 + nonMatchingGroup := "group17" + tests := map[string]struct { + userID uint + userGroupsMap map[string]struct{} + message amqp.Message + want bool + }{ + "MessageForAnyone": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{}, + want: true, + }, + "MessageForGroupMatchingTheUsersGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group3"}, + }, + want: true, + }, + "MessageForUserMatchingTheUser": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"owner": strconv.Itoa(int(user1))}, + }, + want: true, + }, + "MessageForGroupAndUserMatchingTheUser": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group4", "owner": strconv.Itoa(int(user1))}, + }, + want: true, + }, + "MessageForGroupNotMatchingTheUsersGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group10"}, + }, + want: false, + }, + "MessageForGroupAndUserNotMatchingTheUsersID": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group7", "owner": strconv.Itoa(int(nonMatchingUserID))}, + }, + want: false, + }, + "MessageForGroupAndUserNotMatchingTheUsersGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": strconv.Itoa(int(user1))}, + }, + want: false, + }, + "MessageForGroupAndUserNotMatchingTheUsersIDAndGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": strconv.Itoa(int(nonMatchingUserID))}, + }, + want: false, + }, + "MessageForGroupAndUserWithNonStringOwner": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": user1}, + }, + want: false, + }, + "MessageForGroupAndUserWithNonUintOwner": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": "wrong"}, + }, + want: false, + }, + "MessageForGroupWithNonStringGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": 1}, + }, + want: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + got := postFilter(logger, test.userID, test.userGroupsMap)(&test.message) + + assert.Equal(t, test.want, got) + }) + } +} diff --git a/pkg/event/router.go b/pkg/event/router.go new file mode 100644 index 00000000..4aedbb4a --- /dev/null +++ b/pkg/event/router.go @@ -0,0 +1,11 @@ +package event + +import ( + "github.com/gin-gonic/gin" +) + +func Routes(r *gin.Engine, authenticator gin.HandlerFunc, handler Handler) { + router := r.Group("") + router.Use(authenticator) + router.GET("/events", handler.StreamEvents) +} diff --git a/pkg/instance/ttlDestroyConsumer_test.go b/pkg/instance/ttlDestroyConsumer_test.go index 6b33800d..4ff5318d 100644 --- a/pkg/instance/ttlDestroyConsumer_test.go +++ b/pkg/instance/ttlDestroyConsumer_test.go @@ -18,10 +18,10 @@ import ( func TestConsumeDeletesInstance(t *testing.T) { t.Parallel() - amqpClient := inttest.SetupRabbitMQ(t) + amqpClient := inttest.SetupRabbitMQAMQP(t) consumer, err := rabbitmq.NewConsumer( - amqpClient.URI, + amqpClient.URI(t), rabbitmq.WithConnectionName(t.Name()), rabbitmq.WithConsumerTagPrefix(t.Name()), ) diff --git a/pkg/inttest/rabbitmq.go b/pkg/inttest/rabbitmq.go index dfa24237..a3c10df7 100644 --- a/pkg/inttest/rabbitmq.go +++ b/pkg/inttest/rabbitmq.go @@ -1,48 +1,273 @@ +// Package inttest provides setup functions that create a RabbitMQ container. We are using the +// management image for RabbitMQ so you can debug and interact with tests using its admin panel. Use +// a debugger, adjust timeouts waiting for a message or add a time.Sleep and find the exposed +// management port to login to the UI. You will find it easier to debug if your test configures the +// consumers connection and or consumer tag prefix. package inttest import ( + "context" "fmt" + "strings" "testing" - "github.com/orlangure/gnomock" - "github.com/orlangure/gnomock/preset/rabbitmq" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/docker/go-connections/nat" + amqpgo "github.com/rabbitmq/amqp091-go" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/network" + "github.com/testcontainers/testcontainers-go/wait" ) -// SetupRabbitMQ creates a RabbitMQ container returning an AMQP client ready to send messages to it. -func SetupRabbitMQ(t *testing.T) *amqpTestClient { +const amqpPort = "5672" +const natAMQPPort = amqpPort + "/tcp" +const streamPort = "5552" +const natStreamPort = streamPort + "/tcp" + +// SetupRabbitMQAMQP creates a RabbitMQ with an AMQP client ready to send messages to it. +func SetupRabbitMQAMQP(t *testing.T, options ...rabbitMQOption) *AMQP { t.Helper() + require := require.New(t) + ctx := context.TODO() - container, err := gnomock.Start( - rabbitmq.Preset( - rabbitmq.WithUser("im", "im"), - ), - ) - require.NoError(t, err, "failed to start RabbitMQ") - t.Cleanup(func() { require.NoError(t, gnomock.Stop(container), "failed to stop RabbitMQ") }) - - URI := fmt.Sprintf( - "amqp://%s:%s@%s", - "im", "im", - container.DefaultAddress(), - ) - conn, err := amqp.Dial(URI) - require.NoErrorf(t, err, "failed to connect to RabbitMQ", URI) + net, err := network.New(ctx) + require.NoError(err, "failed setting up Docker network") t.Cleanup(func() { - require.NoErrorf(t, conn.Close(), "failed to close connection to RabbitMQ") + require.NoError(net.Remove(ctx), "failed to remove the Docker network") }) - ch, err := conn.Channel() - require.NoErrorf(t, err, "failed to open channel to RabbitMQ") + rabbitMQContainer, err := NewRabbitMQ(ctx, WithNetwork(net.Name, "rabbitmq")) + require.NoError(err, "failed setting up RabbitMQ") t.Cleanup(func() { - require.NoErrorf(t, ch.Close(), "failed to close channel to RabbitMQ") + require.NoError(rabbitMQContainer.Terminate(ctx), "failed to terminate RabbitMQ") + }) + + URI, err := rabbitMQContainer.AMQPURI(ctx) + require.NoError(err, "failed to get RabbitMQ AMQP URI") + conn, err := amqpgo.Dial(URI) + require.NoError(err, "failed setting up AMQP connection") + channel, err := conn.Channel() + require.NoError(err, "failed setting up AMQP channel") + + return &AMQP{ + rabbitMQContainer: rabbitMQContainer, + conn: conn, + Channel: channel, + } +} + +// AMQP allows making requests to RabbitMQ. It does so by opening a connection and channel to +// RabbitMQ via the low-level github.com/rabbitmq/amqp091-go library. +type AMQP struct { + rabbitMQContainer *rabbitmqContainer + conn *amqpgo.Connection // Connection established with RabbitMQ + Channel *amqpgo.Channel // Channel established with RabbitMQ +} + +// URI is the AMQP URI going to RabbitMQ. +func (a *AMQP) URI(t *testing.T) string { + t.Helper() + + URI, err := a.rabbitMQContainer.AMQPURI(context.TODO()) + require.NoError(t, err, "failed to get RabbitMQ URI") + return URI +} + +// SetupRabbitStream creates a RabbitMQ container with a streaming environment ready to create a +// producer or consumer. +func SetupRabbitStream(t *testing.T) *Stream { + t.Helper() + require := require.New(t) + ctx := context.TODO() + + net, err := network.New(ctx) + require.NoError(err, "failed setting up Docker network") + t.Cleanup(func() { + require.NoError(net.Remove(ctx), "failed to remove the Docker network") + }) + + rabbitMQContainer, err := NewRabbitMQ(ctx, WithNetwork(net.Name, "rabbitmq"), WithStreamingExposed()) + require.NoError(err, "failed setting up RabbitMQ") + t.Cleanup(func() { + require.NoError(rabbitMQContainer.Terminate(ctx), "failed to terminate RabbitMQ") + }) + + URI, err := rabbitMQContainer.StreamURI(ctx) + require.NoError(err, "failed to get RabbitMQ stream URI") + env, err := stream.NewEnvironment( + stream.NewEnvironmentOptions(). + SetUri(URI)) + require.NoError(err, "failed to create new RabbitMQ stream environment") + + return &Stream{ + rabbitMQContainer: rabbitMQContainer, + Environment: env, + } +} + +// Stream allows making requests to RabbitMQ. It does so by opening a stream environment via +// github.com/rabbitmq/rabbitmq-stream-go-client. +type Stream struct { + rabbitMQContainer *rabbitmqContainer + Environment *stream.Environment +} + +// StreamURI is the stream URI going to RabbitMQ. +func (s *Stream) StreamURI(t *testing.T) string { + t.Helper() + + URI, err := s.rabbitMQContainer.StreamURI(context.TODO()) + require.NoError(t, err, "failed to get RabbitMQ stream URI") + return URI +} + +func (s *Stream) StreamPort(t *testing.T) string { + t.Helper() + + port, err := s.rabbitMQContainer.ExposedStreamPort(context.TODO()) + require.NoError(t, err, "failed to get RabbitMQ stream port") + return port +} + +type rabbitmqContainer struct { + testcontainers.Container + user string + pw string + network string + networkAlias string + internalPort string +} + +func (rc *rabbitmqContainer) AMQPURI(ctx context.Context) (string, error) { + ip, err := rc.Host(ctx) + if err != nil { + return "", err + } + port, err := rc.ExposedAMQPPort(ctx) + if err != nil { + return "", err + } + return amqpURI(rc.user, rc.pw, ip, port), nil +} + +func (rc *rabbitmqContainer) ExposedAMQPPort(ctx context.Context) (string, error) { + port, err := rc.MappedPort(ctx, nat.Port(natAMQPPort)) + if err != nil { + return "", err + } + return port.Port(), nil +} + +func (rc *rabbitmqContainer) StreamURI(ctx context.Context) (string, error) { + ip, err := rc.Host(ctx) + if err != nil { + return "", err + } + port, err := rc.ExposedStreamPort(ctx) + if err != nil { + return "", err + } + return streamURI(rc.user, rc.pw, ip, port), nil +} + +func (rc *rabbitmqContainer) ExposedStreamPort(ctx context.Context) (string, error) { + port, err := rc.MappedPort(ctx, nat.Port(natStreamPort)) + if err != nil { + return "", err + } + return port.Port(), nil +} + +type rabbitMQOptions struct { + network string + networkAlias string + exposeStreaming bool +} + +type rabbitMQOption func(*rabbitMQOptions) + +// WithNetwork connects the RabbitMQ container to a specific network and gives it an alias with +// which you can reach it on this network. +func WithNetwork(name, alias string) rabbitMQOption { + return func(options *rabbitMQOptions) { + options.network = name + options.networkAlias = alias + } +} + +// WithStreamingExposed exposes RabbitMQ streaming port 5552 to a fixed port of 5552. +func WithStreamingExposed() rabbitMQOption { + return func(options *rabbitMQOptions) { + options.exposeStreaming = true + } +} + +// NewRabbitMQ creates a RabbitMQ container. The container will be listening and ready to accept +// connections. Connect using default user and password rabbitmq or the credentials you provided via +// the options. +func NewRabbitMQ(ctx context.Context, options ...rabbitMQOption) (*rabbitmqContainer, error) { + opts := &rabbitMQOptions{} + for _, o := range options { + o(opts) + } + + user := "guest" + pw := "guest" + natPortMgmt := "15672/tcp" + exposedPorts := []string{natAMQPPort, natPortMgmt} + if opts.exposeStreaming { + exposedPorts = append(exposedPorts, fmt.Sprintf("%s:%s", streamPort, natStreamPort)) + } + req := testcontainers.ContainerRequest{ + Image: "bitnami/rabbitmq:3.13", + Env: map[string]string{ + "RABBITMQ_USERNAME": user, + "RABBITMQ_PASSWORD": pw, + "BITNAMI_DEBUG": "true", + "RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS": "true", + "RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT": "100MB", + "RABBITMQ_PLUGINS": "rabbitmq_management,rabbitmq_management_agent,rabbitmq_stream,rabbitmq_stream_management", + }, + ExposedPorts: exposedPorts, + Files: []testcontainers.ContainerFile{ + { + Reader: strings.NewReader(`SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost"`), + ContainerFilePath: "/etc/rabbitmq/rabbitmq-env.conf", + FileMode: 0o444, + }, + }, + WaitingFor: wait.ForLog("Time to start RabbitMQ").WithOccurrence(2), + } + if opts.network != "" { + req.Networks = []string{opts.network} + req.NetworkAliases = map[string][]string{ + opts.network: {opts.networkAlias}, + } + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, }) + if err != nil { + return nil, err + } + + return &rabbitmqContainer{ + Container: container, + network: opts.network, + networkAlias: opts.networkAlias, + internalPort: amqpPort, + user: user, + pw: pw, + }, nil +} - return &amqpTestClient{Channel: ch, URI: URI} +func amqpURI(user, pw, ip, port string) string { + return fmt.Sprintf("amqp://%s:%s@%s:%s", user, pw, ip, port) } -type amqpTestClient struct { - Channel *amqp.Channel - URI string +func streamURI(user, pw, ip, port string) string { + return fmt.Sprintf("rabbitmq-stream://%s:%s@%s:%s", user, pw, ip, port) } diff --git a/scripts/instances/events.sh b/scripts/instances/events.sh new file mode 100755 index 00000000..adc1c5cc --- /dev/null +++ b/scripts/instances/events.sh @@ -0,0 +1,7 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source ./auth.sh + +$HTTP get "$IM_HOST/events" Cookie:accessToken=$ACCESS_TOKEN diff --git a/skaffold.yaml b/skaffold.yaml index 82b72272..754d331a 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -33,8 +33,15 @@ deploy: createNamespace: true remoteChart: rabbitmq repo: https://charts.bitnami.com/bitnami - version: 12.4.1 + version: 14.4.2 useHelmSecrets: true + setValues: + extraPlugins: "rabbitmq_stream rabbitmq_stream_management" + service: + extraPorts: + - name: streams + port: 5552 + targetPort: 5552 valuesFiles: - helm/data/secrets/{{ .CLASSIFICATION }}/rabbitmq.yaml @@ -92,6 +99,7 @@ deploy: DATABASE_PORT: "5432" RABBITMQ_HOST: im-rabbitmq-{{ .ENVIRONMENT }}.instance-manager-{{ .CLASSIFICATION }}.svc RABBITMQ_PORT: "5672" + RABBITMQ_STREAM_PORT: "5552" S3_BUCKET: im-databases-{{ .CLASSIFICATION }} S3_REGION: eu-west-1 DEFAULT_TTL: "172800" # 48 hours diff --git a/swagger/swagger.yaml b/swagger/swagger.yaml index 0d1d10e5..1b8fc466 100644 --- a/swagger/swagger.yaml +++ b/swagger/swagger.yaml @@ -190,6 +190,18 @@ definitions: $ref: '#/definitions/DeploymentInstanceParameter' type: object x-go-package: github.com/dhis2-sre/im-manager/pkg/model + Event: + properties: + Data: {} + Event: + type: string + Id: + type: string + Retry: + format: uint64 + type: integer + type: object + x-go-package: github.com/gin-contrib/sse ExternalDownload: properties: databaseId: @@ -1078,6 +1090,24 @@ paths: security: - oauth2: [] summary: Delete deployment instance + /events: + get: + description: Stream events... + operationId: streamSSE + responses: + "200": + $ref: '#/responses/Stream' + "400": + $ref: '#/responses/Error' + "403": + $ref: '#/responses/Error' + "404": + $ref: '#/responses/Error' + "415": + $ref: '#/responses/Error' + security: + - oauth2: [] + summary: Stream events /groups: get: description: Find all groups by user @@ -1825,6 +1855,10 @@ responses: description: "" schema: $ref: '#/definitions/InstanceStatus' + Stream: + description: "" + schema: + $ref: '#/definitions/Event' Tokens: description: "" schema: From 366a2b77bd31efb6dae830122edef71d23d86485 Mon Sep 17 00:00:00 2001 From: teleivo Date: Wed, 19 Jun 2024 06:16:46 +0200 Subject: [PATCH 02/20] feat: add SSE streaming via /events --- pkg/event/handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 7a93461f..748b3a5a 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -74,6 +74,7 @@ func (h Handler) StreamEvents(c *gin.Context) { retry := computeRetry() logger = h.logger.With("retry", retry) + userGroups := userGroups(user) filter := stream.NewConsumerFilter(maps.Keys(userGroups), true, postFilter(logger, user.ID, userGroups)) opts := stream.NewConsumerOptions(). SetConsumerName(consumerName). From 3860b71834cada9aded71a39d5c5f002890489e6 Mon Sep 17 00:00:00 2001 From: teleivo Date: Wed, 19 Jun 2024 06:16:46 +0200 Subject: [PATCH 03/20] fix: return request ID on status 500 We originally used slogin but switched away from it due to some limitations. We forgot to get the request id from the context via our own function. It was thus not found in the context. --- go.mod | 3 +++ 1 file changed, 3 insertions(+) diff --git a/go.mod b/go.mod index 1b7db9f2..a348588d 100644 --- a/go.mod +++ b/go.mod @@ -124,6 +124,7 @@ require ( github.com/google/btree v1.1.2 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect github.com/google/gofuzz v1.2.0 // indirect + github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect @@ -183,6 +184,8 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oklog/ulid v1.3.1 // indirect + github.com/onsi/ginkgo/v2 v2.13.0 // indirect + github.com/onsi/gomega v1.28.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect From 9afce91f5e3167858d9387d616a6aca2f069d5c2 Mon Sep 17 00:00:00 2001 From: teleivo Date: Wed, 12 Jun 2024 14:22:10 +0200 Subject: [PATCH 04/20] feat: add SSE streaming via /events --- go.mod | 3 --- 1 file changed, 3 deletions(-) diff --git a/go.mod b/go.mod index a348588d..1b7db9f2 100644 --- a/go.mod +++ b/go.mod @@ -124,7 +124,6 @@ require ( github.com/google/btree v1.1.2 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect github.com/google/gofuzz v1.2.0 // indirect - github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 // indirect github.com/google/s2a-go v0.1.7 // indirect github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect @@ -184,8 +183,6 @@ require ( github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oklog/ulid v1.3.1 // indirect - github.com/onsi/ginkgo/v2 v2.13.0 // indirect - github.com/onsi/gomega v1.28.0 // indirect github.com/opencontainers/go-digest v1.0.0 // indirect github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect From 6c35d5398ccc3b53cd1e42e7a1e6a0c696e65eae Mon Sep 17 00:00:00 2001 From: Andreas Jensen Date: Thu, 20 Jun 2024 12:35:51 +0200 Subject: [PATCH 05/20] chore: fix bad merge --- pkg/event/handler.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 748b3a5a..6460d363 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -58,7 +58,7 @@ func (h Handler) StreamEvents(c *gin.Context) { consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) logger := h.logger.WithGroup("consumer").With("name", consumerName) - userGroups := userGroups(user) + userGroups := mapUserGroups(user) if len(userGroups) == 0 { _ = c.AbortWithError(403, errors.New("You cannot stream events as you are not part of a group. Ask an administrator for help.")) return @@ -74,7 +74,6 @@ func (h Handler) StreamEvents(c *gin.Context) { retry := computeRetry() logger = h.logger.With("retry", retry) - userGroups := userGroups(user) filter := stream.NewConsumerFilter(maps.Keys(userGroups), true, postFilter(logger, user.ID, userGroups)) opts := stream.NewConsumerOptions(). SetConsumerName(consumerName). @@ -139,7 +138,7 @@ func computeRetry() uint { return base + rand.UintN(maxJitter) //nolint:gosec } -func userGroups(user *model.User) map[string]struct{} { +func mapUserGroups(user *model.User) map[string]struct{} { result := make(map[string]struct{}, len(user.Groups)) for _, group := range user.Groups { result[group.Name] = struct{}{} From faca1926a795a34bcac6489d22bca4e96b38cc3c Mon Sep 17 00:00:00 2001 From: teleivo Date: Thu, 20 Jun 2024 15:45:14 +0200 Subject: [PATCH 06/20] chore: code review --- pkg/event/handler.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 6460d363..15670813 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" + "github.com/dhis2-sre/im-manager/internal/errdef" "github.com/dhis2-sre/im-manager/internal/handler" "github.com/dhis2-sre/im-manager/pkg/model" "github.com/gin-contrib/sse" @@ -55,20 +56,20 @@ func (h Handler) StreamEvents(c *gin.Context) { return } - consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) - logger := h.logger.WithGroup("consumer").With("name", consumerName) - userGroups := mapUserGroups(user) if len(userGroups) == 0 { - _ = c.AbortWithError(403, errors.New("You cannot stream events as you are not part of a group. Ask an administrator for help.")) + _ = c.Error(errdef.NewForbidden("You cannot stream events as you are not part of a group. Ask an administrator for help.")) return } + consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) + logger := h.logger.WithGroup("consumer").With("name", consumerName) + // check offset to return 400 before any other header in case of an error offsetSpec, err := computeOffsetSpec(c) if err != nil { logger.Error("Failed to compute RabbitMQ offset spec", "error", err) - _ = c.AbortWithError(400, err) + _ = c.Error(err) return } retry := computeRetry() @@ -85,7 +86,7 @@ func (h Handler) StreamEvents(c *gin.Context) { consumer, err := ha.NewReliableConsumer(h.env, h.streamName, opts, messageHandler) if err != nil { logger.Error("Failed to create RabbitMQ consumer", "error", err) - _ = c.AbortWithError(500, err) + _ = c.Error(err) return } defer consumer.Close() @@ -121,10 +122,9 @@ func computeOffsetSpec(c *gin.Context) (stream.OffsetSpecification, error) { return stream.OffsetSpecification{}.Next(), nil } - // "Last-Event-ID" header is sent when SSE clients re-connect lastOffset, err := strconv.ParseInt(lastEventID, 10, 64) if err != nil { - return stream.OffsetSpecification{}, fmt.Errorf("invalid %q value: %v", "Last-Event-ID", err) + return stream.OffsetSpecification{}, errdef.NewBadRequest("invalid header %q value: %v", "Last-Event-ID", err) } return stream.OffsetSpecification{}.Offset(lastOffset + 1), nil @@ -175,7 +175,6 @@ func isUserMessageOwner(logger *slog.Logger, userID uint, applicationProperties if err != nil { logger.Error("Failed to parse RabbitMQ message application property to a uint", "messageProperty", "owner", "messagePropertyValue", owner, "error", err) return false - } return messageOwnerID == uint64(userID) From 6a3551bfe912302d6638d177824c325ffe53ff6f Mon Sep 17 00:00:00 2001 From: teleivo Date: Thu, 20 Jun 2024 16:14:16 +0200 Subject: [PATCH 07/20] chore: use lowercase error --- pkg/event/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 15670813..122dbb8f 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -58,7 +58,7 @@ func (h Handler) StreamEvents(c *gin.Context) { userGroups := mapUserGroups(user) if len(userGroups) == 0 { - _ = c.Error(errdef.NewForbidden("You cannot stream events as you are not part of a group. Ask an administrator for help.")) + _ = c.Error(errdef.NewForbidden("you cannot stream events as you are not part of a group. Ask an administrator for help.")) return } From 0cee322129e65e4da80b6d75ecfd0e90067e1dca Mon Sep 17 00:00:00 2001 From: teleivo Date: Thu, 20 Jun 2024 16:18:47 +0200 Subject: [PATCH 08/20] chore: only support named events --- pkg/event/handler.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 122dbb8f..31e060cf 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -248,22 +248,19 @@ func mapMessageToEvent(retry uint, offset int64, message *amqp.Message) (sse.Eve return sse.Event{}, errors.New("received no data") } - var event string - if kindProperty, ok := message.ApplicationProperties["kind"]; ok { - if kind, ok := kindProperty.(string); ok { - event = kind - } else { - return sse.Event{}, fmt.Errorf("type assertion of RabbitMQ message application property %q failed, value=%v", "type", kindProperty) - } + kindProperty, ok := message.ApplicationProperties["kind"] + if !ok { + return sse.Event{}, errors.New(`RabbitMQ message is missind application property "kind"`) + } + kind, ok := kindProperty.(string) + if !ok { + return sse.Event{}, fmt.Errorf("type assertion of RabbitMQ message application property %q failed, value=%v", "type", kindProperty) } - sseEvent := sse.Event{ + return sse.Event{ Id: strconv.FormatInt(offset, 10), Data: string(message.Data[0]), Retry: retry, - } - if event != "" { // SSE named event - sseEvent.Event = event - } - return sseEvent, nil + Event: kind, + }, nil } From 290b1a0c3da93affc2b92f9439f17a04a13c8941 Mon Sep 17 00:00:00 2001 From: teleivo Date: Thu, 20 Jun 2024 16:36:52 +0200 Subject: [PATCH 09/20] chore: typo --- pkg/event/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 31e060cf..99a7e6c4 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -250,7 +250,7 @@ func mapMessageToEvent(retry uint, offset int64, message *amqp.Message) (sse.Eve kindProperty, ok := message.ApplicationProperties["kind"] if !ok { - return sse.Event{}, errors.New(`RabbitMQ message is missind application property "kind"`) + return sse.Event{}, errors.New(`RabbitMQ message is missing application property "kind"`) } kind, ok := kindProperty.(string) if !ok { From 31bec2f1a7ebe13cdedf90bcf0769b5837f27744 Mon Sep 17 00:00:00 2001 From: teleivo Date: Thu, 20 Jun 2024 17:34:22 +0200 Subject: [PATCH 10/20] docs: add comment --- pkg/event/handler.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 99a7e6c4..f0b0e79d 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -135,6 +135,7 @@ func computeOffsetSpec(c *gin.Context) (stream.OffsetSpecification, error) { func computeRetry() uint { var base, maxJitter uint = 3_000, 1_001 // math rand v2 has the better API and is good enough for computing the jitter + // uses a half-open interval [0,n) so 1000ms+1ms return base + rand.UintN(maxJitter) //nolint:gosec } From b66e0f0465a1ac234bd26f8a3db52f2633f5a35f Mon Sep 17 00:00:00 2001 From: teleivo Date: Thu, 20 Jun 2024 17:56:54 +0200 Subject: [PATCH 11/20] chore: explain string(data) --- pkg/event/handler.go | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index f0b0e79d..9bee0a39 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -258,9 +258,12 @@ func mapMessageToEvent(retry uint, offset int64, message *amqp.Message) (sse.Eve return sse.Event{}, fmt.Errorf("type assertion of RabbitMQ message application property %q failed, value=%v", "type", kindProperty) } + // string() is needed as sse.Event will marshall using fmt.Sprint() which prints `[65]` for + // []byte{"A"} instead of `A` + data := string(message.Data[0]) return sse.Event{ Id: strconv.FormatInt(offset, 10), - Data: string(message.Data[0]), + Data: data, Retry: retry, Event: kind, }, nil From 029e6375e8ad712dd8601afdc32bfe43dda746f0 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 05:35:00 +0200 Subject: [PATCH 12/20] chore: clear up data conversion from RabbitMQ to SSE --- pkg/event/event_integration_test.go | 8 ++++---- pkg/event/handler.go | 5 +++-- 2 files changed, 7 insertions(+), 6 deletions(-) diff --git a/pkg/event/event_integration_test.go b/pkg/event/event_integration_test.go index 79a1bcf2..0d6a88b7 100644 --- a/pkg/event/event_integration_test.go +++ b/pkg/event/event_integration_test.go @@ -230,7 +230,7 @@ func streamEvents(t *testing.T, ctx context.Context, client *inttest.HTTPClient, case "event": event.Event = fieldValue case "data": - event.Data = []byte(fieldValue) + event.Data = fieldValue gotData = true newlineCount++ } @@ -258,7 +258,7 @@ func streamEvents(t *testing.T, ctx context.Context, client *inttest.HTTPClient, type sseEvent struct { ID int64 Event string - Data []byte + Data string } // eventEmitter emits an event to RabbitMQ which can then be streamed via SSE from the event handler. @@ -308,8 +308,8 @@ func (es *eventEmitter) emit(t *testing.T, kind, group string, owner *model.User streamOffset := es.eventCount es.eventCount++ - data := []byte(strconv.FormatInt(es.eventCount, 10)) - message := amqp.NewMessage(data) + data := strconv.FormatInt(es.eventCount, 10) + message := amqp.NewMessage([]byte(data)) // set a publishing id for deduplication message.SetPublishingId(es.eventCount) // set properties used for filtering diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 9bee0a39..1f23b889 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -258,8 +258,9 @@ func mapMessageToEvent(retry uint, offset int64, message *amqp.Message) (sse.Eve return sse.Event{}, fmt.Errorf("type assertion of RabbitMQ message application property %q failed, value=%v", "type", kindProperty) } - // string() is needed as sse.Event will marshall using fmt.Sprint() which prints `[65]` for - // []byte{"A"} instead of `A` + // text/event-stream is text based. Thus our data needs to be converted to a string. Gin + // sse.Event marshalls the Data field using fmt.Sprint which uses the default formatting verb %v + // which for a []byte would print `[65]` for []byte{"A"} instead of `A` data := string(message.Data[0]) return sse.Event{ Id: strconv.FormatInt(offset, 10), From b64ecb2a8e3daa5c2eccb775686594b577ffbd49 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 08:35:33 +0200 Subject: [PATCH 13/20] chore: allow resuming with events.sh --- scripts/instances/events.sh | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/scripts/instances/events.sh b/scripts/instances/events.sh index adc1c5cc..8ab8163d 100755 --- a/scripts/instances/events.sh +++ b/scripts/instances/events.sh @@ -4,4 +4,10 @@ set -euo pipefail source ./auth.sh -$HTTP get "$IM_HOST/events" Cookie:accessToken=$ACCESS_TOKEN +LAST_EVENT_ID="${1:-}" + +if [[ -z $LAST_EVENT_ID ]]; then + $HTTP get "$IM_HOST/events" Cookie:accessToken="$ACCESS_TOKEN" +else + $HTTP get "$IM_HOST/events" Cookie:accessToken="$ACCESS_TOKEN" Last-Event-ID:"$LAST_EVENT_ID" +fi From 1cd4a8647bc84b4f262c9cf3b8824bfb03383e0e Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 09:38:07 +0200 Subject: [PATCH 14/20] chore: log user, request id by passing the gin.Context --- pkg/event/handler.go | 69 +++++++++++++++++++++------------------ pkg/event/handler_test.go | 7 +++- 2 files changed, 44 insertions(+), 32 deletions(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 1f23b889..29dc278c 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -62,30 +62,32 @@ func (h Handler) StreamEvents(c *gin.Context) { return } - consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) - logger := h.logger.WithGroup("consumer").With("name", consumerName) - // check offset to return 400 before any other header in case of an error offsetSpec, err := computeOffsetSpec(c) if err != nil { - logger.Error("Failed to compute RabbitMQ offset spec", "error", err) + h.logger.ErrorContext(c, "Failed to compute RabbitMQ offset spec", "error", err) _ = c.Error(err) return } + + consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) retry := computeRetry() - logger = h.logger.With("retry", retry) + logger := h.logger. + With("consumerName", consumerName). + With("consumerOffsetSpec", offsetSpec.String()). + With("sseRetry", retry) - filter := stream.NewConsumerFilter(maps.Keys(userGroups), true, postFilter(logger, user.ID, userGroups)) + filter := stream.NewConsumerFilter(maps.Keys(userGroups), true, postFilter(c, logger, user.ID, userGroups)) opts := stream.NewConsumerOptions(). SetConsumerName(consumerName). SetClientProvidedName(consumerName). SetManualCommit(). SetOffset(offsetSpec). SetFilter(filter) - sseEvents, messageHandler := createMessageHandler(c.Request.Context().Done(), logger, retry) + sseEvents, messageHandler := createMessageHandler(c, logger, retry) consumer, err := ha.NewReliableConsumer(h.env, h.streamName, opts, messageHandler) if err != nil { - logger.Error("Failed to create RabbitMQ consumer", "error", err) + logger.ErrorContext(c, "Failed to create RabbitMQ consumer", "error", err) _ = c.Error(err) return } @@ -97,12 +99,12 @@ func (h Handler) StreamEvents(c *gin.Context) { c.Writer.Header().Set("Transfer-Encoding", "chunked") c.Writer.WriteHeader(http.StatusOK) c.Writer.Flush() - logger.Info("Connection established for sending SSE events", "offsetSpec", offsetSpec) + logger.InfoContext(c, "Connection established for sending SSE events") for { select { case <-c.Request.Context().Done(): - logger.Info("Request canceled, returning from handler") + logger.InfoContext(c, "Request canceled, returning from /events handler") return case sseEvent := <-sseEvents: c.Render(-1, sseEvent) @@ -150,9 +152,17 @@ func mapUserGroups(user *model.User) map[string]struct{} { // postFilter is a RabbitMQ stream post filter that is applied client side. This is necessary as the // server side filter is probabilistic and can let false positives through. (see // https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) The filter must be simple and fast. -func postFilter(logger *slog.Logger, userID uint, userGroupsMap map[string]struct{}) stream.PostFilter { +func postFilter(c *gin.Context, logger *slog.Logger, userID uint, userGroups map[string]struct{}) stream.PostFilter { return func(message *amqp.Message) bool { - return isUserMessageOwner(logger, userID, message.ApplicationProperties) && isUserPartOfMessageGroup(logger, userGroupsMap, message.ApplicationProperties) + isOwner, err := isUserMessageOwner(userID, message.ApplicationProperties) + if err != nil { + logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) + } + isInGroup, err := isUserPartOfMessageGroup(userGroups, message.ApplicationProperties) + if err != nil { + logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) + } + return isOwner && isInGroup } } @@ -160,45 +170,42 @@ func postFilter(logger *slog.Logger, userID uint, userGroupsMap map[string]struc // considers the "owner" property of a message. Messages that have no owner can be read by the user. // Messages that have an owner can only be read by the user if the "owner" property value can be // parsed and matches the userID. -func isUserMessageOwner(logger *slog.Logger, userID uint, applicationProperties map[string]any) bool { +func isUserMessageOwner(userID uint, applicationProperties map[string]any) (bool, error) { owner, ok := applicationProperties["owner"] if !ok { - return true + return true, nil } messageOwner, ok := owner.(string) if !ok { - logger.Error("Failed to type assert RabbitMQ message application property to a string", "messageProperty", "owner", "messagePropertyValue", owner) - return false + return false, errors.New(`failed to type assert RabbitMQ message application property "owner" to a string`) } messageOwnerID, err := strconv.ParseUint(messageOwner, 10, 64) if err != nil { - logger.Error("Failed to parse RabbitMQ message application property to a uint", "messageProperty", "owner", "messagePropertyValue", owner, "error", err) - return false + return false, fmt.Errorf("failed to parse RabbitMQ message application property \"owner\" to a uint: %v", err) } - return messageOwnerID == uint64(userID) + return messageOwnerID == uint64(userID), nil } // isUserPartOfMessageGroup determines if the user is allowed to receive the message. This function // only considers the "group" property of a message. Messages that have no group can be read by the // user. Messages that have a group can only be read by the user if the "group" property value can // be parsed and matches one of the userGroupsMap. -func isUserPartOfMessageGroup(logger *slog.Logger, userGroups map[string]struct{}, applicationProperties map[string]any) bool { +func isUserPartOfMessageGroup(userGroups map[string]struct{}, applicationProperties map[string]any) (bool, error) { group, ok := applicationProperties["group"] if !ok { - return true + return true, nil } messageGroup, ok := group.(string) if !ok { - logger.Error("Failed to type assert RabbitMQ message application property to a string", "messageProperty", "group", "messagePropertyValue", group) - return false + return false, errors.New(`failed to type assert RabbitMQ message application property "group" to a string`) } _, ok = userGroups[messageGroup] - return ok + return ok, nil } // createMessageHandler returns stream.MessagesHandler that will transform RabbitMQ messages of @@ -206,12 +213,12 @@ func isUserPartOfMessageGroup(logger *slog.Logger, userGroups map[string]struct{ // returned. This is to avoid race conditions when writing the data out to the HTTP response writer. // Only one Go routine should write to the HTTP response writer. The RabbitMQ stream client runs our // stream.MessagesHandler in a separate Go routine. -func createMessageHandler(done <-chan struct{}, logger *slog.Logger, retry uint) (<-chan sse.Event, stream.MessagesHandler) { +func createMessageHandler(c *gin.Context, logger *slog.Logger, retry uint) (<-chan sse.Event, stream.MessagesHandler) { out := make(chan sse.Event) return out, func(consumerContext stream.ConsumerContext, message *amqp.Message) { defer func() { if r := recover(); r != nil { - logger.Error("RabbitMQ message handler panicked", "recover", r) + logger.ErrorContext(c, "RabbitMQ message handler panicked", "recover", r) // We assume that we cannot recover from a panic in a message handler. We thus panic // again. We do want to log any panic to be notfied. panic(r) @@ -219,20 +226,20 @@ func createMessageHandler(done <-chan struct{}, logger *slog.Logger, retry uint) }() select { - case <-done: - logger.Info("Request canceled, returning from messageHandler") + case <-c.Request.Context().Done(): + logger.InfoContext(c, "Request canceled, returning from /events messageHandler") close(out) return default: sseEvent, err := mapMessageToEvent(retry, consumerContext.Consumer.GetOffset(), message) if err != nil { - logger.Error("Failed to map AMQP message", "error", err) + logger.ErrorContext(c, "Failed to map AMQP message", "error", err) return } select { - case <-done: - logger.Info("Request canceled, returning from messageHandler") + case <-c.Request.Context().Done(): + logger.InfoContext(c, "Request canceled, returning from messageHandler") close(out) return case out <- sseEvent: diff --git a/pkg/event/handler_test.go b/pkg/event/handler_test.go index b78d997e..80b7c550 100644 --- a/pkg/event/handler_test.go +++ b/pkg/event/handler_test.go @@ -2,16 +2,21 @@ package event import ( "log/slog" + "net/http/httptest" "os" "strconv" "testing" + "github.com/gin-gonic/gin" "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" "github.com/stretchr/testify/assert" ) func TestEventPostFilter(t *testing.T) { logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + w := httptest.ResponseRecorder{} + c := gin.CreateTestContextOnly(&w, &gin.Engine{}) + var user1 uint = 1 user1GroupsMap := map[string]struct{}{ "group1": {}, @@ -117,7 +122,7 @@ func TestEventPostFilter(t *testing.T) { for name, test := range tests { t.Run(name, func(t *testing.T) { - got := postFilter(logger, test.userID, test.userGroupsMap)(&test.message) + got := postFilter(c, logger, test.userID, test.userGroupsMap)(&test.message) assert.Equal(t, test.want, got) }) From a431087ed4437b4359d0e1aeb480a563e5b40611 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 10:50:15 +0200 Subject: [PATCH 15/20] chore: only use retention MaxAge of 1h --- cmd/serve/main.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cmd/serve/main.go b/cmd/serve/main.go index da33cd05..d5d3549d 100644 --- a/cmd/serve/main.go +++ b/cmd/serve/main.go @@ -196,8 +196,7 @@ func run() (err error) { err = env.DeclareStream(streamName, stream.NewStreamOptions(). SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)). - SetMaxAge(1*time.Hour). - SetMaxLengthBytes(stream.ByteCapacity{}.GB(1))) + SetMaxAge(1*time.Hour)) if err != nil { return fmt.Errorf("failed to declare RabbitMQ stream %q: %v", streamName, err) } From 65b17a4135b89dc8e8a1462c1ea4862eaf97172c Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 11:42:54 +0200 Subject: [PATCH 16/20] chore: remove comment its not needed --- pkg/event/handler.go | 1 - 1 file changed, 1 deletion(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 29dc278c..e6beae24 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -62,7 +62,6 @@ func (h Handler) StreamEvents(c *gin.Context) { return } - // check offset to return 400 before any other header in case of an error offsetSpec, err := computeOffsetSpec(c) if err != nil { h.logger.ErrorContext(c, "Failed to compute RabbitMQ offset spec", "error", err) From 69a298094a0ed33ff593800261f2c85b560cabf5 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 11:46:43 +0200 Subject: [PATCH 17/20] chore: fix comment --- pkg/event/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index e6beae24..ec0e2d38 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -191,7 +191,7 @@ func isUserMessageOwner(userID uint, applicationProperties map[string]any) (bool // isUserPartOfMessageGroup determines if the user is allowed to receive the message. This function // only considers the "group" property of a message. Messages that have no group can be read by the // user. Messages that have a group can only be read by the user if the "group" property value can -// be parsed and matches one of the userGroupsMap. +// be parsed and matches one of the userGroups. func isUserPartOfMessageGroup(userGroups map[string]struct{}, applicationProperties map[string]any) (bool, error) { group, ok := applicationProperties["group"] if !ok { From 1d78d85f67b4fc105fad9932125f796d6d219d02 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 12:20:34 +0200 Subject: [PATCH 18/20] chore: every message needs to have a group so use matchUnfiltered as the consumer should not get messages that have no group --- pkg/event/handler.go | 16 ++++++++-------- pkg/event/handler_test.go | 22 +++++++++++----------- 2 files changed, 19 insertions(+), 19 deletions(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index ec0e2d38..211e0dae 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -76,7 +76,7 @@ func (h Handler) StreamEvents(c *gin.Context) { With("consumerOffsetSpec", offsetSpec.String()). With("sseRetry", retry) - filter := stream.NewConsumerFilter(maps.Keys(userGroups), true, postFilter(c, logger, user.ID, userGroups)) + filter := stream.NewConsumerFilter(maps.Keys(userGroups), false, postFilter(c, logger, user.ID, userGroups)) opts := stream.NewConsumerOptions(). SetConsumerName(consumerName). SetClientProvidedName(consumerName). @@ -157,7 +157,7 @@ func postFilter(c *gin.Context, logger *slog.Logger, userID uint, userGroups map if err != nil { logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) } - isInGroup, err := isUserPartOfMessageGroup(userGroups, message.ApplicationProperties) + isInGroup, err := isInMessageGroup(userGroups, message.ApplicationProperties) if err != nil { logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) } @@ -188,14 +188,14 @@ func isUserMessageOwner(userID uint, applicationProperties map[string]any) (bool return messageOwnerID == uint64(userID), nil } -// isUserPartOfMessageGroup determines if the user is allowed to receive the message. This function -// only considers the "group" property of a message. Messages that have no group can be read by the -// user. Messages that have a group can only be read by the user if the "group" property value can -// be parsed and matches one of the userGroups. -func isUserPartOfMessageGroup(userGroups map[string]struct{}, applicationProperties map[string]any) (bool, error) { +// isInMessageGroup determines if the user is allowed to receive the message. This function only +// considers the "group" property of a message. Messages that have a group can only be read by the +// user if the "group" property value can be parsed and matches one of the userGroups. "group" is a +// required application property. +func isInMessageGroup(userGroups map[string]struct{}, applicationProperties map[string]any) (bool, error) { group, ok := applicationProperties["group"] if !ok { - return true, nil + return false, errors.New(`RabbitMQ message is missing application property "group"`) } messageGroup, ok := group.(string) diff --git a/pkg/event/handler_test.go b/pkg/event/handler_test.go index 80b7c550..0276aa26 100644 --- a/pkg/event/handler_test.go +++ b/pkg/event/handler_test.go @@ -32,12 +32,6 @@ func TestEventPostFilter(t *testing.T) { message amqp.Message want bool }{ - "MessageForAnyone": { - userID: user1, - userGroupsMap: user1GroupsMap, - message: amqp.Message{}, - want: true, - }, "MessageForGroupMatchingTheUsersGroup": { userID: user1, userGroupsMap: user1GroupsMap, @@ -46,21 +40,27 @@ func TestEventPostFilter(t *testing.T) { }, want: true, }, - "MessageForUserMatchingTheUser": { + "MessageForGroupAndUserMatchingTheUser": { userID: user1, userGroupsMap: user1GroupsMap, message: amqp.Message{ - ApplicationProperties: map[string]any{"owner": strconv.Itoa(int(user1))}, + ApplicationProperties: map[string]any{"group": "group4", "owner": strconv.Itoa(int(user1))}, }, want: true, }, - "MessageForGroupAndUserMatchingTheUser": { + "MessageForAnyone": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{}, + want: false, + }, + "MessageForUserMatchingTheUser": { userID: user1, userGroupsMap: user1GroupsMap, message: amqp.Message{ - ApplicationProperties: map[string]any{"group": "group4", "owner": strconv.Itoa(int(user1))}, + ApplicationProperties: map[string]any{"owner": strconv.Itoa(int(user1))}, }, - want: true, + want: false, }, "MessageForGroupNotMatchingTheUsersGroup": { userID: user1, From cf4a6df46af0da35f04314557b836b0103670424 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 12:23:57 +0200 Subject: [PATCH 19/20] chore: return on error the predicates do return false on error but it looks odd that we do not return --- pkg/event/handler.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 211e0dae..743a173c 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -156,10 +156,12 @@ func postFilter(c *gin.Context, logger *slog.Logger, userID uint, userGroups map isOwner, err := isUserMessageOwner(userID, message.ApplicationProperties) if err != nil { logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) + return false } isInGroup, err := isInMessageGroup(userGroups, message.ApplicationProperties) if err != nil { logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) + return false } return isOwner && isInGroup } From 4db85a3da92fb759ccfd6540e25bfcbff8aefe49 Mon Sep 17 00:00:00 2001 From: teleivo Date: Fri, 21 Jun 2024 12:53:24 +0200 Subject: [PATCH 20/20] chore: typo --- pkg/event/handler.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/event/handler.go b/pkg/event/handler.go index 743a173c..cdc0cddc 100644 --- a/pkg/event/handler.go +++ b/pkg/event/handler.go @@ -221,7 +221,7 @@ func createMessageHandler(c *gin.Context, logger *slog.Logger, retry uint) (<-ch if r := recover(); r != nil { logger.ErrorContext(c, "RabbitMQ message handler panicked", "recover", r) // We assume that we cannot recover from a panic in a message handler. We thus panic - // again. We do want to log any panic to be notfied. + // again. We do want to log any panic to be notified. panic(r) } }()