From 2ceefdaaec4e461bb730608cd5e1e5e74a3d5008 Mon Sep 17 00:00:00 2001 From: Nour Balaha Date: Wed, 12 Jul 2023 12:32:21 +0900 Subject: [PATCH] feat(worker): add AWS support to decompress and webhook handlers (#729) * update DecompressHandler * wip: decompress handler * refactor DecompressHandler * fix webhook handler * fix: return err * refactor: decompress and webhook handlers * refactor: SNSMessageHeader * refactor request * removed extra line * refactor the handler * refactor 2 * refactor: DecompressHandler * add validation for message signature * add signature validation for webhook handler * refactor: decompress and webhook handlers * fix * refactor * use json.NewDecoder instead of c.Bind * fix: isGCP function --- go.work.sum | 5 ++ worker/go.mod | 1 + worker/go.sum | 2 + worker/internal/app/handler.go | 129 +++++++++++++++++++++++++++++---- 4 files changed, 122 insertions(+), 15 deletions(-) diff --git a/go.work.sum b/go.work.sum index 2260396dc4..035c2281e0 100644 --- a/go.work.sum +++ b/go.work.sum @@ -126,6 +126,7 @@ cloud.google.com/go/clouddms v1.4.0/go.mod h1:Eh7sUGCC+aKry14O1NRljhjyrr0NFC0G2c cloud.google.com/go/clouddms v1.5.0 h1:E7v4TpDGUyEm1C/4KIrpVSOCTm0P6vWdHT0I4mostRA= cloud.google.com/go/clouddms v1.5.0/go.mod h1:QSxQnhikCLUw13iAbffF2CZxAER3xDGNHjsTAkQJcQA= cloud.google.com/go/cloudtasks v1.8.0/go.mod h1:gQXUIwCSOI4yPVK7DgTVFiiP0ZW/eQkydWzwVMdHxrI= +cloud.google.com/go/cloudtasks v1.10.0 h1:uK5k6abf4yligFgYFnG0ni8msai/dSv6mDmiBulU0hU= cloud.google.com/go/cloudtasks v1.10.0/go.mod h1:NDSoTLkZ3+vExFEWu2UJV1arUyzVDAiZtdWcsUyNwBs= cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow= cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM= @@ -680,6 +681,7 @@ github.com/garyburd/redigo v1.1.1-0.20170914051019-70e1b1943d4f h1:Sk0u0gIncQaQD github.com/garyburd/redigo v1.1.1-0.20170914051019-70e1b1943d4f/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY= github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk= github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE= +github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= github.com/gin-gonic/gin v1.8.1 h1:4+fr/el88TOO3ewCmQr8cx/CtZ/umlIRIs5M4NTNjf8= github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8= github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1 h1:QbL/5oDUmRBzO9/Z7Seo6zf912W/a6Sr4Eu0G/3Jho0= @@ -768,6 +770,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639 h1:mV02weK github.com/inconshreveable/log15 v0.0.0-20170622235902-74a0988b5f80 h1:g/SJtZVYc1cxSB8lgrgqeOlIdi4MhqNNHYRAC8y+g4c= github.com/inconshreveable/log15 v0.0.0-20170622235902-74a0988b5f80/go.mod h1:cOaXtrgN4ScfRrD9Bre7U1thNq5RtJ8ZoP4iXVGRj6o= github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM= +github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o= github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d h1:c93kUJDtVAXFEhsCh5jSxyOJmFHuzcihnslQiX8Urwo= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5 h1:PJr+ZMXIecYc1Ey2zucXdR73SMBtgjPgwa31099IMv0= @@ -778,6 +781,7 @@ github.com/kevinmbeaulieu/eq-go v1.0.0/go.mod h1:G3S8ajA56gKBZm4UB9AOyoOS37JO3ro github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg= github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4= github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4= +github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8= github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs= github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI= @@ -820,6 +824,7 @@ github.com/mitchellh/mapstructure v1.3.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M= +github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk= github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno= github.com/pelletier/go-toml v1.0.1-0.20170904195809-1d6b12b7cb29 h1:6P7XZEBu/ZWizC/liUX4UYm4nEAACofmSkOzY39RBxM= github.com/pelletier/go-toml v1.0.1-0.20170904195809-1d6b12b7cb29/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic= diff --git a/worker/go.mod b/worker/go.mod index 1a3302d96e..3c5ec021b3 100644 --- a/worker/go.mod +++ b/worker/go.mod @@ -17,6 +17,7 @@ require ( github.com/kennygrant/sanitize v1.2.4 github.com/labstack/echo/v4 v4.9.1 github.com/reearth/reearthx v0.0.0-20221109022045-dd54f4626639 + github.com/robbiet480/go.sns v0.0.0-20230523235941-e8d832c79d68 github.com/samber/lo v1.28.2 github.com/spf13/afero v1.9.2 github.com/stretchr/testify v1.8.1 diff --git a/worker/go.sum b/worker/go.sum index 5531be15eb..351bd2e870 100644 --- a/worker/go.sum +++ b/worker/go.sum @@ -262,6 +262,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA= github.com/reearth/reearthx v0.0.0-20221109022045-dd54f4626639 h1:qGHbGg2CsQinp9YcjRbmyCbTYgID+5qCrzvzx8D1veM= github.com/reearth/reearthx v0.0.0-20221109022045-dd54f4626639/go.mod h1:YZMXO1RhQ5fFL0GIOFvJq2GNskW7w+xoW4Zfu2QUXhw= +github.com/robbiet480/go.sns v0.0.0-20230523235941-e8d832c79d68 h1:Jknsfy5cqCH6qAuoU1qNZ51hfBJfMSJYwsH9j9mdVnw= +github.com/robbiet480/go.sns v0.0.0-20230523235941-e8d832c79d68/go.mod h1:9CDhL7uDVy8vEVDNPJzxq89dPaPBWP6hxQcC8woBHus= github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4= github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk= github.com/samber/lo v1.28.2 h1:f1gctelJ5YQk336wCN+Elr90FyhZ6ArhelD5kjhNTz4= diff --git a/worker/internal/app/handler.go b/worker/internal/app/handler.go index 4e1180588f..f29be0f028 100644 --- a/worker/internal/app/handler.go +++ b/worker/internal/app/handler.go @@ -3,12 +3,16 @@ package app import ( "encoding/base64" "encoding/json" + "errors" + "io" "net/http" + "strings" "github.com/labstack/echo/v4" rhttp "github.com/reearth/reearth-cms/worker/internal/adapter/http" "github.com/reearth/reearth-cms/worker/pkg/webhook" "github.com/reearth/reearthx/log" + sns "github.com/robbiet480/go.sns" ) type Handler struct { @@ -22,11 +26,20 @@ func NewHandler(c *rhttp.Controller) *Handler { func (h Handler) DecompressHandler() echo.HandlerFunc { return func(c echo.Context) error { var input rhttp.DecompressInput - if err := c.Bind(&input); err != nil { - log.Errorf("failed to decompress: err=%s", err.Error()) + var err error + + if h.isAWS(c.Request()) { + input, err = parseSNSDecompressMessage(c.Request().Body) + } else if h.isGCP(c.Request()) { + input, err = parsePubSubDecompressMessage(c.Request().Body) + } else { + err = errors.New("unsupported request source") + } + + if err != nil { + log.Errorf("failed to parse request body: %s", err.Error()) return err } - log.Infof("decompression start: Asset=%s, Path=%s", input.AssetID, input.Path) if err := h.Controller.DecompressController.Decompress(c.Request().Context(), input); err != nil { log.Errorf("failed to decompress. input: %#v err:%s", input, err.Error()) @@ -39,28 +52,114 @@ func (h Handler) DecompressHandler() echo.HandlerFunc { func (h Handler) WebhookHandler() echo.HandlerFunc { return func(c echo.Context) error { - var msg msgBody - var w webhook.Webhook - if err := c.Bind(&msg); err != nil { - if err := c.Bind(&w); err != nil { - return err - } - } else if data, err := msg.Data(); err != nil { - return err - } else if err := json.Unmarshal(data, &w); err != nil { + var webhook webhook.Webhook + var err error + + if h.isAWS(c.Request()) { + webhook, err = parseSNSWebhookMessage(c.Request().Body) + } else if h.isGCP(c.Request()) { + webhook, err = parsePubSubWebhookMessage(c.Request().Body) + } else { + err = errors.New("unsupported request source") + } + + if err != nil { + log.Errorf("failed to parse request body: %s", err.Error()) return err } - if err := h.Controller.WebhookController.Webhook(c.Request().Context(), &w); err != nil { - log.Errorf("failed to send webhook. webhook: %#v err:%s", w, err.Error()) + if err := h.Controller.WebhookController.Webhook(c.Request().Context(), &webhook); err != nil { + log.Errorf("failed to send webhook. webhook: %#v err:%s", webhook, err.Error()) return err } - log.Info("webhook has been sent: %#v", w) + log.Infof("webhook has been sent: %#v", webhook) return c.NoContent(http.StatusOK) } } +func (h Handler) isAWS(r *http.Request) bool { + return r.Header.Get("X-Amz-Sns-Message-Type") == "Notification" +} + +func (h Handler) isGCP(r *http.Request) bool { + // TODO: need to find a better way to detect GCP requests + for headerName := range r.Header { + if strings.HasPrefix(strings.ToLower(headerName), "x-goog-") { + return true + } + } + return false +} + +func parseSNSDecompressMessage(body io.Reader) (rhttp.DecompressInput, error) { + var payload sns.Payload + var input rhttp.DecompressInput + + if err := json.NewDecoder(body).Decode(&payload); err != nil { + return input, err + } + + if err := json.Unmarshal([]byte(payload.Message), &input); err != nil { + return input, err + } + + // Validates payload's signature + if err := payload.VerifyPayload(); err != nil { + return input, err + } + + return input, nil +} + +func parsePubSubDecompressMessage(body io.Reader) (rhttp.DecompressInput, error) { + var input rhttp.DecompressInput + + if err := json.NewDecoder(body).Decode(&input); err != nil { + log.Errorf("failed to decompress: err=%s", err.Error()) + return input, err + } + + return input, nil +} + +func parseSNSWebhookMessage(body io.Reader) (webhook.Webhook, error) { + var payload sns.Payload + var w webhook.Webhook + + if err := json.NewDecoder(body).Decode(&payload); err != nil { + return w, err + } + + if err := json.Unmarshal([]byte(payload.Message), &w); err != nil { + return w, err + } + + // Validates payload's signature + if err := payload.VerifyPayload(); err != nil { + return w, err + } + + return w, nil +} + +func parsePubSubWebhookMessage(body io.Reader) (webhook.Webhook, error) { + var msg msgBody + var w webhook.Webhook + + if err := json.NewDecoder(body).Decode(&msg); err != nil { + if err := json.NewDecoder(body).Decode(&w); err != nil { + return w, err + } + } else if data, err := msg.Data(); err != nil { + return w, err + } else if err := json.Unmarshal(data, &w); err != nil { + return w, err + } + + return w, nil +} + type msgBody struct { Message struct { Data string `json:"data"`