diff --git a/lib/gcppubsub/gcppubsubadapters/push_delivery.go b/lib/gcppubsub/gcppubsubadapters/push_delivery.go index 0a42bd39c..c3a6703ea 100644 --- a/lib/gcppubsub/gcppubsubadapters/push_delivery.go +++ b/lib/gcppubsub/gcppubsubadapters/push_delivery.go @@ -55,9 +55,11 @@ func (p *PushDeliveryPublisher) PublishEmailJob(ctx context.Context, job workert return err } - if _, err := p.client.Publish(ctx, p.emailTopic, b); err != nil { + id, err := p.client.Publish(ctx, p.emailTopic, b) + if err != nil { return fmt.Errorf("failed to publish email job: %w", err) } + slog.InfoContext(ctx, "published email job", "id", id, "eventID", job.Metadata.EventID) return nil } diff --git a/lib/workertypes/types.go b/lib/workertypes/types.go index fef3c93db..d4d27f097 100644 --- a/lib/workertypes/types.go +++ b/lib/workertypes/types.go @@ -640,3 +640,21 @@ type EmailDeliveryJob struct { // Metadata contains context for links and tracking. Metadata DeliveryMetadata } + +type IncomingEmailDeliveryJob struct { + EmailDeliveryJob + // The ID from the queued event for this specific email job. + // This will be generated by the queuing service. + // This is different from the EventID in the Metadata which is for the original event that triggered + // the event producer in the very beginning. + EmailEventID string +} + +var ( + // ErrUnrecoverableSystemFailureEmailSending indicates that there's a system failure that should not be retried. + // Examples: System auth issue. + ErrUnrecoverableSystemFailureEmailSending = errors.New("unrecoverable user failure trying to send email") + // ErrUnrecoverableUserFailureEmailSending indicates that there's a user failure that should not be retried. + // Examples: Bad email address. + ErrUnrecoverableUserFailureEmailSending = errors.New("unrecoverable user failure trying to send email") +) diff --git a/workers/email/pkg/sender/sender.go b/workers/email/pkg/sender/sender.go index 940e653ae..139e2a4d9 100644 --- a/workers/email/pkg/sender/sender.go +++ b/workers/email/pkg/sender/sender.go @@ -16,8 +16,11 @@ package sender import ( "context" + "errors" "log/slog" + "time" + "github.com/GoogleChrome/webstatus.dev/lib/event" "github.com/GoogleChrome/webstatus.dev/lib/workertypes" ) @@ -26,18 +29,20 @@ type EmailSender interface { } type ChannelStateManager interface { - RecordSuccess(ctx context.Context, channelID string) error - RecordFailure(ctx context.Context, channelID string, err error) error + RecordSuccess(ctx context.Context, channelID string, timestamp time.Time, eventID string) error + RecordFailure(ctx context.Context, channelID string, err error, + timestamp time.Time, permanentUserFailure bool, emailEventID string) error } type TemplateRenderer interface { - RenderDigest(job workertypes.EmailDeliveryJob) (string, string, error) + RenderDigest(job workertypes.IncomingEmailDeliveryJob) (string, string, error) } type Sender struct { sender EmailSender stateManager ChannelStateManager renderer TemplateRenderer + now func() time.Time } func NewSender( @@ -49,36 +54,43 @@ func NewSender( sender: sender, stateManager: stateManager, renderer: renderer, + now: time.Now, } } -func (s *Sender) ProcessMessage(ctx context.Context, job workertypes.EmailDeliveryJob) error { +func (s *Sender) ProcessMessage(ctx context.Context, job workertypes.IncomingEmailDeliveryJob) error { // 1. Render (Parsing happens inside RenderDigest implementation) subject, body, err := s.renderer.RenderDigest(job) if err != nil { slog.ErrorContext(ctx, "failed to render email", "subscription_id", job.SubscriptionID, "error", err) - if err := s.stateManager.RecordFailure(ctx, job.ChannelID, err); err != nil { - slog.ErrorContext(ctx, "failed to record channel failure", "channel_id", job.ChannelID, "error", err) + if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err, s.now(), false, job.EmailEventID); dbErr != nil { + slog.ErrorContext(ctx, "failed to record channel failure", "channel_id", job.ChannelID, "error", dbErr) } - // Rendering errors might be transient or permanent. Assuming permanent for template bugs. - return nil + + return err } // 2. Send if err := s.sender.Send(ctx, job.RecipientEmail, subject, body); err != nil { + isPermanentUserError := errors.Is(err, workertypes.ErrUnrecoverableUserFailureEmailSending) + isPermanent := errors.Is(err, workertypes.ErrUnrecoverableSystemFailureEmailSending) || + isPermanentUserError slog.ErrorContext(ctx, "failed to send email", "recipient", job.RecipientEmail, "error", err) // Record failure in DB - if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err); dbErr != nil { + if dbErr := s.stateManager.RecordFailure(ctx, job.ChannelID, err, s.now(), + isPermanentUserError, job.EmailEventID); dbErr != nil { slog.ErrorContext(ctx, "failed to record channel failure", "channel_id", job.ChannelID, "error", dbErr) } + if isPermanent { + return err + } - // Return error to NACK the message and retry sending? - // Sending failures (network, rate limit) are often transient. - return err + // If not permanent, wrap with ErrTransient to trigger NACK (which will retry) + return errors.Join(event.ErrTransientFailure, err) } // 3. Success - if err := s.stateManager.RecordSuccess(ctx, job.ChannelID); err != nil { + if err := s.stateManager.RecordSuccess(ctx, job.ChannelID, s.now(), job.EmailEventID); err != nil { // Non-critical error, but good to log slog.WarnContext(ctx, "failed to record channel success", "channel_id", job.ChannelID, "error", err) } diff --git a/workers/email/pkg/sender/sender_test.go b/workers/email/pkg/sender/sender_test.go index 10218f298..b4f8a886f 100644 --- a/workers/email/pkg/sender/sender_test.go +++ b/workers/email/pkg/sender/sender_test.go @@ -20,6 +20,7 @@ import ( "testing" "time" + "github.com/GoogleChrome/webstatus.dev/lib/event" "github.com/GoogleChrome/webstatus.dev/lib/workertypes" "github.com/google/go-cmp/cmp" ) @@ -43,25 +44,37 @@ func (m *mockEmailSender) Send(_ context.Context, to, subject, body string) erro return m.sendErr } +type successCall struct { + channelID string + emailEventID string + timestamp time.Time +} + type mockChannelStateManager struct { - successCalls []string // channelIDs + successCalls []successCall failureCalls []failureCall recordErr error } type failureCall struct { - channelID string - err error + channelID string + emailEventID string + err error + isPermanentUserError bool + timestamp time.Time } -func (m *mockChannelStateManager) RecordSuccess(_ context.Context, channelID string) error { - m.successCalls = append(m.successCalls, channelID) +func (m *mockChannelStateManager) RecordSuccess(_ context.Context, channelID string, + timestamp time.Time, emailEventID string) error { + m.successCalls = append(m.successCalls, successCall{channelID, emailEventID, timestamp}) return m.recordErr } -func (m *mockChannelStateManager) RecordFailure(_ context.Context, channelID string, err error) error { - m.failureCalls = append(m.failureCalls, failureCall{channelID, err}) +func (m *mockChannelStateManager) RecordFailure(_ context.Context, channelID string, err error, + timestamp time.Time, isPermanentUserError bool, emailEventID string, +) error { + m.failureCalls = append(m.failureCalls, failureCall{channelID, emailEventID, err, isPermanentUserError, timestamp}) return m.recordErr } @@ -70,10 +83,10 @@ type mockTemplateRenderer struct { renderSubject string renderBody string renderErr error - renderInput workertypes.EmailDeliveryJob + renderInput workertypes.IncomingEmailDeliveryJob } -func (m *mockTemplateRenderer) RenderDigest(job workertypes.EmailDeliveryJob) (string, string, error) { +func (m *mockTemplateRenderer) RenderDigest(job workertypes.IncomingEmailDeliveryJob) (string, string, error) { m.renderInput = job return m.renderSubject, m.renderBody, m.renderErr @@ -95,16 +108,23 @@ func testMetadata() workertypes.DeliveryMetadata { } } +func fakeNow() time.Time { + return time.Date(2025, 1, 1, 12, 0, 0, 0, time.UTC) +} + // --- Tests --- func TestProcessMessage_Success(t *testing.T) { ctx := context.Background() - job := workertypes.EmailDeliveryJob{ - SubscriptionID: "sub-1", - Metadata: testMetadata(), - RecipientEmail: "user@example.com", - SummaryRaw: []byte("{}"), - ChannelID: "chan-1", + job := workertypes.IncomingEmailDeliveryJob{ + EmailDeliveryJob: workertypes.EmailDeliveryJob{ + SubscriptionID: "sub-1", + Metadata: testMetadata(), + RecipientEmail: "user@example.com", + SummaryRaw: []byte("{}"), + ChannelID: "chan-1", + }, + EmailEventID: "job-id", } sender := new(mockEmailSender) @@ -114,6 +134,7 @@ func TestProcessMessage_Success(t *testing.T) { renderer.renderBody = "Body" h := NewSender(sender, stateManager, renderer) + h.now = fakeNow err := h.ProcessMessage(ctx, job) if err != nil { @@ -137,19 +158,28 @@ func TestProcessMessage_Success(t *testing.T) { if len(stateManager.successCalls) != 1 { t.Errorf("Expected 1 success record, got %d", len(stateManager.successCalls)) } - if stateManager.successCalls[0] != testChannelID { - t.Errorf("Success recorded for wrong channel: %s", stateManager.successCalls[0]) + if stateManager.successCalls[0].channelID != testChannelID { + t.Errorf("Success recorded for wrong channel: %v", stateManager.successCalls[0]) + } + if stateManager.successCalls[0].emailEventID != "job-id" { + t.Errorf("Success recorded for wrong event: %v", stateManager.successCalls[0]) + } + if !stateManager.successCalls[0].timestamp.Equal(fakeNow()) { + t.Errorf("Success recorded with wrong timestamp: %v", stateManager.successCalls[0]) } } func TestProcessMessage_RenderError(t *testing.T) { ctx := context.Background() - job := workertypes.EmailDeliveryJob{ - SubscriptionID: "sub-1", - Metadata: testMetadata(), - RecipientEmail: "user@example.com", - SummaryRaw: []byte("{}"), - ChannelID: "chan-1", + job := workertypes.IncomingEmailDeliveryJob{ + EmailDeliveryJob: workertypes.EmailDeliveryJob{ + SubscriptionID: "sub-1", + Metadata: testMetadata(), + RecipientEmail: "user@example.com", + SummaryRaw: []byte("{}"), + ChannelID: "chan-1", + }, + EmailEventID: "job-id", } sender := new(mockEmailSender) @@ -158,10 +188,16 @@ func TestProcessMessage_RenderError(t *testing.T) { renderer.renderErr = errors.New("template error") h := NewSender(sender, stateManager, renderer) + h.now = fakeNow + + // Should return non transient error (ACK) for rendering error + err := h.ProcessMessage(ctx, job) + if errors.Is(err, event.ErrTransientFailure) { + t.Errorf("Expected non transient error for render failure, got %v", err) + } - // Should return nil (ACK) for rendering error (assuming permanent for now) - if err := h.ProcessMessage(ctx, job); err != nil { - t.Errorf("Expected nil error for render failure, got %v", err) + if !errors.Is(err, renderer.renderErr) { + t.Errorf("Expected configured renderer error, got %v", err) } // Should record failure @@ -177,34 +213,81 @@ func TestProcessMessage_RenderError(t *testing.T) { func TestProcessMessage_SendError(t *testing.T) { ctx := context.Background() - job := workertypes.EmailDeliveryJob{ - SubscriptionID: "sub-1", - Metadata: testMetadata(), - RecipientEmail: "user@example.com", - SummaryRaw: []byte("{}"), - ChannelID: "chan-1", + job := workertypes.IncomingEmailDeliveryJob{ + EmailDeliveryJob: workertypes.EmailDeliveryJob{ + SubscriptionID: "sub-1", + Metadata: testMetadata(), + RecipientEmail: "user@example.com", + SummaryRaw: []byte("{}"), + ChannelID: "chan-1", + }, + EmailEventID: "job-id", } - sendErr := errors.New("smtp timeout") - sender := &mockEmailSender{sendErr: sendErr, sentCalls: nil} - stateManager := new(mockChannelStateManager) - renderer := new(mockTemplateRenderer) - renderer.renderSubject = "S" - renderer.renderBody = "B" - - h := NewSender(sender, stateManager, renderer) - - // Should return error (NACK) for send failure to allow retry - err := h.ProcessMessage(ctx, job) - if !errors.Is(err, sendErr) { - t.Errorf("Expected send error to propagate, got %v", err) + testCases := []struct { + name string + sendErr error + isPermanentUserError bool + wantNack bool + }{ + { + "regular error = NACK", + errors.New("send error"), + false, + true, + }, + { + "user error = ACK", + workertypes.ErrUnrecoverableUserFailureEmailSending, + true, + false, + }, + { + "system error = ACK", + workertypes.ErrUnrecoverableSystemFailureEmailSending, + false, + false, + }, } - // Should record failure in DB as well - if len(stateManager.failureCalls) != 1 { - t.Fatal("Expected failure recording") - } - if stateManager.failureCalls[0].channelID != testChannelID { - t.Errorf("Recorded failure for wrong channel") + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + sender := &mockEmailSender{sendErr: tc.sendErr, sentCalls: nil} + stateManager := new(mockChannelStateManager) + renderer := new(mockTemplateRenderer) + renderer.renderSubject = "S" + renderer.renderBody = "B" + + h := NewSender(sender, stateManager, renderer) + h.now = fakeNow + + err := h.ProcessMessage(ctx, job) + if !errors.Is(err, tc.sendErr) { + t.Errorf("Expected send error %v, got %v", tc.sendErr, err) + } + // Should record failure in DB as well + if len(stateManager.failureCalls) != 1 { + t.Fatal("Expected failure recording") + } + if stateManager.failureCalls[0].channelID != testChannelID { + t.Errorf("Recorded failure for wrong channel") + } + if stateManager.failureCalls[0].emailEventID != "job-id" { + t.Errorf("Recorded failure for wrong event") + } + if tc.isPermanentUserError != stateManager.failureCalls[0].isPermanentUserError { + t.Errorf("Recorded failure for wrong error type") + } + if !stateManager.failureCalls[0].timestamp.Equal(fakeNow()) { + t.Errorf("Recorded failure for wrong timestamp") + } + + if tc.wantNack { + if !errors.Is(err, event.ErrTransientFailure) { + t.Errorf("Expected transient failure for NACK, got %v", err) + } + } + }) } + }