Skip to content

Commit

Permalink
Allow reloading some server timings
Browse files Browse the repository at this point in the history
Allows changing grace period and a few other things timing-related via
HUP instead of just a restart.

Tested manually via config changes and HUP.
  • Loading branch information
jefferai committed Sep 4, 2024
1 parent 35d1df7 commit ad415b6
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 10 deletions.
12 changes: 12 additions & 0 deletions internal/cmd/commands/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -859,6 +859,10 @@ func (c *Command) Reload(newConf *config.Config) error {
reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller api rate limits: %w", err))
}

if err := c.reloadControllerTimings(newConf); err != nil {
reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller timings: %w", err))
}

if newConf != nil && c.worker != nil {
workerReloadErr := func() error {
if newConf.Controller != nil {
Expand Down Expand Up @@ -977,6 +981,14 @@ func (c *Command) reloadControllerRateLimits(newConfig *config.Config) error {
return c.controller.ReloadRateLimiter(newConfig)
}

func (c *Command) reloadControllerTimings(newConfig *config.Config) error {
if c.controller == nil || newConfig == nil || newConfig.Controller == nil {
return nil
}

return c.controller.ReloadTimings(newConfig)
}

// acquireSchemaManager returns a schema manager and generally acquires a shared lock on
// the database. This is done as a mechanism to disallow running migration commands
// while the database is in use.
Expand Down
36 changes: 36 additions & 0 deletions internal/daemon/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -682,3 +682,39 @@ func (c *Controller) Shutdown() error {
func (c *Controller) WorkerStatusUpdateTimes() *sync.Map {
return c.workerStatusUpdateTimes
}

// ReloadTimings reloads timing related parameters
func (c *Controller) ReloadTimings(newConfig *config.Config) error {
const op = "controller.(Controller).ReloadTimings"

switch {
case newConfig == nil:
return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config")
case newConfig.Controller == nil:
return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config.Controller")
}

switch newConfig.Controller.WorkerStatusGracePeriodDuration {
case 0:
c.workerStatusGracePeriod.Store(int64(server.DefaultLiveness))
default:
c.workerStatusGracePeriod.Store(int64(newConfig.Controller.WorkerStatusGracePeriodDuration))
}
switch newConfig.Controller.LivenessTimeToStaleDuration {
case 0:
c.livenessTimeToStale.Store(int64(server.DefaultLiveness))
default:
c.livenessTimeToStale.Store(int64(newConfig.Controller.LivenessTimeToStaleDuration))
}

switch newConfig.Controller.GetDownstreamWorkersTimeoutDuration {
case 0:
to := server.DefaultLiveness
c.getDownstreamWorkersTimeout.Store(&to)
default:
to := newConfig.Controller.GetDownstreamWorkersTimeoutDuration
c.getDownstreamWorkersTimeout.Store(&to)
}

return nil
}
23 changes: 23 additions & 0 deletions internal/daemon/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -414,6 +414,29 @@ func (w *Worker) Reload(ctx context.Context, newConf *config.Config) {
ar.InitialAddresses(w.conf.RawConfig.Worker.InitialUpstreams)
}
}

switch newConf.Worker.SuccessfulStatusGracePeriodDuration {
case 0:
w.successfulStatusGracePeriod.Store(int64(server.DefaultLiveness))
default:
w.successfulStatusGracePeriod.Store(int64(newConf.Worker.SuccessfulStatusGracePeriodDuration))
}
switch newConf.Worker.StatusCallTimeoutDuration {
case 0:
w.statusCallTimeoutDuration.Store(int64(common.DefaultStatusTimeout))
default:
w.statusCallTimeoutDuration.Store(int64(newConf.Worker.StatusCallTimeoutDuration))
}
switch newConf.Worker.GetDownstreamWorkersTimeoutDuration {
case 0:
to := server.DefaultLiveness
w.getDownstreamWorkersTimeoutDuration.Store(&to)
default:
to := newConf.Worker.GetDownstreamWorkersTimeoutDuration
w.getDownstreamWorkersTimeoutDuration.Store(&to)
}
// See comment about this in worker.go
session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
}

func (w *Worker) Start() error {
Expand Down
14 changes: 7 additions & 7 deletions internal/session/job_session_cleanup.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ type sessionConnectionCleanupJob struct {
// The amount of time to give disconnected workers before marking their
// connections as closed. This should be larger than the liveness setting
// for the worker.
gracePeriod *atomic.Int64
workerStatusGracePeriod *atomic.Int64

// The total number of connections closed in the last run.
totalClosed int
Expand All @@ -47,21 +47,21 @@ type sessionConnectionCleanupJob struct {
func newSessionConnectionCleanupJob(
ctx context.Context,
writer db.Writer,
gracePeriod *atomic.Int64,
workerStatusGracePeriod *atomic.Int64,
) (*sessionConnectionCleanupJob, error) {
const op = "session.newNewSessionConnectionCleanupJob"
switch {
case writer == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db writer")
case gracePeriod == nil:
case workerStatusGracePeriod == nil:
return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grace period")
case gracePeriod.Load() == 0:
case workerStatusGracePeriod.Load() == 0:
return nil, errors.New(ctx, errors.InvalidParameter, op, "grace period is zero")
}

return &sessionConnectionCleanupJob{
writer: writer,
gracePeriod: gracePeriod,
writer: writer,
workerStatusGracePeriod: workerStatusGracePeriod,
}, nil
}

Expand Down Expand Up @@ -98,7 +98,7 @@ func (j *sessionConnectionCleanupJob) Run(ctx context.Context) error {
j.totalClosed = 0

// Run the atomic dead worker cleanup job.
gracePeriod := time.Duration(j.gracePeriod.Load())
gracePeriod := time.Duration(j.workerStatusGracePeriod.Load())
results, err := j.closeConnectionsForDeadWorkers(ctx, gracePeriod)
if err != nil {
return errors.Wrap(ctx, err, op)
Expand Down
6 changes: 3 additions & 3 deletions internal/session/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,14 @@ import (
const deleteTerminatedThreshold = time.Hour

// RegisterJobs registers session related jobs with the provided scheduler.
func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, gracePeriod *atomic.Int64) error {
func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, workerStatusGracePeriod *atomic.Int64) error {
const op = "session.RegisterJobs"

if gracePeriod == nil {
if workerStatusGracePeriod == nil {
return errors.New(ctx, errors.InvalidParameter, op, "nil grace period")
}

sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, gracePeriod)
sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, workerStatusGracePeriod)
if err != nil {
return fmt.Errorf("error creating session cleanup job: %w", err)
}
Expand Down

0 comments on commit ad415b6

Please sign in to comment.