internal/daemon/controller+worker: propagate downstream timeout
Propagate the downstream worker timeout into the
downstream receiver and downstream ticker
so we have a configurable, consistent timeout
value to use for worker-to-worker interactions.

Also change the atomic.Pointer to an atomic.Int64
for consistency with other values.
johanbrandhorst committed Sep 9, 2024
1 parent d45aa40 commit b5972fc
Showing 4 changed files with 35 additions and 36 deletions.
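For context on the type change in the commit message: an atomic.Int64 can carry a time.Duration because a Duration is itself an int64 nanosecond count, so the diff stores int64(duration) and readers convert back with time.Duration(v.Load()). A minimal standalone sketch of that round trip follows; the helper names and the 15-second stand-in for server.DefaultLiveness are illustrative, not taken from the Boundary codebase.

package main

import (
    "fmt"
    "sync/atomic"
    "time"
)

// defaultTimeout stands in for server.DefaultLiveness in this sketch only.
const defaultTimeout = 15 * time.Second

// storeTimeout stores the configured duration (or the default when unset)
// as a nanosecond count in the shared atomic.Int64.
func storeTimeout(dst *atomic.Int64, configured time.Duration) {
    if configured == 0 {
        dst.Store(int64(defaultTimeout))
        return
    }
    dst.Store(int64(configured))
}

// loadTimeout converts the stored nanosecond count back into a time.Duration.
func loadTimeout(src *atomic.Int64) time.Duration {
    return time.Duration(src.Load())
}

func main() {
    var timeout atomic.Int64
    storeTimeout(&timeout, 0) // unset config falls back to the default
    fmt.Println(loadTimeout(&timeout)) // 15s

    storeTimeout(&timeout, 90*time.Second) // e.g. after a config reload
    fmt.Println(loadTimeout(&timeout)) // 1m30s
}

Unlike atomic.Pointer[time.Duration], this avoids allocating a fresh Duration on every store and matches the other *atomic.Int64 fields on Controller and Worker, such as workerStatusGracePeriod and livenessTimeToStale.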
35 changes: 17 additions & 18 deletions internal/daemon/controller/controller.go
@@ -10,7 +10,6 @@ import (
     "strings"
     "sync"
     "sync/atomic"
-    "time"
 
     "github.com/hashicorp/boundary/internal/alias"
     talias "github.com/hashicorp/boundary/internal/alias/target"
@@ -89,10 +88,10 @@ type downstreamWorkersTicker interface {
 }
 
 var (
-    downstreamReceiverFactory func() downstreamReceiver
+    downstreamReceiverFactory func(*atomic.Int64) (downstreamReceiver, error)
 
     downstreamersFactory func(context.Context, string, string) (common.Downstreamers, error)
-    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, time.Duration) (downstreamWorkersTicker, error)
+    downstreamWorkersTickerFactory func(context.Context, string, string, common.Downstreamers, downstreamReceiver, *atomic.Int64) (downstreamWorkersTicker, error)
     commandClientFactory func(context.Context, *Controller) error
     extControllerFactory func(ctx context.Context, c *Controller, r db.Reader, w db.Writer, kms *kms.Kms) (intglobals.ControllerExtension, error)
 )
@@ -124,7 +123,7 @@ type Controller struct {
     // because they are casted to time.Duration.
     workerStatusGracePeriod *atomic.Int64
    livenessTimeToStale *atomic.Int64
-    getDownstreamWorkersTimeout *atomic.Pointer[time.Duration]
+    getDownstreamWorkersTimeout *atomic.Int64
 
     apiGrpcServer *grpc.Server
     apiGrpcServerListener grpcServerListener
@@ -190,11 +189,7 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
         downstreamConnManager: cluster.NewDownstreamManager(),
         workerStatusGracePeriod: new(atomic.Int64),
         livenessTimeToStale: new(atomic.Int64),
-        getDownstreamWorkersTimeout: new(atomic.Pointer[time.Duration]),
-    }
-
-    if downstreamReceiverFactory != nil {
-        c.downstreamConns = downstreamReceiverFactory()
+        getDownstreamWorkersTimeout: new(atomic.Int64),
     }
 
     c.started.Store(false)
@@ -243,11 +238,17 @@ func New(ctx context.Context, conf *Config) (*Controller, error) {
 
     switch conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(server.DefaultLiveness))
     default:
-        to := conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(conf.RawConfig.Controller.GetDownstreamWorkersTimeoutDuration))
     }
 
+    if downstreamReceiverFactory != nil {
+        var err error
+        c.downstreamConns, err = downstreamReceiverFactory(c.getDownstreamWorkersTimeout)
+        if err != nil {
+            return nil, fmt.Errorf("%s: unable to initialize downstream receiver: %w", op, err)
+        }
+    }
+
     clusterListeners := make([]*base.ServerListener, 0)
@@ -591,7 +592,7 @@ func (c *Controller) Start() error {
     // we'll use "root" to designate that this is the root of the graph (aka
     // a controller)
     boundVer := version.Get().VersionNumber()
-    dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, *c.getDownstreamWorkersTimeout.Load())
+    dswTicker, err := downstreamWorkersTickerFactory(c.baseContext, "root", boundVer, c.downstreamWorkers, c.downstreamConns, c.getDownstreamWorkersTimeout)
     if err != nil {
         return fmt.Errorf("error creating downstream workers ticker: %w", err)
     }
@@ -709,11 +710,9 @@ func (c *Controller) ReloadTimings(newConfig *config.Config) error {
 
     switch newConfig.Controller.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(server.DefaultLiveness))
     default:
-        to := newConfig.Controller.GetDownstreamWorkersTimeoutDuration
-        c.getDownstreamWorkersTimeout.Store(&to)
+        c.getDownstreamWorkersTimeout.Store(int64(newConfig.Controller.GetDownstreamWorkersTimeoutDuration))
     }
 
     return nil
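Both controller factories now take the *atomic.Int64 itself rather than a dereferenced time.Duration snapshot, so a value stored later by ReloadTimings is visible to the ticker without recreating it. The concrete downstreamWorkersTicker is supplied by the factory and is not part of this diff; the sketch below only illustrates the read-at-use pattern, and every name in it is hypothetical.

package tickersketch

import (
    "context"
    "sync/atomic"
    "time"
)

// ticker is a stand-in for a downstreamWorkersTicker implementation.
type ticker struct {
    timeout *atomic.Int64 // shared with Controller.getDownstreamWorkersTimeout
}

// run re-reads the timeout on every tick, so a new value stored by
// ReloadTimings takes effect on the next iteration.
func (t *ticker) run(ctx context.Context, interval time.Duration) {
    tick := time.NewTicker(interval)
    defer tick.Stop()
    for {
        select {
        case <-ctx.Done():
            return
        case <-tick.C:
            callCtx, cancel := context.WithTimeout(ctx, time.Duration(t.timeout.Load()))
            t.pollDownstreamWorkers(callCtx) // hypothetical worker-to-worker call
            cancel()
        }
    }
}

func (t *ticker) pollDownstreamWorkers(ctx context.Context) {
    // ... contact downstream workers, bounded by ctx ...
    _ = ctx
}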
1 change: 1 addition & 0 deletions internal/daemon/worker/status_test.go
@@ -24,6 +24,7 @@ func TestWorkerWaitForNextSuccessfulStatusUpdate(t *testing.T) {
     })
     err := event.InitSysEventer(testLogger, testLock, "TestWorkerWaitForNextSuccessfulStatusUpdate", event.WithEventerConfig(testConfig))
     require.NoError(t, err)
+    t.Cleanup(func() { event.TestResetSystEventer(t) })
     for _, name := range []string{"ok", "timeout"} {
         t.Run(name, func(t *testing.T) {
             require := require.New(t)
31 changes: 15 additions & 16 deletions internal/daemon/worker/worker.go
@@ -97,7 +97,7 @@ type recorderManager interface {
 
 // reverseConnReceiverFactory provides a simple factory which a Worker can use to
 // create its reverseConnReceiver
-var reverseConnReceiverFactory func() reverseConnReceiver
+var reverseConnReceiverFactory func(*atomic.Int64) (reverseConnReceiver, error)
 
 var recordingStorageFactory func(
     ctx context.Context,
@@ -189,7 +189,7 @@ type Worker struct {
     // because they are casted to time.Duration.
     successfulStatusGracePeriod *atomic.Int64
     statusCallTimeoutDuration *atomic.Int64
-    getDownstreamWorkersTimeoutDuration *atomic.Pointer[time.Duration]
+    getDownstreamWorkersTimeoutDuration *atomic.Int64
 
     // AuthRotationNextRotation is useful in tests to understand how long to
     // sleep
@@ -232,18 +232,13 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
         localStorageState: new(atomic.Value),
         successfulStatusGracePeriod: new(atomic.Int64),
         statusCallTimeoutDuration: new(atomic.Int64),
-        getDownstreamWorkersTimeoutDuration: new(atomic.Pointer[time.Duration]),
+        getDownstreamWorkersTimeoutDuration: new(atomic.Int64),
         upstreamConnectionState: new(atomic.Value),
         downstreamWorkers: new(atomic.Pointer[downstreamersContainer]),
     }
 
     w.operationalState.Store(server.UnknownOperationalState)
     w.localStorageState.Store(server.UnknownLocalStorageState)
-
-    if reverseConnReceiverFactory != nil {
-        w.downstreamReceiver = reverseConnReceiverFactory()
-    }
-
     w.lastStatusSuccess.Store((*LastStatusInformation)(nil))
     scheme := strconv.FormatInt(time.Now().UnixNano(), 36)
     controllerResolver := manual.NewBuilderWithScheme(scheme)
@@ -338,15 +333,21 @@ func New(ctx context.Context, conf *Config) (*Worker, error) {
     }
     switch conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(server.DefaultLiveness))
     default:
-        to := conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(conf.RawConfig.Worker.GetDownstreamWorkersTimeoutDuration))
     }
     // FIXME: This is really ugly, but works.
     session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
 
+    if reverseConnReceiverFactory != nil {
+        var err error
+        w.downstreamReceiver, err = reverseConnReceiverFactory(w.getDownstreamWorkersTimeoutDuration)
+        if err != nil {
+            return nil, fmt.Errorf("%s: error creating reverse connection receiver: %w", op, err)
+        }
+    }
+
     if eventListenerFactory != nil {
         var err error
         w.storageEventListener, err = eventListenerFactory(w)
@@ -429,11 +430,9 @@ func (w *Worker) Reload(ctx context.Context, newConf *config.Config) {
     }
     switch newConf.Worker.GetDownstreamWorkersTimeoutDuration {
     case 0:
-        to := server.DefaultLiveness
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(server.DefaultLiveness))
     default:
-        to := newConf.Worker.GetDownstreamWorkersTimeoutDuration
-        w.getDownstreamWorkersTimeoutDuration.Store(&to)
+        w.getDownstreamWorkersTimeoutDuration.Store(int64(newConf.Worker.GetDownstreamWorkersTimeoutDuration))
     }
     // See comment about this in worker.go
     session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
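As on the controller side, worker.New now builds the reverse connection receiver after the timeout has been stored, passing the shared *atomic.Int64 and checking the error the factory can now return. The real reverseConnReceiver is provided through reverseConnReceiverFactory elsewhere; the following is only a sketch of a factory shape that would satisfy the new signature, with hypothetical names throughout.

package receiversketch

import (
    "context"
    "errors"
    "sync/atomic"
    "time"
)

// reverseReceiver stands in for the worker's reverseConnReceiver.
type reverseReceiver struct {
    timeout *atomic.Int64 // shared with Worker.getDownstreamWorkersTimeoutDuration
}

// newReverseReceiver mirrors the new factory shape: it accepts the shared
// counter and may fail, which is why worker.New checks the returned error.
func newReverseReceiver(timeout *atomic.Int64) (*reverseReceiver, error) {
    if timeout == nil {
        return nil, errors.New("nil downstream workers timeout")
    }
    return &reverseReceiver{timeout: timeout}, nil
}

// serviceConn bounds each worker-to-worker exchange by whatever timeout is
// currently stored in the shared counter.
func (r *reverseReceiver) serviceConn(ctx context.Context) error {
    ctx, cancel := context.WithTimeout(ctx, time.Duration(r.timeout.Load()))
    defer cancel()
    // ... accept and service a reverse connection using ctx ...
    <-ctx.Done()
    return ctx.Err()
}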
4 changes: 2 additions & 2 deletions internal/daemon/worker/worker_test.go
@@ -181,7 +181,7 @@ func TestWorkerNew(t *testing.T) {
             }
             if util.IsNil(tt.in.Eventer) {
                 require.NoError(t, event.InitSysEventer(hclog.Default(), &sync.Mutex{}, "worker_test", event.WithEventerConfig(&event.EventerConfig{})))
-                defer event.TestResetSystEventer(t)
+                t.Cleanup(func() { event.TestResetSystEventer(t) })
                 tt.in.Eventer = event.SysEventer()
             }
 
@@ -332,7 +332,7 @@ func TestSetupWorkerAuthStorage(t *testing.T) {
 
 func Test_Worker_getSessionTls(t *testing.T) {
     require.NoError(t, event.InitSysEventer(hclog.Default(), &sync.Mutex{}, "worker_test", event.WithEventerConfig(&event.EventerConfig{})))
-    defer event.TestResetSystEventer(t)
+    t.Cleanup(func() { event.TestResetSystEventer(t) })
 
     conf := &Config{
         Server: &base.Server{
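The test changes register eventer teardown with t.Cleanup instead of defer. One property worth noting: functions registered with t.Cleanup run only after the test and all of its subtests finish, and they compose with cleanups registered by helpers, whereas a defer in the parent test function fires as soon as that function returns. A small illustrative sketch, with hypothetical helpers standing in for event.InitSysEventer and event.TestResetSystEventer:

package cleanupsketch

import "testing"

func TestWithSharedEventer(t *testing.T) {
    setupSharedEventer(t)
    // Runs after every subtest below has completed, even parallel ones.
    t.Cleanup(func() { teardownSharedEventer(t) })

    for _, name := range []string{"ok", "timeout"} {
        name := name
        t.Run(name, func(t *testing.T) {
            t.Parallel() // the parent function returns before this body runs;
            // a defer-based teardown in the parent would have fired too early
            _ = name
            // ... subtest body relying on the shared eventer ...
        })
    }
}

func setupSharedEventer(t *testing.T)    { t.Helper() /* ... */ }
func teardownSharedEventer(t *testing.T) { t.Helper() /* ... */ }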
