From 76f987a1df6e7c5aef6c35fd516b4fa729d1c880 Mon Sep 17 00:00:00 2001 From: Jacob Hoffman-Andrews Date: Mon, 23 May 2022 16:16:43 -0700 Subject: [PATCH] Reland "Allow expiration mailer to work in parallel" (#6133) This reverts commit 7ef6913e7152529b8d275a1a126a5eceb3a953e4. We turned on the `ExpirationMailerDontLookTwice` feature flag in prod, and it's working fine but not clearing the backlog. Since https://github.com/letsencrypt/boulder/pull/6100 fixed the issue that caused us to (nearly) stop sending mail when we deployed #6057, this should be safe to roll forward. The revert of the revert applied cleanly, except for expiration-mailer/main.go and `main_test.go`, particularly around the contents `processCerts` (where `sendToOneRegID` was extracted from) and `sendToOneRegID` itself. So those areas are good targets for extra attention. --- cmd/bad-key-revoker/main.go | 6 +- cmd/expiration-mailer/main.go | 170 ++++++++++++++--------- cmd/expiration-mailer/main_test.go | 45 +++++- cmd/expiration-mailer/send_test.go | 4 +- cmd/notify-mailer/main.go | 6 +- mail/mailer.go | 177 +++++++++++++----------- mail/mailer_test.go | 30 ++-- mocks/mocks.go | 26 +++- test/config-next/expiration-mailer.json | 1 + 9 files changed, 296 insertions(+), 169 deletions(-) diff --git a/cmd/bad-key-revoker/main.go b/cmd/bad-key-revoker/main.go index 8ae11c24fc8..cde3af1e217 100644 --- a/cmd/bad-key-revoker/main.go +++ b/cmd/bad-key-revoker/main.go @@ -194,12 +194,12 @@ var maxSerials = 100 // sendMessage sends a single email to the provided address with the revoked // serials func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error { - err := bkr.mailer.Connect() + conn, err := bkr.mailer.Connect() if err != nil { return err } defer func() { - _ = bkr.mailer.Close() + _ = conn.Close() }() mutSerials := make([]string, len(serials)) copy(mutSerials, serials) @@ -213,7 +213,7 @@ func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error { if err != nil { 
return err } - err = bkr.mailer.SendMail([]string{addr}, bkr.emailSubject, message.String()) + err = conn.SendMail([]string{addr}, bkr.emailSubject, message.String()) if err != nil { return err } diff --git a/cmd/expiration-mailer/main.go b/cmd/expiration-mailer/main.go index a286ed05f2f..bb33d6b9534 100644 --- a/cmd/expiration-mailer/main.go +++ b/cmd/expiration-mailer/main.go @@ -15,6 +15,7 @@ import ( "os" "sort" "strings" + "sync" "text/template" "time" @@ -52,6 +53,7 @@ type mailer struct { emailTemplate *template.Template subjectTemplate *template.Template nagTimes []time.Duration + parallelSends uint limit int clk clock.Clock stats mailerStats @@ -68,7 +70,7 @@ type mailerStats struct { certificatesPerAccountNeedingMail prometheus.Histogram } -func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error { +func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Certificate) error { // TODO(#6121): Remove this if !features.Enabled(features.ExpirationMailerDontLookTwice) { if len(contacts) == 0 { @@ -166,7 +168,7 @@ func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error { m.log.Infof("attempting send JSON=%s", string(logStr)) startSending := m.clk.Now() - err = m.mailer.SendMail(emails, subjBuf.String(), msgBuf.String()) + err = conn.SendMail(emails, subjBuf.String(), msgBuf.String()) if err != nil { m.log.Errf("failed send JSON=%s", string(logStr)) return err @@ -197,6 +199,11 @@ func (m *mailer) certIsRenewed(ctx context.Context, names []string, issued time. 
return present, err } +type work struct { + regID int64 + certs []core.Certificate +} + func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) error { regIDToCerts := make(map[int64][]core.Certificate) @@ -206,88 +213,126 @@ func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) regIDToCerts[cert.RegistrationID] = cs } - err := m.mailer.Connect() + parallelSends := m.parallelSends + if parallelSends == 0 { + parallelSends = 1 + } + + var wg sync.WaitGroup + workChan := make(chan work, len(regIDToCerts)) + + // Populate the work chan on a goroutine so work is available as soon + // as one of the sender routines starts. + go func(ch chan<- work) { + for regID, certs := range regIDToCerts { + ch <- work{regID, certs} + } + close(workChan) + }(workChan) + + for senderNum := uint(0); senderNum < parallelSends; senderNum++ { + // For politeness' sake, don't open more than 1 new connection per + // second. + if senderNum > 0 { + time.Sleep(time.Second) + } + + if ctx.Err() != nil { + return ctx.Err() + } + + conn, err := m.mailer.Connect() + if err != nil { + m.log.AuditErrf("connecting parallel sender %d: %s", senderNum, err) + return err + } + wg.Add(1) + go func(conn bmail.Conn, ch <-chan work) { + defer wg.Done() + for w := range ch { + err := m.sendToOneRegID(ctx, conn, w.regID, w.certs) + if err != nil { + m.log.AuditErr(err.Error()) + } + } + conn.Close() + }(conn, workChan) + } + wg.Wait() + return nil +} + +func (m *mailer) sendToOneRegID(ctx context.Context, conn bmail.Conn, regID int64, certs []core.Certificate) error { + if ctx.Err() != nil { + return ctx.Err() + } + reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID}) if err != nil { - return fmt.Errorf("connecting to SMTP server: %w", err) + m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc() + return fmt.Errorf("Error fetching registration %d: %s", regID, err) } - defer func() { - _ = m.mailer.Close() - }() - for 
regID, certs := range regIDToCerts { + parsedCerts := []*x509.Certificate{} + for _, cert := range certs { if ctx.Err() != nil { return ctx.Err() } - reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID}) + parsedCert, err := x509.ParseCertificate(cert.DER) if err != nil { - m.log.AuditErrf("Error fetching registration %d: %s", regID, err) - m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc() + // TODO(#1420): tell registration about this error + m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err) + m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc() continue } - parsedCerts := []*x509.Certificate{} - for _, cert := range certs { - if ctx.Err() != nil { - return ctx.Err() - } - parsedCert, err := x509.ParseCertificate(cert.DER) + renewed, err := m.certIsRenewed(ctx, parsedCert.DNSNames, parsedCert.NotBefore) + if err != nil { + m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err) + // assume not renewed + } else if renewed { + m.log.Debugf("Cert %s is already renewed", cert.Serial) + m.stats.certificatesAlreadyRenewed.Add(1) + err := m.updateCertStatus(ctx, cert.Serial) if err != nil { - // TODO(#1420): tell registration about this error - m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err) - m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc() - continue + m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err) + m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() } + continue + } - renewed, err := m.certIsRenewed(ctx, parsedCert.DNSNames, parsedCert.NotBefore) - if err != nil { - m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err) - // assume not renewed - } else if renewed { - m.log.Debugf("Cert %s is already renewed", cert.Serial) - m.stats.certificatesAlreadyRenewed.Add(1) - err := m.updateCertStatus(ctx, cert.Serial) - if err != nil 
{ - m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err) - m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() - } - continue - } + parsedCerts = append(parsedCerts, parsedCert) + } - parsedCerts = append(parsedCerts, parsedCert) - } + m.stats.certificatesPerAccountNeedingMail.Observe(float64(len(parsedCerts))) - m.stats.certificatesPerAccountNeedingMail.Observe(float64(len(parsedCerts))) + if len(parsedCerts) == 0 { + // all certificates are renewed + return nil + } - if len(parsedCerts) == 0 { - // all certificates are renewed - continue + // TODO(#6121): Remove this + if !features.Enabled(features.ExpirationMailerDontLookTwice) { + if len(reg.Contact) == 0 { + return nil } + } - // TODO(#6121): Remove this - if !features.Enabled(features.ExpirationMailerDontLookTwice) { - if len(reg.Contact) == 0 { - continue - } + err = m.sendNags(conn, reg.Contact, parsedCerts) + if err != nil { + m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc() + return fmt.Errorf("sending nag emails: %s", err) + } + for _, cert := range parsedCerts { + if ctx.Err() != nil { + return ctx.Err() } - - err = m.sendNags(reg.Contact, parsedCerts) + serial := core.SerialToString(cert.SerialNumber) + err = m.updateCertStatus(ctx, serial) if err != nil { - m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc() - m.log.AuditErrf("Error sending nag emails: %s", err) + m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err) + m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() continue } - for _, cert := range parsedCerts { - if ctx.Err() != nil { - return ctx.Err() - } - serial := core.SerialToString(cert.SerialNumber) - err = m.updateCertStatus(ctx, serial) - if err != nil { - m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err) - m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc() - continue - } - } 
} return nil } @@ -651,6 +696,7 @@ func main() { emailTemplate: tmpl, nagTimes: nags, limit: c.Mailer.CertLimit, + parallelSends: c.Mailer.ParallelSends, clk: clk, stats: initStats(scope), } diff --git a/cmd/expiration-mailer/main_test.go b/cmd/expiration-mailer/main_test.go index be43629902c..0e73a600af5 100644 --- a/cmd/expiration-mailer/main_test.go +++ b/cmd/expiration-mailer/main_test.go @@ -6,6 +6,7 @@ import ( "crypto/elliptic" "crypto/rand" "crypto/x509" + "errors" "fmt" "math/big" "net" @@ -21,6 +22,7 @@ import ( berrors "github.com/letsencrypt/boulder/errors" "github.com/letsencrypt/boulder/features" blog "github.com/letsencrypt/boulder/log" + bmail "github.com/letsencrypt/boulder/mail" "github.com/letsencrypt/boulder/metrics" "github.com/letsencrypt/boulder/mocks" "github.com/letsencrypt/boulder/sa" @@ -113,7 +115,9 @@ func TestSendNags(t *testing.T) { DNSNames: []string{"example.com"}, } - err := m.sendNags([]string{emailA}, []*x509.Certificate{cert}) + conn, err := m.mailer.Connect() + test.AssertNotError(t, err, "connecting SMTP") + err = m.sendNags(conn, []string{emailA}, []*x509.Certificate{cert}) test.AssertNotError(t, err, "Failed to send warning messages") test.AssertEquals(t, len(mc.Messages), 1) test.AssertEquals(t, mocks.MailerMessage{ @@ -123,7 +127,9 @@ func TestSendNags(t *testing.T) { }, mc.Messages[0]) mc.Clear() - err = m.sendNags([]string{emailA, emailB}, []*x509.Certificate{cert}) + conn, err = m.mailer.Connect() + test.AssertNotError(t, err, "connecting SMTP") + err = m.sendNags(conn, []string{emailA, emailB}, []*x509.Certificate{cert}) test.AssertNotError(t, err, "Failed to send warning messages") test.AssertEquals(t, len(mc.Messages), 2) test.AssertEquals(t, mocks.MailerMessage{ @@ -138,7 +144,9 @@ func TestSendNags(t *testing.T) { }, mc.Messages[1]) mc.Clear() - err = m.sendNags([]string{}, []*x509.Certificate{cert}) + conn, err = m.mailer.Connect() + test.AssertNotError(t, err, "connecting SMTP") + err = m.sendNags(conn, 
[]string{}, []*x509.Certificate{cert}) test.AssertNotError(t, err, "Not an error to pass no email contacts") test.AssertEquals(t, len(mc.Messages), 0) @@ -291,6 +299,37 @@ func TestNoContactCertIsRenewed(t *testing.T) { test.AssertMetricWithLabelsEquals(t, certsAlreadyRenewed, prometheus.Labels{}, 1.0) } +func TestProcessCertsParallel(t *testing.T) { + testCtx := setup(t, []time.Duration{time.Hour * 24 * 7}) + + testCtx.m.parallelSends = 2 + certs := addExpiringCerts(t, testCtx) + log.Clear() + testCtx.m.processCerts(context.Background(), certs) + // Test that the lastExpirationNagSent was updated for the certificate + // corresponding to serial4, which is set up as "already renewed" by + // addExpiringCerts. + if len(log.GetAllMatching("DEBUG: SQL: UPDATE certificateStatus .*2006-01-02 15:04:05.999999999.*\"000000000000000000000000000000001339\"")) != 1 { + t.Errorf("Expected an update to certificateStatus, got these log lines:\n%s", + strings.Join(log.GetAllMatching(".*"), "\n")) + } +} + +type erroringMailClient struct{} + +func (e erroringMailClient) Connect() (bmail.Conn, error) { + return nil, errors.New("whoopsie-doo") +} + +func TestProcessCertsConnectError(t *testing.T) { + testCtx := setup(t, []time.Duration{time.Hour * 24 * 7}) + + testCtx.m.mailer = erroringMailClient{} + certs := addExpiringCerts(t, testCtx) + // Checking that this terminates rather than deadlocks + testCtx.m.processCerts(context.Background(), certs) +} + func TestFindExpiringCertificates(t *testing.T) { testCtx := setup(t, []time.Duration{time.Hour * 24, time.Hour * 24 * 4, time.Hour * 24 * 7}) diff --git a/cmd/expiration-mailer/send_test.go b/cmd/expiration-mailer/send_test.go index 80c5a7e3f3b..9ff4f9d0348 100644 --- a/cmd/expiration-mailer/send_test.go +++ b/cmd/expiration-mailer/send_test.go @@ -33,7 +33,9 @@ func TestSendEarliestCertInfo(t *testing.T) { serial2, ) - err := ctx.m.sendNags([]string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB}) + conn, err := 
ctx.m.mailer.Connect() + test.AssertNotError(t, err, "connecting SMTP") + err = ctx.m.sendNags(conn, []string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB}) if err != nil { t.Fatal(err) } diff --git a/cmd/notify-mailer/main.go b/cmd/notify-mailer/main.go index 8bc97bd82e3..1f8493ea1bd 100644 --- a/cmd/notify-mailer/main.go +++ b/cmd/notify-mailer/main.go @@ -154,12 +154,12 @@ func (m *mailer) run() error { m.log.Infof("Address %q was associated with the most recipients (%d)", mostRecipients, mostRecipientsLen) - err = m.mailer.Connect() + conn, err := m.mailer.Connect() if err != nil { return err } - defer func() { _ = m.mailer.Close() }() + defer func() { _ = conn.Close() }() startTime := m.clk.Now() sortedAddresses := sortAddresses(addressToRecipient) @@ -186,7 +186,7 @@ func (m *mailer) run() error { continue } - err = m.mailer.SendMail([]string{address}, m.subject, messageBody) + err = conn.SendMail([]string{address}, m.subject, messageBody) if err != nil { var badAddrErr bmail.BadAddressSMTPError if errors.As(err, &badAddrErr) { diff --git a/mail/mailer.go b/mail/mailer.go index b20de9496da..4249108eb0c 100644 --- a/mail/mailer.go +++ b/mail/mailer.go @@ -43,20 +43,37 @@ func (s realSource) generate() *big.Int { return randInt } -// Mailer provides the interface for a mailer +// Mailer is an interface that allows creating Conns. Implementations must +// be safe for concurrent use. type Mailer interface { + Connect() (Conn, error) +} + +// Conn is an interface that allows sending mail. When you are done with a +// Conn, call Close(). Implementations are not required to be safe for +// concurrent use. +type Conn interface { SendMail([]string, string, string) error - Connect() error Close() error } -// MailerImpl defines a mail transfer agent to use for sending mail. It is not -// safe for concurrent access. -type MailerImpl struct { +// connImpl represents a single connection to a mail server. It is not safe +// for concurrent use. 
+type connImpl struct { + config + client smtpClient +} + +// mailerImpl defines a mail transfer agent to use for sending mail. It is +// safe for concurrent use. +type mailerImpl struct { + config +} + +type config struct { log blog.Logger dialer dialer from mail.Address - client smtpClient clk clock.Clock csprgSource idGenerator reconnectBase time.Duration @@ -126,7 +143,7 @@ func New( logger blog.Logger, stats prometheus.Registerer, reconnectBase time.Duration, - reconnectMax time.Duration) *MailerImpl { + reconnectMax time.Duration) *mailerImpl { sendMailAttempts := prometheus.NewCounterVec(prometheus.CounterOpts{ Name: "send_mail_attempts", @@ -134,42 +151,46 @@ func New( }, []string{"result", "error"}) stats.MustRegister(sendMailAttempts) - return &MailerImpl{ - dialer: &dialerImpl{ - username: username, - password: password, - server: server, - port: port, - rootCAs: rootCAs, + return &mailerImpl{ + config: config{ + dialer: &dialerImpl{ + username: username, + password: password, + server: server, + port: port, + rootCAs: rootCAs, + }, + log: logger, + from: from, + clk: clock.New(), + csprgSource: realSource{}, + reconnectBase: reconnectBase, + reconnectMax: reconnectMax, + sendMailAttempts: sendMailAttempts, }, - log: logger, - from: from, - clk: clock.New(), - csprgSource: realSource{}, - reconnectBase: reconnectBase, - reconnectMax: reconnectMax, - sendMailAttempts: sendMailAttempts, } } // New constructs a Mailer suitable for doing a dry run. It simply logs each // command that would have been run, at debug level. 
-func NewDryRun(from mail.Address, logger blog.Logger) *MailerImpl { - return &MailerImpl{ - dialer: dryRunClient{logger}, - from: from, - clk: clock.New(), - csprgSource: realSource{}, - sendMailAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{ - Name: "send_mail_attempts", - Help: "A counter of send mail attempts labelled by result", - }, []string{"result", "error"}), +func NewDryRun(from mail.Address, logger blog.Logger) *mailerImpl { + return &mailerImpl{ + config: config{ + dialer: dryRunClient{logger}, + from: from, + clk: clock.New(), + csprgSource: realSource{}, + sendMailAttempts: prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: "send_mail_attempts", + Help: "A counter of send mail attempts labelled by result", + }, []string{"result", "error"}), + }, } } -func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte, error) { - mid := m.csprgSource.generate() - now := m.clk.Now().UTC() +func (c config) generateMessage(to []string, subject, body string) ([]byte, error) { + mid := c.csprgSource.generate() + now := c.clk.Now().UTC() addrs := []string{} for _, a := range to { if !core.IsASCII(a) { @@ -179,10 +200,10 @@ func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte, } headers := []string{ fmt.Sprintf("To: %s", strings.Join(addrs, ", ")), - fmt.Sprintf("From: %s", m.from.String()), + fmt.Sprintf("From: %s", c.from.String()), fmt.Sprintf("Subject: %s", subject), fmt.Sprintf("Date: %s", now.Format(time.RFC822)), - fmt.Sprintf("Message-Id: <%s.%s.%s>", now.Format("20060102T150405"), mid.String(), m.from.Address), + fmt.Sprintf("Message-Id: <%s.%s.%s>", now.Format("20060102T150405"), mid.String(), c.from.Address), "MIME-Version: 1.0", "Content-Type: text/plain; charset=UTF-8", "Content-Transfer-Encoding: quoted-printable", @@ -208,31 +229,31 @@ func (m *MailerImpl) generateMessage(to []string, subject, body string) ([]byte, )), nil } -func (m *MailerImpl) reconnect() { +func (c *connImpl) 
reconnect() { for i := 0; ; i++ { - sleepDuration := core.RetryBackoff(i, m.reconnectBase, m.reconnectMax, 2) - m.log.Infof("sleeping for %s before reconnecting mailer", sleepDuration) - m.clk.Sleep(sleepDuration) - m.log.Info("attempting to reconnect mailer") - err := m.Connect() + sleepDuration := core.RetryBackoff(i, c.reconnectBase, c.reconnectMax, 2) + c.log.Infof("sleeping for %s before reconnecting mailer", sleepDuration) + c.clk.Sleep(sleepDuration) + c.log.Info("attempting to reconnect mailer") + client, err := c.dialer.Dial() if err != nil { - m.log.Warningf("reconnect error: %s", err) + c.log.Warningf("reconnect error: %s", err) continue } + c.client = client break } - m.log.Info("reconnected successfully") + c.log.Info("reconnected successfully") } // Connect opens a connection to the specified mail server. It must be called // before SendMail. -func (m *MailerImpl) Connect() error { +func (m *mailerImpl) Connect() (Conn, error) { client, err := m.dialer.Dial() if err != nil { - return err + return nil, err } - m.client = client - return nil + return &connImpl{m.config, client}, nil } type dialerImpl struct { @@ -265,43 +286,43 @@ func (di *dialerImpl) Dial() (smtpClient, error) { // argument as an error. If the reset command also errors, it combines both // errors and returns them. Without this we would get `nested MAIL command`. 
// https://github.com/letsencrypt/boulder/issues/3191 -func (m *MailerImpl) resetAndError(err error) error { +func (c *connImpl) resetAndError(err error) error { if err == io.EOF { return err } - if err2 := m.client.Reset(); err2 != nil { + if err2 := c.client.Reset(); err2 != nil { return fmt.Errorf("%s (also, on sending RSET: %s)", err, err2) } return err } -func (m *MailerImpl) sendOne(to []string, subject, msg string) error { - if m.client == nil { +func (c *connImpl) sendOne(to []string, subject, msg string) error { + if c.client == nil { return errors.New("call Connect before SendMail") } - body, err := m.generateMessage(to, subject, msg) + body, err := c.generateMessage(to, subject, msg) if err != nil { return err } - if err = m.client.Mail(m.from.String()); err != nil { + if err = c.client.Mail(c.from.String()); err != nil { return err } for _, t := range to { - if err = m.client.Rcpt(t); err != nil { - return m.resetAndError(err) + if err = c.client.Rcpt(t); err != nil { + return c.resetAndError(err) } } - w, err := m.client.Data() + w, err := c.client.Data() if err != nil { - return m.resetAndError(err) + return c.resetAndError(err) } _, err = w.Write(body) if err != nil { - return m.resetAndError(err) + return c.resetAndError(err) } err = w.Close() if err != nil { - return m.resetAndError(err) + return c.resetAndError(err) } return nil } @@ -336,34 +357,34 @@ var badAddressErrorCodes = map[int]bool{ // SendMail sends an email to the provided list of recipients. The email body // is simple text. -func (m *MailerImpl) SendMail(to []string, subject, msg string) error { +func (c *connImpl) SendMail(to []string, subject, msg string) error { var protoErr *textproto.Error for { - err := m.sendOne(to, subject, msg) + err := c.sendOne(to, subject, msg) if err == nil { // If the error is nil, we sent the mail without issue. nice! 
break } else if err == io.EOF { - m.sendMailAttempts.WithLabelValues("failure", "EOF").Inc() + c.sendMailAttempts.WithLabelValues("failure", "EOF").Inc() // If the error is an EOF, we should try to reconnect on a backoff // schedule, sleeping between attempts. - m.reconnect() + c.reconnect() // After reconnecting, loop around and try `sendOne` again. continue } else if errors.Is(err, syscall.ECONNRESET) { - m.sendMailAttempts.WithLabelValues("failure", "TCP RST").Inc() + c.sendMailAttempts.WithLabelValues("failure", "TCP RST").Inc() // If the error is `syscall.ECONNRESET`, we should try to reconnect on a backoff // schedule, sleeping between attempts. - m.reconnect() + c.reconnect() // After reconnecting, loop around and try `sendOne` again. continue } else if errors.Is(err, syscall.EPIPE) { // EPIPE also seems to be a common way to signal TCP RST. - m.sendMailAttempts.WithLabelValues("failure", "EPIPE").Inc() - m.reconnect() + c.sendMailAttempts.WithLabelValues("failure", "EPIPE").Inc() + c.reconnect() continue } else if errors.As(err, &protoErr) && protoErr.Code == 421 { - m.sendMailAttempts.WithLabelValues("failure", "SMTP 421").Inc() + c.sendMailAttempts.WithLabelValues("failure", "SMTP 421").Inc() /* * If the error is an instance of `textproto.Error` with a SMTP error code, * and that error code is 421 then treat this as a reconnect-able event. @@ -379,28 +400,30 @@ func (m *MailerImpl) SendMail(to []string, subject, msg string) error { * * [0] - https://github.com/letsencrypt/boulder/issues/2249 */ - m.reconnect() + c.reconnect() // After reconnecting, loop around and try `sendOne` again. 
continue } else if errors.As(err, &protoErr) && badAddressErrorCodes[protoErr.Code] { - m.sendMailAttempts.WithLabelValues("failure", fmt.Sprintf("SMTP %d", protoErr.Code)).Inc() + c.sendMailAttempts.WithLabelValues("failure", fmt.Sprintf("SMTP %d", protoErr.Code)).Inc() return BadAddressSMTPError{fmt.Sprintf("%d: %s", protoErr.Code, protoErr.Msg)} } else { // If it wasn't an EOF error or a recoverable SMTP error it is unexpected and we // return from SendMail() with the error - m.sendMailAttempts.WithLabelValues("failure", "unexpected").Inc() + c.sendMailAttempts.WithLabelValues("failure", "unexpected").Inc() return err } } - m.sendMailAttempts.WithLabelValues("success", "").Inc() + c.sendMailAttempts.WithLabelValues("success", "").Inc() return nil } // Close closes the connection. -func (m *MailerImpl) Close() error { - if m.client == nil { - return errors.New("call Connect before Close") +func (c *connImpl) Close() error { + err := c.client.Close() + if err != nil { + return err } - return m.client.Close() + c.client = nil + return nil } diff --git a/mail/mailer_test.go b/mail/mailer_test.go index a192e40dbf9..d41582fea02 100644 --- a/mail/mailer_test.go +++ b/mail/mailer_test.go @@ -264,7 +264,7 @@ func rstHandler(rstFirst int) connHandler { } } -func setup(t *testing.T) (*MailerImpl, *net.TCPListener, func()) { +func setup(t *testing.T) (*mailerImpl, *net.TCPListener, func()) { fromAddress, _ := mail.ParseAddress("you-are-a-winner@example.com") log := blog.UseMock() @@ -322,11 +322,11 @@ func TestConnect(t *testing.T) { defer cleanUp() go listenForever(l, t, normalHandler) - err := m.Connect() + conn, err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.Close() + err = conn.Close() if err != nil { t.Errorf("Failed to clean up: %s", err) } @@ -344,11 +344,11 @@ func TestReconnectSuccess(t *testing.T) { // With a mailer client that has a max attempt > `closedConns` we expect no // error. 
The message should be delivered after `closedConns` reconnect // attempts. - err := m.Connect() + conn, err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") if err != nil { t.Errorf("Expected SendMail() to not fail. Got err: %s", err) } @@ -361,12 +361,12 @@ func TestBadEmailError(t *testing.T) { go listenForever(l, t, badEmailHandler(messages)) - err := m.Connect() + conn, err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") // We expect there to be an error if err == nil { t.Errorf("Expected SendMail() to return an BadAddressSMTPError, got nil") @@ -393,11 +393,11 @@ func TestReconnectSMTP421(t *testing.T) { // With a mailer client that has a max attempt > `closedConns` we expect no // error. The message should be delivered after `closedConns` reconnect // attempts. - err := m.Connect() + conn, err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") if err != nil { t.Errorf("Expected SendMail() to not fail. 
Got err: %s", err) } @@ -439,12 +439,12 @@ func TestOtherError(t *testing.T) { _, _ = conn.Write([]byte("250 Ok yr rset now\r\n")) }) - err := m.Connect() + conn, err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") // We expect there to be an error if err == nil { t.Errorf("Expected SendMail() to return an error, got nil") @@ -488,12 +488,12 @@ func TestOtherError(t *testing.T) { _, _ = conn.Write([]byte("nop\r\n")) }) - err = m.Connect() + conn, err = m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") // We expect there to be an error test.AssertError(t, err, "SendMail didn't fail as expected") test.AssertEquals(t, err.Error(), "999 1.1.1 This would probably be bad? (also, on sending RSET: short response: nop)") @@ -511,11 +511,11 @@ func TestReconnectAfterRST(t *testing.T) { // With a mailer client that has a max attempt > `closedConns` we expect no // error. The message should be delivered after `closedConns` reconnect // attempts. - err := m.Connect() + conn, err := m.Connect() if err != nil { t.Errorf("Failed to connect: %s", err) } - err = m.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") + err = conn.SendMail([]string{"hi@bye.com"}, "You are already a winner!", "Just kidding") if err != nil { t.Errorf("Expected SendMail() to not fail. 
Got err: %s", err) } diff --git a/mocks/mocks.go b/mocks/mocks.go index a9d32882acc..21e882d63e8 100644 --- a/mocks/mocks.go +++ b/mocks/mocks.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "math/rand" "net" + "sync" "time" "github.com/jmhodges/clock" @@ -22,6 +23,7 @@ import ( berrors "github.com/letsencrypt/boulder/errors" bgrpc "github.com/letsencrypt/boulder/grpc" "github.com/letsencrypt/boulder/identifier" + "github.com/letsencrypt/boulder/mail" "github.com/letsencrypt/boulder/probs" pubpb "github.com/letsencrypt/boulder/publisher/proto" sapb "github.com/letsencrypt/boulder/sa/proto" @@ -560,9 +562,19 @@ func (*PublisherClient) SubmitToSingleCTWithResult(_ context.Context, _ *pubpb.R // Mailer is a mock type Mailer struct { + sync.Mutex Messages []MailerMessage } +var _ mail.Mailer = &Mailer{} + +// mockMailerConn is a mock that satisfies the mail.Conn interface +type mockMailerConn struct { + parent *Mailer +} + +var _ mail.Conn = &mockMailerConn{} + // MailerMessage holds the captured emails from SendMail() type MailerMessage struct { To string @@ -572,13 +584,17 @@ type MailerMessage struct { // Clear removes any previously recorded messages func (m *Mailer) Clear() { + m.Lock() + defer m.Unlock() m.Messages = nil } // SendMail is a mock -func (m *Mailer) SendMail(to []string, subject, msg string) error { +func (m *mockMailerConn) SendMail(to []string, subject, msg string) error { + m.parent.Lock() + defer m.parent.Unlock() for _, rcpt := range to { - m.Messages = append(m.Messages, MailerMessage{ + m.parent.Messages = append(m.parent.Messages, MailerMessage{ To: rcpt, Subject: subject, Body: msg, @@ -588,13 +604,13 @@ func (m *Mailer) SendMail(to []string, subject, msg string) error { } // Close is a mock -func (m *Mailer) Close() error { +func (m *mockMailerConn) Close() error { return nil } // Connect is a mock -func (m *Mailer) Connect() error { - return nil +func (m *Mailer) Connect() (mail.Conn, error) { + return &mockMailerConn{parent: m}, nil } // 
SAWithFailedChallenges is a mocks.StorageAuthority that has diff --git a/test/config-next/expiration-mailer.json b/test/config-next/expiration-mailer.json index fe672f7c525..a45b4360d8d 100644 --- a/test/config-next/expiration-mailer.json +++ b/test/config-next/expiration-mailer.json @@ -13,6 +13,7 @@ "nagTimes": ["480h", "240h"], "emailTemplate": "test/example-expiration-template", "debugAddr": ":8008", + "parallelSends": 10, "tls": { "caCertFile": "test/grpc-creds/minica.pem", "certFile": "test/grpc-creds/expiration-mailer.boulder/cert.pem",