Skip to content

Commit

Permalink
Reland "Allow expiration mailer to work in parallel" (#6133)
Browse files Browse the repository at this point in the history
This reverts commit 7ef6913.

We turned on the `ExpirationMailerDontLookTwice` feature flag in prod, and it's
working fine but not clearing the backlog. Since
#6100 fixed the issue that caused us
to (nearly) stop sending mail when we deployed #6057, this should be safe to
roll forward.

The revert of the revert applied cleanly, except for expiration-mailer/main.go
and `main_test.go`, particularly around the contents `processCerts` (where
`sendToOneRegID` was extracted from) and `sendToOneRegID` itself. So those areas
are good targets for extra attention.
  • Loading branch information
jsha committed May 23, 2022
1 parent 7dcbf69 commit 76f987a
Show file tree
Hide file tree
Showing 9 changed files with 296 additions and 169 deletions.
6 changes: 3 additions & 3 deletions cmd/bad-key-revoker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,12 +194,12 @@ var maxSerials = 100
// sendMessage sends a single email to the provided address with the revoked
// serials
func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error {
err := bkr.mailer.Connect()
conn, err := bkr.mailer.Connect()
if err != nil {
return err
}
defer func() {
_ = bkr.mailer.Close()
_ = conn.Close()
}()
mutSerials := make([]string, len(serials))
copy(mutSerials, serials)
Expand All @@ -213,7 +213,7 @@ func (bkr *badKeyRevoker) sendMessage(addr string, serials []string) error {
if err != nil {
return err
}
err = bkr.mailer.SendMail([]string{addr}, bkr.emailSubject, message.String())
err = conn.SendMail([]string{addr}, bkr.emailSubject, message.String())
if err != nil {
return err
}
Expand Down
170 changes: 108 additions & 62 deletions cmd/expiration-mailer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"os"
"sort"
"strings"
"sync"
"text/template"
"time"

Expand Down Expand Up @@ -52,6 +53,7 @@ type mailer struct {
emailTemplate *template.Template
subjectTemplate *template.Template
nagTimes []time.Duration
parallelSends uint
limit int
clk clock.Clock
stats mailerStats
Expand All @@ -68,7 +70,7 @@ type mailerStats struct {
certificatesPerAccountNeedingMail prometheus.Histogram
}

func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error {
func (m *mailer) sendNags(conn bmail.Conn, contacts []string, certs []*x509.Certificate) error {
// TODO(#6121): Remove this
if !features.Enabled(features.ExpirationMailerDontLookTwice) {
if len(contacts) == 0 {
Expand Down Expand Up @@ -166,7 +168,7 @@ func (m *mailer) sendNags(contacts []string, certs []*x509.Certificate) error {
m.log.Infof("attempting send JSON=%s", string(logStr))

startSending := m.clk.Now()
err = m.mailer.SendMail(emails, subjBuf.String(), msgBuf.String())
err = conn.SendMail(emails, subjBuf.String(), msgBuf.String())
if err != nil {
m.log.Errf("failed send JSON=%s", string(logStr))
return err
Expand Down Expand Up @@ -197,6 +199,11 @@ func (m *mailer) certIsRenewed(ctx context.Context, names []string, issued time.
return present, err
}

type work struct {
regID int64
certs []core.Certificate
}

func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate) error {
regIDToCerts := make(map[int64][]core.Certificate)

Expand All @@ -206,88 +213,126 @@ func (m *mailer) processCerts(ctx context.Context, allCerts []core.Certificate)
regIDToCerts[cert.RegistrationID] = cs
}

err := m.mailer.Connect()
parallelSends := m.parallelSends
if parallelSends == 0 {
parallelSends = 1
}

var wg sync.WaitGroup
workChan := make(chan work, len(regIDToCerts))

// Populate the work chan on a goroutine so work is available as soon
// as one of the sender routines starts.
go func(ch chan<- work) {
for regID, certs := range regIDToCerts {
ch <- work{regID, certs}
}
close(workChan)
}(workChan)

for senderNum := uint(0); senderNum < parallelSends; senderNum++ {
// For politeness' sake, don't open more than 1 new connection per
// second.
if senderNum > 0 {
time.Sleep(time.Second)
}

if ctx.Err() != nil {
return ctx.Err()
}

conn, err := m.mailer.Connect()
if err != nil {
m.log.AuditErrf("connecting parallel sender %d: %s", senderNum, err)
return err
}
wg.Add(1)
go func(conn bmail.Conn, ch <-chan work) {
defer wg.Done()
for w := range ch {
err := m.sendToOneRegID(ctx, conn, w.regID, w.certs)
if err != nil {
m.log.AuditErr(err.Error())
}
}
conn.Close()
}(conn, workChan)
}
wg.Wait()
return nil
}

func (m *mailer) sendToOneRegID(ctx context.Context, conn bmail.Conn, regID int64, certs []core.Certificate) error {
if ctx.Err() != nil {
return ctx.Err()
}
reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
if err != nil {
return fmt.Errorf("connecting to SMTP server: %w", err)
m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
return fmt.Errorf("Error fetching registration %d: %s", regID, err)
}
defer func() {
_ = m.mailer.Close()
}()

for regID, certs := range regIDToCerts {
parsedCerts := []*x509.Certificate{}
for _, cert := range certs {
if ctx.Err() != nil {
return ctx.Err()
}
reg, err := m.rs.GetRegistration(ctx, &sapb.RegistrationID{Id: regID})
parsedCert, err := x509.ParseCertificate(cert.DER)
if err != nil {
m.log.AuditErrf("Error fetching registration %d: %s", regID, err)
m.stats.errorCount.With(prometheus.Labels{"type": "GetRegistration"}).Inc()
// TODO(#1420): tell registration about this error
m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
continue
}

parsedCerts := []*x509.Certificate{}
for _, cert := range certs {
if ctx.Err() != nil {
return ctx.Err()
}
parsedCert, err := x509.ParseCertificate(cert.DER)
renewed, err := m.certIsRenewed(ctx, parsedCert.DNSNames, parsedCert.NotBefore)
if err != nil {
m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err)
// assume not renewed
} else if renewed {
m.log.Debugf("Cert %s is already renewed", cert.Serial)
m.stats.certificatesAlreadyRenewed.Add(1)
err := m.updateCertStatus(ctx, cert.Serial)
if err != nil {
// TODO(#1420): tell registration about this error
m.log.AuditErrf("Error parsing certificate %s: %s", cert.Serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "ParseCertificate"}).Inc()
continue
m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
}
continue
}

renewed, err := m.certIsRenewed(ctx, parsedCert.DNSNames, parsedCert.NotBefore)
if err != nil {
m.log.AuditErrf("expiration-mailer: error fetching renewal state: %v", err)
// assume not renewed
} else if renewed {
m.log.Debugf("Cert %s is already renewed", cert.Serial)
m.stats.certificatesAlreadyRenewed.Add(1)
err := m.updateCertStatus(ctx, cert.Serial)
if err != nil {
m.log.AuditErrf("Error updating certificate status for %s: %s", cert.Serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
}
continue
}
parsedCerts = append(parsedCerts, parsedCert)
}

parsedCerts = append(parsedCerts, parsedCert)
}
m.stats.certificatesPerAccountNeedingMail.Observe(float64(len(parsedCerts)))

m.stats.certificatesPerAccountNeedingMail.Observe(float64(len(parsedCerts)))
if len(parsedCerts) == 0 {
// all certificates are renewed
return nil
}

if len(parsedCerts) == 0 {
// all certificates are renewed
continue
// TODO(#6121): Remove this
if !features.Enabled(features.ExpirationMailerDontLookTwice) {
if len(reg.Contact) == 0 {
return nil
}
}

// TODO(#6121): Remove this
if !features.Enabled(features.ExpirationMailerDontLookTwice) {
if len(reg.Contact) == 0 {
continue
}
err = m.sendNags(conn, reg.Contact, parsedCerts)
if err != nil {
m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
return fmt.Errorf("sending nag emails: %s", err)
}
for _, cert := range parsedCerts {
if ctx.Err() != nil {
return ctx.Err()
}

err = m.sendNags(reg.Contact, parsedCerts)
serial := core.SerialToString(cert.SerialNumber)
err = m.updateCertStatus(ctx, serial)
if err != nil {
m.stats.errorCount.With(prometheus.Labels{"type": "SendNags"}).Inc()
m.log.AuditErrf("Error sending nag emails: %s", err)
m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
continue
}
for _, cert := range parsedCerts {
if ctx.Err() != nil {
return ctx.Err()
}
serial := core.SerialToString(cert.SerialNumber)
err = m.updateCertStatus(ctx, serial)
if err != nil {
m.log.AuditErrf("Error updating certificate status for %s: %s", serial, err)
m.stats.errorCount.With(prometheus.Labels{"type": "UpdateCertificateStatus"}).Inc()
continue
}
}
}
return nil
}
Expand Down Expand Up @@ -651,6 +696,7 @@ func main() {
emailTemplate: tmpl,
nagTimes: nags,
limit: c.Mailer.CertLimit,
parallelSends: c.Mailer.ParallelSends,
clk: clk,
stats: initStats(scope),
}
Expand Down
45 changes: 42 additions & 3 deletions cmd/expiration-mailer/main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"crypto/elliptic"
"crypto/rand"
"crypto/x509"
"errors"
"fmt"
"math/big"
"net"
Expand All @@ -21,6 +22,7 @@ import (
berrors "github.com/letsencrypt/boulder/errors"
"github.com/letsencrypt/boulder/features"
blog "github.com/letsencrypt/boulder/log"
bmail "github.com/letsencrypt/boulder/mail"
"github.com/letsencrypt/boulder/metrics"
"github.com/letsencrypt/boulder/mocks"
"github.com/letsencrypt/boulder/sa"
Expand Down Expand Up @@ -113,7 +115,9 @@ func TestSendNags(t *testing.T) {
DNSNames: []string{"example.com"},
}

err := m.sendNags([]string{emailA}, []*x509.Certificate{cert})
conn, err := m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = m.sendNags(conn, []string{emailA}, []*x509.Certificate{cert})
test.AssertNotError(t, err, "Failed to send warning messages")
test.AssertEquals(t, len(mc.Messages), 1)
test.AssertEquals(t, mocks.MailerMessage{
Expand All @@ -123,7 +127,9 @@ func TestSendNags(t *testing.T) {
}, mc.Messages[0])

mc.Clear()
err = m.sendNags([]string{emailA, emailB}, []*x509.Certificate{cert})
conn, err = m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = m.sendNags(conn, []string{emailA, emailB}, []*x509.Certificate{cert})
test.AssertNotError(t, err, "Failed to send warning messages")
test.AssertEquals(t, len(mc.Messages), 2)
test.AssertEquals(t, mocks.MailerMessage{
Expand All @@ -138,7 +144,9 @@ func TestSendNags(t *testing.T) {
}, mc.Messages[1])

mc.Clear()
err = m.sendNags([]string{}, []*x509.Certificate{cert})
conn, err = m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = m.sendNags(conn, []string{}, []*x509.Certificate{cert})
test.AssertNotError(t, err, "Not an error to pass no email contacts")
test.AssertEquals(t, len(mc.Messages), 0)

Expand Down Expand Up @@ -291,6 +299,37 @@ func TestNoContactCertIsRenewed(t *testing.T) {
test.AssertMetricWithLabelsEquals(t, certsAlreadyRenewed, prometheus.Labels{}, 1.0)
}

func TestProcessCertsParallel(t *testing.T) {
testCtx := setup(t, []time.Duration{time.Hour * 24 * 7})

testCtx.m.parallelSends = 2
certs := addExpiringCerts(t, testCtx)
log.Clear()
testCtx.m.processCerts(context.Background(), certs)
// Test that the lastExpirationNagSent was updated for the certificate
// corresponding to serial4, which is set up as "already renewed" by
// addExpiringCerts.
if len(log.GetAllMatching("DEBUG: SQL: UPDATE certificateStatus .*2006-01-02 15:04:05.999999999.*\"000000000000000000000000000000001339\"")) != 1 {
t.Errorf("Expected an update to certificateStatus, got these log lines:\n%s",
strings.Join(log.GetAllMatching(".*"), "\n"))
}
}

type erroringMailClient struct{}

func (e erroringMailClient) Connect() (bmail.Conn, error) {
return nil, errors.New("whoopsie-doo")
}

func TestProcessCertsConnectError(t *testing.T) {
testCtx := setup(t, []time.Duration{time.Hour * 24 * 7})

testCtx.m.mailer = erroringMailClient{}
certs := addExpiringCerts(t, testCtx)
// Checking that this terminates rather than deadlocks
testCtx.m.processCerts(context.Background(), certs)
}

func TestFindExpiringCertificates(t *testing.T) {
testCtx := setup(t, []time.Duration{time.Hour * 24, time.Hour * 24 * 4, time.Hour * 24 * 7})

Expand Down
4 changes: 3 additions & 1 deletion cmd/expiration-mailer/send_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ func TestSendEarliestCertInfo(t *testing.T) {
serial2,
)

err := ctx.m.sendNags([]string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB})
conn, err := ctx.m.mailer.Connect()
test.AssertNotError(t, err, "connecting SMTP")
err = ctx.m.sendNags(conn, []string{email1, email2}, []*x509.Certificate{rawCertA, rawCertB})
if err != nil {
t.Fatal(err)
}
Expand Down
6 changes: 3 additions & 3 deletions cmd/notify-mailer/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,12 +154,12 @@ func (m *mailer) run() error {
m.log.Infof("Address %q was associated with the most recipients (%d)",
mostRecipients, mostRecipientsLen)

err = m.mailer.Connect()
conn, err := m.mailer.Connect()
if err != nil {
return err
}

defer func() { _ = m.mailer.Close() }()
defer func() { _ = conn.Close() }()

startTime := m.clk.Now()
sortedAddresses := sortAddresses(addressToRecipient)
Expand All @@ -186,7 +186,7 @@ func (m *mailer) run() error {
continue
}

err = m.mailer.SendMail([]string{address}, m.subject, messageBody)
err = conn.SendMail([]string{address}, m.subject, messageBody)
if err != nil {
var badAddrErr bmail.BadAddressSMTPError
if errors.As(err, &badAddrErr) {
Expand Down
Loading

0 comments on commit 76f987a

Please sign in to comment.