From 507eff2e4f5007065846c7149b86790cc0da5a30 Mon Sep 17 00:00:00 2001 From: James Scott Date: Thu, 1 Jan 2026 21:12:15 +0000 Subject: [PATCH] feat(emailworker): Wire up email worker subscriber This commit connects the email worker's Pub/Sub subscriber to the message handler. It initializes the HTML renderer and the sender, and starts the subscription to process incoming email jobs. The frontend base URL is now configured via an environment variable to support link generation in emails. --- workers/email/cmd/job/main.go | 35 ++++++++++++++++++++++++++++---- workers/email/go.mod | 15 ++++++++++++++ workers/email/go.sum | 7 +++++++ workers/email/manifests/pod.yaml | 2 ++ 4 files changed, 55 insertions(+), 4 deletions(-) diff --git a/workers/email/cmd/job/main.go b/workers/email/cmd/job/main.go index 36a8b15f3..91bf8ec39 100644 --- a/workers/email/cmd/job/main.go +++ b/workers/email/cmd/job/main.go @@ -17,10 +17,16 @@ package main import ( "context" "log/slog" + "net/url" "os" + "github.com/GoogleChrome/webstatus.dev/lib/email/chime/chimeadapters" "github.com/GoogleChrome/webstatus.dev/lib/gcppubsub" + "github.com/GoogleChrome/webstatus.dev/lib/gcppubsub/gcppubsubadapters" "github.com/GoogleChrome/webstatus.dev/lib/gcpspanner" + "github.com/GoogleChrome/webstatus.dev/lib/gcpspanner/spanneradapters" + "github.com/GoogleChrome/webstatus.dev/workers/email/pkg/digest" + "github.com/GoogleChrome/webstatus.dev/workers/email/pkg/sender" ) func main() { @@ -48,6 +54,18 @@ func main() { spannerClient.SetMisingOneImplementationQuery(gcpspanner.LocalMissingOneImplementationQuery{}) } + baseURL := os.Getenv("FRONTEND_BASE_URL") + if baseURL == "" { + slog.ErrorContext(ctx, "FRONTEND_BASE_URL is not set. exiting...") + os.Exit(1) + } + + parsedBaseURL, err := url.Parse(baseURL) + if err != nil { + slog.ErrorContext(ctx, "failed to parse FRONTEND_BASE_URL", "error", err.Error()) + os.Exit(1) + } + // For subscribing to email events emailSubID := os.Getenv("EMAIL_SUBSCRIPTION_ID") if emailSubID == "" { @@ -61,11 +79,20 @@ func main() { os.Exit(1) } - // TODO: https://github.com/GoogleChrome/webstatus.dev/issues/1852 - // Nil handler for now. Will fix later - err = queueClient.Subscribe(ctx, emailSubID, nil) + renderer, err := digest.NewHTMLRenderer(parsedBaseURL.String()) if err != nil { - slog.ErrorContext(ctx, "unable to connect to subscription", "error", err) + // If the template is not valid, the renderer will fail. + slog.ErrorContext(ctx, "unable to create renderer", "error", err) + os.Exit(1) + } + + listener := gcppubsubadapters.NewEmailWorkerSubscriberAdapter(sender.NewSender( + chimeadapters.NewEmailWorkerChimeAdapter(nil), + spanneradapters.NewEmailWorkerChannelStateManager(spannerClient), + renderer, + ), queueClient, emailSubID) + if err := listener.Subscribe(ctx); err != nil { + slog.ErrorContext(ctx, "worker subscriber failed", "error", err) os.Exit(1) } } diff --git a/workers/email/go.mod b/workers/email/go.mod index 050017a66..f1e99c9b6 100644 --- a/workers/email/go.mod +++ b/workers/email/go.mod @@ -16,10 +16,15 @@ require ( cloud.google.com/go v0.123.0 // indirect cloud.google.com/go/auth v0.17.0 // indirect cloud.google.com/go/auth/oauth2adapt v0.2.8 // indirect + cloud.google.com/go/cloudtasks v1.13.7 // indirect cloud.google.com/go/compute/metadata v0.9.0 // indirect + cloud.google.com/go/datastore v1.21.0 // indirect cloud.google.com/go/iam v1.5.3 // indirect + cloud.google.com/go/logging v1.13.1 // indirect + cloud.google.com/go/longrunning v0.7.0 // indirect cloud.google.com/go/monitoring v1.24.3 // indirect cloud.google.com/go/pubsub/v2 v2.0.0 // indirect + cloud.google.com/go/secretmanager v1.16.0 // indirect cloud.google.com/go/spanner v1.86.1 // indirect github.com/GoogleChrome/webstatus.dev/lib/gen v0.0.0-20251119220853-b545639c35ae // indirect github.com/GoogleCloudPlatform/grpc-gcp-go/grpcgcp v1.5.3 // indirect @@ -28,6 +33,7 @@ require ( github.com/apapsch/go-jsonmerge/v2 v2.0.0 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/cncf/xds/go v0.0.0-20251110193048-8bfbf64dc13e // indirect + github.com/deckarep/golang-set v1.8.0 // indirect github.com/envoyproxy/go-control-plane/envoy v1.36.0 // indirect github.com/envoyproxy/protoc-gen-validate v1.2.1 // indirect github.com/felixge/httpsnoop v1.0.4 // indirect @@ -38,10 +44,16 @@ require ( github.com/go-openapi/jsonpointer v0.22.3 // indirect github.com/go-openapi/swag/jsonname v0.25.3 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect + github.com/gomodule/redigo v1.9.3 // indirect + github.com/google/go-github/v77 v77.0.0 // indirect + github.com/google/go-querystring v1.1.0 // indirect github.com/google/s2a-go v0.1.9 // indirect github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.7 // indirect github.com/googleapis/gax-go/v2 v2.15.0 // indirect + github.com/gorilla/handlers v1.5.2 // indirect + github.com/gorilla/mux v1.8.1 // indirect + github.com/gorilla/securecookie v1.1.2 // indirect github.com/josharian/intern v1.0.0 // indirect github.com/mailru/easyjson v0.9.1 // indirect github.com/mohae/deepcopy v0.0.0-20170929034955-c48cc78d4826 // indirect @@ -50,7 +62,9 @@ require ( github.com/oasdiff/yaml3 v0.0.0-20250309153720-d2182401db90 // indirect github.com/perimeterx/marshmallow v1.1.5 // indirect github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/sirupsen/logrus v1.9.3 // indirect github.com/spiffe/go-spiffe/v2 v2.6.0 // indirect + github.com/web-platform-tests/wpt.fyi v0.0.0-20251118162843-54f805c8a632 // indirect github.com/woodsbury/decimal128 v1.4.0 // indirect go.opencensus.io v0.24.0 // indirect go.opentelemetry.io/auto/sdk v1.2.1 // indirect @@ -76,4 +90,5 @@ require ( google.golang.org/genproto/googleapis/rpc v0.0.0-20251111163417-95abcf5c77ba // indirect google.golang.org/grpc v1.77.0 // indirect google.golang.org/protobuf v1.36.10 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/workers/email/go.sum b/workers/email/go.sum index 7f043218c..672b53cf0 100644 --- a/workers/email/go.sum +++ b/workers/email/go.sum @@ -842,6 +842,8 @@ github.com/google/go-github/v77 v77.0.0 h1:9DsKKbZqil5y/4Z9mNpZDQnpli6PJbqipSuuN github.com/google/go-github/v77 v77.0.0/go.mod h1:c8VmGXRUmaZUqbctUcGEDWYnMrtzZfJhDSylEf1wfmA= github.com/google/go-querystring v1.1.0 h1:AnCroh3fv4ZBgVIf1Iwtovgjaw/GiKJo8M8yD/fhyJ8= github.com/google/go-querystring v1.1.0/go.mod h1:Kcdr2DB4koayq7X8pmAG4sNG59So17icRSOU623lUBU= +github.com/google/gofuzz v1.2.0 h1:xRy4A+RhZaiKjJ1bPfwQ8sedCA+YS2YcCHW6ec7JMi0= +github.com/google/gofuzz v1.2.0/go.mod h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= github.com/google/martian v2.1.0+incompatible/go.mod h1:9I4somxYTbIHy5NJKHRl3wXiIaQGbYVAs8BPL6v8lEs= github.com/google/martian/v3 v3.0.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= github.com/google/martian/v3 v3.1.0/go.mod h1:y5Zk1BBys9G+gd6Jrk0W3cC1+ELVxBWuIGO+w/tUAp0= @@ -973,6 +975,8 @@ github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJw github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/perimeterx/marshmallow v1.1.5 h1:a2LALqQ1BlHM8PZblsDdidgv1mWi1DgC2UmX50IvK2s= github.com/perimeterx/marshmallow v1.1.5/go.mod h1:dsXbUu8CRzfYP5a87xpp0xq9S3u0Vchtcl8we9tYaXw= +github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5 h1:Ii+DKncOVM8Cu1Hc+ETb5K+23HdAMvESYE3ZJ5b5cMI= +github.com/phayes/freeport v0.0.0-20220201140144-74d24b5ae9f5/go.mod h1:iIss55rKnNBTvrwdmkUpLnDpZoAHvWaiq5+iMmen4AE= github.com/phpdave11/gofpdf v1.4.2/go.mod h1:zpO6xFn9yxo3YLyMvW8HcKWVdbNqgIfOOp2dXMnm1mY= github.com/phpdave11/gofpdi v1.0.12/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= github.com/phpdave11/gofpdi v1.0.13/go.mod h1:vBmVV0Do6hSBHC8uKUQ71JGW+ZGQq74llk/7bXwjDoI= @@ -1083,6 +1087,8 @@ go.opentelemetry.io/otel/trace v1.38.0/go.mod h1:j1P9ivuFsTceSWe1oY+EeW3sc+Pp42s go.opentelemetry.io/proto/otlp v0.7.0/go.mod h1:PqfVotwruBrMGOCsRd/89rSnXhoiJIqeYNgFYFoEGnI= go.opentelemetry.io/proto/otlp v0.15.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= go.opentelemetry.io/proto/otlp v0.19.0/go.mod h1:H7XAot3MsfNsj7EXtrA2q5xSNQ10UqI405h3+duxN4U= +go.uber.org/mock v0.6.0 h1:hyF9dfmbgIX5EfOdasqLsWD6xqpNZlXblLB/Dbnwv3Y= +go.uber.org/mock v0.6.0/go.mod h1:KiVJ4BqZJaMj4svdfmHM0AUx4NJYO8ZNpPnZn1Z+BBU= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= @@ -1328,6 +1334,7 @@ golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220610221304-9f5ed59c137d/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220615213510-4f61da869c0c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220624220833-87e55d714810/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220715151400-c0bba94af5f8/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220728004956-3c1f35247d10/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/workers/email/manifests/pod.yaml b/workers/email/manifests/pod.yaml index b3ab7b3d5..d16c68011 100644 --- a/workers/email/manifests/pod.yaml +++ b/workers/email/manifests/pod.yaml @@ -36,6 +36,8 @@ spec: value: 'pubsub:8060' - name: EMAIL_SUBSCRIPTION_ID value: 'chime-delivery-sub-id' + - name: FRONTEND_BASE_URL + value: 'http://localhost:5555' resources: limits: cpu: 250m