diff --git a/.env.example b/.env.example index 14d4fe0e..3a7ff00c 100644 --- a/.env.example +++ b/.env.example @@ -40,6 +40,7 @@ GIN_MODE=debug RABBITMQ_HOST=rabbitmq RABBITMQ_PORT=5672 +RABBITMQ_STREAM_PORT=5552 RABBITMQ_USERNAME=guest RABBITMQ_PASSWORD=guest diff --git a/cmd/serve/main.go b/cmd/serve/main.go index e1238c07..d5d3549d 100644 --- a/cmd/serve/main.go +++ b/cmd/serve/main.go @@ -27,8 +27,10 @@ import ( "fmt" "log/slog" "os" + "time" "github.com/go-mail/mail" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "github.com/aws/aws-sdk-go-v2/aws" "github.com/aws/aws-sdk-go-v2/feature/s3/manager" @@ -36,6 +38,7 @@ import ( "github.com/dhis2-sre/im-manager/internal/log" "github.com/dhis2-sre/im-manager/internal/middleware" "github.com/dhis2-sre/im-manager/pkg/database" + "github.com/dhis2-sre/im-manager/pkg/event" "github.com/dhis2-sre/im-manager/pkg/group" "github.com/dhis2-sre/im-manager/pkg/model" "github.com/dhis2-sre/im-manager/pkg/token" @@ -72,7 +75,7 @@ func run() (err error) { logger := slog.New(log.New(slog.NewJSONHandler(os.Stdout, nil))) db, err := storage.NewDatabase(logger, cfg.Postgresql) if err != nil { - return err + return fmt.Errorf("failed to setup DB: %v", err) } userRepository := user.NewRepository(db) @@ -123,15 +126,18 @@ func run() (err error) { dockerHubClient := integration.NewDockerHubClient(cfg.DockerHub.Username, cfg.DockerHub.Password) - host := hostname() + hostname, err := os.Hostname() + if err != nil { + return fmt.Errorf("failed to get hostname: %v", err) + } consumer, err := rabbitmq.NewConsumer( - cfg.RabbitMqURL.GetUrl(), - rabbitmq.WithConnectionName(host), - rabbitmq.WithConsumerTagPrefix(host), + cfg.RabbitMqURL.GetURI(), + rabbitmq.WithConnectionName(hostname), + rabbitmq.WithConsumerTagPrefix(hostname), rabbitmq.WithLogger(logger.WithGroup("rabbitmq")), ) if err != nil { - return err + return fmt.Errorf("failed to setup RabbitMQ consumer: %v", err) } defer consumer.Close() @@ -153,7 +159,7 @@ func run() (err error) { s3Config, err := newS3Config(cfg.S3Region, s3Endpoint) if err != nil { - return err + return fmt.Errorf("failed to setup S3 config: %v", err) } s3AWSClient := s3.NewFromConfig(s3Config, func(o *s3.Options) { @@ -173,6 +179,29 @@ func run() (err error) { integrationHandler := integration.NewHandler(dockerHubClient, cfg.InstanceService.Host, cfg.DatabaseManagerService.Host) + logger.Info("Connecting with RabbitMQ stream client", "host", cfg.RabbitMqURL.Host, "port", cfg.RabbitMqURL.StreamPort) + env, err := stream.NewEnvironment( + stream.NewEnvironmentOptions(). + SetHost(cfg.RabbitMqURL.Host). + SetPort(cfg.RabbitMqURL.StreamPort). + SetUser(cfg.RabbitMqURL.Username). + SetPassword(cfg.RabbitMqURL.Password). + SetAddressResolver(stream.AddressResolver{Host: cfg.RabbitMqURL.Host, Port: cfg.RabbitMqURL.StreamPort}), + ) + if err != nil { + return fmt.Errorf("failed to connect with RabbitMQ stream client: %v", err) + } + logger.Info("Connected with RabbitMQ stream client", "host", cfg.RabbitMqURL.Host, "port", cfg.RabbitMqURL.StreamPort) + streamName := "events" + err = env.DeclareStream(streamName, + stream.NewStreamOptions(). + SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)). + SetMaxAge(1*time.Hour)) + if err != nil { + return fmt.Errorf("failed to declare RabbitMQ stream %q: %v", streamName, err) + } + eventHandler := event.NewHandler(logger, env, streamName) + err = user.CreateUser(cfg.AdminUser.Email, cfg.AdminUser.Password, userService, groupService, model.AdministratorGroupName, "admin") if err != nil { return err @@ -201,6 +230,7 @@ func run() (err error) { integration.Routes(r, authentication, integrationHandler) database.Routes(r, authentication.TokenAuthentication, databaseHandler) instance.Routes(r, authentication.TokenAuthentication, instanceHandler) + event.Routes(r, authentication.TokenAuthentication, eventHandler) logger.Info("Listening and serving HTTP") if err := r.Run(); err != nil { @@ -209,14 +239,6 @@ func run() (err error) { return nil } -func hostname() string { - hostname, err := os.Hostname() - if err != nil { - return "im-manager" - } - return hostname -} - type groupService interface { FindOrCreate(name string, hostname string, deployable bool) (*model.Group, error) } diff --git a/docker-compose.yml b/docker-compose.yml index 378f944e..6f399e09 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -75,10 +75,16 @@ services: - prod rabbitmq: - image: rabbitmq:3-management-alpine + image: bitnami/rabbitmq:3.13 ports: - - "5672:5672" - - "15672:15672" + - "127.0.0.1:5672:5672" # regular AMQP + - "127.0.0.1:15672:15672" # management UI + environment: + RABBITMQ_USERNAME: ${RABBITMQ_USERNAME} + RABBITMQ_PASSWORD: ${RABBITMQ_PASSWORD} + RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS: true + RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT: "100MB" + RABBITMQ_PLUGINS: "rabbitmq_management,rabbitmq_management_agent,rabbitmq_stream,rabbitmq_stream_management" healthcheck: # https://www.rabbitmq.com/docs/monitoring#stage-3 test: rabbitmq-diagnostics -q check_running && rabbitmq-diagnostics -q check_local_alarms diff --git a/go.mod b/go.mod index 63fb667d..1b7db9f2 100644 --- a/go.mod +++ b/go.mod @@ -10,8 +10,10 @@ require ( github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.17.1 github.com/aws/aws-sdk-go-v2/service/s3 v1.56.1 github.com/dhis2-sre/rabbitmq-client v0.3.1 + github.com/docker/go-connections v0.5.0 github.com/dominikbraun/graph v0.23.0 github.com/gin-contrib/cors v1.7.2 + github.com/gin-contrib/sse v0.1.0 github.com/gin-gonic/gin v1.10.0 github.com/go-mail/mail v2.3.1+incompatible github.com/go-openapi/runtime v0.28.0 @@ -25,7 +27,9 @@ require ( github.com/orandin/slog-gorm v1.3.2 github.com/orlangure/gnomock v0.30.0 github.com/rabbitmq/amqp091-go v1.10.0 + github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4 github.com/stretchr/testify v1.9.0 + github.com/testcontainers/testcontainers-go v0.30.0 go.mozilla.org/sops/v3 v3.7.3 golang.org/x/crypto v0.24.0 golang.org/x/exp v0.0.0-20231108232855-2478ac86f678 @@ -39,7 +43,9 @@ require ( require ( cloud.google.com/go/compute v1.24.0 // indirect cloud.google.com/go/compute/metadata v0.2.3 // indirect + dario.cat/mergo v1.0.0 // indirect github.com/Azure/azure-sdk-for-go v63.3.0+incompatible // indirect + github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 // indirect github.com/Azure/go-autorest v14.2.0+incompatible // indirect github.com/Azure/go-autorest/autorest v0.11.26 // indirect github.com/Azure/go-autorest/autorest/adal v0.9.18 // indirect @@ -51,6 +57,7 @@ require ( github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect github.com/Microsoft/go-winio v0.6.1 // indirect + github.com/Microsoft/hcsshim v0.11.4 // indirect github.com/ProtonMail/go-crypto v0.0.0-20230923063757-afb1ddc0824c // indirect github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 // indirect github.com/aws/aws-sdk-go v1.47.13 // indirect @@ -73,26 +80,29 @@ require ( github.com/bytedance/sonic v1.11.6 // indirect github.com/bytedance/sonic/loader v0.1.1 // indirect github.com/cenkalti/backoff/v3 v3.2.2 // indirect + github.com/cenkalti/backoff/v4 v4.3.0 // indirect github.com/cloudflare/circl v1.3.7 // indirect github.com/cloudwego/base64x v0.1.4 // indirect github.com/cloudwego/iasm v0.2.0 // indirect + github.com/containerd/containerd v1.7.12 // indirect + github.com/containerd/log v0.1.0 // indirect + github.com/cpuguy83/dockercfg v0.3.1 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect github.com/dimchansky/utfbom v1.1.1 // indirect github.com/distribution/reference v0.5.0 // indirect github.com/docker/docker v25.0.5+incompatible // indirect - github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/emicklei/go-restful/v3 v3.11.0 // indirect github.com/evanphx/json-patch v5.7.0+incompatible // indirect github.com/fatih/color v1.16.0 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect github.com/gabriel-vasile/mimetype v1.4.3 // indirect - github.com/gin-contrib/sse v0.1.0 // indirect github.com/go-errors/errors v1.5.1 // indirect github.com/go-jose/go-jose/v3 v3.0.3 // indirect github.com/go-logr/logr v1.4.1 // indirect github.com/go-logr/stdr v1.2.2 // indirect + github.com/go-ole/go-ole v1.3.0 // indirect github.com/go-openapi/analysis v0.23.0 // indirect github.com/go-openapi/errors v0.22.0 // indirect github.com/go-openapi/jsonpointer v0.21.0 // indirect @@ -110,6 +120,7 @@ require ( github.com/golang-jwt/jwt/v4 v4.3.0 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/google/btree v1.1.2 // indirect github.com/google/gnostic-models v0.6.9-0.20230804172637-c7be7c783f49 // indirect github.com/google/gofuzz v1.2.0 // indirect @@ -143,6 +154,7 @@ require ( github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/json-iterator/go v1.1.12 // indirect + github.com/klauspost/compress v1.17.8 // indirect github.com/klauspost/cpuid/v2 v2.2.7 // indirect github.com/leodido/go-urn v1.4.0 // indirect github.com/lestrrat-go/blackmagic v1.0.2 // indirect @@ -151,16 +163,23 @@ require ( github.com/lestrrat-go/iter v1.0.2 // indirect github.com/lestrrat-go/option v1.0.1 // indirect github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de // indirect + github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed // indirect + github.com/magiconair/properties v1.8.7 // indirect github.com/mailru/easyjson v0.7.7 // indirect github.com/mattn/go-colorable v0.1.13 // indirect github.com/mattn/go-isatty v0.0.20 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/go-wordwrap v1.0.1 // indirect github.com/mitchellh/mapstructure v1.5.0 // indirect + github.com/moby/patternmatcher v0.6.0 // indirect github.com/moby/spdystream v0.2.0 // indirect + github.com/moby/sys/sequential v0.5.0 // indirect + github.com/moby/sys/user v0.1.0 // indirect + github.com/moby/term v0.5.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect github.com/monochromegane/go-gitignore v0.0.0-20200626010858-205db1a8cc00 // indirect + github.com/morikuni/aec v1.0.0 // indirect github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect github.com/nxadm/tail v1.4.8 // indirect github.com/oklog/ulid v1.3.1 // indirect @@ -168,17 +187,25 @@ require ( github.com/opencontainers/image-spec v1.1.0 // indirect github.com/pelletier/go-toml/v2 v2.2.2 // indirect github.com/peterbourgon/diskv v2.0.1+incompatible // indirect + github.com/pierrec/lz4 v2.6.1+incompatible // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b // indirect github.com/ryanuber/go-glob v1.0.0 // indirect github.com/segmentio/asm v1.2.0 // indirect + github.com/shirou/gopsutil/v3 v3.23.12 // indirect + github.com/shoenig/go-m1cpu v0.1.6 // indirect github.com/sirupsen/logrus v1.9.3 // indirect + github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/cobra v1.8.0 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.2 // indirect + github.com/tklauser/go-sysconf v0.3.12 // indirect + github.com/tklauser/numcpus v0.6.1 // indirect github.com/twitchyliquid64/golang-asm v0.15.1 // indirect github.com/ugorji/go/codec v1.2.12 // indirect github.com/xlab/treeprint v1.2.0 // indirect + github.com/yusufpapurcu/wmi v1.2.3 // indirect go.mongodb.org/mongo-driver v1.14.0 // indirect go.mozilla.org/gopgagent v0.0.0-20170926210634-4d7ea76ff71a // indirect go.opencensus.io v0.24.0 // indirect @@ -214,7 +241,6 @@ require ( gopkg.in/urfave/cli.v1 v1.20.0 // indirect gopkg.in/yaml.v2 v2.4.0 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect - gotest.tools/v3 v3.5.0 // indirect k8s.io/cli-runtime v0.28.3 // indirect k8s.io/klog/v2 v2.110.1 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect diff --git a/go.sum b/go.sum index 25714f41..b0023425 100644 --- a/go.sum +++ b/go.sum @@ -32,6 +32,8 @@ dario.cat/mergo v1.0.0/go.mod h1:uNxQE+84aUszobStD9th8a29P2fMDhsBdgRYvZOxGmk= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= filippo.io/age v1.2.0 h1:vRDp7pUMaAJzXNIWJVAZnEf/Dyi4Vu4wI8S1LBzufhE= filippo.io/age v1.2.0/go.mod h1:JL9ew2lTN+Pyft4RiNGguFfOpewKwSHm5ayKD/A4004= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24 h1:bvDV9vkmnHYOMsOr4WLk+Vo07yKIzd94sVoIqshQ4bU= +github.com/AdaLogics/go-fuzz-headers v0.0.0-20230811130428-ced1acdcaa24/go.mod h1:8o94RPi1/7XTJvwPpRSzSUedZrtlirdB3r9Z20bi2f8= github.com/Azure/azure-sdk-for-go v63.3.0+incompatible h1:INepVujzUrmArRZjDLHbtER+FkvCoEwyRCXGqOlmDII= github.com/Azure/azure-sdk-for-go v63.3.0+incompatible/go.mod h1:9XXNKU+eRnpl9moKnB4QOLf1HestfXbmab5FXxiDBjc= github.com/Azure/go-ansiterm v0.0.0-20230124172434-306776ec8161 h1:L/gRVlceqvL25UVaW/CKtUDjefjrs0SPonmDGUVOYP0= @@ -169,8 +171,8 @@ github.com/cloudwego/iasm v0.2.0/go.mod h1:8rXZaNYT2n95jn+zTI1sDr+IgcD2GVs0nlbbQ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGXZJjfX53e64911xZQV5JYwmTeXPW+k8Sc= github.com/containerd/containerd v1.7.12 h1:+KQsnv4VnzyxWcfO9mlxxELaoztsDEjOuCMPAuPqgU0= github.com/containerd/containerd v1.7.12/go.mod h1:/5OMpE1p0ylxtEUGY8kuCYkDRzJm9NO1TFMWjUpdevk= -github.com/containerd/continuity v0.2.2 h1:QSqfxcn8c+12slxwu00AtzXrsami0MJb/MQs9lOLHLA= -github.com/containerd/continuity v0.2.2/go.mod h1:pWygW9u7LtS1o4N/Tn0FoCFDIXZ7rxcMX7HX1Dmibvk= +github.com/containerd/continuity v0.4.2 h1:v3y/4Yz5jwnvqPKJJ+7Wf93fyWoCB3F5EclWG023MDM= +github.com/containerd/continuity v0.4.2/go.mod h1:F6PTNCKepoxEaXLQp3wDAjygEnImnZ/7o4JzpodfroQ= github.com/containerd/log v0.1.0 h1:TCJt7ioM2cr/tfR8GPbGf9/VRAX8D2B4PjzCpfX540I= github.com/containerd/log v0.1.0/go.mod h1:VRRf09a7mHDIRezVKTRCrOq78v577GXq3bSa3EhrzVo= github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk= @@ -185,6 +187,8 @@ github.com/cpuguy83/dockercfg v0.3.1/go.mod h1:sugsbF4//dDlL/i+S+rtpIWp+5h0BHJHf github.com/cpuguy83/go-md2man/v2 v2.0.0/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/creack/pty v1.1.9/go.mod h1:oKZEueFk5CKHvIhNR5MUki03XCEU+Q6VDXinZuGJ33E= +github.com/creack/pty v1.1.18 h1:n56/Zwd5o6whRC5PMGretI4IdRLlmBXYNjScPaBgsbY= +github.com/creack/pty v1.1.18/go.mod h1:MOBLtS5ELjhRRrroQr9kyvTxUAFNvYEK993ew/Vr4O4= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -228,9 +232,12 @@ github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4Nij github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k= +github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8= +github.com/frankban/quicktest v1.14.6/go.mod h1:4ptaffx2x8+WTWXmUCuVU6aPUX1/Mz7zb5vbUoiM6w0= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= -github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= +github.com/fsnotify/fsnotify v1.6.0 h1:n+5WquG0fcWoWp6xPWfHdbskMCQaFnG6PfBrh1Ky4HY= +github.com/fsnotify/fsnotify v1.6.0/go.mod h1:sl3t1tCWJFWoRz9R8WJCbQihKKwmorjAbSClcnxKAGw= github.com/gabriel-vasile/mimetype v1.4.3 h1:in2uUcidCuFcDKtdcBxlR0rJ1+fsokWf+uqxgUFjbI0= github.com/gabriel-vasile/mimetype v1.4.3/go.mod h1:d8uq/6HKRL6CGdk+aubisF/M5GcPfT7nKyLpA0lbSSk= github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04= @@ -264,6 +271,7 @@ github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-mail/mail v2.3.1+incompatible h1:UzNOn0k5lpfVtO31cK3hn6I4VEVGhe3lX8AJBAxXExM= github.com/go-mail/mail v2.3.1+incompatible/go.mod h1:VPWjmmNyRsWXQZHVHT3g0YbIINUkSmuKOiLIDkWbL6M= +github.com/go-ole/go-ole v1.2.6/go.mod h1:pprOEPIfldk/42T2oK7lQ4v4JSDwmV0As9GaiUsvbm0= github.com/go-ole/go-ole v1.3.0 h1:Dt6ye7+vXGIKZ7Xtk4s6/xVdGDQynvom7xCFEdWr6uE= github.com/go-ole/go-ole v1.3.0/go.mod h1:5LS6F96DhAwUc7C+1HLexzMXY1xGRSryjyPPKW6zv78= github.com/go-openapi/analysis v0.0.0-20180825180245-b006789cd277/go.mod h1:k70tL6pCuVxPJOHXQ+wIac1FUrvNkHolPie/cLEU6hI= @@ -392,6 +400,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/protobuf v1.5.4 h1:i7eJL8qZTpSEXOPTxNKhASYpMn+8e5Q6AdndVa1dWek= github.com/golang/protobuf v1.5.4/go.mod h1:lnTiLA8Wa4RWRcIUkrtSVa5nRhsEGBg48fD6rSs7xps= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.1.2 h1:xf4v41cLI2Z6FxbKm+8Bu+m8ifhj15JuZ9sa0jZCMUU= @@ -406,6 +416,7 @@ github.com/google/go-cmp v0.5.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.3/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= +github.com/google/go-cmp v0.5.6/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= @@ -419,8 +430,8 @@ github.com/google/pprof v0.0.0-20190515194954-54271f7e092f/go.mod h1:zfwlbNMJ+OI github.com/google/pprof v0.0.0-20191218002539-d4f498aebedc/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200212024743-f11f1df84d12/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= github.com/google/pprof v0.0.0-20200229191704-1ebb73c60ed3/go.mod h1:ZgVRPoUq/hfqzAqh7sHMqb3I9Rq5C59dIz2SbBwJ4eM= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1 h1:K6RDEckDVWvDI9JAJYCmNdQXq6neHJOYx3V6jnqNEec= -github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= +github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98 h1:pUa4ghanp6q4IJHwE9RwLgmVFfReJN+KbQ8ExNEUUoQ= +github.com/google/pprof v0.0.0-20230926050212-f7f687d19a98/go.mod h1:czg5+yv1E0ZGTi6S6vVK1mke0fV+FaUhNGcd6VRS9Ik= github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI= github.com/google/s2a-go v0.1.7 h1:60BLSyTrOV4/haCDW4zb1guZItoSq8foHCXrAnjBo/o= github.com/google/s2a-go v0.1.7/go.mod h1:50CgR4k1jNlWBu4UfS4AcfhVe1r6pdZPygJ3R8F0Qdw= @@ -543,8 +554,8 @@ github.com/julienschmidt/httprouter v1.2.0/go.mod h1:SYymIcj16QtmaHHD7aYtjjsJG7V github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvWXihfKN4Q= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= -github.com/klauspost/compress v1.17.2 h1:RlWWUY/Dr4fL8qk9YG7DTZ7PDgME2V4csBXA8L/ixi4= -github.com/klauspost/compress v1.17.2/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0NAMnU= +github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/cpuid/v2 v2.2.7 h1:ZWSB3igEs+d0qvnxR/ZBzXVmxkgt8DdzP6m9pfuVLDM= github.com/klauspost/cpuid/v2 v2.2.7/go.mod h1:Lcz8mBdAVJIBVzewtcLocK12l3Y+JytZYpaMropDUws= @@ -579,6 +590,7 @@ github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de h1:9TO3cAIGXtEhnIaL+V+BEER86oLrvS+kWobKpbJuye0= github.com/liggitt/tabwriter v0.0.0-20181228230101-89fcab3d43de/go.mod h1:zAbeS9B/r2mtpb6U+EI2rYA5OAXxsYw6wTamcNW+zcE= +github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0/go.mod h1:zJYVVT2jmtg6P3p1VtQj7WsuWi/y4VnjVBn7F8KPB3I= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed h1:036IscGBfJsFIgJQzlui7nK1Ncm0tp2ktmPj8xO4N/0= github.com/lufia/plan9stats v0.0.0-20231016141302-07b5767bb0ed/go.mod h1:ilwx/Dta8jXAgpFYFvSWEMwxmbWXyiUHkd5FwyKhb5k= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= @@ -658,18 +670,18 @@ github.com/onsi/ginkgo v1.10.1/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+ github.com/onsi/ginkgo v1.11.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.16.4 h1:29JGrr5oVBm5ulCWet69zQkzWipVXIol6ygQUe/EzNc= github.com/onsi/ginkgo v1.16.4/go.mod h1:dX+/inL/fNMqNlz0e9LfyB9TswhZpCVdJM/Z6Vvnwo0= -github.com/onsi/ginkgo/v2 v2.9.4 h1:xR7vG4IXt5RWx6FfIjyAtsoMAtnc3C/rFXBBd2AjZwE= -github.com/onsi/ginkgo/v2 v2.9.4/go.mod h1:gCQYp2Q+kSoIj7ykSVb9nskRSsR6PUj4AiLywzIhbKM= +github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4= +github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v0.0.0-20170829124025-dcabb60a477c/go.mod h1:C1qb7wdrVGGVU+Z6iS04AVkA3Q65CEZX59MT0QO5uiA= github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= -github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= +github.com/onsi/gomega v1.28.0 h1:i2rg/p9n/UqIDAMFUJ6qIUUMcsqOuUHgbpbu235Vr1c= +github.com/onsi/gomega v1.28.0/go.mod h1:A1H2JE76sI14WIP57LMKj7FVfCHx3g3BcZVjJG8bjX8= github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= github.com/opencontainers/image-spec v1.1.0 h1:8SG7/vwALn54lVB/0yZ/MMwhFrPYtpEHQb2IpWsCzug= github.com/opencontainers/image-spec v1.1.0/go.mod h1:W4s4sFTMaBeK1BQLXbG4AdM2szdn85PY75RI83NrTrM= -github.com/opencontainers/runc v1.1.0 h1:O9+X96OcDjkmmZyfaG996kV7yq8HsoU2h1XRRQcefG8= -github.com/opencontainers/runc v1.1.0/go.mod h1:Tj1hFw6eFWp/o33uxGf5yF2BX5yz2Z6iptFpuvbbKqc= +github.com/opencontainers/runc v1.1.5 h1:L44KXEpKmfWDcS02aeGm8QNTFXTo2D+8MYGDIJ/GDEs= +github.com/opencontainers/runc v1.1.5/go.mod h1:1J5XiS+vdZ3wCyZybsuxXZWGrgSr8fFJHLXuG2PsnNg= github.com/orandin/slog-gorm v1.3.2 h1:C0lKDQPAx/pF+8K2HL7bdShPwOEJpPM0Bn80zTzxU1g= github.com/orandin/slog-gorm v1.3.2/go.mod h1:MoZ51+b7xE9lwGNPYEhxcUtRNrYzjdcKvA8QXQQGEPA= github.com/orlangure/gnomock v0.30.0 h1:WXq/3KTKRVYe9a3BXa5JMZCCrg2RwNAPB2bZHMxEntE= @@ -683,6 +695,8 @@ github.com/pelletier/go-toml/v2 v2.2.2 h1:aYUidT7k73Pcl9nb2gScu7NSrKCSHIDE89b3+6 github.com/pelletier/go-toml/v2 v2.2.2/go.mod h1:1t835xjRzz80PqgE6HHgN2JOsmgYu/h4qDAS4n929Rs= github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/pierrec/lz4 v2.6.1+incompatible h1:9UY3+iC23yxF0UfGaYrGplQ+79Rg+h/q9FV9ix19jjM= +github.com/pierrec/lz4 v2.6.1+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -690,6 +704,7 @@ github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINE github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI= +github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b h1:0LFwY6Q3gMACTjAbMZBjXAqTOzOwFaj2Ld6cjeQ7Rig= github.com/power-devops/perfstat v0.0.0-20221212215047-62379fc7944b/go.mod h1:OmDBASR4679mdNQnz2pUhc2G8CO2JrUAVFDRBDP/hJE= github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw= @@ -704,6 +719,8 @@ github.com/prometheus/procfs v0.0.0-20190507164030-5867b95ac084/go.mod h1:TjEm7z github.com/prometheus/tsdb v0.7.1/go.mod h1:qhTCs0VvXwvX/y3TZrWD7rabWM+ijKTux40TwIPHuXU= github.com/rabbitmq/amqp091-go v1.10.0 h1:STpn5XsHlHGcecLmMFCtg7mqq0RnD+zFr4uzukfVhBw= github.com/rabbitmq/amqp091-go v1.10.0/go.mod h1:Hy4jKW5kQART1u+JkDTF9YYOQUHXqMuhrgxOEeS7G4o= +github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4 h1:fPyWyDvNF1jdO23H3PqRc5J9bFSvv1Fl9/oyu/AP31U= +github.com/rabbitmq/rabbitmq-stream-go-client v1.4.4/go.mod h1:siR4Lc4SKNnKtxHMzG6exzS6uJOm49Ncynglv7En5kI= github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6SoW27p1b0cqNHllgS5HIMJraePCO15w5zCzIWYg= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= @@ -723,6 +740,8 @@ github.com/shirou/gopsutil/v3 v3.23.12 h1:z90NtUkp3bMtmICZKpC4+WaknU1eXtp5vtbQ11 github.com/shirou/gopsutil/v3 v3.23.12/go.mod h1:1FrWgea594Jp7qmjHUUPlJDTPgcsb9mGnXDxavtikzM= github.com/shoenig/go-m1cpu v0.1.6 h1:nxdKQNcEB6vzgA2E2bvzKIYRuNj7XNJ4S/aRSwKzFtM= github.com/shoenig/go-m1cpu v0.1.6/go.mod h1:1JJMcUBvfNwpq05QDQVAnx3gUHr9IYF7GNg9SUEw2VQ= +github.com/shoenig/test v0.6.4 h1:kVTaSd7WLz5WZ2IaoM0RSzRsUD+m8wRR+5qvntpn4LU= +github.com/shoenig/test v0.6.4/go.mod h1:byHiCGXqrVaflBLAMq/srcZIHynQPQgeyvkvXnjqq0k= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= @@ -735,6 +754,8 @@ github.com/smartystreets/goconvey v1.7.2 h1:9RBaZCeXEQ3UselpuwUQHltGVXvdwm6cv1hg github.com/smartystreets/goconvey v1.7.2/go.mod h1:Vw0tHAZW6lzCRk3xgdin6fKYcG+G3Pg9vgXWeJpQFMM= github.com/soheilhy/cmux v0.1.4/go.mod h1:IM3LyeVVIOuxMH7sFAkER9+bJ4dT7Ms6E4xg4kGIyLM= github.com/spaolacci/murmur3 v0.0.0-20180118202830-f09979ecbc72/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= +github.com/spaolacci/murmur3 v1.1.0 h1:7c1g84S4BPRrfL5Xrdp6fOJ206sU9y293DDHaoy0bLI= +github.com/spaolacci/murmur3 v1.1.0/go.mod h1:JwIasOWyU6f++ZhiEuf87xNszmSA2myDM2Kzu9HwQUA= github.com/spf13/afero v1.1.2/go.mod h1:j4pytiNVoe2o6bmDsKpLACNPDBIoEAkihy7loJ1B0CQ= github.com/spf13/afero v1.2.2/go.mod h1:9ZxEEn6pIJ8Rxe320qSDBk6AsU0r9pR7Q4OcevTdifk= github.com/spf13/cast v1.3.0/go.mod h1:Qx5cxh0v+4UWYiBimWS+eyWzqEqokIECu5etghLkUJE= @@ -984,6 +1005,7 @@ golang.org/x/sys v0.0.0-20190606165138-5da285871e9c/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20190616124812-15dcb6c0061f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190624142023-c5567b49c5d0/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191002063906-3421d5a6bb1c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20191005200804-aed5e4c7ecf9/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= @@ -1001,9 +1023,11 @@ golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7w golang.org/x/sys v0.0.0-20200323222414-85ca7c5b95cd/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= +golang.org/x/sys v0.0.0-20201204225414-ed752295db88/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210423082822-04245dca01da/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210426230700-d19ff857e887/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210615035016-665e8c7367d1/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616094352-59db8d763f22/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20210927094055-39ccf1dd6fa6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220503163025-988cb79eb6c6/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= @@ -1011,11 +1035,14 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.1.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.2.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.3.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.8.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.11.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.21.0 h1:rF+pYz3DAGSQAxAu1CbC7catZg4ebC4UIeIhKxBZvws= golang.org/x/sys v0.21.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= @@ -1207,6 +1234,7 @@ gorm.io/driver/postgres v1.5.9 h1:DkegyItji119OlcaLjqN11kHoUgZ/j13E0jkJZgD6A8= gorm.io/driver/postgres v1.5.9/go.mod h1:DX3GReXH+3FPWGrrgffdvCk3DQ1dwDPdmbenSkweRGI= gorm.io/gorm v1.25.10 h1:dQpO+33KalOA+aFYGlK+EfxcI5MbO7EP2yYygwh9h+s= gorm.io/gorm v1.25.10/go.mod h1:hbnx/Oo0ChWMn1BIhpy1oYozzpM15i4YPuHDmfYtwg8= +gotest.tools v2.2.0+incompatible h1:VsBPFP1AI068pPrMxtb/S8Zkgf9xEmTLJjfM+P5UIEo= gotest.tools/v3 v3.5.0 h1:Ljk6PdHdOhAb5aDMWXjDLMMhph+BpztA4v1QdqEW2eY= gotest.tools/v3 v3.5.0/go.mod h1:isy3WKz7GK6uNw/sbHzfKBLvlvXwUyV06n6brMxxopU= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/config/config.go b/pkg/config/config.go index 71f6afb3..3ae77bc6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -79,10 +79,11 @@ func New() Config { Password: requireEnv("SMTP_PASSWORD"), }, RabbitMqURL: rabbitmq{ - Host: requireEnv("RABBITMQ_HOST"), - Port: requireEnvAsInt("RABBITMQ_PORT"), - Username: requireEnv("RABBITMQ_USERNAME"), - Password: requireEnv("RABBITMQ_PASSWORD"), + Host: requireEnv("RABBITMQ_HOST"), + Port: requireEnvAsInt("RABBITMQ_PORT"), + StreamPort: requireEnvAsInt("RABBITMQ_STREAM_PORT"), + Username: requireEnv("RABBITMQ_USERNAME"), + Password: requireEnv("RABBITMQ_PASSWORD"), }, Redis: Redis{ Host: requireEnv("REDIS_HOST"), @@ -142,10 +143,11 @@ type Postgresql struct { } type rabbitmq struct { - Host string - Port int - Username string - Password string + Host string + Port int + StreamPort int + Username string + Password string } type smtp struct { @@ -155,7 +157,8 @@ type smtp struct { Password string } -func (r rabbitmq) GetUrl() string { +// GetURI returns the AMQP URI for RabbitMQ. +func (r rabbitmq) GetURI() string { return fmt.Sprintf("amqp://%s:%s@%s:%d/", r.Username, r.Password, r.Host, r.Port) } diff --git a/pkg/event/docs.go b/pkg/event/docs.go new file mode 100644 index 00000000..249d1aba --- /dev/null +++ b/pkg/event/docs.go @@ -0,0 +1,9 @@ +package event + +import "github.com/gin-contrib/sse" + +// swagger:response Stream +type _ struct { + // in: body + _ sse.Event +} diff --git a/pkg/event/event_integration_test.go b/pkg/event/event_integration_test.go new file mode 100644 index 00000000..0d6a88b7 --- /dev/null +++ b/pkg/event/event_integration_test.go @@ -0,0 +1,340 @@ +package event_test + +import ( + "bufio" + "context" + "errors" + "fmt" + "log/slog" + "net/http" + "os" + "strconv" + "strings" + "testing" + "time" + + "github.com/dhis2-sre/im-manager/pkg/event" + "github.com/dhis2-sre/im-manager/pkg/inttest" + "github.com/dhis2-sre/im-manager/pkg/model" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/message" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestEventHandler(t *testing.T) { + t.Parallel() + + db := inttest.SetupDB(t) + + sharedGroup := model.Group{ + Name: "group1", + Hostname: "hostname1", + } + err := db.Create(&sharedGroup).Error + require.NoError(t, err) + + user1 := &model.User{ + Email: "user1@dhis2.org", + EmailToken: uuid.New(), + Groups: []model.Group{ + sharedGroup, + }, + } + err = db.Create(user1).Error + require.NoError(t, err) + + groupExclusiveToUser2 := model.Group{ + Name: "group2", + Hostname: "hostname2", + } + err = db.Create(&groupExclusiveToUser2).Error + require.NoError(t, err) + + user2 := &model.User{ + Email: "user2@dhis2.org", + EmailToken: uuid.New(), + Groups: []model.Group{ + sharedGroup, + groupExclusiveToUser2, + }, + } + err = db.Create(user2).Error + require.NoError(t, err) + + rabbitmq := inttest.SetupRabbitStream(t) + streamName := "test-event-handler" + err = rabbitmq.Environment.DeclareStream(streamName, + stream.NewStreamOptions(). + SetMaxSegmentSizeBytes(stream.ByteCapacity{}.MB(1)). + SetMaxLengthBytes(stream.ByteCapacity{}.MB(20))) + require.NoError(t, err, "failed to declare RabbitMQ stream") + + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + // this is only to allow testing using multiple users without bringing in all our auth stack + authenticator := func(ctx *gin.Context) { + userParam := ctx.Query("user") + userID, err := strconv.ParseUint(userParam, 10, 64) + require.NoErrorf(t, err, "failed to parse query param user=%q into user ID", userParam) + + if userID == uint64(user1.ID) { + ctx.Set("user", user1) + return + } else if userID == uint64(user2.ID) { + ctx.Set("user", user2) + return + } + + require.FailNow(t, "provide query param user=userID in the test") + } + client := inttest.SetupHTTPServer(t, func(engine *gin.Engine) { + eventHandler := event.NewHandler(logger, rabbitmq.Environment, streamName) + event.Routes(engine, authenticator, eventHandler) + }) + + eventEmitter := NewEventEmitter(t, rabbitmq.Environment, streamName) + defer eventEmitter.Close() + + t.Log("Sending messages before users are subscribed") + // users should only get the next published message after they subscribed so these messages + // should not be received by anyone + eventEmitter.emit(t, "db-update", sharedGroup.Name, user1) + eventEmitter.emit(t, "db-update", sharedGroup.Name, user2) + eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + eventEmitter.emit(t, "instance-update", sharedGroup.Name, nil) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Second) + defer cancel() + ctxUser1, cancelUser1 := context.WithCancelCause(ctx) + defer cancelUser1(nil) + user1Messages := streamEvents(t, ctxUser1, client, user1, nil) + + t.Log("Sending messages after user1 subscribed") + event5 := eventEmitter.emit(t, "db-update", sharedGroup.Name, user1) + eventEmitter.emit(t, "db-update", sharedGroup.Name, user2) + event7 := eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + event8 := eventEmitter.emit(t, "instance-update", sharedGroup.Name, nil) + + wantUser1Messages := []sseEvent{event5, event7, event8} + + t.Log("Waiting on messages for user1...") + var gotUser1Messages []sseEvent + for len(wantUser1Messages) != len(gotUser1Messages) { + select { + case <-ctx.Done(): + require.Fail(t, "Timed out waiting on messages.") + case msg := <-user1Messages: + gotUser1Messages = append(gotUser1Messages, msg) + } + } + require.EqualValuesf(t, wantUser1Messages, gotUser1Messages, "mismatch in expected messages for user %d", user1.ID) + + ctxUser2, cancelUser2 := context.WithTimeout(context.Background(), 50*time.Second) + defer cancelUser2() + user2Messages := streamEvents(t, ctxUser2, client, user2, nil) + + t.Log("Sending messages after user2 subscribed") + event9 := eventEmitter.emit(t, "db-update", sharedGroup.Name, user1) + event10 := eventEmitter.emit(t, "db-update", sharedGroup.Name, user2) + event11 := eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + event12 := eventEmitter.emit(t, "instance-update", groupExclusiveToUser2.Name, nil) + + wantUser1Messages = []sseEvent{event9, event11} + wantUser2Messages := []sseEvent{event10, event11, event12} + + t.Log("Waiting on messages for both users...") + gotUser1Messages = nil + var gotUser2Messages []sseEvent + for len(wantUser1Messages) != len(gotUser1Messages) || len(wantUser2Messages) != len(gotUser2Messages) { + select { + case <-ctxUser1.Done(): + case <-ctxUser2.Done(): + require.FailNow(t, "Timed out waiting on messages.") + case msg := <-user1Messages: + gotUser1Messages = append(gotUser1Messages, msg) + case msg := <-user2Messages: + gotUser2Messages = append(gotUser2Messages, msg) + } + } + require.EqualValuesf(t, wantUser1Messages, gotUser1Messages, "mismatch in expected messages for user %d", user1.ID) + require.EqualValuesf(t, wantUser2Messages, gotUser2Messages, "mismatch in expected messages for user %d", user2.ID) + + cancelUser1(errors.New("drop connection")) + <-user1Messages // wait for user1 to be unsubscribed before sending new messages to test Last-Event-ID + + t.Log("Sending messages after user1 dropped its connection") + event13 := eventEmitter.emit(t, "instance-update", groupExclusiveToUser2.Name, nil) + event14 := eventEmitter.emit(t, "db-update", sharedGroup.Name, nil) + event15 := eventEmitter.emit(t, "instance-update", sharedGroup.Name, nil) + + // When a SSE client disconnects it will send the HTTP header Last-Event-ID with the ID of the + // event it last received. We want to then send the event after that. + user1Messages = streamEvents(t, ctxUser2, client, user1, &event11.ID) + + wantUser1Messages = []sseEvent{event14, event15} + wantUser2Messages = []sseEvent{event13, event14, event15} + + t.Log("Waiting on messages for both users...") + gotUser1Messages, gotUser2Messages = nil, nil + for len(wantUser1Messages) != len(gotUser1Messages) || len(wantUser2Messages) != len(gotUser2Messages) { + select { + case <-ctxUser2.Done(): + require.FailNow(t, "Timed out waiting on messages.") + case msg := <-user1Messages: + gotUser1Messages = append(gotUser1Messages, msg) + case msg := <-user2Messages: + gotUser2Messages = append(gotUser2Messages, msg) + } + } + assert.EqualValuesf(t, wantUser1Messages, gotUser1Messages, "mismatch in expected messages for user %d", user1.ID) + assert.EqualValuesf(t, wantUser2Messages, gotUser2Messages, "mismatch in expected messages for user %d", user2.ID) +} + +func streamEvents(t *testing.T, ctx context.Context, client *inttest.HTTPClient, user *model.User, lastEventId *int64) <-chan sseEvent { + url := fmt.Sprintf("%s/events?user=%d", client.ServerURL, user.ID) + req, err := http.NewRequestWithContext(ctx, "GET", url, nil) + require.NoErrorf(t, err, "User %d failed to create request to stream from %q", user.ID, url) + + if lastEventId != nil { + req.Header.Add("Last-Event-ID", strconv.FormatInt(*lastEventId, 10)) + t.Logf("User %d starts to stream from %q from Last-Event-ID %d", user.ID, url, *lastEventId) + } else { + t.Logf("User %d starts to stream from %q from next event", user.ID, url) + } + + resp, err := client.Client.Do(req) + require.NoErrorf(t, err, "User %d failed to stream from %q", user.ID, url) + require.Equal(t, http.StatusOK, resp.StatusCode, "User %d failed to stream from %q", user.ID, url) + + out := make(chan sseEvent) + go func() { + defer resp.Body.Close() + + sc := bufio.NewScanner(resp.Body) + var event sseEvent + var gotData bool + var newlineCount int + for sc.Scan() { + line := sc.Text() + field, fieldValue, found := strings.Cut(line, ":") + if found { + switch field { + case "id": + id, err := strconv.ParseInt(fieldValue, 10, 64) + require.NoError(t, err, "failed to parse SSE event ID in line %q", line) + event.ID = id + case "event": + event.Event = fieldValue + case "data": + event.Data = fieldValue + gotData = true + newlineCount++ + } + } else if gotData { + newlineCount++ + } + + if newlineCount == 2 { // SSE event is done + out <- event + newlineCount = 0 + } + } + + close(out) + if !errors.Is(sc.Err(), context.Canceled) { + require.NoErrorf(t, sc.Err(), "error scanning event stream from %q for user %d", url, user.ID) + } + t.Logf("User %d stops to stream from %q due: %v", user.ID, url, context.Cause(ctx)) + }() + + return out +} + +// sseEvent is the struct we use to assert on received SSE events. +type sseEvent struct { + ID int64 + Event string + Data string +} + +// eventEmitter emits an event to RabbitMQ which can then be streamed via SSE from the event handler. +type eventEmitter struct { + producer *ha.ReliableProducer + eventCount int64 + eventPublished chan error +} + +func NewEventEmitter(t *testing.T, env *stream.Environment, streamName string) eventEmitter { + producerName := "test-event-handler" + eventPublished := make(chan error) + opts := stream.NewProducerOptions(). + SetProducerName(producerName). + SetClientProvidedName(producerName). + SetFilter( + // each message will get the group as filter key + stream.NewProducerFilter(func(message message.StreamMessage) string { + return fmt.Sprintf("%s", message.GetApplicationProperties()["group"]) + })) + producer, err := ha.NewReliableProducer(env, streamName, opts, func(messageStatus []*stream.ConfirmationStatus) { + go func() { + for _, msgStatus := range messageStatus { + if msgStatus.IsConfirmed() { + eventPublished <- nil + } else { + eventPublished <- fmt.Errorf("failed to publish message with publishing_id=%d", msgStatus.GetMessage().GetPublishingId()) + } + } + }() + }) + require.NoError(t, err, "failed to create RabbitMQ producer") + return eventEmitter{producer: producer, eventPublished: eventPublished} +} + +func (es *eventEmitter) Close() error { + return es.producer.Close() +} + +// emit emits an instance manager event and returns the SSE event we expect an eligible user to +// receive via the event handler. This is a blocking operation. An event has an event counter that +// is 1-indexed for human readability. This event counter is also used for deduplication in RabbitMQ +// by setting it as the publishing id. Since we assert on the SSE event clients should receive we +// also need to assert on the SSE id field. The event handler sets the id field to the RabbitMQ +// offset (0-indexed) so SSE clients can resume on re-connect. +func (es *eventEmitter) emit(t *testing.T, kind, group string, owner *model.User) sseEvent { + streamOffset := es.eventCount + es.eventCount++ + + data := strconv.FormatInt(es.eventCount, 10) + message := amqp.NewMessage([]byte(data)) + // set a publishing id for deduplication + message.SetPublishingId(es.eventCount) + // set properties used for filtering + message.ApplicationProperties = map[string]interface{}{"group": group} + if owner != nil { + message.ApplicationProperties["owner"] = strconv.Itoa(int(owner.ID)) + } + // set property that dictates the SSE event field + message.ApplicationProperties["kind"] = kind + err := es.producer.Send(message) + require.NoErrorf(t, err, "failed to send message to RabbitMQ stream of kind %q, group %q, user %v", kind, group, owner) + + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + select { + case <-ctx.Done(): + assert.FailNow(t, "Timed out waiting on sent message.") + case err := <-es.eventPublished: + require.NoError(t, err) + t.Logf("Sent event %d", es.eventCount) + return sseEvent{ + ID: streamOffset, + Event: kind, + Data: data, + } + } + return sseEvent{} +} diff --git a/pkg/event/handler.go b/pkg/event/handler.go new file mode 100644 index 00000000..cdc0cddc --- /dev/null +++ b/pkg/event/handler.go @@ -0,0 +1,279 @@ +package event + +import ( + "errors" + "fmt" + "log/slog" + "math/rand/v2" + "net/http" + "strconv" + + "github.com/dhis2-sre/im-manager/internal/errdef" + "github.com/dhis2-sre/im-manager/internal/handler" + "github.com/dhis2-sre/im-manager/pkg/model" + "github.com/gin-contrib/sse" + "github.com/gin-gonic/gin" + "github.com/google/uuid" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/ha" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" + "golang.org/x/exp/maps" +) + +func NewHandler(logger *slog.Logger, env *stream.Environment, streamName string) Handler { + return Handler{ + logger: logger, + env: env, + streamName: streamName, + } +} + +type Handler struct { + logger *slog.Logger + env *stream.Environment + streamName string +} + +func (h Handler) StreamEvents(c *gin.Context) { + // swagger:route GET /events streamSSE + // + // Stream events + // + // Stream events... + // + // responses: + // 200: Stream + // 400: Error + // 403: Error + // 404: Error + // 415: Error + // + // security: + // oauth2: + user, err := handler.GetUserFromContext(c) + if err != nil { + _ = c.Error(err) + return + } + + userGroups := mapUserGroups(user) + if len(userGroups) == 0 { + _ = c.Error(errdef.NewForbidden("you cannot stream events as you are not part of a group. Ask an administrator for help.")) + return + } + + offsetSpec, err := computeOffsetSpec(c) + if err != nil { + h.logger.ErrorContext(c, "Failed to compute RabbitMQ offset spec", "error", err) + _ = c.Error(err) + return + } + + consumerName := fmt.Sprintf("user-%d-%s", user.ID, uuid.NewString()) + retry := computeRetry() + logger := h.logger. + With("consumerName", consumerName). + With("consumerOffsetSpec", offsetSpec.String()). + With("sseRetry", retry) + + filter := stream.NewConsumerFilter(maps.Keys(userGroups), false, postFilter(c, logger, user.ID, userGroups)) + opts := stream.NewConsumerOptions(). + SetConsumerName(consumerName). + SetClientProvidedName(consumerName). + SetManualCommit(). + SetOffset(offsetSpec). + SetFilter(filter) + sseEvents, messageHandler := createMessageHandler(c, logger, retry) + consumer, err := ha.NewReliableConsumer(h.env, h.streamName, opts, messageHandler) + if err != nil { + logger.ErrorContext(c, "Failed to create RabbitMQ consumer", "error", err) + _ = c.Error(err) + return + } + defer consumer.Close() + + c.Writer.Header().Set("Content-Type", "text/event-stream") + c.Writer.Header().Set("Cache-Control", "no-cache") + c.Writer.Header().Set("Connection", "keep-alive") + c.Writer.Header().Set("Transfer-Encoding", "chunked") + c.Writer.WriteHeader(http.StatusOK) + c.Writer.Flush() + logger.InfoContext(c, "Connection established for sending SSE events") + + for { + select { + case <-c.Request.Context().Done(): + logger.InfoContext(c, "Request canceled, returning from /events handler") + return + case sseEvent := <-sseEvents: + c.Render(-1, sseEvent) + c.Writer.Flush() + } + } +} + +// computeOffsetSpec computes the offset from which the SSE client will stream RabbitMQ messages +// from. By default clients will receive the next message that is published to the events stream. +// This means they will not receive "old" events. SSE clients send a "Last-Event-ID" HTTP header on +// re-connect. The "Last-Event-ID" value is a RabbitMQ offset we send in the SSE id field. Clients +// can thus resume where they left off. +func computeOffsetSpec(c *gin.Context) (stream.OffsetSpecification, error) { + lastEventID := c.GetHeader("Last-Event-ID") + if lastEventID == "" { + return stream.OffsetSpecification{}.Next(), nil + } + + lastOffset, err := strconv.ParseInt(lastEventID, 10, 64) + if err != nil { + return stream.OffsetSpecification{}, errdef.NewBadRequest("invalid header %q value: %v", "Last-Event-ID", err) + } + + return stream.OffsetSpecification{}.Offset(lastOffset + 1), nil +} + +// computeRetry computes the SSE computeRetry value in milliseconds. It is composed of a base of 3 seconds with an +// additional jitter of up to 1 second. +func computeRetry() uint { + var base, maxJitter uint = 3_000, 1_001 + // math rand v2 has the better API and is good enough for computing the jitter + // uses a half-open interval [0,n) so 1000ms+1ms + return base + rand.UintN(maxJitter) //nolint:gosec +} + +func mapUserGroups(user *model.User) map[string]struct{} { + result := make(map[string]struct{}, len(user.Groups)) + for _, group := range user.Groups { + result[group.Name] = struct{}{} + } + return result +} + +// postFilter is a RabbitMQ stream post filter that is applied client side. This is necessary as the +// server side filter is probabilistic and can let false positives through. (see +// https://www.rabbitmq.com/blog/2023/10/16/stream-filtering) The filter must be simple and fast. +func postFilter(c *gin.Context, logger *slog.Logger, userID uint, userGroups map[string]struct{}) stream.PostFilter { + return func(message *amqp.Message) bool { + isOwner, err := isUserMessageOwner(userID, message.ApplicationProperties) + if err != nil { + logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) + return false + } + isInGroup, err := isInMessageGroup(userGroups, message.ApplicationProperties) + if err != nil { + logger.ErrorContext(c, "Failed to post filter RabbitMQ message", "error", err, "applicationProperties", message.ApplicationProperties) + return false + } + return isOwner && isInGroup + } +} + +// isUserMessageOwner determines if the user is allowed to receive the message. This function only +// considers the "owner" property of a message. Messages that have no owner can be read by the user. +// Messages that have an owner can only be read by the user if the "owner" property value can be +// parsed and matches the userID. +func isUserMessageOwner(userID uint, applicationProperties map[string]any) (bool, error) { + owner, ok := applicationProperties["owner"] + if !ok { + return true, nil + } + + messageOwner, ok := owner.(string) + if !ok { + return false, errors.New(`failed to type assert RabbitMQ message application property "owner" to a string`) + } + + messageOwnerID, err := strconv.ParseUint(messageOwner, 10, 64) + if err != nil { + return false, fmt.Errorf("failed to parse RabbitMQ message application property \"owner\" to a uint: %v", err) + } + + return messageOwnerID == uint64(userID), nil +} + +// isInMessageGroup determines if the user is allowed to receive the message. This function only +// considers the "group" property of a message. Messages that have a group can only be read by the +// user if the "group" property value can be parsed and matches one of the userGroups. "group" is a +// required application property. +func isInMessageGroup(userGroups map[string]struct{}, applicationProperties map[string]any) (bool, error) { + group, ok := applicationProperties["group"] + if !ok { + return false, errors.New(`RabbitMQ message is missing application property "group"`) + } + + messageGroup, ok := group.(string) + if !ok { + return false, errors.New(`failed to type assert RabbitMQ message application property "group" to a string`) + } + + _, ok = userGroups[messageGroup] + return ok, nil +} + +// createMessageHandler returns stream.MessagesHandler that will transform RabbitMQ messages of +// instance manager events into SSE events. These SSE events are sent via the read-only channel +// returned. This is to avoid race conditions when writing the data out to the HTTP response writer. +// Only one Go routine should write to the HTTP response writer. The RabbitMQ stream client runs our +// stream.MessagesHandler in a separate Go routine. +func createMessageHandler(c *gin.Context, logger *slog.Logger, retry uint) (<-chan sse.Event, stream.MessagesHandler) { + out := make(chan sse.Event) + return out, func(consumerContext stream.ConsumerContext, message *amqp.Message) { + defer func() { + if r := recover(); r != nil { + logger.ErrorContext(c, "RabbitMQ message handler panicked", "recover", r) + // We assume that we cannot recover from a panic in a message handler. We thus panic + // again. We do want to log any panic to be notified. + panic(r) + } + }() + + select { + case <-c.Request.Context().Done(): + logger.InfoContext(c, "Request canceled, returning from /events messageHandler") + close(out) + return + default: + sseEvent, err := mapMessageToEvent(retry, consumerContext.Consumer.GetOffset(), message) + if err != nil { + logger.ErrorContext(c, "Failed to map AMQP message", "error", err) + return + } + + select { + case <-c.Request.Context().Done(): + logger.InfoContext(c, "Request canceled, returning from messageHandler") + close(out) + return + case out <- sseEvent: + } + } + } +} + +// mapMessageToEvent maps an AMQP message of an instance manager event to an SSE event. No error is +// returned if the message could be processed and an SSE event should be sent. Do not send an SSE +// event when an error is returned. +func mapMessageToEvent(retry uint, offset int64, message *amqp.Message) (sse.Event, error) { + if len(message.Data) == 0 { + return sse.Event{}, errors.New("received no data") + } + + kindProperty, ok := message.ApplicationProperties["kind"] + if !ok { + return sse.Event{}, errors.New(`RabbitMQ message is missing application property "kind"`) + } + kind, ok := kindProperty.(string) + if !ok { + return sse.Event{}, fmt.Errorf("type assertion of RabbitMQ message application property %q failed, value=%v", "type", kindProperty) + } + + // text/event-stream is text based. Thus our data needs to be converted to a string. Gin + // sse.Event marshalls the Data field using fmt.Sprint which uses the default formatting verb %v + // which for a []byte would print `[65]` for []byte{"A"} instead of `A` + data := string(message.Data[0]) + return sse.Event{ + Id: strconv.FormatInt(offset, 10), + Data: data, + Retry: retry, + Event: kind, + }, nil +} diff --git a/pkg/event/handler_test.go b/pkg/event/handler_test.go new file mode 100644 index 00000000..0276aa26 --- /dev/null +++ b/pkg/event/handler_test.go @@ -0,0 +1,130 @@ +package event + +import ( + "log/slog" + "net/http/httptest" + "os" + "strconv" + "testing" + + "github.com/gin-gonic/gin" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/amqp" + "github.com/stretchr/testify/assert" +) + +func TestEventPostFilter(t *testing.T) { + logger := slog.New(slog.NewTextHandler(os.Stdout, nil)) + w := httptest.ResponseRecorder{} + c := gin.CreateTestContextOnly(&w, &gin.Engine{}) + + var user1 uint = 1 + user1GroupsMap := map[string]struct{}{ + "group1": {}, + "group3": {}, + "group4": {}, + "group7": {}, + } + var nonMatchingUserID uint = 17 + nonMatchingGroup := "group17" + tests := map[string]struct { + userID uint + userGroupsMap map[string]struct{} + message amqp.Message + want bool + }{ + "MessageForGroupMatchingTheUsersGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group3"}, + }, + want: true, + }, + "MessageForGroupAndUserMatchingTheUser": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group4", "owner": strconv.Itoa(int(user1))}, + }, + want: true, + }, + "MessageForAnyone": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{}, + want: false, + }, + "MessageForUserMatchingTheUser": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"owner": strconv.Itoa(int(user1))}, + }, + want: false, + }, + "MessageForGroupNotMatchingTheUsersGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group10"}, + }, + want: false, + }, + "MessageForGroupAndUserNotMatchingTheUsersID": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": "group7", "owner": strconv.Itoa(int(nonMatchingUserID))}, + }, + want: false, + }, + "MessageForGroupAndUserNotMatchingTheUsersGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": strconv.Itoa(int(user1))}, + }, + want: false, + }, + "MessageForGroupAndUserNotMatchingTheUsersIDAndGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": strconv.Itoa(int(nonMatchingUserID))}, + }, + want: false, + }, + "MessageForGroupAndUserWithNonStringOwner": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": user1}, + }, + want: false, + }, + "MessageForGroupAndUserWithNonUintOwner": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": nonMatchingGroup, "owner": "wrong"}, + }, + want: false, + }, + "MessageForGroupWithNonStringGroup": { + userID: user1, + userGroupsMap: user1GroupsMap, + message: amqp.Message{ + ApplicationProperties: map[string]any{"group": 1}, + }, + want: false, + }, + } + + for name, test := range tests { + t.Run(name, func(t *testing.T) { + got := postFilter(c, logger, test.userID, test.userGroupsMap)(&test.message) + + assert.Equal(t, test.want, got) + }) + } +} diff --git a/pkg/event/router.go b/pkg/event/router.go new file mode 100644 index 00000000..4aedbb4a --- /dev/null +++ b/pkg/event/router.go @@ -0,0 +1,11 @@ +package event + +import ( + "github.com/gin-gonic/gin" +) + +func Routes(r *gin.Engine, authenticator gin.HandlerFunc, handler Handler) { + router := r.Group("") + router.Use(authenticator) + router.GET("/events", handler.StreamEvents) +} diff --git a/pkg/instance/ttlDestroyConsumer_test.go b/pkg/instance/ttlDestroyConsumer_test.go index 6b33800d..4ff5318d 100644 --- a/pkg/instance/ttlDestroyConsumer_test.go +++ b/pkg/instance/ttlDestroyConsumer_test.go @@ -18,10 +18,10 @@ import ( func TestConsumeDeletesInstance(t *testing.T) { t.Parallel() - amqpClient := inttest.SetupRabbitMQ(t) + amqpClient := inttest.SetupRabbitMQAMQP(t) consumer, err := rabbitmq.NewConsumer( - amqpClient.URI, + amqpClient.URI(t), rabbitmq.WithConnectionName(t.Name()), rabbitmq.WithConsumerTagPrefix(t.Name()), ) diff --git a/pkg/inttest/rabbitmq.go b/pkg/inttest/rabbitmq.go index dfa24237..a3c10df7 100644 --- a/pkg/inttest/rabbitmq.go +++ b/pkg/inttest/rabbitmq.go @@ -1,48 +1,273 @@ +// Package inttest provides setup functions that create a RabbitMQ container. We are using the +// management image for RabbitMQ so you can debug and interact with tests using its admin panel. Use +// a debugger, adjust timeouts waiting for a message or add a time.Sleep and find the exposed +// management port to login to the UI. You will find it easier to debug if your test configures the +// consumers connection and or consumer tag prefix. package inttest import ( + "context" "fmt" + "strings" "testing" - "github.com/orlangure/gnomock" - "github.com/orlangure/gnomock/preset/rabbitmq" - amqp "github.com/rabbitmq/amqp091-go" + "github.com/docker/go-connections/nat" + amqpgo "github.com/rabbitmq/amqp091-go" + "github.com/rabbitmq/rabbitmq-stream-go-client/pkg/stream" "github.com/stretchr/testify/require" + "github.com/testcontainers/testcontainers-go" + "github.com/testcontainers/testcontainers-go/network" + "github.com/testcontainers/testcontainers-go/wait" ) -// SetupRabbitMQ creates a RabbitMQ container returning an AMQP client ready to send messages to it. -func SetupRabbitMQ(t *testing.T) *amqpTestClient { +const amqpPort = "5672" +const natAMQPPort = amqpPort + "/tcp" +const streamPort = "5552" +const natStreamPort = streamPort + "/tcp" + +// SetupRabbitMQAMQP creates a RabbitMQ with an AMQP client ready to send messages to it. +func SetupRabbitMQAMQP(t *testing.T, options ...rabbitMQOption) *AMQP { t.Helper() + require := require.New(t) + ctx := context.TODO() - container, err := gnomock.Start( - rabbitmq.Preset( - rabbitmq.WithUser("im", "im"), - ), - ) - require.NoError(t, err, "failed to start RabbitMQ") - t.Cleanup(func() { require.NoError(t, gnomock.Stop(container), "failed to stop RabbitMQ") }) - - URI := fmt.Sprintf( - "amqp://%s:%s@%s", - "im", "im", - container.DefaultAddress(), - ) - conn, err := amqp.Dial(URI) - require.NoErrorf(t, err, "failed to connect to RabbitMQ", URI) + net, err := network.New(ctx) + require.NoError(err, "failed setting up Docker network") t.Cleanup(func() { - require.NoErrorf(t, conn.Close(), "failed to close connection to RabbitMQ") + require.NoError(net.Remove(ctx), "failed to remove the Docker network") }) - ch, err := conn.Channel() - require.NoErrorf(t, err, "failed to open channel to RabbitMQ") + rabbitMQContainer, err := NewRabbitMQ(ctx, WithNetwork(net.Name, "rabbitmq")) + require.NoError(err, "failed setting up RabbitMQ") t.Cleanup(func() { - require.NoErrorf(t, ch.Close(), "failed to close channel to RabbitMQ") + require.NoError(rabbitMQContainer.Terminate(ctx), "failed to terminate RabbitMQ") + }) + + URI, err := rabbitMQContainer.AMQPURI(ctx) + require.NoError(err, "failed to get RabbitMQ AMQP URI") + conn, err := amqpgo.Dial(URI) + require.NoError(err, "failed setting up AMQP connection") + channel, err := conn.Channel() + require.NoError(err, "failed setting up AMQP channel") + + return &AMQP{ + rabbitMQContainer: rabbitMQContainer, + conn: conn, + Channel: channel, + } +} + +// AMQP allows making requests to RabbitMQ. It does so by opening a connection and channel to +// RabbitMQ via the low-level github.com/rabbitmq/amqp091-go library. +type AMQP struct { + rabbitMQContainer *rabbitmqContainer + conn *amqpgo.Connection // Connection established with RabbitMQ + Channel *amqpgo.Channel // Channel established with RabbitMQ +} + +// URI is the AMQP URI going to RabbitMQ. +func (a *AMQP) URI(t *testing.T) string { + t.Helper() + + URI, err := a.rabbitMQContainer.AMQPURI(context.TODO()) + require.NoError(t, err, "failed to get RabbitMQ URI") + return URI +} + +// SetupRabbitStream creates a RabbitMQ container with a streaming environment ready to create a +// producer or consumer. +func SetupRabbitStream(t *testing.T) *Stream { + t.Helper() + require := require.New(t) + ctx := context.TODO() + + net, err := network.New(ctx) + require.NoError(err, "failed setting up Docker network") + t.Cleanup(func() { + require.NoError(net.Remove(ctx), "failed to remove the Docker network") + }) + + rabbitMQContainer, err := NewRabbitMQ(ctx, WithNetwork(net.Name, "rabbitmq"), WithStreamingExposed()) + require.NoError(err, "failed setting up RabbitMQ") + t.Cleanup(func() { + require.NoError(rabbitMQContainer.Terminate(ctx), "failed to terminate RabbitMQ") + }) + + URI, err := rabbitMQContainer.StreamURI(ctx) + require.NoError(err, "failed to get RabbitMQ stream URI") + env, err := stream.NewEnvironment( + stream.NewEnvironmentOptions(). + SetUri(URI)) + require.NoError(err, "failed to create new RabbitMQ stream environment") + + return &Stream{ + rabbitMQContainer: rabbitMQContainer, + Environment: env, + } +} + +// Stream allows making requests to RabbitMQ. It does so by opening a stream environment via +// github.com/rabbitmq/rabbitmq-stream-go-client. +type Stream struct { + rabbitMQContainer *rabbitmqContainer + Environment *stream.Environment +} + +// StreamURI is the stream URI going to RabbitMQ. +func (s *Stream) StreamURI(t *testing.T) string { + t.Helper() + + URI, err := s.rabbitMQContainer.StreamURI(context.TODO()) + require.NoError(t, err, "failed to get RabbitMQ stream URI") + return URI +} + +func (s *Stream) StreamPort(t *testing.T) string { + t.Helper() + + port, err := s.rabbitMQContainer.ExposedStreamPort(context.TODO()) + require.NoError(t, err, "failed to get RabbitMQ stream port") + return port +} + +type rabbitmqContainer struct { + testcontainers.Container + user string + pw string + network string + networkAlias string + internalPort string +} + +func (rc *rabbitmqContainer) AMQPURI(ctx context.Context) (string, error) { + ip, err := rc.Host(ctx) + if err != nil { + return "", err + } + port, err := rc.ExposedAMQPPort(ctx) + if err != nil { + return "", err + } + return amqpURI(rc.user, rc.pw, ip, port), nil +} + +func (rc *rabbitmqContainer) ExposedAMQPPort(ctx context.Context) (string, error) { + port, err := rc.MappedPort(ctx, nat.Port(natAMQPPort)) + if err != nil { + return "", err + } + return port.Port(), nil +} + +func (rc *rabbitmqContainer) StreamURI(ctx context.Context) (string, error) { + ip, err := rc.Host(ctx) + if err != nil { + return "", err + } + port, err := rc.ExposedStreamPort(ctx) + if err != nil { + return "", err + } + return streamURI(rc.user, rc.pw, ip, port), nil +} + +func (rc *rabbitmqContainer) ExposedStreamPort(ctx context.Context) (string, error) { + port, err := rc.MappedPort(ctx, nat.Port(natStreamPort)) + if err != nil { + return "", err + } + return port.Port(), nil +} + +type rabbitMQOptions struct { + network string + networkAlias string + exposeStreaming bool +} + +type rabbitMQOption func(*rabbitMQOptions) + +// WithNetwork connects the RabbitMQ container to a specific network and gives it an alias with +// which you can reach it on this network. +func WithNetwork(name, alias string) rabbitMQOption { + return func(options *rabbitMQOptions) { + options.network = name + options.networkAlias = alias + } +} + +// WithStreamingExposed exposes RabbitMQ streaming port 5552 to a fixed port of 5552. +func WithStreamingExposed() rabbitMQOption { + return func(options *rabbitMQOptions) { + options.exposeStreaming = true + } +} + +// NewRabbitMQ creates a RabbitMQ container. The container will be listening and ready to accept +// connections. Connect using default user and password rabbitmq or the credentials you provided via +// the options. +func NewRabbitMQ(ctx context.Context, options ...rabbitMQOption) (*rabbitmqContainer, error) { + opts := &rabbitMQOptions{} + for _, o := range options { + o(opts) + } + + user := "guest" + pw := "guest" + natPortMgmt := "15672/tcp" + exposedPorts := []string{natAMQPPort, natPortMgmt} + if opts.exposeStreaming { + exposedPorts = append(exposedPorts, fmt.Sprintf("%s:%s", streamPort, natStreamPort)) + } + req := testcontainers.ContainerRequest{ + Image: "bitnami/rabbitmq:3.13", + Env: map[string]string{ + "RABBITMQ_USERNAME": user, + "RABBITMQ_PASSWORD": pw, + "BITNAMI_DEBUG": "true", + "RABBITMQ_MANAGEMENT_ALLOW_WEB_ACCESS": "true", + "RABBITMQ_DISK_FREE_ABSOLUTE_LIMIT": "100MB", + "RABBITMQ_PLUGINS": "rabbitmq_management,rabbitmq_management_agent,rabbitmq_stream,rabbitmq_stream_management", + }, + ExposedPorts: exposedPorts, + Files: []testcontainers.ContainerFile{ + { + Reader: strings.NewReader(`SERVER_ADDITIONAL_ERL_ARGS="-rabbitmq_stream advertised_host localhost"`), + ContainerFilePath: "/etc/rabbitmq/rabbitmq-env.conf", + FileMode: 0o444, + }, + }, + WaitingFor: wait.ForLog("Time to start RabbitMQ").WithOccurrence(2), + } + if opts.network != "" { + req.Networks = []string{opts.network} + req.NetworkAliases = map[string][]string{ + opts.network: {opts.networkAlias}, + } + } + + container, err := testcontainers.GenericContainer(ctx, testcontainers.GenericContainerRequest{ + ContainerRequest: req, + Started: true, }) + if err != nil { + return nil, err + } + + return &rabbitmqContainer{ + Container: container, + network: opts.network, + networkAlias: opts.networkAlias, + internalPort: amqpPort, + user: user, + pw: pw, + }, nil +} - return &amqpTestClient{Channel: ch, URI: URI} +func amqpURI(user, pw, ip, port string) string { + return fmt.Sprintf("amqp://%s:%s@%s:%s", user, pw, ip, port) } -type amqpTestClient struct { - Channel *amqp.Channel - URI string +func streamURI(user, pw, ip, port string) string { + return fmt.Sprintf("rabbitmq-stream://%s:%s@%s:%s", user, pw, ip, port) } diff --git a/scripts/instances/events.sh b/scripts/instances/events.sh new file mode 100755 index 00000000..8ab8163d --- /dev/null +++ b/scripts/instances/events.sh @@ -0,0 +1,13 @@ +#!/usr/bin/env bash + +set -euo pipefail + +source ./auth.sh + +LAST_EVENT_ID="${1:-}" + +if [[ -z $LAST_EVENT_ID ]]; then + $HTTP get "$IM_HOST/events" Cookie:accessToken="$ACCESS_TOKEN" +else + $HTTP get "$IM_HOST/events" Cookie:accessToken="$ACCESS_TOKEN" Last-Event-ID:"$LAST_EVENT_ID" +fi diff --git a/skaffold.yaml b/skaffold.yaml index 82b72272..754d331a 100644 --- a/skaffold.yaml +++ b/skaffold.yaml @@ -33,8 +33,15 @@ deploy: createNamespace: true remoteChart: rabbitmq repo: https://charts.bitnami.com/bitnami - version: 12.4.1 + version: 14.4.2 useHelmSecrets: true + setValues: + extraPlugins: "rabbitmq_stream rabbitmq_stream_management" + service: + extraPorts: + - name: streams + port: 5552 + targetPort: 5552 valuesFiles: - helm/data/secrets/{{ .CLASSIFICATION }}/rabbitmq.yaml @@ -92,6 +99,7 @@ deploy: DATABASE_PORT: "5432" RABBITMQ_HOST: im-rabbitmq-{{ .ENVIRONMENT }}.instance-manager-{{ .CLASSIFICATION }}.svc RABBITMQ_PORT: "5672" + RABBITMQ_STREAM_PORT: "5552" S3_BUCKET: im-databases-{{ .CLASSIFICATION }} S3_REGION: eu-west-1 DEFAULT_TTL: "172800" # 48 hours diff --git a/swagger/swagger.yaml b/swagger/swagger.yaml index 0d1d10e5..1b8fc466 100644 --- a/swagger/swagger.yaml +++ b/swagger/swagger.yaml @@ -190,6 +190,18 @@ definitions: $ref: '#/definitions/DeploymentInstanceParameter' type: object x-go-package: github.com/dhis2-sre/im-manager/pkg/model + Event: + properties: + Data: {} + Event: + type: string + Id: + type: string + Retry: + format: uint64 + type: integer + type: object + x-go-package: github.com/gin-contrib/sse ExternalDownload: properties: databaseId: @@ -1078,6 +1090,24 @@ paths: security: - oauth2: [] summary: Delete deployment instance + /events: + get: + description: Stream events... + operationId: streamSSE + responses: + "200": + $ref: '#/responses/Stream' + "400": + $ref: '#/responses/Error' + "403": + $ref: '#/responses/Error' + "404": + $ref: '#/responses/Error' + "415": + $ref: '#/responses/Error' + security: + - oauth2: [] + summary: Stream events /groups: get: description: Find all groups by user @@ -1825,6 +1855,10 @@ responses: description: "" schema: $ref: '#/definitions/InstanceStatus' + Stream: + description: "" + schema: + $ref: '#/definitions/Event' Tokens: description: "" schema: