Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion lib/gcppubsub/gcppubsubadapters/push_delivery.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,11 @@ func (p *PushDeliveryPublisher) PublishEmailJob(ctx context.Context, job workert
return err
}

if _, err := p.client.Publish(ctx, p.emailTopic, b); err != nil {
id, err := p.client.Publish(ctx, p.emailTopic, b)
if err != nil {
return fmt.Errorf("failed to publish email job: %w", err)
}
slog.InfoContext(ctx, "published email job", "id", id, "eventID", job.Metadata.EventID)

return nil
}
Expand Down
18 changes: 18 additions & 0 deletions lib/workertypes/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,3 +640,21 @@ type EmailDeliveryJob struct {
// Metadata contains context for links and tracking.
Metadata DeliveryMetadata
}

// IncomingEmailDeliveryJob is an EmailDeliveryJob as received by a consumer,
// paired with the ID of the queued event that carried it.
type IncomingEmailDeliveryJob struct {
EmailDeliveryJob
// EmailEventID is the ID of the queued event for this specific email job.
// It is generated by the queuing service.
// It is distinct from Metadata.EventID, which identifies the original event that
// triggered the event producer at the very beginning.
EmailEventID string
}

// Sentinel errors for email sending failures that must not be retried.
// Match them with errors.Is.
var (
	// ErrUnrecoverableSystemFailureEmailSending indicates that there's a system failure that should not be retried.
	// Examples: System auth issue.
	ErrUnrecoverableSystemFailureEmailSending = errors.New("unrecoverable system failure trying to send email")
	// ErrUnrecoverableUserFailureEmailSending indicates that there's a user failure that should not be retried.
	// Examples: Bad email address.
	ErrUnrecoverableUserFailureEmailSending = errors.New("unrecoverable user failure trying to send email")
)
38 changes: 25 additions & 13 deletions workers/email/pkg/sender/sender.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,11 @@ package sender

import (
"context"
"errors"
"log/slog"
"time"

"github.com/GoogleChrome/webstatus.dev/lib/event"
"github.com/GoogleChrome/webstatus.dev/lib/workertypes"
)

Expand All @@ -26,18 +29,20 @@ type EmailSender interface {
}

type ChannelStateManager interface {
RecordSuccess(ctx context.Context, channelID string) error
RecordFailure(ctx context.Context, channelID string, err error) error
RecordSuccess(ctx context.Context, channelID string, timestamp time.Time, eventID string) error
RecordFailure(ctx context.Context, channelID string, err error,
timestamp time.Time, permanentUserFailure bool, emailEventID string) error
}

type TemplateRenderer interface {
RenderDigest(job workertypes.EmailDeliveryJob) (string, string, error)
RenderDigest(job workertypes.IncomingEmailDeliveryJob) (string, string, error)
}

// Sender holds the collaborators that ProcessMessage uses: a TemplateRenderer to
// produce the subject and body, an EmailSender to deliver them, and a
// ChannelStateManager to record the outcome for the channel.
type Sender struct {
sender EmailSender
stateManager ChannelStateManager
renderer TemplateRenderer
// now supplies the timestamp passed to the state manager when recording a
// success or failure. NewSender sets it to time.Now; tests replace it with a fixed clock.
now func() time.Time
}

func NewSender(
Expand All @@ -49,36 +54,43 @@ func NewSender(
sender: sender,
stateManager: stateManager,
renderer: renderer,
now: time.Now,
}
}

func (s *Sender) ProcessMessage(ctx context.Context, job workertypes.EmailDeliveryJob) error {
func (s *Sender) ProcessMessage(ctx context.Context, job workertypes.IncomingEmailDeliveryJob) error {
// 1. Render (Parsing happens inside RenderDigest implementation)
subject, body, err := s.renderer.RenderDigest(job)
if err != nil {
slog.ErrorContext(ctx, "failed to render email", "subscription_id", job.SubscriptionID, "error", err)
if err := s.stateManager.RecordFailure(ctx, job.ChannelID, err); err != nil {
slog.ErrorContext(ctx, "failed to record channel failure", "channel_id", job.ChannelID, "error", err)
if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err, s.now(), false, job.EmailEventID); dbErr != nil {
slog.ErrorContext(ctx, "failed to record channel failure", "channel_id", job.ChannelID, "error", dbErr)
}
// Rendering errors might be transient or permanent. Assuming permanent for template bugs.
return nil

return err
}

// 2. Send
if err := s.sender.Send(ctx, job.RecipientEmail, subject, body); err != nil {
isPermanentUserError := errors.Is(err, workertypes.ErrUnrecoverableUserFailureEmailSending)
isPermanent := errors.Is(err, workertypes.ErrUnrecoverableSystemFailureEmailSending) ||
isPermanentUserError
slog.ErrorContext(ctx, "failed to send email", "recipient", job.RecipientEmail, "error", err)
// Record failure in DB
if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err); dbErr != nil {
if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err, s.now(),
isPermanentUserError, job.EmailEventID); dbErr != nil {
slog.ErrorContext(ctx, "failed to record channel failure", "channel_id", job.ChannelID, "error", dbErr)
}
if isPermanent {
return err
}

// Return error to NACK the message and retry sending?
// Sending failures (network, rate limit) are often transient.
return err
// If not permanent, wrap with ErrTransient to trigger NACK (which will retry)
return errors.Join(event.ErrTransientFailure, err)
}

// 3. Success
if err := s.stateManager.RecordSuccess(ctx, job.ChannelID); err != nil {
if err := s.stateManager.RecordSuccess(ctx, job.ChannelID, s.now(), job.EmailEventID); err != nil {
// Non-critical error, but good to log
slog.WarnContext(ctx, "failed to record channel success", "channel_id", job.ChannelID, "error", err)
}
Expand Down
185 changes: 134 additions & 51 deletions workers/email/pkg/sender/sender_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"testing"
"time"

"github.com/GoogleChrome/webstatus.dev/lib/event"
"github.com/GoogleChrome/webstatus.dev/lib/workertypes"
"github.com/google/go-cmp/cmp"
)
Expand All @@ -43,25 +44,37 @@ func (m *mockEmailSender) Send(_ context.Context, to, subject, body string) erro
return m.sendErr
}

// successCall captures the arguments of a single RecordSuccess call made on
// mockChannelStateManager so tests can assert on them.
type successCall struct {
channelID string
emailEventID string
timestamp time.Time
}

type mockChannelStateManager struct {
successCalls []string // channelIDs
successCalls []successCall
failureCalls []failureCall
recordErr error
}

type failureCall struct {
channelID string
err error
channelID string
emailEventID string
err error
isPermanentUserError bool
timestamp time.Time
}

func (m *mockChannelStateManager) RecordSuccess(_ context.Context, channelID string) error {
m.successCalls = append(m.successCalls, channelID)
func (m *mockChannelStateManager) RecordSuccess(_ context.Context, channelID string,
timestamp time.Time, emailEventID string) error {
m.successCalls = append(m.successCalls, successCall{channelID, emailEventID, timestamp})

return m.recordErr
}

func (m *mockChannelStateManager) RecordFailure(_ context.Context, channelID string, err error) error {
m.failureCalls = append(m.failureCalls, failureCall{channelID, err})
func (m *mockChannelStateManager) RecordFailure(_ context.Context, channelID string, err error,
timestamp time.Time, isPermanentUserError bool, emailEventID string,
) error {
m.failureCalls = append(m.failureCalls, failureCall{channelID, emailEventID, err, isPermanentUserError, timestamp})

return m.recordErr
}
Expand All @@ -70,10 +83,10 @@ type mockTemplateRenderer struct {
renderSubject string
renderBody string
renderErr error
renderInput workertypes.EmailDeliveryJob
renderInput workertypes.IncomingEmailDeliveryJob
}

func (m *mockTemplateRenderer) RenderDigest(job workertypes.EmailDeliveryJob) (string, string, error) {
func (m *mockTemplateRenderer) RenderDigest(job workertypes.IncomingEmailDeliveryJob) (string, string, error) {
m.renderInput = job

return m.renderSubject, m.renderBody, m.renderErr
Expand All @@ -95,16 +108,23 @@ func testMetadata() workertypes.DeliveryMetadata {
}
}

// fakeNow is a deterministic clock for tests: it always reports
// 2025-01-01 12:00:00 UTC.
func fakeNow() time.Time {
	const (
		year = 2025
		day  = 1
		hour = 12
	)
	fixed := time.Date(year, time.January, day, hour, 0, 0, 0, time.UTC)

	return fixed
}

// --- Tests ---

func TestProcessMessage_Success(t *testing.T) {
ctx := context.Background()
job := workertypes.EmailDeliveryJob{
SubscriptionID: "sub-1",
Metadata: testMetadata(),
RecipientEmail: "user@example.com",
SummaryRaw: []byte("{}"),
ChannelID: "chan-1",
job := workertypes.IncomingEmailDeliveryJob{
EmailDeliveryJob: workertypes.EmailDeliveryJob{
SubscriptionID: "sub-1",
Metadata: testMetadata(),
RecipientEmail: "user@example.com",
SummaryRaw: []byte("{}"),
ChannelID: "chan-1",
},
EmailEventID: "job-id",
}

sender := new(mockEmailSender)
Expand All @@ -114,6 +134,7 @@ func TestProcessMessage_Success(t *testing.T) {
renderer.renderBody = "Body"

h := NewSender(sender, stateManager, renderer)
h.now = fakeNow

err := h.ProcessMessage(ctx, job)
if err != nil {
Expand All @@ -137,19 +158,28 @@ func TestProcessMessage_Success(t *testing.T) {
if len(stateManager.successCalls) != 1 {
t.Errorf("Expected 1 success record, got %d", len(stateManager.successCalls))
}
if stateManager.successCalls[0] != testChannelID {
t.Errorf("Success recorded for wrong channel: %s", stateManager.successCalls[0])
if stateManager.successCalls[0].channelID != testChannelID {
t.Errorf("Success recorded for wrong channel: %v", stateManager.successCalls[0])
}
if stateManager.successCalls[0].emailEventID != "job-id" {
t.Errorf("Success recorded for wrong event: %v", stateManager.successCalls[0])
}
if !stateManager.successCalls[0].timestamp.Equal(fakeNow()) {
t.Errorf("Success recorded with wrong timestamp: %v", stateManager.successCalls[0])
}
}

func TestProcessMessage_RenderError(t *testing.T) {
ctx := context.Background()
job := workertypes.EmailDeliveryJob{
SubscriptionID: "sub-1",
Metadata: testMetadata(),
RecipientEmail: "user@example.com",
SummaryRaw: []byte("{}"),
ChannelID: "chan-1",
job := workertypes.IncomingEmailDeliveryJob{
EmailDeliveryJob: workertypes.EmailDeliveryJob{
SubscriptionID: "sub-1",
Metadata: testMetadata(),
RecipientEmail: "user@example.com",
SummaryRaw: []byte("{}"),
ChannelID: "chan-1",
},
EmailEventID: "job-id",
}

sender := new(mockEmailSender)
Expand All @@ -158,10 +188,16 @@ func TestProcessMessage_RenderError(t *testing.T) {
renderer.renderErr = errors.New("template error")

h := NewSender(sender, stateManager, renderer)
h.now = fakeNow

// Should return a non-transient error (ACK) for a rendering error.
err := h.ProcessMessage(ctx, job)
if errors.Is(err, event.ErrTransientFailure) {
t.Errorf("Expected non transient error for render failure, got %v", err)
}

// Should return nil (ACK) for rendering error (assuming permanent for now)
if err := h.ProcessMessage(ctx, job); err != nil {
t.Errorf("Expected nil error for render failure, got %v", err)
if !errors.Is(err, renderer.renderErr) {
t.Errorf("Expected configured renderer error, got %v", err)
}

// Should record failure
Expand All @@ -177,34 +213,81 @@ func TestProcessMessage_RenderError(t *testing.T) {

func TestProcessMessage_SendError(t *testing.T) {
ctx := context.Background()
job := workertypes.EmailDeliveryJob{
SubscriptionID: "sub-1",
Metadata: testMetadata(),
RecipientEmail: "user@example.com",
SummaryRaw: []byte("{}"),
ChannelID: "chan-1",
job := workertypes.IncomingEmailDeliveryJob{
EmailDeliveryJob: workertypes.EmailDeliveryJob{
SubscriptionID: "sub-1",
Metadata: testMetadata(),
RecipientEmail: "user@example.com",
SummaryRaw: []byte("{}"),
ChannelID: "chan-1",
},
EmailEventID: "job-id",
}

sendErr := errors.New("smtp timeout")
sender := &mockEmailSender{sendErr: sendErr, sentCalls: nil}
stateManager := new(mockChannelStateManager)
renderer := new(mockTemplateRenderer)
renderer.renderSubject = "S"
renderer.renderBody = "B"

h := NewSender(sender, stateManager, renderer)

// Should return error (NACK) for send failure to allow retry
err := h.ProcessMessage(ctx, job)
if !errors.Is(err, sendErr) {
t.Errorf("Expected send error to propagate, got %v", err)
testCases := []struct {
name string
sendErr error
isPermanentUserError bool
wantNack bool
}{
{
"regular error = NACK",
errors.New("send error"),
false,
true,
},
{
"user error = ACK",
workertypes.ErrUnrecoverableUserFailureEmailSending,
true,
false,
},
{
"system error = ACK",
workertypes.ErrUnrecoverableSystemFailureEmailSending,
false,
false,
},
}

// Should record failure in DB as well
if len(stateManager.failureCalls) != 1 {
t.Fatal("Expected failure recording")
}
if stateManager.failureCalls[0].channelID != testChannelID {
t.Errorf("Recorded failure for wrong channel")
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
sender := &mockEmailSender{sendErr: tc.sendErr, sentCalls: nil}
stateManager := new(mockChannelStateManager)
renderer := new(mockTemplateRenderer)
renderer.renderSubject = "S"
renderer.renderBody = "B"

h := NewSender(sender, stateManager, renderer)
h.now = fakeNow

err := h.ProcessMessage(ctx, job)
if !errors.Is(err, tc.sendErr) {
t.Errorf("Expected send error %v, got %v", tc.sendErr, err)
}
// Should record failure in DB as well
if len(stateManager.failureCalls) != 1 {
t.Fatal("Expected failure recording")
}
if stateManager.failureCalls[0].channelID != testChannelID {
t.Errorf("Recorded failure for wrong channel")
}
if stateManager.failureCalls[0].emailEventID != "job-id" {
t.Errorf("Recorded failure for wrong event")
}
if tc.isPermanentUserError != stateManager.failureCalls[0].isPermanentUserError {
t.Errorf("Recorded failure for wrong error type")
}
if !stateManager.failureCalls[0].timestamp.Equal(fakeNow()) {
t.Errorf("Recorded failure for wrong timestamp")
}

if tc.wantNack {
if !errors.Is(err, event.ErrTransientFailure) {
t.Errorf("Expected transient failure for NACK, got %v", err)
}
}
})
}

}
Loading