Skip to content

Commit

Permalink
feat(worker): add AWS support to decompress and webhook handlers (#729)
Browse files Browse the repository at this point in the history
* update DecompressHandler

* wip: decompress handler

* refactor DecompressHandler

* fix webhook handler

* fix: return err

* refactor: decompress and webhook handlers

* refactor: SNSMessageHeader

* refactor request

* removed extra line

* refactor the handler

* refactor 2

* refactor: DecompressHandler

* add validation for message signature

* add signature validation for webhook handler

* refactor: decompress and webhook handlers

* fix

* refactor

* use json.NewDecoder instead of c.Bind

* fix: isGCP function
  • Loading branch information
nourbalaha authored Jul 12, 2023
1 parent f344499 commit 2ceefda
Show file tree
Hide file tree
Showing 4 changed files with 122 additions and 15 deletions.
5 changes: 5 additions & 0 deletions go.work.sum
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ cloud.google.com/go/clouddms v1.4.0/go.mod h1:Eh7sUGCC+aKry14O1NRljhjyrr0NFC0G2c
cloud.google.com/go/clouddms v1.5.0 h1:E7v4TpDGUyEm1C/4KIrpVSOCTm0P6vWdHT0I4mostRA=
cloud.google.com/go/clouddms v1.5.0/go.mod h1:QSxQnhikCLUw13iAbffF2CZxAER3xDGNHjsTAkQJcQA=
cloud.google.com/go/cloudtasks v1.8.0/go.mod h1:gQXUIwCSOI4yPVK7DgTVFiiP0ZW/eQkydWzwVMdHxrI=
cloud.google.com/go/cloudtasks v1.10.0 h1:uK5k6abf4yligFgYFnG0ni8msai/dSv6mDmiBulU0hU=
cloud.google.com/go/cloudtasks v1.10.0/go.mod h1:NDSoTLkZ3+vExFEWu2UJV1arUyzVDAiZtdWcsUyNwBs=
cloud.google.com/go/compute v0.1.0/go.mod h1:GAesmwr110a34z04OlxYkATPBEfVhkymfTBXtfbBFow=
cloud.google.com/go/compute v1.3.0/go.mod h1:cCZiE1NHEtai4wiufUhW8I8S1JKkAnhnQJWM7YD99wM=
Expand Down Expand Up @@ -680,6 +681,7 @@ github.com/garyburd/redigo v1.1.1-0.20170914051019-70e1b1943d4f h1:Sk0u0gIncQaQD
github.com/garyburd/redigo v1.1.1-0.20170914051019-70e1b1943d4f/go.mod h1:NR3MbYisc3/PwhQ00EMzDiPmrwpPxAn5GI05/YaO1SY=
github.com/ghodss/yaml v1.0.0 h1:wQHKEahhL6wmXdzwWG11gIVCkOv05bNOh+Rxn0yngAk=
github.com/gin-contrib/sse v0.1.0 h1:Y/yl/+YNO8GZSjAhjMsSuLt29uWRFHdHYUb5lYOV9qE=
github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI=
github.com/gin-gonic/gin v1.8.1 h1:4+fr/el88TOO3ewCmQr8cx/CtZ/umlIRIs5M4NTNjf8=
github.com/go-chi/chi/v5 v5.0.7 h1:rDTPXLDHGATaeHvVlLcR4Qe0zftYethFucbjVQ1PxU8=
github.com/go-gl/glfw v0.0.0-20190409004039-e6da0acd62b1 h1:QbL/5oDUmRBzO9/Z7Seo6zf912W/a6Sr4Eu0G/3Jho0=
Expand Down Expand Up @@ -768,6 +770,7 @@ github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639 h1:mV02weK
github.com/inconshreveable/log15 v0.0.0-20170622235902-74a0988b5f80 h1:g/SJtZVYc1cxSB8lgrgqeOlIdi4MhqNNHYRAC8y+g4c=
github.com/inconshreveable/log15 v0.0.0-20170622235902-74a0988b5f80/go.mod h1:cOaXtrgN4ScfRrD9Bre7U1thNq5RtJ8ZoP4iXVGRj6o=
github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnrnM=
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/jstemmer/go-junit-report v0.9.1 h1:6QPYqodiu3GuPL+7mfx+NwDdp2eTkp9IfEUpgAwUN0o=
github.com/juju/gnuflag v0.0.0-20171113085948-2ce1bb71843d h1:c93kUJDtVAXFEhsCh5jSxyOJmFHuzcihnslQiX8Urwo=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5 h1:PJr+ZMXIecYc1Ey2zucXdR73SMBtgjPgwa31099IMv0=
Expand All @@ -778,6 +781,7 @@ github.com/kevinmbeaulieu/eq-go v1.0.0/go.mod h1:G3S8ajA56gKBZm4UB9AOyoOS37JO3ro
github.com/kisielk/gotool v1.0.0 h1:AV2c/EiW3KqPNT9ZKl07ehoAGi4C5/01Cfbblndcapg=
github.com/klauspost/asmfmt v1.3.2 h1:4Ri7ox3EwapiOjCki+hw14RyKk201CN4rzyCJRFLpK4=
github.com/klauspost/cpuid/v2 v2.0.9 h1:lgaqFMSdTdQYdZ04uHyN2d/eKdOMyi2YLSvlQIBFYa4=
github.com/klauspost/cpuid/v2 v2.0.9/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/kr/fs v0.1.0 h1:Jskdu9ieNAYnjxsi0LbQp1ulIKZV1LAFgK1tWhpZgl8=
github.com/kr/pretty v0.2.0 h1:s5hAObm+yFO5uHYt5dYjxi2rXrsnmRpJx4OYvIWUaQs=
github.com/kr/pretty v0.2.0/go.mod h1:ipq/a2n7PKx3OHsz4KJII5eveXtPO4qwEXGdVfWzfnI=
Expand Down Expand Up @@ -820,6 +824,7 @@ github.com/mitchellh/mapstructure v1.3.1/go.mod h1:bFUtVrKA4DC2yAKiSyO/QUcy7e+RR
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg=
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q=
github.com/modern-go/reflect2 v1.0.2 h1:xBagoLtFs94CBntxluKeaWgTMpvLxC4ur3nMaC9Gz0M=
github.com/modern-go/reflect2 v1.0.2/go.mod h1:yWuevngMOJpCy52FWWMvUC8ws7m/LJsjYzDa0/r8luk=
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/pelletier/go-toml v1.0.1-0.20170904195809-1d6b12b7cb29 h1:6P7XZEBu/ZWizC/liUX4UYm4nEAACofmSkOzY39RBxM=
github.com/pelletier/go-toml v1.0.1-0.20170904195809-1d6b12b7cb29/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
Expand Down
1 change: 1 addition & 0 deletions worker/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ require (
github.com/kennygrant/sanitize v1.2.4
github.com/labstack/echo/v4 v4.9.1
github.com/reearth/reearthx v0.0.0-20221109022045-dd54f4626639
github.com/robbiet480/go.sns v0.0.0-20230523235941-e8d832c79d68
github.com/samber/lo v1.28.2
github.com/spf13/afero v1.9.2
github.com/stretchr/testify v1.8.1
Expand Down
2 changes: 2 additions & 0 deletions worker/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/prometheus/client_model v0.0.0-20190812154241-14fe0d1b01d4/go.mod h1:xMI15A0UPsDsEKsMN9yxemIoYk6Tm2C1GtYGdfGttqA=
github.com/reearth/reearthx v0.0.0-20221109022045-dd54f4626639 h1:qGHbGg2CsQinp9YcjRbmyCbTYgID+5qCrzvzx8D1veM=
github.com/reearth/reearthx v0.0.0-20221109022045-dd54f4626639/go.mod h1:YZMXO1RhQ5fFL0GIOFvJq2GNskW7w+xoW4Zfu2QUXhw=
github.com/robbiet480/go.sns v0.0.0-20230523235941-e8d832c79d68 h1:Jknsfy5cqCH6qAuoU1qNZ51hfBJfMSJYwsH9j9mdVnw=
github.com/robbiet480/go.sns v0.0.0-20230523235941-e8d832c79d68/go.mod h1:9CDhL7uDVy8vEVDNPJzxq89dPaPBWP6hxQcC8woBHus=
github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
github.com/rwcarlsen/goexif v0.0.0-20190401172101-9e8deecbddbd/go.mod h1:hPqNNc0+uJM6H+SuU8sEs5K5IQeKccPqeSjfgcKGgPk=
github.com/samber/lo v1.28.2 h1:f1gctelJ5YQk336wCN+Elr90FyhZ6ArhelD5kjhNTz4=
Expand Down
129 changes: 114 additions & 15 deletions worker/internal/app/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ package app
import (
"encoding/base64"
"encoding/json"
"errors"
"io"
"net/http"
"strings"

"github.com/labstack/echo/v4"
rhttp "github.com/reearth/reearth-cms/worker/internal/adapter/http"
"github.com/reearth/reearth-cms/worker/pkg/webhook"
"github.com/reearth/reearthx/log"
sns "github.com/robbiet480/go.sns"
)

type Handler struct {
Expand All @@ -22,11 +26,20 @@ func NewHandler(c *rhttp.Controller) *Handler {
func (h Handler) DecompressHandler() echo.HandlerFunc {
return func(c echo.Context) error {
var input rhttp.DecompressInput
if err := c.Bind(&input); err != nil {
log.Errorf("failed to decompress: err=%s", err.Error())
var err error

if h.isAWS(c.Request()) {
input, err = parseSNSDecompressMessage(c.Request().Body)
} else if h.isGCP(c.Request()) {
input, err = parsePubSubDecompressMessage(c.Request().Body)
} else {
err = errors.New("unsupported request source")
}

if err != nil {
log.Errorf("failed to parse request body: %s", err.Error())
return err
}
log.Infof("decompression start: Asset=%s, Path=%s", input.AssetID, input.Path)

if err := h.Controller.DecompressController.Decompress(c.Request().Context(), input); err != nil {
log.Errorf("failed to decompress. input: %#v err:%s", input, err.Error())
Expand All @@ -39,28 +52,114 @@ func (h Handler) DecompressHandler() echo.HandlerFunc {

func (h Handler) WebhookHandler() echo.HandlerFunc {
return func(c echo.Context) error {
var msg msgBody
var w webhook.Webhook
if err := c.Bind(&msg); err != nil {
if err := c.Bind(&w); err != nil {
return err
}
} else if data, err := msg.Data(); err != nil {
return err
} else if err := json.Unmarshal(data, &w); err != nil {
var webhook webhook.Webhook
var err error

if h.isAWS(c.Request()) {
webhook, err = parseSNSWebhookMessage(c.Request().Body)
} else if h.isGCP(c.Request()) {
webhook, err = parsePubSubWebhookMessage(c.Request().Body)
} else {
err = errors.New("unsupported request source")
}

if err != nil {
log.Errorf("failed to parse request body: %s", err.Error())
return err
}

if err := h.Controller.WebhookController.Webhook(c.Request().Context(), &w); err != nil {
log.Errorf("failed to send webhook. webhook: %#v err:%s", w, err.Error())
if err := h.Controller.WebhookController.Webhook(c.Request().Context(), &webhook); err != nil {
log.Errorf("failed to send webhook. webhook: %#v err:%s", webhook, err.Error())
return err
}

log.Info("webhook has been sent: %#v", w)
log.Infof("webhook has been sent: %#v", webhook)
return c.NoContent(http.StatusOK)
}
}

func (h Handler) isAWS(r *http.Request) bool {
return r.Header.Get("X-Amz-Sns-Message-Type") == "Notification"
}

func (h Handler) isGCP(r *http.Request) bool {
// TODO: need to find a better way to detect GCP requests
for headerName := range r.Header {
if strings.HasPrefix(strings.ToLower(headerName), "x-goog-") {
return true
}
}
return false
}

func parseSNSDecompressMessage(body io.Reader) (rhttp.DecompressInput, error) {
var payload sns.Payload
var input rhttp.DecompressInput

if err := json.NewDecoder(body).Decode(&payload); err != nil {
return input, err
}

if err := json.Unmarshal([]byte(payload.Message), &input); err != nil {
return input, err
}

// Validates payload's signature
if err := payload.VerifyPayload(); err != nil {
return input, err
}

return input, nil
}

func parsePubSubDecompressMessage(body io.Reader) (rhttp.DecompressInput, error) {
var input rhttp.DecompressInput

if err := json.NewDecoder(body).Decode(&input); err != nil {
log.Errorf("failed to decompress: err=%s", err.Error())
return input, err
}

return input, nil
}

func parseSNSWebhookMessage(body io.Reader) (webhook.Webhook, error) {
var payload sns.Payload
var w webhook.Webhook

if err := json.NewDecoder(body).Decode(&payload); err != nil {
return w, err
}

if err := json.Unmarshal([]byte(payload.Message), &w); err != nil {
return w, err
}

// Validates payload's signature
if err := payload.VerifyPayload(); err != nil {
return w, err
}

return w, nil
}

func parsePubSubWebhookMessage(body io.Reader) (webhook.Webhook, error) {
var msg msgBody
var w webhook.Webhook

if err := json.NewDecoder(body).Decode(&msg); err != nil {
if err := json.NewDecoder(body).Decode(&w); err != nil {
return w, err
}
} else if data, err := msg.Data(); err != nil {
return w, err
} else if err := json.Unmarshal(data, &w); err != nil {
return w, err
}

return w, nil
}

type msgBody struct {
Message struct {
Data string `json:"data"`
Expand Down

0 comments on commit 2ceefda

Please sign in to comment.