Skip to content

Commit

Permalink
Use parallelism approach from other admin commands
Browse files Browse the repository at this point in the history
  • Loading branch information
beautifulentropy committed Aug 29, 2024
1 parent b908aef commit 5510010
Showing 1 changed file with 37 additions and 38 deletions.
75 changes: 37 additions & 38 deletions cmd/admin/pause_identifier.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,16 +10,16 @@ import (
"os"
"strconv"
"sync"
"sync/atomic"

"github.com/letsencrypt/boulder/identifier"
sapb "github.com/letsencrypt/boulder/sa/proto"
"github.com/letsencrypt/boulder/semaphore"
)

// subcommandPauseIdentifier encapsulates the "admin pause-identifiers" command.
type subcommandPauseIdentifier struct {
batchFile string
maxInFlight int64
parallelism uint
}

var _ subcommand = (*subcommandPauseIdentifier)(nil)
Expand All @@ -30,7 +30,7 @@ func (p *subcommandPauseIdentifier) Desc() string {

func (p *subcommandPauseIdentifier) Flags(flag *flag.FlagSet) {
flag.StringVar(&p.batchFile, "batch-file", "", "Path to a CSV file containing (account ID, identifier type, identifier value)")
flag.Int64Var(&p.maxInFlight, "max-in-flight", 10, "The maximum number of concurrent pause requests to send to the SA")
flag.UintVar(&p.parallelism, "parallelism", 10, "The maximum number of concurrent pause requests to send to the SA (default: 10)")
}

func (p *subcommandPauseIdentifier) Run(ctx context.Context, a *admin) error {
Expand All @@ -43,7 +43,7 @@ func (p *subcommandPauseIdentifier) Run(ctx context.Context, a *admin) error {
return err
}

_, err = a.pauseIdentifiers(ctx, identifiers, p.maxInFlight)
_, err = a.pauseIdentifiers(ctx, identifiers, p.parallelism)
if err != nil {
return err
}
Expand All @@ -53,11 +53,11 @@ func (p *subcommandPauseIdentifier) Run(ctx context.Context, a *admin) error {

// pauseIdentifiers pauses each account, identifier pair in the provided slice
// of pauseCSVData entries. It will pause up to maxInFlight identifiers at a
// time. If any errors occur while pausing, they will be gathered and returned
// as a single error.
func (a *admin) pauseIdentifiers(ctx context.Context, entries []pauseCSVData, maxInFlight int64) ([]*sapb.PauseIdentifiersResponse, error) {
// time. If any errors occur while pausing they are logged and an error is
// returned.
func (a *admin) pauseIdentifiers(ctx context.Context, entries []pauseCSVData, parallelism uint) ([]*sapb.PauseIdentifiersResponse, error) {
if len(entries) <= 0 {
return nil, errors.New("cannot pause identifiers because no pauseData was sent")
return nil, errors.New("no identifiers to pause")
}

accountToIdentifiers := make(map[int64][]*sapb.Identifier)
Expand All @@ -68,51 +68,50 @@ func (a *admin) pauseIdentifiers(ctx context.Context, entries []pauseCSVData, ma
})
}

var errCount atomic.Uint64
respChan := make(chan *sapb.PauseIdentifiersResponse, len(accountToIdentifiers))
errorsChan := make(chan error, len(accountToIdentifiers))
sem := semaphore.NewWeighted(maxInFlight, 0)
work := make(chan struct {
accountID int64
identifiers []*sapb.Identifier
}, parallelism)

var wg sync.WaitGroup
for accountID, identifiers := range accountToIdentifiers {
for i := uint(0); i < parallelism; i++ {
wg.Add(1)
go func(accountID int64, identifiers []*sapb.Identifier) {
go func() {
defer wg.Done()

err := sem.Acquire(ctx, 1)
if err != nil {
errorsChan <- fmt.Errorf("while acquiring semaphore to pause identifier(s) %q for account %d: %w", identifiers, accountID, err)
return
for data := range work {
response, err := a.sac.PauseIdentifiers(ctx, &sapb.PauseRequest{
RegistrationID: data.accountID,
Identifiers: data.identifiers,
})
if err != nil {
errCount.Add(1)
a.log.Errf("error pausing identifier(s) %q for account %d: %v", data.identifiers, data.accountID, err)
} else {
respChan <- response
}
}
defer sem.Release(1)

response, err := a.sac.PauseIdentifiers(ctx, &sapb.PauseRequest{
RegistrationID: accountID,
Identifiers: identifiers,
})
if err != nil {
errorsChan <- fmt.Errorf("while pausing identifier(s) %q for account %d: %w", identifiers, accountID, err)
return
}
respChan <- response
}(accountID, identifiers)
}()
}

for accountID, identifiers := range accountToIdentifiers {
work <- struct {
accountID int64
identifiers []*sapb.Identifier
}{accountID, identifiers}
}
close(work)
wg.Wait()
close(respChan)
close(errorsChan)

responses := make([]*sapb.PauseIdentifiersResponse, 0)
var responses []*sapb.PauseIdentifiersResponse
for response := range respChan {
responses = append(responses, response)
}

var errors []error
for err := range errorsChan {
errors = append(errors, err)
}

if len(errors) > 0 {
return responses, fmt.Errorf("one or more errors occurred while pausing: %v", errors)
if errCount.Load() > 0 {
return responses, fmt.Errorf("encountered %d errors while pausing identifiers; see logs above for details", errCount.Load())
}

return responses, nil
Expand Down

0 comments on commit 5510010

Please sign in to comment.