Skip to content

Commit

Permalink
admin: Perform unpauseAccount batches in parallel
Browse files Browse the repository at this point in the history
  • Loading branch information
beautifulentropy committed Aug 29, 2024
1 parent ea62f9a commit 63fae52
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 21 deletions.
90 changes: 70 additions & 20 deletions cmd/admin/unpause_account.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,19 @@ import (
"fmt"
"os"
"strconv"
"sync"
"sync/atomic"

sapb "github.com/letsencrypt/boulder/sa/proto"
"github.com/letsencrypt/boulder/unpause"
"golang.org/x/exp/maps"
)

// subcommandUnpauseAccount encapsulates the "admin unpause-account" command.
type subcommandUnpauseAccount struct {
batchFile string
regID int64
accountID int64
batchFile string
parallelism uint
}

var _ subcommand = (*subcommandUnpauseAccount)(nil)
Expand All @@ -26,16 +30,17 @@ func (u *subcommandUnpauseAccount) Desc() string {
}

func (u *subcommandUnpauseAccount) Flags(flag *flag.FlagSet) {
flag.Int64Var(&u.accountID, "account", 0, "A single account ID to unpause")
flag.StringVar(&u.batchFile, "batch-file", "", "Path to a file containing multiple account IDs where each is separated by a newline")
flag.Int64Var(&u.regID, "account", 0, "A single account ID to unpause")
flag.UintVar(&u.parallelism, "parallelism", 10, "The maximum number of concurrent unpause requests to send to the SA (default: 10)")
}

func (u *subcommandUnpauseAccount) Run(ctx context.Context, a *admin) error {
// This is a map of all input-selection flags to whether or not they were set
// to a non-default value. We use this to ensure that exactly one input
// selection flag was given on the command line.
setInputs := map[string]bool{
"-account": u.regID != 0,
"-account": u.accountID != 0,
"-batch-file": u.batchFile != "",
}
maps.DeleteFunc(setInputs, func(_ string, v bool) bool { return !v })
Expand All @@ -49,7 +54,7 @@ func (u *subcommandUnpauseAccount) Run(ctx context.Context, a *admin) error {
var err error
switch maps.Keys(setInputs)[0] {
case "-account":
regIDs = []int64{u.regID}
regIDs = []int64{u.accountID}
case "-batch-file":
regIDs, err = a.readUnpauseAccountFile(u.batchFile)
default:
Expand All @@ -59,32 +64,77 @@ func (u *subcommandUnpauseAccount) Run(ctx context.Context, a *admin) error {
return fmt.Errorf("collecting serials to revoke: %w", err)
}

_, err = a.unpauseAccounts(ctx, regIDs)
_, err = a.unpauseAccounts(ctx, regIDs, u.parallelism)
if err != nil {
return err
}

return nil
}

// unpauseAccount allows administratively unpausing all identifiers for an
// account. Returns a slice of int64 which is counter of unpaused accounts or an
// error.
func (a *admin) unpauseAccounts(ctx context.Context, regIDs []int64) ([]int64, error) {
var count []int64
if len(regIDs) <= 0 {
return count, errors.New("no regIDs sent for unpausing")
type unpauseCount struct {
accountID int64
count int64
}

// unpauseAccount concurrently unpauses all identifiers for each account using
// up to `parallelism` workers. It returns a count of the number of identifiers
// unpaused for each account and any accumulated errors.
func (a *admin) unpauseAccounts(ctx context.Context, accountIDs []int64, parallelism uint) ([]unpauseCount, error) {
if len(accountIDs) <= 0 {
return nil, errors.New("no account IDs provided for unpausing")
}

for _, regID := range regIDs {
response, err := a.sac.UnpauseAccount(ctx, &sapb.RegistrationID{Id: regID})
if err != nil {
return count, err
}
count = append(count, response.Count)
countChan := make(chan unpauseCount, len(accountIDs))
work := make(chan int64, parallelism)
retryCount := make(map[int64]int)
const maxRetries = 3

var wg sync.WaitGroup
var errCount atomic.Uint64
for i := uint(0); i < parallelism; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for accountID := range work {
if retryCount[accountID] >= maxRetries {
a.log.Errf("retry limit reached for accountID %d", accountID)
continue
}

response, err := a.sac.UnpauseAccount(ctx, &sapb.RegistrationID{Id: accountID})
if err != nil {
errCount.Add(1)
a.log.Errf("error unpausing accountID %d: %v", accountID, err)
continue
}
if response.Count >= unpause.RequestLimit {
retryCount[accountID]++
work <- accountID
}
countChan <- unpauseCount{accountID: accountID, count: response.Count}
}
}()
}

for _, accountID := range accountIDs {
work <- accountID
}

wg.Wait()
close(work)
close(countChan)

var unpauseCounts []unpauseCount
for count := range countChan {
unpauseCounts = append(unpauseCounts, count)
}

if errCount.Load() > 0 {
return unpauseCounts, fmt.Errorf("encountered %d errors while unpausing; see logs above for details", errCount.Load())
}

return count, nil
return unpauseCounts, nil
}

// readUnpauseAccountFile parses the contents of a file containing one account
Expand Down
2 changes: 1 addition & 1 deletion cmd/admin/unpause_account_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func TestUnpauseAccounts(t *testing.T) {
}
a := admin{sac: testCase.saImpl, log: log}

count, err := a.unpauseAccounts(context.Background(), testCase.regIDs)
count, err := a.unpauseAccounts(context.Background(), testCase.regIDs, 10)
if testCase.expectErr {
test.AssertError(t, err, "should have errored, but did not")
} else {
Expand Down

0 comments on commit 63fae52

Please sign in to comment.