Skip to content

Commit

Permalink
ocsp: add load shedding for live signer (#6523)
Browse files Browse the repository at this point in the history
In live.go we use a semaphore to limit how many inflight signing
requests we can have, so a flood of OCSP traffic doesn't flood our CA
instances. If traffic exceeds our capacity to sign responses for long
enough, we want to eventually start fast-rejecting inbound requests that
are unlikely to get serviced before their deadline is reached. To do
that, add a MaxSigningWaiters config field to the OCSP responder.

Note that the files in //semaphore are forked from x/sync/semaphore,
with modifications to add the MaxWaiters field and functionality.

Fixes #6392
  • Loading branch information
jsha authored Dec 12, 2022
1 parent f2bb0e4 commit fe2cf7d
Show file tree
Hide file tree
Showing 12 changed files with 755 additions and 9 deletions.
16 changes: 14 additions & 2 deletions cmd/ocsp-responder/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,9 +81,21 @@ type Config struct {
// should be set to somewhat less than
// (HSM signing capacity) / (number of ocsp-responders).
// Requests that would exceed this limit will block until capacity is
// available and eventually 500.
// available and eventually serve an HTTP 500 Internal Server Error.
MaxInflightSignings int

// A limit on how many goroutines can be waiting for a signing slot at
// a time. When this limit is exceeded, additional signing requests
// will immediately serve an HTTP 500 Internal Server Error until
// we are back below the limit. This provides load shedding for when
// inbound requests arrive faster than our ability to sign them.
// The default of 0 means "no limit." A good value for this is the
// longest queue we can expect to process before a timeout. For
// instance, if the timeout is 5 seconds, and a signing takes 20ms,
// and we have MaxInflightSignings = 40, we can expect to process
// 40 * 5 / 0.02 = 10,000 requests before the oldest request times out.
MaxSigningWaiters int

ShutdownStopTimeout cmd.ConfigDuration

RequiredSerialPrefixes []string
Expand Down Expand Up @@ -189,7 +201,7 @@ as generated by Boulder's ceremony command.
if maxInflight == 0 {
maxInflight = 1000
}
liveSource := live.New(rac, int64(maxInflight))
liveSource := live.New(rac, int64(maxInflight), c.OCSPResponder.MaxSigningWaiters)

rocspSource, err := redis_responder.NewRedisSource(rocspReader, liveSource, liveSigningPeriod, clk, scope, logger)
cmd.FailOnError(err, "Could not create redis source")
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ require (
github.com/zmap/zlint/v3 v3.4.0
golang.org/x/crypto v0.0.0-20220622213112-05595931fe9d
golang.org/x/net v0.0.0-20220926192436-02166a98028e
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4
golang.org/x/sync v0.1.0
golang.org/x/term v0.0.0-20220722155259-a9ba230a4035
golang.org/x/text v0.3.8
google.golang.org/grpc v1.49.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -624,6 +624,8 @@ golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
Expand Down
6 changes: 3 additions & 3 deletions ocsp/responder/live/live.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ import (
berrors "github.com/letsencrypt/boulder/errors"
"github.com/letsencrypt/boulder/ocsp/responder"
rapb "github.com/letsencrypt/boulder/ra/proto"
"github.com/letsencrypt/boulder/semaphore"
"golang.org/x/crypto/ocsp"
"golang.org/x/sync/semaphore"
"google.golang.org/grpc"
)

Expand All @@ -23,10 +23,10 @@ type Source struct {
sem *semaphore.Weighted
}

func New(ra ocspGenerator, maxInflight int64) *Source {
func New(ra ocspGenerator, maxInflight int64, maxWaiters int) *Source {
return &Source{
ra: ra,
sem: semaphore.NewWeighted(maxInflight),
sem: semaphore.NewWeighted(maxInflight, maxWaiters),
}
}

Expand Down
4 changes: 2 additions & 2 deletions ocsp/responder/live/live_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestLiveResponse(t *testing.T) {
fakeResp, _, _ := ocsp_test.FakeResponse(ocsp.Response{
SerialNumber: eeSerial,
})
source := New(mockOCSPGenerator{fakeResp.Raw}, 1)
source := New(mockOCSPGenerator{fakeResp.Raw}, 1, 0)
resp, err := source.Response(context.Background(), &ocsp.Request{
SerialNumber: eeSerial,
})
Expand All @@ -59,7 +59,7 @@ func TestLiveResponse(t *testing.T) {

func TestNotFound(t *testing.T) {
eeSerial := big.NewInt(1)
source := New(notFoundOCSPGenerator{}, 1)
source := New(notFoundOCSPGenerator{}, 1, 0)
_, err := source.Response(context.Background(), &ocsp.Request{
SerialNumber: eeSerial,
})
Expand Down
152 changes: 152 additions & 0 deletions semaphore/semaphore.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Modified by Boulder to provide a load-shedding mechanism.

// Package semaphore provides a weighted semaphore implementation.
package semaphore // import "golang.org/x/sync/semaphore"

import (
"container/list"
"context"
"errors"
"sync"
)

type waiter struct {
n int64
ready chan<- struct{} // Closed when semaphore acquired.
}

// ErrMaxWaiters is returned when Acquire is called, but there are more than
// maxWaiters waiters.
var ErrMaxWaiters = errors.New("too many waiters")

// NewWeighted creates a new weighted semaphore with the given
// maximum combined weight for concurrent access.
// maxWaiters provides a limit such that calls to Acquire
// will immediately error if the number of waiters is that high.
// A maxWaiters of zero means no limit.
func NewWeighted(n int64, maxWaiters int) *Weighted {
w := &Weighted{size: n, maxWaiters: maxWaiters}
return w
}

// Weighted provides a way to bound concurrent access to a resource.
// The callers can request access with a given weight.
type Weighted struct {
size int64
cur int64
mu sync.Mutex
waiters list.List
maxWaiters int
}

// Acquire acquires the semaphore with a weight of n, blocking until resources
// are available or ctx is done. On success, returns nil. On failure, returns
// ctx.Err() and leaves the semaphore unchanged.
//
// If ctx is already done, Acquire may still succeed without blocking.
//
// If there are maxWaiters waiters, Acquire will return an error immediately.
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
s.mu.Lock()
if s.size-s.cur >= n && s.waiters.Len() == 0 {
s.cur += n
s.mu.Unlock()
return nil
}

if n > s.size {
// Don't make other Acquire calls block on one that's doomed to fail.
s.mu.Unlock()
<-ctx.Done()
return ctx.Err()
}

if s.maxWaiters > 0 && s.waiters.Len() >= s.maxWaiters {
return ErrMaxWaiters
}

ready := make(chan struct{})
w := waiter{n: n, ready: ready}
elem := s.waiters.PushBack(w)
s.mu.Unlock()

select {
case <-ctx.Done():
err := ctx.Err()
s.mu.Lock()
select {
case <-ready:
// Acquired the semaphore after we were canceled. Rather than trying to
// fix up the queue, just pretend we didn't notice the cancellation.
err = nil
default:
isFront := s.waiters.Front() == elem
s.waiters.Remove(elem)
// If we're at the front and there're extra tokens left, notify other waiters.
if isFront && s.size > s.cur {
s.notifyWaiters()
}
}
s.mu.Unlock()
return err

case <-ready:
return nil
}
}

// TryAcquire acquires the semaphore with a weight of n without blocking.
// On success, returns true. On failure, returns false and leaves the semaphore unchanged.
func (s *Weighted) TryAcquire(n int64) bool {
s.mu.Lock()
success := s.size-s.cur >= n && s.waiters.Len() == 0
if success {
s.cur += n
}
s.mu.Unlock()
return success
}

// Release releases the semaphore with a weight of n.
func (s *Weighted) Release(n int64) {
s.mu.Lock()
s.cur -= n
if s.cur < 0 {
s.mu.Unlock()
panic("semaphore: released more than held")
}
s.notifyWaiters()
s.mu.Unlock()
}

func (s *Weighted) notifyWaiters() {
for {
next := s.waiters.Front()
if next == nil {
break // No more waiters blocked.
}

w := next.Value.(waiter)
if s.size-s.cur < w.n {
// Not enough tokens for the next waiter. We could keep going (to try to
// find a waiter with a smaller request), but under load that could cause
// starvation for large requests; instead, we leave all remaining waiters
// blocked.
//
// Consider a semaphore used as a read-write lock, with N tokens, N
// readers, and one writer. Each reader can Acquire(1) to obtain a read
// lock. The writer can Acquire(N) to obtain a write lock, excluding all
// of the readers. If we allow the readers to jump ahead in the queue,
// the writer will starve — there is always one token available for every
// reader.
break
}

s.cur += w.n
s.waiters.Remove(next)
close(w.ready)
}
}
132 changes: 132 additions & 0 deletions semaphore/semaphore_bench_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,132 @@
// Copyright 2017 The Go Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

//go:build go1.7
// +build go1.7

package semaphore_test

import (
"context"
"fmt"
"testing"

"github.com/letsencrypt/boulder/semaphore"
)

// weighted is an interface matching a subset of *Weighted. It allows
// alternate implementations for testing and benchmarking.
type weighted interface {
Acquire(context.Context, int64) error
TryAcquire(int64) bool
Release(int64)
}

// semChan implements Weighted using a channel for
// comparing against the condition variable-based implementation.
type semChan chan struct{}

func newSemChan(n int64) semChan {
return semChan(make(chan struct{}, n))
}

func (s semChan) Acquire(_ context.Context, n int64) error {
for i := int64(0); i < n; i++ {
s <- struct{}{}
}
return nil
}

func (s semChan) TryAcquire(n int64) bool {
if int64(len(s))+n > int64(cap(s)) {
return false
}

for i := int64(0); i < n; i++ {
s <- struct{}{}
}
return true
}

func (s semChan) Release(n int64) {
for i := int64(0); i < n; i++ {
<-s
}
}

// acquireN calls Acquire(size) on sem N times and then calls Release(size) N times.
func acquireN(b *testing.B, sem weighted, size int64, N int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < N; j++ {
sem.Acquire(context.Background(), size)
}
for j := 0; j < N; j++ {
sem.Release(size)
}
}
}

// tryAcquireN calls TryAcquire(size) on sem N times and then calls Release(size) N times.
func tryAcquireN(b *testing.B, sem weighted, size int64, N int) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
for j := 0; j < N; j++ {
if !sem.TryAcquire(size) {
b.Fatalf("TryAcquire(%v) = false, want true", size)
}
}
for j := 0; j < N; j++ {
sem.Release(size)
}
}
}

func BenchmarkNewSeq(b *testing.B) {
for _, cap := range []int64{1, 128} {
b.Run(fmt.Sprintf("Weighted-%d", cap), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = semaphore.NewWeighted(cap, 0)
}
})
b.Run(fmt.Sprintf("semChan-%d", cap), func(b *testing.B) {
for i := 0; i < b.N; i++ {
_ = newSemChan(cap)
}
})
}
}

func BenchmarkAcquireSeq(b *testing.B) {
for _, c := range []struct {
cap, size int64
N int
}{
{1, 1, 1},
{2, 1, 1},
{16, 1, 1},
{128, 1, 1},
{2, 2, 1},
{16, 2, 8},
{128, 2, 64},
{2, 1, 2},
{16, 8, 2},
{128, 64, 2},
} {
for _, w := range []struct {
name string
w weighted
}{
{"Weighted", semaphore.NewWeighted(c.cap, 0)},
{"semChan", newSemChan(c.cap)},
} {
b.Run(fmt.Sprintf("%s-acquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
acquireN(b, w.w, c.size, c.N)
})
b.Run(fmt.Sprintf("%s-tryAcquire-%d-%d-%d", w.name, c.cap, c.size, c.N), func(b *testing.B) {
tryAcquireN(b, w.w, c.size, c.N)
})
}
}
}
Loading

0 comments on commit fe2cf7d

Please sign in to comment.