Allow reloading some server timings #5071

Merged · 1 commit · Sep 4, 2024
12 changes: 12 additions & 0 deletions internal/cmd/commands/server/server.go
@@ -859,6 +859,10 @@ func (c *Command) Reload(newConf *config.Config) error {
 		reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller api rate limits: %w", err))
 	}
 
+	if err := c.reloadControllerTimings(newConf); err != nil {
+		reloadErrors = stderrors.Join(reloadErrors, fmt.Errorf("failed to reload controller timings: %w", err))
+	}
+
 	if newConf != nil && c.worker != nil {
 		workerReloadErr := func() error {
 			if newConf.Controller != nil {
@@ -977,6 +981,14 @@ func (c *Command) reloadControllerRateLimits(newConfig *config.Config) error {
 	return c.controller.ReloadRateLimiter(newConfig)
 }
 
+func (c *Command) reloadControllerTimings(newConfig *config.Config) error {
+	if c.controller == nil || newConfig == nil || newConfig.Controller == nil {
+		return nil
+	}
+
+	return c.controller.ReloadTimings(newConfig)
+}
+
 // acquireSchemaManager returns a schema manager and generally acquires a shared lock on
 // the database. This is done as a mechanism to disallow running migration commands
 // while the database is in use.
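
For context, here is a minimal sketch (not Boundary's actual wiring) of how a signal-driven reload path typically flows into a `Reload` method like the one above. The `Config` and `Command` types below are stand-ins, and the SIGHUP loop is illustrative only.

```go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// Config and Command are hypothetical stand-ins; only the reload flow is shown.
type Config struct {
	WorkerStatusGracePeriod time.Duration // hypothetical field
}

type Command struct {
	current *Config
}

// Reload applies the parts of newConf that are safe to change at runtime,
// analogous to Command.Reload calling reloadControllerTimings above.
func (c *Command) Reload(newConf *Config) error {
	if newConf == nil {
		return fmt.Errorf("nil config")
	}
	c.current = newConf
	return nil
}

func main() {
	cmd := &Command{current: &Config{}}

	sighup := make(chan os.Signal, 1)
	signal.Notify(sighup, syscall.SIGHUP)

	for range sighup {
		// A real server would re-read and re-parse its config file here.
		newConf := &Config{WorkerStatusGracePeriod: 30 * time.Second}
		if err := cmd.Reload(newConf); err != nil {
			fmt.Fprintln(os.Stderr, "reload failed:", err)
		}
	}
}
```
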
36 changes: 36 additions & 0 deletions internal/daemon/controller/controller.go
@@ -682,3 +682,39 @@ func (c *Controller) Shutdown() error {
 func (c *Controller) WorkerStatusUpdateTimes() *sync.Map {
 	return c.workerStatusUpdateTimes
 }
+
+// ReloadTimings reloads timing related parameters
+func (c *Controller) ReloadTimings(newConfig *config.Config) error {
+	const op = "controller.(Controller).ReloadTimings"
+
+	switch {
+	case newConfig == nil:
+		return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config")
+	case newConfig.Controller == nil:
+		return errors.New(c.baseContext, errors.InvalidParameter, op, "nil config.Controller")
+	}
+
+	switch newConfig.Controller.WorkerStatusGracePeriodDuration {
+	case 0:
+		c.workerStatusGracePeriod.Store(int64(server.DefaultLiveness))
+	default:
+		c.workerStatusGracePeriod.Store(int64(newConfig.Controller.WorkerStatusGracePeriodDuration))
+	}
+	switch newConfig.Controller.LivenessTimeToStaleDuration {
+	case 0:
+		c.livenessTimeToStale.Store(int64(server.DefaultLiveness))
+	default:
+		c.livenessTimeToStale.Store(int64(newConfig.Controller.LivenessTimeToStaleDuration))
+	}
+
+	switch newConfig.Controller.GetDownstreamWorkersTimeoutDuration {
+	case 0:
+		to := server.DefaultLiveness
+		c.getDownstreamWorkersTimeout.Store(&to)
+	default:
+		to := newConfig.Controller.GetDownstreamWorkersTimeoutDuration
+		c.getDownstreamWorkersTimeout.Store(&to)
+	}
+
+	return nil
+}
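
The pattern ReloadTimings follows — keep durations in atomics so hot paths can read them without locking, and treat a zero config value as "use the default" — can be shown in isolation. This is a sketch: the field and constant names below are placeholders, not the real Controller fields, and the default value is assumed.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// defaultLiveness stands in for server.DefaultLiveness; the real value may differ.
const defaultLiveness = 15 * time.Second

// timings is an illustrative stand-in for the Controller's timing fields.
type timings struct {
	workerStatusGracePeriod atomic.Int64 // duration stored as int64 nanoseconds
}

// reload applies a configured duration, treating zero as "use the default",
// mirroring the switch statements in ReloadTimings above.
func (t *timings) reload(configured time.Duration) {
	switch configured {
	case 0:
		t.workerStatusGracePeriod.Store(int64(defaultLiveness))
	default:
		t.workerStatusGracePeriod.Store(int64(configured))
	}
}

// gracePeriod is what a hot path would call to read the current value without locking.
func (t *timings) gracePeriod() time.Duration {
	return time.Duration(t.workerStatusGracePeriod.Load())
}

func main() {
	var t timings
	t.reload(0)
	fmt.Println(t.gracePeriod()) // 15s: zero falls back to the default
	t.reload(45 * time.Second)
	fmt.Println(t.gracePeriod()) // 45s: non-zero value is used as-is
}
```
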
23 changes: 23 additions & 0 deletions internal/daemon/worker/worker.go
Expand Up @@ -414,6 +414,29 @@ func (w *Worker) Reload(ctx context.Context, newConf *config.Config) {
ar.InitialAddresses(w.conf.RawConfig.Worker.InitialUpstreams)
}
}

switch newConf.Worker.SuccessfulStatusGracePeriodDuration {
case 0:
w.successfulStatusGracePeriod.Store(int64(server.DefaultLiveness))
default:
w.successfulStatusGracePeriod.Store(int64(newConf.Worker.SuccessfulStatusGracePeriodDuration))
}
switch newConf.Worker.StatusCallTimeoutDuration {
case 0:
w.statusCallTimeoutDuration.Store(int64(common.DefaultStatusTimeout))
default:
w.statusCallTimeoutDuration.Store(int64(newConf.Worker.StatusCallTimeoutDuration))
}
switch newConf.Worker.GetDownstreamWorkersTimeoutDuration {
case 0:
to := server.DefaultLiveness
w.getDownstreamWorkersTimeoutDuration.Store(&to)
default:
to := newConf.Worker.GetDownstreamWorkersTimeoutDuration
w.getDownstreamWorkersTimeoutDuration.Store(&to)
}
// See comment about this in worker.go
session.CloseCallTimeout.Store(w.successfulStatusGracePeriod.Load())
}

func (w *Worker) Start() error {
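
Note why each `case` takes the address of a fresh local before storing: with an `atomic.Pointer[time.Duration]`-style field, every reload publishes a new, never-mutated copy, so a concurrent reader sees either the old duration or the new one, never a torn value. A minimal sketch with illustrative names and an assumed default:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// defaultTimeout stands in for server.DefaultLiveness; the real default may differ.
const defaultTimeout = 10 * time.Second

// worker is illustrative; only the one pointer-typed timing field is modeled.
type worker struct {
	getDownstreamWorkersTimeout atomic.Pointer[time.Duration]
}

func (w *worker) reload(configured time.Duration) {
	switch configured {
	case 0:
		to := defaultTimeout
		w.getDownstreamWorkersTimeout.Store(&to) // publish a fresh copy
	default:
		to := configured
		w.getDownstreamWorkersTimeout.Store(&to) // publish a fresh copy
	}
}

func main() {
	w := &worker{}
	w.reload(0)
	fmt.Println(*w.getDownstreamWorkersTimeout.Load()) // 10s
	w.reload(2 * time.Minute)
	fmt.Println(*w.getDownstreamWorkersTimeout.Load()) // 2m0s
}
```
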
14 changes: 7 additions & 7 deletions internal/session/job_session_cleanup.go
@@ -37,7 +37,7 @@ type sessionConnectionCleanupJob struct {
 	// The amount of time to give disconnected workers before marking their
 	// connections as closed. This should be larger than the liveness setting
 	// for the worker.
-	gracePeriod *atomic.Int64
+	workerStatusGracePeriod *atomic.Int64
 
 	// The total number of connections closed in the last run.
 	totalClosed int
@@ -47,21 +47,21 @@
 func newSessionConnectionCleanupJob(
 	ctx context.Context,
 	writer db.Writer,
-	gracePeriod *atomic.Int64,
+	workerStatusGracePeriod *atomic.Int64,
 ) (*sessionConnectionCleanupJob, error) {
 	const op = "session.newNewSessionConnectionCleanupJob"
 	switch {
 	case writer == nil:
 		return nil, errors.New(ctx, errors.InvalidParameter, op, "missing db writer")
-	case gracePeriod == nil:
+	case workerStatusGracePeriod == nil:
 		return nil, errors.New(ctx, errors.InvalidParameter, op, "missing grace period")
-	case gracePeriod.Load() == 0:
+	case workerStatusGracePeriod.Load() == 0:
 		return nil, errors.New(ctx, errors.InvalidParameter, op, "grace period is zero")
 	}
 
 	return &sessionConnectionCleanupJob{
-		writer:      writer,
-		gracePeriod: gracePeriod,
+		writer:                  writer,
+		workerStatusGracePeriod: workerStatusGracePeriod,
 	}, nil
 }

@@ -98,7 +98,7 @@ func (j *sessionConnectionCleanupJob) Run(ctx context.Context) error {
 	j.totalClosed = 0
 
 	// Run the atomic dead worker cleanup job.
-	gracePeriod := time.Duration(j.gracePeriod.Load())
+	gracePeriod := time.Duration(j.workerStatusGracePeriod.Load())
 	results, err := j.closeConnectionsForDeadWorkers(ctx, gracePeriod)
 	if err != nil {
 		return errors.Wrap(ctx, err, op)
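
The rename also highlights how the cleanup job observes reloaded timings: it holds a pointer to the same atomic the reload path writes, so each run reads the current grace period without the job being recreated. A small sketch under that assumption, with hypothetical names and values:

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

// cleanupJob is illustrative; the real job also takes a db.Writer and does actual work.
type cleanupJob struct {
	workerStatusGracePeriod *atomic.Int64
}

// run reads the grace period at the start of each run, as Run does above.
func (j *cleanupJob) run() {
	gracePeriod := time.Duration(j.workerStatusGracePeriod.Load())
	fmt.Println("closing connections for workers silent longer than", gracePeriod)
}

func main() {
	shared := new(atomic.Int64)
	shared.Store(int64(15 * time.Second))

	job := &cleanupJob{workerStatusGracePeriod: shared}
	job.run() // uses 15s

	// A config reload elsewhere stores into the same atomic...
	shared.Store(int64(1 * time.Minute))
	job.run() // ...and the next run sees 1m without the job being recreated
}
```
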
6 changes: 3 additions & 3 deletions internal/session/jobs.go
@@ -18,14 +18,14 @@ import (
 const deleteTerminatedThreshold = time.Hour
 
 // RegisterJobs registers session related jobs with the provided scheduler.
-func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, gracePeriod *atomic.Int64) error {
+func RegisterJobs(ctx context.Context, scheduler *scheduler.Scheduler, w db.Writer, r db.Reader, k *kms.Kms, workerStatusGracePeriod *atomic.Int64) error {
 	const op = "session.RegisterJobs"
 
-	if gracePeriod == nil {
+	if workerStatusGracePeriod == nil {
 		return errors.New(ctx, errors.InvalidParameter, op, "nil grace period")
 	}
 
-	sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, gracePeriod)
+	sessionConnectionCleanupJob, err := newSessionConnectionCleanupJob(ctx, w, workerStatusGracePeriod)
 	if err != nil {
 		return fmt.Errorf("error creating session cleanup job: %w", err)
 	}